mirror of
https://github.com/grafana/grafana.git
synced 2025-08-06 04:19:26 +08:00
Chore: Expression engine to support relative time range (#57474)
* make TimeRange interface and add relative range * make Execute methods support the current time * update resample to support relative time range * update DSNode to support relative time range * update query service to create queries with absolute time * make alerting evaluator create relative time ranges
This commit is contained in:
@ -5,6 +5,7 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
|
||||
@ -67,7 +68,7 @@ func (cmd *ConditionsCmd) NeedsVars() []string {
|
||||
|
||||
// Execute runs the command and returns the results or an error if the command
|
||||
// failed to execute.
|
||||
func (cmd *ConditionsCmd) Execute(_ context.Context, vars mathexp.Vars) (mathexp.Results, error) {
|
||||
func (cmd *ConditionsCmd) Execute(_ context.Context, _ time.Time, vars mathexp.Vars) (mathexp.Results, error) {
|
||||
firing := true
|
||||
newRes := mathexp.Results{}
|
||||
noDataFound := true
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/stretchr/testify/require"
|
||||
@ -405,7 +406,7 @@ func TestConditionsCmdExecute(t *testing.T) {
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
res, err := tt.conditionsCmd.Execute(context.Background(), tt.vars)
|
||||
res, err := tt.conditionsCmd.Execute(context.Background(), time.Now(), tt.vars)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, 1, len(res.Values))
|
||||
|
@ -16,7 +16,7 @@ import (
|
||||
// Command is an interface for all expression commands.
|
||||
type Command interface {
|
||||
NeedsVars() []string
|
||||
Execute(c context.Context, vars mathexp.Vars) (mathexp.Results, error)
|
||||
Execute(ctx context.Context, now time.Time, vars mathexp.Vars) (mathexp.Results, error)
|
||||
}
|
||||
|
||||
// MathCommand is a command for a math expression such as "1 + $GA / 2"
|
||||
@ -66,7 +66,7 @@ func (gm *MathCommand) NeedsVars() []string {
|
||||
|
||||
// Execute runs the command and returns the results or an error if the command
|
||||
// failed to execute.
|
||||
func (gm *MathCommand) Execute(ctx context.Context, vars mathexp.Vars) (mathexp.Results, error) {
|
||||
func (gm *MathCommand) Execute(_ context.Context, _ time.Time, vars mathexp.Vars) (mathexp.Results, error) {
|
||||
return gm.Expression.Execute(gm.refID, vars)
|
||||
}
|
||||
|
||||
@ -154,7 +154,7 @@ func (gr *ReduceCommand) NeedsVars() []string {
|
||||
|
||||
// Execute runs the command and returns the results or an error if the command
|
||||
// failed to execute.
|
||||
func (gr *ReduceCommand) Execute(_ context.Context, vars mathexp.Vars) (mathexp.Results, error) {
|
||||
func (gr *ReduceCommand) Execute(_ context.Context, _ time.Time, vars mathexp.Vars) (mathexp.Results, error) {
|
||||
newRes := mathexp.Results{}
|
||||
for _, val := range vars[gr.VarToReduce].Values {
|
||||
switch v := val.(type) {
|
||||
@ -210,6 +210,9 @@ func NewResampleCommand(refID, rawWindow, varToResample string, downsampler stri
|
||||
|
||||
// UnmarshalResampleCommand creates a ResampleCMD from Grafana's frontend query.
|
||||
func UnmarshalResampleCommand(rn *rawNode) (*ResampleCommand, error) {
|
||||
if rn.TimeRange == nil {
|
||||
return nil, fmt.Errorf("time range must be specified for refID %s", rn.RefID)
|
||||
}
|
||||
rawVar, ok := rn.Query["expression"]
|
||||
if !ok {
|
||||
return nil, errors.New("no expression ID to resample. must be a reference to an existing query or expression")
|
||||
@ -259,14 +262,15 @@ func (gr *ResampleCommand) NeedsVars() []string {
|
||||
|
||||
// Execute runs the command and returns the results or an error if the command
|
||||
// failed to execute.
|
||||
func (gr *ResampleCommand) Execute(ctx context.Context, vars mathexp.Vars) (mathexp.Results, error) {
|
||||
func (gr *ResampleCommand) Execute(_ context.Context, now time.Time, vars mathexp.Vars) (mathexp.Results, error) {
|
||||
newRes := mathexp.Results{}
|
||||
timeRange := gr.TimeRange.AbsoluteTime(now)
|
||||
for _, val := range vars[gr.VarToResample].Values {
|
||||
series, ok := val.(mathexp.Series)
|
||||
if !ok {
|
||||
return newRes, fmt.Errorf("can only resample type series, got type %v", val.Type())
|
||||
}
|
||||
num, err := series.Resample(gr.refID, gr.Window, gr.Downsampler, gr.Upsampler, gr.TimeRange.From, gr.TimeRange.To)
|
||||
num, err := series.Resample(gr.refID, gr.Window, gr.Downsampler, gr.Upsampler, timeRange.From, timeRange.To)
|
||||
if err != nil {
|
||||
return newRes, err
|
||||
}
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/stretchr/testify/assert"
|
||||
@ -80,7 +81,7 @@ func Test_UnmarshalReduceCommand_Settings(t *testing.T) {
|
||||
RefID: "A",
|
||||
Query: qmap,
|
||||
QueryType: "",
|
||||
TimeRange: TimeRange{},
|
||||
TimeRange: RelativeTimeRange{},
|
||||
DataSource: nil,
|
||||
})
|
||||
|
||||
@ -114,7 +115,7 @@ func TestReduceExecute(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
execute, err := cmd.Execute(context.Background(), vars)
|
||||
execute, err := cmd.Execute(context.Background(), time.Now(), vars)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, execute.Values, len(numbers))
|
||||
@ -149,7 +150,7 @@ func TestReduceExecute(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
results, err := cmd.Execute(context.Background(), vars)
|
||||
results, err := cmd.Execute(context.Background(), time.Now(), vars)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, results.Values, 1)
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/expr/mathexp"
|
||||
|
||||
@ -37,7 +38,7 @@ type Node interface {
|
||||
ID() int64 // ID() allows the gonum graph node interface to be fulfilled
|
||||
NodeType() NodeType
|
||||
RefID() string
|
||||
Execute(c context.Context, vars mathexp.Vars, s *Service) (mathexp.Results, error)
|
||||
Execute(ctx context.Context, now time.Time, vars mathexp.Vars, s *Service) (mathexp.Results, error)
|
||||
String() string
|
||||
}
|
||||
|
||||
@ -46,10 +47,10 @@ type DataPipeline []Node
|
||||
|
||||
// execute runs all the command/datasource requests in the pipeline return a
|
||||
// map of the refId of the of each command
|
||||
func (dp *DataPipeline) execute(c context.Context, s *Service) (mathexp.Vars, error) {
|
||||
func (dp *DataPipeline) execute(c context.Context, now time.Time, s *Service) (mathexp.Vars, error) {
|
||||
vars := make(mathexp.Vars)
|
||||
for _, node := range *dp {
|
||||
res, err := node.Execute(c, vars, s)
|
||||
res, err := node.Execute(c, now, vars, s)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -34,6 +34,7 @@ func TestServicebuildPipeLine(t *testing.T) {
|
||||
DataSource: &datasources.DataSource{
|
||||
Uid: "Fake",
|
||||
},
|
||||
TimeRange: AbsoluteTimeRange{},
|
||||
},
|
||||
},
|
||||
},
|
||||
@ -144,6 +145,7 @@ func TestServicebuildPipeLine(t *testing.T) {
|
||||
DataSource: &datasources.DataSource{
|
||||
Uid: "Fake",
|
||||
},
|
||||
TimeRange: AbsoluteTimeRange{},
|
||||
},
|
||||
},
|
||||
},
|
||||
@ -198,6 +200,7 @@ func TestServicebuildPipeLine(t *testing.T) {
|
||||
DataSource: &datasources.DataSource{
|
||||
Uid: "Fake",
|
||||
},
|
||||
TimeRange: AbsoluteTimeRange{},
|
||||
},
|
||||
},
|
||||
},
|
||||
@ -221,6 +224,7 @@ func TestServicebuildPipeLine(t *testing.T) {
|
||||
DataSource: &datasources.DataSource{
|
||||
Uid: "Fake",
|
||||
},
|
||||
TimeRange: AbsoluteTimeRange{},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -92,8 +92,8 @@ func (gn *CMDNode) NodeType() NodeType {
|
||||
// Execute runs the node and adds the results to vars. If the node requires
|
||||
// other nodes they must have already been executed and their results must
|
||||
// already by in vars.
|
||||
func (gn *CMDNode) Execute(ctx context.Context, vars mathexp.Vars, s *Service) (mathexp.Results, error) {
|
||||
return gn.Command.Execute(ctx, vars)
|
||||
func (gn *CMDNode) Execute(ctx context.Context, now time.Time, vars mathexp.Vars, _ *Service) (mathexp.Results, error) {
|
||||
return gn.Command.Execute(ctx, now, vars)
|
||||
}
|
||||
|
||||
func buildCMDNode(dp *simple.DirectedGraph, rn *rawNode) (*CMDNode, error) {
|
||||
@ -156,6 +156,9 @@ func (dn *DSNode) NodeType() NodeType {
|
||||
}
|
||||
|
||||
func (s *Service) buildDSNode(dp *simple.DirectedGraph, rn *rawNode, req *Request) (*DSNode, error) {
|
||||
if rn.TimeRange == nil {
|
||||
return nil, fmt.Errorf("time range must be specified for refID %s", rn.RefID)
|
||||
}
|
||||
encodedQuery, err := json.Marshal(rn.Query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -198,7 +201,7 @@ func (s *Service) buildDSNode(dp *simple.DirectedGraph, rn *rawNode, req *Reques
|
||||
// Execute runs the node and adds the results to vars. If the node requires
|
||||
// other nodes they must have already been executed and their results must
|
||||
// already by in vars.
|
||||
func (dn *DSNode) Execute(ctx context.Context, vars mathexp.Vars, s *Service) (mathexp.Results, error) {
|
||||
func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s *Service) (mathexp.Results, error) {
|
||||
dsInstanceSettings, err := adapters.ModelToInstanceSettings(dn.datasource, s.decryptSecureJsonDataFn(ctx))
|
||||
if err != nil {
|
||||
return mathexp.Results{}, fmt.Errorf("%v: %w", "failed to convert datasource instance settings", err)
|
||||
@ -215,10 +218,7 @@ func (dn *DSNode) Execute(ctx context.Context, vars mathexp.Vars, s *Service) (m
|
||||
MaxDataPoints: dn.maxDP,
|
||||
Interval: time.Duration(int64(time.Millisecond) * dn.intervalMS),
|
||||
JSON: dn.query,
|
||||
TimeRange: backend.TimeRange{
|
||||
From: dn.timeRange.From,
|
||||
To: dn.timeRange.To,
|
||||
},
|
||||
TimeRange: dn.timeRange.AbsoluteTime(now),
|
||||
QueryType: dn.queryType,
|
||||
},
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package expr
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
|
||||
@ -62,9 +63,9 @@ func (s *Service) BuildPipeline(req *Request) (DataPipeline, error) {
|
||||
}
|
||||
|
||||
// ExecutePipeline executes an expression pipeline and returns all the results.
|
||||
func (s *Service) ExecutePipeline(ctx context.Context, pipeline DataPipeline) (*backend.QueryDataResponse, error) {
|
||||
func (s *Service) ExecutePipeline(ctx context.Context, now time.Time, pipeline DataPipeline) (*backend.QueryDataResponse, error) {
|
||||
res := backend.NewQueryDataResponse()
|
||||
vars, err := pipeline.execute(ctx, s)
|
||||
vars, err := pipeline.execute(ctx, now, s)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -43,6 +43,10 @@ func TestService(t *testing.T) {
|
||||
Type: "test",
|
||||
},
|
||||
JSON: json.RawMessage(`{ "datasource": { "uid": "1" }, "intervalMs": 1000, "maxDataPoints": 1000 }`),
|
||||
TimeRange: AbsoluteTimeRange{
|
||||
From: time.Time{},
|
||||
To: time.Time{},
|
||||
},
|
||||
},
|
||||
{
|
||||
RefID: "B",
|
||||
@ -56,7 +60,7 @@ func TestService(t *testing.T) {
|
||||
pl, err := s.BuildPipeline(req)
|
||||
require.NoError(t, err)
|
||||
|
||||
res, err := s.ExecutePipeline(context.Background(), pl)
|
||||
res, err := s.ExecutePipeline(context.Background(), time.Now(), pl)
|
||||
require.NoError(t, err)
|
||||
|
||||
bDF := data.NewFrame("",
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/expr/mathexp"
|
||||
)
|
||||
@ -88,7 +89,7 @@ func (tc *ThresholdCommand) NeedsVars() []string {
|
||||
return []string{tc.ReferenceVar}
|
||||
}
|
||||
|
||||
func (tc *ThresholdCommand) Execute(ctx context.Context, vars mathexp.Vars) (mathexp.Results, error) {
|
||||
func (tc *ThresholdCommand) Execute(ctx context.Context, now time.Time, vars mathexp.Vars) (mathexp.Results, error) {
|
||||
mathExpression, err := createMathExpression(tc.ReferenceVar, tc.ThresholdFunc, tc.Conditions)
|
||||
if err != nil {
|
||||
return mathexp.Results{}, err
|
||||
@ -99,7 +100,7 @@ func (tc *ThresholdCommand) Execute(ctx context.Context, vars mathexp.Vars) (mat
|
||||
return mathexp.Results{}, err
|
||||
}
|
||||
|
||||
return mathCommand.Execute(ctx, vars)
|
||||
return mathCommand.Execute(ctx, now, vars)
|
||||
}
|
||||
|
||||
// createMathExpression converts all the info we have about a "threshold" expression in to a Math expression
|
||||
|
@ -93,7 +93,6 @@ func TestUnmarshalThresholdCommand(t *testing.T) {
|
||||
RefID: "",
|
||||
Query: qmap,
|
||||
QueryType: "",
|
||||
TimeRange: TimeRange{},
|
||||
DataSource: nil,
|
||||
})
|
||||
|
||||
|
@ -50,14 +50,38 @@ type Query struct {
|
||||
}
|
||||
|
||||
// TimeRange is a time.Time based TimeRange.
|
||||
type TimeRange struct {
|
||||
type TimeRange interface {
|
||||
AbsoluteTime(now time.Time) backend.TimeRange
|
||||
}
|
||||
|
||||
type AbsoluteTimeRange struct {
|
||||
From time.Time
|
||||
To time.Time
|
||||
}
|
||||
|
||||
func (r AbsoluteTimeRange) AbsoluteTime(_ time.Time) backend.TimeRange {
|
||||
return backend.TimeRange{
|
||||
From: r.From,
|
||||
To: r.To,
|
||||
}
|
||||
}
|
||||
|
||||
// RelativeTimeRange is a time range relative to some absolute time.
|
||||
type RelativeTimeRange struct {
|
||||
From time.Duration
|
||||
To time.Duration
|
||||
}
|
||||
|
||||
func (r RelativeTimeRange) AbsoluteTime(t time.Time) backend.TimeRange {
|
||||
return backend.TimeRange{
|
||||
From: t.Add(r.From),
|
||||
To: t.Add(r.To),
|
||||
}
|
||||
}
|
||||
|
||||
// TransformData takes Queries which are either expressions nodes
|
||||
// or are datasource requests.
|
||||
func (s *Service) TransformData(ctx context.Context, req *Request) (r *backend.QueryDataResponse, err error) {
|
||||
func (s *Service) TransformData(ctx context.Context, now time.Time, req *Request) (r *backend.QueryDataResponse, err error) {
|
||||
if s.isDisabled() {
|
||||
return nil, fmt.Errorf("server side expressions are disabled")
|
||||
}
|
||||
@ -83,7 +107,7 @@ func (s *Service) TransformData(ctx context.Context, req *Request) (r *backend.Q
|
||||
}
|
||||
|
||||
// Execute the pipeline
|
||||
responses, err := s.ExecutePipeline(ctx, pipeline)
|
||||
responses, err := s.ExecutePipeline(ctx, now, pipeline)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -215,10 +215,7 @@ func getExprRequest(ctx EvaluationContext, data []models.AlertQuery, dsCacheServ
|
||||
}
|
||||
|
||||
req.Queries = append(req.Queries, expr.Query{
|
||||
TimeRange: expr.TimeRange{
|
||||
From: q.RelativeTimeRange.ToTimeRange(ctx.At).From,
|
||||
To: q.RelativeTimeRange.ToTimeRange(ctx.At).To,
|
||||
},
|
||||
TimeRange: q.RelativeTimeRange.ToTimeRange(),
|
||||
DataSource: ds,
|
||||
JSON: model,
|
||||
Interval: interval,
|
||||
@ -334,7 +331,7 @@ func executeQueriesAndExpressions(ctx EvaluationContext, data []models.AlertQuer
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return exprService.TransformData(ctx.Ctx, queryDataReq)
|
||||
return exprService.TransformData(ctx.Ctx, ctx.At, queryDataReq)
|
||||
}
|
||||
|
||||
// datasourceUIDsToRefIDs returns a sorted slice of Ref IDs for each Datasource UID.
|
||||
|
@ -6,7 +6,6 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana/pkg/expr"
|
||||
)
|
||||
|
||||
@ -70,10 +69,10 @@ func (rtr *RelativeTimeRange) isValid() bool {
|
||||
return rtr.From > rtr.To
|
||||
}
|
||||
|
||||
func (rtr *RelativeTimeRange) ToTimeRange(now time.Time) backend.TimeRange {
|
||||
return backend.TimeRange{
|
||||
From: now.Add(-time.Duration(rtr.From)),
|
||||
To: now.Add(-time.Duration(rtr.To)),
|
||||
func (rtr *RelativeTimeRange) ToTimeRange() expr.TimeRange {
|
||||
return expr.RelativeTimeRange{
|
||||
From: -time.Duration(rtr.From),
|
||||
To: -time.Duration(rtr.To),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -144,14 +144,14 @@ func (s *Service) handleExpressions(ctx context.Context, user *user.SignedInUser
|
||||
MaxDataPoints: pq.query.MaxDataPoints,
|
||||
QueryType: pq.query.QueryType,
|
||||
DataSource: pq.datasource,
|
||||
TimeRange: expr.TimeRange{
|
||||
TimeRange: expr.AbsoluteTimeRange{
|
||||
From: pq.query.TimeRange.From,
|
||||
To: pq.query.TimeRange.To,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
qdr, err := s.expressionService.TransformData(ctx, &exprReq)
|
||||
qdr, err := s.expressionService.TransformData(ctx, time.Now(), &exprReq) // use time now because all queries have absolute time range
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("expression request error: %w", err)
|
||||
}
|
||||
|
Reference in New Issue
Block a user