From c26374b0b26a1d44cc745cfd82b465db8e84a50b Mon Sep 17 00:00:00 2001 From: kay delaney <45561153+kaydelaney@users.noreply.github.com> Date: Mon, 11 May 2020 14:44:13 +0100 Subject: [PATCH] Datasource/CloudWatch: Usability improvements (#24447) * Datasource/CloudWatch: Improve handling of long-running queries * Datasource/CloudWatch: Make order of dataframe fields consistent --- pkg/tsdb/cloudwatch/log_query.go | 50 ++++---- .../datasource/cloudwatch/datasource.ts | 26 +++- .../cloudwatch/specs/datasource.test.ts | 121 +++++++++++++++++- 3 files changed, 163 insertions(+), 34 deletions(-) diff --git a/pkg/tsdb/cloudwatch/log_query.go b/pkg/tsdb/cloudwatch/log_query.go index 43cc179e9cc..4c3ba380c01 100644 --- a/pkg/tsdb/cloudwatch/log_query.go +++ b/pkg/tsdb/cloudwatch/log_query.go @@ -10,6 +10,11 @@ import ( func logsResultsToDataframes(response *cloudwatchlogs.GetQueryResultsOutput) (*data.Frame, error) { rowCount := len(response.Results) fieldValues := make(map[string]interface{}) + + // Maintaining a list of field names in the order returned from CloudWatch + // as just iterating over fieldValues would not give a consistent order + fieldNames := make([]*string, 0) + for i, row := range response.Results { for _, resultField := range row { // Strip @ptr field from results as it's not needed @@ -17,48 +22,37 @@ func logsResultsToDataframes(response *cloudwatchlogs.GetQueryResultsOutput) (*d continue } - if *resultField.Field == "@timestamp" { - if _, exists := fieldValues[*resultField.Field]; !exists { - fieldValues[*resultField.Field] = make([]*time.Time, rowCount) - } + if _, exists := fieldValues[*resultField.Field]; !exists { + fieldNames = append(fieldNames, resultField.Field) + // Check if field is time field + if _, err := time.Parse(CLOUDWATCH_TS_FORMAT, *resultField.Value); err == nil { + fieldValues[*resultField.Field] = make([]*time.Time, rowCount) + } else { + fieldValues[*resultField.Field] = make([]*string, rowCount) + } + } + + if timeField, ok := fieldValues[*resultField.Field].([]*time.Time); ok { parsedTime, err := time.Parse(CLOUDWATCH_TS_FORMAT, *resultField.Value) if err != nil { return nil, err } - fieldValues[*resultField.Field].([]*time.Time)[i] = &parsedTime + timeField[i] = &parsedTime } else { - if _, exists := fieldValues[*resultField.Field]; !exists { - // Check if field is time field - if _, err := time.Parse(CLOUDWATCH_TS_FORMAT, *resultField.Value); err == nil { - fieldValues[*resultField.Field] = make([]*time.Time, rowCount) - } else { - fieldValues[*resultField.Field] = make([]*string, rowCount) - } - } - - if timeField, ok := fieldValues[*resultField.Field].([]*time.Time); ok { - parsedTime, err := time.Parse(CLOUDWATCH_TS_FORMAT, *resultField.Value) - if err != nil { - return nil, err - } - - timeField[i] = &parsedTime - } else { - fieldValues[*resultField.Field].([]*string)[i] = resultField.Value - } + fieldValues[*resultField.Field].([]*string)[i] = resultField.Value } } } newFields := make([]*data.Field, 0) - for fieldName, vals := range fieldValues { - newFields = append(newFields, data.NewField(fieldName, nil, vals)) + for _, fieldName := range fieldNames { + newFields = append(newFields, data.NewField(*fieldName, nil, fieldValues[*fieldName])) - if fieldName == "@timestamp" { + if *fieldName == "@timestamp" { newFields[len(newFields)-1].SetConfig(&data.FieldConfig{Title: "Time"}) - } else if fieldName == "@logStream" || fieldName == "@log" { + } else if *fieldName == "@logStream" || *fieldName == "@log" { newFields[len(newFields)-1].SetConfig( &data.FieldConfig{ Custom: map[string]interface{}{ diff --git a/public/app/plugins/datasource/cloudwatch/datasource.ts b/public/app/plugins/datasource/cloudwatch/datasource.ts index dc310b97737..d5fe7e9435f 100644 --- a/public/app/plugins/datasource/cloudwatch/datasource.ts +++ b/public/app/plugins/datasource/cloudwatch/datasource.ts @@ -65,7 +65,7 @@ const displayCustomError = (title: string, message: string) => store.dispatch(notifyApp(createErrorNotification(title, message))); // TODO: Temporary times here, could just change to some fixed number. -const MAX_ATTEMPTS = 8; +export const MAX_ATTEMPTS = 8; const POLLING_TIMES = [100, 200, 500, 1000]; export class CloudWatchDatasource extends DataSourceApi { @@ -199,15 +199,35 @@ export class CloudWatchDatasource extends DataSourceApi): Observable { this.logQueries.clear(); queryParams.forEach(param => this.logQueries.add({ id: param.queryId, region: param.region })); + let prevRecordsMatched: Record = {}; return withTeardown( this.makeLogActionRequest('GetQueryResults', queryParams).pipe( expand((dataFrames, i) => { - return dataFrames.every( + const allFramesCompleted = dataFrames.every( dataFrame => dataFrame.meta?.custom?.['Status'] === CloudWatchLogsQueryStatus.Complete - ) || i >= MAX_ATTEMPTS + ); + return allFramesCompleted ? empty() : this.makeLogActionRequest('GetQueryResults', queryParams).pipe( + map(frames => { + let moreRecordsMatched = false; + for (const frame of frames) { + const recordsMatched = frame.meta?.custom?.['Statistics']['RecordsMatched']; + if (recordsMatched > (prevRecordsMatched[frame.refId!] ?? 0)) { + moreRecordsMatched = true; + } + prevRecordsMatched[frame.refId!] = recordsMatched; + } + const noProgressMade = i >= MAX_ATTEMPTS - 2 && !moreRecordsMatched; + if (noProgressMade) { + for (const frame of frames) { + _.set(frame, 'meta.custom.Status', CloudWatchLogsQueryStatus.Complete); + } + } + + return frames; + }), delay(POLLING_TIMES[Math.min(i, POLLING_TIMES.length - 1)]) ); }), diff --git a/public/app/plugins/datasource/cloudwatch/specs/datasource.test.ts b/public/app/plugins/datasource/cloudwatch/specs/datasource.test.ts index a26480bde35..e8c0ba70fef 100644 --- a/public/app/plugins/datasource/cloudwatch/specs/datasource.test.ts +++ b/public/app/plugins/datasource/cloudwatch/specs/datasource.test.ts @@ -1,14 +1,27 @@ import '../datasource'; -import { CloudWatchDatasource } from '../datasource'; +import { CloudWatchDatasource, MAX_ATTEMPTS } from '../datasource'; import * as redux from 'app/store/store'; -import { DataSourceInstanceSettings, dateMath, getFrameDisplayTitle, DataQueryResponse } from '@grafana/data'; +import { + DataSourceInstanceSettings, + dateMath, + getFrameDisplayTitle, + DataFrame, + DataQueryResponse, +} from '@grafana/data'; import { TemplateSrv } from 'app/features/templating/template_srv'; import { CustomVariable } from 'app/features/templating/all'; -import { CloudWatchQuery, CloudWatchMetricsQuery } from '../types'; +import { CloudWatchQuery, CloudWatchMetricsQuery, CloudWatchLogsQueryStatus, LogAction } from '../types'; import { backendSrv } from 'app/core/services/backend_srv'; // will use the version in __mocks__ import { TimeSrv } from 'app/features/dashboard/services/TimeSrv'; import { convertToStoreState } from '../../../../../test/helpers/convertToStoreState'; import { getTemplateSrvDependencies } from 'test/helpers/getTemplateSrvDependencies'; +import { of } from 'rxjs'; + +jest.mock('rxjs/operators', () => { + const operators = jest.requireActual('rxjs/operators'); + operators.delay = jest.fn(() => (s: any) => s); + return operators; +}); jest.mock('@grafana/runtime', () => ({ ...jest.requireActual('@grafana/runtime'), @@ -156,6 +169,86 @@ describe('CloudWatchDatasource', () => { ], }); }); + it('should stop querying when no more data retrieved past max attempts', async () => { + const fakeFrames = genMockFrames(10); + for (let i = 7; i < fakeFrames.length; i++) { + fakeFrames[i].meta!.custom!['Statistics']['RecordsMatched'] = fakeFrames[6].meta!.custom!['Statistics'][ + 'RecordsMatched' + ]; + } + + let i = 0; + jest.spyOn(ctx.ds, 'makeLogActionRequest').mockImplementation((subtype: LogAction) => { + if (subtype === 'GetQueryResults') { + const mockObservable = of([fakeFrames[i]]); + i++; + return mockObservable; + } else { + return of([]); + } + }); + + const myResponse = await ctx.ds.logsQuery([{ queryId: 'fake-query-id', region: 'default' }]).toPromise(); + + const expectedData = [ + { + ...fakeFrames[MAX_ATTEMPTS - 1], + meta: { custom: { ...fakeFrames[MAX_ATTEMPTS - 1].meta!.custom, Status: 'Complete' } }, + }, + ]; + expect(myResponse).toEqual({ + data: expectedData, + key: 'test-key', + state: 'Done', + }); + expect(i).toBe(MAX_ATTEMPTS); + }); + + it('should continue querying as long as new data is being received', async () => { + const fakeFrames = genMockFrames(15); + + let i = 0; + jest.spyOn(ctx.ds, 'makeLogActionRequest').mockImplementation((subtype: LogAction) => { + if (subtype === 'GetQueryResults') { + const mockObservable = of([fakeFrames[i]]); + i++; + return mockObservable; + } else { + return of([]); + } + }); + + const myResponse = await ctx.ds.logsQuery([{ queryId: 'fake-query-id', region: 'default' }]).toPromise(); + expect(myResponse).toEqual({ + data: [fakeFrames[fakeFrames.length - 1]], + key: 'test-key', + state: 'Done', + }); + expect(i).toBe(15); + }); + + it('should stop querying when results come back with status "Complete"', async () => { + const fakeFrames = genMockFrames(3); + let i = 0; + jest.spyOn(ctx.ds, 'makeLogActionRequest').mockImplementation((subtype: LogAction) => { + if (subtype === 'GetQueryResults') { + const mockObservable = of([fakeFrames[i]]); + i++; + return mockObservable; + } else { + return of([]); + } + }); + + const myResponse = await ctx.ds.logsQuery([{ queryId: 'fake-query-id', region: 'default' }]).toPromise(); + + expect(myResponse).toEqual({ + data: [fakeFrames[2]], + key: 'test-key', + state: 'Done', + }); + expect(i).toBe(3); + }); }); describe('When performing CloudWatch metrics query', () => { @@ -925,3 +1018,25 @@ describe('CloudWatchDatasource', () => { } ); }); + +function genMockFrames(numResponses: number): DataFrame[] { + const recordIncrement = 50; + const mockFrames: DataFrame[] = []; + + for (let i = 0; i < numResponses; i++) { + mockFrames.push({ + fields: [], + meta: { + custom: { + Status: i === numResponses - 1 ? CloudWatchLogsQueryStatus.Complete : CloudWatchLogsQueryStatus.Running, + Statistics: { + RecordsMatched: (i + 1) * recordIncrement, + }, + }, + }, + length: 0, + }); + } + + return mockFrames; +}