diff --git a/pkg/tsdb/prometheus/framing_test.go b/pkg/tsdb/prometheus/buffered/framing_test.go
similarity index 79%
rename from pkg/tsdb/prometheus/framing_test.go
rename to pkg/tsdb/prometheus/buffered/framing_test.go
index c2ddadd0dac..1bc75c102c7 100644
--- a/pkg/tsdb/prometheus/framing_test.go
+++ b/pkg/tsdb/prometheus/buffered/framing_test.go
@@ -1,4 +1,4 @@
-package prometheus
+package buffered
 
 import (
 	"bytes"
@@ -15,7 +15,9 @@ import (
 	"github.com/grafana/grafana-plugin-sdk-go/backend"
 	"github.com/grafana/grafana-plugin-sdk-go/experimental"
 
+	"github.com/grafana/grafana/pkg/infra/log"
 	"github.com/grafana/grafana/pkg/infra/tracing"
+	"github.com/grafana/grafana/pkg/tsdb/intervalv2"
 	"github.com/prometheus/client_golang/api"
 	apiv1 "github.com/prometheus/client_golang/api/prometheus/v1"
 )
@@ -33,9 +35,9 @@ func TestMatrixResponses(t *testing.T) {
 
 	for _, test := range tt {
 		t.Run(test.name, func(t *testing.T) {
-			queryFileName := filepath.Join("testdata", test.filepath+".query.json")
-			responseFileName := filepath.Join("testdata", test.filepath+".result.json")
-			goldenFileName := filepath.Join("testdata", test.filepath+".result.golden.txt")
+			queryFileName := filepath.Join("../testdata", test.filepath+".query.json")
+			responseFileName := filepath.Join("../testdata", test.filepath+".result.json")
+			goldenFileName := filepath.Join("../testdata", test.filepath+".result.golden.txt")
 
 			query, err := loadStoredPrometheusQuery(queryFileName)
 			require.NoError(t, err)
@@ -131,6 +133,20 @@ func runQuery(response []byte, query PrometheusQuery) (*backend.QueryDataRespons
 		return nil, err
 	}
 
-	s := Service{tracer: tracer}
+	s := Buffered{
+		intervalCalculator: intervalv2.NewCalculator(),
+		tracer:             tracer,
+		TimeInterval:       "15s",
+		log:                &fakeLogger{},
+	}
 	return s.runQueries(context.Background(), api, []*PrometheusQuery{&query})
 }
+
+type fakeLogger struct {
+	log.Logger
+}
+
+func (fl *fakeLogger) Debug(testMessage string, ctx ...interface{}) {}
+func (fl *fakeLogger) Info(testMessage string, ctx ...interface{})  {}
+func (fl *fakeLogger) Warn(testMessage string, ctx ...interface{})  {}
+func (fl *fakeLogger) Error(testMessage string, ctx ...interface{}) {}
diff --git a/pkg/tsdb/prometheus/promclient/cache.go b/pkg/tsdb/prometheus/buffered/promclient/cache.go
similarity index 100%
rename from pkg/tsdb/prometheus/promclient/cache.go
rename to pkg/tsdb/prometheus/buffered/promclient/cache.go
diff --git a/pkg/tsdb/prometheus/promclient/cache_test.go b/pkg/tsdb/prometheus/buffered/promclient/cache_test.go
similarity index 97%
rename from pkg/tsdb/prometheus/promclient/cache_test.go
rename to pkg/tsdb/prometheus/buffered/promclient/cache_test.go
index 2c27f6c3fea..d1cbd7b8f5d 100644
--- a/pkg/tsdb/prometheus/promclient/cache_test.go
+++ b/pkg/tsdb/prometheus/buffered/promclient/cache_test.go
@@ -7,7 +7,7 @@ import (
 	"strings"
 	"testing"
 
-	"github.com/grafana/grafana/pkg/tsdb/prometheus/promclient"
+	"github.com/grafana/grafana/pkg/tsdb/prometheus/buffered/promclient"
 
 	apiv1 "github.com/prometheus/client_golang/api/prometheus/v1"
 
diff --git a/pkg/tsdb/prometheus/promclient/provider.go b/pkg/tsdb/prometheus/buffered/promclient/provider.go
similarity index 100%
rename from pkg/tsdb/prometheus/promclient/provider.go
rename to pkg/tsdb/prometheus/buffered/promclient/provider.go
diff --git a/pkg/tsdb/prometheus/promclient/provider_azure.go b/pkg/tsdb/prometheus/buffered/promclient/provider_azure.go
similarity index 100%
rename from pkg/tsdb/prometheus/promclient/provider_azure.go
rename to pkg/tsdb/prometheus/buffered/promclient/provider_azure.go
diff --git a/pkg/tsdb/prometheus/promclient/provider_azure_test.go b/pkg/tsdb/prometheus/buffered/promclient/provider_azure_test.go
similarity index 100%
rename from pkg/tsdb/prometheus/promclient/provider_azure_test.go
rename to pkg/tsdb/prometheus/buffered/promclient/provider_azure_test.go
diff --git a/pkg/tsdb/prometheus/promclient/provider_test.go b/pkg/tsdb/prometheus/buffered/promclient/provider_test.go
similarity index 98%
rename from pkg/tsdb/prometheus/promclient/provider_test.go
rename to pkg/tsdb/prometheus/buffered/promclient/provider_test.go
index b62af351c39..cc340e3d330 100644
--- a/pkg/tsdb/prometheus/promclient/provider_test.go
+++ b/pkg/tsdb/prometheus/buffered/promclient/provider_test.go
@@ -5,7 +5,7 @@ import (
 	"net/http"
 	"testing"
 
-	"github.com/grafana/grafana/pkg/tsdb/prometheus/promclient"
+	"github.com/grafana/grafana/pkg/tsdb/prometheus/buffered/promclient"
 
 	"github.com/grafana/grafana/pkg/services/featuremgmt"
 	"github.com/grafana/grafana/pkg/setting"
diff --git a/pkg/tsdb/prometheus/prometeus_bench_test.go b/pkg/tsdb/prometheus/buffered/prometeus_bench_test.go
similarity index 97%
rename from pkg/tsdb/prometheus/prometeus_bench_test.go
rename to pkg/tsdb/prometheus/buffered/prometeus_bench_test.go
index b7365e30c44..122dc19bb0d 100644
--- a/pkg/tsdb/prometheus/prometeus_bench_test.go
+++ b/pkg/tsdb/prometheus/buffered/prometeus_bench_test.go
@@ -1,4 +1,4 @@
-package prometheus
+package buffered
 
 import (
 	"context"
@@ -24,7 +24,7 @@ func BenchmarkJson(b *testing.B) {
 	tracer, err := tracing.InitializeTracerForTest()
 	require.NoError(b, err)
 
-	s := Service{tracer: tracer}
+	s := Buffered{tracer: tracer, log: &fakeLogger{}}
 
 	b.ResetTimer()
 	for n := 0; n < b.N; n++ {
diff --git a/pkg/tsdb/prometheus/time_series_query.go b/pkg/tsdb/prometheus/buffered/time_series_query.go
similarity index 83%
rename from pkg/tsdb/prometheus/time_series_query.go
rename to pkg/tsdb/prometheus/buffered/time_series_query.go
index d768738d8a5..e0a7fad20f1 100644
--- a/pkg/tsdb/prometheus/time_series_query.go
+++ b/pkg/tsdb/prometheus/buffered/time_series_query.go
@@ -1,10 +1,11 @@
-package prometheus
+package buffered
 
 import (
 	"context"
 	"encoding/json"
 	"fmt"
 	"math"
+	"regexp"
 	"sort"
 	"strconv"
 	"strings"
@@ -12,7 +13,14 @@ import (
 
 	"github.com/grafana/grafana-plugin-sdk-go/backend"
 	"github.com/grafana/grafana-plugin-sdk-go/data"
+	"github.com/grafana/grafana/pkg/infra/httpclient"
+	"github.com/grafana/grafana/pkg/infra/log"
+	"github.com/grafana/grafana/pkg/infra/tracing"
+	"github.com/grafana/grafana/pkg/services/featuremgmt"
+	"github.com/grafana/grafana/pkg/setting"
 	"github.com/grafana/grafana/pkg/tsdb/intervalv2"
+	"github.com/grafana/grafana/pkg/tsdb/prometheus/buffered/promclient"
+	"github.com/grafana/grafana/pkg/util/maputil"
 	apiv1 "github.com/prometheus/client_golang/api/prometheus/v1"
 	"github.com/prometheus/common/model"
 	"go.opentelemetry.io/otel/attribute"
@@ -41,23 +49,57 @@ const (
 
 const legendFormatAuto = "__auto"
 
-type TimeSeriesQueryType string
-
-const (
-	RangeQueryType    TimeSeriesQueryType = "range"
-	InstantQueryType  TimeSeriesQueryType = "instant"
-	ExemplarQueryType TimeSeriesQueryType = "exemplar"
+var (
+	legendFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)
+	safeRes      = 11000
 )
 
-func (s *Service) runQueries(ctx context.Context, client apiv1.API, queries []*PrometheusQuery) (*backend.QueryDataResponse, error) {
+type Buffered struct {
+	intervalCalculator intervalv2.Calculator
+	tracer             tracing.Tracer
+	getClient          clientGetter
+	log                log.Logger
+	ID                 int64
+	URL                string
+	TimeInterval       string
+}
+
+func New(httpClientProvider httpclient.Provider, cfg *setting.Cfg, features featuremgmt.FeatureToggles, tracer tracing.Tracer, settings backend.DataSourceInstanceSettings, plog log.Logger) (*Buffered, error) {
+	var jsonData map[string]interface{}
+	if err := json.Unmarshal(settings.JSONData, &jsonData); err != nil {
+		return nil, fmt.Errorf("error reading settings: %w", err)
+	}
+
+	timeInterval, err := maputil.GetStringOptional(jsonData, "timeInterval")
+	if err != nil {
+		return nil, err
+	}
+
+	p := promclient.NewProvider(settings, jsonData, httpClientProvider, cfg, features, plog)
+	pc, err := promclient.NewProviderCache(p)
+	if err != nil {
+		return nil, err
+	}
+	return &Buffered{
+		intervalCalculator: intervalv2.NewCalculator(),
+		tracer:             tracer,
+		log:                plog,
+		getClient:          pc.GetClient,
+		TimeInterval:       timeInterval,
+		ID:                 settings.ID,
+		URL:                settings.URL,
+	}, nil
+}
+
+func (b *Buffered) runQueries(ctx context.Context, client apiv1.API, queries []*PrometheusQuery) (*backend.QueryDataResponse, error) {
 	result := backend.QueryDataResponse{
 		Responses: backend.Responses{},
 	}
 
 	for _, query := range queries {
-		plog.Debug("Sending query", "start", query.Start, "end", query.End, "step", query.Step, "query", query.Expr)
+		b.log.Debug("Sending query", "start", query.Start, "end", query.End, "step", query.Step, "query", query.Expr)
 
-		ctx, span := s.tracer.Start(ctx, "datasource.prometheus")
+		ctx, span := b.tracer.Start(ctx, "datasource.prometheus")
 		span.SetAttributes("expr", query.Expr, attribute.Key("expr").String(query.Expr))
 		span.SetAttributes("start_unixnano", query.Start, attribute.Key("start_unixnano").Int64(query.Start.UnixNano()))
 		span.SetAttributes("stop_unixnano", query.End, attribute.Key("stop_unixnano").Int64(query.End.UnixNano()))
@@ -75,7 +117,7 @@ func (s *Service) runQueries(ctx context.Context, client apiv1.API, queries []*P
 		if query.RangeQuery {
 			rangeResponse, _, err := client.QueryRange(ctx, query.Expr, timeRange)
 			if err != nil {
-				plog.Error("Range query failed", "query", query.Expr, "err", err)
+				b.log.Error("Range query failed", "query", query.Expr, "err", err)
 				result.Responses[query.RefId] = backend.DataResponse{Error: err}
 				continue
 			}
@@ -85,7 +127,7 @@ func (s *Service) runQueries(ctx context.Context, client apiv1.API, queries []*P
 		if query.InstantQuery {
 			instantResponse, _, err := client.Query(ctx, query.Expr, query.End)
 			if err != nil {
-				plog.Error("Instant query failed", "query", query.Expr, "err", err)
+				b.log.Error("Instant query failed", "query", query.Expr, "err", err)
 				result.Responses[query.RefId] = backend.DataResponse{Error: err}
 				continue
 			}
@@ -97,7 +139,7 @@ func (s *Service) runQueries(ctx context.Context, client apiv1.API, queries []*P
 		if query.ExemplarQuery {
 			exemplarResponse, err := client.QueryExemplars(ctx, query.Expr, timeRange.Start, timeRange.End)
 			if err != nil {
-				plog.Error("Exemplar query failed", "query", query.Expr, "err", err)
+				b.log.Error("Exemplar query failed", "query", query.Expr, "err", err)
 			} else {
 				response[ExemplarQueryType] = exemplarResponse
 			}
@@ -121,13 +163,13 @@ func (s *Service) runQueries(ctx context.Context, client apiv1.API, queries []*P
 	return &result, nil
 }
 
-func (s *Service) executeTimeSeriesQuery(ctx context.Context, req *backend.QueryDataRequest, dsInfo *DatasourceInfo) (*backend.QueryDataResponse, error) {
-	client, err := dsInfo.getClient(req.Headers)
+func (b *Buffered) ExecuteTimeSeriesQuery(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
+	client, err := b.getClient(req.Headers)
 	if err != nil {
 		return nil, err
 	}
 
-	queries, err := s.parseTimeSeriesQuery(req, dsInfo)
+	queries, err := b.parseTimeSeriesQuery(req)
 	if err != nil {
 		result := backend.QueryDataResponse{
 			Responses: backend.Responses{},
@@ -135,7 +177,7 @@ func (s *Service) executeTimeSeriesQuery(ctx context.Context, req *backend.Query
 		return &result, err
 	}
 
-	return s.runQueries(ctx, client, queries)
+	return b.runQueries(ctx, client, queries)
 }
 
 func formatLegend(metric model.Metric, query *PrometheusQuery) string {
@@ -167,7 +209,7 @@ func formatLegend(metric model.Metric, query *PrometheusQuery) string {
 	return legend
 }
 
-func (s *Service) parseTimeSeriesQuery(queryContext *backend.QueryDataRequest, dsInfo *DatasourceInfo) ([]*PrometheusQuery, error) {
+func (b *Buffered) parseTimeSeriesQuery(queryContext *backend.QueryDataRequest) ([]*PrometheusQuery, error) {
 	qs := []*PrometheusQuery{}
 	for _, query := range queryContext.Queries {
 		model := &QueryModel{}
@@ -176,14 +218,14 @@ func (s *Service) parseTimeSeriesQuery(queryContext *backend.QueryDataRequest, d
 			return nil, err
 		}
 		//Final interval value
-		interval, err := calculatePrometheusInterval(model, dsInfo, query, s.intervalCalculator)
+		interval, err := calculatePrometheusInterval(model, b.TimeInterval, query, b.intervalCalculator)
 		if err != nil {
 			return nil, err
 		}
 
 		// Interpolate variables in expr
 		timeRange := query.TimeRange.To.Sub(query.TimeRange.From)
-		expr := interpolateVariables(model, interval, timeRange, s.intervalCalculator, dsInfo.TimeInterval)
+		expr := interpolateVariables(model, interval, timeRange, b.intervalCalculator, b.TimeInterval)
 		rangeQuery := model.RangeQuery
 		if !model.InstantQuery && !model.RangeQuery {
 			// In older dashboards, we were not setting range query param and !range && !instant was run as range query
@@ -232,8 +274,7 @@ func parseTimeSeriesResponse(value map[TimeSeriesQueryType]interface{}, query *P
 		case []apiv1.ExemplarQueryResult:
 			nextFrames = exemplarToDataFrames(v, query, nextFrames)
 		default:
-			plog.Error("Query returned unexpected result type", "type", v, "query", query.Expr)
-			continue
+			return nil, fmt.Errorf("unexpected result type: %s query: %s", v, query.Expr)
 		}
 
 		frames = append(frames, nextFrames...)
@@ -242,7 +283,7 @@ func parseTimeSeriesResponse(value map[TimeSeriesQueryType]interface{}, query *P
 	return frames, nil
 }
 
-func calculatePrometheusInterval(model *QueryModel, dsInfo *DatasourceInfo, query backend.DataQuery, intervalCalculator intervalv2.Calculator) (time.Duration, error) {
+func calculatePrometheusInterval(model *QueryModel, timeInterval string, query backend.DataQuery, intervalCalculator intervalv2.Calculator) (time.Duration, error) {
 	queryInterval := model.Interval
 
 	//If we are using variable for interval/step, we will replace it with calculated interval
@@ -250,7 +291,7 @@ func calculatePrometheusInterval(model *QueryModel, dsInfo *DatasourceInfo, quer
 		queryInterval = ""
 	}
 
-	minInterval, err := intervalv2.GetIntervalFrom(dsInfo.TimeInterval, queryInterval, model.IntervalMS, 15*time.Second)
+	minInterval, err := intervalv2.GetIntervalFrom(timeInterval, queryInterval, model.IntervalMS, 15*time.Second)
 	if err != nil {
 		return time.Duration(0), err
 	}
@@ -264,7 +305,7 @@ func calculatePrometheusInterval(model *QueryModel, dsInfo *DatasourceInfo, quer
 
 	if model.Interval == varRateInterval || model.Interval == varRateIntervalAlt {
 		// Rate interval is final and is not affected by resolution
-		return calculateRateInterval(adjustedInterval, dsInfo.TimeInterval, intervalCalculator), nil
+		return calculateRateInterval(adjustedInterval, timeInterval, intervalCalculator), nil
 	} else {
 		intervalFactor := model.IntervalFactor
 		if intervalFactor == 0 {
diff --git a/pkg/tsdb/prometheus/time_series_query_test.go b/pkg/tsdb/prometheus/buffered/time_series_query_test.go
similarity index 90%
rename from pkg/tsdb/prometheus/time_series_query_test.go
rename to pkg/tsdb/prometheus/buffered/time_series_query_test.go
index 6b1cdc6f7a0..f4bef8555ba 100644
--- a/pkg/tsdb/prometheus/time_series_query_test.go
+++ b/pkg/tsdb/prometheus/buffered/time_series_query_test.go
@@ -1,4 +1,4 @@
-package prometheus
+package buffered
 
 import (
 	"math"
@@ -79,7 +79,7 @@ func TestPrometheus_timeSeriesQuery_formatLeged(t *testing.T) {
 }
 
 func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
-	service := Service{
+	service := Buffered{
 		intervalCalculator: intervalv2.NewCalculator(),
 	}
 
@@ -108,8 +108,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
 			},
 		}
 
-		dsInfo := &DatasourceInfo{}
-		models, err := service.parseTimeSeriesQuery(query, dsInfo)
+		service.TimeInterval = "15s"
+		models, err := service.parseTimeSeriesQuery(query)
 		require.NoError(t, err)
 		require.Equal(t, false, models[0].ExemplarQuery)
 	})
@@ -126,8 +126,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
 			"refId": "A"
 		}`, timeRange)
 
-		dsInfo := &DatasourceInfo{}
-		models, err := service.parseTimeSeriesQuery(query, dsInfo)
+		service.TimeInterval = "15s"
+		models, err := service.parseTimeSeriesQuery(query)
 		require.NoError(t, err)
 		require.Equal(t, time.Second*30, models[0].Step)
 	})
@@ -145,8 +145,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
 			"refId": "A"
 		}`, timeRange)
 
-		dsInfo := &DatasourceInfo{}
-		models, err := service.parseTimeSeriesQuery(query, dsInfo)
+		service.TimeInterval = "15s"
+		models, err := service.parseTimeSeriesQuery(query)
 		require.NoError(t, err)
 		require.Equal(t, time.Second*15, models[0].Step)
 	})
@@ -164,8 +164,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
 			"refId": "A"
 		}`, timeRange)
 
-		dsInfo := &DatasourceInfo{}
-		models, err := service.parseTimeSeriesQuery(query, dsInfo)
+		service.TimeInterval = "15s"
+		models, err := service.parseTimeSeriesQuery(query)
 		require.NoError(t, err)
 		require.Equal(t, time.Minute*20, models[0].Step)
 	})
@@ -183,8 +183,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
 			"refId": "A"
 		}`, timeRange)
 
-		dsInfo := &DatasourceInfo{}
-		models, err := service.parseTimeSeriesQuery(query, dsInfo)
+		service.TimeInterval = "15s"
+		models, err := service.parseTimeSeriesQuery(query)
 		require.NoError(t, err)
 		require.Equal(t, time.Minute*2, models[0].Step)
 	})
@@ -202,10 +202,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
 			"refId": "A"
 		}`, timeRange)
 
-		dsInfo := &DatasourceInfo{
-			TimeInterval: "240s",
-		}
-		models, err := service.parseTimeSeriesQuery(query, dsInfo)
+		service.TimeInterval = "240s"
+		models, err := service.parseTimeSeriesQuery(query)
 		require.NoError(t, err)
 		require.Equal(t, time.Minute*4, models[0].Step)
 	})
@@ -223,8 +221,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
 			"refId": "A"
 		}`, timeRange)
 
-		dsInfo := &DatasourceInfo{}
-		models, err := service.parseTimeSeriesQuery(query, dsInfo)
+		service.TimeInterval = "15s"
+		models, err := service.parseTimeSeriesQuery(query)
 		require.NoError(t, err)
 		require.Equal(t, "rate(ALERTS{job=\"test\" [2m]})", models[0].Expr)
 	})
@@ -242,8 +240,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
 			"refId": "A"
 		}`, timeRange)
 
-		dsInfo := &DatasourceInfo{}
-		models, err := service.parseTimeSeriesQuery(query, dsInfo)
+		service.TimeInterval = "15s"
+		models, err := service.parseTimeSeriesQuery(query)
 		require.NoError(t, err)
 		require.Equal(t, "rate(ALERTS{job=\"test\" [2m]})", models[0].Expr)
 	})
@@ -261,8 +259,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
 			"refId": "A"
 		}`, timeRange)
 
-		dsInfo := &DatasourceInfo{}
-		models, err := service.parseTimeSeriesQuery(query, dsInfo)
+		service.TimeInterval = "15s"
+		models, err := service.parseTimeSeriesQuery(query)
 		require.NoError(t, err)
 		require.Equal(t, "rate(ALERTS{job=\"test\" [120000]})", models[0].Expr)
 	})
@@ -280,8 +278,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
 			"refId": "A"
 		}`, timeRange)
 
-		dsInfo := &DatasourceInfo{}
-		models, err := service.parseTimeSeriesQuery(query, dsInfo)
+		service.TimeInterval = "15s"
+		models, err := service.parseTimeSeriesQuery(query)
 		require.NoError(t, err)
 		require.Equal(t, "rate(ALERTS{job=\"test\" [120000]}) + rate(ALERTS{job=\"test\" [2m]})", models[0].Expr)
 	})
@@ -299,8 +297,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
 			"refId": "A"
 		}`, timeRange)
 
-		dsInfo := &DatasourceInfo{}
-		models, err := service.parseTimeSeriesQuery(query, dsInfo)
+		service.TimeInterval = "15s"
+		models, err := service.parseTimeSeriesQuery(query)
 		require.NoError(t, err)
 		require.Equal(t, "rate(ALERTS{job=\"test\" [120000]}) + rate(ALERTS{job=\"test\" [2m]})", models[0].Expr)
 	})
@@ -318,8 +316,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
 			"refId": "A"
 		}`, timeRange)
 
-		dsInfo := &DatasourceInfo{}
-		models, err := service.parseTimeSeriesQuery(query, dsInfo)
+		service.TimeInterval = "15s"
+		models, err := service.parseTimeSeriesQuery(query)
 		require.NoError(t, err)
 		require.Equal(t, "rate(ALERTS{job=\"test\" [172800s]})", models[0].Expr)
 	})
@@ -337,8 +335,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
 			"refId": "A"
 		}`, timeRange)
 
-		dsInfo := &DatasourceInfo{}
-		models, err := service.parseTimeSeriesQuery(query, dsInfo)
+		service.TimeInterval = "15s"
+		models, err := service.parseTimeSeriesQuery(query)
 		require.NoError(t, err)
 		require.Equal(t, "rate(ALERTS{job=\"test\" [172800]})", models[0].Expr)
 	})
@@ -356,8 +354,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
 			"refId": "A"
 		}`, timeRange)
 
-		dsInfo := &DatasourceInfo{}
-		models, err := service.parseTimeSeriesQuery(query, dsInfo)
+		service.TimeInterval = "15s"
+		models, err := service.parseTimeSeriesQuery(query)
 		require.NoError(t, err)
 		require.Equal(t, "rate(ALERTS{job=\"test\" [172800s]})", models[0].Expr)
 	})
@@ -375,8 +373,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
 			"refId": "A"
 		}`, timeRange)
 
-		dsInfo := &DatasourceInfo{}
-		models, err := service.parseTimeSeriesQuery(query, dsInfo)
+		service.TimeInterval = "15s"
+		models, err := service.parseTimeSeriesQuery(query)
 		require.NoError(t, err)
 		require.Equal(t, "rate(ALERTS{job=\"test\" [0]})", models[0].Expr)
 	})
@@ -394,8 +392,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
 			"refId": "A"
 		}`, timeRange)
 
-		dsInfo := &DatasourceInfo{}
-		models, err := service.parseTimeSeriesQuery(query, dsInfo)
+		service.TimeInterval = "15s"
+		models, err := service.parseTimeSeriesQuery(query)
 		require.NoError(t, err)
 		require.Equal(t, "rate(ALERTS{job=\"test\" [1]})", models[0].Expr)
 	})
@@ -413,8 +411,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
 			"refId": "A"
 		}`, timeRange)
 
-		dsInfo := &DatasourceInfo{}
-		models, err := service.parseTimeSeriesQuery(query, dsInfo)
+		service.TimeInterval = "15s"
+		models, err := service.parseTimeSeriesQuery(query)
 		require.NoError(t, err)
 		require.Equal(t, "rate(ALERTS{job=\"test\" [172800000]})", models[0].Expr)
 	})
