unistore: update spanner ddl to include unistore tables (#102198)

* provision unistore tables

* update the tests to use generated namespace

* update ddl

* update ddl

* clean streams

* missing space

* use engine.db.query
Author: Georges Chaudy
Date: 2025-03-18 14:37:11 +01:00
Committed by: GitHub
Parent: 8c5a4591fd
Commit: 0bafd4e99d
6 changed files with 131 additions and 67 deletions

View File

@ -94,3 +94,11 @@ func IsTestDBMSSQL() bool {
return false
}
func IsTestDBSpanner() bool {
if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
return db == migrator.Spanner
}
return false
}
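
Downstream tests gate on this helper the same way they already do for IsTestDbSQLite and IsTestDBMSSQL. A minimal sketch of the pattern, mirroring the skips added later in this change (the import path for the infra db package is assumed; the alias infraDB matches the test files below):

package mypackage_test

import (
	"testing"

	infraDB "github.com/grafana/grafana/pkg/infra/db"
)

func TestIntegrationSomething(t *testing.T) {
	// Skip when GRAFANA_TEST_DB selects Spanner, as the tests in this change do.
	if infraDB.IsTestDBSpanner() {
		t.Skip("skipping integration test")
	}
	// ... exercise the storage backend against the configured test database ...
}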

View File

@ -16,7 +16,7 @@
"CREATE TABLE `alert_notification_state` (`id` INT64 NOT NULL GENERATED BY DEFAULT AS IDENTITY (BIT_REVERSED_POSITIVE), `org_id` INT64 NOT NULL, `alert_id` INT64 NOT NULL, `notifier_id` INT64 NOT NULL, `state` STRING(50) NOT NULL, `version` INT64 NOT NULL, `updated_at` INT64 NOT NULL, `alert_rule_state_updated_version` INT64 NOT NULL) PRIMARY KEY (id)",
"CREATE INDEX `IDX_alert_notification_state_alert_id` ON `alert_notification_state` (alert_id)",
"CREATE UNIQUE NULL_FILTERED INDEX `UQE_alert_notification_state_org_id_alert_id_notifier_id` ON `alert_notification_state` (org_id, alert_id, notifier_id)",
"CREATE TABLE `alert_rule` (`id` INT64 NOT NULL GENERATED BY DEFAULT AS IDENTITY (BIT_REVERSED_POSITIVE), `org_id` INT64 NOT NULL, `title` STRING(190) NOT NULL, `condition` STRING(190) NOT NULL, `data` STRING(MAX), `updated` TIMESTAMP NOT NULL, `interval_seconds` INT64 NOT NULL DEFAULT (60), `version` INT64 NOT NULL DEFAULT (0), `uid` STRING(40) NOT NULL DEFAULT ('0'), `namespace_uid` STRING(40) NOT NULL, `rule_group` STRING(190) NOT NULL, `no_data_state` STRING(15) NOT NULL DEFAULT ('NoData'), `exec_err_state` STRING(15) NOT NULL DEFAULT ('Alerting'), `for` INT64 NOT NULL DEFAULT (0), `annotations` STRING(MAX), `labels` STRING(MAX), `dashboard_uid` STRING(40), `panel_id` INT64, `rule_group_idx` INT64 NOT NULL DEFAULT (1), `is_paused` BOOL NOT NULL DEFAULT (false), `notification_settings` STRING(MAX), `record` STRING(MAX), `metadata` STRING(MAX), `updated_by` STRING(40), `guid` STRING(36) NOT NULL DEFAULT ('')) PRIMARY KEY (id)",
"CREATE TABLE `alert_rule` (`id` INT64 NOT NULL GENERATED BY DEFAULT AS IDENTITY (BIT_REVERSED_POSITIVE), `org_id` INT64 NOT NULL, `title` STRING(190) NOT NULL, `condition` STRING(190) NOT NULL, `data` STRING(MAX), `updated` TIMESTAMP NOT NULL, `interval_seconds` INT64 NOT NULL DEFAULT (60), `version` INT64 NOT NULL DEFAULT (0), `uid` STRING(40) NOT NULL DEFAULT ('0'), `namespace_uid` STRING(40) NOT NULL, `rule_group` STRING(190) NOT NULL, `no_data_state` STRING(15) NOT NULL DEFAULT ('NoData'), `exec_err_state` STRING(15) NOT NULL DEFAULT ('Alerting'), `for` INT64 NOT NULL DEFAULT (0), `annotations` STRING(MAX), `labels` STRING(MAX), `dashboard_uid` STRING(40), `panel_id` INT64, `rule_group_idx` INT64 NOT NULL DEFAULT (1), `is_paused` BOOL NOT NULL DEFAULT (false), `notification_settings` STRING(MAX), `record` STRING(MAX), `metadata` STRING(MAX), `updated_by` STRING(40), `guid` STRING(36) NOT NULL DEFAULT (''), `missing_series_evals_to_resolve` INT64) PRIMARY KEY (id)",
"CREATE INDEX `IDX_alert_rule_org_id_dashboard_uid_panel_id` ON `alert_rule` (org_id, dashboard_uid, panel_id)",
"CREATE INDEX `IDX_alert_rule_org_id_namespace_uid_rule_group` ON `alert_rule` (org_id, namespace_uid, rule_group)",
"CREATE UNIQUE NULL_FILTERED INDEX `UQE_alert_rule_guid` ON `alert_rule` (guid)",
@ -27,7 +27,7 @@
"CREATE TABLE `alert_rule_tag` (`id` INT64 NOT NULL GENERATED BY DEFAULT AS IDENTITY (BIT_REVERSED_POSITIVE), `alert_id` INT64 NOT NULL, `tag_id` INT64 NOT NULL) PRIMARY KEY (id)",
"CREATE INDEX `IDX_alert_rule_tag_alert_id` ON `alert_rule_tag` (alert_id)",
"CREATE UNIQUE NULL_FILTERED INDEX `UQE_alert_rule_tag_alert_id_tag_id` ON `alert_rule_tag` (alert_id, tag_id)",
"CREATE TABLE `alert_rule_version` (`id` INT64 NOT NULL GENERATED BY DEFAULT AS IDENTITY (BIT_REVERSED_POSITIVE), `rule_org_id` INT64 NOT NULL, `rule_uid` STRING(40) NOT NULL DEFAULT ('0'), `rule_namespace_uid` STRING(40) NOT NULL, `rule_group` STRING(190) NOT NULL, `parent_version` INT64 NOT NULL, `restored_from` INT64 NOT NULL, `version` INT64 NOT NULL, `created` TIMESTAMP NOT NULL, `title` STRING(190) NOT NULL, `condition` STRING(190) NOT NULL, `data` STRING(MAX), `interval_seconds` INT64 NOT NULL, `no_data_state` STRING(15) NOT NULL DEFAULT ('NoData'), `exec_err_state` STRING(15) NOT NULL DEFAULT ('Alerting'), `for` INT64 NOT NULL DEFAULT (0), `annotations` STRING(MAX), `labels` STRING(MAX), `rule_group_idx` INT64 NOT NULL DEFAULT (1), `is_paused` BOOL NOT NULL DEFAULT (false), `notification_settings` STRING(MAX), `record` STRING(MAX), `metadata` STRING(MAX), `created_by` STRING(40), `rule_guid` STRING(36) NOT NULL DEFAULT ('')) PRIMARY KEY (id)",
"CREATE TABLE `alert_rule_version` (`id` INT64 NOT NULL GENERATED BY DEFAULT AS IDENTITY (BIT_REVERSED_POSITIVE), `rule_org_id` INT64 NOT NULL, `rule_uid` STRING(40) NOT NULL DEFAULT ('0'), `rule_namespace_uid` STRING(40) NOT NULL, `rule_group` STRING(190) NOT NULL, `parent_version` INT64 NOT NULL, `restored_from` INT64 NOT NULL, `version` INT64 NOT NULL, `created` TIMESTAMP NOT NULL, `title` STRING(190) NOT NULL, `condition` STRING(190) NOT NULL, `data` STRING(MAX), `interval_seconds` INT64 NOT NULL, `no_data_state` STRING(15) NOT NULL DEFAULT ('NoData'), `exec_err_state` STRING(15) NOT NULL DEFAULT ('Alerting'), `for` INT64 NOT NULL DEFAULT (0), `annotations` STRING(MAX), `labels` STRING(MAX), `rule_group_idx` INT64 NOT NULL DEFAULT (1), `is_paused` BOOL NOT NULL DEFAULT (false), `notification_settings` STRING(MAX), `record` STRING(MAX), `metadata` STRING(MAX), `created_by` STRING(40), `rule_guid` STRING(36) NOT NULL DEFAULT (''), `missing_series_evals_to_resolve` INT64) PRIMARY KEY (id)",
"CREATE INDEX `IDX_alert_rule_version_rule_org_id_rule_namespace_uid_rule_group` ON `alert_rule_version` (rule_org_id, rule_namespace_uid, rule_group)",
"CREATE UNIQUE NULL_FILTERED INDEX `UQE_alert_rule_version_rule_guid_version` ON `alert_rule_version` (rule_guid, version)",
"CREATE UNIQUE NULL_FILTERED INDEX `UQE_alert_rule_version_rule_org_id_rule_uid_rule_guid_version` ON `alert_rule_version` (rule_org_id, rule_uid, rule_guid, version)",
@ -253,5 +253,9 @@
"CREATE INDEX `IDX_user_role_user_id` ON `user_role` (user_id)",
"CREATE UNIQUE NULL_FILTERED INDEX `UQE_user_role_org_id_user_id_role_id_group_mapping_uid` ON `user_role` (org_id, user_id, role_id, group_mapping_uid)",
"CREATE TABLE `user_stats` (`id` INT64 NOT NULL GENERATED BY DEFAULT AS IDENTITY (BIT_REVERSED_POSITIVE), `user_id` INT64 NOT NULL, `billing_role` STRING(40) NOT NULL, `created` TIMESTAMP NOT NULL, `updated` TIMESTAMP NOT NULL) PRIMARY KEY (id)",
"CREATE UNIQUE NULL_FILTERED INDEX `UQE_user_stats_user_id` ON `user_stats` (user_id)"
"CREATE UNIQUE NULL_FILTERED INDEX `UQE_user_stats_user_id` ON `user_stats` (user_id)",
"CREATE TABLE resource ( namespace STRING(63), resource_group STRING(190), resource STRING(190), name STRING(253), folder STRING(253), value BYTES(MAX), resource_version TIMESTAMP NOT NULL OPTIONS ( allow_commit_timestamp = true ), previous_resource_version TIMESTAMP, ) PRIMARY KEY (namespace, resource_group, resource, name)",
"CREATE TABLE resource_history ( namespace STRING(63), resource_group STRING(190), resource STRING(190), name STRING(253), folder STRING(253), value BYTES(MAX), resource_version TIMESTAMP NOT NULL OPTIONS ( allow_commit_timestamp = true ), previous_resource_version TIMESTAMP, action INT64, ) PRIMARY KEY (namespace, resource_group, resource, name, resource_version DESC)",
"CREATE TABLE resource_blob ( uid STRING(36) NOT NULL, resource_key STRING(MAX) NOT NULL, content_type STRING(100), value BYTES(MAX), ) PRIMARY KEY (uid)",
"CREATE CHANGE STREAM resource_stream FOR resource"
]
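
The last four statements are the new unistore objects: resource and resource_history keyed by (namespace, resource_group, resource, name) with a commit-timestamp resource_version, resource_blob keyed by uid, and a change stream over resource. A rough, illustrative sketch of applying statements like these with the Cloud Spanner Go admin client — the database path is a placeholder and the full column lists are elided; this is only one way to apply such DDL, not necessarily what Grafana's provisioning code does:

package main

import (
	"context"
	"log"

	database "cloud.google.com/go/spanner/admin/database/apiv1"
	"cloud.google.com/go/spanner/admin/database/apiv1/databasepb"
)

func main() {
	ctx := context.Background()
	admin, err := database.NewDatabaseAdminClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer admin.Close()

	// Apply the unistore DDL listed above; statements abridged here.
	op, err := admin.UpdateDatabaseDdl(ctx, &databasepb.UpdateDatabaseDdlRequest{
		Database: "projects/PROJECT/instances/INSTANCE/databases/DATABASE", // placeholder
		Statements: []string{
			"CREATE TABLE resource ( /* columns as above */ ) PRIMARY KEY (namespace, resource_group, resource, name)",
			"CREATE CHANGE STREAM resource_stream FOR resource",
		},
	})
	if err != nil {
		log.Fatal(err)
	}
	// Schema changes are long-running operations; wait for completion.
	if err := op.Wait(ctx); err != nil {
		log.Fatal(err)
	}
}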

View File

@ -642,6 +642,8 @@
"add index in alert_rule_version table on rule_org_id, rule_uid, rule_guid and version columns",
"add index in alert_rule_version table on rule_guid and version columns",
"add index in alert_rule table on guid columns",
"add missing_series_evals_to_resolve column to alert_rule",
"add missing_series_evals_to_resolve column to alert_rule_version",
"create data_source_usage_by_day table",
"create data_source_usage_by_day(data_source_id) index",
"create data_source_usage_by_day(data_source_id, day) unique index",

View File

@ -173,12 +173,15 @@ func (s *SpannerDialect) CleanDB(engine *xorm.Engine) error {
// Collect all DROP statements.
var statements []string
for _, table := range tables {
// Ignore these tables used by Unified storage.
if table.Name == "resource" || table.Name == "resource_blob" || table.Name == "resource_history" {
continue
changeStreams, err := s.findChangeStreams(engine)
if err != nil {
return err
}
for _, cs := range changeStreams {
statements = append(statements, fmt.Sprintf("DROP CHANGE STREAM `%s`", cs))
}
for _, table := range tables {
// Indexes must be dropped first, otherwise dropping tables fails.
for _, index := range table.Indexes {
if !index.IsRegular {
@ -338,3 +341,24 @@ func SpannerConnectorConfigToClientOptions(connectorConfig spannerdriver.Connect
func (s *SpannerDialect) UnionDistinct() string {
return "UNION DISTINCT"
}
func (s *SpannerDialect) findChangeStreams(engine *xorm.Engine) ([]string, error) {
var result []string
query := `SELECT c.CHANGE_STREAM_NAME
FROM INFORMATION_SCHEMA.CHANGE_STREAMS AS C
WHERE C.CHANGE_STREAM_CATALOG=''
AND C.CHANGE_STREAM_SCHEMA=''`
rows, err := engine.DB().Query(query)
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var name string
if err := rows.Scan(&name); err != nil {
return nil, err
}
result = append(result, name)
}
return result, nil
}
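
The stream discovery above is what makes CleanDB safe for the new schema: Spanner rejects dropping a table that is still explicitly tracked by a change stream, so the streams found here are turned into DROP CHANGE STREAM statements ahead of the per-table DROP INDEX and DROP TABLE statements. A rough sketch of the resulting order for the unistore objects in the DDL above (index drops omitted; names taken from that DDL):

package example

// Illustrative only: the order CleanDB is expected to emit for the unistore
// objects, with change streams first so the table drops below do not fail.
var cleanupStatements = []string{
	"DROP CHANGE STREAM `resource_stream`",
	// ... DROP INDEX statements for each non-primary-key index ...
	"DROP TABLE `resource`",
	"DROP TABLE `resource_history`",
	"DROP TABLE `resource_blob`",
}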

View File

@ -31,6 +31,10 @@ func TestMain(m *testing.M) {
// TestStorageBackend is a test for the StorageBackend interface.
func TestIntegrationSQLStorageBackend(t *testing.T) {
if infraDB.IsTestDBSpanner() {
t.Skip("skipping integration test")
}
t.Run("IsHA (polling notifier)", func(t *testing.T) {
unitest.RunStorageBackendTest(t, func(ctx context.Context) resource.StorageBackend {
dbstore := infraDB.InitTestDB(t)
@ -74,6 +78,10 @@ func TestClientServer(t *testing.T) {
if infraDB.IsTestDbSQLite() {
t.Skip("TODO: test blocking, skipping to unblock Enterprise until we fix this")
}
if infraDB.IsTestDBSpanner() {
t.Skip("skipping integration test")
}
ctx := testutil.NewTestContext(t, time.Now().Add(5*time.Second))
dbstore := infraDB.InitTestDB(t)

View File

@ -2,12 +2,14 @@ package test
import (
"context"
"fmt"
"slices"
"strings"
"testing"
"time"
"github.com/go-jose/go-jose/v3/jwt"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@ -33,7 +35,13 @@ type NewBackendFunc func(ctx context.Context) resource.StorageBackend
// TestOptions configures which tests to run
type TestOptions struct {
SkipTests map[string]bool // tests to skip
NSPrefix string // namespace prefix for isolation
}
// GenerateRandomNSPrefix creates a random namespace prefix for test isolation
func GenerateRandomNSPrefix() string {
uid := uuid.New().String()[:10]
return fmt.Sprintf("test-%s", uid)
}
// RunStorageBackendTest runs the storage backend test suite
@ -46,9 +54,15 @@ func RunStorageBackendTest(t *testing.T, newBackend NewBackendFunc, opts *TestOp
opts = &TestOptions{}
}
if opts.NSPrefix == "" {
opts.NSPrefix = GenerateRandomNSPrefix()
}
t.Logf("Running tests with namespace prefix: %s", opts.NSPrefix)
cases := []struct {
name string
fn func(*testing.T, resource.StorageBackend)
fn func(*testing.T, resource.StorageBackend, string)
}{
{TestHappyPath, runTestIntegrationBackendHappyPath},
{TestWatchWriteEvents, runTestIntegrationBackendWatchWriteEvents},
@ -65,12 +79,12 @@ func RunStorageBackendTest(t *testing.T, newBackend NewBackendFunc, opts *TestOp
}
t.Run(tc.name, func(t *testing.T) {
tc.fn(t, newBackend(context.Background()))
tc.fn(t, newBackend(context.Background()), opts.NSPrefix)
})
}
}
func runTestIntegrationBackendHappyPath(t *testing.T, backend resource.StorageBackend) {
func runTestIntegrationBackendHappyPath(t *testing.T, backend resource.StorageBackend, nsPrefix string) {
ctx := types.WithAuthInfo(context.Background(), authn.NewAccessTokenAuthInfo(authn.Claims[authn.AccessTokenClaims]{
Claims: jwt.Claims{
Subject: "testuser",
@ -79,39 +93,46 @@ func runTestIntegrationBackendHappyPath(t *testing.T, backend resource.StorageBa
}))
server := newServer(t, backend)
ns := nsPrefix + "-ns1"
stream, err := backend.WatchWriteEvents(context.Background()) // Using a different context to avoid canceling the stream after the DefaultContextTimeout
require.NoError(t, err)
var rv1, rv2, rv3, rv4, rv5 int64
t.Run("Add 3 resources", func(t *testing.T) {
rv1, err = writeEvent(ctx, backend, "item1", resource.WatchEvent_ADDED)
rv1, err = writeEvent(ctx, backend, "item1", resource.WatchEvent_ADDED, WithNamespace(ns))
require.NoError(t, err)
require.Greater(t, rv1, int64(0))
rv2, err = writeEvent(ctx, backend, "item2", resource.WatchEvent_ADDED)
rv2, err = writeEvent(ctx, backend, "item2", resource.WatchEvent_ADDED, WithNamespace(ns))
require.NoError(t, err)
require.Greater(t, rv2, rv1)
rv3, err = writeEvent(ctx, backend, "item3", resource.WatchEvent_ADDED)
rv3, err = writeEvent(ctx, backend, "item3", resource.WatchEvent_ADDED, WithNamespace(ns))
require.NoError(t, err)
require.Greater(t, rv3, rv2)
})
t.Run("Update item2", func(t *testing.T) {
rv4, err = writeEvent(ctx, backend, "item2", resource.WatchEvent_MODIFIED)
rv4, err = writeEvent(ctx, backend, "item2", resource.WatchEvent_MODIFIED, WithNamespace(ns))
require.NoError(t, err)
require.Greater(t, rv4, rv3)
})
t.Run("Delete item1", func(t *testing.T) {
rv5, err = writeEvent(ctx, backend, "item1", resource.WatchEvent_DELETED)
rv5, err = writeEvent(ctx, backend, "item1", resource.WatchEvent_DELETED, WithNamespace(ns))
require.NoError(t, err)
require.Greater(t, rv5, rv4)
})
t.Run("Read latest item 2", func(t *testing.T) {
resp := backend.ReadResource(ctx, &resource.ReadRequest{Key: resourceKey("item2")})
resp := backend.ReadResource(ctx, &resource.ReadRequest{
Key: &resource.ResourceKey{
Name: "item2",
Namespace: ns,
Group: "group",
Resource: "resource",
},
})
require.Nil(t, resp.Error)
require.Equal(t, rv4, resp.ResourceVersion)
require.Equal(t, "item2 MODIFIED", string(resp.Value))
@ -120,7 +141,12 @@ func runTestIntegrationBackendHappyPath(t *testing.T, backend resource.StorageBa
t.Run("Read early version of item2", func(t *testing.T) {
resp := backend.ReadResource(ctx, &resource.ReadRequest{
Key: resourceKey("item2"),
Key: &resource.ResourceKey{
Name: "item2",
Namespace: ns,
Group: "group",
Resource: "resource",
},
ResourceVersion: rv3, // item2 was created at rv2 and updated at rv4
})
require.Nil(t, resp.Error)
@ -132,7 +158,7 @@ func runTestIntegrationBackendHappyPath(t *testing.T, backend resource.StorageBa
resp, err := server.List(ctx, &resource.ListRequest{
Options: &resource.ListOptions{
Key: &resource.ResourceKey{
Namespace: "namespace",
Namespace: ns,
Group: "group",
Resource: "resource",
},
@ -176,7 +202,7 @@ func runTestIntegrationBackendHappyPath(t *testing.T, backend resource.StorageBa
})
}
func runTestIntegrationBackendGetResourceStats(t *testing.T, backend resource.StorageBackend) {
func runTestIntegrationBackendGetResourceStats(t *testing.T, backend resource.StorageBackend, nsPrefix string) {
ctx := testutil.NewTestContext(t, time.Now().Add(5*time.Second))
sortFunc := func(a, b resource.ResourceStats) int {
@ -190,37 +216,37 @@ func runTestIntegrationBackendGetResourceStats(t *testing.T, backend resource.St
}
// Create resources across different namespaces/groups
_, err := writeEvent(ctx, backend, "item1", resource.WatchEvent_ADDED,
WithNamespace("ns1"),
WithNamespace(nsPrefix+"-ns1"),
WithGroup("group"),
WithResource("resource1"))
require.NoError(t, err)
_, err = writeEvent(ctx, backend, "item2", resource.WatchEvent_ADDED,
WithNamespace("ns1"),
WithNamespace(nsPrefix+"-ns1"),
WithGroup("group"),
WithResource("resource1"))
require.NoError(t, err)
_, err = writeEvent(ctx, backend, "item3", resource.WatchEvent_ADDED,
WithNamespace("ns1"),
WithNamespace(nsPrefix+"-ns1"),
WithGroup("group"),
WithResource("resource2"))
require.NoError(t, err)
_, err = writeEvent(ctx, backend, "item4", resource.WatchEvent_ADDED,
WithNamespace("ns2"),
WithNamespace(nsPrefix+"-ns2"),
WithGroup("group"),
WithResource("resource1"))
require.NoError(t, err)
_, err = writeEvent(ctx, backend, "item5", resource.WatchEvent_ADDED,
WithNamespace("ns2"),
WithNamespace(nsPrefix+"-ns2"),
WithGroup("group"),
WithResource("resource1"))
require.NoError(t, err)
t.Run("Get stats for ns1", func(t *testing.T) {
stats, err := backend.GetResourceStats(ctx, "ns1", 0)
stats, err := backend.GetResourceStats(ctx, nsPrefix+"-ns1", 0)
require.NoError(t, err)
require.Len(t, stats, 2)
@ -228,14 +254,14 @@ func runTestIntegrationBackendGetResourceStats(t *testing.T, backend resource.St
slices.SortFunc(stats, sortFunc)
// Check first resource stats
require.Equal(t, "ns1", stats[0].Namespace)
require.Equal(t, nsPrefix+"-ns1", stats[0].Namespace)
require.Equal(t, "group", stats[0].Group)
require.Equal(t, "resource1", stats[0].Resource)
require.Equal(t, int64(2), stats[0].Count)
require.Greater(t, stats[0].ResourceVersion, int64(0))
// Check second resource stats
require.Equal(t, "ns1", stats[1].Namespace)
require.Equal(t, nsPrefix+"-ns1", stats[1].Namespace)
require.Equal(t, "group", stats[1].Group)
require.Equal(t, "resource2", stats[1].Resource)
require.Equal(t, int64(1), stats[1].Count)
@ -243,11 +269,11 @@ func runTestIntegrationBackendGetResourceStats(t *testing.T, backend resource.St
})
t.Run("Get stats for ns2", func(t *testing.T) {
stats, err := backend.GetResourceStats(ctx, "ns2", 0)
stats, err := backend.GetResourceStats(ctx, nsPrefix+"-ns2", 0)
require.NoError(t, err)
require.Len(t, stats, 1)
require.Equal(t, "ns2", stats[0].Namespace)
require.Equal(t, nsPrefix+"-ns2", stats[0].Namespace)
require.Equal(t, "group", stats[0].Group)
require.Equal(t, "resource1", stats[0].Resource)
require.Equal(t, int64(2), stats[0].Count)
@ -255,11 +281,11 @@ func runTestIntegrationBackendGetResourceStats(t *testing.T, backend resource.St
})
t.Run("Get stats with minimum count", func(t *testing.T) {
stats, err := backend.GetResourceStats(ctx, "ns1", 1)
stats, err := backend.GetResourceStats(ctx, nsPrefix+"-ns1", 1)
require.NoError(t, err)
require.Len(t, stats, 1)
require.Equal(t, "ns1", stats[0].Namespace)
require.Equal(t, nsPrefix+"-ns1", stats[0].Namespace)
require.Equal(t, "group", stats[0].Group)
require.Equal(t, "resource1", stats[0].Resource)
require.Equal(t, int64(2), stats[0].Count)
@ -272,11 +298,11 @@ func runTestIntegrationBackendGetResourceStats(t *testing.T, backend resource.St
})
}
func runTestIntegrationBackendWatchWriteEvents(t *testing.T, backend resource.StorageBackend) {
func runTestIntegrationBackendWatchWriteEvents(t *testing.T, backend resource.StorageBackend, nsPrefix string) {
ctx := testutil.NewTestContext(t, time.Now().Add(5*time.Second))
// Create a few resources before initing the watch
_, err := writeEvent(ctx, backend, "item1", resource.WatchEvent_ADDED)
_, err := writeEvent(ctx, backend, "item1", resource.WatchEvent_ADDED, WithNamespace(nsPrefix+"-ns1"))
require.NoError(t, err)
// Start the watch
@ -284,7 +310,7 @@ func runTestIntegrationBackendWatchWriteEvents(t *testing.T, backend resource.St
require.NoError(t, err)
// Create one more event
_, err = writeEvent(ctx, backend, "item2", resource.WatchEvent_ADDED)
_, err = writeEvent(ctx, backend, "item2", resource.WatchEvent_ADDED, WithNamespace(nsPrefix+"-ns1"))
require.NoError(t, err)
require.Equal(t, "item2", (<-stream).Key.Name)
@ -295,33 +321,33 @@ func runTestIntegrationBackendWatchWriteEvents(t *testing.T, backend resource.St
require.False(t, ok)
}
func runTestIntegrationBackendList(t *testing.T, backend resource.StorageBackend) {
func runTestIntegrationBackendList(t *testing.T, backend resource.StorageBackend, nsPrefix string) {
ctx := testutil.NewTestContext(t, time.Now().Add(5*time.Second))
server := newServer(t, backend)
ns := nsPrefix + "-ns1"
// Create a few resources before starting the watch
rv1, err := writeEvent(ctx, backend, "item1", resource.WatchEvent_ADDED)
rv1, err := writeEvent(ctx, backend, "item1", resource.WatchEvent_ADDED, WithNamespace(ns))
require.NoError(t, err)
require.Greater(t, rv1, int64(0))
rv2, err := writeEvent(ctx, backend, "item2", resource.WatchEvent_ADDED)
rv2, err := writeEvent(ctx, backend, "item2", resource.WatchEvent_ADDED, WithNamespace(ns))
require.NoError(t, err)
require.Greater(t, rv2, rv1)
rv3, err := writeEvent(ctx, backend, "item3", resource.WatchEvent_ADDED)
rv3, err := writeEvent(ctx, backend, "item3", resource.WatchEvent_ADDED, WithNamespace(ns))
require.NoError(t, err)
require.Greater(t, rv3, rv2)
rv4, err := writeEvent(ctx, backend, "item4", resource.WatchEvent_ADDED)
rv4, err := writeEvent(ctx, backend, "item4", resource.WatchEvent_ADDED, WithNamespace(ns))
require.NoError(t, err)
require.Greater(t, rv4, rv3)
rv5, err := writeEvent(ctx, backend, "item5", resource.WatchEvent_ADDED)
rv5, err := writeEvent(ctx, backend, "item5", resource.WatchEvent_ADDED, WithNamespace(ns))
require.NoError(t, err)
require.Greater(t, rv5, rv4)
rv6, err := writeEvent(ctx, backend, "item2", resource.WatchEvent_MODIFIED)
rv6, err := writeEvent(ctx, backend, "item2", resource.WatchEvent_MODIFIED, WithNamespace(ns))
require.NoError(t, err)
require.Greater(t, rv6, rv5)
rv7, err := writeEvent(ctx, backend, "item3", resource.WatchEvent_DELETED)
rv7, err := writeEvent(ctx, backend, "item3", resource.WatchEvent_DELETED, WithNamespace(ns))
require.NoError(t, err)
require.Greater(t, rv7, rv6)
rv8, err := writeEvent(ctx, backend, "item6", resource.WatchEvent_ADDED)
rv8, err := writeEvent(ctx, backend, "item6", resource.WatchEvent_ADDED, WithNamespace(ns))
require.NoError(t, err)
require.Greater(t, rv8, rv7)
@ -442,27 +468,27 @@ func runTestIntegrationBackendList(t *testing.T, backend resource.StorageBackend
})
}
func runTestIntegrationBackendListHistory(t *testing.T, backend resource.StorageBackend) {
func runTestIntegrationBackendListHistory(t *testing.T, backend resource.StorageBackend, nsPrefix string) {
ctx := testutil.NewTestContext(t, time.Now().Add(5*time.Second))
server := newServer(t, backend)
rv1, _ := writeEvent(ctx, backend, "item1", resource.WatchEvent_ADDED)
ns := nsPrefix + "-ns1"
rv1, _ := writeEvent(ctx, backend, "item1", resource.WatchEvent_ADDED, WithNamespace(ns))
require.Greater(t, rv1, int64(0))
// add 5 events for item1 - should be saved to history
rvHistory1, err := writeEvent(ctx, backend, "item1", resource.WatchEvent_MODIFIED)
rvHistory1, err := writeEvent(ctx, backend, "item1", resource.WatchEvent_MODIFIED, WithNamespace(ns))
require.NoError(t, err)
require.Greater(t, rvHistory1, rv1)
rvHistory2, err := writeEvent(ctx, backend, "item1", resource.WatchEvent_MODIFIED)
rvHistory2, err := writeEvent(ctx, backend, "item1", resource.WatchEvent_MODIFIED, WithNamespace(ns))
require.NoError(t, err)
require.Greater(t, rvHistory2, rvHistory1)
rvHistory3, err := writeEvent(ctx, backend, "item1", resource.WatchEvent_MODIFIED)
rvHistory3, err := writeEvent(ctx, backend, "item1", resource.WatchEvent_MODIFIED, WithNamespace(ns))
require.NoError(t, err)
require.Greater(t, rvHistory3, rvHistory2)
rvHistory4, err := writeEvent(ctx, backend, "item1", resource.WatchEvent_MODIFIED)
rvHistory4, err := writeEvent(ctx, backend, "item1", resource.WatchEvent_MODIFIED, WithNamespace(ns))
require.NoError(t, err)
require.Greater(t, rvHistory4, rvHistory3)
rvHistory5, err := writeEvent(ctx, backend, "item1", resource.WatchEvent_MODIFIED)
rvHistory5, err := writeEvent(ctx, backend, "item1", resource.WatchEvent_MODIFIED, WithNamespace(ns))
require.NoError(t, err)
require.Greater(t, rvHistory5, rvHistory4)
@ -472,7 +498,7 @@ func runTestIntegrationBackendListHistory(t *testing.T, backend resource.Storage
Source: resource.ListRequest_HISTORY,
Options: &resource.ListOptions{
Key: &resource.ResourceKey{
Namespace: "namespace",
Namespace: ns,
Group: "group",
Resource: "resource",
Name: "item1",
@ -509,7 +535,7 @@ func runTestIntegrationBackendListHistory(t *testing.T, backend resource.Storage
Source: resource.ListRequest_HISTORY,
Options: &resource.ListOptions{
Key: &resource.ResourceKey{
Namespace: "namespace",
Namespace: ns,
Group: "group",
Resource: "resource",
Name: "item1",
@ -527,15 +553,16 @@ func runTestIntegrationBackendListHistory(t *testing.T, backend resource.Storage
})
}
func runTestIntegrationBlobSupport(t *testing.T, backend resource.StorageBackend) {
func runTestIntegrationBlobSupport(t *testing.T, backend resource.StorageBackend, nsPrefix string) {
ctx := testutil.NewTestContext(t, time.Now().Add(5*time.Second))
server := newServer(t, backend)
store, ok := backend.(resource.BlobSupport)
require.True(t, ok)
ns := nsPrefix + "-ns1"
t.Run("put and fetch blob", func(t *testing.T) {
key := &resource.ResourceKey{
Namespace: "ns",
Namespace: ns,
Group: "g",
Resource: "r",
Name: "n",
@ -687,15 +714,6 @@ func writeEvent(ctx context.Context, store resource.StorageBackend, name string,
})
}
func resourceKey(name string) *resource.ResourceKey {
return &resource.ResourceKey{
Namespace: "namespace",
Group: "group",
Resource: "resource",
Name: name,
}
}
func newServer(t *testing.T, b resource.StorageBackend) resource.ResourceServer {
t.Helper()
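
Taken together, a backend package wires into this suite roughly the way the sql backend test above does; the only new knob is TestOptions.NSPrefix, which falls back to GenerateRandomNSPrefix when left empty. A sketch under assumptions — the import paths and the backend constructor are stand-ins, only the call shape and TestOptions fields come from this change:

package mybackend_test

import (
	"context"
	"testing"

	"github.com/grafana/grafana/pkg/storage/unified/resource" // path assumed
	unitest "github.com/grafana/grafana/pkg/storage/unified/testing" // path assumed; package is named "test"
)

func TestIntegrationMyBackend(t *testing.T) {
	newBackend := func(ctx context.Context) resource.StorageBackend {
		var backend resource.StorageBackend // stand-in: construct the backend under test here
		return backend
	}
	unitest.RunStorageBackendTest(t, newBackend, &unitest.TestOptions{
		// Optional: pin the namespace prefix; a random "test-..." prefix is
		// generated when this is left empty, keeping concurrent runs isolated.
		NSPrefix: "mybackend",
	})
}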