diff --git a/pkg/plugins/ifaces.go b/pkg/plugins/ifaces.go index a919e69303c..48837f61867 100644 --- a/pkg/plugins/ifaces.go +++ b/pkg/plugins/ifaces.go @@ -173,10 +173,10 @@ type SignatureCalculator interface { type KeyStore interface { Get(ctx context.Context, key string) (string, bool, error) - Set(ctx context.Context, key string, value string) error - Del(ctx context.Context, key string) error + Set(ctx context.Context, key string, value any) error + Delete(ctx context.Context, key string) error ListKeys(ctx context.Context) ([]string, error) - GetLastUpdated(ctx context.Context) (*time.Time, error) + GetLastUpdated(ctx context.Context) (time.Time, error) SetLastUpdated(ctx context.Context) error } diff --git a/pkg/services/pluginsintegration/angularpatternsstore/store.go b/pkg/services/pluginsintegration/angularpatternsstore/store.go index 3557d972ade..8debdfed1bf 100644 --- a/pkg/services/pluginsintegration/angularpatternsstore/store.go +++ b/pkg/services/pluginsintegration/angularpatternsstore/store.go @@ -2,74 +2,42 @@ package angularpatternsstore import ( "context" - "encoding/json" - "fmt" "time" "github.com/grafana/grafana/pkg/infra/kvstore" + "github.com/grafana/grafana/pkg/services/pluginsintegration/cachekvstore" ) type Service interface { - Get(ctx context.Context) (string, bool, error) - Set(ctx context.Context, patterns any) error GetLastUpdated(ctx context.Context) (time.Time, error) + Get(ctx context.Context) (string, bool, error) + Set(ctx context.Context, value any) error } const ( kvNamespace = "plugin.angularpatterns" - - keyPatterns = "angular_patterns" - keyLastUpdated = "last_updated" + keyPatterns = "angular_patterns" ) // KVStoreService allows to cache GCOM angular patterns into the database, as a cache. type KVStoreService struct { - kv *kvstore.NamespacedKVStore + *cachekvstore.CacheKvStore } +var _ Service = (*KVStoreService)(nil) + func ProvideService(kv kvstore.KVStore) Service { return &KVStoreService{ - kv: kvstore.WithNamespace(kv, 0, kvNamespace), + CacheKvStore: cachekvstore.NewCacheKvStore(kv, kvNamespace), } } -// Get returns the raw cached angular detection patterns. The returned value is a JSON-encoded string. -// If no value is present, the second argument is false and the returned error is nil. +// Get returns the stored angular patterns from the underlying cachekvstore. func (s *KVStoreService) Get(ctx context.Context) (string, bool, error) { - return s.kv.Get(ctx, keyPatterns) + return s.CacheKvStore.Get(ctx, keyPatterns) } -// Set sets the cached angular detection patterns and the latest update time to time.Now(). -// patterns must implement json.Marshaler. -func (s *KVStoreService) Set(ctx context.Context, patterns any) error { - b, err := json.Marshal(patterns) - if err != nil { - return fmt.Errorf("json marshal: %w", err) - } - if err := s.kv.Set(ctx, keyPatterns, string(b)); err != nil { - return fmt.Errorf("kv set: %w", err) - } - if err := s.kv.Set(ctx, keyLastUpdated, time.Now().Format(time.RFC3339)); err != nil { - return fmt.Errorf("kv last updated set: %w", err) - } - return nil -} - -// GetLastUpdated returns the time when Set was last called. If the value cannot be unmarshalled correctly, -// it returns a zero-value time.Time. -func (s *KVStoreService) GetLastUpdated(ctx context.Context) (time.Time, error) { - v, ok, err := s.kv.Get(ctx, keyLastUpdated) - if err != nil { - return time.Time{}, fmt.Errorf("kv get: %w", err) - } - if !ok { - return time.Time{}, nil - } - t, err := time.Parse(time.RFC3339, v) - if err != nil { - // Ignore decode errors, so we can change the format in future versions - // and keep backwards/forwards compatibility - return time.Time{}, nil - } - return t, nil +// Set stores the given angular patterns in the underlying cachekvstore.s +func (s *KVStoreService) Set(ctx context.Context, value any) error { + return s.CacheKvStore.Set(ctx, keyPatterns, value) } diff --git a/pkg/services/pluginsintegration/angularpatternsstore/store_test.go b/pkg/services/pluginsintegration/angularpatternsstore/store_test.go index 9fde56e0f30..fb8c4ea56d0 100644 --- a/pkg/services/pluginsintegration/angularpatternsstore/store_test.go +++ b/pkg/services/pluginsintegration/angularpatternsstore/store_test.go @@ -41,7 +41,8 @@ func TestAngularPatternsStore(t *testing.T) { }) t.Run("latest update", func(t *testing.T) { - svc := ProvideService(kvstore.NewFakeKVStore()) + underlyingKv := kvstore.NewFakeKVStore() + svc := ProvideService(underlyingKv) t.Run("empty", func(t *testing.T) { lastUpdated, err := svc.GetLastUpdated(context.Background()) @@ -59,7 +60,7 @@ func TestAngularPatternsStore(t *testing.T) { }) t.Run("invalid timestamp stored", func(t *testing.T) { - err := svc.(*KVStoreService).kv.Set(context.Background(), keyLastUpdated, "abcd") + err := underlyingKv.Set(context.Background(), 0, kvNamespace, "last_updated", "abcd") require.NoError(t, err) lastUpdated, err := svc.GetLastUpdated(context.Background()) diff --git a/pkg/services/pluginsintegration/cachekvstore/cachekvstore.go b/pkg/services/pluginsintegration/cachekvstore/cachekvstore.go new file mode 100644 index 00000000000..219cd84ea6c --- /dev/null +++ b/pkg/services/pluginsintegration/cachekvstore/cachekvstore.go @@ -0,0 +1,142 @@ +package cachekvstore + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/grafana/grafana/pkg/infra/kvstore" +) + +// keyLastUpdated is the key used to store the last updated time. +const keyLastUpdated = "last_updated" + +// CacheKvStore is a Store that stores data in a *kvstore.NamespacedKVStore. +// It also stores a last updated time, which is unique for all the keys and is updated on each call to `Set`, +// and can be used to determine if the data is stale. +type CacheKvStore struct { + // kv is the underlying KV store. + kv *kvstore.NamespacedKVStore + + // keyPrefix is the prefix to use for all the keys. + keyPrefix string +} + +// NewCacheKvStoreWithPrefix creates a new CacheKvStore using the provided underlying KVStore, namespace and prefix. +func NewCacheKvStoreWithPrefix(kv kvstore.KVStore, namespace, prefix string) *CacheKvStore { + return &CacheKvStore{ + kv: kvstore.WithNamespace(kv, 0, namespace), + keyPrefix: prefix, + } +} + +// NewCacheKvStore creates a new CacheKvStore using the provided underlying KVStore and namespace. +func NewCacheKvStore(kv kvstore.KVStore, namespace string) *CacheKvStore { + return NewCacheKvStoreWithPrefix(kv, namespace, "") +} + +// storeKey returns the key to use in the underlying store for the given key. +func (s *CacheKvStore) storeKey(k string) string { + return s.keyPrefix + k +} + +// Get returns the value for the given key. +// If no value is present, the second argument is false and the returned error is nil. +func (s *CacheKvStore) Get(ctx context.Context, key string) (string, bool, error) { + return s.kv.Get(ctx, s.storeKey(key)) +} + +// Set sets the value for the given key and updates the last updated time. +// It uses the marshal method to marshal the value before storing it. +// This means that the value to store can implement the Marshaler interface to control how it is stored. +func (s *CacheKvStore) Set(ctx context.Context, key string, value any) error { + valueToStore, err := marshal(value) + if err != nil { + return fmt.Errorf("marshal: %w", err) + } + + if err := s.kv.Set(ctx, s.storeKey(key), valueToStore); err != nil { + return fmt.Errorf("kv set: %w", err) + } + if err := s.SetLastUpdated(ctx); err != nil { + return fmt.Errorf("set last updated: %w", err) + } + return nil +} + +// GetLastUpdated returns the last updated time. +// If the last updated time is not set, it returns a zero time. +func (s *CacheKvStore) GetLastUpdated(ctx context.Context) (time.Time, error) { + v, ok, err := s.kv.Get(ctx, keyLastUpdated) + if err != nil { + return time.Time{}, fmt.Errorf("kv get: %w", err) + } + if !ok { + return time.Time{}, nil + } + t, err := time.Parse(time.RFC3339, v) + if err != nil { + // Ignore decode errors, so we can change the format in future versions + // and keep backwards/forwards compatibility + return time.Time{}, nil + } + return t, nil +} + +// SetLastUpdated sets the last updated time to the current time. +// The last updated time is shared between all the keys for this store. +func (s *CacheKvStore) SetLastUpdated(ctx context.Context) error { + return s.kv.Set(ctx, keyLastUpdated, time.Now().Format(time.RFC3339)) +} + +// Delete deletes the value for the given key and it also updates the last updated time. +func (s *CacheKvStore) Delete(ctx context.Context, key string) error { + if err := s.kv.Del(ctx, s.storeKey(key)); err != nil { + return fmt.Errorf("kv del: %w", err) + } + if err := s.SetLastUpdated(ctx); err != nil { + return fmt.Errorf("set last updated: %w", err) + } + return nil +} + +// ListKeys returns all the keys in the store. +func (s *CacheKvStore) ListKeys(ctx context.Context) ([]string, error) { + keys, err := s.kv.Keys(ctx, s.storeKey("")) + if err != nil { + return nil, err + } + if len(keys) == 0 { + return nil, nil + } + res := make([]string, 0, len(keys)-1) + for _, key := range keys { + // Filter out last updated time + if key.Key == keyLastUpdated { + continue + } + res = append(res, key.Key) + } + return res, nil +} + +// marshal marshals the provided value to a string to store it in the kv store. +// The provided value can be of a type implementing fmt.Stringer, a string or []byte. +// If the value is none of those, it is marshaled to JSON. +func marshal(value any) (string, error) { + switch value := value.(type) { + case fmt.Stringer: + return value.String(), nil + case string: + return value, nil + case []byte: + return string(value), nil + default: + b, err := json.Marshal(value) + if err != nil { + return "", fmt.Errorf("json marshal: %w", err) + } + return string(b), nil + } +} diff --git a/pkg/services/pluginsintegration/cachekvstore/cachekvstore_test.go b/pkg/services/pluginsintegration/cachekvstore/cachekvstore_test.go new file mode 100644 index 00000000000..9f695ff3278 --- /dev/null +++ b/pkg/services/pluginsintegration/cachekvstore/cachekvstore_test.go @@ -0,0 +1,188 @@ +package cachekvstore + +import ( + "context" + "encoding/json" + "fmt" + "sort" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/grafana/grafana/pkg/infra/kvstore" +) + +func TestNamespacedStore(t *testing.T) { + const namespace = "namespace" + + t.Run("simple", func(t *testing.T) { + store := NewCacheKvStore(kvstore.NewFakeKVStore(), namespace) + + t.Run("default last updated time is zero", func(t *testing.T) { + ts, err := store.GetLastUpdated(context.Background()) + require.NoError(t, err) + require.Zero(t, ts) + }) + + t.Run("Get returns false if key does not exist", func(t *testing.T) { + _, ok, err := store.Get(context.Background(), "key") + require.NoError(t, err) + require.False(t, ok) + }) + + t.Run("Set sets the value and updates the last updated time", func(t *testing.T) { + ts, err := store.GetLastUpdated(context.Background()) + require.NoError(t, err) + require.Zero(t, ts) + + require.NoError(t, store.Set(context.Background(), "key", "value")) + ts, err = store.GetLastUpdated(context.Background()) + require.NoError(t, err) + require.NotZero(t, ts) + require.WithinDuration(t, ts, time.Now(), time.Second*10) + + v, ok, err := store.Get(context.Background(), "key") + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, "value", v) + }) + + t.Run("Delete deletes the value", func(t *testing.T) { + // First store + require.NoError(t, store.Set(context.Background(), "key", "value")) + + // Then read it + v, ok, err := store.Get(context.Background(), "key") + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, "value", v) + + // Delete it + require.NoError(t, store.Delete(context.Background(), "key")) + + // Read it again + _, ok, err = store.Get(context.Background(), "key") + require.NoError(t, err) + require.False(t, ok) + }) + + t.Run("sets last updated on delete", func(t *testing.T) { + store := NewCacheKvStore(kvstore.NewFakeKVStore(), namespace) + ts, err := store.GetLastUpdated(context.Background()) + require.NoError(t, err) + require.Zero(t, ts) + + require.NoError(t, store.Delete(context.Background(), "key")) + + ts, err = store.GetLastUpdated(context.Background()) + require.NoError(t, err) + require.WithinDuration(t, time.Now(), ts, time.Second*10) + }) + + t.Run("last updated key is used in GetLastUpdated", func(t *testing.T) { + store := NewCacheKvStore(kvstore.NewFakeKVStore(), namespace) + + // Set in underlying store + ts := time.Now() + require.NoError(t, store.kv.Set(context.Background(), keyLastUpdated, ts.Format(time.RFC3339))) + + // Make sure we get the same value + storeTs, err := store.GetLastUpdated(context.Background()) + require.NoError(t, err) + // Format to account for marshal/unmarshal differences + require.Equal(t, ts.Format(time.RFC3339), storeTs.Format(time.RFC3339)) + }) + + t.Run("last updated key is used in SetLastUpdated", func(t *testing.T) { + store := NewCacheKvStore(kvstore.NewFakeKVStore(), namespace) + require.NoError(t, store.SetLastUpdated(context.Background())) + + marshaledStoreTs, ok, err := store.kv.Get(context.Background(), keyLastUpdated) + require.NoError(t, err) + require.True(t, ok) + storeTs, err := time.Parse(time.RFC3339, marshaledStoreTs) + require.NoError(t, err) + require.WithinDuration(t, time.Now(), storeTs, time.Second*10) + }) + + t.Run("ListKeys", func(t *testing.T) { + t.Run("returns empty list if no keys", func(t *testing.T) { + keys, err := store.ListKeys(context.Background()) + require.NoError(t, err) + require.Empty(t, keys) + }) + + t.Run("returns the keys", func(t *testing.T) { + expectedKeys := make([]string, 0, 10) + for i := 0; i < 10; i++ { + k := fmt.Sprintf("key-%d", i) + err := store.Set(context.Background(), k, fmt.Sprintf("value-%d", i)) + expectedKeys = append(expectedKeys, k) + require.NoError(t, err) + } + + keys, err := store.ListKeys(context.Background()) + require.NoError(t, err) + + sort.Strings(expectedKeys) + sort.Strings(keys) + + require.Equal(t, expectedKeys, keys) + }) + }) + }) + + t.Run("prefix", func(t *testing.T) { + t.Run("no prefix", func(t *testing.T) { + store := NewCacheKvStore(kvstore.NewFakeKVStore(), namespace) + require.Equal(t, "k", store.storeKey("k")) + }) + + t.Run("prefix", func(t *testing.T) { + store := NewCacheKvStoreWithPrefix(kvstore.NewFakeKVStore(), namespace, "my-") + require.Equal(t, "my-k", store.storeKey("k")) + }) + }) +} + +func TestMarshal(t *testing.T) { + t.Run("json", func(t *testing.T) { + // Other type (rather than string, []byte or fmt.Stringer) marshals to JSON. + var value struct { + A string `json:"a"` + B string `json:"b"` + } + expV, err := json.Marshal(value) + require.NoError(t, err) + + v, err := marshal(value) + require.NoError(t, err) + require.Equal(t, string(expV), v) + }) + + t.Run("string", func(t *testing.T) { + v, err := marshal("value") + require.NoError(t, err) + require.Equal(t, "value", v) + }) + + t.Run("stringer", func(t *testing.T) { + var s stringer + v, err := marshal(s) + require.NoError(t, err) + require.Equal(t, s.String(), v) + }) + + t.Run("byte slice", func(t *testing.T) { + v, err := marshal([]byte("value")) + require.NoError(t, err) + require.Equal(t, "value", v) + }) +} + +type stringer struct{} + +func (s stringer) String() string { + return "aaaa" +} diff --git a/pkg/services/pluginsintegration/cachekvstore/doc.go b/pkg/services/pluginsintegration/cachekvstore/doc.go new file mode 100644 index 00000000000..575765868c4 --- /dev/null +++ b/pkg/services/pluginsintegration/cachekvstore/doc.go @@ -0,0 +1,3 @@ +// Package cachekvstore implements a key-value store that also keeps track of the last update time of the store. +// It can be used to cache data that is updated periodically. +package cachekvstore diff --git a/pkg/services/pluginsintegration/keyretriever/dynamic/dynamic_retriever.go b/pkg/services/pluginsintegration/keyretriever/dynamic/dynamic_retriever.go index ebccb824479..a038caecd0c 100644 --- a/pkg/services/pluginsintegration/keyretriever/dynamic/dynamic_retriever.go +++ b/pkg/services/pluginsintegration/keyretriever/dynamic/dynamic_retriever.go @@ -101,7 +101,7 @@ func (kr *KeyRetriever) updateKeys(ctx context.Context) error { if err != nil { return err } - if !kr.cfg.PluginForcePublicKeyDownload && time.Since(*lastUpdated) < publicKeySyncInterval { + if !kr.cfg.PluginForcePublicKeyDownload && time.Since(lastUpdated) < publicKeySyncInterval { // Cache is still valid return nil } @@ -170,15 +170,13 @@ func (kr *KeyRetriever) downloadKeys(ctx context.Context) error { // Delete keys that are no longer in the API for _, key := range cachedKeys { if !shouldKeep[key] { - err = kr.kv.Del(ctx, key) + err = kr.kv.Delete(ctx, key) if err != nil { return err } } } - - // Update the last updated timestamp - return kr.kv.SetLastUpdated(ctx) + return nil } func (kr *KeyRetriever) ensureKeys(ctx context.Context) error { diff --git a/pkg/services/pluginsintegration/keyretriever/dynamic/dynamic_retriever_test.go b/pkg/services/pluginsintegration/keyretriever/dynamic/dynamic_retriever_test.go index d0fe7dfeca2..79288dee477 100644 --- a/pkg/services/pluginsintegration/keyretriever/dynamic/dynamic_retriever_test.go +++ b/pkg/services/pluginsintegration/keyretriever/dynamic/dynamic_retriever_test.go @@ -79,7 +79,7 @@ func Test_PublicKeyUpdate(t *testing.T) { defer v.lock.Unlock() ti, err := v.kv.GetLastUpdated(context.Background()) require.NoError(t, err) - require.Less(t, time.Time{}, *ti) + require.Less(t, time.Time{}, ti) }) t.Run("it should remove old keys", func(t *testing.T) { diff --git a/pkg/services/pluginsintegration/keystore/keystore.go b/pkg/services/pluginsintegration/keystore/keystore.go index 80b5bf5c4ff..e1b93fcb88f 100644 --- a/pkg/services/pluginsintegration/keystore/keystore.go +++ b/pkg/services/pluginsintegration/keystore/keystore.go @@ -1,74 +1,25 @@ package keystore import ( - "context" - "fmt" - "time" - "github.com/grafana/grafana/pkg/infra/kvstore" "github.com/grafana/grafana/pkg/plugins" + "github.com/grafana/grafana/pkg/services/pluginsintegration/cachekvstore" ) // Service is a service for storing and retrieving public keys. type Service struct { - kv *kvstore.NamespacedKVStore + *cachekvstore.CacheKvStore } const ( - prefix = "key-" - lastUpdatedKey = "last_updated" + namespace = "plugin.publickeys" + prefix = "key-" ) var _ plugins.KeyStore = (*Service)(nil) func ProvideService(kv kvstore.KVStore) *Service { return &Service{ - kv: kvstore.WithNamespace(kv, 0, "plugin.publickeys"), + CacheKvStore: cachekvstore.NewCacheKvStoreWithPrefix(kv, namespace, prefix), } } - -func (s *Service) Get(ctx context.Context, key string) (string, bool, error) { - return s.kv.Get(ctx, prefix+key) -} - -func (s *Service) Set(ctx context.Context, key string, value string) error { - return s.kv.Set(ctx, prefix+key, value) -} - -func (s *Service) Del(ctx context.Context, key string) error { - return s.kv.Del(ctx, prefix+key) -} - -func (s *Service) GetLastUpdated(ctx context.Context) (*time.Time, error) { - lastUpdated := &time.Time{} - if val, ok, err := s.kv.Get(ctx, lastUpdatedKey); err != nil { - return nil, fmt.Errorf("failed to get last updated time: %v", err) - } else if ok { - if parsed, err := time.Parse(time.RFC3339, val); err != nil { - return nil, fmt.Errorf("failed to parse last updated time: %v", err) - } else { - lastUpdated = &parsed - } - } - return lastUpdated, nil -} - -func (s *Service) SetLastUpdated(ctx context.Context) error { - lastUpdated := time.Now() - if err := s.kv.Set(ctx, lastUpdatedKey, lastUpdated.Format(time.RFC3339)); err != nil { - return fmt.Errorf("failed to update last updated time: %v", err) - } - return nil -} - -func (s *Service) ListKeys(ctx context.Context) ([]string, error) { - keys, err := s.kv.Keys(ctx, prefix) - if err != nil { - return nil, err - } - res := make([]string, 0, len(keys)) - for _, key := range keys { - res = append(res, key.Key) - } - return res, nil -}