mirror of
https://github.com/grafana/grafana.git
synced 2025-07-31 07:02:12 +08:00
710 lines
22 KiB
Go
710 lines
22 KiB
Go
package test
|
|
|
|
import (
|
|
"context"
|
|
"slices"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/go-jose/go-jose/v3/jwt"
|
|
"github.com/stretchr/testify/require"
|
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
|
|
|
"github.com/grafana/authlib/authn"
|
|
"github.com/grafana/authlib/types"
|
|
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
|
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
|
"github.com/grafana/grafana/pkg/util/testutil"
|
|
)
|
|
|
|
// Test names for the storage backend test suite
|
|
const (
|
|
TestHappyPath = "happy path"
|
|
TestWatchWriteEvents = "watch write events from latest"
|
|
TestList = "list"
|
|
TestBlobSupport = "blob support"
|
|
TestGetResourceStats = "get resource stats"
|
|
TestListHistory = "list history"
|
|
)
|
|
|
|
type NewBackendFunc func(ctx context.Context) resource.StorageBackend
|
|
|
|
// TestOptions configures which tests to run
|
|
type TestOptions struct {
|
|
SkipTests map[string]bool // tests to skip
|
|
|
|
}
|
|
|
|
// RunStorageBackendTest runs the storage backend test suite
|
|
func RunStorageBackendTest(t *testing.T, newBackend NewBackendFunc, opts *TestOptions) {
|
|
if testing.Short() {
|
|
t.Skip("skipping integration test")
|
|
}
|
|
|
|
if opts == nil {
|
|
opts = &TestOptions{}
|
|
}
|
|
|
|
cases := []struct {
|
|
name string
|
|
fn func(*testing.T, resource.StorageBackend)
|
|
}{
|
|
{TestHappyPath, runTestIntegrationBackendHappyPath},
|
|
{TestWatchWriteEvents, runTestIntegrationBackendWatchWriteEvents},
|
|
{TestList, runTestIntegrationBackendList},
|
|
{TestBlobSupport, runTestIntegrationBlobSupport},
|
|
{TestGetResourceStats, runTestIntegrationBackendGetResourceStats},
|
|
{TestListHistory, runTestIntegrationBackendListHistory},
|
|
}
|
|
|
|
for _, tc := range cases {
|
|
if shouldSkip := opts.SkipTests[tc.name]; shouldSkip {
|
|
t.Logf("Skipping test: %s", tc.name)
|
|
continue
|
|
}
|
|
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
tc.fn(t, newBackend(context.Background()))
|
|
})
|
|
}
|
|
}
|
|
|
|
func runTestIntegrationBackendHappyPath(t *testing.T, backend resource.StorageBackend) {
|
|
ctx := types.WithAuthInfo(context.Background(), authn.NewAccessTokenAuthInfo(authn.Claims[authn.AccessTokenClaims]{
|
|
Claims: jwt.Claims{
|
|
Subject: "testuser",
|
|
},
|
|
Rest: authn.AccessTokenClaims{},
|
|
}))
|
|
|
|
server := newServer(t, backend)
|
|
|
|
stream, err := backend.WatchWriteEvents(context.Background()) // Using a different context to avoid canceling the stream after the DefaultContextTimeout
|
|
require.NoError(t, err)
|
|
var rv1, rv2, rv3, rv4, rv5 int64
|
|
|
|
t.Run("Add 3 resources", func(t *testing.T) {
|
|
rv1, err = writeEvent(ctx, backend, "item1", resource.WatchEvent_ADDED)
|
|
require.NoError(t, err)
|
|
require.Greater(t, rv1, int64(0))
|
|
|
|
rv2, err = writeEvent(ctx, backend, "item2", resource.WatchEvent_ADDED)
|
|
require.NoError(t, err)
|
|
require.Greater(t, rv2, rv1)
|
|
|
|
rv3, err = writeEvent(ctx, backend, "item3", resource.WatchEvent_ADDED)
|
|
require.NoError(t, err)
|
|
require.Greater(t, rv3, rv2)
|
|
})
|
|
|
|
t.Run("Update item2", func(t *testing.T) {
|
|
rv4, err = writeEvent(ctx, backend, "item2", resource.WatchEvent_MODIFIED)
|
|
require.NoError(t, err)
|
|
require.Greater(t, rv4, rv3)
|
|
})
|
|
|
|
t.Run("Delete item1", func(t *testing.T) {
|
|
rv5, err = writeEvent(ctx, backend, "item1", resource.WatchEvent_DELETED)
|
|
require.NoError(t, err)
|
|
require.Greater(t, rv5, rv4)
|
|
})
|
|
|
|
t.Run("Read latest item 2", func(t *testing.T) {
|
|
resp := backend.ReadResource(ctx, &resource.ReadRequest{Key: resourceKey("item2")})
|
|
require.Nil(t, resp.Error)
|
|
require.Equal(t, rv4, resp.ResourceVersion)
|
|
require.Equal(t, "item2 MODIFIED", string(resp.Value))
|
|
require.Equal(t, "folderuid", resp.Folder)
|
|
})
|
|
|
|
t.Run("Read early version of item2", func(t *testing.T) {
|
|
resp := backend.ReadResource(ctx, &resource.ReadRequest{
|
|
Key: resourceKey("item2"),
|
|
ResourceVersion: rv3, // item2 was created at rv2 and updated at rv4
|
|
})
|
|
require.Nil(t, resp.Error)
|
|
require.Equal(t, rv2, resp.ResourceVersion)
|
|
require.Equal(t, "item2 ADDED", string(resp.Value))
|
|
})
|
|
|
|
t.Run("List latest", func(t *testing.T) {
|
|
resp, err := server.List(ctx, &resource.ListRequest{
|
|
Options: &resource.ListOptions{
|
|
Key: &resource.ResourceKey{
|
|
Namespace: "namespace",
|
|
Group: "group",
|
|
Resource: "resource",
|
|
},
|
|
},
|
|
})
|
|
require.NoError(t, err)
|
|
require.Nil(t, resp.Error)
|
|
require.Len(t, resp.Items, 2)
|
|
require.Equal(t, "item2 MODIFIED", string(resp.Items[0].Value))
|
|
require.Equal(t, "item3 ADDED", string(resp.Items[1].Value))
|
|
require.GreaterOrEqual(t, resp.ResourceVersion, rv5) // rv5 is the latest resource version
|
|
})
|
|
|
|
t.Run("Watch events", func(t *testing.T) {
|
|
event := <-stream
|
|
require.Equal(t, "item1", event.Key.Name)
|
|
require.Equal(t, rv1, event.ResourceVersion)
|
|
require.Equal(t, resource.WatchEvent_ADDED, event.Type)
|
|
require.Equal(t, "folderuid", event.Folder)
|
|
|
|
event = <-stream
|
|
require.Equal(t, "item2", event.Key.Name)
|
|
require.Equal(t, rv2, event.ResourceVersion)
|
|
require.Equal(t, resource.WatchEvent_ADDED, event.Type)
|
|
require.Equal(t, "folderuid", event.Folder)
|
|
|
|
event = <-stream
|
|
require.Equal(t, "item3", event.Key.Name)
|
|
require.Equal(t, rv3, event.ResourceVersion)
|
|
require.Equal(t, resource.WatchEvent_ADDED, event.Type)
|
|
|
|
event = <-stream
|
|
require.Equal(t, "item2", event.Key.Name)
|
|
require.Equal(t, rv4, event.ResourceVersion)
|
|
require.Equal(t, resource.WatchEvent_MODIFIED, event.Type)
|
|
|
|
event = <-stream
|
|
require.Equal(t, "item1", event.Key.Name)
|
|
require.Equal(t, rv5, event.ResourceVersion)
|
|
require.Equal(t, resource.WatchEvent_DELETED, event.Type)
|
|
})
|
|
}
|
|
|
|
func runTestIntegrationBackendGetResourceStats(t *testing.T, backend resource.StorageBackend) {
|
|
ctx := testutil.NewTestContext(t, time.Now().Add(5*time.Second))
|
|
|
|
sortFunc := func(a, b resource.ResourceStats) int {
|
|
if a.Namespace != b.Namespace {
|
|
return strings.Compare(a.Namespace, b.Namespace)
|
|
}
|
|
if a.Group != b.Group {
|
|
return strings.Compare(a.Group, b.Group)
|
|
}
|
|
return strings.Compare(a.Resource, b.Resource)
|
|
}
|
|
// Create resources across different namespaces/groups
|
|
_, err := writeEvent(ctx, backend, "item1", resource.WatchEvent_ADDED,
|
|
WithNamespace("ns1"),
|
|
WithGroup("group"),
|
|
WithResource("resource1"))
|
|
require.NoError(t, err)
|
|
|
|
_, err = writeEvent(ctx, backend, "item2", resource.WatchEvent_ADDED,
|
|
WithNamespace("ns1"),
|
|
WithGroup("group"),
|
|
WithResource("resource1"))
|
|
require.NoError(t, err)
|
|
|
|
_, err = writeEvent(ctx, backend, "item3", resource.WatchEvent_ADDED,
|
|
WithNamespace("ns1"),
|
|
WithGroup("group"),
|
|
WithResource("resource2"))
|
|
require.NoError(t, err)
|
|
|
|
_, err = writeEvent(ctx, backend, "item4", resource.WatchEvent_ADDED,
|
|
WithNamespace("ns2"),
|
|
WithGroup("group"),
|
|
WithResource("resource1"))
|
|
require.NoError(t, err)
|
|
|
|
_, err = writeEvent(ctx, backend, "item5", resource.WatchEvent_ADDED,
|
|
WithNamespace("ns2"),
|
|
WithGroup("group"),
|
|
WithResource("resource1"))
|
|
require.NoError(t, err)
|
|
|
|
t.Run("Get stats for ns1", func(t *testing.T) {
|
|
stats, err := backend.GetResourceStats(ctx, "ns1", 0)
|
|
require.NoError(t, err)
|
|
require.Len(t, stats, 2)
|
|
|
|
// Sort results for consistent testing
|
|
slices.SortFunc(stats, sortFunc)
|
|
|
|
// Check first resource stats
|
|
require.Equal(t, "ns1", stats[0].Namespace)
|
|
require.Equal(t, "group", stats[0].Group)
|
|
require.Equal(t, "resource1", stats[0].Resource)
|
|
require.Equal(t, int64(2), stats[0].Count)
|
|
require.Greater(t, stats[0].ResourceVersion, int64(0))
|
|
|
|
// Check second resource stats
|
|
require.Equal(t, "ns1", stats[1].Namespace)
|
|
require.Equal(t, "group", stats[1].Group)
|
|
require.Equal(t, "resource2", stats[1].Resource)
|
|
require.Equal(t, int64(1), stats[1].Count)
|
|
require.Greater(t, stats[1].ResourceVersion, int64(0))
|
|
})
|
|
|
|
t.Run("Get stats for ns2", func(t *testing.T) {
|
|
stats, err := backend.GetResourceStats(ctx, "ns2", 0)
|
|
require.NoError(t, err)
|
|
require.Len(t, stats, 1)
|
|
|
|
require.Equal(t, "ns2", stats[0].Namespace)
|
|
require.Equal(t, "group", stats[0].Group)
|
|
require.Equal(t, "resource1", stats[0].Resource)
|
|
require.Equal(t, int64(2), stats[0].Count)
|
|
require.Greater(t, stats[0].ResourceVersion, int64(0))
|
|
})
|
|
|
|
t.Run("Get stats with minimum count", func(t *testing.T) {
|
|
stats, err := backend.GetResourceStats(ctx, "ns1", 1)
|
|
require.NoError(t, err)
|
|
require.Len(t, stats, 1)
|
|
|
|
require.Equal(t, "ns1", stats[0].Namespace)
|
|
require.Equal(t, "group", stats[0].Group)
|
|
require.Equal(t, "resource1", stats[0].Resource)
|
|
require.Equal(t, int64(2), stats[0].Count)
|
|
})
|
|
|
|
t.Run("Get stats for non-existent namespace", func(t *testing.T) {
|
|
stats, err := backend.GetResourceStats(ctx, "non-existent", 0)
|
|
require.NoError(t, err)
|
|
require.Empty(t, stats)
|
|
})
|
|
}
|
|
|
|
func runTestIntegrationBackendWatchWriteEvents(t *testing.T, backend resource.StorageBackend) {
|
|
ctx := testutil.NewTestContext(t, time.Now().Add(5*time.Second))
|
|
|
|
// Create a few resources before initing the watch
|
|
_, err := writeEvent(ctx, backend, "item1", resource.WatchEvent_ADDED)
|
|
require.NoError(t, err)
|
|
|
|
// Start the watch
|
|
stream, err := backend.WatchWriteEvents(ctx)
|
|
require.NoError(t, err)
|
|
|
|
// Create one more event
|
|
_, err = writeEvent(ctx, backend, "item2", resource.WatchEvent_ADDED)
|
|
require.NoError(t, err)
|
|
require.Equal(t, "item2", (<-stream).Key.Name)
|
|
|
|
// Should close the stream
|
|
ctx.Cancel()
|
|
|
|
_, ok := <-stream
|
|
require.False(t, ok)
|
|
}
|
|
|
|
func runTestIntegrationBackendList(t *testing.T, backend resource.StorageBackend) {
|
|
ctx := testutil.NewTestContext(t, time.Now().Add(5*time.Second))
|
|
server := newServer(t, backend)
|
|
|
|
// Create a few resources before starting the watch
|
|
rv1, err := writeEvent(ctx, backend, "item1", resource.WatchEvent_ADDED)
|
|
require.NoError(t, err)
|
|
require.Greater(t, rv1, int64(0))
|
|
rv2, err := writeEvent(ctx, backend, "item2", resource.WatchEvent_ADDED)
|
|
require.NoError(t, err)
|
|
require.Greater(t, rv2, rv1)
|
|
rv3, err := writeEvent(ctx, backend, "item3", resource.WatchEvent_ADDED)
|
|
require.NoError(t, err)
|
|
require.Greater(t, rv3, rv2)
|
|
rv4, err := writeEvent(ctx, backend, "item4", resource.WatchEvent_ADDED)
|
|
require.NoError(t, err)
|
|
require.Greater(t, rv4, rv3)
|
|
rv5, err := writeEvent(ctx, backend, "item5", resource.WatchEvent_ADDED)
|
|
require.NoError(t, err)
|
|
require.Greater(t, rv5, rv4)
|
|
rv6, err := writeEvent(ctx, backend, "item2", resource.WatchEvent_MODIFIED)
|
|
require.NoError(t, err)
|
|
require.Greater(t, rv6, rv5)
|
|
rv7, err := writeEvent(ctx, backend, "item3", resource.WatchEvent_DELETED)
|
|
require.NoError(t, err)
|
|
require.Greater(t, rv7, rv6)
|
|
rv8, err := writeEvent(ctx, backend, "item6", resource.WatchEvent_ADDED)
|
|
require.NoError(t, err)
|
|
require.Greater(t, rv8, rv7)
|
|
|
|
t.Run("fetch all latest", func(t *testing.T) {
|
|
res, err := server.List(ctx, &resource.ListRequest{
|
|
Options: &resource.ListOptions{
|
|
Key: &resource.ResourceKey{
|
|
Group: "group",
|
|
Resource: "resource",
|
|
},
|
|
},
|
|
})
|
|
require.NoError(t, err)
|
|
require.Nil(t, res.Error)
|
|
require.Len(t, res.Items, 5)
|
|
// should be sorted by key ASC
|
|
require.Equal(t, "item1 ADDED", string(res.Items[0].Value))
|
|
require.Equal(t, "item2 MODIFIED", string(res.Items[1].Value))
|
|
require.Equal(t, "item4 ADDED", string(res.Items[2].Value))
|
|
require.Equal(t, "item5 ADDED", string(res.Items[3].Value))
|
|
require.Equal(t, "item6 ADDED", string(res.Items[4].Value))
|
|
|
|
require.Empty(t, res.NextPageToken)
|
|
})
|
|
|
|
t.Run("list latest first page ", func(t *testing.T) {
|
|
res, err := server.List(ctx, &resource.ListRequest{
|
|
Limit: 3,
|
|
Options: &resource.ListOptions{
|
|
Key: &resource.ResourceKey{
|
|
Group: "group",
|
|
Resource: "resource",
|
|
},
|
|
},
|
|
})
|
|
require.NoError(t, err)
|
|
require.Nil(t, res.Error)
|
|
require.Len(t, res.Items, 3)
|
|
continueToken, err := resource.GetContinueToken(res.NextPageToken)
|
|
require.NoError(t, err)
|
|
require.Equal(t, "item1 ADDED", string(res.Items[0].Value))
|
|
require.Equal(t, "item2 MODIFIED", string(res.Items[1].Value))
|
|
require.Equal(t, "item4 ADDED", string(res.Items[2].Value))
|
|
require.GreaterOrEqual(t, continueToken.ResourceVersion, rv8)
|
|
})
|
|
|
|
t.Run("list at revision", func(t *testing.T) {
|
|
res, err := server.List(ctx, &resource.ListRequest{
|
|
ResourceVersion: rv4,
|
|
Options: &resource.ListOptions{
|
|
Key: &resource.ResourceKey{
|
|
Group: "group",
|
|
Resource: "resource",
|
|
},
|
|
},
|
|
})
|
|
require.NoError(t, err)
|
|
require.Nil(t, res.Error)
|
|
require.Len(t, res.Items, 4)
|
|
require.Equal(t, "item1 ADDED", string(res.Items[0].Value))
|
|
require.Equal(t, "item2 ADDED", string(res.Items[1].Value))
|
|
require.Equal(t, "item3 ADDED", string(res.Items[2].Value))
|
|
require.Equal(t, "item4 ADDED", string(res.Items[3].Value))
|
|
require.Empty(t, res.NextPageToken)
|
|
})
|
|
|
|
t.Run("fetch first page at revision with limit", func(t *testing.T) {
|
|
res, err := server.List(ctx, &resource.ListRequest{
|
|
Limit: 3,
|
|
ResourceVersion: rv7,
|
|
Options: &resource.ListOptions{
|
|
Key: &resource.ResourceKey{
|
|
Group: "group",
|
|
Resource: "resource",
|
|
},
|
|
},
|
|
})
|
|
require.NoError(t, err)
|
|
require.NoError(t, err)
|
|
require.Nil(t, res.Error)
|
|
require.Len(t, res.Items, 3)
|
|
t.Log(res.Items)
|
|
require.Equal(t, "item1 ADDED", string(res.Items[0].Value))
|
|
require.Equal(t, "item2 MODIFIED", string(res.Items[1].Value))
|
|
require.Equal(t, "item4 ADDED", string(res.Items[2].Value))
|
|
|
|
continueToken, err := resource.GetContinueToken(res.NextPageToken)
|
|
require.NoError(t, err)
|
|
require.Equal(t, rv7, continueToken.ResourceVersion)
|
|
})
|
|
|
|
t.Run("fetch second page at revision", func(t *testing.T) {
|
|
continueToken := &resource.ContinueToken{
|
|
ResourceVersion: rv8,
|
|
StartOffset: 2,
|
|
}
|
|
res, err := server.List(ctx, &resource.ListRequest{
|
|
NextPageToken: continueToken.String(),
|
|
Limit: 2,
|
|
Options: &resource.ListOptions{
|
|
Key: &resource.ResourceKey{
|
|
Group: "group",
|
|
Resource: "resource",
|
|
},
|
|
},
|
|
})
|
|
require.NoError(t, err)
|
|
require.Nil(t, res.Error)
|
|
require.Len(t, res.Items, 2)
|
|
t.Log(res.Items)
|
|
require.Equal(t, "item4 ADDED", string(res.Items[0].Value))
|
|
require.Equal(t, "item5 ADDED", string(res.Items[1].Value))
|
|
|
|
continueToken, err = resource.GetContinueToken(res.NextPageToken)
|
|
require.NoError(t, err)
|
|
require.Equal(t, rv8, continueToken.ResourceVersion)
|
|
require.Equal(t, int64(4), continueToken.StartOffset)
|
|
})
|
|
}
|
|
|
|
func runTestIntegrationBackendListHistory(t *testing.T, backend resource.StorageBackend) {
|
|
ctx := testutil.NewTestContext(t, time.Now().Add(5*time.Second))
|
|
server := newServer(t, backend)
|
|
|
|
rv1, _ := writeEvent(ctx, backend, "item1", resource.WatchEvent_ADDED)
|
|
require.Greater(t, rv1, int64(0))
|
|
|
|
// add 5 events for item1 - should be saved to history
|
|
rvHistory1, err := writeEvent(ctx, backend, "item1", resource.WatchEvent_MODIFIED)
|
|
require.NoError(t, err)
|
|
require.Greater(t, rvHistory1, rv1)
|
|
rvHistory2, err := writeEvent(ctx, backend, "item1", resource.WatchEvent_MODIFIED)
|
|
require.NoError(t, err)
|
|
require.Greater(t, rvHistory2, rvHistory1)
|
|
rvHistory3, err := writeEvent(ctx, backend, "item1", resource.WatchEvent_MODIFIED)
|
|
require.NoError(t, err)
|
|
require.Greater(t, rvHistory3, rvHistory2)
|
|
rvHistory4, err := writeEvent(ctx, backend, "item1", resource.WatchEvent_MODIFIED)
|
|
require.NoError(t, err)
|
|
require.Greater(t, rvHistory4, rvHistory3)
|
|
rvHistory5, err := writeEvent(ctx, backend, "item1", resource.WatchEvent_MODIFIED)
|
|
require.NoError(t, err)
|
|
require.Greater(t, rvHistory5, rvHistory4)
|
|
|
|
t.Run("fetch first history page at revision with limit", func(t *testing.T) {
|
|
res, err := server.List(ctx, &resource.ListRequest{
|
|
Limit: 3,
|
|
Source: resource.ListRequest_HISTORY,
|
|
Options: &resource.ListOptions{
|
|
Key: &resource.ResourceKey{
|
|
Namespace: "namespace",
|
|
Group: "group",
|
|
Resource: "resource",
|
|
Name: "item1",
|
|
},
|
|
},
|
|
})
|
|
require.NoError(t, err)
|
|
require.NoError(t, err)
|
|
require.Nil(t, res.Error)
|
|
require.Len(t, res.Items, 3)
|
|
t.Log(res.Items)
|
|
// should be in desc order, so the newest RVs are returned first
|
|
require.Equal(t, "item1 MODIFIED", string(res.Items[0].Value))
|
|
require.Equal(t, rvHistory5, res.Items[0].ResourceVersion)
|
|
require.Equal(t, "item1 MODIFIED", string(res.Items[1].Value))
|
|
require.Equal(t, rvHistory4, res.Items[1].ResourceVersion)
|
|
require.Equal(t, "item1 MODIFIED", string(res.Items[2].Value))
|
|
require.Equal(t, rvHistory3, res.Items[2].ResourceVersion)
|
|
|
|
continueToken, err := resource.GetContinueToken(res.NextPageToken)
|
|
require.NoError(t, err)
|
|
// should return the furthest back RV as the next page token
|
|
require.Equal(t, rvHistory3, continueToken.ResourceVersion)
|
|
})
|
|
|
|
t.Run("fetch second page of history at revision", func(t *testing.T) {
|
|
continueToken := &resource.ContinueToken{
|
|
ResourceVersion: rvHistory3,
|
|
StartOffset: 2,
|
|
}
|
|
res, err := server.List(ctx, &resource.ListRequest{
|
|
NextPageToken: continueToken.String(),
|
|
Limit: 2,
|
|
Source: resource.ListRequest_HISTORY,
|
|
Options: &resource.ListOptions{
|
|
Key: &resource.ResourceKey{
|
|
Namespace: "namespace",
|
|
Group: "group",
|
|
Resource: "resource",
|
|
Name: "item1",
|
|
},
|
|
},
|
|
})
|
|
require.NoError(t, err)
|
|
require.Nil(t, res.Error)
|
|
require.Len(t, res.Items, 2)
|
|
t.Log(res.Items)
|
|
require.Equal(t, "item1 MODIFIED", string(res.Items[0].Value))
|
|
require.Equal(t, rvHistory2, res.Items[0].ResourceVersion)
|
|
require.Equal(t, "item1 MODIFIED", string(res.Items[1].Value))
|
|
require.Equal(t, rvHistory1, res.Items[1].ResourceVersion)
|
|
})
|
|
}
|
|
|
|
func runTestIntegrationBlobSupport(t *testing.T, backend resource.StorageBackend) {
|
|
ctx := testutil.NewTestContext(t, time.Now().Add(5*time.Second))
|
|
server := newServer(t, backend)
|
|
store, ok := backend.(resource.BlobSupport)
|
|
require.True(t, ok)
|
|
|
|
t.Run("put and fetch blob", func(t *testing.T) {
|
|
key := &resource.ResourceKey{
|
|
Namespace: "ns",
|
|
Group: "g",
|
|
Resource: "r",
|
|
Name: "n",
|
|
}
|
|
|
|
b1, err := server.PutBlob(ctx, &resource.PutBlobRequest{
|
|
Resource: key,
|
|
Method: resource.PutBlobRequest_GRPC,
|
|
ContentType: "plain/text",
|
|
Value: []byte("hello 11111"),
|
|
})
|
|
require.NoError(t, err)
|
|
require.Nil(t, b1.Error)
|
|
require.Equal(t, "c894ae57bd227b8f8c63f38a2ddf458b", b1.Hash)
|
|
|
|
b2, err := server.PutBlob(ctx, &resource.PutBlobRequest{
|
|
Resource: key,
|
|
Method: resource.PutBlobRequest_GRPC,
|
|
ContentType: "plain/text",
|
|
Value: []byte("hello 22222"), // the most recent
|
|
})
|
|
require.NoError(t, err)
|
|
require.Nil(t, b2.Error)
|
|
require.Equal(t, "b0da48de4ff92e0ad0d836de4d746937", b2.Hash)
|
|
|
|
// Check that we can still access both values
|
|
found, err := store.GetResourceBlob(ctx, key, &utils.BlobInfo{UID: b1.Uid}, true)
|
|
require.NoError(t, err)
|
|
require.Equal(t, []byte("hello 11111"), found.Value)
|
|
|
|
found, err = store.GetResourceBlob(ctx, key, &utils.BlobInfo{UID: b2.Uid}, true)
|
|
require.NoError(t, err)
|
|
require.Equal(t, []byte("hello 22222"), found.Value)
|
|
|
|
// Save a resource with annotation
|
|
obj := &unstructured.Unstructured{}
|
|
meta, err := utils.MetaAccessor(obj)
|
|
require.NoError(t, err)
|
|
meta.SetBlob(&utils.BlobInfo{UID: b2.Uid, Hash: b1.Hash})
|
|
meta.SetName(key.Name)
|
|
meta.SetNamespace(key.Namespace)
|
|
obj.SetAPIVersion(key.Group + "/v1")
|
|
obj.SetKind("Test")
|
|
val, err := obj.MarshalJSON()
|
|
require.NoError(t, err)
|
|
out, err := server.Create(ctx, &resource.CreateRequest{Key: key, Value: val})
|
|
require.NoError(t, err)
|
|
require.Nil(t, out.Error)
|
|
require.True(t, out.ResourceVersion > 0)
|
|
|
|
// The server (not store!) will lookup the saved annotation and return the correct payload
|
|
res, err := server.GetBlob(ctx, &resource.GetBlobRequest{Resource: key})
|
|
require.NoError(t, err)
|
|
require.Nil(t, out.Error)
|
|
require.Equal(t, "hello 22222", string(res.Value))
|
|
|
|
// But we can still get an older version with an explicit UID
|
|
res, err = server.GetBlob(ctx, &resource.GetBlobRequest{Resource: key, Uid: b1.Uid})
|
|
require.NoError(t, err)
|
|
require.Nil(t, out.Error)
|
|
require.Equal(t, "hello 11111", string(res.Value))
|
|
})
|
|
}
|
|
|
|
// WriteEventOption is a function that modifies WriteEventOptions
|
|
type WriteEventOption func(*WriteEventOptions)
|
|
|
|
// WithNamespace sets the namespace for the write event
|
|
func WithNamespace(namespace string) WriteEventOption {
|
|
return func(o *WriteEventOptions) {
|
|
o.Namespace = namespace
|
|
}
|
|
}
|
|
|
|
// WithGroup sets the group for the write event
|
|
func WithGroup(group string) WriteEventOption {
|
|
return func(o *WriteEventOptions) {
|
|
o.Group = group
|
|
}
|
|
}
|
|
|
|
// WithResource sets the resource for the write event
|
|
func WithResource(resource string) WriteEventOption {
|
|
return func(o *WriteEventOptions) {
|
|
o.Resource = resource
|
|
}
|
|
}
|
|
|
|
// WithFolder sets the folder for the write event
|
|
func WithFolder(folder string) WriteEventOption {
|
|
return func(o *WriteEventOptions) {
|
|
o.Folder = folder
|
|
}
|
|
}
|
|
|
|
// WithValue sets the value for the write event
|
|
func WithValue(value []byte) WriteEventOption {
|
|
return func(o *WriteEventOptions) {
|
|
o.Value = value
|
|
}
|
|
}
|
|
|
|
type WriteEventOptions struct {
|
|
Namespace string
|
|
Group string
|
|
Resource string
|
|
Folder string
|
|
Value []byte
|
|
}
|
|
|
|
func writeEvent(ctx context.Context, store resource.StorageBackend, name string, action resource.WatchEvent_Type, opts ...WriteEventOption) (int64, error) {
|
|
// Default options
|
|
options := WriteEventOptions{
|
|
Namespace: "namespace",
|
|
Group: "group",
|
|
Resource: "resource",
|
|
Folder: "folderuid",
|
|
}
|
|
|
|
// Apply options
|
|
for _, opt := range opts {
|
|
opt(&options)
|
|
}
|
|
|
|
// Set default value if not provided
|
|
if options.Value == nil {
|
|
options.Value = []byte(name + " " + resource.WatchEvent_Type_name[int32(action)])
|
|
}
|
|
|
|
res := &unstructured.Unstructured{
|
|
Object: map[string]any{},
|
|
}
|
|
meta, err := utils.MetaAccessor(res)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
meta.SetFolder(options.Folder)
|
|
|
|
return store.WriteEvent(ctx, resource.WriteEvent{
|
|
Type: action,
|
|
Value: options.Value,
|
|
Key: &resource.ResourceKey{
|
|
Namespace: options.Namespace,
|
|
Group: options.Group,
|
|
Resource: options.Resource,
|
|
Name: name,
|
|
},
|
|
Object: meta,
|
|
})
|
|
}
|
|
|
|
func resourceKey(name string) *resource.ResourceKey {
|
|
return &resource.ResourceKey{
|
|
Namespace: "namespace",
|
|
Group: "group",
|
|
Resource: "resource",
|
|
Name: name,
|
|
}
|
|
}
|
|
|
|
func newServer(t *testing.T, b resource.StorageBackend) resource.ResourceServer {
|
|
t.Helper()
|
|
|
|
server, err := resource.NewResourceServer(resource.ResourceServerOptions{
|
|
Backend: b,
|
|
})
|
|
require.NoError(t, err)
|
|
require.NotNil(t, server)
|
|
|
|
return server
|
|
}
|