import { Observable, debounce, debounceTime, defer, finalize, first, interval, map, of } from 'rxjs';

import {
  DataSourceApi,
  DataQueryRequest,
  DataQueryResponse,
  DataSourceInstanceSettings,
  TestDataSourceResponse,
  ScopedVar,
  DataTopic,
  PanelData,
  DataFrame,
  LoadingState,
  Field,
  FieldType,
  AdHocVariableFilter,
  MetricFindValue,
  getValueMatcher,
  ValueMatcherID,
} from '@grafana/data';
import { config } from '@grafana/runtime';
import { SceneDataProvider, SceneDataTransformer, SceneObject } from '@grafana/scenes';
import {
  activateSceneObjectAndParentTree,
  findOriginalVizPanelByKey,
  getVizPanelKeyForPanelId,
} from 'app/features/dashboard-scene/utils/utils';

import { MIXED_REQUEST_PREFIX } from '../mixed/MixedDataSource';

import { DashboardQuery } from './types';

/**
 * Datasource that re-emits the results of another panel living in the same scene.
 *
 * `query()` resolves the panel referenced by `DashboardQuery.panelId`, subscribes to that
 * panel's data provider and forwards its frames (optionally narrowed by AdHoc filters).
 *
 * This should not really be called
 */
export class DashboardDatasource extends DataSourceApi<DashboardQuery> {
  constructor(instanceSettings: DataSourceInstanceSettings) {
    super(instanceSettings);
  }

  /**
   * Short text describing which panel the query references.
   */
  getCollapsedText(query: DashboardQuery) {
    return `Dashboard Reference: ${query.panelId}`;
  }

  /**
   * Streams the results of the panel referenced by the first target of the request.
   *
   * Only `options.targets[0]` is considered. An empty response is emitted when there is no
   * target, no `panelId`, or the source panel has no usable data provider; a response with
   * an `error` message is emitted when the source panel cannot be found.
   *
   * @param options - request; `scopedVars.__sceneObject` must carry the calling scene object.
   * @returns an observable of responses built from the source panel's results stream.
   * @throws Error when the request carries no `__sceneObject` scoped variable.
   */
  query(options: DataQueryRequest<DashboardQuery>): Observable<DataQueryResponse> {
    const sceneScopedVar: ScopedVar | undefined = options.scopedVars?.__sceneObject;
    const scene: SceneObject | undefined = sceneScopedVar
      ? (sceneScopedVar.value.valueOf() as SceneObject)
      : undefined;

    if (!scene) {
      throw new Error('Can only be called from a scene');
    }

    const query = options.targets[0];
    if (!query) {
      return of({ data: [] });
    }

    const panelId = query.panelId;
    if (!panelId) {
      return of({ data: [] });
    }

    const sourcePanel = this.findSourcePanel(scene, panelId);
    if (!sourcePanel) {
      return of({ data: [], error: { message: 'Could not find source panel' } });
    }

    let sourceDataProvider: SceneDataProvider | undefined = sourcePanel.state.$data;

    // Without `withTransforms`, skip the transformer and read from the provider it wraps.
    if (!query.withTransforms && sourceDataProvider instanceof SceneDataTransformer) {
      sourceDataProvider = sourceDataProvider.state.$data;
    }

    if (!sourceDataProvider || !sourceDataProvider.getResultsStream) {
      return of({ data: [] });
    }

    // Bind the narrowed provider to a const so the deferred closure below needs no
    // non-null assertions on a mutable binding.
    const provider = sourceDataProvider;

    // Extract AdHoc filters from the request
    const adHocFilters = options.filters || [];

    // `defer` postpones activation of the source provider until somebody subscribes.
    return defer(() => {
      // NOTE(review): an inactive provider is given a fixed container width of 500 before
      // activation — presumably so it can run without a rendered panel; confirm.
      if (!provider.isActive && provider.setContainerWidth) {
        provider.setContainerWidth(500);
      }

      const cleanUp = activateSceneObjectAndParentTree(provider);

      return provider.getResultsStream!().pipe(
        debounceTime(50),
        map((result) => {
          return {
            data: this.getDataFramesForQueryTopic(result.data, query, adHocFilters),
            state: result.data.state,
            errors: result.data.errors,
            error: result.data.error,
            key: 'source-ds-provider',
          };
        }),
        this.emitFirstLoadedDataIfMixedDS(options.requestId),
        // Undo the activation performed above once the stream completes or is unsubscribed.
        finalize(() => cleanUp?.())
      );
    });
  }

