From 37d39de36de97bffe6302759948acf84cbdac423 Mon Sep 17 00:00:00 2001 From: owensmallwood Date: Mon, 8 Apr 2024 08:35:01 -0600 Subject: [PATCH] Storage Api: Add metrics (#85316) * Storage server runs own instrumentation server if it's the sole target. Starts adding some sample metrics for now. * adds metric for failed optimistic locks * refactors metrics registration to own method on service for testability. Adds tests. * Register sql storage server metrics from within the service * fixes test * troubleshooting drone test failures. Maybe timing when starting instrumentation server? * Waits until instrumentation server has started. Updates tests. * defer won't get called unless there's an error. removing. * wait for instrumentation server to be running * linter - close res body * use port 3000 for metrics and removes test metric inc() call * fixes test - updates port * refactors module server to provide an instrumentation server module when there is no ALL or CORE target provided and running as single target * make instrumentation server a dependency of all modules that do not run their own http server * adds module server test * adds tests for instrumentation service and removes old tests that aren't needed * ignore error in test * uses helper to start and run service * when running wait on ctx done or http server err * wait for http server * removes println * updates module server test to be integration test * require no error in goroutine * skips integration test when GRAFANA_TEST_DB not defined * move http server start into start, verify returned content * make test error when run fails * try waiting longer and see if drone tests pass * update integration test mysql creds to match drone * go back to only waiting half second * debug log drone mysql connection string * use same db connection config as drone * try using same hostname as drone * can't use localhost as mysql hostname in drone tests. 
Need to parse it from the cfg db connection string --------- Co-authored-by: Dan Cech --- pkg/modules/dependencies.go | 11 +-- pkg/server/instrumentation_service.go | 68 ++++++++++++++ pkg/server/instrumentation_service_test.go | 52 +++++++++++ pkg/server/module_server.go | 11 ++- pkg/server/module_server_test.go | 88 +++++++++++++++++++ pkg/services/store/entity/server/service.go | 8 +- pkg/services/store/entity/sqlstash/metrics.go | 41 +++++++++ .../entity/sqlstash/sql_storage_server.go | 8 +- 8 files changed, 277 insertions(+), 10 deletions(-) create mode 100644 pkg/server/instrumentation_service.go create mode 100644 pkg/server/instrumentation_service_test.go create mode 100644 pkg/server/module_server_test.go create mode 100644 pkg/services/store/entity/sqlstash/metrics.go diff --git a/pkg/modules/dependencies.go b/pkg/modules/dependencies.go index 066a3f595b8..01d54255440 100644 --- a/pkg/modules/dependencies.go +++ b/pkg/modules/dependencies.go @@ -4,14 +4,15 @@ const ( // All includes all modules necessary for Grafana to run as a standalone server All string = "all" - Core string = "core" - GrafanaAPIServer string = "grafana-apiserver" - StorageServer string = "storage-server" + Core string = "core" + GrafanaAPIServer string = "grafana-apiserver" + StorageServer string = "storage-server" + InstrumentationServer string = "instrumentation-server" ) var dependencyMap = map[string][]string{ - GrafanaAPIServer: {}, - StorageServer: {}, + GrafanaAPIServer: {InstrumentationServer}, + StorageServer: {InstrumentationServer}, Core: {}, All: {Core}, } diff --git a/pkg/server/instrumentation_service.go b/pkg/server/instrumentation_service.go new file mode 100644 index 00000000000..138658800cc --- /dev/null +++ b/pkg/server/instrumentation_service.go @@ -0,0 +1,68 @@ +package server + +import ( + "context" + "net" + "net/http" + "time" + + "github.com/grafana/dskit/services" + "github.com/grafana/grafana/pkg/infra/log" + 
"github.com/prometheus/client_golang/prometheus/promhttp" +) + +type instrumentationService struct { + *services.BasicService + httpServ *http.Server + log log.Logger + errChan chan error +} + +func NewInstrumentationService(log log.Logger) (*instrumentationService, error) { + s := &instrumentationService{log: log} + s.BasicService = services.NewBasicService(s.start, s.running, s.stop) + return s, nil +} + +func (s *instrumentationService) start(ctx context.Context) error { + s.httpServ = s.newInstrumentationServer(ctx) + s.errChan = make(chan error) + go func() { + s.errChan <- s.httpServ.ListenAndServe() + }() + return nil +} + +func (s *instrumentationService) running(ctx context.Context) error { + select { + case <-ctx.Done(): + return nil + case err := <-s.errChan: + return err + } +} + +func (s *instrumentationService) stop(failureReason error) error { + s.log.Info("stopping instrumentation server", "reason", failureReason) + if err := s.httpServ.Shutdown(context.Background()); err != nil { + s.log.Error("failed to shutdown instrumentation server", "error", err) + return err + } + + return nil +} + +func (s *instrumentationService) newInstrumentationServer(ctx context.Context) *http.Server { + router := http.NewServeMux() + router.Handle("/metrics", promhttp.Handler()) + + srv := &http.Server{ + // 5s timeout for header reads to avoid Slowloris attacks (https://thetooth.io/blog/slowloris-attack/) + ReadHeaderTimeout: 5 * time.Second, + Addr: ":3000", // TODO - make configurable? 
+ Handler: router, + BaseContext: func(_ net.Listener) context.Context { return ctx }, + } + + return srv +} diff --git a/pkg/server/instrumentation_service_test.go b/pkg/server/instrumentation_service_test.go new file mode 100644 index 00000000000..1c4d9bc14cb --- /dev/null +++ b/pkg/server/instrumentation_service_test.go @@ -0,0 +1,52 @@ +package server + +import ( + "context" + "io" + "net/http" + "testing" + "time" + + "github.com/grafana/dskit/services" + "github.com/grafana/grafana/pkg/infra/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRunInstrumentationService(t *testing.T) { + s, err := NewInstrumentationService(log.New("test-logger")) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second) + defer cancel() + + err = services.StartAndAwaitRunning(ctx, s) + require.NoError(t, err) + + testCounter := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "test_counter", + }) + err = prometheus.Register(testCounter) + require.NoError(t, err) + + testCounter.Inc() + + time.Sleep(100 * time.Millisecond) + + client := http.Client{} + res, err := client.Get("http://localhost:3000/metrics") + require.NoError(t, err) + assert.Equal(t, 200, res.StatusCode) + + b, err := io.ReadAll(res.Body) + require.NoError(t, err) + resp := string(b[len(b)-16:]) + assert.Equal(t, "\ntest_counter 1\n", resp) + + err = res.Body.Close() + require.NoError(t, err) + + err = services.StopAndAwaitTerminated(ctx, s) + require.NoError(t, err) +} diff --git a/pkg/server/module_server.go b/pkg/server/module_server.go index af3f4d9e8d6..d226d5faeda 100644 --- a/pkg/server/module_server.go +++ b/pkg/server/module_server.go @@ -10,7 +10,6 @@ import ( "sync" "github.com/grafana/dskit/services" - "github.com/grafana/grafana/pkg/api" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/modules" @@ -117,6 +116,14 @@ func (s 
*ModuleServer) Run() error { m := modules.New(s.cfg.Target) + // only run the instrumentation server module if were not running a module that already contains an http server + m.RegisterInvisibleModule(modules.InstrumentationServer, func() (services.Service, error) { + if m.IsModuleEnabled(modules.All) || m.IsModuleEnabled(modules.Core) { + return services.NewBasicService(nil, nil, nil).WithName(modules.InstrumentationServer), nil + } + return NewInstrumentationService(s.log) + }) + m.RegisterModule(modules.Core, func() (services.Service, error) { return NewService(s.cfg, s.opts, s.apiOpts) }) @@ -131,7 +138,7 @@ func (s *ModuleServer) Run() error { //} m.RegisterModule(modules.StorageServer, func() (services.Service, error) { - return storageServer.ProvideService(s.cfg, s.features) + return storageServer.ProvideService(s.cfg, s.features, s.log) }) m.RegisterModule(modules.All, nil) diff --git a/pkg/server/module_server_test.go b/pkg/server/module_server_test.go new file mode 100644 index 00000000000..23d8a89a20d --- /dev/null +++ b/pkg/server/module_server_test.go @@ -0,0 +1,88 @@ +package server + +import ( + "context" + "net/http" + "os" + "testing" + "time" + + "cuelang.org/go/pkg/regexp" + "github.com/grafana/grafana/pkg/api" + "github.com/grafana/grafana/pkg/infra/db" + "github.com/grafana/grafana/pkg/modules" + "github.com/grafana/grafana/pkg/setting" + "github.com/grafana/grafana/pkg/tests/testsuite" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMain(m *testing.M) { + testsuite.Run(m) +} + +func TestIntegrationWillRunInstrumentationServerWhenTargetHasNoHttpServer(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + dbType := os.Getenv("GRAFANA_TEST_DB") + if dbType == "" { + t.Skip("skipping - GRAFANA_TEST_DB not defined") + } + if dbType == "sqlite3" { + t.Skip("skipping - sqlite not supported for storage server target") + } + + testdb := db.InitTestDB(t) + cfg := testdb.Cfg + 
cfg.GRPCServerNetwork = "tcp" + cfg.GRPCServerAddress = "localhost:10000" + addStorageServerToConfig(t, cfg, dbType) + cfg.Target = []string{modules.StorageServer} + + ms, err := InitializeModuleServer(cfg, Options{}, api.ServerOptions{}) + require.NoError(t, err) + + go func() { + err = ms.Run() + if err.Error() != "context canceled" { + t.Error(err) + } + }() + time.Sleep(500 * time.Millisecond) // wait for http server to be running + + client := http.Client{} + res, err := client.Get("http://localhost:3000/metrics") + require.NoError(t, err) + err = res.Body.Close() + require.NoError(t, err) + assert.Equal(t, 200, res.StatusCode) + + err = ms.Shutdown(context.Background(), "test over") + require.NoError(t, err) +} + +func addStorageServerToConfig(t *testing.T, cfg *setting.Cfg, dbType string) { + s, err := cfg.Raw.NewSection("entity_api") + require.NoError(t, err) + _, err = s.NewKey("db_type", dbType) + require.NoError(t, err) + + if dbType == "postgres" { + _, _ = s.NewKey("db_host", "localhost") + _, _ = s.NewKey("db_name", "grafanatest") + _, _ = s.NewKey("db_user", "grafanatest") + _, _ = s.NewKey("db_pass", "grafanatest") + } else { + // cant use localhost as hostname in drone tests for mysql, so need to parse it from connection string + sec, err := cfg.Raw.GetSection("database") + require.NoError(t, err) + connString := sec.Key("connection_string").String() + matches, err := regexp.FindSubmatch("(.+):(.+)@tcp\\((.+):(\\d+)\\)/(.+)\\?", connString) + require.NoError(t, err) + _, _ = s.NewKey("db_host", matches[3]) + _, _ = s.NewKey("db_name", matches[5]) + _, _ = s.NewKey("db_user", matches[1]) + _, _ = s.NewKey("db_pass", matches[2]) + } +} diff --git a/pkg/services/store/entity/server/service.go b/pkg/services/store/entity/server/service.go index f4e4468e40a..7e315a5777b 100644 --- a/pkg/services/store/entity/server/service.go +++ b/pkg/services/store/entity/server/service.go @@ -4,8 +4,7 @@ import ( "context" "github.com/grafana/dskit/services" - 
"github.com/prometheus/client_golang/prometheus" - + "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/modules" "github.com/grafana/grafana/pkg/registry" @@ -17,6 +16,7 @@ import ( "github.com/grafana/grafana/pkg/services/store/entity/grpc" "github.com/grafana/grafana/pkg/services/store/entity/sqlstash" "github.com/grafana/grafana/pkg/setting" + "github.com/prometheus/client_golang/prometheus" ) var ( @@ -51,11 +51,14 @@ type service struct { tracing *tracing.TracingService authenticator interceptors.Authenticator + + log log.Logger } func ProvideService( cfg *setting.Cfg, features featuremgmt.FeatureToggles, + log log.Logger, ) (*service, error) { tracingCfg, err := tracing.ProvideTracingConfig(cfg) if err != nil { @@ -76,6 +79,7 @@ func ProvideService( stopCh: make(chan struct{}), authenticator: authn, tracing: tracing, + log: log, } // This will be used when running as a dskit service diff --git a/pkg/services/store/entity/sqlstash/metrics.go b/pkg/services/store/entity/sqlstash/metrics.go new file mode 100644 index 00000000000..075f058c7f5 --- /dev/null +++ b/pkg/services/store/entity/sqlstash/metrics.go @@ -0,0 +1,41 @@ +package sqlstash + +import ( + "sync" + + "github.com/prometheus/client_golang/prometheus" +) + +var ( + once sync.Once + StorageServerMetrics *StorageApiMetrics +) + +type StorageApiMetrics struct { + OptimisticLockFailed *prometheus.CounterVec +} + +func NewStorageMetrics() *StorageApiMetrics { + once.Do(func() { + StorageServerMetrics = &StorageApiMetrics{ + OptimisticLockFailed: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "storage_server", + Name: "optimistic_lock_failed", + Help: "count of optimistic locks failed", + }, + []string{"action"}, + ), + } + }) + + return StorageServerMetrics +} + +func (s *StorageApiMetrics) Collect(ch chan<- prometheus.Metric) { + s.OptimisticLockFailed.Collect(ch) +} + +func (s *StorageApiMetrics) Describe(ch chan<- 
*prometheus.Desc) { + s.OptimisticLockFailed.Describe(ch) +} diff --git a/pkg/services/store/entity/sqlstash/sql_storage_server.go b/pkg/services/store/entity/sqlstash/sql_storage_server.go index 24f622c1273..ee9b609a4fa 100644 --- a/pkg/services/store/entity/sqlstash/sql_storage_server.go +++ b/pkg/services/store/entity/sqlstash/sql_storage_server.go @@ -14,7 +14,6 @@ import ( "github.com/bwmarrin/snowflake" "github.com/google/uuid" - folder "github.com/grafana/grafana/pkg/apis/folder/v0alpha1" "github.com/grafana/grafana/pkg/infra/appcontext" "github.com/grafana/grafana/pkg/infra/log" @@ -23,6 +22,7 @@ import ( "github.com/grafana/grafana/pkg/services/store" "github.com/grafana/grafana/pkg/services/store/entity" "github.com/grafana/grafana/pkg/services/store/entity/db" + "github.com/prometheus/client_golang/prometheus" ) const entityTable = "entity" @@ -38,6 +38,10 @@ func ProvideSQLEntityServer(db db.EntityDBInterface /*, cfg *setting.Cfg */) (en ctx: context.Background(), } + if err := prometheus.Register(NewStorageMetrics()); err != nil { + entityServer.log.Warn("error registering storage server metrics", "error", err) + } + return entityServer, nil } @@ -514,6 +518,7 @@ func (s *sqlEntityServer) Update(ctx context.Context, r *entity.UpdateEntityRequ // Optimistic locking if r.PreviousVersion > 0 && r.PreviousVersion != current.ResourceVersion { + StorageServerMetrics.OptimisticLockFailed.WithLabelValues("update").Inc() return fmt.Errorf("optimistic lock failed") } @@ -759,6 +764,7 @@ func (s *sqlEntityServer) Delete(ctx context.Context, r *entity.DeleteEntityRequ if r.PreviousVersion > 0 && r.PreviousVersion != rsp.Entity.ResourceVersion { rsp.Status = entity.DeleteEntityResponse_ERROR + StorageServerMetrics.OptimisticLockFailed.WithLabelValues("delete").Inc() return fmt.Errorf("optimistic lock failed") }