mirror of
https://github.com/grafana/grafana.git
synced 2025-08-02 17:52:24 +08:00
SSE: Put data source query grouping behind feature flag (#74551)
change was originally merged in commit: 720d716 via PR: https://github.com/grafana/grafana/pull/72935 with no flag flag is: sseGroupByDatasource
This commit is contained in:
@ -56,21 +56,24 @@ type DataPipeline []Node
|
||||
func (dp *DataPipeline) execute(c context.Context, now time.Time, s *Service) (mathexp.Vars, error) {
|
||||
vars := make(mathexp.Vars)
|
||||
|
||||
groupByDSFlag := s.features.IsEnabled(featuremgmt.FlagSseGroupByDatasource)
|
||||
// Execute datasource nodes first, and grouped by datasource.
|
||||
dsNodes := []*DSNode{}
|
||||
for _, node := range *dp {
|
||||
if node.NodeType() != TypeDatasourceNode {
|
||||
continue
|
||||
if groupByDSFlag {
|
||||
dsNodes := []*DSNode{}
|
||||
for _, node := range *dp {
|
||||
if node.NodeType() != TypeDatasourceNode {
|
||||
continue
|
||||
}
|
||||
dsNodes = append(dsNodes, node.(*DSNode))
|
||||
}
|
||||
dsNodes = append(dsNodes, node.(*DSNode))
|
||||
}
|
||||
|
||||
if err := executeDSNodesGrouped(c, now, vars, s, dsNodes); err != nil {
|
||||
return nil, err
|
||||
if err := executeDSNodesGrouped(c, now, vars, s, dsNodes); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
for _, node := range *dp {
|
||||
if node.NodeType() == TypeDatasourceNode {
|
||||
if groupByDSFlag && node.NodeType() == TypeDatasourceNode {
|
||||
continue // already executed via executeDSNodesGrouped
|
||||
}
|
||||
c, span := s.tracer.Start(c, "SSE.ExecuteNode")
|
||||
|
@ -290,8 +290,65 @@ func executeDSNodesGrouped(ctx context.Context, now time.Time, vars mathexp.Vars
|
||||
// other nodes they must have already been executed and their results must
|
||||
// already by in vars.
|
||||
func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s *Service) (r mathexp.Results, e error) {
|
||||
panic("Execute called on DSNode and should not be")
|
||||
// Datasource queries are sent as a group to the datasource, see executeDSNodesGrouped.
|
||||
logger := logger.FromContext(ctx).New("datasourceType", dn.datasource.Type, "queryRefId", dn.refID, "datasourceUid", dn.datasource.UID, "datasourceVersion", dn.datasource.Version)
|
||||
ctx, span := s.tracer.Start(ctx, "SSE.ExecuteDatasourceQuery")
|
||||
defer span.End()
|
||||
|
||||
pCtx, err := s.pCtxProvider.GetWithDataSource(ctx, dn.datasource.Type, dn.request.User, dn.datasource)
|
||||
if err != nil {
|
||||
return mathexp.Results{}, err
|
||||
}
|
||||
span.SetAttributes("datasource.type", dn.datasource.Type, attribute.Key("datasource.type").String(dn.datasource.Type))
|
||||
span.SetAttributes("datasource.uid", dn.datasource.UID, attribute.Key("datasource.uid").String(dn.datasource.UID))
|
||||
|
||||
req := &backend.QueryDataRequest{
|
||||
PluginContext: pCtx,
|
||||
Queries: []backend.DataQuery{
|
||||
{
|
||||
RefID: dn.refID,
|
||||
MaxDataPoints: dn.maxDP,
|
||||
Interval: time.Duration(int64(time.Millisecond) * dn.intervalMS),
|
||||
JSON: dn.query,
|
||||
TimeRange: dn.timeRange.AbsoluteTime(now),
|
||||
QueryType: dn.queryType,
|
||||
},
|
||||
},
|
||||
Headers: dn.request.Headers,
|
||||
}
|
||||
|
||||
responseType := "unknown"
|
||||
respStatus := "success"
|
||||
defer func() {
|
||||
if e != nil {
|
||||
responseType = "error"
|
||||
respStatus = "failure"
|
||||
span.AddEvents([]string{"error", "message"},
|
||||
[]tracing.EventValue{
|
||||
{Str: fmt.Sprintf("%v", err)},
|
||||
{Str: "failed to query data source"},
|
||||
})
|
||||
}
|
||||
logger.Debug("Data source queried", "responseType", responseType)
|
||||
useDataplane := strings.HasPrefix(responseType, "dataplane-")
|
||||
s.metrics.dsRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane), dn.datasource.Type).Inc()
|
||||
}()
|
||||
|
||||
resp, err := s.dataService.QueryData(ctx, req)
|
||||
if err != nil {
|
||||
return mathexp.Results{}, MakeQueryError(dn.refID, dn.datasource.UID, err)
|
||||
}
|
||||
|
||||
dataFrames, err := getResponseFrame(resp, dn.refID)
|
||||
if err != nil {
|
||||
return mathexp.Results{}, MakeQueryError(dn.refID, dn.datasource.UID, err)
|
||||
}
|
||||
|
||||
var result mathexp.Results
|
||||
responseType, result, err = convertDataFramesToResults(ctx, dataFrames, dn.datasource.Type, s, logger)
|
||||
if err != nil {
|
||||
err = MakeConversionError(dn.refID, err)
|
||||
}
|
||||
return result, err
|
||||
}
|
||||
|
||||
func getResponseFrame(resp *backend.QueryDataResponse, refID string) (data.Frames, error) {
|
||||
|
Reference in New Issue
Block a user