mirror of
https://github.com/grafana/grafana.git
synced 2025-07-29 17:32:47 +08:00
115 lines
4.0 KiB
Go
115 lines
4.0 KiB
Go
package database
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"errors"
|
|
|
|
"github.com/jmoiron/sqlx"
|
|
"go.opentelemetry.io/otel/codes"
|
|
"go.opentelemetry.io/otel/trace"
|
|
|
|
"github.com/grafana/grafana/pkg/infra/db"
|
|
"github.com/grafana/grafana/pkg/registry/apis/secret/contracts"
|
|
"github.com/grafana/grafana/pkg/util/xorm"
|
|
)
|
|
|
|
// contextSessionTxKey is the context key under which the currently open *sqlx.Tx is stored.
// It is an unexported empty struct type, so no other package can produce a colliding key.
type contextSessionTxKey struct{}
|
|
|
|
// Implements contracts.Database
|
|
type Database struct {
|
|
dbType string
|
|
sqlx *sqlx.DB
|
|
tracer trace.Tracer
|
|
|
|
// Keep the xorm.Engine instance and its references alive until the apiserver is shut down.
|
|
// This is only needed because the xorm.Engine calls a runtime.SetFinalizer, in a RAII-like pattern to close the DB,
|
|
// when the engine is garbage collected. Normally, this will only ever happen when the server shuts down.
|
|
// Ref: pkg/util/xorm/xorm.go:118 (it seems to be a relic from the xorm codebase that was copied over).
|
|
// In single tenant Grafana, there are many other services and references to the xorm.Engine, so it never gets GC'd.
|
|
// At some point in the future if we migrate everything away from it, we need to revisit how we set up the DB opening.
|
|
// However, with the multi-tenant apiserver, we are no longer using the xorm.Engine directly for our DB queries.
|
|
// We only use it to bootstrap the database and run migrations.
|
|
// Instead, we use a pointer to *sql.DB directly, and that is created from *xorm.Engine -> *core.DB (also xorm) -> *sql.DB.
|
|
// The GC notices that the xorm.Engine is no longer referenced, and calls the finalizer to close the DB, because we
|
|
// only reference the pointer to *sql.DB. Here we tie the lifetime of the xorm.Engine to the Database we use for queries.
|
|
engine *xorm.Engine
|
|
}
|
|
|
|
func ProvideDatabase(db db.DB, tracer trace.Tracer) *Database {
|
|
engine := db.GetEngine()
|
|
|
|
return &Database{
|
|
dbType: string(db.GetDBType()),
|
|
sqlx: sqlx.NewDb(engine.DB().DB, db.GetDialect().DriverName()),
|
|
engine: engine,
|
|
tracer: tracer,
|
|
}
|
|
}
|
|
|
|
func (db *Database) DriverName() string {
|
|
return db.dbType
|
|
}
|
|
|
|
func (db *Database) Transaction(ctx context.Context, callback func(context.Context) error) (err error) {
|
|
// If another transaction is already open, we just use that one instead of nesting.
|
|
sqlxTx, ok := ctx.Value(contextSessionTxKey{}).(*sqlx.Tx)
|
|
if sqlxTx != nil && ok {
|
|
// We are already in a transaction, so we don't commit or rollback, let the outermost transaction do it.
|
|
return callback(ctx)
|
|
}
|
|
|
|
spanCtx, span := db.tracer.Start(ctx, "Database.Transaction")
|
|
defer span.End()
|
|
|
|
defer func() {
|
|
if err != nil {
|
|
span.SetStatus(codes.Error, "Transaction failed")
|
|
span.RecordError(err)
|
|
}
|
|
}()
|
|
|
|
sqlxTx, err = db.sqlx.BeginTxx(spanCtx, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Save it in the context so the transaction can be reused in case it is nested.
|
|
txCtx := context.WithValue(spanCtx, contextSessionTxKey{}, sqlxTx)
|
|
|
|
if err := callback(txCtx); err != nil {
|
|
if rbErr := sqlxTx.Rollback(); rbErr != nil {
|
|
return errors.Join(err, rbErr)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
return sqlxTx.Commit()
|
|
}
|
|
|
|
func (db *Database) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) {
|
|
spanCtx, span := db.tracer.Start(ctx, "Database.ExecContext")
|
|
defer span.End()
|
|
|
|
// If another transaction is already open, we just use that one instead of nesting.
|
|
if tx, ok := ctx.Value(contextSessionTxKey{}).(*sqlx.Tx); tx != nil && ok {
|
|
return tx.ExecContext(spanCtx, db.sqlx.Rebind(query), args...)
|
|
}
|
|
|
|
return db.sqlx.ExecContext(spanCtx, db.sqlx.Rebind(query), args...)
|
|
}
|
|
|
|
func (db *Database) QueryContext(ctx context.Context, query string, args ...any) (contracts.Rows, error) {
|
|
spanCtx, span := db.tracer.Start(ctx, "Database.QueryContext")
|
|
defer span.End()
|
|
|
|
// If another transaction is already open, we just use that one instead of nesting.
|
|
if tx, ok := ctx.Value(contextSessionTxKey{}).(*sqlx.Tx); tx != nil && ok {
|
|
return tx.QueryContext(spanCtx, db.sqlx.Rebind(query), args...)
|
|
}
|
|
|
|
return db.sqlx.QueryContext(spanCtx, db.sqlx.Rebind(query), args...)
|
|
}
|