Logs: Added query splitting support for forward searching logs (#93450)

Query splitting: add support for forward searching logs
This commit is contained in:
Matias Chomicki
2024-09-23 11:22:06 +02:00
committed by GitHub
parent 7189a4af81
commit bca8bd3c8b
2 changed files with 62 additions and 15 deletions

View File

@ -9,7 +9,7 @@ import * as logsTimeSplit from './logsTimeSplitting';
import * as metricTimeSplit from './metricTimeSplitting';
import { runSplitQuery } from './querySplitting';
import { trackGroupedQueries } from './tracking';
import { LokiQuery, LokiQueryType } from './types';
import { LokiQuery, LokiQueryDirection, LokiQueryType } from './types';
jest.mock('./tracking');
jest.mock('uuid', () => ({
@ -550,4 +550,39 @@ describe('runSplitQuery()', () => {
});
});
});
describe('Forward search queries', () => {
const request = createRequest([
{ expr: '{a="b"}', refId: 'A', direction: LokiQueryDirection.Backward },
{ expr: '{c="d"}', refId: 'A', direction: undefined },
{ expr: '{e="f"}', refId: 'B', direction: LokiQueryDirection.Forward },
]);
const { logFrameA } = getMockFrames();
beforeEach(() => {
jest.spyOn(datasource, 'runQuery').mockReturnValue(of({ data: [logFrameA], refId: 'A' }));
});
test('Sends forward and backward queries in different groups', async () => {
jest.spyOn(datasource, 'runQuery');
await expect(runSplitQuery(datasource, request)).toEmitValuesWith(() => {
// Forward
expect(jest.mocked(datasource.runQuery).mock.calls[1][0].targets[0].expr).toBe('{e="f"}');
expect(jest.mocked(datasource.runQuery).mock.calls[1][0].range.from.toString()).toContain('Feb 08 2023');
expect(jest.mocked(datasource.runQuery).mock.calls[3][0].targets[0].expr).toBe('{e="f"}');
expect(jest.mocked(datasource.runQuery).mock.calls[3][0].range.from.toString()).toContain('Feb 08 2023');
expect(jest.mocked(datasource.runQuery).mock.calls[5][0].targets[0].expr).toBe('{e="f"}');
expect(jest.mocked(datasource.runQuery).mock.calls[5][0].range.from.toString()).toContain('Feb 09 2023');
// Backward
expect(jest.mocked(datasource.runQuery).mock.calls[0][0].targets[0].expr).toBe('{a="b"}');
expect(jest.mocked(datasource.runQuery).mock.calls[0][0].range.from.toString()).toContain('Feb 09 2023');
expect(jest.mocked(datasource.runQuery).mock.calls[2][0].targets[0].expr).toBe('{a="b"}');
expect(jest.mocked(datasource.runQuery).mock.calls[2][0].range.from.toString()).toContain('Feb 08 2023');
expect(jest.mocked(datasource.runQuery).mock.calls[4][0].targets[0].expr).toBe('{a="b"}');
expect(jest.mocked(datasource.runQuery).mock.calls[4][0].range.from.toString()).toContain('Feb 08 2023');
// 3 days, 3 chunks, 2 groups logs, 6 requests
expect(datasource.runQuery).toHaveBeenCalledTimes(6);
});
});
});
});

View File

@ -21,7 +21,7 @@ import { splitTimeRange as splitLogsTimeRange } from './logsTimeSplitting';
import { splitTimeRange as splitMetricTimeRange } from './metricTimeSplitting';
import { isLogsQuery, isQueryWithRangeVariable } from './queryUtils';
import { trackGroupedQueries } from './tracking';
import { LokiGroupedRequest, LokiQuery, LokiQueryType } from './types';
import { LokiGroupedRequest, LokiQuery, LokiQueryDirection, LokiQueryType } from './types';
export function partitionTimeRange(
isLogsQuery: boolean,
@ -219,24 +219,36 @@ export function runSplitQuery(datasource: LokiDatasource, request: DataQueryRequ
request.queryGroupId = uuidv4();
const oneDayMs = 24 * 60 * 60 * 1000;
const rangePartitionedLogQueries = groupBy(logQueries, (query) =>
query.splitDuration ? durationToMilliseconds(parseDuration(query.splitDuration)) : oneDayMs
const directionPartitionedLogQueries = groupBy(logQueries, (query) =>
query.direction === LokiQueryDirection.Forward ? LokiQueryDirection.Forward : LokiQueryDirection.Backward
);
const requests: LokiGroupedRequest[] = [];
for (const direction in directionPartitionedLogQueries) {
const rangePartitionedLogQueries = groupBy(directionPartitionedLogQueries[direction], (query) =>
query.splitDuration ? durationToMilliseconds(parseDuration(query.splitDuration)) : oneDayMs
);
for (const [chunkRangeMs, queries] of Object.entries(rangePartitionedLogQueries)) {
const resolutionPartition = groupBy(queries, (query) => query.resolution || 1);
for (const resolution in resolutionPartition) {
const groupedRequest = {
request: { ...request, targets: resolutionPartition[resolution] },
partition: partitionTimeRange(true, request.range, request.intervalMs, Number(chunkRangeMs)),
};
if (direction === LokiQueryDirection.Forward) {
groupedRequest.partition.reverse();
}
requests.push(groupedRequest);
}
}
}
const rangePartitionedMetricQueries = groupBy(metricQueries, (query) =>
query.splitDuration ? durationToMilliseconds(parseDuration(query.splitDuration)) : oneDayMs
);
const requests: LokiGroupedRequest[] = [];
for (const [chunkRangeMs, queries] of Object.entries(rangePartitionedLogQueries)) {
const resolutionPartition = groupBy(queries, (query) => query.resolution || 1);
for (const resolution in resolutionPartition) {
requests.push({
request: { ...request, targets: resolutionPartition[resolution] },
partition: partitionTimeRange(true, request.range, request.intervalMs, Number(chunkRangeMs)),
});
}
}
for (const [chunkRangeMs, queries] of Object.entries(rangePartitionedMetricQueries)) {
const stepMsPartition = groupBy(queries, (query) =>
calculateStep(request.intervalMs, request.range, query.resolution || 1, query.step)