mirror of
https://github.com/grafana/grafana.git
synced 2025-08-01 04:41:50 +08:00
DualWriter: remove RV+UID for secondary update (#105543)
This commit is contained in:
@ -24,16 +24,6 @@ var (
|
|||||||
_ rest.SingularNameProvider = (DualWriter)(nil)
|
_ rest.SingularNameProvider = (DualWriter)(nil)
|
||||||
)
|
)
|
||||||
|
|
||||||
type dualWriteContextKey struct{}
|
|
||||||
|
|
||||||
func IsDualWriteUpdate(ctx context.Context) bool {
|
|
||||||
return ctx.Value(dualWriteContextKey{}) == true
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithDualWriteUpdate(ctx context.Context) context.Context {
|
|
||||||
return context.WithValue(ctx, dualWriteContextKey{}, true)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Function that will create a dual writer
|
// Function that will create a dual writer
|
||||||
type DualWriteBuilder func(gr schema.GroupResource, legacy Storage, unified Storage) (Storage, error)
|
type DualWriteBuilder func(gr schema.GroupResource, legacy Storage, unified Storage) (Storage, error)
|
||||||
|
|
||||||
|
@ -13,7 +13,7 @@ import (
|
|||||||
"k8s.io/apiserver/pkg/registry/rest"
|
"k8s.io/apiserver/pkg/registry/rest"
|
||||||
|
|
||||||
"github.com/grafana/grafana-app-sdk/logging"
|
"github.com/grafana/grafana-app-sdk/logging"
|
||||||
|
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||||||
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
|
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -202,36 +202,44 @@ func (d *dualWriter) Delete(ctx context.Context, name string, deleteValidation r
|
|||||||
func (d *dualWriter) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
|
func (d *dualWriter) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
|
||||||
log := d.log.With("method", "Update").WithContext(ctx)
|
log := d.log.With("method", "Update").WithContext(ctx)
|
||||||
|
|
||||||
// The incoming RV is not stable -- it may be from legacy or storage!
|
|
||||||
// This sets a flag in the context and our apistore is more lenient when it exists
|
|
||||||
ctx = grafanarest.WithDualWriteUpdate(ctx)
|
|
||||||
|
|
||||||
// update in legacy first, and then unistore. Will return a failure if either fails.
|
// update in legacy first, and then unistore. Will return a failure if either fails.
|
||||||
//
|
//
|
||||||
// we want to update in legacy first, otherwise if the update from unistore was successful,
|
// we want to update in legacy first, otherwise if the update from unistore was successful,
|
||||||
// but legacy failed, the user would get a failure, but see the update did apply to the source
|
// but legacy failed, the user would get a failure, but see the update did apply to the source
|
||||||
// of truth, and be less likely to retry to save (and get the stores in sync again)
|
// of truth, and be less likely to retry to save (and get the stores in sync again)
|
||||||
|
|
||||||
objFromLegacy, createdLegacy, err := d.legacy.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
|
legacyInfo := objInfo
|
||||||
|
legacyForceCreate := forceAllowCreate
|
||||||
|
unifiedInfo := objInfo
|
||||||
|
unifiedForceCreate := forceAllowCreate
|
||||||
|
if d.readUnified {
|
||||||
|
legacyInfo = &wrappedUpdateInfo{objInfo}
|
||||||
|
legacyForceCreate = true
|
||||||
|
} else {
|
||||||
|
unifiedInfo = &wrappedUpdateInfo{objInfo}
|
||||||
|
unifiedForceCreate = true
|
||||||
|
}
|
||||||
|
|
||||||
|
objFromLegacy, createdLegacy, err := d.legacy.Update(ctx, name, legacyInfo, createValidation, updateValidation, legacyForceCreate, options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.With("object", objFromLegacy).Error("could not update in legacy storage", "err", err)
|
log.With("object", objFromLegacy).Error("could not update in legacy storage", "err", err)
|
||||||
return nil, false, err
|
return nil, false, err
|
||||||
}
|
}
|
||||||
// If unified storage is our primary store, just update it there and return.
|
|
||||||
if d.readUnified {
|
if d.readUnified {
|
||||||
return d.unified.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
|
return d.unified.Update(ctx, name, unifiedInfo, createValidation, updateValidation, unifiedForceCreate, options)
|
||||||
} else if d.errorIsOK {
|
} else if d.errorIsOK {
|
||||||
// If unified is not primary, but errors are okay, we can just run in the background.
|
// If unified is not primary, but errors are okay, we can just run in the background.
|
||||||
go func(ctxBg context.Context, cancel context.CancelFunc) {
|
go func(ctxBg context.Context, cancel context.CancelFunc) {
|
||||||
defer cancel()
|
defer cancel()
|
||||||
if _, _, err := d.unified.Update(ctxBg, name, objInfo, createValidation, updateValidation, forceAllowCreate, options); err != nil {
|
if _, _, err := d.unified.Update(ctxBg, name, unifiedInfo, createValidation, updateValidation, unifiedForceCreate, options); err != nil {
|
||||||
log.Error("failed background UPDATE to unified storage", "err", err)
|
log.Error("failed background UPDATE to unified storage", "err", err)
|
||||||
}
|
}
|
||||||
}(context.WithTimeout(context.WithoutCancel(ctx), backgroundReqTimeout))
|
}(context.WithTimeout(context.WithoutCancel(ctx), backgroundReqTimeout))
|
||||||
return objFromLegacy, createdLegacy, nil
|
return objFromLegacy, createdLegacy, nil
|
||||||
}
|
}
|
||||||
// If we want to check unified errors just run it in foreground.
|
// If we want to check unified errors just run it in foreground.
|
||||||
if _, _, err := d.unified.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options); err != nil {
|
if _, _, err := d.unified.Update(ctx, name, unifiedInfo, createValidation, updateValidation, unifiedForceCreate, options); err != nil {
|
||||||
return nil, false, err
|
return nil, false, err
|
||||||
}
|
}
|
||||||
return objFromLegacy, createdLegacy, nil
|
return objFromLegacy, createdLegacy, nil
|
||||||
@ -298,3 +306,27 @@ func (d *dualWriter) NewList() runtime.Object {
|
|||||||
func (d *dualWriter) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) {
|
func (d *dualWriter) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) {
|
||||||
return d.unified.ConvertToTable(ctx, object, tableOptions)
|
return d.unified.ConvertToTable(ctx, object, tableOptions)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type wrappedUpdateInfo struct {
|
||||||
|
objInfo rest.UpdatedObjectInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
// Preconditions implements rest.UpdatedObjectInfo.
|
||||||
|
func (w *wrappedUpdateInfo) Preconditions() *metav1.Preconditions {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpdatedObject implements rest.UpdatedObjectInfo.
|
||||||
|
func (w *wrappedUpdateInfo) UpdatedObject(ctx context.Context, oldObj runtime.Object) (newObj runtime.Object, err error) {
|
||||||
|
obj, err := w.objInfo.UpdatedObject(ctx, oldObj)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
meta, err := utils.MetaAccessor(obj)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
meta.SetResourceVersion("")
|
||||||
|
meta.SetUID("")
|
||||||
|
return obj, err
|
||||||
|
}
|
||||||
|
@ -35,7 +35,6 @@ import (
|
|||||||
|
|
||||||
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||||||
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
|
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
|
||||||
"github.com/grafana/grafana/pkg/apiserver/rest"
|
|
||||||
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||||
)
|
)
|
||||||
@ -557,16 +556,11 @@ func (s *Storage) GuaranteedUpdate(
|
|||||||
existing.SetResourceVersionInt64(readResponse.ResourceVersion)
|
existing.SetResourceVersionInt64(readResponse.ResourceVersion)
|
||||||
res.ResourceVersion = uint64(readResponse.ResourceVersion)
|
res.ResourceVersion = uint64(readResponse.ResourceVersion)
|
||||||
|
|
||||||
if rest.IsDualWriteUpdate(ctx) {
|
if err := preconditions.Check(key, existingObj); err != nil {
|
||||||
// Ignore the RV when updating legacy values
|
if attempt >= MaxUpdateAttempts {
|
||||||
existing.SetResourceVersion("")
|
return fmt.Errorf("precondition failed: %w", err)
|
||||||
} else {
|
|
||||||
if err := preconditions.Check(key, existingObj); err != nil {
|
|
||||||
if attempt >= MaxUpdateAttempts {
|
|
||||||
return fmt.Errorf("precondition failed: %w", err)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// restore the full original object before tryUpdate
|
// restore the full original object before tryUpdate
|
||||||
|
Reference in New Issue
Block a user