diff --git a/pkg/tsdb/tempo/metrics_stream.go b/pkg/tsdb/tempo/metrics_stream.go
new file mode 100644
index 00000000000..1c4bba6f713
--- /dev/null
+++ b/pkg/tsdb/tempo/metrics_stream.go
@@ -0,0 +1,103 @@
+package tempo
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+
+	"github.com/grafana/grafana/pkg/tsdb/tempo/traceql"
+	"google.golang.org/grpc/metadata"
+
+	"github.com/grafana/grafana-plugin-sdk-go/backend"
+	"github.com/grafana/grafana-plugin-sdk-go/backend/tracing"
+	"github.com/grafana/grafana/pkg/tsdb/tempo/kinds/dataquery"
+	"github.com/grafana/tempo/pkg/tempopb"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/codes"
+)
+
+const MetricsPathPrefix = "metrics/"
+
+func (s *Service) runMetricsStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender, datasource *Datasource) error {
+	ctx, span := tracing.DefaultTracer().Start(ctx, "datasource.tempo.runMetricsStream")
+	defer span.End()
+
+	response := &backend.DataResponse{}
+
+	var backendQuery *backend.DataQuery
+	err := json.Unmarshal(req.Data, &backendQuery)
+	if err != nil {
+		response.Error = fmt.Errorf("error unmarshaling backend query model: %v", err)
+		span.RecordError(response.Error)
+		span.SetStatus(codes.Error, response.Error.Error())
+		return err
+	}
+
+	var qrr *tempopb.QueryRangeRequest
+	err = json.Unmarshal(req.Data, &qrr)
+	if err != nil {
+		response.Error = fmt.Errorf("error unmarshaling Tempo query model: %v", err)
+		span.RecordError(response.Error)
+		span.SetStatus(codes.Error, response.Error.Error())
+		return err
+	}
+
+	if qrr.GetQuery() == "" {
+		return fmt.Errorf("query is empty")
+	}
+
+	qrr.Start = uint64(backendQuery.TimeRange.From.UnixNano())
+	qrr.End = uint64(backendQuery.TimeRange.To.UnixNano())
+
+	// Set the user agent for the gRPC call. When the DS is decoupled we don't recreate the instance when the Grafana
+	// config changes or updates, so we have to get it from the context.
+	// Ideally this would be pushed higher up, so it's set once for all RPC calls, but we only have this one for now.
+	ctx = metadata.AppendToOutgoingContext(ctx, "User-Agent", backend.UserAgentFromContext(ctx).String())
+
+	stream, err := datasource.StreamingClient.MetricsQueryRange(ctx, qrr)
+	if err != nil {
+		span.RecordError(err)
+		span.SetStatus(codes.Error, err.Error())
+		s.logger.Error("Error Search()", "err", err)
+		return err
+	}
+
+	return s.processMetricsStream(ctx, qrr.Query, stream, sender)
+}
+
+func (s *Service) processMetricsStream(ctx context.Context, query string, stream tempopb.StreamingQuerier_MetricsQueryRangeClient, sender StreamSender) error {
+	ctx, span := tracing.DefaultTracer().Start(ctx, "datasource.tempo.processStream")
+	defer span.End()
+	messageCount := 0
+	for {
+		msg, err := stream.Recv()
+		messageCount++
+		span.SetAttributes(attribute.Int("message_count", messageCount))
+		if errors.Is(err, io.EOF) {
+			if err := s.sendResponse(ctx, nil, nil, dataquery.SearchStreamingStateDone, sender); err != nil {
+				span.RecordError(err)
+				span.SetStatus(codes.Error, err.Error())
+				return err
+			}
+			break
+		}
+		if err != nil {
+			s.logger.Error("Error receiving message", "err", err)
+			span.RecordError(err)
+			span.SetStatus(codes.Error, err.Error())
+			return err
+		}
+
+		transformed := traceql.TransformMetricsResponse(query, *msg)
+
+		if err := s.sendResponse(ctx, transformed, msg.Metrics, dataquery.SearchStreamingStateStreaming, sender); err != nil {
+			span.RecordError(err)
+			span.SetStatus(codes.Error, err.Error())
+			return err
+		}
+	}
+
+	return nil
+}
diff --git a/pkg/tsdb/tempo/search_stream.go b/pkg/tsdb/tempo/search_stream.go
index d96fec7a216..527a1789f32 100644
--- a/pkg/tsdb/tempo/search_stream.go
+++ b/pkg/tsdb/tempo/search_stream.go
@@ -89,7 +89,7 @@ func (s *Service) processStream(ctx context.Context, stream tempopb.StreamingQue
 		messageCount++
 		span.SetAttributes(attribute.Int("message_count", messageCount))
 		if errors.Is(err, io.EOF) {
-			if err := s.sendResponse(ctx, &ExtendedResponse{
+			if err := s.sendSearchResponse(ctx, &ExtendedResponse{
 				State: dataquery.SearchStreamingStateDone,
 				SearchResponse: &tempopb.SearchResponse{
 					Metrics: metrics,
@@ -114,7 +114,7 @@ func (s *Service) processStream(ctx context.Context, stream tempopb.StreamingQue
 		traceList = removeDuplicates(traceList)
 		span.SetAttributes(attribute.Int("traces_count", len(traceList)))
 
-		if err := s.sendResponse(ctx, &ExtendedResponse{
+		if err := s.sendSearchResponse(ctx, &ExtendedResponse{
 			State: dataquery.SearchStreamingStateStreaming,
 			SearchResponse: &tempopb.SearchResponse{
 				Metrics: metrics,
@@ -130,34 +130,43 @@ func (s *Service) processStream(ctx context.Context, stream tempopb.StreamingQue
 	return nil
 }
 
-func (s *Service) sendResponse(ctx context.Context, response *ExtendedResponse, sender StreamSender) error {
-	_, span := tracing.DefaultTracer().Start(ctx, "datasource.tempo.sendResponse")
+func (s *Service) sendSearchResponse(ctx context.Context, response *ExtendedResponse, sender StreamSender) error {
+	_, span := tracing.DefaultTracer().Start(ctx, "datasource.tempo.sendSearchResponse")
 	defer span.End()
 	frame := createResponseDataFrame()
 
 	if response != nil {
 		span.SetAttributes(attribute.Int("trace_count", len(response.Traces)), attribute.String("state", string(response.State)))
-
-		tracesAsJson, err := json.Marshal(response.Traces)
-		if err != nil {
-			return err
-		}
-		tracesRawMessage := json.RawMessage(tracesAsJson)
-		frame.Fields[0].Append(tracesRawMessage)
-
-		metricsAsJson, err := json.Marshal(response.Metrics)
-		if err != nil {
-			return err
-		}
-		metricsRawMessage := json.RawMessage(metricsAsJson)
-		frame.Fields[1].Append(metricsRawMessage)
-		frame.Fields[2].Append(string(response.State))
-		frame.Fields[3].Append("")
+		return s.sendResponse(ctx, response.Traces, response.Metrics, response.State, sender)
 	}
 
 	return sender.SendFrame(frame, data.IncludeAll)
 }
 
+func (s *Service) sendResponse(ctx context.Context, result interface{}, metrics *tempopb.SearchMetrics, state dataquery.SearchStreamingState, sender StreamSender) error {
+	_, span := tracing.DefaultTracer().Start(ctx, "datasource.tempo.sendResponse")
+	defer span.End()
+	frame := createResponseDataFrame()
+
+	tracesAsJson, err := json.Marshal(result)
+	if err != nil {
+		return err
+	}
+	tracesRawMessage := json.RawMessage(tracesAsJson)
+	frame.Fields[0].Append(tracesRawMessage)
+
+	metricsAsJson, err := json.Marshal(metrics)
+	if err != nil {
+		return err
+	}
+	metricsRawMessage := json.RawMessage(metricsAsJson)
+	frame.Fields[1].Append(metricsRawMessage)
+	frame.Fields[2].Append(string(state))
+	frame.Fields[3].Append("")
+
+	return sender.SendFrame(frame, data.IncludeAll)
+}
+
 func sendError(searchErr error, sender StreamSender) error {
 	frame := createResponseDataFrame()
 
@@ -173,7 +182,7 @@ func sendError(searchErr error, sender StreamSender) error {
 
 func createResponseDataFrame() *data.Frame {
 	frame := data.NewFrame("response")
-	frame.Fields = append(frame.Fields, data.NewField("traces", nil, []json.RawMessage{}))
+	frame.Fields = append(frame.Fields, data.NewField("result", nil, []json.RawMessage{}))
 	frame.Fields = append(frame.Fields, data.NewField("metrics", nil, []json.RawMessage{}))
 	frame.Fields = append(frame.Fields, data.NewField("state", nil, []string{}))
 	frame.Fields = append(frame.Fields, data.NewField("error", nil, []string{}))
diff --git a/pkg/tsdb/tempo/stream_handler.go b/pkg/tsdb/tempo/stream_handler.go
index a9d07b0e4b3..6cae17206c4 100644
--- a/pkg/tsdb/tempo/stream_handler.go
+++ b/pkg/tsdb/tempo/stream_handler.go
@@ -8,19 +8,22 @@ import (
 	"github.com/grafana/grafana-plugin-sdk-go/backend"
 )
 
-func (s *Service) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
+func (s *Service) SubscribeStream(_ context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
 	s.logger.Debug("Allowing access to stream", "path", req.Path, "user", req.PluginContext.User)
 	status := backend.SubscribeStreamStatusPermissionDenied
 	if strings.HasPrefix(req.Path, SearchPathPrefix) {
 		status = backend.SubscribeStreamStatusOK
 	}
+	if strings.HasPrefix(req.Path, MetricsPathPrefix) {
+		status = backend.SubscribeStreamStatusOK
+	}
 
 	return &backend.SubscribeStreamResponse{
 		Status: status,
 	}, nil
 }
 
-func (s *Service) PublishStream(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
+func (s *Service) PublishStream(_ context.Context, _ *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
 	s.logger.Debug("PublishStream called")
 
 	// Do not allow publishing at all.
@@ -31,9 +34,9 @@ func (s *Service) PublishStream(ctx context.Context, req *backend.PublishStreamR
 
 func (s *Service) RunStream(ctx context.Context, request *backend.RunStreamRequest, sender *backend.StreamSender) error {
 	s.logger.Debug("New stream call", "path", request.Path)
+	tempoDatasource, err := s.getDSInfo(ctx, request.PluginContext)
 
 	if strings.HasPrefix(request.Path, SearchPathPrefix) {
-		tempoDatasource, err := s.getDSInfo(ctx, request.PluginContext)
 		if err != nil {
 			return err
 		}
@@ -43,6 +46,16 @@ func (s *Service) RunStream(ctx context.Context, request *backend.RunStreamReque
 			return nil
 		}
 	}
+	if strings.HasPrefix(request.Path, MetricsPathPrefix) {
+		if err != nil {
+			return err
+		}
+		if err = s.runMetricsStream(ctx, request, sender, tempoDatasource); err != nil {
+			return sendError(err, sender)
+		} else {
+			return nil
+		}
+	}
 
 	return fmt.Errorf("unknown path %s", request.Path)
 }
diff --git a/pkg/tsdb/tempo/trace.go b/pkg/tsdb/tempo/trace.go
index 5ad83502f82..a974390e358 100644
--- a/pkg/tsdb/tempo/trace.go
+++ b/pkg/tsdb/tempo/trace.go
@@ -155,8 +155,10 @@ func (s *Service) performTraceRequest(ctx context.Context, dsInfo *Datasource, a
 	}
 
 	defer func() {
-		if err := resp.Body.Close(); err != nil {
-			ctxLogger.Error("Failed to close response body", "error", err, "function", logEntrypoint())
+		if resp != nil && resp.Body != nil {
+			if err := resp.Body.Close(); err != nil {
+				ctxLogger.Error("Failed to close response body", "error", err, "function", logEntrypoint())
+			}
 		}
 	}()
 
diff --git a/pkg/tsdb/tempo/traceql/metrics.go b/pkg/tsdb/tempo/traceql/metrics.go
index 54e5e34c7e6..7ed97e963a5 100644
--- a/pkg/tsdb/tempo/traceql/metrics.go
+++ b/pkg/tsdb/tempo/traceql/metrics.go
@@ -3,6 +3,7 @@ package traceql
 import (
 	"fmt"
 	"regexp"
+	"sort"
 	"strconv"
 	"strings"
 	"time"
@@ -13,7 +14,7 @@ import (
 	v1 "github.com/grafana/tempo/pkg/tempopb/common/v1"
 )
 
-func TransformMetricsResponse(query *dataquery.TempoQuery, resp tempopb.QueryRangeResponse) []*data.Frame {
+func TransformMetricsResponse(query string, resp tempopb.QueryRangeResponse) []*data.Frame {
 	// prealloc frames
 	frames := make([]*data.Frame, len(resp.Series))
 	var exemplarFrames []*data.Frame
@@ -37,10 +38,11 @@ func TransformMetricsResponse(query *dataquery.TempoQuery, resp tempopb.QueryRan
 			},
 			Meta: &data.FrameMeta{
 				PreferredVisualization: data.VisTypeGraph,
+				Type:                   data.FrameTypeTimeSeriesMulti,
 			},
 		}
 
-		isHistogram := isHistogramQuery(*query.Query)
+		isHistogram := isHistogramQuery(query)
 		if isHistogram {
 			frame.Meta.PreferredVisualizationPluginID = "heatmap"
 		}
@@ -128,10 +130,18 @@ func transformLabelsAndGetName(seriesLabels []v1.KeyValue) (string, data.Labels)
 	if len(seriesLabels) == 1 {
 		_, name = metricsValueToString(seriesLabels[0].GetValue())
 	} else {
-		var labelStrings []string
-		for key, val := range labels {
-			labelStrings = append(labelStrings, fmt.Sprintf("%s=%s", key, val))
+		keys := make([]string, 0, len(labels))
+
+		for k := range labels {
+			keys = append(keys, k)
 		}
+		sort.Strings(keys)
+
+		var labelStrings []string
+		for _, key := range keys {
+			labelStrings = append(labelStrings, fmt.Sprintf("%s=%s", key, labels[key]))
+		}
+
 		name = fmt.Sprintf("{%s}", strings.Join(labelStrings, ", "))
 	}
 }
diff --git a/pkg/tsdb/tempo/traceql/metrics_test.go b/pkg/tsdb/tempo/traceql/metrics_test.go
index 8254af5f8b5..d0dc1196008 100644
--- a/pkg/tsdb/tempo/traceql/metrics_test.go
+++ b/pkg/tsdb/tempo/traceql/metrics_test.go
@@ -13,9 +13,7 @@ import (
 
 func TestTransformMetricsResponse_EmptyResponse(t *testing.T) {
 	resp := tempopb.QueryRangeResponse{}
-	queryStr := ""
-	query := &dataquery.TempoQuery{Query: &queryStr}
-	frames := TransformMetricsResponse(query, resp)
+	frames := TransformMetricsResponse("", resp)
 	assert.Empty(t, frames)
 }
 
@@ -32,9 +30,7 @@ func TestTransformMetricsResponse_SingleSeriesSingleLabel(t *testing.T) {
 			},
 		},
 	}
-	queryStr := ""
-	query := &dataquery.TempoQuery{Query: &queryStr}
-	frames := TransformMetricsResponse(query, resp)
+	frames := TransformMetricsResponse("", resp)
 	assert.Len(t, frames, 1)
 	assert.Equal(t, "value1", frames[0].RefID)
 	assert.Equal(t, "value1", frames[0].Name)
@@ -47,9 +43,6 @@
 }
 
 func TestTransformMetricsResponse_SingleSeriesMultipleLabels(t *testing.T) {
-	// Skipping for now because this test is broken.
-	t.Skip()
-
 	resp := tempopb.QueryRangeResponse{
 		Series: []*tempopb.TimeSeries{
 			{
@@ -65,9 +58,7 @@ func TestTransformMetricsResponse_SingleSeriesMultipleLabels(t *testing.T) {
 			},
 		},
 	}
-	queryStr := ""
-	query := &dataquery.TempoQuery{Query: &queryStr}
-	frames := TransformMetricsResponse(query, resp)
+	frames := TransformMetricsResponse("", resp)
 	assert.Len(t, frames, 1)
 	assert.Equal(t, "{label1=\"value1\", label2=123, label3=123.456, label4=true}", frames[0].RefID)
 	assert.Equal(t, "{label1=\"value1\", label2=123, label3=123.456, label4=true}", frames[0].Name)
@@ -100,9 +91,7 @@ func TestTransformMetricsResponse_MultipleSeries(t *testing.T) {
 			},
 		},
 	}
-	queryStr := ""
-	query := &dataquery.TempoQuery{Query: &queryStr}
-	frames := TransformMetricsResponse(query, resp)
+	frames := TransformMetricsResponse("", resp)
 	assert.Len(t, frames, 2)
 	assert.Equal(t, "value1", frames[0].RefID)
 	assert.Equal(t, "value1", frames[0].Name)
diff --git a/pkg/tsdb/tempo/traceql_query.go b/pkg/tsdb/tempo/traceql_query.go
index 5265097ad41..39476c35fd5 100644
--- a/pkg/tsdb/tempo/traceql_query.go
+++ b/pkg/tsdb/tempo/traceql_query.go
@@ -71,8 +71,10 @@ func (s *Service) runTraceQlQueryMetrics(ctx context.Context, pCtx backend.Plugi
 	resp, responseBody, err := s.performMetricsQuery(ctx, dsInfo, tempoQuery, backendQuery, span)
 
 	defer func() {
-		if err := resp.Body.Close(); err != nil {
-			ctxLogger.Error("Failed to close response body", "error", err, "function", logEntrypoint())
+		if resp != nil && resp.Body != nil {
+			if err := resp.Body.Close(); err != nil {
+				ctxLogger.Error("Failed to close response body", "error", err, "function", logEntrypoint())
+			}
 		}
 	}()
 	if err != nil {
@@ -105,7 +107,7 @@ func (s *Service) runTraceQlQueryMetrics(ctx context.Context, pCtx backend.Plugi
 		return res, err
 	}
 
-	frames := traceql.TransformMetricsResponse(tempoQuery, queryResponse)
+	frames := traceql.TransformMetricsResponse(*tempoQuery.Query, queryResponse)
 
 	result.Frames = frames
 }
diff --git a/public/app/plugins/datasource/tempo/SearchTraceQLEditor/TraceQLSearch.test.tsx b/public/app/plugins/datasource/tempo/SearchTraceQLEditor/TraceQLSearch.test.tsx
index 73ac6675e9a..69ee73631f7 100644
--- a/public/app/plugins/datasource/tempo/SearchTraceQLEditor/TraceQLSearch.test.tsx
+++ b/public/app/plugins/datasource/tempo/SearchTraceQLEditor/TraceQLSearch.test.tsx
@@ -65,6 +65,7 @@ describe('TraceQLSearch', () => {
     },
   } as TempoDatasource;
   datasource.isStreamingSearchEnabled = () => false;
+  datasource.isStreamingMetricsEnabled = () => false;
   const lp = new TempoLanguageProvider(datasource);
   lp.getIntrinsics = () => ['duration'];
   lp.generateQueryFromFilters = () => '{}';
@@ -221,6 +222,8 @@
     },
   } as
TempoDatasource; datasource.isStreamingSearchEnabled = () => false; + datasource.isStreamingMetricsEnabled = () => false; + const lp = new TempoLanguageProvider(datasource); lp.getIntrinsics = () => ['duration']; lp.generateQueryFromFilters = () => '{}'; diff --git a/public/app/plugins/datasource/tempo/SearchTraceQLEditor/TraceQLSearch.tsx b/public/app/plugins/datasource/tempo/SearchTraceQLEditor/TraceQLSearch.tsx index aeb0f6dc811..7ef9bd88303 100644 --- a/public/app/plugins/datasource/tempo/SearchTraceQLEditor/TraceQLSearch.tsx +++ b/public/app/plugins/datasource/tempo/SearchTraceQLEditor/TraceQLSearch.tsx @@ -274,7 +274,8 @@ const TraceQLSearch = ({ datasource, query, onChange, onClearResults, app, addVa {error ? ( diff --git a/public/app/plugins/datasource/tempo/_importedDependencies/datasources/prometheus/QueryOptionGroup.tsx b/public/app/plugins/datasource/tempo/_importedDependencies/datasources/prometheus/QueryOptionGroup.tsx index f8b6a4273a3..4541016132b 100644 --- a/public/app/plugins/datasource/tempo/_importedDependencies/datasources/prometheus/QueryOptionGroup.tsx +++ b/public/app/plugins/datasource/tempo/_importedDependencies/datasources/prometheus/QueryOptionGroup.tsx @@ -13,9 +13,11 @@ export interface Props { collapsedInfo: string[]; queryStats?: QueryStats | null; children: React.ReactNode; + onToggle?: (isOpen: boolean) => void; + isOpen?: boolean; } -export function QueryOptionGroup({ title, children, collapsedInfo, queryStats }: Props) { +export function QueryOptionGroup({ title, children, collapsedInfo, queryStats, onToggle, isOpen: propsIsOpen }: Props) { const [isOpen, toggleOpen] = useToggle(false); const styles = useStyles2(getStyles); @@ -24,8 +26,8 @@ export function QueryOptionGroup({ title, children, collapsedInfo, queryStats }: {title} diff --git a/public/app/plugins/datasource/tempo/configuration/StreamingSection.tsx b/public/app/plugins/datasource/tempo/configuration/StreamingSection.tsx index e3d63e1ae26..d5a389da04a 100644 --- a/public/app/plugins/datasource/tempo/configuration/StreamingSection.tsx +++ b/public/app/plugins/datasource/tempo/configuration/StreamingSection.tsx @@ -15,6 +15,7 @@ import { FeatureName, featuresToTempoVersion } from '../datasource'; interface StreamingOptions extends DataSourceJsonData { streamingEnabled?: { search?: boolean; + metrics?: boolean; }; } interface Props extends DataSourcePluginOptionsEditorProps {} @@ -27,8 +28,7 @@ export const StreamingSection = ({ options, onOptionsChange }: Props) => { isCollapsible={false} description={ - {`Enable streaming for different Tempo features. - Currently supported only for search queries and from Tempo version ${featuresToTempoVersion[FeatureName.streaming]} onwards.`} + Enable streaming for different Tempo features. 
{ { /> + + + ) => { + updateDatasourcePluginJsonDataOption({ onOptionsChange, options }, 'streamingEnabled', { + ...options.jsonData.streamingEnabled, + metrics: event.currentTarget.checked, + }); + }} + /> + + ); }; diff --git a/public/app/plugins/datasource/tempo/datasource.ts b/public/app/plugins/datasource/tempo/datasource.ts index 603e33f1a4c..a9a4cae07e3 100644 --- a/public/app/plugins/datasource/tempo/datasource.ts +++ b/public/app/plugins/datasource/tempo/datasource.ts @@ -58,7 +58,7 @@ import { transformFromOTLP as transformFromOTEL, transformTrace, } from './resultTransformer'; -import { doTempoChannelStream } from './streaming'; +import { doTempoMetricsStreaming, doTempoSearchStreaming } from './streaming'; import { TempoJsonData, TempoQuery } from './types'; import { getErrorMessage, migrateFromSearchToTraceQLSearch } from './utils'; import { TempoVariableSupport } from './variables'; @@ -67,7 +67,8 @@ export const DEFAULT_LIMIT = 20; export const DEFAULT_SPSS = 3; // spans per span set export enum FeatureName { - streaming = 'streaming', + searchStreaming = 'searchStreaming', + metricsStreaming = 'metricsStreaming', } /* Map, for each feature (e.g., streaming), the minimum Tempo version required to have that @@ -75,7 +76,8 @@ export enum FeatureName { ** target version, the feature is disabled in Grafana (frontend). */ export const featuresToTempoVersion = { - [FeatureName.streaming]: '2.2.0', + [FeatureName.searchStreaming]: '2.2.0', + [FeatureName.metricsStreaming]: '2.7.0', }; // The version that we use as default in case we cannot retrieve it from the backend. @@ -115,6 +117,7 @@ export class TempoDatasource extends DataSourceWithBackend - doTempoChannelStream( + doTempoSearchStreaming( { ...target, query }, this, // the datasource options, @@ -699,6 +719,28 @@ export class TempoDatasource extends DataSourceWithBackend, + targets: TempoQuery[], + query: string + ): Observable { + if (query === '') { + return EMPTY; + } + + return merge( + ...targets.map((target) => + doTempoMetricsStreaming( + { ...target, query }, + this, // the datasource + options + ) + ) + ); + } + makeTraceIdRequest(options: DataQueryRequest, targets: TempoQuery[]): DataQueryRequest { const request = { ...options, diff --git a/public/app/plugins/datasource/tempo/streaming.ts b/public/app/plugins/datasource/tempo/streaming.ts index 333faca7388..a1dd6e7ea68 100644 --- a/public/app/plugins/datasource/tempo/streaming.ts +++ b/public/app/plugins/datasource/tempo/streaming.ts @@ -1,31 +1,36 @@ import { capitalize } from 'lodash'; -import { map, Observable, takeWhile } from 'rxjs'; +import { map, Observable, scan, takeWhile } from 'rxjs'; import { v4 as uuidv4 } from 'uuid'; import { DataFrame, + dataFrameFromJSON, DataQueryRequest, DataQueryResponse, DataSourceInstanceSettings, + FieldCache, FieldType, LiveChannelScope, LoadingState, MutableDataFrame, + sortDataFrame, ThresholdsConfig, ThresholdsMode, } from '@grafana/data'; +import { cloneQueryResponse, combineResponses } from '@grafana/o11y-ds-frontend'; import { getGrafanaLiveSrv } from '@grafana/runtime'; import { SearchStreamingState } from './dataquery.gen'; import { DEFAULT_SPSS, TempoDatasource } from './datasource'; import { formatTraceQLResponse } from './resultTransformer'; import { SearchMetrics, TempoJsonData, TempoQuery } from './types'; +import { stepToNanos } from './utils'; function getLiveStreamKey(): string { return uuidv4(); } -export function doTempoChannelStream( +export function doTempoSearchStreaming( query: TempoQuery, ds: 
TempoDatasource, options: DataQueryRequest, @@ -67,11 +72,14 @@ export function doTempoChannelStream( if ('message' in evt && evt?.message) { const currentTime = performance.now(); const elapsedTime = currentTime - requestTime; - // Schema should be [traces, metrics, state, error] - const traces = evt.message.data.values[0][0]; - const metrics = evt.message.data.values[1][0]; - const frameState: SearchStreamingState = evt.message.data.values[2][0]; - const error = evt.message.data.values[3][0]; + + const messageFrame = dataFrameFromJSON(evt.message); + const fieldCache = new FieldCache(messageFrame); + + const traces = fieldCache.getFieldByName('result')?.values[0]; + const metrics = fieldCache.getFieldByName('metrics')?.values[0]; + const frameState = fieldCache.getFieldByName('state')?.values[0]; + const error = fieldCache.getFieldByName('error')?.values[0]; switch (frameState) { case SearchStreamingState.Done: @@ -100,6 +108,127 @@ export function doTempoChannelStream( ); } +export function doTempoMetricsStreaming( + query: TempoQuery, + ds: TempoDatasource, + options: DataQueryRequest +): Observable { + const range = options.range; + const key = getLiveStreamKey(); + + let state: LoadingState = LoadingState.NotStarted; + const step = stepToNanos(query.step); + + return getGrafanaLiveSrv() + .getStream({ + scope: LiveChannelScope.DataSource, + namespace: ds.uid, + path: `metrics/${key}`, + data: { + ...query, + step, + timeRange: { + from: range.from.toISOString(), + to: range.to.toISOString(), + }, + }, + }) + .pipe( + takeWhile((evt) => { + if ('message' in evt && evt?.message) { + const frameState: SearchStreamingState = evt.message.data.values[2][0]; + if (frameState === SearchStreamingState.Done || frameState === SearchStreamingState.Error) { + return false; + } + } + return true; + }, true), + map((evt) => { + let newResult: DataQueryResponse = { data: [], state: LoadingState.NotStarted }; + if ('message' in evt && evt?.message) { + const messageFrame = dataFrameFromJSON(evt.message); + const fieldCache = new FieldCache(messageFrame); + + const data = fieldCache.getFieldByName('result')?.values[0]; + const frameState = fieldCache.getFieldByName('state')?.values[0]; + const error = fieldCache.getFieldByName('error')?.values[0]; + + switch (frameState) { + case SearchStreamingState.Done: + state = LoadingState.Done; + break; + case SearchStreamingState.Streaming: + state = LoadingState.Streaming; + break; + case SearchStreamingState.Error: + throw new Error(error); + } + + newResult = { + data: data?.map(dataFrameFromJSON) ?? [], + state, + }; + } + + return newResult; + }), + // Merge results on acc + scan((acc, curr) => { + if (!curr) { + return acc; + } + if (!acc) { + return cloneQueryResponse(curr); + } + return mergeFrames(acc, curr); + }) + ); +} + +function mergeFrames(acc: DataQueryResponse, newResult: DataQueryResponse): DataQueryResponse { + const result = combineResponses(cloneQueryResponse(acc), newResult); + + // Remove duplicate time field values for all frames + result.data = result.data.map((frame: DataFrame) => { + let newFrame = frame; + const timeFieldIndex = frame.fields.findIndex((f) => f.type === FieldType.time); + if (timeFieldIndex >= 0) { + removeDuplicateTimeFieldValues(frame, timeFieldIndex); + newFrame = sortDataFrame(frame, timeFieldIndex); + } + return newFrame; + }); + + result.state = newResult.state; + return result; +} + +/** + * Remove duplicate time field values from the DataFrame. 
This is necessary because Tempo sends partial results to Grafana + * that we append to an existing DataFrame. This can result in duplicate values for the same timestamp so this function removes + * older values and keeps the latest value. + * @param accFrame + * @param timeFieldIndex + */ +function removeDuplicateTimeFieldValues(accFrame: DataFrame, timeFieldIndex: number) { + const duplicatesMap = accFrame.fields[timeFieldIndex].values.reduce((acc: Record, value, index) => { + if (acc[value]) { + acc[value].push(index); + } else { + acc[value] = [index]; + } + return acc; + }, {}); + + const indexesToRemove = Object.values(duplicatesMap) + .filter((indexes) => indexes.length > 1) + .map((indexes) => indexes.slice(1)) + .flat(); + accFrame.fields.forEach((field) => { + field.values = field.values.filter((_, index) => !indexesToRemove.includes(index)); + }); +} + function metricsDataFrame(metrics: SearchMetrics, state: SearchStreamingState, elapsedTime: number) { const progressThresholds: ThresholdsConfig = { steps: [ diff --git a/public/app/plugins/datasource/tempo/traceql/QueryEditor.tsx b/public/app/plugins/datasource/tempo/traceql/QueryEditor.tsx index b64aa5865e9..c4c90c78041 100644 --- a/public/app/plugins/datasource/tempo/traceql/QueryEditor.tsx +++ b/public/app/plugins/datasource/tempo/traceql/QueryEditor.tsx @@ -71,7 +71,8 @@ export function QueryEditor(props: Props) { > diff --git a/public/app/plugins/datasource/tempo/traceql/TempoQueryBuilderOptions.tsx b/public/app/plugins/datasource/tempo/traceql/TempoQueryBuilderOptions.tsx index a6d0ae61fcd..7552565acc1 100644 --- a/public/app/plugins/datasource/tempo/traceql/TempoQueryBuilderOptions.tsx +++ b/public/app/plugins/datasource/tempo/traceql/TempoQueryBuilderOptions.tsx @@ -1,5 +1,6 @@ import { css } from '@emotion/css'; import * as React from 'react'; +import { useToggle } from 'react-use'; import { GrafanaTheme2 } from '@grafana/data'; import { EditorField, EditorRow } from '@grafana/plugin-ui'; @@ -13,7 +14,8 @@ import { TempoQuery } from '../types'; interface Props { onChange: (value: TempoQuery) => void; query: Partial & TempoQuery; - isStreaming: boolean; + searchStreaming: boolean; + metricsStreaming: boolean; } /** @@ -29,8 +31,9 @@ const parseIntWithFallback = (val: string, fallback: number) => { return isNaN(parsed) ? fallback : parsed; }; -export const TempoQueryBuilderOptions = React.memo(({ onChange, query, isStreaming }) => { +export const TempoQueryBuilderOptions = React.memo(({ onChange, query, searchStreaming, metricsStreaming }) => { const styles = useStyles2(getStyles); + const [isOpen, toggleOpen] = useToggle(false); if (!query.hasOwnProperty('limit')) { query.limit = DEFAULT_LIMIT; @@ -76,19 +79,26 @@ export const TempoQueryBuilderOptions = React.memo(({ onChange, query, is `Spans Limit: ${query.spss || DEFAULT_SPSS}`, `Table Format: ${query.tableType === SearchTableType.Traces ? 'Traces' : 'Spans'}`, '|', - `Streaming: ${isStreaming ? 'Enabled' : 'Disabled'}`, + `Streaming: ${searchStreaming ? 'Enabled' : 'Disabled'}`, ]; const collapsedMetricsOptions = [ `Step: ${query.step || 'auto'}`, `Type: ${query.metricsQueryType === MetricsQueryType.Range ? 'Range' : 'Instant'}`, + '|', + `Streaming: ${metricsStreaming ? 'Enabled' : 'Disabled'}`, // `Exemplars: ${query.exemplars !== undefined ? query.exemplars : 'auto'}`, ]; return ( - + (({ onChange, query, is /> } tooltipInteractive> - {isStreaming ? 'Enabled' : 'Disabled'} + {searchStreaming ? 
'Enabled' : 'Disabled'} - + (({ onChange, query, is onChange={onMetricsQueryTypeChange} /> + + } tooltipInteractive> + {metricsStreaming ? 'Enabled' : 'Disabled'} + {/* { }; return migratedQuery; }; + +export const stepToNanos = (step?: string) => { + if (!step) { + return 0; + } + + const match = step.match(/(\d+)(.+)/); + + const rawLength = match?.[1]; + const unit = match?.[2]; + + if (rawLength) { + if (unit === 'ns') { + return parseInt(rawLength, 10); + } + if (unit === 'µs') { + return parseInt(rawLength, 10) * 1000; + } + if (unit === 'ms') { + return parseInt(rawLength, 10) * 1000000; + } + const duration = parseDuration(step); + return ( + (duration.seconds || 0) * 1000000000 + + (duration.minutes || 0) * 60000000000 + + (duration.hours || 0) * 3600000000000 + ); + } + + return 0; +};
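
A quick sanity check for the new stepToNanos helper, derived only from the implementation above. This is an illustrative jest-style sketch; the test file location and assertions are hypothetical and not part of the change:

import { stepToNanos } from './utils';

describe('stepToNanos', () => {
  it('converts sub-second units directly', () => {
    expect(stepToNanos('15ns')).toBe(15);
    expect(stepToNanos('10µs')).toBe(10_000);
    expect(stepToNanos('500ms')).toBe(500_000_000);
  });

  it('falls back to parseDuration for seconds and larger units', () => {
    expect(stepToNanos('30s')).toBe(30_000_000_000);
    expect(stepToNanos('1m')).toBe(60_000_000_000);
  });

  it('returns 0 when no step is set', () => {
    expect(stepToNanos(undefined)).toBe(0);
    expect(stepToNanos('')).toBe(0);
  });
});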
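
For reference, each streamed message produced by the backend's createResponseDataFrame carries four fields: result, metrics, state and error. Below is a condensed sketch of how one message is unpacked on the frontend, mirroring doTempoSearchStreaming / doTempoMetricsStreaming; the standalone helper name is illustrative, not an API added by this change:

import { DataFrameJSON, dataFrameFromJSON, FieldCache } from '@grafana/data';

function unpackStreamedMessage(message: DataFrameJSON) {
  const frame = dataFrameFromJSON(message);
  const cache = new FieldCache(frame);

  return {
    // JSON-encoded payload: transformed metrics frames, or the trace list for search
    result: cache.getFieldByName('result')?.values[0],
    // JSON-encoded tempopb.SearchMetrics
    metrics: cache.getFieldByName('metrics')?.values[0],
    // SearchStreamingState value, e.g. streaming / done / error
    state: cache.getFieldByName('state')?.values[0],
    error: cache.getFieldByName('error')?.values[0],
  };
}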
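
The per-feature streaming toggles live in the data source jsonData, as read by StreamingSection and the isStreamingSearchEnabled / isStreamingMetricsEnabled checks. A minimal sketch of the stored shape (the values shown are illustrative):

const jsonData: { streamingEnabled?: { search?: boolean; metrics?: boolean } } = {
  streamingEnabled: {
    search: true, // existing toggle, minimum Tempo version 2.2.0 (featuresToTempoVersion.searchStreaming)
    metrics: true, // new toggle, minimum Tempo version 2.7.0 (featuresToTempoVersion.metricsStreaming)
  },
};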