mirror of
https://github.com/grafana/grafana.git
synced 2025-08-01 05:01:50 +08:00
unistore: fix dashboard conversion events (#101053)
fix dashboard conversion events
This commit is contained in:
@ -294,7 +294,7 @@ func (s *Storage) Watch(ctx context.Context, key string, opts storage.ListOption
|
||||
}
|
||||
|
||||
reporter := apierrors.NewClientErrorReporter(500, "WATCH", "")
|
||||
decoder := newStreamDecoder(client, s.newFunc, predicate, s.codec, cancelWatch)
|
||||
decoder := newStreamDecoder(client, s.newFunc, predicate, s.codec, cancelWatch, s.opts.InternalConversion)
|
||||
|
||||
return watch.NewStreamWatcher(decoder, reporter), nil
|
||||
}
|
||||
|
@ -19,25 +19,33 @@ import (
|
||||
)
|
||||
|
||||
type streamDecoder struct {
|
||||
client resource.ResourceStore_WatchClient
|
||||
newFunc func() runtime.Object
|
||||
predicate storage.SelectionPredicate
|
||||
codec runtime.Codec
|
||||
cancelWatch context.CancelFunc
|
||||
done sync.WaitGroup
|
||||
client resource.ResourceStore_WatchClient
|
||||
newFunc func() runtime.Object
|
||||
predicate storage.SelectionPredicate
|
||||
codec runtime.Codec
|
||||
cancelWatch context.CancelFunc
|
||||
done sync.WaitGroup
|
||||
internalConversion func([]byte, runtime.Object) (runtime.Object, error)
|
||||
}
|
||||
|
||||
func newStreamDecoder(client resource.ResourceStore_WatchClient, newFunc func() runtime.Object, predicate storage.SelectionPredicate, codec runtime.Codec, cancelWatch context.CancelFunc) *streamDecoder {
|
||||
func newStreamDecoder(client resource.ResourceStore_WatchClient, newFunc func() runtime.Object, predicate storage.SelectionPredicate, codec runtime.Codec, cancelWatch context.CancelFunc, internalConversion func([]byte, runtime.Object) (runtime.Object, error)) *streamDecoder {
|
||||
return &streamDecoder{
|
||||
client: client,
|
||||
newFunc: newFunc,
|
||||
predicate: predicate,
|
||||
codec: codec,
|
||||
cancelWatch: cancelWatch,
|
||||
client: client,
|
||||
newFunc: newFunc,
|
||||
predicate: predicate,
|
||||
codec: codec,
|
||||
cancelWatch: cancelWatch,
|
||||
internalConversion: internalConversion,
|
||||
}
|
||||
}
|
||||
func (d *streamDecoder) toObject(w *resource.WatchEvent_Resource) (runtime.Object, error) {
|
||||
obj, _, err := d.codec.Decode(w.Value, nil, d.newFunc())
|
||||
var obj runtime.Object
|
||||
var err error
|
||||
if d.internalConversion != nil {
|
||||
obj, err = d.internalConversion(w.Value, d.newFunc())
|
||||
} else {
|
||||
obj, _, err = d.codec.Decode(w.Value, nil, d.newFunc())
|
||||
}
|
||||
if err == nil {
|
||||
accessor, err := utils.MetaAccessor(obj)
|
||||
if err != nil {
|
||||
|
34
pkg/storage/unified/apistore/stream_test.go
Normal file
34
pkg/storage/unified/apistore/stream_test.go
Normal file
@ -0,0 +1,34 @@
|
||||
package apistore
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||
"github.com/stretchr/testify/require"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
)
|
||||
|
||||
func TestStreamDecoder(t *testing.T) {
|
||||
t.Run("toObject should handle internal conversion", func(t *testing.T) {
|
||||
called := false
|
||||
internalConversion := func(data []byte, obj runtime.Object) (runtime.Object, error) {
|
||||
called = true
|
||||
return obj, nil
|
||||
}
|
||||
|
||||
decoder := &streamDecoder{
|
||||
newFunc: func() runtime.Object { return &unstructured.Unstructured{} },
|
||||
internalConversion: internalConversion,
|
||||
}
|
||||
|
||||
event := &resource.WatchEvent_Resource{
|
||||
Value: []byte("test"),
|
||||
}
|
||||
|
||||
obj, err := decoder.toObject(event)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, obj)
|
||||
require.True(t, called, "internal conversion function should have been called")
|
||||
})
|
||||
}
|
Reference in New Issue
Block a user