Files
lotus/chain/index/ddls_test.go
Aarsh Shah dcc903c65d feat: a new ChainIndexer to index tipsets, messages and events (#12421)
* chain index complete for msgs and txns

* dont need observer changes for now

* changes

* fix tests

* fix tests

* use th right context

* index empty tipsets correctly

* implement automated backfilling

* add event indexing and remove all old indices

* fix test

* revert deployment test changes

* revert test changes and better error handling for eth tx index lookups

* fix sql statments naming convention

* address review for Index GC

* more changes as per review

* changes as per review

* fix config

* mark events as reverted during reconciliation

* better reconciliation; pens down and code complete; also reconcile events

* fix tests

* improve config and docs

* improve docs and error handling

* improve read logic

* improve docs

* better logging and handle ennable event storage

* improve logs and index init proc

* better logging

* fix bugs based on calibnet testing

* create sqliite Indices

* gc should be based on epochs

* fix event query

* foreign keys should be enabled on the DB

* reverted tipsets should be removed as part of GC

* release read lock

* make it easy to backfill an empty index using reconciliation

* better docs for reconciliation

* fix conflicts with master

* Apply suggestions from code review

Co-authored-by: Rod Vagg <rod@vagg.org>

* fix go mod

* fix formatting

* revert config changes

* address changes in observer

* remove top level chainindex package

* changes as per review

* changes as per review

* changes as per review

* handle index with reverted tipsets during reconciliation

* changes as per review

* fix type of max reconcile epoch

* changes to reconciliation as per review

* log ipld error

* better logging of progress

* disable chain indexer hydrate from snapshot based on config

* always populate index

* make config easy to reason about

* fix config

* fix messaging

* revert config changes

* Apply suggestions from code review

Co-authored-by: Rod Vagg <rod@vagg.org>

* changes as per review

* make error messages homogenous

* fix indentation

* changes as per review

* feat: recompute tipset to generate missing events if event indexing is enabled (#12463)

* auto repair events

* make jen

* fix leaky abstraction

* better docs for gc retention epoch

* imrpove DB handling (#12485)

* fix conflict

* fix lite node config for indexer

* exclude reverted events from eth get logs if client queries by epoch

* Simply addressing for event lookups in the index.

simply addressing for event lookups

* Apply suggestions from code review

Co-authored-by: Rod Vagg <rod@vagg.org>

* Apply suggestions from code review

Co-authored-by: Rod Vagg <rod@vagg.org>

* fix tests

* Apply suggestions from code review

Co-authored-by: Rod Vagg <rod@vagg.org>

* feat: migration("re-indexing"), backfilling and diasgnostics tooling for the `ChainIndexer` (#12450)

* fix conflicts with chain indexer

* feat: chain indexer todos [skip changelog] (#12462)

* feat: finish todos of validation api

* feat: add indexed data verification with chain store

* feat: address comments and finish TODO

* fix: build issue

* address comments

* fix: ci issue

* Apply suggestions from code review

Co-authored-by: Rod Vagg <rod@vagg.org>

* changes to Index Validation API based on Rodds first review

* build chain indexer API

* improve error handling

* feat: lotus-shed tooling for chain indexer (#12474)

* feat: add lotus-shed command for backfilling chain indexer

* feat: add lotus-shed command for inspecting the chain indexer

* feat: use single lotus-shed command to inspect and backfill

* fix: remove the unused queries

* small changes

* add change log

* backfilling improvements and fixes

* finish chain index validation and backfill tooling

* user documentation for the

* validate from epoch

* Apply suggestions from code review

Suggestions from Steve's read of the user doc.

Co-authored-by: Steve Loeppky <biglep@filoz.org>

* changes to user doc as per review

* Apply suggestions from code review

Co-authored-by: Steve Loeppky <biglep@filoz.org>

* changes to user doc as per review

* Apply suggestions from code review

Co-authored-by: Steve Loeppky <biglep@filoz.org>

* changes as per review

* feat: add event entries count in validation API (#12506)

* feat: add event entry count in validation API

* address comments

* use sqllite defaults (#12504)

* Apply suggestions from code review

Co-authored-by: Steve Loeppky <biglep@filoz.org>

* write chain index to a different dir

* Apply suggestions from code review

Co-authored-by: Steve Loeppky <biglep@filoz.org>

* fix conflicts

* UX improvements to backfilling

* feat: tests for the chain indexer (#12521)

* ddl tests

* tests for the chain indexer

* finish unit tests for chain indexer

* fix formatting

* cleanup reverted tipsets to avoid db bloat

* fix logging

* test for filter by address

* test gc cascade delete

* fix db locked error during backfilling

* fix var name

* increase db locked timeout

* fix db locked issue

* reduce db lock timeout

* no lock in gc

* reconcile does not need lock

* improved error handling

* Update chain-indexing-overview-for-rpc-providers.md

Doc updates based on @jennijuju feedack.

* Update chain-indexing-overview-for-rpc-providers.MD

Fixes after reviewing 33c1ca1831

* better metrics for backfilling

* Update chain/index/chain-indexing-overview-for-rpc-providers.MD

Co-authored-by: Rod Vagg <rod@vagg.org>

* Update chain/index/chain-indexing-overview-for-rpc-providers.MD

Co-authored-by: Rod Vagg <rod@vagg.org>

* Update chain/index/chain-indexing-overview-for-rpc-providers.MD

Co-authored-by: Rod Vagg <rod@vagg.org>

* Update chain/index/chain-indexing-overview-for-rpc-providers.MD

Co-authored-by: Rod Vagg <rod@vagg.org>

* Update chain/index/chain-indexing-overview-for-rpc-providers.MD

Co-authored-by: Rod Vagg <rod@vagg.org>

* Update chain/index/chain-indexing-overview-for-rpc-providers.MD

Co-authored-by: Rod Vagg <rod@vagg.org>

* Update chain/index/chain-indexing-overview-for-rpc-providers.MD

Co-authored-by: Rod Vagg <rod@vagg.org>

* tests for changes to event addressing

* Apply suggestions from code review

Co-authored-by: Rod Vagg <rod@vagg.org>

* changes as per review -> round 1

* Apply suggestions from code review

Co-authored-by: Rod Vagg <rod@vagg.org>

* Apply suggestions from code review

Co-authored-by: Rod Vagg <rod@vagg.org>

* log tipset key cid

* Apply suggestions from code review

Co-authored-by: Rod Vagg <rod@vagg.org>

* Apply suggestions from code review

Co-authored-by: Rod Vagg <rod@vagg.org>

* fix docs

* Apply suggestions from code review

Co-authored-by: Rod Vagg <rod@vagg.org>

* fix tests

* fix tests

* make jen

* fix conflicts

---------

Co-authored-by: Aryan Tikarya <aryan.tikarya@dojima.network>
Co-authored-by: Rod Vagg <rod@vagg.org>
Co-authored-by: Steve Loeppky <biglep@filoz.org>

* fix lint

* Apply suggestions from code review

Co-authored-by: Rod Vagg <rod@vagg.org>

* Apply suggestions from code review

Co-authored-by: Rod Vagg <rod@vagg.org>

* remove reverted flag from RPC

* Apply suggestions from code review

Co-authored-by: Rod Vagg <rod@vagg.org>

* fix testing of events and dummy chain store

* remove lotus shed commands for old Indices

* change type of event counts to uint64

* only recompute events if theyre not found

* short-circuit empty events path for older tipsets

* chain indexer must be enabled if ETH RPC is enabled

* change name of message_id column to id in tipset_message table

* only expose SetRecomputeTipSetStateFunc

* dont block on head indexing for reading messages

* document why we're only checking for missing events for a single tipset

* document when we query for reverted events

* simplify event collection

* Apply suggestions from code review

Co-authored-by: Rod Vagg <rod@vagg.org>

* fix test

* change event_id to id in the event table

* change head indexed timeout

* remove deprecated config options

* fail ETH RPC calls if ChainIndexer is disabled

* fix docs

* remove the tipset key cid func from lotus shed

* address review comments

* Apply suggestions from code review

Co-authored-by: Rod Vagg <rod@vagg.org>

* chore(events): remove unnecessary DisableRealTimeFilterAPI (#12610)

* feat(cli): add --quiet to chainindex validate-backfill + cleanups (#12611)

* fix tests

* Apply suggestions from code review

Co-authored-by: Rod Vagg <rod@vagg.org>

* error type for disabled chainindexer

* fix(chainindex): recompute tipset when we find no receipts

* fix(chainindexer): backfilling should halt when chain state data is missing and not backfill parents (#12619)

* fix backfilling UX

* Update chain/index/api.go

Co-authored-by: Rod Vagg <rod@vagg.org>

* address review

---------

Co-authored-by: Rod Vagg <rod@vagg.org>

* reduce log noise

* make jen

* make jen

* docs: finishing chain-indexer-overview-for-operators.md (#12600)

* Followup to PR #12450 for doc updates

This is being used to resolve the unresolved items in https://github.com/filecoin-project/lotus/pull/12450 since that PR is unwieldly at this point.

* Incorporated some items and added TODOs based on unresolved items from https://github.com/filecoin-project/lotus/pull/12450

* Incorporating more feedback

* Pointing to issue to learn about benefits

* Formatting fixes

* Apply most of the suggestions from @rvagg code review

Co-authored-by: Rod Vagg <rod@vagg.org>

* Incorporating feedback from https://github.com/filecoin-project/lotus/pull/12600#discussion_r1802519453

* Addressing https://github.com/filecoin-project/lotus/pull/12600#discussion_r1802540042 and more

* Moved chain-indexer docs to documentation
Renamed
Added ToC

We can move to lotus-docs later

* Update documentation/en/chain-indexer-overview-for-operators.md

Co-authored-by: Rod Vagg <rod@vagg.org>

* Update documentation/en/chain-indexer-overview-for-operators.md

Co-authored-by: Rod Vagg <rod@vagg.org>

* Added upgrade path when importing chain state from a snapshot.

* Typo fixes

* Update documentation/en/chain-indexer-overview-for-operators.md

Co-authored-by: Rod Vagg <rod@vagg.org>

* chore(doc): "regular checks" section for chainindexer docs (#12612)

* Apply suggestions from @rvagg code review

Co-authored-by: Rod Vagg <rod@vagg.org>

* Incorporating @aarshkshah1992 feedback

* Update documentation/en/chain-indexer-overview-for-operators.md

Co-authored-by: Rod Vagg <rod@vagg.org>

---------

Co-authored-by: Rod Vagg <rod@vagg.org>
Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com>

* remove go mod replace

* remove unnecessary changes from CHANGELOG

* fix test

* compare events AMT root (#12632)

* fix(chainindex): retry transaction if database connection is lost (#12657)

* retry database lost connection

* log context cancellation

* address review

* fix gateway itest: no chainindexer for lite nodes

* fix changelog

---------

Co-authored-by: Rod Vagg <rod@vagg.org>
Co-authored-by: Aryan Tikarya <aryan.tikarya@dojima.network>
Co-authored-by: Steve Loeppky <biglep@filoz.org>
2024-10-31 09:58:19 +00:00

867 lines
26 KiB
Go

package index
import (
"database/sql"
"testing"
"github.com/stretchr/testify/require"
)
const (
tipsetKeyCid1 = "test_tipset_key"
tipsetKeyCid2 = "test_tipset_key_2"
messageCid1 = "test_message_cid"
messageCid2 = "test_message_cid_2"
emitterAddr1 = "test_emitter_addr"
)
func TestHasRevertedEventsInTipsetStmt(t *testing.T) {
s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0)
require.NoError(t, err)
// running on empty DB should return false
verifyHasRevertedEventsInTipsetStmt(t, s, []byte(tipsetKeyCid1), false)
// Insert tipset with a reverted event
ts := tipsetMessage{
tipsetKeyCid: []byte(tipsetKeyCid1),
height: 1,
reverted: false,
messageCid: []byte(messageCid1),
messageIndex: 0,
}
messageID := insertTipsetMessage(t, s, ts)
// this event will be un-reverted later
insertEvent(t, s, event{
messageID: messageID,
eventIndex: 0,
emitterId: 1,
emitterAddr: []byte(emitterAddr1),
reverted: true,
})
// this event should not be un-reverted
ts = tipsetMessage{
tipsetKeyCid: []byte(tipsetKeyCid2),
height: 1,
reverted: false,
messageCid: []byte(messageCid2),
messageIndex: 0,
}
messageID2 := insertTipsetMessage(t, s, ts)
insertEvent(t, s, event{
messageID: messageID2,
eventIndex: 0,
emitterId: 2,
emitterAddr: []byte(emitterAddr1),
reverted: true,
})
// Verify `hasRevertedEventsInTipset` returns true
verifyHasRevertedEventsInTipsetStmt(t, s, []byte(tipsetKeyCid1), true)
verifyHasRevertedEventsInTipsetStmt(t, s, []byte(tipsetKeyCid2), true)
// change event to non-reverted
updateEventsToNonReverted(t, s, []byte(tipsetKeyCid1))
// Verify `hasRevertedEventsInTipset` returns false
verifyHasRevertedEventsInTipsetStmt(t, s, []byte(tipsetKeyCid1), false)
verifyHasRevertedEventsInTipsetStmt(t, s, []byte(tipsetKeyCid2), true)
}
func TestGetNonRevertedTipsetCountStmts(t *testing.T) {
s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0)
require.NoError(t, err)
// running on empty DB should return 0
verifyNonRevertedEventEntriesCount(t, s, []byte(tipsetKeyCid1), 0)
verifyNonRevertedEventCount(t, s, []byte(tipsetKeyCid1), 0)
verifyNonRevertedMessageCount(t, s, []byte(tipsetKeyCid1), 0)
// Insert non-reverted tipset
messageID := insertTipsetMessage(t, s, tipsetMessage{
tipsetKeyCid: []byte(tipsetKeyCid1),
height: 1,
reverted: false,
messageCid: []byte(messageCid1),
messageIndex: 0,
})
// Insert event
eventID1 := insertEvent(t, s, event{
messageID: messageID,
eventIndex: 0,
emitterId: 1,
emitterAddr: []byte(emitterAddr1),
reverted: false,
})
eventID2 := insertEvent(t, s, event{
messageID: messageID,
eventIndex: 1,
emitterId: 2,
emitterAddr: []byte(emitterAddr1),
reverted: false,
})
// Insert event entry
insertEventEntry(t, s, eventEntry{
eventID: eventID1,
indexed: true,
flags: []byte("test_flags"),
key: "test_key",
codec: 1,
value: []byte("test_value"),
})
insertEventEntry(t, s, eventEntry{
eventID: eventID2,
indexed: true,
flags: []byte("test_flags2"),
key: "test_key2",
codec: 2,
value: []byte("test_value2"),
})
// verify 2 event entries
verifyNonRevertedEventEntriesCount(t, s, []byte(tipsetKeyCid1), 2)
// Verify event count
verifyNonRevertedEventCount(t, s, []byte(tipsetKeyCid1), 2)
// verify message count is 1
verifyNonRevertedMessageCount(t, s, []byte(tipsetKeyCid1), 1)
// mark tipset as reverted
revertTipset(t, s, []byte(tipsetKeyCid1))
// Verify `getNonRevertedTipsetEventEntriesCountStmt` returns 0
verifyNonRevertedEventEntriesCount(t, s, []byte(tipsetKeyCid1), 0)
// verify event count is 0
verifyNonRevertedEventCount(t, s, []byte(tipsetKeyCid1), 0)
// verify message count is 0
verifyNonRevertedMessageCount(t, s, []byte(tipsetKeyCid1), 0)
}
func TestGetEventIdAndEmitterIdStmtAndGetEventEntriesStmt(t *testing.T) {
s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0)
require.NoError(t, err)
// Insert a tipset message
tsKeyCid := []byte("test_tipset_key")
msgCid := []byte("test_message_cid")
messageID := insertTipsetMessage(t, s, tipsetMessage{
tipsetKeyCid: tsKeyCid,
height: 1,
reverted: false,
messageCid: msgCid,
messageIndex: 0,
})
// Insert events
event1ID := insertEvent(t, s, event{
messageID: messageID,
eventIndex: 0,
emitterId: 1,
emitterAddr: []byte("emitter_addr_1"),
reverted: false,
})
event2ID := insertEvent(t, s, event{
messageID: messageID,
eventIndex: 1,
emitterId: 2,
emitterAddr: []byte("emitter_addr_2"),
reverted: false,
})
// Insert event entries
insertEventEntry(t, s, eventEntry{
eventID: event1ID,
indexed: true,
flags: []byte{0x01},
key: "key1",
codec: 1,
value: []byte("value1"),
})
insertEventEntry(t, s, eventEntry{
eventID: event1ID,
indexed: false,
flags: []byte{0x00},
key: "key2",
codec: 2,
value: []byte("value2"),
})
insertEventEntry(t, s, eventEntry{
eventID: event2ID,
indexed: true,
flags: []byte{0x01},
key: "key3",
codec: 3,
value: []byte("value3"),
})
// Test getEventIdAndEmitterIdStmt
rows, err := s.stmts.getEventIdAndEmitterIdStmt.Query(tsKeyCid, msgCid)
require.NoError(t, err)
defer func() {
_ = rows.Close()
}()
var eventIDs []int64
var emitterIDs []uint64
for rows.Next() {
var eventID int64
var emitterID uint64
err := rows.Scan(&eventID, &emitterID)
require.NoError(t, err)
eventIDs = append(eventIDs, eventID)
emitterIDs = append(emitterIDs, emitterID)
}
require.NoError(t, rows.Err())
require.Equal(t, []int64{event1ID, event2ID}, eventIDs)
require.Equal(t, []uint64{1, 2}, emitterIDs)
// Test getEventEntriesStmt for event1
rows, err = s.stmts.getEventEntriesStmt.Query(event1ID)
require.NoError(t, err)
defer func() {
_ = rows.Close()
}()
var entries []eventEntry
for rows.Next() {
var entry eventEntry
err := rows.Scan(&entry.flags, &entry.key, &entry.codec, &entry.value)
require.NoError(t, err)
entries = append(entries, entry)
}
require.NoError(t, rows.Err())
require.Len(t, entries, 2)
require.Equal(t, []byte{0x01}, entries[0].flags)
require.Equal(t, "key1", entries[0].key)
require.Equal(t, 1, entries[0].codec)
require.Equal(t, []byte("value1"), entries[0].value)
require.Equal(t, []byte{0x00}, entries[1].flags)
require.Equal(t, "key2", entries[1].key)
require.Equal(t, 2, entries[1].codec)
require.Equal(t, []byte("value2"), entries[1].value)
// Test getEventEntriesStmt for event2
rows, err = s.stmts.getEventEntriesStmt.Query(event2ID)
require.NoError(t, err)
defer func() {
_ = rows.Close()
}()
entries = nil
for rows.Next() {
var entry eventEntry
err := rows.Scan(&entry.flags, &entry.key, &entry.codec, &entry.value)
require.NoError(t, err)
entries = append(entries, entry)
}
require.NoError(t, rows.Err())
require.Len(t, entries, 1)
require.Equal(t, []byte{0x01}, entries[0].flags)
require.Equal(t, "key3", entries[0].key)
require.Equal(t, 3, entries[0].codec)
require.Equal(t, []byte("value3"), entries[0].value)
}
func TestUpdateTipsetToNonRevertedStmt(t *testing.T) {
s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0)
require.NoError(t, err)
// insert a reverted tipset
ts := tipsetMessage{
tipsetKeyCid: []byte(tipsetKeyCid1),
height: 1,
reverted: true,
messageCid: []byte(messageCid1),
messageIndex: 0,
}
// Insert tipset
messageId := insertTipsetMessage(t, s, ts)
res, err := s.stmts.updateTipsetToNonRevertedStmt.Exec([]byte(tipsetKeyCid1))
require.NoError(t, err)
rowsAffected, err := res.RowsAffected()
require.NoError(t, err)
require.Equal(t, int64(1), rowsAffected)
// verify the tipset is not reverted
ts.reverted = false
verifyTipsetMessage(t, s, messageId, ts)
}
func TestHasNullRoundAtHeightStmt(t *testing.T) {
s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0)
require.NoError(t, err)
// running on empty DB should return true
verifyHasNullRoundAtHeightStmt(t, s, 1, true)
verifyHasNullRoundAtHeightStmt(t, s, 0, true)
// insert tipset
insertTipsetMessage(t, s, tipsetMessage{
tipsetKeyCid: []byte(tipsetKeyCid1),
height: 1,
reverted: false,
messageCid: []byte(messageCid1),
messageIndex: 0,
})
// verify not a null round
verifyHasNullRoundAtHeightStmt(t, s, 1, false)
}
func TestHasTipsetStmt(t *testing.T) {
s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0)
require.NoError(t, err)
// running on empty DB should return false
verifyHasTipsetStmt(t, s, []byte(tipsetKeyCid1), false)
// insert tipset
insertTipsetMessage(t, s, tipsetMessage{
tipsetKeyCid: []byte(tipsetKeyCid1),
height: 1,
reverted: false,
messageCid: []byte(messageCid1),
messageIndex: 0,
})
// verify tipset exists
verifyHasTipsetStmt(t, s, []byte(tipsetKeyCid1), true)
// verify non-existent tipset
verifyHasTipsetStmt(t, s, []byte("non_existent_tipset_key"), false)
}
func TestUpdateEventsToRevertedStmt(t *testing.T) {
s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0)
require.NoError(t, err)
// Insert a non-reverted tipset
messageID := insertTipsetMessage(t, s, tipsetMessage{
tipsetKeyCid: []byte(tipsetKeyCid1),
height: 1,
reverted: false,
messageCid: []byte(messageCid1),
messageIndex: 0,
})
// Insert non-reverted events
insertEvent(t, s, event{
messageID: messageID,
eventIndex: 0,
emitterId: 1,
emitterAddr: []byte(emitterAddr1),
reverted: false,
})
insertEvent(t, s, event{
messageID: messageID,
eventIndex: 1,
emitterId: 2,
emitterAddr: []byte(emitterAddr1),
reverted: false,
})
// Verify events are not reverted
var count int
err = s.db.QueryRow("SELECT COUNT(*) FROM event WHERE reverted = 0 AND message_id = ?", messageID).Scan(&count)
require.NoError(t, err)
require.Equal(t, 2, count)
// Execute updateEventsToRevertedStmt
_, err = s.stmts.updateEventsToRevertedStmt.Exec([]byte(tipsetKeyCid1))
require.NoError(t, err)
// Verify events are now reverted
err = s.db.QueryRow("SELECT COUNT(*) FROM event WHERE reverted = 1 AND message_id = ?", messageID).Scan(&count)
require.NoError(t, err)
require.Equal(t, 2, count)
// Verify no non-reverted events remain
err = s.db.QueryRow("SELECT COUNT(*) FROM event WHERE reverted = 0 AND message_id = ?", messageID).Scan(&count)
require.NoError(t, err)
require.Equal(t, 0, count)
}
func TestCountTipsetsAtHeightStmt(t *testing.T) {
s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0)
require.NoError(t, err)
// Test empty DB
verifyCountTipsetsAtHeightStmt(t, s, 1, 0, 0)
// Test 0,1 case
insertTipsetMessage(t, s, tipsetMessage{
tipsetKeyCid: []byte("test_tipset_key_1"),
height: 1,
reverted: false,
messageCid: []byte("test_message_cid_1"),
messageIndex: 0,
})
verifyCountTipsetsAtHeightStmt(t, s, 1, 0, 1)
// Test 0,2 case
insertTipsetMessage(t, s, tipsetMessage{
tipsetKeyCid: []byte("test_tipset_key_2"),
height: 1,
reverted: false,
messageCid: []byte("test_message_cid_2"),
messageIndex: 0,
})
verifyCountTipsetsAtHeightStmt(t, s, 1, 0, 2)
// Test 1,2 case
insertTipsetMessage(t, s, tipsetMessage{
tipsetKeyCid: []byte("test_tipset_key_3"),
height: 1,
reverted: true,
messageCid: []byte("test_message_cid_3"),
messageIndex: 0,
})
verifyCountTipsetsAtHeightStmt(t, s, 1, 1, 2)
// Test 2,2 case
insertTipsetMessage(t, s, tipsetMessage{
tipsetKeyCid: []byte("test_tipset_key_4"),
height: 1,
reverted: true,
messageCid: []byte("test_message_cid_4"),
messageIndex: 0,
})
verifyCountTipsetsAtHeightStmt(t, s, 1, 2, 2)
}
func TestNonRevertedTipsetAtHeightStmt(t *testing.T) {
s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0)
require.NoError(t, err)
// Test empty DB
var et []byte
err = s.stmts.getNonRevertedTipsetAtHeightStmt.QueryRow(10).Scan(&et)
require.Equal(t, sql.ErrNoRows, err)
// Insert non-reverted tipset
insertTipsetMessage(t, s, tipsetMessage{
tipsetKeyCid: []byte("test_tipset_key_1"),
height: 10,
reverted: false,
messageCid: []byte("test_message_cid_1"),
messageIndex: 0,
})
// Insert reverted tipset at same height
insertTipsetMessage(t, s, tipsetMessage{
tipsetKeyCid: []byte("test_tipset_key_2"),
height: 10,
reverted: true,
messageCid: []byte("test_message_cid_2"),
messageIndex: 0,
})
// Verify getNonRevertedTipsetAtHeightStmt returns the non-reverted tipset
var tipsetKeyCid []byte
err = s.stmts.getNonRevertedTipsetAtHeightStmt.QueryRow(10).Scan(&tipsetKeyCid)
require.NoError(t, err)
require.Equal(t, []byte("test_tipset_key_1"), tipsetKeyCid)
// Insert another non-reverted tipset at a different height
insertTipsetMessage(t, s, tipsetMessage{
tipsetKeyCid: []byte("test_tipset_key_3"),
height: 20,
reverted: false,
messageCid: []byte("test_message_cid_3"),
messageIndex: 0,
})
// Verify getNonRevertedTipsetAtHeightStmt returns the correct tipset for the new height
err = s.stmts.getNonRevertedTipsetAtHeightStmt.QueryRow(20).Scan(&tipsetKeyCid)
require.NoError(t, err)
require.Equal(t, []byte("test_tipset_key_3"), tipsetKeyCid)
// Test with a height that has no tipset
err = s.stmts.getNonRevertedTipsetAtHeightStmt.QueryRow(30).Scan(&tipsetKeyCid)
require.Equal(t, sql.ErrNoRows, err)
// Revert all tipsets at height 10
_, err = s.db.Exec("UPDATE tipset_message SET reverted = 1 WHERE height = 10")
require.NoError(t, err)
// Verify getNonRevertedTipsetAtHeightStmt returns no rows for the reverted height
err = s.stmts.getNonRevertedTipsetAtHeightStmt.QueryRow(10).Scan(&tipsetKeyCid)
require.Equal(t, sql.ErrNoRows, err)
}
func TestMinNonRevertedHeightStmt(t *testing.T) {
s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0)
require.NoError(t, err)
// Test empty DB
var minHeight sql.NullInt64
err = s.stmts.getMinNonRevertedHeightStmt.QueryRow().Scan(&minHeight)
require.NoError(t, err)
require.False(t, minHeight.Valid)
// Insert non-reverted tipsets
insertTipsetMessage(t, s, tipsetMessage{
tipsetKeyCid: []byte("test_tipset_key_1"),
height: 10,
reverted: false,
messageCid: []byte("test_message_cid_1"),
messageIndex: 0,
})
insertTipsetMessage(t, s, tipsetMessage{
tipsetKeyCid: []byte("test_tipset_key_2"),
height: 20,
reverted: false,
messageCid: []byte("test_message_cid_2"),
messageIndex: 0,
})
// Verify minimum non-reverted height
verifyMinNonRevertedHeightStmt(t, s, 10)
// Insert reverted tipset with lower height
insertTipsetMessage(t, s, tipsetMessage{
tipsetKeyCid: []byte("test_tipset_key_4"),
height: 5,
reverted: true,
messageCid: []byte("test_message_cid_4"),
messageIndex: 0,
})
// Verify minimum non-reverted height hasn't changed
verifyMinNonRevertedHeightStmt(t, s, 10)
// Revert all tipsets
_, err = s.db.Exec("UPDATE tipset_message SET reverted = 1")
require.NoError(t, err)
// Verify no minimum non-reverted height
err = s.stmts.getMinNonRevertedHeightStmt.QueryRow().Scan(&minHeight)
require.NoError(t, err)
require.False(t, minHeight.Valid)
}
func verifyMinNonRevertedHeightStmt(t *testing.T, s *SqliteIndexer, expectedMinHeight int64) {
var minHeight sql.NullInt64
err := s.stmts.getMinNonRevertedHeightStmt.QueryRow().Scan(&minHeight)
require.NoError(t, err)
require.True(t, minHeight.Valid)
require.Equal(t, expectedMinHeight, minHeight.Int64)
}
func TestGetMsgIdForMsgCidAndTipsetStmt(t *testing.T) {
s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0)
require.NoError(t, err)
// Insert a non-reverted tipset
tipsetKeyCid := []byte(tipsetKeyCid1)
messageCid := []byte(messageCid1)
insertTipsetMessage(t, s, tipsetMessage{
tipsetKeyCid: tipsetKeyCid,
height: 1,
reverted: false,
messageCid: messageCid,
messageIndex: 0,
})
// Verify getMsgIdForMsgCidAndTipset returns the correct message ID
var messageID int64
err = s.stmts.getMsgIdForMsgCidAndTipsetStmt.QueryRow(tipsetKeyCid, messageCid).Scan(&messageID)
require.NoError(t, err)
require.Equal(t, int64(1), messageID)
// Test with non-existent message CID
nonExistentMessageCid := []byte("non_existent_message_cid")
err = s.stmts.getMsgIdForMsgCidAndTipsetStmt.QueryRow(tipsetKeyCid, nonExistentMessageCid).Scan(&messageID)
require.Equal(t, sql.ErrNoRows, err)
// Test with non-existent tipset key
nonExistentTipsetKeyCid := []byte("non_existent_tipset_key")
err = s.stmts.getMsgIdForMsgCidAndTipsetStmt.QueryRow(nonExistentTipsetKeyCid, messageCid).Scan(&messageID)
require.Equal(t, sql.ErrNoRows, err)
// Insert a reverted tipset
revertedTipsetKeyCid := []byte("reverted_tipset_key")
insertTipsetMessage(t, s, tipsetMessage{
tipsetKeyCid: revertedTipsetKeyCid,
height: 2,
reverted: true,
messageCid: messageCid,
messageIndex: 0,
})
// Verify getMsgIdForMsgCidAndTipset doesn't return the message ID for a reverted tipset
err = s.stmts.getMsgIdForMsgCidAndTipsetStmt.QueryRow(revertedTipsetKeyCid, messageCid).Scan(&messageID)
require.Equal(t, sql.ErrNoRows, err)
}
func TestForeignKeyCascadeDelete(t *testing.T) {
s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0)
require.NoError(t, err)
// Insert a tipset
messageID := insertTipsetMessage(t, s, tipsetMessage{
tipsetKeyCid: []byte("test_tipset_key"),
height: 1,
reverted: false,
messageCid: []byte(messageCid1),
messageIndex: 0,
})
// Insert an event for the tipset
eventID := insertEvent(t, s, event{
messageID: messageID,
eventIndex: 0,
emitterId: 2,
emitterAddr: []byte("test_emitter_addr"),
reverted: false,
})
// Insert an event entry for the event
insertEventEntry(t, s, eventEntry{
eventID: eventID,
indexed: true,
flags: []byte("test_flags"),
key: "test_key",
codec: 1,
value: []byte("test_value"),
})
// Delete the tipset
res, err := s.db.Exec("DELETE FROM tipset_message WHERE tipset_key_cid = ?", []byte("test_tipset_key"))
require.NoError(t, err)
rowsAffected, err := res.RowsAffected()
require.NoError(t, err)
require.Equal(t, int64(1), rowsAffected)
// verify event is deleted
verifyEventAbsent(t, s, eventID)
verifyEventEntryAbsent(t, s, eventID)
}
func TestInsertTipsetMessage(t *testing.T) {
s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0)
require.NoError(t, err)
ts := tipsetMessage{
tipsetKeyCid: []byte("test_tipset_key"),
height: 1,
reverted: false,
messageCid: []byte(messageCid1),
messageIndex: 0,
}
// Insert a tipset
messageID := insertTipsetMessage(t, s, ts)
// revert the tipset
revertTipset(t, s, []byte("test_tipset_key"))
ts.reverted = true
verifyTipsetMessage(t, s, messageID, ts)
// inserting with the same (tipset, message) should overwrite the reverted flag
res, err := s.stmts.insertTipsetMessageStmt.Exec(ts.tipsetKeyCid, ts.height, true, ts.messageCid, ts.messageIndex)
require.NoError(t, err)
rowsAffected, err := res.RowsAffected()
require.NoError(t, err)
require.Equal(t, int64(1), rowsAffected)
ts.reverted = false
verifyTipsetMessage(t, s, messageID, ts)
}
type tipsetMessage struct {
tipsetKeyCid []byte
height uint64
reverted bool
messageCid []byte
messageIndex int64
}
type event struct {
eventIndex uint64
emitterId uint64
emitterAddr []byte
reverted bool
messageID int64
}
type eventEntry struct {
eventID int64
indexed bool
flags []byte
key string
codec int
value []byte
}
func updateEventsToNonReverted(t *testing.T, s *SqliteIndexer, tsKeyCid []byte) {
res, err := s.stmts.updateEventsToNonRevertedStmt.Exec(tsKeyCid)
require.NoError(t, err)
rowsAffected, err := res.RowsAffected()
require.NoError(t, err)
require.Equal(t, int64(1), rowsAffected)
// read all events for this tipset and verify they are not reverted using a COUNT query
var count int
err = s.db.QueryRow("SELECT COUNT(*) FROM event e JOIN tipset_message tm ON e.message_id = tm.id WHERE tm.tipset_key_cid = ? AND e.reverted = 1", tsKeyCid).Scan(&count)
require.NoError(t, err)
require.Equal(t, 0, count, "Expected no reverted events for this tipset")
}
func revertTipset(t *testing.T, s *SqliteIndexer, tipsetKeyCid []byte) {
res, err := s.stmts.updateTipsetToRevertedStmt.Exec(tipsetKeyCid)
require.NoError(t, err)
rowsAffected, err := res.RowsAffected()
require.NoError(t, err)
require.Equal(t, int64(1), rowsAffected)
var reverted bool
err = s.db.QueryRow("SELECT reverted FROM tipset_message WHERE tipset_key_cid = ?", tipsetKeyCid).Scan(&reverted)
require.NoError(t, err)
require.True(t, reverted)
}
func verifyTipsetMessage(t *testing.T, s *SqliteIndexer, messageID int64, expectedTipsetMessage tipsetMessage) {
var tipsetKeyCid []byte
var height uint64
var reverted bool
var messageCid []byte
var messageIndex int64
err := s.db.QueryRow("SELECT tipset_key_cid, height, reverted, message_cid, message_index FROM tipset_message WHERE id = ?", messageID).Scan(&tipsetKeyCid, &height, &reverted, &messageCid, &messageIndex)
require.NoError(t, err)
require.Equal(t, expectedTipsetMessage.tipsetKeyCid, tipsetKeyCid)
require.Equal(t, expectedTipsetMessage.height, height)
require.Equal(t, expectedTipsetMessage.reverted, reverted)
require.Equal(t, expectedTipsetMessage.messageCid, messageCid)
require.Equal(t, expectedTipsetMessage.messageIndex, messageIndex)
}
func verifyEventEntryAbsent(t *testing.T, s *SqliteIndexer, eventID int64) {
err := s.db.QueryRow("SELECT event_id FROM event_entry WHERE event_id = ?", eventID).Scan(&eventID)
require.Equal(t, sql.ErrNoRows, err)
}
func verifyEventAbsent(t *testing.T, s *SqliteIndexer, eventID int64) {
var eventIndex uint64
err := s.db.QueryRow("SELECT event_index FROM event WHERE id = ?", eventID).Scan(&eventIndex)
require.Equal(t, sql.ErrNoRows, err)
}
func verifyEvent(t *testing.T, s *SqliteIndexer, eventID int64, expectedEvent event) {
var eventIndex uint64
var emitterAddr []byte
var reverted bool
var messageID int64
err := s.db.QueryRow("SELECT event_index, emitter_addr, reverted, message_id FROM event WHERE id = ?", eventID).Scan(&eventIndex, &emitterAddr, &reverted, &messageID)
require.NoError(t, err)
require.Equal(t, expectedEvent.eventIndex, eventIndex)
require.Equal(t, expectedEvent.emitterAddr, emitterAddr)
require.Equal(t, expectedEvent.reverted, reverted)
require.Equal(t, expectedEvent.messageID, messageID)
}
func verifyCountTipsetsAtHeightStmt(t *testing.T, s *SqliteIndexer, height uint64, expectedRevertedCount, expectedNonRevertedCount int) {
var revertedCount, nonRevertedCount int
err := s.stmts.countTipsetsAtHeightStmt.QueryRow(height).Scan(&revertedCount, &nonRevertedCount)
require.NoError(t, err)
require.Equal(t, expectedRevertedCount, revertedCount)
require.Equal(t, expectedNonRevertedCount, nonRevertedCount)
}
func verifyHasTipsetStmt(t *testing.T, s *SqliteIndexer, tipsetKeyCid []byte, expectedHas bool) {
var has bool
err := s.stmts.hasTipsetStmt.QueryRow(tipsetKeyCid).Scan(&has)
require.NoError(t, err)
require.Equal(t, expectedHas, has)
}
func verifyHasRevertedEventsInTipsetStmt(t *testing.T, s *SqliteIndexer, tipsetKeyCid []byte, expectedHas bool) {
var hasRevertedEventsInTipset bool
err := s.stmts.hasRevertedEventsInTipsetStmt.QueryRow(tipsetKeyCid).Scan(&hasRevertedEventsInTipset)
require.NoError(t, err)
require.Equal(t, expectedHas, hasRevertedEventsInTipset)
}
func verifyHasNullRoundAtHeightStmt(t *testing.T, s *SqliteIndexer, height uint64, expectedHasNullRound bool) {
var hasNullRound bool
err := s.stmts.hasNullRoundAtHeightStmt.QueryRow(height).Scan(&hasNullRound)
require.NoError(t, err)
require.Equal(t, expectedHasNullRound, hasNullRound)
}
func verifyNonRevertedMessageCount(t *testing.T, s *SqliteIndexer, tipsetKeyCid []byte, expectedCount int) {
var count int
err := s.stmts.getNonRevertedTipsetMessageCountStmt.QueryRow(tipsetKeyCid).Scan(&count)
require.NoError(t, err)
require.Equal(t, expectedCount, count)
}
func verifyNonRevertedEventCount(t *testing.T, s *SqliteIndexer, tipsetKeyCid []byte, expectedCount int) {
var count int
err := s.stmts.getNonRevertedTipsetEventCountStmt.QueryRow(tipsetKeyCid).Scan(&count)
require.NoError(t, err)
require.Equal(t, expectedCount, count)
}
func verifyNonRevertedEventEntriesCount(t *testing.T, s *SqliteIndexer, tipsetKeyCid []byte, expectedCount int) {
var count int
err := s.stmts.getNonRevertedTipsetEventEntriesCountStmt.QueryRow(tipsetKeyCid).Scan(&count)
require.NoError(t, err)
require.Equal(t, expectedCount, count)
}
func insertTipsetMessage(t *testing.T, s *SqliteIndexer, ts tipsetMessage) int64 {
res, err := s.stmts.insertTipsetMessageStmt.Exec(ts.tipsetKeyCid, ts.height, ts.reverted, ts.messageCid, ts.messageIndex)
require.NoError(t, err)
rowsAffected, err := res.RowsAffected()
require.NoError(t, err)
require.Equal(t, int64(1), rowsAffected)
messageID, err := res.LastInsertId()
require.NoError(t, err)
require.NotEqual(t, int64(0), messageID)
// read back the message to verify it was inserted correctly
verifyTipsetMessage(t, s, messageID, ts)
return messageID
}
func insertEvent(t *testing.T, s *SqliteIndexer, e event) int64 {
res, err := s.stmts.insertEventStmt.Exec(e.messageID, e.eventIndex, e.emitterId, e.emitterAddr, e.reverted)
require.NoError(t, err)
rowsAffected, err := res.RowsAffected()
require.NoError(t, err)
require.Equal(t, int64(1), rowsAffected)
eventID, err := res.LastInsertId()
require.NoError(t, err)
require.NotEqual(t, int64(0), eventID)
verifyEvent(t, s, eventID, e)
return eventID
}
func insertEventEntry(t *testing.T, s *SqliteIndexer, ee eventEntry) {
res, err := s.stmts.insertEventEntryStmt.Exec(ee.eventID, ee.indexed, ee.flags, ee.key, ee.codec, ee.value)
require.NoError(t, err)
rowsAffected, err := res.RowsAffected()
require.NoError(t, err)
require.Equal(t, int64(1), rowsAffected)
}