fix (unified-storage): stop registering unified storage metrics in global state (#101322)

* move prometheus.Register for unified storage metrics into metrics.go and do most of the plumbing to get it to work

* convert StorageApiMetrics to pointer and check for nil before using it

* rename type and variables to something more sensible

---------

Co-authored-by: Jean-Philippe Quéméner <jeanphilippe.quemener@grafana.com>
This commit is contained in:
Will Assis
2025-02-28 09:39:39 -03:00
committed by GitHub
parent 7fb0d1b3e6
commit f5e5824bab
15 changed files with 108 additions and 106 deletions

View File

@ -501,7 +501,6 @@ github.com/elastic/go-sysinfo v1.11.2/go.mod h1:GKqR8bbMK/1ITnez9NIsIfXQr25aLhRJ
github.com/elastic/go-windows v1.0.1 h1:AlYZOldA+UJ0/2nBuqWdo90GFCgG9xuyw9SYzGUtJm0= github.com/elastic/go-windows v1.0.1 h1:AlYZOldA+UJ0/2nBuqWdo90GFCgG9xuyw9SYzGUtJm0=
github.com/elastic/go-windows v1.0.1/go.mod h1:FoVvqWSun28vaDQPbj2Elfc0JahhPB7WQEGa3c814Ss= github.com/elastic/go-windows v1.0.1/go.mod h1:FoVvqWSun28vaDQPbj2Elfc0JahhPB7WQEGa3c814Ss=
github.com/elazarl/goproxy v1.3.0/go.mod h1:X/5W/t+gzDyLfHW4DrMdpjqYjpXsURlBt9lpBDxZZZQ= github.com/elazarl/goproxy v1.3.0/go.mod h1:X/5W/t+gzDyLfHW4DrMdpjqYjpXsURlBt9lpBDxZZZQ=
github.com/elazarl/goproxy v1.7.1 h1:1P7LPSxbqtNxusFnXclj6O56pjfq1xOQZ6a0mwwKUlY=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633 h1:H2pdYOb3KQ1/YsqVWoWNLQO+fusocsw354rqGTZtAgw= github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633 h1:H2pdYOb3KQ1/YsqVWoWNLQO+fusocsw354rqGTZtAgw=
github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw= github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw=
github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
@ -585,7 +584,6 @@ github.com/gomodule/redigo v1.8.9/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs0
github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA= github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA=
github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4=
github.com/google/cel-go v0.22.0/go.mod h1:BuznPXXfQDpXKWQ9sPW3TzlAJN5zzFe+i9tIs0yC4s8= github.com/google/cel-go v0.22.0/go.mod h1:BuznPXXfQDpXKWQ9sPW3TzlAJN5zzFe+i9tIs0yC4s8=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-github v17.0.0+incompatible h1:N0LgJ1j65A7kfXrZnUDaYCs/Sf4rEjNlfyDHW9dolSY= github.com/google/go-github v17.0.0+incompatible h1:N0LgJ1j65A7kfXrZnUDaYCs/Sf4rEjNlfyDHW9dolSY=
github.com/google/go-jsonnet v0.18.0 h1:/6pTy6g+Jh1a1I2UMoAODkqELFiVIdOxbNwv0DDzoOg= github.com/google/go-jsonnet v0.18.0 h1:/6pTy6g+Jh1a1I2UMoAODkqELFiVIdOxbNwv0DDzoOg=
github.com/google/go-jsonnet v0.18.0/go.mod h1:C3fTzyVJDslXdiTqw/bTFk7vSGyCtH3MGRbDfvEwGd0= github.com/google/go-jsonnet v0.18.0/go.mod h1:C3fTzyVJDslXdiTqw/bTFk7vSGyCtH3MGRbDfvEwGd0=
@ -629,6 +627,7 @@ github.com/grafana/grafana/apps/investigation v0.0.0-20250121113133-e747350fee2d
github.com/grafana/grafana/apps/playlist v0.0.0-20250121113133-e747350fee2d/go.mod h1:DjJe5osrW/BKrzN9hAAOSElNWutj1bcriExa7iDP7kA= github.com/grafana/grafana/apps/playlist v0.0.0-20250121113133-e747350fee2d/go.mod h1:DjJe5osrW/BKrzN9hAAOSElNWutj1bcriExa7iDP7kA=
github.com/grafana/grafana/pkg/aggregator v0.0.0-20250121113133-e747350fee2d/go.mod h1:1sq0guad+G4SUTlBgx7SXfhnzy7D86K/LcVOtiQCiMA= github.com/grafana/grafana/pkg/aggregator v0.0.0-20250121113133-e747350fee2d/go.mod h1:1sq0guad+G4SUTlBgx7SXfhnzy7D86K/LcVOtiQCiMA=
github.com/grafana/grafana/pkg/build v0.0.0-20250220114259-be81314e2118/go.mod h1:STVpVboMYeBAfyn6Zw6XHhTHqUxzMy7pzRiVgk1l0W0= github.com/grafana/grafana/pkg/build v0.0.0-20250220114259-be81314e2118/go.mod h1:STVpVboMYeBAfyn6Zw6XHhTHqUxzMy7pzRiVgk1l0W0=
github.com/grafana/grafana/pkg/build v0.0.0-20250227163402-d78c646f93bb/go.mod h1:Vw0LdoMma64VgIMVpRY3i0D156jddgUGjTQBOcyeF3k=
github.com/grafana/grafana/pkg/semconv v0.0.0-20250121113133-e747350fee2d/go.mod h1:tfLnBpPYgwrBMRz4EXqPCZJyCjEG4Ev37FSlXnocJ2c= github.com/grafana/grafana/pkg/semconv v0.0.0-20250121113133-e747350fee2d/go.mod h1:tfLnBpPYgwrBMRz4EXqPCZJyCjEG4Ev37FSlXnocJ2c=
github.com/grafana/grafana/pkg/storage/unified/apistore v0.0.0-20250121113133-e747350fee2d/go.mod h1:CXpwZ3Mkw6xVlGKc0SqUxqXCP3Uv182q6qAQnLaLxRg= github.com/grafana/grafana/pkg/storage/unified/apistore v0.0.0-20250121113133-e747350fee2d/go.mod h1:CXpwZ3Mkw6xVlGKc0SqUxqXCP3Uv182q6qAQnLaLxRg=
github.com/grafana/tail v0.0.0-20230510142333-77b18831edf0 h1:bjh0PVYSVVFxzINqPFYJmAmJNrWPgnVjuSdYJGHmtFU= github.com/grafana/tail v0.0.0-20230510142333-77b18831edf0 h1:bjh0PVYSVVFxzINqPFYJmAmJNrWPgnVjuSdYJGHmtFU=
@ -1187,7 +1186,6 @@ google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojt
google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
google.golang.org/protobuf v1.36.0/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= google.golang.org/protobuf v1.36.0/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/errgo.v2 v2.1.0 h1:0vLT13EuvQ0hNvakwLuFZ/jYrLp5F3kcWHXdRggjCE8= gopkg.in/errgo.v2 v2.1.0 h1:0vLT13EuvQ0hNvakwLuFZ/jYrLp5F3kcWHXdRggjCE8=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=

View File

@ -196,7 +196,7 @@ func newUnifiedClient(cfg *setting.Cfg, sqlStore db.DB) (resource.ResourceClient
Reg: prometheus.NewPedanticRegistry(), Reg: prometheus.NewPedanticRegistry(),
Authzc: authlib.FixedAccessClient(true), // always true! Authzc: authlib.FixedAccessClient(true), // always true!
Docs: nil, // document supplier Docs: nil, // document supplier
}) }, nil)
} }
func newParquetClient(file *os.File) (resource.BulkStoreClient, error) { func newParquetClient(file *os.File) (resource.BulkStoreClient, error) {

View File

@ -16,13 +16,14 @@ import (
"github.com/grafana/grafana/pkg/services/authz" "github.com/grafana/grafana/pkg/services/authz"
"github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/sql" "github.com/grafana/grafana/pkg/storage/unified/sql"
) )
// NewModule returns an instance of a ModuleServer, responsible for managing // NewModule returns an instance of a ModuleServer, responsible for managing
// dskit modules (services). // dskit modules (services).
func NewModule(opts Options, apiOpts api.ServerOptions, features featuremgmt.FeatureToggles, cfg *setting.Cfg) (*ModuleServer, error) { func NewModule(opts Options, apiOpts api.ServerOptions, features featuremgmt.FeatureToggles, cfg *setting.Cfg, storageMetrics *resource.StorageMetrics) (*ModuleServer, error) {
s, err := newModuleServer(opts, apiOpts, features, cfg) s, err := newModuleServer(opts, apiOpts, features, cfg, storageMetrics)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -34,7 +35,7 @@ func NewModule(opts Options, apiOpts api.ServerOptions, features featuremgmt.Fea
return s, nil return s, nil
} }
func newModuleServer(opts Options, apiOpts api.ServerOptions, features featuremgmt.FeatureToggles, cfg *setting.Cfg) (*ModuleServer, error) { func newModuleServer(opts Options, apiOpts api.ServerOptions, features featuremgmt.FeatureToggles, cfg *setting.Cfg, storageMetrics *resource.StorageMetrics) (*ModuleServer, error) {
rootCtx, shutdownFn := context.WithCancel(context.Background()) rootCtx, shutdownFn := context.WithCancel(context.Background())
s := &ModuleServer{ s := &ModuleServer{
@ -50,6 +51,7 @@ func newModuleServer(opts Options, apiOpts api.ServerOptions, features featuremg
version: opts.Version, version: opts.Version,
commit: opts.Commit, commit: opts.Commit,
buildBranch: opts.BuildBranch, buildBranch: opts.BuildBranch,
storageMetrics: storageMetrics,
} }
return s, nil return s, nil
@ -71,6 +73,7 @@ type ModuleServer struct {
shutdownFinished chan struct{} shutdownFinished chan struct{}
isInitialized bool isInitialized bool
mtx sync.Mutex mtx sync.Mutex
storageMetrics *resource.StorageMetrics
pidFile string pidFile string
version string version string
@ -135,7 +138,7 @@ func (s *ModuleServer) Run() error {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return sql.ProvideUnifiedStorageGrpcService(s.cfg, s.features, nil, s.log, nil, docBuilders) return sql.ProvideUnifiedStorageGrpcService(s.cfg, s.features, nil, s.log, nil, docBuilders, s.storageMetrics)
}) })
m.RegisterModule(modules.ZanzanaServer, func() (services.Service, error) { m.RegisterModule(modules.ZanzanaServer, func() (services.Service, error) {

View File

@ -400,6 +400,8 @@ var wireBasicSet = wire.NewSet(
connectors.ProvideOrgRoleMapper, connectors.ProvideOrgRoleMapper,
wire.Bind(new(user.Verifier), new(*userimpl.Verifier)), wire.Bind(new(user.Verifier), new(*userimpl.Verifier)),
authz.WireSet, authz.WireSet,
// Unified storage
resource.ProvideStorageMetrics,
// Kubernetes API server // Kubernetes API server
grafanaapiserver.WireSet, grafanaapiserver.WireSet,
apiregistry.WireSet, apiregistry.WireSet,

View File

@ -52,6 +52,7 @@ import (
"github.com/grafana/grafana/pkg/services/validations" "github.com/grafana/grafana/pkg/services/validations"
"github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified" "github.com/grafana/grafana/pkg/storage/unified"
"github.com/grafana/grafana/pkg/storage/unified/resource"
search2 "github.com/grafana/grafana/pkg/storage/unified/search" search2 "github.com/grafana/grafana/pkg/storage/unified/search"
) )
@ -155,6 +156,8 @@ var wireExtsBaseCLISet = wire.NewSet(
var wireExtsModuleServerSet = wire.NewSet( var wireExtsModuleServerSet = wire.NewSet(
NewModule, NewModule,
wireExtsBaseCLISet, wireExtsBaseCLISet,
// Unified storage
resource.ProvideStorageMetrics,
) )
var wireExtsStandaloneAPIServerSet = wire.NewSet( var wireExtsStandaloneAPIServerSet = wire.NewSet(

View File

@ -51,7 +51,7 @@ type clientMetrics struct {
} }
// This adds a UnifiedStorage client into the wire dependency tree // This adds a UnifiedStorage client into the wire dependency tree
func ProvideUnifiedStorageClient(opts *Options) (resource.ResourceClient, error) { func ProvideUnifiedStorageClient(opts *Options, storageMetrics *resource.StorageMetrics) (resource.ResourceClient, error) {
// See: apiserver.ApplyGrafanaConfig(cfg, features, o) // See: apiserver.ApplyGrafanaConfig(cfg, features, o)
apiserverCfg := opts.Cfg.SectionWithEnvOverrides("grafana-apiserver") apiserverCfg := opts.Cfg.SectionWithEnvOverrides("grafana-apiserver")
client, err := newClient(options.StorageOptions{ client, err := newClient(options.StorageOptions{
@ -59,7 +59,7 @@ func ProvideUnifiedStorageClient(opts *Options) (resource.ResourceClient, error)
DataPath: apiserverCfg.Key("storage_path").MustString(filepath.Join(opts.Cfg.DataPath, "grafana-apiserver")), DataPath: apiserverCfg.Key("storage_path").MustString(filepath.Join(opts.Cfg.DataPath, "grafana-apiserver")),
Address: apiserverCfg.Key("address").MustString(""), // client address Address: apiserverCfg.Key("address").MustString(""), // client address
BlobStoreURL: apiserverCfg.Key("blob_url").MustString(""), BlobStoreURL: apiserverCfg.Key("blob_url").MustString(""),
}, opts.Cfg, opts.Features, opts.DB, opts.Tracer, opts.Reg, opts.Authzc, opts.Docs) }, opts.Cfg, opts.Features, opts.DB, opts.Tracer, opts.Reg, opts.Authzc, opts.Docs, storageMetrics)
if err == nil { if err == nil {
// Used to get the folder stats // Used to get the folder stats
client = federated.NewFederatedClient( client = federated.NewFederatedClient(
@ -79,6 +79,7 @@ func newClient(opts options.StorageOptions,
reg prometheus.Registerer, reg prometheus.Registerer,
authzc types.AccessClient, authzc types.AccessClient,
docs resource.DocumentBuilderSupplier, docs resource.DocumentBuilderSupplier,
storageMetrics *resource.StorageMetrics,
) (resource.ResourceClient, error) { ) (resource.ResourceClient, error) {
ctx := context.Background() ctx := context.Background()
switch opts.StorageType { switch opts.StorageType {
@ -134,7 +135,7 @@ func newClient(opts options.StorageOptions,
if err != nil { if err != nil {
return nil, err return nil, err
} }
server, err := sql.NewResourceServer(db, cfg, tracer, reg, authzc, searchOptions) server, err := sql.NewResourceServer(db, cfg, tracer, reg, authzc, searchOptions, storageMetrics)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -1,56 +1,37 @@
package resource package resource
import ( import (
"sync"
"time" "time"
"github.com/grafana/dskit/instrument" "github.com/grafana/dskit/instrument"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
) )
var ( type StorageMetrics struct {
once sync.Once
StorageServerMetrics *StorageApiMetrics
)
type StorageApiMetrics struct {
WatchEventLatency *prometheus.HistogramVec WatchEventLatency *prometheus.HistogramVec
PollerLatency prometheus.Histogram PollerLatency prometheus.Histogram
} }
func NewStorageMetrics() *StorageApiMetrics { func ProvideStorageMetrics(reg prometheus.Registerer) *StorageMetrics {
once.Do(func() { return &StorageMetrics{
StorageServerMetrics = &StorageApiMetrics{ WatchEventLatency: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
WatchEventLatency: prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "storage_server",
Namespace: "storage_server", Name: "watch_latency_seconds",
Name: "watch_latency_seconds", Help: "Time (in seconds) spent waiting for watch events to be sent",
Help: "Time (in seconds) spent waiting for watch events to be sent", Buckets: instrument.DefBuckets,
Buckets: instrument.DefBuckets, NativeHistogramBucketFactor: 1.1, // enable native histograms
NativeHistogramBucketFactor: 1.1, // enable native histograms NativeHistogramMaxBucketNumber: 160,
NativeHistogramMaxBucketNumber: 160, NativeHistogramMinResetDuration: time.Hour,
NativeHistogramMinResetDuration: time.Hour, }, []string{"resource"}),
}, []string{"resource"}), PollerLatency: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
PollerLatency: prometheus.NewHistogram(prometheus.HistogramOpts{ Namespace: "storage_server",
Namespace: "storage_server", Name: "poller_query_latency_seconds",
Name: "poller_query_latency_seconds", Help: "poller query latency",
Help: "poller query latency", Buckets: instrument.DefBuckets,
Buckets: instrument.DefBuckets, NativeHistogramBucketFactor: 1.1, // enable native histograms
NativeHistogramBucketFactor: 1.1, // enable native histograms NativeHistogramMaxBucketNumber: 160,
NativeHistogramMaxBucketNumber: 160, NativeHistogramMinResetDuration: time.Hour,
NativeHistogramMinResetDuration: time.Hour, }),
}), }
}
})
return StorageServerMetrics
}
func (s *StorageApiMetrics) Collect(ch chan<- prometheus.Metric) {
s.WatchEventLatency.Collect(ch)
s.PollerLatency.Collect(ch)
}
func (s *StorageApiMetrics) Describe(ch chan<- *prometheus.Desc) {
s.WatchEventLatency.Describe(ch)
s.PollerLatency.Describe(ch)
} }

View File

@ -182,6 +182,8 @@ type ResourceServerOptions struct {
// Registerer to register prometheus Metrics for the Resource server // Registerer to register prometheus Metrics for the Resource server
Reg prometheus.Registerer Reg prometheus.Registerer
storageMetrics *StorageMetrics
} }
func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) { func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
@ -230,25 +232,22 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
} }
logger := slog.Default().With("logger", "resource-server") logger := slog.Default().With("logger", "resource-server")
// register metrics
if err := prometheus.Register(NewStorageMetrics()); err != nil {
logger.Warn("failed to register storage metrics", "error", err)
}
// Make this cancelable // Make this cancelable
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
s := &server{ s := &server{
tracer: opts.Tracer, tracer: opts.Tracer,
log: logger, log: logger,
backend: opts.Backend, backend: opts.Backend,
blob: blobstore, blob: blobstore,
diagnostics: opts.Diagnostics, diagnostics: opts.Diagnostics,
access: opts.AccessClient, access: opts.AccessClient,
writeHooks: opts.WriteHooks, writeHooks: opts.WriteHooks,
lifecycle: opts.Lifecycle, lifecycle: opts.Lifecycle,
now: opts.Now, now: opts.Now,
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
storageMetrics: opts.storageMetrics,
} }
if opts.Search.Resources != nil { if opts.Search.Resources != nil {
@ -271,17 +270,18 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
var _ ResourceServer = &server{} var _ ResourceServer = &server{}
type server struct { type server struct {
tracer trace.Tracer tracer trace.Tracer
log *slog.Logger log *slog.Logger
backend StorageBackend backend StorageBackend
blob BlobSupport blob BlobSupport
search *searchSupport search *searchSupport
diagnostics DiagnosticsServer diagnostics DiagnosticsServer
access claims.AccessClient access claims.AccessClient
writeHooks WriteAccessHooks writeHooks WriteAccessHooks
lifecycle LifecycleHooks lifecycle LifecycleHooks
now func() int64 now func() int64
mostRecentRV atomic.Int64 // The most recent resource version seen by the server mostRecentRV atomic.Int64 // The most recent resource version seen by the server
storageMetrics *StorageMetrics
// Background watch task -- this has permissions for everything // Background watch task -- this has permissions for everything
ctx context.Context ctx context.Context
@ -1073,10 +1073,12 @@ func (s *server) Watch(req *WatchRequest, srv ResourceStore_WatchServer) error {
return err return err
} }
// record latency - resource version is a unix timestamp in microseconds so we convert to seconds if s.storageMetrics != nil {
latencySeconds := float64(time.Now().UnixMicro()-event.ResourceVersion) / 1e6 // record latency - resource version is a unix timestamp in microseconds so we convert to seconds
if latencySeconds > 0 { latencySeconds := float64(time.Now().UnixMicro()-event.ResourceVersion) / 1e6
StorageServerMetrics.WatchEventLatency.WithLabelValues(event.Key.Resource).Observe(latencySeconds) if latencySeconds > 0 {
s.storageMetrics.WatchEventLatency.WithLabelValues(event.Key.Resource).Observe(latencySeconds)
}
} }
} }
} }

View File

@ -40,6 +40,7 @@ type BackendOptions struct {
PollingInterval time.Duration PollingInterval time.Duration
WatchBufferSize int WatchBufferSize int
IsHA bool IsHA bool
storageMetrics *resource.StorageMetrics
// testing // testing
SimulatedNetworkLatency time.Duration // slows down the create transactions by a fixed amount SimulatedNetworkLatency time.Duration // slows down the create transactions by a fixed amount
@ -69,6 +70,7 @@ func NewBackend(opts BackendOptions) (Backend, error) {
dbProvider: opts.DBProvider, dbProvider: opts.DBProvider,
pollingInterval: opts.PollingInterval, pollingInterval: opts.PollingInterval,
watchBufferSize: opts.WatchBufferSize, watchBufferSize: opts.WatchBufferSize,
storageMetrics: opts.storageMetrics,
bulkLock: &bulkLock{running: make(map[string]bool)}, bulkLock: &bulkLock{running: make(map[string]bool)},
simulatedNetworkLatency: opts.SimulatedNetworkLatency, simulatedNetworkLatency: opts.SimulatedNetworkLatency,
}, nil }, nil
@ -85,8 +87,9 @@ type backend struct {
initErr error initErr error
// o11y // o11y
log log.Logger log log.Logger
tracer trace.Tracer tracer trace.Tracer
storageMetrics *resource.StorageMetrics
// database // database
dbProvider db.DBProvider dbProvider db.DBProvider

View File

@ -31,6 +31,7 @@ func newNotifier(b *backend) (eventNotifier, error) {
tracer: b.tracer, tracer: b.tracer,
bulkLock: b.bulkLock, bulkLock: b.bulkLock,
listLatestRVs: b.listLatestRVs, listLatestRVs: b.listLatestRVs,
storageMetrics: b.storageMetrics,
historyPoll: func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) { historyPoll: func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
var records []*historyPollResponse var records []*historyPollResponse
err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error { err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error {

View File

@ -31,8 +31,9 @@ type pollingNotifier struct {
pollingInterval time.Duration pollingInterval time.Duration
watchBufferSize int watchBufferSize int
log log.Logger log log.Logger
tracer trace.Tracer tracer trace.Tracer
storageMetrics *resource.StorageMetrics
bulkLock *bulkLock bulkLock *bulkLock
listLatestRVs func(ctx context.Context) (groupResourceRV, error) listLatestRVs func(ctx context.Context) (groupResourceRV, error)
@ -46,8 +47,9 @@ type pollingNotifierConfig struct {
pollingInterval time.Duration pollingInterval time.Duration
watchBufferSize int watchBufferSize int
log log.Logger log log.Logger
tracer trace.Tracer tracer trace.Tracer
storageMetrics *resource.StorageMetrics
bulkLock *bulkLock bulkLock *bulkLock
listLatestRVs func(ctx context.Context) (groupResourceRV, error) listLatestRVs func(ctx context.Context) (groupResourceRV, error)
@ -101,6 +103,7 @@ func newPollingNotifier(cfg *pollingNotifierConfig) (*pollingNotifier, error) {
listLatestRVs: cfg.listLatestRVs, listLatestRVs: cfg.listLatestRVs,
historyPoll: cfg.historyPoll, historyPoll: cfg.historyPoll,
done: cfg.done, done: cfg.done,
storageMetrics: cfg.storageMetrics,
}, nil }, nil
} }
@ -172,7 +175,9 @@ func (p *pollingNotifier) poll(ctx context.Context, grp string, res string, sinc
if err != nil { if err != nil {
return 0, fmt.Errorf("poll history: %w", err) return 0, fmt.Errorf("poll history: %w", err)
} }
resource.NewStorageMetrics().PollerLatency.Observe(time.Since(start).Seconds()) if p.storageMetrics != nil {
p.storageMetrics.PollerLatency.Observe(time.Since(start).Seconds())
}
var nextRV int64 var nextRV int64
for _, rec := range records { for _, rec := range records {

View File

@ -18,7 +18,7 @@ import (
// Creates a new ResourceServer // Creates a new ResourceServer
func NewResourceServer(db infraDB.DB, cfg *setting.Cfg, func NewResourceServer(db infraDB.DB, cfg *setting.Cfg,
tracer tracing.Tracer, reg prometheus.Registerer, ac types.AccessClient, searchOptions resource.SearchOptions) (resource.ResourceServer, error) { tracer tracing.Tracer, reg prometheus.Registerer, ac types.AccessClient, searchOptions resource.SearchOptions, storageMetrics *resource.StorageMetrics) (resource.ResourceServer, error) {
apiserverCfg := cfg.SectionWithEnvOverrides("grafana-apiserver") apiserverCfg := cfg.SectionWithEnvOverrides("grafana-apiserver")
opts := resource.ResourceServerOptions{ opts := resource.ResourceServerOptions{
Tracer: tracer, Tracer: tracer,
@ -47,7 +47,7 @@ func NewResourceServer(db infraDB.DB, cfg *setting.Cfg,
isHA := isHighAvailabilityEnabled(cfg.SectionWithEnvOverrides("database")) isHA := isHighAvailabilityEnabled(cfg.SectionWithEnvOverrides("database"))
store, err := NewBackend(BackendOptions{DBProvider: eDB, Tracer: tracer, IsHA: isHA}) store, err := NewBackend(BackendOptions{DBProvider: eDB, Tracer: tracer, IsHA: isHA, storageMetrics: storageMetrics})
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -49,8 +49,9 @@ type service struct {
authenticator interceptors.Authenticator authenticator interceptors.Authenticator
log log.Logger log log.Logger
reg prometheus.Registerer reg prometheus.Registerer
storageMetrics *resource.StorageMetrics
docBuilders resource.DocumentBuilderSupplier docBuilders resource.DocumentBuilderSupplier
} }
@ -62,6 +63,7 @@ func ProvideUnifiedStorageGrpcService(
log log.Logger, log log.Logger,
reg prometheus.Registerer, reg prometheus.Registerer,
docBuilders resource.DocumentBuilderSupplier, docBuilders resource.DocumentBuilderSupplier,
storageMetrics *resource.StorageMetrics,
) (UnifiedStorageGrpcService, error) { ) (UnifiedStorageGrpcService, error) {
tracingCfg, err := tracing.ProvideTracingConfig(cfg) tracingCfg, err := tracing.ProvideTracingConfig(cfg)
if err != nil { if err != nil {
@ -84,15 +86,16 @@ func ProvideUnifiedStorageGrpcService(
authn := grpcutils.NewAuthenticatorWithFallback(cfg, reg, tracing, &grpc.Authenticator{Tracer: tracing}) authn := grpcutils.NewAuthenticatorWithFallback(cfg, reg, tracing, &grpc.Authenticator{Tracer: tracing})
s := &service{ s := &service{
cfg: cfg, cfg: cfg,
features: features, features: features,
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
authenticator: authn, authenticator: authn,
tracing: tracing, tracing: tracing,
db: db, db: db,
log: log, log: log,
reg: reg, reg: reg,
docBuilders: docBuilders, docBuilders: docBuilders,
storageMetrics: storageMetrics,
} }
// This will be used when running as a dskit service // This will be used when running as a dskit service
@ -112,7 +115,7 @@ func (s *service) start(ctx context.Context) error {
return err return err
} }
server, err := NewResourceServer(s.db, s.cfg, s.tracing, s.reg, authzClient, searchOptions) server, err := NewResourceServer(s.db, s.cfg, s.tracing, s.reg, authzClient, searchOptions, s.storageMetrics)
if err != nil { if err != nil {
return err return err
} }

View File

@ -83,7 +83,7 @@ func TestClientServer(t *testing.T) {
features := featuremgmt.WithFeatures() features := featuremgmt.WithFeatures()
svc, err := sql.ProvideUnifiedStorageGrpcService(cfg, features, dbstore, nil, prometheus.NewPedanticRegistry(), nil) svc, err := sql.ProvideUnifiedStorageGrpcService(cfg, features, dbstore, nil, prometheus.NewPedanticRegistry(), nil, nil)
require.NoError(t, err) require.NoError(t, err)
var client resource.ResourceStoreClient var client resource.ResourceStoreClient

View File

@ -104,7 +104,7 @@ func StartGrafanaEnv(t *testing.T, grafDir, cfgPath string) (string, *server.Tes
var storage sql.UnifiedStorageGrpcService var storage sql.UnifiedStorageGrpcService
if runstore { if runstore {
storage, err = sql.ProvideUnifiedStorageGrpcService(env.Cfg, env.FeatureToggles, env.SQLStore, storage, err = sql.ProvideUnifiedStorageGrpcService(env.Cfg, env.FeatureToggles, env.SQLStore,
env.Cfg.Logger, prometheus.NewPedanticRegistry(), nil) env.Cfg.Logger, prometheus.NewPedanticRegistry(), nil, nil)
require.NoError(t, err) require.NoError(t, err)
ctx := context.Background() ctx := context.Background()
err = storage.StartAsync(ctx) err = storage.StartAsync(ctx)