From d999b415df78ceba5302e3c0f98975a870cce8c8 Mon Sep 17 00:00:00 2001 From: Georges Chaudy Date: Fri, 11 Oct 2024 11:11:33 +0200 Subject: [PATCH] UniStore: Use epoch with microsecond resolution as RV (#92638) * Use epoch with microsecond resolution as RV * fix backend tests * Add solution for when the clock goes back * Add solution for when the clock goes back * generate mocks * go lint * remove comment * Use Greatest instead of max in msyql and postgres * update tests * Update pkg/storage/unified/sql/sqltemplate/dialect_sqlite.go Co-authored-by: Diego Augusto Molina * cast to bigint * add additional round trip * increment the RV using 2 sql round trips instead of 3 * cleanup comments * cast unix timestamp to integer * fix postgres query * remove old increment test data * remove greatest * cast unix_timestamp to signed * Use statement_timestamp instead of clock_timestamp --------- Co-authored-by: Diego Augusto Molina --- pkg/storage/unified/sql/backend.go | 92 ++++++++--------- pkg/storage/unified/sql/backend_test.go | 99 +++++++++---------- .../unified/sql/data/resource_version_get.sql | 3 +- .../sql/data/resource_version_insert.sql | 2 +- ...on_inc.sql => resource_version_update.sql} | 2 +- pkg/storage/unified/sql/queries.go | 31 ++++-- pkg/storage/unified/sql/queries_test.go | 48 ++++++--- .../unified/sql/sqltemplate/dialect.go | 3 + .../unified/sql/sqltemplate/dialect_mysql.go | 4 + .../sql/sqltemplate/dialect_postgresql.go | 4 + .../unified/sql/sqltemplate/dialect_sqlite.go | 5 + .../sql/sqltemplate/mocks/SQLTemplateIface.go | 45 +++++++++ .../sql/sqltemplate/mocks/WithResults.go | 45 +++++++++ .../unified/sql/test/integration_test.go | 92 ++++++++++------- ...ry_insert-insert into resource_history.sql | 8 +- ...resource_read-without_resource_version.sql | 8 +- .../mysql--resource_update-single path.sql | 8 +- ...ysql--resource_version_get-single path.sql | 7 +- ...version_inc-increment resource version.sql | 7 -- ...l--resource_version_insert-single path.sql | 2 +- ...sion_update-increment resource version.sql | 7 ++ ...ry_insert-insert into resource_history.sql | 8 +- ...resource_read-without_resource_version.sql | 8 +- .../postgres--resource_update-single path.sql | 8 +- ...gres--resource_version_get-single path.sql | 7 +- ...version_inc-increment resource version.sql | 7 -- ...s--resource_version_insert-single path.sql | 2 +- ...sion_update-increment resource version.sql | 7 ++ ...ry_insert-insert into resource_history.sql | 8 +- ...resource_read-without_resource_version.sql | 8 +- .../sqlite--resource_update-single path.sql | 8 +- ...lite--resource_version_get-single path.sql | 7 +- ...version_inc-increment resource version.sql | 7 -- ...e--resource_version_insert-single path.sql | 2 +- ...sion_update-increment resource version.sql | 7 ++ 35 files changed, 391 insertions(+), 225 deletions(-) rename pkg/storage/unified/sql/data/{resource_version_inc.sql => resource_version_update.sql} (71%) delete mode 100755 pkg/storage/unified/sql/testdata/mysql--resource_version_inc-increment resource version.sql create mode 100755 pkg/storage/unified/sql/testdata/mysql--resource_version_update-increment resource version.sql delete mode 100755 pkg/storage/unified/sql/testdata/postgres--resource_version_inc-increment resource version.sql create mode 100755 pkg/storage/unified/sql/testdata/postgres--resource_version_update-increment resource version.sql delete mode 100755 pkg/storage/unified/sql/testdata/sqlite--resource_version_inc-increment resource version.sql create mode 100755 pkg/storage/unified/sql/testdata/sqlite--resource_version_update-increment resource version.sql diff --git a/pkg/storage/unified/sql/backend.go b/pkg/storage/unified/sql/backend.go index 556c860bbb7..950814ba9d8 100644 --- a/pkg/storage/unified/sql/backend.go +++ b/pkg/storage/unified/sql/backend.go @@ -593,12 +593,12 @@ func (b *backend) listLatestRVs(ctx context.Context) (groupResourceRV, error) { // fetchLatestRV returns the current maximum RV in the resource table func fetchLatestRV(ctx context.Context, x db.ContextExecer, d sqltemplate.Dialect, group, resource string) (int64, error) { - res, err := dbutil.QueryRow(ctx, x, sqlResourceVersionGet, sqlResourceVersionRequest{ - SQLTemplate: sqltemplate.New(d), - Group: group, - Resource: resource, - ReadOnly: true, - resourceVersion: new(resourceVersion), + res, err := dbutil.QueryRow(ctx, x, sqlResourceVersionGet, sqlResourceVersionGetRequest{ + SQLTemplate: sqltemplate.New(d), + Group: group, + Resource: resource, + ReadOnly: true, + Response: new(resourceVersionResponse), }) if errors.Is(err, sql.ErrNoRows) { return 1, nil @@ -611,7 +611,6 @@ func fetchLatestRV(ctx context.Context, x db.ContextExecer, d sqltemplate.Dialec func (b *backend) poll(ctx context.Context, grp string, res string, since int64, stream chan<- *resource.WrittenEvent) (int64, error) { ctx, span := b.tracer.Start(ctx, tracePrefix+"poll") defer span.End() - var records []*historyPollResponse err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error { var err error @@ -658,53 +657,58 @@ func (b *backend) poll(ctx context.Context, grp string, res string, since int64, return nextRV, nil } -// resourceVersionAtomicInc atomically increases the version of a kind within a -// transaction. +// resourceVersionAtomicInc atomically increases the version of a kind within a transaction. // TODO: Ideally we should attempt to update the RV in the resource and resource_history tables // in a single roundtrip. This would reduce the latency of the operation, and also increase the // throughput of the system. This is a good candidate for a future optimization. func resourceVersionAtomicInc(ctx context.Context, x db.ContextExecer, d sqltemplate.Dialect, key *resource.ResourceKey) (newVersion int64, err error) { - // TODO: refactor this code to run in a multi-statement transaction in order to minimize the number of round trips. - // 1 Lock the row for update - rv, err := dbutil.QueryRow(ctx, x, sqlResourceVersionGet, sqlResourceVersionRequest{ - SQLTemplate: sqltemplate.New(d), - Group: key.Group, - Resource: key.Resource, - resourceVersion: new(resourceVersion), - }) - - if errors.Is(err, sql.ErrNoRows) { - // if there wasn't a row associated with the given resource, we create one with - // version 2 to match the etcd behavior. - if _, err = dbutil.Exec(ctx, x, sqlResourceVersionInsert, sqlResourceVersionRequest{ - SQLTemplate: sqltemplate.New(d), - Group: key.Group, - Resource: key.Resource, - resourceVersion: &resourceVersion{1}, - }); err != nil { - return 0, fmt.Errorf("insert into resource_version: %w", err) - } - return 2, nil - } - - if err != nil { - return 0, fmt.Errorf("get current resource version: %w", err) - } - nextRV := rv.ResourceVersion + 1 - - // 2. Increment the resource version - _, err = dbutil.Exec(ctx, x, sqlResourceVersionInc, sqlResourceVersionRequest{ + // 1. Lock to row and prevent concurrent updates until the transaction is committed. + res, err := dbutil.QueryRow(ctx, x, sqlResourceVersionGet, sqlResourceVersionGetRequest{ SQLTemplate: sqltemplate.New(d), Group: key.Group, Resource: key.Resource, - resourceVersion: &resourceVersion{ - ResourceVersion: nextRV, - }, + + Response: new(resourceVersionResponse), ReadOnly: false, // This locks the row for update + }) + + if errors.Is(err, sql.ErrNoRows) { + // if there wasn't a row associated with the given resource, then we create it. + if _, err = dbutil.Exec(ctx, x, sqlResourceVersionInsert, sqlResourceVersionUpsertRequest{ + SQLTemplate: sqltemplate.New(d), + Group: key.Group, + Resource: key.Resource, + }); err != nil { + return 0, fmt.Errorf("insert into resource_version: %w", err) + } + res, err = dbutil.QueryRow(ctx, x, sqlResourceVersionGet, sqlResourceVersionGetRequest{ + SQLTemplate: sqltemplate.New(d), + Group: key.Group, + Resource: key.Resource, + Response: new(resourceVersionResponse), + ReadOnly: true, // This locks the row for update + }) + if err != nil { + return 0, fmt.Errorf("fetching RV after read") + } + return res.ResourceVersion, nil + } else if err != nil { + return 0, fmt.Errorf("lock the resource version: %w", err) + } + + // 2. Update the RV + // Most times, the RV is the current microsecond timestamp generated on the sql server (to avoid clock skew). + // In rare occasion, the server clock might go back in time. In those cases, we simply increment the + // previous RV until the clock catches up. + nextRV := max(res.CurrentEpoch, res.ResourceVersion+1) + + _, err = dbutil.Exec(ctx, x, sqlResourceVersionUpdate, sqlResourceVersionUpsertRequest{ + SQLTemplate: sqltemplate.New(d), + Group: key.Group, + Resource: key.Resource, + ResourceVersion: nextRV, }) if err != nil { return 0, fmt.Errorf("increase resource version: %w", err) } - - // 3. Return the incremented value return nextRV, nil } diff --git a/pkg/storage/unified/sql/backend_test.go b/pkg/storage/unified/sql/backend_test.go index 33b7bab7d6a..0d96d73896d 100644 --- a/pkg/storage/unified/sql/backend_test.go +++ b/pkg/storage/unified/sql/backend_test.go @@ -36,8 +36,8 @@ type testBackend struct { test.TestDBProvider } -func (b testBackend) ExecWithResult(expectedSQL string) { - b.SQLMock.ExpectExec(expectedSQL).WillReturnResult(sqlmock.NewResult(0, 0)) +func (b testBackend) ExecWithResult(expectedSQL string, lastInsertID int64, rowsAffected int64) { + b.SQLMock.ExpectExec(expectedSQL).WillReturnResult(sqlmock.NewResult(lastInsertID, rowsAffected)) } func (b testBackend) ExecWithErr(expectedSQL string, err error) { @@ -204,8 +204,8 @@ func TestBackend_IsHealthy(t *testing.T) { // expectSuccessfulResourceVersionAtomicInc sets up expectations for calling // resourceVersionAtomicInc, where the returned RV will be 1. func expectSuccessfulResourceVersionAtomicInc(t *testing.T, b testBackend) { - b.QueryWithResult("select resource_version for update", 0, nil) - b.ExecWithResult("insert resource_version") + b.QueryWithResult("select resource_version for update", 2, Rows{{12345, 23456}}) + b.ExecWithResult("update resource_version set resource_version", 0, 0) } // expectUnsuccessfulResourceVersionAtomicInc sets up expectations for calling @@ -227,7 +227,7 @@ func TestResourceVersionAtomicInc(t *testing.T) { v, err := resourceVersionAtomicInc(ctx, b.DB, dialect, resKey) require.NoError(t, err) - require.Equal(t, int64(2), v) + require.Equal(t, int64(23456), v) }) t.Run("happy path - update existing row", func(t *testing.T) { @@ -235,24 +235,23 @@ func TestResourceVersionAtomicInc(t *testing.T) { b, ctx := setupBackendTest(t) - b.QueryWithResult("select resource_version for update", 1, Rows{{2}}) - b.ExecWithResult("update resource_version") + b.QueryWithResult("select resource_version for update", 2, Rows{{12345, 23456}}) + b.ExecWithResult("update resource_version", 0, 1) v, err := resourceVersionAtomicInc(ctx, b.DB, dialect, resKey) require.NoError(t, err) - require.Equal(t, int64(3), v) + require.Equal(t, int64(23456), v) }) t.Run("error getting current version", func(t *testing.T) { t.Parallel() b, ctx := setupBackendTest(t) - b.QueryWithErr("select resource_version for update", errTest) v, err := resourceVersionAtomicInc(ctx, b.DB, dialect, resKey) require.Zero(t, v) require.Error(t, err) - require.ErrorContains(t, err, "get current resource version") + require.ErrorContains(t, err, "lock the resource version") }) t.Run("error inserting new row", func(t *testing.T) { @@ -260,7 +259,7 @@ func TestResourceVersionAtomicInc(t *testing.T) { b, ctx := setupBackendTest(t) - b.QueryWithResult("select resource_version for update", 0, nil) + b.QueryWithResult("select resource_version", 0, Rows{}) b.ExecWithErr("insert resource_version", errTest) v, err := resourceVersionAtomicInc(ctx, b.DB, dialect, resKey) @@ -273,7 +272,7 @@ func TestResourceVersionAtomicInc(t *testing.T) { t.Parallel() b, ctx := setupBackendTest(t) - b.QueryWithResult("select resource_version for update", 1, Rows{{2}}) + b.QueryWithResult("select resource_version for update", 2, Rows{{12345, 23456}}) b.ExecWithErr("update resource_version", errTest) v, err := resourceVersionAtomicInc(ctx, b.DB, dialect, resKey) @@ -295,16 +294,16 @@ func TestBackend_create(t *testing.T) { b, ctx := setupBackendTest(t) b.SQLMock.ExpectBegin() - b.ExecWithResult("insert resource") - b.ExecWithResult("insert resource_history") + b.ExecWithResult("insert resource", 0, 1) + b.ExecWithResult("insert resource_history", 0, 1) expectSuccessfulResourceVersionAtomicInc(t, b) // returns RV=1 - b.ExecWithResult("update resource_history") - b.ExecWithResult("update resource") + b.ExecWithResult("update resource_history", 0, 1) + b.ExecWithResult("update resource", 0, 1) b.SQLMock.ExpectCommit() v, err := b.create(ctx, event) require.NoError(t, err) - require.Equal(t, int64(2), v) + require.Equal(t, int64(23456), v) }) t.Run("error inserting into resource", func(t *testing.T) { @@ -326,7 +325,7 @@ func TestBackend_create(t *testing.T) { b, ctx := setupBackendTest(t) b.SQLMock.ExpectBegin() - b.ExecWithResult("insert resource") + b.ExecWithResult("insert resource", 0, 1) b.ExecWithErr("insert resource_history", errTest) b.SQLMock.ExpectRollback() @@ -341,8 +340,8 @@ func TestBackend_create(t *testing.T) { b, ctx := setupBackendTest(t) b.SQLMock.ExpectBegin() - b.ExecWithResult("insert resource") - b.ExecWithResult("insert resource_history") + b.ExecWithResult("insert resource", 0, 1) + b.ExecWithResult("insert resource_history", 0, 1) expectUnsuccessfulResourceVersionAtomicInc(t, b, errTest) b.SQLMock.ExpectRollback() @@ -357,8 +356,8 @@ func TestBackend_create(t *testing.T) { b, ctx := setupBackendTest(t) b.SQLMock.ExpectBegin() - b.ExecWithResult("insert resource") - b.ExecWithResult("insert resource_history") + b.ExecWithResult("insert resource", 0, 1) + b.ExecWithResult("insert resource_history", 0, 1) expectSuccessfulResourceVersionAtomicInc(t, b) b.ExecWithErr("update resource_history", errTest) b.SQLMock.ExpectRollback() @@ -366,7 +365,7 @@ func TestBackend_create(t *testing.T) { v, err := b.create(ctx, event) require.Zero(t, v) require.Error(t, err) - require.ErrorContains(t, err, "update resource_history") + require.ErrorContains(t, err, "update resource_history", 0, 1) }) t.Run("error updating resource", func(t *testing.T) { @@ -374,10 +373,10 @@ func TestBackend_create(t *testing.T) { b, ctx := setupBackendTest(t) b.SQLMock.ExpectBegin() - b.ExecWithResult("insert resource") - b.ExecWithResult("insert resource_history") + b.ExecWithResult("insert resource", 0, 1) + b.ExecWithResult("insert resource_history", 0, 1) expectSuccessfulResourceVersionAtomicInc(t, b) - b.ExecWithResult("update resource_history") + b.ExecWithResult("update resource_history", 0, 1) b.ExecWithErr("update resource", errTest) b.SQLMock.ExpectRollback() @@ -400,16 +399,16 @@ func TestBackend_update(t *testing.T) { b, ctx := setupBackendTest(t) b.SQLMock.ExpectBegin() - b.ExecWithResult("update resource") - b.ExecWithResult("insert resource_history") - expectSuccessfulResourceVersionAtomicInc(t, b) // returns RV=1 - b.ExecWithResult("update resource_history") - b.ExecWithResult("update resource") + b.ExecWithResult("update resource", 0, 1) + b.ExecWithResult("insert resource_history", 0, 1) + expectSuccessfulResourceVersionAtomicInc(t, b) + b.ExecWithResult("update resource_history", 0, 1) + b.ExecWithResult("update resource", 0, 1) b.SQLMock.ExpectCommit() v, err := b.update(ctx, event) require.NoError(t, err) - require.Equal(t, int64(2), v) + require.Equal(t, int64(23456), v) }) t.Run("error in first update to resource", func(t *testing.T) { @@ -431,7 +430,7 @@ func TestBackend_update(t *testing.T) { b, ctx := setupBackendTest(t) b.SQLMock.ExpectBegin() - b.ExecWithResult("update resource") + b.ExecWithResult("update resource", 0, 1) b.ExecWithErr("insert resource_history", errTest) b.SQLMock.ExpectRollback() @@ -446,8 +445,8 @@ func TestBackend_update(t *testing.T) { b, ctx := setupBackendTest(t) b.SQLMock.ExpectBegin() - b.ExecWithResult("update resource") - b.ExecWithResult("insert resource_history") + b.ExecWithResult("update resource", 0, 1) + b.ExecWithResult("insert resource_history", 0, 1) expectUnsuccessfulResourceVersionAtomicInc(t, b, errTest) b.SQLMock.ExpectRollback() @@ -462,8 +461,8 @@ func TestBackend_update(t *testing.T) { b, ctx := setupBackendTest(t) b.SQLMock.ExpectBegin() - b.ExecWithResult("update resource") - b.ExecWithResult("insert resource_history") + b.ExecWithResult("update resource", 0, 1) + b.ExecWithResult("insert resource_history", 0, 1) expectSuccessfulResourceVersionAtomicInc(t, b) // returns RV=1 b.ExecWithErr("update resource_history", errTest) b.SQLMock.ExpectRollback() @@ -479,10 +478,10 @@ func TestBackend_update(t *testing.T) { b, ctx := setupBackendTest(t) b.SQLMock.ExpectBegin() - b.ExecWithResult("update resource") - b.ExecWithResult("insert resource_history") + b.ExecWithResult("update resource", 0, 1) + b.ExecWithResult("insert resource_history", 0, 1) expectSuccessfulResourceVersionAtomicInc(t, b) // returns RV=1 - b.ExecWithResult("update resource_history") + b.ExecWithResult("update resource_history", 0, 1) b.ExecWithErr("update resource", errTest) b.SQLMock.ExpectRollback() @@ -505,15 +504,15 @@ func TestBackend_delete(t *testing.T) { b, ctx := setupBackendTest(t) b.SQLMock.ExpectBegin() - b.ExecWithResult("delete resource") - b.ExecWithResult("insert resource_history") - expectSuccessfulResourceVersionAtomicInc(t, b) // returns RV=1 - b.ExecWithResult("update resource_history") + b.ExecWithResult("delete resource", 0, 1) + b.ExecWithResult("insert resource_history", 0, 1) + expectSuccessfulResourceVersionAtomicInc(t, b) + b.ExecWithResult("update resource_history", 0, 1) b.SQLMock.ExpectCommit() v, err := b.delete(ctx, event) require.NoError(t, err) - require.Equal(t, int64(2), v) + require.Equal(t, int64(23456), v) }) t.Run("error deleting resource", func(t *testing.T) { @@ -535,7 +534,7 @@ func TestBackend_delete(t *testing.T) { b, ctx := setupBackendTest(t) b.SQLMock.ExpectBegin() - b.ExecWithResult("delete resource") + b.ExecWithResult("delete resource", 0, 1) b.ExecWithErr("insert resource_history", errTest) b.SQLMock.ExpectCommit() @@ -550,8 +549,8 @@ func TestBackend_delete(t *testing.T) { b, ctx := setupBackendTest(t) b.SQLMock.ExpectBegin() - b.ExecWithResult("delete resource") - b.ExecWithResult("insert resource_history") + b.ExecWithResult("delete resource", 0, 1) + b.ExecWithResult("insert resource_history", 0, 1) expectUnsuccessfulResourceVersionAtomicInc(t, b, errTest) b.SQLMock.ExpectCommit() @@ -566,8 +565,8 @@ func TestBackend_delete(t *testing.T) { b, ctx := setupBackendTest(t) b.SQLMock.ExpectBegin() - b.ExecWithResult("delete resource") - b.ExecWithResult("insert resource_history") + b.ExecWithResult("delete resource", 0, 1) + b.ExecWithResult("insert resource_history", 0, 1) expectSuccessfulResourceVersionAtomicInc(t, b) // returns RV=1 b.ExecWithErr("update resource_history", errTest) b.SQLMock.ExpectCommit() diff --git a/pkg/storage/unified/sql/data/resource_version_get.sql b/pkg/storage/unified/sql/data/resource_version_get.sql index e63f0bb34de..8228179e50e 100644 --- a/pkg/storage/unified/sql/data/resource_version_get.sql +++ b/pkg/storage/unified/sql/data/resource_version_get.sql @@ -1,5 +1,6 @@ SELECT - {{ .Ident "resource_version" | .Into .ResourceVersion }} + {{ .Ident "resource_version" | .Into .Response.ResourceVersion }}, + {{ .CurrentEpoch | .Into .Response.CurrentEpoch }} FROM {{ .Ident "resource_version" }} WHERE 1 = 1 AND {{ .Ident "group" }} = {{ .Arg .Group }} diff --git a/pkg/storage/unified/sql/data/resource_version_insert.sql b/pkg/storage/unified/sql/data/resource_version_insert.sql index 6c3aab0dcd4..d1ee612d9a0 100644 --- a/pkg/storage/unified/sql/data/resource_version_insert.sql +++ b/pkg/storage/unified/sql/data/resource_version_insert.sql @@ -8,6 +8,6 @@ INSERT INTO {{ .Ident "resource_version" }} VALUES ( {{ .Arg .Group }}, {{ .Arg .Resource }}, - 2 + {{ .CurrentEpoch }} ) ; diff --git a/pkg/storage/unified/sql/data/resource_version_inc.sql b/pkg/storage/unified/sql/data/resource_version_update.sql similarity index 71% rename from pkg/storage/unified/sql/data/resource_version_inc.sql rename to pkg/storage/unified/sql/data/resource_version_update.sql index e7bf52fd1eb..8cfec671aab 100644 --- a/pkg/storage/unified/sql/data/resource_version_inc.sql +++ b/pkg/storage/unified/sql/data/resource_version_update.sql @@ -1,6 +1,6 @@ UPDATE {{ .Ident "resource_version" }} SET - {{ .Ident "resource_version" }} = {{ .Arg .ResourceVersion}} + {{ .Ident "resource_version" }} = {{ .Arg .ResourceVersion }} WHERE 1 = 1 AND {{ .Ident "group" }} = {{ .Arg .Group }} AND {{ .Ident "resource" }} = {{ .Arg .Resource }} diff --git a/pkg/storage/unified/sql/queries.go b/pkg/storage/unified/sql/queries.go index dc31b7726c4..999d001a7e9 100644 --- a/pkg/storage/unified/sql/queries.go +++ b/pkg/storage/unified/sql/queries.go @@ -41,7 +41,7 @@ var ( // sqlResourceLabelsInsert = mustTemplate("resource_labels_insert.sql") sqlResourceVersionGet = mustTemplate("resource_version_get.sql") - sqlResourceVersionInc = mustTemplate("resource_version_inc.sql") + sqlResourceVersionUpdate = mustTemplate("resource_version_update.sql") sqlResourceVersionInsert = mustTemplate("resource_version_insert.sql") sqlResourceVersionList = mustTemplate("resource_version_list.sql") ) @@ -191,8 +191,13 @@ func (r sqlResourceUpdateRVRequest) Validate() error { } // resource_version table requests. -type resourceVersion struct { +type resourceVersionResponse struct { ResourceVersion int64 + CurrentEpoch int64 +} + +func (r *resourceVersionResponse) Results() (*resourceVersionResponse, error) { + return r, nil } type groupResourceVersion struct { @@ -200,20 +205,32 @@ type groupResourceVersion struct { ResourceVersion int64 } -func (r *resourceVersion) Results() (*resourceVersion, error) { - return r, nil +type sqlResourceVersionUpsertRequest struct { + sqltemplate.SQLTemplate + Group, Resource string + ResourceVersion int64 } -type sqlResourceVersionRequest struct { +func (r sqlResourceVersionUpsertRequest) Validate() error { + return nil // TODO +} + +type sqlResourceVersionGetRequest struct { sqltemplate.SQLTemplate Group, Resource string ReadOnly bool - *resourceVersion + Response *resourceVersionResponse } -func (r sqlResourceVersionRequest) Validate() error { +func (r sqlResourceVersionGetRequest) Validate() error { return nil // TODO } +func (r sqlResourceVersionGetRequest) Results() (*resourceVersionResponse, error) { + return &resourceVersionResponse{ + ResourceVersion: r.Response.ResourceVersion, + CurrentEpoch: r.Response.CurrentEpoch, + }, nil +} type sqlResourceVersionListRequest struct { sqltemplate.SQLTemplate diff --git a/pkg/storage/unified/sql/queries_test.go b/pkg/storage/unified/sql/queries_test.go index df7ed9167f7..5e2557af8c4 100644 --- a/pkg/storage/unified/sql/queries_test.go +++ b/pkg/storage/unified/sql/queries_test.go @@ -52,7 +52,12 @@ func TestUnifiedStorageQueries(t *testing.T) { Data: &sqlResourceRequest{ SQLTemplate: mocks.NewTestingSQLTemplate(), WriteEvent: resource.WriteEvent{ - Key: &resource.ResourceKey{}, + Key: &resource.ResourceKey{ + Namespace: "nn", + Group: "gg", + Resource: "rr", + Name: "name", + }, }, }, }, @@ -63,7 +68,12 @@ func TestUnifiedStorageQueries(t *testing.T) { Data: &sqlResourceReadRequest{ SQLTemplate: mocks.NewTestingSQLTemplate(), Request: &resource.ReadRequest{ - Key: &resource.ResourceKey{}, + Key: &resource.ResourceKey{ + Namespace: "nn", + Group: "gg", + Resource: "rr", + Name: "name", + }, }, readResponse: new(readResponse), }, @@ -155,7 +165,12 @@ func TestUnifiedStorageQueries(t *testing.T) { Data: &sqlResourceRequest{ SQLTemplate: mocks.NewTestingSQLTemplate(), WriteEvent: resource.WriteEvent{ - Key: &resource.ResourceKey{}, + Key: &resource.ResourceKey{ + Namespace: "nn", + Group: "gg", + Resource: "rr", + Name: "name", + }, PreviousRV: 1234, }, }, @@ -165,22 +180,24 @@ func TestUnifiedStorageQueries(t *testing.T) { sqlResourceVersionGet: { { Name: "single path", - Data: &sqlResourceVersionRequest{ - SQLTemplate: mocks.NewTestingSQLTemplate(), - resourceVersion: new(resourceVersion), - ReadOnly: false, + Data: &sqlResourceVersionGetRequest{ + SQLTemplate: mocks.NewTestingSQLTemplate(), + Resource: "resource", + Group: "group", + Response: new(resourceVersionResponse), + ReadOnly: false, }, }, }, - sqlResourceVersionInc: { + sqlResourceVersionUpdate: { { Name: "increment resource version", - Data: &sqlResourceVersionRequest{ - SQLTemplate: mocks.NewTestingSQLTemplate(), - resourceVersion: &resourceVersion{ - ResourceVersion: 123, - }, + Data: &sqlResourceVersionUpsertRequest{ + SQLTemplate: mocks.NewTestingSQLTemplate(), + Resource: "resource", + Group: "group", + ResourceVersion: int64(12354), }, }, }, @@ -188,8 +205,9 @@ func TestUnifiedStorageQueries(t *testing.T) { sqlResourceVersionInsert: { { Name: "single path", - Data: &sqlResourceVersionRequest{ - SQLTemplate: mocks.NewTestingSQLTemplate(), + Data: &sqlResourceVersionUpsertRequest{ + SQLTemplate: mocks.NewTestingSQLTemplate(), + ResourceVersion: int64(12354), }, }, }, diff --git a/pkg/storage/unified/sql/sqltemplate/dialect.go b/pkg/storage/unified/sql/sqltemplate/dialect.go index 5806c38b33e..918545fdb25 100644 --- a/pkg/storage/unified/sql/sqltemplate/dialect.go +++ b/pkg/storage/unified/sql/sqltemplate/dialect.go @@ -62,6 +62,9 @@ type Dialect interface { // WHERE id = ? // {{ .SelectFor "Update NoWait" }}; -- will be uppercased SelectFor(...string) (string, error) + + // CurrentEpoch returns the current epoch value for the database in microseconds. + CurrentEpoch() string } // RowLockingClause represents a row-locking clause in a SELECT statement. diff --git a/pkg/storage/unified/sql/sqltemplate/dialect_mysql.go b/pkg/storage/unified/sql/sqltemplate/dialect_mysql.go index 6101667fa51..14fd18e0456 100644 --- a/pkg/storage/unified/sql/sqltemplate/dialect_mysql.go +++ b/pkg/storage/unified/sql/sqltemplate/dialect_mysql.go @@ -33,3 +33,7 @@ func (backtickIdent) Ident(s string) (string, error) { return s }) } + +func (mysql) CurrentEpoch() string { + return "CAST(FLOOR(UNIX_TIMESTAMP(NOW(6)) * 1000000) AS SIGNED)" +} diff --git a/pkg/storage/unified/sql/sqltemplate/dialect_postgresql.go b/pkg/storage/unified/sql/sqltemplate/dialect_postgresql.go index 39e8603ae0b..a0dd76010eb 100644 --- a/pkg/storage/unified/sql/sqltemplate/dialect_postgresql.go +++ b/pkg/storage/unified/sql/sqltemplate/dialect_postgresql.go @@ -35,3 +35,7 @@ func (p postgresql) Ident(s string) (string, error) { return p.standardIdent.Ident(s) } + +func (postgresql) CurrentEpoch() string { + return "(EXTRACT(EPOCH FROM statement_timestamp()) * 1000000)::BIGINT" +} diff --git a/pkg/storage/unified/sql/sqltemplate/dialect_sqlite.go b/pkg/storage/unified/sql/sqltemplate/dialect_sqlite.go index 8a41a8f2c4d..f84ea78c477 100644 --- a/pkg/storage/unified/sql/sqltemplate/dialect_sqlite.go +++ b/pkg/storage/unified/sql/sqltemplate/dialect_sqlite.go @@ -16,3 +16,8 @@ type sqlite struct { argPlaceholderFunc name } + +func (sqlite) CurrentEpoch() string { + // Alternative approaches like `unixepoch('subsecond') * 1000000` returns millisecond precision. + return "CAST((julianday('now') - 2440587.5) * 86400000000.0 AS BIGINT)" +} diff --git a/pkg/storage/unified/sql/sqltemplate/mocks/SQLTemplateIface.go b/pkg/storage/unified/sql/sqltemplate/mocks/SQLTemplateIface.go index d714c54e46d..86cbb32fd5b 100644 --- a/pkg/storage/unified/sql/sqltemplate/mocks/SQLTemplateIface.go +++ b/pkg/storage/unified/sql/sqltemplate/mocks/SQLTemplateIface.go @@ -171,6 +171,51 @@ func (_c *SQLTemplate_ArgPlaceholder_Call) RunAndReturn(run func(int) string) *S return _c } +// CurrentEpoch provides a mock function with given fields: +func (_m *SQLTemplate) CurrentEpoch() string { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for CurrentEpoch") + } + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// SQLTemplate_CurrentEpoch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CurrentEpoch' +type SQLTemplate_CurrentEpoch_Call struct { + *mock.Call +} + +// CurrentEpoch is a helper method to define mock.On call +func (_e *SQLTemplate_Expecter) CurrentEpoch() *SQLTemplate_CurrentEpoch_Call { + return &SQLTemplate_CurrentEpoch_Call{Call: _e.mock.On("CurrentEpoch")} +} + +func (_c *SQLTemplate_CurrentEpoch_Call) Run(run func()) *SQLTemplate_CurrentEpoch_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *SQLTemplate_CurrentEpoch_Call) Return(_a0 string) *SQLTemplate_CurrentEpoch_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *SQLTemplate_CurrentEpoch_Call) RunAndReturn(run func() string) *SQLTemplate_CurrentEpoch_Call { + _c.Call.Return(run) + return _c +} + // DialectName provides a mock function with given fields: func (_m *SQLTemplate) DialectName() string { ret := _m.Called() diff --git a/pkg/storage/unified/sql/sqltemplate/mocks/WithResults.go b/pkg/storage/unified/sql/sqltemplate/mocks/WithResults.go index ea9dd58baf2..50f71da1e28 100644 --- a/pkg/storage/unified/sql/sqltemplate/mocks/WithResults.go +++ b/pkg/storage/unified/sql/sqltemplate/mocks/WithResults.go @@ -171,6 +171,51 @@ func (_c *WithResults_ArgPlaceholder_Call[T]) RunAndReturn(run func(int) string) return _c } +// CurrentEpoch provides a mock function with given fields: +func (_m *WithResults[T]) CurrentEpoch() string { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for CurrentEpoch") + } + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// WithResults_CurrentEpoch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CurrentEpoch' +type WithResults_CurrentEpoch_Call[T interface{}] struct { + *mock.Call +} + +// CurrentEpoch is a helper method to define mock.On call +func (_e *WithResults_Expecter[T]) CurrentEpoch() *WithResults_CurrentEpoch_Call[T] { + return &WithResults_CurrentEpoch_Call[T]{Call: _e.mock.On("CurrentEpoch")} +} + +func (_c *WithResults_CurrentEpoch_Call[T]) Run(run func()) *WithResults_CurrentEpoch_Call[T] { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *WithResults_CurrentEpoch_Call[T]) Return(_a0 string) *WithResults_CurrentEpoch_Call[T] { + _c.Call.Return(_a0) + return _c +} + +func (_c *WithResults_CurrentEpoch_Call[T]) RunAndReturn(run func() string) *WithResults_CurrentEpoch_Call[T] { + _c.Call.Return(run) + return _c +} + // DialectName provides a mock function with given fields: func (_m *WithResults[T]) DialectName() string { ret := _m.Called() diff --git a/pkg/storage/unified/sql/test/integration_test.go b/pkg/storage/unified/sql/test/integration_test.go index 6b54d131f95..d30210f583c 100644 --- a/pkg/storage/unified/sql/test/integration_test.go +++ b/pkg/storage/unified/sql/test/integration_test.go @@ -57,7 +57,9 @@ func newServer(t *testing.T) (sql.Backend, resource.ResourceServer) { } func TestIntegrationBackendHappyPath(t *testing.T) { - t.Skip("TODO: test blocking, skipping to unblock Enterprise until we fix this") + if infraDB.IsTestDbSQLite() { + t.Skip("TODO: test blocking, skipping to unblock Enterprise until we fix this") + } if testing.Short() { t.Skip("skipping integration test") } @@ -75,47 +77,48 @@ func TestIntegrationBackendHappyPath(t *testing.T) { stream, err := backend.WatchWriteEvents(context.Background()) // Using a different context to avoid canceling the stream after the DefaultContextTimeout require.NoError(t, err) + var rv1, rv2, rv3, rv4, rv5 int64 t.Run("Add 3 resources", func(t *testing.T) { - rv, err := writeEvent(ctx, backend, "item1", resource.WatchEvent_ADDED) + rv1, err = writeEvent(ctx, backend, "item1", resource.WatchEvent_ADDED) require.NoError(t, err) - require.Equal(t, int64(1), rv) + require.Greater(t, rv1, int64(0)) - rv, err = writeEvent(ctx, backend, "item2", resource.WatchEvent_ADDED) + rv2, err = writeEvent(ctx, backend, "item2", resource.WatchEvent_ADDED) require.NoError(t, err) - require.Equal(t, int64(2), rv) + require.Greater(t, rv2, rv1) - rv, err = writeEvent(ctx, backend, "item3", resource.WatchEvent_ADDED) + rv3, err = writeEvent(ctx, backend, "item3", resource.WatchEvent_ADDED) require.NoError(t, err) - require.Equal(t, int64(3), rv) + require.Greater(t, rv3, rv2) }) t.Run("Update item2", func(t *testing.T) { - rv, err := writeEvent(ctx, backend, "item2", resource.WatchEvent_MODIFIED) + rv4, err = writeEvent(ctx, backend, "item2", resource.WatchEvent_MODIFIED) require.NoError(t, err) - require.Equal(t, int64(4), rv) + require.Greater(t, rv4, rv3) }) t.Run("Delete item1", func(t *testing.T) { - rv, err := writeEvent(ctx, backend, "item1", resource.WatchEvent_DELETED) + rv5, err = writeEvent(ctx, backend, "item1", resource.WatchEvent_DELETED) require.NoError(t, err) - require.Equal(t, int64(5), rv) + require.Greater(t, rv5, rv4) }) t.Run("Read latest item 2", func(t *testing.T) { resp := backend.ReadResource(ctx, &resource.ReadRequest{Key: resourceKey("item2")}) require.Nil(t, resp.Error) - require.Equal(t, int64(4), resp.ResourceVersion) + require.Equal(t, rv4, resp.ResourceVersion) require.Equal(t, "item2 MODIFIED", string(resp.Value)) }) t.Run("Read early version of item2", func(t *testing.T) { resp := backend.ReadResource(ctx, &resource.ReadRequest{ Key: resourceKey("item2"), - ResourceVersion: 3, // item2 was created at rv=2 and updated at rv=4 + ResourceVersion: rv3, // item2 was created at rv2 and updated at rv4 }) require.Nil(t, resp.Error) - require.Equal(t, int64(2), resp.ResourceVersion) + require.Equal(t, rv2, resp.ResourceVersion) require.Equal(t, "item2 ADDED", string(resp.Value)) }) @@ -134,38 +137,40 @@ func TestIntegrationBackendHappyPath(t *testing.T) { require.Len(t, resp.Items, 2) require.Equal(t, "item2 MODIFIED", string(resp.Items[0].Value)) require.Equal(t, "item3 ADDED", string(resp.Items[1].Value)) - require.Equal(t, int64(5), resp.ResourceVersion) + require.Equal(t, rv5, resp.ResourceVersion) }) t.Run("Watch events", func(t *testing.T) { event := <-stream require.Equal(t, "item1", event.Key.Name) - require.Equal(t, int64(1), event.ResourceVersion) + require.Equal(t, rv1, event.ResourceVersion) require.Equal(t, resource.WatchEvent_ADDED, event.Type) event = <-stream require.Equal(t, "item2", event.Key.Name) - require.Equal(t, int64(2), event.ResourceVersion) + require.Equal(t, rv2, event.ResourceVersion) require.Equal(t, resource.WatchEvent_ADDED, event.Type) event = <-stream require.Equal(t, "item3", event.Key.Name) - require.Equal(t, int64(3), event.ResourceVersion) + require.Equal(t, rv3, event.ResourceVersion) require.Equal(t, resource.WatchEvent_ADDED, event.Type) event = <-stream require.Equal(t, "item2", event.Key.Name) - require.Equal(t, int64(4), event.ResourceVersion) + require.Equal(t, rv4, event.ResourceVersion) require.Equal(t, resource.WatchEvent_MODIFIED, event.Type) event = <-stream require.Equal(t, "item1", event.Key.Name) - require.Equal(t, int64(5), event.ResourceVersion) + require.Equal(t, rv5, event.ResourceVersion) require.Equal(t, resource.WatchEvent_DELETED, event.Type) }) } func TestIntegrationBackendWatchWriteEventsFromLastest(t *testing.T) { - t.Skip("TODO: test blocking, skipping to unblock Enterprise until we fix this") + if infraDB.IsTestDbSQLite() { + t.Skip("TODO: test blocking, skipping to unblock Enterprise until we fix this") + } if testing.Short() { t.Skip("skipping integration test") } @@ -188,7 +193,9 @@ func TestIntegrationBackendWatchWriteEventsFromLastest(t *testing.T) { } func TestIntegrationBackendList(t *testing.T) { - t.Skip("TODO: test blocking, skipping to unblock Enterprise until we fix this") + if infraDB.IsTestDbSQLite() { + t.Skip("TODO: test blocking, skipping to unblock Enterprise until we fix this") + } if testing.Short() { t.Skip("skipping integration test") } @@ -197,14 +204,23 @@ func TestIntegrationBackendList(t *testing.T) { backend, server := newServer(t) // Create a few resources before starting the watch - _, _ = writeEvent(ctx, backend, "item1", resource.WatchEvent_ADDED) // rv=1 - _, _ = writeEvent(ctx, backend, "item2", resource.WatchEvent_ADDED) // rv=2 - will be modified at rv=6 - _, _ = writeEvent(ctx, backend, "item3", resource.WatchEvent_ADDED) // rv=3 - will be deleted at rv=7 - _, _ = writeEvent(ctx, backend, "item4", resource.WatchEvent_ADDED) // rv=4 - _, _ = writeEvent(ctx, backend, "item5", resource.WatchEvent_ADDED) // rv=5 - _, _ = writeEvent(ctx, backend, "item2", resource.WatchEvent_MODIFIED) // rv=6 - _, _ = writeEvent(ctx, backend, "item3", resource.WatchEvent_DELETED) // rv=7 - _, _ = writeEvent(ctx, backend, "item6", resource.WatchEvent_ADDED) // rv=8 + rv1, _ := writeEvent(ctx, backend, "item1", resource.WatchEvent_ADDED) + require.Greater(t, rv1, int64(0)) + rv2, _ := writeEvent(ctx, backend, "item2", resource.WatchEvent_ADDED) // rv=2 - will be modified at rv=6 + require.Greater(t, rv2, rv1) + rv3, _ := writeEvent(ctx, backend, "item3", resource.WatchEvent_ADDED) // rv=3 - will be deleted at rv=7 + require.Greater(t, rv3, rv2) + rv4, _ := writeEvent(ctx, backend, "item4", resource.WatchEvent_ADDED) + require.Greater(t, rv4, rv3) + rv5, _ := writeEvent(ctx, backend, "item5", resource.WatchEvent_ADDED) + require.Greater(t, rv5, rv4) + rv6, _ := writeEvent(ctx, backend, "item2", resource.WatchEvent_MODIFIED) + require.Greater(t, rv6, rv5) + rv7, _ := writeEvent(ctx, backend, "item3", resource.WatchEvent_DELETED) + require.Greater(t, rv7, rv6) + rv8, _ := writeEvent(ctx, backend, "item6", resource.WatchEvent_ADDED) + require.Greater(t, rv8, rv7) + t.Run("fetch all latest", func(t *testing.T) { res, err := server.List(ctx, &resource.ListRequest{ Options: &resource.ListOptions{ @@ -245,12 +261,12 @@ func TestIntegrationBackendList(t *testing.T) { require.Equal(t, "item1 ADDED", string(res.Items[0].Value)) require.Equal(t, "item2 MODIFIED", string(res.Items[1].Value)) require.Equal(t, "item4 ADDED", string(res.Items[2].Value)) - require.Equal(t, int64(8), continueToken.ResourceVersion) + require.Equal(t, rv8, continueToken.ResourceVersion) }) t.Run("list at revision", func(t *testing.T) { res, err := server.List(ctx, &resource.ListRequest{ - ResourceVersion: 4, + ResourceVersion: rv4, Options: &resource.ListOptions{ Key: &resource.ResourceKey{ Group: "group", @@ -271,7 +287,7 @@ func TestIntegrationBackendList(t *testing.T) { t.Run("fetch first page at revision with limit", func(t *testing.T) { res, err := server.List(ctx, &resource.ListRequest{ Limit: 3, - ResourceVersion: 7, + ResourceVersion: rv7, Options: &resource.ListOptions{ Key: &resource.ResourceKey{ Group: "group", @@ -290,12 +306,12 @@ func TestIntegrationBackendList(t *testing.T) { continueToken, err := sql.GetContinueToken(res.NextPageToken) require.NoError(t, err) - require.Equal(t, int64(7), continueToken.ResourceVersion) + require.Equal(t, rv7, continueToken.ResourceVersion) }) t.Run("fetch second page at revision", func(t *testing.T) { continueToken := &sql.ContinueToken{ - ResourceVersion: 8, + ResourceVersion: rv8, StartOffset: 2, } res, err := server.List(ctx, &resource.ListRequest{ @@ -317,12 +333,14 @@ func TestIntegrationBackendList(t *testing.T) { continueToken, err = sql.GetContinueToken(res.NextPageToken) require.NoError(t, err) - require.Equal(t, int64(8), continueToken.ResourceVersion) + require.Equal(t, rv8, continueToken.ResourceVersion) require.Equal(t, int64(4), continueToken.StartOffset) }) } func TestClientServer(t *testing.T) { - t.Skip("TODO: test blocking, skipping to unblock Enterprise until we fix this") + if infraDB.IsTestDbSQLite() { + t.Skip("TODO: test blocking, skipping to unblock Enterprise until we fix this") + } ctx := testutil.NewTestContext(t, time.Now().Add(5*time.Second)) dbstore := infraDB.InitTestDB(t) diff --git a/pkg/storage/unified/sql/testdata/mysql--resource_history_insert-insert into resource_history.sql b/pkg/storage/unified/sql/testdata/mysql--resource_history_insert-insert into resource_history.sql index d76132ae625..27f85926301 100755 --- a/pkg/storage/unified/sql/testdata/mysql--resource_history_insert-insert into resource_history.sql +++ b/pkg/storage/unified/sql/testdata/mysql--resource_history_insert-insert into resource_history.sql @@ -11,10 +11,10 @@ INSERT INTO `resource_history` ) VALUES ( '', - '', - '', - '', - '', + 'gg', + 'rr', + 'nn', + 'name', 1234, '[]', 'UNKNOWN' diff --git a/pkg/storage/unified/sql/testdata/mysql--resource_read-without_resource_version.sql b/pkg/storage/unified/sql/testdata/mysql--resource_read-without_resource_version.sql index 27530d7792d..10a19336dcd 100755 --- a/pkg/storage/unified/sql/testdata/mysql--resource_read-without_resource_version.sql +++ b/pkg/storage/unified/sql/testdata/mysql--resource_read-without_resource_version.sql @@ -3,8 +3,8 @@ SELECT `value` FROM `resource` WHERE 1 = 1 - AND `namespace` = '' - AND `group` = '' - AND `resource` = '' - AND `name` = '' + AND `namespace` = 'nn' + AND `group` = 'gg' + AND `resource` = 'rr' + AND `name` = 'name' ; diff --git a/pkg/storage/unified/sql/testdata/mysql--resource_update-single path.sql b/pkg/storage/unified/sql/testdata/mysql--resource_update-single path.sql index f3a173f5a03..34582a97d87 100755 --- a/pkg/storage/unified/sql/testdata/mysql--resource_update-single path.sql +++ b/pkg/storage/unified/sql/testdata/mysql--resource_update-single path.sql @@ -4,8 +4,8 @@ UPDATE `resource` `value` = '[]', `action` = 'UNKNOWN' WHERE 1 = 1 - AND `group` = '' - AND `resource` = '' - AND `namespace` = '' - AND `name` = '' + AND `group` = 'gg' + AND `resource` = 'rr' + AND `namespace` = 'nn' + AND `name` = 'name' ; diff --git a/pkg/storage/unified/sql/testdata/mysql--resource_version_get-single path.sql b/pkg/storage/unified/sql/testdata/mysql--resource_version_get-single path.sql index 15678ba3e7b..69d7075bd3d 100755 --- a/pkg/storage/unified/sql/testdata/mysql--resource_version_get-single path.sql +++ b/pkg/storage/unified/sql/testdata/mysql--resource_version_get-single path.sql @@ -1,8 +1,9 @@ SELECT - `resource_version` + `resource_version`, + CAST(FLOOR(UNIX_TIMESTAMP(NOW(6)) * 1000000) AS SIGNED) FROM `resource_version` WHERE 1 = 1 - AND `group` = '' - AND `resource` = '' + AND `group` = 'group' + AND `resource` = 'resource' FOR UPDATE ; diff --git a/pkg/storage/unified/sql/testdata/mysql--resource_version_inc-increment resource version.sql b/pkg/storage/unified/sql/testdata/mysql--resource_version_inc-increment resource version.sql deleted file mode 100755 index 2a737b37014..00000000000 --- a/pkg/storage/unified/sql/testdata/mysql--resource_version_inc-increment resource version.sql +++ /dev/null @@ -1,7 +0,0 @@ -UPDATE `resource_version` -SET - `resource_version` = 123 -WHERE 1 = 1 - AND `group` = '' - AND `resource` = '' -; diff --git a/pkg/storage/unified/sql/testdata/mysql--resource_version_insert-single path.sql b/pkg/storage/unified/sql/testdata/mysql--resource_version_insert-single path.sql index f99b2b00148..432520706ae 100755 --- a/pkg/storage/unified/sql/testdata/mysql--resource_version_insert-single path.sql +++ b/pkg/storage/unified/sql/testdata/mysql--resource_version_insert-single path.sql @@ -7,6 +7,6 @@ INSERT INTO `resource_version` VALUES ( '', '', - 2 + CAST(FLOOR(UNIX_TIMESTAMP(NOW(6)) * 1000000) AS SIGNED) ) ; diff --git a/pkg/storage/unified/sql/testdata/mysql--resource_version_update-increment resource version.sql b/pkg/storage/unified/sql/testdata/mysql--resource_version_update-increment resource version.sql new file mode 100755 index 00000000000..610283edb9f --- /dev/null +++ b/pkg/storage/unified/sql/testdata/mysql--resource_version_update-increment resource version.sql @@ -0,0 +1,7 @@ +UPDATE `resource_version` +SET + `resource_version` = 12354 +WHERE 1 = 1 + AND `group` = 'group' + AND `resource` = 'resource' +; diff --git a/pkg/storage/unified/sql/testdata/postgres--resource_history_insert-insert into resource_history.sql b/pkg/storage/unified/sql/testdata/postgres--resource_history_insert-insert into resource_history.sql index a15a8db4b1e..c7db6c75739 100755 --- a/pkg/storage/unified/sql/testdata/postgres--resource_history_insert-insert into resource_history.sql +++ b/pkg/storage/unified/sql/testdata/postgres--resource_history_insert-insert into resource_history.sql @@ -11,10 +11,10 @@ INSERT INTO "resource_history" ) VALUES ( '', - '', - '', - '', - '', + 'gg', + 'rr', + 'nn', + 'name', 1234, '[]', 'UNKNOWN' diff --git a/pkg/storage/unified/sql/testdata/postgres--resource_read-without_resource_version.sql b/pkg/storage/unified/sql/testdata/postgres--resource_read-without_resource_version.sql index e010d010dbc..5b8879b6c6a 100755 --- a/pkg/storage/unified/sql/testdata/postgres--resource_read-without_resource_version.sql +++ b/pkg/storage/unified/sql/testdata/postgres--resource_read-without_resource_version.sql @@ -3,8 +3,8 @@ SELECT "value" FROM "resource" WHERE 1 = 1 - AND "namespace" = '' - AND "group" = '' - AND "resource" = '' - AND "name" = '' + AND "namespace" = 'nn' + AND "group" = 'gg' + AND "resource" = 'rr' + AND "name" = 'name' ; diff --git a/pkg/storage/unified/sql/testdata/postgres--resource_update-single path.sql b/pkg/storage/unified/sql/testdata/postgres--resource_update-single path.sql index c0b2f77ce6c..4febc55d530 100755 --- a/pkg/storage/unified/sql/testdata/postgres--resource_update-single path.sql +++ b/pkg/storage/unified/sql/testdata/postgres--resource_update-single path.sql @@ -4,8 +4,8 @@ UPDATE "resource" "value" = '[]', "action" = 'UNKNOWN' WHERE 1 = 1 - AND "group" = '' - AND "resource" = '' - AND "namespace" = '' - AND "name" = '' + AND "group" = 'gg' + AND "resource" = 'rr' + AND "namespace" = 'nn' + AND "name" = 'name' ; diff --git a/pkg/storage/unified/sql/testdata/postgres--resource_version_get-single path.sql b/pkg/storage/unified/sql/testdata/postgres--resource_version_get-single path.sql index d6e8f041e5a..bec77d7b02e 100755 --- a/pkg/storage/unified/sql/testdata/postgres--resource_version_get-single path.sql +++ b/pkg/storage/unified/sql/testdata/postgres--resource_version_get-single path.sql @@ -1,8 +1,9 @@ SELECT - "resource_version" + "resource_version", + (EXTRACT(EPOCH FROM statement_timestamp()) * 1000000)::BIGINT FROM "resource_version" WHERE 1 = 1 - AND "group" = '' - AND "resource" = '' + AND "group" = 'group' + AND "resource" = 'resource' FOR UPDATE ; diff --git a/pkg/storage/unified/sql/testdata/postgres--resource_version_inc-increment resource version.sql b/pkg/storage/unified/sql/testdata/postgres--resource_version_inc-increment resource version.sql deleted file mode 100755 index a6f7c024e25..00000000000 --- a/pkg/storage/unified/sql/testdata/postgres--resource_version_inc-increment resource version.sql +++ /dev/null @@ -1,7 +0,0 @@ -UPDATE "resource_version" -SET - "resource_version" = 123 -WHERE 1 = 1 - AND "group" = '' - AND "resource" = '' -; diff --git a/pkg/storage/unified/sql/testdata/postgres--resource_version_insert-single path.sql b/pkg/storage/unified/sql/testdata/postgres--resource_version_insert-single path.sql index 14b25955585..42392fee7d0 100755 --- a/pkg/storage/unified/sql/testdata/postgres--resource_version_insert-single path.sql +++ b/pkg/storage/unified/sql/testdata/postgres--resource_version_insert-single path.sql @@ -7,6 +7,6 @@ INSERT INTO "resource_version" VALUES ( '', '', - 2 + (EXTRACT(EPOCH FROM statement_timestamp()) * 1000000)::BIGINT ) ; diff --git a/pkg/storage/unified/sql/testdata/postgres--resource_version_update-increment resource version.sql b/pkg/storage/unified/sql/testdata/postgres--resource_version_update-increment resource version.sql new file mode 100755 index 00000000000..9077af0ba47 --- /dev/null +++ b/pkg/storage/unified/sql/testdata/postgres--resource_version_update-increment resource version.sql @@ -0,0 +1,7 @@ +UPDATE "resource_version" +SET + "resource_version" = 12354 +WHERE 1 = 1 + AND "group" = 'group' + AND "resource" = 'resource' +; diff --git a/pkg/storage/unified/sql/testdata/sqlite--resource_history_insert-insert into resource_history.sql b/pkg/storage/unified/sql/testdata/sqlite--resource_history_insert-insert into resource_history.sql index a15a8db4b1e..c7db6c75739 100755 --- a/pkg/storage/unified/sql/testdata/sqlite--resource_history_insert-insert into resource_history.sql +++ b/pkg/storage/unified/sql/testdata/sqlite--resource_history_insert-insert into resource_history.sql @@ -11,10 +11,10 @@ INSERT INTO "resource_history" ) VALUES ( '', - '', - '', - '', - '', + 'gg', + 'rr', + 'nn', + 'name', 1234, '[]', 'UNKNOWN' diff --git a/pkg/storage/unified/sql/testdata/sqlite--resource_read-without_resource_version.sql b/pkg/storage/unified/sql/testdata/sqlite--resource_read-without_resource_version.sql index e010d010dbc..5b8879b6c6a 100755 --- a/pkg/storage/unified/sql/testdata/sqlite--resource_read-without_resource_version.sql +++ b/pkg/storage/unified/sql/testdata/sqlite--resource_read-without_resource_version.sql @@ -3,8 +3,8 @@ SELECT "value" FROM "resource" WHERE 1 = 1 - AND "namespace" = '' - AND "group" = '' - AND "resource" = '' - AND "name" = '' + AND "namespace" = 'nn' + AND "group" = 'gg' + AND "resource" = 'rr' + AND "name" = 'name' ; diff --git a/pkg/storage/unified/sql/testdata/sqlite--resource_update-single path.sql b/pkg/storage/unified/sql/testdata/sqlite--resource_update-single path.sql index c0b2f77ce6c..4febc55d530 100755 --- a/pkg/storage/unified/sql/testdata/sqlite--resource_update-single path.sql +++ b/pkg/storage/unified/sql/testdata/sqlite--resource_update-single path.sql @@ -4,8 +4,8 @@ UPDATE "resource" "value" = '[]', "action" = 'UNKNOWN' WHERE 1 = 1 - AND "group" = '' - AND "resource" = '' - AND "namespace" = '' - AND "name" = '' + AND "group" = 'gg' + AND "resource" = 'rr' + AND "namespace" = 'nn' + AND "name" = 'name' ; diff --git a/pkg/storage/unified/sql/testdata/sqlite--resource_version_get-single path.sql b/pkg/storage/unified/sql/testdata/sqlite--resource_version_get-single path.sql index 071a12f8677..9443791d642 100755 --- a/pkg/storage/unified/sql/testdata/sqlite--resource_version_get-single path.sql +++ b/pkg/storage/unified/sql/testdata/sqlite--resource_version_get-single path.sql @@ -1,7 +1,8 @@ SELECT - "resource_version" + "resource_version", + CAST((julianday('now') - 2440587.5) * 86400000000.0 AS BIGINT) FROM "resource_version" WHERE 1 = 1 - AND "group" = '' - AND "resource" = '' + AND "group" = 'group' + AND "resource" = 'resource' ; diff --git a/pkg/storage/unified/sql/testdata/sqlite--resource_version_inc-increment resource version.sql b/pkg/storage/unified/sql/testdata/sqlite--resource_version_inc-increment resource version.sql deleted file mode 100755 index a6f7c024e25..00000000000 --- a/pkg/storage/unified/sql/testdata/sqlite--resource_version_inc-increment resource version.sql +++ /dev/null @@ -1,7 +0,0 @@ -UPDATE "resource_version" -SET - "resource_version" = 123 -WHERE 1 = 1 - AND "group" = '' - AND "resource" = '' -; diff --git a/pkg/storage/unified/sql/testdata/sqlite--resource_version_insert-single path.sql b/pkg/storage/unified/sql/testdata/sqlite--resource_version_insert-single path.sql index 14b25955585..e58e7094a3d 100755 --- a/pkg/storage/unified/sql/testdata/sqlite--resource_version_insert-single path.sql +++ b/pkg/storage/unified/sql/testdata/sqlite--resource_version_insert-single path.sql @@ -7,6 +7,6 @@ INSERT INTO "resource_version" VALUES ( '', '', - 2 + CAST((julianday('now') - 2440587.5) * 86400000000.0 AS BIGINT) ) ; diff --git a/pkg/storage/unified/sql/testdata/sqlite--resource_version_update-increment resource version.sql b/pkg/storage/unified/sql/testdata/sqlite--resource_version_update-increment resource version.sql new file mode 100755 index 00000000000..9077af0ba47 --- /dev/null +++ b/pkg/storage/unified/sql/testdata/sqlite--resource_version_update-increment resource version.sql @@ -0,0 +1,7 @@ +UPDATE "resource_version" +SET + "resource_version" = 12354 +WHERE 1 = 1 + AND "group" = 'group' + AND "resource" = 'resource' +;