From 2e32a3d19f73bff62b1825c85ce02c88647aa56e Mon Sep 17 00:00:00 2001 From: Georges Chaudy Date: Thu, 20 Feb 2025 14:28:03 +0100 Subject: [PATCH] unistore: fix dashboard conversion events (#101053) fix dashboard conversion events --- pkg/storage/unified/apistore/store.go | 2 +- pkg/storage/unified/apistore/stream.go | 34 +++++++++++++-------- pkg/storage/unified/apistore/stream_test.go | 34 +++++++++++++++++++++ 3 files changed, 56 insertions(+), 14 deletions(-) create mode 100644 pkg/storage/unified/apistore/stream_test.go diff --git a/pkg/storage/unified/apistore/store.go b/pkg/storage/unified/apistore/store.go index 8d3da4b7c33..115443ad099 100644 --- a/pkg/storage/unified/apistore/store.go +++ b/pkg/storage/unified/apistore/store.go @@ -294,7 +294,7 @@ func (s *Storage) Watch(ctx context.Context, key string, opts storage.ListOption } reporter := apierrors.NewClientErrorReporter(500, "WATCH", "") - decoder := newStreamDecoder(client, s.newFunc, predicate, s.codec, cancelWatch) + decoder := newStreamDecoder(client, s.newFunc, predicate, s.codec, cancelWatch, s.opts.InternalConversion) return watch.NewStreamWatcher(decoder, reporter), nil } diff --git a/pkg/storage/unified/apistore/stream.go b/pkg/storage/unified/apistore/stream.go index 9546e3e8b64..e38e1a76d16 100644 --- a/pkg/storage/unified/apistore/stream.go +++ b/pkg/storage/unified/apistore/stream.go @@ -19,25 +19,33 @@ import ( ) type streamDecoder struct { - client resource.ResourceStore_WatchClient - newFunc func() runtime.Object - predicate storage.SelectionPredicate - codec runtime.Codec - cancelWatch context.CancelFunc - done sync.WaitGroup + client resource.ResourceStore_WatchClient + newFunc func() runtime.Object + predicate storage.SelectionPredicate + codec runtime.Codec + cancelWatch context.CancelFunc + done sync.WaitGroup + internalConversion func([]byte, runtime.Object) (runtime.Object, error) } -func newStreamDecoder(client resource.ResourceStore_WatchClient, newFunc func() runtime.Object, predicate storage.SelectionPredicate, codec runtime.Codec, cancelWatch context.CancelFunc) *streamDecoder { +func newStreamDecoder(client resource.ResourceStore_WatchClient, newFunc func() runtime.Object, predicate storage.SelectionPredicate, codec runtime.Codec, cancelWatch context.CancelFunc, internalConversion func([]byte, runtime.Object) (runtime.Object, error)) *streamDecoder { return &streamDecoder{ - client: client, - newFunc: newFunc, - predicate: predicate, - codec: codec, - cancelWatch: cancelWatch, + client: client, + newFunc: newFunc, + predicate: predicate, + codec: codec, + cancelWatch: cancelWatch, + internalConversion: internalConversion, } } func (d *streamDecoder) toObject(w *resource.WatchEvent_Resource) (runtime.Object, error) { - obj, _, err := d.codec.Decode(w.Value, nil, d.newFunc()) + var obj runtime.Object + var err error + if d.internalConversion != nil { + obj, err = d.internalConversion(w.Value, d.newFunc()) + } else { + obj, _, err = d.codec.Decode(w.Value, nil, d.newFunc()) + } if err == nil { accessor, err := utils.MetaAccessor(obj) if err != nil { diff --git a/pkg/storage/unified/apistore/stream_test.go b/pkg/storage/unified/apistore/stream_test.go new file mode 100644 index 00000000000..e707fbda5d8 --- /dev/null +++ b/pkg/storage/unified/apistore/stream_test.go @@ -0,0 +1,34 @@ +package apistore + +import ( + "testing" + + "github.com/grafana/grafana/pkg/storage/unified/resource" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" +) + +func TestStreamDecoder(t *testing.T) { + t.Run("toObject should handle internal conversion", func(t *testing.T) { + called := false + internalConversion := func(data []byte, obj runtime.Object) (runtime.Object, error) { + called = true + return obj, nil + } + + decoder := &streamDecoder{ + newFunc: func() runtime.Object { return &unstructured.Unstructured{} }, + internalConversion: internalConversion, + } + + event := &resource.WatchEvent_Resource{ + Value: []byte("test"), + } + + obj, err := decoder.toObject(event) + require.NoError(t, err) + require.NotNil(t, obj) + require.True(t, called, "internal conversion function should have been called") + }) +}