Plugins: Refactoring: Implement plugin instrumentation as a middleware (#76011)

* Plugins: Refactor instrumentation as plugin client middleware

* Simplify repeated code

* Fix compilation error

* Add comments

* Moved status and endpoint consts to utils.go

* Fix wrong endpoint name in CheckHealth InstrumentationMiddleware

* Add tests

* Fix wrong endpoint value in instrumentPluginRequestSize

* removed todo

* PR review feedback: use MustRegister

* PR review feedback: move tracing middleware before instrumentation middleware

* PR review feedback: removed decommissioned check

* PR review feedback: extract prometheus metrics into separate variables
This commit is contained in:
Giuseppe Guerra
2023-10-09 14:12:57 +02:00
committed by GitHub
parent 57b99728ad
commit cfcfbe4aaa
8 changed files with 426 additions and 205 deletions

View File

@ -11,12 +11,14 @@ import (
"github.com/grafana/grafana-azure-sdk-go/azsettings"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/localcache"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/plugins/backendplugin/coreplugin"
pluginClient "github.com/grafana/grafana/pkg/plugins/manager/client"
"github.com/grafana/grafana/pkg/plugins/manager/fakes"
@ -87,13 +89,20 @@ func TestCallResource(t *testing.T) {
require.NoError(t, resp.Body.Close())
require.Equal(t, 200, resp.StatusCode)
})
pluginRegistry := fakes.NewFakePluginRegistry()
require.NoError(t, pluginRegistry.Add(context.Background(), &plugins.Plugin{
JSONData: plugins.JSONData{
ID: "grafana-testdata-datasource",
Backend: true,
},
}))
middlewares := pluginsintegration.CreateMiddlewares(cfg, &oauthtokentest.Service{}, tracing.InitializeTracerForTest(), &caching.OSSCachingService{}, &featuremgmt.FeatureManager{}, prometheus.DefaultRegisterer, pluginRegistry)
pc, err := pluginClient.NewDecorator(&fakes.FakePluginClient{
CallResourceHandlerFunc: backend.CallResourceHandlerFunc(func(ctx context.Context,
req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
return errors.New("something went wrong")
}),
}, pluginsintegration.CreateMiddlewares(cfg, &oauthtokentest.Service{}, tracing.InitializeTracerForTest(), &caching.OSSCachingService{}, &featuremgmt.FeatureManager{})...)
}, middlewares...)
require.NoError(t, err)
srv = SetupAPITestServer(t, func(hs *HTTPServer) {

View File

@ -1,136 +0,0 @@
// Package instrumentation contains backend plugin instrumentation logic.
package instrumentation
import (
"context"
"errors"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/plugins/backendplugin"
)
var (
pluginRequestCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "grafana",
Name: "plugin_request_total",
Help: "The total amount of plugin requests",
}, []string{"plugin_id", "endpoint", "status", "target"})
pluginRequestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "grafana",
Name: "plugin_request_duration_milliseconds",
Help: "Plugin request duration",
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 25, 50, 100},
}, []string{"plugin_id", "endpoint", "target"})
pluginRequestSizeHistogram = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "grafana",
Name: "plugin_request_size_bytes",
Help: "histogram of plugin request sizes returned",
Buckets: []float64{128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536, 131072, 262144, 524288, 1048576},
}, []string{"source", "plugin_id", "endpoint", "target"},
)
PluginRequestDurationSeconds = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "grafana",
Name: "plugin_request_duration_seconds",
Help: "Plugin request duration in seconds",
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 25},
}, []string{"source", "plugin_id", "endpoint", "status", "target"})
)
const (
statusOK = "ok"
statusError = "error"
statusCancelled = "cancelled"
endpointCallResource = "callResource"
endpointCheckHealth = "checkHealth"
endpointCollectMetrics = "collectMetrics"
endpointQueryData = "queryData"
)
// instrumentPluginRequest instruments success rate and latency of `fn`
func instrumentPluginRequest(ctx context.Context, cfg Cfg, pluginCtx *backend.PluginContext, endpoint string, fn func(ctx context.Context) error) error {
status := statusOK
start := time.Now()
ctx = instrumentContext(ctx, endpoint, *pluginCtx)
err := fn(ctx)
if err != nil {
status = statusError
if errors.Is(err, context.Canceled) {
status = statusCancelled
}
}
elapsed := time.Since(start)
pluginRequestDurationWithLabels := pluginRequestDuration.WithLabelValues(pluginCtx.PluginID, endpoint, string(cfg.Target))
pluginRequestCounterWithLabels := pluginRequestCounter.WithLabelValues(pluginCtx.PluginID, endpoint, status, string(cfg.Target))
pluginRequestDurationSecondsWithLabels := PluginRequestDurationSeconds.WithLabelValues("grafana-backend", pluginCtx.PluginID, endpoint, status, string(cfg.Target))
if traceID := tracing.TraceIDFromContext(ctx, true); traceID != "" {
pluginRequestDurationWithLabels.(prometheus.ExemplarObserver).ObserveWithExemplar(
float64(elapsed/time.Millisecond), prometheus.Labels{"traceID": traceID},
)
pluginRequestCounterWithLabels.(prometheus.ExemplarAdder).AddWithExemplar(1, prometheus.Labels{"traceID": traceID})
pluginRequestDurationSecondsWithLabels.(prometheus.ExemplarObserver).ObserveWithExemplar(
elapsed.Seconds(), prometheus.Labels{"traceID": traceID},
)
} else {
pluginRequestDurationWithLabels.Observe(float64(elapsed / time.Millisecond))
pluginRequestCounterWithLabels.Inc()
pluginRequestDurationSecondsWithLabels.Observe(elapsed.Seconds())
}
return err
}
func instrumentContext(ctx context.Context, endpoint string, pCtx backend.PluginContext) context.Context {
p := []any{"endpoint", endpoint, "pluginId", pCtx.PluginID}
if pCtx.DataSourceInstanceSettings != nil {
p = append(p, "dsName", pCtx.DataSourceInstanceSettings.Name)
p = append(p, "dsUID", pCtx.DataSourceInstanceSettings.UID)
}
if pCtx.User != nil {
p = append(p, "uname", pCtx.User.Login)
}
return log.WithContextualAttributes(ctx, p)
}
type Cfg struct {
Target backendplugin.Target
}
// InstrumentCollectMetrics instruments collectMetrics.
func InstrumentCollectMetrics(ctx context.Context, req *backend.PluginContext, cfg Cfg, fn func(ctx context.Context) error) error {
return instrumentPluginRequest(ctx, cfg, req, endpointCollectMetrics, fn)
}
// InstrumentCheckHealthRequest instruments checkHealth.
func InstrumentCheckHealthRequest(ctx context.Context, req *backend.PluginContext, cfg Cfg, fn func(ctx context.Context) error) error {
return instrumentPluginRequest(ctx, cfg, req, endpointCheckHealth, fn)
}
// InstrumentCallResourceRequest instruments callResource.
func InstrumentCallResourceRequest(ctx context.Context, req *backend.PluginContext, cfg Cfg, requestSize float64, fn func(ctx context.Context) error) error {
pluginRequestSizeHistogram.WithLabelValues("grafana-backend", req.PluginID, endpointCallResource,
string(cfg.Target)).Observe(requestSize)
return instrumentPluginRequest(ctx, cfg, req, endpointCallResource, fn)
}
// InstrumentQueryDataRequest instruments success rate and latency of query data requests.
func InstrumentQueryDataRequest(ctx context.Context, req *backend.PluginContext, cfg Cfg,
requestSize float64, fn func(ctx context.Context) error) error {
pluginRequestSizeHistogram.WithLabelValues("grafana-backend", req.PluginID, endpointQueryData,
string(cfg.Target)).Observe(requestSize)
return instrumentPluginRequest(ctx, cfg, req, endpointQueryData, fn)
}

