mirror of
https://github.com/grafana/grafana.git
synced 2025-08-02 03:12:13 +08:00
SSE: Group data source node execution by data source (#72935)
Execute all queries to the same datasource in a single request. Uses the query index and the graph node ID index, and then a stable dependency graph sort based on node input index number in attempt to keep the original query order intact.
This commit is contained in:
@ -55,7 +55,24 @@ type DataPipeline []Node
|
|||||||
// map of the refId of the of each command
|
// map of the refId of the of each command
|
||||||
func (dp *DataPipeline) execute(c context.Context, now time.Time, s *Service) (mathexp.Vars, error) {
|
func (dp *DataPipeline) execute(c context.Context, now time.Time, s *Service) (mathexp.Vars, error) {
|
||||||
vars := make(mathexp.Vars)
|
vars := make(mathexp.Vars)
|
||||||
|
|
||||||
|
// Execute datasource nodes first, and grouped by datasource.
|
||||||
|
dsNodes := []*DSNode{}
|
||||||
for _, node := range *dp {
|
for _, node := range *dp {
|
||||||
|
if node.NodeType() != TypeDatasourceNode {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
dsNodes = append(dsNodes, node.(*DSNode))
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := executeDSNodesGrouped(c, now, vars, s, dsNodes); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, node := range *dp {
|
||||||
|
if node.NodeType() == TypeDatasourceNode {
|
||||||
|
continue // already executed via executeDSNodesGrouped
|
||||||
|
}
|
||||||
c, span := s.tracer.Start(c, "SSE.ExecuteNode")
|
c, span := s.tracer.Start(c, "SSE.ExecuteNode")
|
||||||
span.SetAttributes("node.refId", node.RefID(), attribute.Key("node.refId").String(node.RefID()))
|
span.SetAttributes("node.refId", node.RefID(), attribute.Key("node.refId").String(node.RefID()))
|
||||||
if node.NodeType() == TypeCMDNode {
|
if node.NodeType() == TypeCMDNode {
|
||||||
@ -112,8 +129,10 @@ func (s *Service) buildDependencyGraph(req *Request) (*simple.DirectedGraph, err
|
|||||||
}
|
}
|
||||||
|
|
||||||
// buildExecutionOrder returns a sequence of nodes ordered by dependency.
|
// buildExecutionOrder returns a sequence of nodes ordered by dependency.
|
||||||
|
// Note: During execution, Datasource query nodes for the same datasource will
|
||||||
|
// be grouped into one request and executed first as phase after this call.
|
||||||
func buildExecutionOrder(graph *simple.DirectedGraph) ([]Node, error) {
|
func buildExecutionOrder(graph *simple.DirectedGraph) ([]Node, error) {
|
||||||
sortedNodes, err := topo.Sort(graph)
|
sortedNodes, err := topo.SortStabilized(graph, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -145,7 +164,7 @@ func buildNodeRegistry(g *simple.DirectedGraph) map[string]Node {
|
|||||||
func (s *Service) buildGraph(req *Request) (*simple.DirectedGraph, error) {
|
func (s *Service) buildGraph(req *Request) (*simple.DirectedGraph, error) {
|
||||||
dp := simple.NewDirectedGraph()
|
dp := simple.NewDirectedGraph()
|
||||||
|
|
||||||
for _, query := range req.Queries {
|
for i, query := range req.Queries {
|
||||||
if query.DataSource == nil || query.DataSource.UID == "" {
|
if query.DataSource == nil || query.DataSource.UID == "" {
|
||||||
return nil, fmt.Errorf("missing datasource uid in query with refId %v", query.RefID)
|
return nil, fmt.Errorf("missing datasource uid in query with refId %v", query.RefID)
|
||||||
}
|
}
|
||||||
@ -169,6 +188,7 @@ func (s *Service) buildGraph(req *Request) (*simple.DirectedGraph, error) {
|
|||||||
TimeRange: query.TimeRange,
|
TimeRange: query.TimeRange,
|
||||||
QueryType: query.QueryType,
|
QueryType: query.QueryType,
|
||||||
DataSource: query.DataSource,
|
DataSource: query.DataSource,
|
||||||
|
idx: int64(i),
|
||||||
}
|
}
|
||||||
|
|
||||||
var node Node
|
var node Node
|
||||||
|
@ -141,7 +141,7 @@ func (s *Service) buildMLNode(dp *simple.DirectedGraph, rn *rawNode, req *Reques
|
|||||||
|
|
||||||
return &MLNode{
|
return &MLNode{
|
||||||
baseNode: baseNode{
|
baseNode: baseNode{
|
||||||
id: dp.NewNode().ID(),
|
id: rn.idx,
|
||||||
refID: rn.RefID,
|
refID: rn.RefID,
|
||||||
},
|
},
|
||||||
TimeRange: rn.TimeRange,
|
TimeRange: rn.TimeRange,
|
||||||
|
@ -40,6 +40,9 @@ type rawNode struct {
|
|||||||
QueryType string
|
QueryType string
|
||||||
TimeRange TimeRange
|
TimeRange TimeRange
|
||||||
DataSource *datasources.DataSource
|
DataSource *datasources.DataSource
|
||||||
|
// We use this index as the id of the node graph so the order can remain during a the stable sort of the dependency graph execution order.
|
||||||
|
// Some data sources, such as cloud watch, have order dependencies between queries.
|
||||||
|
idx int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rn *rawNode) GetCommandType() (c CommandType, err error) {
|
func (rn *rawNode) GetCommandType() (c CommandType, err error) {
|
||||||
@ -97,7 +100,7 @@ func buildCMDNode(dp *simple.DirectedGraph, rn *rawNode) (*CMDNode, error) {
|
|||||||
|
|
||||||
node := &CMDNode{
|
node := &CMDNode{
|
||||||
baseNode: baseNode{
|
baseNode: baseNode{
|
||||||
id: dp.NewNode().ID(),
|
id: rn.idx,
|
||||||
refID: rn.RefID,
|
refID: rn.RefID,
|
||||||
},
|
},
|
||||||
CMDType: commandType,
|
CMDType: commandType,
|
||||||
@ -159,7 +162,7 @@ func (s *Service) buildDSNode(dp *simple.DirectedGraph, rn *rawNode, req *Reques
|
|||||||
|
|
||||||
dsNode := &DSNode{
|
dsNode := &DSNode{
|
||||||
baseNode: baseNode{
|
baseNode: baseNode{
|
||||||
id: dp.NewNode().ID(),
|
id: rn.idx,
|
||||||
refID: rn.RefID,
|
refID: rn.RefID,
|
||||||
},
|
},
|
||||||
orgID: req.OrgId,
|
orgID: req.OrgId,
|
||||||
@ -191,69 +194,104 @@ func (s *Service) buildDSNode(dp *simple.DirectedGraph, rn *rawNode, req *Reques
|
|||||||
return dsNode, nil
|
return dsNode, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// executeDSNodesGrouped groups datasource node queries by the datasource instance, and then sends them
|
||||||
|
// in a single request with one or more queries to the datasource.
|
||||||
|
func executeDSNodesGrouped(ctx context.Context, now time.Time, vars mathexp.Vars, s *Service, nodes []*DSNode) (e error) {
|
||||||
|
type dsKey struct {
|
||||||
|
uid string // in theory I think this all I need for the key, but rather be safe
|
||||||
|
id int64
|
||||||
|
orgID int64
|
||||||
|
}
|
||||||
|
byDS := make(map[dsKey][]*DSNode)
|
||||||
|
for _, node := range nodes {
|
||||||
|
k := dsKey{id: node.datasource.ID, uid: node.datasource.UID, orgID: node.orgID}
|
||||||
|
byDS[k] = append(byDS[k], node)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, nodeGroup := range byDS {
|
||||||
|
if err := func() error {
|
||||||
|
ctx, span := s.tracer.Start(ctx, "SSE.ExecuteDatasourceQuery")
|
||||||
|
defer span.End()
|
||||||
|
firstNode := nodeGroup[0]
|
||||||
|
pCtx, err := s.pCtxProvider.GetWithDataSource(ctx, firstNode.datasource.Type, firstNode.request.User, firstNode.datasource)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
logger := logger.FromContext(ctx).New("datasourceType", firstNode.datasource.Type,
|
||||||
|
"queryRefId", firstNode.refID,
|
||||||
|
"datasourceUid", firstNode.datasource.UID,
|
||||||
|
"datasourceVersion", firstNode.datasource.Version,
|
||||||
|
)
|
||||||
|
|
||||||
|
span.SetAttributes("datasource.type", firstNode.datasource.Type, attribute.Key("datasource.type").String(firstNode.datasource.Type))
|
||||||
|
span.SetAttributes("datasource.uid", firstNode.datasource.UID, attribute.Key("datasource.uid").String(firstNode.datasource.UID))
|
||||||
|
|
||||||
|
req := &backend.QueryDataRequest{
|
||||||
|
PluginContext: pCtx,
|
||||||
|
Headers: firstNode.request.Headers,
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, dn := range nodeGroup {
|
||||||
|
req.Queries = append(req.Queries, backend.DataQuery{
|
||||||
|
RefID: dn.refID,
|
||||||
|
MaxDataPoints: dn.maxDP,
|
||||||
|
Interval: time.Duration(int64(time.Millisecond) * dn.intervalMS),
|
||||||
|
JSON: dn.query,
|
||||||
|
TimeRange: dn.timeRange.AbsoluteTime(now),
|
||||||
|
QueryType: dn.queryType,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
responseType := "unknown"
|
||||||
|
respStatus := "success"
|
||||||
|
defer func() {
|
||||||
|
if e != nil {
|
||||||
|
responseType = "error"
|
||||||
|
respStatus = "failure"
|
||||||
|
span.AddEvents([]string{"error", "message"},
|
||||||
|
[]tracing.EventValue{
|
||||||
|
{Str: fmt.Sprintf("%v", err)},
|
||||||
|
{Str: "failed to query data source"},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
logger.Debug("Data source queried", "responseType", responseType)
|
||||||
|
useDataplane := strings.HasPrefix(responseType, "dataplane-")
|
||||||
|
s.metrics.dsRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane), firstNode.datasource.Type).Inc()
|
||||||
|
}()
|
||||||
|
|
||||||
|
resp, err := s.dataService.QueryData(ctx, req)
|
||||||
|
if err != nil {
|
||||||
|
return MakeQueryError(firstNode.refID, firstNode.datasource.UID, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, dn := range nodeGroup {
|
||||||
|
dataFrames, err := getResponseFrame(resp, dn.refID)
|
||||||
|
if err != nil {
|
||||||
|
return MakeQueryError(dn.refID, dn.datasource.UID, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var result mathexp.Results
|
||||||
|
responseType, result, err = convertDataFramesToResults(ctx, dataFrames, dn.datasource.Type, s, logger)
|
||||||
|
if err != nil {
|
||||||
|
return MakeConversionError(dn.refID, err)
|
||||||
|
}
|
||||||
|
vars[dn.refID] = result
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Execute runs the node and adds the results to vars. If the node requires
|
// Execute runs the node and adds the results to vars. If the node requires
|
||||||
// other nodes they must have already been executed and their results must
|
// other nodes they must have already been executed and their results must
|
||||||
// already by in vars.
|
// already by in vars.
|
||||||
func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s *Service) (r mathexp.Results, e error) {
|
func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s *Service) (r mathexp.Results, e error) {
|
||||||
logger := logger.FromContext(ctx).New("datasourceType", dn.datasource.Type, "queryRefId", dn.refID, "datasourceUid", dn.datasource.UID, "datasourceVersion", dn.datasource.Version)
|
panic("Execute called on DSNode and should not be")
|
||||||
ctx, span := s.tracer.Start(ctx, "SSE.ExecuteDatasourceQuery")
|
// Datasource queries are sent as a group to the datasource, see executeDSNodesGrouped.
|
||||||
defer span.End()
|
|
||||||
|
|
||||||
pCtx, err := s.pCtxProvider.GetWithDataSource(ctx, dn.datasource.Type, dn.request.User, dn.datasource)
|
|
||||||
if err != nil {
|
|
||||||
return mathexp.Results{}, err
|
|
||||||
}
|
|
||||||
span.SetAttributes("datasource.type", dn.datasource.Type, attribute.Key("datasource.type").String(dn.datasource.Type))
|
|
||||||
span.SetAttributes("datasource.uid", dn.datasource.UID, attribute.Key("datasource.uid").String(dn.datasource.UID))
|
|
||||||
|
|
||||||
req := &backend.QueryDataRequest{
|
|
||||||
PluginContext: pCtx,
|
|
||||||
Queries: []backend.DataQuery{
|
|
||||||
{
|
|
||||||
RefID: dn.refID,
|
|
||||||
MaxDataPoints: dn.maxDP,
|
|
||||||
Interval: time.Duration(int64(time.Millisecond) * dn.intervalMS),
|
|
||||||
JSON: dn.query,
|
|
||||||
TimeRange: dn.timeRange.AbsoluteTime(now),
|
|
||||||
QueryType: dn.queryType,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Headers: dn.request.Headers,
|
|
||||||
}
|
|
||||||
|
|
||||||
responseType := "unknown"
|
|
||||||
respStatus := "success"
|
|
||||||
defer func() {
|
|
||||||
if e != nil {
|
|
||||||
responseType = "error"
|
|
||||||
respStatus = "failure"
|
|
||||||
span.AddEvents([]string{"error", "message"},
|
|
||||||
[]tracing.EventValue{
|
|
||||||
{Str: fmt.Sprintf("%v", err)},
|
|
||||||
{Str: "failed to query data source"},
|
|
||||||
})
|
|
||||||
}
|
|
||||||
logger.Debug("Data source queried", "responseType", responseType)
|
|
||||||
useDataplane := strings.HasPrefix(responseType, "dataplane-")
|
|
||||||
s.metrics.dsRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane), dn.datasource.Type).Inc()
|
|
||||||
}()
|
|
||||||
|
|
||||||
resp, err := s.dataService.QueryData(ctx, req)
|
|
||||||
if err != nil {
|
|
||||||
return mathexp.Results{}, MakeQueryError(dn.refID, dn.datasource.UID, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
dataFrames, err := getResponseFrame(resp, dn.refID)
|
|
||||||
if err != nil {
|
|
||||||
return mathexp.Results{}, MakeQueryError(dn.refID, dn.datasource.UID, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var result mathexp.Results
|
|
||||||
responseType, result, err = convertDataFramesToResults(ctx, dataFrames, dn.datasource.Type, s, logger)
|
|
||||||
if err != nil {
|
|
||||||
err = MakeConversionError(dn.refID, err)
|
|
||||||
}
|
|
||||||
return result, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func getResponseFrame(resp *backend.QueryDataResponse, refID string) (data.Frames, error) {
|
func getResponseFrame(resp *backend.QueryDataResponse, refID string) (data.Frames, error) {
|
||||||
|
Reference in New Issue
Block a user