diff --git a/pkg/server/wire_gen.go b/pkg/server/wire_gen.go index ad232c40736..8a4ee16741d 100644 --- a/pkg/server/wire_gen.go +++ b/pkg/server/wire_gen.go @@ -593,7 +593,7 @@ func Initialize(ctx context.Context, cfg *setting.Cfg, opts Options, apiOpts api unifiedMigrator := migrations2.ProvideUnifiedMigrator(resourceClient, migrationRegistry) unifiedStorageMigrationService := migrations2.ProvideUnifiedStorageMigrationService(unifiedMigrator, legacyDatabaseProvider, cfg, sqlStore, kvStore, resourceClient, migrationRegistry) migrationStatusReader := migrations2.ProvideMigrationStatusReader(sqlStore, cfg, migrationRegistry) - dualwriteService, err := dualwrite.ProvideService(featureToggles, kvStore, cfg, unifiedStorageMigrationService, migrationStatusReader) + dualwriteService, err := dualwrite.ProvideService(featureToggles, kvStore, cfg, unifiedStorageMigrationService, migrationStatusReader, registerer) if err != nil { return nil, err } @@ -1286,7 +1286,7 @@ func InitializeForTest(ctx context.Context, t sqlutil.ITestDB, testingT interfac unifiedMigrator := migrations2.ProvideUnifiedMigrator(resourceClient, migrationRegistry) unifiedStorageMigrationService := migrations2.ProvideUnifiedStorageMigrationService(unifiedMigrator, legacyDatabaseProvider, cfg, sqlStore, kvStore, resourceClient, migrationRegistry) migrationStatusReader := migrations2.ProvideMigrationStatusReader(sqlStore, cfg, migrationRegistry) - dualwriteService, err := dualwrite.ProvideService(featureToggles, kvStore, cfg, unifiedStorageMigrationService, migrationStatusReader) + dualwriteService, err := dualwrite.ProvideService(featureToggles, kvStore, cfg, unifiedStorageMigrationService, migrationStatusReader, registerer) if err != nil { return nil, err } diff --git a/pkg/storage/legacysql/dualwrite/dualwriter.go b/pkg/storage/legacysql/dualwrite/dualwriter.go index bd283218253..b9a0c9e8081 100644 --- a/pkg/storage/legacysql/dualwrite/dualwriter.go +++ b/pkg/storage/legacysql/dualwrite/dualwriter.go @@ 
-13,6 +13,7 @@ import ( metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apiserver/pkg/registry/rest" "k8s.io/apiserver/pkg/util/dryrun" @@ -42,6 +43,8 @@ type dualWriter struct { unified grafanarest.Storage readUnified bool errorIsOK bool // in "mode1" we try writing both -- but don't block on unified write errors + gr schema.GroupResource + metrics *dualWriterMetrics } func (d *dualWriter) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { @@ -51,7 +54,7 @@ func (d *dualWriter) Get(ctx context.Context, name string, options *metav1.GetOp attribute.Bool("readUnified", d.readUnified))) defer span.End() - log := logging.FromContext(ctx).With("method", "Get", "name", name) + log := logging.FromContext(ctx).With("method", "Get", "name", name, "resource", d.gr.String()) // If we read from unified, we can just do that and return. 
if d.readUnified { return d.unified.Get(ctx, name, options) @@ -69,6 +72,7 @@ func (d *dualWriter) Get(ctx context.Context, name string, options *metav1.GetOp defer cancel() if _, err := d.unified.Get(ctxBg, name, options); err != nil { log.Error("failed background GET to unified", "err", err) + d.metrics.backgroundErrors.WithLabelValues(d.gr.String(), "GET").Inc() } }(context.WithTimeout(context.WithoutCancel(ctx), backgroundReqTimeout)) return legacyGet, nil @@ -93,7 +97,7 @@ func (d *dualWriter) List(ctx context.Context, options *metainternalversion.List var ( legacyOptions = options.DeepCopy() unifiedOptions = options.DeepCopy() - log = logging.FromContext(ctx).With("method", "List", "options", options) + log = logging.FromContext(ctx).With("method", "List", "options", options, "resource", d.gr.String()) ) legacyToken, unifiedToken, err := parseContinueTokens(options.Continue)
@@ -160,12 +164,16 @@ func (d *dualWriter) List(ctx context.Context, options *metainternalversion.List unifiedList, err := d.unified.List(ctxBg, unifiedOptions) if err != nil { log.Error("failed background LIST to unified", "err", err)
+				d.metrics.backgroundErrors.WithLabelValues(d.gr.String(), "LIST").Inc()
 return } unifiedMeta, err := meta.ListAccessor(unifiedList) if err != nil { log.Error("failed background LIST to unified", "err", fmt.Errorf("failed to access unified List MetaData: %w", err))
+				d.metrics.backgroundErrors.WithLabelValues(d.gr.String(), "LIST").Inc()
+				// unifiedMeta is not usable after a ListAccessor failure; stop here instead of calling GetContinue on it.
+				return
 } out <- unifiedMeta.GetContinue() }(context.WithTimeout(context.WithoutCancel(ctx), backgroundReqTimeout)) @@ -209,7 +215,7 @@ func (d *dualWriter) Create(ctx context.Context, in runtime.Object, createValida return d.unified.Create(ctx, in, createValidation, options) } - log := logging.FromContext(ctx).With("method", "Create") + log := logging.FromContext(ctx).With("method", "Create", "resource", d.gr.String()) accIn, err := utils.MetaAccessor(in) if err != nil { @@ -302,6 +308,7 @@ func (d *dualWriter) Create(ctx 
context.Context, in runtime.Object, createValida defer cancel() if _, err := d.unified.Create(ctxBg, createdCopy, createValidation, options); err != nil { log.With("objectInfo", objectInfo(createdCopy)).Error("failed to CREATE object in unified storage", "err", err) + d.metrics.backgroundErrors.WithLabelValues(d.gr.String(), "CREATE").Inc() } }(context.WithTimeout(context.WithoutCancel(ctx), backgroundReqTimeout)) } else { @@ -347,7 +354,7 @@ func (d *dualWriter) Delete(ctx context.Context, name string, deleteValidation r return d.unified.Delete(ctx, name, deleteValidation, options) } - log := logging.FromContext(ctx).With("method", "Delete", "name", name) + log := logging.FromContext(ctx).With("method", "Delete", "name", name, "resource", d.gr.String()) ctx = utils.SetFolderRemovePermissions(ctx, false) objFromLegacy, asyncLegacy, err := d.legacy.Delete(ctx, name, deleteValidation, options) @@ -373,6 +380,7 @@ func (d *dualWriter) Delete(ctx context.Context, name string, deleteValidation r defer cancel() if _, _, err := d.unified.Delete(ctxBg, name, deleteValidation, options); err != nil && !apierrors.IsNotFound(err) { log.Error("failed background DELETE in unified storage", "err", err) + d.metrics.backgroundErrors.WithLabelValues(d.gr.String(), "DELETE").Inc() } }(context.WithTimeout(context.WithoutCancel(ctx), backgroundReqTimeout)) return objFromLegacy, asyncLegacy, nil @@ -406,7 +414,7 @@ func (d *dualWriter) Update(ctx context.Context, name string, objInfo rest.Updat return d.unified.Update(ctx, name, dryRunInfo, createValidation, updateValidation, dryRunForceCreate, options) } - log := logging.FromContext(ctx).With("method", "Update", "name", name) + log := logging.FromContext(ctx).With("method", "Update", "name", name, "resource", d.gr.String()) // update in legacy first, and then unistore. Will return a failure if either fails. 
// // we want to update in legacy first, otherwise if the update from unistore was successful, @@ -463,6 +471,7 @@ func (d *dualWriter) Update(ctx context.Context, name string, objInfo rest.Updat defer cancel() if _, _, err := d.unified.Update(ctxBg, name, unifiedInfo, createValidation, updateValidation, unifiedForceCreate, options); err != nil { log.With("objectInfo", objectInfo(objFromLegacy)).Error("failed background UPDATE to unified storage", "err", err) + d.metrics.backgroundErrors.WithLabelValues(d.gr.String(), "UPDATE").Inc() } }(context.WithTimeout(context.WithoutCancel(ctx), backgroundReqTimeout)) return objFromLegacy, createdLegacy, nil @@ -498,7 +507,7 @@ func (d *dualWriter) DeleteCollection(ctx context.Context, deleteValidation rest return d.unified.DeleteCollection(ctx, deleteValidation, options, listOptions) } - log := logging.FromContext(ctx).With("method", "DeleteCollection", "resourceVersion", listOptions.ResourceVersion) + log := logging.FromContext(ctx).With("method", "DeleteCollection", "resourceVersion", listOptions.ResourceVersion, "resource", d.gr.String()) // delete from legacy first, and anything that is successful can be deleted in unistore too. 
// @@ -521,6 +530,7 @@ func (d *dualWriter) DeleteCollection(ctx context.Context, deleteValidation rest defer cancel() if _, err := d.unified.DeleteCollection(ctxBg, deleteValidation, options, listOptions); err != nil { log.With("objectInfo", objectInfo(deletedLegacy)).Error("failed background DELETE collection to unified storage", "err", err) + d.metrics.backgroundErrors.WithLabelValues(d.gr.String(), "DELETE_COLLECTION").Inc() } }(context.WithTimeout(context.WithoutCancel(ctx), backgroundReqTimeout)) return deletedLegacy, nil
diff --git a/pkg/storage/legacysql/dualwrite/metrics.go b/pkg/storage/legacysql/dualwrite/metrics.go
new file mode 100644
index 00000000000..dc0e9e54c41
--- /dev/null
+++ b/pkg/storage/legacysql/dualwrite/metrics.go
@@ -0,0 +1,55 @@
+package dualwrite
+
+import (
+	"errors"
+
+	"github.com/grafana/grafana/pkg/infra/log"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+var (
+	// backgroundErrorsMetric counts failed background operations against unified
+	// storage, labelled by resource and method. Every dualWriterMetrics shares it.
+	backgroundErrorsMetric = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Name: "dualwriter_background_errors_total",
+		Help: "Total number of failed background operations in unified storage",
+	}, []string{"resource", "method"})
+
+	// backgroundErrorMethods lists the "method" label values pre-initialized by initResource.
+	backgroundErrorMethods = []string{"GET", "LIST", "CREATE", "DELETE", "UPDATE", "DELETE_COLLECTION"}
+)
+
+// dualWriterMetrics holds the Prometheus collectors used by the dual writer.
+type dualWriterMetrics struct {
+	backgroundErrors *prometheus.CounterVec
+}
+
+// provideDualWriterMetrics registers the shared collector with reg (when reg is
+// non-nil) and returns a handle to it.
+//
+// Registration is attempted on every call instead of being guarded by a
+// sync.Once: this package also calls it with throwaway registries, and a
+// once-guard would let whichever caller ran first decide whether the real
+// registerer ever receives the collector. Registering again with the same
+// registerer yields AlreadyRegisteredError, which is expected and not logged.
+func provideDualWriterMetrics(reg prometheus.Registerer) *dualWriterMetrics {
+	if reg != nil {
+		if err := reg.Register(backgroundErrorsMetric); err != nil {
+			var alreadyRegistered prometheus.AlreadyRegisteredError
+			if !errors.As(err, &alreadyRegistered) {
+				log.New("dualwrite").Warn("failed to register dualwriter metrics", "error", err)
+			}
+		}
+	}
+	return &dualWriterMetrics{
+		backgroundErrors: backgroundErrorsMetric,
+	}
+}
+
+// initResource initializes all method counter combinations to zero for the given resource,
+// so that the metric is distinguishable from "not scraped" (absent) vs "no errors" (zero).
+func (m *dualWriterMetrics) initResource(resource string) {
+	for _, method := range backgroundErrorMethods {
+		m.backgroundErrors.WithLabelValues(resource, method).Add(0)
+	}
+}
diff --git a/pkg/storage/legacysql/dualwrite/runtime_test.go b/pkg/storage/legacysql/dualwrite/runtime_test.go index 590f2b99355..4ec9dbce94a 100644 --- a/pkg/storage/legacysql/dualwrite/runtime_test.go +++ b/pkg/storage/legacysql/dualwrite/runtime_test.go @@ -5,6 +5,7 @@ import ( "errors" "testing" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -77,7 +78,7 @@ func TestRuntime_Create(t *testing.T) { tt.setupStorageFn(us.Mock, tt.input) } - m, err := ProvideService(featuremgmt.WithFeatures(featuremgmt.FlagManagedDualWriter), kvstore.NewFakeKVStore(), NewFakeConfig(), NewFakeMigrator(), NewFakeMigrationStatusReader()) + m, err := ProvideService(featuremgmt.WithFeatures(featuremgmt.FlagManagedDualWriter), kvstore.NewFakeKVStore(), NewFakeConfig(), NewFakeMigrator(), NewFakeMigrationStatusReader(), prometheus.NewRegistry()) require.NoError(t, err) dw, err := m.NewStorage(kind, ls, us) require.NoError(t, err) @@ -150,7 +151,7 @@ func TestRuntime_Get(t *testing.T) { tt.setupStorageFn(us.Mock, name) } - m, err := ProvideService(featuremgmt.WithFeatures(featuremgmt.FlagManagedDualWriter), kvstore.NewFakeKVStore(), NewFakeConfig(), NewFakeMigrator(), NewFakeMigrationStatusReader()) + m, err := ProvideService(featuremgmt.WithFeatures(featuremgmt.FlagManagedDualWriter), kvstore.NewFakeKVStore(), NewFakeConfig(), NewFakeMigrator(), NewFakeMigrationStatusReader(), prometheus.NewRegistry()) require.NoError(t, err) dw, err := m.NewStorage(kind, ls, us) require.NoError(t, err) @@ -235,7 +236,7 @@ func TestRuntime_CreateWhileMigrating(t *testing.T) { } // Shared provider across all tests - dual, err := ProvideService(featuremgmt.WithFeatures(featuremgmt.FlagManagedDualWriter), 
kvstore.NewFakeKVStore(), NewFakeConfig(), NewFakeMigrator(), NewFakeMigrationStatusReader()) + dual, err := ProvideService(featuremgmt.WithFeatures(featuremgmt.FlagManagedDualWriter), kvstore.NewFakeKVStore(), NewFakeConfig(), NewFakeMigrator(), NewFakeMigrationStatusReader(), prometheus.NewRegistry()) require.NoError(t, err) for _, tt := range tests { diff --git a/pkg/storage/legacysql/dualwrite/service.go b/pkg/storage/legacysql/dualwrite/service.go index 6886057835a..53dea8f6a56 100644 --- a/pkg/storage/legacysql/dualwrite/service.go +++ b/pkg/storage/legacysql/dualwrite/service.go @@ -6,6 +6,7 @@ import ( "time" gocache "github.com/patrickmn/go-cache" + "github.com/prometheus/client_golang/prometheus" "k8s.io/apimachinery/pkg/runtime/schema" "github.com/grafana/grafana-app-sdk/logging" @@ -67,7 +68,7 @@ func ProvideStaticServiceForTests(cfg *setting.Cfg) Service { if cfg == nil { cfg = &setting.Cfg{} } - return &staticService{cfg: cfg} + return &staticService{cfg: cfg, metrics: provideDualWriterMetrics(prometheus.NewRegistry())} } func ProvideService( @@ -76,6 +77,7 @@ func ProvideService( cfg *setting.Cfg, migrator unifiedmigrations.UnifiedStorageMigrationService, statusReader unifiedmigrations.MigrationStatusReader, + reg prometheus.Registerer, ) (Service, error) { // Ensure migrations have run before starting dualwrite err := migrator.Run(context.Background()) @@ -87,9 +89,11 @@ func ProvideService( enabled := features.IsEnabledGlobally(featuremgmt.FlagManagedDualWriter) || features.IsEnabledGlobally(featuremgmt.FlagProvisioning) // required for git provisioning + metrics := provideDualWriterMetrics(reg) + if cfg != nil { if !enabled { - return &staticService{cfg: cfg, statusReader: statusReader}, nil + return &staticService{cfg: cfg, statusReader: statusReader, metrics: metrics}, nil } foldersMode := cfg.UnifiedStorage["folders.folder.grafana.app"].DualWriterMode @@ -97,7 +101,7 @@ func ProvideService( // If both are fully on unified (Mode5), the dynamic 
service is not needed. if foldersMode == rest.Mode5 && dashboardsMode == rest.Mode5 { - return &staticService{cfg: cfg, statusReader: statusReader}, nil + return &staticService{cfg: cfg, statusReader: statusReader, metrics: metrics}, nil } if (foldersMode >= rest.Mode4 || dashboardsMode >= rest.Mode4) && foldersMode != dashboardsMode { @@ -120,6 +124,7 @@ func ProvideService( enabled: enabled, statusReader: statusReader, resourceModesCache: gocache.New(cacheTTL, cacheCleanup), + metrics: metrics, }, nil } @@ -128,6 +133,7 @@ type service struct { enabled bool statusReader unifiedmigrations.MigrationStatusReader resourceModesCache *gocache.Cache + metrics *dualWriterMetrics } // getStorageMode returns the cached StorageMode for a non-managed resource. @@ -154,12 +160,13 @@ func (m *service) NewStorage(gr schema.GroupResource, legacy rest.Storage, unifi } if m.enabled && status.Runtime { + m.metrics.initResource(gr.String()) // Dynamic storage behavior return &runtimeDualWriter{ service: m, legacy: legacy, unified: unified, - dualwrite: &dualWriter{legacy: legacy, unified: unified}, // not used for read + dualwrite: &dualWriter{legacy: legacy, unified: unified, gr: gr, metrics: m.metrics}, // not used for read gr: gr, }, nil } @@ -170,7 +177,8 @@ func (m *service) NewStorage(gr schema.GroupResource, legacy rest.Storage, unifi case unifiedmigrations.StorageModeUnified: return unified, nil case unifiedmigrations.StorageModeDualWrite: - return &dualWriter{legacy: legacy, unified: unified, errorIsOK: true}, nil + m.metrics.initResource(gr.String()) + return &dualWriter{legacy: legacy, unified: unified, errorIsOK: true, gr: gr, metrics: m.metrics}, nil default: return legacy, nil } diff --git a/pkg/storage/legacysql/dualwrite/service_test.go b/pkg/storage/legacysql/dualwrite/service_test.go index 513dcbabe60..39ceae9dff5 100644 --- a/pkg/storage/legacysql/dualwrite/service_test.go +++ b/pkg/storage/legacysql/dualwrite/service_test.go @@ -5,6 +5,7 @@ import ( "testing" 
"time" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/runtime/schema" @@ -18,7 +19,7 @@ import ( func TestService(t *testing.T) { t.Run("dynamic", func(t *testing.T) { ctx := context.Background() - mode, err := ProvideService(featuremgmt.WithFeatures(featuremgmt.FlagProvisioning), kvstore.NewFakeKVStore(), NewFakeConfig(), NewFakeMigrator(), NewFakeMigrationStatusReader()) + mode, err := ProvideService(featuremgmt.WithFeatures(featuremgmt.FlagProvisioning), kvstore.NewFakeKVStore(), NewFakeConfig(), NewFakeMigrator(), NewFakeMigrationStatusReader(), prometheus.NewRegistry()) require.NoError(t, err) // Use a managed resource so KV-based Status path is exercised. @@ -140,6 +141,7 @@ func TestService(t *testing.T) { NewFakeConfig(), NewFakeMigrator(), statusReader, + prometheus.NewRegistry(), ) require.NoError(t, err) @@ -180,6 +182,7 @@ func TestService(t *testing.T) { NewFakeConfig(), NewFakeMigrator(), statusReader, + prometheus.NewRegistry(), ) require.NoError(t, err) @@ -217,6 +220,7 @@ func TestService(t *testing.T) { NewFakeConfig(), NewFakeMigrator(), statusReader, + prometheus.NewRegistry(), ) require.NoError(t, err) @@ -241,6 +245,7 @@ func TestService(t *testing.T) { NewFakeConfig(), NewFakeMigrator(), reader, + prometheus.NewRegistry(), ) require.NoError(t, err) @@ -339,7 +344,7 @@ func TestService(t *testing.T) { "dashboards.dashboard.grafana.app", storageModeFromConfigMode(tc.cfg.UnifiedStorage["dashboards.dashboard.grafana.app"].DualWriterMode), "folders.folder.grafana.app", storageModeFromConfigMode(tc.cfg.UnifiedStorage["folders.folder.grafana.app"].DualWriterMode), ) - svc, err := ProvideService(tc.flags, kvstore.NewFakeKVStore(), &tc.cfg, NewFakeMigrator(), statusReader) + svc, err := ProvideService(tc.flags, kvstore.NewFakeKVStore(), &tc.cfg, NewFakeMigrator(), statusReader, prometheus.NewRegistry()) if tc.error != "" { require.ErrorContains(t, err, tc.error) require.Nil(t, svc, 
"expect a nil service when an error exts") diff --git a/pkg/storage/legacysql/dualwrite/static.go b/pkg/storage/legacysql/dualwrite/static.go index 0d17074af37..bc55ff80903 100644 --- a/pkg/storage/legacysql/dualwrite/static.go +++ b/pkg/storage/legacysql/dualwrite/static.go @@ -5,6 +5,7 @@ import ( "fmt" "sync" + "github.com/prometheus/client_golang/prometheus" "k8s.io/apimachinery/pkg/runtime/schema" "github.com/grafana/grafana-app-sdk/logging" @@ -20,7 +21,7 @@ func NewStaticStorage( legacy rest.Storage, unified rest.Storage, ) (rest.Storage, error) { - m := &staticService{} + m := &staticService{metrics: provideDualWriterMetrics(prometheus.NewRegistry())} m.SetMode(gr, mode) return m.NewStorage(gr, legacy, unified) } @@ -28,6 +29,7 @@ func NewStaticStorage( type staticService struct { cfg *setting.Cfg statusReader unifiedmigrations.MigrationStatusReader + metrics *dualWriterMetrics // resourceModesCache holds the resolved StorageMode per resource resourceModesCache sync.Map // map[string]unifiedmigrations.StorageMode @@ -79,7 +81,8 @@ func (m *staticService) NewStorage(gr schema.GroupResource, legacy rest.Storage, case unifiedmigrations.StorageModeUnified: return unified, nil case unifiedmigrations.StorageModeDualWrite: - return &dualWriter{legacy: legacy, unified: unified, errorIsOK: true}, nil + m.metrics.initResource(gr.String()) + return &dualWriter{legacy: legacy, unified: unified, errorIsOK: true, gr: gr, metrics: m.metrics}, nil default: return legacy, nil }