diff --git a/pkg/apiserver/rest/dualwriter_mode1.go b/pkg/apiserver/rest/dualwriter_mode1.go index 6fbb350a95d..3641039b77f 100644 --- a/pkg/apiserver/rest/dualwriter_mode1.go +++ b/pkg/apiserver/rest/dualwriter_mode1.go @@ -2,7 +2,10 @@ package rest import ( "context" + "errors" + "time" + "k8s.io/apimachinery/pkg/api/meta" metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -17,6 +20,10 @@ type DualWriterMode1 struct { *dualWriterMetrics } +const ( + mode1Str = "1" +) + // NewDualWriterMode1 returns a new DualWriter in mode 1. // Mode 1 represents writing to and reading from LegacyStorage. func NewDualWriterMode1(legacy LegacyStorage, storage Storage) *DualWriterMode1 { @@ -27,36 +34,195 @@ func NewDualWriterMode1(legacy LegacyStorage, storage Storage) *DualWriterMode1 // Create overrides the behavior of the generic DualWriter and writes only to LegacyStorage. func (d *DualWriterMode1) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) { - ctx = klog.NewContext(ctx, d.Log) - return d.Legacy.Create(ctx, obj, createValidation, options) + log := d.Log.WithValues("kind", options.Kind) + ctx = klog.NewContext(ctx, log) + var method = "create" + + startLegacy := time.Now() + res, err := d.Legacy.Create(ctx, obj, createValidation, options) + if err != nil { + log.Error(err, "unable to create object in legacy storage") + d.recordLegacyDuration(true, mode1Str, options.Kind, method, startLegacy) + return res, err + } + d.recordLegacyDuration(false, mode1Str, options.Kind, method, startLegacy) + + go func() { + accessorCreated, err := meta.Accessor(res) + if err != nil { + log.Error(err, "unable to get accessor for created object") + } + + accessorOld, err := meta.Accessor(obj) + if err != nil { + log.Error(err, "unable to get accessor for old object") + } + + enrichObject(accessorOld, accessorCreated) + startStorage := time.Now() + ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("storage create timeout")) + defer cancel() + _, errObjectSt := d.Storage.Create(ctx, obj, createValidation, options) + d.recordStorageDuration(errObjectSt != nil, mode1Str, options.Kind, method, startStorage) + }() + + return res, nil } // Get overrides the behavior of the generic DualWriter and reads only from LegacyStorage. func (d *DualWriterMode1) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { - ctx = klog.NewContext(ctx, d.Log) - return d.Legacy.Get(ctx, name, options) + log := d.Log.WithValues("name", name, "resourceVersion", options.ResourceVersion, "kind", options.Kind) + ctx = klog.NewContext(ctx, log) + var method = "get" + + startLegacy := time.Now() + res, errLegacy := d.Legacy.Get(ctx, name, options) + if errLegacy != nil { + log.Error(errLegacy, "unable to get object in legacy storage") + } + d.recordLegacyDuration(errLegacy != nil, mode1Str, options.Kind, method, startLegacy) + + go func() { + startStorage := time.Now() + ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("storage get timeout")) + defer cancel() + _, err := d.Storage.Get(ctx, name, options) + d.recordStorageDuration(err != nil, mode1Str, options.Kind, method, startStorage) + }() + + return res, errLegacy } // List overrides the behavior of the generic DualWriter and reads only from LegacyStorage. func (d *DualWriterMode1) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) { - ctx = klog.NewContext(ctx, d.Log) - return d.Legacy.List(ctx, options) + log := d.Log.WithValues("kind", options.Kind, "resourceVersion", options.ResourceVersion, "kind", options.Kind) + ctx = klog.NewContext(ctx, log) + var method = "list" + + startLegacy := time.Now() + res, errLegacy := d.Legacy.List(ctx, options) + if errLegacy != nil { + log.Error(errLegacy, "unable to list object in legacy storage") + } + d.recordLegacyDuration(errLegacy != nil, mode1Str, options.Kind, method, startLegacy) + + go func() { + startStorage := time.Now() + ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("storage list timeout")) + defer cancel() + _, err := d.Storage.List(ctx, options) + d.recordStorageDuration(err != nil, mode1Str, options.Kind, method, startStorage) + }() + + return res, errLegacy } func (d *DualWriterMode1) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) { + log := d.Log.WithValues("name", name, "kind", options.Kind) ctx = klog.NewContext(ctx, d.Log) - return d.Legacy.Delete(ctx, name, deleteValidation, options) + var method = "delete" + + startLegacy := time.Now() + res, async, err := d.Legacy.Delete(ctx, name, deleteValidation, options) + if err != nil { + log.Error(err, "unable to delete object in legacy storage") + d.recordLegacyDuration(true, mode1Str, options.Kind, method, startLegacy) + return res, async, err + } + d.recordLegacyDuration(false, mode1Str, options.Kind, method, startLegacy) + + go func() { + startStorage := time.Now() + ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("storage delete timeout")) + defer cancel() + _, _, err := d.Storage.Delete(ctx, name, deleteValidation, options) + d.recordStorageDuration(err != nil, mode1Str, options.Kind, method, startStorage) + }() + + return res, async, nil } // DeleteCollection overrides the behavior of the generic DualWriter and deletes only from LegacyStorage. func (d *DualWriterMode1) DeleteCollection(ctx context.Context, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) (runtime.Object, error) { - ctx = klog.NewContext(ctx, d.Log) - return d.Legacy.DeleteCollection(ctx, deleteValidation, options, listOptions) + log := d.Log.WithValues("kind", options.Kind, "resourceVersion", listOptions.ResourceVersion) + ctx = klog.NewContext(ctx, log) + var method = "delete-collection" + + startLegacy := time.Now() + res, err := d.Legacy.DeleteCollection(ctx, deleteValidation, options, listOptions) + if err != nil { + log.Error(err, "unable to delete collection in legacy storage") + d.recordLegacyDuration(true, mode1Str, options.Kind, method, startLegacy) + return res, err + } + d.recordLegacyDuration(false, mode1Str, options.Kind, method, startLegacy) + + go func() { + startStorage := time.Now() + ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("storage deletecollection timeout")) + defer cancel() + _, err := d.Storage.DeleteCollection(ctx, deleteValidation, options, listOptions) + d.recordStorageDuration(err != nil, mode1Str, options.Kind, method, startStorage) + }() + + return res, nil } func (d *DualWriterMode1) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) { - ctx = klog.NewContext(ctx, d.Log) - return d.Legacy.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options) + log := d.Log.WithValues("name", name, "kind", options.Kind) + ctx = klog.NewContext(ctx, log) + var method = "update" + + startLegacy := time.Now() + res, async, err := d.Legacy.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options) + if err != nil { + log.Error(err, "unable to update in legacy storage") + d.recordLegacyDuration(true, mode1Str, options.Kind, method, startLegacy) + return res, async, err + } + d.recordLegacyDuration(false, mode1Str, options.Kind, method, startLegacy) + + go func() { + updated, err := objInfo.UpdatedObject(ctx, res) + if err != nil { + log.WithValues("object", updated).Error(err, "could not update or create object") + } + + // get the object to be updated + foundObj, err := d.Storage.Get(ctx, name, &metav1.GetOptions{}) + if err != nil { + log.WithValues("object", foundObj).Error(err, "could not get object to update") + } + + // if the object is found, create a new updateWrapper with the object found + if foundObj != nil { + accessorOld, err := meta.Accessor(foundObj) + if err != nil { + log.Error(err, "unable to get accessor for original updated object") + } + + accessor, err := meta.Accessor(res) + if err != nil { + log.Error(err, "unable to get accessor for updated object") + } + + accessor.SetResourceVersion(accessorOld.GetResourceVersion()) + accessor.SetUID(accessorOld.GetUID()) + + enrichObject(accessorOld, accessor) + objInfo = &updateWrapper{ + upstream: objInfo, + updated: res, + } + } + startStorage := time.Now() + ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("storage update timeout")) + defer cancel() + _, _, errObjectSt := d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options) + d.recordStorageDuration(errObjectSt != nil, mode1Str, options.Kind, method, startStorage) + }() + + return res, async, nil } func (d *DualWriterMode1) Destroy() { diff --git a/pkg/apiserver/rest/dualwriter_mode1_test.go b/pkg/apiserver/rest/dualwriter_mode1_test.go index 0f7b5378add..5a3ad831e81 100644 --- a/pkg/apiserver/rest/dualwriter_mode1_test.go +++ b/pkg/apiserver/rest/dualwriter_mode1_test.go @@ -13,11 +13,11 @@ import ( "k8s.io/apiserver/pkg/apis/example" ) -var exampleObj = &example.Pod{TypeMeta: metav1.TypeMeta{}, ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1"}, Spec: example.PodSpec{}, Status: example.PodStatus{}} -var exampleObjDifferentRV = &example.Pod{TypeMeta: metav1.TypeMeta{}, ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "3"}, Spec: example.PodSpec{}, Status: example.PodStatus{}} -var anotherObj = &example.Pod{TypeMeta: metav1.TypeMeta{}, ObjectMeta: metav1.ObjectMeta{Name: "bar", ResourceVersion: "2"}, Spec: example.PodSpec{}, Status: example.PodStatus{}} -var failingObj = &example.Pod{TypeMeta: metav1.TypeMeta{}, ObjectMeta: metav1.ObjectMeta{Name: "object-fail", ResourceVersion: "2"}, Spec: example.PodSpec{}, Status: example.PodStatus{}} -var exampleList = &example.PodList{TypeMeta: metav1.TypeMeta{}, ListMeta: metav1.ListMeta{}, Items: []example.Pod{*exampleObj}} +var exampleObj = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1"}, Spec: example.PodSpec{}, Status: example.PodStatus{}} +var exampleObjDifferentRV = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "3"}, Spec: example.PodSpec{}, Status: example.PodStatus{}} +var anotherObj = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "bar", ResourceVersion: "2"}, Spec: example.PodSpec{}, Status: example.PodStatus{}} +var failingObj = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "object-fail", ResourceVersion: "2"}, Spec: example.PodSpec{}, Status: example.PodStatus{}} +var exampleList = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{}, Items: []example.Pod{*exampleObj}} var anotherList = &example.PodList{Items: []example.Pod{*anotherObj}} func TestMode1_Create(t *testing.T) { diff --git a/pkg/apiserver/rest/dualwriter_mode2.go b/pkg/apiserver/rest/dualwriter_mode2.go index 6be6797ec16..84be2c08e97 100644 --- a/pkg/apiserver/rest/dualwriter_mode2.go +++ b/pkg/apiserver/rest/dualwriter_mode2.go @@ -32,11 +32,12 @@ func NewDualWriterMode2(legacy LegacyStorage, storage Storage) *DualWriterMode2 // Create overrides the behavior of the generic DualWriter and writes to LegacyStorage and Storage. func (d *DualWriterMode2) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) { - ctx = klog.NewContext(ctx, d.Log) + log := d.Log.WithValues("kind", options.Kind) + ctx = klog.NewContext(ctx, log) created, err := d.Legacy.Create(ctx, obj, createValidation, options) if err != nil { - d.Log.Error(err, "unable to create object in legacy storage") + log.Error(err, "unable to create object in legacy storage") return created, err } @@ -58,7 +59,7 @@ func (d *DualWriterMode2) Create(ctx context.Context, obj runtime.Object, create rsp, err := d.Storage.Create(ctx, created, createValidation, options) if err != nil { - d.Log.WithValues("name", accessorCreated.GetName(), "resourceVersion", accessorCreated.GetResourceVersion()).Error(err, "unable to create object in duplicate storage") + log.WithValues("name", accessorCreated.GetName(), "resourceVersion", accessorCreated.GetResourceVersion()).Error(err, "unable to create object in storage") } return rsp, err } @@ -66,7 +67,7 @@ func (d *DualWriterMode2) Create(ctx context.Context, obj runtime.Object, create // Get overrides the behavior of the generic DualWriter. // It retrieves an object from Storage if possible, and if not it falls back to LegacyStorage. func (d *DualWriterMode2) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { - log := d.Log.WithValues("name", name, "resourceVersion", options.ResourceVersion) + log := d.Log.WithValues("name", name, "resourceVersion", options.ResourceVersion, "kind", options.Kind) ctx = klog.NewContext(ctx, log) s, err := d.Storage.Get(ctx, name, &metav1.GetOptions{}) if err != nil { @@ -83,7 +84,7 @@ func (d *DualWriterMode2) Get(ctx context.Context, name string, options *metav1. // List overrides the behavior of the generic DualWriter. // It returns Storage entries if possible and falls back to LegacyStorage entries if not. func (d *DualWriterMode2) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) { - log := d.Log.WithValues("kind", options.Kind, "resourceVersion", options.ResourceVersion) + log := d.Log.WithValues("kind", options.Kind, "resourceVersion", options.ResourceVersion, "kind", options.Kind) ctx = klog.NewContext(ctx, log) ll, err := d.Legacy.List(ctx, options) if err != nil { @@ -167,7 +168,7 @@ func (d *DualWriterMode2) DeleteCollection(ctx context.Context, deleteValidation } func (d *DualWriterMode2) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) { - log := d.Log.WithValues("name", name) + log := d.Log.WithValues("name", name, "kind", options.Kind) ctx = klog.NewContext(ctx, log) deletedLS, async, err := d.Legacy.Delete(ctx, name, deleteValidation, options) @@ -190,23 +191,21 @@ func (d *DualWriterMode2) Delete(ctx context.Context, name string, deleteValidat // Update overrides the generic behavior of the Storage and writes first to the legacy storage and then to storage. func (d *DualWriterMode2) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) { - var notFound bool - log := d.Log.WithValues("name", name) + log := d.Log.WithValues("name", name, "kind", options.Kind) ctx = klog.NewContext(ctx, log) - // get old and new (updated) object so they can be stored in legacy store - old, err := d.Storage.Get(ctx, name, &metav1.GetOptions{}) + // get foundObj and new (updated) object so they can be stored in legacy store + foundObj, err := d.Storage.Get(ctx, name, &metav1.GetOptions{}) if err != nil { if !apierrors.IsNotFound(err) { - log.WithValues("object", old).Error(err, "could not get object to update") + log.WithValues("object", foundObj).Error(err, "could not get object to update") return nil, false, err } - notFound = true log.Info("object not found for update, creating one") } // obj can be populated in case it's found or empty in case it's not found - updated, err := objInfo.UpdatedObject(ctx, old) + updated, err := objInfo.UpdatedObject(ctx, foundObj) if err != nil { log.WithValues("object", updated).Error(err, "could not update or create object") return nil, false, err @@ -218,31 +217,28 @@ func (d *DualWriterMode2) Update(ctx context.Context, name string, objInfo rest. return obj, created, err } - if notFound { - return d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options) + // if the object is found, create a new updateWrapper with the object found + if foundObj != nil { + accessorOld, err := meta.Accessor(foundObj) + if err != nil { + log.Error(err, "unable to get accessor for original updated object") + } + + accessor, err := meta.Accessor(obj) + if err != nil { + log.Error(err, "unable to get accessor for updated object") + } + + enrichObject(accessorOld, accessor) + + accessor.SetResourceVersion(accessorOld.GetResourceVersion()) + accessor.SetUID(accessorOld.GetUID()) + + objInfo = &updateWrapper{ + upstream: objInfo, + updated: obj, + } } - - accessor, err := meta.Accessor(obj) - if err != nil { - return nil, false, err - } - - // only if object exists - accessorOld, err := meta.Accessor(old) - if err != nil { - return nil, false, err - } - - enrichObject(accessorOld, accessor) - - // keep the same UID and resource_version - accessor.SetResourceVersion(accessorOld.GetResourceVersion()) - accessor.SetUID(accessorOld.GetUID()) - objInfo = &updateWrapper{ - upstream: objInfo, - updated: obj, - } - // TODO: relies on GuaranteedUpdate creating the object if // it doesn't exist: https://github.com/grafana/grafana/pull/85206 return d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options) @@ -309,6 +305,9 @@ func enrichObject(accessorO, accessorC metav1.Object) { accessorC.SetLabels(accessorO.GetLabels()) ac := accessorC.GetAnnotations() + if ac == nil { + ac = map[string]string{} + } for k, v := range accessorO.GetAnnotations() { ac[k] = v } diff --git a/pkg/apiserver/rest/dualwriter_mode2_test.go b/pkg/apiserver/rest/dualwriter_mode2_test.go index 845b9586e9b..cbc41a38701 100644 --- a/pkg/apiserver/rest/dualwriter_mode2_test.go +++ b/pkg/apiserver/rest/dualwriter_mode2_test.go @@ -424,8 +424,8 @@ func TestMode2_Update(t *testing.T) { wantErr: true, }, { - name: "error updating storage", - input: "object-fail", + name: "error updating storage with not found object", + input: "not-found", setupLegacyFn: func(m *mock.Mock, input string) { m.On("Update", mock.Anything, input, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil) }, @@ -433,7 +433,7 @@ func TestMode2_Update(t *testing.T) { m.On("Update", mock.Anything, input, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, false, errors.New("error")) }, setupGetFn: func(m *mock.Mock, input string) { - m.On("Get", mock.Anything, input, mock.Anything).Return(exampleObj, nil) + m.On("Get", mock.Anything, input, mock.Anything).Return(nil, errors.New("")) }, wantErr: true, }, diff --git a/pkg/apiserver/rest/metrics.go b/pkg/apiserver/rest/metrics.go index 36706efeec7..a4e72457a8f 100644 --- a/pkg/apiserver/rest/metrics.go +++ b/pkg/apiserver/rest/metrics.go @@ -1,6 +1,11 @@ package rest -import "github.com/prometheus/client_golang/prometheus" +import ( + "strconv" + "time" + + "github.com/prometheus/client_golang/prometheus" +) type dualWriterMetrics struct { legacy *prometheus.HistogramVec @@ -14,7 +19,7 @@ var DualWriterStorageDuration = prometheus.NewHistogramVec(prometheus.HistogramO Help: "Histogram for the runtime of dual writer storage duration per mode", Namespace: "grafana", NativeHistogramBucketFactor: 1.1, -}, []string{"status_code", "mode", "name", "method"}) +}, []string{"is_error", "mode", "kind", "method"}) // DualWriterLegacyDuration is a metric summary for dual writer legacy duration per mode var DualWriterLegacyDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ @@ -22,7 +27,7 @@ var DualWriterLegacyDuration = prometheus.NewHistogramVec(prometheus.HistogramOp Help: "Histogram for the runtime of dual writer legacy duration per mode", Namespace: "grafana", NativeHistogramBucketFactor: 1.1, -}, []string{"status_code", "mode", "name", "method"}) +}, []string{"is_error", "mode", "kind", "method"}) // DualWriterOutcome is a metric summary for dual writer outcome comparison between the 2 stores per mode var DualWriterOutcome = prometheus.NewHistogramVec(prometheus.HistogramOpts{ @@ -30,7 +35,7 @@ var DualWriterOutcome = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Help: "Histogram for the runtime of dual writer outcome comparison between the 2 stores per mode", Namespace: "grafana", NativeHistogramBucketFactor: 1.1, -}, []string{"mode", "name", "outcome", "method"}) +}, []string{"mode", "name", "method"}) func (m *dualWriterMetrics) init() { m.legacy = DualWriterLegacyDuration @@ -38,17 +43,21 @@ func (m *dualWriterMetrics) init() { m.outcome = DualWriterOutcome } -// nolint:unused -func (m *dualWriterMetrics) recordLegacyDuration(statusCode string, mode string, name string, method string, duration float64) { - m.legacy.WithLabelValues(statusCode, mode, name, method).Observe(duration) +func (m *dualWriterMetrics) recordLegacyDuration(isError bool, mode string, name string, method string, startFrom time.Time) { + duration := time.Since(startFrom).Seconds() + m.legacy.WithLabelValues(strconv.FormatBool(isError), mode, name, method).Observe(duration) +} + +func (m *dualWriterMetrics) recordStorageDuration(isError bool, mode string, name string, method string, startFrom time.Time) { + duration := time.Since(startFrom).Seconds() + m.storage.WithLabelValues(strconv.FormatBool(isError), mode, name, method).Observe(duration) } // nolint:unused -func (m *dualWriterMetrics) recordStorageDuration(statusCode string, mode string, name string, method string, duration float64) { - m.storage.WithLabelValues(statusCode, mode, name, method).Observe(duration) -} - -// nolint:unused -func (m *dualWriterMetrics) recordOutcome(mode string, name string, outcome string, method string) { - m.outcome.WithLabelValues(mode, name, outcome, method).Observe(1) +func (m *dualWriterMetrics) recordOutcome(mode string, name string, outcome bool, method string) { + var observeValue float64 + if outcome { + observeValue = 1 + } + m.outcome.WithLabelValues(mode, name, method).Observe(observeValue) }