  /**
   * Picks the frames to return for the query's topic.
   *
   * - `DataTopic.Annotations`: returns only the annotation frames, each re-tagged with
   *   `meta.dataTopic = DataTopic.Series`.
   * - otherwise: returns the series (with `config.filterable` adjusted and, when the
   *   `dashboardDsAdHocFiltering` toggle is on and filters exist, rows filtered) followed
   *   by the untouched annotation frames.
   *
   * @param data - panel data of the source panel.
   * @param query - the dashboard query being served.
   * @param filters - AdHoc filters taken from the request.
   */
  private getDataFramesForQueryTopic(
    data: PanelData,
    query: DashboardQuery,
    filters: AdHocVariableFilter[]
  ): DataFrame[] {
    const annotations = data.annotations ?? [];

    if (query.topic === DataTopic.Annotations) {
      return annotations.map((frame) => ({
        ...frame,
        meta: {
          ...frame.meta,
          dataTopic: DataTopic.Series,
        },
      }));
    } else {
      // Shallow-copy frames, fields, config and state so the source panel's objects are not mutated.
      const series = data.series.map((s) => {
        return {
          ...s,
          fields: s.fields.map((field: Field) => ({
            ...field,
            config: {
              ...field.config,
              // Enable AdHoc filtering for string and numeric fields only when feature toggle is enabled
              filterable: config.featureToggles.dashboardDsAdHocFiltering
                ? field.type === FieldType.string || field.type === FieldType.number
                : field.config.filterable,
            },
            state: {
              ...field.state,
            },
          })),
        };
      });

      if (!config.featureToggles.dashboardDsAdHocFiltering || filters.length === 0) {
        return [...series, ...annotations];
      }

      // Apply AdHoc filters to series data
      const filteredSeries = series.map((frame) => this.applyAdHocFilters(frame, filters));
      return [...filteredSeries, ...annotations];
    }
  }

  /**
   * Apply AdHoc filters to a DataFrame.
   *
   * Field indices and value matchers are resolved once per filter, then every row is
   * tested against all remaining filters (AND logic). The original frame is returned
   * unchanged when nothing needs filtering or every row matches.
   *
   * @param frame - frame to filter.
   * @param filters - AdHoc filters; only `=` and `!=` operators produce a matcher.
   * @returns the original frame, or a new frame holding only the matching rows.
   */
  private applyAdHocFilters(frame: DataFrame, filters: AdHocVariableFilter[]): DataFrame {
    if (filters.length === 0 || frame.length === 0) {
      return frame;
    }

    // Pre-compute field indices and value matchers for better performance
    const filterFieldIndices = filters
      .map((filter) => {
        const fieldIndex = frame.fields.findIndex((f) => f.name === filter.key);
        return { filter, fieldIndex, matcher: this.createValueMatcher(filter, fieldIndex, frame) };
      })
      .filter(({ filter, fieldIndex, matcher }) => {
        // If field is not present:
        // - Keep filters with '=' operator (will always be false - reject rows)
        // - Remove filters with '!=' operator (will always be true - no effect)
        if (fieldIndex === -1) {
          return filter.operator === '=';
        }

        // Only keep filters with valid matchers
        return matcher !== null;
      });

    // If no filters remain after optimization, return original frame
    if (filterFieldIndices.length === 0) {
      return frame;
    }

    // Short-circuit: if any filter has '=' operator with missing field, reject all rows
    const hasImpossibleFilter = filterFieldIndices.some(({ fieldIndex }) => fieldIndex === -1);
    if (hasImpossibleFilter) {
      return this.reconstructDataFrame(frame);
    }

    const matchingRows = new Set<number>();

    // Check each row to see if it matches all filters (AND logic)
    for (let rowIndex = 0; rowIndex < frame.length; rowIndex++) {
      const rowMatches = filterFieldIndices.every(({ matcher, fieldIndex }) => {
        const field = frame.fields[fieldIndex];

        // Use Grafana's value matcher system
        return matcher?.(rowIndex, field, frame, [frame]) ?? false;
      });

      if (rowMatches) {
        matchingRows.add(rowIndex);
      }
    }

    // Early return if no filtering occurred
    if (matchingRows.size === frame.length) {
      return frame;
    }

    return this.reconstructDataFrame(frame, matchingRows);
  }

