From 79c06fdddc9ddfa03ee17ef83165e1efef09e6ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Farkas?= Date: Wed, 20 Apr 2022 13:52:15 +0200 Subject: [PATCH] Loki: add backend-forward mode to queries, update log-row-context (#47726) * loki: add helper function to sort dataframe by time * loki: add direction-attribute to queries * loki: make log-row-context code backward-compatible * better comment Co-authored-by: Ivana Huckova <30407135+ivanahuckova@users.noreply.github.com> * fixed test * simplified code Co-authored-by: Ivana Huckova <30407135+ivanahuckova@users.noreply.github.com> --- pkg/tsdb/loki/api.go | 2 + pkg/tsdb/loki/framing_test.go | 8 +- pkg/tsdb/loki/loki.go | 1 + pkg/tsdb/loki/parse_query.go | 21 +++ pkg/tsdb/loki/types.go | 8 ++ .../datasource/loki/datasource.test.ts | 10 +- .../app/plugins/datasource/loki/datasource.ts | 120 +++++++++++------- .../loki/result_transformer.test.ts | 4 +- .../datasource/loki/result_transformer.ts | 20 +-- .../datasource/loki/sortDataFrame.test.ts | 52 ++++++++ .../plugins/datasource/loki/sortDataFrame.ts | 70 ++++++++++ public/app/plugins/datasource/loki/types.ts | 6 + 12 files changed, 250 insertions(+), 72 deletions(-) create mode 100644 public/app/plugins/datasource/loki/sortDataFrame.test.ts create mode 100644 public/app/plugins/datasource/loki/sortDataFrame.ts diff --git a/pkg/tsdb/loki/api.go b/pkg/tsdb/loki/api.go index 08e3b1eb8c5..87e340dfc8c 100644 --- a/pkg/tsdb/loki/api.go +++ b/pkg/tsdb/loki/api.go @@ -29,6 +29,8 @@ func makeRequest(ctx context.Context, lokiDsUrl string, query lokiQuery) (*http. qs := url.Values{} qs.Set("query", query.Expr) + qs.Set("direction", string(query.Direction)) + // MaxLines defaults to zero when not received, // and Loki does not like limit=0, even when it is not needed // (for example for metric queries), so we diff --git a/pkg/tsdb/loki/framing_test.go b/pkg/tsdb/loki/framing_test.go index 696cc5d3fb6..8525a7f0673 100644 --- a/pkg/tsdb/loki/framing_test.go +++ b/pkg/tsdb/loki/framing_test.go @@ -22,9 +22,9 @@ import ( // but i wanted to test for all of them, to be sure. func TestSuccessResponse(t *testing.T) { - matrixQuery := lokiQuery{Expr: "up(ALERTS)", Step: time.Second * 42, QueryType: QueryTypeRange} - vectorQuery := lokiQuery{Expr: "query1", QueryType: QueryTypeInstant} - streamsQuery := lokiQuery{Expr: "query1", QueryType: QueryTypeRange} + matrixQuery := lokiQuery{Expr: "up(ALERTS)", Step: time.Second * 42, QueryType: QueryTypeRange, Direction: DirectionBackward} + vectorQuery := lokiQuery{Expr: "query1", QueryType: QueryTypeInstant, Direction: DirectionBackward} + streamsQuery := lokiQuery{Expr: "query1", QueryType: QueryTypeRange, Direction: DirectionBackward} tt := []struct { name string @@ -119,7 +119,7 @@ func TestErrorResponse(t *testing.T) { for _, test := range tt { t.Run(test.name, func(t *testing.T) { - frames, err := runQuery(context.Background(), makeMockedAPI(400, test.contentType, test.body), &lokiQuery{QueryType: QueryTypeRange}) + frames, err := runQuery(context.Background(), makeMockedAPI(400, test.contentType, test.body), &lokiQuery{QueryType: QueryTypeRange, Direction: DirectionBackward}) require.Len(t, frames, 0) require.Error(t, err) diff --git a/pkg/tsdb/loki/loki.go b/pkg/tsdb/loki/loki.go index 59cc30eb7e1..e2112d33623 100644 --- a/pkg/tsdb/loki/loki.go +++ b/pkg/tsdb/loki/loki.go @@ -53,6 +53,7 @@ type datasourceInfo struct { type QueryJSONModel struct { QueryType string `json:"queryType"` Expr string `json:"expr"` + Direction string `json:"direction"` LegendFormat string `json:"legendFormat"` Interval string `json:"interval"` IntervalMS int `json:"intervalMS"` diff --git a/pkg/tsdb/loki/parse_query.go b/pkg/tsdb/loki/parse_query.go index d62802e5f06..80f0c0cbc27 100644 --- a/pkg/tsdb/loki/parse_query.go +++ b/pkg/tsdb/loki/parse_query.go @@ -67,6 +67,21 @@ func parseQueryType(jsonValue string) (QueryType, error) { } } +func parseDirection(jsonValue string) (Direction, error) { + switch jsonValue { + case "backward": + return DirectionBackward, nil + case "forward": + return DirectionForward, nil + case "": + // there are older queries stored in alerting that did not have queryDirection, + // we default to "backward" + return DirectionBackward, nil + default: + return DirectionBackward, fmt.Errorf("invalid queryDirection: %s", jsonValue) + } +} + func parseQuery(queryContext *backend.QueryDataRequest) ([]*lokiQuery, error) { qs := []*lokiQuery{} for _, query := range queryContext.Queries { @@ -95,9 +110,15 @@ func parseQuery(queryContext *backend.QueryDataRequest) ([]*lokiQuery, error) { return nil, err } + direction, err := parseDirection(model.Direction) + if err != nil { + return nil, err + } + qs = append(qs, &lokiQuery{ Expr: expr, QueryType: queryType, + Direction: direction, Step: step, MaxLines: model.MaxLines, LegendFormat: model.LegendFormat, diff --git a/pkg/tsdb/loki/types.go b/pkg/tsdb/loki/types.go index 309d2ef8246..9d5fcead26d 100644 --- a/pkg/tsdb/loki/types.go +++ b/pkg/tsdb/loki/types.go @@ -9,9 +9,17 @@ const ( QueryTypeInstant QueryType = "instant" ) +type Direction string + +const ( + DirectionBackward Direction = "backward" + DirectionForward Direction = "forward" +) + type lokiQuery struct { Expr string QueryType QueryType + Direction Direction Step time.Duration MaxLines int LegendFormat string diff --git a/public/app/plugins/datasource/loki/datasource.test.ts b/public/app/plugins/datasource/loki/datasource.test.ts index e7f84a0a51f..38ebf6cedfd 100644 --- a/public/app/plugins/datasource/loki/datasource.test.ts +++ b/public/app/plugins/datasource/loki/datasource.test.ts @@ -943,9 +943,9 @@ describe('LokiDatasource', () => { dataFrame: new MutableDataFrame({ fields: [ { - name: 'tsNs', - type: FieldType.string, - values: ['0'], + name: 'ts', + type: FieldType.time, + values: [0], }, ], }), @@ -957,8 +957,8 @@ describe('LokiDatasource', () => { jest.spyOn(ds.languageProvider, 'getLabelKeys').mockImplementation(() => ['bar']); const contextQuery = ds.prepareLogRowContextQueryTarget(row, 10, 'BACKWARD'); - expect(contextQuery.expr).toContain('baz'); - expect(contextQuery.expr).not.toContain('uniqueParsedLabel'); + expect(contextQuery.query.expr).toContain('baz'); + expect(contextQuery.query.expr).not.toContain('uniqueParsedLabel'); }); }); diff --git a/public/app/plugins/datasource/loki/datasource.ts b/public/app/plugins/datasource/loki/datasource.ts index 9de4a813dbc..476a1b95a17 100644 --- a/public/app/plugins/datasource/loki/datasource.ts +++ b/public/app/plugins/datasource/loki/datasource.ts @@ -33,24 +33,21 @@ import { ScopedVars, TimeRange, rangeUtil, + toUtc, } from '@grafana/data'; import { BackendSrvRequest, FetchError, getBackendSrv, config, DataSourceWithBackend } from '@grafana/runtime'; import { getTemplateSrv, TemplateSrv } from 'app/features/templating/template_srv'; import { addLabelToQuery } from './add_label_to_query'; import { getTimeSrv, TimeSrv } from 'app/features/dashboard/services/TimeSrv'; import { convertToWebSocketUrl } from 'app/core/utils/explore'; -import { - lokiResultsToTableModel, - lokiStreamsToDataFrames, - lokiStreamsToRawDataFrame, - processRangeQueryResponse, -} from './result_transformer'; +import { lokiResultsToTableModel, lokiStreamsToDataFrames, processRangeQueryResponse } from './result_transformer'; import { transformBackendResult } from './backendResultTransformer'; import { addParsedLabelToQuery, getNormalizedLokiQuery, queryHasPipeParser } from './query_utils'; import { LokiOptions, LokiQuery, + LokiQueryDirection, LokiQueryType, LokiRangeQueryRequest, LokiResultType, @@ -65,6 +62,7 @@ import { DEFAULT_RESOLUTION } from './components/LokiOptionFields'; import { queryLogsVolume } from 'app/core/logs_model'; import { doLokiChannelStream } from './streaming'; import { renderLegendFormat } from '../prometheus/legend'; +import { sortDataFrameByTime } from './sortDataFrame'; export type RangeQueryOptions = DataQueryRequest | AnnotationQueryRequest; export const DEFAULT_MAX_LINES = 1000; @@ -75,7 +73,6 @@ const RANGE_QUERY_ENDPOINT = `${LOKI_ENDPOINT}/query_range`; const INSTANT_QUERY_ENDPOINT = `${LOKI_ENDPOINT}/query`; const DEFAULT_QUERY_PARAMS: Partial = { - direction: 'BACKWARD', limit: DEFAULT_MAX_LINES, query: '', }; @@ -252,6 +249,7 @@ export class LokiDatasource query: target.expr, time: `${timeNs + (1e9 - (timeNs % 1e9))}`, limit: Math.min(queryLimit || Infinity, this.maxLines), + direction: target.direction === LokiQueryDirection.Forward ? 'FORWARD' : 'BACKWARD', }; /** Used only for results of metrics instant queries */ @@ -311,6 +309,7 @@ export class LokiDatasource ...range, query, limit, + direction: target.direction === LokiQueryDirection.Forward ? 'FORWARD' : 'BACKWARD', }; } @@ -554,15 +553,29 @@ export class LokiDatasource } getLogRowContext = (row: LogRowModel, options?: RowContextOptions): Promise<{ data: DataFrame[] }> => { - const target = this.prepareLogRowContextQueryTarget( - row, - (options && options.limit) || 10, - (options && options.direction) || 'BACKWARD' - ); + const direction = (options && options.direction) || 'BACKWARD'; + const limit = (options && options.limit) || 10; + const { query, range } = this.prepareLogRowContextQueryTarget(row, limit, direction); + + const sortResults = (result: DataQueryResponse): DataQueryResponse => { + return { + ...result, + data: result.data.map((frame: DataFrame) => { + const timestampFieldIndex = frame.fields.findIndex((field) => field.type === FieldType.time); + if (timestampFieldIndex === -1) { + return frame; + } + + return sortDataFrameByTime(frame, 'DESCENDING'); + }), + }; + }; + + // this can only be called from explore currently + const app = CoreApp.Explore; - const reverse = options && options.direction === 'FORWARD'; return lastValueFrom( - this._request(RANGE_QUERY_ENDPOINT, target).pipe( + this.query(makeRequest(query, range, app, `log-row-context-query-${direction}`)).pipe( catchError((err) => { const error: DataQueryError = { message: 'Error during context query. Please check JS console logs.', @@ -571,18 +584,18 @@ export class LokiDatasource }; throw error; }), - switchMap((res) => - of({ - data: res.data ? [lokiStreamsToRawDataFrame(res.data.data.result, reverse)] : [], - }) - ) + switchMap((res) => of(sortResults(res))) ) ); }; - prepareLogRowContextQueryTarget = (row: LogRowModel, limit: number, direction: 'BACKWARD' | 'FORWARD') => { + prepareLogRowContextQueryTarget = ( + row: LogRowModel, + limit: number, + direction: 'BACKWARD' | 'FORWARD' + ): { query: LokiQuery; range: TimeRange } => { const labels = this.languageProvider.getLabelKeys(); - const query = Object.keys(row.labels) + const expr = Object.keys(row.labels) .map((label: string) => { if (labels.includes(label)) { // escape backslashes in label as users can't escape them by themselves @@ -595,36 +608,49 @@ export class LokiDatasource .join(','); const contextTimeBuffer = 2 * 60 * 60 * 1000; // 2h buffer - const commonTargetOptions = { - limit, - query: `{${query}}`, - expr: `{${query}}`, - direction, + + const queryDirection = direction === 'FORWARD' ? LokiQueryDirection.Forward : LokiQueryDirection.Backward; + + const query: LokiQuery = { + expr: `{${expr}}`, + queryType: LokiQueryType.Range, + refId: '', + maxLines: limit, + direction: queryDirection, }; const fieldCache = new FieldCache(row.dataFrame); - const nsField = fieldCache.getFieldByName('tsNs')!; - const nsTimestamp = nsField.values.get(row.rowIndex); - - if (direction === 'BACKWARD') { - return { - ...commonTargetOptions, - // convert to ns, we loose some precision here but it is not that important at the far points of the context - start: row.timeEpochMs - contextTimeBuffer + '000000', - end: nsTimestamp, - direction, - }; - } else { - return { - ...commonTargetOptions, - // start param in Loki API is inclusive so we'll have to filter out the row that this request is based from - // and any other that were logged in the same ns but before the row. Right now these rows will be lost - // because the are before but came it he response that should return only rows after. - start: nsTimestamp, - // convert to ns, we loose some precision here but it is not that important at the far points of the context - end: row.timeEpochMs + contextTimeBuffer + '000000', - }; + const tsField = fieldCache.getFirstFieldOfType(FieldType.time); + if (tsField === undefined) { + throw new Error('loki: dataframe missing time-field, should never happen'); } + const tsValue = tsField.values.get(row.rowIndex); + const timestamp = toUtc(tsValue); + + const range = + queryDirection === LokiQueryDirection.Forward + ? { + // start param in Loki API is inclusive so we'll have to filter out the row that this request is based from + // and any other that were logged in the same ns but before the row. Right now these rows will be lost + // because the are before but came it he response that should return only rows after. + from: timestamp, + // convert to ns, we loose some precision here but it is not that important at the far points of the context + to: toUtc(row.timeEpochMs + contextTimeBuffer), + } + : { + // convert to ns, we loose some precision here but it is not that important at the far points of the context + from: toUtc(row.timeEpochMs - contextTimeBuffer), + to: timestamp, + }; + + return { + query, + range: { + from: range.from, + to: range.to, + raw: range, + }, + }; }; testDatasource() { diff --git a/public/app/plugins/datasource/loki/result_transformer.test.ts b/public/app/plugins/datasource/loki/result_transformer.test.ts index db35da689a6..c43d3f99b0d 100644 --- a/public/app/plugins/datasource/loki/result_transformer.test.ts +++ b/public/app/plugins/datasource/loki/result_transformer.test.ts @@ -112,7 +112,7 @@ describe('loki result transformer', () => { }); it('should append refId to the unique ids if refId is provided', () => { - const data = ResultTransformer.lokiStreamsToRawDataFrame(streamResult, false, 'B'); + const data = ResultTransformer.lokiStreamsToRawDataFrame(streamResult, 'B'); expect(data.fields[4].values.get(0)).toEqual('4b79cb43-81ce-52f7-b1e9-a207fff144dc_B'); expect(data.fields[4].values.get(1)).toEqual('73d144f6-57f2-5a45-a49c-eb998e2006b1_B'); }); @@ -121,7 +121,7 @@ describe('loki result transformer', () => { describe('lokiStreamsToDataFrames', () => { it('should enhance data frames', () => { jest.spyOn(ResultTransformer, 'enhanceDataFrame'); - const dataFrames = ResultTransformer.lokiStreamsToDataFrames(lokiResponse, { refId: 'B' }, 500, { + const dataFrames = ResultTransformer.lokiStreamsToDataFrames(lokiResponse, { refId: 'B', expr: '' }, 500, { derivedFields: [ { matcherRegex: 'trace=(w+)', diff --git a/public/app/plugins/datasource/loki/result_transformer.ts b/public/app/plugins/datasource/loki/result_transformer.ts index c71fdf9bc86..38df63a95f1 100644 --- a/public/app/plugins/datasource/loki/result_transformer.ts +++ b/public/app/plugins/datasource/loki/result_transformer.ts @@ -45,7 +45,7 @@ const UUID_NAMESPACE = '6ec946da-0f49-47a8-983a-1d76d17e7c92'; /** * Transforms LokiStreamResult structure into a dataFrame. Used when doing standard queries */ -export function lokiStreamsToRawDataFrame(streams: LokiStreamResult[], reverse?: boolean, refId?: string): DataFrame { +export function lokiStreamsToRawDataFrame(streams: LokiStreamResult[], refId?: string): DataFrame { const labels = new ArrayVector<{}>([]); const times = new ArrayVector([]); const timesNs = new ArrayVector([]); @@ -72,11 +72,11 @@ export function lokiStreamsToRawDataFrame(streams: LokiStreamResult[], reverse?: } } - return constructDataFrame(times, timesNs, lines, uids, labels, reverse, refId); + return constructDataFrame(times, timesNs, lines, uids, labels, refId); } /** - * Constructs dataFrame with supplied fields and other data. Also makes sure it is properly reversed if needed. + * Constructs dataFrame with supplied fields and other data. */ function constructDataFrame( times: ArrayVector, @@ -84,7 +84,6 @@ function constructDataFrame( lines: ArrayVector, uids: ArrayVector, labels: ArrayVector<{}>, - reverse?: boolean, refId?: string ) { const dataFrame = { @@ -99,12 +98,6 @@ function constructDataFrame( length: times.length, }; - if (reverse) { - const mutableDataFrame = new MutableDataFrame(dataFrame); - mutableDataFrame.reverse(); - return mutableDataFrame; - } - return dataFrame; } @@ -330,10 +323,9 @@ function lokiStatsToMetaStat(stats: LokiStats | undefined): QueryResultMetaStat[ export function lokiStreamsToDataFrames( response: LokiStreamResponse, - target: { refId: string; expr?: string }, + target: LokiQuery, limit: number, - config: LokiOptions, - reverse = false + config: LokiOptions ): DataFrame[] { const data = limit > 0 ? response.data.result : []; const stats: QueryResultMetaStat[] = lokiStatsToMetaStat(response.data.stats); @@ -350,7 +342,7 @@ export function lokiStreamsToDataFrames( preferredVisualisationType: 'logs', }; - const dataFrame = lokiStreamsToRawDataFrame(data, reverse, target.refId); + const dataFrame = lokiStreamsToRawDataFrame(data, target.refId); enhanceDataFrame(dataFrame, config); if (meta.custom && dataFrame.fields.some((f) => f.labels && Object.keys(f.labels).some((l) => l === '__error__'))) { diff --git a/public/app/plugins/datasource/loki/sortDataFrame.test.ts b/public/app/plugins/datasource/loki/sortDataFrame.test.ts new file mode 100644 index 00000000000..19b35cd7dc3 --- /dev/null +++ b/public/app/plugins/datasource/loki/sortDataFrame.test.ts @@ -0,0 +1,52 @@ +import { ArrayVector, DataFrame, FieldType } from '@grafana/data'; +import { sortDataFrameByTime } from './sortDataFrame'; + +const inputFrame: DataFrame = { + refId: 'A', + fields: [ + { + name: 'time', + type: FieldType.time, + config: {}, + values: new ArrayVector([1005, 1001, 1004, 1002, 1003]), + }, + { + name: 'value', + type: FieldType.string, + config: {}, + values: new ArrayVector(['line5', 'line1', 'line4', 'line2', 'line3']), + }, + { + name: 'tsNs', + type: FieldType.time, + config: {}, + values: new ArrayVector([`1005000000`, `1001000000`, `1004000000`, `1002000000`, `1003000000`]), + }, + ], + length: 5, +}; + +describe('loki sortDataFrame', () => { + it('sorts a dataframe ascending', () => { + const sortedFrame = sortDataFrameByTime(inputFrame, 'ASCENDING'); + expect(sortedFrame.length).toBe(5); + const timeValues = sortedFrame.fields[0].values.toArray(); + const lineValues = sortedFrame.fields[1].values.toArray(); + const tsNsValues = sortedFrame.fields[2].values.toArray(); + + expect(timeValues).toStrictEqual([1001, 1002, 1003, 1004, 1005]); + expect(lineValues).toStrictEqual(['line1', 'line2', 'line3', 'line4', 'line5']); + expect(tsNsValues).toStrictEqual([`1001000000`, `1002000000`, `1003000000`, `1004000000`, `1005000000`]); + }); + it('sorts a dataframe descending', () => { + const sortedFrame = sortDataFrameByTime(inputFrame, 'DESCENDING'); + expect(sortedFrame.length).toBe(5); + const timeValues = sortedFrame.fields[0].values.toArray(); + const lineValues = sortedFrame.fields[1].values.toArray(); + const tsNsValues = sortedFrame.fields[2].values.toArray(); + + expect(timeValues).toStrictEqual([1005, 1004, 1003, 1002, 1001]); + expect(lineValues).toStrictEqual(['line5', 'line4', 'line3', 'line2', 'line1']); + expect(tsNsValues).toStrictEqual([`1005000000`, `1004000000`, `1003000000`, `1002000000`, `1001000000`]); + }); +}); diff --git a/public/app/plugins/datasource/loki/sortDataFrame.ts b/public/app/plugins/datasource/loki/sortDataFrame.ts new file mode 100644 index 00000000000..c524de61160 --- /dev/null +++ b/public/app/plugins/datasource/loki/sortDataFrame.ts @@ -0,0 +1,70 @@ +import { DataFrame, Field, SortedVector } from '@grafana/data'; + +type SortDirection = 'ASCENDING' | 'DESCENDING'; + +// creates the `index` for the sorting. +// this is needed by the `SortedVector`. +// the index is an array of numbers, and it defines an order. +// at every slot in the index the values is the position of +// the sorted item. +// for example, an index of [3,1,2] means that +// in the dataframe, that has 3 rows, after sorting: +// - the third row will become the first +// - the first row will become the second +// - the second row will become the third +function makeIndex(field: Field, dir: SortDirection): number[] { + const fieldValues: string[] = field.values.toArray(); + + // we first build an array which is [0,1,2,3....] + const index = Array(fieldValues.length); + for (let i = 0; i < index.length; i++) { + index[i] = i; + } + + const isAsc = dir === 'ASCENDING'; + + index.sort((a: number, b: number): number => { + // we need to answer this question: + // in the field-used-for-sorting, how would we compare value-at-index-a to value-at-index-b? + const valA = fieldValues[a]; + const valB = fieldValues[b]; + if (valA < valB) { + return isAsc ? -1 : 1; + } + + if (valA > valB) { + return isAsc ? 1 : -1; + } + + return 0; + }); + + return index; +} + +// sort a dataframe that is in the Loki format ascending or descending, +// based on the nanosecond-timestamp +export function sortDataFrameByTime(frame: DataFrame, dir: SortDirection): DataFrame { + const { fields, ...rest } = frame; + + // we use the approach used in @grafana/data/sortDataframe. + // we cannot use it directly, because our tsNs field has a type=time, + // so we have to build the `index` manually. + + const tsNsField = fields.find((field) => field.name === 'tsNs'); + if (tsNsField === undefined) { + throw new Error('missing nanosecond-timestamp field. should never happen'); + } + + const index = makeIndex(tsNsField, dir); + + return { + ...rest, + fields: fields.map((field) => ({ + ...field, + values: new SortedVector(field.values, index), + })), + }; + + return frame; +} diff --git a/public/app/plugins/datasource/loki/types.ts b/public/app/plugins/datasource/loki/types.ts index 1563669632c..59d0456cddd 100644 --- a/public/app/plugins/datasource/loki/types.ts +++ b/public/app/plugins/datasource/loki/types.ts @@ -29,9 +29,15 @@ export enum LokiQueryType { Stream = 'stream', } +export enum LokiQueryDirection { + Backward = 'backward', + Forward = 'forward', +} + export interface LokiQuery extends DataQuery { queryType?: LokiQueryType; expr: string; + direction?: LokiQueryDirection; legendFormat?: string; maxLines?: number; resolution?: number;