Datasource/CloudWatch: Usability improvements (#24447)
* Datasource/CloudWatch: Improve handling of long-running queries
* Datasource/CloudWatch: Make order of dataframe fields consistent
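The first two hunks are in the Go code that converts a CloudWatch Logs GetQueryResults response into a data frame. Because Go randomizes map iteration order, building the output fields by ranging over the fieldValues map produced a different column order on every run; the fix records each field name in a fieldNames slice in the order CloudWatch returns it, and later builds the fields from that slice.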
@@ -10,6 +10,11 @@ import (
 func logsResultsToDataframes(response *cloudwatchlogs.GetQueryResultsOutput) (*data.Frame, error) {
 	rowCount := len(response.Results)
 	fieldValues := make(map[string]interface{})

+	// Maintaining a list of field names in the order returned from CloudWatch
+	// as just iterating over fieldValues would not give a consistent order
+	fieldNames := make([]*string, 0)
+
 	for i, row := range response.Results {
 		for _, resultField := range row {
 			// Strip @ptr field from results as it's not needed
@@ -17,48 +22,37 @@ func logsResultsToDataframes(response *cloudwatchlogs.GetQueryResultsOutput) (*data.Frame, error) {
 				continue
 			}

-			if *resultField.Field == "@timestamp" {
-				if _, exists := fieldValues[*resultField.Field]; !exists {
-					fieldValues[*resultField.Field] = make([]*time.Time, rowCount)
-				}
-
-				parsedTime, err := time.Parse(CLOUDWATCH_TS_FORMAT, *resultField.Value)
-				if err != nil {
-					return nil, err
-				}
-
-				fieldValues[*resultField.Field].([]*time.Time)[i] = &parsedTime
-			} else {
-				if _, exists := fieldValues[*resultField.Field]; !exists {
-					// Check if field is time field
-					if _, err := time.Parse(CLOUDWATCH_TS_FORMAT, *resultField.Value); err == nil {
-						fieldValues[*resultField.Field] = make([]*time.Time, rowCount)
-					} else {
-						fieldValues[*resultField.Field] = make([]*string, rowCount)
-					}
-				}
-
-				if timeField, ok := fieldValues[*resultField.Field].([]*time.Time); ok {
-					parsedTime, err := time.Parse(CLOUDWATCH_TS_FORMAT, *resultField.Value)
-					if err != nil {
-						return nil, err
-					}
-
-					timeField[i] = &parsedTime
-				} else {
-					fieldValues[*resultField.Field].([]*string)[i] = resultField.Value
-				}
+			if _, exists := fieldValues[*resultField.Field]; !exists {
+				fieldNames = append(fieldNames, resultField.Field)
+
+				// Check if field is time field
+				if _, err := time.Parse(CLOUDWATCH_TS_FORMAT, *resultField.Value); err == nil {
+					fieldValues[*resultField.Field] = make([]*time.Time, rowCount)
+				} else {
+					fieldValues[*resultField.Field] = make([]*string, rowCount)
+				}
+			}
+
+			if timeField, ok := fieldValues[*resultField.Field].([]*time.Time); ok {
+				parsedTime, err := time.Parse(CLOUDWATCH_TS_FORMAT, *resultField.Value)
+				if err != nil {
+					return nil, err
+				}
+
+				timeField[i] = &parsedTime
+			} else {
+				fieldValues[*resultField.Field].([]*string)[i] = resultField.Value
 			}
 		}
 	}

 	newFields := make([]*data.Field, 0)
-	for fieldName, vals := range fieldValues {
-		newFields = append(newFields, data.NewField(fieldName, nil, vals))
+	for _, fieldName := range fieldNames {
+		newFields = append(newFields, data.NewField(*fieldName, nil, fieldValues[*fieldName]))

-		if fieldName == "@timestamp" {
+		if *fieldName == "@timestamp" {
 			newFields[len(newFields)-1].SetConfig(&data.FieldConfig{Title: "Time"})
-		} else if fieldName == "@logStream" || fieldName == "@log" {
+		} else if *fieldName == "@logStream" || *fieldName == "@log" {
 			newFields[len(newFields)-1].SetConfig(
 				&data.FieldConfig{
 					Custom: map[string]interface{}{
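For illustration only (this is not code from the commit, and the names are hypothetical), the same ordered-columns pattern in a few lines of TypeScript: record first-seen key order in an array next to the lookup map, and emit columns from the array rather than from map iteration.

// Sketch of the ordered-columns pattern; hypothetical names, not Grafana code.
const fieldNames: string[] = [];
const fieldValues: Record<string, string[]> = {};

function addValue(field: string, rowCount: number, row: number, value: string) {
  if (!(field in fieldValues)) {
    fieldNames.push(field); // remember first-seen order
    fieldValues[field] = new Array<string>(rowCount);
  }
  fieldValues[field][row] = value;
}

// Build output columns in the recorded order, never by iterating the map.
const columns = fieldNames.map(name => ({ name, values: fieldValues[name] }));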
@@ -65,7 +65,7 @@ const displayCustomError = (title: string, message: string) =>
   store.dispatch(notifyApp(createErrorNotification(title, message)));

 // TODO: Temporary times here, could just change to some fixed number.
-const MAX_ATTEMPTS = 8;
+export const MAX_ATTEMPTS = 8;
 const POLLING_TIMES = [100, 200, 500, 1000];

 export class CloudWatchDatasource extends DataSourceApi<CloudWatchQuery, CloudWatchJsonData> {
@@ -199,15 +199,35 @@ export class CloudWatchDatasource extends DataSourceApi<CloudWatchQuery, CloudWatchJsonData> {
   logsQuery(queryParams: Array<{ queryId: string; limit?: number; region: string }>): Observable<DataQueryResponse> {
     this.logQueries.clear();
     queryParams.forEach(param => this.logQueries.add({ id: param.queryId, region: param.region }));
+    let prevRecordsMatched: Record<string, number> = {};

     return withTeardown(
       this.makeLogActionRequest('GetQueryResults', queryParams).pipe(
         expand((dataFrames, i) => {
-          return dataFrames.every(
+          const allFramesCompleted = dataFrames.every(
             dataFrame => dataFrame.meta?.custom?.['Status'] === CloudWatchLogsQueryStatus.Complete
-          ) || i >= MAX_ATTEMPTS
+          );
+
+          return allFramesCompleted
             ? empty()
             : this.makeLogActionRequest('GetQueryResults', queryParams).pipe(
+                map(frames => {
+                  let moreRecordsMatched = false;
+                  for (const frame of frames) {
+                    const recordsMatched = frame.meta?.custom?.['Statistics']['RecordsMatched'];
+                    if (recordsMatched > (prevRecordsMatched[frame.refId!] ?? 0)) {
+                      moreRecordsMatched = true;
+                    }
+                    prevRecordsMatched[frame.refId!] = recordsMatched;
+                  }
+                  const noProgressMade = i >= MAX_ATTEMPTS - 2 && !moreRecordsMatched;
+                  if (noProgressMade) {
+                    for (const frame of frames) {
+                      _.set(frame, 'meta.custom.Status', CloudWatchLogsQueryStatus.Complete);
+                    }
+                  }
+
+                  return frames;
+                }),
                 delay(POLLING_TIMES[Math.min(i, POLLING_TIMES.length - 1)])
               );
         }),
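To see the shape of that polling loop in isolation, here is a self-contained sketch. fetchResults and the Result type are hypothetical stand-ins for makeLogActionRequest and the frame metadata, and the per-refId bookkeeping is collapsed to a single counter.

import { EMPTY, Observable } from 'rxjs';
import { expand, delay } from 'rxjs/operators';

const POLLING_TIMES = [100, 200, 500, 1000]; // backoff schedule, capped at the last entry
const MAX_ATTEMPTS = 8;

interface Result {
  status: 'Running' | 'Complete';
  recordsMatched: number;
}

// Poll until the service reports Complete, or until the attempt budget is
// nearly spent and recordsMatched has stopped growing between responses.
function poll(fetchResults: () => Observable<Result>): Observable<Result> {
  let prevRecordsMatched = 0;
  return fetchResults().pipe(
    expand((result, i) => {
      const progressed = result.recordsMatched > prevRecordsMatched;
      prevRecordsMatched = result.recordsMatched;
      if (result.status === 'Complete' || (i >= MAX_ATTEMPTS - 2 && !progressed)) {
        return EMPTY; // stop re-subscribing
      }
      return fetchResults().pipe(delay(POLLING_TIMES[Math.min(i, POLLING_TIMES.length - 1)]));
    })
  );
}

One deliberate difference from this sketch: the commit does not simply stop when progress stalls, it force-marks the frames as Complete via _.set, so the final emission carries a terminal status that downstream consumers can rely on.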
@@ -1,14 +1,27 @@
 import '../datasource';
-import { CloudWatchDatasource } from '../datasource';
+import { CloudWatchDatasource, MAX_ATTEMPTS } from '../datasource';
 import * as redux from 'app/store/store';
-import { DataSourceInstanceSettings, dateMath, getFrameDisplayTitle, DataQueryResponse } from '@grafana/data';
+import {
+  DataSourceInstanceSettings,
+  dateMath,
+  getFrameDisplayTitle,
+  DataFrame,
+  DataQueryResponse,
+} from '@grafana/data';
 import { TemplateSrv } from 'app/features/templating/template_srv';
 import { CustomVariable } from 'app/features/templating/all';
-import { CloudWatchQuery, CloudWatchMetricsQuery } from '../types';
+import { CloudWatchQuery, CloudWatchMetricsQuery, CloudWatchLogsQueryStatus, LogAction } from '../types';
 import { backendSrv } from 'app/core/services/backend_srv'; // will use the version in __mocks__
 import { TimeSrv } from 'app/features/dashboard/services/TimeSrv';
 import { convertToStoreState } from '../../../../../test/helpers/convertToStoreState';
 import { getTemplateSrvDependencies } from 'test/helpers/getTemplateSrvDependencies';
+import { of } from 'rxjs';
+
+jest.mock('rxjs/operators', () => {
+  const operators = jest.requireActual('rxjs/operators');
+  operators.delay = jest.fn(() => (s: any) => s);
+  return operators;
+});

 jest.mock('@grafana/runtime', () => ({
   ...jest.requireActual('@grafana/runtime'),
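Two details in the test setup are worth noting: of from rxjs backs the mocked makeLogActionRequest responses, and rxjs/operators is mocked so that delay becomes a pass-through. The latter keeps the tests synchronous; the expand loop re-subscribes immediately instead of sitting out the 100-1000 ms backoff timers.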
@@ -156,6 +169,86 @@ describe('CloudWatchDatasource', () => {
         ],
       });
     });
+
+    it('should stop querying when no more data retrieved past max attempts', async () => {
+      const fakeFrames = genMockFrames(10);
+      for (let i = 7; i < fakeFrames.length; i++) {
+        fakeFrames[i].meta!.custom!['Statistics']['RecordsMatched'] = fakeFrames[6].meta!.custom!['Statistics'][
+          'RecordsMatched'
+        ];
+      }
+
+      let i = 0;
+      jest.spyOn(ctx.ds, 'makeLogActionRequest').mockImplementation((subtype: LogAction) => {
+        if (subtype === 'GetQueryResults') {
+          const mockObservable = of([fakeFrames[i]]);
+          i++;
+          return mockObservable;
+        } else {
+          return of([]);
+        }
+      });
+
+      const myResponse = await ctx.ds.logsQuery([{ queryId: 'fake-query-id', region: 'default' }]).toPromise();
+
+      const expectedData = [
+        {
+          ...fakeFrames[MAX_ATTEMPTS - 1],
+          meta: { custom: { ...fakeFrames[MAX_ATTEMPTS - 1].meta!.custom, Status: 'Complete' } },
+        },
+      ];
+      expect(myResponse).toEqual({
+        data: expectedData,
+        key: 'test-key',
+        state: 'Done',
+      });
+      expect(i).toBe(MAX_ATTEMPTS);
+    });
+
+    it('should continue querying as long as new data is being received', async () => {
+      const fakeFrames = genMockFrames(15);
+
+      let i = 0;
+      jest.spyOn(ctx.ds, 'makeLogActionRequest').mockImplementation((subtype: LogAction) => {
+        if (subtype === 'GetQueryResults') {
+          const mockObservable = of([fakeFrames[i]]);
+          i++;
+          return mockObservable;
+        } else {
+          return of([]);
+        }
+      });
+
+      const myResponse = await ctx.ds.logsQuery([{ queryId: 'fake-query-id', region: 'default' }]).toPromise();
+      expect(myResponse).toEqual({
+        data: [fakeFrames[fakeFrames.length - 1]],
+        key: 'test-key',
+        state: 'Done',
+      });
+      expect(i).toBe(15);
+    });
+
+    it('should stop querying when results come back with status "Complete"', async () => {
+      const fakeFrames = genMockFrames(3);
+      let i = 0;
+      jest.spyOn(ctx.ds, 'makeLogActionRequest').mockImplementation((subtype: LogAction) => {
+        if (subtype === 'GetQueryResults') {
+          const mockObservable = of([fakeFrames[i]]);
+          i++;
+          return mockObservable;
+        } else {
+          return of([]);
+        }
+      });
+
+      const myResponse = await ctx.ds.logsQuery([{ queryId: 'fake-query-id', region: 'default' }]).toPromise();
+
+      expect(myResponse).toEqual({
+        data: [fakeFrames[2]],
+        key: 'test-key',
+        state: 'Done',
+      });
+      expect(i).toBe(3);
+    });
   });

   describe('When performing CloudWatch metrics query', () => {
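These tests are driven by a genMockFrames helper, added at the bottom of the file in the final hunk: it fabricates one response frame per poll, with RecordsMatched growing by 50 per frame and only the last frame reporting Status Complete.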
@@ -925,3 +1018,25 @@ describe('CloudWatchDatasource', () => {
     }
   );
 });
+
+function genMockFrames(numResponses: number): DataFrame[] {
+  const recordIncrement = 50;
+  const mockFrames: DataFrame[] = [];
+
+  for (let i = 0; i < numResponses; i++) {
+    mockFrames.push({
+      fields: [],
+      meta: {
+        custom: {
+          Status: i === numResponses - 1 ? CloudWatchLogsQueryStatus.Complete : CloudWatchLogsQueryStatus.Running,
+          Statistics: {
+            RecordsMatched: (i + 1) * recordIncrement,
+          },
+        },
+      },
+      length: 0,
+    });
+  }
+
+  return mockFrames;
+}
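Tracing the first test through this helper: genMockFrames(10) yields RecordsMatched values 50, 100, ..., 350, after which the frames at index 7-9 are pinned to frame 6's value of 350. None of the first eight frames reports Complete, so it is the no-progress guard that ends the loop: once the attempt index reaches MAX_ATTEMPTS - 2 with a flat RecordsMatched, the frame is force-marked Complete, exactly MAX_ATTEMPTS = 8 GetQueryResults calls are made, and the response is fakeFrames[7] with its Status overwritten.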