mirror of
https://github.com/grafana/grafana.git
synced 2025-09-17 16:03:51 +08:00
[OpenTSDB]: Support queries with different time ranges (#110818)
This commit is contained in:
@ -83,52 +83,52 @@ func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.Inst
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
|
func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
|
||||||
var tsdbQuery OpenTsdbQuery
|
|
||||||
|
|
||||||
logger := logger.FromContext(ctx)
|
logger := logger.FromContext(ctx)
|
||||||
|
|
||||||
q := req.Queries[0]
|
|
||||||
|
|
||||||
refID := q.RefID
|
|
||||||
|
|
||||||
tsdbQuery.Start = q.TimeRange.From.UnixNano() / int64(time.Millisecond)
|
|
||||||
tsdbQuery.End = q.TimeRange.To.UnixNano() / int64(time.Millisecond)
|
|
||||||
|
|
||||||
for _, query := range req.Queries {
|
|
||||||
metric := s.buildMetric(query)
|
|
||||||
tsdbQuery.Queries = append(tsdbQuery.Queries, metric)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: Don't use global variable
|
|
||||||
if setting.Env == setting.Dev {
|
|
||||||
logger.Debug("OpenTsdb request", "params", tsdbQuery)
|
|
||||||
}
|
|
||||||
|
|
||||||
dsInfo, err := s.getDSInfo(ctx, req.PluginContext)
|
dsInfo, err := s.getDSInfo(ctx, req.PluginContext)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
request, err := s.createRequest(ctx, logger, dsInfo, tsdbQuery)
|
result := backend.NewQueryDataResponse()
|
||||||
if err != nil {
|
|
||||||
return &backend.QueryDataResponse{}, err
|
for _, query := range req.Queries {
|
||||||
|
// Build OpenTsdbQuery with per-query time range
|
||||||
|
tsdbQuery := OpenTsdbQuery{
|
||||||
|
Start: query.TimeRange.From.Unix(),
|
||||||
|
End: query.TimeRange.To.Unix(),
|
||||||
|
Queries: []map[string]any{
|
||||||
|
s.buildMetric(query),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err := dsInfo.HTTPClient.Do(request)
|
if setting.Env == setting.Dev {
|
||||||
|
logger.Debug("OpenTsdb request", "refId", query.RefID, "params", tsdbQuery)
|
||||||
|
}
|
||||||
|
|
||||||
|
httpReq, err := s.createRequest(ctx, logger, dsInfo, tsdbQuery)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &backend.QueryDataResponse{}, err
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
httpRes, err := dsInfo.HTTPClient.Do(httpReq)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
err := res.Body.Close()
|
if cerr := httpRes.Body.Close(); cerr != nil {
|
||||||
if err != nil {
|
logger.Warn("failed to close response body", "error", cerr)
|
||||||
logger.Warn("failed to close response body", "error", err)
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
result, err := s.parseResponse(logger, res, refID, dsInfo.TSDBVersion)
|
queryRes, err := s.parseResponse(logger, httpRes, query.RefID, dsInfo.TSDBVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &backend.QueryDataResponse{}, err
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Attach parsed result for this query's RefID
|
||||||
|
result.Responses[query.RefID] = queryRes.Responses[query.RefID]
|
||||||
}
|
}
|
||||||
|
|
||||||
return result, nil
|
return result, nil
|
||||||
@ -344,7 +344,7 @@ func (s *Service) getDSInfo(ctx context.Context, pluginCtx backend.PluginContext
|
|||||||
|
|
||||||
instance, ok := i.(*datasourceInfo)
|
instance, ok := i.(*datasourceInfo)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("failed to cast datsource info")
|
return nil, fmt.Errorf("failed to cast datasource info")
|
||||||
}
|
}
|
||||||
|
|
||||||
return instance, nil
|
return instance, nil
|
||||||
|
@ -4,13 +4,16 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||||
|
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||||
|
"github.com/grafana/grafana/pkg/infra/httpclient"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
@ -403,4 +406,58 @@ func TestOpenTsdbExecutor(t *testing.T) {
|
|||||||
require.Equal(t, float64(45), metricRateOptions["counterMax"])
|
require.Equal(t, float64(45), metricRateOptions["counterMax"])
|
||||||
require.Equal(t, float64(60), metricRateOptions["resetValue"])
|
require.Equal(t, float64(60), metricRateOptions["resetValue"])
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("createRequest uses per-query time range", func(t *testing.T) {
|
||||||
|
qA := backend.DataQuery{
|
||||||
|
RefID: "A",
|
||||||
|
JSON: []byte(`{"metric":"mA","aggregator":"avg"}`),
|
||||||
|
TimeRange: backend.TimeRange{From: time.Unix(1000, 0), To: time.Unix(2000, 0)},
|
||||||
|
}
|
||||||
|
qB := backend.DataQuery{
|
||||||
|
RefID: "B",
|
||||||
|
JSON: []byte(`{"metric":"mB","aggregator":"avg"}`),
|
||||||
|
TimeRange: backend.TimeRange{From: time.Unix(3000, 0), To: time.Unix(4000, 0)},
|
||||||
|
}
|
||||||
|
|
||||||
|
var bodies []string
|
||||||
|
hits := 0 // count how many times the test server was hit
|
||||||
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
b, _ := io.ReadAll(r.Body)
|
||||||
|
_ = r.Body.Close()
|
||||||
|
bodies = append(bodies, string(b))
|
||||||
|
hits++
|
||||||
|
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
_, _ = w.Write([]byte(`[]`))
|
||||||
|
}))
|
||||||
|
t.Cleanup(srv.Close)
|
||||||
|
|
||||||
|
service := &Service{
|
||||||
|
im: datasource.NewInstanceManager(newInstanceSettings(httpclient.NewProvider())),
|
||||||
|
}
|
||||||
|
|
||||||
|
settings := &backend.DataSourceInstanceSettings{
|
||||||
|
UID: "opentsdb-test",
|
||||||
|
URL: srv.URL,
|
||||||
|
JSONData: []byte(`{"tsdbVersion":4,"httpMethod":"post"}`),
|
||||||
|
}
|
||||||
|
|
||||||
|
req := backend.QueryDataRequest{
|
||||||
|
PluginContext: backend.PluginContext{
|
||||||
|
DataSourceInstanceSettings: settings,
|
||||||
|
},
|
||||||
|
Queries: []backend.DataQuery{qA, qB},
|
||||||
|
}
|
||||||
|
_, err := service.QueryData(context.Background(), &req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Equal(t, 2, hits)
|
||||||
|
require.Len(t, bodies, 2)
|
||||||
|
|
||||||
|
require.Contains(t, bodies[0], `"start":1000`)
|
||||||
|
require.Contains(t, bodies[0], `"end":2000`)
|
||||||
|
require.Contains(t, bodies[1], `"start":3000`)
|
||||||
|
require.Contains(t, bodies[1], `"end":4000`)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user