mirror of
https://github.com/grafana/grafana.git
synced 2025-07-29 23:32:30 +08:00
SSE: DSNode to update result with names to make each value identifiable by labels (only Graphite and TestData) (#71246)
* introduce a function checkIfSeriesNeedToBeFixed to scan all value fields in the response and provide a function that updates Series so they can be uniquely identifiable. Only Graphite and TestData are checked. * update `convertDataFramesToResults` to run this function and provide it to WideToMany * update WideToMany to run the fix function if it is not nil
This commit is contained in:
@ -20,6 +20,9 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
)
|
||||
|
||||
// label that is used when all mathexp.Series have 0 labels to make them identifiable by labels. The value of this label is extracted from value field names
|
||||
const nameLabelName = "__name__"
|
||||
|
||||
var (
|
||||
logger = log.New("expr")
|
||||
)
|
||||
@ -295,7 +298,6 @@ func convertDataFramesToResults(ctx context.Context, frames data.Frames, datasou
|
||||
return "no-data", mathexp.Results{Values: mathexp.Values{mathexp.NewNoData()}}, nil
|
||||
}
|
||||
|
||||
vals := make([]mathexp.Value, 0)
|
||||
var dt data.FrameType
|
||||
dt, useDataplane, _ := shouldUseDataplane(frames, logger, s.features.IsEnabled(featuremgmt.FlagDisableSSEDataplane))
|
||||
if useDataplane {
|
||||
@ -325,6 +327,7 @@ func convertDataFramesToResults(ctx context.Context, frames data.Frames, datasou
|
||||
if err != nil {
|
||||
return "", mathexp.Results{}, err
|
||||
}
|
||||
vals := make([]mathexp.Value, 0, len(numberSet))
|
||||
for _, n := range numberSet {
|
||||
vals = append(vals, n)
|
||||
}
|
||||
@ -334,16 +337,33 @@ func convertDataFramesToResults(ctx context.Context, frames data.Frames, datasou
|
||||
}
|
||||
}
|
||||
|
||||
filtered := make([]*data.Frame, 0, len(frames))
|
||||
totalLen := 0
|
||||
for _, frame := range frames {
|
||||
schema := frame.TimeSeriesSchema()
|
||||
// Check for TimeSeriesTypeNot in InfluxDB queries. A data frame of this type will cause
|
||||
// the WideToMany() function to error out, which results in unhealthy alerts.
|
||||
// This check should be removed once inconsistencies in data source responses are solved.
|
||||
if frame.TimeSeriesSchema().Type == data.TimeSeriesTypeNot && datasourceType == datasources.DS_INFLUXDB {
|
||||
if schema.Type == data.TimeSeriesTypeNot && datasourceType == datasources.DS_INFLUXDB {
|
||||
logger.Warn("Ignoring InfluxDB data frame due to missing numeric fields")
|
||||
continue
|
||||
}
|
||||
var series []mathexp.Series
|
||||
series, err := WideToMany(frame)
|
||||
if schema.Type != data.TimeSeriesTypeWide {
|
||||
return "", mathexp.Results{}, fmt.Errorf("input data must be a wide series but got type %s (input refid)", schema.Type)
|
||||
}
|
||||
filtered = append(filtered, frame)
|
||||
totalLen += len(schema.ValueIndices)
|
||||
}
|
||||
|
||||
if len(filtered) == 0 {
|
||||
return "no data", mathexp.Results{Values: mathexp.Values{mathexp.NoData{Frame: frames[0]}}}, nil
|
||||
}
|
||||
|
||||
maybeFixerFn := checkIfSeriesNeedToBeFixed(filtered, datasourceType)
|
||||
|
||||
vals := make([]mathexp.Value, 0, totalLen)
|
||||
for _, frame := range filtered {
|
||||
series, err := WideToMany(frame, maybeFixerFn)
|
||||
if err != nil {
|
||||
return "", mathexp.Results{}, err
|
||||
}
|
||||
@ -351,14 +371,17 @@ func convertDataFramesToResults(ctx context.Context, frames data.Frames, datasou
|
||||
vals = append(vals, ser)
|
||||
}
|
||||
}
|
||||
|
||||
return "series set", mathexp.Results{
|
||||
Values: vals, // TODO vals can be empty. Should we replace with no-data?
|
||||
dataType := "single frame series"
|
||||
if len(filtered) > 1 {
|
||||
dataType = "multi frame series"
|
||||
}
|
||||
return dataType, mathexp.Results{
|
||||
Values: vals,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func isAllFrameVectors(datasourceType string, frames data.Frames) bool {
|
||||
if datasourceType != "prometheus" {
|
||||
if datasourceType != datasources.DS_PROMETHEUS {
|
||||
return false
|
||||
}
|
||||
allVector := false
|
||||
@ -466,7 +489,7 @@ func extractNumberSet(frame *data.Frame) ([]mathexp.Number, error) {
|
||||
// is created for each value type column of wide frame.
|
||||
//
|
||||
// This might not be a good idea long term, but works now as an adapter/shim.
|
||||
func WideToMany(frame *data.Frame) ([]mathexp.Series, error) {
|
||||
func WideToMany(frame *data.Frame, fixSeries func(series mathexp.Series, valueField *data.Field)) ([]mathexp.Series, error) {
|
||||
tsSchema := frame.TimeSeriesSchema()
|
||||
if tsSchema.Type != data.TimeSeriesTypeWide {
|
||||
return nil, fmt.Errorf("input data must be a wide series but got type %s (input refid)", tsSchema.Type)
|
||||
@ -477,10 +500,13 @@ func WideToMany(frame *data.Frame) ([]mathexp.Series, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if fixSeries != nil {
|
||||
fixSeries(s, frame.Fields[tsSchema.ValueIndices[0]])
|
||||
}
|
||||
return []mathexp.Series{s}, nil
|
||||
}
|
||||
|
||||
series := []mathexp.Series{}
|
||||
series := make([]mathexp.Series, 0, len(tsSchema.ValueIndices))
|
||||
for _, valIdx := range tsSchema.ValueIndices {
|
||||
l := frame.Rows()
|
||||
f := data.NewFrameOfFieldTypes(frame.Name, l, frame.Fields[tsSchema.TimeIndex].Type(), frame.Fields[valIdx].Type())
|
||||
@ -500,8 +526,79 @@ func WideToMany(frame *data.Frame) ([]mathexp.Series, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if fixSeries != nil {
|
||||
fixSeries(s, frame.Fields[valIdx])
|
||||
}
|
||||
series = append(series, s)
|
||||
}
|
||||
|
||||
return series, nil
|
||||
}
|
||||
|
||||
// checkIfSeriesNeedToBeFixed scans all value fields of all provided frames and determines whether the resulting mathexp.Series
|
||||
// needs to be updated so each series could be identifiable by labels.
|
||||
// NOTE: applicable only to only datasources.DS_GRAPHITE and datasources.DS_TESTDATA data sources
|
||||
// returns a function that patches the mathexp.Series with information from data.Field from which it was created if the all series need to be fixed. Otherwise, returns nil
|
||||
func checkIfSeriesNeedToBeFixed(frames []*data.Frame, datasourceType string) func(series mathexp.Series, valueField *data.Field) {
|
||||
if !(datasourceType == datasources.DS_GRAPHITE || datasourceType == datasources.DS_TESTDATA) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// get all value fields
|
||||
var valueFields []*data.Field
|
||||
for _, frame := range frames {
|
||||
tsSchema := frame.TimeSeriesSchema()
|
||||
for _, index := range tsSchema.ValueIndices {
|
||||
field := frame.Fields[index]
|
||||
// if at least one value field contains labels, the result does not need to be fixed.
|
||||
if len(field.Labels) > 0 {
|
||||
return nil
|
||||
}
|
||||
if valueFields == nil {
|
||||
valueFields = make([]*data.Field, 0, len(frames)*len(tsSchema.ValueIndices))
|
||||
}
|
||||
valueFields = append(valueFields, field)
|
||||
}
|
||||
}
|
||||
|
||||
// selectors are in precedence order.
|
||||
nameSelectors := []func(f *data.Field) string{
|
||||
func(f *data.Field) string {
|
||||
if f == nil || f.Config == nil {
|
||||
return ""
|
||||
}
|
||||
return f.Config.DisplayNameFromDS
|
||||
},
|
||||
func(f *data.Field) string {
|
||||
if f == nil || f.Config == nil {
|
||||
return ""
|
||||
}
|
||||
return f.Config.DisplayName
|
||||
},
|
||||
func(f *data.Field) string {
|
||||
return f.Name
|
||||
},
|
||||
}
|
||||
|
||||
// now look for the first selector that would make all value fields be unique
|
||||
for _, selector := range nameSelectors {
|
||||
names := make(map[string]struct{}, len(valueFields))
|
||||
good := true
|
||||
for _, field := range valueFields {
|
||||
name := selector(field)
|
||||
if _, ok := names[name]; ok || name == "" {
|
||||
good = false
|
||||
break
|
||||
}
|
||||
names[name] = struct{}{}
|
||||
}
|
||||
if good {
|
||||
return func(series mathexp.Series, valueField *data.Field) {
|
||||
series.SetLabels(data.Labels{
|
||||
nameLabelName: selector(valueField),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user