fix: add metric and extend logs for dual writer (#120146)

* fix: add metric and log

* fix: resource and method

* fix: registration

* fix: init with 0
This commit is contained in:
Mustafa Sencer Özcan
2026-03-12 18:48:11 +01:00
committed by GitHub
parent 25cb656e19
commit fe4c007f4d
7 changed files with 90 additions and 20 deletions

View File

@@ -593,7 +593,7 @@ func Initialize(ctx context.Context, cfg *setting.Cfg, opts Options, apiOpts api
unifiedMigrator := migrations2.ProvideUnifiedMigrator(resourceClient, migrationRegistry)
unifiedStorageMigrationService := migrations2.ProvideUnifiedStorageMigrationService(unifiedMigrator, legacyDatabaseProvider, cfg, sqlStore, kvStore, resourceClient, migrationRegistry)
migrationStatusReader := migrations2.ProvideMigrationStatusReader(sqlStore, cfg, migrationRegistry)
dualwriteService, err := dualwrite.ProvideService(featureToggles, kvStore, cfg, unifiedStorageMigrationService, migrationStatusReader)
dualwriteService, err := dualwrite.ProvideService(featureToggles, kvStore, cfg, unifiedStorageMigrationService, migrationStatusReader, registerer)
if err != nil {
return nil, err
}
@@ -1286,7 +1286,7 @@ func InitializeForTest(ctx context.Context, t sqlutil.ITestDB, testingT interfac
unifiedMigrator := migrations2.ProvideUnifiedMigrator(resourceClient, migrationRegistry)
unifiedStorageMigrationService := migrations2.ProvideUnifiedStorageMigrationService(unifiedMigrator, legacyDatabaseProvider, cfg, sqlStore, kvStore, resourceClient, migrationRegistry)
migrationStatusReader := migrations2.ProvideMigrationStatusReader(sqlStore, cfg, migrationRegistry)
dualwriteService, err := dualwrite.ProvideService(featureToggles, kvStore, cfg, unifiedStorageMigrationService, migrationStatusReader)
dualwriteService, err := dualwrite.ProvideService(featureToggles, kvStore, cfg, unifiedStorageMigrationService, migrationStatusReader, registerer)
if err != nil {
return nil, err
}

View File

@@ -13,6 +13,7 @@ import (
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/util/dryrun"
@@ -42,6 +43,8 @@ type dualWriter struct {
unified grafanarest.Storage
readUnified bool
errorIsOK bool // in "mode1" we try writing both -- but don't block on unified write errors
gr schema.GroupResource
metrics *dualWriterMetrics
}
func (d *dualWriter) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
@@ -51,7 +54,7 @@ func (d *dualWriter) Get(ctx context.Context, name string, options *metav1.GetOp
attribute.Bool("readUnified", d.readUnified)))
defer span.End()
log := logging.FromContext(ctx).With("method", "Get", "name", name)
log := logging.FromContext(ctx).With("method", "Get", "name", name, "resource", d.gr.String())
// If we read from unified, we can just do that and return.
if d.readUnified {
return d.unified.Get(ctx, name, options)
@@ -69,6 +72,7 @@ func (d *dualWriter) Get(ctx context.Context, name string, options *metav1.GetOp
defer cancel()
if _, err := d.unified.Get(ctxBg, name, options); err != nil {
log.Error("failed background GET to unified", "err", err)
d.metrics.backgroundErrors.WithLabelValues(d.gr.String(), "GET").Inc()
}
}(context.WithTimeout(context.WithoutCancel(ctx), backgroundReqTimeout))
return legacyGet, nil
@@ -93,7 +97,7 @@ func (d *dualWriter) List(ctx context.Context, options *metainternalversion.List
var (
legacyOptions = options.DeepCopy()
unifiedOptions = options.DeepCopy()
log = logging.FromContext(ctx).With("method", "List", "options", options)
log = logging.FromContext(ctx).With("method", "List", "options", options, "resource", d.gr.String())
)
legacyToken, unifiedToken, err := parseContinueTokens(options.Continue)
@@ -160,12 +164,14 @@ func (d *dualWriter) List(ctx context.Context, options *metainternalversion.List
unifiedList, err := d.unified.List(ctxBg, unifiedOptions)
if err != nil {
log.Error("failed background LIST to unified", "err", err)
d.metrics.backgroundErrors.WithLabelValues(d.gr.String(), "LIST").Inc()
return
}
unifiedMeta, err := meta.ListAccessor(unifiedList)
if err != nil {
log.Error("failed background LIST to unified", "err",
fmt.Errorf("failed to access unified List MetaData: %w", err))
d.metrics.backgroundErrors.WithLabelValues(d.gr.String(), "LIST").Inc()
}
out <- unifiedMeta.GetContinue()
}(context.WithTimeout(context.WithoutCancel(ctx), backgroundReqTimeout))
@@ -209,7 +215,7 @@ func (d *dualWriter) Create(ctx context.Context, in runtime.Object, createValida
return d.unified.Create(ctx, in, createValidation, options)
}
log := logging.FromContext(ctx).With("method", "Create")
log := logging.FromContext(ctx).With("method", "Create", "resource", d.gr.String())
accIn, err := utils.MetaAccessor(in)
if err != nil {
@@ -302,6 +308,7 @@ func (d *dualWriter) Create(ctx context.Context, in runtime.Object, createValida
defer cancel()
if _, err := d.unified.Create(ctxBg, createdCopy, createValidation, options); err != nil {
log.With("objectInfo", objectInfo(createdCopy)).Error("failed to CREATE object in unified storage", "err", err)
d.metrics.backgroundErrors.WithLabelValues(d.gr.String(), "CREATE").Inc()
}
}(context.WithTimeout(context.WithoutCancel(ctx), backgroundReqTimeout))
} else {
@@ -347,7 +354,7 @@ func (d *dualWriter) Delete(ctx context.Context, name string, deleteValidation r
return d.unified.Delete(ctx, name, deleteValidation, options)
}
log := logging.FromContext(ctx).With("method", "Delete", "name", name)
log := logging.FromContext(ctx).With("method", "Delete", "name", name, "resource", d.gr.String())
ctx = utils.SetFolderRemovePermissions(ctx, false)
objFromLegacy, asyncLegacy, err := d.legacy.Delete(ctx, name, deleteValidation, options)
@@ -373,6 +380,7 @@ func (d *dualWriter) Delete(ctx context.Context, name string, deleteValidation r
defer cancel()
if _, _, err := d.unified.Delete(ctxBg, name, deleteValidation, options); err != nil && !apierrors.IsNotFound(err) {
log.Error("failed background DELETE in unified storage", "err", err)
d.metrics.backgroundErrors.WithLabelValues(d.gr.String(), "DELETE").Inc()
}
}(context.WithTimeout(context.WithoutCancel(ctx), backgroundReqTimeout))
return objFromLegacy, asyncLegacy, nil
@@ -406,7 +414,7 @@ func (d *dualWriter) Update(ctx context.Context, name string, objInfo rest.Updat
return d.unified.Update(ctx, name, dryRunInfo, createValidation, updateValidation, dryRunForceCreate, options)
}
log := logging.FromContext(ctx).With("method", "Update", "name", name)
log := logging.FromContext(ctx).With("method", "Update", "name", name, "resource", d.gr.String())
// update in legacy first, and then unistore. Will return a failure if either fails.
//
// we want to update in legacy first, otherwise if the update from unistore was successful,
@@ -463,6 +471,7 @@ func (d *dualWriter) Update(ctx context.Context, name string, objInfo rest.Updat
defer cancel()
if _, _, err := d.unified.Update(ctxBg, name, unifiedInfo, createValidation, updateValidation, unifiedForceCreate, options); err != nil {
log.With("objectInfo", objectInfo(objFromLegacy)).Error("failed background UPDATE to unified storage", "err", err)
d.metrics.backgroundErrors.WithLabelValues(d.gr.String(), "UPDATE").Inc()
}
}(context.WithTimeout(context.WithoutCancel(ctx), backgroundReqTimeout))
return objFromLegacy, createdLegacy, nil
@@ -498,7 +507,7 @@ func (d *dualWriter) DeleteCollection(ctx context.Context, deleteValidation rest
return d.unified.DeleteCollection(ctx, deleteValidation, options, listOptions)
}
log := logging.FromContext(ctx).With("method", "DeleteCollection", "resourceVersion", listOptions.ResourceVersion)
log := logging.FromContext(ctx).With("method", "DeleteCollection", "resourceVersion", listOptions.ResourceVersion, "resource", d.gr.String())
// delete from legacy first, and anything that is successful can be deleted in unistore too.
//
@@ -521,6 +530,7 @@ func (d *dualWriter) DeleteCollection(ctx context.Context, deleteValidation rest
defer cancel()
if _, err := d.unified.DeleteCollection(ctxBg, deleteValidation, options, listOptions); err != nil {
log.With("objectInfo", objectInfo(deletedLegacy)).Error("failed background DELETE collection to unified storage", "err", err)
d.metrics.backgroundErrors.WithLabelValues(d.gr.String(), "DELETE_COLLECTION").Inc()
}
}(context.WithTimeout(context.WithoutCancel(ctx), backgroundReqTimeout))
return deletedLegacy, nil

View File

@@ -0,0 +1,43 @@
package dualwrite
import (
"sync"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/prometheus/client_golang/prometheus"
)
var (
registerMetricsOnce sync.Once
backgroundErrorsMetric = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "dualwriter_background_errors_total",
Help: "Total number of failed background operations in unified storage",
}, []string{"resource", "method"})
backgroundErrorMethods = []string{"GET", "LIST", "CREATE", "DELETE", "UPDATE", "DELETE_COLLECTION"}
)
type dualWriterMetrics struct {
backgroundErrors *prometheus.CounterVec
}
func provideDualWriterMetrics(reg prometheus.Registerer) *dualWriterMetrics {
registerMetricsOnce.Do(func() {
if reg != nil {
if err := reg.Register(backgroundErrorsMetric); err != nil {
log.New("dualwrite").Warn("failed to register dualwriter metrics", "error", err)
}
}
})
return &dualWriterMetrics{
backgroundErrors: backgroundErrorsMetric,
}
}
// initResource initializes all method counter combinations to zero for the given resource,
// so that the metric is distinguishable from "not scraped" (absent) vs "no errors" (zero).
func (m *dualWriterMetrics) initResource(resource string) {
for _, method := range backgroundErrorMethods {
m.backgroundErrors.WithLabelValues(resource, method).Add(0)
}
}

View File

@@ -5,6 +5,7 @@ import (
"errors"
"testing"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -77,7 +78,7 @@ func TestRuntime_Create(t *testing.T) {
tt.setupStorageFn(us.Mock, tt.input)
}
m, err := ProvideService(featuremgmt.WithFeatures(featuremgmt.FlagManagedDualWriter), kvstore.NewFakeKVStore(), NewFakeConfig(), NewFakeMigrator(), NewFakeMigrationStatusReader())
m, err := ProvideService(featuremgmt.WithFeatures(featuremgmt.FlagManagedDualWriter), kvstore.NewFakeKVStore(), NewFakeConfig(), NewFakeMigrator(), NewFakeMigrationStatusReader(), prometheus.NewRegistry())
require.NoError(t, err)
dw, err := m.NewStorage(kind, ls, us)
require.NoError(t, err)
@@ -150,7 +151,7 @@ func TestRuntime_Get(t *testing.T) {
tt.setupStorageFn(us.Mock, name)
}
m, err := ProvideService(featuremgmt.WithFeatures(featuremgmt.FlagManagedDualWriter), kvstore.NewFakeKVStore(), NewFakeConfig(), NewFakeMigrator(), NewFakeMigrationStatusReader())
m, err := ProvideService(featuremgmt.WithFeatures(featuremgmt.FlagManagedDualWriter), kvstore.NewFakeKVStore(), NewFakeConfig(), NewFakeMigrator(), NewFakeMigrationStatusReader(), prometheus.NewRegistry())
require.NoError(t, err)
dw, err := m.NewStorage(kind, ls, us)
require.NoError(t, err)
@@ -235,7 +236,7 @@ func TestRuntime_CreateWhileMigrating(t *testing.T) {
}
// Shared provider across all tests
dual, err := ProvideService(featuremgmt.WithFeatures(featuremgmt.FlagManagedDualWriter), kvstore.NewFakeKVStore(), NewFakeConfig(), NewFakeMigrator(), NewFakeMigrationStatusReader())
dual, err := ProvideService(featuremgmt.WithFeatures(featuremgmt.FlagManagedDualWriter), kvstore.NewFakeKVStore(), NewFakeConfig(), NewFakeMigrator(), NewFakeMigrationStatusReader(), prometheus.NewRegistry())
require.NoError(t, err)
for _, tt := range tests {

View File

@@ -6,6 +6,7 @@ import (
"time"
gocache "github.com/patrickmn/go-cache"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/apimachinery/pkg/runtime/schema"
"github.com/grafana/grafana-app-sdk/logging"
@@ -67,7 +68,7 @@ func ProvideStaticServiceForTests(cfg *setting.Cfg) Service {
if cfg == nil {
cfg = &setting.Cfg{}
}
return &staticService{cfg: cfg}
return &staticService{cfg: cfg, metrics: provideDualWriterMetrics(prometheus.NewRegistry())}
}
func ProvideService(
@@ -76,6 +77,7 @@ func ProvideService(
cfg *setting.Cfg,
migrator unifiedmigrations.UnifiedStorageMigrationService,
statusReader unifiedmigrations.MigrationStatusReader,
reg prometheus.Registerer,
) (Service, error) {
// Ensure migrations have run before starting dualwrite
err := migrator.Run(context.Background())
@@ -87,9 +89,11 @@ func ProvideService(
enabled := features.IsEnabledGlobally(featuremgmt.FlagManagedDualWriter) ||
features.IsEnabledGlobally(featuremgmt.FlagProvisioning) // required for git provisioning
metrics := provideDualWriterMetrics(reg)
if cfg != nil {
if !enabled {
return &staticService{cfg: cfg, statusReader: statusReader}, nil
return &staticService{cfg: cfg, statusReader: statusReader, metrics: metrics}, nil
}
foldersMode := cfg.UnifiedStorage["folders.folder.grafana.app"].DualWriterMode
@@ -97,7 +101,7 @@ func ProvideService(
// If both are fully on unified (Mode5), the dynamic service is not needed.
if foldersMode == rest.Mode5 && dashboardsMode == rest.Mode5 {
return &staticService{cfg: cfg, statusReader: statusReader}, nil
return &staticService{cfg: cfg, statusReader: statusReader, metrics: metrics}, nil
}
if (foldersMode >= rest.Mode4 || dashboardsMode >= rest.Mode4) && foldersMode != dashboardsMode {
@@ -120,6 +124,7 @@ func ProvideService(
enabled: enabled,
statusReader: statusReader,
resourceModesCache: gocache.New(cacheTTL, cacheCleanup),
metrics: metrics,
}, nil
}
@@ -128,6 +133,7 @@ type service struct {
enabled bool
statusReader unifiedmigrations.MigrationStatusReader
resourceModesCache *gocache.Cache
metrics *dualWriterMetrics
}
// getStorageMode returns the cached StorageMode for a non-managed resource.
@@ -154,12 +160,13 @@ func (m *service) NewStorage(gr schema.GroupResource, legacy rest.Storage, unifi
}
if m.enabled && status.Runtime {
m.metrics.initResource(gr.String())
// Dynamic storage behavior
return &runtimeDualWriter{
service: m,
legacy: legacy,
unified: unified,
dualwrite: &dualWriter{legacy: legacy, unified: unified}, // not used for read
dualwrite: &dualWriter{legacy: legacy, unified: unified, gr: gr, metrics: m.metrics}, // not used for read
gr: gr,
}, nil
}
@@ -170,7 +177,8 @@ func (m *service) NewStorage(gr schema.GroupResource, legacy rest.Storage, unifi
case unifiedmigrations.StorageModeUnified:
return unified, nil
case unifiedmigrations.StorageModeDualWrite:
return &dualWriter{legacy: legacy, unified: unified, errorIsOK: true}, nil
m.metrics.initResource(gr.String())
return &dualWriter{legacy: legacy, unified: unified, errorIsOK: true, gr: gr, metrics: m.metrics}, nil
default:
return legacy, nil
}

View File

@@ -5,6 +5,7 @@ import (
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -18,7 +19,7 @@ import (
func TestService(t *testing.T) {
t.Run("dynamic", func(t *testing.T) {
ctx := context.Background()
mode, err := ProvideService(featuremgmt.WithFeatures(featuremgmt.FlagProvisioning), kvstore.NewFakeKVStore(), NewFakeConfig(), NewFakeMigrator(), NewFakeMigrationStatusReader())
mode, err := ProvideService(featuremgmt.WithFeatures(featuremgmt.FlagProvisioning), kvstore.NewFakeKVStore(), NewFakeConfig(), NewFakeMigrator(), NewFakeMigrationStatusReader(), prometheus.NewRegistry())
require.NoError(t, err)
// Use a managed resource so KV-based Status path is exercised.
@@ -140,6 +141,7 @@ func TestService(t *testing.T) {
NewFakeConfig(),
NewFakeMigrator(),
statusReader,
prometheus.NewRegistry(),
)
require.NoError(t, err)
@@ -180,6 +182,7 @@ func TestService(t *testing.T) {
NewFakeConfig(),
NewFakeMigrator(),
statusReader,
prometheus.NewRegistry(),
)
require.NoError(t, err)
@@ -217,6 +220,7 @@ func TestService(t *testing.T) {
NewFakeConfig(),
NewFakeMigrator(),
statusReader,
prometheus.NewRegistry(),
)
require.NoError(t, err)
@@ -241,6 +245,7 @@ func TestService(t *testing.T) {
NewFakeConfig(),
NewFakeMigrator(),
reader,
prometheus.NewRegistry(),
)
require.NoError(t, err)
@@ -339,7 +344,7 @@ func TestService(t *testing.T) {
"dashboards.dashboard.grafana.app", storageModeFromConfigMode(tc.cfg.UnifiedStorage["dashboards.dashboard.grafana.app"].DualWriterMode),
"folders.folder.grafana.app", storageModeFromConfigMode(tc.cfg.UnifiedStorage["folders.folder.grafana.app"].DualWriterMode),
)
svc, err := ProvideService(tc.flags, kvstore.NewFakeKVStore(), &tc.cfg, NewFakeMigrator(), statusReader)
svc, err := ProvideService(tc.flags, kvstore.NewFakeKVStore(), &tc.cfg, NewFakeMigrator(), statusReader, prometheus.NewRegistry())
if tc.error != "" {
require.ErrorContains(t, err, tc.error)
require.Nil(t, svc, "expect a nil service when an error exts")

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"sync"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/apimachinery/pkg/runtime/schema"
"github.com/grafana/grafana-app-sdk/logging"
@@ -20,7 +21,7 @@ func NewStaticStorage(
legacy rest.Storage,
unified rest.Storage,
) (rest.Storage, error) {
m := &staticService{}
m := &staticService{metrics: provideDualWriterMetrics(prometheus.NewRegistry())}
m.SetMode(gr, mode)
return m.NewStorage(gr, legacy, unified)
}
@@ -28,6 +29,7 @@ func NewStaticStorage(
type staticService struct {
cfg *setting.Cfg
statusReader unifiedmigrations.MigrationStatusReader
metrics *dualWriterMetrics
// resourceModesCache holds the resolved StorageMode per resource
resourceModesCache sync.Map // map[string]unifiedmigrations.StorageMode
@@ -79,7 +81,8 @@ func (m *staticService) NewStorage(gr schema.GroupResource, legacy rest.Storage,
case unifiedmigrations.StorageModeUnified:
return unified, nil
case unifiedmigrations.StorageModeDualWrite:
return &dualWriter{legacy: legacy, unified: unified, errorIsOK: true}, nil
m.metrics.initResource(gr.String())
return &dualWriter{legacy: legacy, unified: unified, errorIsOK: true, gr: gr, metrics: m.metrics}, nil
default:
return legacy, nil
}