lotus/lib/sqlite/sqlite.go
Aarsh Shah dcc903c65d feat: a new ChainIndexer to index tipsets, messages and events (#12421)
Co-authored-by: Rod Vagg <rod@vagg.org>
Co-authored-by: Aryan Tikarya <aryan.tikarya@dojima.network>
Co-authored-by: Steve Loeppky <biglep@filoz.org>
2024-10-31 09:58:19 +00:00


package sqlite

import (
	"context"
	"database/sql"
	"errors"
	"io/fs"
	"os"
	"path/filepath"
	"strconv"
	"time"

	logging "github.com/ipfs/go-log/v2"
	_ "github.com/mattn/go-sqlite3"
	"golang.org/x/xerrors"
)

var log = logging.Logger("sqlite")

// MigrationFunc applies a single schema migration inside the given transaction.
type MigrationFunc func(ctx context.Context, tx *sql.Tx) error

// pragmas are executed in order by Open after the database handle is created.
var pragmas = []string{
	"PRAGMA synchronous = normal",
	"PRAGMA temp_store = memory",
	"PRAGMA mmap_size = 30000000000",
	"PRAGMA auto_vacuum = NONE",
	"PRAGMA automatic_index = OFF",
	"PRAGMA journal_mode = WAL",
	"PRAGMA journal_size_limit = 0", // always reset journal and wal files
	"PRAGMA foreign_keys = ON",
}

// metaTableDdl creates the _meta table, which holds one row per applied schema version.
const metaTableDdl = `CREATE TABLE IF NOT EXISTS _meta (
	version UINT64 NOT NULL UNIQUE
)`

// metaDdl returns the DDL statements required to create the _meta table and add the required
// versions up to the given version.
func metaDdl(version uint64) []string {
	var ddls []string
	for i := 1; i <= int(version); i++ {
		ddls = append(ddls, `INSERT OR IGNORE INTO _meta (version) VALUES (`+strconv.Itoa(i)+`)`)
	}
	return append([]string{metaTableDdl}, ddls...)
}

// Open opens a database at the given path. If the database does not exist, it will be created.
// The pragmas above are applied before the handle is returned, and Open fails if foreign keys
// cannot be confirmed as enabled.
func Open(path string) (*sql.DB, error) {
	if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
		return nil, xerrors.Errorf("error creating database base directory [@ %s]: %w", path, err)
	}

	_, err := os.Stat(path)
	if err != nil && !errors.Is(err, fs.ErrNotExist) {
		return nil, xerrors.Errorf("error checking file status for database [@ %s]: %w", path, err)
	}

	db, err := sql.Open("sqlite3", path+"?mode=rwc")
	if err != nil {
		return nil, xerrors.Errorf("error opening database [@ %s]: %w", path, err)
	}

	for _, pragma := range pragmas {
		if _, err := db.Exec(pragma); err != nil {
			_ = db.Close()
			return nil, xerrors.Errorf("error setting database pragma %q: %w", pragma, err)
		}
	}

	// verify that the foreign_keys pragma took effect; close the handle on failure so it does not leak
	var foreignKeysEnabled int
	if err := db.QueryRow("PRAGMA foreign_keys;").Scan(&foreignKeysEnabled); err != nil {
		_ = db.Close()
		return nil, xerrors.Errorf("failed to check foreign keys setting: %w", err)
	}
	if foreignKeysEnabled == 0 {
		_ = db.Close()
		return nil, xerrors.Errorf("foreign keys are not enabled for database [@ %s]", path)
	}

	return db, nil
}

// InitDb initializes the database by checking whether it needs to be created or upgraded.
// The ddls are the DDL statements to create the tables in the database and their initial required
// content. The schema version is len(versionMigrations) + 1 and is recorded in the _meta table if
// the database is newly created. Otherwise, the current version is read from the _meta table and
// every migration that has not yet been applied is run in order, each in its own transaction.
// It is up to the caller to close the database if an error is returned by this function.
func InitDb(
	ctx context.Context,
	name string,
	db *sql.DB,
	ddls []string,
	versionMigrations []MigrationFunc,
) error {
	schemaVersion := len(versionMigrations) + 1

	q, err := db.QueryContext(ctx, "SELECT name FROM sqlite_master WHERE type='table' AND name='_meta';")
	if q != nil {
		defer func() { _ = q.Close() }()
	}
	// q is nil when the query fails, so handle unexpected errors before touching it
	if err != nil && !errors.Is(err, sql.ErrNoRows) {
		return xerrors.Errorf("error looking for %s database _meta table: %w", name, err)
	}

	if err != nil || !q.Next() {
		// empty database, create the schema including the _meta table and its versions
		ddls := append(metaDdl(uint64(schemaVersion)), ddls...)
		for _, ddl := range ddls {
			if _, err := db.ExecContext(ctx, ddl); err != nil {
				return xerrors.Errorf("failed to execute %s database ddl %q: %w", name, ddl, err)
			}
		}
		return nil
	}

	if err := q.Close(); err != nil {
		return xerrors.Errorf("error closing %s database _meta table query: %w", name, err)
	}

	// check the schema version to see if we need to upgrade the database schema
	var foundVersion int
	if err = db.QueryRowContext(ctx, "SELECT max(version) FROM _meta").Scan(&foundVersion); err != nil {
		return xerrors.Errorf("invalid %s database version: no version found: %w", name, err)
	}

	if foundVersion > schemaVersion {
		return xerrors.Errorf("invalid %s database version: version %d is greater than the expected schema version %d", name, foundVersion, schemaVersion)
	}

	runVacuum := foundVersion != schemaVersion

	// run a migration for each version that we have not yet applied, where foundVersion is what is
	// currently in the database and schemaVersion is the target version. If they are the same,
	// nothing is run.
	for i := foundVersion + 1; i <= schemaVersion; i++ {
		now := time.Now()

		log.Infof("Migrating %s database to version %d...", name, i)

		tx, err := db.BeginTx(ctx, nil)
		if err != nil {
			return xerrors.Errorf("failed to start %s database transaction: %w", name, err)
		}
		// rolling back after a successful commit is a no-op, so this only undoes failed migrations
		defer func() { _ = tx.Rollback() }()

		// versions start at 1, but the migrations are 0-indexed where the first migration would take us to version 2
		if err := versionMigrations[i-2](ctx, tx); err != nil {
			return xerrors.Errorf("failed to migrate %s database to version %d: %w", name, i, err)
		}

		if _, err := tx.ExecContext(ctx, `INSERT OR IGNORE INTO _meta (version) VALUES (?)`, i); err != nil {
			return xerrors.Errorf("failed to update %s database _meta table: %w", name, err)
		}

		if err := tx.Commit(); err != nil {
			return xerrors.Errorf("failed to commit %s database v%d migration transaction: %w", name, i, err)
		}

		log.Infof("Successfully migrated %s database from version %d to %d in %s", name, i-1, i, time.Since(now))
	}

	if runVacuum {
		// During large migrations we have likely increased the WAL size a lot, so let's do some
		// simple DB administration to free up space (VACUUM followed by truncating the WAL file),
		// as this is a good time to do it while no other writes are happening.
		log.Infof("Performing %s database vacuum and wal checkpointing to free up space after the migration", name)
		_, err := db.ExecContext(ctx, "VACUUM")
		if err != nil {
			log.Warnf("error vacuuming %s database: %s", name, err)
		}
		_, err = db.ExecContext(ctx, "PRAGMA wal_checkpoint(TRUNCATE)")
		if err != nil {
			log.Warnf("error checkpointing %s database wal: %s", name, err)
		}
	}

	return nil
}
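
The version bookkeeping above is easy to get wrong by one, so here is a minimal standalone sketch of it using only the standard library. It is not part of the package. metaStatements re-creates what metaDdl produces (with the table DDL collapsed to one line), and pendingMigrations is a hypothetical helper that lists the indices into versionMigrations that the migration loop in InitDb would visit. The guard against a found version below 1 is an assumption added for the sketch; InitDb itself does not make that check.

```go
package main

import (
	"fmt"
	"strconv"
)

// metaStatements mirrors metaDdl: one CREATE TABLE statement followed by one
// INSERT per schema version from 1 up to and including version.
func metaStatements(version uint64) []string {
	stmts := []string{`CREATE TABLE IF NOT EXISTS _meta (version UINT64 NOT NULL UNIQUE)`}
	for i := uint64(1); i <= version; i++ {
		stmts = append(stmts, `INSERT OR IGNORE INTO _meta (version) VALUES (`+strconv.FormatUint(i, 10)+`)`)
	}
	return stmts
}

// pendingMigrations is a hypothetical helper (not in the package). Given the
// version found in _meta and the number of registered migrations, it returns
// the 0-based migration indices that would be run, in order.
func pendingMigrations(foundVersion, numMigrations int) ([]int, error) {
	// The target version is one more than the number of migrations, because
	// version 1 is the initial schema and needs no migration.
	schemaVersion := numMigrations + 1
	if foundVersion < 1 {
		// Assumption for this sketch: versions start at 1, so anything lower is invalid.
		return nil, fmt.Errorf("invalid version %d: versions start at 1", foundVersion)
	}
	if foundVersion > schemaVersion {
		return nil, fmt.Errorf("version %d is greater than the expected schema version %d", foundVersion, schemaVersion)
	}
	var indices []int
	for v := foundVersion + 1; v <= schemaVersion; v++ {
		// Migration index 0 takes the schema from version 1 to version 2, hence v-2.
		indices = append(indices, v-2)
	}
	return indices, nil
}

func main() {
	// A new database with two migrations registered is stamped with versions 1..3.
	for _, s := range metaStatements(3) {
		fmt.Println(s)
	}

	// A database at version 2 with three migrations registered still needs
	// the migrations at index 1 and index 2 to reach version 4.
	indices, err := pendingMigrations(2, 3)
	fmt.Println(indices, err)
}
```

A database that is already at the target version yields an empty list, which matches the loop in InitDb running zero times and skipping the vacuum step.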