QueryService: Move raw http.Handler to rest.Connector (#87595)

* query connector

* improve error wrappers

* return 400 for missing datasource (not 404)

* fix errors

* message
This commit is contained in:
Ryan McKinley
2024-05-10 21:01:17 +03:00
committed by GitHub
parent f43ed7e6d7
commit f880abc292
4 changed files with 225 additions and 160 deletions

View File

@ -26,14 +26,20 @@ import (
"github.com/grafana/grafana/pkg/apiserver/endpoints/filters"
)
// TODO: this is a temporary hack to make rest.Connecter work with resource level routes
var pathRewriters = []filters.PathRewriter{
{
// TODO: this is a temporary hack to make rest.Connecter work with resource level routes
Pattern: regexp.MustCompile(`(/apis/scope.grafana.app/v0alpha1/namespaces/.*/find$)`),
ReplaceFunc: func(matches []string) string {
return matches[1] + "/name" // connector requires a name
},
},
{
Pattern: regexp.MustCompile(`(/apis/query.grafana.app/v0alpha1/namespaces/.*/query$)`),
ReplaceFunc: func(matches []string) string {
return matches[1] + "/name" // connector requires a name
},
},
}
func SetupConfig(

View File

@ -2,7 +2,6 @@ package query
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
@ -12,57 +11,129 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/experimental/apis/data/v0alpha1"
"go.opentelemetry.io/otel/attribute"
"golang.org/x/sync/errgroup"
errorsK8s "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/registry/rest"
query "github.com/grafana/grafana/pkg/apis/query/v0alpha1"
"github.com/grafana/grafana/pkg/expr/mathexp"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/util/errutil"
"github.com/grafana/grafana/pkg/util/errutil/errhttp"
"github.com/grafana/grafana/pkg/web"
)
// The query method (not really a create)
func (b *QueryAPIBuilder) doQuery(w http.ResponseWriter, r *http.Request) {
ctx, span := b.tracer.Start(r.Context(), "QueryService.Query")
defer span.End()
type queryREST struct {
builder *QueryAPIBuilder
}
raw := &query.QueryDataRequest{}
err := web.Bind(r, raw)
if err != nil {
errhttp.Write(ctx, errutil.BadRequest(
"query.bind",
errutil.WithPublicMessage("Error reading query")).
Errorf("error reading: %w", err), w)
return
var (
_ rest.Storage = (*queryREST)(nil)
_ rest.SingularNameProvider = (*queryREST)(nil)
_ rest.Connecter = (*queryREST)(nil)
_ rest.Scoper = (*queryREST)(nil)
_ rest.StorageMetadata = (*queryREST)(nil)
)
func (r *queryREST) New() runtime.Object {
// This is added as the "ResponseType" regarless what ProducesObject() says :)
return &query.QueryDataResponse{}
}
func (r *queryREST) Destroy() {}
func (r *queryREST) NamespaceScoped() bool {
return true
}
func (r *queryREST) GetSingularName() string {
return "QueryResults" // Used for the
}
func (r *queryREST) ProducesMIMETypes(verb string) []string {
return []string{"application/json"} // and parquet!
}
func (r *queryREST) ProducesObject(verb string) interface{} {
return &query.QueryDataResponse{}
}
func (r *queryREST) ConnectMethods() []string {
return []string{"POST"}
}
func (r *queryREST) NewConnectOptions() (runtime.Object, bool, string) {
return nil, false, "" // true means you can use the trailing path as a variable
}
func (r *queryREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
// See: /pkg/apiserver/builder/helper.go#L34
// The name is set with a rewriter hack
if name != "name" {
return nil, errorsK8s.NewNotFound(schema.GroupResource{}, name)
}
b := r.builder
// Parses the request and splits it into multiple sub queries (if necessary)
req, err := b.parser.parseRequest(ctx, raw)
if err != nil {
if errors.Is(err, datasources.ErrDataSourceNotFound) {
errhttp.Write(ctx, errutil.BadRequest(
"query.datasource.notfound",
errutil.WithPublicMessage(err.Error())), w)
return http.HandlerFunc(func(w http.ResponseWriter, httpreq *http.Request) {
ctx, span := b.tracer.Start(httpreq.Context(), "QueryService.Query")
defer span.End()
raw := &query.QueryDataRequest{}
err := web.Bind(httpreq, raw)
if err != nil {
err = errorsK8s.NewBadRequest("error reading query")
// TODO: can we wrap the error so details are not lost?!
// errutil.BadRequest(
// "query.bind",
// errutil.WithPublicMessage("Error reading query")).
// Errorf("error reading: %w", err)
responder.Error(err)
return
}
errhttp.Write(ctx, err, w)
return
}
// Actually run the query
rsp, err := b.execute(ctx, req)
if err != nil {
errhttp.Write(ctx, errutil.Internal(
"query.execution",
errutil.WithPublicMessage("Error executing query")).
Errorf("execution error: %w", err), w)
return
}
// Parses the request and splits it into multiple sub queries (if necessary)
req, err := b.parser.parseRequest(ctx, raw)
if err != nil {
if errors.Is(err, datasources.ErrDataSourceNotFound) {
// TODO, can we wrap the error somehow?
err = &errorsK8s.StatusError{ErrStatus: metav1.Status{
Status: metav1.StatusFailure,
Code: http.StatusBadRequest, // the URL is found, but includes bad requests
Reason: metav1.StatusReasonNotFound,
Message: "datasource not found",
}}
}
responder.Error(convertToK8sError(err))
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(query.GetResponseCode(rsp))
_ = json.NewEncoder(w).Encode(rsp)
// Actually run the query
rsp, err := b.execute(ctx, req)
if err != nil {
responder.Error(convertToK8sError(err))
return
}
responder.Object(query.GetResponseCode(rsp), &query.QueryDataResponse{
QueryDataResponse: *rsp, // wrap the backend response as a QueryDataResponse
})
}), nil
}
// Would be really nice if errutil was directly k8s compatible :(
func convertToK8sError(err error) error {
var gErr errutil.Error
if errors.As(err, &gErr) {
return &errorsK8s.StatusError{ErrStatus: metav1.Status{
Status: metav1.StatusFailure,
Code: int32(gErr.Reason.Status().HTTPStatus()),
Reason: metav1.StatusReason(gErr.Reason.Status()), // almost true
Message: gErr.PublicMessage,
}}
}
return err
}
func (b *QueryAPIBuilder) execute(ctx context.Context, req parsedRequestInfo) (qdr *backend.QueryDataResponse, err error) {

View File

@ -144,6 +144,9 @@ func (b *QueryAPIBuilder) GetAPIGroupInfo(
plugins.returnEmptyList = true
}
// The query endpoint -- NOTE, this uses a rewrite hack to allow requests without a name parameter
storage["query"] = &queryREST{builder: b}
apiGroupInfo.VersionedResourcesStorageMap[gv.Version] = storage
return &apiGroupInfo, nil
}
@ -154,122 +157,7 @@ func (b *QueryAPIBuilder) GetOpenAPIDefinitions() common.GetOpenAPIDefinitions {
// Register additional routes with the server
func (b *QueryAPIBuilder) GetAPIRoutes() *builder.APIRoutes {
routes := &builder.APIRoutes{
Namespace: []builder.APIRouteHandler{
{
Path: "query",
Spec: &spec3.PathProps{
Post: &spec3.Operation{
OperationProps: spec3.OperationProps{
Tags: []string{"query"},
Summary: "Query",
Description: "longer description here?",
Parameters: []*spec3.Parameter{
{
ParameterProps: spec3.ParameterProps{
Name: "namespace",
In: "path",
Required: true,
Example: "default",
Description: "workspace",
Schema: spec.StringProperty(),
},
},
},
RequestBody: &spec3.RequestBody{
RequestBodyProps: spec3.RequestBodyProps{
Content: map[string]*spec3.MediaType{
"application/json": {
MediaTypeProps: spec3.MediaTypeProps{
Schema: spec.RefSchema("#/components/schemas/" + QueryRequestSchemaKey),
Examples: map[string]*spec3.Example{
"A": {
ExampleProps: spec3.ExampleProps{
Summary: "Random walk (testdata)",
Description: "Use testdata to execute a random walk query",
Value: `{
"queries": [
{
"refId": "A",
"scenarioId": "random_walk_table",
"seriesCount": 1,
"datasource": {
"type": "grafana-testdata-datasource",
"uid": "PD8C576611E62080A"
},
"intervalMs": 60000,
"maxDataPoints": 20
}
],
"from": "now-6h",
"to": "now"
}`,
},
},
"B": {
ExampleProps: spec3.ExampleProps{
Summary: "With deprecated datasource name",
Description: "Includes an old style string for datasource reference",
Value: `{
"queries": [
{
"refId": "A",
"datasource": {
"type": "grafana-googlesheets-datasource",
"uid": "b1808c48-9fc9-4045-82d7-081781f8a553"
},
"cacheDurationSeconds": 300,
"spreadsheet": "spreadsheetID",
"datasourceId": 4,
"intervalMs": 30000,
"maxDataPoints": 794
},
{
"refId": "Z",
"datasource": "old",
"maxDataPoints": 10,
"timeRange": {
"from": "100",
"to": "200"
}
}
],
"from": "now-6h",
"to": "now"
}`,
},
},
},
},
},
},
},
},
Responses: &spec3.Responses{
ResponsesProps: spec3.ResponsesProps{
StatusCodeResponses: map[int]*spec3.Response{
200: {
ResponseProps: spec3.ResponseProps{
Content: map[string]*spec3.MediaType{
"application/json": {
MediaTypeProps: spec3.MediaTypeProps{
Schema: spec.StringProperty(), // TODO!!!
},
},
},
},
},
},
},
},
},
},
},
Handler: b.doQuery,
},
},
}
return routes
return nil
}
func (b *QueryAPIBuilder) GetAuthorizer() authorizer.Authorizer {
@ -302,8 +190,99 @@ func (b *QueryAPIBuilder) PostProcessOpenAPI(oas *spec3.OpenAPI) (*spec3.OpenAPI
return oas, err
}
// Rewrite the query path
sub := oas.Paths.Paths[root+"namespaces/{namespace}/query/{name}"]
if sub != nil && sub.Post != nil {
sub.Post.Tags = []string{"Query"}
sub.Parameters = []*spec3.Parameter{
{
ParameterProps: spec3.ParameterProps{
Name: "namespace",
In: "path",
Description: "object name and auth scope, such as for teams and projects",
Example: "default",
Required: true,
},
},
}
sub.Post.Description = "Query datasources (with expressions)"
sub.Post.Parameters = nil //
sub.Post.RequestBody = &spec3.RequestBody{
RequestBodyProps: spec3.RequestBodyProps{
Content: map[string]*spec3.MediaType{
"application/json": {
MediaTypeProps: spec3.MediaTypeProps{
Schema: spec.RefSchema("#/components/schemas/" + QueryRequestSchemaKey),
Examples: map[string]*spec3.Example{
"A": {
ExampleProps: spec3.ExampleProps{
Summary: "Random walk (testdata)",
Description: "Use testdata to execute a random walk query",
Value: `{
"queries": [
{
"refId": "A",
"scenarioId": "random_walk_table",
"seriesCount": 1,
"datasource": {
"type": "grafana-testdata-datasource",
"uid": "PD8C576611E62080A"
},
"intervalMs": 60000,
"maxDataPoints": 20
}
],
"from": "now-6h",
"to": "now"
}`,
},
},
"B": {
ExampleProps: spec3.ExampleProps{
Summary: "With deprecated datasource name",
Description: "Includes an old style string for datasource reference",
Value: `{
"queries": [
{
"refId": "A",
"datasource": {
"type": "grafana-googlesheets-datasource",
"uid": "b1808c48-9fc9-4045-82d7-081781f8a553"
},
"cacheDurationSeconds": 300,
"spreadsheet": "spreadsheetID",
"datasourceId": 4,
"intervalMs": 30000,
"maxDataPoints": 794
},
{
"refId": "Z",
"datasource": "old",
"maxDataPoints": 10,
"timeRange": {
"from": "100",
"to": "200"
}
}
],
"from": "now-6h",
"to": "now"
}`,
},
},
},
},
},
},
},
}
delete(oas.Paths.Paths, root+"namespaces/{namespace}/query/{name}")
oas.Paths.Paths[root+"namespaces/{namespace}/query"] = sub
}
// The root API discovery list
sub := oas.Paths.Paths[root]
sub = oas.Paths.Paths[root]
if sub != nil && sub.Get != nil {
sub.Get.Tags = []string{"API Discovery"} // sorts first in the list
}

View File

@ -139,15 +139,24 @@ func TestIntegrationSimpleQuery(t *testing.T) {
require.Error(t, err, "expecting a 400")
require.JSONEq(t, `{
"status": "Failure",
"kind": "Status",
"apiVersion": "v1",
"metadata": {},
"status": "Failure",
"message": "did not execute expression [Y] due to a failure to of the dependent expression or query [X]",
"reason": "BadRequest",
"details": { "group": "query.grafana.app" },
"code": 400,
"messageId": "sse.dependencyError",
"extra": { "depRefId": "X", "refId": "Y" }
"reason": "Bad request",
"code": 400
}`, string(body))
// require.JSONEq(t, `{
// "status": "Failure",
// "metadata": {},
// "message": "did not execute expression [Y] due to a failure to of the dependent expression or query [X]",
// "reason": "BadRequest",
// "details": { "group": "query.grafana.app" },
// "code": 400,
// "messageId": "sse.dependencyError",
// "extra": { "depRefId": "X", "refId": "Y" }
// }`, string(body))
statusCode := -1
contentType := "?"