Files
grafana/pkg/storage/unified/resource/storage_backend.go
2025-07-16 08:09:31 +02:00

815 lines
22 KiB
Go

package resource
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"math/rand/v2"
"net/http"
"sort"
"strings"
"time"
"github.com/bwmarrin/snowflake"
"github.com/grafana/grafana-app-sdk/logging"
"github.com/grafana/grafana/pkg/apimachinery/utils"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
const (
defaultListBufferSize = 100
)
// Unified storage backend based on KV storage.
type kvStorageBackend struct {
snowflake *snowflake.Node
kv KV
dataStore *dataStore
metaStore *metadataStore
eventStore *eventStore
notifier *notifier
builder DocumentBuilder
log logging.Logger
}
var _ StorageBackend = &kvStorageBackend{}
func NewKvStorageBackend(kv KV) *kvStorageBackend {
s, err := snowflake.NewNode(rand.Int64N(1024))
if err != nil {
panic(err)
}
eventStore := newEventStore(kv)
return &kvStorageBackend{
kv: kv,
dataStore: newDataStore(kv),
metaStore: newMetadataStore(kv),
eventStore: eventStore,
notifier: newNotifier(eventStore, notifierOptions{}),
snowflake: s,
builder: StandardDocumentBuilder(), // For now we use the standard document builder.
log: &logging.NoOpLogger{}, // Make this configurable
}
}
// WriteEvent writes a resource event (create/update/delete) to the storage backend.
func (k *kvStorageBackend) WriteEvent(ctx context.Context, event WriteEvent) (int64, error) {
if err := event.Validate(); err != nil {
return 0, fmt.Errorf("invalid event: %w", err)
}
rv := k.snowflake.Generate().Int64()
obj := event.Object
// Write data.
var action DataAction
switch event.Type {
case resourcepb.WatchEvent_ADDED:
action = DataActionCreated
// Check if resource already exists for create operations
_, err := k.metaStore.GetLatestResourceKey(ctx, MetaGetRequestKey{
Namespace: event.Key.Namespace,
Group: event.Key.Group,
Resource: event.Key.Resource,
Name: event.Key.Name,
})
if err == nil {
// Resource exists, return already exists error
return 0, ErrResourceAlreadyExists
}
if !errors.Is(err, ErrNotFound) {
// Some other error occurred
return 0, fmt.Errorf("failed to check if resource exists: %w", err)
}
case resourcepb.WatchEvent_MODIFIED:
action = DataActionUpdated
case resourcepb.WatchEvent_DELETED:
action = DataActionDeleted
obj = event.ObjectOld
default:
return 0, fmt.Errorf("invalid event type: %d", event.Type)
}
if obj == nil {
return 0, fmt.Errorf("object is nil")
}
// Build the search document
doc, err := k.builder.BuildDocument(ctx, event.Key, rv, event.Value)
if err != nil {
return 0, fmt.Errorf("failed to build document: %w", err)
}
// Write the data
err = k.dataStore.Save(ctx, DataKey{
Namespace: event.Key.Namespace,
Group: event.Key.Group,
Resource: event.Key.Resource,
Name: event.Key.Name,
ResourceVersion: rv,
Action: action,
}, bytes.NewReader(event.Value))
if err != nil {
return 0, fmt.Errorf("failed to write data: %w", err)
}
// Write metadata
err = k.metaStore.Save(ctx, MetaDataObj{
Key: MetaDataKey{
Namespace: event.Key.Namespace,
Group: event.Key.Group,
Resource: event.Key.Resource,
Name: event.Key.Name,
ResourceVersion: rv,
Action: action,
Folder: obj.GetFolder(),
},
Value: MetaData{
IndexableDocument: *doc,
},
})
if err != nil {
return 0, fmt.Errorf("failed to write metadata: %w", err)
}
// Write event
err = k.eventStore.Save(ctx, Event{
Namespace: event.Key.Namespace,
Group: event.Key.Group,
Resource: event.Key.Resource,
Name: event.Key.Name,
ResourceVersion: rv,
Action: action,
Folder: obj.GetFolder(),
PreviousRV: event.PreviousRV,
})
if err != nil {
return 0, fmt.Errorf("failed to save event: %w", err)
}
return rv, nil
}
func (k *kvStorageBackend) ReadResource(ctx context.Context, req *resourcepb.ReadRequest) *BackendReadResponse {
if req.Key == nil {
return &BackendReadResponse{Error: &resourcepb.ErrorResult{Code: http.StatusBadRequest, Message: "missing key"}}
}
meta, err := k.metaStore.GetResourceKeyAtRevision(ctx, MetaGetRequestKey{
Namespace: req.Key.Namespace,
Group: req.Key.Group,
Resource: req.Key.Resource,
Name: req.Key.Name,
}, req.ResourceVersion)
if errors.Is(err, ErrNotFound) {
return &BackendReadResponse{Error: &resourcepb.ErrorResult{Code: http.StatusNotFound, Message: "not found"}}
} else if err != nil {
return &BackendReadResponse{Error: &resourcepb.ErrorResult{Code: http.StatusInternalServerError, Message: err.Error()}}
}
data, err := k.dataStore.Get(ctx, DataKey{
Namespace: req.Key.Namespace,
Group: req.Key.Group,
Resource: req.Key.Resource,
Name: req.Key.Name,
ResourceVersion: meta.ResourceVersion,
Action: meta.Action,
})
if err != nil || data == nil {
return &BackendReadResponse{Error: &resourcepb.ErrorResult{Code: http.StatusInternalServerError, Message: err.Error()}}
}
value, err := readAndClose(data)
if err != nil {
return &BackendReadResponse{Error: &resourcepb.ErrorResult{Code: http.StatusInternalServerError, Message: err.Error()}}
}
return &BackendReadResponse{
Key: req.Key,
ResourceVersion: meta.ResourceVersion,
Value: value,
Folder: meta.Folder,
}
}
// ListIterator returns an iterator for listing resources.
func (k *kvStorageBackend) ListIterator(ctx context.Context, req *resourcepb.ListRequest, cb func(ListIterator) error) (int64, error) {
if req.Options == nil || req.Options.Key == nil {
return 0, fmt.Errorf("missing options or key in ListRequest")
}
// Parse continue token if provided
offset := int64(0)
resourceVersion := req.ResourceVersion
if req.NextPageToken != "" {
token, err := GetContinueToken(req.NextPageToken)
if err != nil {
return 0, fmt.Errorf("invalid continue token: %w", err)
}
offset = token.StartOffset
resourceVersion = token.ResourceVersion
}
// We set the listRV to the current time.
listRV := k.snowflake.Generate().Int64()
if resourceVersion > 0 {
listRV = resourceVersion
}
// Fetch the latest objects
keys := make([]MetaDataKey, 0, min(defaultListBufferSize, req.Limit+1))
for metaKey, err := range k.metaStore.ListResourceKeysAtRevision(ctx, MetaListRequestKey{
Namespace: req.Options.Key.Namespace,
Group: req.Options.Key.Group,
Resource: req.Options.Key.Resource,
Name: req.Options.Key.Name,
}, resourceVersion) {
if err != nil {
return 0, err
}
keys = append(keys, metaKey)
}
sortMetaKeysByResourceVersion(keys, true) // sort ascending for sql parity
if offset > 0 && int64(len(keys)) > offset {
keys = keys[offset:]
}
iter := kvListIterator{
keys: keys,
currentIndex: -1,
ctx: ctx,
listRV: listRV,
offset: offset,
limit: req.Limit + 1, // TODO: for now we need at least one more item. Fix the caller
dataStore: k.dataStore,
}
err := cb(&iter)
if err != nil {
return 0, err
}
return listRV, nil
}
// kvListIterator implements ListIterator for KV storage
type kvListIterator struct {
ctx context.Context
keys []MetaDataKey
currentIndex int
dataStore *dataStore
listRV int64
offset int64
limit int64
// current
rv int64
err error
value []byte
}
func (i *kvListIterator) Next() bool {
i.currentIndex++
if i.currentIndex >= len(i.keys) {
return false
}
if int64(i.currentIndex) >= i.limit {
return false
}
i.rv, i.err = i.keys[i.currentIndex].ResourceVersion, nil
data, err := i.dataStore.Get(i.ctx, DataKey{
Namespace: i.keys[i.currentIndex].Namespace,
Group: i.keys[i.currentIndex].Group,
Resource: i.keys[i.currentIndex].Resource,
Name: i.keys[i.currentIndex].Name,
ResourceVersion: i.keys[i.currentIndex].ResourceVersion,
Action: i.keys[i.currentIndex].Action,
})
if err != nil {
i.err = err
return false
}
i.value, i.err = readAndClose(data)
if i.err != nil {
return false
}
// increment the offset
i.offset++
return true
}
func (i *kvListIterator) Error() error {
return nil
}
func (i *kvListIterator) ContinueToken() string {
return ContinueToken{
StartOffset: i.offset,
ResourceVersion: i.listRV,
}.String()
}
func (i *kvListIterator) ResourceVersion() int64 {
return i.rv
}
func (i *kvListIterator) Namespace() string {
return i.keys[i.currentIndex].Namespace
}
func (i *kvListIterator) Name() string {
return i.keys[i.currentIndex].Name
}
func (i *kvListIterator) Folder() string {
return i.keys[i.currentIndex].Folder
}
func (i *kvListIterator) Value() []byte {
return i.value
}
func validateListHistoryRequest(req *resourcepb.ListRequest) error {
if req.Options == nil || req.Options.Key == nil {
return fmt.Errorf("missing options or key in ListRequest")
}
key := req.Options.Key
if key.Group == "" {
return fmt.Errorf("group is required")
}
if key.Resource == "" {
return fmt.Errorf("resource is required")
}
if key.Namespace == "" {
return fmt.Errorf("namespace is required")
}
if key.Name == "" {
return fmt.Errorf("name is required")
}
return nil
}
// filterHistoryKeysByVersion filters history keys based on version match criteria
func filterHistoryKeysByVersion(historyKeys []DataKey, req *resourcepb.ListRequest) ([]DataKey, error) {
switch req.GetVersionMatchV2() {
case resourcepb.ResourceVersionMatchV2_Exact:
if req.ResourceVersion <= 0 {
return nil, fmt.Errorf("expecting an explicit resource version query when using Exact matching")
}
var exactKeys []DataKey
for _, key := range historyKeys {
if key.ResourceVersion == req.ResourceVersion {
exactKeys = append(exactKeys, key)
}
}
return exactKeys, nil
case resourcepb.ResourceVersionMatchV2_NotOlderThan:
if req.ResourceVersion > 0 {
var filteredKeys []DataKey
for _, key := range historyKeys {
if key.ResourceVersion >= req.ResourceVersion {
filteredKeys = append(filteredKeys, key)
}
}
return filteredKeys, nil
}
default:
if req.ResourceVersion > 0 {
var filteredKeys []DataKey
for _, key := range historyKeys {
if key.ResourceVersion <= req.ResourceVersion {
filteredKeys = append(filteredKeys, key)
}
}
return filteredKeys, nil
}
}
return historyKeys, nil
}
// applyLiveHistoryFilter applies "live" history logic by ignoring events before the last delete
func applyLiveHistoryFilter(filteredKeys []DataKey, req *resourcepb.ListRequest) []DataKey {
useLatestDeletionAsMinRV := req.ResourceVersion == 0 && req.Source != resourcepb.ListRequest_TRASH && req.GetVersionMatchV2() != resourcepb.ResourceVersionMatchV2_Exact
if !useLatestDeletionAsMinRV {
return filteredKeys
}
latestDeleteRV := int64(0)
for _, key := range filteredKeys {
if key.Action == DataActionDeleted && key.ResourceVersion > latestDeleteRV {
latestDeleteRV = key.ResourceVersion
}
}
if latestDeleteRV > 0 {
var liveKeys []DataKey
for _, key := range filteredKeys {
if key.ResourceVersion > latestDeleteRV {
liveKeys = append(liveKeys, key)
}
}
return liveKeys
}
return filteredKeys
}
// sortByResourceVersion sorts the history keys based on the sortAscending flag
func sortByResourceVersion(filteredKeys []DataKey, sortAscending bool) {
if sortAscending {
sort.Slice(filteredKeys, func(i, j int) bool {
return filteredKeys[i].ResourceVersion < filteredKeys[j].ResourceVersion
})
} else {
sort.Slice(filteredKeys, func(i, j int) bool {
return filteredKeys[i].ResourceVersion > filteredKeys[j].ResourceVersion
})
}
}
// sortMetaKeysByResourceVersion sorts the metadata keys based on the sortAscending flag
func sortMetaKeysByResourceVersion(keys []MetaDataKey, sortAscending bool) {
if sortAscending {
sort.Slice(keys, func(i, j int) bool {
return keys[i].ResourceVersion < keys[j].ResourceVersion
})
} else {
sort.Slice(keys, func(i, j int) bool {
return keys[i].ResourceVersion > keys[j].ResourceVersion
})
}
}
// applyPagination filters keys based on pagination parameters
func applyPagination(keys []DataKey, lastSeenRV int64, sortAscending bool) []DataKey {
if lastSeenRV == 0 {
return keys
}
var pagedKeys []DataKey
for _, key := range keys {
if sortAscending && key.ResourceVersion > lastSeenRV {
pagedKeys = append(pagedKeys, key)
} else if !sortAscending && key.ResourceVersion < lastSeenRV {
pagedKeys = append(pagedKeys, key)
}
}
return pagedKeys
}
// ListHistory is like ListIterator, but it returns the history of a resource.
func (k *kvStorageBackend) ListHistory(ctx context.Context, req *resourcepb.ListRequest, fn func(ListIterator) error) (int64, error) {
if err := validateListHistoryRequest(req); err != nil {
return 0, err
}
key := req.Options.Key
// Parse continue token if provided
lastSeenRV := int64(0)
sortAscending := req.GetVersionMatchV2() == resourcepb.ResourceVersionMatchV2_NotOlderThan
if req.NextPageToken != "" {
token, err := GetContinueToken(req.NextPageToken)
if err != nil {
return 0, fmt.Errorf("invalid continue token: %w", err)
}
lastSeenRV = token.ResourceVersion
sortAscending = token.SortAscending
}
// Generate a new resource version for the list
listRV := k.snowflake.Generate().Int64()
// Get all history entries by iterating through datastore keys
historyKeys := make([]DataKey, 0, min(defaultListBufferSize, req.Limit+1))
// Use datastore.Keys to get all data keys for this specific resource
for dataKey, err := range k.dataStore.Keys(ctx, ListRequestKey{
Namespace: key.Namespace,
Group: key.Group,
Resource: key.Resource,
Name: key.Name,
}) {
if err != nil {
return 0, err
}
historyKeys = append(historyKeys, dataKey)
}
// Check if context has been cancelled
if ctx.Err() != nil {
return 0, ctx.Err()
}
// Handle trash differently from regular history
if req.Source == resourcepb.ListRequest_TRASH {
return k.processTrashEntries(ctx, req, fn, historyKeys, lastSeenRV, sortAscending, listRV)
}
// Apply filtering based on version match
filteredKeys, filterErr := filterHistoryKeysByVersion(historyKeys, req)
if filterErr != nil {
return 0, filterErr
}
// Apply "live" history logic: ignore events before the last delete
filteredKeys = applyLiveHistoryFilter(filteredKeys, req)
// Sort the entries if not already sorted correctly
sortByResourceVersion(filteredKeys, sortAscending)
// Pagination: filter out items up to and including lastSeenRV
pagedKeys := applyPagination(filteredKeys, lastSeenRV, sortAscending)
iter := kvHistoryIterator{
keys: pagedKeys,
currentIndex: -1,
ctx: ctx,
listRV: listRV,
sortAscending: sortAscending,
dataStore: k.dataStore,
}
err := fn(&iter)
if err != nil {
return 0, err
}
return listRV, nil
}
// processTrashEntries handles the special case of listing deleted items (trash)
func (k *kvStorageBackend) processTrashEntries(ctx context.Context, req *resourcepb.ListRequest, fn func(ListIterator) error, historyKeys []DataKey, lastSeenRV int64, sortAscending bool, listRV int64) (int64, error) {
// Filter to only deleted entries
var deletedKeys []DataKey
for _, key := range historyKeys {
if key.Action == DataActionDeleted {
deletedKeys = append(deletedKeys, key)
}
}
// Check if the resource currently exists (is live)
// If it exists, don't return any trash entries
_, err := k.metaStore.GetLatestResourceKey(ctx, MetaGetRequestKey{
Namespace: req.Options.Key.Namespace,
Group: req.Options.Key.Group,
Resource: req.Options.Key.Resource,
Name: req.Options.Key.Name,
})
var trashKeys []DataKey
if errors.Is(err, ErrNotFound) {
// Resource doesn't exist currently, so we can return the latest delete
// Find the latest delete event
var latestDelete *DataKey
for _, key := range deletedKeys {
if latestDelete == nil || key.ResourceVersion > latestDelete.ResourceVersion {
latestDelete = &key
}
}
if latestDelete != nil {
trashKeys = append(trashKeys, *latestDelete)
}
}
// If err != ErrNotFound, the resource exists, so no trash entries should be returned
// Apply version filtering
filteredKeys, err := filterHistoryKeysByVersion(trashKeys, req)
if err != nil {
return 0, err
}
// Sort the entries
sortByResourceVersion(filteredKeys, sortAscending)
// Pagination: filter out items up to and including lastSeenRV
pagedKeys := applyPagination(filteredKeys, lastSeenRV, sortAscending)
iter := kvHistoryIterator{
keys: pagedKeys,
currentIndex: -1,
ctx: ctx,
listRV: listRV,
sortAscending: sortAscending,
dataStore: k.dataStore,
}
err = fn(&iter)
if err != nil {
return 0, err
}
return listRV, nil
}
// kvHistoryIterator implements ListIterator for KV storage history
type kvHistoryIterator struct {
ctx context.Context
keys []DataKey
currentIndex int
listRV int64
sortAscending bool
dataStore *dataStore
// current
rv int64
err error
value []byte
folder string
}
func (i *kvHistoryIterator) Next() bool {
i.currentIndex++
if i.currentIndex >= len(i.keys) {
return false
}
key := i.keys[i.currentIndex]
i.rv = key.ResourceVersion
// Read the value from the ReadCloser
data, err := i.dataStore.Get(i.ctx, key)
if err != nil {
i.err = err
return false
}
if data == nil {
i.err = fmt.Errorf("data is nil")
return false
}
i.value, i.err = readAndClose(data)
if i.err != nil {
return false
}
// Extract the folder from the meta data
partial := &metav1.PartialObjectMetadata{}
err = json.Unmarshal(i.value, partial)
if err != nil {
i.err = err
return false
}
meta, err := utils.MetaAccessor(partial)
if err != nil {
i.err = err
return false
}
i.folder = meta.GetFolder()
i.err = nil
return true
}
func (i *kvHistoryIterator) Error() error {
return i.err
}
func (i *kvHistoryIterator) ContinueToken() string {
if i.currentIndex < 0 || i.currentIndex >= len(i.keys) {
return ""
}
token := ContinueToken{
StartOffset: i.rv,
ResourceVersion: i.keys[i.currentIndex].ResourceVersion,
SortAscending: i.sortAscending,
}
return token.String()
}
func (i *kvHistoryIterator) ResourceVersion() int64 {
return i.rv
}
func (i *kvHistoryIterator) Namespace() string {
if i.currentIndex >= 0 && i.currentIndex < len(i.keys) {
return i.keys[i.currentIndex].Namespace
}
return ""
}
func (i *kvHistoryIterator) Name() string {
if i.currentIndex >= 0 && i.currentIndex < len(i.keys) {
return i.keys[i.currentIndex].Name
}
return ""
}
func (i *kvHistoryIterator) Folder() string {
return i.folder
}
func (i *kvHistoryIterator) Value() []byte {
return i.value
}
// WatchWriteEvents returns a channel that receives write events.
func (k *kvStorageBackend) WatchWriteEvents(ctx context.Context) (<-chan *WrittenEvent, error) {
// Create a channel to receive events
events := make(chan *WrittenEvent, 10000) // TODO: make this configurable
notifierEvents := k.notifier.Watch(ctx, defaultWatchOptions())
go func() {
for event := range notifierEvents {
// fetch the data
dataReader, err := k.dataStore.Get(ctx, DataKey{
Namespace: event.Namespace,
Group: event.Group,
Resource: event.Resource,
Name: event.Name,
ResourceVersion: event.ResourceVersion,
Action: event.Action,
})
if err != nil || dataReader == nil {
k.log.Error("failed to get data for event", "error", err)
continue
}
data, err := readAndClose(dataReader)
if err != nil {
k.log.Error("failed to read and close data for event", "error", err)
continue
}
var t resourcepb.WatchEvent_Type
switch event.Action {
case DataActionCreated:
t = resourcepb.WatchEvent_ADDED
case DataActionUpdated:
t = resourcepb.WatchEvent_MODIFIED
case DataActionDeleted:
t = resourcepb.WatchEvent_DELETED
}
events <- &WrittenEvent{
Key: &resourcepb.ResourceKey{
Namespace: event.Namespace,
Group: event.Group,
Resource: event.Resource,
Name: event.Name,
},
Type: t,
Folder: event.Folder,
Value: data,
ResourceVersion: event.ResourceVersion,
PreviousRV: event.PreviousRV,
Timestamp: event.ResourceVersion / time.Second.Nanoseconds(), // convert to seconds
}
}
close(events)
}()
return events, nil
}
// GetResourceStats returns resource stats within the storage backend.
// TODO: this isn't very efficient, we should use a more efficient algorithm.
func (k *kvStorageBackend) GetResourceStats(ctx context.Context, namespace string, minCount int) ([]ResourceStats, error) {
stats := make([]ResourceStats, 0)
res := make(map[string]map[string]bool)
rvs := make(map[string]int64)
// Use datastore.Keys to get all data keys for the namespace
for dataKey, err := range k.dataStore.Keys(ctx, ListRequestKey{Namespace: namespace}) {
if err != nil {
return nil, err
}
key := fmt.Sprintf("%s/%s/%s", dataKey.Namespace, dataKey.Group, dataKey.Resource)
if _, ok := res[key]; !ok {
res[key] = make(map[string]bool)
rvs[key] = 1
}
res[key][dataKey.Name] = dataKey.Action != DataActionDeleted
rvs[key] = dataKey.ResourceVersion
}
for key, names := range res {
parts := strings.Split(key, "/")
count := int64(0)
for _, exists := range names {
if exists {
count++
}
}
if count <= int64(minCount) {
continue
}
stats = append(stats, ResourceStats{
NamespacedResource: NamespacedResource{
Namespace: parts[0],
Group: parts[1],
Resource: parts[2],
},
Count: count,
ResourceVersion: rvs[key],
})
}
return stats, nil
}
// readAndClose reads all data from a ReadCloser and ensures it's closed,
// combining any errors from both operations.
func readAndClose(r io.ReadCloser) ([]byte, error) {
data, err := io.ReadAll(r)
return data, errors.Join(err, r.Close())
}