package sql

import (
	"context"
	"fmt"
	"net/http"
	"os"
	"sync"
	"time"

	"github.com/google/uuid"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

	"github.com/grafana/grafana-app-sdk/logging"
	"github.com/grafana/grafana/pkg/apimachinery/utils"
	"github.com/grafana/grafana/pkg/storage/unified/parquet"
	"github.com/grafana/grafana/pkg/storage/unified/resource"
	"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
	"github.com/grafana/grafana/pkg/storage/unified/sql/db"
	"github.com/grafana/grafana/pkg/storage/unified/sql/dbutil"
	"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
)

var (
	_ resource.BulkProcessingBackend = (*backend)(nil)
)

type bulkRV struct {
	max     int64
	counter int64
}

// newBulkRV returns a generator of fake resource versions for bulk imports,
// anchored at the current time truncated to a 10 second bucket
func newBulkRV() *bulkRV {
	t := time.Now().Truncate(time.Second * 10)
	return &bulkRV{
		max:     (t.UnixMicro() / 10000000) * 10000000,
		counter: 0,
	}
}

// next derives a resource version from the object's updated (or creation)
// timestamp, clamped to the generator's anchor, plus a unique counter
func (x *bulkRV) next(obj metav1.Object) int64 {
	ts := obj.GetCreationTimestamp().UnixMicro()
	anno := obj.GetAnnotations()
	if anno != nil {
		v := anno[utils.AnnoKeyUpdatedTimestamp]
		t, err := time.Parse(time.RFC3339, v)
		if err == nil {
			ts = t.UnixMicro()
		}
	}
	if ts > x.max || ts < 10000000 {
		ts = x.max
	}
	x.counter++
	return (ts/10000000)*10000000 + x.counter
}

// bulkLock prevents concurrent bulk jobs from touching the same collection
type bulkLock struct {
	running map[string]bool
	mu      sync.Mutex
}

func (x *bulkLock) Start(keys []*resourcepb.ResourceKey) error {
	x.mu.Lock()
	defer x.mu.Unlock()

	// First verify that none of the keys is already running
	ids := make([]string, len(keys))
	for i, k := range keys {
		id := resource.NSGR(k)
		if x.running[id] {
			return &apierrors.StatusError{ErrStatus: metav1.Status{
				Code:    http.StatusPreconditionFailed,
				Message: "bulk export is already running",
			}}
		}
		ids[i] = id
	}

	// Then add the keys to the lock
	for _, k := range ids {
		x.running[k] = true
	}
	return nil
}

func (x *bulkLock) Finish(keys []*resourcepb.ResourceKey) {
	x.mu.Lock()
	defer x.mu.Unlock()
	for _, k := range keys {
		delete(x.running, resource.NSGR(k))
	}
}

func (x *bulkLock) Active() bool {
	x.mu.Lock()
	defer x.mu.Unlock()
	return len(x.running) > 0
}

func (b *backend) ProcessBulk(ctx context.Context, setting resource.BulkSettings, iter resource.BulkRequestIterator) *resourcepb.BulkResponse {
	err := b.bulkLock.Start(setting.Collection)
	if err != nil {
		return &resourcepb.BulkResponse{
			Error: resource.AsErrorResult(err),
		}
	}
	defer b.bulkLock.Finish(setting.Collection)

	// We may want to first write parquet, then read parquet
	if b.dialect.DialectName() == "sqlite" {
		file, err := os.CreateTemp("", "grafana-bulk-export-*.parquet")
		if err != nil {
			return &resourcepb.BulkResponse{
				Error: resource.AsErrorResult(err),
			}
		}
		writer, err := parquet.NewParquetWriter(file)
		if err != nil {
			return &resourcepb.BulkResponse{
				Error: resource.AsErrorResult(err),
			}
		}

		// Write the bulk request stream to parquet
		rsp := writer.ProcessBulk(ctx, setting, iter)
		if rsp.Error != nil {
			return rsp
		}
		b.log.Info("using parquet buffer", "parquet", file.Name())

		// Replace the iterator with one reading back from parquet
		iter, err = parquet.NewParquetReader(file.Name(), 50)
		if err != nil {
			return &resourcepb.BulkResponse{
				Error: resource.AsErrorResult(err),
			}
		}
	}

	return b.processBulk(ctx, setting, iter)
}
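// The sketch below is illustrative only (exampleBulkRVs is not part of the
// backend): it shows how bulkRV assigns versions during an import. Each RV is
// a 10-second-aligned microsecond bucket (a multiple of 10000000) derived from
// the object's timestamp, plus a per-import counter, so objects that share a
// timestamp still receive distinct versions.
func exampleBulkRVs(objs []metav1.Object) []int64 {
	rv := newBulkRV()
	out := make([]int64, 0, len(objs))
	for _, obj := range objs {
		out = append(out, rv.next(obj)) // counter increments on every call
	}
	return out
}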
// internal bulk process
func (b *backend) processBulk(ctx context.Context, setting resource.BulkSettings, iter resource.BulkRequestIterator) *resourcepb.BulkResponse {
	rsp := &resourcepb.BulkResponse{}
	err := b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
		rollbackWithError := func(err error) error {
			txerr := tx.Rollback()
			if txerr != nil {
				b.log.Warn("rollback", "error", txerr)
			} else {
				b.log.Info("rollback")
			}
			return err
		}
		bulk := &bulkWorker{
			ctx:     ctx,
			tx:      tx,
			dialect: b.dialect,
			logger:  logging.FromContext(ctx),
		}

		// Calculate the RV based on incoming request timestamps
		rv := newBulkRV()

		summaries := make(map[string]*resourcepb.BulkResponse_Summary, len(setting.Collection)*4)

		// First clear everything in the transaction
		if setting.RebuildCollection {
			for _, key := range setting.Collection {
				summary, err := bulk.deleteCollection(key)
				if err != nil {
					return rollbackWithError(err)
				}
				summaries[resource.NSGR(key)] = summary
				rsp.Summary = append(rsp.Summary, summary)
			}
		}

		obj := &unstructured.Unstructured{}

		// Write each event into the history
		for iter.Next() {
			if iter.RollbackRequested() {
				return rollbackWithError(nil)
			}
			req := iter.Request()
			if req == nil {
				return rollbackWithError(fmt.Errorf("missing request"))
			}
			rsp.Processed++

			if req.Action == resourcepb.BulkRequest_UNKNOWN {
				rsp.Rejected = append(rsp.Rejected, &resourcepb.BulkResponse_Rejected{
					Key:    req.Key,
					Action: req.Action,
					Error:  "unknown action",
				})
				continue
			}

			err := obj.UnmarshalJSON(req.Value)
			if err != nil {
				rsp.Rejected = append(rsp.Rejected, &resourcepb.BulkResponse_Rejected{
					Key:    req.Key,
					Action: req.Action,
					Error:  "unable to unmarshal json",
				})
				continue
			}

			// Write the event to history
			if _, err := dbutil.Exec(ctx, tx, sqlResourceHistoryInsert, sqlResourceRequest{
				SQLTemplate: sqltemplate.New(b.dialect),
				WriteEvent: resource.WriteEvent{
					Key:        req.Key,
					Type:       resourcepb.WatchEvent_Type(req.Action),
					Value:      req.Value,
					PreviousRV: -1, // Used for WATCH, but we want to skip watch events
				},
				Folder:          req.Folder,
				GUID:            uuid.New().String(),
				ResourceVersion: rv.next(obj),
			}); err != nil {
				return rollbackWithError(fmt.Errorf("insert into resource history: %w", err))
			}
		}

		// Now update the resource table from history
		for _, key := range setting.Collection {
			k := resource.NSGR(key) // must match the key used when writing summaries above
			summary := summaries[k]
			if summary == nil {
				return rollbackWithError(fmt.Errorf("missing summary key for: %s", k))
			}

			err := bulk.syncCollection(key, summary)
			if err != nil {
				return rollbackWithError(err)
			}

			// Make sure the collection RV is above our last written event
			_, err = b.rvManager.ExecWithRV(ctx, key, func(tx db.Tx) (string, error) {
				return "", nil
			})
			if err != nil {
				b.log.Warn("error increasing RV", "error", err)
			}
		}
		return nil
	})
	if err != nil {
		rsp.Error = resource.AsErrorResult(err)
	}
	return rsp
}
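// A minimal usage sketch, not part of the backend (exampleBulkLockUsage is a
// hypothetical name): bulkLock serializes bulk jobs per collection key, so a
// second job that overlaps a running one fails fast with a 412 Precondition
// Failed status instead of interleaving writes.
func exampleBulkLockUsage(keys []*resourcepb.ResourceKey) error {
	lock := &bulkLock{running: make(map[string]bool)}
	if err := lock.Start(keys); err != nil {
		return err // one of the collections is already being processed
	}
	defer lock.Finish(keys)
	// ... perform the bulk job while holding the lock ...
	return nil
}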
type bulkWorker struct {
	ctx     context.Context
	tx      db.ContextExecer
	dialect sqltemplate.Dialect
	logger  logging.Logger
}

// deleteCollection removes everything from the `resource` and `resource_history`
// tables for a given namespace/group/resource
func (w *bulkWorker) deleteCollection(key *resourcepb.ResourceKey) (*resourcepb.BulkResponse_Summary, error) {
	summary := &resourcepb.BulkResponse_Summary{
		Namespace: key.Namespace,
		Group:     key.Group,
		Resource:  key.Resource,
	}

	// First delete history
	res, err := dbutil.Exec(w.ctx, w.tx, sqlResourceHistoryDelete, &sqlResourceHistoryDeleteRequest{
		SQLTemplate: sqltemplate.New(w.dialect),
		Namespace:   key.Namespace,
		Group:       key.Group,
		Resource:    key.Resource,
	})
	if err != nil {
		return nil, err
	}
	summary.PreviousHistory, err = res.RowsAffected()
	if err != nil {
		return nil, err
	}

	// Next delete the active resource table
	res, err = dbutil.Exec(w.ctx, w.tx, sqlResourceDelete, &sqlResourceRequest{
		SQLTemplate: sqltemplate.New(w.dialect),
		WriteEvent: resource.WriteEvent{
			Key: key,
		},
	})
	if err != nil {
		return nil, err
	}
	summary.PreviousCount, err = res.RowsAffected()
	return summary, err
}

// syncCollection copies the latest value from history into the active resource
// table, then reads back stats to fill in the summary
func (w *bulkWorker) syncCollection(key *resourcepb.ResourceKey, summary *resourcepb.BulkResponse_Summary) error {
	w.logger.Info("synchronize collection", "key", resource.NSGR(key))
	_, err := dbutil.Exec(w.ctx, w.tx, sqlResourceInsertFromHistory, &sqlResourceInsertFromHistoryRequest{
		SQLTemplate: sqltemplate.New(w.dialect),
		Key:         key,
	})
	if err != nil {
		return err
	}

	w.logger.Info("get stats (still in transaction)", "key", resource.NSGR(key))
	rows, err := dbutil.QueryRows(w.ctx, w.tx, sqlResourceStats, &sqlStatsRequest{
		SQLTemplate: sqltemplate.New(w.dialect),
		Namespace:   key.Namespace,
		Group:       key.Group,
		Resource:    key.Resource,
	})
	if err != nil {
		return err
	}
	if rows != nil {
		defer func() { _ = rows.Close() }()
	}
	if rows.Next() {
		row := resource.ResourceStats{}
		return rows.Scan(&row.Namespace, &row.Group, &row.Resource,
			&summary.Count, &summary.ResourceVersion)
	}
	// Next() returned false: surface any iteration error (nil otherwise)
	return rows.Err()
}
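// A minimal sketch of the rebuild sequence that processBulk runs inside one
// transaction (exampleRebuildCollection is illustrative, not part of the
// backend): clear the collection, write the imported events into
// resource_history, then repopulate the resource table from history.
func exampleRebuildCollection(w *bulkWorker, key *resourcepb.ResourceKey) error {
	summary, err := w.deleteCollection(key) // drop resource + resource_history rows
	if err != nil {
		return err
	}
	// ... insert the imported events into resource_history here ...
	return w.syncCollection(key, summary) // copy latest history values back into resource
}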