From 720d716e457a5380cfc75faa36ed99f939c23a7c Mon Sep 17 00:00:00 2001 From: Kyle Brandt Date: Fri, 18 Aug 2023 07:49:59 -0400 Subject: [PATCH] SSE: Group data source node execution by data source (#72935) Execute all queries to the same datasource in a single request. Uses the query index and the graph node ID index, and then a stable dependency graph sort based on node input index number in attempt to keep the original query order intact. --- pkg/expr/graph.go | 24 ++++++- pkg/expr/ml.go | 2 +- pkg/expr/nodes.go | 160 ++++++++++++++++++++++++++++------------------ 3 files changed, 122 insertions(+), 64 deletions(-) diff --git a/pkg/expr/graph.go b/pkg/expr/graph.go index eb26a87117f..ee983fb0665 100644 --- a/pkg/expr/graph.go +++ b/pkg/expr/graph.go @@ -55,7 +55,24 @@ type DataPipeline []Node // map of the refId of the of each command func (dp *DataPipeline) execute(c context.Context, now time.Time, s *Service) (mathexp.Vars, error) { vars := make(mathexp.Vars) + + // Execute datasource nodes first, and grouped by datasource. + dsNodes := []*DSNode{} for _, node := range *dp { + if node.NodeType() != TypeDatasourceNode { + continue + } + dsNodes = append(dsNodes, node.(*DSNode)) + } + + if err := executeDSNodesGrouped(c, now, vars, s, dsNodes); err != nil { + return nil, err + } + + for _, node := range *dp { + if node.NodeType() == TypeDatasourceNode { + continue // already executed via executeDSNodesGrouped + } c, span := s.tracer.Start(c, "SSE.ExecuteNode") span.SetAttributes("node.refId", node.RefID(), attribute.Key("node.refId").String(node.RefID())) if node.NodeType() == TypeCMDNode { @@ -112,8 +129,10 @@ func (s *Service) buildDependencyGraph(req *Request) (*simple.DirectedGraph, err } // buildExecutionOrder returns a sequence of nodes ordered by dependency. +// Note: During execution, Datasource query nodes for the same datasource will +// be grouped into one request and executed first as phase after this call. func buildExecutionOrder(graph *simple.DirectedGraph) ([]Node, error) { - sortedNodes, err := topo.Sort(graph) + sortedNodes, err := topo.SortStabilized(graph, nil) if err != nil { return nil, err } @@ -145,7 +164,7 @@ func buildNodeRegistry(g *simple.DirectedGraph) map[string]Node { func (s *Service) buildGraph(req *Request) (*simple.DirectedGraph, error) { dp := simple.NewDirectedGraph() - for _, query := range req.Queries { + for i, query := range req.Queries { if query.DataSource == nil || query.DataSource.UID == "" { return nil, fmt.Errorf("missing datasource uid in query with refId %v", query.RefID) } @@ -169,6 +188,7 @@ func (s *Service) buildGraph(req *Request) (*simple.DirectedGraph, error) { TimeRange: query.TimeRange, QueryType: query.QueryType, DataSource: query.DataSource, + idx: int64(i), } var node Node diff --git a/pkg/expr/ml.go b/pkg/expr/ml.go index 35df536cb77..27b9df49e0f 100644 --- a/pkg/expr/ml.go +++ b/pkg/expr/ml.go @@ -141,7 +141,7 @@ func (s *Service) buildMLNode(dp *simple.DirectedGraph, rn *rawNode, req *Reques return &MLNode{ baseNode: baseNode{ - id: dp.NewNode().ID(), + id: rn.idx, refID: rn.RefID, }, TimeRange: rn.TimeRange, diff --git a/pkg/expr/nodes.go b/pkg/expr/nodes.go index 19eaccae8e3..19276e15fd0 100644 --- a/pkg/expr/nodes.go +++ b/pkg/expr/nodes.go @@ -40,6 +40,9 @@ type rawNode struct { QueryType string TimeRange TimeRange DataSource *datasources.DataSource + // We use this index as the id of the node graph so the order can remain during a the stable sort of the dependency graph execution order. + // Some data sources, such as cloud watch, have order dependencies between queries. + idx int64 } func (rn *rawNode) GetCommandType() (c CommandType, err error) { @@ -97,7 +100,7 @@ func buildCMDNode(dp *simple.DirectedGraph, rn *rawNode) (*CMDNode, error) { node := &CMDNode{ baseNode: baseNode{ - id: dp.NewNode().ID(), + id: rn.idx, refID: rn.RefID, }, CMDType: commandType, @@ -159,7 +162,7 @@ func (s *Service) buildDSNode(dp *simple.DirectedGraph, rn *rawNode, req *Reques dsNode := &DSNode{ baseNode: baseNode{ - id: dp.NewNode().ID(), + id: rn.idx, refID: rn.RefID, }, orgID: req.OrgId, @@ -191,69 +194,104 @@ func (s *Service) buildDSNode(dp *simple.DirectedGraph, rn *rawNode, req *Reques return dsNode, nil } +// executeDSNodesGrouped groups datasource node queries by the datasource instance, and then sends them +// in a single request with one or more queries to the datasource. +func executeDSNodesGrouped(ctx context.Context, now time.Time, vars mathexp.Vars, s *Service, nodes []*DSNode) (e error) { + type dsKey struct { + uid string // in theory I think this all I need for the key, but rather be safe + id int64 + orgID int64 + } + byDS := make(map[dsKey][]*DSNode) + for _, node := range nodes { + k := dsKey{id: node.datasource.ID, uid: node.datasource.UID, orgID: node.orgID} + byDS[k] = append(byDS[k], node) + } + + for _, nodeGroup := range byDS { + if err := func() error { + ctx, span := s.tracer.Start(ctx, "SSE.ExecuteDatasourceQuery") + defer span.End() + firstNode := nodeGroup[0] + pCtx, err := s.pCtxProvider.GetWithDataSource(ctx, firstNode.datasource.Type, firstNode.request.User, firstNode.datasource) + if err != nil { + return err + } + + logger := logger.FromContext(ctx).New("datasourceType", firstNode.datasource.Type, + "queryRefId", firstNode.refID, + "datasourceUid", firstNode.datasource.UID, + "datasourceVersion", firstNode.datasource.Version, + ) + + span.SetAttributes("datasource.type", firstNode.datasource.Type, attribute.Key("datasource.type").String(firstNode.datasource.Type)) + span.SetAttributes("datasource.uid", firstNode.datasource.UID, attribute.Key("datasource.uid").String(firstNode.datasource.UID)) + + req := &backend.QueryDataRequest{ + PluginContext: pCtx, + Headers: firstNode.request.Headers, + } + + for _, dn := range nodeGroup { + req.Queries = append(req.Queries, backend.DataQuery{ + RefID: dn.refID, + MaxDataPoints: dn.maxDP, + Interval: time.Duration(int64(time.Millisecond) * dn.intervalMS), + JSON: dn.query, + TimeRange: dn.timeRange.AbsoluteTime(now), + QueryType: dn.queryType, + }) + } + + responseType := "unknown" + respStatus := "success" + defer func() { + if e != nil { + responseType = "error" + respStatus = "failure" + span.AddEvents([]string{"error", "message"}, + []tracing.EventValue{ + {Str: fmt.Sprintf("%v", err)}, + {Str: "failed to query data source"}, + }) + } + logger.Debug("Data source queried", "responseType", responseType) + useDataplane := strings.HasPrefix(responseType, "dataplane-") + s.metrics.dsRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane), firstNode.datasource.Type).Inc() + }() + + resp, err := s.dataService.QueryData(ctx, req) + if err != nil { + return MakeQueryError(firstNode.refID, firstNode.datasource.UID, err) + } + + for _, dn := range nodeGroup { + dataFrames, err := getResponseFrame(resp, dn.refID) + if err != nil { + return MakeQueryError(dn.refID, dn.datasource.UID, err) + } + + var result mathexp.Results + responseType, result, err = convertDataFramesToResults(ctx, dataFrames, dn.datasource.Type, s, logger) + if err != nil { + return MakeConversionError(dn.refID, err) + } + vars[dn.refID] = result + } + return nil + }(); err != nil { + return err + } + } + return nil +} + // Execute runs the node and adds the results to vars. If the node requires // other nodes they must have already been executed and their results must // already by in vars. func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s *Service) (r mathexp.Results, e error) { - logger := logger.FromContext(ctx).New("datasourceType", dn.datasource.Type, "queryRefId", dn.refID, "datasourceUid", dn.datasource.UID, "datasourceVersion", dn.datasource.Version) - ctx, span := s.tracer.Start(ctx, "SSE.ExecuteDatasourceQuery") - defer span.End() - - pCtx, err := s.pCtxProvider.GetWithDataSource(ctx, dn.datasource.Type, dn.request.User, dn.datasource) - if err != nil { - return mathexp.Results{}, err - } - span.SetAttributes("datasource.type", dn.datasource.Type, attribute.Key("datasource.type").String(dn.datasource.Type)) - span.SetAttributes("datasource.uid", dn.datasource.UID, attribute.Key("datasource.uid").String(dn.datasource.UID)) - - req := &backend.QueryDataRequest{ - PluginContext: pCtx, - Queries: []backend.DataQuery{ - { - RefID: dn.refID, - MaxDataPoints: dn.maxDP, - Interval: time.Duration(int64(time.Millisecond) * dn.intervalMS), - JSON: dn.query, - TimeRange: dn.timeRange.AbsoluteTime(now), - QueryType: dn.queryType, - }, - }, - Headers: dn.request.Headers, - } - - responseType := "unknown" - respStatus := "success" - defer func() { - if e != nil { - responseType = "error" - respStatus = "failure" - span.AddEvents([]string{"error", "message"}, - []tracing.EventValue{ - {Str: fmt.Sprintf("%v", err)}, - {Str: "failed to query data source"}, - }) - } - logger.Debug("Data source queried", "responseType", responseType) - useDataplane := strings.HasPrefix(responseType, "dataplane-") - s.metrics.dsRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane), dn.datasource.Type).Inc() - }() - - resp, err := s.dataService.QueryData(ctx, req) - if err != nil { - return mathexp.Results{}, MakeQueryError(dn.refID, dn.datasource.UID, err) - } - - dataFrames, err := getResponseFrame(resp, dn.refID) - if err != nil { - return mathexp.Results{}, MakeQueryError(dn.refID, dn.datasource.UID, err) - } - - var result mathexp.Results - responseType, result, err = convertDataFramesToResults(ctx, dataFrames, dn.datasource.Type, s, logger) - if err != nil { - err = MakeConversionError(dn.refID, err) - } - return result, err + panic("Execute called on DSNode and should not be") + // Datasource queries are sent as a group to the datasource, see executeDSNodesGrouped. } func getResponseFrame(resp *backend.QueryDataResponse, refID string) (data.Frames, error) {