View File

@ -10,7 +10,6 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/plugins/backendplugin/instrumentation"
"github.com/grafana/grafana/pkg/plugins/config"
"github.com/grafana/grafana/pkg/plugins/manager/registry"
)
@ -50,19 +49,7 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
return nil, plugins.ErrPluginNotRegistered
}
var totalBytes float64
for _, v := range req.Queries {
totalBytes += float64(len(v.JSON))
}
var resp *backend.QueryDataResponse
err := instrumentation.InstrumentQueryDataRequest(ctx, &req.PluginContext, instrumentation.Cfg{
Target: p.Target(),
}, totalBytes, func(ctx context.Context) (innerErr error) {
resp, innerErr = p.QueryData(ctx, req)
return
})
resp, err := p.QueryData(ctx, req)
if err != nil {
if errors.Is(err, plugins.ErrMethodNotImplemented) {
return nil, err
@ -101,35 +88,28 @@ func (s *Service) CallResource(ctx context.Context, req *backend.CallResourceReq
return plugins.ErrPluginNotRegistered
}
totalBytes := float64(len(req.Body))
err := instrumentation.InstrumentCallResourceRequest(ctx, &req.PluginContext, instrumentation.Cfg{
Target: p.Target(),
}, totalBytes, func(ctx context.Context) (innerErr error) {
removeConnectionHeaders(req.Headers)
removeHopByHopHeaders(req.Headers)
removeNonAllowedHeaders(req.Headers)
removeConnectionHeaders(req.Headers)
removeHopByHopHeaders(req.Headers)
removeNonAllowedHeaders(req.Headers)
processedStreams := 0
wrappedSender := callResourceResponseSenderFunc(func(res *backend.CallResourceResponse) error {
// Expected that headers and status are only part of first stream
if processedStreams == 0 && res != nil {
if len(res.Headers) > 0 {
removeConnectionHeaders(res.Headers)
removeHopByHopHeaders(res.Headers)
removeNonAllowedHeaders(res.Headers)
}
ensureContentTypeHeader(res)
processedStreams := 0
wrappedSender := callResourceResponseSenderFunc(func(res *backend.CallResourceResponse) error {
// Expected that headers and status are only part of first stream
if processedStreams == 0 && res != nil {
if len(res.Headers) > 0 {
removeConnectionHeaders(res.Headers)
removeHopByHopHeaders(res.Headers)
removeNonAllowedHeaders(res.Headers)
}
processedStreams++
return sender.Send(res)
})
ensureContentTypeHeader(res)
}
innerErr = p.CallResource(ctx, req, wrappedSender)
return
processedStreams++
return sender.Send(res)
})
err := p.CallResource(ctx, req, wrappedSender)
if err != nil {
return plugins.ErrPluginDownstreamErrorBase.Errorf("client: failed to call resources: %w", err)
}
@ -147,13 +127,7 @@ func (s *Service) CollectMetrics(ctx context.Context, req *backend.CollectMetric
return nil, plugins.ErrPluginNotRegistered
}
var resp *backend.CollectMetricsResult
err := instrumentation.InstrumentCollectMetrics(ctx, &req.PluginContext, instrumentation.Cfg{
Target: p.Target(),
}, func(ctx context.Context) (innerErr error) {
resp, innerErr = p.CollectMetrics(ctx, req)
return
})
resp, err := p.CollectMetrics(ctx, req)
if err != nil {
return nil, plugins.ErrPluginDownstreamErrorBase.Errorf("client: failed to collect metrics: %w", err)
}
@ -171,14 +145,7 @@ func (s *Service) CheckHealth(ctx context.Context, req *backend.CheckHealthReque
return nil, plugins.ErrPluginNotRegistered
}
var resp *backend.CheckHealthResult
err := instrumentation.InstrumentCheckHealthRequest(ctx, &req.PluginContext, instrumentation.Cfg{
Target: p.Target(),
}, func(ctx context.Context) (innerErr error) {
resp, innerErr = p.CheckHealth(ctx, req)
return
})
resp, err := p.CheckHealth(ctx, req)
if err != nil {
if errors.Is(err, plugins.ErrMethodNotImplemented) {
return nil, err

View File

@ -0,0 +1,214 @@
package clientmiddleware
import (
"context"
"errors"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/plugins/manager/registry"
)
// pluginMetrics contains the prometheus metrics used by the InstrumentationMiddleware.
type pluginMetrics struct {
pluginRequestCounter *prometheus.CounterVec
pluginRequestDuration *prometheus.HistogramVec
pluginRequestSize *prometheus.HistogramVec
pluginRequestDurationSeconds *prometheus.HistogramVec
}
// InstrumentationMiddleware is a middleware that instruments plugin requests.
// It tracks requests count, duration and size as prometheus metrics.
// It also enriches the [context.Context] with a contextual logger containing plugin and request details.
// For those reasons, this middleware should live at the top of the middleware stack.
type InstrumentationMiddleware struct {
pluginMetrics
pluginRegistry registry.Service
next plugins.Client
}
func newInstrumentationMiddleware(promRegisterer prometheus.Registerer, pluginRegistry registry.Service) *InstrumentationMiddleware {
pluginRequestCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "grafana",
Name: "plugin_request_total",
Help: "The total amount of plugin requests",
}, []string{"plugin_id", "endpoint", "status", "target"})
pluginRequestDuration := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "grafana",
Name: "plugin_request_duration_milliseconds",
Help: "Plugin request duration",
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 25, 50, 100},
}, []string{"plugin_id", "endpoint", "target"})
pluginRequestSize := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "grafana",
Name: "plugin_request_size_bytes",
Help: "histogram of plugin request sizes returned",
Buckets: []float64{128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536, 131072, 262144, 524288, 1048576},
}, []string{"source", "plugin_id", "endpoint", "target"},
)
pluginRequestDurationSeconds := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "grafana",
Name: "plugin_request_duration_seconds",
Help: "Plugin request duration in seconds",
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 25},
}, []string{"source", "plugin_id", "endpoint", "status", "target"})
promRegisterer.MustRegister(
pluginRequestCounter,
pluginRequestDuration,
pluginRequestSize,
pluginRequestDurationSeconds,
)
return &InstrumentationMiddleware{
pluginMetrics: pluginMetrics{
pluginRequestCounter: pluginRequestCounter,
pluginRequestDuration: pluginRequestDuration,
pluginRequestSize: pluginRequestSize,
pluginRequestDurationSeconds: pluginRequestDurationSeconds,
},
pluginRegistry: pluginRegistry,
}
}
// NewInstrumentationMiddleware returns a new InstrumentationMiddleware.
func NewInstrumentationMiddleware(promRegisterer prometheus.Registerer, pluginRegistry registry.Service) plugins.ClientMiddleware {
imw := newInstrumentationMiddleware(promRegisterer, pluginRegistry)
return plugins.ClientMiddlewareFunc(func(next plugins.Client) plugins.Client {
imw.next = next
return imw
})
}
// pluginTarget returns the value for the "target" Prometheus label for the given plugin ID.
func (m *InstrumentationMiddleware) pluginTarget(ctx context.Context, pluginID string) (string, error) {
p, exists := m.pluginRegistry.Plugin(ctx, pluginID)
if !exists {
return "", plugins.ErrPluginNotRegistered
}
return string(p.Target()), nil
}
// instrumentContext adds a contextual logger with plugin and request details to the given context.
func instrumentContext(ctx context.Context, endpoint string, pCtx backend.PluginContext) context.Context {
p := []any{"endpoint", endpoint, "pluginId", pCtx.PluginID}
if pCtx.DataSourceInstanceSettings != nil {
p = append(p, "dsName", pCtx.DataSourceInstanceSettings.Name)
p = append(p, "dsUID", pCtx.DataSourceInstanceSettings.UID)
}
if pCtx.User != nil {
p = append(p, "uname", pCtx.User.Login)
}
return log.WithContextualAttributes(ctx, p)
}
// instrumentPluginRequestSize tracks the size of the given request in the m.pluginRequestSize metric.
func (m *InstrumentationMiddleware) instrumentPluginRequestSize(ctx context.Context, pluginCtx backend.PluginContext, endpoint string, requestSize float64) error {
target, err := m.pluginTarget(ctx, pluginCtx.PluginID)
if err != nil {
return err
}
m.pluginRequestSize.WithLabelValues("grafana-backend", pluginCtx.PluginID, endpoint, target).Observe(requestSize)
return nil
}
// instrumentPluginRequest increments the m.pluginRequestCounter metric and tracks the duration of the given request.
func (m *InstrumentationMiddleware) instrumentPluginRequest(ctx context.Context, pluginCtx backend.PluginContext, endpoint string, fn func(context.Context) error) error {
target, err := m.pluginTarget(ctx, pluginCtx.PluginID)
if err != nil {
return err
}
status := statusOK
start := time.Now()
ctx = instrumentContext(ctx, endpoint, pluginCtx)
err = fn(ctx)
if err != nil {
status = statusError
if errors.Is(err, context.Canceled) {
status = statusCancelled
}
}
elapsed := time.Since(start)
pluginRequestDurationWithLabels := m.pluginRequestDuration.WithLabelValues(pluginCtx.PluginID, endpoint, target)
pluginRequestCounterWithLabels := m.pluginRequestCounter.WithLabelValues(pluginCtx.PluginID, endpoint, status, target)
pluginRequestDurationSecondsWithLabels := m.pluginRequestDurationSeconds.WithLabelValues("grafana-backend", pluginCtx.PluginID, endpoint, status, target)
if traceID := tracing.TraceIDFromContext(ctx, true); traceID != "" {
pluginRequestDurationWithLabels.(prometheus.ExemplarObserver).ObserveWithExemplar(
float64(elapsed/time.Millisecond), prometheus.Labels{"traceID": traceID},
)
pluginRequestCounterWithLabels.(prometheus.ExemplarAdder).AddWithExemplar(1, prometheus.Labels{"traceID": traceID})
pluginRequestDurationSecondsWithLabels.(prometheus.ExemplarObserver).ObserveWithExemplar(
elapsed.Seconds(), prometheus.Labels{"traceID": traceID},
)
} else {
pluginRequestDurationWithLabels.Observe(float64(elapsed / time.Millisecond))
pluginRequestCounterWithLabels.Inc()
pluginRequestDurationSecondsWithLabels.Observe(elapsed.Seconds())
}
return err
}
func (m *InstrumentationMiddleware) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
var requestSize float64
for _, v := range req.Queries {
requestSize += float64(len(v.JSON))
}
if err := m.instrumentPluginRequestSize(ctx, req.PluginContext, endpointQueryData, requestSize); err != nil {
return nil, err
}
var resp *backend.QueryDataResponse
err := m.instrumentPluginRequest(ctx, req.PluginContext, endpointQueryData, func(ctx context.Context) (innerErr error) {
resp, innerErr = m.next.QueryData(ctx, req)
return innerErr
})
return resp, err
}
func (m *InstrumentationMiddleware) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
if err := m.instrumentPluginRequestSize(ctx, req.PluginContext, endpointCallResource, float64(len(req.Body))); err != nil {
return err
}
return m.instrumentPluginRequest(ctx, req.PluginContext, endpointCallResource, func(ctx context.Context) error {
return m.next.CallResource(ctx, req, sender)
})
}
func (m *InstrumentationMiddleware) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
var result *backend.CheckHealthResult
err := m.instrumentPluginRequest(ctx, req.PluginContext, endpointCheckHealth, func(ctx context.Context) (innerErr error) {
result, innerErr = m.next.CheckHealth(ctx, req)
return
})
return result, err
}
func (m *InstrumentationMiddleware) CollectMetrics(ctx context.Context, req *backend.CollectMetricsRequest) (*backend.CollectMetricsResult, error) {
var result *backend.CollectMetricsResult
err := m.instrumentPluginRequest(ctx, req.PluginContext, endpointCollectMetrics, func(ctx context.Context) (innerErr error) {
result, innerErr = m.next.CollectMetrics(ctx, req)
return
})
return result, err
}
func (m *InstrumentationMiddleware) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
return m.next.SubscribeStream(ctx, req)
}
func (m *InstrumentationMiddleware) PublishStream(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
return m.next.PublishStream(ctx, req)
}
func (m *InstrumentationMiddleware) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
return m.next.RunStream(ctx, req, sender)
}

