Files

167 lines
4.1 KiB
Go

package dualwrite
import (
"context"
"fmt"
"time"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/apimachinery/pkg/runtime/schema"
"github.com/grafana/grafana-app-sdk/logging"
"github.com/grafana/grafana/pkg/infra/kvstore"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/setting"
)
func ProvideStaticServiceForTests(cfg *setting.Cfg) Service {
if cfg == nil {
cfg = &setting.Cfg{}
}
return &staticService{cfg}
}
func ProvideService(
features featuremgmt.FeatureToggles,
reg prometheus.Registerer,
kv kvstore.KVStore,
cfg *setting.Cfg) Service {
enabled := features.IsEnabledGlobally(featuremgmt.FlagManagedDualWriter) ||
features.IsEnabledGlobally(featuremgmt.FlagProvisioning) // required for git provisioning
if !enabled && cfg != nil {
return &staticService{cfg} // fallback to using the dual write flags from cfg
}
db := &keyvalueDB{
db: kv,
logger: logging.DefaultLogger.With("logger", "dualwrite.kv"),
}
// TODO: remove this after G12.1
if cfg != nil {
migrateFileDBTo(cfg, db)
}
return &service{
db: db,
reg: reg,
enabled: enabled,
}
}
type service struct {
db *keyvalueDB
reg prometheus.Registerer
enabled bool
}
// Hardcoded list of resources that should be controlled by the database (eventually everything?)
func (m *service) ShouldManage(gr schema.GroupResource) bool {
if !m.enabled {
return false
}
switch gr.String() {
case "folders.folder.grafana.app":
return true
case "dashboards.dashboard.grafana.app":
return true
}
return false
}
func (m *service) ReadFromUnified(ctx context.Context, gr schema.GroupResource) (bool, error) {
v, ok, err := m.db.get(ctx, gr)
return ok && v.ReadUnified, err
}
// Status implements Service.
func (m *service) Status(ctx context.Context, gr schema.GroupResource) (StorageStatus, error) {
v, found, err := m.db.get(ctx, gr)
if err != nil {
return v, err
}
if !found {
v = StorageStatus{
Group: gr.Group,
Resource: gr.Resource,
WriteLegacy: true,
WriteUnified: true, // Write both, but read legacy
ReadUnified: false,
Migrated: 0,
Migrating: 0,
Runtime: true, // need to explicitly ask for not runtime
UpdateKey: 1,
}
err := m.db.set(ctx, v)
return v, err
}
return v, nil
}
// StartMigration implements Service.
func (m *service) StartMigration(ctx context.Context, gr schema.GroupResource, key int64) (StorageStatus, error) {
now := time.Now().UnixMilli()
v, ok, err := m.db.get(ctx, gr)
if err != nil {
return v, err
}
if ok {
if v.Migrated > 0 {
return v, fmt.Errorf("already migrated")
}
if key != v.UpdateKey {
return v, fmt.Errorf("migration key mismatch")
}
if v.Migrating > 0 {
return v, fmt.Errorf("migration in progress")
}
v.Migrating = now
v.UpdateKey++
} else {
v = StorageStatus{
Group: gr.Group,
Resource: gr.Resource,
Runtime: true,
WriteLegacy: true,
WriteUnified: true,
ReadUnified: false,
Migrating: now,
Migrated: 0, // timestamp
UpdateKey: 1,
}
}
err = m.db.set(ctx, v)
return v, err
}
// FinishMigration implements Service.
func (m *service) Update(ctx context.Context, status StorageStatus) (StorageStatus, error) {
v, ok, err := m.db.get(ctx, schema.GroupResource{Group: status.Group, Resource: status.Resource})
if err != nil {
return v, err
}
if !ok {
return v, fmt.Errorf("unable to update status that is not yet saved")
}
if status.UpdateKey != v.UpdateKey {
return v, fmt.Errorf("key mismatch (resource: %s, expected:%d, received: %d)", v.Resource, v.UpdateKey, status.UpdateKey)
}
if status.Migrating > 0 {
return v, fmt.Errorf("update can not change migrating status")
}
if status.ReadUnified {
if status.Migrated == 0 {
return v, fmt.Errorf("can not read from unified before a migration")
}
if !status.WriteUnified {
return v, fmt.Errorf("must write to unified when reading from unified")
}
}
if !status.WriteLegacy && !status.WriteUnified {
return v, fmt.Errorf("must write either legacy or unified")
}
status.UpdateKey++
return status, m.db.set(ctx, status)
}