mirror of
https://github.com/grafana/grafana.git
synced 2025-07-30 06:32:28 +08:00
167 lines
4.1 KiB
Go
167 lines
4.1 KiB
Go
package dualwrite
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
|
|
"github.com/grafana/grafana-app-sdk/logging"
|
|
"github.com/grafana/grafana/pkg/infra/kvstore"
|
|
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
|
"github.com/grafana/grafana/pkg/setting"
|
|
)
|
|
|
|
func ProvideStaticServiceForTests(cfg *setting.Cfg) Service {
|
|
if cfg == nil {
|
|
cfg = &setting.Cfg{}
|
|
}
|
|
return &staticService{cfg}
|
|
}
|
|
|
|
func ProvideService(
|
|
features featuremgmt.FeatureToggles,
|
|
reg prometheus.Registerer,
|
|
kv kvstore.KVStore,
|
|
cfg *setting.Cfg) Service {
|
|
enabled := features.IsEnabledGlobally(featuremgmt.FlagManagedDualWriter) ||
|
|
features.IsEnabledGlobally(featuremgmt.FlagProvisioning) // required for git provisioning
|
|
if !enabled && cfg != nil {
|
|
return &staticService{cfg} // fallback to using the dual write flags from cfg
|
|
}
|
|
|
|
db := &keyvalueDB{
|
|
db: kv,
|
|
logger: logging.DefaultLogger.With("logger", "dualwrite.kv"),
|
|
}
|
|
|
|
// TODO: remove this after G12.1
|
|
if cfg != nil {
|
|
migrateFileDBTo(cfg, db)
|
|
}
|
|
|
|
return &service{
|
|
db: db,
|
|
reg: reg,
|
|
enabled: enabled,
|
|
}
|
|
}
|
|
|
|
type service struct {
|
|
db *keyvalueDB
|
|
reg prometheus.Registerer
|
|
enabled bool
|
|
}
|
|
|
|
// Hardcoded list of resources that should be controlled by the database (eventually everything?)
|
|
func (m *service) ShouldManage(gr schema.GroupResource) bool {
|
|
if !m.enabled {
|
|
return false
|
|
}
|
|
switch gr.String() {
|
|
case "folders.folder.grafana.app":
|
|
return true
|
|
case "dashboards.dashboard.grafana.app":
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (m *service) ReadFromUnified(ctx context.Context, gr schema.GroupResource) (bool, error) {
|
|
v, ok, err := m.db.get(ctx, gr)
|
|
return ok && v.ReadUnified, err
|
|
}
|
|
|
|
// Status implements Service.
|
|
func (m *service) Status(ctx context.Context, gr schema.GroupResource) (StorageStatus, error) {
|
|
v, found, err := m.db.get(ctx, gr)
|
|
if err != nil {
|
|
return v, err
|
|
}
|
|
if !found {
|
|
v = StorageStatus{
|
|
Group: gr.Group,
|
|
Resource: gr.Resource,
|
|
WriteLegacy: true,
|
|
WriteUnified: true, // Write both, but read legacy
|
|
ReadUnified: false,
|
|
Migrated: 0,
|
|
Migrating: 0,
|
|
Runtime: true, // need to explicitly ask for not runtime
|
|
UpdateKey: 1,
|
|
}
|
|
err := m.db.set(ctx, v)
|
|
return v, err
|
|
}
|
|
return v, nil
|
|
}
|
|
|
|
// StartMigration implements Service.
|
|
func (m *service) StartMigration(ctx context.Context, gr schema.GroupResource, key int64) (StorageStatus, error) {
|
|
now := time.Now().UnixMilli()
|
|
v, ok, err := m.db.get(ctx, gr)
|
|
if err != nil {
|
|
return v, err
|
|
}
|
|
if ok {
|
|
if v.Migrated > 0 {
|
|
return v, fmt.Errorf("already migrated")
|
|
}
|
|
if key != v.UpdateKey {
|
|
return v, fmt.Errorf("migration key mismatch")
|
|
}
|
|
if v.Migrating > 0 {
|
|
return v, fmt.Errorf("migration in progress")
|
|
}
|
|
|
|
v.Migrating = now
|
|
v.UpdateKey++
|
|
} else {
|
|
v = StorageStatus{
|
|
Group: gr.Group,
|
|
Resource: gr.Resource,
|
|
Runtime: true,
|
|
WriteLegacy: true,
|
|
WriteUnified: true,
|
|
ReadUnified: false,
|
|
Migrating: now,
|
|
Migrated: 0, // timestamp
|
|
UpdateKey: 1,
|
|
}
|
|
}
|
|
err = m.db.set(ctx, v)
|
|
return v, err
|
|
}
|
|
|
|
// FinishMigration implements Service.
|
|
func (m *service) Update(ctx context.Context, status StorageStatus) (StorageStatus, error) {
|
|
v, ok, err := m.db.get(ctx, schema.GroupResource{Group: status.Group, Resource: status.Resource})
|
|
if err != nil {
|
|
return v, err
|
|
}
|
|
if !ok {
|
|
return v, fmt.Errorf("unable to update status that is not yet saved")
|
|
}
|
|
if status.UpdateKey != v.UpdateKey {
|
|
return v, fmt.Errorf("key mismatch (resource: %s, expected:%d, received: %d)", v.Resource, v.UpdateKey, status.UpdateKey)
|
|
}
|
|
if status.Migrating > 0 {
|
|
return v, fmt.Errorf("update can not change migrating status")
|
|
}
|
|
if status.ReadUnified {
|
|
if status.Migrated == 0 {
|
|
return v, fmt.Errorf("can not read from unified before a migration")
|
|
}
|
|
if !status.WriteUnified {
|
|
return v, fmt.Errorf("must write to unified when reading from unified")
|
|
}
|
|
}
|
|
if !status.WriteLegacy && !status.WriteUnified {
|
|
return v, fmt.Errorf("must write either legacy or unified")
|
|
}
|
|
status.UpdateKey++
|
|
return status, m.db.set(ctx, status)
|
|
}
|