View File

@ -0,0 +1,164 @@
package clientmiddleware
import (
"context"
"errors"
"fmt"
"testing"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/plugins/backendplugin"
"github.com/grafana/grafana/pkg/plugins/manager/client/clienttest"
"github.com/grafana/grafana/pkg/plugins/manager/fakes"
)
func TestInstrumentationMiddleware(t *testing.T) {
const (
pluginID = "plugin-id"
metricRequestTotal = "grafana_plugin_request_total"
metricRequestDurationMs = "grafana_plugin_request_duration_milliseconds"
metricRequestDurationS = "grafana_plugin_request_duration_seconds"
metricRequestSize = "grafana_plugin_request_size_bytes"
)
pCtx := backend.PluginContext{PluginID: pluginID}
t.Run("should instrument requests", func(t *testing.T) {
for _, tc := range []struct {
expEndpoint string
fn func(cdt *clienttest.ClientDecoratorTest) error
shouldInstrumentRequestSize bool
}{
{
expEndpoint: endpointCheckHealth,
fn: func(cdt *clienttest.ClientDecoratorTest) error {
_, err := cdt.Decorator.CheckHealth(context.Background(), &backend.CheckHealthRequest{PluginContext: pCtx})
return err
},
shouldInstrumentRequestSize: false,
},
{
expEndpoint: endpointCallResource,
fn: func(cdt *clienttest.ClientDecoratorTest) error {
return cdt.Decorator.CallResource(context.Background(), &backend.CallResourceRequest{PluginContext: pCtx}, nopCallResourceSender)
},
shouldInstrumentRequestSize: true,
},
{
expEndpoint: endpointQueryData,
fn: func(cdt *clienttest.ClientDecoratorTest) error {
_, err := cdt.Decorator.QueryData(context.Background(), &backend.QueryDataRequest{PluginContext: pCtx})
return err
},
shouldInstrumentRequestSize: true,
},
{
expEndpoint: endpointCollectMetrics,
fn: func(cdt *clienttest.ClientDecoratorTest) error {
_, err := cdt.Decorator.CollectMetrics(context.Background(), &backend.CollectMetricsRequest{PluginContext: pCtx})
return err
},
shouldInstrumentRequestSize: false,
},
} {
t.Run(tc.expEndpoint, func(t *testing.T) {
promRegistry := prometheus.NewRegistry()
pluginsRegistry := fakes.NewFakePluginRegistry()
require.NoError(t, pluginsRegistry.Add(context.Background(), &plugins.Plugin{
JSONData: plugins.JSONData{ID: pluginID, Backend: true},
}))
mw := newInstrumentationMiddleware(promRegistry, pluginsRegistry)
cdt := clienttest.NewClientDecoratorTest(t, clienttest.WithMiddlewares(
plugins.ClientMiddlewareFunc(func(next plugins.Client) plugins.Client {
mw.next = next
return mw
}),
))
require.NoError(t, tc.fn(cdt))
// Ensure the correct metrics have been incremented/observed
require.Equal(t, 1, testutil.CollectAndCount(promRegistry, metricRequestTotal))
require.Equal(t, 1, testutil.CollectAndCount(promRegistry, metricRequestDurationMs))
require.Equal(t, 1, testutil.CollectAndCount(promRegistry, metricRequestDurationS))
counter := mw.pluginMetrics.pluginRequestCounter.WithLabelValues(pluginID, tc.expEndpoint, statusOK, string(backendplugin.TargetUnknown))
require.Equal(t, 1.0, testutil.ToFloat64(counter))
for _, m := range []string{metricRequestDurationMs, metricRequestDurationS} {
require.NoError(t, checkHistogram(promRegistry, m, map[string]string{
"plugin_id": pluginID,
"endpoint": tc.expEndpoint,
"target": string(backendplugin.TargetUnknown),
}))
}
if tc.shouldInstrumentRequestSize {
require.Equal(t, 1, testutil.CollectAndCount(promRegistry, metricRequestSize), "request size should have been instrumented")
require.NoError(t, checkHistogram(promRegistry, metricRequestSize, map[string]string{
"plugin_id": pluginID,
"endpoint": tc.expEndpoint,
"target": string(backendplugin.TargetUnknown),
"source": "grafana-backend",
}), "request size should have been instrumented")
}
})
}
})
}
// checkHistogram is a utility function that checks if a histogram with the given name and label values exists
// and has been observed at least once.
func checkHistogram(promRegistry *prometheus.Registry, expMetricName string, expLabels map[string]string) error {
metrics, err := promRegistry.Gather()
if err != nil {
return fmt.Errorf("gather: %w", err)
}
var metricFamily *dto.MetricFamily
for _, mf := range metrics {
if *mf.Name == expMetricName {
metricFamily = mf
break
}
}
if metricFamily == nil {
return fmt.Errorf("metric %q not found", expMetricName)
}
var foundLabels int
var metric *dto.Metric
for _, m := range metricFamily.Metric {
for _, l := range m.GetLabel() {
v, ok := expLabels[*l.Name]
if !ok {
continue
}
if v != *l.Value {
return fmt.Errorf("expected label %q to have value %q, got %q", *l.Name, v, *l.Value)
}
foundLabels++
}
if foundLabels == 0 {
continue
}
if foundLabels != len(expLabels) {
return fmt.Errorf("expected %d labels, got %d", len(expLabels), foundLabels)
}
metric = m
break
}
if metric == nil {
return fmt.Errorf("could not find metric with labels %v", expLabels)
}
if metric.Histogram == nil {
return fmt.Errorf("metric %q is not a histogram", expMetricName)
}
if metric.Histogram.SampleCount == nil || *metric.Histogram.SampleCount == 0 {
return errors.New("found metric but no samples have been collected")
}
return nil
}

