mirror of
https://github.com/grafana/grafana.git
synced 2025-07-30 21:12:37 +08:00
SQL Expression: Add instrumentation for sql expressions (#103758)
This commit is contained in:
@ -10,6 +10,7 @@ import (
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
|
||||
"github.com/grafana/grafana/pkg/expr/mathexp"
|
||||
"github.com/grafana/grafana/pkg/expr/metrics"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
)
|
||||
|
||||
@ -69,7 +70,7 @@ func (cmd *ConditionsCmd) NeedsVars() []string {
|
||||
|
||||
// Execute runs the command and returns the results or an error if the command
|
||||
// failed to execute.
|
||||
func (cmd *ConditionsCmd) Execute(ctx context.Context, t time.Time, vars mathexp.Vars, tracer tracing.Tracer) (mathexp.Results, error) {
|
||||
func (cmd *ConditionsCmd) Execute(ctx context.Context, t time.Time, vars mathexp.Vars, tracer tracing.Tracer, _ *metrics.ExprMetrics) (mathexp.Results, error) {
|
||||
ctx, span := tracer.Start(ctx, "SSE.ExecuteClassicConditions")
|
||||
defer span.End()
|
||||
// isFiring and isNoData contains the outcome of ConditionsCmd, and is derived from the
|
||||
|
@ -639,7 +639,7 @@ func TestConditionsCmd(t *testing.T) {
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
res, err := tt.cmd.Execute(context.Background(), time.Now(), tt.vars, tracing.InitializeTracerForTest())
|
||||
res, err := tt.cmd.Execute(context.Background(), time.Now(), tt.vars, tracing.InitializeTracerForTest(), nil)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tt.expected(), res)
|
||||
})
|
||||
|
@ -12,13 +12,15 @@ import (
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
|
||||
"github.com/grafana/grafana/pkg/expr/mathexp"
|
||||
"github.com/grafana/grafana/pkg/expr/metrics"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
)
|
||||
|
||||
// Command is an interface for all expression commands.
|
||||
type Command interface {
|
||||
NeedsVars() []string
|
||||
Execute(ctx context.Context, now time.Time, vars mathexp.Vars, tracer tracing.Tracer) (mathexp.Results, error)
|
||||
Execute(ctx context.Context, now time.Time, vars mathexp.Vars, tracer tracing.Tracer, metrics *metrics.ExprMetrics) (mathexp.Results, error)
|
||||
Type() string
|
||||
}
|
||||
|
||||
@ -69,7 +71,7 @@ func (gm *MathCommand) NeedsVars() []string {
|
||||
|
||||
// Execute runs the command and returns the results or an error if the command
|
||||
// failed to execute.
|
||||
func (gm *MathCommand) Execute(ctx context.Context, _ time.Time, vars mathexp.Vars, tracer tracing.Tracer) (mathexp.Results, error) {
|
||||
func (gm *MathCommand) Execute(ctx context.Context, _ time.Time, vars mathexp.Vars, tracer tracing.Tracer, _ *metrics.ExprMetrics) (mathexp.Results, error) {
|
||||
_, span := tracer.Start(ctx, "SSE.ExecuteMath")
|
||||
span.SetAttributes(attribute.String("expression", gm.RawExpression))
|
||||
defer span.End()
|
||||
@ -165,7 +167,7 @@ func (gr *ReduceCommand) NeedsVars() []string {
|
||||
|
||||
// Execute runs the command and returns the results or an error if the command
|
||||
// failed to execute.
|
||||
func (gr *ReduceCommand) Execute(ctx context.Context, _ time.Time, vars mathexp.Vars, tracer tracing.Tracer) (mathexp.Results, error) {
|
||||
func (gr *ReduceCommand) Execute(ctx context.Context, _ time.Time, vars mathexp.Vars, tracer tracing.Tracer, _ *metrics.ExprMetrics) (mathexp.Results, error) {
|
||||
_, span := tracer.Start(ctx, "SSE.ExecuteReduce")
|
||||
defer span.End()
|
||||
|
||||
@ -295,7 +297,7 @@ func (gr *ResampleCommand) NeedsVars() []string {
|
||||
|
||||
// Execute runs the command and returns the results or an error if the command
|
||||
// failed to execute.
|
||||
func (gr *ResampleCommand) Execute(ctx context.Context, now time.Time, vars mathexp.Vars, tracer tracing.Tracer) (mathexp.Results, error) {
|
||||
func (gr *ResampleCommand) Execute(ctx context.Context, now time.Time, vars mathexp.Vars, tracer tracing.Tracer, _ *metrics.ExprMetrics) (mathexp.Results, error) {
|
||||
_, span := tracer.Start(ctx, "SSE.ExecuteResample")
|
||||
defer span.End()
|
||||
newRes := mathexp.Results{}
|
||||
|
@ -119,7 +119,7 @@ func TestReduceExecute(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
execute, err := cmd.Execute(context.Background(), time.Now(), vars, tracing.InitializeTracerForTest())
|
||||
execute, err := cmd.Execute(context.Background(), time.Now(), vars, tracing.InitializeTracerForTest(), nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, execute.Values, len(numbers))
|
||||
@ -163,7 +163,7 @@ func TestReduceExecute(t *testing.T) {
|
||||
t.Run("drop all non numbers if mapper is DropNonNumber", func(t *testing.T) {
|
||||
cmd, err := NewReduceCommand(util.GenerateShortUID(), randomReduceFunc(), varToReduce, &mathexp.DropNonNumber{})
|
||||
require.NoError(t, err)
|
||||
execute, err := cmd.Execute(context.Background(), time.Now(), vars, tracing.InitializeTracerForTest())
|
||||
execute, err := cmd.Execute(context.Background(), time.Now(), vars, tracing.InitializeTracerForTest(), nil)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, execute.Values, 2)
|
||||
})
|
||||
@ -171,7 +171,7 @@ func TestReduceExecute(t *testing.T) {
|
||||
t.Run("replace all non numbers if mapper is ReplaceNonNumberWithValue", func(t *testing.T) {
|
||||
cmd, err := NewReduceCommand(util.GenerateShortUID(), randomReduceFunc(), varToReduce, &mathexp.ReplaceNonNumberWithValue{Value: 1})
|
||||
require.NoError(t, err)
|
||||
execute, err := cmd.Execute(context.Background(), time.Now(), vars, tracing.InitializeTracerForTest())
|
||||
execute, err := cmd.Execute(context.Background(), time.Now(), vars, tracing.InitializeTracerForTest(), nil)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, execute.Values, len(numbers))
|
||||
for _, value := range execute.Values[1 : len(numbers)-1] {
|
||||
@ -194,7 +194,7 @@ func TestReduceExecute(t *testing.T) {
|
||||
}
|
||||
cmd, err := NewReduceCommand(util.GenerateShortUID(), randomReduceFunc(), varToReduce, nil)
|
||||
require.NoError(t, err)
|
||||
results, err := cmd.Execute(context.Background(), time.Now(), vars, tracing.InitializeTracerForTest())
|
||||
results, err := cmd.Execute(context.Background(), time.Now(), vars, tracing.InitializeTracerForTest(), nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, results.Values, 1)
|
||||
@ -253,7 +253,7 @@ func TestResampleCommand_Execute(t *testing.T) {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
result, err := cmd.Execute(context.Background(), time.Now(), mathexp.Vars{
|
||||
varToReduce: mathexp.Results{Values: mathexp.Values{test.vals}},
|
||||
}, tracing.InitializeTracerForTest())
|
||||
}, tracing.InitializeTracerForTest(), nil)
|
||||
if test.isError {
|
||||
require.Error(t, err)
|
||||
} else {
|
||||
@ -268,7 +268,7 @@ func TestResampleCommand_Execute(t *testing.T) {
|
||||
t.Run("should return empty result if input is nil Value", func(t *testing.T) {
|
||||
result, err := cmd.Execute(context.Background(), time.Now(), mathexp.Vars{
|
||||
varToReduce: mathexp.Results{Values: mathexp.Values{nil}},
|
||||
}, tracing.InitializeTracerForTest())
|
||||
}, tracing.InitializeTracerForTest(), nil)
|
||||
require.Empty(t, result.Values)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/grafana/grafana/pkg/expr/mathexp"
|
||||
"github.com/grafana/grafana/pkg/expr/metrics"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/services/datasources"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
@ -21,7 +22,7 @@ func TestConvertDataFramesToResults(t *testing.T) {
|
||||
cfg: setting.NewCfg(),
|
||||
features: featuremgmt.WithFeatures(),
|
||||
tracer: tracing.InitializeTracerForTest(),
|
||||
metrics: newMetrics(nil),
|
||||
metrics: metrics.NewSSEMetrics(nil),
|
||||
}
|
||||
converter := &ResultConverter{Features: s.features, Tracer: s.tracer}
|
||||
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/grafana/grafana/pkg/expr/metrics"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
@ -64,7 +65,7 @@ func framesPassThroughService(t *testing.T, frames data.Frames) (data.Frames, er
|
||||
&datafakes.FakeCacheService{}, &datafakes.FakeDataSourceService{},
|
||||
nil, pluginconfig.NewFakePluginRequestConfigProvider()),
|
||||
tracer: tracing.InitializeTracerForTest(),
|
||||
metrics: newMetrics(nil),
|
||||
metrics: metrics.NewSSEMetrics(nil),
|
||||
converter: &ResultConverter{
|
||||
Features: features,
|
||||
Tracer: tracing.InitializeTracerForTest(),
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
|
||||
"github.com/grafana/grafana/pkg/expr/mathexp"
|
||||
"github.com/grafana/grafana/pkg/expr/metrics"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
)
|
||||
|
||||
@ -32,7 +33,7 @@ func (h *HysteresisCommand) NeedsVars() []string {
|
||||
return []string{h.ReferenceVar}
|
||||
}
|
||||
|
||||
func (h *HysteresisCommand) Execute(ctx context.Context, now time.Time, vars mathexp.Vars, tracer tracing.Tracer) (mathexp.Results, error) {
|
||||
func (h *HysteresisCommand) Execute(ctx context.Context, now time.Time, vars mathexp.Vars, tracer tracing.Tracer, metrics *metrics.ExprMetrics) (mathexp.Results, error) {
|
||||
results := vars[h.ReferenceVar]
|
||||
|
||||
logger := logger.FromContext(ctx)
|
||||
@ -46,7 +47,7 @@ func (h *HysteresisCommand) Execute(ctx context.Context, now time.Time, vars mat
|
||||
return mathexp.Results{Values: mathexp.Values{mathexp.NewNoData()}}, nil
|
||||
}
|
||||
if len(h.LoadedDimensions) == 0 {
|
||||
return h.LoadingThresholdFunc.Execute(traceCtx, now, vars, tracer)
|
||||
return h.LoadingThresholdFunc.Execute(traceCtx, now, vars, tracer, metrics)
|
||||
}
|
||||
var loadedVals, unloadedVals mathexp.Values
|
||||
for _, value := range results.Values {
|
||||
@ -62,10 +63,10 @@ func (h *HysteresisCommand) Execute(ctx context.Context, now time.Time, vars mat
|
||||
|
||||
logger.Debug("Evaluating thresholds", "unloadingThresholdDimensions", len(loadedVals), "loadingThresholdDimensions", len(unloadedVals))
|
||||
if len(loadedVals) == 0 { // if all values are unloaded
|
||||
return h.LoadingThresholdFunc.Execute(traceCtx, now, vars, tracer)
|
||||
return h.LoadingThresholdFunc.Execute(traceCtx, now, vars, tracer, metrics)
|
||||
}
|
||||
if len(unloadedVals) == 0 { // if all values are loaded
|
||||
return h.UnloadingThresholdFunc.Execute(traceCtx, now, vars, tracer)
|
||||
return h.UnloadingThresholdFunc.Execute(traceCtx, now, vars, tracer, metrics)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
@ -74,12 +75,12 @@ func (h *HysteresisCommand) Execute(ctx context.Context, now time.Time, vars mat
|
||||
}()
|
||||
|
||||
vars[h.ReferenceVar] = mathexp.Results{Values: unloadedVals}
|
||||
loadingResults, err := h.LoadingThresholdFunc.Execute(traceCtx, now, vars, tracer)
|
||||
loadingResults, err := h.LoadingThresholdFunc.Execute(traceCtx, now, vars, tracer, metrics)
|
||||
if err != nil {
|
||||
return mathexp.Results{}, fmt.Errorf("failed to execute loading threshold: %w", err)
|
||||
}
|
||||
vars[h.ReferenceVar] = mathexp.Results{Values: loadedVals}
|
||||
unloadingResults, err := h.UnloadingThresholdFunc.Execute(traceCtx, now, vars, tracer)
|
||||
unloadingResults, err := h.UnloadingThresholdFunc.Execute(traceCtx, now, vars, tracer, metrics)
|
||||
if err != nil {
|
||||
return mathexp.Results{}, fmt.Errorf("failed to execute unloading threshold: %w", err)
|
||||
}
|
||||
|
@ -108,7 +108,7 @@ func TestHysteresisExecute(t *testing.T) {
|
||||
|
||||
result, err := cmd.Execute(context.Background(), time.Now(), mathexp.Vars{
|
||||
"A": mathexp.Results{Values: tc.input},
|
||||
}, tracer)
|
||||
}, tracer, nil)
|
||||
if tc.expectedError != nil {
|
||||
require.ErrorIs(t, err, tc.expectedError)
|
||||
return
|
||||
|
@ -1,47 +0,0 @@
|
||||
package expr
|
||||
|
||||
import (
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
const (
|
||||
metricsSubSystem = "sse"
|
||||
metricsNamespace = "grafana"
|
||||
)
|
||||
|
||||
type metrics struct {
|
||||
dsRequests *prometheus.CounterVec
|
||||
|
||||
// older metric
|
||||
expressionsQuerySummary *prometheus.SummaryVec
|
||||
}
|
||||
|
||||
func newMetrics(reg prometheus.Registerer) *metrics {
|
||||
m := &metrics{
|
||||
dsRequests: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: metricsNamespace,
|
||||
Subsystem: metricsSubSystem,
|
||||
Name: "ds_queries_total",
|
||||
Help: "Number of datasource queries made via server side expression requests",
|
||||
}, []string{"error", "dataplane", "datasource_type"}),
|
||||
|
||||
// older (No Namespace or Subsystem)
|
||||
expressionsQuerySummary: prometheus.NewSummaryVec(
|
||||
prometheus.SummaryOpts{
|
||||
Name: "expressions_queries_duration_milliseconds",
|
||||
Help: "Expressions query summary",
|
||||
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
|
||||
},
|
||||
[]string{"status"},
|
||||
),
|
||||
}
|
||||
|
||||
if reg != nil {
|
||||
reg.MustRegister(
|
||||
m.dsRequests,
|
||||
m.expressionsQuerySummary,
|
||||
)
|
||||
}
|
||||
|
||||
return m
|
||||
}
|
125
pkg/expr/metrics/metrics.go
Normal file
125
pkg/expr/metrics/metrics.go
Normal file
@ -0,0 +1,125 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
// ExprMetrics is a struct that contains all the metrics for an implementation of the expressions service
|
||||
// shared between multiple versions of expressions service, which are delineated by the subsystem string
|
||||
type ExprMetrics struct {
|
||||
DSRequests *prometheus.CounterVec
|
||||
ExpressionsQuerySummary *prometheus.SummaryVec
|
||||
SqlCommandDuration *prometheus.HistogramVec
|
||||
SqlCommandErrorCount *prometheus.CounterVec
|
||||
SqlCommandCellCount *prometheus.HistogramVec
|
||||
}
|
||||
|
||||
func newExprMetrics(subsystem string) *ExprMetrics {
|
||||
return &ExprMetrics{
|
||||
DSRequests: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "grafana",
|
||||
Subsystem: subsystem,
|
||||
Name: "ds_queries_total",
|
||||
Help: "Number of datasource queries made via server side expression requests",
|
||||
}, []string{"error", "dataplane", "datasource_type"}),
|
||||
|
||||
ExpressionsQuerySummary: prometheus.NewSummaryVec(
|
||||
prometheus.SummaryOpts{
|
||||
Namespace: "grafana",
|
||||
Subsystem: subsystem,
|
||||
Name: "expressions_queries_duration_milliseconds",
|
||||
Help: "Expressions query summary",
|
||||
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
|
||||
},
|
||||
[]string{"status"},
|
||||
),
|
||||
|
||||
SqlCommandDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Namespace: "grafana",
|
||||
Subsystem: subsystem,
|
||||
Name: "sql_command_duration_seconds",
|
||||
Help: "Duration of SQL command execution",
|
||||
Buckets: prometheus.DefBuckets,
|
||||
}, []string{"status"}),
|
||||
|
||||
SqlCommandErrorCount: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "grafana",
|
||||
Subsystem: subsystem,
|
||||
Name: "sql_command_errors_total",
|
||||
Help: "Total number of SQL command execution errors",
|
||||
}, []string{}),
|
||||
|
||||
SqlCommandCellCount: prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: "grafana",
|
||||
Subsystem: subsystem,
|
||||
Name: "sql_command_cell_count",
|
||||
Help: "Distribution of the total number of cells in each SQL command execution",
|
||||
Buckets: prometheus.ExponentialBuckets(100, 2, 10),
|
||||
},
|
||||
[]string{"status"},
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
// NewSSEMetrics creates a new ExprMetrics struct for the ST implementation of the expressions service
|
||||
func NewSSEMetrics(reg prometheus.Registerer) *ExprMetrics {
|
||||
metricsSubSystem := "sse"
|
||||
|
||||
m := &ExprMetrics{
|
||||
DSRequests: newExprMetrics(metricsSubSystem).DSRequests,
|
||||
|
||||
ExpressionsQuerySummary: newExprMetrics(metricsSubSystem).ExpressionsQuerySummary,
|
||||
|
||||
SqlCommandDuration: newExprMetrics(metricsSubSystem).SqlCommandDuration,
|
||||
|
||||
SqlCommandErrorCount: newExprMetrics(metricsSubSystem).SqlCommandErrorCount,
|
||||
|
||||
SqlCommandCellCount: newExprMetrics(metricsSubSystem).SqlCommandCellCount,
|
||||
}
|
||||
|
||||
if reg != nil {
|
||||
reg.MustRegister(
|
||||
m.DSRequests,
|
||||
m.ExpressionsQuerySummary,
|
||||
m.SqlCommandDuration,
|
||||
m.SqlCommandErrorCount,
|
||||
m.SqlCommandCellCount,
|
||||
)
|
||||
}
|
||||
|
||||
return m
|
||||
}
|
||||
|
||||
// NewQueryServiceExpressionsMetrics creates a new ExprMetrics struct for the query service implementation of the expressions service
|
||||
func NewQueryServiceExpressionsMetrics(reg prometheus.Registerer) *ExprMetrics {
|
||||
metricsSubSystem := "queryservice"
|
||||
|
||||
m := &ExprMetrics{
|
||||
DSRequests: newExprMetrics(metricsSubSystem).DSRequests,
|
||||
|
||||
ExpressionsQuerySummary: newExprMetrics(metricsSubSystem).ExpressionsQuerySummary,
|
||||
|
||||
SqlCommandDuration: newExprMetrics(metricsSubSystem).SqlCommandDuration,
|
||||
|
||||
SqlCommandErrorCount: newExprMetrics(metricsSubSystem).SqlCommandErrorCount,
|
||||
|
||||
SqlCommandCellCount: newExprMetrics(metricsSubSystem).SqlCommandCellCount,
|
||||
}
|
||||
|
||||
if reg != nil {
|
||||
reg.MustRegister(
|
||||
m.DSRequests,
|
||||
m.ExpressionsQuerySummary,
|
||||
m.SqlCommandDuration,
|
||||
m.SqlCommandErrorCount,
|
||||
m.SqlCommandCellCount,
|
||||
)
|
||||
}
|
||||
|
||||
return m
|
||||
}
|
||||
|
||||
func NewTestMetrics() *ExprMetrics {
|
||||
return newExprMetrics("test")
|
||||
}
|
@ -86,7 +86,7 @@ func (m *MLNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s *
|
||||
}
|
||||
logger.Debug("Data source queried", "responseType", responseType)
|
||||
useDataplane := strings.HasPrefix("dataplane-", responseType)
|
||||
s.metrics.dsRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane), mlPluginID).Inc()
|
||||
s.metrics.DSRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane), mlPluginID).Inc()
|
||||
}()
|
||||
|
||||
// Execute the command and provide callback function for sending a request via plugin API.
|
||||
|
@ -12,6 +12,7 @@ import (
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/grafana/grafana/pkg/expr/metrics"
|
||||
"github.com/grafana/grafana/pkg/expr/ml"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana/pkg/services/user"
|
||||
@ -61,7 +62,7 @@ func TestMLNodeExecute(t *testing.T) {
|
||||
features: nil,
|
||||
pluginsClient: pluginsClient,
|
||||
tracer: nil,
|
||||
metrics: newMetrics(nil),
|
||||
metrics: metrics.NewSSEMetrics(nil),
|
||||
}
|
||||
|
||||
cmdResponse := data.NewFrame("test",
|
||||
@ -198,7 +199,7 @@ func TestMLNodeExecute(t *testing.T) {
|
||||
features: nil,
|
||||
pluginsClient: pluginsClient,
|
||||
tracer: nil,
|
||||
metrics: newMetrics(nil),
|
||||
metrics: metrics.NewSSEMetrics(nil),
|
||||
}
|
||||
|
||||
cmd := &ml.FakeCommand{
|
||||
|
@ -103,7 +103,7 @@ func (gn *CMDNode) NeedsVars() []string {
|
||||
// other nodes they must have already been executed and their results must
|
||||
// already by in vars.
|
||||
func (gn *CMDNode) Execute(ctx context.Context, now time.Time, vars mathexp.Vars, s *Service) (mathexp.Results, error) {
|
||||
return gn.Command.Execute(ctx, now, vars, s.tracer)
|
||||
return gn.Command.Execute(ctx, now, vars, s.tracer, s.metrics)
|
||||
}
|
||||
|
||||
func buildCMDNode(rn *rawNode, toggles featuremgmt.FeatureToggles, sqlExpressionCellLimit int64) (*CMDNode, error) {
|
||||
@ -320,7 +320,7 @@ func executeDSNodesGrouped(ctx context.Context, now time.Time, vars mathexp.Vars
|
||||
}
|
||||
logger.Debug("Data source queried", "responseType", responseType)
|
||||
useDataplane := strings.HasPrefix(responseType, "dataplane-")
|
||||
s.metrics.dsRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane), firstNode.datasource.Type).Inc()
|
||||
s.metrics.DSRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane), firstNode.datasource.Type).Inc()
|
||||
}
|
||||
|
||||
resp, err := s.dataService.QueryData(ctx, req)
|
||||
@ -395,7 +395,7 @@ func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s
|
||||
}
|
||||
logger.Debug("Data source queried", "responseType", responseType)
|
||||
useDataplane := strings.HasPrefix(responseType, "dataplane-")
|
||||
s.metrics.dsRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane), dn.datasource.Type).Inc()
|
||||
s.metrics.DSRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane), dn.datasource.Type).Inc()
|
||||
}()
|
||||
|
||||
resp, err := s.dataService.QueryData(ctx, req)
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/identity"
|
||||
"github.com/grafana/grafana/pkg/components/simplejson"
|
||||
"github.com/grafana/grafana/pkg/expr/metrics"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana/pkg/services/datasources"
|
||||
@ -65,7 +66,7 @@ type Service struct {
|
||||
pluginsClient backend.CallResourceHandler
|
||||
|
||||
tracer tracing.Tracer
|
||||
metrics *metrics
|
||||
metrics *metrics.ExprMetrics
|
||||
}
|
||||
|
||||
type pluginContextProvider interface {
|
||||
@ -81,7 +82,7 @@ func ProvideService(cfg *setting.Cfg, pluginClient plugins.Client, pCtxProvider
|
||||
pCtxProvider: pCtxProvider,
|
||||
features: features,
|
||||
tracer: tracer,
|
||||
metrics: newMetrics(registerer),
|
||||
metrics: metrics.NewSSEMetrics(registerer),
|
||||
pluginsClient: pluginClient,
|
||||
converter: &ResultConverter{
|
||||
Features: features,
|
||||
|
@ -14,6 +14,7 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/errutil"
|
||||
"github.com/grafana/grafana/pkg/expr/metrics"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana/pkg/services/datasources"
|
||||
@ -249,7 +250,7 @@ func newMockQueryService(responses map[string]backend.DataResponse, queries []Qu
|
||||
pCtxProvider: pCtxProvider,
|
||||
features: featuremgmt.WithFeatures(),
|
||||
tracer: tracing.InitializeTracerForTest(),
|
||||
metrics: newMetrics(nil),
|
||||
metrics: metrics.NewSSEMetrics(nil),
|
||||
converter: &ResultConverter{
|
||||
Features: features,
|
||||
Tracer: tracing.InitializeTracerForTest(),
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/errutil"
|
||||
"github.com/grafana/grafana/pkg/expr/mathexp"
|
||||
"github.com/grafana/grafana/pkg/expr/metrics"
|
||||
"github.com/grafana/grafana/pkg/expr/sql"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
)
|
||||
@ -99,9 +100,22 @@ func (gr *SQLCommand) NeedsVars() []string {
|
||||
|
||||
// Execute runs the command and returns the results or an error if the command
|
||||
// failed to execute.
|
||||
func (gr *SQLCommand) Execute(ctx context.Context, now time.Time, vars mathexp.Vars, tracer tracing.Tracer) (mathexp.Results, error) {
|
||||
func (gr *SQLCommand) Execute(ctx context.Context, now time.Time, vars mathexp.Vars, tracer tracing.Tracer, metrics *metrics.ExprMetrics) (mathExprResult mathexp.Results, resultError error) {
|
||||
_, span := tracer.Start(ctx, "SSE.ExecuteSQL")
|
||||
defer span.End()
|
||||
start := time.Now()
|
||||
tc := int64(0)
|
||||
|
||||
defer func() {
|
||||
span.End()
|
||||
statusLabel := "ok"
|
||||
duration := float64(time.Since(start).Nanoseconds()) / float64(time.Millisecond)
|
||||
if resultError != nil {
|
||||
statusLabel = "error"
|
||||
metrics.SqlCommandErrorCount.WithLabelValues().Inc()
|
||||
}
|
||||
metrics.SqlCommandDuration.WithLabelValues(statusLabel).Observe(duration)
|
||||
metrics.SqlCommandCellCount.WithLabelValues(statusLabel).Observe(float64(tc))
|
||||
}()
|
||||
|
||||
allFrames := []*data.Frame{}
|
||||
for _, ref := range gr.varsToQuery {
|
||||
@ -114,14 +128,15 @@ func (gr *SQLCommand) Execute(ctx context.Context, now time.Time, vars mathexp.V
|
||||
allFrames = append(allFrames, frames...)
|
||||
}
|
||||
|
||||
totalCells := totalCells(allFrames)
|
||||
tc = totalCells(allFrames)
|
||||
|
||||
// limit of 0 or less means no limit (following convention)
|
||||
if gr.limit > 0 && totalCells > gr.limit {
|
||||
if gr.limit > 0 && tc > gr.limit {
|
||||
return mathexp.Results{},
|
||||
fmt.Errorf(
|
||||
"SQL expression: total cell count across all input tables exceeds limit of %d. Total cells: %d",
|
||||
gr.limit,
|
||||
totalCells,
|
||||
tc,
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -10,6 +10,8 @@ import (
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/grafana/pkg/expr/mathexp"
|
||||
"github.com/grafana/grafana/pkg/expr/metrics"
|
||||
"github.com/prometheus/client_golang/prometheus/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
@ -134,7 +136,7 @@ func TestSQLCommandCellLimits(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
_, err = cmd.Execute(context.Background(), time.Now(), vars, &testTracer{})
|
||||
_, err = cmd.Execute(context.Background(), time.Now(), vars, &testTracer{}, metrics.NewTestMetrics())
|
||||
|
||||
if tt.expectError {
|
||||
require.Error(t, err)
|
||||
@ -146,6 +148,28 @@ func TestSQLCommandCellLimits(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSQLCommandMetrics(t *testing.T) {
|
||||
// Create test metrics
|
||||
m := metrics.NewTestMetrics()
|
||||
|
||||
// Create a command
|
||||
cmd, err := NewSQLCommand("A", "someformat", "select * from foo", 0)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Execute successful command
|
||||
_, err = cmd.Execute(context.Background(), time.Now(), mathexp.Vars{}, &testTracer{}, m)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify error count was not incremented
|
||||
require.Equal(t, 0, testutil.CollectAndCount(m.SqlCommandErrorCount), "Expected error metric not to be recorded")
|
||||
|
||||
// Verify duration was recorded
|
||||
require.Equal(t, 1, testutil.CollectAndCount(m.SqlCommandDuration), "Expected duration metric to be recorded")
|
||||
|
||||
// Verify cell count was recorded
|
||||
require.Equal(t, 1, testutil.CollectAndCount(m.SqlCommandCellCount), "Expected cell count metric to be recorded")
|
||||
}
|
||||
|
||||
type testTracer struct {
|
||||
trace.Tracer
|
||||
}
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
|
||||
"github.com/grafana/grafana/pkg/expr/mathexp"
|
||||
"github.com/grafana/grafana/pkg/expr/metrics"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/util"
|
||||
@ -174,7 +175,7 @@ func (tc *ThresholdCommand) NeedsVars() []string {
|
||||
return []string{tc.ReferenceVar}
|
||||
}
|
||||
|
||||
func (tc *ThresholdCommand) Execute(_ context.Context, _ time.Time, vars mathexp.Vars, _ tracing.Tracer) (mathexp.Results, error) {
|
||||
func (tc *ThresholdCommand) Execute(_ context.Context, _ time.Time, vars mathexp.Vars, _ tracing.Tracer, _ *metrics.ExprMetrics) (mathexp.Results, error) {
|
||||
eval := func(maybeValue *float64) *float64 {
|
||||
if maybeValue == nil {
|
||||
return nil
|
||||
|
@ -33,7 +33,7 @@ func BenchmarkThreshold(b *testing.B) {
|
||||
}
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, _ = greater.Execute(ctx, timeNow, vars, trace)
|
||||
_, _ = greater.Execute(ctx, timeNow, vars, trace, nil)
|
||||
}
|
||||
})
|
||||
b.Run("less than", func(b *testing.B) {
|
||||
@ -43,7 +43,7 @@ func BenchmarkThreshold(b *testing.B) {
|
||||
}
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, _ = greater.Execute(ctx, timeNow, vars, trace)
|
||||
_, _ = greater.Execute(ctx, timeNow, vars, trace, nil)
|
||||
}
|
||||
})
|
||||
b.Run("within range", func(b *testing.B) {
|
||||
@ -53,7 +53,7 @@ func BenchmarkThreshold(b *testing.B) {
|
||||
}
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, _ = greater.Execute(ctx, timeNow, vars, trace)
|
||||
_, _ = greater.Execute(ctx, timeNow, vars, trace, nil)
|
||||
}
|
||||
})
|
||||
b.Run("within range, no labels", func(b *testing.B) {
|
||||
@ -72,7 +72,7 @@ func BenchmarkThreshold(b *testing.B) {
|
||||
}
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, _ = greater.Execute(ctx, timeNow, vars, trace)
|
||||
_, _ = greater.Execute(ctx, timeNow, vars, trace, nil)
|
||||
}
|
||||
})
|
||||
b.Run("outside range", func(b *testing.B) {
|
||||
@ -82,7 +82,7 @@ func BenchmarkThreshold(b *testing.B) {
|
||||
}
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, _ = greater.Execute(ctx, timeNow, vars, trace)
|
||||
_, _ = greater.Execute(ctx, timeNow, vars, trace, nil)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -639,7 +639,7 @@ func TestThresholdExecute(t *testing.T) {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
result, err := cmd.Execute(context.Background(), time.Now(), mathexp.Vars{
|
||||
"A": newResults(input[name]),
|
||||
}, tracing.InitializeTracerForTest())
|
||||
}, tracing.InitializeTracerForTest(), nil)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, newResults(tc.expected[name]), result)
|
||||
})
|
||||
|
@ -81,7 +81,7 @@ func (s *Service) TransformData(ctx context.Context, now time.Time, req *Request
|
||||
respStatus = "failure"
|
||||
}
|
||||
duration := float64(time.Since(start).Nanoseconds()) / float64(time.Millisecond)
|
||||
s.metrics.expressionsQuerySummary.WithLabelValues(respStatus).Observe(duration)
|
||||
s.metrics.ExpressionsQuerySummary.WithLabelValues(respStatus).Observe(duration)
|
||||
|
||||
span.End()
|
||||
}()
|
||||
|
@ -1,48 +0,0 @@
|
||||
package query
|
||||
|
||||
import (
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
const (
|
||||
metricsSubSystem = "queryservice"
|
||||
metricsNamespace = "grafana"
|
||||
)
|
||||
|
||||
type queryMetrics struct {
|
||||
dsRequests *prometheus.CounterVec
|
||||
|
||||
// older metric
|
||||
expressionsQuerySummary *prometheus.SummaryVec
|
||||
}
|
||||
|
||||
func newQueryMetrics(reg prometheus.Registerer) *queryMetrics {
|
||||
m := &queryMetrics{
|
||||
dsRequests: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: metricsNamespace,
|
||||
Subsystem: metricsSubSystem,
|
||||
Name: "ds_queries_total",
|
||||
Help: "Number of datasource queries made from the query service",
|
||||
}, []string{"error", "dataplane", "datasource_type"}),
|
||||
|
||||
expressionsQuerySummary: prometheus.NewSummaryVec(
|
||||
prometheus.SummaryOpts{
|
||||
Namespace: metricsNamespace,
|
||||
Subsystem: metricsSubSystem,
|
||||
Name: "expressions_queries_duration_milliseconds",
|
||||
Help: "Expressions query summary",
|
||||
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
|
||||
},
|
||||
[]string{"status"},
|
||||
),
|
||||
}
|
||||
|
||||
if reg != nil {
|
||||
reg.MustRegister(
|
||||
m.dsRequests,
|
||||
m.expressionsQuerySummary,
|
||||
)
|
||||
}
|
||||
|
||||
return m
|
||||
}
|
@ -360,7 +360,7 @@ func (b *QueryAPIBuilder) handleExpressions(ctx context.Context, req parsedReque
|
||||
respStatus = "failure"
|
||||
}
|
||||
duration := float64(time.Since(start).Nanoseconds()) / float64(time.Millisecond)
|
||||
b.metrics.expressionsQuerySummary.WithLabelValues(respStatus).Observe(duration)
|
||||
b.metrics.ExpressionsQuerySummary.WithLabelValues(respStatus).Observe(duration)
|
||||
|
||||
span.End()
|
||||
}()
|
||||
@ -403,7 +403,7 @@ func (b *QueryAPIBuilder) handleExpressions(ctx context.Context, req parsedReque
|
||||
}
|
||||
|
||||
refId := expression.RefID
|
||||
results, err := expression.Command.Execute(ctx, now, vars, b.tracer)
|
||||
results, err := expression.Command.Execute(ctx, now, vars, b.tracer, b.metrics)
|
||||
if err != nil {
|
||||
expressionsLogger.Error("error executing expression", "error", err)
|
||||
results.Error = err
|
||||
|
@ -17,6 +17,7 @@ import (
|
||||
claims "github.com/grafana/authlib/types"
|
||||
query "github.com/grafana/grafana/pkg/apis/query/v0alpha1"
|
||||
"github.com/grafana/grafana/pkg/expr"
|
||||
"github.com/grafana/grafana/pkg/expr/metrics"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
@ -43,7 +44,7 @@ type QueryAPIBuilder struct {
|
||||
authorizer authorizer.Authorizer
|
||||
|
||||
tracer tracing.Tracer
|
||||
metrics *queryMetrics
|
||||
metrics *metrics.ExprMetrics
|
||||
parser *queryParser
|
||||
client clientapi.DataSourceClientSupplier
|
||||
registry query.DataSourceApiServerRegistry
|
||||
@ -83,7 +84,7 @@ func NewQueryAPIBuilder(features featuremgmt.FeatureToggles,
|
||||
authorizer: ar,
|
||||
registry: registry,
|
||||
parser: newQueryParser(reader, legacy, tracer, log.New("query_parser")),
|
||||
metrics: newQueryMetrics(registerer),
|
||||
metrics: metrics.NewQueryServiceExpressionsMetrics(registerer),
|
||||
tracer: tracer,
|
||||
features: features,
|
||||
queryTypes: queryTypes,
|
||||
|
Reference in New Issue
Block a user