Files
2025-06-10 15:33:14 +02:00

160 lines
4.9 KiB
Go

package dualwrite
import (
"context"
"net/http"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/registry/rest"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
)
// NewStorage picks the storage implementation for the given group/resource
// based on its current dual-write status.
//
// The status is looked up once, using a background context. The result is:
//   - a runtimeDualWriter when the service is enabled and the status asks for
//     runtime behavior, so read/write targets are resolved per request;
//   - a dualWriter that reads from unified when the status reads unified and
//     still writes legacy;
//   - the unified storage itself when the status reads unified and no longer
//     writes legacy;
//   - a dualWriter that reads from legacy when the status writes unified but
//     does not read from it;
//   - the legacy storage itself otherwise.
//
// An error from the status lookup is returned as-is with a nil storage.
func (m *service) NewStorage(gr schema.GroupResource, legacy grafanarest.Storage, unified grafanarest.Storage) (grafanarest.Storage, error) {
	status, err := m.Status(context.Background(), gr)
	if err != nil {
		return nil, err
	}

	switch {
	case m.enabled && status.Runtime:
		// Dynamic storage behavior
		return &runtimeDualWriter{
			service:   m,
			legacy:    legacy,
			unified:   unified,
			dualwrite: &dualWriter{legacy: legacy, unified: unified}, // not used for read
			gr:        gr,
		}, nil

	case status.ReadUnified && status.WriteLegacy:
		// Write both, read unified
		return &dualWriter{legacy: legacy, unified: unified, readUnified: true}, nil

	case status.ReadUnified:
		// Unified only
		return unified, nil

	case status.WriteUnified:
		// Write both, read legacy
		return &dualWriter{legacy: legacy, unified: unified}, nil

	default:
		// Legacy only
		return legacy, nil
	}
}
// runtimeDualWriter is a storage that implements the various modes described
// as mode:1/2/3/4/5, except that the active behavior is resolved from the
// dual-write service on each request, so it can be reconfigured at runtime
// rather than only at startup.
//
// While a resource is marked as "migrating", every write request is rejected
// with 503 Service Unavailable.
type runtimeDualWriter struct {
	// service reports the current read/write status for gr.
	service Service
	// legacy is the legacy storage backend.
	legacy grafanarest.Storage
	// unified is the unified storage backend.
	unified grafanarest.Storage
	// dualwrite is the writer returned when the status requires writing to
	// both backends; it is not used for reads.
	dualwrite *dualWriter
	// gr identifies the group/resource whose status is consulted.
	gr schema.GroupResource
}
// Get fetches a single object by name from the backend currently designated
// for reads: unified when the service reports ReadFromUnified for this
// resource, legacy otherwise. A failure to resolve the read target is
// returned as-is with a nil object.
func (d *runtimeDualWriter) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
	readUnified, err := d.service.ReadFromUnified(ctx, d.gr)
	if err != nil {
		return nil, err
	}

	reader := d.legacy
	if readUnified {
		reader = d.unified
	}
	return reader.Get(ctx, name, options)
}
// List returns the objects matching options from the backend currently
// designated for reads: unified when the service reports ReadFromUnified for
// this resource, legacy otherwise. A failure to resolve the read target is
// returned as-is with a nil result.
func (d *runtimeDualWriter) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
	readUnified, err := d.service.ReadFromUnified(ctx, d.gr)
	if err != nil {
		return nil, err
	}

	reader := d.legacy
	if readUnified {
		reader = d.unified
	}
	return reader.List(ctx, options)
}
// getWriter resolves which storage should receive a write right now, based on
// the resource's current status.
//
// It returns:
//   - the status lookup error, unchanged, if the status cannot be read;
//   - a 503 Service Unavailable StatusError while status.Migrating is positive;
//   - the dual writer when both legacy and unified writes are requested;
//   - the legacy storage when only legacy writes are requested;
//   - the unified storage in every other case.
func (d *runtimeDualWriter) getWriter(ctx context.Context) (grafanarest.Storage, error) {
	status, err := d.service.Status(ctx, d.gr)
	if err != nil {
		return nil, err
	}

	switch {
	case status.Migrating > 0:
		// Writes are refused for as long as the resource is migrating.
		unavailable := metav1.Status{
			Code:    http.StatusServiceUnavailable,
			Message: "the system is migrating",
		}
		return nil, &apierrors.StatusError{ErrStatus: unavailable}

	case status.WriteLegacy && status.WriteUnified:
		return d.dualwrite, nil

	case status.WriteLegacy:
		return d.legacy, nil // only write legacy (mode0)

	default:
		return d.unified, nil // only write unified (mode4)
	}
}
// Create forwards the create request, with all arguments unchanged, to the
// writer that getWriter selects for the current status. If no writer can be
// selected (status lookup failed or the resource is migrating), that error is
// returned with a nil object and nothing is written.
func (d *runtimeDualWriter) Create(ctx context.Context, in runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
	target, err := d.getWriter(ctx)
	if err != nil {
		return nil, err
	}

	return target.Create(ctx, in, createValidation, options)
}
// Update forwards the update request, with all arguments unchanged, to the
// writer that getWriter selects for the current status. If no writer can be
// selected (status lookup failed or the resource is migrating), that error is
// returned with a nil object and false, and nothing is written.
func (d *runtimeDualWriter) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
	target, err := d.getWriter(ctx)
	if err != nil {
		return nil, false, err
	}

	return target.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
}
// Delete forwards the delete request, with all arguments unchanged, to the
// writer that getWriter selects for the current status. If no writer can be
// selected (status lookup failed or the resource is migrating), that error is
// returned with a nil object and false, and nothing is deleted.
func (d *runtimeDualWriter) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
	target, err := d.getWriter(ctx)
	if err != nil {
		return nil, false, err
	}

	return target.Delete(ctx, name, deleteValidation, options)
}
// DeleteCollection forwards the collection delete, with all arguments
// unchanged, to the writer that getWriter selects for the current status:
// the dual writer, the legacy storage, or the unified storage. If no writer
// can be selected (status lookup failed or the resource is migrating), that
// error is returned with a nil object and nothing is deleted.
func (d *runtimeDualWriter) DeleteCollection(ctx context.Context, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) (runtime.Object, error) {
	target, err := d.getWriter(ctx)
	if err != nil {
		return nil, err
	}

	return target.DeleteCollection(ctx, deleteValidation, options, listOptions)
}
// Destroy releases resources by delegating to the wrapped dual writer.
// NOTE(review): presumably the dual writer tears down both the legacy and
// unified storages it wraps; confirm against dualWriter.Destroy.
func (d *runtimeDualWriter) Destroy() {
	d.dualwrite.Destroy()
}
// GetSingularName returns the singular resource name as reported by the
// unified storage.
func (d *runtimeDualWriter) GetSingularName() string {
	return d.unified.GetSingularName()
}
// NamespaceScoped reports whether the resource is namespaced, as answered by
// the unified storage.
func (d *runtimeDualWriter) NamespaceScoped() bool {
	return d.unified.NamespaceScoped()
}
// New returns a new object instance as produced by the unified storage.
func (d *runtimeDualWriter) New() runtime.Object {
	return d.unified.New()
}
// NewList returns a new list object as produced by the unified storage.
func (d *runtimeDualWriter) NewList() runtime.Object {
	return d.unified.NewList()
}
// ConvertToTable renders object as a table by delegating to the unified
// storage, regardless of which backend currently serves reads.
func (d *runtimeDualWriter) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) {
	return d.unified.ConvertToTable(ctx, object, tableOptions)
}