View File

@ -13,17 +13,6 @@ import (
"github.com/grafana/grafana/pkg/setting"
)
const (
statusOK = "ok"
statusError = "error"
statusCancelled = "cancelled"
endpointCallResource = "callResource"
endpointCheckHealth = "checkHealth"
endpointCollectMetrics = "collectMetrics"
endpointQueryData = "queryData"
)
// NewLoggerMiddleware creates a new plugins.ClientMiddleware that will
// log requests.
func NewLoggerMiddleware(cfg *setting.Cfg, logger plog.Logger) plugins.ClientMiddleware {

View File

@ -4,6 +4,17 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend"
)
const (
statusOK = "ok"
statusError = "error"
statusCancelled = "cancelled"
endpointCallResource = "callResource"
endpointCheckHealth = "checkHealth"
endpointCollectMetrics = "collectMetrics"
endpointQueryData = "queryData"
)
type callResourceResponseSenderFunc func(res *backend.CallResourceResponse) error
func (fn callResourceResponseSenderFunc) Send(res *backend.CallResourceResponse) error {

View File

@ -2,6 +2,7 @@ package pluginsintegration
import (
"github.com/google/wire"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/plugins"
@ -135,25 +136,27 @@ func ProvideClientDecorator(
tracer tracing.Tracer,
cachingService caching.CachingService,
features *featuremgmt.FeatureManager,
promRegisterer prometheus.Registerer,
) (*client.Decorator, error) {
return NewClientDecorator(cfg, pCfg, pluginRegistry, oAuthTokenService, tracer, cachingService, features)
return NewClientDecorator(cfg, pCfg, pluginRegistry, oAuthTokenService, tracer, cachingService, features, promRegisterer, pluginRegistry)
}
func NewClientDecorator(
cfg *setting.Cfg, pCfg *pCfg.Cfg,
pluginRegistry registry.Service, oAuthTokenService oauthtoken.OAuthTokenService,
tracer tracing.Tracer, cachingService caching.CachingService, features *featuremgmt.FeatureManager,
promRegisterer prometheus.Registerer, registry registry.Service,
) (*client.Decorator, error) {
c := client.ProvideService(pluginRegistry, pCfg)
middlewares := CreateMiddlewares(cfg, oAuthTokenService, tracer, cachingService, features)
middlewares := CreateMiddlewares(cfg, oAuthTokenService, tracer, cachingService, features, promRegisterer, registry)
return client.NewDecorator(c, middlewares...)
}
func CreateMiddlewares(cfg *setting.Cfg, oAuthTokenService oauthtoken.OAuthTokenService, tracer tracing.Tracer, cachingService caching.CachingService, features *featuremgmt.FeatureManager) []plugins.ClientMiddleware {
func CreateMiddlewares(cfg *setting.Cfg, oAuthTokenService oauthtoken.OAuthTokenService, tracer tracing.Tracer, cachingService caching.CachingService, features *featuremgmt.FeatureManager, promRegisterer prometheus.Registerer, registry registry.Service) []plugins.ClientMiddleware {
skipCookiesNames := []string{cfg.LoginCookieName}
middlewares := []plugins.ClientMiddleware{
clientmiddleware.NewTracingMiddleware(tracer),
clientmiddleware.NewInstrumentationMiddleware(promRegisterer, registry),
clientmiddleware.NewLoggerMiddleware(cfg, log.New("plugin.instrumentation")),
clientmiddleware.NewTracingHeaderMiddleware(),
clientmiddleware.NewClearAuthHeadersMiddleware(),