@@ -432,8 +430,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
 			"refId": "A"
 		}`, timeRange)
 
-		dsInfo := &DatasourceInfo{}
-		models, err := service.parseTimeSeriesQuery(query, dsInfo)
+		service.TimeInterval = "15s"
+		models, err := service.parseTimeSeriesQuery(query)
 		require.NoError(t, err)
 		require.Equal(t, "rate(ALERTS{job=\"test\" [20]})", models[0].Expr)
 	})
@@ -452,8 +450,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
 			"refId": "A"
 		}`, timeRange)
 
-		dsInfo := &DatasourceInfo{}
-		models, err := service.parseTimeSeriesQuery(query, dsInfo)
+		service.TimeInterval = "15s"
+		models, err := service.parseTimeSeriesQuery(query)
 		require.NoError(t, err)
 		require.Equal(t, "rate(ALERTS{job=\"test\" [5m15s]})", models[0].Expr)
 	})
@@ -472,8 +470,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
 			"refId": "A"
 		}`, timeRange)
 
-		dsInfo := &DatasourceInfo{}
-		models, err := service.parseTimeSeriesQuery(query, dsInfo)
+		service.TimeInterval = "15s"
+		models, err := service.parseTimeSeriesQuery(query)
 		require.NoError(t, err)
 		require.Equal(t, "rate(ALERTS{job=\"test\" [1m0s]})", models[0].Expr)
 		require.Equal(t, 1*time.Minute, models[0].Step)
@@ -493,8 +491,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
 			"range": true
 		}`, timeRange)
 
