Mirror of https://github.com/grafana/grafana.git (synced 2025-09-19 12:44:04 +08:00)
Prometheus: Remove prometheusRunQueriesInParallel feature toggle (#103824)
* remove prometheusRunQueriesInParallel feature toggle
* fix the unit test
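With the toggle removed, `Execute` always takes the parallel path: it reads `concurrent_query_count` from the request's `GrafanaConfig` (falling back to 10 on a read/parse error), fans the queries out with `concurrency.ForEachJob`, and collects responses into the shared map under a mutex. Below is a minimal, self-contained sketch of that pattern; it assumes the helper is dskit's `concurrency.ForEachJob` (the diff only shows the call site), and `Query`/`runOne` are illustrative stand-ins for Grafana's own types and `handleQuery`.

```go
package main

import (
	"context"
	"fmt"
	"sync"

	"github.com/grafana/dskit/concurrency" // assumed import; the diff only shows the concurrency.ForEachJob call
)

type Query struct {
	RefID string
	Expr  string
}

// runOne stands in for s.handleQuery: evaluate a single query and return its result.
func runOne(_ context.Context, q Query) string {
	return "result for " + q.Expr
}

// runAll mirrors the shape of the new Execute: bounded fan-out plus a mutex-guarded result map.
func runAll(ctx context.Context, queries []Query, concurrentQueryCount int) map[string]string {
	if concurrentQueryCount <= 0 {
		concurrentQueryCount = 10 // same fallback the datasource applies when the config value is unreadable
	}

	var m sync.Mutex
	responses := make(map[string]string, len(queries))

	// ForEachJob runs the indexed jobs on at most concurrentQueryCount goroutines and waits for all of them.
	_ = concurrency.ForEachJob(ctx, len(queries), concurrentQueryCount, func(ctx context.Context, idx int) error {
		q := queries[idx]
		r := runOne(ctx, q)
		m.Lock()
		responses[q.RefID] = r
		m.Unlock()
		return nil
	})

	return responses
}

func main() {
	out := runAll(context.Background(), []Query{{RefID: "A", Expr: "up"}}, 10)
	fmt.Println(out)
}
```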
@@ -32,7 +32,6 @@ Most [generally available](https://grafana.com/docs/release-life-cycle/#general-
 | `logsContextDatasourceUi` | Allow datasource to provide custom UI for context view | Yes |
 | `lokiQuerySplitting` | Split large interval queries into subqueries with smaller time intervals | Yes |
 | `influxdbBackendMigration` | Query InfluxDB InfluxQL without the proxy | Yes |
-| `prometheusRunQueriesInParallel` | Enables running Prometheus queries in parallel | Yes |
 | `dataplaneFrontendFallback` | Support dataplane contract field name change for transformations and field name matchers where the name is different | Yes |
 | `unifiedRequestLog` | Writes error logs to the request logger | Yes |
 | `pluginsDetailsRightPanel` | Enables right panel for the plugins details page | Yes |
@@ -132,11 +132,6 @@ export interface FeatureToggles {
    */
   influxdbRunQueriesInParallel?: boolean;
-  /**
-   * Enables running Prometheus queries in parallel
-   * @default true
-   */
-  prometheusRunQueriesInParallel?: boolean;
   /**
    * Changes logs responses from Loki to be compliant with the dataplane specification.
    */
   lokiLogsDataplane?: boolean;
@@ -124,8 +124,10 @@ func getPluginContext() backend.PluginContext {
         User: nil,
         AppInstanceSettings: nil,
         DataSourceInstanceSettings: getPromInstanceSettings(),
+        GrafanaConfig: backend.NewGrafanaCfg(map[string]string{"concurrent_query_count": "10"}),
     }
 }

 func getPromInstanceSettings() *backend.DataSourceInstanceSettings {
     return &backend.DataSourceInstanceSettings{
         ID: 0,
@@ -7,6 +7,7 @@ import (
    "fmt"
    "io"
    "net/http"
    "net/url"
    "os"
    "path/filepath"
    "testing"
@@ -20,7 +21,7 @@ import (
     "github.com/grafana/grafana/pkg/promlib/models"
 )

-var update = true
+var update = false

 func TestRangeResponses(t *testing.T) {
     tt := []struct {
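The `update` flag follows the usual golden-file convention: flip it to true locally to regenerate the `.golden` files (which is how `exemplar.result.golden.jsonc` further down gets rewritten), and keep it false in committed code so tests compare against the stored snapshot. A generic sketch of that convention, not promlib's actual helper:

```go
package golden

import (
	"os"
	"testing"
)

// update controls whether golden files are rewritten; keep it false in committed code.
var update = false

// checkGolden compares got against the stored golden file, rewriting the file first when update is true.
// This illustrates the convention only; it is not the helper promlib uses.
func checkGolden(t *testing.T, path string, got []byte) {
	t.Helper()
	if update {
		if err := os.WriteFile(path, got, 0o644); err != nil {
			t.Fatalf("failed to update golden file %s: %v", path, err)
		}
	}
	want, err := os.ReadFile(path)
	if err != nil {
		t.Fatalf("failed to read golden file %s: %v", path, err)
	}
	if string(got) != string(want) {
		t.Errorf("result does not match golden file %s", path)
	}
}
```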
@@ -145,10 +146,42 @@ func runQuery(response []byte, q *backend.QueryDataRequest) (*backend.QueryDataR
     if err != nil {
         return nil, err
     }

+    // Create initial response
     res := &http.Response{
         StatusCode: 200,
         Body: io.NopCloser(bytes.NewReader(response)),
+        Request: &http.Request{
+            URL: &url.URL{
+                Path: "api/v1/query_range",
+            },
+        },
     }
-    tCtx.httpProvider.setResponse(res, res)
-    return tCtx.queryData.Execute(context.Background(), q)
+
+    // Create a proper clone for the exemplar response with a different path
+    exemplarRes := &http.Response{
+        StatusCode: 200,
+        Body: io.NopCloser(bytes.NewReader(response)),
+        Request: &http.Request{
+            URL: &url.URL{
+                Path: "api/v1/query_exemplars",
+            },
+        },
+    }
+
+    tCtx.httpProvider.setResponse(res, exemplarRes)
+
+    // Add GrafanaConfig to the context to prevent nil pointer dereference
+    ctx := backend.WithGrafanaConfig(context.Background(), backend.NewGrafanaCfg(map[string]string{
+        "concurrent_query_count": "10",
+    }))
+
+    // Add a PluginContext with GrafanaConfig to the request
+    q.PluginContext = backend.PluginContext{
+        GrafanaConfig: backend.NewGrafanaCfg(map[string]string{
+            "concurrent_query_count": "10",
+        }),
+    }
+
+    return tCtx.queryData.Execute(ctx, q)
 }
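The updated test wires `concurrent_query_count` in twice, via the context and via the request's `PluginContext`, so that `GrafanaConfig.ConcurrentQueryCount()` can resolve a value wherever the query path looks for it. A stripped-down sketch of that plumbing using the plugin SDK calls visible in the diff (the request contents are illustrative):

```go
package main

import (
	"context"
	"fmt"

	"github.com/grafana/grafana-plugin-sdk-go/backend"
)

func main() {
	cfg := backend.NewGrafanaCfg(map[string]string{
		"concurrent_query_count": "10",
	})

	// The config travels on the context...
	ctx := backend.WithGrafanaConfig(context.Background(), cfg)

	// ...and on the request's PluginContext, mirroring the updated runQuery helper.
	req := &backend.QueryDataRequest{
		PluginContext: backend.PluginContext{
			GrafanaConfig: cfg,
		},
	}

	// The datasource reads the limit back like this and falls back to 10 on error.
	n, err := req.PluginContext.GrafanaConfig.ConcurrentQueryCount()
	fmt.Println(n, err)

	_ = ctx // in the real test, ctx and req are passed to queryData.Execute
}
```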
@@ -101,45 +101,32 @@ func (s *QueryData) Execute(ctx context.Context, req *backend.QueryDataRequest)
     }

     var (
-        hasPromQLScopeFeatureFlag         = s.featureToggles.IsEnabled("promQLScope")
-        hasPrometheusRunQueriesInParallel = s.featureToggles.IsEnabled("prometheusRunQueriesInParallel")
+        hasPromQLScopeFeatureFlag = s.featureToggles.IsEnabled("promQLScope")
+        m sync.Mutex
     )

-    if hasPrometheusRunQueriesInParallel {
-        var (
-            m sync.Mutex
-        )
-
-        concurrentQueryCount, err := req.PluginContext.GrafanaConfig.ConcurrentQueryCount()
-        if err != nil {
-            logger.Debug(fmt.Sprintf("Concurrent Query Count read/parse error: %v", err), "prometheusRunQueriesInParallel")
-            concurrentQueryCount = 10
-        }
-
-        _ = concurrency.ForEachJob(ctx, len(req.Queries), concurrentQueryCount, func(ctx context.Context, idx int) error {
-            query := req.Queries[idx]
-            r := s.handleQuery(ctx, query, fromAlert, hasPromQLScopeFeatureFlag, true)
-            if r != nil {
-                m.Lock()
-                result.Responses[query.RefID] = *r
-                m.Unlock()
-            }
-            return nil
-        })
-    } else {
-        for _, q := range req.Queries {
-            r := s.handleQuery(ctx, q, fromAlert, hasPromQLScopeFeatureFlag, false)
-            if r != nil {
-                result.Responses[q.RefID] = *r
-            }
-        }
+    concurrentQueryCount, err := req.PluginContext.GrafanaConfig.ConcurrentQueryCount()
+    if err != nil {
+        logger.Debug(fmt.Sprintf("Concurrent Query Count read/parse error: %v", err), "prometheusRunQueriesInParallel")
+        concurrentQueryCount = 10
     }

+    _ = concurrency.ForEachJob(ctx, len(req.Queries), concurrentQueryCount, func(ctx context.Context, idx int) error {
+        query := req.Queries[idx]
+        r := s.handleQuery(ctx, query, fromAlert, hasPromQLScopeFeatureFlag)
+        if r != nil {
+            m.Lock()
+            result.Responses[query.RefID] = *r
+            m.Unlock()
+        }
+        return nil
+    })
+
     return &result, nil
 }

 func (s *QueryData) handleQuery(ctx context.Context, bq backend.DataQuery, fromAlert,
-    hasPromQLScopeFeatureFlag, hasPrometheusRunQueriesInParallel bool) *backend.DataResponse {
+    hasPromQLScopeFeatureFlag bool) *backend.DataResponse {
     traceCtx, span := s.tracer.Start(ctx, "datasource.prometheus")
     defer span.End()
     query, err := models.Parse(span, bq, s.TimeInterval, s.intervalCalculator, fromAlert, hasPromQLScopeFeatureFlag)
@@ -149,14 +136,14 @@ func (s *QueryData) handleQuery(ctx context.Context, bq backend.DataQuery, fromA
         }
     }

-    r := s.fetch(traceCtx, s.client, query, hasPrometheusRunQueriesInParallel)
+    r := s.fetch(traceCtx, s.client, query)
     if r == nil {
         s.log.FromContext(ctx).Debug("Received nil response from runQuery", "query", query.Expr)
     }
     return r
 }

-func (s *QueryData) fetch(traceCtx context.Context, client *client.Client, q *models.Query, hasPrometheusRunQueriesInParallel bool) *backend.DataResponse {
+func (s *QueryData) fetch(traceCtx context.Context, client *client.Client, q *models.Query) *backend.DataResponse {
     logger := s.log.FromContext(traceCtx)
     logger.Debug("Sending query", "start", q.Start, "end", q.End, "step", q.Step, "query", q.Expr /*, "queryTimeout", s.QueryTimeout*/)

@@ -171,61 +158,41 @@ func (s *QueryData) fetch(traceCtx context.Context, client *client.Client, q *mo
     )

     if q.InstantQuery {
-        if hasPrometheusRunQueriesInParallel {
-            wg.Add(1)
-            go func() {
-                defer wg.Done()
-                res := s.instantQuery(traceCtx, client, q)
-                m.Lock()
-                addDataResponse(&res, dr)
-                m.Unlock()
-            }()
-        } else {
-            res := s.instantQuery(traceCtx, client, q)
-            addDataResponse(&res, dr)
-        }
+        wg.Add(1)
+        go func() {
+            defer wg.Done()
+            res := s.instantQuery(traceCtx, client, q)
+            m.Lock()
+            addDataResponse(&res, dr)
+            m.Unlock()
+        }()
     }

     if q.RangeQuery {
-        if hasPrometheusRunQueriesInParallel {
-            wg.Add(1)
-            go func() {
-                defer wg.Done()
-                res := s.rangeQuery(traceCtx, client, q)
-                m.Lock()
-                addDataResponse(&res, dr)
-                m.Unlock()
-            }()
-        } else {
-            res := s.rangeQuery(traceCtx, client, q)
-            addDataResponse(&res, dr)
-        }
+        wg.Add(1)
+        go func() {
+            defer wg.Done()
+            res := s.rangeQuery(traceCtx, client, q)
+            m.Lock()
+            addDataResponse(&res, dr)
+            m.Unlock()
+        }()
     }

     if q.ExemplarQuery {
-        if hasPrometheusRunQueriesInParallel {
-            wg.Add(1)
-            go func() {
-                defer wg.Done()
-                res := s.exemplarQuery(traceCtx, client, q)
-                m.Lock()
-                if res.Error != nil {
-                    // If exemplar query returns error, we want to only log it and
-                    // continue with other results processing
-                    logger.Error("Exemplar query failed", "query", q.Expr, "err", res.Error)
-                }
-                dr.Frames = append(dr.Frames, res.Frames...)
-                m.Unlock()
-            }()
-        } else {
-            res := s.exemplarQuery(traceCtx, client, q)
-            if res.Error != nil {
-                // If exemplar query returns error, we want to only log it and
-                // continue with other results processing
-                logger.Error("Exemplar query failed", "query", q.Expr, "err", res.Error)
-            }
-            dr.Frames = append(dr.Frames, res.Frames...)
-        }
+        wg.Add(1)
+        go func() {
+            defer wg.Done()
+            res := s.exemplarQuery(traceCtx, client, q)
+            m.Lock()
+            if res.Error != nil {
+                // If exemplar query returns error, we want to only log it and
+                // continue with other results processing
+                logger.Error("Exemplar query failed", "query", q.Expr, "err", res.Error)
+            }
+            dr.Frames = append(dr.Frames, res.Frames...)
+            m.Unlock()
+        }()
     }
     wg.Wait()

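With the branches gone, `fetch` unconditionally dispatches whichever of the instant, range, and exemplar requests are enabled as goroutines, guards the shared response with a mutex, waits on a `WaitGroup`, and only logs exemplar failures instead of surfacing them. A self-contained sketch of that shape with placeholder query functions (none of these names are promlib's):

```go
package main

import (
	"fmt"
	"log"
	"sync"
)

func main() {
	runInstant, runRange, runExemplar := true, true, true

	var (
		wg     sync.WaitGroup
		m      sync.Mutex
		frames []string // stands in for the shared DataResponse
	)

	appendFrames := func(fs ...string) {
		m.Lock()
		defer m.Unlock()
		frames = append(frames, fs...)
	}

	if runInstant {
		wg.Add(1)
		go func() {
			defer wg.Done()
			appendFrames("instant frame") // placeholder for s.instantQuery
		}()
	}
	if runRange {
		wg.Add(1)
		go func() {
			defer wg.Done()
			appendFrames("range frame") // placeholder for s.rangeQuery
		}()
	}
	if runExemplar {
		wg.Add(1)
		go func() {
			defer wg.Done()
			var exemplarErr error // placeholder for s.exemplarQuery's error
			if exemplarErr != nil {
				// Exemplar failures are only logged; the other results are still returned.
				log.Println("exemplar query failed:", exemplarErr)
			}
			appendFrames("exemplar frame")
		}()
	}

	wg.Wait()
	fmt.Println(frames)
}
```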
@@ -96,15 +96,26 @@ func TestPrometheus_parseTimeSeriesResponse(t *testing.T) {

         // Test fields
         require.Len(t, res, 2)
-        require.Equal(t, res[0].Name, "exemplar")
-        require.Equal(t, res[0].Fields[0].Name, "Time")
-        require.Equal(t, res[0].Fields[1].Name, "Value")
-        require.Len(t, res[0].Fields, 6)
+        // Find the exemplar frame
+        var exemplarFrame *data.Frame
+        var rangeFrame *data.Frame
+        for _, frame := range res {
+            if frame.Name == "exemplar" {
+                exemplarFrame = frame
+            } else {
+                rangeFrame = frame
+            }
+        }
+        require.NotNil(t, exemplarFrame)
+        require.NotNil(t, rangeFrame)
+        require.Equal(t, "Time", exemplarFrame.Fields[0].Name)
+        require.Equal(t, "Value", exemplarFrame.Fields[1].Name)
+        require.Len(t, exemplarFrame.Fields, 6)

         // Test correct values (sampled to 2)
-        require.Equal(t, res[0].Fields[1].Len(), 4)
-        require.Equal(t, res[0].Fields[1].At(0), 0.009545445)
-        require.Equal(t, res[0].Fields[1].At(3), 0.003535405)
+        require.Equal(t, 4, exemplarFrame.Fields[1].Len())
+        require.Equal(t, 0.009545445, exemplarFrame.Fields[1].At(0))
+        require.Equal(t, 0.003535405, exemplarFrame.Fields[1].At(3))
     })

     t.Run("matrix response should be parsed normally", func(t *testing.T) {
@@ -379,6 +390,11 @@ func executeWithHeaders(tctx *testContext, query backend.DataQuery, rqr any, eqr
     req := backend.QueryDataRequest{
         Queries: []backend.DataQuery{query},
         Headers: headers,
+        PluginContext: backend.PluginContext{
+            GrafanaConfig: backend.NewGrafanaCfg(map[string]string{
+                "concurrent_query_count": "10",
+            }),
+        },
     }

     rangeRes, err := toAPIResponse(rqr)
@@ -399,7 +415,12 @@ func executeWithHeaders(tctx *testContext, query backend.DataQuery, rqr any, eqr
     }()
     tctx.httpProvider.setResponse(rangeRes, exemplarRes)

-    res, err := tctx.queryData.Execute(context.Background(), &req)
+    // Create context with GrafanaConfig
+    ctx := backend.WithGrafanaConfig(context.Background(), backend.NewGrafanaCfg(map[string]string{
+        "concurrent_query_count": "10",
+    }))
+
+    res, err := tctx.queryData.Execute(ctx, &req)
     if err != nil {
         return nil, err
     }
@@ -505,7 +526,37 @@ func (p *fakeHttpClientProvider) GetTransport(opts ...httpclient.Options) (http.

 func (p *fakeHttpClientProvider) setResponse(rangeRes *http.Response, exemplarRes *http.Response) {
     p.rangeRes = rangeRes
     p.exemplarRes = exemplarRes
+
+    // Create a proper clone manually ensuring we have a fresh response
+    if exemplarRes != nil {
+        bodyBytes, _ := io.ReadAll(exemplarRes.Body)
+        err := exemplarRes.Body.Close() // Close the original
+        if err != nil {
+            fmt.Println(fmt.Errorf("exemplarRes body close error: %v", err))
+            return
+        }
+
+        // Create a new request if the original has one
+        var newRequest *http.Request
+        if exemplarRes.Request != nil {
+            newRequest = &http.Request{
+                Method: exemplarRes.Request.Method,
+                URL: exemplarRes.Request.URL,
+                Header: exemplarRes.Request.Header.Clone(),
+            }
+        }
+
+        // Create a new response with the same data but new body
+        p.exemplarRes = &http.Response{
+            StatusCode: exemplarRes.StatusCode,
+            Body: io.NopCloser(bytes.NewReader(bodyBytes)),
+            Request: newRequest,
+            Header: exemplarRes.Header.Clone(),
+        }
+
+        // Reset the original body with a new reader
+        exemplarRes.Body = io.NopCloser(bytes.NewReader(bodyBytes))
+    }
 }

 func (p *fakeHttpClientProvider) RoundTrip(req *http.Request) (*http.Response, error) {
pkg/promlib/testdata/exemplar.result.golden.jsonc (vendored, 1044 lines changed)
File diff suppressed because it is too large
@@ -206,14 +206,6 @@ var (
         FrontendOnly: false,
         Owner: grafanaPartnerPluginsSquad,
     },
-    {
-        Name: "prometheusRunQueriesInParallel",
-        Description: "Enables running Prometheus queries in parallel",
-        Stage: FeatureStageGeneralAvailability,
-        FrontendOnly: false,
-        Owner: grafanaOSSBigTent,
-        Expression: "true", // enabled by default
-    },
     {
         Name: "lokiLogsDataplane",
         Description: "Changes logs responses from Loki to be compliant with the dataplane specification.",
@@ -25,7 +25,6 @@ individualCookiePreferences,experimental,@grafana/grafana-backend-group,false,fa
 influxdbBackendMigration,GA,@grafana/partner-datasources,false,false,true
 influxqlStreamingParser,experimental,@grafana/partner-datasources,false,false,false
 influxdbRunQueriesInParallel,privatePreview,@grafana/partner-datasources,false,false,false
-prometheusRunQueriesInParallel,GA,@grafana/oss-big-tent,false,false,false
 lokiLogsDataplane,experimental,@grafana/observability-logs,false,false,false
 dataplaneFrontendFallback,GA,@grafana/observability-metrics,false,false,true
 disableSSEDataplane,experimental,@grafana/observability-metrics,false,false,false
@@ -111,10 +111,6 @@ const (
     // Enables running InfluxDB Influxql queries in parallel
     FlagInfluxdbRunQueriesInParallel = "influxdbRunQueriesInParallel"

-    // FlagPrometheusRunQueriesInParallel
-    // Enables running Prometheus queries in parallel
-    FlagPrometheusRunQueriesInParallel = "prometheusRunQueriesInParallel"
-
     // FlagLokiLogsDataplane
     // Changes logs responses from Loki to be compliant with the dataplane specification.
     FlagLokiLogsDataplane = "lokiLogsDataplane"
@@ -2583,7 +2583,8 @@
     "metadata": {
         "name": "prometheusRunQueriesInParallel",
         "resourceVersion": "1743693517832",
-        "creationTimestamp": "2024-08-12T12:31:39Z"
+        "creationTimestamp": "2024-08-12T12:31:39Z",
+        "deletionTimestamp": "2025-04-10T18:16:18Z"
     },
     "spec": {
         "description": "Enables running Prometheus queries in parallel",