diff --git a/package.json b/package.json index 5da725c2f02..8dc951b8371 100644 --- a/package.json +++ b/package.json @@ -230,7 +230,6 @@ "react-window": "1.7.1", "redux": "4.0.1", "redux-logger": "3.0.6", - "redux-observable": "1.1.0", "redux-thunk": "2.3.0", "reselect": "4.0.0", "rst2html": "github:thoward/rst2html#990cb89", diff --git a/public/app/core/utils/explore.ts b/public/app/core/utils/explore.ts index 9a9aa05bd6d..4099259130c 100644 --- a/public/app/core/utils/explore.ts +++ b/public/app/core/utils/explore.ts @@ -2,24 +2,9 @@ import _ from 'lodash'; import { from } from 'rxjs'; import { isLive } from '@grafana/ui/src/components/RefreshPicker/RefreshPicker'; - // Services & Utils -import { dateMath } from '@grafana/data'; -import { renderUrl } from 'app/core/utils/url'; -import kbn from 'app/core/utils/kbn'; -import store from 'app/core/store'; -import { getNextRefIdChar } from './query'; - -// Types -import { - DataQuery, - DataSourceApi, - DataQueryError, - DataSourceJsonData, - DataQueryRequest, - DataStreamObserver, -} from '@grafana/ui'; import { + dateMath, toUtc, TimeRange, RawTimeRange, @@ -30,6 +15,19 @@ import { LogsModel, LogsDedupStrategy, } from '@grafana/data'; +import { renderUrl } from 'app/core/utils/url'; +import kbn from 'app/core/utils/kbn'; +import store from 'app/core/store'; +import { getNextRefIdChar } from './query'; +// Types +import { + DataQuery, + DataSourceApi, + DataQueryError, + DataSourceJsonData, + DataQueryRequest, + DataStreamObserver, +} from '@grafana/ui'; import { ExploreUrlState, HistoryItem, @@ -39,6 +37,7 @@ import { ExploreMode, } from 'app/types/explore'; import { config } from '../config'; +import { PanelQueryState } from '../../features/dashboard/state/PanelQueryState'; export const DEFAULT_RANGE = { from: 'now-1h', @@ -145,6 +144,7 @@ export function buildQueryTransaction( panelId, targets: configuredQueries, // Datasources rely on DataQueries being passed under the targets key. range, + requestId: 'explore', rangeRaw: range.raw, scopedVars: { __interval: { text: interval, value: interval }, @@ -542,3 +542,10 @@ export const getQueryResponse = ( ) => { return from(datasourceInstance.query(options, observer)); }; + +export const stopQueryState = (queryState: PanelQueryState, reason: string) => { + if (queryState && queryState.isStarted()) { + queryState.cancel(reason); + queryState.closeStreams(false); + } +}; diff --git a/public/app/features/dashboard/state/PanelQueryState.test.ts b/public/app/features/dashboard/state/PanelQueryState.test.ts index 24c95ca4cf5..c1dbbbfb66a 100644 --- a/public/app/features/dashboard/state/PanelQueryState.test.ts +++ b/public/app/features/dashboard/state/PanelQueryState.test.ts @@ -207,4 +207,19 @@ describe('stream handling', () => { expect(data.series[0].refId).toBe('F'); expect(state.streams.length).toBe(0); // no streams }); + + it('should close streams on error', () => { + // Post a stream event + state.dataStreamObserver({ + state: LoadingState.Error, + key: 'C', + error: { message: 'EEEEE' }, + data: [], + request: state.request, + unsubscribe: () => {}, + }); + + expect(state.streams.length).toBe(0); + expect(state.response.state).toBe(LoadingState.Error); + }); }); diff --git a/public/app/features/dashboard/state/PanelQueryState.ts b/public/app/features/dashboard/state/PanelQueryState.ts index e1dda30834e..0a034c4af66 100644 --- a/public/app/features/dashboard/state/PanelQueryState.ts +++ b/public/app/features/dashboard/state/PanelQueryState.ts @@ -1,10 +1,9 @@ // Libraries import { isArray, isEqual, isString } from 'lodash'; - // Utils & Services import { getBackendSrv } from 'app/core/services/backend_srv'; -import { dateMath } from '@grafana/data'; import { + dateMath, guessFieldTypes, LoadingState, toLegacyResponseData, @@ -12,7 +11,6 @@ import { toDataFrame, isDataFrame, } from '@grafana/data'; - // Types import { DataSourceApi, @@ -161,6 +159,12 @@ export class PanelQueryState { // Streams only work with the 'series' format this.sendFrames = true; + if (stream.state === LoadingState.Error) { + this.setError(stream.error); + this.onStreamingDataUpdated(); + return; + } + // Add the stream to our list let found = false; const active = this.streams.map(s => { diff --git a/public/app/features/explore/QueryField.tsx b/public/app/features/explore/QueryField.tsx index 28fff337948..fd3d4fdb09f 100644 --- a/public/app/features/explore/QueryField.tsx +++ b/public/app/features/explore/QueryField.tsx @@ -8,6 +8,7 @@ import { Editor } from 'slate-react'; // @ts-ignore import Plain from 'slate-plain-serializer'; import classnames from 'classnames'; +// @ts-ignore import { isKeyHotkey } from 'is-hotkey'; import { CompletionItem, CompletionItemGroup, TypeaheadOutput } from 'app/types/explore'; diff --git a/public/app/features/explore/state/actionTypes.ts b/public/app/features/explore/state/actionTypes.ts index 44c441f382f..5698b673c28 100644 --- a/public/app/features/explore/state/actionTypes.ts +++ b/public/app/features/explore/state/actionTypes.ts @@ -1,25 +1,9 @@ // Types import { Emitter } from 'app/core/core'; -import { - DataQuery, - DataSourceSelectItem, - DataSourceApi, - QueryFixAction, - DataQueryError, - DataQueryResponseData, -} from '@grafana/ui'; +import { DataQuery, DataSourceSelectItem, DataSourceApi, QueryFixAction, DataQueryError } from '@grafana/ui'; -import { - RawTimeRange, - LogLevel, - TimeRange, - DataFrame, - LogsModel, - LoadingState, - AbsoluteTimeRange, - GraphSeriesXY, -} from '@grafana/data'; -import { ExploreId, ExploreItemState, HistoryItem, ExploreUIState, ExploreMode, QueryOptions } from 'app/types/explore'; +import { LogLevel, TimeRange, LogsModel, LoadingState, AbsoluteTimeRange, GraphSeriesXY } from '@grafana/data'; +import { ExploreId, ExploreItemState, HistoryItem, ExploreUIState, ExploreMode } from 'app/types/explore'; import { actionCreatorFactory, noPayloadActionCreatorFactory, ActionOf } from 'app/core/redux/actionCreatorFactory'; import TableModel from 'app/core/table_model'; @@ -230,42 +214,15 @@ export interface SetUrlReplacedPayload { exploreId: ExploreId; } -export interface ProcessQueryErrorsPayload { - exploreId: ExploreId; - response: any; - datasourceId: string; -} - -export interface ProcessQueryResultsPayload { - exploreId: ExploreId; - latency: number; - datasourceId: string; - loadingState: LoadingState; - series?: DataQueryResponseData[]; - delta?: DataFrame[]; -} - -export interface RunQueriesBatchPayload { - exploreId: ExploreId; - queryOptions: QueryOptions; -} - -export interface LimitMessageRatePayload { - series: DataFrame[]; - exploreId: ExploreId; - datasourceId: string; -} - export interface ChangeRangePayload { exploreId: ExploreId; range: TimeRange; absoluteRange: AbsoluteTimeRange; } -export interface UpdateTimeRangePayload { +export interface ChangeLoadingStatePayload { exploreId: ExploreId; - rawRange?: RawTimeRange; - absoluteRange?: AbsoluteTimeRange; + loadingState: LoadingState; } /** @@ -410,8 +367,6 @@ export const splitCloseAction = actionCreatorFactory('e */ export const splitOpenAction = actionCreatorFactory('explore/SPLIT_OPEN').create(); -export const stateSaveAction = noPayloadActionCreatorFactory('explore/STATE_SAVE').create(); - /** * Update state of Explores UI elements (panels visiblity and deduplication strategy) */ @@ -460,23 +415,11 @@ export const resetQueryErrorAction = actionCreatorFactory('explore/SET_URL_REPLACED').create(); -export const processQueryErrorsAction = actionCreatorFactory( - 'explore/PROCESS_QUERY_ERRORS' -).create(); - -export const processQueryResultsAction = actionCreatorFactory( - 'explore/PROCESS_QUERY_RESULTS' -).create(); - -export const runQueriesBatchAction = actionCreatorFactory('explore/RUN_QUERIES_BATCH').create(); - -export const limitMessageRatePayloadAction = actionCreatorFactory( - 'explore/LIMIT_MESSAGE_RATE_PAYLOAD' -).create(); - export const changeRangeAction = actionCreatorFactory('explore/CHANGE_RANGE').create(); -export const updateTimeRangeAction = actionCreatorFactory('explore/UPDATE_TIMERANGE').create(); +export const changeLoadingStateAction = actionCreatorFactory( + 'changeLoadingStateAction' +).create(); export type HigherOrderAction = | ActionOf diff --git a/public/app/features/explore/state/actions.test.ts b/public/app/features/explore/state/actions.test.ts index 38e0f5f4822..6cd3e03379e 100644 --- a/public/app/features/explore/state/actions.test.ts +++ b/public/app/features/explore/state/actions.test.ts @@ -11,14 +11,12 @@ import { testDataSourceFailureAction, loadDatasourcePendingAction, loadDatasourceReadyAction, - updateTimeRangeAction, } from './actionTypes'; import { Emitter } from 'app/core/core'; import { ActionOf } from 'app/core/redux/actionCreatorFactory'; import { makeInitialUpdateState } from './reducers'; import { DataQuery } from '@grafana/ui/src/types/datasource'; -import { DefaultTimeZone, RawTimeRange, LogsDedupStrategy } from '@grafana/data'; -import { toUtc } from '@grafana/data'; +import { DefaultTimeZone, RawTimeRange, LogsDedupStrategy, toUtc } from '@grafana/data'; jest.mock('app/features/plugins/datasource_srv', () => ({ getDatasourceSrv: () => ({ @@ -30,6 +28,12 @@ jest.mock('app/features/plugins/datasource_srv', () => ({ }), })); +jest.mock('../../dashboard/services/TimeSrv', () => ({ + getTimeSrv: jest.fn().mockReturnValue({ + init: jest.fn(), + }), +})); + const t = toUtc(); const testRange = { from: t, @@ -62,6 +66,7 @@ const setup = (updateOverides?: Partial) => { const update = { ...updateDefaults, ...updateOverides }; const initialState = { user: { + orgId: '1', timeZone, }, explore: { @@ -118,19 +123,6 @@ describe('refreshExplore', () => { }); }); - describe('and update range is set', () => { - it('then it should dispatch updateTimeRangeAction', async () => { - const { exploreId, range, initialState } = setup({ range: true }); - - const dispatchedActions = await thunkTester(initialState) - .givenThunk(refreshExplore) - .whenThunkIsDispatched(exploreId); - - expect(dispatchedActions[0].type).toEqual(updateTimeRangeAction.type); - expect(dispatchedActions[0].payload).toEqual({ exploreId, rawRange: range.raw }); - }); - }); - describe('and update ui is set', () => { it('then it should dispatch updateUIStateAction', async () => { const { exploreId, initialState, ui } = setup({ ui: true }); diff --git a/public/app/features/explore/state/actions.ts b/public/app/features/explore/state/actions.ts index c6d3ac6c079..29c8b16a71b 100644 --- a/public/app/features/explore/state/actions.ts +++ b/public/app/features/explore/state/actions.ts @@ -1,6 +1,4 @@ // Libraries -import _ from 'lodash'; - // Services & Utils import store from 'app/core/store'; import { getDatasourceSrv } from 'app/features/plugins/datasource_srv'; @@ -13,13 +11,36 @@ import { getTimeRangeFromUrl, generateNewKeyAndAddRefIdIfMissing, lastUsedDatasourceKeyForOrgId, + hasNonEmptyQuery, + buildQueryTransaction, + updateHistory, + getRefIds, + instanceOfDataQueryError, + clearQueryKeys, + serializeStateToUrlParam, + stopQueryState, } from 'app/core/utils/explore'; - // Types -import { ThunkResult } from 'app/types'; -import { DataSourceApi, DataQuery, DataSourceSelectItem, QueryFixAction } from '@grafana/ui'; +import { ThunkResult, ExploreUrlState } from 'app/types'; +import { + DataSourceApi, + DataQuery, + DataSourceSelectItem, + QueryFixAction, + PanelData, + DataQueryResponseData, +} from '@grafana/ui'; -import { RawTimeRange, LogsDedupStrategy, AbsoluteTimeRange } from '@grafana/data'; +import { + RawTimeRange, + LogsDedupStrategy, + AbsoluteTimeRange, + LoadingState, + DataFrame, + TimeRange, + isDateTime, + dateTimeForTimeZone, +} from '@grafana/data'; import { ExploreId, ExploreUIState, QueryTransaction, ExploreMode } from 'app/types/explore'; import { updateDatasourceInstanceAction, @@ -52,14 +73,24 @@ import { loadExploreDatasources, changeModeAction, scanStopAction, - runQueriesAction, - stateSaveAction, - updateTimeRangeAction, + changeLoadingStateAction, + historyUpdatedAction, + queryStartAction, + resetQueryErrorAction, + querySuccessAction, + queryFailureAction, + setUrlReplacedAction, + changeRangeAction, } from './actionTypes'; import { ActionOf, ActionCreator } from 'app/core/redux/actionCreatorFactory'; import { getTimeZone } from 'app/features/profile/state/selectors'; import { offOption } from '@grafana/ui/src/components/RefreshPicker/RefreshPicker'; import { getShiftedTimeRange } from 'app/core/utils/timePicker'; +import { ResultProcessor } from '../utils/ResultProcessor'; +import _ from 'lodash'; +import { toDataQueryError } from '../../dashboard/state/PanelQueryState'; +import { updateLocation } from '../../../core/actions'; +import { getTimeSrv } from '../../dashboard/services/TimeSrv'; /** * Updates UI state and save it to the URL @@ -67,7 +98,7 @@ import { getShiftedTimeRange } from 'app/core/utils/timePicker'; const updateExploreUIState = (exploreId: ExploreId, uiStateFragment: Partial): ThunkResult => { return dispatch => { dispatch(updateUIStateAction({ exploreId, ...uiStateFragment })); - dispatch(stateSaveAction()); + dispatch(stateSave()); }; }; @@ -165,7 +196,7 @@ export const updateTimeRange = (options: { absoluteRange?: AbsoluteTimeRange; }): ThunkResult => { return dispatch => { - dispatch(updateTimeRangeAction({ ...options })); + dispatch(updateTime({ ...options })); dispatch(runQueries(options.exploreId)); }; }; @@ -187,7 +218,7 @@ export function clearQueries(exploreId: ExploreId): ThunkResult { return dispatch => { dispatch(scanStopAction({ exploreId })); dispatch(clearQueriesAction({ exploreId })); - dispatch(stateSaveAction()); + dispatch(stateSave()); }; } @@ -250,7 +281,7 @@ export function initializeExplore( ui, }) ); - dispatch(updateTimeRangeAction({ exploreId })); + dispatch(updateTime({ exploreId })); }; } @@ -401,11 +432,302 @@ export function modifyQueries( */ export function runQueries(exploreId: ExploreId): ThunkResult { return (dispatch, getState) => { - dispatch(updateTimeRangeAction({ exploreId })); - dispatch(runQueriesAction({ exploreId })); + dispatch(updateTime({ exploreId })); + + const exploreItemState = getState().explore[exploreId]; + const { + datasourceInstance, + queries, + datasourceError, + containerWidth, + isLive: live, + queryState, + queryIntervals, + range, + scanning, + history, + } = exploreItemState; + + if (datasourceError) { + // let's not run any queries if data source is in a faulty state + return; + } + + if (!hasNonEmptyQuery(queries)) { + dispatch(clearQueriesAction({ exploreId })); + dispatch(stateSave()); // Remember to save to state and update location + return; + } + + // Some datasource's query builders allow per-query interval limits, + // but we're using the datasource interval limit for now + const interval = datasourceInstance.interval; + + stopQueryState(queryState, 'New request issued'); + + queryState.sendFrames = true; + queryState.sendLegacy = true; // temporary hack until we switch to PanelData + + const queryOptions = { interval, maxDataPoints: containerWidth, live }; + const datasourceId = datasourceInstance.meta.id; + const now = Date.now(); + const transaction = buildQueryTransaction(queries, queryOptions, range, queryIntervals, scanning); + + // temporary hack until we switch to PanelData, Loki already converts to DataFrame so using legacy will destroy the format + const isLokiDataSource = datasourceInstance.meta.name === 'Loki'; + + queryState.onStreamingDataUpdated = () => { + const data = queryState.validateStreamsAndGetPanelData(); + const { state, error, legacy, series } = data; + if (!data && !error && !legacy && !series) { + return; + } + + if (state === LoadingState.Error) { + dispatch(processErrorResults({ exploreId, response: error, datasourceId })); + return; + } + + if (state === LoadingState.Streaming) { + dispatch(limitMessageRate(exploreId, isLokiDataSource ? series : legacy, datasourceId)); + return; + } + + if (state === LoadingState.Done) { + dispatch(changeLoadingStateAction({ exploreId, loadingState: state })); + } + }; + + dispatch(queryStartAction({ exploreId })); + + queryState + .execute(datasourceInstance, transaction.options) + .then((response: PanelData) => { + const { legacy, error, series } = response; + if (error) { + dispatch(processErrorResults({ exploreId, response: error, datasourceId })); + return; + } + + const latency = Date.now() - now; + // Side-effect: Saving history in localstorage + const nextHistory = updateHistory(history, datasourceId, queries); + dispatch(historyUpdatedAction({ exploreId, history: nextHistory })); + dispatch( + processQueryResults({ + exploreId, + latency, + datasourceId, + loadingState: LoadingState.Done, + series: isLokiDataSource ? series : legacy, + }) + ); + dispatch(stateSave()); + }) + .catch(error => { + dispatch(processErrorResults({ exploreId, response: error, datasourceId })); + }); }; } +export const limitMessageRate = ( + exploreId: ExploreId, + series: DataFrame[] | any[], + datasourceId: string +): ThunkResult => { + return (dispatch, getState) => { + dispatch( + processQueryResults({ + exploreId, + latency: 0, + datasourceId, + loadingState: LoadingState.Streaming, + series, + }) + ); + }; +}; + +export const processQueryResults = (config: { + exploreId: ExploreId; + latency: number; + datasourceId: string; + loadingState: LoadingState; + series?: DataQueryResponseData[]; +}): ThunkResult => { + return (dispatch, getState) => { + const { exploreId, datasourceId, latency, loadingState, series } = config; + const { datasourceInstance, scanning, eventBridge } = getState().explore[exploreId]; + + // If datasource already changed, results do not matter + if (datasourceInstance.meta.id !== datasourceId) { + return; + } + + const result = series || []; + const replacePreviousResults = loadingState === LoadingState.Done && series ? true : false; + const resultProcessor = new ResultProcessor(getState().explore[exploreId], replacePreviousResults, result); + const graphResult = resultProcessor.getGraphResult(); + const tableResult = resultProcessor.getTableResult(); + const logsResult = resultProcessor.getLogsResult(); + const refIds = getRefIds(result); + + // For Angular editors + eventBridge.emit('data-received', resultProcessor.getRawData()); + + // Clears any previous errors that now have a successful query, important so Angular editors are updated correctly + dispatch(resetQueryErrorAction({ exploreId, refIds })); + + dispatch( + querySuccessAction({ + exploreId, + latency, + loadingState, + graphResult, + tableResult, + logsResult, + }) + ); + + // Keep scanning for results if this was the last scanning transaction + if (scanning) { + if (_.size(result) === 0) { + const range = getShiftedTimeRange(-1, getState().explore[exploreId].range); + dispatch(updateTime({ exploreId, absoluteRange: range })); + dispatch(runQueries(exploreId)); + } else { + // We can stop scanning if we have a result + dispatch(scanStopAction({ exploreId })); + } + } + }; +}; + +export const processErrorResults = (config: { + exploreId: ExploreId; + response: any; + datasourceId: string; +}): ThunkResult => { + return (dispatch, getState) => { + const { exploreId, datasourceId } = config; + let { response } = config; + const { datasourceInstance, eventBridge } = getState().explore[exploreId]; + + if (datasourceInstance.meta.id !== datasourceId || response.cancelled) { + // Navigated away, queries did not matter + return; + } + + // For Angular editors + eventBridge.emit('data-error', response); + + console.error(response); // To help finding problems with query syntax + + if (!instanceOfDataQueryError(response)) { + response = toDataQueryError(response); + } + + dispatch(queryFailureAction({ exploreId, response })); + }; +}; + +const toRawTimeRange = (range: TimeRange): RawTimeRange => { + let from = range.raw.from; + if (isDateTime(from)) { + from = from.valueOf().toString(10); + } + + let to = range.raw.to; + if (isDateTime(to)) { + to = to.valueOf().toString(10); + } + + return { + from, + to, + }; +}; + +export const stateSave = (): ThunkResult => { + return (dispatch, getState) => { + const { left, right, split } = getState().explore; + const orgId = getState().user.orgId.toString(); + const replace = left && left.urlReplaced === false; + const urlStates: { [index: string]: string } = { orgId }; + const leftUrlState: ExploreUrlState = { + datasource: left.datasourceInstance.name, + queries: left.queries.map(clearQueryKeys), + range: toRawTimeRange(left.range), + mode: left.mode, + ui: { + showingGraph: left.showingGraph, + showingLogs: true, + showingTable: left.showingTable, + dedupStrategy: left.dedupStrategy, + }, + }; + urlStates.left = serializeStateToUrlParam(leftUrlState, true); + if (split) { + const rightUrlState: ExploreUrlState = { + datasource: right.datasourceInstance.name, + queries: right.queries.map(clearQueryKeys), + range: toRawTimeRange(right.range), + mode: right.mode, + ui: { + showingGraph: right.showingGraph, + showingLogs: true, + showingTable: right.showingTable, + dedupStrategy: right.dedupStrategy, + }, + }; + + urlStates.right = serializeStateToUrlParam(rightUrlState, true); + } + + dispatch(updateLocation({ query: urlStates, replace })); + if (replace) { + dispatch(setUrlReplacedAction({ exploreId: ExploreId.left })); + } + }; +}; + +export const updateTime = (config: { + exploreId: ExploreId; + rawRange?: RawTimeRange; + absoluteRange?: AbsoluteTimeRange; +}): ThunkResult => { + return (dispatch, getState) => { + const { exploreId, absoluteRange: absRange, rawRange: actionRange } = config; + const itemState = getState().explore[exploreId]; + const timeZone = getTimeZone(getState().user); + const { range: rangeInState } = itemState; + let rawRange: RawTimeRange = rangeInState.raw; + + if (absRange) { + rawRange = { + from: dateTimeForTimeZone(timeZone, absRange.from), + to: dateTimeForTimeZone(timeZone, absRange.to), + }; + } + + if (actionRange) { + rawRange = actionRange; + } + + const range = getTimeRange(timeZone, rawRange); + const absoluteRange: AbsoluteTimeRange = { from: range.from.valueOf(), to: range.to.valueOf() }; + + getTimeSrv().init({ + time: range.raw, + refresh: false, + getTimezone: () => timeZone, + timeRangeUpdated: (): any => undefined, + }); + + dispatch(changeRangeAction({ exploreId, range, absoluteRange })); + }; +}; + /** * Start a scan for more results using the given scanner. * @param exploreId Explore area @@ -418,8 +740,8 @@ export function scanStart(exploreId: ExploreId): ThunkResult { // Scanning must trigger query run, and return the new range const range = getShiftedTimeRange(-1, getState().explore[exploreId].range); // Set the new range to be displayed - dispatch(updateTimeRangeAction({ exploreId, absoluteRange: range })); - dispatch(runQueriesAction({ exploreId })); + dispatch(updateTime({ exploreId, absoluteRange: range })); + dispatch(runQueries(exploreId)); }; } @@ -443,7 +765,7 @@ export function setQueries(exploreId: ExploreId, rawQueries: DataQuery[]): Thunk export function splitClose(itemId: ExploreId): ThunkResult { return dispatch => { dispatch(splitCloseAction({ itemId })); - dispatch(stateSaveAction()); + dispatch(stateSave()); }; } @@ -467,7 +789,7 @@ export function splitOpen(): ThunkResult { urlState, }; dispatch(splitOpenAction({ itemState })); - dispatch(stateSaveAction()); + dispatch(stateSave()); }; } @@ -544,7 +866,7 @@ export function refreshExplore(exploreId: ExploreId): ThunkResult { } if (update.range) { - dispatch(updateTimeRangeAction({ exploreId, rawRange: range.raw })); + dispatch(updateTime({ exploreId, rawRange: range.raw })); } // need to refresh ui state diff --git a/public/app/features/explore/state/epics/limitMessageRateEpic.ts b/public/app/features/explore/state/epics/limitMessageRateEpic.ts deleted file mode 100644 index a2eb256e312..00000000000 --- a/public/app/features/explore/state/epics/limitMessageRateEpic.ts +++ /dev/null @@ -1,25 +0,0 @@ -import { Epic } from 'redux-observable'; -import { map, throttleTime } from 'rxjs/operators'; -import { LoadingState } from '@grafana/data'; - -import { StoreState } from 'app/types'; -import { ActionOf } from '../../../../core/redux/actionCreatorFactory'; -import { limitMessageRatePayloadAction, LimitMessageRatePayload, processQueryResultsAction } from '../actionTypes'; -import { EpicDependencies } from 'app/store/configureStore'; - -export const limitMessageRateEpic: Epic, ActionOf, StoreState, EpicDependencies> = action$ => { - return action$.ofType(limitMessageRatePayloadAction.type).pipe( - throttleTime(1), - map((action: ActionOf) => { - const { exploreId, series, datasourceId } = action.payload; - return processQueryResultsAction({ - exploreId, - latency: 0, - datasourceId, - loadingState: LoadingState.Streaming, - series: null, - delta: series, - }); - }) - ); -}; diff --git a/public/app/features/explore/state/epics/processQueryErrorsEpic.test.ts b/public/app/features/explore/state/epics/processQueryErrorsEpic.test.ts deleted file mode 100644 index 7cdaca78f7d..00000000000 --- a/public/app/features/explore/state/epics/processQueryErrorsEpic.test.ts +++ /dev/null @@ -1,67 +0,0 @@ -import { mockExploreState } from 'test/mocks/mockExploreState'; -import { epicTester } from 'test/core/redux/epicTester'; -import { processQueryErrorsAction, queryFailureAction } from '../actionTypes'; -import { processQueryErrorsEpic } from './processQueryErrorsEpic'; - -describe('processQueryErrorsEpic', () => { - let originalConsoleError = console.error; - - beforeEach(() => { - originalConsoleError = console.error; - console.error = jest.fn(); - }); - - afterEach(() => { - console.error = originalConsoleError; - }); - - describe('when processQueryErrorsAction is dispatched', () => { - describe('and datasourceInstance is the same', () => { - describe('and the response is not cancelled', () => { - it('then queryFailureAction is dispatched', () => { - const { datasourceId, exploreId, state, eventBridge } = mockExploreState(); - const response = { message: 'Something went terribly wrong!' }; - - epicTester(processQueryErrorsEpic, state) - .whenActionIsDispatched(processQueryErrorsAction({ exploreId, datasourceId, response })) - .thenResultingActionsEqual(queryFailureAction({ exploreId, response })); - - expect(console.error).toBeCalledTimes(1); - expect(console.error).toBeCalledWith(response); - expect(eventBridge.emit).toBeCalledTimes(1); - expect(eventBridge.emit).toBeCalledWith('data-error', response); - }); - }); - - describe('and the response is cancelled', () => { - it('then no actions are dispatched', () => { - const { datasourceId, exploreId, state, eventBridge } = mockExploreState(); - const response = { cancelled: true, message: 'Something went terribly wrong!' }; - - epicTester(processQueryErrorsEpic, state) - .whenActionIsDispatched(processQueryErrorsAction({ exploreId, datasourceId, response })) - .thenNoActionsWhereDispatched(); - - expect(console.error).not.toBeCalled(); - expect(eventBridge.emit).not.toBeCalled(); - }); - }); - }); - - describe('and datasourceInstance is not the same', () => { - describe('and the response is not cancelled', () => { - it('then no actions are dispatched', () => { - const { exploreId, state, eventBridge } = mockExploreState(); - const response = { message: 'Something went terribly wrong!' }; - - epicTester(processQueryErrorsEpic, state) - .whenActionIsDispatched(processQueryErrorsAction({ exploreId, datasourceId: 'other id', response })) - .thenNoActionsWhereDispatched(); - - expect(console.error).not.toBeCalled(); - expect(eventBridge.emit).not.toBeCalled(); - }); - }); - }); - }); -}); diff --git a/public/app/features/explore/state/epics/processQueryErrorsEpic.ts b/public/app/features/explore/state/epics/processQueryErrorsEpic.ts deleted file mode 100644 index ea029186dc8..00000000000 --- a/public/app/features/explore/state/epics/processQueryErrorsEpic.ts +++ /dev/null @@ -1,40 +0,0 @@ -import { Epic } from 'redux-observable'; -import { mergeMap } from 'rxjs/operators'; -import { NEVER, of } from 'rxjs'; - -import { ActionOf } from 'app/core/redux/actionCreatorFactory'; -import { StoreState } from 'app/types/store'; -import { instanceOfDataQueryError } from 'app/core/utils/explore'; -import { toDataQueryError } from 'app/features/dashboard/state/PanelQueryState'; -import { processQueryErrorsAction, ProcessQueryErrorsPayload, queryFailureAction } from '../actionTypes'; - -export const processQueryErrorsEpic: Epic, ActionOf, StoreState> = (action$, state$) => { - return action$.ofType(processQueryErrorsAction.type).pipe( - mergeMap((action: ActionOf) => { - const { exploreId, datasourceId } = action.payload; - let { response } = action.payload; - const { datasourceInstance, eventBridge } = state$.value.explore[exploreId]; - - if (datasourceInstance.meta.id !== datasourceId || response.cancelled) { - // Navigated away, queries did not matter - return NEVER; - } - - // For Angular editors - eventBridge.emit('data-error', response); - - console.error(response); // To help finding problems with query syntax - - if (!instanceOfDataQueryError(response)) { - response = toDataQueryError(response); - } - - return of( - queryFailureAction({ - exploreId, - response, - }) - ); - }) - ); -}; diff --git a/public/app/features/explore/state/epics/processQueryResultsEpic.test.ts b/public/app/features/explore/state/epics/processQueryResultsEpic.test.ts deleted file mode 100644 index fabf426cd31..00000000000 --- a/public/app/features/explore/state/epics/processQueryResultsEpic.test.ts +++ /dev/null @@ -1,119 +0,0 @@ -import { mockExploreState } from 'test/mocks/mockExploreState'; -import { epicTester, MOCKED_ABSOLUTE_RANGE } from 'test/core/redux/epicTester'; -import { - processQueryResultsAction, - resetQueryErrorAction, - querySuccessAction, - scanStopAction, - updateTimeRangeAction, - runQueriesAction, -} from '../actionTypes'; -import { DataFrame, LoadingState, toDataFrame } from '@grafana/data'; -import { processQueryResultsEpic } from './processQueryResultsEpic'; -import TableModel from 'app/core/table_model'; - -const testContext = () => { - const serieA: DataFrame = toDataFrame({ - fields: [], - refId: 'A', - }); - const serieB: DataFrame = toDataFrame({ - fields: [], - refId: 'B', - }); - const series = [serieA, serieB]; - const latency = 0; - const loadingState = LoadingState.Done; - - return { - latency, - series, - loadingState, - }; -}; - -describe('processQueryResultsEpic', () => { - describe('when processQueryResultsAction is dispatched', () => { - describe('and datasourceInstance is the same', () => { - describe('and explore is not scanning', () => { - it('then resetQueryErrorAction and querySuccessAction are dispatched and eventBridge emits correct message', () => { - const { datasourceId, exploreId, state, eventBridge } = mockExploreState(); - const { latency, series, loadingState } = testContext(); - const graphResult: any[] = []; - const tableResult = new TableModel(); - const logsResult: any = null; - - epicTester(processQueryResultsEpic, state) - .whenActionIsDispatched( - processQueryResultsAction({ exploreId, datasourceId, loadingState, series, latency }) - ) - .thenResultingActionsEqual( - resetQueryErrorAction({ exploreId, refIds: ['A', 'B'] }), - querySuccessAction({ exploreId, loadingState, graphResult, tableResult, logsResult, latency }) - ); - - expect(eventBridge.emit).toBeCalledTimes(1); - expect(eventBridge.emit).toBeCalledWith('data-received', series); - }); - }); - - describe('and explore is scanning', () => { - describe('and we have a result', () => { - it('then correct actions are dispatched', () => { - const { datasourceId, exploreId, state } = mockExploreState({ scanning: true }); - const { latency, series, loadingState } = testContext(); - const graphResult: any[] = []; - const tableResult = new TableModel(); - const logsResult: any = null; - - epicTester(processQueryResultsEpic, state) - .whenActionIsDispatched( - processQueryResultsAction({ exploreId, datasourceId, loadingState, series, latency }) - ) - .thenResultingActionsEqual( - resetQueryErrorAction({ exploreId, refIds: ['A', 'B'] }), - querySuccessAction({ exploreId, loadingState, graphResult, tableResult, logsResult, latency }), - scanStopAction({ exploreId }) - ); - }); - }); - - describe('and we do not have a result', () => { - it('then correct actions are dispatched', () => { - const { datasourceId, exploreId, state } = mockExploreState({ scanning: true }); - const { latency, loadingState } = testContext(); - const graphResult: any[] = []; - const tableResult = new TableModel(); - const logsResult: any = null; - - epicTester(processQueryResultsEpic, state) - .whenActionIsDispatched( - processQueryResultsAction({ exploreId, datasourceId, loadingState, series: [], latency }) - ) - .thenResultingActionsEqual( - resetQueryErrorAction({ exploreId, refIds: [] }), - querySuccessAction({ exploreId, loadingState, graphResult, tableResult, logsResult, latency }), - updateTimeRangeAction({ exploreId, absoluteRange: MOCKED_ABSOLUTE_RANGE }), - runQueriesAction({ exploreId }) - ); - }); - }); - }); - }); - - describe('and datasourceInstance is not the same', () => { - it('then no actions are dispatched and eventBridge does not emit message', () => { - const { exploreId, state, eventBridge } = mockExploreState(); - const { series, loadingState } = testContext(); - - epicTester(processQueryResultsEpic, state) - .whenActionIsDispatched( - processQueryResultsAction({ exploreId, datasourceId: 'other id', loadingState, series, latency: 0 }) - ) - .thenNoActionsWhereDispatched(); - - expect(eventBridge.emit).not.toBeCalled(); - }); - }); - }); -}); diff --git a/public/app/features/explore/state/epics/processQueryResultsEpic.ts b/public/app/features/explore/state/epics/processQueryResultsEpic.ts deleted file mode 100644 index e2328d730fb..00000000000 --- a/public/app/features/explore/state/epics/processQueryResultsEpic.ts +++ /dev/null @@ -1,82 +0,0 @@ -import _ from 'lodash'; -import { Epic } from 'redux-observable'; -import { mergeMap } from 'rxjs/operators'; -import { NEVER } from 'rxjs'; -import { LoadingState } from '@grafana/data'; - -import { ActionOf } from 'app/core/redux/actionCreatorFactory'; -import { StoreState } from 'app/types/store'; -import { getRefIds } from 'app/core/utils/explore'; -import { - processQueryResultsAction, - ProcessQueryResultsPayload, - querySuccessAction, - resetQueryErrorAction, - scanStopAction, - updateTimeRangeAction, - runQueriesAction, -} from '../actionTypes'; -import { ResultProcessor } from '../../utils/ResultProcessor'; - -export const processQueryResultsEpic: Epic, ActionOf, StoreState> = ( - action$, - state$, - { getTimeZone, getShiftedTimeRange } -) => { - return action$.ofType(processQueryResultsAction.type).pipe( - mergeMap((action: ActionOf) => { - const { exploreId, datasourceId, latency, loadingState, series, delta } = action.payload; - const { datasourceInstance, scanning, eventBridge } = state$.value.explore[exploreId]; - - // If datasource already changed, results do not matter - if (datasourceInstance.meta.id !== datasourceId) { - return NEVER; - } - - const result = series || delta || []; - const replacePreviousResults = loadingState === LoadingState.Done && series && !delta ? true : false; - const resultProcessor = new ResultProcessor(state$.value.explore[exploreId], replacePreviousResults, result); - const graphResult = resultProcessor.getGraphResult(); - const tableResult = resultProcessor.getTableResult(); - const logsResult = resultProcessor.getLogsResult(); - const refIds = getRefIds(result); - const actions: Array> = []; - - // For Angular editors - eventBridge.emit('data-received', resultProcessor.getRawData()); - - // Clears any previous errors that now have a successful query, important so Angular editors are updated correctly - actions.push( - resetQueryErrorAction({ - exploreId, - refIds, - }) - ); - - actions.push( - querySuccessAction({ - exploreId, - latency, - loadingState, - graphResult, - tableResult, - logsResult, - }) - ); - - // Keep scanning for results if this was the last scanning transaction - if (scanning) { - if (_.size(result) === 0) { - const range = getShiftedTimeRange(-1, state$.value.explore[exploreId].range, getTimeZone(state$.value.user)); - actions.push(updateTimeRangeAction({ exploreId, absoluteRange: range })); - actions.push(runQueriesAction({ exploreId })); - } else { - // We can stop scanning if we have a result - actions.push(scanStopAction({ exploreId })); - } - } - - return actions; - }) - ); -}; diff --git a/public/app/features/explore/state/epics/runQueriesBatchEpic.test.ts b/public/app/features/explore/state/epics/runQueriesBatchEpic.test.ts deleted file mode 100644 index 8e612e80cb4..00000000000 --- a/public/app/features/explore/state/epics/runQueriesBatchEpic.test.ts +++ /dev/null @@ -1,425 +0,0 @@ -import { mockExploreState } from 'test/mocks/mockExploreState'; -import { epicTester } from 'test/core/redux/epicTester'; -import { runQueriesBatchEpic } from './runQueriesBatchEpic'; -import { - runQueriesBatchAction, - queryStartAction, - historyUpdatedAction, - processQueryResultsAction, - processQueryErrorsAction, - limitMessageRatePayloadAction, - resetExploreAction, - updateDatasourceInstanceAction, - changeRefreshIntervalAction, - clearQueriesAction, - stateSaveAction, -} from '../actionTypes'; -import { LoadingState, DataFrame, FieldType, DataFrameHelper } from '@grafana/data'; -import { DataQueryRequest } from '@grafana/ui'; - -const testContext = () => { - const series: DataFrame[] = [ - new DataFrameHelper({ - fields: [ - { - name: 'Value', - values: [], - }, - { - name: 'Time', - type: FieldType.time, - config: { - unit: 'dateTimeAsIso', - }, - values: [], - }, - ], - refId: 'A', - }), - ]; - const response = { data: series }; - - return { - response, - series, - }; -}; - -describe('runQueriesBatchEpic', () => { - let originalDateNow = Date.now; - beforeEach(() => { - originalDateNow = Date.now; - Date.now = () => 1337; - }); - - afterEach(() => { - Date.now = originalDateNow; - }); - - describe('when runQueriesBatchAction is dispatched', () => { - describe('and query targets are not live', () => { - describe('and query is successful', () => { - it('then correct actions are dispatched', () => { - const { response, series } = testContext(); - const { exploreId, state, history, datasourceId } = mockExploreState(); - - epicTester(runQueriesBatchEpic, state) - .whenActionIsDispatched( - runQueriesBatchAction({ exploreId, queryOptions: { live: false, interval: '', maxDataPoints: 1980 } }) - ) - .whenQueryReceivesResponse(response) - .thenResultingActionsEqual( - queryStartAction({ exploreId }), - historyUpdatedAction({ exploreId, history }), - processQueryResultsAction({ - exploreId, - delta: null, - series, - latency: 0, - datasourceId, - loadingState: LoadingState.Done, - }), - stateSaveAction() - ); - }); - }); - - describe('and query is not successful', () => { - it('then correct actions are dispatched', () => { - const error = { - message: 'Error parsing line x', - }; - const { exploreId, state, datasourceId } = mockExploreState(); - - epicTester(runQueriesBatchEpic, state) - .whenActionIsDispatched( - runQueriesBatchAction({ exploreId, queryOptions: { live: false, interval: '', maxDataPoints: 1980 } }) - ) - .whenQueryThrowsError(error) - .thenResultingActionsEqual( - queryStartAction({ exploreId }), - processQueryErrorsAction({ exploreId, response: error, datasourceId }) - ); - }); - }); - }); - - describe('and query targets are live', () => { - describe('and state equals Streaming', () => { - it('then correct actions are dispatched', () => { - const { exploreId, state, datasourceId } = mockExploreState(); - const unsubscribe = jest.fn(); - const serieA: any = { - fields: [], - rows: [], - refId: 'A', - }; - const serieB: any = { - fields: [], - rows: [], - refId: 'B', - }; - - epicTester(runQueriesBatchEpic, state) - .whenActionIsDispatched( - runQueriesBatchAction({ exploreId, queryOptions: { live: true, interval: '', maxDataPoints: 1980 } }) - ) - .whenQueryObserverReceivesEvent({ - state: LoadingState.Streaming, - delta: [serieA], - key: 'some key', - request: {} as DataQueryRequest, - unsubscribe, - }) - .whenQueryObserverReceivesEvent({ - state: LoadingState.Streaming, - delta: [serieB], - key: 'some key', - request: {} as DataQueryRequest, - unsubscribe, - }) - .thenResultingActionsEqual( - queryStartAction({ exploreId }), - limitMessageRatePayloadAction({ exploreId, series: [serieA], datasourceId }), - limitMessageRatePayloadAction({ exploreId, series: [serieB], datasourceId }) - ); - }); - }); - - describe('and state equals Error', () => { - it('then correct actions are dispatched', () => { - const { exploreId, state, datasourceId } = mockExploreState(); - const unsubscribe = jest.fn(); - const error = { message: 'Something went really wrong!' }; - - epicTester(runQueriesBatchEpic, state) - .whenActionIsDispatched( - runQueriesBatchAction({ exploreId, queryOptions: { live: true, interval: '', maxDataPoints: 1980 } }) - ) - .whenQueryObserverReceivesEvent({ - state: LoadingState.Error, - error, - key: 'some key', - request: {} as DataQueryRequest, - unsubscribe, - }) - .thenResultingActionsEqual( - queryStartAction({ exploreId }), - processQueryErrorsAction({ exploreId, response: error, datasourceId }) - ); - }); - }); - - describe('and state equals Done', () => { - it('then correct actions are dispatched', () => { - const { exploreId, state, datasourceId, history } = mockExploreState(); - const unsubscribe = jest.fn(); - const serieA: any = { - fields: [], - rows: [], - refId: 'A', - }; - const serieB: any = { - fields: [], - rows: [], - refId: 'B', - }; - const delta = [serieA, serieB]; - - epicTester(runQueriesBatchEpic, state) - .whenActionIsDispatched( - runQueriesBatchAction({ exploreId, queryOptions: { live: true, interval: '', maxDataPoints: 1980 } }) - ) - .whenQueryObserverReceivesEvent({ - state: LoadingState.Done, - data: null, - delta, - key: 'some key', - request: {} as DataQueryRequest, - unsubscribe, - }) - .thenResultingActionsEqual( - queryStartAction({ exploreId }), - historyUpdatedAction({ exploreId, history }), - processQueryResultsAction({ - exploreId, - delta, - series: null, - latency: 0, - datasourceId, - loadingState: LoadingState.Done, - }), - stateSaveAction() - ); - }); - }); - }); - - describe('and another runQueriesBatchAction is dispatched', () => { - it('then the observable should be unsubscribed', () => { - const { response, series } = testContext(); - const { exploreId, state, history, datasourceId } = mockExploreState(); - const unsubscribe = jest.fn(); - - epicTester(runQueriesBatchEpic, state) - .whenActionIsDispatched( - runQueriesBatchAction({ exploreId, queryOptions: { live: false, interval: '', maxDataPoints: 1980 } }) // first observable - ) - .whenQueryReceivesResponse(response) - .whenQueryObserverReceivesEvent({ - key: 'some key', - request: {} as DataQueryRequest, - state: LoadingState.Loading, // fake just to setup and test unsubscribe - unsubscribe, - }) - .whenActionIsDispatched( - // second observable and unsubscribes the first observable - runQueriesBatchAction({ exploreId, queryOptions: { live: true, interval: '', maxDataPoints: 800 } }) - ) - .whenQueryReceivesResponse(response) - .whenQueryObserverReceivesEvent({ - key: 'some key', - request: {} as DataQueryRequest, - state: LoadingState.Loading, // fake just to setup and test unsubscribe - unsubscribe, - }) - .thenResultingActionsEqual( - queryStartAction({ exploreId }), // output from first observable - historyUpdatedAction({ exploreId, history }), // output from first observable - processQueryResultsAction({ - exploreId, - delta: null, - series, - latency: 0, - datasourceId, - loadingState: LoadingState.Done, - }), - stateSaveAction(), - // output from first observable - queryStartAction({ exploreId }), // output from second observable - historyUpdatedAction({ exploreId, history }), // output from second observable - processQueryResultsAction({ - exploreId, - delta: null, - series, - latency: 0, - datasourceId, - loadingState: LoadingState.Done, - }), - stateSaveAction() - // output from second observable - ); - - expect(unsubscribe).toBeCalledTimes(1); // first unsubscribe should be called but not second as that isn't unsubscribed - }); - }); - - describe('and resetExploreAction is dispatched', () => { - it('then the observable should be unsubscribed', () => { - const { response, series } = testContext(); - const { exploreId, state, history, datasourceId } = mockExploreState(); - const unsubscribe = jest.fn(); - - epicTester(runQueriesBatchEpic, state) - .whenActionIsDispatched( - runQueriesBatchAction({ exploreId, queryOptions: { live: false, interval: '', maxDataPoints: 1980 } }) - ) - .whenQueryReceivesResponse(response) - .whenQueryObserverReceivesEvent({ - key: 'some key', - request: {} as DataQueryRequest, - state: LoadingState.Loading, // fake just to setup and test unsubscribe - unsubscribe, - }) - .whenActionIsDispatched(resetExploreAction()) // unsubscribes the observable - .whenQueryReceivesResponse(response) // new updates will not reach anywhere - .thenResultingActionsEqual( - queryStartAction({ exploreId }), - historyUpdatedAction({ exploreId, history }), - processQueryResultsAction({ - exploreId, - delta: null, - series, - latency: 0, - datasourceId, - loadingState: LoadingState.Done, - }), - stateSaveAction() - ); - - expect(unsubscribe).toBeCalledTimes(1); - }); - }); - - describe('and updateDatasourceInstanceAction is dispatched', () => { - it('then the observable should be unsubscribed', () => { - const { response, series } = testContext(); - const { exploreId, state, history, datasourceId, datasourceInstance } = mockExploreState(); - const unsubscribe = jest.fn(); - - epicTester(runQueriesBatchEpic, state) - .whenActionIsDispatched( - runQueriesBatchAction({ exploreId, queryOptions: { live: false, interval: '', maxDataPoints: 1980 } }) - ) - .whenQueryReceivesResponse(response) - .whenQueryObserverReceivesEvent({ - key: 'some key', - request: {} as DataQueryRequest, - state: LoadingState.Loading, // fake just to setup and test unsubscribe - unsubscribe, - }) - .whenActionIsDispatched(updateDatasourceInstanceAction({ exploreId, datasourceInstance })) // unsubscribes the observable - .whenQueryReceivesResponse(response) // new updates will not reach anywhere - .thenResultingActionsEqual( - queryStartAction({ exploreId }), - historyUpdatedAction({ exploreId, history }), - processQueryResultsAction({ - exploreId, - delta: null, - series, - latency: 0, - datasourceId, - loadingState: LoadingState.Done, - }), - stateSaveAction() - ); - - expect(unsubscribe).toBeCalledTimes(1); - }); - }); - - describe('and changeRefreshIntervalAction is dispatched', () => { - it('then the observable should be unsubscribed', () => { - const { response, series } = testContext(); - const { exploreId, state, history, datasourceId } = mockExploreState(); - const unsubscribe = jest.fn(); - - epicTester(runQueriesBatchEpic, state) - .whenActionIsDispatched( - runQueriesBatchAction({ exploreId, queryOptions: { live: false, interval: '', maxDataPoints: 1980 } }) - ) - .whenQueryReceivesResponse(response) - .whenQueryObserverReceivesEvent({ - key: 'some key', - request: {} as DataQueryRequest, - state: LoadingState.Loading, // fake just to setup and test unsubscribe - unsubscribe, - }) - .whenActionIsDispatched(changeRefreshIntervalAction({ exploreId, refreshInterval: '' })) // unsubscribes the observable - .whenQueryReceivesResponse(response) // new updates will not reach anywhere - .thenResultingActionsEqual( - queryStartAction({ exploreId }), - historyUpdatedAction({ exploreId, history }), - processQueryResultsAction({ - exploreId, - delta: null, - series, - latency: 0, - datasourceId, - loadingState: LoadingState.Done, - }), - stateSaveAction() - ); - - expect(unsubscribe).toBeCalledTimes(1); - }); - }); - - describe('and clearQueriesAction is dispatched', () => { - it('then the observable should be unsubscribed', () => { - const { response, series } = testContext(); - const { exploreId, state, history, datasourceId } = mockExploreState(); - const unsubscribe = jest.fn(); - - epicTester(runQueriesBatchEpic, state) - .whenActionIsDispatched( - runQueriesBatchAction({ exploreId, queryOptions: { live: false, interval: '', maxDataPoints: 1980 } }) - ) - .whenQueryReceivesResponse(response) - .whenQueryObserverReceivesEvent({ - key: 'some key', - request: {} as DataQueryRequest, - state: LoadingState.Loading, // fake just to setup and test unsubscribe - unsubscribe, - }) - .whenActionIsDispatched(clearQueriesAction({ exploreId })) // unsubscribes the observable - .whenQueryReceivesResponse(response) // new updates will not reach anywhere - .thenResultingActionsEqual( - queryStartAction({ exploreId }), - historyUpdatedAction({ exploreId, history }), - processQueryResultsAction({ - exploreId, - delta: null, - series, - latency: 0, - datasourceId, - loadingState: LoadingState.Done, - }), - stateSaveAction() - ); - - expect(unsubscribe).toBeCalledTimes(1); - }); - }); - }); -}); diff --git a/public/app/features/explore/state/epics/runQueriesBatchEpic.ts b/public/app/features/explore/state/epics/runQueriesBatchEpic.ts deleted file mode 100644 index 4d7876a37f7..00000000000 --- a/public/app/features/explore/state/epics/runQueriesBatchEpic.ts +++ /dev/null @@ -1,231 +0,0 @@ -import { Epic } from 'redux-observable'; -import { Observable, Subject } from 'rxjs'; -import { mergeMap, catchError, takeUntil, filter } from 'rxjs/operators'; -import _, { isString } from 'lodash'; -import { isLive } from '@grafana/ui/src/components/RefreshPicker/RefreshPicker'; -import { DataStreamState, DataQueryResponse, DataQueryResponseData } from '@grafana/ui'; - -import { LoadingState, DataFrame, AbsoluteTimeRange } from '@grafana/data'; -import { dateMath } from '@grafana/data'; - -import { ActionOf } from 'app/core/redux/actionCreatorFactory'; -import { StoreState } from 'app/types/store'; -import { buildQueryTransaction, updateHistory } from 'app/core/utils/explore'; -import { - clearQueriesAction, - historyUpdatedAction, - resetExploreAction, - updateDatasourceInstanceAction, - changeRefreshIntervalAction, - processQueryErrorsAction, - processQueryResultsAction, - runQueriesBatchAction, - RunQueriesBatchPayload, - queryStartAction, - limitMessageRatePayloadAction, - stateSaveAction, - changeRangeAction, -} from '../actionTypes'; -import { ExploreId, ExploreItemState } from 'app/types'; - -const publishActions = (outerObservable: Subject, actions: Array>) => { - for (const action of actions) { - outerObservable.next(action); - } -}; - -interface ProcessResponseConfig { - exploreId: ExploreId; - exploreItemState: ExploreItemState; - datasourceId: string; - now: number; - loadingState: LoadingState; - series?: DataQueryResponseData[]; - delta?: DataFrame[]; -} - -const processResponse = (config: ProcessResponseConfig) => { - const { exploreId, exploreItemState, datasourceId, now, loadingState, series, delta } = config; - const { queries, history } = exploreItemState; - const latency = Date.now() - now; - - // Side-effect: Saving history in localstorage - const nextHistory = updateHistory(history, datasourceId, queries); - return [ - historyUpdatedAction({ exploreId, history: nextHistory }), - processQueryResultsAction({ exploreId, latency, datasourceId, loadingState, series, delta }), - stateSaveAction(), - ]; -}; - -interface ProcessErrorConfig { - exploreId: ExploreId; - datasourceId: string; - error: any; -} - -const processError = (config: ProcessErrorConfig) => { - const { exploreId, datasourceId, error } = config; - - return [processQueryErrorsAction({ exploreId, response: error, datasourceId })]; -}; - -export const runQueriesBatchEpic: Epic, ActionOf, StoreState> = ( - action$, - state$, - { getQueryResponse } -) => { - return action$.ofType(runQueriesBatchAction.type).pipe( - mergeMap((action: ActionOf) => { - const { exploreId, queryOptions } = action.payload; - const exploreItemState = state$.value.explore[exploreId]; - const { datasourceInstance, queries, queryIntervals, range, scanning } = exploreItemState; - - // Create an observable per run queries action - // Within the observable create two subscriptions - // First subscription: 'querySubscription' subscribes to the call to query method on datasourceinstance - // Second subscription: 'streamSubscription' subscribes to events from the query methods observer callback - const observable: Observable> = Observable.create((outerObservable: Subject) => { - const datasourceId = datasourceInstance.meta.id; - const transaction = buildQueryTransaction(queries, queryOptions, range, queryIntervals, scanning); - outerObservable.next(queryStartAction({ exploreId })); - - const now = Date.now(); - let datasourceUnsubscribe: Function = null; - const streamHandler = new Subject(); - const observer = (event: DataStreamState) => { - datasourceUnsubscribe = event.unsubscribe; - if (!streamHandler.closed) { - // their might be a race condition when unsubscribing - streamHandler.next(event); - } - }; - - // observer subscription, handles datasourceInstance.query observer events and pushes that forward - const streamSubscription = streamHandler.subscribe({ - next: event => { - const { state, error, data, delta } = event; - if (!data && !delta && !error) { - return; - } - - if (state === LoadingState.Error) { - const actions = processError({ exploreId, datasourceId, error }); - publishActions(outerObservable, actions); - } - - if (state === LoadingState.Streaming) { - if (event.request && event.request.range) { - let newRange = event.request.range; - let absoluteRange: AbsoluteTimeRange = { - from: newRange.from.valueOf(), - to: newRange.to.valueOf(), - }; - if (isString(newRange.raw.from)) { - newRange = { - from: dateMath.parse(newRange.raw.from, false), - to: dateMath.parse(newRange.raw.to, true), - raw: newRange.raw, - }; - absoluteRange = { - from: newRange.from.valueOf(), - to: newRange.to.valueOf(), - }; - } - outerObservable.next(changeRangeAction({ exploreId, range: newRange, absoluteRange })); - } - - outerObservable.next( - limitMessageRatePayloadAction({ - exploreId, - series: delta, - datasourceId, - }) - ); - } - - if (state === LoadingState.Done || state === LoadingState.Loading) { - const actions = processResponse({ - exploreId, - exploreItemState, - datasourceId, - now, - loadingState: state, - series: null, - delta, - }); - publishActions(outerObservable, actions); - } - }, - }); - - // query subscription, handles datasourceInstance.query response and pushes that forward - const querySubscription = getQueryResponse(datasourceInstance, transaction.options, observer) - .pipe( - mergeMap((response: DataQueryResponse) => { - return processResponse({ - exploreId, - exploreItemState, - datasourceId, - now, - loadingState: LoadingState.Done, - series: response && response.data ? response.data : [], - delta: null, - }); - }), - catchError(error => { - return processError({ exploreId, datasourceId, error }); - }) - ) - .subscribe({ next: (action: ActionOf) => outerObservable.next(action) }); - - // this unsubscribe method will be called when any of the takeUntil actions below happen - const unsubscribe = () => { - if (datasourceUnsubscribe) { - datasourceUnsubscribe(); - } - querySubscription.unsubscribe(); - streamSubscription.unsubscribe(); - streamHandler.unsubscribe(); - outerObservable.unsubscribe(); - }; - - return unsubscribe; - }); - - return observable.pipe( - takeUntil( - action$ - .ofType( - runQueriesBatchAction.type, - resetExploreAction.type, - updateDatasourceInstanceAction.type, - changeRefreshIntervalAction.type, - clearQueriesAction.type - ) - .pipe( - filter(action => { - if (action.type === resetExploreAction.type) { - return true; // stops all subscriptions if user navigates away - } - - if (action.type === updateDatasourceInstanceAction.type && action.payload.exploreId === exploreId) { - return true; // stops subscriptions if user changes data source - } - - if (action.type === changeRefreshIntervalAction.type && action.payload.exploreId === exploreId) { - return !isLive(action.payload.refreshInterval); // stops subscriptions if user changes refresh interval away from 'Live' - } - - if (action.type === clearQueriesAction.type && action.payload.exploreId === exploreId) { - return true; // stops subscriptions if user clears all queries - } - - return action.payload.exploreId === exploreId; - }) - ) - ) - ); - }) - ); -}; diff --git a/public/app/features/explore/state/epics/runQueriesEpic.test.ts b/public/app/features/explore/state/epics/runQueriesEpic.test.ts deleted file mode 100644 index 1006ac5b9ad..00000000000 --- a/public/app/features/explore/state/epics/runQueriesEpic.test.ts +++ /dev/null @@ -1,71 +0,0 @@ -import { mockExploreState } from 'test/mocks/mockExploreState'; -import { epicTester } from 'test/core/redux/epicTester'; -import { runQueriesAction, stateSaveAction, runQueriesBatchAction, clearQueriesAction } from '../actionTypes'; -import { runQueriesEpic } from './runQueriesEpic'; - -describe('runQueriesEpic', () => { - describe('when runQueriesAction is dispatched', () => { - describe('and there is no datasourceError', () => { - describe('and we have non empty queries', () => { - describe('and explore is not live', () => { - it('then runQueriesBatchAction and stateSaveAction are dispatched', () => { - const queries = [{ refId: 'A', key: '123456', expr: '{__filename__="some.log"}' }]; - const { exploreId, state, datasourceInterval, containerWidth } = mockExploreState({ queries }); - - epicTester(runQueriesEpic, state) - .whenActionIsDispatched(runQueriesAction({ exploreId })) - .thenResultingActionsEqual( - runQueriesBatchAction({ - exploreId, - queryOptions: { interval: datasourceInterval, maxDataPoints: containerWidth, live: false }, - }) - ); - }); - }); - - describe('and explore is live', () => { - it('then runQueriesBatchAction and stateSaveAction are dispatched', () => { - const queries = [{ refId: 'A', key: '123456', expr: '{__filename__="some.log"}' }]; - const { exploreId, state, datasourceInterval, containerWidth } = mockExploreState({ - queries, - isLive: true, - streaming: true, - }); - - epicTester(runQueriesEpic, state) - .whenActionIsDispatched(runQueriesAction({ exploreId })) - .thenResultingActionsEqual( - runQueriesBatchAction({ - exploreId, - queryOptions: { interval: datasourceInterval, maxDataPoints: containerWidth, live: true }, - }) - ); - }); - }); - }); - - describe('and we have no queries', () => { - it('then clearQueriesAction and stateSaveAction are dispatched', () => { - const queries: any[] = []; - const { exploreId, state } = mockExploreState({ queries }); - - epicTester(runQueriesEpic, state) - .whenActionIsDispatched(runQueriesAction({ exploreId })) - .thenResultingActionsEqual(clearQueriesAction({ exploreId }), stateSaveAction()); - }); - }); - }); - - describe('and there is a datasourceError', () => { - it('then no actions are dispatched', () => { - const { exploreId, state } = mockExploreState({ - datasourceError: { message: 'Some error' }, - }); - - epicTester(runQueriesEpic, state) - .whenActionIsDispatched(runQueriesAction({ exploreId })) - .thenNoActionsWhereDispatched(); - }); - }); - }); -}); diff --git a/public/app/features/explore/state/epics/runQueriesEpic.ts b/public/app/features/explore/state/epics/runQueriesEpic.ts deleted file mode 100644 index 2102c11b103..00000000000 --- a/public/app/features/explore/state/epics/runQueriesEpic.ts +++ /dev/null @@ -1,39 +0,0 @@ -import { Epic } from 'redux-observable'; -import { NEVER } from 'rxjs'; -import { mergeMap } from 'rxjs/operators'; - -import { ActionOf } from 'app/core/redux/actionCreatorFactory'; -import { StoreState } from 'app/types/store'; -import { hasNonEmptyQuery } from 'app/core/utils/explore'; -import { - clearQueriesAction, - runQueriesAction, - RunQueriesPayload, - runQueriesBatchAction, - stateSaveAction, -} from '../actionTypes'; - -export const runQueriesEpic: Epic, ActionOf, StoreState> = (action$, state$) => { - return action$.ofType(runQueriesAction.type).pipe( - mergeMap((action: ActionOf) => { - const { exploreId } = action.payload; - const { datasourceInstance, queries, datasourceError, containerWidth, isLive } = state$.value.explore[exploreId]; - - if (datasourceError) { - // let's not run any queries if data source is in a faulty state - return NEVER; - } - - if (!hasNonEmptyQuery(queries)) { - return [clearQueriesAction({ exploreId }), stateSaveAction()]; // Remember to save to state and update location - } - - // Some datasource's query builders allow per-query interval limits, - // but we're using the datasource interval limit for now - const interval = datasourceInstance.interval; - const live = isLive; - - return [runQueriesBatchAction({ exploreId, queryOptions: { interval, maxDataPoints: containerWidth, live } })]; - }) - ); -}; diff --git a/public/app/features/explore/state/epics/stateSaveEpic.test.ts b/public/app/features/explore/state/epics/stateSaveEpic.test.ts deleted file mode 100644 index 9670ae996ee..00000000000 --- a/public/app/features/explore/state/epics/stateSaveEpic.test.ts +++ /dev/null @@ -1,62 +0,0 @@ -import { epicTester } from 'test/core/redux/epicTester'; -import { stateSaveEpic } from './stateSaveEpic'; -import { stateSaveAction, setUrlReplacedAction } from '../actionTypes'; -import { updateLocation } from 'app/core/actions/location'; -import { mockExploreState } from 'test/mocks/mockExploreState'; - -describe('stateSaveEpic', () => { - describe('when stateSaveAction is dispatched', () => { - describe('and there is a left state', () => { - describe('and no split', () => { - it('then the correct actions are dispatched', () => { - const { exploreId, state } = mockExploreState(); - - epicTester(stateSaveEpic, state) - .whenActionIsDispatched(stateSaveAction()) - .thenResultingActionsEqual( - updateLocation({ - query: { orgId: '1', left: '["now-6h","now","test",{"mode":null},{"ui":[true,true,true,null]}]' }, - replace: true, - }), - setUrlReplacedAction({ exploreId }) - ); - }); - }); - - describe('and explore is split', () => { - it('then the correct actions are dispatched', () => { - const { exploreId, state } = mockExploreState({ split: true }); - - epicTester(stateSaveEpic, state) - .whenActionIsDispatched(stateSaveAction()) - .thenResultingActionsEqual( - updateLocation({ - query: { - orgId: '1', - left: '["now-6h","now","test",{"mode":null},{"ui":[true,true,true,null]}]', - right: '["now-6h","now","test",{"mode":null},{"ui":[true,true,true,null]}]', - }, - replace: true, - }), - setUrlReplacedAction({ exploreId }) - ); - }); - }); - }); - - describe('and urlReplaced is true', () => { - it('then setUrlReplacedAction should not be dispatched', () => { - const { state } = mockExploreState({ urlReplaced: true }); - - epicTester(stateSaveEpic, state) - .whenActionIsDispatched(stateSaveAction()) - .thenResultingActionsEqual( - updateLocation({ - query: { orgId: '1', left: '["now-6h","now","test",{"mode":null},{"ui":[true,true,true,null]}]' }, - replace: false, - }) - ); - }); - }); - }); -}); diff --git a/public/app/features/explore/state/epics/stateSaveEpic.ts b/public/app/features/explore/state/epics/stateSaveEpic.ts deleted file mode 100644 index 1b36f92ecb0..00000000000 --- a/public/app/features/explore/state/epics/stateSaveEpic.ts +++ /dev/null @@ -1,75 +0,0 @@ -import { Epic } from 'redux-observable'; -import { mergeMap } from 'rxjs/operators'; -import { RawTimeRange, TimeRange } from '@grafana/data'; -import { isDateTime } from '@grafana/data'; - -import { ActionOf } from 'app/core/redux/actionCreatorFactory'; -import { StoreState } from 'app/types/store'; -import { ExploreUrlState, ExploreId } from 'app/types/explore'; -import { clearQueryKeys, serializeStateToUrlParam } from 'app/core/utils/explore'; -import { updateLocation } from 'app/core/actions/location'; -import { setUrlReplacedAction, stateSaveAction } from '../actionTypes'; - -const toRawTimeRange = (range: TimeRange): RawTimeRange => { - let from = range.raw.from; - if (isDateTime(from)) { - from = from.valueOf().toString(10); - } - - let to = range.raw.to; - if (isDateTime(to)) { - to = to.valueOf().toString(10); - } - - return { - from, - to, - }; -}; - -export const stateSaveEpic: Epic, ActionOf, StoreState> = (action$, state$) => { - return action$.ofType(stateSaveAction.type).pipe( - mergeMap(() => { - const { left, right, split } = state$.value.explore; - const orgId = state$.value.user.orgId.toString(); - const replace = left && left.urlReplaced === false; - const urlStates: { [index: string]: string } = { orgId }; - const leftUrlState: ExploreUrlState = { - datasource: left.datasourceInstance.name, - queries: left.queries.map(clearQueryKeys), - range: toRawTimeRange(left.range), - mode: left.mode, - ui: { - showingGraph: left.showingGraph, - showingLogs: true, - showingTable: left.showingTable, - dedupStrategy: left.dedupStrategy, - }, - }; - urlStates.left = serializeStateToUrlParam(leftUrlState, true); - if (split) { - const rightUrlState: ExploreUrlState = { - datasource: right.datasourceInstance.name, - queries: right.queries.map(clearQueryKeys), - range: toRawTimeRange(right.range), - mode: right.mode, - ui: { - showingGraph: right.showingGraph, - showingLogs: true, - showingTable: right.showingTable, - dedupStrategy: right.dedupStrategy, - }, - }; - - urlStates.right = serializeStateToUrlParam(rightUrlState, true); - } - - const actions: Array> = [updateLocation({ query: urlStates, replace })]; - if (replace) { - actions.push(setUrlReplacedAction({ exploreId: ExploreId.left })); - } - - return actions; - }) - ); -}; diff --git a/public/app/features/explore/state/epics/timeEpic.test.ts b/public/app/features/explore/state/epics/timeEpic.test.ts deleted file mode 100644 index f1742374469..00000000000 --- a/public/app/features/explore/state/epics/timeEpic.test.ts +++ /dev/null @@ -1,105 +0,0 @@ -import { dateTime, DefaultTimeZone } from '@grafana/data'; - -import { epicTester } from 'test/core/redux/epicTester'; -import { mockExploreState } from 'test/mocks/mockExploreState'; -import { timeEpic } from './timeEpic'; -import { updateTimeRangeAction, changeRangeAction } from '../actionTypes'; -import { EpicDependencies } from 'app/store/configureStore'; - -const from = dateTime('2019-01-01 10:00:00.000Z'); -const to = dateTime('2019-01-01 16:00:00.000Z'); -const rawFrom = 'now-6h'; -const rawTo = 'now'; -const rangeMock = { - from, - to, - raw: { - from: rawFrom, - to: rawTo, - }, -}; - -describe('timeEpic', () => { - describe('when updateTimeRangeAction is dispatched', () => { - describe('and no rawRange is supplied', () => { - describe('and no absoluteRange is supplied', () => { - it('then the correct actions are dispatched', () => { - const { exploreId, state, range } = mockExploreState({ range: rangeMock }); - const absoluteRange = { from: range.from.valueOf(), to: range.to.valueOf() }; - const stateToTest = { ...state, user: { timeZone: 'browser', orgId: -1 } }; - const getTimeRange = jest.fn().mockReturnValue(rangeMock); - const dependencies: Partial = { - getTimeRange, - }; - - epicTester(timeEpic, stateToTest, dependencies) - .whenActionIsDispatched(updateTimeRangeAction({ exploreId })) - .thenDependencyWasCalledTimes(1, 'getTimeSrv', 'init') - .thenDependencyWasCalledTimes(1, 'getTimeRange') - .thenDependencyWasCalledWith([DefaultTimeZone, rangeMock.raw], 'getTimeRange') - .thenResultingActionsEqual( - changeRangeAction({ - exploreId, - range, - absoluteRange, - }) - ); - }); - }); - - describe('and absoluteRange is supplied', () => { - it('then the correct actions are dispatched', () => { - const { exploreId, state, range } = mockExploreState({ range: rangeMock }); - const absoluteRange = { from: range.from.valueOf(), to: range.to.valueOf() }; - const stateToTest = { ...state, user: { timeZone: 'browser', orgId: -1 } }; - const getTimeRange = jest.fn().mockReturnValue(rangeMock); - const dependencies: Partial = { - getTimeRange, - }; - - epicTester(timeEpic, stateToTest, dependencies) - .whenActionIsDispatched(updateTimeRangeAction({ exploreId, absoluteRange })) - .thenDependencyWasCalledTimes(1, 'getTimeSrv', 'init') - .thenDependencyWasCalledTimes(1, 'getTimeRange') - .thenDependencyWasCalledWith([DefaultTimeZone, { from: null, to: null }], 'getTimeRange') - .thenDependencyWasCalledTimes(2, 'dateTimeForTimeZone') - .thenResultingActionsEqual( - changeRangeAction({ - exploreId, - range, - absoluteRange, - }) - ); - }); - }); - }); - - describe('and rawRange is supplied', () => { - describe('and no absoluteRange is supplied', () => { - it('then the correct actions are dispatched', () => { - const { exploreId, state, range } = mockExploreState({ range: rangeMock }); - const rawRange = { from: 'now-5m', to: 'now' }; - const absoluteRange = { from: range.from.valueOf(), to: range.to.valueOf() }; - const stateToTest = { ...state, user: { timeZone: 'browser', orgId: -1 } }; - const getTimeRange = jest.fn().mockReturnValue(rangeMock); - const dependencies: Partial = { - getTimeRange, - }; - - epicTester(timeEpic, stateToTest, dependencies) - .whenActionIsDispatched(updateTimeRangeAction({ exploreId, rawRange })) - .thenDependencyWasCalledTimes(1, 'getTimeSrv', 'init') - .thenDependencyWasCalledTimes(1, 'getTimeRange') - .thenDependencyWasCalledWith([DefaultTimeZone, rawRange], 'getTimeRange') - .thenResultingActionsEqual( - changeRangeAction({ - exploreId, - range, - absoluteRange, - }) - ); - }); - }); - }); - }); -}); diff --git a/public/app/features/explore/state/epics/timeEpic.ts b/public/app/features/explore/state/epics/timeEpic.ts deleted file mode 100644 index 3b2a9950ef6..00000000000 --- a/public/app/features/explore/state/epics/timeEpic.ts +++ /dev/null @@ -1,47 +0,0 @@ -import { Epic } from 'redux-observable'; -import { map } from 'rxjs/operators'; -import { AbsoluteTimeRange, RawTimeRange } from '@grafana/data'; - -import { ActionOf } from 'app/core/redux/actionCreatorFactory'; -import { StoreState } from 'app/types/store'; -import { updateTimeRangeAction, UpdateTimeRangePayload, changeRangeAction } from '../actionTypes'; -import { EpicDependencies } from 'app/store/configureStore'; - -export const timeEpic: Epic, ActionOf, StoreState, EpicDependencies> = ( - action$, - state$, - { getTimeSrv, getTimeRange, getTimeZone, dateTimeForTimeZone } -) => { - return action$.ofType(updateTimeRangeAction.type).pipe( - map((action: ActionOf) => { - const { exploreId, absoluteRange: absRange, rawRange: actionRange } = action.payload; - const itemState = state$.value.explore[exploreId]; - const timeZone = getTimeZone(state$.value.user); - const { range: rangeInState } = itemState; - let rawRange: RawTimeRange = rangeInState.raw; - - if (absRange) { - rawRange = { - from: dateTimeForTimeZone(timeZone, absRange.from), - to: dateTimeForTimeZone(timeZone, absRange.to), - }; - } - - if (actionRange) { - rawRange = actionRange; - } - - const range = getTimeRange(timeZone, rawRange); - const absoluteRange: AbsoluteTimeRange = { from: range.from.valueOf(), to: range.to.valueOf() }; - - getTimeSrv().init({ - time: range.raw, - refresh: false, - getTimezone: () => timeZone, - timeRangeUpdated: (): any => undefined, - }); - - return changeRangeAction({ exploreId, range, absoluteRange }); - }) - ); -}; diff --git a/public/app/features/explore/state/reducers.test.ts b/public/app/features/explore/state/reducers.test.ts index 4cd976b53f5..096e14b22e6 100644 --- a/public/app/features/explore/state/reducers.test.ts +++ b/public/app/features/explore/state/reducers.test.ts @@ -26,12 +26,14 @@ import { serializeStateToUrlParam } from 'app/core/utils/explore'; import TableModel from 'app/core/table_model'; import { DataSourceApi, DataQuery } from '@grafana/ui'; import { LogsModel, LogsDedupStrategy, LoadingState } from '@grafana/data'; +import { PanelQueryState } from '../../dashboard/state/PanelQueryState'; describe('Explore item reducer', () => { describe('scanning', () => { it('should start scanning', () => { const initalState = { ...makeExploreItemState(), + queryState: null as PanelQueryState, scanning: false, }; @@ -40,12 +42,14 @@ describe('Explore item reducer', () => { .whenActionIsDispatched(scanStartAction({ exploreId: ExploreId.left })) .thenStateShouldEqual({ ...makeExploreItemState(), + queryState: null as PanelQueryState, scanning: true, }); }); it('should stop scanning', () => { const initalState = { ...makeExploreItemState(), + queryState: null as PanelQueryState, scanning: true, scanRange: {}, }; @@ -55,6 +59,7 @@ describe('Explore item reducer', () => { .whenActionIsDispatched(scanStopAction({ exploreId: ExploreId.left })) .thenStateShouldEqual({ ...makeExploreItemState(), + queryState: null as PanelQueryState, scanning: false, scanRange: undefined, }); diff --git a/public/app/features/explore/state/reducers.ts b/public/app/features/explore/state/reducers.ts index 58d5739c691..8afa1ff5de6 100644 --- a/public/app/features/explore/state/reducers.ts +++ b/public/app/features/explore/state/reducers.ts @@ -7,6 +7,7 @@ import { DEFAULT_UI_STATE, generateNewKeyAndAddRefIdIfMissing, sortLogsResult, + stopQueryState, refreshIntervalToSortOrder, } from 'app/core/utils/explore'; import { ExploreItemState, ExploreState, ExploreId, ExploreUpdateState, ExploreMode } from 'app/types/explore'; @@ -31,9 +32,6 @@ import { queryStartAction, runQueriesAction, changeRangeAction, -} from './actionTypes'; -import { reducerFactory } from 'app/core/redux'; -import { addQueryRowAction, changeQueryAction, changeSizeAction, @@ -53,11 +51,15 @@ import { queriesImportedAction, updateUIStateAction, toggleLogLevelAction, + changeLoadingStateAction, + resetExploreAction, } from './actionTypes'; +import { reducerFactory } from 'app/core/redux'; import { updateLocation } from 'app/core/actions/location'; import { LocationUpdate } from '@grafana/runtime'; import TableModel from 'app/core/table_model'; import { isLive } from '@grafana/ui/src/components/RefreshPicker/RefreshPicker'; +import { PanelQueryState } from '../../dashboard/state/PanelQueryState'; export const DEFAULT_RANGE = { from: 'now-6h', @@ -114,6 +116,7 @@ export const makeExploreItemState = (): ExploreItemState => ({ mode: null, isLive: false, urlReplaced: false, + queryState: new PanelQueryState(), }); /** @@ -186,6 +189,9 @@ export const itemReducer = reducerFactory({} as ExploreItemSta const live = isLive(refreshInterval); const sortOrder = refreshIntervalToSortOrder(refreshInterval); const logsResult = sortLogsResult(state.logsResult, sortOrder); + if (isLive(state.refreshInterval) && !live) { + stopQueryState(state.queryState, 'Live streaming stopped'); + } return { ...state, @@ -200,6 +206,7 @@ export const itemReducer = reducerFactory({} as ExploreItemSta filter: clearQueriesAction, mapper: (state): ExploreItemState => { const queries = ensureQueries(); + stopQueryState(state.queryState, 'Queries cleared'); return { ...state, queries: queries.slice(), @@ -258,6 +265,7 @@ export const itemReducer = reducerFactory({} as ExploreItemSta // Custom components const StartPage = datasourceInstance.components.ExploreStartPage; + stopQueryState(state.queryState, 'Datasource changed'); return { ...state, @@ -577,6 +585,16 @@ export const itemReducer = reducerFactory({} as ExploreItemSta }; }, }) + .addMapper({ + filter: changeLoadingStateAction, + mapper: (state, action): ExploreItemState => { + const { loadingState } = action.payload; + return { + ...state, + loadingState, + }; + }, + }) .create(); export const updateChildRefreshState = ( @@ -664,6 +682,19 @@ export const exploreReducer = (state = initialExploreState, action: HigherOrderA [ExploreId.right]: updateChildRefreshState(rightState, action.payload, ExploreId.right), }; } + + case resetExploreAction.type: { + const leftState = state[ExploreId.left]; + const rightState = state[ExploreId.right]; + stopQueryState(leftState.queryState, 'Navigated away from Explore'); + stopQueryState(rightState.queryState, 'Navigated away from Explore'); + + return { + ...state, + [ExploreId.left]: updateChildRefreshState(leftState, action.payload, ExploreId.left), + [ExploreId.right]: updateChildRefreshState(rightState, action.payload, ExploreId.right), + }; + } } if (action.payload) { diff --git a/public/app/plugins/datasource/loki/datasource.ts b/public/app/plugins/datasource/loki/datasource.ts index 45b6d135e7a..daaa9290192 100644 --- a/public/app/plugins/datasource/loki/datasource.ts +++ b/public/app/plugins/datasource/loki/datasource.ts @@ -3,14 +3,12 @@ import _ from 'lodash'; import { Subscription, of } from 'rxjs'; import { webSocket } from 'rxjs/webSocket'; import { catchError, map } from 'rxjs/operators'; - // Services & Utils -import { dateMath } from '@grafana/data'; +import { dateMath, DataFrame, LogRowModel, LoadingState, DateTime } from '@grafana/data'; import { addLabelToSelector } from 'app/plugins/datasource/prometheus/add_label_to_query'; import LanguageProvider from './language_provider'; import { logStreamToDataFrame } from './result_transformer'; import { formatQuery, parseQuery, getHighlighterExpressionsFromQuery } from './query_utils'; - // Types import { PluginMeta, @@ -22,8 +20,6 @@ import { DataStreamState, DataQueryResponse, } from '@grafana/ui'; - -import { DataFrame, LogRowModel, LoadingState, DateTime } from '@grafana/data'; import { LokiQuery, LokiOptions } from './types'; import { BackendSrv } from 'app/core/services/backend_srv'; import { TemplateSrv } from 'app/features/templating/template_srv'; @@ -179,12 +175,12 @@ export class LokiDatasource extends DataSourceApi { const subscription = webSocket(liveTarget.url) .pipe( map((results: any[]) => { - const delta = this.processResult(results, liveTarget); + const data = this.processResult(results, liveTarget); const state: DataStreamState = { key: `loki-${liveTarget.refId}`, request: options, state: LoadingState.Streaming, - delta, + data, unsubscribe: () => this.unsubscribe(liveTarget.refId), }; diff --git a/public/app/plugins/datasource/prometheus/datasource.ts b/public/app/plugins/datasource/prometheus/datasource.ts index 1e6a3a43ba0..300c6e1d85e 100644 --- a/public/app/plugins/datasource/prometheus/datasource.ts +++ b/public/app/plugins/datasource/prometheus/datasource.ts @@ -1,12 +1,12 @@ // Libraries import _ from 'lodash'; import $ from 'jquery'; -import { from, of, Observable } from 'rxjs'; -import { single, map, filter, catchError } from 'rxjs/operators'; - // Services & Utils import kbn from 'app/core/utils/kbn'; -import { dateMath } from '@grafana/data'; +import { dateMath, TimeRange, DateTime, AnnotationEvent, LoadingState } from '@grafana/data'; +import { Observable, from, of } from 'rxjs'; +import { single, filter, mergeMap, catchError } from 'rxjs/operators'; + import PrometheusMetricFindQuery from './metric_find_query'; import { ResultTransformer } from './result_transformer'; import PrometheusLanguageProvider from './language_provider'; @@ -14,7 +14,6 @@ import { BackendSrv } from 'app/core/services/backend_srv'; import addLabelToQuery from './add_label_to_query'; import { getQueryHints } from './query_hints'; import { expandRecordingRules } from './language_utils'; - // Types import { PromQuery, PromOptions, PromQueryRequest, PromContext } from './types'; import { @@ -23,14 +22,13 @@ import { DataSourceInstanceSettings, DataQueryError, DataStreamObserver, - DataStreamState, DataQueryResponseData, + DataStreamState, } from '@grafana/ui'; import { ExploreUrlState } from 'app/types/explore'; import { safeStringifyValue } from 'app/core/utils/explore'; import { TemplateSrv } from 'app/features/templating/template_srv'; import { TimeSrv } from 'app/features/dashboard/services/TimeSrv'; -import { TimeRange, DateTime, LoadingState, AnnotationEvent } from '@grafana/data'; export interface PromDataQueryResponse { data: { @@ -183,6 +181,26 @@ export class PrometheusDatasource extends DataSourceApi activeTargets: PromQuery[], end: number ) => { + // Because we want to get run instant and TimeSeries Prom queries in parallel but this isn't actually streaming + // we need to stop/cancel each posted event with a stop stream event (see below) to the observer so that the + // PanelQueryState stops the stream + const getStopState = (state: DataStreamState): DataStreamState => ({ + ...state, + state: LoadingState.Done, + request: { ...options, requestId: 'done' }, + }); + + const startLoadingEvent: DataStreamState = { + key: `prometheus-loading_indicator`, + state: LoadingState.Loading, + request: options, + data: [], + unsubscribe: () => undefined, + }; + + observer(startLoadingEvent); // Starts the loading indicator + const lastTimeSeriesQuery = queries.filter(query => !query.instant).pop(); + for (let index = 0; index < queries.length; index++) { const query = queries[index]; const target = activeTargets[index]; @@ -198,17 +216,23 @@ export class PrometheusDatasource extends DataSourceApi .pipe( single(), // unsubscribes automatically after first result filter((response: any) => (response.cancelled ? false : true)), - map((response: any) => { - const delta = this.processResult(response, query, target, queries.length); + mergeMap((response: any) => { + const data = this.processResult(response, query, target, queries.length); const state: DataStreamState = { key: `prometheus-${target.refId}`, - state: query.instant ? LoadingState.Loading : LoadingState.Done, + state: LoadingState.Loading, request: options, - delta, + data, unsubscribe: () => undefined, }; - return state; + const states = [state, getStopState(state)]; + + if (target.refId === lastTimeSeriesQuery.refId && target.expr === lastTimeSeriesQuery.expr) { + states.push(getStopState(startLoadingEvent)); // Stops the loading indicator + } + + return states; }), catchError(err => { const error = this.handleErrors(err, target); @@ -282,7 +306,6 @@ export class PrometheusDatasource extends DataSourceApi this.runObserverQueries(options, observer, queries, activeTargets, end); return this.$q.when({ data: [] }) as Promise<{ data: any }>; } - const allQueryPromise = _.map(queries, query => { if (query.instant) { return this.performInstantQuery(query, end); diff --git a/public/app/store/configureStore.ts b/public/app/store/configureStore.ts index bde42dcc5e7..f549f81bcc8 100644 --- a/public/app/store/configureStore.ts +++ b/public/app/store/configureStore.ts @@ -1,6 +1,5 @@ import { createStore, applyMiddleware, compose, combineReducers } from 'redux'; import thunk from 'redux-thunk'; -import { combineEpics, createEpicMiddleware } from 'redux-observable'; import { createLogger } from 'redux-logger'; import sharedReducers from 'app/core/reducers'; import alertingReducers from 'app/features/alerting/state/reducers'; @@ -15,41 +14,8 @@ import usersReducers from 'app/features/users/state/reducers'; import userReducers from 'app/features/profile/state/reducers'; import organizationReducers from 'app/features/org/state/reducers'; import { setStore } from './store'; -import { limitMessageRateEpic } from 'app/features/explore/state/epics/limitMessageRateEpic'; -import { stateSaveEpic } from 'app/features/explore/state/epics/stateSaveEpic'; -import { processQueryResultsEpic } from 'app/features/explore/state/epics/processQueryResultsEpic'; -import { processQueryErrorsEpic } from 'app/features/explore/state/epics/processQueryErrorsEpic'; -import { runQueriesEpic } from 'app/features/explore/state/epics/runQueriesEpic'; -import { runQueriesBatchEpic } from 'app/features/explore/state/epics/runQueriesBatchEpic'; -import { - DataSourceApi, - DataQueryResponse, - DataQuery, - DataSourceJsonData, - DataQueryRequest, - DataStreamObserver, -} from '@grafana/ui'; - -import { - TimeZone, - RawTimeRange, - TimeRange, - DateTimeInput, - FormatInput, - DateTime, - AbsoluteTimeRange, - dateTimeForTimeZone, -} from '@grafana/data'; -import { Observable } from 'rxjs'; -import { getQueryResponse } from 'app/core/utils/explore'; import { StoreState } from 'app/types/store'; import { toggleLogActionsMiddleware } from 'app/core/middlewares/application'; -import { timeEpic } from 'app/features/explore/state/epics/timeEpic'; -import { TimeSrv, getTimeSrv } from 'app/features/dashboard/services/TimeSrv'; -import { UserState } from 'app/types/user'; -import { getTimeRange } from 'app/core/utils/explore'; -import { getTimeZone } from 'app/features/profile/state/selectors'; -import { getShiftedTimeRange } from 'app/core/utils/timePicker'; const rootReducers = { ...sharedReducers, @@ -70,40 +36,6 @@ export function addRootReducer(reducers: any) { Object.assign(rootReducers, ...reducers); } -export const rootEpic: any = combineEpics( - limitMessageRateEpic, - stateSaveEpic, - runQueriesEpic, - runQueriesBatchEpic, - processQueryResultsEpic, - processQueryErrorsEpic, - timeEpic -); - -export interface EpicDependencies { - getQueryResponse: ( - datasourceInstance: DataSourceApi, - options: DataQueryRequest, - observer?: DataStreamObserver - ) => Observable; - getTimeSrv: () => TimeSrv; - getTimeRange: (timeZone: TimeZone, rawRange: RawTimeRange) => TimeRange; - getTimeZone: (state: UserState) => TimeZone; - getShiftedTimeRange: (direction: number, origRange: TimeRange, timeZone: TimeZone) => AbsoluteTimeRange; - dateTimeForTimeZone: (timezone?: TimeZone, input?: DateTimeInput, formatInput?: FormatInput) => DateTime; -} - -const dependencies: EpicDependencies = { - getQueryResponse, - getTimeSrv, - getTimeRange, - getTimeZone, - getShiftedTimeRange, - dateTimeForTimeZone, -}; - -const epicMiddleware = createEpicMiddleware({ dependencies }); - export function configureStore() { const composeEnhancers = (window as any).__REDUX_DEVTOOLS_EXTENSION_COMPOSE__ || compose; const rootReducer = combineReducers(rootReducers); @@ -114,11 +46,10 @@ export function configureStore() { }); const storeEnhancers = process.env.NODE_ENV !== 'production' - ? applyMiddleware(toggleLogActionsMiddleware, thunk, epicMiddleware, logger) - : applyMiddleware(thunk, epicMiddleware); + ? applyMiddleware(toggleLogActionsMiddleware, thunk, logger) + : applyMiddleware(thunk); const store = createStore(rootReducer, {}, composeEnhancers(storeEnhancers)); setStore(store); - epicMiddleware.run(rootEpic); return store; } diff --git a/public/app/types/explore.ts b/public/app/types/explore.ts index 1d31966d1b0..8c7d5036da1 100644 --- a/public/app/types/explore.ts +++ b/public/app/types/explore.ts @@ -21,6 +21,7 @@ import { import { Emitter } from 'app/core/core'; import TableModel from 'app/core/table_model'; +import { PanelQueryState } from '../features/dashboard/state/PanelQueryState'; export enum ExploreMode { Metrics = 'Metrics', @@ -255,6 +256,8 @@ export interface ExploreItemState { isLive: boolean; urlReplaced: boolean; + + queryState: PanelQueryState; } export interface ExploreUpdateState { diff --git a/public/test/core/redux/epicTester.ts b/public/test/core/redux/epicTester.ts deleted file mode 100644 index c9ca7bb7820..00000000000 --- a/public/test/core/redux/epicTester.ts +++ /dev/null @@ -1,149 +0,0 @@ -import { Epic, ActionsObservable, StateObservable } from 'redux-observable'; -import { Subject } from 'rxjs'; -import { - DataSourceApi, - DataQuery, - DataSourceJsonData, - DataQueryRequest, - DataStreamObserver, - DataQueryResponse, - DataStreamState, -} from '@grafana/ui'; -import { DefaultTimeZone } from '@grafana/data'; - -import { ActionOf } from 'app/core/redux/actionCreatorFactory'; -import { StoreState } from 'app/types/store'; -import { EpicDependencies } from 'app/store/configureStore'; -import { TimeSrv } from 'app/features/dashboard/services/TimeSrv'; -import { DEFAULT_RANGE } from 'app/core/utils/explore'; - -export const MOCKED_ABSOLUTE_RANGE = { from: 1, to: 2 }; - -export const epicTester = ( - epic: Epic, ActionOf, StoreState, EpicDependencies>, - state?: Partial, - dependencies?: Partial -) => { - const resultingActions: Array> = []; - const action$ = new Subject>(); - const state$ = new Subject(); - const actionObservable$ = new ActionsObservable(action$); - const stateObservable$ = new StateObservable(state$, (state as StoreState) || ({} as StoreState)); - const queryResponse$ = new Subject(); - const observer$ = new Subject(); - const getQueryResponse = ( - datasourceInstance: DataSourceApi, - options: DataQueryRequest, - observer?: DataStreamObserver - ) => { - if (observer) { - observer$.subscribe({ next: event => observer(event) }); - } - return queryResponse$; - }; - const init = jest.fn(); - const getTimeSrv = (): TimeSrv => { - const timeSrvMock: TimeSrv = {} as TimeSrv; - - return Object.assign(timeSrvMock, { init }); - }; - - const getTimeRange = jest.fn().mockReturnValue(DEFAULT_RANGE); - - const getShiftedTimeRange = jest.fn().mockReturnValue(MOCKED_ABSOLUTE_RANGE); - - const getTimeZone = jest.fn().mockReturnValue(DefaultTimeZone); - - const dateTimeForTimeZone = jest.fn().mockReturnValue(null); - - const defaultDependencies: EpicDependencies = { - getQueryResponse, - getTimeSrv, - getTimeRange, - getTimeZone, - getShiftedTimeRange, - dateTimeForTimeZone, - }; - - const theDependencies: EpicDependencies = { ...defaultDependencies, ...dependencies }; - - epic(actionObservable$, stateObservable$, theDependencies).subscribe({ - next: action => resultingActions.push(action), - }); - - const whenActionIsDispatched = (action: ActionOf) => { - action$.next(action); - - return instance; - }; - - const whenQueryReceivesResponse = (response: DataQueryResponse) => { - queryResponse$.next(response); - - return instance; - }; - - const whenQueryThrowsError = (error: any) => { - queryResponse$.error(error); - - return instance; - }; - - const whenQueryObserverReceivesEvent = (event: DataStreamState) => { - observer$.next(event); - - return instance; - }; - - const thenResultingActionsEqual = (...actions: Array>) => { - expect(actions).toEqual(resultingActions); - - return instance; - }; - - const thenNoActionsWhereDispatched = () => { - expect(resultingActions).toEqual([]); - - return instance; - }; - - const getDependencyMock = (dependency: string, method?: string) => { - // @ts-ignore - const dep = theDependencies[dependency]; - let mock = null; - if (dep instanceof Function) { - mock = method ? dep()[method] : dep(); - } else { - mock = method ? dep[method] : dep; - } - - return mock; - }; - - const thenDependencyWasCalledTimes = (times: number, dependency: string, method?: string) => { - const mock = getDependencyMock(dependency, method); - expect(mock).toBeCalledTimes(times); - - return instance; - }; - - const thenDependencyWasCalledWith = (args: any[], dependency: string, method?: string) => { - const mock = getDependencyMock(dependency, method); - expect(mock).toBeCalledWith(...args); - - return instance; - }; - - const instance = { - whenActionIsDispatched, - whenQueryReceivesResponse, - whenQueryThrowsError, - whenQueryObserverReceivesEvent, - thenResultingActionsEqual, - thenNoActionsWhereDispatched, - thenDependencyWasCalledTimes, - thenDependencyWasCalledWith, - }; - - return instance; -};