datasources: allow opt-in to new features using a http header

This commit is contained in:
Gábor Farkas
2025-08-08 08:51:21 +02:00
committed by Sarah Zinger
parent 84de7ce8cd
commit 9cea5f5aca
3 changed files with 54 additions and 6 deletions

View File

@ -71,7 +71,16 @@ func (hs *HTTPServer) QueryMetricsV2(c *contextmodel.ReqContext) response.Respon
return response.Error(http.StatusBadRequest, "bad request data", err)
}
resp, err := hs.queryDataService.QueryData(c.Req.Context(), c.SignedInUser, c.SkipDSCache, reqDTO)
handleTimeInQuery := c.Req.Header.Get("X-Query-V2") == "true"
var resp *backend.QueryDataResponse
var err error
if handleTimeInQuery {
resp, err = hs.queryDataService.QueryDataNew(c.Req.Context(), c.SignedInUser, c.SkipDSCache, reqDTO)
} else {
resp, err = hs.queryDataService.QueryData(c.Req.Context(), c.SignedInUser, c.SkipDSCache, reqDTO)
}
if err != nil {
return hs.handleQueryMetricsError(err)
}

View File

@ -70,6 +70,9 @@ func ProvideService(
type Service interface {
Run(ctx context.Context) error
QueryData(ctx context.Context, user identity.Requester, skipDSCache bool, reqDTO dtos.MetricRequest) (*backend.QueryDataResponse, error)
// this is more "forward compatible", for example supports per-query time ranges
QueryDataNew(ctx context.Context, user identity.Requester, skipDSCache bool, reqDTO dtos.MetricRequest) (*backend.QueryDataResponse, error)
}
// Gives us compile time error if the service does not adhere to the contract of the interface
@ -86,7 +89,6 @@ type ServiceImpl struct {
concurrentQueryLimit int
mtDatasourceClientBuilder mtdsclient.MTDatasourceClientBuilder
headers map[string]string
supportLocalTimeRange bool
}
// Run ServiceImpl.
@ -96,7 +98,7 @@ func (s *ServiceImpl) Run(ctx context.Context) error {
}
// QueryData processes queries and returns query responses. It handles queries to single or mixed datasources, as well as expressions.
func (s *ServiceImpl) QueryData(ctx context.Context, user identity.Requester, skipDSCache bool, reqDTO dtos.MetricRequest) (*backend.QueryDataResponse, error) {
func (s *ServiceImpl) queryData(ctx context.Context, user identity.Requester, skipDSCache bool, reqDTO dtos.MetricRequest, supportLocaltimeRange bool) (*backend.QueryDataResponse, error) {
fromAlert := false
for header, val := range s.headers {
if header == models.FromAlertHeaderName && val == "true" {
@ -104,7 +106,7 @@ func (s *ServiceImpl) QueryData(ctx context.Context, user identity.Requester, sk
}
}
// Parse the request into parsed queries grouped by datasource uid
parsedReq, err := s.parseMetricRequest(ctx, user, skipDSCache, reqDTO, s.supportLocalTimeRange)
parsedReq, err := s.parseMetricRequest(ctx, user, skipDSCache, reqDTO, supportLocaltimeRange)
if err != nil {
return nil, err
}
@ -121,6 +123,14 @@ func (s *ServiceImpl) QueryData(ctx context.Context, user identity.Requester, sk
return s.executeConcurrentQueries(ctx, user, skipDSCache, reqDTO, parsedReq.parsedQueries)
}
func (s *ServiceImpl) QueryData(ctx context.Context, user identity.Requester, skipDSCache bool, reqDTO dtos.MetricRequest) (*backend.QueryDataResponse, error) {
return s.queryData(ctx, user, skipDSCache, reqDTO, false)
}
func (s *ServiceImpl) QueryDataNew(ctx context.Context, user identity.Requester, skipDSCache bool, reqDTO dtos.MetricRequest) (*backend.QueryDataResponse, error) {
return s.queryData(ctx, user, skipDSCache, reqDTO, true)
}
// splitResponse contains the results of a concurrent data source query - the response and any headers
type splitResponse struct {
responses backend.Responses
@ -223,9 +233,8 @@ func QueryData(ctx context.Context, log log.Logger, dscache datasources.CacheSer
mtDatasourceClientBuilder: mtDatasourceClientBuilder,
headers: headers,
concurrentQueryLimit: 16, // TODO: make it configurable
supportLocalTimeRange: true,
}
return s.QueryData(ctx, nil, false, reqDTO)
return s.QueryDataNew(ctx, nil, false, reqDTO)
}
// handleExpressions handles queries when there is an expression.

View File

@ -49,6 +49,36 @@ func (_m *FakeQueryService) QueryData(ctx context.Context, user identity.Request
return r0, r1
}
// QueryDataNew provides a mock function with given fields: ctx, user, skipDSCache, reqDTO
func (_m *FakeQueryService) QueryDataNew(ctx context.Context, user identity.Requester, skipDSCache bool, reqDTO dtos.MetricRequest) (*backend.QueryDataResponse, error) {
ret := _m.Called(ctx, user, skipDSCache, reqDTO)
if len(ret) == 0 {
panic("no return value specified for QueryDataNew")
}
var r0 *backend.QueryDataResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, identity.Requester, bool, dtos.MetricRequest) (*backend.QueryDataResponse, error)); ok {
return rf(ctx, user, skipDSCache, reqDTO)
}
if rf, ok := ret.Get(0).(func(context.Context, identity.Requester, bool, dtos.MetricRequest) *backend.QueryDataResponse); ok {
r0 = rf(ctx, user, skipDSCache, reqDTO)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*backend.QueryDataResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, identity.Requester, bool, dtos.MetricRequest) error); ok {
r1 = rf(ctx, user, skipDSCache, reqDTO)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Run provides a mock function with given fields: ctx
func (_m *FakeQueryService) Run(ctx context.Context) error {
ret := _m.Called(ctx)