Files
Georges Chaudy 7002ab90ae unistore: save returns a writecloser (#107955)
* unistore: save returns a writecloser

* go-lint

* address comments
2025-07-11 18:25:48 +02:00

540 lines
14 KiB
Go

package test
import (
"bytes"
"context"
"fmt"
"io"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/util/testutil"
)
// Test names for the KV test suite
const (
TestKVGet = "get operations"
TestKVSave = "save operations"
TestKVDelete = "delete operations"
TestKVKeys = "keys listing"
TestKVKeysWithLimits = "keys with limits and ranges"
TestKVKeysWithSort = "keys with sorting"
TestKVConcurrent = "concurrent operations"
TestKVUnixTimestamp = "unix timestamp"
)
// NewKVFunc is a function that creates a new KV instance for testing
type NewKVFunc func(ctx context.Context) resource.KV
// KVTestOptions configures which tests to run
type KVTestOptions struct {
NSPrefix string // namespace prefix for isolation
}
// GenerateRandomKVPrefix creates a random namespace prefix for test isolation
func GenerateRandomKVPrefix() string {
return fmt.Sprintf("kvtest-%d", time.Now().UnixNano())
}
// RunKVTest runs the KV test suite
func RunKVTest(t *testing.T, newKV NewKVFunc, opts *KVTestOptions) {
if opts == nil {
opts = &KVTestOptions{}
}
if opts.NSPrefix == "" {
opts.NSPrefix = GenerateRandomKVPrefix()
}
t.Logf("Running KV tests with namespace prefix: %s", opts.NSPrefix)
cases := []struct {
name string
fn func(*testing.T, resource.KV, string)
}{
{TestKVGet, runTestKVGet},
{TestKVSave, runTestKVSave},
{TestKVDelete, runTestKVDelete},
{TestKVKeys, runTestKVKeys},
{TestKVKeysWithLimits, runTestKVKeysWithLimits},
{TestKVKeysWithSort, runTestKVKeysWithSort},
{TestKVConcurrent, runTestKVConcurrent},
{TestKVUnixTimestamp, runTestKVUnixTimestamp},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
tc.fn(t, newKV(context.Background()), opts.NSPrefix)
})
}
}
func runTestKVGet(t *testing.T, kv resource.KV, nsPrefix string) {
ctx := testutil.NewTestContext(t, time.Now().Add(30*time.Second))
section := nsPrefix + "-get"
t.Run("get existing key", func(t *testing.T) {
// First save a key
testValue := "test value for get"
saveKVHelper(t, kv, ctx, section, "existing-key", strings.NewReader(testValue))
// Now get it
reader, err := kv.Get(ctx, section, "existing-key")
require.NoError(t, err)
// Read the value
value, err := io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, testValue, string(value))
// Close the value reader
err = reader.Close()
require.NoError(t, err)
})
t.Run("get non-existent key", func(t *testing.T) {
_, err := kv.Get(ctx, section, "non-existent-key")
assert.Error(t, err)
assert.Equal(t, resource.ErrNotFound, err)
})
t.Run("get with empty section", func(t *testing.T) {
_, err := kv.Get(ctx, "", "some-key")
assert.Error(t, err)
assert.Contains(t, err.Error(), "section is required")
})
}
func runTestKVSave(t *testing.T, kv resource.KV, nsPrefix string) {
ctx := testutil.NewTestContext(t, time.Now().Add(30*time.Second))
section := nsPrefix + "-save"
t.Run("save new key", func(t *testing.T) {
testValue := "new test value"
saveKVHelper(t, kv, ctx, section, "new-key", strings.NewReader(testValue))
// Verify it was saved
reader, err := kv.Get(ctx, section, "new-key")
require.NoError(t, err)
value, err := io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, testValue, string(value))
err = reader.Close()
require.NoError(t, err)
})
t.Run("save overwrite existing key", func(t *testing.T) {
// First save
saveKVHelper(t, kv, ctx, section, "overwrite-key", strings.NewReader("old value"))
// Overwrite
newValue := "new value"
saveKVHelper(t, kv, ctx, section, "overwrite-key", strings.NewReader(newValue))
// Verify it was updated
reader, err := kv.Get(ctx, section, "overwrite-key")
require.NoError(t, err)
value, err := io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, newValue, string(value))
err = reader.Close()
require.NoError(t, err)
})
t.Run("save with empty section", func(t *testing.T) {
_, err := kv.Save(ctx, "", "some-key")
assert.Error(t, err)
assert.Contains(t, err.Error(), "section is required")
})
t.Run("save binary data", func(t *testing.T) {
binaryData := []byte{0x00, 0x01, 0x02, 0x03, 0xFF, 0xFE, 0xFD}
saveKVHelper(t, kv, ctx, section, "binary-key", bytes.NewReader(binaryData))
// Verify binary data
reader, err := kv.Get(ctx, section, "binary-key")
require.NoError(t, err)
value, err := io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, binaryData, value)
err = reader.Close()
require.NoError(t, err)
})
t.Run("save key with no data", func(t *testing.T) {
// Save a key with empty data
saveKVHelper(t, kv, ctx, section, "empty-key", strings.NewReader(""))
// Verify it was saved with empty data
reader, err := kv.Get(ctx, section, "empty-key")
require.NoError(t, err)
value, err := io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, "", string(value))
assert.Len(t, value, 0)
err = reader.Close()
require.NoError(t, err)
})
}
func runTestKVDelete(t *testing.T, kv resource.KV, nsPrefix string) {
ctx := testutil.NewTestContext(t, time.Now().Add(30*time.Second))
section := nsPrefix + "-delete"
t.Run("delete existing key", func(t *testing.T) {
// First create a key
saveKVHelper(t, kv, ctx, section, "delete-key", strings.NewReader("delete me"))
// Verify it exists
_, err := kv.Get(ctx, section, "delete-key")
require.NoError(t, err)
// Delete it
err = kv.Delete(ctx, section, "delete-key")
require.NoError(t, err)
// Verify it's gone
_, err = kv.Get(ctx, section, "delete-key")
assert.Error(t, err)
assert.Equal(t, resource.ErrNotFound, err)
})
t.Run("delete non-existent key", func(t *testing.T) {
err := kv.Delete(ctx, section, "non-existent-delete-key")
assert.Error(t, err)
assert.Equal(t, resource.ErrNotFound, err)
})
t.Run("delete with empty section", func(t *testing.T) {
err := kv.Delete(ctx, "", "some-key")
assert.Error(t, err)
assert.Contains(t, err.Error(), "section is required")
})
}
func runTestKVKeys(t *testing.T, kv resource.KV, nsPrefix string) {
ctx := testutil.NewTestContext(t, time.Now().Add(30*time.Second))
section := nsPrefix + "-keys"
// Setup test data
testKeys := []string{"a1", "a2", "b1", "b2", "c1"}
for _, key := range testKeys {
saveKVHelper(t, kv, ctx, section, key, strings.NewReader("value"+key))
}
t.Run("list all keys", func(t *testing.T) {
var keys []string
for k, err := range kv.Keys(ctx, section, resource.ListOptions{}) {
require.NoError(t, err)
keys = append(keys, k)
}
assert.Equal(t, testKeys, keys)
})
t.Run("list keys with empty section", func(t *testing.T) {
var keys []string
var errors []error
for k, err := range kv.Keys(ctx, "", resource.ListOptions{}) {
if err != nil {
errors = append(errors, err)
break
}
keys = append(keys, k)
}
assert.Len(t, errors, 1)
assert.Contains(t, errors[0].Error(), "section is required")
assert.Empty(t, keys)
})
t.Run("list keys returns 0 keys", func(t *testing.T) {
// Use a different section with no keys
emptySection := nsPrefix + "-empty-keys"
var keys []string
for k, err := range kv.Keys(ctx, emptySection, resource.ListOptions{}) {
require.NoError(t, err)
keys = append(keys, k)
}
assert.Empty(t, keys)
assert.Len(t, keys, 0)
})
}
func runTestKVKeysWithLimits(t *testing.T, kv resource.KV, nsPrefix string) {
ctx := testutil.NewTestContext(t, time.Now().Add(30*time.Second))
section := nsPrefix + "-keys-limits"
// Setup test data
testKeys := []string{"a1", "a2", "b1", "b2", "c1", "c2", "d1", "d2"}
for _, key := range testKeys {
saveKVHelper(t, kv, ctx, section, key, strings.NewReader("value"+key))
}
t.Run("keys with limit", func(t *testing.T) {
var keys []string
for k, err := range kv.Keys(ctx, section, resource.ListOptions{Limit: 3}) {
require.NoError(t, err)
keys = append(keys, k)
}
assert.Equal(t, []string{"a1", "a2", "b1"}, keys)
})
t.Run("keys with range", func(t *testing.T) {
var keys []string
for k, err := range kv.Keys(ctx, section, resource.ListOptions{StartKey: "b", EndKey: "d"}) {
require.NoError(t, err)
keys = append(keys, k)
}
assert.Equal(t, []string{"b1", "b2", "c1", "c2"}, keys)
})
t.Run("keys with prefix", func(t *testing.T) {
var keys []string
for k, err := range kv.Keys(ctx, section, resource.ListOptions{
StartKey: "c",
EndKey: resource.PrefixRangeEnd("c"),
}) {
require.NoError(t, err)
keys = append(keys, k)
}
assert.Equal(t, []string{"c1", "c2"}, keys)
})
t.Run("keys with limit and range", func(t *testing.T) {
var keys []string
for k, err := range kv.Keys(ctx, section, resource.ListOptions{
StartKey: "a",
EndKey: "c",
Limit: 2,
}) {
require.NoError(t, err)
keys = append(keys, k)
}
assert.Equal(t, []string{"a1", "a2"}, keys)
})
}
func runTestKVKeysWithSort(t *testing.T, kv resource.KV, nsPrefix string) {
ctx := testutil.NewTestContext(t, time.Now().Add(30*time.Second))
section := nsPrefix + "-keys-sort"
// Setup test data
testKeys := []string{"a1", "a2", "b1", "b2", "c1"}
for _, key := range testKeys {
saveKVHelper(t, kv, ctx, section, key, strings.NewReader("value"+key))
}
t.Run("keys in ascending order (default)", func(t *testing.T) {
var keys []string
for k, err := range kv.Keys(ctx, section, resource.ListOptions{Sort: resource.SortOrderAsc}) {
require.NoError(t, err)
keys = append(keys, k)
}
assert.Equal(t, []string{"a1", "a2", "b1", "b2", "c1"}, keys)
})
t.Run("keys in descending order", func(t *testing.T) {
var keys []string
for k, err := range kv.Keys(ctx, section, resource.ListOptions{Sort: resource.SortOrderDesc}) {
require.NoError(t, err)
keys = append(keys, k)
}
assert.Equal(t, []string{"c1", "b2", "b1", "a2", "a1"}, keys)
})
t.Run("keys descending with prefix", func(t *testing.T) {
var keys []string
for k, err := range kv.Keys(ctx, section, resource.ListOptions{
StartKey: "a",
EndKey: resource.PrefixRangeEnd("a"),
Sort: resource.SortOrderDesc,
}) {
require.NoError(t, err)
keys = append(keys, k)
}
assert.Equal(t, []string{"a2", "a1"}, keys)
})
t.Run("keys descending with limit", func(t *testing.T) {
var keys []string
for k, err := range kv.Keys(ctx, section, resource.ListOptions{
Sort: resource.SortOrderDesc,
Limit: 3,
}) {
require.NoError(t, err)
keys = append(keys, k)
}
assert.Equal(t, []string{"c1", "b2", "b1"}, keys)
})
}
func runTestKVConcurrent(t *testing.T, kv resource.KV, nsPrefix string) {
ctx := testutil.NewTestContext(t, time.Now().Add(60*time.Second))
section := nsPrefix + "-concurrent"
t.Run("concurrent save and get operations", func(t *testing.T) {
const numGoroutines = 10
const numOperations = 20
done := make(chan error, numGoroutines)
for i := 0; i < numGoroutines; i++ {
go func(goroutineID int) {
var err error
defer func() { done <- err }()
for j := 0; j < numOperations; j++ {
key := fmt.Sprintf("concurrent-key-%d-%d", goroutineID, j)
value := fmt.Sprintf("concurrent-value-%d-%d", goroutineID, j)
// Save
writer, err := kv.Save(ctx, section, key)
if err != nil {
return
}
defer func() {
err := writer.Close()
require.NoError(t, err)
}()
_, err = io.Copy(writer, strings.NewReader(value))
if err != nil {
return
}
err = writer.Close()
if err != nil {
return
}
// Get immediately
reader, err := kv.Get(ctx, section, key)
if err != nil {
return
}
readValue, err := io.ReadAll(reader)
require.NoError(t, err)
err = reader.Close()
require.NoError(t, err)
assert.Equal(t, value, string(readValue))
}
}(i)
}
// Wait for all goroutines to complete
for i := 0; i < numGoroutines; i++ {
err := <-done
require.NoError(t, err)
}
})
t.Run("concurrent save, delete, and list operations", func(t *testing.T) {
const numGoroutines = 5
done := make(chan error, numGoroutines)
for i := 0; i < numGoroutines; i++ {
go func(goroutineID int) {
var err error
defer func() { done <- err }()
key := fmt.Sprintf("concurrent-ops-key-%d", goroutineID)
value := fmt.Sprintf("concurrent-ops-value-%d", goroutineID)
// Save
writer, err := kv.Save(ctx, section, key)
if err != nil {
return
}
defer func() {
err := writer.Close()
require.NoError(t, err)
}()
_, err = io.Copy(writer, strings.NewReader(value))
if err != nil {
return
}
err = writer.Close()
if err != nil {
return
}
// List to verify it exists
found := false
for k, err := range kv.Keys(ctx, section, resource.ListOptions{}) {
if err != nil {
return
}
if k == key {
found = true
break
}
}
if !found {
err = fmt.Errorf("key %s not found in list", key)
return
}
// Delete
err = kv.Delete(ctx, section, key)
if err != nil {
return
}
// Verify it's deleted
_, err = kv.Get(ctx, section, key)
require.ErrorIs(t, resource.ErrNotFound, err)
err = nil // Expected error, so clear it
}(i)
}
// Wait for all goroutines to complete
for i := 0; i < numGoroutines; i++ {
err := <-done
require.NoError(t, err)
}
})
}
func runTestKVUnixTimestamp(t *testing.T, kv resource.KV, nsPrefix string) {
ctx := testutil.NewTestContext(t, time.Now().Add(30*time.Second))
t.Run("unix timestamp returns reasonable value", func(t *testing.T) {
timestamp, err := kv.UnixTimestamp(ctx)
require.NoError(t, err)
now := time.Now().Unix()
// Allow for some time difference (up to 5 seconds)
assert.InDelta(t, now, timestamp, 5)
})
t.Run("unix timestamp is consistent", func(t *testing.T) {
timestamp1, err := kv.UnixTimestamp(ctx)
require.NoError(t, err)
timestamp2, err := kv.UnixTimestamp(ctx)
require.NoError(t, err)
// Should be very close (within 1 second)
require.InDelta(t, timestamp1, timestamp2, 1)
})
}
// saveKVHelper is a helper function to save data to KV store using the new WriteCloser interface
func saveKVHelper(t *testing.T, kv resource.KV, ctx context.Context, section, key string, value io.Reader) {
t.Helper()
writer, err := kv.Save(ctx, section, key)
require.NoError(t, err)
_, err = io.Copy(writer, value)
require.NoError(t, err)
err = writer.Close()
require.NoError(t, err)
}