From f6401b5fa7f6b7f5b49cd5aa96ab796912089978 Mon Sep 17 00:00:00 2001 From: Mateusz Sekara Date: Thu, 12 Oct 2023 16:45:19 +0200 Subject: [PATCH] Fixing createdAfter queries for backfill cases (#10928) * Testing created_after queries working with backfill * Fix in query layer * Migration script --- core/chains/evm/logpoller/log_poller_test.go | 60 ++++++++++++++++ core/chains/evm/logpoller/orm.go | 70 ++++++------------- core/chains/evm/logpoller/orm_test.go | 39 ++++++----- ...200_evm_logs_add_block_timestamp_index.sql | 15 ++++ 4 files changed, 117 insertions(+), 67 deletions(-) create mode 100644 core/store/migrate/migrations/0200_evm_logs_add_block_timestamp_index.sql diff --git a/core/chains/evm/logpoller/log_poller_test.go b/core/chains/evm/logpoller/log_poller_test.go index e21fc0f3838..f6be57aa6e7 100644 --- a/core/chains/evm/logpoller/log_poller_test.go +++ b/core/chains/evm/logpoller/log_poller_test.go @@ -1152,3 +1152,63 @@ func TestTooManyLogResults(t *testing.T) { require.Len(t, crit, 1) assert.Contains(t, crit[0].Message, "Too many log results in a single block") } + +func Test_CreatedAfterQueriesWithBackfill(t *testing.T) { + emittedLogs := 60 + finalityDepth := 10 + ctx := testutils.Context(t) + th := SetupTH(t, int64(finalityDepth), 3, 2) + + header, err := th.Client.HeaderByNumber(ctx, nil) + require.NoError(t, err) + + genesisBlockTime := time.UnixMilli(int64(header.Time)) + + // Emit some logs in blocks + for i := 0; i < emittedLogs; i++ { + _, err := th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(int64(i))}) + require.NoError(t, err) + th.Client.Commit() + } + + // First PollAndSave, no filters are registered + currentBlock := th.PollAndSaveLogs(ctx, 1) + + err = th.LogPoller.RegisterFilter(logpoller.Filter{ + Name: "Test Emitter", + EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID}, + Addresses: []common.Address{th.EmitterAddress1}, + }) + require.NoError(t, err) + + // Emit blocks to cover finality depth, because backup always backfill up to the one block before last finalized + for i := 0; i < finalityDepth+1; i++ { + th.Client.Commit() + } + + // LogPoller should backfill entire history + th.LogPoller.BackupPollAndSaveLogs(ctx, 100) + require.NoError(t, err) + + // Make sure that all logs are backfilled + logs, err := th.LogPoller.Logs( + 0, + currentBlock, + EmitterABI.Events["Log1"].ID, + th.EmitterAddress1, + pg.WithParentCtx(testutils.Context(t)), + ) + require.NoError(t, err) + require.Len(t, logs, emittedLogs) + + // We should get all the logs by the block_timestamp + logs, err = th.LogPoller.LogsCreatedAfter( + EmitterABI.Events["Log1"].ID, + th.EmitterAddress1, + genesisBlockTime, + 0, + pg.WithParentCtx(testutils.Context(t)), + ) + require.NoError(t, err) + require.Len(t, logs, emittedLogs) +} diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index 95db7c24255..f8d0e618762 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -315,27 +315,25 @@ func (o *DbORM) SelectLogs(start, end int64, address common.Address, eventSig co // SelectLogsCreatedAfter finds logs created after some timestamp. func (o *DbORM) SelectLogsCreatedAfter(address common.Address, eventSig common.Hash, after time.Time, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) { - startBlock, endBlock, err := o.blocksRangeAfterTimestamp(after, confs, qopts...) - if err != nil { - return nil, err - } args, err := newQueryArgsForEvent(o.chainID, address, eventSig). - withStartBlock(startBlock). - withEndBlock(endBlock). + withBlockTimestampAfter(after). + withConfs(confs). toArgs() if err != nil { return nil, err } - var logs []Log - err = o.q.WithOpts(qopts...).SelectNamed(&logs, ` + + query := fmt.Sprintf(` SELECT * FROM evm.logs WHERE evm_chain_id = :evm_chain_id AND address = :address AND event_sig = :event_sig - AND block_number > :start_block - AND block_number <= :end_block - ORDER BY (block_number, log_index)`, args) - if err != nil { + AND block_timestamp > :block_timestamp_after + AND block_number <= %s + ORDER BY (block_number, log_index)`, nestedBlockNumberQuery()) + + var logs []Log + if err = o.q.WithOpts(qopts...).SelectNamed(&logs, query, args); err != nil { return nil, err } return logs, nil @@ -594,30 +592,28 @@ func (o *DbORM) SelectIndexedLogsByBlockRange(start, end int64, address common.A } func (o *DbORM) SelectIndexedLogsCreatedAfter(address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash, after time.Time, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) { - startBlock, endBlock, err := o.blocksRangeAfterTimestamp(after, confs, qopts...) - if err != nil { - return nil, err - } args, err := newQueryArgsForEvent(o.chainID, address, eventSig). - withStartBlock(startBlock). - withEndBlock(endBlock). + withBlockTimestampAfter(after). + withConfs(confs). withTopicIndex(topicIndex). withTopicValues(topicValues). toArgs() if err != nil { return nil, err } - var logs []Log - err = o.q.WithOpts(qopts...).SelectNamed(&logs, ` + + query := fmt.Sprintf(` SELECT * FROM evm.logs WHERE evm_chain_id = :evm_chain_id AND address = :address AND event_sig = :event_sig AND topics[:topic_index] = ANY(:topic_values) - AND block_number > :start_block - AND block_number <= :end_block - ORDER BY (block_number, log_index)`, args) - if err != nil { + AND block_timestamp > :block_timestamp_after + AND block_number <= %s + ORDER BY (block_number, log_index)`, nestedBlockNumberQuery()) + + var logs []Log + if err = o.q.WithOpts(qopts...).SelectNamed(&logs, query, args); err != nil { return nil, err } return logs, nil @@ -685,32 +681,6 @@ func (o *DbORM) SelectIndexedLogsWithSigsExcluding(sigA, sigB common.Hash, topic return logs, nil } -func (o *DbORM) blocksRangeAfterTimestamp(after time.Time, confs Confirmations, qopts ...pg.QOpt) (int64, int64, error) { - args, err := newQueryArgs(o.chainID). - withBlockTimestampAfter(after). - toArgs() - if err != nil { - return 0, 0, err - } - - var blocks []LogPollerBlock - err = o.q.WithOpts(qopts...).SelectNamed(&blocks, ` - SELECT * FROM evm.log_poller_blocks - WHERE evm_chain_id = :evm_chain_id - AND block_number in ( - SELECT unnest(array[min(block_number), max(block_number)]) FROM evm.log_poller_blocks - WHERE evm_chain_id = :evm_chain_id - AND block_timestamp > :block_timestamp_after) - order by block_number`, args) - if err != nil { - return 0, 0, err - } - if len(blocks) != 2 { - return 0, 0, nil - } - return blocks[0].BlockNumber, blocks[1].BlockNumber - int64(confs), nil -} - func (o *DbORM) SelectLogsUntilBlockHashDataWordGreaterThan(address common.Address, eventSig common.Hash, wordIndex int, wordValueMin common.Hash, untilBlockHash common.Hash, qopts ...pg.QOpt) ([]Log, error) { var logs []Log q := o.q.WithOpts(qopts...) diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index e91baec60ec..1f43586548b 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -27,16 +27,21 @@ type block struct { } func GenLog(chainID *big.Int, logIndex int64, blockNum int64, blockHash string, topic1 []byte, address common.Address) logpoller.Log { + return GenLogWithTimestamp(chainID, logIndex, blockNum, blockHash, topic1, address, time.Now()) +} + +func GenLogWithTimestamp(chainID *big.Int, logIndex int64, blockNum int64, blockHash string, topic1 []byte, address common.Address, blockTimestamp time.Time) logpoller.Log { return logpoller.Log{ - EvmChainId: utils.NewBig(chainID), - LogIndex: logIndex, - BlockHash: common.HexToHash(blockHash), - BlockNumber: blockNum, - EventSig: common.BytesToHash(topic1), - Topics: [][]byte{topic1, topic1}, - Address: address, - TxHash: common.HexToHash("0x1234"), - Data: append([]byte("hello "), byte(blockNum)), + EvmChainId: utils.NewBig(chainID), + LogIndex: logIndex, + BlockHash: common.HexToHash(blockHash), + BlockNumber: blockNum, + EventSig: common.BytesToHash(topic1), + Topics: [][]byte{topic1, topic1}, + Address: address, + TxHash: common.HexToHash("0x1234"), + Data: append([]byte("hello "), byte(blockNum)), + BlockTimestamp: blockTimestamp, } } @@ -1182,10 +1187,10 @@ func TestSelectLogsCreatedAfter(t *testing.T) { future := time.Date(2030, 1, 1, 12, 12, 12, 0, time.UTC) require.NoError(t, th.ORM.InsertLogs([]logpoller.Log{ - GenLog(th.ChainID, 1, 1, utils.RandomAddress().String(), event[:], address), - GenLog(th.ChainID, 1, 2, utils.RandomAddress().String(), event[:], address), - GenLog(th.ChainID, 2, 2, utils.RandomAddress().String(), event[:], address), - GenLog(th.ChainID, 1, 3, utils.RandomAddress().String(), event[:], address), + GenLogWithTimestamp(th.ChainID, 1, 1, utils.RandomAddress().String(), event[:], address, past), + GenLogWithTimestamp(th.ChainID, 1, 2, utils.RandomAddress().String(), event[:], address, now), + GenLogWithTimestamp(th.ChainID, 2, 2, utils.RandomAddress().String(), event[:], address, now), + GenLogWithTimestamp(th.ChainID, 1, 3, utils.RandomAddress().String(), event[:], address, future), })) require.NoError(t, th.ORM.InsertBlock(utils.RandomAddress().Hash(), 1, past)) require.NoError(t, th.ORM.InsertBlock(utils.RandomAddress().Hash(), 2, now)) @@ -1205,7 +1210,7 @@ func TestSelectLogsCreatedAfter(t *testing.T) { { name: "picks logs after block 1", confs: 0, - after: past.Add(-time.Hour), + after: past, expectedLogs: []expectedLog{ {block: 2, log: 1}, {block: 2, log: 2}, @@ -1215,7 +1220,7 @@ func TestSelectLogsCreatedAfter(t *testing.T) { { name: "skips blocks with not enough confirmations", confs: 1, - after: past.Add(-time.Hour), + after: past, expectedLogs: []expectedLog{ {block: 2, log: 1}, {block: 2, log: 2}, @@ -1224,7 +1229,7 @@ func TestSelectLogsCreatedAfter(t *testing.T) { { name: "limits number of blocks by block_timestamp", confs: 0, - after: now.Add(-time.Hour), + after: now, expectedLogs: []expectedLog{ {block: 3, log: 1}, }, @@ -1238,7 +1243,7 @@ func TestSelectLogsCreatedAfter(t *testing.T) { { name: "returns empty dataset when too many confirmations are required", confs: 3, - after: past.Add(-time.Hour), + after: past, expectedLogs: []expectedLog{}, }, } diff --git a/core/store/migrate/migrations/0200_evm_logs_add_block_timestamp_index.sql b/core/store/migrate/migrations/0200_evm_logs_add_block_timestamp_index.sql new file mode 100644 index 00000000000..544a81f2878 --- /dev/null +++ b/core/store/migrate/migrations/0200_evm_logs_add_block_timestamp_index.sql @@ -0,0 +1,15 @@ +-- +goose Up + +-- Start with dropping the index introduced in a previous migration - we are not going to use it +DROP INDEX IF EXISTS evm.log_poller_blocks_by_timestamp; + +CREATE INDEX evm_logs_by_timestamp + ON evm.logs (evm_chain_id, address, event_sig, block_timestamp, block_number); + +-- +goose Down +create index log_poller_blocks_by_timestamp on evm.log_poller_blocks (evm_chain_id, block_timestamp); + +drop index if exists evm.evm_logs_by_timestamp; + + +