From f880abc292dceaabca8ba34023e7b52f4568eb7a Mon Sep 17 00:00:00 2001 From: Ryan McKinley Date: Fri, 10 May 2024 21:01:17 +0300 Subject: [PATCH] QueryService: Move raw http.Handler to rest.Connector (#87595) * query connector * improve error wrappers * return 400 for missing datasource (not 404) * fix errors * message --- pkg/apiserver/builder/helper.go | 8 +- pkg/registry/apis/query/query.go | 143 ++++++++++++++----- pkg/registry/apis/query/register.go | 213 +++++++++++++--------------- pkg/tests/apis/query/query_test.go | 21 ++- 4 files changed, 225 insertions(+), 160 deletions(-) diff --git a/pkg/apiserver/builder/helper.go b/pkg/apiserver/builder/helper.go index aa433e2af56..89f95f62b77 100644 --- a/pkg/apiserver/builder/helper.go +++ b/pkg/apiserver/builder/helper.go @@ -26,14 +26,20 @@ import ( "github.com/grafana/grafana/pkg/apiserver/endpoints/filters" ) +// TODO: this is a temporary hack to make rest.Connecter work with resource level routes var pathRewriters = []filters.PathRewriter{ { - // TODO: this is a temporary hack to make rest.Connecter work with resource level routes Pattern: regexp.MustCompile(`(/apis/scope.grafana.app/v0alpha1/namespaces/.*/find$)`), ReplaceFunc: func(matches []string) string { return matches[1] + "/name" // connector requires a name }, }, + { + Pattern: regexp.MustCompile(`(/apis/query.grafana.app/v0alpha1/namespaces/.*/query$)`), + ReplaceFunc: func(matches []string) string { + return matches[1] + "/name" // connector requires a name + }, + }, } func SetupConfig( diff --git a/pkg/registry/apis/query/query.go b/pkg/registry/apis/query/query.go index ff5829e00ae..ecb42305caf 100644 --- a/pkg/registry/apis/query/query.go +++ b/pkg/registry/apis/query/query.go @@ -2,7 +2,6 @@ package query import ( "context" - "encoding/json" "errors" "fmt" "net/http" @@ -12,57 +11,129 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/experimental/apis/data/v0alpha1" "go.opentelemetry.io/otel/attribute" "golang.org/x/sync/errgroup" + errorsK8s "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/registry/rest" query "github.com/grafana/grafana/pkg/apis/query/v0alpha1" "github.com/grafana/grafana/pkg/expr/mathexp" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/services/datasources" "github.com/grafana/grafana/pkg/util/errutil" - "github.com/grafana/grafana/pkg/util/errutil/errhttp" "github.com/grafana/grafana/pkg/web" ) -// The query method (not really a create) -func (b *QueryAPIBuilder) doQuery(w http.ResponseWriter, r *http.Request) { - ctx, span := b.tracer.Start(r.Context(), "QueryService.Query") - defer span.End() +type queryREST struct { + builder *QueryAPIBuilder +} - raw := &query.QueryDataRequest{} - err := web.Bind(r, raw) - if err != nil { - errhttp.Write(ctx, errutil.BadRequest( - "query.bind", - errutil.WithPublicMessage("Error reading query")). - Errorf("error reading: %w", err), w) - return +var ( + _ rest.Storage = (*queryREST)(nil) + _ rest.SingularNameProvider = (*queryREST)(nil) + _ rest.Connecter = (*queryREST)(nil) + _ rest.Scoper = (*queryREST)(nil) + _ rest.StorageMetadata = (*queryREST)(nil) +) + +func (r *queryREST) New() runtime.Object { + // This is added as the "ResponseType" regarless what ProducesObject() says :) + return &query.QueryDataResponse{} +} + +func (r *queryREST) Destroy() {} + +func (r *queryREST) NamespaceScoped() bool { + return true +} + +func (r *queryREST) GetSingularName() string { + return "QueryResults" // Used for the +} + +func (r *queryREST) ProducesMIMETypes(verb string) []string { + return []string{"application/json"} // and parquet! +} + +func (r *queryREST) ProducesObject(verb string) interface{} { + return &query.QueryDataResponse{} +} + +func (r *queryREST) ConnectMethods() []string { + return []string{"POST"} +} + +func (r *queryREST) NewConnectOptions() (runtime.Object, bool, string) { + return nil, false, "" // true means you can use the trailing path as a variable +} + +func (r *queryREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) { + // See: /pkg/apiserver/builder/helper.go#L34 + // The name is set with a rewriter hack + if name != "name" { + return nil, errorsK8s.NewNotFound(schema.GroupResource{}, name) } + b := r.builder - // Parses the request and splits it into multiple sub queries (if necessary) - req, err := b.parser.parseRequest(ctx, raw) - if err != nil { - if errors.Is(err, datasources.ErrDataSourceNotFound) { - errhttp.Write(ctx, errutil.BadRequest( - "query.datasource.notfound", - errutil.WithPublicMessage(err.Error())), w) + return http.HandlerFunc(func(w http.ResponseWriter, httpreq *http.Request) { + ctx, span := b.tracer.Start(httpreq.Context(), "QueryService.Query") + defer span.End() + + raw := &query.QueryDataRequest{} + err := web.Bind(httpreq, raw) + if err != nil { + err = errorsK8s.NewBadRequest("error reading query") + // TODO: can we wrap the error so details are not lost?! + // errutil.BadRequest( + // "query.bind", + // errutil.WithPublicMessage("Error reading query")). + // Errorf("error reading: %w", err) + responder.Error(err) return } - errhttp.Write(ctx, err, w) - return - } - // Actually run the query - rsp, err := b.execute(ctx, req) - if err != nil { - errhttp.Write(ctx, errutil.Internal( - "query.execution", - errutil.WithPublicMessage("Error executing query")). - Errorf("execution error: %w", err), w) - return - } + // Parses the request and splits it into multiple sub queries (if necessary) + req, err := b.parser.parseRequest(ctx, raw) + if err != nil { + if errors.Is(err, datasources.ErrDataSourceNotFound) { + // TODO, can we wrap the error somehow? + err = &errorsK8s.StatusError{ErrStatus: metav1.Status{ + Status: metav1.StatusFailure, + Code: http.StatusBadRequest, // the URL is found, but includes bad requests + Reason: metav1.StatusReasonNotFound, + Message: "datasource not found", + }} + } + responder.Error(convertToK8sError(err)) + return + } - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(query.GetResponseCode(rsp)) - _ = json.NewEncoder(w).Encode(rsp) + // Actually run the query + rsp, err := b.execute(ctx, req) + if err != nil { + responder.Error(convertToK8sError(err)) + return + } + + responder.Object(query.GetResponseCode(rsp), &query.QueryDataResponse{ + QueryDataResponse: *rsp, // wrap the backend response as a QueryDataResponse + }) + }), nil +} + +// Would be really nice if errutil was directly k8s compatible :( +func convertToK8sError(err error) error { + var gErr errutil.Error + if errors.As(err, &gErr) { + return &errorsK8s.StatusError{ErrStatus: metav1.Status{ + Status: metav1.StatusFailure, + Code: int32(gErr.Reason.Status().HTTPStatus()), + Reason: metav1.StatusReason(gErr.Reason.Status()), // almost true + Message: gErr.PublicMessage, + }} + } + return err } func (b *QueryAPIBuilder) execute(ctx context.Context, req parsedRequestInfo) (qdr *backend.QueryDataResponse, err error) { diff --git a/pkg/registry/apis/query/register.go b/pkg/registry/apis/query/register.go index a3892beec47..56206c49676 100644 --- a/pkg/registry/apis/query/register.go +++ b/pkg/registry/apis/query/register.go @@ -144,6 +144,9 @@ func (b *QueryAPIBuilder) GetAPIGroupInfo( plugins.returnEmptyList = true } + // The query endpoint -- NOTE, this uses a rewrite hack to allow requests without a name parameter + storage["query"] = &queryREST{builder: b} + apiGroupInfo.VersionedResourcesStorageMap[gv.Version] = storage return &apiGroupInfo, nil } @@ -154,122 +157,7 @@ func (b *QueryAPIBuilder) GetOpenAPIDefinitions() common.GetOpenAPIDefinitions { // Register additional routes with the server func (b *QueryAPIBuilder) GetAPIRoutes() *builder.APIRoutes { - routes := &builder.APIRoutes{ - Namespace: []builder.APIRouteHandler{ - { - Path: "query", - Spec: &spec3.PathProps{ - Post: &spec3.Operation{ - OperationProps: spec3.OperationProps{ - Tags: []string{"query"}, - Summary: "Query", - Description: "longer description here?", - Parameters: []*spec3.Parameter{ - { - ParameterProps: spec3.ParameterProps{ - Name: "namespace", - In: "path", - Required: true, - Example: "default", - Description: "workspace", - Schema: spec.StringProperty(), - }, - }, - }, - RequestBody: &spec3.RequestBody{ - RequestBodyProps: spec3.RequestBodyProps{ - Content: map[string]*spec3.MediaType{ - "application/json": { - MediaTypeProps: spec3.MediaTypeProps{ - Schema: spec.RefSchema("#/components/schemas/" + QueryRequestSchemaKey), - Examples: map[string]*spec3.Example{ - "A": { - ExampleProps: spec3.ExampleProps{ - Summary: "Random walk (testdata)", - Description: "Use testdata to execute a random walk query", - Value: `{ - "queries": [ - { - "refId": "A", - "scenarioId": "random_walk_table", - "seriesCount": 1, - "datasource": { - "type": "grafana-testdata-datasource", - "uid": "PD8C576611E62080A" - }, - "intervalMs": 60000, - "maxDataPoints": 20 - } - ], - "from": "now-6h", - "to": "now" - }`, - }, - }, - "B": { - ExampleProps: spec3.ExampleProps{ - Summary: "With deprecated datasource name", - Description: "Includes an old style string for datasource reference", - Value: `{ - "queries": [ - { - "refId": "A", - "datasource": { - "type": "grafana-googlesheets-datasource", - "uid": "b1808c48-9fc9-4045-82d7-081781f8a553" - }, - "cacheDurationSeconds": 300, - "spreadsheet": "spreadsheetID", - "datasourceId": 4, - "intervalMs": 30000, - "maxDataPoints": 794 - }, - { - "refId": "Z", - "datasource": "old", - "maxDataPoints": 10, - "timeRange": { - "from": "100", - "to": "200" - } - } - ], - "from": "now-6h", - "to": "now" - }`, - }, - }, - }, - }, - }, - }, - }, - }, - Responses: &spec3.Responses{ - ResponsesProps: spec3.ResponsesProps{ - StatusCodeResponses: map[int]*spec3.Response{ - 200: { - ResponseProps: spec3.ResponseProps{ - Content: map[string]*spec3.MediaType{ - "application/json": { - MediaTypeProps: spec3.MediaTypeProps{ - Schema: spec.StringProperty(), // TODO!!! - }, - }, - }, - }, - }, - }, - }, - }, - }, - }, - }, - Handler: b.doQuery, - }, - }, - } - return routes + return nil } func (b *QueryAPIBuilder) GetAuthorizer() authorizer.Authorizer { @@ -302,8 +190,99 @@ func (b *QueryAPIBuilder) PostProcessOpenAPI(oas *spec3.OpenAPI) (*spec3.OpenAPI return oas, err } + // Rewrite the query path + sub := oas.Paths.Paths[root+"namespaces/{namespace}/query/{name}"] + if sub != nil && sub.Post != nil { + sub.Post.Tags = []string{"Query"} + sub.Parameters = []*spec3.Parameter{ + { + ParameterProps: spec3.ParameterProps{ + Name: "namespace", + In: "path", + Description: "object name and auth scope, such as for teams and projects", + Example: "default", + Required: true, + }, + }, + } + sub.Post.Description = "Query datasources (with expressions)" + sub.Post.Parameters = nil // + sub.Post.RequestBody = &spec3.RequestBody{ + RequestBodyProps: spec3.RequestBodyProps{ + Content: map[string]*spec3.MediaType{ + "application/json": { + MediaTypeProps: spec3.MediaTypeProps{ + Schema: spec.RefSchema("#/components/schemas/" + QueryRequestSchemaKey), + Examples: map[string]*spec3.Example{ + "A": { + ExampleProps: spec3.ExampleProps{ + Summary: "Random walk (testdata)", + Description: "Use testdata to execute a random walk query", + Value: `{ + "queries": [ + { + "refId": "A", + "scenarioId": "random_walk_table", + "seriesCount": 1, + "datasource": { + "type": "grafana-testdata-datasource", + "uid": "PD8C576611E62080A" + }, + "intervalMs": 60000, + "maxDataPoints": 20 + } + ], + "from": "now-6h", + "to": "now" + }`, + }, + }, + "B": { + ExampleProps: spec3.ExampleProps{ + Summary: "With deprecated datasource name", + Description: "Includes an old style string for datasource reference", + Value: `{ + "queries": [ + { + "refId": "A", + "datasource": { + "type": "grafana-googlesheets-datasource", + "uid": "b1808c48-9fc9-4045-82d7-081781f8a553" + }, + "cacheDurationSeconds": 300, + "spreadsheet": "spreadsheetID", + "datasourceId": 4, + "intervalMs": 30000, + "maxDataPoints": 794 + }, + { + "refId": "Z", + "datasource": "old", + "maxDataPoints": 10, + "timeRange": { + "from": "100", + "to": "200" + } + } + ], + "from": "now-6h", + "to": "now" + }`, + }, + }, + }, + }, + }, + }, + }, + } + + delete(oas.Paths.Paths, root+"namespaces/{namespace}/query/{name}") + oas.Paths.Paths[root+"namespaces/{namespace}/query"] = sub + } + // The root API discovery list - sub := oas.Paths.Paths[root] + sub = oas.Paths.Paths[root] if sub != nil && sub.Get != nil { sub.Get.Tags = []string{"API Discovery"} // sorts first in the list } diff --git a/pkg/tests/apis/query/query_test.go b/pkg/tests/apis/query/query_test.go index b9786218a5b..738356a44a7 100644 --- a/pkg/tests/apis/query/query_test.go +++ b/pkg/tests/apis/query/query_test.go @@ -139,15 +139,24 @@ func TestIntegrationSimpleQuery(t *testing.T) { require.Error(t, err, "expecting a 400") require.JSONEq(t, `{ - "status": "Failure", + "kind": "Status", + "apiVersion": "v1", "metadata": {}, + "status": "Failure", "message": "did not execute expression [Y] due to a failure to of the dependent expression or query [X]", - "reason": "BadRequest", - "details": { "group": "query.grafana.app" }, - "code": 400, - "messageId": "sse.dependencyError", - "extra": { "depRefId": "X", "refId": "Y" } + "reason": "Bad request", + "code": 400 }`, string(body)) + // require.JSONEq(t, `{ + // "status": "Failure", + // "metadata": {}, + // "message": "did not execute expression [Y] due to a failure to of the dependent expression or query [X]", + // "reason": "BadRequest", + // "details": { "group": "query.grafana.app" }, + // "code": 400, + // "messageId": "sse.dependencyError", + // "extra": { "depRefId": "X", "refId": "Y" } + // }`, string(body)) statusCode := -1 contentType := "?"