mirror of
https://github.com/grafana/grafana.git
synced 2025-07-30 12:42:17 +08:00

* parse via sse I need to figure out how to handle the pipeline.execute with our own client. I think this is important for MT reasons, just like using our own cache (via legacy) is important. parsing is done though! * WIP nonsense * horrible code but i think it works * Add support for sql expressions config settings * Cleanup: - remove spew from nodes.go - uncomment out plugin context and use in single tenant flow - make code more readable and add comments * Cleanup: - create separate file for mt ds client builder - ensure error handling is the same for both expressions and regular queries - other cleanup * not working but good thoughts * WIP, vector not working for non sse * super hacky but i think vectors work now * delete delete delete * Comments for future ref * break out query handling and start test * add prom debugger * clean up: remove comments and commented out bits * fix query_test * add prom debugger * create table-driven tests with testsdata files * Fix test * Add test * go mod?? * idk * Remove comment * go enterprise issue maybe * Fix codeowners * Delete * Remove test data * Clean up * logger * Remove go changes hopefully * idk go man * sad * idk i ran go mod tidy and this is what it wants * Fix readme, with much help from adam * some linting and testing errors * lint * fix lint * fix lint register.go * another lint * address lint in test * fix dead code and linters for query_test * Go mod? * Struggling with go mod * Fix test * Fix another test * Revert headers change * Its difficult to test this in OSS as it depends on functionality defined in enterprise, let's bring these tests back in some form in enterprise * Fix codeowners --------- Co-authored-by: Adam Simpson <adam@adamsimpson.net>
324 lines
9.4 KiB
Go
324 lines
9.4 KiB
Go
package query
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"net/http"
|
|
"slices"
|
|
"strconv"
|
|
|
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
|
"github.com/grafana/grafana-plugin-sdk-go/experimental/apis/data/v0alpha1"
|
|
"github.com/grafana/grafana/pkg/api/dtos"
|
|
"github.com/grafana/grafana/pkg/components/simplejson"
|
|
"github.com/grafana/grafana/pkg/expr"
|
|
"github.com/grafana/grafana/pkg/services/datasources"
|
|
"github.com/grafana/grafana/pkg/services/mtdsclient"
|
|
"github.com/grafana/grafana/pkg/setting"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/codes"
|
|
errorsK8s "k8s.io/apimachinery/pkg/api/errors"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
"k8s.io/apiserver/pkg/endpoints/request"
|
|
"k8s.io/apiserver/pkg/registry/rest"
|
|
|
|
"github.com/grafana/grafana/pkg/apimachinery/identity"
|
|
query "github.com/grafana/grafana/pkg/apis/query/v0alpha1"
|
|
"github.com/grafana/grafana/pkg/infra/log"
|
|
ds_service "github.com/grafana/grafana/pkg/services/datasources/service"
|
|
service "github.com/grafana/grafana/pkg/services/query"
|
|
"github.com/grafana/grafana/pkg/web"
|
|
)
|
|
|
|
type queryREST struct {
|
|
logger log.Logger
|
|
builder *QueryAPIBuilder
|
|
}
|
|
|
|
type MyCacheService struct {
|
|
legacy ds_service.LegacyDataSourceLookup
|
|
}
|
|
|
|
func (mcs *MyCacheService) GetDatasource(ctx context.Context, datasourceID int64, _ identity.Requester, _ bool) (*datasources.DataSource, error) {
|
|
ref, err := mcs.legacy.GetDataSourceFromDeprecatedFields(ctx, "", datasourceID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &datasources.DataSource{
|
|
UID: ref.UID,
|
|
Type: ref.Type,
|
|
}, nil
|
|
}
|
|
|
|
func (mcs *MyCacheService) GetDatasourceByUID(ctx context.Context, datasourceUID string, _ identity.Requester, _ bool) (*datasources.DataSource, error) {
|
|
ref, err := mcs.legacy.GetDataSourceFromDeprecatedFields(ctx, datasourceUID, 0)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &datasources.DataSource{
|
|
UID: ref.UID,
|
|
Type: ref.Type,
|
|
}, nil
|
|
}
|
|
|
|
var (
|
|
_ rest.Storage = (*queryREST)(nil)
|
|
_ rest.SingularNameProvider = (*queryREST)(nil)
|
|
_ rest.Connecter = (*queryREST)(nil)
|
|
_ rest.Scoper = (*queryREST)(nil)
|
|
_ rest.StorageMetadata = (*queryREST)(nil)
|
|
)
|
|
|
|
func newQueryREST(builder *QueryAPIBuilder) *queryREST {
|
|
return &queryREST{
|
|
logger: log.New("query"),
|
|
builder: builder,
|
|
}
|
|
}
|
|
|
|
func (r *queryREST) New() runtime.Object {
|
|
// This is added as the "ResponseType" regardless what ProducesObject() says :)
|
|
return &query.QueryDataResponse{}
|
|
}
|
|
|
|
// Destroy is a no-op.
func (r *queryREST) Destroy() {
}
|
|
|
|
func (r *queryREST) NamespaceScoped() bool {
|
|
return true
|
|
}
|
|
|
|
func (r *queryREST) GetSingularName() string {
|
|
return "QueryResults" // Used for the
|
|
}
|
|
|
|
func (r *queryREST) ProducesMIMETypes(verb string) []string {
|
|
return []string{"application/json"} // and parquet!
|
|
}
|
|
|
|
func (r *queryREST) ProducesObject(verb string) interface{} {
|
|
return &query.QueryDataResponse{}
|
|
}
|
|
|
|
func (r *queryREST) ConnectMethods() []string {
|
|
return []string{"POST"}
|
|
}
|
|
|
|
func (r *queryREST) NewConnectOptions() (runtime.Object, bool, string) {
|
|
return nil, false, "" // true means you can use the trailing path as a variable
|
|
}
|
|
|
|
func (r *queryREST) Connect(connectCtx context.Context, name string, _ runtime.Object, incomingResponder rest.Responder) (http.Handler, error) {
|
|
// See: /pkg/services/apiserver/builder/helper.go#L34
|
|
// The name is set with a rewriter hack
|
|
if name != "name" {
|
|
r.logger.Debug("Connect name is not name")
|
|
return nil, errorsK8s.NewNotFound(schema.GroupResource{}, name)
|
|
}
|
|
b := r.builder
|
|
|
|
return http.HandlerFunc(func(w http.ResponseWriter, httpreq *http.Request) {
|
|
ctx, span := b.tracer.Start(httpreq.Context(), "QueryService.Query")
|
|
defer span.End()
|
|
ctx = request.WithNamespace(ctx, request.NamespaceValue(connectCtx))
|
|
traceId := span.SpanContext().TraceID()
|
|
connectLogger := b.log.New("traceId", traceId.String())
|
|
responder := newResponderWrapper(incomingResponder,
|
|
func(statusCode *int, obj runtime.Object) {
|
|
if *statusCode/100 == 4 {
|
|
span.SetStatus(codes.Error, strconv.Itoa(*statusCode))
|
|
}
|
|
|
|
if *statusCode >= 500 {
|
|
o, ok := obj.(*query.QueryDataResponse)
|
|
if ok && o.Responses != nil {
|
|
for refId, response := range o.Responses {
|
|
if response.ErrorSource == backend.ErrorSourceDownstream {
|
|
*statusCode = http.StatusBadRequest //force this to be a 400 since it's downstream
|
|
span.SetStatus(codes.Error, strconv.Itoa(*statusCode))
|
|
span.SetAttributes(attribute.String("error.source", "downstream"))
|
|
break
|
|
} else if response.Error != nil {
|
|
connectLogger.Debug("500 error without downstream error source", "error", response.Error, "errorSource", response.ErrorSource, "refId", refId)
|
|
span.SetStatus(codes.Error, "500 error without downstream error source")
|
|
} else {
|
|
span.SetStatus(codes.Error, "500 error without downstream error source and no Error message")
|
|
span.SetAttributes(attribute.String("error.ref_id", refId))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
},
|
|
|
|
func(err error) {
|
|
connectLogger.Error("error caught in handler", "err", err)
|
|
span.SetStatus(codes.Error, "query error")
|
|
|
|
if err == nil {
|
|
return
|
|
}
|
|
|
|
span.RecordError(err)
|
|
})
|
|
|
|
raw := &query.QueryDataRequest{}
|
|
err := web.Bind(httpreq, raw)
|
|
if err != nil {
|
|
b.log.Error("Hit unexpected error when reading query", "err", err)
|
|
err = errorsK8s.NewBadRequest("error reading query")
|
|
// TODO: can we wrap the error so details are not lost?!
|
|
// errutil.BadRequest(
|
|
// "query.bind",
|
|
// errutil.WithPublicMessage("Error reading query")).
|
|
// Errorf("error reading: %w", err)
|
|
responder.Error(err)
|
|
return
|
|
}
|
|
|
|
qdr, err := handleQuery(ctx, *raw, *b, httpreq, *responder)
|
|
|
|
if err != nil {
|
|
b.log.Error("execute error", "http code", query.GetResponseCode(qdr), "err", err)
|
|
logEmptyRefids(raw.Queries, b.log)
|
|
if qdr != nil { // if we have a response, we assume the err is set in the response
|
|
responder.Object(query.GetResponseCode(qdr), &query.QueryDataResponse{
|
|
QueryDataResponse: *qdr,
|
|
})
|
|
return
|
|
} else {
|
|
// return the error to the client, will send all non k8s errors as a k8 unexpected error
|
|
b.log.Error("hit unexpected error while executing query, this will show as an unhandled k8s status error", "err", err)
|
|
responder.Error(err)
|
|
return
|
|
}
|
|
}
|
|
|
|
responder.Object(query.GetResponseCode(qdr), &query.QueryDataResponse{
|
|
QueryDataResponse: *qdr, // wrap the backend response as a QueryDataResponse
|
|
})
|
|
}), nil
|
|
}
|
|
|
|
func handleQuery(ctx context.Context, raw query.QueryDataRequest, b QueryAPIBuilder, httpreq *http.Request, responder responderWrapper) (*backend.QueryDataResponse, error) {
|
|
var jsonQueries = make([]*simplejson.Json, 0, len(raw.Queries))
|
|
for _, query := range raw.Queries {
|
|
jsonBytes, err := json.Marshal(query)
|
|
if err != nil {
|
|
b.log.Error("error marshalling", err)
|
|
}
|
|
|
|
sjQuery, _ := simplejson.NewJson(jsonBytes)
|
|
if err != nil {
|
|
b.log.Error("error unmarshalling", err)
|
|
}
|
|
|
|
jsonQueries = append(jsonQueries, sjQuery)
|
|
}
|
|
mReq := dtos.MetricRequest{
|
|
From: raw.From,
|
|
To: raw.To,
|
|
Queries: jsonQueries,
|
|
}
|
|
|
|
cache := &MyCacheService{
|
|
legacy: b.legacyDatasourceLookup,
|
|
}
|
|
|
|
headers := ExtractKnownHeaders(httpreq.Header)
|
|
|
|
instanceConfig, err := b.clientSupplier.GetInstanceConfigurationSettings(ctx)
|
|
if err != nil {
|
|
b.log.Error("failed to get instance configuration settings", "err", err)
|
|
responder.Error(err)
|
|
return nil, err
|
|
}
|
|
|
|
mtDsClientBuilder := mtdsclient.NewMtDatasourceClientBuilderWithClientSupplier(
|
|
b.clientSupplier,
|
|
ctx,
|
|
headers,
|
|
instanceConfig,
|
|
b.log,
|
|
)
|
|
|
|
exprService := expr.ProvideService(
|
|
&setting.Cfg{
|
|
ExpressionsEnabled: instanceConfig.ExpressionsEnabled,
|
|
SQLExpressionCellLimit: instanceConfig.SQLExpressionCellLimit,
|
|
SQLExpressionOutputCellLimit: instanceConfig.SQLExpressionOutputCellLimit,
|
|
SQLExpressionTimeout: instanceConfig.SQLExpressionTimeout,
|
|
},
|
|
nil,
|
|
nil,
|
|
instanceConfig.FeatureToggles,
|
|
nil,
|
|
b.tracer,
|
|
mtDsClientBuilder,
|
|
)
|
|
|
|
qdr, err := service.QueryData(ctx, b.log, cache, exprService, mReq, mtDsClientBuilder, headers)
|
|
|
|
if err != nil {
|
|
return qdr, err
|
|
}
|
|
|
|
return qdr, nil
|
|
}
|
|
|
|
type responderWrapper struct {
|
|
wrapped rest.Responder
|
|
onObjectFn func(statusCode *int, obj runtime.Object)
|
|
onErrorFn func(err error)
|
|
}
|
|
|
|
func newResponderWrapper(responder rest.Responder, onObjectFn func(statusCode *int, obj runtime.Object), onErrorFn func(err error)) *responderWrapper {
|
|
return &responderWrapper{
|
|
wrapped: responder,
|
|
onObjectFn: onObjectFn,
|
|
onErrorFn: onErrorFn,
|
|
}
|
|
}
|
|
|
|
func (r responderWrapper) Object(statusCode int, obj runtime.Object) {
|
|
if r.onObjectFn != nil {
|
|
r.onObjectFn(&statusCode, obj)
|
|
}
|
|
|
|
r.wrapped.Object(statusCode, obj)
|
|
}
|
|
|
|
func (r responderWrapper) Error(err error) {
|
|
if r.onErrorFn != nil {
|
|
r.onErrorFn(err)
|
|
}
|
|
|
|
r.wrapped.Error(err)
|
|
}
|
|
|
|
func logEmptyRefids(queries []v0alpha1.DataQuery, logger log.Logger) {
|
|
emptyCount := 0
|
|
|
|
for _, q := range queries {
|
|
if q.RefID == "" {
|
|
emptyCount += 1
|
|
}
|
|
}
|
|
|
|
if emptyCount > 0 {
|
|
logger.Info("empty refid found", "empty_count", emptyCount, "query_count", len(queries))
|
|
}
|
|
}
|
|
|
|
func mergeHeaders(main http.Header, extra http.Header, l log.Logger) {
|
|
for headerName, extraValues := range extra {
|
|
mainValues := main.Values(headerName)
|
|
for _, extraV := range extraValues {
|
|
if !slices.Contains(mainValues, extraV) {
|
|
main.Add(headerName, extraV)
|
|
} else {
|
|
l.Warn("skipped duplicate response header", "header", headerName, "value", extraV)
|
|
}
|
|
}
|
|
}
|
|
}
|