package resource import ( "bytes" "context" "errors" "fmt" "io" "net/http" "sort" "strconv" "strings" "sync" "sync/atomic" "time" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace/noop" "gocloud.dev/blob" _ "gocloud.dev/blob/fileblob" _ "gocloud.dev/blob/memblob" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "github.com/grafana/grafana/pkg/apimachinery/utils" "github.com/grafana/grafana/pkg/storage/unified/resourcepb" ) type CDKBackendOptions struct { Tracer trace.Tracer Bucket CDKBucket RootFolder string } func NewCDKBackend(ctx context.Context, opts CDKBackendOptions) (StorageBackend, error) { if opts.Tracer == nil { opts.Tracer = noop.NewTracerProvider().Tracer("cdk-appending-store") } if opts.Bucket == nil { return nil, fmt.Errorf("missing bucket") } found, _, err := opts.Bucket.ListPage(ctx, blob.FirstPageToken, 1, &blob.ListOptions{ Prefix: opts.RootFolder, Delimiter: "/", }) if err != nil { return nil, err } if found == nil { return nil, fmt.Errorf("the root folder does not exist") } backend := &cdkBackend{ tracer: opts.Tracer, bucket: opts.Bucket, root: opts.RootFolder, } backend.rv.Swap(time.Now().UnixMilli()) return backend, nil } type cdkBackend struct { tracer trace.Tracer bucket CDKBucket root string mutex sync.Mutex rv atomic.Int64 // Simple watch stream -- NOTE, this only works for single tenant! broadcaster Broadcaster[*WrittenEvent] stream chan<- *WrittenEvent } func (s *cdkBackend) getPath(key *resourcepb.ResourceKey, rv int64) string { var buffer bytes.Buffer buffer.WriteString(s.root) if key.Group == "" { return buffer.String() } buffer.WriteString(key.Group) if key.Resource == "" { return buffer.String() } buffer.WriteString("/") buffer.WriteString(key.Resource) if key.Namespace == "" { if key.Name == "" { return buffer.String() } buffer.WriteString("/__cluster__") } else { buffer.WriteString("/") buffer.WriteString(key.Namespace) } if key.Name == "" { return buffer.String() } buffer.WriteString("/") buffer.WriteString(key.Name) if rv > 0 { buffer.WriteString(fmt.Sprintf("/%d.json", rv)) } return buffer.String() } // GetResourceStats implements Backend. func (s *cdkBackend) GetResourceStats(ctx context.Context, namespace string, minCount int) ([]ResourceStats, error) { return nil, fmt.Errorf("not implemented") } func (s *cdkBackend) WriteEvent(ctx context.Context, event WriteEvent) (rv int64, err error) { if event.Type == resourcepb.WatchEvent_ADDED { // ReadResource deals with deleted values (i.e. a file exists but has generation -999). resp := s.ReadResource(ctx, &resourcepb.ReadRequest{Key: event.Key}) if resp.Error != nil && resp.Error.Code != http.StatusNotFound { return 0, GetError(resp.Error) } if resp.Value != nil { return 0, ErrResourceAlreadyExists } } // Scope the lock { s.mutex.Lock() defer s.mutex.Unlock() rv = s.rv.Add(1) err = s.bucket.WriteAll(ctx, s.getPath(event.Key, rv), event.Value, &blob.WriterOptions{ ContentType: "application/json", }) } // Async notify all subscribers if s.stream != nil { go func() { write := &WrittenEvent{ Type: event.Type, Key: event.Key, PreviousRV: event.PreviousRV, Value: event.Value, Timestamp: time.Now().UnixMilli(), ResourceVersion: rv, } s.stream <- write }() } return rv, err } func (s *cdkBackend) ReadResource(ctx context.Context, req *resourcepb.ReadRequest) *BackendReadResponse { rv := req.ResourceVersion path := s.getPath(req.Key, rv) if rv < 1 { iter := s.bucket.List(&blob.ListOptions{Prefix: path + "/", Delimiter: "/"}) for { obj, err := iter.Next(ctx) if errors.Is(err, io.EOF) { break } if strings.HasSuffix(obj.Key, ".json") { idx := strings.LastIndex(obj.Key, "/") + 1 edx := strings.LastIndex(obj.Key, ".") if idx > 0 { v, err := strconv.ParseInt(obj.Key[idx:edx], 10, 64) if err == nil && v > rv { rv = v path = obj.Key // find the path with biggest resource version } } } } } raw, err := s.bucket.ReadAll(ctx, path) if raw == nil && req.ResourceVersion > 0 { if req.ResourceVersion > s.rv.Load() { return &BackendReadResponse{ Error: &resourcepb.ErrorResult{ Code: http.StatusGatewayTimeout, Reason: string(metav1.StatusReasonTimeout), // match etcd behavior Message: "ResourceVersion is larger than max", Details: &resourcepb.ErrorDetails{ Causes: []*resourcepb.ErrorCause{ { Reason: string(metav1.CauseTypeResourceVersionTooLarge), Message: fmt.Sprintf("requested: %d, current %d", req.ResourceVersion, s.rv.Load()), }, }, }, }, } } // If the there was an explicit request, get the latest rsp := s.ReadResource(ctx, &resourcepb.ReadRequest{Key: req.Key}) if rsp != nil && len(rsp.Value) > 0 { raw = rsp.Value rv = rsp.ResourceVersion err = nil } } if err == nil && isDeletedValue(raw) { raw = nil } if raw == nil { return &BackendReadResponse{Error: NewNotFoundError(req.Key)} } return &BackendReadResponse{ Key: req.Key, Folder: "", // TODO: implement this ResourceVersion: rv, Value: raw, } } func isDeletedValue(raw []byte) bool { if bytes.Contains(raw, []byte(`"generation":-999`)) { tmp := &unstructured.Unstructured{} err := tmp.UnmarshalJSON(raw) if err == nil && tmp.GetGeneration() == utils.DeletedGeneration { return true } } return false } func (s *cdkBackend) ListIterator(ctx context.Context, req *resourcepb.ListRequest, cb func(ListIterator) error) (int64, error) { resources, err := buildTree(ctx, s, req.Options.Key) if err != nil { return 0, err } err = cb(resources) return resources.listRV, err } func (s *cdkBackend) ListHistory(ctx context.Context, req *resourcepb.ListRequest, cb func(ListIterator) error) (int64, error) { return 0, fmt.Errorf("listing from history not supported in CDK backend") } func (s *cdkBackend) WatchWriteEvents(ctx context.Context) (<-chan *WrittenEvent, error) { s.mutex.Lock() defer s.mutex.Unlock() if s.broadcaster == nil { var err error s.broadcaster, err = NewBroadcaster(context.Background(), func(c chan<- *WrittenEvent) error { s.stream = c return nil }) if err != nil { return nil, err } } return s.broadcaster.Subscribe(ctx) } // group > resource > namespace > name > versions type cdkResource struct { prefix string versions []cdkVersion } type cdkVersion struct { rv int64 key string } type cdkListIterator struct { bucket CDKBucket ctx context.Context err error listRV int64 resources []cdkResource index int currentRV int64 currentKey string currentVal []byte } // Next implements ListIterator. func (c *cdkListIterator) Next() bool { if c.err != nil { return false } for { c.currentVal = nil c.index += 1 if c.index >= len(c.resources) { return false } item := c.resources[c.index] latest := item.versions[0] raw, err := c.bucket.ReadAll(c.ctx, latest.key) if err != nil { c.err = err return false } if !isDeletedValue(raw) { c.currentRV = latest.rv c.currentKey = latest.key c.currentVal = raw return true } } } // Error implements ListIterator. func (c *cdkListIterator) Error() error { return c.err } // ResourceVersion implements ListIterator. func (c *cdkListIterator) ResourceVersion() int64 { return c.currentRV } // Value implements ListIterator. func (c *cdkListIterator) Value() []byte { return c.currentVal } // ContinueToken implements ListIterator. func (c *cdkListIterator) ContinueToken() string { return fmt.Sprintf("index:%d/key:%s", c.index, c.currentKey) } // Name implements ListIterator. func (c *cdkListIterator) Name() string { return c.currentKey // TODO (parse name from key) } // Namespace implements ListIterator. func (c *cdkListIterator) Namespace() string { return c.currentKey // TODO (parse namespace from key) } func (c *cdkListIterator) Folder() string { return "" // TODO: implement this } var _ ListIterator = (*cdkListIterator)(nil) func buildTree(ctx context.Context, s *cdkBackend, key *resourcepb.ResourceKey) (*cdkListIterator, error) { byPrefix := make(map[string]*cdkResource) path := s.getPath(key, 0) iter := s.bucket.List(&blob.ListOptions{Prefix: path, Delimiter: ""}) // "" is recursive for { obj, err := iter.Next(ctx) if errors.Is(err, io.EOF) { break } if strings.HasSuffix(obj.Key, ".json") { idx := strings.LastIndex(obj.Key, "/") + 1 edx := strings.LastIndex(obj.Key, ".") if idx > 0 { rv, err := strconv.ParseInt(obj.Key[idx:edx], 10, 64) if err == nil { prefix := obj.Key[:idx] res, ok := byPrefix[prefix] if !ok { res = &cdkResource{prefix: prefix} byPrefix[prefix] = res } res.versions = append(res.versions, cdkVersion{ rv: rv, key: obj.Key, }) } } } } // Now sort all versions resources := make([]cdkResource, 0, len(byPrefix)) for _, res := range byPrefix { sort.Slice(res.versions, func(i, j int) bool { return res.versions[i].rv > res.versions[j].rv }) resources = append(resources, *res) } sort.Slice(resources, func(i, j int) bool { a := resources[i].prefix b := resources[j].prefix return a < b }) return &cdkListIterator{ ctx: ctx, bucket: s.bucket, resources: resources, listRV: s.rv.Load(), index: -1, // must call next first }, nil }