-		dsInfo := &DatasourceInfo{}
-		models, err := service.parseTimeSeriesQuery(query, dsInfo)
+		service.TimeInterval = "15s"
+		models, err := service.parseTimeSeriesQuery(query)
 		require.NoError(t, err)
 		require.Equal(t, true, models[0].RangeQuery)
 	})
@@ -514,8 +512,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
 			"instant": true
 		}`, timeRange)
 
-		dsInfo := &DatasourceInfo{}
-		models, err := service.parseTimeSeriesQuery(query, dsInfo)
+		service.TimeInterval = "15s"
+		models, err := service.parseTimeSeriesQuery(query)
 		require.NoError(t, err)
 		require.Equal(t, true, models[0].RangeQuery)
 		require.Equal(t, true, models[0].InstantQuery)
@@ -534,8 +532,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
 			"refId": "A"
 		}`, timeRange)
 
-		dsInfo := &DatasourceInfo{}
-		models, err := service.parseTimeSeriesQuery(query, dsInfo)
+		service.TimeInterval = "15s"
+		models, err := service.parseTimeSeriesQuery(query)
 		require.NoError(t, err)
 		require.Equal(t, true, models[0].RangeQuery)
 	})
diff --git a/pkg/tsdb/prometheus/types.go b/pkg/tsdb/prometheus/buffered/types.go
similarity index 81%
rename from pkg/tsdb/prometheus/types.go
rename to pkg/tsdb/prometheus/buffered/types.go
index 92784411b13..8dec9d19dca 100644
--- a/pkg/tsdb/prometheus/types.go
+++ b/pkg/tsdb/prometheus/buffered/types.go
@@ -1,4 +1,4 @@
-package prometheus
+package buffered
 
 import (
 	"time"
@@ -6,14 +6,6 @@ import (
 	apiv1 "github.com/prometheus/client_golang/api/prometheus/v1"
 )
 
-type DatasourceInfo struct {
-	ID           int64
-	URL          string
-	TimeInterval string
-
-	getClient clientGetter
-}
-
 type clientGetter func(map[string]string) (apiv1.API, error)
 
 type PrometheusQuery struct {
@@ -47,3 +39,11 @@ type QueryModel struct {
 	IntervalFactor int64  `json:"intervalFactor"`
 	UtcOffsetSec   int64  `json:"utcOffsetSec"`
 }
+
+type TimeSeriesQueryType string
+
+const (
+	RangeQueryType    TimeSeriesQueryType = "range"
+	InstantQueryType  TimeSeriesQueryType = "instant"
+	ExemplarQueryType TimeSeriesQueryType = "exemplar"
+)
diff --git a/pkg/tsdb/prometheus/prometheus.go b/pkg/tsdb/prometheus/prometheus.go
index 71c1726e996..49b2b9a78eb 100644
--- a/pkg/tsdb/prometheus/prometheus.go
+++ b/pkg/tsdb/prometheus/prometheus.go
@@ -5,7 +5,6 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
-	"regexp"
 
 	"github.com/grafana/grafana-plugin-sdk-go/backend"
 	"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
@@ -15,34 +14,28 @@ import (
 	"github.com/grafana/grafana/pkg/infra/tracing"
 	"github.com/grafana/grafana/pkg/services/featuremgmt"
 	"github.com/grafana/grafana/pkg/setting"
-	"github.com/grafana/grafana/pkg/tsdb/intervalv2"
-	"github.com/grafana/grafana/pkg/tsdb/prometheus/promclient"
-	"github.com/grafana/grafana/pkg/util/maputil"
+	"github.com/grafana/grafana/pkg/tsdb/prometheus/buffered"
 	apiv1 "github.com/prometheus/client_golang/api/prometheus/v1"
 )
 
-var (
-	plog         = log.New("tsdb.prometheus")
-	legendFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)
-	safeRes      = 11000
-)
+var plog = log.New("tsdb.prometheus")
 
 type Service struct {
-	intervalCalculator intervalv2.Calculator
-	im                 instancemgmt.InstanceManager
-	tracer             tracing.Tracer
+	im instancemgmt.InstanceManager
+}
+
+type instance struct {
+	Buffered *buffered.Buffered
 }
 
 func ProvideService(httpClientProvider httpclient.Provider, cfg *setting.Cfg, features featuremgmt.FeatureToggles, tracer tracing.Tracer) *Service {
 	plog.Debug("initializing")
 	return &Service{
-		intervalCalculator: intervalv2.NewCalculator(),
-		im:                 datasource.NewInstanceManager(newInstanceSettings(httpClientProvider, cfg, features)),
-		tracer:             tracer,
+		im: datasource.NewInstanceManager(newInstanceSettings(httpClientProvider, cfg, features, tracer)),
 	}
 }
 
-func newInstanceSettings(httpClientProvider httpclient.Provider, cfg *setting.Cfg, features featuremgmt.FeatureToggles) datasource.InstanceFactoryFunc {
+func newInstanceSettings(httpClientProvider httpclient.Provider, cfg *setting.Cfg, features featuremgmt.FeatureToggles, tracer tracing.Tracer) datasource.InstanceFactoryFunc {
 	return func(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
 		var jsonData map[string]interface{}
 		err := json.Unmarshal(settings.JSONData, &jsonData)
@@ -50,25 +43,14 @@ func newInstanceSettings(httpClientProvider httpclient.Provider, cfg *setting.Cf
 			return nil, fmt.Errorf("error reading settings: %w", err)
 		}
 
-		p := promclient.NewProvider(settings, jsonData, httpClientProvider, cfg, features, plog)
-		pc, err := promclient.NewProviderCache(p)
+		buf, err := buffered.New(httpClientProvider, cfg, features, tracer, settings, plog)
 		if err != nil {
 			return nil, err
 		}
 
-		timeInterval, err := maputil.GetStringOptional(jsonData, "timeInterval")
-		if err != nil {
-			return nil, err
-		}
-
-		mdl := DatasourceInfo{
-			ID:           settings.ID,
-			URL:          settings.URL,
-			TimeInterval: timeInterval,
-			getClient:    pc.GetClient,
-		}
-
-		return mdl, nil
+		return instance{
+			Buffered: buf,
+		}, nil
 	}
 }
 
@@ -77,32 +59,21 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
 		return &backend.QueryDataResponse{}, fmt.Errorf("query contains no queries")
 	}
 
-	q := req.Queries[0]
-	dsInfo, err := s.getDSInfo(req.PluginContext)
+	i, err := s.getInstance(req.PluginContext)
 	if err != nil {
 		return nil, err
 	}
 
-	var result *backend.QueryDataResponse
-	switch q.QueryType {
-	case "timeSeriesQuery":
-		fallthrough
-	default:
-		result, err = s.executeTimeSeriesQuery(ctx, req, dsInfo)
-	}
-
-	return result, err
+	return i.Buffered.ExecuteTimeSeriesQuery(ctx, req)
 }
 
-func (s *Service) getDSInfo(pluginCtx backend.PluginContext) (*DatasourceInfo, error) {
+func (s *Service) getInstance(pluginCtx backend.PluginContext) (*instance, error) {
 	i, err := s.im.Get(pluginCtx)
 	if err != nil {
 		return nil, err
 	}
-
-	instance := i.(DatasourceInfo)
-
-	return &instance, nil
+	in := i.(instance)
+	return &in, nil
 }
 
 // IsAPIError returns whether err is or wraps a Prometheus error.