diff --git a/docs/sources/setup-grafana/configure-grafana/feature-toggles/index.md b/docs/sources/setup-grafana/configure-grafana/feature-toggles/index.md index b92749e4304..516cf303af2 100644 --- a/docs/sources/setup-grafana/configure-grafana/feature-toggles/index.md +++ b/docs/sources/setup-grafana/configure-grafana/feature-toggles/index.md @@ -186,4 +186,5 @@ The following toggles require explicitly setting Grafana's [app mode]({{< relref | `externalServiceAuth` | Starts an OAuth2 authentication provider for external services | | `grafanaAPIServerWithExperimentalAPIs` | Register experimental APIs with the k8s API server | | `grafanaAPIServerEnsureKubectlAccess` | Start an additional https handler and write kubectl options | +| `kubernetesQueryServiceRewrite` | Rewrite requests targeting /ds/query to the query service | | `panelTitleSearchInV1` | Enable searching for dashboards using panel title in search v1 | diff --git a/packages/grafana-data/src/types/featureToggles.gen.ts b/packages/grafana-data/src/types/featureToggles.gen.ts index 75f54ac9821..243dc60d572 100644 --- a/packages/grafana-data/src/types/featureToggles.gen.ts +++ b/packages/grafana-data/src/types/featureToggles.gen.ts @@ -130,6 +130,7 @@ export interface FeatureToggles { transformationsVariableSupport?: boolean; kubernetesPlaylists?: boolean; kubernetesSnapshots?: boolean; + kubernetesQueryServiceRewrite?: boolean; cloudWatchBatchQueries?: boolean; recoveryThreshold?: boolean; lokiStructuredMetadata?: boolean; diff --git a/pkg/api/api.go b/pkg/api/api.go index 5ecc27e8029..528db6e2cab 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -515,7 +515,7 @@ func (hs *HTTPServer) registerRoutes() { // metrics // DataSource w/ expressions - apiRoute.Post("/ds/query", requestmeta.SetSLOGroup(requestmeta.SLOGroupHighSlow), authorize(ac.EvalPermission(datasources.ActionQuery)), routing.Wrap(hs.QueryMetricsV2)) + apiRoute.Post("/ds/query", requestmeta.SetSLOGroup(requestmeta.SLOGroupHighSlow), authorize(ac.EvalPermission(datasources.ActionQuery)), hs.getDSQueryEndpoint()) apiRoute.Group("/alerts", func(alertsRoute routing.RouteRegister) { alertsRoute.Post("/test", routing.Wrap(hs.AlertTest)) diff --git a/pkg/api/metrics.go b/pkg/api/metrics.go index f6acadf1358..4bbe662c081 100644 --- a/pkg/api/metrics.go +++ b/pkg/api/metrics.go @@ -10,10 +10,14 @@ import ( "github.com/grafana/grafana/pkg/api/dtos" "github.com/grafana/grafana/pkg/api/response" + "github.com/grafana/grafana/pkg/api/routing" + "github.com/grafana/grafana/pkg/infra/appcontext" "github.com/grafana/grafana/pkg/middleware/requestmeta" contextmodel "github.com/grafana/grafana/pkg/services/contexthandler/model" "github.com/grafana/grafana/pkg/services/datasources" "github.com/grafana/grafana/pkg/services/featuremgmt" + "github.com/grafana/grafana/pkg/services/grafana-apiserver/endpoints/request" + "github.com/grafana/grafana/pkg/util/errutil/errhttp" "github.com/grafana/grafana/pkg/web" ) @@ -33,6 +37,26 @@ func (hs *HTTPServer) handleQueryMetricsError(err error) *response.NormalRespons return response.ErrOrFallback(http.StatusInternalServerError, "Query data error", err) } +// metrics.go +func (hs *HTTPServer) getDSQueryEndpoint() web.Handler { + if hs.Features.IsEnabledGlobally(featuremgmt.FlagKubernetesQueryServiceRewrite) { + // DEV ONLY FEATURE FLAG! + // rewrite requests from /ds/query to the new query service + namespaceMapper := request.GetNamespaceMapper(hs.Cfg) + return func(w http.ResponseWriter, r *http.Request) { + user, err := appcontext.User(r.Context()) + if err != nil || user == nil { + errhttp.Write(r.Context(), fmt.Errorf("no user"), w) + return + } + r.URL.Path = "/apis/query.grafana.app/v0alpha1/namespaces/" + namespaceMapper(user.OrgID) + "/query" + hs.clientConfigProvider.DirectlyServeHTTP(w, r) + } + } + + return routing.Wrap(hs.QueryMetricsV2) +} + // QueryMetricsV2 returns query metrics. // swagger:route POST /ds/query ds queryMetricsWithExpressions // diff --git a/pkg/apis/query/v0alpha1/doc.go b/pkg/apis/query/v0alpha1/doc.go new file mode 100644 index 00000000000..e6b9ec55032 --- /dev/null +++ b/pkg/apis/query/v0alpha1/doc.go @@ -0,0 +1,6 @@ +// +k8s:deepcopy-gen=package +// +k8s:openapi-gen=true +// +k8s:defaulter-gen=TypeMeta +// +groupName=query.grafana.app + +package v0alpha1 diff --git a/pkg/apis/query/v0alpha1/query.go b/pkg/apis/query/v0alpha1/query.go new file mode 100644 index 00000000000..4ae9435f0d3 --- /dev/null +++ b/pkg/apis/query/v0alpha1/query.go @@ -0,0 +1,211 @@ +package v0alpha1 + +import ( + "encoding/json" + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" +) + +// Generic query request with shared time across all values +// Copied from: https://github.com/grafana/grafana/blob/main/pkg/api/dtos/models.go#L62 +type GenericQueryRequest struct { + metav1.TypeMeta `json:",inline"` + + // From Start time in epoch timestamps in milliseconds or relative using Grafana time units. + // example: now-1h + From string `json:"from,omitempty"` + + // To End time in epoch timestamps in milliseconds or relative using Grafana time units. + // example: now + To string `json:"to,omitempty"` + + // queries.refId – Specifies an identifier of the query. Is optional and default to “A”. + // queries.datasourceId – Specifies the data source to be queried. Each query in the request must have an unique datasourceId. + // queries.maxDataPoints - Species maximum amount of data points that dashboard panel can render. Is optional and default to 100. + // queries.intervalMs - Specifies the time interval in milliseconds of time series. Is optional and defaults to 1000. + // required: true + // example: [ { "refId": "A", "intervalMs": 86400000, "maxDataPoints": 1092, "datasource":{ "uid":"PD8C576611E62080A" }, "rawSql": "SELECT 1 as valueOne, 2 as valueTwo", "format": "table" } ] + Queries []GenericDataQuery `json:"queries"` + + // required: false + Debug bool `json:"debug,omitempty"` +} + +type DataSourceRef struct { + // The datasource plugin type + Type string `json:"type"` + + // Datasource UID + UID string `json:"uid"` +} + +// GenericDataQuery is a replacement for `dtos.MetricRequest` that provides more explicit types +type GenericDataQuery struct { + // RefID is the unique identifier of the query, set by the frontend call. + RefID string `json:"refId"` + + // TimeRange represents the query range + // NOTE: unlike generic /ds/query, we can now send explicit time values in each query + TimeRange *TimeRange `json:"timeRange,omitempty"` + + // The datasource + Datasource *DataSourceRef `json:"datasource,omitempty"` + + // Deprecated -- use datasource ref instead + DatasourceId int64 `json:"datasourceId,omitempty"` + + // QueryType is an optional identifier for the type of query. + // It can be used to distinguish different types of queries. + QueryType string `json:"queryType,omitempty"` + + // MaxDataPoints is the maximum number of data points that should be returned from a time series query. + MaxDataPoints int64 `json:"maxDataPoints,omitempty"` + + // Interval is the suggested duration between time points in a time series query. + IntervalMS float64 `json:"intervalMs,omitempty"` + + // true if query is disabled (ie should not be returned to the dashboard) + // Note this does not always imply that the query should not be executed since + // the results from a hidden query may be used as the input to other queries (SSE etc) + Hide bool `json:"hide,omitempty"` + + // Additional Properties (that live at the root) + props map[string]any `json:"-"` +} + +// TimeRange represents a time range for a query and is a property of DataQuery. +type TimeRange struct { + // From is the start time of the query. + From string `json:"from"` + + // To is the end time of the query. + To string `json:"to"` +} + +func (g *GenericDataQuery) AdditionalProperties() map[string]any { + if g.props == nil { + g.props = make(map[string]any) + } + return g.props +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (g *GenericDataQuery) DeepCopyInto(out *GenericDataQuery) { + *out = *g + if g.props != nil { + out.props = runtime.DeepCopyJSON(g.props) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new GenericDataQuery. +func (g *GenericDataQuery) DeepCopy() *GenericDataQuery { + if g == nil { + return nil + } + out := new(GenericDataQuery) + g.DeepCopyInto(out) + return out +} + +// MarshalJSON ensures that the unstructured object produces proper +// JSON when passed to Go's standard JSON library. +func (g GenericDataQuery) MarshalJSON() ([]byte, error) { + vals := map[string]any{} + if g.props != nil { + for k, v := range g.props { + vals[k] = v + } + } + + vals["refId"] = g.RefID + if g.Datasource.Type != "" || g.Datasource.UID != "" { + vals["datasource"] = g.Datasource + } + if g.DatasourceId > 0 { + vals["datasourceId"] = g.DatasourceId + } + if g.IntervalMS > 0 { + vals["intervalMs"] = g.IntervalMS + } + if g.MaxDataPoints > 0 { + vals["maxDataPoints"] = g.MaxDataPoints + } + return json.Marshal(vals) +} + +// UnmarshalJSON ensures that the unstructured object properly decodes +// JSON when passed to Go's standard JSON library. +func (g *GenericDataQuery) UnmarshalJSON(b []byte) error { + vals := map[string]any{} + err := json.Unmarshal(b, &vals) + if err != nil { + return err + } + key := "refId" + v, ok := vals[key] + if ok { + g.RefID, ok = v.(string) + if !ok { + return fmt.Errorf("expected string refid (got: %t)", v) + } + delete(vals, key) + } + + key = "datasource" + v, ok = vals[key] + if ok { + wrap, ok := v.(map[string]any) + if ok { + g.Datasource = &DataSourceRef{} + g.Datasource.Type, _ = wrap["type"].(string) + g.Datasource.UID, _ = wrap["uid"].(string) + delete(vals, key) + } else { + // Old old queries may arrive with just the name + name, ok := v.(string) + if !ok { + return fmt.Errorf("expected datasource as object (got: %t)", v) + } + g.Datasource = &DataSourceRef{} + g.Datasource.UID = name // Not great, but the lookup function will try its best to resolve + delete(vals, key) + } + } + + key = "intervalMs" + v, ok = vals[key] + if ok { + g.IntervalMS, ok = v.(float64) + if !ok { + return fmt.Errorf("expected intervalMs as float (got: %t)", v) + } + delete(vals, key) + } + + key = "maxDataPoints" + v, ok = vals[key] + if ok { + count, ok := v.(float64) + if !ok { + return fmt.Errorf("expected maxDataPoints as number (got: %t)", v) + } + g.MaxDataPoints = int64(count) + delete(vals, key) + } + + key = "datasourceId" + v, ok = vals[key] + if ok { + count, ok := v.(float64) + if !ok { + return fmt.Errorf("expected datasourceId as number (got: %t)", v) + } + g.DatasourceId = int64(count) + delete(vals, key) + } + + g.props = vals + return nil +} diff --git a/pkg/apis/query/v0alpha1/query_test.go b/pkg/apis/query/v0alpha1/query_test.go new file mode 100644 index 00000000000..e2ea3ce53c4 --- /dev/null +++ b/pkg/apis/query/v0alpha1/query_test.go @@ -0,0 +1,75 @@ +package v0alpha1_test + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/grafana/grafana/pkg/apis/query/v0alpha1" +) + +func TestParseQueriesIntoQueryDataRequest(t *testing.T) { + request := []byte(`{ + "queries": [ + { + "refId": "A", + "datasource": { + "type": "grafana-googlesheets-datasource", + "uid": "b1808c48-9fc9-4045-82d7-081781f8a553" + }, + "cacheDurationSeconds": 300, + "spreadsheet": "spreadsheetID", + "datasourceId": 4, + "intervalMs": 30000, + "maxDataPoints": 794 + }, + { + "refId": "Z", + "datasource": "old", + "maxDataPoints": 10, + "timeRange": { + "from": "100", + "to": "200" + } + } + ], + "from": "1692624667389", + "to": "1692646267389" + }`) + + req := &v0alpha1.GenericQueryRequest{} + err := json.Unmarshal(request, req) + require.NoError(t, err) + + require.Len(t, req.Queries, 2) + require.Equal(t, "b1808c48-9fc9-4045-82d7-081781f8a553", req.Queries[0].Datasource.UID) + require.Equal(t, "spreadsheetID", req.Queries[0].AdditionalProperties()["spreadsheet"]) + + // Write the query (with additional spreadsheetID) to JSON + out, err := json.MarshalIndent(req.Queries[0], "", " ") + require.NoError(t, err) + + // And read it back with standard JSON marshal functions + query := &v0alpha1.GenericDataQuery{} + err = json.Unmarshal(out, query) + require.NoError(t, err) + require.Equal(t, "spreadsheetID", query.AdditionalProperties()["spreadsheet"]) + + // The second query has an explicit time range, and legacy datasource name + out, err = json.MarshalIndent(req.Queries[1], "", " ") + require.NoError(t, err) + // fmt.Printf("%s\n", string(out)) + require.JSONEq(t, `{ + "datasource": { + "type": "", ` /* NOTE! this implies legacy naming */ +` + "uid": "old" + }, + "maxDataPoints": 10, + "refId": "Z", + "timeRange": { + "from": "100", + "to": "200" + } + }`, string(out)) +} diff --git a/pkg/apis/query/v0alpha1/register.go b/pkg/apis/query/v0alpha1/register.go new file mode 100644 index 00000000000..b728ad7b43a --- /dev/null +++ b/pkg/apis/query/v0alpha1/register.go @@ -0,0 +1,25 @@ +package v0alpha1 + +import ( + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + + common "github.com/grafana/grafana/pkg/apis/common/v0alpha1" +) + +const ( + GROUP = "query.grafana.app" + VERSION = "v0alpha1" + APIVERSION = GROUP + "/" + VERSION +) + +var DataSourceApiServerResourceInfo = common.NewResourceInfo(GROUP, VERSION, + "datasourceapiservers", "datasourceapiserver", "DataSourceApiServer", + func() runtime.Object { return &DataSourceApiServer{} }, + func() runtime.Object { return &DataSourceApiServerList{} }, +) + +var ( + // SchemeGroupVersion is group version used to register these objects + SchemeGroupVersion = schema.GroupVersion{Group: GROUP, Version: VERSION} +) diff --git a/pkg/apis/query/v0alpha1/results.go b/pkg/apis/query/v0alpha1/results.go new file mode 100644 index 00000000000..2c3cab7b3f0 --- /dev/null +++ b/pkg/apis/query/v0alpha1/results.go @@ -0,0 +1,64 @@ +package v0alpha1 + +import ( + "encoding/json" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + openapi "k8s.io/kube-openapi/pkg/common" + spec "k8s.io/kube-openapi/pkg/validation/spec" +) + +// Wraps backend.QueryDataResponse, however it includes TypeMeta and implements runtime.Object +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +type QueryDataResponse struct { + metav1.TypeMeta `json:",inline"` + + // Backend wrapper (external dependency) + backend.QueryDataResponse +} + +// Expose backend DataResponse in OpenAPI (yes this still requires some serious love!) +func (r QueryDataResponse) OpenAPIDefinition() openapi.OpenAPIDefinition { + return openapi.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + AdditionalProperties: &spec.SchemaOrBool{Allows: true}, + }, + VendorExtensible: spec.VendorExtensible{ + Extensions: map[string]interface{}{ + "x-kubernetes-preserve-unknown-fields": true, + }, + }, + }, + } +} + +// MarshalJSON writes the results as json +func (r QueryDataResponse) MarshalJSON() ([]byte, error) { + return r.QueryDataResponse.MarshalJSON() +} + +// UnmarshalJSON will read JSON into a QueryDataResponse +func (r *QueryDataResponse) UnmarshalJSON(b []byte) error { + return r.QueryDataResponse.UnmarshalJSON(b) +} + +func (r *QueryDataResponse) DeepCopy() *QueryDataResponse { + if r == nil { + return nil + } + + // /!\ The most dumb approach, but OK for now... + // likely best to move DeepCopy into SDK + out := &QueryDataResponse{} + body, _ := json.Marshal(r.QueryDataResponse) + _ = json.Unmarshal(body, &out.QueryDataResponse) + return out +} + +func (r *QueryDataResponse) DeepCopyInto(out *QueryDataResponse) { + clone := r.DeepCopy() + *out = *clone +} diff --git a/pkg/apis/query/v0alpha1/types.go b/pkg/apis/query/v0alpha1/types.go new file mode 100644 index 00000000000..3f288eee25d --- /dev/null +++ b/pkg/apis/query/v0alpha1/types.go @@ -0,0 +1,65 @@ +package v0alpha1 + +import ( + "context" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +// The query runner interface +type QueryRunner interface { + // Runs the query as the user in context + ExecuteQueryData(ctx context.Context, + // The k8s group for the datasource (pluginId) + datasource schema.GroupVersion, + + // The datasource name/uid + name string, + + // The raw backend query objects + query []GenericDataQuery, + ) (*backend.QueryDataResponse, error) +} + +type DataSourceApiServerRegistry interface { + // Get the group and preferred version for a plugin + GetDatasourceGroupVersion(pluginId string) (schema.GroupVersion, error) + + // Get the list of available datasource api servers + // The values will be managed though API discovery/reconciliation + GetDatasourceApiServers(ctx context.Context) (*DataSourceApiServerList, error) +} + +// The data source resource is a reflection of the individual datasource instances +// that are exposed in the groups: {datasource}.datasource.grafana.app +// The status is updated periodically. +// The name is the plugin id +// +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +type DataSourceApiServer struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + // The display name + Title string `json:"title"` + + // Describe the plugin + Description string `json:"description,omitempty"` + + // The group + preferred version + GroupVersion string `json:"groupVersion"` + + // Possible alternative plugin IDs + AliasIDs []string `json:"aliasIDs,omitempty"` +} + +// List of datasource plugins +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +type DataSourceApiServerList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + + Items []DataSourceApiServer `json:"items,omitempty"` +} diff --git a/pkg/apis/query/v0alpha1/zz_generated.deepcopy.go b/pkg/apis/query/v0alpha1/zz_generated.deepcopy.go new file mode 100644 index 00000000000..927ed373ddc --- /dev/null +++ b/pkg/apis/query/v0alpha1/zz_generated.deepcopy.go @@ -0,0 +1,140 @@ +//go:build !ignore_autogenerated +// +build !ignore_autogenerated + +// SPDX-License-Identifier: AGPL-3.0-only + +// Code generated by deepcopy-gen. DO NOT EDIT. + +package v0alpha1 + +import ( + runtime "k8s.io/apimachinery/pkg/runtime" +) + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DataSourceApiServer) DeepCopyInto(out *DataSourceApiServer) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + if in.AliasIDs != nil { + in, out := &in.AliasIDs, &out.AliasIDs + *out = make([]string, len(*in)) + copy(*out, *in) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DataSourceApiServer. +func (in *DataSourceApiServer) DeepCopy() *DataSourceApiServer { + if in == nil { + return nil + } + out := new(DataSourceApiServer) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *DataSourceApiServer) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DataSourceApiServerList) DeepCopyInto(out *DataSourceApiServerList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]DataSourceApiServer, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DataSourceApiServerList. +func (in *DataSourceApiServerList) DeepCopy() *DataSourceApiServerList { + if in == nil { + return nil + } + out := new(DataSourceApiServerList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *DataSourceApiServerList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DataSourceRef) DeepCopyInto(out *DataSourceRef) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DataSourceRef. +func (in *DataSourceRef) DeepCopy() *DataSourceRef { + if in == nil { + return nil + } + out := new(DataSourceRef) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *GenericQueryRequest) DeepCopyInto(out *GenericQueryRequest) { + *out = *in + out.TypeMeta = in.TypeMeta + if in.Queries != nil { + in, out := &in.Queries, &out.Queries + *out = make([]GenericDataQuery, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new GenericQueryRequest. +func (in *GenericQueryRequest) DeepCopy() *GenericQueryRequest { + if in == nil { + return nil + } + out := new(GenericQueryRequest) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *QueryDataResponse) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TimeRange) DeepCopyInto(out *TimeRange) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TimeRange. +func (in *TimeRange) DeepCopy() *TimeRange { + if in == nil { + return nil + } + out := new(TimeRange) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/apis/query/v0alpha1/zz_generated.defaults.go b/pkg/apis/query/v0alpha1/zz_generated.defaults.go new file mode 100644 index 00000000000..238fc2f4edc --- /dev/null +++ b/pkg/apis/query/v0alpha1/zz_generated.defaults.go @@ -0,0 +1,19 @@ +//go:build !ignore_autogenerated +// +build !ignore_autogenerated + +// SPDX-License-Identifier: AGPL-3.0-only + +// Code generated by defaulter-gen. DO NOT EDIT. + +package v0alpha1 + +import ( + runtime "k8s.io/apimachinery/pkg/runtime" +) + +// RegisterDefaults adds defaulters functions to the given scheme. +// Public to allow building arbitrary schemes. +// All generated defaulters are covering - they call all nested defaulters. +func RegisterDefaults(scheme *runtime.Scheme) error { + return nil +} diff --git a/pkg/apis/query/v0alpha1/zz_generated.openapi.go b/pkg/apis/query/v0alpha1/zz_generated.openapi.go new file mode 100644 index 00000000000..c3a66de455c --- /dev/null +++ b/pkg/apis/query/v0alpha1/zz_generated.openapi.go @@ -0,0 +1,343 @@ +//go:build !ignore_autogenerated +// +build !ignore_autogenerated + +// SPDX-License-Identifier: AGPL-3.0-only + +// Code generated by openapi-gen. DO NOT EDIT. + +// This file was autogenerated by openapi-gen. Do not edit it manually! + +package v0alpha1 + +import ( + common "k8s.io/kube-openapi/pkg/common" + spec "k8s.io/kube-openapi/pkg/validation/spec" +) + +func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenAPIDefinition { + return map[string]common.OpenAPIDefinition{ + "github.com/grafana/grafana/pkg/apis/query/v0alpha1.DataSourceApiServer": schema_pkg_apis_query_v0alpha1_DataSourceApiServer(ref), + "github.com/grafana/grafana/pkg/apis/query/v0alpha1.DataSourceApiServerList": schema_pkg_apis_query_v0alpha1_DataSourceApiServerList(ref), + "github.com/grafana/grafana/pkg/apis/query/v0alpha1.DataSourceRef": schema_pkg_apis_query_v0alpha1_DataSourceRef(ref), + "github.com/grafana/grafana/pkg/apis/query/v0alpha1.GenericDataQuery": schema_pkg_apis_query_v0alpha1_GenericDataQuery(ref), + "github.com/grafana/grafana/pkg/apis/query/v0alpha1.GenericQueryRequest": schema_pkg_apis_query_v0alpha1_GenericQueryRequest(ref), + "github.com/grafana/grafana/pkg/apis/query/v0alpha1.QueryDataResponse": QueryDataResponse{}.OpenAPIDefinition(), + "github.com/grafana/grafana/pkg/apis/query/v0alpha1.TimeRange": schema_pkg_apis_query_v0alpha1_TimeRange(ref), + } +} + +func schema_pkg_apis_query_v0alpha1_DataSourceApiServer(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "The data source resource is a reflection of the individual datasource instances that are exposed in the groups: {datasource}.datasource.grafana.app The status is updated periodically. The name is the plugin id", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "kind": { + SchemaProps: spec.SchemaProps{ + Description: "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds", + Type: []string{"string"}, + Format: "", + }, + }, + "apiVersion": { + SchemaProps: spec.SchemaProps{ + Description: "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources", + Type: []string{"string"}, + Format: "", + }, + }, + "metadata": { + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta"), + }, + }, + "title": { + SchemaProps: spec.SchemaProps{ + Description: "The display name", + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + "description": { + SchemaProps: spec.SchemaProps{ + Description: "Describe the plugin", + Type: []string{"string"}, + Format: "", + }, + }, + "groupVersion": { + SchemaProps: spec.SchemaProps{ + Description: "The group + preferred version", + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + "aliasIDs": { + SchemaProps: spec.SchemaProps{ + Description: "Possible alternative plugin IDs", + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + }, + }, + }, + }, + Required: []string{"title", "groupVersion"}, + }, + }, + Dependencies: []string{ + "k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta"}, + } +} + +func schema_pkg_apis_query_v0alpha1_DataSourceApiServerList(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "List of datasource plugins", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "kind": { + SchemaProps: spec.SchemaProps{ + Description: "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds", + Type: []string{"string"}, + Format: "", + }, + }, + "apiVersion": { + SchemaProps: spec.SchemaProps{ + Description: "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources", + Type: []string{"string"}, + Format: "", + }, + }, + "metadata": { + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.ListMeta"), + }, + }, + "items": { + SchemaProps: spec.SchemaProps{ + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("github.com/grafana/grafana/pkg/apis/query/v0alpha1.DataSourceApiServer"), + }, + }, + }, + }, + }, + }, + }, + }, + Dependencies: []string{ + "github.com/grafana/grafana/pkg/apis/query/v0alpha1.DataSourceApiServer", "k8s.io/apimachinery/pkg/apis/meta/v1.ListMeta"}, + } +} + +func schema_pkg_apis_query_v0alpha1_DataSourceRef(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "type": { + SchemaProps: spec.SchemaProps{ + Description: "The datasource plugin type", + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + "uid": { + SchemaProps: spec.SchemaProps{ + Description: "Datasource UID", + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + }, + Required: []string{"type", "uid"}, + }, + }, + } +} + +func schema_pkg_apis_query_v0alpha1_GenericDataQuery(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "GenericDataQuery is a replacement for `dtos.MetricRequest` that provides more explicit types", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "refId": { + SchemaProps: spec.SchemaProps{ + Description: "RefID is the unique identifier of the query, set by the frontend call.", + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + "timeRange": { + SchemaProps: spec.SchemaProps{ + Description: "TimeRange represents the query range NOTE: unlike generic /ds/query, we can now send explicit time values in each query", + Ref: ref("github.com/grafana/grafana/pkg/apis/query/v0alpha1.TimeRange"), + }, + }, + "datasource": { + SchemaProps: spec.SchemaProps{ + Description: "The datasource", + Ref: ref("github.com/grafana/grafana/pkg/apis/query/v0alpha1.DataSourceRef"), + }, + }, + "datasourceId": { + SchemaProps: spec.SchemaProps{ + Description: "Deprecated -- use datasource ref instead", + Type: []string{"integer"}, + Format: "int64", + }, + }, + "queryType": { + SchemaProps: spec.SchemaProps{ + Description: "QueryType is an optional identifier for the type of query. It can be used to distinguish different types of queries.", + Type: []string{"string"}, + Format: "", + }, + }, + "maxDataPoints": { + SchemaProps: spec.SchemaProps{ + Description: "MaxDataPoints is the maximum number of data points that should be returned from a time series query.", + Type: []string{"integer"}, + Format: "int64", + }, + }, + "intervalMs": { + SchemaProps: spec.SchemaProps{ + Description: "Interval is the suggested duration between time points in a time series query.", + Type: []string{"number"}, + Format: "double", + }, + }, + "hide": { + SchemaProps: spec.SchemaProps{ + Description: "true if query is disabled (ie should not be returned to the dashboard) Note this does not always imply that the query should not be executed since the results from a hidden query may be used as the input to other queries (SSE etc)", + Type: []string{"boolean"}, + Format: "", + }, + }, + }, + Required: []string{"refId"}, + }, + }, + Dependencies: []string{ + "github.com/grafana/grafana/pkg/apis/query/v0alpha1.DataSourceRef", "github.com/grafana/grafana/pkg/apis/query/v0alpha1.TimeRange"}, + } +} + +func schema_pkg_apis_query_v0alpha1_GenericQueryRequest(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "Generic query request with shared time across all values Copied from: https://github.com/grafana/grafana/blob/main/pkg/api/dtos/models.go#L62", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "kind": { + SchemaProps: spec.SchemaProps{ + Description: "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds", + Type: []string{"string"}, + Format: "", + }, + }, + "apiVersion": { + SchemaProps: spec.SchemaProps{ + Description: "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources", + Type: []string{"string"}, + Format: "", + }, + }, + "from": { + SchemaProps: spec.SchemaProps{ + Description: "From Start time in epoch timestamps in milliseconds or relative using Grafana time units. example: now-1h", + Type: []string{"string"}, + Format: "", + }, + }, + "to": { + SchemaProps: spec.SchemaProps{ + Description: "To End time in epoch timestamps in milliseconds or relative using Grafana time units. example: now", + Type: []string{"string"}, + Format: "", + }, + }, + "queries": { + SchemaProps: spec.SchemaProps{ + Description: "queries.refId – Specifies an identifier of the query. Is optional and default to “A”. queries.datasourceId – Specifies the data source to be queried. Each query in the request must have an unique datasourceId. queries.maxDataPoints - Species maximum amount of data points that dashboard panel can render. Is optional and default to 100. queries.intervalMs - Specifies the time interval in milliseconds of time series. Is optional and defaults to 1000. required: true example: [ { \"refId\": \"A\", \"intervalMs\": 86400000, \"maxDataPoints\": 1092, \"datasource\":{ \"uid\":\"PD8C576611E62080A\" }, \"rawSql\": \"SELECT 1 as valueOne, 2 as valueTwo\", \"format\": \"table\" } ]", + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Ref: ref("github.com/grafana/grafana/pkg/apis/query/v0alpha1.GenericDataQuery"), + }, + }, + }, + }, + }, + "debug": { + SchemaProps: spec.SchemaProps{ + Description: "required: false", + Type: []string{"boolean"}, + Format: "", + }, + }, + }, + Required: []string{"queries"}, + }, + }, + Dependencies: []string{ + "github.com/grafana/grafana/pkg/apis/query/v0alpha1.GenericDataQuery"}, + } +} + +func schema_pkg_apis_query_v0alpha1_TimeRange(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "TimeRange represents a time range for a query and is a property of DataQuery.", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "from": { + SchemaProps: spec.SchemaProps{ + Description: "From is the start time of the query.", + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + "to": { + SchemaProps: spec.SchemaProps{ + Description: "To is the end time of the query.", + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + }, + Required: []string{"from", "to"}, + }, + }, + } +} diff --git a/pkg/apis/query/v0alpha1/zz_generated.openapi_violation_exceptions.list b/pkg/apis/query/v0alpha1/zz_generated.openapi_violation_exceptions.list new file mode 100644 index 00000000000..b6d2a25a924 --- /dev/null +++ b/pkg/apis/query/v0alpha1/zz_generated.openapi_violation_exceptions.list @@ -0,0 +1,5 @@ +API rule violation: list_type_missing,github.com/grafana/grafana/pkg/apis/query/v0alpha1,DataSourceApiServer,AliasIDs +API rule violation: list_type_missing,github.com/grafana/grafana/pkg/apis/query/v0alpha1,GenericQueryRequest,Queries +API rule violation: names_match,github.com/grafana/grafana/pkg/apis/query/v0alpha1,GenericDataQuery,IntervalMS +API rule violation: names_match,github.com/grafana/grafana/pkg/apis/query/v0alpha1,GenericDataQuery,RefID +API rule violation: names_match,github.com/grafana/grafana/pkg/apis/query/v0alpha1,QueryDataResponse,QueryDataResponse diff --git a/pkg/cmd/grafana/apiserver/server.go b/pkg/cmd/grafana/apiserver/server.go index 447567bb3de..d40f89d615b 100644 --- a/pkg/cmd/grafana/apiserver/server.go +++ b/pkg/cmd/grafana/apiserver/server.go @@ -14,6 +14,8 @@ import ( "github.com/grafana/grafana/pkg/registry/apis/example" "github.com/grafana/grafana/pkg/registry/apis/featuretoggle" + "github.com/grafana/grafana/pkg/registry/apis/query" + "github.com/grafana/grafana/pkg/registry/apis/query/runner" "github.com/grafana/grafana/pkg/server" "github.com/grafana/grafana/pkg/services/featuremgmt" grafanaAPIServer "github.com/grafana/grafana/pkg/services/grafana-apiserver" @@ -50,6 +52,13 @@ func (o *APIServerOptions) loadAPIGroupBuilders(args []string) error { // No dependencies for testing case "example.grafana.app": o.builders = append(o.builders, example.NewTestingAPIBuilder()) + // Only works with testdata + case "query.grafana.app": + o.builders = append(o.builders, query.NewQueryAPIBuilder( + featuremgmt.WithFeatures(), + runner.NewDummyTestRunner(), + runner.NewDummyRegistry(), + )) case "featuretoggle.grafana.app": features := featuremgmt.WithFeatureManager(setting.FeatureMgmtSettings{}, nil) // none... for now o.builders = append(o.builders, featuretoggle.NewFeatureFlagAPIBuilder(features)) diff --git a/pkg/registry/apis/datasource/utils.go b/pkg/plugins/apiserver.go similarity index 72% rename from pkg/registry/apis/datasource/utils.go rename to pkg/plugins/apiserver.go index 5beca00d70b..e72355beb9e 100644 --- a/pkg/registry/apis/datasource/utils.go +++ b/pkg/plugins/apiserver.go @@ -1,11 +1,13 @@ -package datasource +package plugins import ( "fmt" "strings" ) -func getDatasourceGroupNameFromPluginID(pluginId string) (string, error) { +// Get the default API group name for from a plugin ID +// NOTE: this is a work in progress, and may change without notice +func GetDatasourceGroupNameFromPluginID(pluginId string) (string, error) { if pluginId == "" { return "", fmt.Errorf("bad pluginID (empty)") } diff --git a/pkg/registry/apis/datasource/utils_test.go b/pkg/plugins/apiserver_test.go similarity index 89% rename from pkg/registry/apis/datasource/utils_test.go rename to pkg/plugins/apiserver_test.go index 2bfded3d4e7..24d7102a40e 100644 --- a/pkg/registry/apis/datasource/utils_test.go +++ b/pkg/plugins/apiserver_test.go @@ -1,4 +1,4 @@ -package datasource +package plugins import ( "testing" @@ -22,11 +22,11 @@ func TestUtils(t *testing.T) { } func getIDIgnoreError(id string) string { - v, _ := getDatasourceGroupNameFromPluginID(id) + v, _ := GetDatasourceGroupNameFromPluginID(id) return v } func getErrorIgnoreValue(id string) error { - _, err := getDatasourceGroupNameFromPluginID(id) + _, err := GetDatasourceGroupNameFromPluginID(id) return err } diff --git a/pkg/registry/apis/apis.go b/pkg/registry/apis/apis.go index c2575481d70..f83ac018bda 100644 --- a/pkg/registry/apis/apis.go +++ b/pkg/registry/apis/apis.go @@ -10,6 +10,7 @@ import ( "github.com/grafana/grafana/pkg/registry/apis/featuretoggle" "github.com/grafana/grafana/pkg/registry/apis/folders" "github.com/grafana/grafana/pkg/registry/apis/playlist" + "github.com/grafana/grafana/pkg/registry/apis/query" ) var ( @@ -27,6 +28,7 @@ func ProvideRegistryServiceSink( _ *featuretoggle.FeatureFlagAPIBuilder, _ *datasource.DataSourceAPIBuilder, _ *folders.FolderAPIBuilder, + _ *query.QueryAPIBuilder, ) *Service { return &Service{} } diff --git a/pkg/registry/apis/datasource/query.go b/pkg/registry/apis/datasource/query.go deleted file mode 100644 index 7952bcd561f..00000000000 --- a/pkg/registry/apis/datasource/query.go +++ /dev/null @@ -1,104 +0,0 @@ -package datasource - -import ( - "encoding/json" - "fmt" - "time" - - "github.com/grafana/grafana-plugin-sdk-go/backend" - - "github.com/grafana/grafana/pkg/tsdb/legacydata" -) - -// Copied from: https://github.com/grafana/grafana/blob/main/pkg/api/dtos/models.go#L62 -type rawMetricRequest struct { - // From Start time in epoch timestamps in milliseconds or relative using Grafana time units. - // required: true - // example: now-1h - From string `json:"from"` - // To End time in epoch timestamps in milliseconds or relative using Grafana time units. - // required: true - // example: now - To string `json:"to"` - // queries.refId – Specifies an identifier of the query. Is optional and default to “A”. - // queries.datasourceId – Specifies the data source to be queried. Each query in the request must have an unique datasourceId. - // queries.maxDataPoints - Species maximum amount of data points that dashboard panel can render. Is optional and default to 100. - // queries.intervalMs - Specifies the time interval in milliseconds of time series. Is optional and defaults to 1000. - // required: true - // example: [ { "refId": "A", "intervalMs": 86400000, "maxDataPoints": 1092, "datasource":{ "uid":"PD8C576611E62080A" }, "rawSql": "SELECT 1 as valueOne, 2 as valueTwo", "format": "table" } ] - Queries []rawDataQuery `json:"queries"` - // required: false - Debug bool `json:"debug"` -} - -type rawDataQuery = map[string]interface{} - -func readQueries(in []byte) ([]backend.DataQuery, error) { - reqDTO := &rawMetricRequest{} - err := json.Unmarshal(in, &reqDTO) - if err != nil { - return nil, err - } - - if len(reqDTO.Queries) == 0 { - return nil, fmt.Errorf("expected queries") - } - - tr := legacydata.NewDataTimeRange(reqDTO.From, reqDTO.To) - backendTr := backend.TimeRange{ - From: tr.MustGetFrom(), - To: tr.MustGetTo(), - } - queries := make([]backend.DataQuery, 0) - - for _, query := range reqDTO.Queries { - dataQuery := backend.DataQuery{ - TimeRange: backendTr, - } - - v, ok := query["refId"] - if ok { - dataQuery.RefID, ok = v.(string) - if !ok { - return nil, fmt.Errorf("expeted string refId") - } - } - - v, ok = query["queryType"] - if ok { - dataQuery.QueryType, ok = v.(string) - if !ok { - return nil, fmt.Errorf("expeted string queryType") - } - } - - v, ok = query["maxDataPoints"] - if ok { - vInt, ok := v.(float64) - if !ok { - return nil, fmt.Errorf("expected float64 maxDataPoints") - } - - dataQuery.MaxDataPoints = int64(vInt) - } - - v, ok = query["intervalMs"] - if ok { - vInt, ok := v.(float64) - if !ok { - return nil, fmt.Errorf("expected float64 intervalMs") - } - - dataQuery.Interval = time.Duration(vInt) - } - - dataQuery.JSON, err = json.Marshal(query) - if err != nil { - return nil, err - } - - queries = append(queries, dataQuery) - } - - return queries, nil -} diff --git a/pkg/registry/apis/datasource/query_test.go b/pkg/registry/apis/datasource/query_test.go deleted file mode 100644 index 662bbd2e909..00000000000 --- a/pkg/registry/apis/datasource/query_test.go +++ /dev/null @@ -1,33 +0,0 @@ -package datasource - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -func TestParseQueriesIntoQueryDataRequest(t *testing.T) { - request := []byte(`{ - "queries": [ - { - "refId": "A", - "datasource": { - "type": "grafana-googlesheets-datasource", - "uid": "b1808c48-9fc9-4045-82d7-081781f8a553" - }, - "cacheDurationSeconds": 300, - "spreadsheet": "spreadsheetID", - "range": "", - "datasourceId": 4, - "intervalMs": 30000, - "maxDataPoints": 794 - } - ], - "from": "1692624667389", - "to": "1692646267389" - }`) - - parsedDataQuery, err := readQueries(request) - require.NoError(t, err) - require.Equal(t, len(parsedDataQuery), 1) -} diff --git a/pkg/registry/apis/datasource/register.go b/pkg/registry/apis/datasource/register.go index 86769661890..9fc43b7bc38 100644 --- a/pkg/registry/apis/datasource/register.go +++ b/pkg/registry/apis/datasource/register.go @@ -20,6 +20,7 @@ import ( common "github.com/grafana/grafana/pkg/apis/common/v0alpha1" "github.com/grafana/grafana/pkg/apis/datasource/v0alpha1" + query "github.com/grafana/grafana/pkg/apis/query/v0alpha1" "github.com/grafana/grafana/pkg/plugins" "github.com/grafana/grafana/pkg/services/accesscontrol" "github.com/grafana/grafana/pkg/services/featuremgmt" @@ -112,7 +113,8 @@ func addKnownTypes(scheme *runtime.Scheme, gv schema.GroupVersion) { &v0alpha1.DataSourceConnectionList{}, &v0alpha1.HealthCheckResult{}, &unstructured.Unstructured{}, - // Added for subresource stubs + // Query handler + &query.QueryDataResponse{}, &metav1.Status{}, ) } @@ -138,7 +140,7 @@ func (b *DataSourceAPIBuilder) InstallSchema(scheme *runtime.Scheme) error { } func resourceFromPluginID(pluginID string) (common.ResourceInfo, error) { - group, err := getDatasourceGroupNameFromPluginID(pluginID) + group, err := plugins.GetDatasourceGroupNameFromPluginID(pluginID) if err != nil { return common.ResourceInfo{}, err } diff --git a/pkg/registry/apis/datasource/sub_query.go b/pkg/registry/apis/datasource/sub_query.go index 57ee6a58d4f..14c4c410bfb 100644 --- a/pkg/registry/apis/datasource/sub_query.go +++ b/pkg/registry/apis/datasource/sub_query.go @@ -3,14 +3,18 @@ package datasource import ( "context" "encoding/json" - "io" + "fmt" "net/http" - "time" + "strconv" "github.com/grafana/grafana-plugin-sdk-go/backend" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apiserver/pkg/registry/rest" + + "github.com/grafana/grafana/pkg/apis/query/v0alpha1" + "github.com/grafana/grafana/pkg/middleware/requestmeta" + "github.com/grafana/grafana/pkg/tsdb/legacydata" + "github.com/grafana/grafana/pkg/web" ) type subQueryREST struct { @@ -20,11 +24,10 @@ type subQueryREST struct { var _ = rest.Connecter(&subQueryREST{}) func (r *subQueryREST) New() runtime.Object { - return &metav1.Status{} + return &v0alpha1.QueryDataResponse{} } -func (r *subQueryREST) Destroy() { -} +func (r *subQueryREST) Destroy() {} func (r *subQueryREST) ConnectMethods() []string { return []string{"POST", "GET"} @@ -34,40 +37,39 @@ func (r *subQueryREST) NewConnectOptions() (runtime.Object, bool, string) { return nil, false, "" } -func (r *subQueryREST) readQueries(req *http.Request) ([]backend.DataQuery, error) { +func (r *subQueryREST) readQueries(req *http.Request) ([]backend.DataQuery, *v0alpha1.DataSourceRef, error) { + reqDTO := v0alpha1.GenericQueryRequest{} // Simple URL to JSON mapping if req.Method == http.MethodGet { - body := make(map[string]any, 0) - for k, v := range req.URL.Query() { - switch len(v) { - case 0: - body[k] = true - case 1: - body[k] = v[0] // TODO, convert numbers + query := v0alpha1.GenericDataQuery{ + RefID: "A", + MaxDataPoints: 1000, + IntervalMS: 10, + } + params := req.URL.Query() + for k := range params { + v := params.Get(k) // the singular value + switch k { + case "to": + reqDTO.To = v + case "from": + reqDTO.From = v + case "maxDataPoints": + query.MaxDataPoints, _ = strconv.ParseInt(v, 10, 64) + case "intervalMs": + query.IntervalMS, _ = strconv.ParseFloat(v, 64) + case "queryType": + query.QueryType = v default: - body[k] = v // TODO, convert numbers + query.AdditionalProperties()[k] = v } } - - var err error - dq := backend.DataQuery{ - RefID: "A", - TimeRange: backend.TimeRange{ - From: time.Now().Add(-1 * time.Hour), // last hour - To: time.Now(), - }, - MaxDataPoints: 1000, - Interval: time.Second * 10, - } - dq.JSON, err = json.Marshal(body) - return []backend.DataQuery{dq}, err + reqDTO.Queries = []v0alpha1.GenericDataQuery{query} + } else if err := web.Bind(req, &reqDTO); err != nil { + return nil, nil, err } - body, err := io.ReadAll(req.Body) - if err != nil { - return nil, err - } - return readQueries(body) + return legacydata.ToDataSourceQueries(reqDTO) } func (r *subQueryREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) { @@ -77,27 +79,56 @@ func (r *subQueryREST) Connect(ctx context.Context, name string, opts runtime.Ob } return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - queries, err := r.readQueries(req) + queries, dsRef, err := r.readQueries(req) if err != nil { responder.Error(err) return } + if dsRef != nil && dsRef.UID != name { + responder.Error(fmt.Errorf("expected the datasource in the request url and body to match")) + return + } - queryResponse, err := r.builder.client.QueryData(ctx, &backend.QueryDataRequest{ + qdr, err := r.builder.client.QueryData(ctx, &backend.QueryDataRequest{ PluginContext: pluginCtx, Queries: queries, - // Headers: // from context }) - if err != nil { - return - } - - jsonRsp, err := json.Marshal(queryResponse) if err != nil { responder.Error(err) return } - w.WriteHeader(200) - _, _ = w.Write(jsonRsp) + + statusCode := http.StatusOK + for _, res := range qdr.Responses { + if res.Error != nil { + statusCode = http.StatusMultiStatus + } + } + if statusCode != http.StatusOK { + requestmeta.WithDownstreamStatusSource(ctx) + } + + // TODO... someday :) can return protobuf for machine-machine communication + // will avoid some hops the current response workflow (for external plugins) + // 1. Plugin: + // creates: golang structs + // returns: arrow + protobuf | + // 2. Client: | direct when local/non grpc + // reads: protobuf+arrow V + // returns: golang structs + // 3. Datasource Server (eg right here): + // reads: golang structs + // returns: JSON + // 4. Query service (alerting etc): + // reads: JSON? (TODO! raw output from 1???) + // returns: JSON (after more operations) + // 5. Browser + // reads: JSON + w.WriteHeader(statusCode) + w.Header().Set("Content-Type", "application/json") + err = json.NewEncoder(w).Encode(qdr) + if err != nil { + responder.Error(err) + } }), nil } diff --git a/pkg/registry/apis/query/parser.go b/pkg/registry/apis/query/parser.go new file mode 100644 index 00000000000..bc60a5b451c --- /dev/null +++ b/pkg/registry/apis/query/parser.go @@ -0,0 +1,83 @@ +package query + +import ( + "fmt" + + "github.com/grafana/grafana/pkg/apis/query/v0alpha1" + "github.com/grafana/grafana/pkg/expr" +) + +type parsedQueryRequest struct { + // The queries broken into requests + Requests []groupedQueries + + // Optionally show the additional query properties + Expressions []v0alpha1.GenericDataQuery +} + +type groupedQueries struct { + // the plugin type + pluginId string + + // The datasource name/uid + uid string + + // The raw backend query objects + query []v0alpha1.GenericDataQuery +} + +// Internally define what makes this request unique (eventually may include the apiVersion) +func (d *groupedQueries) key() string { + return fmt.Sprintf("%s/%s", d.pluginId, d.uid) +} + +func parseQueryRequest(raw v0alpha1.GenericQueryRequest) (parsedQueryRequest, error) { + mixed := make(map[string]*groupedQueries) + parsed := parsedQueryRequest{} + refIds := make(map[string]bool) + + for _, original := range raw.Queries { + if refIds[original.RefID] { + return parsed, fmt.Errorf("invalid query, duplicate refId: " + original.RefID) + } + + refIds[original.RefID] = true + q := original + + if q.TimeRange == nil && raw.From != "" { + q.TimeRange = &v0alpha1.TimeRange{ + From: raw.From, + To: raw.To, + } + } + + // Extract out the expressions queries earlier + if expr.IsDataSource(q.Datasource.Type) || expr.IsDataSource(q.Datasource.UID) { + parsed.Expressions = append(parsed.Expressions, q) + continue + } + + g := &groupedQueries{pluginId: q.Datasource.Type, uid: q.Datasource.UID} + group, ok := mixed[g.key()] + if !ok || group == nil { + group = g + mixed[g.key()] = g + } + group.query = append(group.query, q) + } + + for _, q := range parsed.Expressions { + // TODO: parse and build tree, for now just fail fast on unknown commands + _, err := expr.GetExpressionCommandType(q.AdditionalProperties()) + if err != nil { + return parsed, err + } + } + + // Add each request + for _, v := range mixed { + parsed.Requests = append(parsed.Requests, *v) + } + + return parsed, nil +} diff --git a/pkg/registry/apis/query/plugins.go b/pkg/registry/apis/query/plugins.go new file mode 100644 index 00000000000..786ba1a5c60 --- /dev/null +++ b/pkg/registry/apis/query/plugins.go @@ -0,0 +1,62 @@ +package query + +import ( + "context" + + "k8s.io/apimachinery/pkg/apis/meta/internalversion" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apiserver/pkg/registry/rest" + + common "github.com/grafana/grafana/pkg/apis/common/v0alpha1" + example "github.com/grafana/grafana/pkg/apis/example/v0alpha1" + query "github.com/grafana/grafana/pkg/apis/query/v0alpha1" +) + +var ( + _ rest.Storage = (*pluginsStorage)(nil) + _ rest.Scoper = (*pluginsStorage)(nil) + _ rest.SingularNameProvider = (*pluginsStorage)(nil) + _ rest.Lister = (*pluginsStorage)(nil) +) + +type pluginsStorage struct { + resourceInfo *common.ResourceInfo + tableConverter rest.TableConvertor + registry query.DataSourceApiServerRegistry +} + +func newPluginsStorage(reg query.DataSourceApiServerRegistry) *pluginsStorage { + var resourceInfo = query.DataSourceApiServerResourceInfo + return &pluginsStorage{ + resourceInfo: &resourceInfo, + tableConverter: rest.NewDefaultTableConvertor(resourceInfo.GroupResource()), + registry: reg, + } +} + +func (s *pluginsStorage) New() runtime.Object { + return s.resourceInfo.NewFunc() +} + +func (s *pluginsStorage) Destroy() {} + +func (s *pluginsStorage) NamespaceScoped() bool { + return false +} + +func (s *pluginsStorage) GetSingularName() string { + return example.DummyResourceInfo.GetSingularName() +} + +func (s *pluginsStorage) NewList() runtime.Object { + return s.resourceInfo.NewListFunc() +} + +func (s *pluginsStorage) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) { + return s.tableConverter.ConvertToTable(ctx, object, tableOptions) +} + +func (s *pluginsStorage) List(ctx context.Context, options *internalversion.ListOptions) (runtime.Object, error) { + return s.registry.GetDatasourceApiServers(ctx) +} diff --git a/pkg/registry/apis/query/query.go b/pkg/registry/apis/query/query.go new file mode 100644 index 00000000000..e5278141379 --- /dev/null +++ b/pkg/registry/apis/query/query.go @@ -0,0 +1,157 @@ +package query + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "golang.org/x/sync/errgroup" + + "github.com/grafana/grafana/pkg/apis/query/v0alpha1" + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/middleware/requestmeta" + "github.com/grafana/grafana/pkg/util/errutil/errhttp" + "github.com/grafana/grafana/pkg/web" +) + +func (b *QueryAPIBuilder) handleQuery(w http.ResponseWriter, r *http.Request) { + reqDTO := v0alpha1.GenericQueryRequest{} + if err := web.Bind(r, &reqDTO); err != nil { + errhttp.Write(r.Context(), err, w) + return + } + + parsed, err := parseQueryRequest(reqDTO) + if err != nil { + errhttp.Write(r.Context(), err, w) + return + } + + ctx := r.Context() + qdr, err := b.processRequest(ctx, parsed) + if err != nil { + errhttp.Write(r.Context(), err, w) + return + } + + statusCode := http.StatusOK + for _, res := range qdr.Responses { + if res.Error != nil { + statusCode = http.StatusBadRequest + if b.returnMultiStatus { + statusCode = http.StatusMultiStatus + } + } + } + if statusCode != http.StatusOK { + requestmeta.WithDownstreamStatusSource(ctx) + } + + w.Header().Set("Content-Type", "application/json") + err = json.NewEncoder(w).Encode(qdr) + if err != nil { + errhttp.Write(r.Context(), err, w) + } +} + +// See: +// https://github.com/grafana/grafana/blob/v10.2.3/pkg/services/query/query.go#L88 +func (b *QueryAPIBuilder) processRequest(ctx context.Context, req parsedQueryRequest) (qdr *backend.QueryDataResponse, err error) { + switch len(req.Requests) { + case 0: + break // nothing to do + case 1: + qdr, err = b.handleQuerySingleDatasource(ctx, req.Requests[0]) + default: + qdr, err = b.executeConcurrentQueries(ctx, req.Requests) + } + + if len(req.Expressions) > 0 { + return b.handleExpressions(ctx, qdr, req.Expressions) + } + return qdr, err +} + +// Process a single request +// See: https://github.com/grafana/grafana/blob/v10.2.3/pkg/services/query/query.go#L242 +func (b *QueryAPIBuilder) handleQuerySingleDatasource(ctx context.Context, req groupedQueries) (*backend.QueryDataResponse, error) { + gv, err := b.registry.GetDatasourceGroupVersion(req.pluginId) + if err != nil { + return nil, err + } + return b.runner.ExecuteQueryData(ctx, gv, req.uid, req.query) +} + +// buildErrorResponses applies the provided error to each query response in the list. These queries should all belong to the same datasource. +func buildErrorResponse(err error, req groupedQueries) *backend.QueryDataResponse { + rsp := backend.NewQueryDataResponse() + for _, query := range req.query { + rsp.Responses[query.RefID] = backend.DataResponse{ + Error: err, + } + } + return rsp +} + +// executeConcurrentQueries executes queries to multiple datasources concurrently and returns the aggregate result. +func (b *QueryAPIBuilder) executeConcurrentQueries(ctx context.Context, requests []groupedQueries) (*backend.QueryDataResponse, error) { + g, ctx := errgroup.WithContext(ctx) + g.SetLimit(b.concurrentQueryLimit) // prevent too many concurrent requests + rchan := make(chan *backend.QueryDataResponse, len(requests)) + + // Create panic recovery function for loop below + recoveryFn := func(req groupedQueries) { + if r := recover(); r != nil { + var err error + b.log.Error("query datasource panic", "error", r, "stack", log.Stack(1)) + if theErr, ok := r.(error); ok { + err = theErr + } else if theErrString, ok := r.(string); ok { + err = fmt.Errorf(theErrString) + } else { + err = fmt.Errorf("unexpected error - %s", b.userFacingDefaultError) + } + // Due to the panic, there is no valid response for any query for this datasource. Append an error for each one. + rchan <- buildErrorResponse(err, req) + } + } + + // Query each datasource concurrently + for idx := range requests { + req := requests[idx] + g.Go(func() error { + defer recoveryFn(req) + + dqr, err := b.handleQuerySingleDatasource(ctx, req) + if err == nil { + rchan <- dqr + } else { + rchan <- buildErrorResponse(err, req) + } + return nil + }) + } + + if err := g.Wait(); err != nil { + return nil, err + } + close(rchan) + + // Merge the results from each response + resp := backend.NewQueryDataResponse() + for result := range rchan { + for refId, dataResponse := range result.Responses { + resp.Responses[refId] = dataResponse + } + } + + return resp, nil +} + +// NOTE the upstream queries have already been executed +// https://github.com/grafana/grafana/blob/v10.2.3/pkg/services/query/query.go#L242 +func (b *QueryAPIBuilder) handleExpressions(ctx context.Context, qdr *backend.QueryDataResponse, expressions []v0alpha1.GenericDataQuery) (*backend.QueryDataResponse, error) { + return qdr, fmt.Errorf("expressions are not implemented yet") +} diff --git a/pkg/registry/apis/query/register.go b/pkg/registry/apis/query/register.go new file mode 100644 index 00000000000..9b450c4904d --- /dev/null +++ b/pkg/registry/apis/query/register.go @@ -0,0 +1,260 @@ +package query + +import ( + "encoding/json" + "net/http" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apiserver/pkg/authorization/authorizer" + "k8s.io/apiserver/pkg/registry/generic" + "k8s.io/apiserver/pkg/registry/rest" + genericapiserver "k8s.io/apiserver/pkg/server" + common "k8s.io/kube-openapi/pkg/common" + "k8s.io/kube-openapi/pkg/spec3" + "k8s.io/kube-openapi/pkg/validation/spec" + + "github.com/grafana/grafana/pkg/apis/query/v0alpha1" + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/plugins" + "github.com/grafana/grafana/pkg/registry/apis/query/runner" + "github.com/grafana/grafana/pkg/services/accesscontrol" + "github.com/grafana/grafana/pkg/services/datasources" + "github.com/grafana/grafana/pkg/services/featuremgmt" + grafanaapiserver "github.com/grafana/grafana/pkg/services/grafana-apiserver" + "github.com/grafana/grafana/pkg/services/pluginsintegration/plugincontext" + "github.com/grafana/grafana/pkg/services/pluginsintegration/pluginstore" +) + +var _ grafanaapiserver.APIGroupBuilder = (*QueryAPIBuilder)(nil) + +type QueryAPIBuilder struct { + log log.Logger + concurrentQueryLimit int + userFacingDefaultError string + returnMultiStatus bool // from feature toggle + + runner v0alpha1.QueryRunner + registry v0alpha1.DataSourceApiServerRegistry +} + +func NewQueryAPIBuilder(features featuremgmt.FeatureToggles, + runner v0alpha1.QueryRunner, + registry v0alpha1.DataSourceApiServerRegistry, +) *QueryAPIBuilder { + return &QueryAPIBuilder{ + concurrentQueryLimit: 4, // from config? + log: log.New("query_apiserver"), + returnMultiStatus: features.IsEnabledGlobally(featuremgmt.FlagDatasourceQueryMultiStatus), + runner: runner, + registry: registry, + } +} + +func RegisterAPIService(features featuremgmt.FeatureToggles, + apiregistration grafanaapiserver.APIRegistrar, + dataSourcesService datasources.DataSourceService, + pluginStore pluginstore.Store, + accessControl accesscontrol.AccessControl, + pluginClient plugins.Client, + pCtxProvider *plugincontext.Provider, +) *QueryAPIBuilder { + if !features.IsEnabledGlobally(featuremgmt.FlagGrafanaAPIServerWithExperimentalAPIs) { + return nil // skip registration unless opting into experimental apis + } + + builder := NewQueryAPIBuilder( + features, + runner.NewDirectQueryRunner(pluginClient, pCtxProvider), + runner.NewDirectRegistry(pluginStore, dataSourcesService), + ) + + // ONLY testdata... + if false { + builder = NewQueryAPIBuilder( + features, + runner.NewDummyTestRunner(), + runner.NewDummyRegistry(), + ) + } + + apiregistration.RegisterAPI(builder) + return builder +} + +func (b *QueryAPIBuilder) GetGroupVersion() schema.GroupVersion { + return v0alpha1.SchemeGroupVersion +} + +func addKnownTypes(scheme *runtime.Scheme, gv schema.GroupVersion) { + scheme.AddKnownTypes(gv, + &v0alpha1.DataSourceApiServer{}, + &v0alpha1.DataSourceApiServerList{}, + &v0alpha1.QueryDataResponse{}, + ) +} + +func (b *QueryAPIBuilder) InstallSchema(scheme *runtime.Scheme) error { + addKnownTypes(scheme, v0alpha1.SchemeGroupVersion) + metav1.AddToGroupVersion(scheme, v0alpha1.SchemeGroupVersion) + return scheme.SetVersionPriority(v0alpha1.SchemeGroupVersion) +} + +func (b *QueryAPIBuilder) GetAPIGroupInfo( + scheme *runtime.Scheme, + codecs serializer.CodecFactory, // pointer? + optsGetter generic.RESTOptionsGetter, +) (*genericapiserver.APIGroupInfo, error) { + gv := v0alpha1.SchemeGroupVersion + apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(gv.Group, scheme, metav1.ParameterCodec, codecs) + + plugins := newPluginsStorage(b.registry) + + storage := map[string]rest.Storage{} + storage[plugins.resourceInfo.StoragePath()] = plugins + + apiGroupInfo.VersionedResourcesStorageMap[gv.Version] = storage + return &apiGroupInfo, nil +} + +func (b *QueryAPIBuilder) GetOpenAPIDefinitions() common.GetOpenAPIDefinitions { + return v0alpha1.GetOpenAPIDefinitions +} + +// Register additional routes with the server +func (b *QueryAPIBuilder) GetAPIRoutes() *grafanaapiserver.APIRoutes { + defs := v0alpha1.GetOpenAPIDefinitions(func(path string) spec.Ref { return spec.Ref{} }) + querySchema := defs["github.com/grafana/grafana/pkg/apis/query/v0alpha1.QueryRequest"].Schema + responseSchema := defs["github.com/grafana/grafana/pkg/apis/query/v0alpha1.QueryDataResponse"].Schema + + var randomWalkQuery any + var randomWalkTable any + _ = json.Unmarshal([]byte(`{ + "queries": [ + { + "refId": "A", + "scenarioId": "random_walk", + "seriesCount": 1, + "datasource": { + "type": "grafana-testdata-datasource", + "uid": "PD8C576611E62080A" + }, + "intervalMs": 60000, + "maxDataPoints": 20 + } + ], + "from": "1704893381544", + "to": "1704914981544" + }`), &randomWalkQuery) + + _ = json.Unmarshal([]byte(`{ + "queries": [ + { + "refId": "A", + "scenarioId": "random_walk_table", + "seriesCount": 1, + "datasource": { + "type": "grafana-testdata-datasource", + "uid": "PD8C576611E62080A" + }, + "intervalMs": 60000, + "maxDataPoints": 20 + } + ], + "from": "1704893381544", + "to": "1704914981544" + }`), &randomWalkTable) + + return &grafanaapiserver.APIRoutes{ + Root: []grafanaapiserver.APIRouteHandler{}, + Namespace: []grafanaapiserver.APIRouteHandler{ + { + Path: "query", + Spec: &spec3.PathProps{ + Post: &spec3.Operation{ + OperationProps: spec3.OperationProps{ + Tags: []string{"query"}, + Description: "query across multiple datasources with expressions. This api matches the legacy /ds/query endpoint", + Parameters: []*spec3.Parameter{ + { + ParameterProps: spec3.ParameterProps{ + Name: "namespace", + Description: "object name and auth scope, such as for teams and projects", + In: "path", + Required: true, + Schema: spec.StringProperty(), + Example: "default", + }, + }, + }, + RequestBody: &spec3.RequestBody{ + RequestBodyProps: spec3.RequestBodyProps{ + Required: true, + Description: "the query array", + Content: map[string]*spec3.MediaType{ + "application/json": { + MediaTypeProps: spec3.MediaTypeProps{ + Schema: querySchema.WithExample(randomWalkQuery), + Examples: map[string]*spec3.Example{ + "random_walk": { + ExampleProps: spec3.ExampleProps{ + Summary: "random walk", + Value: randomWalkQuery, + }, + }, + "random_walk_table": { + ExampleProps: spec3.ExampleProps{ + Summary: "random walk (table)", + Value: randomWalkTable, + }, + }, + }, + }, + }, + }, + }, + }, + Responses: &spec3.Responses{ + ResponsesProps: spec3.ResponsesProps{ + StatusCodeResponses: map[int]*spec3.Response{ + http.StatusOK: { + ResponseProps: spec3.ResponseProps{ + Description: "Query results", + Content: map[string]*spec3.MediaType{ + "application/json": { + MediaTypeProps: spec3.MediaTypeProps{ + Schema: &responseSchema, + }, + }, + }, + }, + }, + http.StatusMultiStatus: { + ResponseProps: spec3.ResponseProps{ + Description: "Errors exist in the downstream results", + Content: map[string]*spec3.MediaType{ + "application/json": { + MediaTypeProps: spec3.MediaTypeProps{ + Schema: &responseSchema, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + Handler: b.handleQuery, + }, + }, + } +} + +func (b *QueryAPIBuilder) GetAuthorizer() authorizer.Authorizer { + return nil // default is OK +} diff --git a/pkg/registry/apis/query/runner/direct.go b/pkg/registry/apis/query/runner/direct.go new file mode 100644 index 00000000000..e36a7e6188e --- /dev/null +++ b/pkg/registry/apis/query/runner/direct.go @@ -0,0 +1,179 @@ +package runner + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + + "github.com/grafana/grafana/pkg/apis/query/v0alpha1" + "github.com/grafana/grafana/pkg/plugins" + "github.com/grafana/grafana/pkg/services/datasources" + "github.com/grafana/grafana/pkg/services/pluginsintegration/plugincontext" + "github.com/grafana/grafana/pkg/services/pluginsintegration/pluginstore" + "github.com/grafana/grafana/pkg/setting" + "github.com/grafana/grafana/pkg/tsdb/legacydata" +) + +type directRunner struct { + pluginClient plugins.Client + pCtxProvider *plugincontext.Provider +} + +type directRegistry struct { + pluginsMu sync.Mutex + plugins *v0alpha1.DataSourceApiServerList + apis map[string]schema.GroupVersion + groupToPlugin map[string]string + pluginStore pluginstore.Store + + // called on demand + dataSourcesService datasources.DataSourceService +} + +var _ v0alpha1.QueryRunner = (*directRunner)(nil) +var _ v0alpha1.DataSourceApiServerRegistry = (*directRegistry)(nil) + +// NewDummyTestRunner creates a runner that only works with testdata +func NewDirectQueryRunner( + pluginClient plugins.Client, + pCtxProvider *plugincontext.Provider) v0alpha1.QueryRunner { + return &directRunner{ + pluginClient: pluginClient, + pCtxProvider: pCtxProvider, + } +} + +func NewDirectRegistry(pluginStore pluginstore.Store, + dataSourcesService datasources.DataSourceService, +) v0alpha1.DataSourceApiServerRegistry { + return &directRegistry{ + pluginStore: pluginStore, + dataSourcesService: dataSourcesService, + } +} + +// ExecuteQueryData implements QueryHelper. +func (d *directRunner) ExecuteQueryData(ctx context.Context, + // The k8s group for the datasource (pluginId) + datasource schema.GroupVersion, + + // The datasource name/uid + name string, + + // The raw backend query objects + query []v0alpha1.GenericDataQuery, +) (*backend.QueryDataResponse, error) { + queries, dsRef, err := legacydata.ToDataSourceQueries(v0alpha1.GenericQueryRequest{ + Queries: query, + }) + if err != nil { + return nil, err + } + if dsRef != nil && dsRef.UID != name { + return nil, fmt.Errorf("expected query body datasource and request to match") + } + + // NOTE: this depends on uid unique across datasources + settings, err := d.pCtxProvider.GetDataSourceInstanceSettings(ctx, name) + if err != nil { + return nil, err + } + + pCtx, err := d.pCtxProvider.PluginContextForDataSource(ctx, settings) + if err != nil { + return nil, err + } + + return d.pluginClient.QueryData(ctx, &backend.QueryDataRequest{ + PluginContext: pCtx, + Queries: queries, + }) +} + +// GetDatasourceAPI implements DataSourceRegistry. +func (d *directRegistry) GetDatasourceGroupVersion(pluginId string) (schema.GroupVersion, error) { + d.pluginsMu.Lock() + defer d.pluginsMu.Unlock() + + if d.plugins == nil { + err := d.updatePlugins() + if err != nil { + return schema.GroupVersion{}, err + } + } + + var err error + gv, ok := d.apis[pluginId] + if !ok { + err = fmt.Errorf("no API found for id: " + pluginId) + } + return gv, err +} + +// GetDatasourcePlugins no namespace? everything that is available +func (d *directRegistry) GetDatasourceApiServers(ctx context.Context) (*v0alpha1.DataSourceApiServerList, error) { + d.pluginsMu.Lock() + defer d.pluginsMu.Unlock() + + if d.plugins == nil { + err := d.updatePlugins() + if err != nil { + return nil, err + } + } + + return d.plugins, nil +} + +// This should be called when plugins change +func (d *directRegistry) updatePlugins() error { + groupToPlugin := map[string]string{} + apis := map[string]schema.GroupVersion{} + result := &v0alpha1.DataSourceApiServerList{ + ListMeta: metav1.ListMeta{ + ResourceVersion: fmt.Sprintf("%d", time.Now().UnixMilli()), + }, + } + + // TODO? only backend plugins + for _, dsp := range d.pluginStore.Plugins(context.Background(), plugins.TypeDataSource) { + ts := setting.BuildStamp * 1000 + if dsp.Info.Build.Time > 0 { + ts = dsp.Info.Build.Time + } + + group, err := plugins.GetDatasourceGroupNameFromPluginID(dsp.ID) + if err != nil { + return err + } + gv := schema.GroupVersion{Group: group, Version: "v0alpha1"} // default version + apis[dsp.ID] = gv + for _, alias := range dsp.AliasIDs { + apis[alias] = gv + } + groupToPlugin[group] = dsp.ID + + ds := v0alpha1.DataSourceApiServer{ + ObjectMeta: metav1.ObjectMeta{ + Name: dsp.ID, + CreationTimestamp: metav1.NewTime(time.UnixMilli(ts)), + }, + Title: dsp.Name, + AliasIDs: dsp.AliasIDs, + GroupVersion: gv.String(), + Description: dsp.Info.Description, + } + result.Items = append(result.Items, ds) + } + + d.plugins = result + d.apis = apis + d.groupToPlugin = groupToPlugin + return nil +} diff --git a/pkg/registry/apis/query/runner/dummy.go b/pkg/registry/apis/query/runner/dummy.go new file mode 100644 index 00000000000..4937cb81241 --- /dev/null +++ b/pkg/registry/apis/query/runner/dummy.go @@ -0,0 +1,88 @@ +package runner + +import ( + "context" + "fmt" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + + "github.com/grafana/grafana/pkg/apis/query/v0alpha1" + testdata "github.com/grafana/grafana/pkg/tsdb/grafana-testdata-datasource" + "github.com/grafana/grafana/pkg/tsdb/legacydata" +) + +type testdataDummy struct{} + +var _ v0alpha1.QueryRunner = (*testdataDummy)(nil) +var _ v0alpha1.DataSourceApiServerRegistry = (*testdataDummy)(nil) + +// NewDummyTestRunner creates a runner that only works with testdata +func NewDummyTestRunner() v0alpha1.QueryRunner { + return &testdataDummy{} +} + +func NewDummyRegistry() v0alpha1.DataSourceApiServerRegistry { + return &testdataDummy{} +} + +// ExecuteQueryData implements QueryHelper. +func (d *testdataDummy) ExecuteQueryData(ctx context.Context, + // The k8s group for the datasource (pluginId) + datasource schema.GroupVersion, + + // The datasource name/uid + name string, + + // The raw backend query objects + query []v0alpha1.GenericDataQuery, +) (*backend.QueryDataResponse, error) { + if datasource.Group != "testdata.datasource.grafana.app" { + return nil, fmt.Errorf("expecting testdata requests") + } + + queries, _, err := legacydata.ToDataSourceQueries(v0alpha1.GenericQueryRequest{ + Queries: query, + }) + if err != nil { + return nil, err + } + + return testdata.ProvideService().QueryData(ctx, &backend.QueryDataRequest{ + Queries: queries, + }) +} + +// GetDatasourceAPI implements DataSourceRegistry. +func (*testdataDummy) GetDatasourceGroupVersion(pluginId string) (schema.GroupVersion, error) { + if pluginId == "testdata" || pluginId == "grafana-testdata-datasource" { + return schema.GroupVersion{ + Group: "testdata.datasource.grafana.app", + Version: "v0alpha1", + }, nil + } + return schema.GroupVersion{}, fmt.Errorf("unsupported plugin (only testdata for now)") +} + +// GetDatasourcePlugins implements QueryHelper. +func (d *testdataDummy) GetDatasourceApiServers(ctx context.Context) (*v0alpha1.DataSourceApiServerList, error) { + return &v0alpha1.DataSourceApiServerList{ + ListMeta: metav1.ListMeta{ + ResourceVersion: fmt.Sprintf("%d", time.Now().UnixMilli()), + }, + Items: []v0alpha1.DataSourceApiServer{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "grafana-testdata-datasource", + CreationTimestamp: metav1.Now(), + }, + Title: "Test Data", + GroupVersion: "testdata.datasource.grafana.app/v0alpha1", + AliasIDs: []string{"testdata"}, + }, + }, + }, nil +} diff --git a/pkg/registry/apis/wireset.go b/pkg/registry/apis/wireset.go index 3497faba067..c5da75bf909 100644 --- a/pkg/registry/apis/wireset.go +++ b/pkg/registry/apis/wireset.go @@ -9,6 +9,7 @@ import ( "github.com/grafana/grafana/pkg/registry/apis/featuretoggle" "github.com/grafana/grafana/pkg/registry/apis/folders" "github.com/grafana/grafana/pkg/registry/apis/playlist" + "github.com/grafana/grafana/pkg/registry/apis/query" "github.com/grafana/grafana/pkg/services/pluginsintegration/plugincontext" ) @@ -27,4 +28,5 @@ var WireSet = wire.NewSet( featuretoggle.RegisterAPIService, datasource.RegisterAPIService, folders.RegisterAPIService, + query.RegisterAPIService, ) diff --git a/pkg/services/featuremgmt/registry.go b/pkg/services/featuremgmt/registry.go index bfab3b7a3eb..f790618239a 100644 --- a/pkg/services/featuremgmt/registry.go +++ b/pkg/services/featuremgmt/registry.go @@ -949,6 +949,15 @@ var ( RequiresRestart: true, // changes the API routing Created: time.Date(2023, time.December, 4, 12, 0, 0, 0, time.UTC), }, + { + Name: "kubernetesQueryServiceRewrite", + Description: "Rewrite requests targeting /ds/query to the query service", + Stage: FeatureStageExperimental, + Owner: grafanaAppPlatformSquad, + RequiresRestart: true, // changes the API routing + RequiresDevMode: true, + Created: time.Date(2024, time.January, 28, 12, 0, 0, 0, time.UTC), + }, { Name: "cloudWatchBatchQueries", Description: "Runs CloudWatch metrics queries as separate batches", diff --git a/pkg/services/featuremgmt/toggles_gen.csv b/pkg/services/featuremgmt/toggles_gen.csv index 197563171e8..ac27e659c64 100644 --- a/pkg/services/featuremgmt/toggles_gen.csv +++ b/pkg/services/featuremgmt/toggles_gen.csv @@ -111,6 +111,7 @@ formatString,preview,@grafana/grafana-bi-squad,2023-10-13,false,false,true transformationsVariableSupport,preview,@grafana/grafana-bi-squad,2023-10-04,false,false,true kubernetesPlaylists,experimental,@grafana/grafana-app-platform-squad,2023-11-08,false,true,false kubernetesSnapshots,experimental,@grafana/grafana-app-platform-squad,2023-12-04,false,true,false +kubernetesQueryServiceRewrite,experimental,@grafana/grafana-app-platform-squad,2024-01-28,true,true,false cloudWatchBatchQueries,preview,@grafana/aws-datasources,2023-10-20,false,false,false recoveryThreshold,GA,@grafana/alerting-squad,2023-10-10,false,true,false lokiStructuredMetadata,experimental,@grafana/observability-logs,2023-11-16,false,false,false diff --git a/pkg/services/featuremgmt/toggles_gen.go b/pkg/services/featuremgmt/toggles_gen.go index 2c0e677c0c3..42b161e6701 100644 --- a/pkg/services/featuremgmt/toggles_gen.go +++ b/pkg/services/featuremgmt/toggles_gen.go @@ -455,6 +455,10 @@ const ( // Use the kubernetes API in the frontend to support playlists FlagKubernetesSnapshots = "kubernetesSnapshots" + // FlagKubernetesQueryServiceRewrite + // Rewrite requests targeting /ds/query to the query service + FlagKubernetesQueryServiceRewrite = "kubernetesQueryServiceRewrite" + // FlagCloudWatchBatchQueries // Runs CloudWatch metrics queries as separate batches FlagCloudWatchBatchQueries = "cloudWatchBatchQueries" diff --git a/pkg/services/grafana-apiserver/service.go b/pkg/services/grafana-apiserver/service.go index 30519427505..84447d9db2f 100644 --- a/pkg/services/grafana-apiserver/service.go +++ b/pkg/services/grafana-apiserver/service.go @@ -103,6 +103,9 @@ type DirectRestConfigProvider interface { // logged logged in user as the current request context. This is useful when // creating clients that map legacy API handlers to k8s backed services GetDirectRestConfig(c *contextmodel.ReqContext) *clientrest.Config + + // This can be used to rewrite incoming requests to path now supported under /apis + DirectlyServeHTTP(w http.ResponseWriter, r *http.Request) } type service struct { @@ -377,6 +380,10 @@ func (s *service) GetDirectRestConfig(c *contextmodel.ReqContext) *clientrest.Co } } +func (s *service) DirectlyServeHTTP(w http.ResponseWriter, r *http.Request) { + s.handler.ServeHTTP(w, r) +} + func (s *service) running(ctx context.Context) error { // skip waiting for the server in prod mode if !s.config.devMode { diff --git a/pkg/tests/apis/datasource/testdata_test.go b/pkg/tests/apis/datasource/testdata_test.go index c21b1bdf309..b0449903fe3 100644 --- a/pkg/tests/apis/datasource/testdata_test.go +++ b/pkg/tests/apis/datasource/testdata_test.go @@ -107,7 +107,7 @@ func TestTestDatasource(t *testing.T) { }) t.Run("Call subresources", func(t *testing.T) { - client := helper.Org1.Admin.Client.Resource(schema.GroupVersionResource{ + client := helper.Org1.Admin.ResourceClient(t, schema.GroupVersionResource{ Group: "testdata.datasource.grafana.app", Version: "v0alpha1", Resource: "connections", diff --git a/pkg/tests/apis/example/example_test.go b/pkg/tests/apis/example/example_test.go index 6f3f6f423f6..b84db254f2f 100644 --- a/pkg/tests/apis/example/example_test.go +++ b/pkg/tests/apis/example/example_test.go @@ -29,7 +29,7 @@ func TestExampleApp(t *testing.T) { t.Run("Check runtime info resource", func(t *testing.T) { // Resource is not namespaced! - client := helper.Org1.Admin.Client.Resource(schema.GroupVersionResource{ + client := helper.Org1.Admin.ResourceClient(t, schema.GroupVersionResource{ Group: "example.grafana.app", Version: "v0alpha1", Resource: "runtime", @@ -139,7 +139,7 @@ func TestExampleApp(t *testing.T) { }) t.Run("Check dummy with subresource", func(t *testing.T) { - client := helper.Org1.Viewer.Client.Resource(schema.GroupVersionResource{ + client := helper.Org1.Viewer.ResourceClient(t, schema.GroupVersionResource{ Group: "example.grafana.app", Version: "v0alpha1", Resource: "dummy", diff --git a/pkg/tests/apis/helper.go b/pkg/tests/apis/helper.go index ee7e2c24aff..3d1e233174d 100644 --- a/pkg/tests/apis/helper.go +++ b/pkg/tests/apis/helper.go @@ -98,10 +98,13 @@ func (c *K8sTestHelper) GetResourceClient(args ResourceClientArgs) *K8sResourceC args.Namespace = c.namespacer(args.User.Identity.GetOrgID()) } + client, err := dynamic.NewForConfig(args.User.NewRestConfig()) + require.NoError(c.t, err) + return &K8sResourceClient{ t: c.t, Args: args, - Resource: args.User.Client.Resource(args.GVR).Namespace(args.Namespace), + Resource: client.Resource(args.GVR).Namespace(args.Namespace), } } @@ -163,8 +166,31 @@ type OrgUsers struct { type User struct { Identity identity.Requester - Client *dynamic.DynamicClient password string + baseURL string +} + +func (c *User) NewRestConfig() *rest.Config { + return &rest.Config{ + Host: c.baseURL, + Username: c.Identity.GetLogin(), + Password: c.password, + } +} + +func (c *User) ResourceClient(t *testing.T, gvr schema.GroupVersionResource) dynamic.NamespaceableResourceInterface { + client, err := dynamic.NewForConfig(c.NewRestConfig()) + require.NoError(t, err) + return client.Resource(gvr) +} + +func (c *User) RESTClient(t *testing.T, gv *schema.GroupVersion) *rest.RESTClient { + cfg := dynamic.ConfigFor(c.NewRestConfig()) // adds negotiated serializers! + cfg.GroupVersion = gv + cfg.APIPath = "apis" // the plural + client, err := rest.RESTClientFor(cfg) + require.NoError(t, err) + return client } type RequestParams struct { @@ -380,19 +406,10 @@ func (c K8sTestHelper) createTestUsers(orgName string) OrgUsers { require.Equal(c.t, orgId, s.OrgID) require.Equal(c.t, role, s.OrgRole) // make sure the role was set properly - config := &rest.Config{ - Host: baseUrl, - Username: s.Login, - Password: key, - } - - client, err := dynamic.NewForConfig(config) - require.NoError(c.t, err) - return User{ Identity: s, - Client: client, password: key, + baseURL: baseUrl, } } return OrgUsers{ diff --git a/pkg/tests/apis/query/query_test.go b/pkg/tests/apis/query/query_test.go new file mode 100644 index 00000000000..8a70de87986 --- /dev/null +++ b/pkg/tests/apis/query/query_test.go @@ -0,0 +1,98 @@ +package dashboards + +import ( + "context" + "encoding/json" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/runtime/schema" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + + query "github.com/grafana/grafana/pkg/apis/query/v0alpha1" + "github.com/grafana/grafana/pkg/services/datasources" + "github.com/grafana/grafana/pkg/services/featuremgmt" + "github.com/grafana/grafana/pkg/tests/apis" + "github.com/grafana/grafana/pkg/tests/testinfra" +) + +func TestSimpleQuery(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + helper := apis.NewK8sTestHelper(t, testinfra.GrafanaOpts{ + AppModeProduction: false, // dev mode required for datasource connections + EnableFeatureToggles: []string{ + featuremgmt.FlagGrafanaAPIServerWithExperimentalAPIs, // Required to start the example service + }, + }) + + // Create a single datasource + ds := helper.CreateDS(&datasources.AddDataSourceCommand{ + Name: "test", + Type: datasources.DS_TESTDATA, + UID: "test", + OrgID: int64(1), + }) + require.Equal(t, "test", ds.UID) + + t.Run("Call query", func(t *testing.T) { + client := helper.Org1.Admin.RESTClient(t, &schema.GroupVersion{ + Group: "query.grafana.app", + Version: "v0alpha1", + }) + + q := query.GenericDataQuery{ + Datasource: &query.DataSourceRef{ + Type: "grafana-testdata-datasource", + UID: ds.UID, + }, + } + q.AdditionalProperties()["csvContent"] = "a,b,c\n1,hello,true" + q.AdditionalProperties()["scenarioId"] = "csv_content" + body, err := json.Marshal(&query.GenericQueryRequest{Queries: []query.GenericDataQuery{q}}) + require.NoError(t, err) + + result := client.Post(). + Namespace("default"). + Suffix("query"). + SetHeader("Content-type", "application/json"). + Body(body). + Do(context.Background()) + + require.NoError(t, result.Error()) + + body, err = result.Raw() + require.NoError(t, err) + fmt.Printf("OUT: %s", string(body)) + + rsp := &backend.QueryDataResponse{} + err = json.Unmarshal(body, rsp) + require.NoError(t, err) + require.Equal(t, 1, len(rsp.Responses)) + + frame := rsp.Responses["A"].Frames[0] + disp, err := frame.StringTable(100, 10) + require.NoError(t, err) + fmt.Printf("%s\n", disp) + + type expect struct { + idx int + name string + val any + } + for _, check := range []expect{ + {0, "a", int64(1)}, + {1, "b", "hello"}, + {2, "c", true}, + } { + field := frame.Fields[check.idx] + require.Equal(t, check.name, field.Name) + + v, _ := field.ConcreteAt(0) + require.Equal(t, check.val, v) + } + }) +} diff --git a/pkg/tsdb/legacydata/conversions.go b/pkg/tsdb/legacydata/conversions.go new file mode 100644 index 00000000000..37eeaffd53f --- /dev/null +++ b/pkg/tsdb/legacydata/conversions.go @@ -0,0 +1,85 @@ +package legacydata + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + + "github.com/grafana/grafana/pkg/apis/query/v0alpha1" +) + +// ToDataSourceQueries returns queries that should be sent to a single datasource +// This will throw an error if the queries reference multiple instances +func ToDataSourceQueries(req v0alpha1.GenericQueryRequest) ([]backend.DataQuery, *v0alpha1.DataSourceRef, error) { + var dsRef *v0alpha1.DataSourceRef + var tr *backend.TimeRange + if req.From != "" { + val := NewDataTimeRange(req.From, req.To) + tr = &backend.TimeRange{ + From: val.GetFromAsTimeUTC(), + To: val.GetToAsTimeUTC(), + } + } + + queries := []backend.DataQuery{} + if len(req.Queries) > 0 { + dsRef := req.Queries[0].Datasource + for _, generic := range req.Queries { + if generic.Datasource != nil && dsRef != nil { + if dsRef.Type != generic.Datasource.Type { + return queries, dsRef, fmt.Errorf("expect same datasource types") + } + if dsRef.UID != generic.Datasource.UID { + return queries, dsRef, fmt.Errorf("expect same datasource UID") + } + } + q, err := toBackendDataQuery(generic, tr) + if err != nil { + return queries, dsRef, err + } + queries = append(queries, q) + } + return queries, dsRef, nil + } + return queries, dsRef, nil +} + +// Converts a generic query to a backend one +func toBackendDataQuery(q v0alpha1.GenericDataQuery, defaultTimeRange *backend.TimeRange) (backend.DataQuery, error) { + var err error + bq := backend.DataQuery{ + RefID: q.RefID, + QueryType: q.QueryType, + MaxDataPoints: q.MaxDataPoints, + } + + // Set an explicit time range for the query + if q.TimeRange != nil { + tr := NewDataTimeRange(q.TimeRange.From, q.TimeRange.To) + bq.TimeRange = backend.TimeRange{ + From: tr.GetFromAsTimeUTC(), + To: tr.GetToAsTimeUTC(), + } + } else if defaultTimeRange != nil { + bq.TimeRange = *defaultTimeRange + } + + bq.JSON, err = json.Marshal(q) + if err != nil { + return bq, err + } + if bq.RefID == "" { + bq.RefID = "A" + } + if bq.MaxDataPoints == 0 { + bq.MaxDataPoints = 100 + } + if q.IntervalMS > 0 { + bq.Interval = time.Millisecond * time.Duration(q.IntervalMS) + } else { + bq.Interval = time.Second + } + return bq, nil +}