Prometheus: Run dashboard queries through backend (#40333)

* Fix processing for table in dashboard - explore compatible

* Add processing of histograms

* Interpolate rate_interval in step field

* Add tests

* Fix function name

* Extract internal range and interval variables to constants

* Fix typings, refactor, simplify

* Fix constant that was missing _ms
This commit is contained in:
Ivana Huckova
2021-10-15 13:37:27 +02:00
committed by GitHub
parent 58fdb717ba
commit 7140867868
6 changed files with 233 additions and 97 deletions

View File

@ -28,6 +28,16 @@ import (
"github.com/prometheus/common/model"
)
// Internal interval and range variables recognized in Prometheus query
// expressions and step fields; each is interpolated with a concrete value
// before the query is sent to Prometheus.
const (
varInterval = "$__interval" // calculated interval, interpolated as a formatted duration (e.g. "30s")
varIntervalMs = "$__interval_ms" // calculated interval, interpolated in milliseconds
varRange = "$__range" // query time range, interpolated as seconds with an "s" suffix
varRangeS = "$__range_s" // query time range, interpolated in whole seconds
varRangeMs = "$__range_ms" // query time range, interpolated in milliseconds
varRateInterval = "$__rate_interval" // interval for rate-style functions, computed via calculateRateInterval
)
var (
plog = log.New("tsdb.prometheus")
legendFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)
@ -151,7 +161,7 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
if err != nil {
return &result, fmt.Errorf("query: %s failed with: %v", query.Expr, err)
}
response[Range] = rangeResponse
response[RangeQueryType] = rangeResponse
}
if query.InstantQuery {
@ -159,7 +169,7 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
if err != nil {
return &result, fmt.Errorf("query: %s failed with: %v", query.Expr, err)
}
response[Instant] = instantResponse
response[InstantQueryType] = instantResponse
}
// For now, we ignore exemplar errors and continue with processing of other results
if query.ExemplarQuery {
@ -168,7 +178,7 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
exemplarResponse = nil
plog.Error("Exemplar query", query.Expr, "failed with", err)
}
response[Exemplar] = exemplarResponse
response[ExemplarQueryType] = exemplarResponse
}
frames, err := parseResponse(response, query)
@ -251,11 +261,13 @@ func (s *Service) parseQuery(queryContext *backend.QueryDataRequest, dsInfo *Dat
if err != nil {
return nil, err
}
//Final interval value
var interval time.Duration
//Calculate interval
queryInterval := model.Interval
//If we are using variable or interval/step, we will replace it with calculated interval
if queryInterval == "$__interval" || queryInterval == "$__interval_ms" {
if queryInterval == varInterval || queryInterval == varIntervalMs || queryInterval == varRateInterval {
queryInterval = ""
}
minInterval, err := intervalv2.GetIntervalFrom(dsInfo.TimeInterval, queryInterval, model.IntervalMS, 15*time.Second)
@ -265,29 +277,34 @@ func (s *Service) parseQuery(queryContext *backend.QueryDataRequest, dsInfo *Dat
calculatedInterval := s.intervalCalculator.Calculate(query.TimeRange, minInterval, query.MaxDataPoints)
safeInterval := s.intervalCalculator.CalculateSafeInterval(query.TimeRange, int64(safeRes))
adjustedInterval := safeInterval.Value
if calculatedInterval.Value > safeInterval.Value {
adjustedInterval = calculatedInterval.Value
}
intervalFactor := model.IntervalFactor
if intervalFactor == 0 {
intervalFactor = 1
if queryInterval == varRateInterval {
// Rate interval is final and is not affected by resolution
interval = calculateRateInterval(adjustedInterval, dsInfo.TimeInterval, s.intervalCalculator)
} else {
intervalFactor := model.IntervalFactor
if intervalFactor == 0 {
intervalFactor = 1
}
interval = time.Duration(int64(adjustedInterval) * intervalFactor)
}
interval := time.Duration(int64(adjustedInterval) * intervalFactor)
intervalMs := int64(interval / time.Millisecond)
rangeS := query.TimeRange.To.Unix() - query.TimeRange.From.Unix()
// Interpolate variables in expr
expr := model.Expr
expr = strings.ReplaceAll(expr, "$__interval_ms", strconv.FormatInt(intervalMs, 10))
expr = strings.ReplaceAll(expr, "$__interval", intervalv2.FormatDuration(interval))
expr = strings.ReplaceAll(expr, "$__range_ms", strconv.FormatInt(rangeS*1000, 10))
expr = strings.ReplaceAll(expr, "$__range_s", strconv.FormatInt(rangeS, 10))
expr = strings.ReplaceAll(expr, "$__range", strconv.FormatInt(rangeS, 10)+"s")
expr = strings.ReplaceAll(expr, "$__rate_interval", intervalv2.FormatDuration(calculateRateInterval(interval, dsInfo.TimeInterval, s.intervalCalculator)))
expr = strings.ReplaceAll(expr, varIntervalMs, strconv.FormatInt(intervalMs, 10))
expr = strings.ReplaceAll(expr, varInterval, intervalv2.FormatDuration(interval))
expr = strings.ReplaceAll(expr, varRangeMs, strconv.FormatInt(rangeS*1000, 10))
expr = strings.ReplaceAll(expr, varRangeS, strconv.FormatInt(rangeS, 10))
expr = strings.ReplaceAll(expr, varRange, strconv.FormatInt(rangeS, 10)+"s")
expr = strings.ReplaceAll(expr, varRateInterval, intervalv2.FormatDuration(calculateRateInterval(interval, dsInfo.TimeInterval, s.intervalCalculator)))
rangeQuery := model.RangeQuery
if !model.InstantQuery && !model.RangeQuery {

View File

@ -384,7 +384,7 @@ func TestPrometheus_parseResponse(t *testing.T) {
},
}
value[Exemplar] = exemplars
value[ExemplarQueryType] = exemplars
query := &PrometheusQuery{
LegendFormat: "legend {{app}}",
}
@ -413,7 +413,7 @@ func TestPrometheus_parseResponse(t *testing.T) {
{Value: 5, Timestamp: 5000},
}
value := make(map[PrometheusQueryType]interface{})
value[Range] = p.Matrix{
value[RangeQueryType] = p.Matrix{
&p.SampleStream{
Metric: p.Metric{"app": "Application", "tag2": "tag2"},
Values: values,
@ -442,7 +442,7 @@ func TestPrometheus_parseResponse(t *testing.T) {
t.Run("vector response should be parsed normally", func(t *testing.T) {
value := make(map[PrometheusQueryType]interface{})
value[Range] = p.Vector{
value[RangeQueryType] = p.Vector{
&p.Sample{
Metric: p.Metric{"app": "Application", "tag2": "tag2"},
Value: 1,
@ -473,7 +473,7 @@ func TestPrometheus_parseResponse(t *testing.T) {
t.Run("scalar response should be parsed normally", func(t *testing.T) {
value := make(map[PrometheusQueryType]interface{})
value[Range] = &p.Scalar{
value[RangeQueryType] = &p.Scalar{
Value: 1,
Timestamp: 1000,
}

View File

@ -49,7 +49,7 @@ type QueryModel struct {
type PrometheusQueryType string
const (
Range PrometheusQueryType = "range"
Instant PrometheusQueryType = "instant"
Exemplar PrometheusQueryType = "exemplar"
RangeQueryType PrometheusQueryType = "range"
InstantQueryType PrometheusQueryType = "instant"
ExemplarQueryType PrometheusQueryType = "exemplar"
)

View File

@ -316,10 +316,7 @@ export class PrometheusDatasource extends DataSourceWithBackend<PromQuery, PromO
}
query(request: DataQueryRequest<PromQuery>): Observable<DataQueryResponse> {
// WIP - currently we want to run through backend only if all queries are explore + range/instant queries
const shouldRunBackendQuery = this.access === 'proxy' && request.app === CoreApp.Explore;
if (shouldRunBackendQuery) {
if (this.access === 'proxy') {
const targets = request.targets.map((target) => this.processTargetV2(target, request));
return super
.query({ ...request, targets })

View File

@ -1,5 +1,5 @@
import { DataFrame, FieldType, DataQueryRequest, DataQueryResponse, MutableDataFrame } from '@grafana/data';
import { transform, transformV2, transformDFoTable } from './result_transformer';
import { transform, transformV2, transformDFToTable } from './result_transformer';
import { PromQuery } from './types';
jest.mock('@grafana/runtime', () => ({
@ -34,7 +34,7 @@ const matrixResponse = {
describe('Prometheus Result Transformer', () => {
describe('transformV2', () => {
it('results with time_series format should be enriched with preferredVisualisationType', () => {
const options = ({
const request = ({
targets: [
{
format: 'time_series',
@ -53,7 +53,7 @@ describe('Prometheus Result Transformer', () => {
},
],
} as unknown) as DataQueryResponse;
const series = transformV2(response, options, {});
const series = transformV2(response, request, {});
expect(series).toEqual({
data: [{ fields: [], length: 2, meta: { preferredVisualisationType: 'graph' }, name: 'ALERTS', refId: 'A' }],
state: 'Done',
@ -61,7 +61,7 @@ describe('Prometheus Result Transformer', () => {
});
it('results with table format should be transformed to table dataFrames', () => {
const options = ({
const request = ({
targets: [
{
format: 'table',
@ -86,15 +86,65 @@ describe('Prometheus Result Transformer', () => {
}),
],
} as unknown) as DataQueryResponse;
const series = transformV2(response, options, {});
// expect(series.data[0]).toBe({});
expect(series.data[0].fields[0].name).toEqual('time');
const series = transformV2(response, request, {});
expect(series.data[0].fields[0].name).toEqual('Time');
expect(series.data[0].fields[1].name).toEqual('label1');
expect(series.data[0].fields[2].name).toEqual('label2');
expect(series.data[0].fields[3].name).toEqual('Value');
expect(series.data[0].meta?.preferredVisualisationType).toEqual('table');
});
it('results with table format and multiple data frames should be transformed to 1 table dataFrame', () => {
const request = ({
targets: [
{
format: 'table',
refId: 'A',
},
],
} as unknown) as DataQueryRequest<PromQuery>;
const response = ({
state: 'Done',
data: [
new MutableDataFrame({
refId: 'A',
fields: [
{ name: 'time', type: FieldType.time, values: [6, 5, 4] },
{
name: 'value',
type: FieldType.number,
values: [6, 5, 4],
labels: { label1: 'value1', label2: 'value2' },
},
],
}),
new MutableDataFrame({
refId: 'A',
fields: [
{ name: 'time', type: FieldType.time, values: [2, 3, 7] },
{
name: 'value',
type: FieldType.number,
values: [2, 3, 7],
labels: { label3: 'value3', label4: 'value4' },
},
],
}),
],
} as unknown) as DataQueryResponse;
const series = transformV2(response, request, {});
expect(series.data.length).toEqual(1);
expect(series.data[0].fields[0].name).toEqual('Time');
expect(series.data[0].fields[1].name).toEqual('label1');
expect(series.data[0].fields[2].name).toEqual('label2');
expect(series.data[0].fields[3].name).toEqual('label3');
expect(series.data[0].fields[4].name).toEqual('label4');
expect(series.data[0].fields[5].name).toEqual('Value #A');
expect(series.data[0].meta?.preferredVisualisationType).toEqual('table');
});
it('results with table and time_series format should be correctly transformed', () => {
const options = ({
targets: [
@ -143,8 +193,66 @@ describe('Prometheus Result Transformer', () => {
expect(series.data[1].fields.length).toEqual(4);
expect(series.data[1].meta?.preferredVisualisationType).toEqual('table');
});
it('results with heatmap format should be correctly transformed', () => {
const options = ({
targets: [
{
format: 'heatmap',
refId: 'A',
},
],
} as unknown) as DataQueryRequest<PromQuery>;
const response = ({
state: 'Done',
data: [
new MutableDataFrame({
refId: 'A',
fields: [
{ name: 'Time', type: FieldType.time, values: [6, 5, 4] },
{
name: 'Value',
type: FieldType.number,
values: [10, 10, 0],
labels: { le: '1' },
},
],
}),
new MutableDataFrame({
refId: 'A',
fields: [
{ name: 'Time', type: FieldType.time, values: [6, 5, 4] },
{
name: 'Value',
type: FieldType.number,
values: [20, 10, 30],
labels: { le: '2' },
},
],
}),
new MutableDataFrame({
refId: 'A',
fields: [
{ name: 'Time', type: FieldType.time, values: [6, 5, 4] },
{
name: 'Value',
type: FieldType.number,
values: [30, 10, 40],
labels: { le: '3' },
},
],
}),
],
} as unknown) as DataQueryResponse;
const series = transformV2(response, options, {});
expect(series.data[0].fields.length).toEqual(2);
expect(series.data[0].fields[1].values.toArray()).toEqual([10, 10, 0]);
expect(series.data[1].fields[1].values.toArray()).toEqual([10, 0, 30]);
expect(series.data[2].fields[1].values.toArray()).toEqual([10, 0, 10]);
});
});
describe('transformDFoTable', () => {
describe('transformDFToTable', () => {
it('transforms dataFrame with response length 1 to table dataFrame', () => {
const df = new MutableDataFrame({
refId: 'A',
@ -159,9 +267,9 @@ describe('Prometheus Result Transformer', () => {
],
});
const tableDf = transformDFoTable(df, 1);
const tableDf = transformDFToTable([df])[0];
expect(tableDf.fields.length).toBe(4);
expect(tableDf.fields[0].name).toBe('time');
expect(tableDf.fields[0].name).toBe('Time');
expect(tableDf.fields[1].name).toBe('label1');
expect(tableDf.fields[1].values.get(0)).toBe('value1');
expect(tableDf.fields[2].name).toBe('label2');
@ -183,14 +291,14 @@ describe('Prometheus Result Transformer', () => {
],
});
const tableDf = transformDFoTable(df, 3);
const tableDf = transformDFToTable([df])[0];
expect(tableDf.fields.length).toBe(4);
expect(tableDf.fields[0].name).toBe('time');
expect(tableDf.fields[0].name).toBe('Time');
expect(tableDf.fields[1].name).toBe('label1');
expect(tableDf.fields[1].values.get(0)).toBe('value1');
expect(tableDf.fields[2].name).toBe('label2');
expect(tableDf.fields[2].values.get(0)).toBe('value2');
expect(tableDf.fields[3].name).toBe('Value #A');
expect(tableDf.fields[3].name).toBe('Value');
});
});

View File

@ -19,7 +19,7 @@ import {
CoreApp,
} from '@grafana/data';
import { FetchResponse, getDataSourceSrv, getTemplateSrv } from '@grafana/runtime';
import { partition } from 'lodash';
import { partition, groupBy } from 'lodash';
import { descending, deviation } from 'd3';
import {
ExemplarTraceIdDestination,
@ -50,11 +50,12 @@ const isTableResult = (dataFrame: DataFrame, options: DataQueryRequest<PromQuery
// We want to process all dataFrames with target.format === 'table' as table
const target = options.targets.find((target) => target.refId === dataFrame.refId);
if (target?.format === 'table') {
return true;
}
return target?.format === 'table';
};
return false;
const isHeatmapResult = (dataFrame: DataFrame, options: DataQueryRequest<PromQuery>): boolean => {
const target = options.targets.find((target) => target.refId === dataFrame.refId);
return target?.format === 'heatmap';
};
// V2 result transformer used to transform query results from queries that were run through prometheus backend
@ -63,25 +64,22 @@ export function transformV2(
request: DataQueryRequest<PromQuery>,
options: { exemplarTraceIdDestinations?: ExemplarTraceIdDestination[] }
) {
const [tableResults, results]: [DataFrame[], DataFrame[]] = partition(response.data, (dataFrame) =>
isTableResult(dataFrame, request)
const [tableFrames, framesWithoutTable] = partition<DataFrame>(response.data, (df) => isTableResult(df, request));
const processedTableFrames = transformDFToTable(tableFrames);
const [heatmapResults, framesWithoutTableAndHeatmaps] = partition<DataFrame>(framesWithoutTable, (df) =>
isHeatmapResult(df, request)
);
const processedHeatmapFrames = transformToHistogramOverTime(heatmapResults.sort(sortSeriesByLabel));
// TABLE FRAMES: For table results, we need to transform data frames to table data frames
const responseLength = request.targets.filter((target) => !target.hide).length;
const tableFrames = tableResults.map((dataFrame) => {
const df = transformDFoTable(dataFrame, responseLength);
return df;
});
const [exemplarResults, otherResults]: [DataFrame[], DataFrame[]] = partition(
results,
(dataFrame) => dataFrame.meta?.custom?.resultType === 'exemplar'
const [exemplarFrames, framesWithoutTableHeatmapsAndExemplars] = partition<DataFrame>(
framesWithoutTableAndHeatmaps,
(df) => df.meta?.custom?.resultType === 'exemplar'
);
// EXEMPLAR FRAMES: We enrich exemplar frames with data links and add dataTopic meta info
const { exemplarTraceIdDestinations: destinations } = options;
const exemplarFrames = exemplarResults.map((dataFrame) => {
const processedExemplarFrames = exemplarFrames.map((dataFrame) => {
if (destinations?.length) {
for (const exemplarTraceIdDestination of destinations) {
const traceIDField = dataFrame.fields.find((field) => field.name === exemplarTraceIdDestination.name);
@ -97,8 +95,8 @@ export function transformV2(
return { ...dataFrame, meta: { ...dataFrame.meta, dataTopic: DataTopic.Annotations } };
});
// OTHER FRAMES: Everything else is processed as time_series result and graph preferredVisualisationType
const otherFrames = otherResults.map((dataFrame) => {
// Everything else is processed as time_series result and graph preferredVisualisationType
const otherFrames = framesWithoutTableHeatmapsAndExemplars.map((dataFrame) => {
const df = {
...dataFrame,
meta: {
@ -109,52 +107,68 @@ export function transformV2(
return df;
});
return { ...response, data: [...otherFrames, ...tableFrames, ...exemplarFrames] };
return {
...response,
data: [...otherFrames, ...processedTableFrames, ...processedHeatmapFrames, ...processedExemplarFrames],
};
}
export function transformDFoTable(df: DataFrame, responseLength: number): DataFrame {
if (df.length === 0) {
return df;
export function transformDFToTable(dfs: DataFrame[]): DataFrame[] {
// If there are no dataFrames, or there is 1 dataFrame with no values, return the original dataFrames
if (dfs.length === 0 || (dfs.length === 1 && dfs[0].length === 0)) {
return dfs;
}
const timeField = df.fields[0];
const valueField = df.fields[1];
// Group results by refId and process dataFrames with the same refId as 1 dataFrame
const dataFramesByRefId = groupBy(dfs, 'refId');
// Create label fields
const promLabels: PromMetric = valueField.labels ?? {};
const labelFields = Object.keys(promLabels)
.sort()
.map((label) => {
const numberField = label === 'le';
return {
name: label,
config: { filterable: true },
type: numberField ? FieldType.number : FieldType.string,
values: new ArrayVector(),
};
const frames = Object.keys(dataFramesByRefId).map((refId) => {
// Create timeField, valueField and labelFields
const valueText = getValueText(dfs.length, refId);
const valueField = getValueField({ data: [], valueName: valueText });
const timeField = getTimeField([]);
const labelFields: MutableField[] = [];
// Fill labelsFields with labels from dataFrames
dataFramesByRefId[refId].forEach((df) => {
const frameValueField = df.fields[1];
const promLabels = frameValueField.labels ?? {};
Object.keys(promLabels)
.sort()
.forEach((label) => {
// If we don't have label in labelFields, add it
if (!labelFields.some((l) => l.name === label)) {
const numberField = label === 'le';
labelFields.push({
name: label,
config: { filterable: true },
type: numberField ? FieldType.number : FieldType.string,
values: new ArrayVector(),
});
}
});
});
// Fill labelFields with label values
labelFields.forEach((field) => field.values.add(getLabelValue(promLabels, field.name)));
// Fill valueField, timeField and labelFields with values
dataFramesByRefId[refId].forEach((df) => {
df.fields[0].values.toArray().forEach((value) => timeField.values.add(value));
df.fields[1].values.toArray().forEach((value) => {
valueField.values.add(parseSampleValue(value));
const labelsForField = df.fields[1].labels ?? {};
labelFields.forEach((field) => field.values.add(getLabelValue(labelsForField, field.name)));
});
});
const tableDataFrame = {
...df,
name: undefined,
meta: { ...df.meta, preferredVisualisationType: 'table' as PreferredVisualisationType },
fields: [
timeField,
...labelFields,
{
...valueField,
name: getValueText(responseLength, df.refId),
labels: undefined,
config: { ...valueField.config, displayNameFromDS: undefined },
state: { ...valueField.state, displayName: undefined },
},
],
};
return tableDataFrame;
const fields = [timeField, ...labelFields, valueField];
return {
refId,
fields,
meta: { ...dfs[0].meta, preferredVisualisationType: 'table' as PreferredVisualisationType },
length: timeField.values.length,
};
});
return frames;
}
function getValueText(responseLength: number, refId = '') {