mirror of
https://github.com/grafana/grafana.git
synced 2025-07-31 07:42:31 +08:00

* SQLStore: Fix Postgres dialect treating "false" migrator default as true Previously, when creating a migration you could choose a default value for a new boolean column that looked correct but would be interpreted incorrectly by the Postgres dialect. For example, values such as "false" or "FALSE" would be treated as true by the Postgres dialect. This refactors how migration dialects determine the Default column value for boolean type columns. Each dialect now uses the same base code to parse the Default literal and panics if an unknown value is encountered. So, now AddColumnMigration and AddTableMigration will ensure that across dialects: - The exact same Default literals will be allowed. - The literals are converted to equivalent defaults in their DDL. - An error will be thrown if an invalid literal is provided.
350 lines
9.1 KiB
Go
350 lines
9.1 KiB
Go
package migrator
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"regexp"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"github.com/lib/pq"
|
|
"xorm.io/xorm"
|
|
)
|
|
|
|
type PostgresDialect struct {
|
|
BaseDialect
|
|
}
|
|
|
|
func NewPostgresDialect() Dialect {
|
|
d := PostgresDialect{}
|
|
d.BaseDialect.dialect = &d
|
|
d.BaseDialect.driverName = Postgres
|
|
return &d
|
|
}
|
|
|
|
func (db *PostgresDialect) SupportEngine() bool {
|
|
return false
|
|
}
|
|
|
|
func (db *PostgresDialect) Quote(name string) string {
|
|
return "\"" + name + "\""
|
|
}
|
|
|
|
func (db *PostgresDialect) LikeStr() string {
|
|
return "ILIKE"
|
|
}
|
|
|
|
func (db *PostgresDialect) AutoIncrStr() string {
|
|
return ""
|
|
}
|
|
|
|
func (db *PostgresDialect) BooleanStr(value bool) string {
|
|
return strconv.FormatBool(value)
|
|
}
|
|
|
|
func (db *PostgresDialect) BatchSize() int {
|
|
return 1000
|
|
}
|
|
|
|
func (db *PostgresDialect) SQLType(c *Column) string {
|
|
var res string
|
|
switch t := c.Type; t {
|
|
case DB_TinyInt:
|
|
res = DB_SmallInt
|
|
return res
|
|
case DB_MediumInt, DB_Int, DB_Integer:
|
|
if c.IsAutoIncrement {
|
|
return DB_Serial
|
|
}
|
|
return DB_Integer
|
|
case DB_Serial, DB_BigSerial:
|
|
c.IsAutoIncrement = true
|
|
c.Nullable = false
|
|
res = t
|
|
case DB_Binary, DB_VarBinary:
|
|
return DB_Bytea
|
|
case DB_DateTime:
|
|
res = DB_TimeStamp
|
|
case DB_TimeStampz:
|
|
return "timestamp with time zone"
|
|
case DB_Float:
|
|
res = DB_Real
|
|
case DB_TinyText, DB_MediumText, DB_LongText:
|
|
res = DB_Text
|
|
case DB_NVarchar:
|
|
res = DB_Varchar
|
|
case DB_Uuid:
|
|
res = DB_Uuid
|
|
case DB_Blob, DB_TinyBlob, DB_MediumBlob, DB_LongBlob:
|
|
return DB_Bytea
|
|
case DB_Double:
|
|
return "DOUBLE PRECISION"
|
|
default:
|
|
if c.IsAutoIncrement {
|
|
return DB_Serial
|
|
}
|
|
res = t
|
|
}
|
|
|
|
var hasLen1 = (c.Length > 0)
|
|
var hasLen2 = (c.Length2 > 0)
|
|
if hasLen2 {
|
|
res += "(" + strconv.Itoa(c.Length) + "," + strconv.Itoa(c.Length2) + ")"
|
|
} else if hasLen1 {
|
|
res += "(" + strconv.Itoa(c.Length) + ")"
|
|
}
|
|
return res
|
|
}
|
|
|
|
func (db *PostgresDialect) IndexCheckSQL(tableName, indexName string) (string, []interface{}) {
|
|
args := []interface{}{tableName, indexName}
|
|
sql := "SELECT 1 FROM " + db.Quote("pg_indexes") + " WHERE" + db.Quote("tablename") + "=? AND " + db.Quote("indexname") + "=?"
|
|
return sql, args
|
|
}
|
|
|
|
func (db *PostgresDialect) DropIndexSQL(tableName string, index *Index) string {
|
|
quote := db.Quote
|
|
idxName := index.XName(tableName)
|
|
return fmt.Sprintf("DROP INDEX %v CASCADE", quote(idxName))
|
|
}
|
|
|
|
func (db *PostgresDialect) UpdateTableSQL(tableName string, columns []*Column) string {
|
|
var statements = []string{}
|
|
|
|
for _, col := range columns {
|
|
statements = append(statements, "ALTER "+db.Quote(col.Name)+" TYPE "+db.SQLType(col))
|
|
}
|
|
|
|
return "ALTER TABLE " + db.Quote(tableName) + " " + strings.Join(statements, ", ") + ";"
|
|
}
|
|
|
|
func (db *PostgresDialect) CleanDB(engine *xorm.Engine) error {
|
|
sess := engine.NewSession()
|
|
defer sess.Close()
|
|
|
|
if _, err := sess.Exec("DROP SCHEMA public CASCADE;"); err != nil {
|
|
return fmt.Errorf("%v: %w", "failed to drop schema public", err)
|
|
}
|
|
|
|
if _, err := sess.Exec("CREATE SCHEMA public;"); err != nil {
|
|
return fmt.Errorf("%v: %w", "failed to create schema public", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// TruncateDBTables truncates all the tables.
|
|
// A special case is the dashboard_acl table where we keep the default permissions.
|
|
func (db *PostgresDialect) TruncateDBTables(engine *xorm.Engine) error {
|
|
tables, err := engine.DBMetas()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
sess := engine.NewSession()
|
|
defer sess.Close()
|
|
|
|
for _, table := range tables {
|
|
switch table.Name {
|
|
case "":
|
|
continue
|
|
case "migration_log":
|
|
continue
|
|
case "dashboard_acl":
|
|
// keep default dashboard permissions
|
|
if _, err := sess.Exec(fmt.Sprintf("DELETE FROM %v WHERE dashboard_id != -1 AND org_id != -1;", db.Quote(table.Name))); err != nil {
|
|
return fmt.Errorf("failed to truncate table %q: %w", table.Name, err)
|
|
}
|
|
if _, err := sess.Exec(fmt.Sprintf("ALTER SEQUENCE %v RESTART WITH 3;", db.Quote(fmt.Sprintf("%v_id_seq", table.Name)))); err != nil {
|
|
return fmt.Errorf("failed to reset table %q: %w", table.Name, err)
|
|
}
|
|
default:
|
|
if _, err := sess.Exec(fmt.Sprintf("TRUNCATE TABLE %v RESTART IDENTITY CASCADE;", db.Quote(table.Name))); err != nil {
|
|
if db.isUndefinedTable(err) {
|
|
continue
|
|
}
|
|
return fmt.Errorf("failed to truncate table %q: %w", table.Name, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (db *PostgresDialect) isThisError(err error, errcode string) bool {
|
|
var driverErr *pq.Error
|
|
if errors.As(err, &driverErr) {
|
|
if string(driverErr.Code) == errcode {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
func (db *PostgresDialect) ErrorMessage(err error) string {
|
|
var driverErr *pq.Error
|
|
if errors.As(err, &driverErr) {
|
|
return driverErr.Message
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func (db *PostgresDialect) isUndefinedTable(err error) bool {
|
|
return db.isThisError(err, "42P01")
|
|
}
|
|
|
|
func (db *PostgresDialect) IsUniqueConstraintViolation(err error) bool {
|
|
return db.isThisError(err, "23505")
|
|
}
|
|
|
|
func (db *PostgresDialect) IsDeadlock(err error) bool {
|
|
return db.isThisError(err, "40P01")
|
|
}
|
|
|
|
func (db *PostgresDialect) PostInsertId(table string, sess *xorm.Session) error {
|
|
if table != "org" {
|
|
return nil
|
|
}
|
|
|
|
// sync primary key sequence of org table
|
|
if _, err := sess.Exec("SELECT setval('org_id_seq', (SELECT max(id) FROM org));"); err != nil {
|
|
return fmt.Errorf("failed to sync primary key for org table: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// UpsertSQL returns the upsert sql statement for PostgreSQL dialect
|
|
func (db *PostgresDialect) UpsertSQL(tableName string, keyCols, updateCols []string) string {
|
|
str, _ := db.UpsertMultipleSQL(tableName, keyCols, updateCols, 1)
|
|
return str
|
|
}
|
|
|
|
// UpsertMultipleSQL returns the upsert sql statement for PostgreSQL dialect
|
|
func (db *PostgresDialect) UpsertMultipleSQL(tableName string, keyCols, updateCols []string, count int) (string, error) {
|
|
if count < 1 {
|
|
return "", fmt.Errorf("upsert statement must have count >= 1. Got %v", count)
|
|
}
|
|
columnsStr := strings.Builder{}
|
|
onConflictStr := strings.Builder{}
|
|
setStr := strings.Builder{}
|
|
|
|
const separator = ", "
|
|
separatorVar := separator
|
|
for i, c := range updateCols {
|
|
if i == len(updateCols)-1 {
|
|
separatorVar = ""
|
|
}
|
|
|
|
columnsStr.WriteString(fmt.Sprintf("%s%s", db.Quote(c), separatorVar))
|
|
setStr.WriteString(fmt.Sprintf("%s=EXCLUDED.%s%s", db.Quote(c), db.Quote(c), separatorVar))
|
|
}
|
|
|
|
separatorVar = separator
|
|
for i, c := range keyCols {
|
|
if i == len(keyCols)-1 {
|
|
separatorVar = ""
|
|
}
|
|
onConflictStr.WriteString(fmt.Sprintf("%s%s", db.Quote(c), separatorVar))
|
|
}
|
|
|
|
valuesStr := strings.Builder{}
|
|
separatorVar = separator
|
|
nextPlaceHolder := 1
|
|
|
|
for i := 0; i < count; i++ {
|
|
if i == count-1 {
|
|
separatorVar = ""
|
|
}
|
|
|
|
colPlaceHoldersStr := strings.Builder{}
|
|
placeHolderSep := separator
|
|
for j := 1; j <= len(updateCols); j++ {
|
|
if j == len(updateCols) {
|
|
placeHolderSep = ""
|
|
}
|
|
placeHolder := fmt.Sprintf("$%v%s", nextPlaceHolder, placeHolderSep)
|
|
nextPlaceHolder++
|
|
colPlaceHoldersStr.WriteString(placeHolder)
|
|
}
|
|
colPlaceHolders := colPlaceHoldersStr.String()
|
|
|
|
valuesStr.WriteString(fmt.Sprintf("(%s)%s", colPlaceHolders, separatorVar))
|
|
}
|
|
|
|
s := fmt.Sprintf(`INSERT INTO %s (%s) VALUES %s ON CONFLICT (%s) DO UPDATE SET %s;`,
|
|
tableName,
|
|
columnsStr.String(),
|
|
valuesStr.String(),
|
|
onConflictStr.String(),
|
|
setStr.String(),
|
|
)
|
|
|
|
return s, nil
|
|
}
|
|
|
|
func (db *PostgresDialect) Lock(cfg LockCfg) error {
|
|
// trying to obtain the lock for a resource identified by a 64-bit or 32-bit key value
|
|
// the lock is exclusive: multiple lock requests stack, so that if the same resource is locked three times
|
|
// it must then be unlocked three times to be released for other sessions' use.
|
|
// it will either obtain the lock immediately and return true,
|
|
// or return false if the lock cannot be acquired immediately.
|
|
query := "SELECT pg_try_advisory_lock(?)"
|
|
var success bool
|
|
|
|
_, err := cfg.Session.SQL(query, cfg.Key).Get(&success)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !success {
|
|
return ErrLockDB
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// OrderBy normalizes ordering so that nulls end up last in sorting, which they do by default in both sqlite and mysql but not postgres
|
|
// order should be a string like `dashboard.id ASC`
|
|
func (db *PostgresDialect) OrderBy(order string) string {
|
|
nullSort := "FIRST"
|
|
normalizedOrder := strings.ToUpper(strings.TrimSpace(order))
|
|
|
|
if strings.HasSuffix(normalizedOrder, " DESC") {
|
|
nullSort = "LAST"
|
|
}
|
|
|
|
return fmt.Sprintf("%s NULLS %s", order, nullSort)
|
|
}
|
|
|
|
func (db *PostgresDialect) Unlock(cfg LockCfg) error {
|
|
// trying to release a previously-acquired exclusive session level advisory lock.
|
|
// it will either return true if the lock is successfully released or
|
|
// false if the lock was not held (in addition an SQL warning will be reported by the server)
|
|
query := "SELECT pg_advisory_unlock(?)"
|
|
var success bool
|
|
|
|
_, err := cfg.Session.SQL(query, cfg.Key).Get(&success)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !success {
|
|
return ErrReleaseLockDB
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *PostgresDialect) GetDBName(dsn string) (string, error) {
|
|
if strings.HasPrefix(dsn, "postgres://") || strings.HasPrefix(dsn, "postgresql://") {
|
|
parsedDSN, err := pq.ParseURL(dsn)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
dsn = parsedDSN
|
|
}
|
|
re := regexp.MustCompile(`dbname=(\w+)`)
|
|
submatch := re.FindSubmatch([]byte(dsn))
|
|
if len(submatch) < 2 {
|
|
return "", fmt.Errorf("failed to get database name")
|
|
}
|
|
return string(submatch[1]), nil
|
|
}
|