  /**
   * Create a value matcher from an AdHoc filter.
   *
   * @param filter - the AdHoc filter to translate.
   * @param fieldIndex - index of the filtered field in `frame.fields`, or -1 when absent.
   * @param frame - frame the field belongs to.
   * @returns a matcher, or `null` when the field is missing, the field type is unsupported,
   *   the operator is neither `=` nor `!=`, or matcher creation throws.
   */
  private createValueMatcher(filter: AdHocVariableFilter, fieldIndex: number, frame: DataFrame) {
    // Return null for missing fields - they are handled separately
    if (fieldIndex === -1) {
      return null;
    }

    const field = frame.fields[fieldIndex];

    // Only support string and numeric fields when feature toggle is enabled
    if (config.featureToggles.dashboardDsAdHocFiltering) {
      if (field.type !== FieldType.string && field.type !== FieldType.number) {
        return null;
      }
    }

    // Map operator to matcher ID
    let matcherId: ValueMatcherID;
    switch (filter.operator) {
      case '=':
        matcherId = ValueMatcherID.equal;
        break;
      case '!=':
        matcherId = ValueMatcherID.notEqual;
        break;
      default:
        return null; // Unknown operator
    }

    try {
      return getValueMatcher({
        id: matcherId,
        options: { value: filter.value },
      });
    } catch (error) {
      // Best effort: a filter whose matcher cannot be built is reported and then ignored.
      console.warn('Failed to create value matcher for filter:', filter, error);
      return null;
    }
  }

  /**
   * Reconstruct a DataFrame with only the matching rows.
   *
   * @param frame - source frame; it is not mutated.
   * @param matchingRows - row indices to keep, copied in the set's iteration order.
   *   When omitted, the result has zero rows.
   * @returns a new frame whose fields hold only the selected values and whose field
   *   `state` is reset to an empty object.
   */
  private reconstructDataFrame(frame: DataFrame, matchingRows?: Set<number>): DataFrame {
    // Default to empty set if no matching rows provided (reject all rows)
    const rows = matchingRows ?? new Set<number>();

    const fields: Field[] = frame.fields.map((field) => {
      // Pre-allocate array and use direct assignment for better performance with large datasets
      const newValues = new Array(rows.size);
      let i = 0;
      for (const rowIndex of rows) {
        newValues[i++] = field.values[rowIndex];
      }

      return {
        ...field,
        values: newValues,
        state: {}, // Clean the state as it's being recalculated
      };
    });

    return {
      ...frame,
      fields: fields,
      length: rows.size,
    };
  }

  /**
   * Looks up the panel with the given id in the scene.
   */
  private findSourcePanel(scene: SceneObject, panelId: number) {
    // We're trying to find the original panel, not a cloned one, since `panelId` alone cannot resolve clones
    return findOriginalVizPanelByKey(scene, getVizPanelKeyForPanelId(panelId));
  }

  /**
   * Builds an operator that, for requests whose id contains `MIXED_REQUEST_PREFIX`, emits
   * a single Done/Error response and then completes. Other requests pass through untouched.
   *
   * @param requestId - id of the current request.
   */
  private emitFirstLoadedDataIfMixedDS(
    requestId: string
  ): (source: Observable<DataQueryResponse>) => Observable<DataQueryResponse> {
    return (source: Observable<DataQueryResponse>) => {
      if (requestId.includes(MIXED_REQUEST_PREFIX)) {
        // Number of values seen so far; only the very first value may get the long debounce.
        let count = 0;

        return source.pipe(
          /*
           * We can have the following piped values scenarios:
           * Loading -> Done - initial load
           * Done -> Loading -> Done - refresh
           * Done - adding another query in editor
           *
           * When we see Done as a first element this is because of ReplaySubject in SceneQueryRunner
           *
           * we use first(...) below to emit correct result which is last value with Done/Error states
           *
           * to avoid emitting first Done/Error (due to ReplaySubject) we selectively debounce only first value with such states
           */
          debounce((val) => {
            if ([LoadingState.Done, LoadingState.Error].includes(val.state!) && count === 0) {
              count++;
              // in the refresh scenario we need to debounce first Done/Error until Loading arrives
              // 400ms here is a magic number that was sufficient enough with the 20x cpu throttle
              // this still might affect slower machines but the issue affects only panel view/edit modes
              return interval(400);
            }
            count++;
            return interval(0);
          }),
          first((val) => val.state === LoadingState.Done || val.state === LoadingState.Error)
        );
      }

      return source;
    };
  }

  /**
   * Resolves with an empty message and status; no check is performed.
   */
  testDatasource(): Promise<TestDataSourceResponse> {
    return Promise.resolve({ message: '', status: '' });
  }

  /**
   * Resolves with an empty list of tag keys.
   */
  getTagKeys(): Promise<MetricFindValue[]> {
    // Stub implementation to indicate AdHoc filter support
    // Full implementation will be added in future PRs
    return Promise.resolve([]);
  }
}