From b36bd7dd6a5af7f79918fe5e12da69e1691a418c Mon Sep 17 00:00:00 2001 From: Alyssa Joyner <58453566+alyssajoyner@users.noreply.github.com> Date: Wed, 10 Sep 2025 08:04:04 -0600 Subject: [PATCH] [OpenTSDB]: Support queries with different time ranges (#110818) --- pkg/tsdb/opentsdb/opentsdb.go | 74 +++++++++++++++--------------- pkg/tsdb/opentsdb/opentsdb_test.go | 57 +++++++++++++++++++++++ 2 files changed, 94 insertions(+), 37 deletions(-) diff --git a/pkg/tsdb/opentsdb/opentsdb.go b/pkg/tsdb/opentsdb/opentsdb.go index 798b27e57ab..92fe36c3d26 100644 --- a/pkg/tsdb/opentsdb/opentsdb.go +++ b/pkg/tsdb/opentsdb/opentsdb.go @@ -83,52 +83,52 @@ func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.Inst } func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { - var tsdbQuery OpenTsdbQuery - logger := logger.FromContext(ctx) - q := req.Queries[0] - - refID := q.RefID - - tsdbQuery.Start = q.TimeRange.From.UnixNano() / int64(time.Millisecond) - tsdbQuery.End = q.TimeRange.To.UnixNano() / int64(time.Millisecond) - - for _, query := range req.Queries { - metric := s.buildMetric(query) - tsdbQuery.Queries = append(tsdbQuery.Queries, metric) - } - - // TODO: Don't use global variable - if setting.Env == setting.Dev { - logger.Debug("OpenTsdb request", "params", tsdbQuery) - } - dsInfo, err := s.getDSInfo(ctx, req.PluginContext) if err != nil { return nil, err } - request, err := s.createRequest(ctx, logger, dsInfo, tsdbQuery) - if err != nil { - return &backend.QueryDataResponse{}, err - } + result := backend.NewQueryDataResponse() - res, err := dsInfo.HTTPClient.Do(request) - if err != nil { - return &backend.QueryDataResponse{}, err - } - - defer func() { - err := res.Body.Close() - if err != nil { - logger.Warn("failed to close response body", "error", err) + for _, query := range req.Queries { + // Build OpenTsdbQuery with per-query time range + tsdbQuery := OpenTsdbQuery{ + Start: query.TimeRange.From.Unix(), + End: query.TimeRange.To.Unix(), + Queries: []map[string]any{ + s.buildMetric(query), + }, } - }() - result, err := s.parseResponse(logger, res, refID, dsInfo.TSDBVersion) - if err != nil { - return &backend.QueryDataResponse{}, err + if setting.Env == setting.Dev { + logger.Debug("OpenTsdb request", "refId", query.RefID, "params", tsdbQuery) + } + + httpReq, err := s.createRequest(ctx, logger, dsInfo, tsdbQuery) + if err != nil { + return nil, err + } + + httpRes, err := dsInfo.HTTPClient.Do(httpReq) + if err != nil { + return nil, err + } + + defer func() { + if cerr := httpRes.Body.Close(); cerr != nil { + logger.Warn("failed to close response body", "error", cerr) + } + }() + + queryRes, err := s.parseResponse(logger, httpRes, query.RefID, dsInfo.TSDBVersion) + if err != nil { + return nil, err + } + + // Attach parsed result for this query's RefID + result.Responses[query.RefID] = queryRes.Responses[query.RefID] } return result, nil @@ -344,7 +344,7 @@ func (s *Service) getDSInfo(ctx context.Context, pluginCtx backend.PluginContext instance, ok := i.(*datasourceInfo) if !ok { - return nil, fmt.Errorf("failed to cast datsource info") + return nil, fmt.Errorf("failed to cast datasource info") } return instance, nil diff --git a/pkg/tsdb/opentsdb/opentsdb_test.go b/pkg/tsdb/opentsdb/opentsdb_test.go index 44c21f6df15..11a13d7ddd5 100644 --- a/pkg/tsdb/opentsdb/opentsdb_test.go +++ b/pkg/tsdb/opentsdb/opentsdb_test.go @@ -4,13 +4,16 @@ import ( "context" "io" "net/http" + "net/http/httptest" "strings" "testing" "time" "github.com/google/go-cmp/cmp" "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/backend/datasource" "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/grafana/grafana/pkg/infra/httpclient" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -403,4 +406,58 @@ func TestOpenTsdbExecutor(t *testing.T) { require.Equal(t, float64(45), metricRateOptions["counterMax"]) require.Equal(t, float64(60), metricRateOptions["resetValue"]) }) + + t.Run("createRequest uses per-query time range", func(t *testing.T) { + qA := backend.DataQuery{ + RefID: "A", + JSON: []byte(`{"metric":"mA","aggregator":"avg"}`), + TimeRange: backend.TimeRange{From: time.Unix(1000, 0), To: time.Unix(2000, 0)}, + } + qB := backend.DataQuery{ + RefID: "B", + JSON: []byte(`{"metric":"mB","aggregator":"avg"}`), + TimeRange: backend.TimeRange{From: time.Unix(3000, 0), To: time.Unix(4000, 0)}, + } + + var bodies []string + hits := 0 // count how many times the test server was hit + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + b, _ := io.ReadAll(r.Body) + _ = r.Body.Close() + bodies = append(bodies, string(b)) + hits++ + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`[]`)) + })) + t.Cleanup(srv.Close) + + service := &Service{ + im: datasource.NewInstanceManager(newInstanceSettings(httpclient.NewProvider())), + } + + settings := &backend.DataSourceInstanceSettings{ + UID: "opentsdb-test", + URL: srv.URL, + JSONData: []byte(`{"tsdbVersion":4,"httpMethod":"post"}`), + } + + req := backend.QueryDataRequest{ + PluginContext: backend.PluginContext{ + DataSourceInstanceSettings: settings, + }, + Queries: []backend.DataQuery{qA, qB}, + } + _, err := service.QueryData(context.Background(), &req) + require.NoError(t, err) + + require.Equal(t, 2, hits) + require.Len(t, bodies, 2) + + require.Contains(t, bodies[0], `"start":1000`) + require.Contains(t, bodies[0], `"end":2000`) + require.Contains(t, bodies[1], `"start":3000`) + require.Contains(t, bodies[1], `"end":4000`) + }) }