package resource import ( "bytes" "context" "encoding/json" "errors" "fmt" "io" "math/rand/v2" "net/http" "sort" "strings" "time" "github.com/bwmarrin/snowflake" "github.com/grafana/grafana-app-sdk/logging" "github.com/grafana/grafana/pkg/apimachinery/utils" "github.com/grafana/grafana/pkg/storage/unified/resourcepb" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) const ( defaultListBufferSize = 100 ) // Unified storage backend based on KV storage. type kvStorageBackend struct { snowflake *snowflake.Node kv KV dataStore *dataStore metaStore *metadataStore eventStore *eventStore notifier *notifier builder DocumentBuilder log logging.Logger } var _ StorageBackend = &kvStorageBackend{} func NewKvStorageBackend(kv KV) *kvStorageBackend { s, err := snowflake.NewNode(rand.Int64N(1024)) if err != nil { panic(err) } eventStore := newEventStore(kv) return &kvStorageBackend{ kv: kv, dataStore: newDataStore(kv), metaStore: newMetadataStore(kv), eventStore: eventStore, notifier: newNotifier(eventStore, notifierOptions{}), snowflake: s, builder: StandardDocumentBuilder(), // For now we use the standard document builder. log: &logging.NoOpLogger{}, // Make this configurable } } // WriteEvent writes a resource event (create/update/delete) to the storage backend. func (k *kvStorageBackend) WriteEvent(ctx context.Context, event WriteEvent) (int64, error) { if err := event.Validate(); err != nil { return 0, fmt.Errorf("invalid event: %w", err) } rv := k.snowflake.Generate().Int64() obj := event.Object // Write data. var action DataAction switch event.Type { case resourcepb.WatchEvent_ADDED: action = DataActionCreated // Check if resource already exists for create operations _, err := k.metaStore.GetLatestResourceKey(ctx, MetaGetRequestKey{ Namespace: event.Key.Namespace, Group: event.Key.Group, Resource: event.Key.Resource, Name: event.Key.Name, }) if err == nil { // Resource exists, return already exists error return 0, ErrResourceAlreadyExists } if !errors.Is(err, ErrNotFound) { // Some other error occurred return 0, fmt.Errorf("failed to check if resource exists: %w", err) } case resourcepb.WatchEvent_MODIFIED: action = DataActionUpdated case resourcepb.WatchEvent_DELETED: action = DataActionDeleted obj = event.ObjectOld default: return 0, fmt.Errorf("invalid event type: %d", event.Type) } if obj == nil { return 0, fmt.Errorf("object is nil") } // Build the search document doc, err := k.builder.BuildDocument(ctx, event.Key, rv, event.Value) if err != nil { return 0, fmt.Errorf("failed to build document: %w", err) } // Write the data err = k.dataStore.Save(ctx, DataKey{ Namespace: event.Key.Namespace, Group: event.Key.Group, Resource: event.Key.Resource, Name: event.Key.Name, ResourceVersion: rv, Action: action, }, bytes.NewReader(event.Value)) if err != nil { return 0, fmt.Errorf("failed to write data: %w", err) } // Write metadata err = k.metaStore.Save(ctx, MetaDataObj{ Key: MetaDataKey{ Namespace: event.Key.Namespace, Group: event.Key.Group, Resource: event.Key.Resource, Name: event.Key.Name, ResourceVersion: rv, Action: action, Folder: obj.GetFolder(), }, Value: MetaData{ IndexableDocument: *doc, }, }) if err != nil { return 0, fmt.Errorf("failed to write metadata: %w", err) } // Write event err = k.eventStore.Save(ctx, Event{ Namespace: event.Key.Namespace, Group: event.Key.Group, Resource: event.Key.Resource, Name: event.Key.Name, ResourceVersion: rv, Action: action, Folder: obj.GetFolder(), PreviousRV: event.PreviousRV, }) if err != nil { return 0, fmt.Errorf("failed to save event: %w", err) } return rv, nil } func (k *kvStorageBackend) ReadResource(ctx context.Context, req *resourcepb.ReadRequest) *BackendReadResponse { if req.Key == nil { return &BackendReadResponse{Error: &resourcepb.ErrorResult{Code: http.StatusBadRequest, Message: "missing key"}} } meta, err := k.metaStore.GetResourceKeyAtRevision(ctx, MetaGetRequestKey{ Namespace: req.Key.Namespace, Group: req.Key.Group, Resource: req.Key.Resource, Name: req.Key.Name, }, req.ResourceVersion) if errors.Is(err, ErrNotFound) { return &BackendReadResponse{Error: &resourcepb.ErrorResult{Code: http.StatusNotFound, Message: "not found"}} } else if err != nil { return &BackendReadResponse{Error: &resourcepb.ErrorResult{Code: http.StatusInternalServerError, Message: err.Error()}} } data, err := k.dataStore.Get(ctx, DataKey{ Namespace: req.Key.Namespace, Group: req.Key.Group, Resource: req.Key.Resource, Name: req.Key.Name, ResourceVersion: meta.ResourceVersion, Action: meta.Action, }) if err != nil || data == nil { return &BackendReadResponse{Error: &resourcepb.ErrorResult{Code: http.StatusInternalServerError, Message: err.Error()}} } value, err := readAndClose(data) if err != nil { return &BackendReadResponse{Error: &resourcepb.ErrorResult{Code: http.StatusInternalServerError, Message: err.Error()}} } return &BackendReadResponse{ Key: req.Key, ResourceVersion: meta.ResourceVersion, Value: value, Folder: meta.Folder, } } // ListIterator returns an iterator for listing resources. func (k *kvStorageBackend) ListIterator(ctx context.Context, req *resourcepb.ListRequest, cb func(ListIterator) error) (int64, error) { if req.Options == nil || req.Options.Key == nil { return 0, fmt.Errorf("missing options or key in ListRequest") } // Parse continue token if provided offset := int64(0) resourceVersion := req.ResourceVersion if req.NextPageToken != "" { token, err := GetContinueToken(req.NextPageToken) if err != nil { return 0, fmt.Errorf("invalid continue token: %w", err) } offset = token.StartOffset resourceVersion = token.ResourceVersion } // We set the listRV to the current time. listRV := k.snowflake.Generate().Int64() if resourceVersion > 0 { listRV = resourceVersion } // Fetch the latest objects keys := make([]MetaDataKey, 0, min(defaultListBufferSize, req.Limit+1)) for metaKey, err := range k.metaStore.ListResourceKeysAtRevision(ctx, MetaListRequestKey{ Namespace: req.Options.Key.Namespace, Group: req.Options.Key.Group, Resource: req.Options.Key.Resource, Name: req.Options.Key.Name, }, resourceVersion) { if err != nil { return 0, err } keys = append(keys, metaKey) } sortMetaKeysByResourceVersion(keys, true) // sort ascending for sql parity if offset > 0 && int64(len(keys)) > offset { keys = keys[offset:] } iter := kvListIterator{ keys: keys, currentIndex: -1, ctx: ctx, listRV: listRV, offset: offset, limit: req.Limit + 1, // TODO: for now we need at least one more item. Fix the caller dataStore: k.dataStore, } err := cb(&iter) if err != nil { return 0, err } return listRV, nil } // kvListIterator implements ListIterator for KV storage type kvListIterator struct { ctx context.Context keys []MetaDataKey currentIndex int dataStore *dataStore listRV int64 offset int64 limit int64 // current rv int64 err error value []byte } func (i *kvListIterator) Next() bool { i.currentIndex++ if i.currentIndex >= len(i.keys) { return false } if int64(i.currentIndex) >= i.limit { return false } i.rv, i.err = i.keys[i.currentIndex].ResourceVersion, nil data, err := i.dataStore.Get(i.ctx, DataKey{ Namespace: i.keys[i.currentIndex].Namespace, Group: i.keys[i.currentIndex].Group, Resource: i.keys[i.currentIndex].Resource, Name: i.keys[i.currentIndex].Name, ResourceVersion: i.keys[i.currentIndex].ResourceVersion, Action: i.keys[i.currentIndex].Action, }) if err != nil { i.err = err return false } i.value, i.err = readAndClose(data) if i.err != nil { return false } // increment the offset i.offset++ return true } func (i *kvListIterator) Error() error { return nil } func (i *kvListIterator) ContinueToken() string { return ContinueToken{ StartOffset: i.offset, ResourceVersion: i.listRV, }.String() } func (i *kvListIterator) ResourceVersion() int64 { return i.rv } func (i *kvListIterator) Namespace() string { return i.keys[i.currentIndex].Namespace } func (i *kvListIterator) Name() string { return i.keys[i.currentIndex].Name } func (i *kvListIterator) Folder() string { return i.keys[i.currentIndex].Folder } func (i *kvListIterator) Value() []byte { return i.value } func validateListHistoryRequest(req *resourcepb.ListRequest) error { if req.Options == nil || req.Options.Key == nil { return fmt.Errorf("missing options or key in ListRequest") } key := req.Options.Key if key.Group == "" { return fmt.Errorf("group is required") } if key.Resource == "" { return fmt.Errorf("resource is required") } if key.Namespace == "" { return fmt.Errorf("namespace is required") } if key.Name == "" { return fmt.Errorf("name is required") } return nil } // filterHistoryKeysByVersion filters history keys based on version match criteria func filterHistoryKeysByVersion(historyKeys []DataKey, req *resourcepb.ListRequest) ([]DataKey, error) { switch req.GetVersionMatchV2() { case resourcepb.ResourceVersionMatchV2_Exact: if req.ResourceVersion <= 0 { return nil, fmt.Errorf("expecting an explicit resource version query when using Exact matching") } var exactKeys []DataKey for _, key := range historyKeys { if key.ResourceVersion == req.ResourceVersion { exactKeys = append(exactKeys, key) } } return exactKeys, nil case resourcepb.ResourceVersionMatchV2_NotOlderThan: if req.ResourceVersion > 0 { var filteredKeys []DataKey for _, key := range historyKeys { if key.ResourceVersion >= req.ResourceVersion { filteredKeys = append(filteredKeys, key) } } return filteredKeys, nil } default: if req.ResourceVersion > 0 { var filteredKeys []DataKey for _, key := range historyKeys { if key.ResourceVersion <= req.ResourceVersion { filteredKeys = append(filteredKeys, key) } } return filteredKeys, nil } } return historyKeys, nil } // applyLiveHistoryFilter applies "live" history logic by ignoring events before the last delete func applyLiveHistoryFilter(filteredKeys []DataKey, req *resourcepb.ListRequest) []DataKey { useLatestDeletionAsMinRV := req.ResourceVersion == 0 && req.Source != resourcepb.ListRequest_TRASH && req.GetVersionMatchV2() != resourcepb.ResourceVersionMatchV2_Exact if !useLatestDeletionAsMinRV { return filteredKeys } latestDeleteRV := int64(0) for _, key := range filteredKeys { if key.Action == DataActionDeleted && key.ResourceVersion > latestDeleteRV { latestDeleteRV = key.ResourceVersion } } if latestDeleteRV > 0 { var liveKeys []DataKey for _, key := range filteredKeys { if key.ResourceVersion > latestDeleteRV { liveKeys = append(liveKeys, key) } } return liveKeys } return filteredKeys } // sortByResourceVersion sorts the history keys based on the sortAscending flag func sortByResourceVersion(filteredKeys []DataKey, sortAscending bool) { if sortAscending { sort.Slice(filteredKeys, func(i, j int) bool { return filteredKeys[i].ResourceVersion < filteredKeys[j].ResourceVersion }) } else { sort.Slice(filteredKeys, func(i, j int) bool { return filteredKeys[i].ResourceVersion > filteredKeys[j].ResourceVersion }) } } // sortMetaKeysByResourceVersion sorts the metadata keys based on the sortAscending flag func sortMetaKeysByResourceVersion(keys []MetaDataKey, sortAscending bool) { if sortAscending { sort.Slice(keys, func(i, j int) bool { return keys[i].ResourceVersion < keys[j].ResourceVersion }) } else { sort.Slice(keys, func(i, j int) bool { return keys[i].ResourceVersion > keys[j].ResourceVersion }) } } // applyPagination filters keys based on pagination parameters func applyPagination(keys []DataKey, lastSeenRV int64, sortAscending bool) []DataKey { if lastSeenRV == 0 { return keys } var pagedKeys []DataKey for _, key := range keys { if sortAscending && key.ResourceVersion > lastSeenRV { pagedKeys = append(pagedKeys, key) } else if !sortAscending && key.ResourceVersion < lastSeenRV { pagedKeys = append(pagedKeys, key) } } return pagedKeys } // ListHistory is like ListIterator, but it returns the history of a resource. func (k *kvStorageBackend) ListHistory(ctx context.Context, req *resourcepb.ListRequest, fn func(ListIterator) error) (int64, error) { if err := validateListHistoryRequest(req); err != nil { return 0, err } key := req.Options.Key // Parse continue token if provided lastSeenRV := int64(0) sortAscending := req.GetVersionMatchV2() == resourcepb.ResourceVersionMatchV2_NotOlderThan if req.NextPageToken != "" { token, err := GetContinueToken(req.NextPageToken) if err != nil { return 0, fmt.Errorf("invalid continue token: %w", err) } lastSeenRV = token.ResourceVersion sortAscending = token.SortAscending } // Generate a new resource version for the list listRV := k.snowflake.Generate().Int64() // Get all history entries by iterating through datastore keys historyKeys := make([]DataKey, 0, min(defaultListBufferSize, req.Limit+1)) // Use datastore.Keys to get all data keys for this specific resource for dataKey, err := range k.dataStore.Keys(ctx, ListRequestKey{ Namespace: key.Namespace, Group: key.Group, Resource: key.Resource, Name: key.Name, }) { if err != nil { return 0, err } historyKeys = append(historyKeys, dataKey) } // Check if context has been cancelled if ctx.Err() != nil { return 0, ctx.Err() } // Handle trash differently from regular history if req.Source == resourcepb.ListRequest_TRASH { return k.processTrashEntries(ctx, req, fn, historyKeys, lastSeenRV, sortAscending, listRV) } // Apply filtering based on version match filteredKeys, filterErr := filterHistoryKeysByVersion(historyKeys, req) if filterErr != nil { return 0, filterErr } // Apply "live" history logic: ignore events before the last delete filteredKeys = applyLiveHistoryFilter(filteredKeys, req) // Sort the entries if not already sorted correctly sortByResourceVersion(filteredKeys, sortAscending) // Pagination: filter out items up to and including lastSeenRV pagedKeys := applyPagination(filteredKeys, lastSeenRV, sortAscending) iter := kvHistoryIterator{ keys: pagedKeys, currentIndex: -1, ctx: ctx, listRV: listRV, sortAscending: sortAscending, dataStore: k.dataStore, } err := fn(&iter) if err != nil { return 0, err } return listRV, nil } // processTrashEntries handles the special case of listing deleted items (trash) func (k *kvStorageBackend) processTrashEntries(ctx context.Context, req *resourcepb.ListRequest, fn func(ListIterator) error, historyKeys []DataKey, lastSeenRV int64, sortAscending bool, listRV int64) (int64, error) { // Filter to only deleted entries var deletedKeys []DataKey for _, key := range historyKeys { if key.Action == DataActionDeleted { deletedKeys = append(deletedKeys, key) } } // Check if the resource currently exists (is live) // If it exists, don't return any trash entries _, err := k.metaStore.GetLatestResourceKey(ctx, MetaGetRequestKey{ Namespace: req.Options.Key.Namespace, Group: req.Options.Key.Group, Resource: req.Options.Key.Resource, Name: req.Options.Key.Name, }) var trashKeys []DataKey if errors.Is(err, ErrNotFound) { // Resource doesn't exist currently, so we can return the latest delete // Find the latest delete event var latestDelete *DataKey for _, key := range deletedKeys { if latestDelete == nil || key.ResourceVersion > latestDelete.ResourceVersion { latestDelete = &key } } if latestDelete != nil { trashKeys = append(trashKeys, *latestDelete) } } // If err != ErrNotFound, the resource exists, so no trash entries should be returned // Apply version filtering filteredKeys, err := filterHistoryKeysByVersion(trashKeys, req) if err != nil { return 0, err } // Sort the entries sortByResourceVersion(filteredKeys, sortAscending) // Pagination: filter out items up to and including lastSeenRV pagedKeys := applyPagination(filteredKeys, lastSeenRV, sortAscending) iter := kvHistoryIterator{ keys: pagedKeys, currentIndex: -1, ctx: ctx, listRV: listRV, sortAscending: sortAscending, dataStore: k.dataStore, } err = fn(&iter) if err != nil { return 0, err } return listRV, nil } // kvHistoryIterator implements ListIterator for KV storage history type kvHistoryIterator struct { ctx context.Context keys []DataKey currentIndex int listRV int64 sortAscending bool dataStore *dataStore // current rv int64 err error value []byte folder string } func (i *kvHistoryIterator) Next() bool { i.currentIndex++ if i.currentIndex >= len(i.keys) { return false } key := i.keys[i.currentIndex] i.rv = key.ResourceVersion // Read the value from the ReadCloser data, err := i.dataStore.Get(i.ctx, key) if err != nil { i.err = err return false } if data == nil { i.err = fmt.Errorf("data is nil") return false } i.value, i.err = readAndClose(data) if i.err != nil { return false } // Extract the folder from the meta data partial := &metav1.PartialObjectMetadata{} err = json.Unmarshal(i.value, partial) if err != nil { i.err = err return false } meta, err := utils.MetaAccessor(partial) if err != nil { i.err = err return false } i.folder = meta.GetFolder() i.err = nil return true } func (i *kvHistoryIterator) Error() error { return i.err } func (i *kvHistoryIterator) ContinueToken() string { if i.currentIndex < 0 || i.currentIndex >= len(i.keys) { return "" } token := ContinueToken{ StartOffset: i.rv, ResourceVersion: i.keys[i.currentIndex].ResourceVersion, SortAscending: i.sortAscending, } return token.String() } func (i *kvHistoryIterator) ResourceVersion() int64 { return i.rv } func (i *kvHistoryIterator) Namespace() string { if i.currentIndex >= 0 && i.currentIndex < len(i.keys) { return i.keys[i.currentIndex].Namespace } return "" } func (i *kvHistoryIterator) Name() string { if i.currentIndex >= 0 && i.currentIndex < len(i.keys) { return i.keys[i.currentIndex].Name } return "" } func (i *kvHistoryIterator) Folder() string { return i.folder } func (i *kvHistoryIterator) Value() []byte { return i.value } // WatchWriteEvents returns a channel that receives write events. func (k *kvStorageBackend) WatchWriteEvents(ctx context.Context) (<-chan *WrittenEvent, error) { // Create a channel to receive events events := make(chan *WrittenEvent, 10000) // TODO: make this configurable notifierEvents := k.notifier.Watch(ctx, defaultWatchOptions()) go func() { for event := range notifierEvents { // fetch the data dataReader, err := k.dataStore.Get(ctx, DataKey{ Namespace: event.Namespace, Group: event.Group, Resource: event.Resource, Name: event.Name, ResourceVersion: event.ResourceVersion, Action: event.Action, }) if err != nil || dataReader == nil { k.log.Error("failed to get data for event", "error", err) continue } data, err := readAndClose(dataReader) if err != nil { k.log.Error("failed to read and close data for event", "error", err) continue } var t resourcepb.WatchEvent_Type switch event.Action { case DataActionCreated: t = resourcepb.WatchEvent_ADDED case DataActionUpdated: t = resourcepb.WatchEvent_MODIFIED case DataActionDeleted: t = resourcepb.WatchEvent_DELETED } events <- &WrittenEvent{ Key: &resourcepb.ResourceKey{ Namespace: event.Namespace, Group: event.Group, Resource: event.Resource, Name: event.Name, }, Type: t, Folder: event.Folder, Value: data, ResourceVersion: event.ResourceVersion, PreviousRV: event.PreviousRV, Timestamp: event.ResourceVersion / time.Second.Nanoseconds(), // convert to seconds } } close(events) }() return events, nil } // GetResourceStats returns resource stats within the storage backend. // TODO: this isn't very efficient, we should use a more efficient algorithm. func (k *kvStorageBackend) GetResourceStats(ctx context.Context, namespace string, minCount int) ([]ResourceStats, error) { stats := make([]ResourceStats, 0) res := make(map[string]map[string]bool) rvs := make(map[string]int64) // Use datastore.Keys to get all data keys for the namespace for dataKey, err := range k.dataStore.Keys(ctx, ListRequestKey{Namespace: namespace}) { if err != nil { return nil, err } key := fmt.Sprintf("%s/%s/%s", dataKey.Namespace, dataKey.Group, dataKey.Resource) if _, ok := res[key]; !ok { res[key] = make(map[string]bool) rvs[key] = 1 } res[key][dataKey.Name] = dataKey.Action != DataActionDeleted rvs[key] = dataKey.ResourceVersion } for key, names := range res { parts := strings.Split(key, "/") count := int64(0) for _, exists := range names { if exists { count++ } } if count <= int64(minCount) { continue } stats = append(stats, ResourceStats{ NamespacedResource: NamespacedResource{ Namespace: parts[0], Group: parts[1], Resource: parts[2], }, Count: count, ResourceVersion: rvs[key], }) } return stats, nil } // readAndClose reads all data from a ReadCloser and ensures it's closed, // combining any errors from both operations. func readAndClose(r io.ReadCloser) ([]byte, error) { data, err := io.ReadAll(r) return data, errors.Join(err, r.Close()) }