Unistore: Batch write events (#101381)

* Batch write events

* Improve instrumentation

* Measure batch phases

* Detect lock contention

* Remove the execBatch goroutine

* Remove tracing prefix

* Detect context cancel

* Batch channel map
Georges Chaudy
2025-03-13 09:24:12 +01:00
committed by GitHub
parent 2d71c8ae72
commit 3589baac68
21 changed files with 632 additions and 527 deletions
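At a high level, this change routes every write through a per-(group, resource) batch channel: callers enqueue an operation and wait, a background processor drains up to maxBatchSize queued operations, runs them in one transaction, assigns consecutive resource versions, and fans the results back out. Below is a minimal sketch of that pattern using simplified, hypothetical names; it omits the database transaction, resource_version row locking, metrics, and tracing that the real resourceVersionManager (rv_manager.go, later in this diff) adds.

package rvbatchsketch

import (
	"context"
	"sync"
)

// result is what a waiting caller receives once its batch commits.
type result struct {
	rv  int64
	err error
}

// op is one queued write: fn does the work, done reports the assigned version.
type op struct {
	fn   func() error
	done chan result
}

// batcher is a simplified stand-in for resourceVersionManager: one channel
// (and one processor goroutine) per group/resource key.
type batcher struct {
	mu    sync.Mutex
	chans map[string]chan *op
	max   int
}

func newBatcher(maxBatchSize int) *batcher {
	return &batcher{chans: make(map[string]chan *op), max: maxBatchSize}
}

// Exec enqueues fn for the given key and blocks until the batch it lands in
// finishes, or until the caller's context is cancelled.
func (b *batcher) Exec(ctx context.Context, key string, fn func() error) (int64, error) {
	o := &op{fn: fn, done: make(chan result, 1)}

	b.mu.Lock()
	ch, ok := b.chans[key]
	if !ok {
		ch = make(chan *op, b.max)
		b.chans[key] = ch
		go b.run(ch) // one processor per key, as in startBatchProcessor
	}
	b.mu.Unlock()

	select {
	case ch <- o:
	case <-ctx.Done():
		return 0, ctx.Err()
	}
	select {
	case res := <-o.done:
		return res.rv, res.err
	case <-ctx.Done():
		return 0, ctx.Err()
	}
}

// run drains whatever is queued (up to max), executes the batch, and hands
// each op a consecutive version, mirroring execBatch's RV allocation.
func (b *batcher) run(ch chan *op) {
	nextRV := int64(1) // per-key counter; the real code locks the resource_version row instead
	for {
		batch := []*op{<-ch}
	drain:
		for len(batch) < b.max {
			select {
			case o := <-ch:
				batch = append(batch, o)
			default:
				break drain
			}
		}
		var err error
		for _, o := range batch {
			if err = o.fn(); err != nil {
				break // in the real code the whole transaction rolls back
			}
		}
		for _, o := range batch {
			var rv int64
			if err == nil {
				rv = nextRV
				nextRV++
			}
			o.done <- result{rv: rv, err: err}
		}
	}
}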

View File

@ -10,8 +10,6 @@ import (
"time"
"github.com/google/uuid"
"go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"google.golang.org/protobuf/proto"
@ -103,6 +101,9 @@ type backend struct {
watchBufferSize int
notifier eventNotifier
// resource version manager
rvManager *resourceVersionManager
// testing
simulatedNetworkLatency time.Duration
}
@ -127,6 +128,17 @@ func (b *backend) initLocked(ctx context.Context) error {
return fmt.Errorf("no dialect for driver %q", driverName)
}
// Initialize ResourceVersionManager
rvManager, err := NewResourceVersionManager(ResourceManagerOptions{
Dialect: b.dialect,
DB: b.db,
Tracer: b.tracer,
})
if err != nil {
return fmt.Errorf("failed to create resource version manager: %w", err)
}
b.rvManager = rvManager
// Initialize notifier after dialect is set up
notifier, err := newNotifier(b)
if err != nil {
@ -207,13 +219,14 @@ func (b *backend) WriteEvent(ctx context.Context, event resource.WriteEvent) (in
func (b *backend) create(ctx context.Context, event resource.WriteEvent) (int64, error) {
ctx, span := b.tracer.Start(ctx, tracePrefix+"Create")
defer span.End()
var newVersion int64
guid := uuid.New().String()
folder := ""
if event.Object != nil {
folder = event.Object.GetFolder()
}
err := b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
rv, err := b.rvManager.ExecWithRV(ctx, event.Key, func(tx db.Tx) (string, error) {
// 1. Insert into resource
if _, err := dbutil.Exec(ctx, tx, sqlResourceInsert, sqlResourceRequest{
SQLTemplate: sqltemplate.New(b.dialect),
@ -221,7 +234,7 @@ func (b *backend) create(ctx context.Context, event resource.WriteEvent) (int64,
Folder: folder,
GUID: guid,
}); err != nil {
return fmt.Errorf("insert into resource: %w", err)
return guid, fmt.Errorf("insert into resource: %w", err)
}
// 2. Insert into resource history
@ -231,38 +244,12 @@ func (b *backend) create(ctx context.Context, event resource.WriteEvent) (int64,
Folder: folder,
GUID: guid,
}); err != nil {
return fmt.Errorf("insert into resource history: %w", err)
return guid, fmt.Errorf("insert into resource history: %w", err)
}
// 3. TODO: Rebuild the whole folder tree structure if we're creating a folder
// 4. Atomically increment resource version for this kind
rv, err := b.resourceVersionAtomicInc(ctx, tx, event.Key)
if err != nil {
return fmt.Errorf("increment resource version: %w", err)
}
// 5. Update the RV in both resource and resource_history
if _, err = dbutil.Exec(ctx, tx, sqlResourceHistoryUpdateRV, sqlResourceUpdateRVRequest{
SQLTemplate: sqltemplate.New(b.dialect),
GUID: guid,
ResourceVersion: rv,
}); err != nil {
return fmt.Errorf("update resource_history rv: %w", err)
}
if _, err = dbutil.Exec(ctx, tx, sqlResourceUpdateRV, sqlResourceUpdateRVRequest{
SQLTemplate: sqltemplate.New(b.dialect),
GUID: guid,
ResourceVersion: rv,
}); err != nil {
return fmt.Errorf("update resource rv: %w", err)
}
newVersion = rv
if b.simulatedNetworkLatency > 0 {
time.Sleep(b.simulatedNetworkLatency)
}
return nil
return guid, nil
})
if err != nil {
@ -274,23 +261,24 @@ func (b *backend) create(ctx context.Context, event resource.WriteEvent) (int64,
Key: event.Key,
PreviousRV: event.PreviousRV,
Value: event.Value,
ResourceVersion: newVersion,
ResourceVersion: rv,
Folder: folder,
})
return newVersion, nil
return rv, nil
}
func (b *backend) update(ctx context.Context, event resource.WriteEvent) (int64, error) {
ctx, span := b.tracer.Start(ctx, tracePrefix+"Update")
defer span.End()
var newVersion int64
guid := uuid.New().String()
folder := ""
if event.Object != nil {
folder = event.Object.GetFolder()
}
err := b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
// Use rvManager.ExecWithRV instead of direct transaction
rv, err := b.rvManager.ExecWithRV(ctx, event.Key, func(tx db.Tx) (string, error) {
// 1. Update resource
_, err := dbutil.Exec(ctx, tx, sqlResourceUpdate, sqlResourceRequest{
SQLTemplate: sqltemplate.New(b.dialect),
@ -299,7 +287,7 @@ func (b *backend) update(ctx context.Context, event resource.WriteEvent) (int64,
GUID: guid,
})
if err != nil {
return fmt.Errorf("initial resource update: %w", err)
return guid, fmt.Errorf("resource update: %w", err)
}
// 2. Insert into resource history
@ -309,36 +297,9 @@ func (b *backend) update(ctx context.Context, event resource.WriteEvent) (int64,
Folder: folder,
GUID: guid,
}); err != nil {
return fmt.Errorf("insert into resource history: %w", err)
return guid, fmt.Errorf("insert into resource history: %w", err)
}
// 3. TODO: Rebuild the whole folder tree structure if we're creating a folder
// 4. Atomically increment resource version for this kind
rv, err := b.resourceVersionAtomicInc(ctx, tx, event.Key)
if err != nil {
return fmt.Errorf("increment resource version: %w", err)
}
// 5. Update the RV in both resource and resource_history
if _, err = dbutil.Exec(ctx, tx, sqlResourceHistoryUpdateRV, sqlResourceUpdateRVRequest{
SQLTemplate: sqltemplate.New(b.dialect),
GUID: guid,
ResourceVersion: rv,
}); err != nil {
return fmt.Errorf("update history rv: %w", err)
}
if _, err = dbutil.Exec(ctx, tx, sqlResourceUpdateRV, sqlResourceUpdateRVRequest{
SQLTemplate: sqltemplate.New(b.dialect),
GUID: guid,
ResourceVersion: rv,
}); err != nil {
return fmt.Errorf("update resource rv: %w", err)
}
newVersion = rv
return nil
return guid, nil
})
if err != nil {
@ -350,23 +311,22 @@ func (b *backend) update(ctx context.Context, event resource.WriteEvent) (int64,
Key: event.Key,
PreviousRV: event.PreviousRV,
Value: event.Value,
ResourceVersion: newVersion,
ResourceVersion: rv,
Folder: folder,
})
return newVersion, nil
return rv, nil
}
func (b *backend) delete(ctx context.Context, event resource.WriteEvent) (int64, error) {
ctx, span := b.tracer.Start(ctx, tracePrefix+"Delete")
defer span.End()
var newVersion int64
guid := uuid.New().String()
folder := ""
if event.Object != nil {
folder = event.Object.GetFolder()
}
err := b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
rv, err := b.rvManager.ExecWithRV(ctx, event.Key, func(tx db.Tx) (string, error) {
// 1. delete from resource
_, err := dbutil.Exec(ctx, tx, sqlResourceDelete, sqlResourceRequest{
SQLTemplate: sqltemplate.New(b.dialect),
@ -374,7 +334,7 @@ func (b *backend) delete(ctx context.Context, event resource.WriteEvent) (int64,
GUID: guid,
})
if err != nil {
return fmt.Errorf("delete resource: %w", err)
return guid, fmt.Errorf("delete resource: %w", err)
}
// 2. Add event to resource history
@ -384,28 +344,9 @@ func (b *backend) delete(ctx context.Context, event resource.WriteEvent) (int64,
Folder: folder,
GUID: guid,
}); err != nil {
return fmt.Errorf("insert into resource history: %w", err)
return guid, fmt.Errorf("insert into resource history: %w", err)
}
// 3. TODO: Rebuild the whole folder tree structure if we're creating a folder
// 4. Atomically increment resource version for this kind
rv, err := b.resourceVersionAtomicInc(ctx, tx, event.Key)
if err != nil {
return fmt.Errorf("increment resource version: %w", err)
}
// 5. Update the RV in resource_history
if _, err = dbutil.Exec(ctx, tx, sqlResourceHistoryUpdateRV, sqlResourceUpdateRVRequest{
SQLTemplate: sqltemplate.New(b.dialect),
GUID: guid,
ResourceVersion: rv,
}); err != nil {
return fmt.Errorf("update history rv: %w", err)
}
newVersion = rv
return nil
return guid, nil
})
if err != nil {
@ -417,23 +358,22 @@ func (b *backend) delete(ctx context.Context, event resource.WriteEvent) (int64,
Key: event.Key,
PreviousRV: event.PreviousRV,
Value: event.Value,
ResourceVersion: newVersion,
ResourceVersion: rv,
Folder: folder,
})
return newVersion, nil
return rv, nil
}
func (b *backend) restore(ctx context.Context, event resource.WriteEvent) (int64, error) {
ctx, span := b.tracer.Start(ctx, tracePrefix+"Restore")
defer span.End()
var newVersion int64
guid := uuid.New().String()
folder := ""
if event.Object != nil {
folder = event.Object.GetFolder()
}
err := b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
rv, err := b.rvManager.ExecWithRV(ctx, event.Key, func(tx db.Tx) (string, error) {
// 1. Re-create resource
// Note: we may want to replace the write event with a create event, tbd.
if _, err := dbutil.Exec(ctx, tx, sqlResourceInsert, sqlResourceRequest{
@ -442,7 +382,7 @@ func (b *backend) restore(ctx context.Context, event resource.WriteEvent) (int64
Folder: folder,
GUID: guid,
}); err != nil {
return fmt.Errorf("insert into resource: %w", err)
return guid, fmt.Errorf("insert into resource: %w", err)
}
// 2. Insert into resource history
@ -452,49 +392,22 @@ func (b *backend) restore(ctx context.Context, event resource.WriteEvent) (int64
Folder: folder,
GUID: guid,
}); err != nil {
return fmt.Errorf("insert into resource history: %w", err)
return guid, fmt.Errorf("insert into resource history: %w", err)
}
// 3. TODO: Rebuild the whole folder tree structure if we're creating a folder
// 4. Atomically increment resource version for this kind
rv, err := b.resourceVersionAtomicInc(ctx, tx, event.Key)
if err != nil {
return fmt.Errorf("increment resource version: %w", err)
}
// 5. Update the RV in both resource and resource_history
if _, err = dbutil.Exec(ctx, tx, sqlResourceHistoryUpdateRV, sqlResourceUpdateRVRequest{
SQLTemplate: sqltemplate.New(b.dialect),
GUID: guid,
ResourceVersion: rv,
}); err != nil {
return fmt.Errorf("update history rv: %w", err)
}
if _, err = dbutil.Exec(ctx, tx, sqlResourceUpdateRV, sqlResourceUpdateRVRequest{
SQLTemplate: sqltemplate.New(b.dialect),
GUID: guid,
ResourceVersion: rv,
}); err != nil {
return fmt.Errorf("update resource rv: %w", err)
}
// 6. Update all resource history entries with the new UID
// 3. Update all resource history entries with the new UID
// Note: we do not update any history entries that have a deletion timestamp included. This will become
// important once we start using finalizers, as the initial delete will show up as an update with a deletion timestamp included.
if _, err = dbutil.Exec(ctx, tx, sqlResoureceHistoryUpdateUid, sqlResourceHistoryUpdateRequest{
if _, err := dbutil.Exec(ctx, tx, sqlResoureceHistoryUpdateUid, sqlResourceHistoryUpdateRequest{
SQLTemplate: sqltemplate.New(b.dialect),
WriteEvent: event,
OldUID: string(event.ObjectOld.GetUID()),
NewUID: string(event.Object.GetUID()),
}); err != nil {
return fmt.Errorf("update history uid: %w", err)
return guid, fmt.Errorf("update history uid: %w", err)
}
newVersion = rv
return nil
return guid, nil
})
if err != nil {
@ -506,11 +419,11 @@ func (b *backend) restore(ctx context.Context, event resource.WriteEvent) (int64
Key: event.Key,
PreviousRV: event.PreviousRV,
Value: event.Value,
ResourceVersion: newVersion,
ResourceVersion: rv,
Folder: folder,
})
return newVersion, nil
return rv, nil
}
func (b *backend) ReadResource(ctx context.Context, req *resource.ReadRequest) *resource.BackendReadResponse {
@ -830,67 +743,3 @@ func fetchLatestRV(ctx context.Context, x db.ContextExecer, d sqltemplate.Dialec
}
return res.ResourceVersion, nil
}
// resourceVersionAtomicInc atomically increases the version of a kind within a transaction.
// TODO: Ideally we should attempt to update the RV in the resource and resource_history tables
// in a single roundtrip. This would reduce the latency of the operation, and also increase the
// throughput of the system. This is a good candidate for a future optimization.
func (b *backend) resourceVersionAtomicInc(ctx context.Context, x db.ContextExecer, key *resource.ResourceKey) (newVersion int64, err error) {
ctx, span := b.tracer.Start(ctx, tracePrefix+"version_atomic_inc", trace.WithAttributes(
semconv.K8SNamespaceName(key.Namespace),
// TODO: the following attributes could use some standardization.
attribute.String("k8s.resource.group", key.Group),
attribute.String("k8s.resource.type", key.Resource),
))
defer span.End()
// 1. Lock to row and prevent concurrent updates until the transaction is committed.
res, err := dbutil.QueryRow(ctx, x, sqlResourceVersionGet, sqlResourceVersionGetRequest{
SQLTemplate: sqltemplate.New(b.dialect),
Group: key.Group,
Resource: key.Resource,
Response: new(resourceVersionResponse),
ReadOnly: false, // This locks the row for update
})
if errors.Is(err, sql.ErrNoRows) {
// if there wasn't a row associated with the given resource, then we create it.
if _, err = dbutil.Exec(ctx, x, sqlResourceVersionInsert, sqlResourceVersionUpsertRequest{
SQLTemplate: sqltemplate.New(b.dialect),
Group: key.Group,
Resource: key.Resource,
}); err != nil {
return 0, fmt.Errorf("insert into resource_version: %w", err)
}
res, err = dbutil.QueryRow(ctx, x, sqlResourceVersionGet, sqlResourceVersionGetRequest{
SQLTemplate: sqltemplate.New(b.dialect),
Group: key.Group,
Resource: key.Resource,
Response: new(resourceVersionResponse),
ReadOnly: true, // This locks the row for update
})
if err != nil {
return 0, fmt.Errorf("fetching RV after read")
}
return res.ResourceVersion, nil
} else if err != nil {
return 0, fmt.Errorf("lock the resource version: %w", err)
}
// 2. Update the RV
// Most times, the RV is the current microsecond timestamp generated on the sql server (to avoid clock skew).
// In rare occasion, the server clock might go back in time. In those cases, we simply increment the
// previous RV until the clock catches up.
nextRV := max(res.CurrentEpoch, res.ResourceVersion+1)
_, err = dbutil.Exec(ctx, x, sqlResourceVersionUpdate, sqlResourceVersionUpsertRequest{
SQLTemplate: sqltemplate.New(b.dialect),
Group: key.Group,
Resource: key.Resource,
ResourceVersion: nextRV,
})
if err != nil {
return 0, fmt.Errorf("increase resource version: %w", err)
}
return nextRV, nil
}

View File

@ -202,85 +202,6 @@ func TestBackend_IsHealthy(t *testing.T) {
require.ErrorIs(t, err, errTest)
}
// expectSuccessfulResourceVersionAtomicInc sets up expectations for calling
// resourceVersionAtomicInc, where the returned RV will be 1.
func expectSuccessfulResourceVersionAtomicInc(t *testing.T, b testBackend) {
b.QueryWithResult("select resource_version for update", 2, Rows{{12345, 23456}})
b.ExecWithResult("update resource_version set resource_version", 0, 0)
}
// expectUnsuccessfulResourceVersionAtomicInc sets up expectations for calling
// resourceVersionAtomicInc, where the returned RV will be 1.
func expectUnsuccessfulResourceVersionAtomicInc(t *testing.T, b testBackend, err error) {
b.QueryWithErr("select resource_version for update", errTest)
}
func TestResourceVersionAtomicInc(t *testing.T) {
t.Parallel()
t.Run("happy path - insert new row", func(t *testing.T) {
t.Parallel()
b, ctx := setupBackendTest(t)
expectSuccessfulResourceVersionAtomicInc(t, b) // returns RV=1
v, err := b.resourceVersionAtomicInc(ctx, b.DB, resKey)
require.NoError(t, err)
require.Equal(t, int64(23456), v)
})
t.Run("happy path - update existing row", func(t *testing.T) {
t.Parallel()
b, ctx := setupBackendTest(t)
b.QueryWithResult("select resource_version for update", 2, Rows{{12345, 23456}})
b.ExecWithResult("update resource_version", 0, 1)
v, err := b.resourceVersionAtomicInc(ctx, b.DB, resKey)
require.NoError(t, err)
require.Equal(t, int64(23456), v)
})
t.Run("error getting current version", func(t *testing.T) {
t.Parallel()
b, ctx := setupBackendTest(t)
b.QueryWithErr("select resource_version for update", errTest)
v, err := b.resourceVersionAtomicInc(ctx, b.DB, resKey)
require.Zero(t, v)
require.Error(t, err)
require.ErrorContains(t, err, "lock the resource version")
})
t.Run("error inserting new row", func(t *testing.T) {
t.Parallel()
b, ctx := setupBackendTest(t)
b.QueryWithResult("select resource_version", 0, Rows{})
b.ExecWithErr("insert resource_version", errTest)
v, err := b.resourceVersionAtomicInc(ctx, b.DB, resKey)
require.Zero(t, v)
require.Error(t, err)
require.ErrorContains(t, err, "insert into resource_version")
})
t.Run("error updating existing row", func(t *testing.T) {
t.Parallel()
b, ctx := setupBackendTest(t)
b.QueryWithResult("select resource_version for update", 2, Rows{{12345, 23456}})
b.ExecWithErr("update resource_version", errTest)
v, err := b.resourceVersionAtomicInc(ctx, b.DB, resKey)
require.Zero(t, v)
require.Error(t, err)
require.ErrorContains(t, err, "increase resource version")
})
}
func TestBackend_create(t *testing.T) {
t.Parallel()
meta, err := utils.MetaAccessor(&unstructured.Unstructured{
@ -296,18 +217,15 @@ func TestBackend_create(t *testing.T) {
t.Run("happy path", func(t *testing.T) {
t.Parallel()
b, ctx := setupBackendTest(t)
b.SQLMock.ExpectBegin()
b.ExecWithResult("insert resource", 0, 1)
b.ExecWithResult("insert resource_history", 0, 1)
expectSuccessfulResourceVersionAtomicInc(t, b) // returns RV=1
b.ExecWithResult("update resource_history", 0, 1)
b.ExecWithResult("update resource", 0, 1)
expectSuccessfulResourceVersionExec(t, b.TestDBProvider,
func() { b.ExecWithResult("insert resource", 0, 1) },
func() { b.ExecWithResult("insert resource_history", 0, 1) },
)
b.SQLMock.ExpectCommit()
v, err := b.create(ctx, event)
require.NoError(t, err)
require.Equal(t, int64(23456), v)
require.Equal(t, int64(200), v)
})
t.Run("error inserting into resource", func(t *testing.T) {
@ -321,7 +239,7 @@ func TestBackend_create(t *testing.T) {
v, err := b.create(ctx, event)
require.Zero(t, v)
require.Error(t, err)
require.ErrorContains(t, err, "insert into resource:")
require.ErrorContains(t, err, "insert into resource")
})
t.Run("error inserting into resource_history", func(t *testing.T) {
@ -336,58 +254,7 @@ func TestBackend_create(t *testing.T) {
v, err := b.create(ctx, event)
require.Zero(t, v)
require.Error(t, err)
require.ErrorContains(t, err, "insert into resource history:")
})
t.Run("error incrementing resource version", func(t *testing.T) {
t.Parallel()
b, ctx := setupBackendTest(t)
b.SQLMock.ExpectBegin()
b.ExecWithResult("insert resource", 0, 1)
b.ExecWithResult("insert resource_history", 0, 1)
expectUnsuccessfulResourceVersionAtomicInc(t, b, errTest)
b.SQLMock.ExpectRollback()
v, err := b.create(ctx, event)
require.Zero(t, v)
require.Error(t, err)
require.ErrorContains(t, err, "increment resource version")
})
t.Run("error updating resource_history", func(t *testing.T) {
t.Parallel()
b, ctx := setupBackendTest(t)
b.SQLMock.ExpectBegin()
b.ExecWithResult("insert resource", 0, 1)
b.ExecWithResult("insert resource_history", 0, 1)
expectSuccessfulResourceVersionAtomicInc(t, b)
b.ExecWithErr("update resource_history", errTest)
b.SQLMock.ExpectRollback()
v, err := b.create(ctx, event)
require.Zero(t, v)
require.Error(t, err)
require.ErrorContains(t, err, "update resource_history", 0, 1)
})
t.Run("error updating resource", func(t *testing.T) {
t.Parallel()
b, ctx := setupBackendTest(t)
b.SQLMock.ExpectBegin()
b.ExecWithResult("insert resource", 0, 1)
b.ExecWithResult("insert resource_history", 0, 1)
expectSuccessfulResourceVersionAtomicInc(t, b)
b.ExecWithResult("update resource_history", 0, 1)
b.ExecWithErr("update resource", errTest)
b.SQLMock.ExpectRollback()
v, err := b.create(ctx, event)
require.Zero(t, v)
require.Error(t, err)
require.ErrorContains(t, err, "update resource rv")
require.ErrorContains(t, err, "insert into resource history")
})
}
@ -409,16 +276,15 @@ func TestBackend_update(t *testing.T) {
b, ctx := setupBackendTest(t)
b.SQLMock.ExpectBegin()
b.ExecWithResult("update resource", 0, 1)
b.ExecWithResult("insert resource_history", 0, 1)
expectSuccessfulResourceVersionAtomicInc(t, b)
b.ExecWithResult("update resource_history", 0, 1)
b.ExecWithResult("update resource", 0, 1)
expectSuccessfulResourceVersionExec(t, b.TestDBProvider,
func() { b.ExecWithResult("update resource", 0, 1) },
func() { b.ExecWithResult("insert resource_history", 0, 1) },
)
b.SQLMock.ExpectCommit()
v, err := b.update(ctx, event)
require.NoError(t, err)
require.Equal(t, int64(23456), v)
require.Equal(t, int64(200), v)
})
t.Run("error in first update to resource", func(t *testing.T) {
@ -432,7 +298,7 @@ func TestBackend_update(t *testing.T) {
v, err := b.update(ctx, event)
require.Zero(t, v)
require.Error(t, err)
require.ErrorContains(t, err, "initial resource update")
require.ErrorContains(t, err, "resource update")
})
t.Run("error inserting into resource history", func(t *testing.T) {
@ -449,57 +315,6 @@ func TestBackend_update(t *testing.T) {
require.Error(t, err)
require.ErrorContains(t, err, "insert into resource history")
})
t.Run("error incrementing rv", func(t *testing.T) {
t.Parallel()
b, ctx := setupBackendTest(t)
b.SQLMock.ExpectBegin()
b.ExecWithResult("update resource", 0, 1)
b.ExecWithResult("insert resource_history", 0, 1)
expectUnsuccessfulResourceVersionAtomicInc(t, b, errTest)
b.SQLMock.ExpectRollback()
v, err := b.update(ctx, event)
require.Zero(t, v)
require.Error(t, err)
require.ErrorContains(t, err, "increment resource version")
})
t.Run("error updating history rv", func(t *testing.T) {
t.Parallel()
b, ctx := setupBackendTest(t)
b.SQLMock.ExpectBegin()
b.ExecWithResult("update resource", 0, 1)
b.ExecWithResult("insert resource_history", 0, 1)
expectSuccessfulResourceVersionAtomicInc(t, b) // returns RV=1
b.ExecWithErr("update resource_history", errTest)
b.SQLMock.ExpectRollback()
v, err := b.update(ctx, event)
require.Zero(t, v)
require.Error(t, err)
require.ErrorContains(t, err, "update history rv")
})
t.Run("error updating resource rv", func(t *testing.T) {
t.Parallel()
b, ctx := setupBackendTest(t)
b.SQLMock.ExpectBegin()
b.ExecWithResult("update resource", 0, 1)
b.ExecWithResult("insert resource_history", 0, 1)
expectSuccessfulResourceVersionAtomicInc(t, b) // returns RV=1
b.ExecWithResult("update resource_history", 0, 1)
b.ExecWithErr("update resource", errTest)
b.SQLMock.ExpectRollback()
v, err := b.update(ctx, event)
require.Zero(t, v)
require.Error(t, err)
require.ErrorContains(t, err, "update resource rv")
})
}
func TestBackend_delete(t *testing.T) {
@ -519,15 +334,15 @@ func TestBackend_delete(t *testing.T) {
b, ctx := setupBackendTest(t)
b.SQLMock.ExpectBegin()
b.ExecWithResult("delete resource", 0, 1)
b.ExecWithResult("insert resource_history", 0, 1)
expectSuccessfulResourceVersionAtomicInc(t, b)
b.ExecWithResult("update resource_history", 0, 1)
expectSuccessfulResourceVersionExec(t, b.TestDBProvider,
func() { b.ExecWithResult("delete resource", 0, 1) },
func() { b.ExecWithResult("insert resource_history", 0, 1) },
)
b.SQLMock.ExpectCommit()
v, err := b.delete(ctx, event)
require.NoError(t, err)
require.Equal(t, int64(23456), v)
require.Equal(t, int64(200), v)
})
t.Run("error deleting resource", func(t *testing.T) {
@ -558,39 +373,6 @@ func TestBackend_delete(t *testing.T) {
require.Error(t, err)
require.ErrorContains(t, err, "insert into resource history")
})
t.Run("error incrementing resource version", func(t *testing.T) {
t.Parallel()
b, ctx := setupBackendTest(t)
b.SQLMock.ExpectBegin()
b.ExecWithResult("delete resource", 0, 1)
b.ExecWithResult("insert resource_history", 0, 1)
expectUnsuccessfulResourceVersionAtomicInc(t, b, errTest)
b.SQLMock.ExpectCommit()
v, err := b.delete(ctx, event)
require.Zero(t, v)
require.Error(t, err)
require.ErrorContains(t, err, "increment resource version")
})
t.Run("error updating resource history", func(t *testing.T) {
t.Parallel()
b, ctx := setupBackendTest(t)
b.SQLMock.ExpectBegin()
b.ExecWithResult("delete resource", 0, 1)
b.ExecWithResult("insert resource_history", 0, 1)
expectSuccessfulResourceVersionAtomicInc(t, b) // returns RV=1
b.ExecWithErr("update resource_history", errTest)
b.SQLMock.ExpectCommit()
v, err := b.delete(ctx, event)
require.Zero(t, v)
require.Error(t, err)
require.ErrorContains(t, err, "update history rv")
})
}
func TestBackend_restore(t *testing.T) {
@ -617,17 +399,16 @@ func TestBackend_restore(t *testing.T) {
b, ctx := setupBackendTest(t)
b.SQLMock.ExpectBegin()
b.ExecWithResult("insert resource", 0, 1)
b.ExecWithResult("insert resource_history", 0, 1)
expectSuccessfulResourceVersionAtomicInc(t, b)
b.ExecWithResult("update resource_history", 0, 1)
b.ExecWithResult("update resource", 0, 1)
b.ExecWithResult("update resource_history", 0, 1)
expectSuccessfulResourceVersionExec(t, b.TestDBProvider,
func() { b.ExecWithResult("insert resource", 0, 1) },
func() { b.ExecWithResult("insert resource_history", 0, 1) },
func() { b.ExecWithResult("update resource_history", 0, 1) },
)
b.SQLMock.ExpectCommit()
v, err := b.restore(ctx, event)
require.NoError(t, err)
require.Equal(t, int64(23456), v)
require.Equal(t, int64(200), v)
})
t.Run("error restoring resource", func(t *testing.T) {
@ -641,7 +422,7 @@ func TestBackend_restore(t *testing.T) {
v, err := b.restore(ctx, event)
require.Zero(t, v)
require.Error(t, err)
require.ErrorContains(t, err, "insert into resource:")
require.ErrorContains(t, err, "insert into resource")
})
t.Run("error inserting into resource history", func(t *testing.T) {
@ -659,57 +440,6 @@ func TestBackend_restore(t *testing.T) {
require.ErrorContains(t, err, "insert into resource history")
})
t.Run("error incrementing resource version", func(t *testing.T) {
t.Parallel()
b, ctx := setupBackendTest(t)
b.SQLMock.ExpectBegin()
b.ExecWithResult("insert resource", 0, 1)
b.ExecWithResult("insert resource_history", 0, 1)
expectUnsuccessfulResourceVersionAtomicInc(t, b, errTest)
b.SQLMock.ExpectRollback()
v, err := b.restore(ctx, event)
require.Zero(t, v)
require.Error(t, err)
require.ErrorContains(t, err, "increment resource version")
})
t.Run("error updating resource history", func(t *testing.T) {
t.Parallel()
b, ctx := setupBackendTest(t)
b.SQLMock.ExpectBegin()
b.ExecWithResult("insert resource", 0, 1)
b.ExecWithResult("insert resource_history", 0, 1)
expectSuccessfulResourceVersionAtomicInc(t, b)
b.ExecWithErr("update resource_history", errTest)
b.SQLMock.ExpectRollback()
v, err := b.restore(ctx, event)
require.Zero(t, v)
require.Error(t, err)
require.ErrorContains(t, err, "update history rv")
})
t.Run("error updating resource", func(t *testing.T) {
t.Parallel()
b, ctx := setupBackendTest(t)
b.SQLMock.ExpectBegin()
b.ExecWithResult("insert resource", 0, 1)
b.ExecWithResult("insert resource_history", 0, 1)
expectSuccessfulResourceVersionAtomicInc(t, b)
b.ExecWithResult("update resource_history", 0, 1)
b.ExecWithErr("update resource", errTest)
b.SQLMock.ExpectRollback()
v, err := b.restore(ctx, event)
require.Zero(t, v)
require.Error(t, err)
require.ErrorContains(t, err, "update resource rv")
})
t.Run("error updating resource history uid", func(t *testing.T) {
t.Parallel()
b, ctx := setupBackendTest(t)
@ -717,9 +447,6 @@ func TestBackend_restore(t *testing.T) {
b.SQLMock.ExpectBegin()
b.ExecWithResult("insert resource", 0, 1)
b.ExecWithResult("insert resource_history", 0, 1)
expectSuccessfulResourceVersionAtomicInc(t, b)
b.ExecWithResult("update resource_history", 0, 1)
b.ExecWithResult("update resource", 0, 1)
b.ExecWithErr("update resource_history", errTest)
b.SQLMock.ExpectRollback()

View File

@ -245,7 +245,9 @@ func (b *backend) processBulk(ctx context.Context, setting resource.BulkSettings
}
// Make sure the collection RV is above our last written event
_, err = b.resourceVersionAtomicInc(ctx, tx, key)
_, err = b.rvManager.ExecWithRV(ctx, key, func(tx db.Tx) (string, error) {
return "", nil
})
if err != nil {
b.log.Warn("error increasing RV", "error", err)
}

View File

@ -1,4 +1,12 @@
UPDATE {{ .Ident "resource_history" }}
SET {{ .Ident "resource_version" }} = {{ .Arg .ResourceVersion }}
WHERE {{ .Ident "guid" }} = {{ .Arg .GUID }}
;
SET {{ .Ident "resource_version" }} = (
CASE
{{ range $guid, $rv := .GUIDToRV }}
WHEN {{ $.Ident "guid" }} = {{ $.Arg $guid }} THEN CAST({{ $.Arg $rv }} AS {{ if eq $.DialectName "postgres" }}BIGINT{{ else }}SIGNED{{ end }})
{{ end }}
END
)
WHERE {{ .Ident "guid" }} IN (
{{$first := true}}
{{ range $guid, $rv := .GUIDToRV }}{{if $first}}{{$first = false}}{{else}}, {{end}}{{ $.Arg $guid }}{{ end }}
);

View File

@ -3,7 +3,8 @@ UPDATE {{ .Ident "resource" }}
{{ .Ident "guid" }} = {{ .Arg .GUID }},
{{ .Ident "value" }} = {{ .Arg .WriteEvent.Value }},
{{ .Ident "folder" }} = {{ .Arg .Folder }},
{{ .Ident "action" }} = {{ .Arg .WriteEvent.Type }}
{{ .Ident "action" }} = {{ .Arg .WriteEvent.Type }},
{{ .Ident "resource_version" }} = {{ .Arg .ResourceVersion }}
WHERE 1 = 1
AND {{ .Ident "group" }} = {{ .Arg .WriteEvent.Key.Group }}
AND {{ .Ident "resource" }} = {{ .Arg .WriteEvent.Key.Resource }}

View File

@ -1,4 +1,12 @@
UPDATE {{ .Ident "resource" }}
SET {{ .Ident "resource_version" }} = {{ .Arg .ResourceVersion }}
WHERE {{ .Ident "guid" }} = {{ .Arg .GUID }}
;
SET {{ .Ident "resource_version" }} = (
CASE
{{ range $guid, $rv := .GUIDToRV }}
WHEN {{ $.Ident "guid" }} = {{ $.Arg $guid }} THEN CAST({{ $.Arg $rv }} AS {{ if eq $.DialectName "postgres" }}BIGINT{{ else }}SIGNED{{ end }})
{{ end }}
END
)
WHERE {{ .Ident "guid" }} IN (
{{$first := true}}
{{ range $guid, $rv := .GUIDToRV }}{{if $first}}{{$first = false}}{{else}}, {{end}}{{ $.Arg $guid }}{{ end }}
);

View File

@ -295,8 +295,7 @@ func (r sqlResourceBlobQueryRequest) Validate() error {
type sqlResourceUpdateRVRequest struct {
sqltemplate.SQLTemplate
GUID string
ResourceVersion int64
GUIDToRV map[string]int64
}
func (r sqlResourceUpdateRVRequest) Validate() error {

View File

@ -136,6 +136,10 @@ func TestUnifiedStorageQueries(t *testing.T) {
Name: "single path",
Data: &sqlResourceUpdateRVRequest{
SQLTemplate: mocks.NewTestingSQLTemplate(),
GUIDToRV: map[string]int64{
"guid1": 123,
"guid2": 456,
},
},
},
},
@ -164,6 +168,10 @@ func TestUnifiedStorageQueries(t *testing.T) {
Name: "single path",
Data: &sqlResourceUpdateRVRequest{
SQLTemplate: mocks.NewTestingSQLTemplate(),
GUIDToRV: map[string]int64{
"guid1": 123,
"guid2": 456,
},
},
},
},

View File

@ -0,0 +1,400 @@
package sql
import (
"context"
"database/sql"
"errors"
"fmt"
"sync"
"time"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/sql/db"
"github.com/grafana/grafana/pkg/storage/unified/sql/dbutil"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
)
var (
rvmWriteDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "rvmanager_write_duration_seconds",
Help: "Duration of ResourceVersionManager write operations",
Namespace: "grafana",
NativeHistogramBucketFactor: 1.1,
}, []string{"group", "resource", "status"})
rvmExecBatchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "rvmanager_exec_batch_duration_seconds",
Help: "Duration of ResourceVersionManager batch operations",
Namespace: "grafana",
NativeHistogramBucketFactor: 1.1,
}, []string{"group", "resource", "status"})
rvmExecBatchPhaseDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "rvmanager_exec_batch_phase_duration_seconds",
Help: "Duration of batch operation phases",
Namespace: "grafana",
NativeHistogramBucketFactor: 1.1,
}, []string{"group", "resource", "phase"})
rvmInflightWrites = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "rvmanager_inflight_writes",
Help: "Number of concurrent write operations",
Namespace: "grafana",
}, []string{"group", "resource"})
rvmBatchSize = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "rvmanager_batch_size",
Help: "Number of write operations per batch",
Namespace: "grafana",
NativeHistogramBucketFactor: 1.1,
}, []string{"group", "resource"})
)
const (
defaultMaxBatchSize = 25
defaultMaxBatchWaitTime = 100 * time.Millisecond
defaultBatchTimeout = 5 * time.Second
)
// resourceVersionManager handles resource version operations
type resourceVersionManager struct {
dialect sqltemplate.Dialect
db db.DB
tracer trace.Tracer
batchMu sync.RWMutex
batchChMap map[string]chan *writeOp
maxBatchSize int // The maximum number of operations to batch together
maxBatchWaitTime time.Duration // The maximum time to wait for a batch to be ready
}
type writeOpResult struct {
guid string
rv int64
err error
batchTraceLink trace.Link
}
// writeOp is a write operation that is executed with an incremented resource version
type writeOp struct {
key *resource.ResourceKey // The key of the resource
fn WriteEventFunc // The function to execute to write the event
done chan writeOpResult // A channel informing the operation is done
}
// WriteEventFunc is a function that writes a resource to the database
// It returns the GUID of the created resource
// The GUID is used to update the resource version for the resource in the same transaction.
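// For example, backend.create (earlier in this diff) passes a function that inserts
// the resource and resource_history rows and returns their shared GUID.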
type WriteEventFunc func(tx db.Tx) (guid string, err error)
type ResourceManagerOptions struct {
Dialect sqltemplate.Dialect // The dialect to use for the database
DB db.DB // The database to use
MaxBatchSize int // The maximum number of operations to batch together
MaxBatchWaitTime time.Duration // The maximum time to wait for a batch to be ready
Tracer trace.Tracer // The tracer to use for tracing
}
// NewResourceVersionManager creates a new ResourceVersionManager
func NewResourceVersionManager(opts ResourceManagerOptions) (*resourceVersionManager, error) {
if opts.MaxBatchSize == 0 {
opts.MaxBatchSize = defaultMaxBatchSize
}
if opts.MaxBatchWaitTime == 0 {
opts.MaxBatchWaitTime = defaultMaxBatchWaitTime
}
if opts.Tracer == nil {
opts.Tracer = noop.NewTracerProvider().Tracer("resource-version-manager")
}
if opts.Dialect == nil {
return nil, errors.New("dialect is required")
}
if opts.DB == nil {
return nil, errors.New("db is required")
}
return &resourceVersionManager{
dialect: opts.Dialect,
db: opts.DB,
tracer: opts.Tracer,
batchChMap: make(map[string]chan *writeOp),
maxBatchSize: opts.MaxBatchSize,
maxBatchWaitTime: opts.MaxBatchWaitTime,
}, nil
}
// ExecWithRV executes the given function with an incremented resource version
func (m *resourceVersionManager) ExecWithRV(ctx context.Context, key *resource.ResourceKey, fn WriteEventFunc) (rv int64, err error) {
rvmInflightWrites.WithLabelValues(key.Group, key.Resource).Inc()
defer rvmInflightWrites.WithLabelValues(key.Group, key.Resource).Dec()
var status string
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
status = "success"
if err != nil {
status = "error"
}
rvmWriteDuration.WithLabelValues(key.Group, key.Resource, status).Observe(v)
}))
defer timer.ObserveDuration()
ctx, span := m.tracer.Start(ctx, "sql.rvmanager.ExecWithRV")
defer span.End()
span.SetAttributes(
attribute.String("group", key.Group),
attribute.String("resource", key.Resource),
)
op := writeOp{key: key, fn: fn, done: make(chan writeOpResult, 1)}
batchKey := fmt.Sprintf("%s/%s", key.Group, key.Resource)
m.batchMu.Lock()
ch, ok := m.batchChMap[batchKey]
if !ok {
ch = make(chan *writeOp, m.maxBatchSize)
m.batchChMap[batchKey] = ch
go m.startBatchProcessor(key.Group, key.Resource)
}
m.batchMu.Unlock()
select {
case ch <- &op:
case <-ctx.Done():
return 0, ctx.Err()
}
select {
case res := <-op.done:
if res.err != nil {
span.RecordError(res.err)
}
span.SetAttributes(
attribute.String("guid", res.guid),
attribute.Int64("resource_version", res.rv),
)
span.AddLink(res.batchTraceLink)
return res.rv, res.err
case <-ctx.Done():
return 0, ctx.Err()
}
}
// startBatchProcessor is responsible for processing batches of write operations
func (m *resourceVersionManager) startBatchProcessor(group, resource string) {
ctx := context.TODO()
batchKey := fmt.Sprintf("%s/%s", group, resource)
m.batchMu.Lock()
ch, ok := m.batchChMap[batchKey]
if !ok {
m.batchMu.Unlock()
return
}
m.batchMu.Unlock()
for {
batch := make([]writeOp, 0, m.maxBatchSize)
// wait for a new writeOp
select {
case op := <-ch:
batch = append(batch, *op)
case <-ctx.Done():
return
}
prepare:
for len(batch) < m.maxBatchSize {
select {
case op := <-ch:
batch = append(batch, *op)
default:
break prepare
}
}
rvmBatchSize.WithLabelValues(group, resource).Observe(float64(len(batch)))
m.execBatch(ctx, group, resource, batch)
}
}
func (m *resourceVersionManager) execBatch(ctx context.Context, group, resource string, batch []writeOp) {
ctx, span := m.tracer.Start(ctx, "sql.rvmanager.execBatch")
defer span.End()
// Add batch size attribute
span.SetAttributes(
attribute.Int("batch_size", len(batch)),
attribute.String("group", group),
attribute.String("resource", resource),
)
var err error
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
status := "success"
if err != nil {
status = "error"
}
rvmExecBatchDuration.WithLabelValues(group, resource, status).Observe(v)
}))
defer timer.ObserveDuration()
ctx, cancel := context.WithTimeout(ctx, defaultBatchTimeout)
defer cancel()
guidToRV := make(map[string]int64, len(batch))
guids := make([]string, len(batch)) // The GUIDs of the created resources in the same order as the batch
rvs := make([]int64, len(batch)) // The RVs of the created resources in the same order as the batch
err = m.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
span.AddEvent("starting_batch_transaction")
writeTimer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
rvmExecBatchPhaseDuration.WithLabelValues(group, resource, "write_ops").Observe(v)
}))
for i := range batch {
guid, err := batch[i].fn(tx)
if err != nil {
span.AddEvent("batch_operation_failed", trace.WithAttributes(
attribute.Int("operation_index", i),
attribute.String("error", err.Error()),
))
return fmt.Errorf("failed to execute function: %w", err)
}
guids[i] = guid
}
writeTimer.ObserveDuration()
span.AddEvent("batch_operations_completed")
lockTimer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
rvmExecBatchPhaseDuration.WithLabelValues(group, resource, "waiting_for_lock").Observe(v)
}))
rv, err := m.lock(ctx, tx, group, resource)
lockTimer.ObserveDuration()
if err != nil {
span.AddEvent("resource_version_lock_failed", trace.WithAttributes(
attribute.String("error", err.Error()),
))
return fmt.Errorf("failed to increment resource version: %w", err)
}
span.AddEvent("resource_version_locked", trace.WithAttributes(
attribute.Int64("initial_rv", rv),
))
rvUpdateTimer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
rvmExecBatchPhaseDuration.WithLabelValues(group, resource, "update_resource_versions").Observe(v)
}))
defer rvUpdateTimer.ObserveDuration()
// Allocate the RVs
for i, guid := range guids {
guidToRV[guid] = rv
rvs[i] = rv
rv++
}
// Update the resource version for the created resources in both the resource and the resource history
if _, err := dbutil.Exec(ctx, tx, sqlResourceUpdateRV, sqlResourceUpdateRVRequest{
SQLTemplate: sqltemplate.New(m.dialect),
GUIDToRV: guidToRV,
}); err != nil {
span.AddEvent("resource_update_rv_failed", trace.WithAttributes(
attribute.String("error", err.Error()),
))
return fmt.Errorf("update resource version: %w", err)
}
span.AddEvent("resource_versions_updated")
if _, err := dbutil.Exec(ctx, tx, sqlResourceHistoryUpdateRV, sqlResourceUpdateRVRequest{
SQLTemplate: sqltemplate.New(m.dialect),
GUIDToRV: guidToRV,
}); err != nil {
span.AddEvent("resource_history_update_rv_failed", trace.WithAttributes(
attribute.String("error", err.Error()),
))
return fmt.Errorf("update resource history version: %w", err)
}
span.AddEvent("resource_history_versions_updated")
// Record the latest RV in the resource version table
err = m.saveRV(ctx, tx, group, resource, rv)
if err != nil {
span.AddEvent("save_rv_failed", trace.WithAttributes(
attribute.String("error", err.Error()),
))
}
return err
})
if err != nil {
span.AddEvent("batch_transaction_failed", trace.WithAttributes(
attribute.String("error", err.Error()),
))
} else {
span.AddEvent("batch_transaction_completed")
}
// notify the caller that the operations are done
for i := range batch {
batch[i].done <- writeOpResult{
guid: guids[i],
rv: rvs[i],
err: err,
batchTraceLink: trace.LinkFromContext(ctx),
}
}
}
// lock locks the resource version for the given key
func (m *resourceVersionManager) lock(ctx context.Context, x db.ContextExecer, group, resource string) (nextRV int64, err error) {
// 1. Lock the row and prevent concurrent updates until the transaction is committed
res, err := dbutil.QueryRow(ctx, x, sqlResourceVersionGet, sqlResourceVersionGetRequest{
SQLTemplate: sqltemplate.New(m.dialect),
Group: group,
Resource: resource,
Response: new(resourceVersionResponse),
ReadOnly: false, // Lock the row for update
})
if errors.Is(err, sql.ErrNoRows) {
// If there wasn't a row for this resource, create it
if _, err = dbutil.Exec(ctx, x, sqlResourceVersionInsert, sqlResourceVersionUpsertRequest{
SQLTemplate: sqltemplate.New(m.dialect),
Group: group,
Resource: resource,
}); err != nil {
return 0, fmt.Errorf("insert into resource_version: %w", err)
}
// Fetch the newly created resource version
res, err = dbutil.QueryRow(ctx, x, sqlResourceVersionGet, sqlResourceVersionGetRequest{
SQLTemplate: sqltemplate.New(m.dialect),
Group: group,
Resource: resource,
Response: new(resourceVersionResponse),
ReadOnly: true,
})
if err != nil {
return 0, fmt.Errorf("fetching RV after insert: %w", err)
}
return res.ResourceVersion, nil
} else if err != nil {
return 0, fmt.Errorf("lock the resource version: %w", err)
}
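// The next RV is normally the current microsecond epoch generated on the SQL server
// (to avoid clock skew); if the server clock goes back in time, we simply keep
// incrementing the previous RV until the clock catches up.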
return max(res.CurrentEpoch, res.ResourceVersion+1), nil
}
func (m *resourceVersionManager) saveRV(ctx context.Context, x db.ContextExecer, group, resource string, rv int64) error {
_, err := dbutil.Exec(ctx, x, sqlResourceVersionUpdate, sqlResourceVersionUpsertRequest{
SQLTemplate: sqltemplate.New(m.dialect),
Group: group,
Resource: resource,
ResourceVersion: rv,
})
if err != nil {
return fmt.Errorf("save resource version: %w", err)
}
return nil
}

View File

@ -0,0 +1,64 @@
package sql
import (
"testing"
sqlmock "github.com/DATA-DOG/go-sqlmock"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/sql/db"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
"github.com/grafana/grafana/pkg/storage/unified/sql/test"
"github.com/grafana/grafana/pkg/util/testutil"
"github.com/stretchr/testify/require"
)
func expectSuccessfulResourceVersionLock(t *testing.T, dbp test.TestDBProvider, rv int64, timestamp int64) {
dbp.SQLMock.ExpectQuery("select resource_version, unix_timestamp for update").
WillReturnRows(sqlmock.NewRows([]string{"resource_version", "unix_timestamp"}).
AddRow(rv, timestamp))
}
func expectSuccessfulResourceVersionSaveRV(t *testing.T, dbp test.TestDBProvider) {
dbp.SQLMock.ExpectExec("update resource set resource_version").WillReturnResult(sqlmock.NewResult(1, 1))
dbp.SQLMock.ExpectExec("update resource_history set resource_version").WillReturnResult(sqlmock.NewResult(1, 1))
dbp.SQLMock.ExpectExec("update resource_version set resource_version").WillReturnResult(sqlmock.NewResult(1, 1))
}
func expectSuccessfulResourceVersionExec(t *testing.T, dbp test.TestDBProvider, cbs ...func()) {
for _, cb := range cbs {
cb()
}
expectSuccessfulResourceVersionLock(t, dbp, 100, 200)
expectSuccessfulResourceVersionSaveRV(t, dbp)
}
func TestResourceVersionManager(t *testing.T) {
ctx := testutil.NewDefaultTestContext(t)
dbp := test.NewDBProviderMatchWords(t)
dialect := sqltemplate.DialectForDriver(dbp.DB.DriverName())
manager, err := NewResourceVersionManager(ResourceManagerOptions{
DB: dbp.DB,
Dialect: dialect,
})
require.NoError(t, err)
require.NotNil(t, manager)
t.Run("should handle single operation", func(t *testing.T) {
key := &resource.ResourceKey{
Group: "test-group",
Resource: "test-resource",
}
dbp.SQLMock.ExpectBegin()
expectSuccessfulResourceVersionExec(t, dbp, func() {
dbp.SQLMock.ExpectExec("select 1").WillReturnResult(sqlmock.NewResult(1, 1))
})
dbp.SQLMock.ExpectCommit()
rv, err := manager.ExecWithRV(ctx, key, func(tx db.Tx) (string, error) {
_, err := tx.ExecContext(ctx, "select 1")
return "1234", err
})
require.NoError(t, err)
require.Equal(t, rv, int64(200))
})
}
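The helpers above also explain the value asserted throughout the backend tests: the mocked resource_version row returns resource_version = 100 with a current epoch of 200, lock() picks max(epoch, rv+1) = max(200, 101) = 200, a single-operation batch is assigned RV 200, and saveRV records 201 as the next floor. A tiny sketch of that allocation, assuming the mocked values above:

package main

import "fmt"

// allocateRVs mirrors how execBatch hands out versions: lock() picks the starting
// RV, each operation in the batch gets the next consecutive value, and the value
// left over is what saveRV writes back into resource_version.
func allocateRVs(currentEpoch, storedRV int64, batchSize int) (rvs []int64, saved int64) {
	rv := max(currentEpoch, storedRV+1) // same rule as (*resourceVersionManager).lock
	for i := 0; i < batchSize; i++ {
		rvs = append(rvs, rv)
		rv++
	}
	return rvs, rv
}

func main() {
	rvs, saved := allocateRVs(200, 100, 1) // the values the test mock returns
	fmt.Println(rvs, saved)                // [200] 201 — hence require.Equal(t, int64(200), v)
}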

View File

@ -23,7 +23,7 @@ func newTestBackend(b *testing.B) resource.StorageBackend {
backend, err := sql.NewBackend(sql.BackendOptions{
DBProvider: eDB,
IsHA: true,
SimulatedNetworkLatency: 5 * time.Millisecond, // to simulate some network latency
SimulatedNetworkLatency: 2 * time.Millisecond, // to simulate some network latency
})
require.NoError(b, err)
require.NotNil(b, backend)

View File

@ -1,4 +1,10 @@
UPDATE `resource_history`
SET `resource_version` = 0
WHERE `guid` = ''
;
SET `resource_version` = (
CASE
WHEN `guid` = 'guid1' THEN CAST(123 AS SIGNED)
WHEN `guid` = 'guid2' THEN CAST(456 AS SIGNED)
END
)
WHERE `guid` IN (
'guid1', 'guid2'
);

View File

@ -3,7 +3,8 @@ UPDATE `resource`
`guid` = '',
`value` = '[]',
`folder` = 'fldr',
`action` = 'UNKNOWN'
`action` = 'UNKNOWN',
`resource_version` = 0
WHERE 1 = 1
AND `group` = 'gg'
AND `resource` = 'rr'

View File

@ -1,4 +1,10 @@
UPDATE `resource`
SET `resource_version` = 0
WHERE `guid` = ''
;
SET `resource_version` = (
CASE
WHEN `guid` = 'guid1' THEN CAST(123 AS SIGNED)
WHEN `guid` = 'guid2' THEN CAST(456 AS SIGNED)
END
)
WHERE `guid` IN (
'guid1', 'guid2'
);

View File

@ -1,4 +1,10 @@
UPDATE "resource_history"
SET "resource_version" = 0
WHERE "guid" = ''
;
SET "resource_version" = (
CASE
WHEN "guid" = 'guid1' THEN CAST(123 AS BIGINT)
WHEN "guid" = 'guid2' THEN CAST(456 AS BIGINT)
END
)
WHERE "guid" IN (
'guid1', 'guid2'
);

View File

@ -3,7 +3,8 @@ UPDATE "resource"
"guid" = '',
"value" = '[]',
"folder" = 'fldr',
"action" = 'UNKNOWN'
"action" = 'UNKNOWN',
"resource_version" = 0
WHERE 1 = 1
AND "group" = 'gg'
AND "resource" = 'rr'

View File

@ -1,4 +1,10 @@
UPDATE "resource"
SET "resource_version" = 0
WHERE "guid" = ''
;
SET "resource_version" = (
CASE
WHEN "guid" = 'guid1' THEN CAST(123 AS BIGINT)
WHEN "guid" = 'guid2' THEN CAST(456 AS BIGINT)
END
)
WHERE "guid" IN (
'guid1', 'guid2'
);

View File

@ -1,4 +1,10 @@
UPDATE "resource_history"
SET "resource_version" = 0
WHERE "guid" = ''
;
SET "resource_version" = (
CASE
WHEN "guid" = 'guid1' THEN CAST(123 AS SIGNED)
WHEN "guid" = 'guid2' THEN CAST(456 AS SIGNED)
END
)
WHERE "guid" IN (
'guid1', 'guid2'
);

View File

@ -3,7 +3,8 @@ UPDATE "resource"
"guid" = '',
"value" = '[]',
"folder" = 'fldr',
"action" = 'UNKNOWN'
"action" = 'UNKNOWN',
"resource_version" = 0
WHERE 1 = 1
AND "group" = 'gg'
AND "resource" = 'rr'

View File

@ -1,4 +1,10 @@
UPDATE "resource"
SET "resource_version" = 0
WHERE "guid" = ''
;
SET "resource_version" = (
CASE
WHEN "guid" = 'guid1' THEN CAST(123 AS SIGNED)
WHEN "guid" = 'guid2' THEN CAST(456 AS SIGNED)
END
)
WHERE "guid" IN (
'guid1', 'guid2'
);

View File

@ -26,7 +26,7 @@ type BenchmarkOptions struct {
func DefaultBenchmarkOptions() *BenchmarkOptions {
return &BenchmarkOptions{
NumResources: 1000,
Concurrency: 20,
Concurrency: 50,
NumNamespaces: 1,
NumGroups: 1,
NumResourceTypes: 1,