Files
grafana/pkg/storage/unified/sql/notifier_sql_test.go
2025-05-15 21:36:52 +02:00

410 lines
12 KiB
Go

package sql
import (
"context"
"sync"
"testing"
"time"
"github.com/grafana/grafana-app-sdk/logging"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace/noop"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
)
func TestPollingNotifierConfig(t *testing.T) {
t.Parallel()
tests := []struct {
name string
config *pollingNotifierConfig
expectedErr error
}{
{
name: "valid config",
config: &pollingNotifierConfig{
historyPoll: func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
return nil, nil
},
listLatestRVs: func(ctx context.Context) (groupResourceRV, error) { return nil, nil },
bulkLock: &bulkLock{},
tracer: noop.NewTracerProvider().Tracer("test"),
log: &logging.NoOpLogger{},
watchBufferSize: 10,
pollingInterval: time.Second,
done: make(chan struct{}),
dialect: sqltemplate.SQLite,
},
expectedErr: nil,
},
{
name: "missing historyPoll",
config: &pollingNotifierConfig{
listLatestRVs: func(ctx context.Context) (groupResourceRV, error) { return nil, nil },
bulkLock: &bulkLock{},
tracer: noop.NewTracerProvider().Tracer("test"),
log: &logging.NoOpLogger{},
watchBufferSize: 10,
pollingInterval: time.Second,
done: make(chan struct{}),
dialect: sqltemplate.SQLite,
},
expectedErr: errHistoryPollRequired,
},
{
name: "missing listLatestRVs",
config: &pollingNotifierConfig{
historyPoll: func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
return nil, nil
},
bulkLock: &bulkLock{},
tracer: noop.NewTracerProvider().Tracer("test"),
log: &logging.NoOpLogger{},
watchBufferSize: 10,
pollingInterval: time.Second,
done: make(chan struct{}),
dialect: sqltemplate.SQLite,
},
expectedErr: errListLatestRVsRequired,
},
{
name: "missing bulkLock",
config: &pollingNotifierConfig{
historyPoll: func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
return nil, nil
},
listLatestRVs: func(ctx context.Context) (groupResourceRV, error) { return nil, nil },
tracer: noop.NewTracerProvider().Tracer("test"),
log: &logging.NoOpLogger{},
watchBufferSize: 10,
pollingInterval: time.Second,
done: make(chan struct{}),
dialect: sqltemplate.SQLite,
},
expectedErr: errBulkLockRequired,
},
{
name: "missing tracer",
config: &pollingNotifierConfig{
historyPoll: func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
return nil, nil
},
listLatestRVs: func(ctx context.Context) (groupResourceRV, error) { return nil, nil },
bulkLock: &bulkLock{},
log: &logging.NoOpLogger{},
watchBufferSize: 10,
pollingInterval: time.Second,
done: make(chan struct{}),
dialect: sqltemplate.SQLite,
},
expectedErr: errTracerRequired,
},
{
name: "missing logger",
config: &pollingNotifierConfig{
historyPoll: func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
return nil, nil
},
listLatestRVs: func(ctx context.Context) (groupResourceRV, error) { return nil, nil },
bulkLock: &bulkLock{},
tracer: noop.NewTracerProvider().Tracer("test"),
watchBufferSize: 10,
pollingInterval: time.Second,
done: make(chan struct{}),
dialect: sqltemplate.SQLite,
},
expectedErr: errLogRequired,
},
{
name: "invalid watch buffer size",
config: &pollingNotifierConfig{
historyPoll: func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
return nil, nil
},
listLatestRVs: func(ctx context.Context) (groupResourceRV, error) { return nil, nil },
bulkLock: &bulkLock{},
tracer: noop.NewTracerProvider().Tracer("test"),
log: &logging.NoOpLogger{},
watchBufferSize: 0,
pollingInterval: time.Second,
done: make(chan struct{}),
dialect: sqltemplate.SQLite,
},
expectedErr: errInvalidWatchBufferSize,
},
{
name: "invalid polling interval",
config: &pollingNotifierConfig{
historyPoll: func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
return nil, nil
},
listLatestRVs: func(ctx context.Context) (groupResourceRV, error) { return nil, nil },
bulkLock: &bulkLock{},
tracer: noop.NewTracerProvider().Tracer("test"),
log: &logging.NoOpLogger{},
watchBufferSize: 10,
pollingInterval: 0,
done: make(chan struct{}),
dialect: sqltemplate.SQLite,
},
expectedErr: errInvalidPollingInterval,
},
{
name: "missing done channel",
config: &pollingNotifierConfig{
historyPoll: func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
return nil, nil
},
listLatestRVs: func(ctx context.Context) (groupResourceRV, error) { return nil, nil },
bulkLock: &bulkLock{},
tracer: noop.NewTracerProvider().Tracer("test"),
log: &logging.NoOpLogger{},
watchBufferSize: 10,
pollingInterval: time.Second,
dialect: sqltemplate.SQLite,
},
expectedErr: errDoneRequired,
},
{
name: "missing dialect",
config: &pollingNotifierConfig{
historyPoll: func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
return nil, nil
},
listLatestRVs: func(ctx context.Context) (groupResourceRV, error) { return nil, nil },
bulkLock: &bulkLock{},
tracer: noop.NewTracerProvider().Tracer("test"),
log: &logging.NoOpLogger{},
watchBufferSize: 10,
pollingInterval: time.Second,
done: make(chan struct{}),
},
expectedErr: errDialectRequired,
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
err := tt.config.validate()
if tt.expectedErr != nil {
require.ErrorIs(t, err, tt.expectedErr)
} else {
require.NoError(t, err)
}
})
}
}
func TestPollingNotifier(t *testing.T) {
t.Parallel()
t.Run("notify returns channel and starts polling", func(t *testing.T) {
t.Parallel()
done := make(chan struct{})
defer close(done)
testEvent := &historyPollResponse{
Key: resourcepb.ResourceKey{
Namespace: "test-ns",
Group: "test-group",
Resource: "test-resource",
Name: "test-name",
},
ResourceVersion: 2,
Folder: "test-folder",
Value: []byte(`{"test": "data"}`),
Action: 1,
}
var listLatestRVsCalledCounter int
listLatestRVs := func(ctx context.Context) (groupResourceRV, error) {
// On the first call return 0, then the highest known RV.
var value int64 = 0
if listLatestRVsCalledCounter > 0 {
value = testEvent.ResourceVersion
}
listLatestRVsCalledCounter++
return groupResourceRV{
"test-group": map[string]int64{
"test-resource": value,
},
}, nil
}
var historyPollCalledCounter int
once := sync.Once{}
historyPoll := func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
// only assert the first time - this may be called multiple times
// depending on the host hardware etc, due to timing issues...
historyPollCalledCounter++
once.Do(func() {
require.Equal(t, "test-group", grp)
require.Equal(t, "test-resource", res)
require.Equal(t, int64(0), since)
})
return []*historyPollResponse{testEvent}, nil
}
cfg := &pollingNotifierConfig{
dialect: sqltemplate.SQLite,
pollingInterval: 10 * time.Millisecond,
watchBufferSize: 10,
log: &logging.NoOpLogger{},
tracer: noop.NewTracerProvider().Tracer("test"),
bulkLock: &bulkLock{},
listLatestRVs: listLatestRVs,
historyPoll: historyPoll,
done: done,
}
notifier, err := newPollingNotifier(cfg)
require.NoError(t, err)
require.NotNil(t, notifier)
events, err := notifier.notify(context.Background())
require.NoError(t, err)
require.NotNil(t, events)
select {
case event := <-events:
require.NotNil(t, event)
require.Equal(t, "test-ns", event.Key.Namespace)
require.Equal(t, "test-group", event.Key.Group)
require.Equal(t, "test-resource", event.Key.Resource)
require.Equal(t, "test-name", event.Key.Name)
require.Equal(t, int64(2), event.ResourceVersion)
require.Equal(t, "test-folder", event.Folder)
require.True(t, listLatestRVsCalledCounter > 0, "listLatestRVs should be called at least once")
require.True(t, historyPollCalledCounter == 1, "historyPoll should be called exactly once")
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for event")
}
})
t.Run("handles polling errors gracefully", func(t *testing.T) {
t.Parallel()
done := make(chan struct{})
defer close(done)
listLatestRVs := func(ctx context.Context) (groupResourceRV, error) {
return groupResourceRV{
"test-group": map[string]int64{
"test-resource": 0,
},
}, nil
}
historyPoll := func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
return nil, errTest
}
cfg := &pollingNotifierConfig{
dialect: sqltemplate.SQLite,
pollingInterval: 10 * time.Millisecond,
watchBufferSize: 10,
log: &logging.NoOpLogger{},
tracer: noop.NewTracerProvider().Tracer("test"),
bulkLock: &bulkLock{},
listLatestRVs: listLatestRVs,
historyPoll: historyPoll,
done: done,
}
notifier, err := newPollingNotifier(cfg)
require.NoError(t, err)
require.NotNil(t, notifier)
events, err := notifier.notify(context.Background())
require.NoError(t, err)
require.NotNil(t, events)
// Verify channel remains open despite error
select {
case _, ok := <-events:
require.True(t, ok, "channel should remain open")
case <-time.After(50 * time.Millisecond):
// Expected - no events due to error
}
})
t.Run("stops polling when done channel is closed", func(t *testing.T) {
t.Parallel()
done := make(chan struct{})
cfg := &pollingNotifierConfig{
dialect: sqltemplate.SQLite,
pollingInterval: 10 * time.Millisecond,
watchBufferSize: 10,
log: &logging.NoOpLogger{},
tracer: noop.NewTracerProvider().Tracer("test"),
bulkLock: &bulkLock{},
listLatestRVs: func(ctx context.Context) (groupResourceRV, error) { return nil, nil },
historyPoll: func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
return nil, nil
},
done: done,
}
notifier, err := newPollingNotifier(cfg)
require.NoError(t, err)
require.NotNil(t, notifier)
events, err := notifier.notify(context.Background())
require.NoError(t, err)
require.NotNil(t, events)
close(done)
select {
case _, ok := <-events:
require.False(t, ok, "events channel should be closed")
case <-time.After(50 * time.Millisecond):
t.Fatal("timeout waiting for events channel to close")
}
})
t.Run("stops polling when context is cancelled", func(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
cfg := &pollingNotifierConfig{
dialect: sqltemplate.SQLite,
pollingInterval: 10 * time.Millisecond,
watchBufferSize: 10,
log: &logging.NoOpLogger{},
tracer: noop.NewTracerProvider().Tracer("test"),
bulkLock: &bulkLock{},
listLatestRVs: func(ctx context.Context) (groupResourceRV, error) { return nil, nil },
historyPoll: func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
return nil, nil
},
done: make(chan struct{}),
}
notifier, err := newPollingNotifier(cfg)
require.NoError(t, err)
require.NotNil(t, notifier)
events, err := notifier.notify(ctx)
require.NoError(t, err)
require.NotNil(t, events)
cancel()
select {
case _, ok := <-events:
require.False(t, ok, "events channel should be closed")
case <-time.After(time.Second):
t.Fatal("timeout waiting for events channel to close")
}
})
}