Skip to content

Commit

Permalink
Fixing createdAfter queries for backfill cases (#10928)
Browse files Browse the repository at this point in the history
* Testing created_after queries working with backfill

* Fix in query layer

* Migration script
  • Loading branch information
mateusz-sekara authored Oct 12, 2023
1 parent 0c50f5d commit f6401b5
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 67 deletions.
60 changes: 60 additions & 0 deletions core/chains/evm/logpoller/log_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1152,3 +1152,63 @@ func TestTooManyLogResults(t *testing.T) {
require.Len(t, crit, 1)
assert.Contains(t, crit[0].Message, "Too many log results in a single block")
}

// Test_CreatedAfterQueriesWithBackfill verifies that logs stored via the
// backup poller's backfill path are visible to LogsCreatedAfter queries,
// i.e. that created_after filtering works even for block ranges that were
// backfilled rather than polled block-by-block.
func Test_CreatedAfterQueriesWithBackfill(t *testing.T) {
	emittedLogs := 60
	finalityDepth := 10
	ctx := testutils.Context(t)
	th := SetupTH(t, int64(finalityDepth), 3, 2)

	header, err := th.Client.HeaderByNumber(ctx, nil)
	require.NoError(t, err)

	// Reference timestamp strictly before every emitted log's block.
	// NOTE(review): geth header.Time is in seconds, so UnixMilli yields a
	// time far in the past (near 1970). That is still strictly before all
	// logs, so the assertion holds, but time.Unix would be more accurate —
	// TODO confirm intent.
	genesisBlockTime := time.UnixMilli(int64(header.Time))

	// Emit some logs in blocks
	for i := 0; i < emittedLogs; i++ {
		_, err := th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(int64(i))})
		require.NoError(t, err)
		th.Client.Commit()
	}

	// First PollAndSave; no filters are registered yet, so no logs are stored.
	currentBlock := th.PollAndSaveLogs(ctx, 1)

	err = th.LogPoller.RegisterFilter(logpoller.Filter{
		Name:      "Test Emitter",
		EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID},
		Addresses: []common.Address{th.EmitterAddress1},
	})
	require.NoError(t, err)

	// Emit blocks to cover the finality depth, because the backup poller only
	// backfills up to the block before the last finalized one.
	for i := 0; i < finalityDepth+1; i++ {
		th.Client.Commit()
	}

	// The backup poller should backfill the entire history.
	// (No error is returned/assigned here, so there is nothing to assert —
	// the previous stale require.NoError(t, err) was a no-op.)
	th.LogPoller.BackupPollAndSaveLogs(ctx, 100)

	// Make sure that all logs are backfilled.
	logs, err := th.LogPoller.Logs(
		0,
		currentBlock,
		EmitterABI.Events["Log1"].ID,
		th.EmitterAddress1,
		pg.WithParentCtx(ctx),
	)
	require.NoError(t, err)
	require.Len(t, logs, emittedLogs)

	// We should also get all the logs when filtering by block_timestamp.
	logs, err = th.LogPoller.LogsCreatedAfter(
		EmitterABI.Events["Log1"].ID,
		th.EmitterAddress1,
		genesisBlockTime,
		0,
		pg.WithParentCtx(ctx),
	)
	require.NoError(t, err)
	require.Len(t, logs, emittedLogs)
}
70 changes: 20 additions & 50 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,27 +315,25 @@ func (o *DbORM) SelectLogs(start, end int64, address common.Address, eventSig co

// SelectLogsCreatedAfter finds logs created after some timestamp.
func (o *DbORM) SelectLogsCreatedAfter(address common.Address, eventSig common.Hash, after time.Time, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) {
startBlock, endBlock, err := o.blocksRangeAfterTimestamp(after, confs, qopts...)
if err != nil {
return nil, err
}
args, err := newQueryArgsForEvent(o.chainID, address, eventSig).
withStartBlock(startBlock).
withEndBlock(endBlock).
withBlockTimestampAfter(after).
withConfs(confs).
toArgs()
if err != nil {
return nil, err
}
var logs []Log
err = o.q.WithOpts(qopts...).SelectNamed(&logs, `

query := fmt.Sprintf(`
SELECT * FROM evm.logs
WHERE evm_chain_id = :evm_chain_id
AND address = :address
AND event_sig = :event_sig
AND block_number > :start_block
AND block_number <= :end_block
ORDER BY (block_number, log_index)`, args)
if err != nil {
AND block_timestamp > :block_timestamp_after
AND block_number <= %s
ORDER BY (block_number, log_index)`, nestedBlockNumberQuery())

var logs []Log
if err = o.q.WithOpts(qopts...).SelectNamed(&logs, query, args); err != nil {
return nil, err
}
return logs, nil
Expand Down Expand Up @@ -594,30 +592,28 @@ func (o *DbORM) SelectIndexedLogsByBlockRange(start, end int64, address common.A
}

func (o *DbORM) SelectIndexedLogsCreatedAfter(address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash, after time.Time, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) {
startBlock, endBlock, err := o.blocksRangeAfterTimestamp(after, confs, qopts...)
if err != nil {
return nil, err
}
args, err := newQueryArgsForEvent(o.chainID, address, eventSig).
withStartBlock(startBlock).
withEndBlock(endBlock).
withBlockTimestampAfter(after).
withConfs(confs).
withTopicIndex(topicIndex).
withTopicValues(topicValues).
toArgs()
if err != nil {
return nil, err
}
var logs []Log
err = o.q.WithOpts(qopts...).SelectNamed(&logs, `

query := fmt.Sprintf(`
SELECT * FROM evm.logs
WHERE evm_chain_id = :evm_chain_id
AND address = :address
AND event_sig = :event_sig
AND topics[:topic_index] = ANY(:topic_values)
AND block_number > :start_block
AND block_number <= :end_block
ORDER BY (block_number, log_index)`, args)
if err != nil {
AND block_timestamp > :block_timestamp_after
AND block_number <= %s
ORDER BY (block_number, log_index)`, nestedBlockNumberQuery())

var logs []Log
if err = o.q.WithOpts(qopts...).SelectNamed(&logs, query, args); err != nil {
return nil, err
}
return logs, nil
Expand Down Expand Up @@ -685,32 +681,6 @@ func (o *DbORM) SelectIndexedLogsWithSigsExcluding(sigA, sigB common.Hash, topic
return logs, nil
}

// blocksRangeAfterTimestamp returns the (startBlock, endBlock) pair spanning
// all evm.log_poller_blocks rows whose block_timestamp is strictly after the
// given time, with endBlock reduced by the required confirmations.
// Returns (0, 0, nil) when fewer than two distinct boundary rows match, i.e.
// no usable range exists.
//
// NOTE(review): this helper only sees blocks recorded in log_poller_blocks;
// block ranges filled in by backfill have no rows there, so logs from those
// ranges are invisible to callers — presumably the reason this commit replaces
// it with a block_timestamp filter directly on evm.logs. Verify against the
// query-layer changes.
func (o *DbORM) blocksRangeAfterTimestamp(after time.Time, confs Confirmations, qopts ...pg.QOpt) (int64, int64, error) {
	args, err := newQueryArgs(o.chainID).
		withBlockTimestampAfter(after).
		toArgs()
	if err != nil {
		return 0, 0, err
	}

	var blocks []LogPollerBlock
	// Fetch only the two boundary rows — the min and max block_number after
	// the timestamp — in ascending order, so blocks[0] is the lower bound and
	// blocks[1] the upper bound.
	err = o.q.WithOpts(qopts...).SelectNamed(&blocks, `
		SELECT * FROM evm.log_poller_blocks
			WHERE evm_chain_id = :evm_chain_id
			AND block_number in (
				SELECT unnest(array[min(block_number), max(block_number)]) FROM evm.log_poller_blocks
					WHERE evm_chain_id = :evm_chain_id
					AND block_timestamp > :block_timestamp_after)
		order by block_number`, args)
	if err != nil {
		return 0, 0, err
	}
	if len(blocks) != 2 {
		return 0, 0, nil
	}
	// Exclude the most recent `confs` blocks from the upper bound.
	return blocks[0].BlockNumber, blocks[1].BlockNumber - int64(confs), nil
}

func (o *DbORM) SelectLogsUntilBlockHashDataWordGreaterThan(address common.Address, eventSig common.Hash, wordIndex int, wordValueMin common.Hash, untilBlockHash common.Hash, qopts ...pg.QOpt) ([]Log, error) {
var logs []Log
q := o.q.WithOpts(qopts...)
Expand Down
39 changes: 22 additions & 17 deletions core/chains/evm/logpoller/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,21 @@ type block struct {
}

// GenLog builds a test logpoller.Log for the given chain, log index, block,
// hash, topic and address, stamped with the current wall-clock time.
// Use GenLogWithTimestamp when the test needs to control block_timestamp.
func GenLog(chainID *big.Int, logIndex int64, blockNum int64, blockHash string, topic1 []byte, address common.Address) logpoller.Log {
	return GenLogWithTimestamp(chainID, logIndex, blockNum, blockHash, topic1, address, time.Now())
}

// GenLogWithTimestamp builds a test logpoller.Log with an explicit
// block_timestamp, letting created_after tests control time ordering of logs.
//
// Fix: the source text contained two interleaved sets of struct-literal
// fields (the pre- and post-change diff lines merged together); duplicate
// field initializers do not compile. Only the post-change set — including
// BlockTimestamp — is kept.
func GenLogWithTimestamp(chainID *big.Int, logIndex int64, blockNum int64, blockHash string, topic1 []byte, address common.Address, blockTimestamp time.Time) logpoller.Log {
	return logpoller.Log{
		EvmChainId:     utils.NewBig(chainID),
		LogIndex:       logIndex,
		BlockHash:      common.HexToHash(blockHash),
		BlockNumber:    blockNum,
		EventSig:       common.BytesToHash(topic1),
		Topics:         [][]byte{topic1, topic1},
		Address:        address,
		TxHash:         common.HexToHash("0x1234"),
		Data:           append([]byte("hello "), byte(blockNum)),
		BlockTimestamp: blockTimestamp,
	}
}

Expand Down Expand Up @@ -1182,10 +1187,10 @@ func TestSelectLogsCreatedAfter(t *testing.T) {
future := time.Date(2030, 1, 1, 12, 12, 12, 0, time.UTC)

require.NoError(t, th.ORM.InsertLogs([]logpoller.Log{
GenLog(th.ChainID, 1, 1, utils.RandomAddress().String(), event[:], address),
GenLog(th.ChainID, 1, 2, utils.RandomAddress().String(), event[:], address),
GenLog(th.ChainID, 2, 2, utils.RandomAddress().String(), event[:], address),
GenLog(th.ChainID, 1, 3, utils.RandomAddress().String(), event[:], address),
GenLogWithTimestamp(th.ChainID, 1, 1, utils.RandomAddress().String(), event[:], address, past),
GenLogWithTimestamp(th.ChainID, 1, 2, utils.RandomAddress().String(), event[:], address, now),
GenLogWithTimestamp(th.ChainID, 2, 2, utils.RandomAddress().String(), event[:], address, now),
GenLogWithTimestamp(th.ChainID, 1, 3, utils.RandomAddress().String(), event[:], address, future),
}))
require.NoError(t, th.ORM.InsertBlock(utils.RandomAddress().Hash(), 1, past))
require.NoError(t, th.ORM.InsertBlock(utils.RandomAddress().Hash(), 2, now))
Expand All @@ -1205,7 +1210,7 @@ func TestSelectLogsCreatedAfter(t *testing.T) {
{
name: "picks logs after block 1",
confs: 0,
after: past.Add(-time.Hour),
after: past,
expectedLogs: []expectedLog{
{block: 2, log: 1},
{block: 2, log: 2},
Expand All @@ -1215,7 +1220,7 @@ func TestSelectLogsCreatedAfter(t *testing.T) {
{
name: "skips blocks with not enough confirmations",
confs: 1,
after: past.Add(-time.Hour),
after: past,
expectedLogs: []expectedLog{
{block: 2, log: 1},
{block: 2, log: 2},
Expand All @@ -1224,7 +1229,7 @@ func TestSelectLogsCreatedAfter(t *testing.T) {
{
name: "limits number of blocks by block_timestamp",
confs: 0,
after: now.Add(-time.Hour),
after: now,
expectedLogs: []expectedLog{
{block: 3, log: 1},
},
Expand All @@ -1238,7 +1243,7 @@ func TestSelectLogsCreatedAfter(t *testing.T) {
{
name: "returns empty dataset when too many confirmations are required",
confs: 3,
after: past.Add(-time.Hour),
after: past,
expectedLogs: []expectedLog{},
},
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-- +goose Up

-- Start with dropping the index introduced in a previous migration - we are not going to use it
-- (created_after queries now filter evm.logs.block_timestamp directly rather
-- than resolving a block range through evm.log_poller_blocks).
DROP INDEX IF EXISTS evm.log_poller_blocks_by_timestamp;

-- Composite index serving the created_after queries: equality columns
-- (evm_chain_id, address, event_sig) first, then the block_timestamp range
-- predicate, with block_number included for the confirmation bound.
CREATE INDEX evm_logs_by_timestamp
    ON evm.logs (evm_chain_id, address, event_sig, block_timestamp, block_number);

-- +goose Down
-- Revert: restore the old blocks-by-timestamp index and drop the logs index
-- created above.
create index log_poller_blocks_by_timestamp on evm.log_poller_blocks (evm_chain_id, block_timestamp);

drop index if exists evm.evm_logs_by_timestamp;



0 comments on commit f6401b5

Please sign in to comment.