package sql import ( "context" "database/sql" "errors" "fmt" "math" "sync" "time" "github.com/go-sql-driver/mysql" "github.com/jackc/pgx/v5/pgconn" "github.com/lib/pq" "github.com/mattn/go-sqlite3" "github.com/prometheus/client_golang/prometheus" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace/noop" "google.golang.org/protobuf/proto" apierrors "k8s.io/apimachinery/pkg/api/errors" "github.com/grafana/grafana-app-sdk/logging" "github.com/grafana/grafana/pkg/storage/unified/resource" "github.com/grafana/grafana/pkg/storage/unified/resourcepb" "github.com/grafana/grafana/pkg/storage/unified/sql/db" "github.com/grafana/grafana/pkg/storage/unified/sql/dbutil" "github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate" "github.com/grafana/grafana/pkg/util/debouncer" ) const tracePrefix = "sql.resource." const defaultPollingInterval = 100 * time.Millisecond const defaultWatchBufferSize = 100 // number of events to buffer in the watch stream const defaultPrunerHistoryLimit = 20 type Backend interface { resource.StorageBackend resourcepb.DiagnosticsServer resource.LifecycleHooks } type BackendOptions struct { DBProvider db.DBProvider Tracer trace.Tracer Reg prometheus.Registerer PollingInterval time.Duration WatchBufferSize int IsHA bool storageMetrics *resource.StorageMetrics // If true, the backend will prune history on write events. // Will be removed once fully rolled out. withPruner bool // testing SimulatedNetworkLatency time.Duration // slows down the create transactions by a fixed amount } func NewBackend(opts BackendOptions) (Backend, error) { if opts.DBProvider == nil { return nil, errors.New("no db provider") } if opts.Tracer == nil { opts.Tracer = noop.NewTracerProvider().Tracer("sql-backend") } ctx, cancel := context.WithCancel(context.Background()) if opts.PollingInterval == 0 { opts.PollingInterval = defaultPollingInterval } if opts.WatchBufferSize == 0 { opts.WatchBufferSize = defaultWatchBufferSize } return &backend{ isHA: opts.IsHA, done: ctx.Done(), cancel: cancel, log: logging.DefaultLogger.With("logger", "sql-resource-server"), tracer: opts.Tracer, reg: opts.Reg, dbProvider: opts.DBProvider, pollingInterval: opts.PollingInterval, watchBufferSize: opts.WatchBufferSize, storageMetrics: opts.storageMetrics, bulkLock: &bulkLock{running: make(map[string]bool)}, simulatedNetworkLatency: opts.SimulatedNetworkLatency, withPruner: opts.withPruner, }, nil } // pruningKey is a comparable key for pruning history. type pruningKey struct { namespace string group string resource string name string } // Small abstraction to allow for different pruner implementations. // This can be removed once the debouncer is deployed. type pruner interface { Add(key pruningKey) error Start(ctx context.Context) } type noopPruner struct{} func (p *noopPruner) Add(key pruningKey) error { return nil } func (p *noopPruner) Start(ctx context.Context) {} type backend struct { //general isHA bool // server lifecycle done <-chan struct{} cancel context.CancelFunc initOnce sync.Once initErr error // o11y log logging.Logger tracer trace.Tracer reg prometheus.Registerer storageMetrics *resource.StorageMetrics // database dbProvider db.DBProvider db db.DB dialect sqltemplate.Dialect bulkLock *bulkLock // watch streaming //stream chan *resource.WatchEvent pollingInterval time.Duration watchBufferSize int notifier eventNotifier // resource version manager rvManager *resourceVersionManager // testing simulatedNetworkLatency time.Duration historyPruner pruner withPruner bool } func (b *backend) Init(ctx context.Context) error { b.initOnce.Do(func() { b.initErr = b.initLocked(ctx) }) return b.initErr } func (b *backend) initLocked(ctx context.Context) error { dbConn, err := b.dbProvider.Init(ctx) if err != nil { return fmt.Errorf("initialize resource DB: %w", err) } if err := dbConn.PingContext(ctx); err != nil { return fmt.Errorf("ping resource DB: %w", err) } b.db = dbConn driverName := dbConn.DriverName() b.dialect = sqltemplate.DialectForDriver(driverName) if b.dialect == nil { return fmt.Errorf("no dialect for driver %q", driverName) } // Initialize ResourceVersionManager rvManager, err := NewResourceVersionManager(ResourceManagerOptions{ Dialect: b.dialect, DB: b.db, Tracer: b.tracer, }) if err != nil { return fmt.Errorf("failed to create resource version manager: %w", err) } b.rvManager = rvManager // Initialize notifier after dialect is set up notifier, err := newNotifier(b) if err != nil { return fmt.Errorf("failed to create notifier: %w", err) } b.notifier = notifier if err := b.initPruner(ctx); err != nil { return fmt.Errorf("failed to create pruner: %w", err) } return nil } func (b *backend) initPruner(ctx context.Context) error { if !b.withPruner { b.log.Debug("using noop history pruner") b.historyPruner = &noopPruner{} return nil } b.log.Debug("using debounced history pruner") // Initialize history pruner. pruner, err := debouncer.NewGroup(debouncer.DebouncerOpts[pruningKey]{ Name: "history_pruner", BufferSize: 1000, MinWait: time.Second * 30, MaxWait: time.Minute * 5, ProcessHandler: func(ctx context.Context, key pruningKey) error { return b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error { res, err := dbutil.Exec(ctx, tx, sqlResourceHistoryPrune, &sqlPruneHistoryRequest{ SQLTemplate: sqltemplate.New(b.dialect), HistoryLimit: defaultPrunerHistoryLimit, Key: &resourcepb.ResourceKey{ Namespace: key.namespace, Group: key.group, Resource: key.resource, Name: key.name, }, }) if err != nil { return fmt.Errorf("failed to prune history: %w", err) } rows, err := res.RowsAffected() if err != nil { return fmt.Errorf("failed to get rows affected: %w", err) } b.log.Debug("pruned history successfully", "namespace", key.namespace, "group", key.group, "resource", key.resource, "name", key.name, "rows", rows) return nil }) }, ErrorHandler: func(key pruningKey, err error) { b.log.Error("failed to prune history", "namespace", key.namespace, "group", key.group, "resource", key.resource, "name", key.name, "error", err) }, Reg: b.reg, }) if err != nil { return err } b.historyPruner = pruner b.historyPruner.Start(ctx) return nil } func (b *backend) IsHealthy(ctx context.Context, _ *resourcepb.HealthCheckRequest) (*resourcepb.HealthCheckResponse, error) { // ctxLogger := s.log.FromContext(log.WithContextualAttributes(ctx, []any{"method", "isHealthy"})) if err := b.db.PingContext(ctx); err != nil { return nil, err } return &resourcepb.HealthCheckResponse{Status: resourcepb.HealthCheckResponse_SERVING}, nil } func (b *backend) Stop(_ context.Context) error { b.cancel() return nil } // GetResourceStats implements Backend. func (b *backend) GetResourceStats(ctx context.Context, namespace string, minCount int) ([]resource.ResourceStats, error) { ctx, span := b.tracer.Start(ctx, tracePrefix+"GetResourceStats") defer span.End() req := &sqlStatsRequest{ SQLTemplate: sqltemplate.New(b.dialect), Namespace: namespace, MinCount: minCount, // not used in query... yet? } res := make([]resource.ResourceStats, 0, 100) err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error { rows, err := dbutil.QueryRows(ctx, tx, sqlResourceStats, req) if err != nil { return err } for rows.Next() { row := resource.ResourceStats{} err = rows.Scan(&row.Namespace, &row.Group, &row.Resource, &row.Count, &row.ResourceVersion) if err != nil { return err } if row.Count > int64(minCount) { res = append(res, row) } else { b.log.Debug("skipping stats for resource with count less than min count", "namespace", row.Namespace, "group", row.Group, "resource", row.Resource, "count", row.Count, "minCount", minCount) } } return err }) return res, err } func (b *backend) WriteEvent(ctx context.Context, event resource.WriteEvent) (int64, error) { _, span := b.tracer.Start(ctx, tracePrefix+"WriteEvent") defer span.End() // TODO: validate key ? switch event.Type { case resourcepb.WatchEvent_ADDED: return b.create(ctx, event) case resourcepb.WatchEvent_MODIFIED: return b.update(ctx, event) case resourcepb.WatchEvent_DELETED: return b.delete(ctx, event) default: return 0, fmt.Errorf("unsupported event type") } } func (b *backend) create(ctx context.Context, event resource.WriteEvent) (int64, error) { ctx, span := b.tracer.Start(ctx, tracePrefix+"Create") defer span.End() folder := "" if event.Object != nil { folder = event.Object.GetFolder() } rv, err := b.rvManager.ExecWithRV(ctx, event.Key, func(tx db.Tx) (string, error) { // 1. Insert into resource if _, err := dbutil.Exec(ctx, tx, sqlResourceInsert, sqlResourceRequest{ SQLTemplate: sqltemplate.New(b.dialect), WriteEvent: event, Folder: folder, GUID: event.GUID, }); err != nil { if IsRowAlreadyExistsError(err) { return event.GUID, resource.ErrResourceAlreadyExists } return event.GUID, fmt.Errorf("insert into resource: %w", err) } // 2. Insert into resource history if _, err := dbutil.Exec(ctx, tx, sqlResourceHistoryInsert, sqlResourceRequest{ SQLTemplate: sqltemplate.New(b.dialect), WriteEvent: event, Folder: folder, Generation: event.Object.GetGeneration(), GUID: event.GUID, }); err != nil { return event.GUID, fmt.Errorf("insert into resource history: %w", err) } _ = b.historyPruner.Add(pruningKey{ namespace: event.Key.Namespace, group: event.Key.Group, resource: event.Key.Resource, name: event.Key.Name, }) if b.simulatedNetworkLatency > 0 { time.Sleep(b.simulatedNetworkLatency) } return event.GUID, nil }) if err != nil { return 0, err } b.notifier.send(ctx, &resource.WrittenEvent{ Type: event.Type, Key: event.Key, PreviousRV: event.PreviousRV, Value: event.Value, ResourceVersion: rv, Folder: folder, }) return rv, nil } // IsRowAlreadyExistsError checks if the error is the result of the row inserted already existing. func IsRowAlreadyExistsError(err error) bool { var sqlite sqlite3.Error if errors.As(err, &sqlite) { return sqlite.ExtendedCode == sqlite3.ErrConstraintUnique } var pg *pgconn.PgError if errors.As(err, &pg) { // https://www.postgresql.org/docs/current/errcodes-appendix.html return pg.Code == "23505" // unique_violation } var pqerr *pq.Error if errors.As(err, &pqerr) { // https://www.postgresql.org/docs/current/errcodes-appendix.html return pqerr.Code == "23505" // unique_violation } var mysqlerr *mysql.MySQLError if errors.As(err, &mysqlerr) { // https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html return mysqlerr.Number == 1062 // ER_DUP_ENTRY } return false } func (b *backend) update(ctx context.Context, event resource.WriteEvent) (int64, error) { ctx, span := b.tracer.Start(ctx, tracePrefix+"Update") defer span.End() folder := "" if event.Object != nil { folder = event.Object.GetFolder() } // Use rvManager.ExecWithRV instead of direct transaction rv, err := b.rvManager.ExecWithRV(ctx, event.Key, func(tx db.Tx) (string, error) { // 1. Update resource _, err := dbutil.Exec(ctx, tx, sqlResourceUpdate, sqlResourceRequest{ SQLTemplate: sqltemplate.New(b.dialect), WriteEvent: event, Folder: folder, GUID: event.GUID, }) if err != nil { return event.GUID, fmt.Errorf("resource update: %w", err) } // 2. Insert into resource history if _, err := dbutil.Exec(ctx, tx, sqlResourceHistoryInsert, sqlResourceRequest{ SQLTemplate: sqltemplate.New(b.dialect), WriteEvent: event, Folder: folder, GUID: event.GUID, Generation: event.Object.GetGeneration(), }); err != nil { return event.GUID, fmt.Errorf("insert into resource history: %w", err) } _ = b.historyPruner.Add(pruningKey{ namespace: event.Key.Namespace, group: event.Key.Group, resource: event.Key.Resource, name: event.Key.Name, }) return event.GUID, nil }) if err != nil { return 0, err } b.notifier.send(ctx, &resource.WrittenEvent{ Type: event.Type, Key: event.Key, PreviousRV: event.PreviousRV, Value: event.Value, ResourceVersion: rv, Folder: folder, }) return rv, nil } func (b *backend) delete(ctx context.Context, event resource.WriteEvent) (int64, error) { ctx, span := b.tracer.Start(ctx, tracePrefix+"Delete") defer span.End() folder := "" if event.Object != nil { folder = event.Object.GetFolder() } rv, err := b.rvManager.ExecWithRV(ctx, event.Key, func(tx db.Tx) (string, error) { // 1. delete from resource _, err := dbutil.Exec(ctx, tx, sqlResourceDelete, sqlResourceRequest{ SQLTemplate: sqltemplate.New(b.dialect), WriteEvent: event, GUID: event.GUID, }) if err != nil { return event.GUID, fmt.Errorf("delete resource: %w", err) } // 2. Add event to resource history if _, err := dbutil.Exec(ctx, tx, sqlResourceHistoryInsert, sqlResourceRequest{ SQLTemplate: sqltemplate.New(b.dialect), WriteEvent: event, Folder: folder, GUID: event.GUID, Generation: 0, // object does not exist }); err != nil { return event.GUID, fmt.Errorf("insert into resource history: %w", err) } _ = b.historyPruner.Add(pruningKey{ namespace: event.Key.Namespace, group: event.Key.Group, resource: event.Key.Resource, name: event.Key.Name, }) return event.GUID, nil }) if err != nil { return 0, err } b.notifier.send(ctx, &resource.WrittenEvent{ Type: event.Type, Key: event.Key, PreviousRV: event.PreviousRV, Value: event.Value, ResourceVersion: rv, Folder: folder, }) return rv, nil } func (b *backend) ReadResource(ctx context.Context, req *resourcepb.ReadRequest) *resource.BackendReadResponse { _, span := b.tracer.Start(ctx, tracePrefix+".Read") defer span.End() // TODO: validate key ? if req.ResourceVersion > 0 { return b.readHistory(ctx, req.Key, req.ResourceVersion) } readReq := &sqlResourceReadRequest{ SQLTemplate: sqltemplate.New(b.dialect), Request: req, Response: NewReadResponse(), } var res *resource.BackendReadResponse err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error { var err error res, err = dbutil.QueryRow(ctx, tx, sqlResourceRead, readReq) return err }) if errors.Is(err, sql.ErrNoRows) { return &resource.BackendReadResponse{ Error: resource.NewNotFoundError(req.Key), } } else if err != nil { return &resource.BackendReadResponse{Error: resource.AsErrorResult(err)} } return res } func (b *backend) ListIterator(ctx context.Context, req *resourcepb.ListRequest, cb func(resource.ListIterator) error) (int64, error) { ctx, span := b.tracer.Start(ctx, tracePrefix+"List") defer span.End() if err := resource.MigrateListRequestVersionMatch(req, b.log); err != nil { return 0, err } if req.Options == nil || req.Options.Key.Group == "" || req.Options.Key.Resource == "" { return 0, fmt.Errorf("missing group or resource") } // TODO: think about how to handler VersionMatch. We should be able to use latest for the first page (only). // TODO: add support for RemainingItemCount if req.ResourceVersion > 0 || req.NextPageToken != "" { return b.listAtRevision(ctx, req, cb) } return b.listLatest(ctx, req, cb) } func (b *backend) ListHistory(ctx context.Context, req *resourcepb.ListRequest, cb func(resource.ListIterator) error) (int64, error) { ctx, span := b.tracer.Start(ctx, tracePrefix+"ListHistory") defer span.End() return b.getHistory(ctx, req, cb) } // listLatest fetches the resources from the resource table. func (b *backend) listLatest(ctx context.Context, req *resourcepb.ListRequest, cb func(resource.ListIterator) error) (int64, error) { ctx, span := b.tracer.Start(ctx, tracePrefix+"listLatest") defer span.End() if req.NextPageToken != "" { return 0, fmt.Errorf("only works for the first page") } if req.ResourceVersion > 0 { return 0, fmt.Errorf("only works for the 'latest' resource version") } iter := &listIter{} err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error { var err error iter.listRV, err = b.fetchLatestRV(ctx, tx, b.dialect, req.Options.Key.Group, req.Options.Key.Resource) if err != nil { return err } listReq := sqlResourceListRequest{ SQLTemplate: sqltemplate.New(b.dialect), Request: new(resourcepb.ListRequest), } listReq.Request = proto.Clone(req).(*resourcepb.ListRequest) rows, err := dbutil.QueryRows(ctx, tx, sqlResourceList, listReq) if rows != nil { defer func() { if err := rows.Close(); err != nil { b.log.Warn("listLatest error closing rows", "error", err) } }() } if err != nil { return err } iter.rows = rows return cb(iter) }) return iter.listRV, err } // listAtRevision fetches the resources from the resource_history table at a specific revision. func (b *backend) listAtRevision(ctx context.Context, req *resourcepb.ListRequest, cb func(resource.ListIterator) error) (int64, error) { ctx, span := b.tracer.Start(ctx, tracePrefix+"listAtRevision") defer span.End() // Get the RV iter := &listIter{listRV: req.ResourceVersion} if req.NextPageToken != "" { continueToken, err := resource.GetContinueToken(req.NextPageToken) if err != nil { return 0, fmt.Errorf("get continue token (%q): %w", req.NextPageToken, err) } iter.listRV = continueToken.ResourceVersion iter.offset = continueToken.StartOffset if req.ResourceVersion != 0 && req.ResourceVersion != iter.listRV { return 0, apierrors.NewBadRequest("request resource version does not math token") } } if iter.listRV < 1 { return 0, apierrors.NewBadRequest("expecting an explicit resource version query") } // The query below has the potential to be EXTREMELY slow if the resource_history table is big. May be helpful to know // which stack is calling this. b.log.Debug("listAtRevision", "ns", req.Options.Key.Namespace, "group", req.Options.Key.Group, "resource", req.Options.Key.Resource, "rv", iter.listRV) err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error { limit := int64(0) // ignore limit if iter.offset > 0 { limit = math.MaxInt64 // a limit is required for offset } listReq := sqlResourceHistoryListRequest{ SQLTemplate: sqltemplate.New(b.dialect), Request: &historyListRequest{ ResourceVersion: iter.listRV, Limit: limit, Offset: iter.offset, Options: req.Options, }, } rows, err := dbutil.QueryRows(ctx, tx, sqlResourceHistoryList, listReq) if rows != nil { defer func() { if err := rows.Close(); err != nil { b.log.Warn("listAtRevision error closing rows", "error", err) } }() } if err != nil { return err } iter.rows = rows return cb(iter) }) return iter.listRV, err } // readHistory fetches the resource history from the resource_history table. func (b *backend) readHistory(ctx context.Context, key *resourcepb.ResourceKey, rv int64) *resource.BackendReadResponse { _, span := b.tracer.Start(ctx, tracePrefix+".ReadHistory") defer span.End() readReq := &sqlResourceHistoryReadRequest{ SQLTemplate: sqltemplate.New(b.dialect), Request: &historyReadRequest{ Key: key, ResourceVersion: rv, }, Response: NewReadResponse(), } var res *resource.BackendReadResponse err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error { var err error res, err = dbutil.QueryRow(ctx, tx, sqlResourceHistoryRead, readReq) return err }) if errors.Is(err, sql.ErrNoRows) { return &resource.BackendReadResponse{Error: resource.NewNotFoundError(key)} } if err != nil { return &resource.BackendReadResponse{Error: resource.AsErrorResult(err)} } return res } // getHistory fetches the resource history from the resource_history table. func (b *backend) getHistory(ctx context.Context, req *resourcepb.ListRequest, cb func(resource.ListIterator) error) (int64, error) { ctx, span := b.tracer.Start(ctx, tracePrefix+"getHistory") defer span.End() listReq := sqlGetHistoryRequest{ SQLTemplate: sqltemplate.New(b.dialect), Key: req.Options.Key, Trash: req.Source == resourcepb.ListRequest_TRASH, } // We are assuming that users want history in ascending order // when they are using NotOlderThan matching, and descending order // for Unset (default) and Exact matching. listReq.SortAscending = req.GetVersionMatchV2() == resourcepb.ResourceVersionMatchV2_NotOlderThan iter := &listIter{ useCurrentRV: true, // use the current RV for the continue token instead of the listRV } if req.NextPageToken != "" { continueToken, err := resource.GetContinueToken(req.NextPageToken) if err != nil { return 0, fmt.Errorf("get continue token (%q): %w", req.NextPageToken, err) } listReq.StartRV = continueToken.ResourceVersion listReq.SortAscending = continueToken.SortAscending } iter.sortAsc = listReq.SortAscending // Set ExactRV when using Exact matching if req.VersionMatchV2 == resourcepb.ResourceVersionMatchV2_Exact { if req.ResourceVersion <= 0 { return 0, fmt.Errorf("expecting an explicit resource version query when using Exact matching") } listReq.ExactRV = req.ResourceVersion } // Set MinRV when using NotOlderThan matching to filter at the database level if req.ResourceVersion > 0 && req.VersionMatchV2 == resourcepb.ResourceVersionMatchV2_NotOlderThan { listReq.MinRV = req.ResourceVersion } // Ignore last deleted history record when listing the trash, using exact matching or not older than matching with a specific RV useLatestDeletionAsMinRV := listReq.MinRV == 0 && !listReq.Trash && req.VersionMatchV2 != resourcepb.ResourceVersionMatchV2_Exact err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error { var err error iter.listRV, err = b.fetchLatestRV(ctx, tx, b.dialect, req.Options.Key.Group, req.Options.Key.Resource) if err != nil { return err } if useLatestDeletionAsMinRV { latestDeletedRV, err := b.fetchLatestHistoryRV(ctx, tx, b.dialect, req.Options.Key, resourcepb.WatchEvent_DELETED) if err != nil { return err } listReq.MinRV = latestDeletedRV + 1 } var rows db.Rows if listReq.Trash { // unlike history, trash will not return an object if an object of the same name is live // (i.e. in the resource table) rows, err = dbutil.QueryRows(ctx, tx, sqlResourceTrash, listReq) } else { rows, err = dbutil.QueryRows(ctx, tx, sqlResourceHistoryGet, listReq) } if rows != nil { defer func() { if err := rows.Close(); err != nil { b.log.Warn("listLatest error closing rows", "error", err) } }() } if err != nil { return err } iter.rows = rows return cb(iter) }) return iter.listRV, err } func (b *backend) WatchWriteEvents(ctx context.Context) (<-chan *resource.WrittenEvent, error) { return b.notifier.notify(ctx) } // listLatestRVs returns the latest resource version for each (Group, Resource) pair. func (b *backend) listLatestRVs(ctx context.Context) (groupResourceRV, error) { ctx, span := b.tracer.Start(ctx, tracePrefix+"listLatestRVs") defer span.End() var grvs []*groupResourceVersion err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error { var err error grvs, err = dbutil.Query(ctx, tx, sqlResourceVersionList, &sqlResourceVersionListRequest{ SQLTemplate: sqltemplate.New(b.dialect), groupResourceVersion: new(groupResourceVersion), }) return err }) if err != nil { return nil, err } since := groupResourceRV{} for _, grv := range grvs { if since[grv.Group] == nil { since[grv.Group] = map[string]int64{} } since[grv.Group][grv.Resource] = grv.ResourceVersion } return since, nil } // fetchLatestRV returns the current maximum RV in the resource table func (b *backend) fetchLatestRV(ctx context.Context, x db.ContextExecer, d sqltemplate.Dialect, group, resource string) (int64, error) { ctx, span := b.tracer.Start(ctx, tracePrefix+"fetchLatestRV") defer span.End() res, err := dbutil.QueryRow(ctx, x, sqlResourceVersionGet, sqlResourceVersionGetRequest{ SQLTemplate: sqltemplate.New(d), Group: group, Resource: resource, ReadOnly: true, Response: new(resourceVersionResponse), }) if errors.Is(err, sql.ErrNoRows) { return 1, nil } else if err != nil { return 0, fmt.Errorf("get resource version: %w", err) } return res.ResourceVersion, nil } // fetchLatestHistoryRV returns the current maximum RV in the resource_history table func (b *backend) fetchLatestHistoryRV(ctx context.Context, x db.ContextExecer, d sqltemplate.Dialect, key *resourcepb.ResourceKey, eventType resourcepb.WatchEvent_Type) (int64, error) { ctx, span := b.tracer.Start(ctx, tracePrefix+"fetchLatestHistoryRV") defer span.End() res, err := dbutil.QueryRow(ctx, x, sqlResourceHistoryReadLatestRV, sqlResourceHistoryReadLatestRVRequest{ SQLTemplate: sqltemplate.New(d), Request: &historyReadLatestRVRequest{ Key: key, EventType: eventType, }, Response: new(resourceHistoryReadLatestRVResponse), }) if errors.Is(err, sql.ErrNoRows) { return 0, nil } else if err != nil { return 0, fmt.Errorf("get resource version: %w", err) } return res.ResourceVersion, nil }