mirror of
https://github.com/grafana/grafana.git
synced 2025-07-29 15:02:28 +08:00

Create DualWriterMetrics once, and pass them around. Don't use prometheus.DefaultRegisterer.
213 lines
7.0 KiB
Go
213 lines
7.0 KiB
Go
package rest
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
"k8s.io/apiserver/pkg/registry/rest"
|
|
"k8s.io/klog/v2"
|
|
)
|
|
|
|
var (
|
|
_ rest.Storage = (DualWriter)(nil)
|
|
_ rest.Scoper = (DualWriter)(nil)
|
|
_ rest.TableConvertor = (DualWriter)(nil)
|
|
_ rest.CreaterUpdater = (DualWriter)(nil)
|
|
_ rest.CollectionDeleter = (DualWriter)(nil)
|
|
_ rest.GracefulDeleter = (DualWriter)(nil)
|
|
_ rest.SingularNameProvider = (DualWriter)(nil)
|
|
)
|
|
|
|
// Function that will create a dual writer
|
|
type DualWriteBuilder func(gr schema.GroupResource, legacy Storage, unified Storage) (Storage, error)
|
|
|
|
// Storage is a storage implementation that satisfies the same interfaces as genericregistry.Store.
|
|
type Storage interface {
|
|
rest.Storage
|
|
rest.Scoper
|
|
rest.TableConvertor
|
|
rest.SingularNameProvider
|
|
rest.Getter
|
|
rest.Lister
|
|
rest.CreaterUpdater
|
|
rest.GracefulDeleter
|
|
rest.CollectionDeleter
|
|
}
|
|
|
|
// DualWriter is a storage implementation that writes first to LegacyStorage and then to Storage.
|
|
// If writing to LegacyStorage fails, the write to Storage is skipped and the error is returned.
|
|
// Storage is used for all read operations. This is useful as a migration step from SQL based
|
|
// legacy storage to a more standard kubernetes backed storage interface.
|
|
//
|
|
// NOTE: Only values supported by legacy storage will be preserved in the CREATE/UPDATE commands.
|
|
// For example, annotations, labels, and managed fields may not be preserved. Everything in upstream
|
|
// storage can be recrated from the data in legacy storage.
|
|
//
|
|
// The LegacyStorage implementation must implement the following interfaces:
|
|
// - rest.Storage
|
|
// - rest.TableConvertor
|
|
// - rest.Scoper
|
|
// - rest.SingularNameProvider
|
|
//
|
|
// These interfaces are optional, but they all should be implemented to fully support dual writes:
|
|
// - rest.Creater
|
|
// - rest.Updater
|
|
// - rest.GracefulDeleter
|
|
// - rest.CollectionDeleter
|
|
|
|
type DualWriter interface {
|
|
Storage
|
|
Mode() DualWriterMode
|
|
}
|
|
|
|
// DualWriterMode selects which of LegacyStorage and Storage are written to
// and read from.
type DualWriterMode int

