Files
grafana/pkg/expr/dataplane_test.go
Sarah Zinger 3fad863fd1 Query Service: Combine SSE handling in single tenant and multi tenant paths (#108041)
* parse via sse

I need to figure out how to handle the pipeline.execute with our own
client. I think this is important for MT reasons, just like using our
own cache (via legacy) is important.

parsing is done though!

* WIP nonsense

* horrible code but i think it works

* Add support for sql expressions config settings

* Cleanup:
- remove spew from nodes.go
- uncomment out plugin context and use in single tenant flow
- make code more readable and add comments

* Cleanup:
- create separate file for mt ds client builder
- ensure error handling is the same for both expressions and regular queries
- other cleanup

* not working but good thoughts

* WIP, vector not working for non sse

* super hacky but i think vectors work now

* delete delete delete

* Comments for future ref

* break out query handling and start test

* add prom debugger

* clean up: remove comments and commented out bits

* fix query_test

* add prom debugger

* create table-driven tests with testdata files

* Fix test

* Add test

* go mod??

* idk

* Remove comment

* go enterprise issue maybe

* Fix codeowners

* Delete

* Remove test data

* Clean up

* logger

* Remove go changes hopefully

* idk go man

* sad

* idk i ran go mod tidy and this is what it wants

* Fix readme, with much help from adam

* some linting and testing errors

* lint

* fix lint

* fix lint register.go

* another lint

* address lint in test

* fix dead code and linters for query_test

* Go mod?

* Struggling with go mod

* Fix test

* Fix another test

* Revert headers change

* It's difficult to test this in OSS because it depends on functionality defined in enterprise; let's bring these tests back in some form in enterprise

* Fix codeowners

---------

Co-authored-by: Adam Simpson <adam@adamsimpson.net>
2025-07-17 17:22:55 -04:00

269 lines
7.8 KiB
Go

package expr
import (
"context"
"encoding/json"
"testing"
"time"
"github.com/grafana/dataplane/examples"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/expr/metrics"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/services/datasources"
datafakes "github.com/grafana/grafana/pkg/services/datasources/fakes"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/mtdsclient"
"github.com/grafana/grafana/pkg/services/pluginsintegration/pluginconfig"
"github.com/grafana/grafana/pkg/services/pluginsintegration/plugincontext"
"github.com/grafana/grafana/pkg/services/pluginsintegration/pluginstore"
"github.com/grafana/grafana/pkg/services/user"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/util"
)
// TestPassThroughDataplaneExamples sends the frames of every dataplane example
// selected by the filter below (version 0.1, Valid == true) through
// framesPassThroughService as query "A" and requires that the error returned
// for that response is nil.
func TestPassThroughDataplaneExamples(t *testing.T) {
	all, err := examples.GetExamples()
	require.NoError(t, err)

	filter := examples.FilterOptions{
		Version: data.FrameTypeVersion{0, 1},
		Valid:   util.Pointer(true),
	}
	valid, err := all.Filter(filter)
	require.NoError(t, err)

	// One subtest per example, named after the example's ID.
	for _, coll := range valid.Collections() {
		for _, ex := range coll.ExampleSlice() {
			t.Run(ex.Info().ID, func(t *testing.T) {
				frames := ex.Frames("A")
				_, respErr := framesPassThroughService(t, frames)
				require.NoError(t, respErr)
			})
		}
	}
}
// framesPassThroughService builds a Service whose data service is a
// mockEndpoint constructed with a response map holding the given frames under
// refID "A", then builds and executes a pipeline containing a single query "A".
//
// It fails the test if building or executing the pipeline returns an error, or
// if the result has no "A" response. Otherwise it returns the frames and the
// error stored on the "A" response.
func framesPassThroughService(t *testing.T, frames data.Frames) (data.Frames, error) {
	const refID = "A"

	endpoint := &mockEndpoint{
		map[string]backend.DataResponse{refID: {Frames: frames}},
	}
	features := featuremgmt.WithFeatures()
	cfg := setting.NewCfg()

	// Plugin context provider backed by fakes; the only registered plugin is "test".
	store := &pluginstore.FakePluginStore{
		PluginList: []pluginstore.Plugin{
			{JSONData: plugins.JSONData{ID: "test"}},
		},
	}
	pCtxProvider := plugincontext.ProvideService(
		cfg,
		nil,
		store,
		&datafakes.FakeCacheService{},
		&datafakes.FakeDataSourceService{},
		nil,
		pluginconfig.NewFakePluginRequestConfigProvider(),
	)

	svc := Service{
		cfg:          cfg,
		dataService:  endpoint,
		features:     features,
		pCtxProvider: pCtxProvider,
		tracer:       tracing.InitializeTracerForTest(),
		metrics:      metrics.NewSSEMetrics(nil),
		converter: &ResultConverter{
			Features: features,
			Tracer:   tracing.InitializeTracerForTest(),
		},
		mtDatasourceClientBuilder: mtdsclient.NewNullMTDatasourceClientBuilder(),
	}

	query := Query{
		RefID: refID,
		DataSource: &datasources.DataSource{
			OrgID: 1,
			UID:   "test",
			Type:  "test",
		},
		JSON: json.RawMessage(`{ "datasource": { "uid": "1" }, "intervalMs": 1000, "maxDataPoints": 1000 }`),
		TimeRange: AbsoluteTimeRange{
			From: time.Time{},
			To:   time.Time{},
		},
	}
	req := &Request{
		Queries: []Query{query},
		User:    &user.SignedInUser{},
	}

	pipeline, err := svc.BuildPipeline(req)
	require.NoError(t, err)

	result, err := svc.ExecutePipeline(context.Background(), time.Now(), pipeline)
	require.NoError(t, err)
	require.Contains(t, result.Responses, refID)

	resp := result.Responses[refID]
	return resp.Frames, resp.Error
}
// TestShouldUseDataplane checks the (frame type, use, error) result of
// shouldUseDataplane for frames with different combinations of FrameMeta.Type
// and FrameMeta.TypeVersion. Every call passes disable == false.
func TestShouldUseDataplane(t *testing.T) {
	t.Run("zero frames returns no data and is allowed", func(t *testing.T) {
		f := data.Frames{}
		dt, use, err := shouldUseDataplane(f, log.New(""), false)
		require.NoError(t, err)
		require.True(t, use)
		require.Equal(t, data.KindUnknown, dt.Kind())
	})

	t.Run("a frame with Type and TypeVersion 0.0 will not use dataplane", func(t *testing.T) {
		// Type is set, but TypeVersion is left at its zero value.
		f := data.Frames{(&data.Frame{}).SetMeta(
			&data.FrameMeta{
				TypeVersion: data.FrameTypeVersion{},
				Type:        data.FrameTypeTimeSeriesMulti,
			},
		)}
		_, use, err := shouldUseDataplane(f, log.New(""), false)
		require.NoError(t, err)
		require.False(t, use)
	})

	t.Run("a frame without Type and TypeVersion > 0.0 will not use dataplane", func(t *testing.T) {
		// TypeVersion is set, but Type is left empty.
		f := data.Frames{(&data.Frame{}).SetMeta(
			&data.FrameMeta{
				TypeVersion: data.FrameTypeVersion{0, 1},
			},
		)}
		_, use, err := shouldUseDataplane(f, log.New(""), false)
		require.NoError(t, err)
		require.False(t, use)
	})

	t.Run("a frame with no metadata will not use dataplane", func(t *testing.T) {
		f := data.Frames{&data.Frame{}}
		_, use, err := shouldUseDataplane(f, log.New(""), false)
		require.NoError(t, err)
		require.False(t, use)
	})

	t.Run("a newer version that supported will return a warning but still use dataplane", func(t *testing.T) {
		// Only the returned values are asserted; the logged warning mentioned in
		// the subtest name is not inspected here.
		ty := data.FrameTypeTimeSeriesMulti
		v := data.FrameTypeVersion{999, 999}
		f := data.Frames{(&data.Frame{}).SetMeta(
			&data.FrameMeta{
				Type:        ty,
				TypeVersion: v,
			},
		)}
		dt, use, err := shouldUseDataplane(f, log.New(""), false)
		require.NoError(t, err)
		require.True(t, use)
		require.Equal(t, data.KindTimeSeries, dt.Kind())
	})

	t.Run("all valid dataplane examples should use dataplane", func(t *testing.T) {
		es, err := examples.GetExamples()
		require.NoError(t, err)

		validExamples, err := es.Filter(examples.FilterOptions{
			Version: data.FrameTypeVersion{0, 1},
			Valid:   util.Pointer(true),
		})
		require.NoError(t, err)

		for _, collection := range validExamples.Collections() {
			for _, example := range collection.ExampleSlice() {
				t.Run(example.Info().ID, func(t *testing.T) {
					// Assert the property named by this subtest directly. Passing the
					// frames through the whole service is already covered by
					// TestPassThroughDataplaneExamples and does not show that
					// shouldUseDataplane reported use == true.
					_, use, err := shouldUseDataplane(example.Frames("A"), log.New(""), false)
					require.NoError(t, err)
					require.True(t, use)
				})
			}
		}
	})
}
// TestHandleDataplaneNumeric passes the frames of the valid version 0.1
// numeric dataplane examples to handleDataplaneNumeric and checks the number
// of values in the result, once for the no-data examples and once for the
// examples that carry data.
func TestHandleDataplaneNumeric(t *testing.T) {
	scenarios := []struct {
		name   string
		noData bool
	}{
		{name: "no data", noData: true},
		{name: "should read correct number of items from examples", noData: false},
	}

	for _, sc := range scenarios {
		t.Run(sc.name, func(t *testing.T) {
			all, err := examples.GetExamples()
			require.NoError(t, err)

			selected, err := all.Filter(examples.FilterOptions{
				Version: data.FrameTypeVersion{0, 1},
				Valid:   util.Pointer(true),
				Kind:    data.KindNumeric,
				NoData:  util.Pointer(sc.noData),
			})
			require.NoError(t, err)

			for _, ex := range selected.AsSlice() {
				t.Run(ex.Info().ID, func(t *testing.T) {
					got, err := handleDataplaneNumeric(ex.Frames("A"), false)
					require.NoError(t, err)

					// No-data examples must produce exactly one value.
					if sc.noData {
						require.Len(t, got.Values, 1)
						return
					}
					// Otherwise the value count must equal the example's ItemCount.
					require.Len(t, got.Values, ex.Info().ItemCount)
				})
			}
		})
	}
}
// TestHandleDataplaneTS passes the frames of the valid version 0.1 time series
// dataplane examples to handleDataplaneTimeseries and checks the number of
// values in the result, once for the no-data examples and once for the
// examples that carry data.
func TestHandleDataplaneTS(t *testing.T) {
	scenarios := []struct {
		name   string
		noData bool
	}{
		{name: "no data", noData: true},
		{name: "should read correct number of items from examples", noData: false},
	}

	for _, sc := range scenarios {
		t.Run(sc.name, func(t *testing.T) {
			all, err := examples.GetExamples()
			require.NoError(t, err)

			selected, err := all.Filter(examples.FilterOptions{
				Version: data.FrameTypeVersion{0, 1},
				Valid:   util.Pointer(true),
				Kind:    data.KindTimeSeries,
				NoData:  util.Pointer(sc.noData),
			})
			require.NoError(t, err)

			for _, ex := range selected.AsSlice() {
				t.Run(ex.Info().ID, func(t *testing.T) {
					got, err := handleDataplaneTimeseries(ex.Frames("A"))
					require.NoError(t, err)

					// No-data examples must produce exactly one value.
					if sc.noData {
						require.Len(t, got.Values, 1)
						return
					}
					// Otherwise the value count must equal the example's ItemCount.
					require.Len(t, got.Values, ex.Info().ItemCount)
				})
			}
		})
	}
}