fix(unified-storage): enable in-process events for single-instance (#100807)

Jean-Philippe Quéméner
2025-02-21 12:25:35 +01:00
committed by GitHub
parent a112ef6467
commit 7be1fd953a
9 changed files with 900 additions and 142 deletions

View File

@@ -130,6 +130,13 @@ password =
# Example: mysql://user:secret@host:port/database
url =
# Set to true or false to enable or disable high availability mode.
# When set to false, some functions are simplified and run in-process
# instead of relying on the database.
#
# Only set this to false if you run a single instance of Grafana.
high_availability = true
# Max idle conn setting default is 2
max_idle_conn = 2
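
As an illustration (host, credentials, and database name are placeholders), a single-instance deployment that opts into the in-process notifier would set:

[database]
type = mysql
url = mysql://user:secret@localhost:3306/grafana
# Single Grafana instance: dispatch unified-storage events in-process.
high_availability = false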

View File

@@ -129,6 +129,13 @@
# Example: mysql://user:secret@host:port/database
;url =
# Set to true or false to enable or disable high availability mode.
# When set to false, some functions are simplified and run in-process
# instead of relying on the database.
#
# Only set this to false if you run a single instance of Grafana.
;high_availability = true
# Max idle conn setting default is 2
;max_idle_conn = 2

View File

@@ -39,6 +39,7 @@ type BackendOptions struct {
Tracer trace.Tracer
PollingInterval time.Duration
WatchBufferSize int
IsHA bool
}
func NewBackend(opts BackendOptions) (Backend, error) {
@@ -50,26 +51,29 @@ func NewBackend(opts BackendOptions) (Backend, error) {
}
ctx, cancel := context.WithCancel(context.Background())
pollingInterval := opts.PollingInterval
if pollingInterval == 0 {
pollingInterval = defaultPollingInterval
if opts.PollingInterval == 0 {
opts.PollingInterval = defaultPollingInterval
}
if opts.WatchBufferSize == 0 {
opts.WatchBufferSize = defaultWatchBufferSize
}
return &backend{
isHA: opts.IsHA,
done: ctx.Done(),
cancel: cancel,
log: log.New("sql-resource-server"),
tracer: opts.Tracer,
dbProvider: opts.DBProvider,
pollingInterval: pollingInterval,
pollingInterval: opts.PollingInterval,
watchBufferSize: opts.WatchBufferSize,
batchLock: &batchLock{running: make(map[string]bool)},
}, nil
}
type backend struct {
// general
isHA bool
// server lifecycle
done <-chan struct{}
cancel context.CancelFunc
@@ -90,6 +94,7 @@ type backend struct {
//stream chan *resource.WatchEvent
pollingInterval time.Duration
watchBufferSize int
notifier eventNotifier
}
func (b *backend) Init(ctx context.Context) error {
@@ -112,6 +117,13 @@ func (b *backend) initLocked(ctx context.Context) error {
return fmt.Errorf("no dialect for driver %q", driverName)
}
// Initialize notifier after dialect is set up
notifier, err := newNotifier(b)
if err != nil {
return fmt.Errorf("failed to create notifier: %w", err)
}
b.notifier = notifier
return b.db.PingContext(ctx)
}
@@ -187,11 +199,11 @@ func (b *backend) create(ctx context.Context, event resource.WriteEvent) (int64,
defer span.End()
var newVersion int64
guid := uuid.New().String()
err := b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
folder := ""
if event.Object != nil {
folder = event.Object.GetFolder()
}
err := b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
// 1. Insert into resource
if _, err := dbutil.Exec(ctx, tx, sqlResourceInsert, sqlResourceRequest{
SQLTemplate: sqltemplate.New(b.dialect),
@@ -240,7 +252,21 @@ func (b *backend) create(ctx context.Context, event resource.WriteEvent) (int64,
return nil
})
return newVersion, err
if err != nil {
return 0, err
}
b.notifier.send(ctx, &resource.WrittenEvent{
Type: event.Type,
Key: event.Key,
PreviousRV: event.PreviousRV,
Value: event.Value,
ResourceVersion: newVersion,
Folder: folder,
})
return newVersion, nil
}
func (b *backend) update(ctx context.Context, event resource.WriteEvent) (int64, error) {
@@ -248,11 +274,11 @@ func (b *backend) update(ctx context.Context, event resource.WriteEvent) (int64,
defer span.End()
var newVersion int64
guid := uuid.New().String()
err := b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
folder := ""
if event.Object != nil {
folder = event.Object.GetFolder()
}
err := b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
// 1. Update resource
_, err := dbutil.Exec(ctx, tx, sqlResourceUpdate, sqlResourceRequest{
SQLTemplate: sqltemplate.New(b.dialect),
@@ -303,7 +329,20 @@ func (b *backend) update(ctx context.Context, event resource.WriteEvent) (int64,
return nil
})
return newVersion, err
if err != nil {
return 0, err
}
b.notifier.send(ctx, &resource.WrittenEvent{
Type: event.Type,
Key: event.Key,
PreviousRV: event.PreviousRV,
Value: event.Value,
ResourceVersion: newVersion,
Folder: folder,
})
return newVersion, nil
}
func (b *backend) delete(ctx context.Context, event resource.WriteEvent) (int64, error) {
@@ -311,12 +350,11 @@ func (b *backend) delete(ctx context.Context, event resource.WriteEvent) (int64,
defer span.End()
var newVersion int64
guid := uuid.New().String()
err := b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
folder := ""
if event.Object != nil {
folder = event.Object.GetFolder()
}
err := b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
// 1. delete from resource
_, err := dbutil.Exec(ctx, tx, sqlResourceDelete, sqlResourceRequest{
SQLTemplate: sqltemplate.New(b.dialect),
@@ -358,7 +396,20 @@ func (b *backend) delete(ctx context.Context, event resource.WriteEvent) (int64,
return nil
})
return newVersion, err
if err != nil {
return 0, err
}
b.notifier.send(ctx, &resource.WrittenEvent{
Type: event.Type,
Key: event.Key,
PreviousRV: event.PreviousRV,
Value: event.Value,
ResourceVersion: newVersion,
Folder: folder,
})
return newVersion, nil
}
func (b *backend) restore(ctx context.Context, event resource.WriteEvent) (int64, error) {
@@ -366,12 +417,11 @@ func (b *backend) restore(ctx context.Context, event resource.WriteEvent) (int64
defer span.End()
var newVersion int64
guid := uuid.New().String()
err := b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
folder := ""
if event.Object != nil {
folder = event.Object.GetFolder()
}
err := b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
// 1. Re-create resource
// Note: we may want to replace the write event with a create event, tbd.
if _, err := dbutil.Exec(ctx, tx, sqlResourceInsert, sqlResourceRequest{
@@ -435,7 +485,20 @@ func (b *backend) restore(ctx context.Context, event resource.WriteEvent) (int64
return nil
})
return newVersion, err
if err != nil {
return 0, err
}
b.notifier.send(ctx, &resource.WrittenEvent{
Type: event.Type,
Key: event.Key,
PreviousRV: event.PreviousRV,
Value: event.Value,
ResourceVersion: newVersion,
Folder: folder,
})
return newVersion, nil
}
func (b *backend) ReadResource(ctx context.Context, req *resource.ReadRequest) *resource.BackendReadResponse {
@@ -707,68 +770,7 @@ func (b *backend) getHistory(ctx context.Context, req *resource.ListRequest, cb
}
func (b *backend) WatchWriteEvents(ctx context.Context) (<-chan *resource.WrittenEvent, error) {
// Get the latest RV
since, err := b.listLatestRVs(ctx)
if err != nil {
return nil, fmt.Errorf("watch, get latest resource version: %w", err)
}
// Start the poller
stream := make(chan *resource.WrittenEvent, b.watchBufferSize)
go b.poller(ctx, since, stream)
return stream, nil
}
func (b *backend) poller(ctx context.Context, since groupResourceRV, stream chan<- *resource.WrittenEvent) {
t := time.NewTicker(b.pollingInterval)
defer close(stream)
defer t.Stop()
isSQLite := b.dialect.DialectName() == "sqlite"
for {
select {
case <-b.done:
return
case <-t.C:
// Block polling during import to avoid database-locked issues
if isSQLite && b.batchLock.Active() {
continue
}
ctx, span := b.tracer.Start(ctx, tracePrefix+"poller")
// List the latest RVs
grv, err := b.listLatestRVs(ctx)
if err != nil {
b.log.Error("poller get latest resource version", "err", err)
t.Reset(b.pollingInterval)
continue
}
for group, items := range grv {
for resource := range items {
// If we haven't seen this resource before, we start from 0
if _, ok := since[group]; !ok {
since[group] = make(map[string]int64)
}
if _, ok := since[group][resource]; !ok {
since[group][resource] = 0
}
// Poll for new events
next, err := b.poll(ctx, group, resource, since[group][resource], stream)
if err != nil {
b.log.Error("polling for resource", "err", err)
t.Reset(b.pollingInterval)
continue
}
if next > since[group][resource] {
since[group][resource] = next
}
}
}
t.Reset(b.pollingInterval)
span.End()
}
}
return b.notifier.notify(ctx)
}
// listLatestRVs returns the latest resource version for each (Group, Resource) pair.
@@ -817,59 +819,6 @@ func fetchLatestRV(ctx context.Context, x db.ContextExecer, d sqltemplate.Dialec
return res.ResourceVersion, nil
}
func (b *backend) poll(ctx context.Context, grp string, res string, since int64, stream chan<- *resource.WrittenEvent) (int64, error) {
ctx, span := b.tracer.Start(ctx, tracePrefix+"poll")
defer span.End()
start := time.Now()
var records []*historyPollResponse
err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error {
var err error
records, err = dbutil.Query(ctx, tx, sqlResourceHistoryPoll, &sqlResourceHistoryPollRequest{
SQLTemplate: sqltemplate.New(b.dialect),
Resource: res,
Group: grp,
SinceResourceVersion: since,
Response: &historyPollResponse{},
})
return err
})
if err != nil {
return 0, fmt.Errorf("poll history: %w", err)
}
end := time.Now()
resource.NewStorageMetrics().PollerLatency.Observe(end.Sub(start).Seconds())
var nextRV int64
for _, rec := range records {
if rec.Key.Group == "" || rec.Key.Resource == "" || rec.Key.Name == "" {
return nextRV, fmt.Errorf("missing key in response")
}
nextRV = rec.ResourceVersion
prevRV := rec.PreviousRV
if prevRV == nil {
prevRV = new(int64)
}
stream <- &resource.WrittenEvent{
Value: rec.Value,
Key: &resource.ResourceKey{
Namespace: rec.Key.Namespace,
Group: rec.Key.Group,
Resource: rec.Key.Resource,
Name: rec.Key.Name,
},
Type: resource.WatchEvent_Type(rec.Action),
PreviousRV: *prevRV,
Folder: rec.Folder,
ResourceVersion: rec.ResourceVersion,
// Timestamp: , // TODO: add timestamp
}
b.log.Debug("poller sent event to stream", "namespace", rec.Key.Namespace, "group", rec.Key.Group, "resource", rec.Key.Resource, "name", rec.Key.Name, "action", rec.Action, "rv", rec.ResourceVersion)
}
return nextRV, nil
}
// resourceVersionAtomicInc atomically increases the version of a kind within a transaction.
// TODO: Ideally we should attempt to update the RV in the resource and resource_history tables
// in a single roundtrip. This would reduce the latency of the operation, and also increase the

View File

@@ -0,0 +1,119 @@
package sql
import (
"context"
"sync"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/sql/db"
"github.com/grafana/grafana/pkg/storage/unified/sql/dbutil"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
)
type eventNotifier interface {
notify(ctx context.Context) (<-chan *resource.WrittenEvent, error)
// send will forward an event to all subscribers who want to be notified.
//
// Note: depending on the implementation, send might be a no-op and new
// events will instead be fetched from an external source.
send(ctx context.Context, event *resource.WrittenEvent)
close()
}
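
For orientation, a hypothetical consumer of this interface (not part of the change) subscribes once and drains the channel until the notifier closes it:

func consume(ctx context.Context, n eventNotifier) error {
	events, err := n.notify(ctx)
	if err != nil {
		return err
	}
	// The channel is closed on context cancellation (channel strategy)
	// or on backend shutdown (polling strategy).
	for ev := range events {
		_ = ev // react to the event, e.g. fan it out to watch streams
	}
	return nil
}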
func newNotifier(b *backend) (eventNotifier, error) {
if b.isHA {
b.log.Info("Using polling notifier")
notifier, err := newPollingNotifier(&pollingNotifierConfig{
pollingInterval: b.pollingInterval,
watchBufferSize: b.watchBufferSize,
log: b.log,
tracer: b.tracer,
batchLock: b.batchLock,
listLatestRVs: b.listLatestRVs,
historyPoll: func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
var records []*historyPollResponse
err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error {
var err error
records, err = dbutil.Query(ctx, tx, sqlResourceHistoryPoll, &sqlResourceHistoryPollRequest{
SQLTemplate: sqltemplate.New(b.dialect),
Resource: res,
Group: grp,
SinceResourceVersion: since,
Response: &historyPollResponse{},
})
return err
})
return records, err
},
done: b.done,
dialect: b.dialect,
})
if err != nil {
return nil, err
}
return notifier, nil
}
b.log.Info("Using channel notifier")
return newChannelNotifier(b.watchBufferSize, b.log), nil
}
type channelNotifier struct {
log log.Logger
bufferSize int
mu sync.RWMutex
subscribers map[chan *resource.WrittenEvent]bool
}
func newChannelNotifier(bufferSize int, log log.Logger) *channelNotifier {
return &channelNotifier{
subscribers: make(map[chan *resource.WrittenEvent]bool),
log: log,
bufferSize: bufferSize,
}
}
func (n *channelNotifier) notify(ctx context.Context) (<-chan *resource.WrittenEvent, error) {
events := make(chan *resource.WrittenEvent, n.bufferSize)
n.mu.Lock()
n.subscribers[events] = true
n.mu.Unlock()
go func() {
<-ctx.Done()
n.mu.Lock()
if n.subscribers[events] {
delete(n.subscribers, events)
close(events)
}
n.mu.Unlock()
}()
return events, nil
}
func (n *channelNotifier) send(_ context.Context, event *resource.WrittenEvent) {
n.mu.RLock()
defer n.mu.RUnlock()
for ch := range n.subscribers {
select {
case ch <- event:
default:
n.log.Warn("Dropped event notification for subscriber - channel full")
}
}
}
func (n *channelNotifier) close() {
n.mu.Lock()
defer n.mu.Unlock()
for ch := range n.subscribers {
close(ch)
}
n.subscribers = make(map[chan *resource.WrittenEvent]bool)
}
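
A small fan-out sketch under the same package-internal assumptions (nop logger for brevity, function name illustrative): each subscriber owns its own buffered channel, so a slow subscriber drops only its own copy of an event:

func fanOutSketch(ctx context.Context) (int64, int64) {
	n := newChannelNotifier(8, log.NewNopLogger())
	defer n.close()

	a, _ := n.notify(ctx) // subscriber A
	b, _ := n.notify(ctx) // subscriber B

	n.send(ctx, &resource.WrittenEvent{ResourceVersion: 1})

	// Both subscribers observe the same event independently.
	return (<-a).ResourceVersion, (<-b).ResourceVersion
}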

View File

@@ -0,0 +1,216 @@
package sql
import (
"context"
"fmt"
"time"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
"go.opentelemetry.io/otel/trace"
)
var (
// Validation errors.
errHistoryPollRequired = fmt.Errorf("historyPoll is required")
errListLatestRVsRequired = fmt.Errorf("listLatestRVs is required")
errBatchLockRequired = fmt.Errorf("batchLock is required")
errTracerRequired = fmt.Errorf("tracer is required")
errLogRequired = fmt.Errorf("log is required")
errInvalidWatchBufferSize = fmt.Errorf("watchBufferSize must be greater than 0")
errInvalidPollingInterval = fmt.Errorf("pollingInterval must be greater than 0")
errDoneRequired = fmt.Errorf("done is required")
errDialectRequired = fmt.Errorf("dialect is required")
)
// pollingNotifier is a notifier that polls the database for new events.
type pollingNotifier struct {
dialect sqltemplate.Dialect
pollingInterval time.Duration
watchBufferSize int
log log.Logger
tracer trace.Tracer
batchLock *batchLock
listLatestRVs func(ctx context.Context) (groupResourceRV, error)
historyPoll func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error)
done <-chan struct{}
}
type pollingNotifierConfig struct {
dialect sqltemplate.Dialect
pollingInterval time.Duration
watchBufferSize int
log log.Logger
tracer trace.Tracer
batchLock *batchLock
listLatestRVs func(ctx context.Context) (groupResourceRV, error)
historyPoll func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error)
done <-chan struct{}
}
func (cfg *pollingNotifierConfig) validate() error {
if cfg.historyPoll == nil {
return errHistoryPollRequired
}
if cfg.listLatestRVs == nil {
return errListLatestRVsRequired
}
if cfg.batchLock == nil {
return errBatchLockRequired
}
if cfg.tracer == nil {
return errTracerRequired
}
if cfg.log == nil {
return errLogRequired
}
if cfg.watchBufferSize <= 0 {
return errInvalidWatchBufferSize
}
if cfg.pollingInterval <= 0 {
return errInvalidPollingInterval
}
if cfg.done == nil {
return errDoneRequired
}
if cfg.dialect == nil {
return errDialectRequired
}
return nil
}
func newPollingNotifier(cfg *pollingNotifierConfig) (*pollingNotifier, error) {
if err := cfg.validate(); err != nil {
return nil, fmt.Errorf("invalid polling notifier config: %w", err)
}
return &pollingNotifier{
dialect: cfg.dialect,
pollingInterval: cfg.pollingInterval,
watchBufferSize: cfg.watchBufferSize,
log: cfg.log,
tracer: cfg.tracer,
batchLock: cfg.batchLock,
listLatestRVs: cfg.listLatestRVs,
historyPoll: cfg.historyPoll,
done: cfg.done,
}, nil
}
func (p *pollingNotifier) notify(ctx context.Context) (<-chan *resource.WrittenEvent, error) {
since, err := p.listLatestRVs(ctx)
if err != nil {
return nil, fmt.Errorf("watch, get latest resource version: %w", err)
}
stream := make(chan *resource.WrittenEvent, p.watchBufferSize)
go p.poller(ctx, since, stream)
return stream, nil
}
func (p *pollingNotifier) poller(ctx context.Context, since groupResourceRV, stream chan<- *resource.WrittenEvent) {
t := time.NewTicker(p.pollingInterval)
defer close(stream)
defer t.Stop()
for {
select {
case <-p.done:
return
case <-t.C:
ctx, span := p.tracer.Start(ctx, tracePrefix+"poller")
// List the latest RVs to see whether any of them have not been seen before.
grv, err := p.listLatestRVs(ctx)
if err != nil {
p.log.Error("poller get latest resource version", "err", err)
t.Reset(p.pollingInterval)
continue
}
for group, items := range grv {
for resource := range items {
// If we haven't seen this resource before, we start from 0.
if _, ok := since[group]; !ok {
since[group] = make(map[string]int64)
}
if _, ok := since[group][resource]; !ok {
since[group][resource] = 0
}
// Poll for new events.
next, err := p.poll(ctx, group, resource, since[group][resource], stream)
if err != nil {
p.log.Error("polling for resource", "err", err)
t.Reset(p.pollingInterval)
continue
}
if next > since[group][resource] {
since[group][resource] = next
}
}
}
t.Reset(p.pollingInterval)
span.End()
}
}
}
func (p *pollingNotifier) poll(ctx context.Context, grp string, res string, since int64, stream chan<- *resource.WrittenEvent) (int64, error) {
ctx, span := p.tracer.Start(ctx, tracePrefix+"poll")
defer span.End()
start := time.Now()
records, err := p.historyPoll(ctx, grp, res, since)
if err != nil {
return 0, fmt.Errorf("poll history: %w", err)
}
resource.NewStorageMetrics().PollerLatency.Observe(time.Since(start).Seconds())
var nextRV int64
for _, rec := range records {
if rec.Key.Group == "" || rec.Key.Resource == "" || rec.Key.Name == "" {
return nextRV, fmt.Errorf("missing key in response")
}
nextRV = rec.ResourceVersion
prevRV := rec.PreviousRV
if prevRV == nil {
prevRV = new(int64)
}
stream <- &resource.WrittenEvent{
Value: rec.Value,
Key: &resource.ResourceKey{
Namespace: rec.Key.Namespace,
Group: rec.Key.Group,
Resource: rec.Key.Resource,
Name: rec.Key.Name,
},
Type: resource.WatchEvent_Type(rec.Action),
PreviousRV: *prevRV,
Folder: rec.Folder,
ResourceVersion: rec.ResourceVersion,
// Timestamp: , // TODO: add timestamp
}
p.log.Debug("poller sent event to stream",
"namespace", rec.Key.Namespace,
"group", rec.Key.Group,
"resource", rec.Key.Resource,
"name", rec.Key.Name,
"action", rec.Action,
"rv", rec.ResourceVersion)
}
return nextRV, nil
}
func (p *pollingNotifier) send(_ context.Context, _ *resource.WrittenEvent) {
// No-op for polling strategy - changes are detected via polling.
}
func (p *pollingNotifier) close() {
// No-op for polling strategy.
}
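
To make the cursor bookkeeping concrete, a sketch with illustrative group/resource names (not part of the change): groupResourceRV is a map[group]map[resource]int64, and each poll only asks historyPoll for rows newer than the stored cursor before advancing it:

func pollOnceSketch(ctx context.Context, p *pollingNotifier, since groupResourceRV) error {
	grp, res := "dashboard.grafana.app", "dashboards" // illustrative pair
	if _, ok := since[grp]; !ok {
		since[grp] = make(map[string]int64)
	}
	recs, err := p.historyPoll(ctx, grp, res, since[grp][res])
	if err != nil {
		return err
	}
	for _, rec := range recs {
		if rec.ResourceVersion > since[grp][res] {
			since[grp][res] = rec.ResourceVersion // advance past seen events
		}
	}
	return nil
}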

View File

@@ -0,0 +1,360 @@
package sql
import (
"context"
"testing"
"time"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace/noop"
)
func TestPollingNotifierConfig(t *testing.T) {
t.Parallel()
tests := []struct {
name string
config *pollingNotifierConfig
expectedErr error
}{
{
name: "valid config",
config: &pollingNotifierConfig{
historyPoll: func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
return nil, nil
},
listLatestRVs: func(ctx context.Context) (groupResourceRV, error) { return nil, nil },
batchLock: &batchLock{},
tracer: noop.NewTracerProvider().Tracer("test"),
log: log.NewNopLogger(),
watchBufferSize: 10,
pollingInterval: time.Second,
done: make(chan struct{}),
dialect: sqltemplate.SQLite,
},
expectedErr: nil,
},
{
name: "missing historyPoll",
config: &pollingNotifierConfig{
listLatestRVs: func(ctx context.Context) (groupResourceRV, error) { return nil, nil },
batchLock: &batchLock{},
tracer: noop.NewTracerProvider().Tracer("test"),
log: log.NewNopLogger(),
watchBufferSize: 10,
pollingInterval: time.Second,
done: make(chan struct{}),
dialect: sqltemplate.SQLite,
},
expectedErr: errHistoryPollRequired,
},
{
name: "missing listLatestRVs",
config: &pollingNotifierConfig{
historyPoll: func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
return nil, nil
},
batchLock: &batchLock{},
tracer: noop.NewTracerProvider().Tracer("test"),
log: log.NewNopLogger(),
watchBufferSize: 10,
pollingInterval: time.Second,
done: make(chan struct{}),
dialect: sqltemplate.SQLite,
},
expectedErr: errListLatestRVsRequired,
},
{
name: "missing batchLock",
config: &pollingNotifierConfig{
historyPoll: func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
return nil, nil
},
listLatestRVs: func(ctx context.Context) (groupResourceRV, error) { return nil, nil },
tracer: noop.NewTracerProvider().Tracer("test"),
log: log.NewNopLogger(),
watchBufferSize: 10,
pollingInterval: time.Second,
done: make(chan struct{}),
dialect: sqltemplate.SQLite,
},
expectedErr: errBatchLockRequired,
},
{
name: "missing tracer",
config: &pollingNotifierConfig{
historyPoll: func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
return nil, nil
},
listLatestRVs: func(ctx context.Context) (groupResourceRV, error) { return nil, nil },
batchLock: &batchLock{},
log: log.NewNopLogger(),
watchBufferSize: 10,
pollingInterval: time.Second,
done: make(chan struct{}),
dialect: sqltemplate.SQLite,
},
expectedErr: errTracerRequired,
},
{
name: "missing logger",
config: &pollingNotifierConfig{
historyPoll: func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
return nil, nil
},
listLatestRVs: func(ctx context.Context) (groupResourceRV, error) { return nil, nil },
batchLock: &batchLock{},
tracer: noop.NewTracerProvider().Tracer("test"),
watchBufferSize: 10,
pollingInterval: time.Second,
done: make(chan struct{}),
dialect: sqltemplate.SQLite,
},
expectedErr: errLogRequired,
},
{
name: "invalid watch buffer size",
config: &pollingNotifierConfig{
historyPoll: func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
return nil, nil
},
listLatestRVs: func(ctx context.Context) (groupResourceRV, error) { return nil, nil },
batchLock: &batchLock{},
tracer: noop.NewTracerProvider().Tracer("test"),
log: log.NewNopLogger(),
watchBufferSize: 0,
pollingInterval: time.Second,
done: make(chan struct{}),
dialect: sqltemplate.SQLite,
},
expectedErr: errInvalidWatchBufferSize,
},
{
name: "invalid polling interval",
config: &pollingNotifierConfig{
historyPoll: func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
return nil, nil
},
listLatestRVs: func(ctx context.Context) (groupResourceRV, error) { return nil, nil },
batchLock: &batchLock{},
tracer: noop.NewTracerProvider().Tracer("test"),
log: log.NewNopLogger(),
watchBufferSize: 10,
pollingInterval: 0,
done: make(chan struct{}),
dialect: sqltemplate.SQLite,
},
expectedErr: errInvalidPollingInterval,
},
{
name: "missing done channel",
config: &pollingNotifierConfig{
historyPoll: func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
return nil, nil
},
listLatestRVs: func(ctx context.Context) (groupResourceRV, error) { return nil, nil },
batchLock: &batchLock{},
tracer: noop.NewTracerProvider().Tracer("test"),
log: log.NewNopLogger(),
watchBufferSize: 10,
pollingInterval: time.Second,
dialect: sqltemplate.SQLite,
},
expectedErr: errDoneRequired,
},
{
name: "missing dialect",
config: &pollingNotifierConfig{
historyPoll: func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
return nil, nil
},
listLatestRVs: func(ctx context.Context) (groupResourceRV, error) { return nil, nil },
batchLock: &batchLock{},
tracer: noop.NewTracerProvider().Tracer("test"),
log: log.NewNopLogger(),
watchBufferSize: 10,
pollingInterval: time.Second,
done: make(chan struct{}),
},
expectedErr: errDialectRequired,
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
err := tt.config.validate()
if tt.expectedErr != nil {
require.ErrorIs(t, err, tt.expectedErr)
} else {
require.NoError(t, err)
}
})
}
}
func TestPollingNotifier(t *testing.T) {
t.Parallel()
t.Run("notify returns channel and starts polling", func(t *testing.T) {
t.Parallel()
done := make(chan struct{})
defer close(done)
testEvent := &historyPollResponse{
Key: resource.ResourceKey{
Namespace: "test-ns",
Group: "test-group",
Resource: "test-resource",
Name: "test-name",
},
ResourceVersion: 2,
Folder: "test-folder",
Value: []byte(`{"test": "data"}`),
Action: 1,
}
var latestRVsCalled bool
listLatestRVs := func(ctx context.Context) (groupResourceRV, error) {
latestRVsCalled = true
return groupResourceRV{
"test-group": map[string]int64{
"test-resource": 0,
},
}, nil
}
var historyPollCalled bool
historyPoll := func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
historyPollCalled = true
require.Equal(t, "test-group", grp)
require.Equal(t, "test-resource", res)
require.Equal(t, int64(0), since)
return []*historyPollResponse{testEvent}, nil
}
cfg := &pollingNotifierConfig{
dialect: sqltemplate.SQLite,
pollingInterval: 10 * time.Millisecond,
watchBufferSize: 10,
log: log.NewNopLogger(),
tracer: noop.NewTracerProvider().Tracer("test"),
batchLock: &batchLock{},
listLatestRVs: listLatestRVs,
historyPoll: historyPoll,
done: done,
}
notifier, err := newPollingNotifier(cfg)
require.NoError(t, err)
require.NotNil(t, notifier)
events, err := notifier.notify(context.Background())
require.NoError(t, err)
require.NotNil(t, events)
select {
case event := <-events:
require.NotNil(t, event)
require.Equal(t, "test-ns", event.Key.Namespace)
require.Equal(t, "test-group", event.Key.Group)
require.Equal(t, "test-resource", event.Key.Resource)
require.Equal(t, "test-name", event.Key.Name)
require.Equal(t, int64(2), event.ResourceVersion)
require.Equal(t, "test-folder", event.Folder)
require.True(t, latestRVsCalled, "listLatestRVs should be called")
require.True(t, historyPollCalled, "historyPoll should be called")
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for event")
}
})
t.Run("handles polling errors gracefully", func(t *testing.T) {
t.Parallel()
done := make(chan struct{})
defer close(done)
listLatestRVs := func(ctx context.Context) (groupResourceRV, error) {
return groupResourceRV{
"test-group": map[string]int64{
"test-resource": 0,
},
}, nil
}
historyPoll := func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
return nil, errTest
}
cfg := &pollingNotifierConfig{
dialect: sqltemplate.SQLite,
pollingInterval: 10 * time.Millisecond,
watchBufferSize: 10,
log: log.NewNopLogger(),
tracer: noop.NewTracerProvider().Tracer("test"),
batchLock: &batchLock{},
listLatestRVs: listLatestRVs,
historyPoll: historyPoll,
done: done,
}
notifier, err := newPollingNotifier(cfg)
require.NoError(t, err)
require.NotNil(t, notifier)
events, err := notifier.notify(context.Background())
require.NoError(t, err)
require.NotNil(t, events)
// Verify channel remains open despite error
select {
case _, ok := <-events:
require.True(t, ok, "channel should remain open")
case <-time.After(50 * time.Millisecond):
// Expected - no events due to error
}
})
t.Run("stops polling when done channel is closed", func(t *testing.T) {
t.Parallel()
done := make(chan struct{})
cfg := &pollingNotifierConfig{
dialect: sqltemplate.SQLite,
pollingInterval: 10 * time.Millisecond,
watchBufferSize: 10,
log: log.NewNopLogger(),
tracer: noop.NewTracerProvider().Tracer("test"),
batchLock: &batchLock{},
listLatestRVs: func(ctx context.Context) (groupResourceRV, error) { return nil, nil },
historyPoll: func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
return nil, nil
},
done: done,
}
notifier, err := newPollingNotifier(cfg)
require.NoError(t, err)
require.NotNil(t, notifier)
events, err := notifier.notify(context.Background())
require.NoError(t, err)
require.NotNil(t, events)
close(done)
select {
case _, ok := <-events:
require.False(t, ok, "events channel should be closed")
case <-time.After(50 * time.Millisecond):
t.Fatal("timeout waiting for events channel to close")
}
})
}

View File

@@ -0,0 +1,71 @@
package sql
import (
"context"
"testing"
"time"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/stretchr/testify/require"
)
func TestChannelNotifier(t *testing.T) {
t.Run("should notify subscribers of events", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
n := newChannelNotifier(5, log.NewNopLogger())
events, err := n.notify(ctx)
require.NoError(t, err)
testEvent := &resource.WrittenEvent{
Type: resource.WatchEvent_ADDED,
Key: &resource.ResourceKey{
Group: "test",
Resource: "test",
Name: "test1",
Namespace: "test",
},
ResourceVersion: 1,
}
n.send(ctx, testEvent)
select {
case event := <-events:
require.Equal(t, testEvent, event)
case <-ctx.Done():
t.Fatal("timeout waiting for event")
}
})
t.Run("should drop events when buffer is full", func(t *testing.T) {
bufferSize := 2
n := newChannelNotifier(bufferSize, log.NewNopLogger())
events, err := n.notify(context.Background())
require.NoError(t, err)
for i := 0; i < bufferSize+1; i++ {
n.send(context.Background(), &resource.WrittenEvent{
ResourceVersion: int64(i),
})
}
require.Equal(t, bufferSize, len(events))
})
t.Run("should close subscriber channels when context cancelled", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
n := newChannelNotifier(5, log.NewNopLogger())
events, err := n.notify(ctx)
require.NoError(t, err)
cancel()
_, ok := <-events
require.False(t, ok, "channel should be closed")
})
}

View File

@@ -10,6 +10,7 @@ import (
infraDB "github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
@@ -43,7 +44,17 @@ func NewResourceServer(db infraDB.DB, cfg *setting.Cfg,
if err != nil {
return nil, err
}
store, err := NewBackend(BackendOptions{DBProvider: eDB, Tracer: tracer})
dbCfg := cfg.SectionWithEnvOverrides("database")
// Check in the config whether HA is enabled. By default, we assume an HA setup.
isHA := dbCfg.Key("high_availability").MustBool(true)
// SQLite cannot run in HA mode, so we force it to false.
databaseType := dbCfg.Key("type").MustString(migrator.SQLite)
if databaseType == migrator.SQLite {
isHA = false
}
store, err := NewBackend(BackendOptions{DBProvider: eDB, Tracer: tracer, IsHA: isHA})
if err != nil {
return nil, err
}

View File

@@ -39,6 +39,24 @@ func TestIntegrationSQLStorageBackend(t *testing.T) {
backend, err := sql.NewBackend(sql.BackendOptions{
DBProvider: eDB,
IsHA: true,
})
require.NoError(t, err)
require.NotNil(t, backend)
err = backend.Init(testutil.NewDefaultTestContext(t))
require.NoError(t, err)
return backend
})
// Run single-instance tests with the in-process notifier.
unitest.RunStorageBackendTest(t, func(ctx context.Context) resource.StorageBackend {
dbstore := infraDB.InitTestDB(t)
eDB, err := dbimpl.ProvideResourceDB(dbstore, setting.NewCfg(), nil)
require.NoError(t, err)
require.NotNil(t, eDB)
backend, err := sql.NewBackend(sql.BackendOptions{
DBProvider: eDB,
IsHA: false,
})
require.NoError(t, err)
require.NotNil(t, backend)