diff --git a/pkg/api/plugin_resource_test.go b/pkg/api/plugin_resource_test.go index f5772491203..914ef903c73 100644 --- a/pkg/api/plugin_resource_test.go +++ b/pkg/api/plugin_resource_test.go @@ -11,12 +11,14 @@ import ( "github.com/grafana/grafana-azure-sdk-go/azsettings" "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/grafana/grafana/pkg/infra/db" "github.com/grafana/grafana/pkg/infra/localcache" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/tracing" + "github.com/grafana/grafana/pkg/plugins" "github.com/grafana/grafana/pkg/plugins/backendplugin/coreplugin" pluginClient "github.com/grafana/grafana/pkg/plugins/manager/client" "github.com/grafana/grafana/pkg/plugins/manager/fakes" @@ -87,13 +89,20 @@ func TestCallResource(t *testing.T) { require.NoError(t, resp.Body.Close()) require.Equal(t, 200, resp.StatusCode) }) - + pluginRegistry := fakes.NewFakePluginRegistry() + require.NoError(t, pluginRegistry.Add(context.Background(), &plugins.Plugin{ + JSONData: plugins.JSONData{ + ID: "grafana-testdata-datasource", + Backend: true, + }, + })) + middlewares := pluginsintegration.CreateMiddlewares(cfg, &oauthtokentest.Service{}, tracing.InitializeTracerForTest(), &caching.OSSCachingService{}, &featuremgmt.FeatureManager{}, prometheus.DefaultRegisterer, pluginRegistry) pc, err := pluginClient.NewDecorator(&fakes.FakePluginClient{ CallResourceHandlerFunc: backend.CallResourceHandlerFunc(func(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { return errors.New("something went wrong") }), - }, pluginsintegration.CreateMiddlewares(cfg, &oauthtokentest.Service{}, tracing.InitializeTracerForTest(), &caching.OSSCachingService{}, &featuremgmt.FeatureManager{})...) + }, middlewares...) require.NoError(t, err) srv = SetupAPITestServer(t, func(hs *HTTPServer) { diff --git a/pkg/plugins/backendplugin/instrumentation/instrumentation.go b/pkg/plugins/backendplugin/instrumentation/instrumentation.go deleted file mode 100644 index 9b714449f90..00000000000 --- a/pkg/plugins/backendplugin/instrumentation/instrumentation.go +++ /dev/null @@ -1,136 +0,0 @@ -// Package instrumentation contains backend plugin instrumentation logic. -package instrumentation - -import ( - "context" - "errors" - "time" - - "github.com/grafana/grafana-plugin-sdk-go/backend" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - - "github.com/grafana/grafana/pkg/infra/log" - "github.com/grafana/grafana/pkg/infra/tracing" - "github.com/grafana/grafana/pkg/plugins/backendplugin" -) - -var ( - pluginRequestCounter = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: "grafana", - Name: "plugin_request_total", - Help: "The total amount of plugin requests", - }, []string{"plugin_id", "endpoint", "status", "target"}) - - pluginRequestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: "grafana", - Name: "plugin_request_duration_milliseconds", - Help: "Plugin request duration", - Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 25, 50, 100}, - }, []string{"plugin_id", "endpoint", "target"}) - - pluginRequestSizeHistogram = promauto.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: "grafana", - Name: "plugin_request_size_bytes", - Help: "histogram of plugin request sizes returned", - Buckets: []float64{128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536, 131072, 262144, 524288, 1048576}, - }, []string{"source", "plugin_id", "endpoint", "target"}, - ) - - PluginRequestDurationSeconds = promauto.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: "grafana", - Name: "plugin_request_duration_seconds", - Help: "Plugin request duration in seconds", - Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 25}, - }, []string{"source", "plugin_id", "endpoint", "status", "target"}) -) - -const ( - statusOK = "ok" - statusError = "error" - statusCancelled = "cancelled" - - endpointCallResource = "callResource" - endpointCheckHealth = "checkHealth" - endpointCollectMetrics = "collectMetrics" - endpointQueryData = "queryData" -) - -// instrumentPluginRequest instruments success rate and latency of `fn` -func instrumentPluginRequest(ctx context.Context, cfg Cfg, pluginCtx *backend.PluginContext, endpoint string, fn func(ctx context.Context) error) error { - status := statusOK - start := time.Now() - - ctx = instrumentContext(ctx, endpoint, *pluginCtx) - err := fn(ctx) - if err != nil { - status = statusError - if errors.Is(err, context.Canceled) { - status = statusCancelled - } - } - - elapsed := time.Since(start) - - pluginRequestDurationWithLabels := pluginRequestDuration.WithLabelValues(pluginCtx.PluginID, endpoint, string(cfg.Target)) - pluginRequestCounterWithLabels := pluginRequestCounter.WithLabelValues(pluginCtx.PluginID, endpoint, status, string(cfg.Target)) - pluginRequestDurationSecondsWithLabels := PluginRequestDurationSeconds.WithLabelValues("grafana-backend", pluginCtx.PluginID, endpoint, status, string(cfg.Target)) - - if traceID := tracing.TraceIDFromContext(ctx, true); traceID != "" { - pluginRequestDurationWithLabels.(prometheus.ExemplarObserver).ObserveWithExemplar( - float64(elapsed/time.Millisecond), prometheus.Labels{"traceID": traceID}, - ) - pluginRequestCounterWithLabels.(prometheus.ExemplarAdder).AddWithExemplar(1, prometheus.Labels{"traceID": traceID}) - pluginRequestDurationSecondsWithLabels.(prometheus.ExemplarObserver).ObserveWithExemplar( - elapsed.Seconds(), prometheus.Labels{"traceID": traceID}, - ) - } else { - pluginRequestDurationWithLabels.Observe(float64(elapsed / time.Millisecond)) - pluginRequestCounterWithLabels.Inc() - pluginRequestDurationSecondsWithLabels.Observe(elapsed.Seconds()) - } - - return err -} - -func instrumentContext(ctx context.Context, endpoint string, pCtx backend.PluginContext) context.Context { - p := []any{"endpoint", endpoint, "pluginId", pCtx.PluginID} - if pCtx.DataSourceInstanceSettings != nil { - p = append(p, "dsName", pCtx.DataSourceInstanceSettings.Name) - p = append(p, "dsUID", pCtx.DataSourceInstanceSettings.UID) - } - if pCtx.User != nil { - p = append(p, "uname", pCtx.User.Login) - } - return log.WithContextualAttributes(ctx, p) -} - -type Cfg struct { - Target backendplugin.Target -} - -// InstrumentCollectMetrics instruments collectMetrics. -func InstrumentCollectMetrics(ctx context.Context, req *backend.PluginContext, cfg Cfg, fn func(ctx context.Context) error) error { - return instrumentPluginRequest(ctx, cfg, req, endpointCollectMetrics, fn) -} - -// InstrumentCheckHealthRequest instruments checkHealth. -func InstrumentCheckHealthRequest(ctx context.Context, req *backend.PluginContext, cfg Cfg, fn func(ctx context.Context) error) error { - return instrumentPluginRequest(ctx, cfg, req, endpointCheckHealth, fn) -} - -// InstrumentCallResourceRequest instruments callResource. -func InstrumentCallResourceRequest(ctx context.Context, req *backend.PluginContext, cfg Cfg, requestSize float64, fn func(ctx context.Context) error) error { - pluginRequestSizeHistogram.WithLabelValues("grafana-backend", req.PluginID, endpointCallResource, - string(cfg.Target)).Observe(requestSize) - return instrumentPluginRequest(ctx, cfg, req, endpointCallResource, fn) -} - -// InstrumentQueryDataRequest instruments success rate and latency of query data requests. -func InstrumentQueryDataRequest(ctx context.Context, req *backend.PluginContext, cfg Cfg, - requestSize float64, fn func(ctx context.Context) error) error { - pluginRequestSizeHistogram.WithLabelValues("grafana-backend", req.PluginID, endpointQueryData, - string(cfg.Target)).Observe(requestSize) - return instrumentPluginRequest(ctx, cfg, req, endpointQueryData, fn) -} diff --git a/pkg/plugins/manager/client/client.go b/pkg/plugins/manager/client/client.go index 89afd71fdb0..729c5f7618b 100644 --- a/pkg/plugins/manager/client/client.go +++ b/pkg/plugins/manager/client/client.go @@ -10,7 +10,6 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana/pkg/plugins" - "github.com/grafana/grafana/pkg/plugins/backendplugin/instrumentation" "github.com/grafana/grafana/pkg/plugins/config" "github.com/grafana/grafana/pkg/plugins/manager/registry" ) @@ -50,19 +49,7 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) return nil, plugins.ErrPluginNotRegistered } - var totalBytes float64 - for _, v := range req.Queries { - totalBytes += float64(len(v.JSON)) - } - - var resp *backend.QueryDataResponse - err := instrumentation.InstrumentQueryDataRequest(ctx, &req.PluginContext, instrumentation.Cfg{ - Target: p.Target(), - }, totalBytes, func(ctx context.Context) (innerErr error) { - resp, innerErr = p.QueryData(ctx, req) - return - }) - + resp, err := p.QueryData(ctx, req) if err != nil { if errors.Is(err, plugins.ErrMethodNotImplemented) { return nil, err @@ -101,35 +88,28 @@ func (s *Service) CallResource(ctx context.Context, req *backend.CallResourceReq return plugins.ErrPluginNotRegistered } - totalBytes := float64(len(req.Body)) - err := instrumentation.InstrumentCallResourceRequest(ctx, &req.PluginContext, instrumentation.Cfg{ - Target: p.Target(), - }, totalBytes, func(ctx context.Context) (innerErr error) { - removeConnectionHeaders(req.Headers) - removeHopByHopHeaders(req.Headers) - removeNonAllowedHeaders(req.Headers) + removeConnectionHeaders(req.Headers) + removeHopByHopHeaders(req.Headers) + removeNonAllowedHeaders(req.Headers) - processedStreams := 0 - wrappedSender := callResourceResponseSenderFunc(func(res *backend.CallResourceResponse) error { - // Expected that headers and status are only part of first stream - if processedStreams == 0 && res != nil { - if len(res.Headers) > 0 { - removeConnectionHeaders(res.Headers) - removeHopByHopHeaders(res.Headers) - removeNonAllowedHeaders(res.Headers) - } - - ensureContentTypeHeader(res) + processedStreams := 0 + wrappedSender := callResourceResponseSenderFunc(func(res *backend.CallResourceResponse) error { + // Expected that headers and status are only part of first stream + if processedStreams == 0 && res != nil { + if len(res.Headers) > 0 { + removeConnectionHeaders(res.Headers) + removeHopByHopHeaders(res.Headers) + removeNonAllowedHeaders(res.Headers) } - processedStreams++ - return sender.Send(res) - }) + ensureContentTypeHeader(res) + } - innerErr = p.CallResource(ctx, req, wrappedSender) - return + processedStreams++ + return sender.Send(res) }) + err := p.CallResource(ctx, req, wrappedSender) if err != nil { return plugins.ErrPluginDownstreamErrorBase.Errorf("client: failed to call resources: %w", err) } @@ -147,13 +127,7 @@ func (s *Service) CollectMetrics(ctx context.Context, req *backend.CollectMetric return nil, plugins.ErrPluginNotRegistered } - var resp *backend.CollectMetricsResult - err := instrumentation.InstrumentCollectMetrics(ctx, &req.PluginContext, instrumentation.Cfg{ - Target: p.Target(), - }, func(ctx context.Context) (innerErr error) { - resp, innerErr = p.CollectMetrics(ctx, req) - return - }) + resp, err := p.CollectMetrics(ctx, req) if err != nil { return nil, plugins.ErrPluginDownstreamErrorBase.Errorf("client: failed to collect metrics: %w", err) } @@ -171,14 +145,7 @@ func (s *Service) CheckHealth(ctx context.Context, req *backend.CheckHealthReque return nil, plugins.ErrPluginNotRegistered } - var resp *backend.CheckHealthResult - err := instrumentation.InstrumentCheckHealthRequest(ctx, &req.PluginContext, instrumentation.Cfg{ - Target: p.Target(), - }, func(ctx context.Context) (innerErr error) { - resp, innerErr = p.CheckHealth(ctx, req) - return - }) - + resp, err := p.CheckHealth(ctx, req) if err != nil { if errors.Is(err, plugins.ErrMethodNotImplemented) { return nil, err diff --git a/pkg/services/pluginsintegration/clientmiddleware/instrumentation_middleware.go b/pkg/services/pluginsintegration/clientmiddleware/instrumentation_middleware.go new file mode 100644 index 00000000000..ddbaaeaa3a1 --- /dev/null +++ b/pkg/services/pluginsintegration/clientmiddleware/instrumentation_middleware.go @@ -0,0 +1,214 @@ +package clientmiddleware + +import ( + "context" + "errors" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/prometheus/client_golang/prometheus" + + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/infra/tracing" + "github.com/grafana/grafana/pkg/plugins" + "github.com/grafana/grafana/pkg/plugins/manager/registry" +) + +// pluginMetrics contains the prometheus metrics used by the InstrumentationMiddleware. +type pluginMetrics struct { + pluginRequestCounter *prometheus.CounterVec + pluginRequestDuration *prometheus.HistogramVec + pluginRequestSize *prometheus.HistogramVec + pluginRequestDurationSeconds *prometheus.HistogramVec +} + +// InstrumentationMiddleware is a middleware that instruments plugin requests. +// It tracks requests count, duration and size as prometheus metrics. +// It also enriches the [context.Context] with a contextual logger containing plugin and request details. +// For those reasons, this middleware should live at the top of the middleware stack. +type InstrumentationMiddleware struct { + pluginMetrics + pluginRegistry registry.Service + next plugins.Client +} + +func newInstrumentationMiddleware(promRegisterer prometheus.Registerer, pluginRegistry registry.Service) *InstrumentationMiddleware { + pluginRequestCounter := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "grafana", + Name: "plugin_request_total", + Help: "The total amount of plugin requests", + }, []string{"plugin_id", "endpoint", "status", "target"}) + pluginRequestDuration := prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "grafana", + Name: "plugin_request_duration_milliseconds", + Help: "Plugin request duration", + Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 25, 50, 100}, + }, []string{"plugin_id", "endpoint", "target"}) + pluginRequestSize := prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "grafana", + Name: "plugin_request_size_bytes", + Help: "histogram of plugin request sizes returned", + Buckets: []float64{128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536, 131072, 262144, 524288, 1048576}, + }, []string{"source", "plugin_id", "endpoint", "target"}, + ) + pluginRequestDurationSeconds := prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "grafana", + Name: "plugin_request_duration_seconds", + Help: "Plugin request duration in seconds", + Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 25}, + }, []string{"source", "plugin_id", "endpoint", "status", "target"}) + promRegisterer.MustRegister( + pluginRequestCounter, + pluginRequestDuration, + pluginRequestSize, + pluginRequestDurationSeconds, + ) + return &InstrumentationMiddleware{ + pluginMetrics: pluginMetrics{ + pluginRequestCounter: pluginRequestCounter, + pluginRequestDuration: pluginRequestDuration, + pluginRequestSize: pluginRequestSize, + pluginRequestDurationSeconds: pluginRequestDurationSeconds, + }, + pluginRegistry: pluginRegistry, + } +} + +// NewInstrumentationMiddleware returns a new InstrumentationMiddleware. +func NewInstrumentationMiddleware(promRegisterer prometheus.Registerer, pluginRegistry registry.Service) plugins.ClientMiddleware { + imw := newInstrumentationMiddleware(promRegisterer, pluginRegistry) + return plugins.ClientMiddlewareFunc(func(next plugins.Client) plugins.Client { + imw.next = next + return imw + }) +} + +// pluginTarget returns the value for the "target" Prometheus label for the given plugin ID. +func (m *InstrumentationMiddleware) pluginTarget(ctx context.Context, pluginID string) (string, error) { + p, exists := m.pluginRegistry.Plugin(ctx, pluginID) + if !exists { + return "", plugins.ErrPluginNotRegistered + } + return string(p.Target()), nil +} + +// instrumentContext adds a contextual logger with plugin and request details to the given context. +func instrumentContext(ctx context.Context, endpoint string, pCtx backend.PluginContext) context.Context { + p := []any{"endpoint", endpoint, "pluginId", pCtx.PluginID} + if pCtx.DataSourceInstanceSettings != nil { + p = append(p, "dsName", pCtx.DataSourceInstanceSettings.Name) + p = append(p, "dsUID", pCtx.DataSourceInstanceSettings.UID) + } + if pCtx.User != nil { + p = append(p, "uname", pCtx.User.Login) + } + return log.WithContextualAttributes(ctx, p) +} + +// instrumentPluginRequestSize tracks the size of the given request in the m.pluginRequestSize metric. +func (m *InstrumentationMiddleware) instrumentPluginRequestSize(ctx context.Context, pluginCtx backend.PluginContext, endpoint string, requestSize float64) error { + target, err := m.pluginTarget(ctx, pluginCtx.PluginID) + if err != nil { + return err + } + m.pluginRequestSize.WithLabelValues("grafana-backend", pluginCtx.PluginID, endpoint, target).Observe(requestSize) + return nil +} + +// instrumentPluginRequest increments the m.pluginRequestCounter metric and tracks the duration of the given request. +func (m *InstrumentationMiddleware) instrumentPluginRequest(ctx context.Context, pluginCtx backend.PluginContext, endpoint string, fn func(context.Context) error) error { + target, err := m.pluginTarget(ctx, pluginCtx.PluginID) + if err != nil { + return err + } + + status := statusOK + start := time.Now() + + ctx = instrumentContext(ctx, endpoint, pluginCtx) + err = fn(ctx) + if err != nil { + status = statusError + if errors.Is(err, context.Canceled) { + status = statusCancelled + } + } + + elapsed := time.Since(start) + + pluginRequestDurationWithLabels := m.pluginRequestDuration.WithLabelValues(pluginCtx.PluginID, endpoint, target) + pluginRequestCounterWithLabels := m.pluginRequestCounter.WithLabelValues(pluginCtx.PluginID, endpoint, status, target) + pluginRequestDurationSecondsWithLabels := m.pluginRequestDurationSeconds.WithLabelValues("grafana-backend", pluginCtx.PluginID, endpoint, status, target) + + if traceID := tracing.TraceIDFromContext(ctx, true); traceID != "" { + pluginRequestDurationWithLabels.(prometheus.ExemplarObserver).ObserveWithExemplar( + float64(elapsed/time.Millisecond), prometheus.Labels{"traceID": traceID}, + ) + pluginRequestCounterWithLabels.(prometheus.ExemplarAdder).AddWithExemplar(1, prometheus.Labels{"traceID": traceID}) + pluginRequestDurationSecondsWithLabels.(prometheus.ExemplarObserver).ObserveWithExemplar( + elapsed.Seconds(), prometheus.Labels{"traceID": traceID}, + ) + } else { + pluginRequestDurationWithLabels.Observe(float64(elapsed / time.Millisecond)) + pluginRequestCounterWithLabels.Inc() + pluginRequestDurationSecondsWithLabels.Observe(elapsed.Seconds()) + } + + return err +} + +func (m *InstrumentationMiddleware) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { + var requestSize float64 + for _, v := range req.Queries { + requestSize += float64(len(v.JSON)) + } + if err := m.instrumentPluginRequestSize(ctx, req.PluginContext, endpointQueryData, requestSize); err != nil { + return nil, err + } + var resp *backend.QueryDataResponse + err := m.instrumentPluginRequest(ctx, req.PluginContext, endpointQueryData, func(ctx context.Context) (innerErr error) { + resp, innerErr = m.next.QueryData(ctx, req) + return innerErr + }) + return resp, err +} + +func (m *InstrumentationMiddleware) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { + if err := m.instrumentPluginRequestSize(ctx, req.PluginContext, endpointCallResource, float64(len(req.Body))); err != nil { + return err + } + return m.instrumentPluginRequest(ctx, req.PluginContext, endpointCallResource, func(ctx context.Context) error { + return m.next.CallResource(ctx, req, sender) + }) +} + +func (m *InstrumentationMiddleware) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { + var result *backend.CheckHealthResult + err := m.instrumentPluginRequest(ctx, req.PluginContext, endpointCheckHealth, func(ctx context.Context) (innerErr error) { + result, innerErr = m.next.CheckHealth(ctx, req) + return + }) + return result, err +} + +func (m *InstrumentationMiddleware) CollectMetrics(ctx context.Context, req *backend.CollectMetricsRequest) (*backend.CollectMetricsResult, error) { + var result *backend.CollectMetricsResult + err := m.instrumentPluginRequest(ctx, req.PluginContext, endpointCollectMetrics, func(ctx context.Context) (innerErr error) { + result, innerErr = m.next.CollectMetrics(ctx, req) + return + }) + return result, err +} + +func (m *InstrumentationMiddleware) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) { + return m.next.SubscribeStream(ctx, req) +} + +func (m *InstrumentationMiddleware) PublishStream(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) { + return m.next.PublishStream(ctx, req) +} + +func (m *InstrumentationMiddleware) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error { + return m.next.RunStream(ctx, req, sender) +} diff --git a/pkg/services/pluginsintegration/clientmiddleware/instrumentation_middleware_test.go b/pkg/services/pluginsintegration/clientmiddleware/instrumentation_middleware_test.go new file mode 100644 index 00000000000..38dfea37df2 --- /dev/null +++ b/pkg/services/pluginsintegration/clientmiddleware/instrumentation_middleware_test.go @@ -0,0 +1,164 @@ +package clientmiddleware + +import ( + "context" + "errors" + "fmt" + "testing" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/require" + + "github.com/grafana/grafana/pkg/plugins" + "github.com/grafana/grafana/pkg/plugins/backendplugin" + "github.com/grafana/grafana/pkg/plugins/manager/client/clienttest" + "github.com/grafana/grafana/pkg/plugins/manager/fakes" +) + +func TestInstrumentationMiddleware(t *testing.T) { + const ( + pluginID = "plugin-id" + + metricRequestTotal = "grafana_plugin_request_total" + metricRequestDurationMs = "grafana_plugin_request_duration_milliseconds" + metricRequestDurationS = "grafana_plugin_request_duration_seconds" + metricRequestSize = "grafana_plugin_request_size_bytes" + ) + + pCtx := backend.PluginContext{PluginID: pluginID} + + t.Run("should instrument requests", func(t *testing.T) { + for _, tc := range []struct { + expEndpoint string + fn func(cdt *clienttest.ClientDecoratorTest) error + shouldInstrumentRequestSize bool + }{ + { + expEndpoint: endpointCheckHealth, + fn: func(cdt *clienttest.ClientDecoratorTest) error { + _, err := cdt.Decorator.CheckHealth(context.Background(), &backend.CheckHealthRequest{PluginContext: pCtx}) + return err + }, + shouldInstrumentRequestSize: false, + }, + { + expEndpoint: endpointCallResource, + fn: func(cdt *clienttest.ClientDecoratorTest) error { + return cdt.Decorator.CallResource(context.Background(), &backend.CallResourceRequest{PluginContext: pCtx}, nopCallResourceSender) + }, + shouldInstrumentRequestSize: true, + }, + { + expEndpoint: endpointQueryData, + fn: func(cdt *clienttest.ClientDecoratorTest) error { + _, err := cdt.Decorator.QueryData(context.Background(), &backend.QueryDataRequest{PluginContext: pCtx}) + return err + }, + shouldInstrumentRequestSize: true, + }, + { + expEndpoint: endpointCollectMetrics, + fn: func(cdt *clienttest.ClientDecoratorTest) error { + _, err := cdt.Decorator.CollectMetrics(context.Background(), &backend.CollectMetricsRequest{PluginContext: pCtx}) + return err + }, + shouldInstrumentRequestSize: false, + }, + } { + t.Run(tc.expEndpoint, func(t *testing.T) { + promRegistry := prometheus.NewRegistry() + pluginsRegistry := fakes.NewFakePluginRegistry() + require.NoError(t, pluginsRegistry.Add(context.Background(), &plugins.Plugin{ + JSONData: plugins.JSONData{ID: pluginID, Backend: true}, + })) + + mw := newInstrumentationMiddleware(promRegistry, pluginsRegistry) + cdt := clienttest.NewClientDecoratorTest(t, clienttest.WithMiddlewares( + plugins.ClientMiddlewareFunc(func(next plugins.Client) plugins.Client { + mw.next = next + return mw + }), + )) + require.NoError(t, tc.fn(cdt)) + + // Ensure the correct metrics have been incremented/observed + require.Equal(t, 1, testutil.CollectAndCount(promRegistry, metricRequestTotal)) + require.Equal(t, 1, testutil.CollectAndCount(promRegistry, metricRequestDurationMs)) + require.Equal(t, 1, testutil.CollectAndCount(promRegistry, metricRequestDurationS)) + + counter := mw.pluginMetrics.pluginRequestCounter.WithLabelValues(pluginID, tc.expEndpoint, statusOK, string(backendplugin.TargetUnknown)) + require.Equal(t, 1.0, testutil.ToFloat64(counter)) + for _, m := range []string{metricRequestDurationMs, metricRequestDurationS} { + require.NoError(t, checkHistogram(promRegistry, m, map[string]string{ + "plugin_id": pluginID, + "endpoint": tc.expEndpoint, + "target": string(backendplugin.TargetUnknown), + })) + } + if tc.shouldInstrumentRequestSize { + require.Equal(t, 1, testutil.CollectAndCount(promRegistry, metricRequestSize), "request size should have been instrumented") + require.NoError(t, checkHistogram(promRegistry, metricRequestSize, map[string]string{ + "plugin_id": pluginID, + "endpoint": tc.expEndpoint, + "target": string(backendplugin.TargetUnknown), + "source": "grafana-backend", + }), "request size should have been instrumented") + } + }) + } + }) +} + +// checkHistogram is a utility function that checks if a histogram with the given name and label values exists +// and has been observed at least once. +func checkHistogram(promRegistry *prometheus.Registry, expMetricName string, expLabels map[string]string) error { + metrics, err := promRegistry.Gather() + if err != nil { + return fmt.Errorf("gather: %w", err) + } + var metricFamily *dto.MetricFamily + for _, mf := range metrics { + if *mf.Name == expMetricName { + metricFamily = mf + break + } + } + if metricFamily == nil { + return fmt.Errorf("metric %q not found", expMetricName) + } + var foundLabels int + var metric *dto.Metric + for _, m := range metricFamily.Metric { + for _, l := range m.GetLabel() { + v, ok := expLabels[*l.Name] + if !ok { + continue + } + if v != *l.Value { + return fmt.Errorf("expected label %q to have value %q, got %q", *l.Name, v, *l.Value) + } + foundLabels++ + } + if foundLabels == 0 { + continue + } + if foundLabels != len(expLabels) { + return fmt.Errorf("expected %d labels, got %d", len(expLabels), foundLabels) + } + metric = m + break + } + if metric == nil { + return fmt.Errorf("could not find metric with labels %v", expLabels) + } + if metric.Histogram == nil { + return fmt.Errorf("metric %q is not a histogram", expMetricName) + } + if metric.Histogram.SampleCount == nil || *metric.Histogram.SampleCount == 0 { + return errors.New("found metric but no samples have been collected") + } + return nil +} diff --git a/pkg/services/pluginsintegration/clientmiddleware/logger_middleware.go b/pkg/services/pluginsintegration/clientmiddleware/logger_middleware.go index c8a9311cfe2..716df10bb8e 100644 --- a/pkg/services/pluginsintegration/clientmiddleware/logger_middleware.go +++ b/pkg/services/pluginsintegration/clientmiddleware/logger_middleware.go @@ -13,17 +13,6 @@ import ( "github.com/grafana/grafana/pkg/setting" ) -const ( - statusOK = "ok" - statusError = "error" - statusCancelled = "cancelled" - - endpointCallResource = "callResource" - endpointCheckHealth = "checkHealth" - endpointCollectMetrics = "collectMetrics" - endpointQueryData = "queryData" -) - // NewLoggerMiddleware creates a new plugins.ClientMiddleware that will // log requests. func NewLoggerMiddleware(cfg *setting.Cfg, logger plog.Logger) plugins.ClientMiddleware { diff --git a/pkg/services/pluginsintegration/clientmiddleware/utils.go b/pkg/services/pluginsintegration/clientmiddleware/utils.go index dc1f768532f..4201c3fe417 100644 --- a/pkg/services/pluginsintegration/clientmiddleware/utils.go +++ b/pkg/services/pluginsintegration/clientmiddleware/utils.go @@ -4,6 +4,17 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/backend" ) +const ( + statusOK = "ok" + statusError = "error" + statusCancelled = "cancelled" + + endpointCallResource = "callResource" + endpointCheckHealth = "checkHealth" + endpointCollectMetrics = "collectMetrics" + endpointQueryData = "queryData" +) + type callResourceResponseSenderFunc func(res *backend.CallResourceResponse) error func (fn callResourceResponseSenderFunc) Send(res *backend.CallResourceResponse) error { diff --git a/pkg/services/pluginsintegration/pluginsintegration.go b/pkg/services/pluginsintegration/pluginsintegration.go index dbeeb237e28..cc09121be9f 100644 --- a/pkg/services/pluginsintegration/pluginsintegration.go +++ b/pkg/services/pluginsintegration/pluginsintegration.go @@ -2,6 +2,7 @@ package pluginsintegration import ( "github.com/google/wire" + "github.com/prometheus/client_golang/prometheus" "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/plugins" @@ -135,25 +136,27 @@ func ProvideClientDecorator( tracer tracing.Tracer, cachingService caching.CachingService, features *featuremgmt.FeatureManager, + promRegisterer prometheus.Registerer, ) (*client.Decorator, error) { - return NewClientDecorator(cfg, pCfg, pluginRegistry, oAuthTokenService, tracer, cachingService, features) + return NewClientDecorator(cfg, pCfg, pluginRegistry, oAuthTokenService, tracer, cachingService, features, promRegisterer, pluginRegistry) } func NewClientDecorator( cfg *setting.Cfg, pCfg *pCfg.Cfg, pluginRegistry registry.Service, oAuthTokenService oauthtoken.OAuthTokenService, tracer tracing.Tracer, cachingService caching.CachingService, features *featuremgmt.FeatureManager, + promRegisterer prometheus.Registerer, registry registry.Service, ) (*client.Decorator, error) { c := client.ProvideService(pluginRegistry, pCfg) - middlewares := CreateMiddlewares(cfg, oAuthTokenService, tracer, cachingService, features) - + middlewares := CreateMiddlewares(cfg, oAuthTokenService, tracer, cachingService, features, promRegisterer, registry) return client.NewDecorator(c, middlewares...) } -func CreateMiddlewares(cfg *setting.Cfg, oAuthTokenService oauthtoken.OAuthTokenService, tracer tracing.Tracer, cachingService caching.CachingService, features *featuremgmt.FeatureManager) []plugins.ClientMiddleware { +func CreateMiddlewares(cfg *setting.Cfg, oAuthTokenService oauthtoken.OAuthTokenService, tracer tracing.Tracer, cachingService caching.CachingService, features *featuremgmt.FeatureManager, promRegisterer prometheus.Registerer, registry registry.Service) []plugins.ClientMiddleware { skipCookiesNames := []string{cfg.LoginCookieName} middlewares := []plugins.ClientMiddleware{ clientmiddleware.NewTracingMiddleware(tracer), + clientmiddleware.NewInstrumentationMiddleware(promRegisterer, registry), clientmiddleware.NewLoggerMiddleware(cfg, log.New("plugin.instrumentation")), clientmiddleware.NewTracingHeaderMiddleware(), clientmiddleware.NewClearAuthHeadersMiddleware(),