Files
lotus/lib/sqlite/sqlite.go

170 lines
5.8 KiB
Go

package sqlite
import (
"context"
"database/sql"
"errors"
"io/fs"
"os"
"path/filepath"
"strconv"
"time"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
)
var log = logging.Logger("sqlite")
type MigrationFunc func(ctx context.Context, tx *sql.Tx) error
var pragmas = []string{
"PRAGMA synchronous = normal",
"PRAGMA temp_store = memory",
"PRAGMA mmap_size = 30000000000",
"PRAGMA page_size = 32768",
"PRAGMA auto_vacuum = NONE",
"PRAGMA automatic_index = OFF",
"PRAGMA journal_mode = WAL",
"PRAGMA wal_autocheckpoint = 256", // checkpoint @ 256 pages
"PRAGMA journal_size_limit = 0", // always reset journal and wal files
}
const metaTableDdl = `CREATE TABLE IF NOT EXISTS _meta (
version UINT64 NOT NULL UNIQUE
)`
// metaDdl returns the DDL statements required to create the _meta table and add the required
// up to the given version.
func metaDdl(version uint64) []string {
var ddls []string
for i := 1; i <= int(version); i++ {
ddls = append(ddls, `INSERT OR IGNORE INTO _meta (version) VALUES (`+strconv.Itoa(i)+`)`)
}
return append([]string{metaTableDdl}, ddls...)
}
// Open opens a database at the given path. If the database does not exist, it will be created.
func Open(path string) (*sql.DB, bool, error) {
if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
return nil, false, xerrors.Errorf("error creating database base directory [@ %s]: %w", path, err)
}
_, err := os.Stat(path)
if err != nil && !errors.Is(err, fs.ErrNotExist) {
return nil, false, xerrors.Errorf("error checking file status for database [@ %s]: %w", path, err)
}
exists := err == nil
db, err := sql.Open("sqlite3", path+"?mode=rwc")
if err != nil {
return nil, false, xerrors.Errorf("error opening database [@ %s]: %w", path, err)
}
for _, pragma := range pragmas {
if _, err := db.Exec(pragma); err != nil {
_ = db.Close()
return nil, false, xerrors.Errorf("error setting database pragma %q: %w", pragma, err)
}
}
return db, exists, nil
}
// InitDb initializes the database by checking whether it needs to be created or upgraded.
// The ddls are the DDL statements to create the tables in the database and their initial required
// content. The schemaVersion will be set inside the databse if it is newly created. Otherwise, the
// version is read from the databse and returned. This value should be checked against the expected
// version to determine if the database needs to be upgraded.
// It is up to the caller to close the database if an error is returned by this function.
func InitDb(
ctx context.Context,
name string,
db *sql.DB,
ddls []string,
versionMigrations []MigrationFunc,
) error {
schemaVersion := len(versionMigrations) + 1
q, err := db.QueryContext(ctx, "SELECT name FROM sqlite_master WHERE type='table' AND name='_meta';")
if q != nil {
defer func() { _ = q.Close() }()
}
if errors.Is(err, sql.ErrNoRows) || !q.Next() {
// empty database, create the schema including the _meta table and its versions
ddls := append(metaDdl(uint64(schemaVersion)), ddls...)
for _, ddl := range ddls {
if _, err := db.Exec(ddl); err != nil {
return xerrors.Errorf("failed to %s database execute ddl %q: %w", name, ddl, err)
}
}
return nil
}
if err != nil {
return xerrors.Errorf("error looking for %s database _meta table: %w", name, err)
}
if err := q.Close(); err != nil {
return xerrors.Errorf("error closing %s database _meta table query: %w", name, err)
}
// check the schema version to see if we need to upgrade the database schema
var foundVersion int
if err = db.QueryRow("SELECT max(version) FROM _meta").Scan(&foundVersion); err != nil {
return xerrors.Errorf("invalid %s database version: no version found", name)
}
if foundVersion > schemaVersion {
return xerrors.Errorf("invalid %s database version: version %d is greater than the number of migrations %d", name, foundVersion, len(versionMigrations))
}
runVacuum := foundVersion != schemaVersion
// run a migration for each version that we have not yet applied, where foundVersion is what is
// currently in the database and schemaVersion is the target version. If they are the same,
// nothing is run.
for i := foundVersion + 1; i <= schemaVersion; i++ {
now := time.Now()
log.Infof("Migrating %s database to version %d...", name, i)
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return xerrors.Errorf("failed to start %s database transaction: %w", name, err)
}
defer func() { _ = tx.Rollback() }()
// versions start at 1, but the migrations are 0-indexed where the first migration would take us to version 2
if err := versionMigrations[i-2](ctx, tx); err != nil {
return xerrors.Errorf("failed to migrate %s database to version %d: %w", name, i, err)
}
if _, err := tx.ExecContext(ctx, `INSERT OR IGNORE INTO _meta (version) VALUES (?)`, i); err != nil {
return xerrors.Errorf("failed to update %s database _meta table: %w", name, err)
}
if err := tx.Commit(); err != nil {
return xerrors.Errorf("failed to commit %s database v%d migration transaction: %w", name, i, err)
}
log.Infof("Successfully migrated %s database from version %d to %d in %s", name, i-1, i, time.Since(now))
}
if runVacuum {
// During the large migrations, we have likely increased the WAL size a lot, so lets do some
// simple DB administration to free up space (VACUUM followed by truncating the WAL file)
// as this would be a good time to do it when no other writes are happening.
log.Infof("Performing %s database vacuum and wal checkpointing to free up space after the migration", name)
_, err := db.ExecContext(ctx, "VACUUM")
if err != nil {
log.Warnf("error vacuuming %s database: %s", name, err)
}
_, err = db.ExecContext(ctx, "PRAGMA wal_checkpoint(TRUNCATE)")
if err != nil {
log.Warnf("error checkpointing %s database wal: %s", name, err)
}
}
return nil
}