// The numeric values are spelled out because SetDualWritingMode stores them as
// the strings "1" through "5" in the key/value store.
const (
	// Mode0 represents writing to and reading from solely LegacyStorage. This mode is enabled when the
	// Unified Storage is disabled. All reads and writes are made to LegacyStorage. None are made to Storage.
	Mode0 DualWriterMode = 0

	// Mode1 represents writing to and reading from LegacyStorage for all primary functionality while
	// additionally reading and writing to Storage on a best effort basis for the sake of collecting metrics.
	Mode1 DualWriterMode = 1

	// Mode2 is the dual writing mode that represents writing to LegacyStorage and Storage and reading
	// from LegacyStorage. The objects written to storage will include any labels and annotations.
	// When reading values, the results will be from LegacyStorage.
	Mode2 DualWriterMode = 2

	// Mode3 represents writing to LegacyStorage and Storage and reading from Storage.
	// NOTE: Requesting mode3 will only take effect after a background sync job succeeds.
	Mode3 DualWriterMode = 3

	// Mode4 represents writing to and reading from Storage.
	// NOTE: Requesting mode4 will only take effect after a background sync job succeeds.
	Mode4 DualWriterMode = 4

	// Mode5 uses Storage regardless of the background sync state.
	Mode5 DualWriterMode = 5
)
|
|
|
|
// NamespacedKVStore is the string key/value store in which SetDualWritingMode
// keeps the dual writing mode of each kind.
type NamespacedKVStore interface {
	// Get returns the value stored under key. The boolean result tells whether
	// an entry was found for key.
	Get(ctx context.Context, key string) (string, bool, error)

	// Set stores value under key.
	Set(ctx context.Context, key string, value string) error
}
|
|
|
|
// ServerLockService is the locking dependency declared by this package.
type ServerLockService interface {
	// LockExecuteAndRelease is given an action name, a maximum interval and the
	// function to execute.
	// NOTE(review): presumably fn runs while a lock named actionName is held
	// and the lock is released afterwards — confirm against the implementation.
	LockExecuteAndRelease(
		ctx context.Context,
		actionName string,
		maxInterval time.Duration,
		fn func(ctx context.Context),
	) error
}
|
|
|
|
func SetDualWritingMode(
|
|
ctx context.Context,
|
|
kvs NamespacedKVStore,
|
|
cfg *SyncerConfig,
|
|
metrics *DualWriterMetrics,
|
|
) (DualWriterMode, error) {
|
|
if cfg == nil {
|
|
return Mode0, errors.New("syncer config is nil")
|
|
}
|
|
// Mode0 means no DualWriter
|
|
if cfg.Mode == Mode0 {
|
|
return Mode0, nil
|
|
}
|
|
|
|
toMode := map[string]DualWriterMode{
|
|
// It is not possible to initialize a mode 0 dual writer. Mode 0 represents
|
|
// writing to legacy storage without Unified Storage enabled.
|
|
"1": Mode1,
|
|
"2": Mode2,
|
|
"3": Mode3,
|
|
"4": Mode4,
|
|
"5": Mode5,
|
|
}
|
|
errDualWriterSetCurrentMode := errors.New("failed to set current dual writing mode")
|
|
|
|
// Use entity name as key
|
|
kvMode, ok, err := kvs.Get(ctx, cfg.Kind)
|
|
if err != nil {
|
|
return Mode0, errors.New("failed to fetch current dual writing mode")
|
|
}
|
|
|
|
currentMode, exists := toMode[kvMode]
|
|
|
|
// If the mode does not exist in our mapping, we log an error.
|
|
if !exists && ok {
|
|
// Only log if "ok" because initially all instances will have mode unset for playlists.
|
|
klog.Infof("invalid dual writing mode for %s mode: %v", cfg.Kind, kvMode)
|
|
}
|
|
|
|
// If the mode does not exist in our mapping, and we also didn't find an entry for this kind, fallback.
|
|
if !exists || !ok {
|
|
// Default to mode 1
|
|
currentMode = Mode1
|
|
if err := kvs.Set(ctx, cfg.Kind, fmt.Sprint(currentMode)); err != nil {
|
|
return Mode0, errDualWriterSetCurrentMode
|
|
}
|
|
}
|
|
|
|
isUpgradeToReadUnifiedMode := currentMode < Mode3 && cfg.Mode >= Mode3
|
|
if !isUpgradeToReadUnifiedMode {
|
|
if err := kvs.Set(ctx, cfg.Kind, fmt.Sprint(cfg.Mode)); err != nil {
|
|
return Mode0, errDualWriterSetCurrentMode
|
|
}
|
|
return cfg.Mode, nil
|
|
}
|
|
|
|
// If SkipDataSync is enabled, we can set the mode directly without running the syncer.
|
|
if cfg.SkipDataSync {
|
|
if err := kvs.Set(ctx, cfg.Kind, fmt.Sprint(cfg.Mode)); err != nil {
|
|
return Mode0, errDualWriterSetCurrentMode
|
|
}
|
|
return cfg.Mode, nil
|
|
}
|
|
|
|
// Transitioning to Mode3 or higher from Mode0, Mode1, or Mode2.
|
|
// We need to run the syncer in the current mode before we can upgrade to Mode3 or higher.
|
|
cfgModeTmp := cfg.Mode
|
|
// Before running the sync, set the syncer config to the current mode, as we have to run the syncer
|
|
// once in the current active mode before we can upgrade.
|
|
cfg.Mode = currentMode
|
|
syncOk, err := runDataSyncer(ctx, cfg, metrics)
|
|
// Once we are done with running the syncer, we can change the mode back on the config to the desired one.
|
|
cfg.Mode = cfgModeTmp
|
|
if err != nil {
|
|
klog.Error("data syncer failed for mode:", kvMode, "err", err)
|
|
return currentMode, nil
|
|
}
|
|
if !syncOk {
|
|
klog.Info("data syncer not ok for mode:", kvMode)
|
|
return currentMode, nil
|
|
}
|
|
// If sync is successful, update the mode to the desired one.
|
|
if err := kvs.Set(ctx, cfg.Kind, fmt.Sprint(cfg.Mode)); err != nil {
|
|
return Mode0, errDualWriterSetCurrentMode
|
|
}
|
|
return cfg.Mode, nil
|
|
}
|
|
|
|
// defaultConverter is the converter extractSpec uses to turn objects into
// their unstructured form.
var defaultConverter runtime.UnstructuredConverter = runtime.DefaultUnstructuredConverter
|
|
|
|
// Compare asserts on the equality of objects returned from both stores (object storage and legacy storage)
|
|
func Compare(storageObj, legacyObj runtime.Object) bool {
|
|
if storageObj == nil || legacyObj == nil {
|
|
return storageObj == nil && legacyObj == nil
|
|
}
|
|
return bytes.Equal(extractSpec(storageObj), extractSpec(legacyObj))
|
|
}
|
|
|
|
func extractSpec(obj runtime.Object) []byte {
|
|
cpy := obj.DeepCopyObject()
|
|
unstObj, err := defaultConverter.ToUnstructured(cpy)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
|
|
// we just want to compare the spec field
|
|
jsonObj, err := json.Marshal(unstObj["spec"])
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
return jsonObj
|
|
}
|