diff --git a/.changeset/small-beers-perform.md b/.changeset/small-beers-perform.md new file mode 100644 index 0000000000..a420116a44 --- /dev/null +++ b/.changeset/small-beers-perform.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Making LogPoller's replay more robust by backfilling up to finalized block and processing rest in the main loop diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index f7b9c47dd8..7bc69a947a 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -5,6 +5,7 @@ import ( "context" "database/sql" "encoding/binary" + "errors" "fmt" "math/big" "sort" @@ -376,7 +377,13 @@ func (lp *logPoller) Filter(from, to *big.Int, bh *common.Hash) ethereum.FilterQ // If ctx is cancelled before the replay request has been initiated, ErrReplayRequestAborted is returned. If the replay // is already in progress, the replay will continue and ErrReplayInProgress will be returned. If the client needs a // guarantee that the replay is complete before proceeding, it should either avoid cancelling or retry until nil is returned -func (lp *logPoller) Replay(ctx context.Context, fromBlock int64) error { +func (lp *logPoller) Replay(ctx context.Context, fromBlock int64) (err error) { + defer func() { + if errors.Is(err, context.Canceled) { + err = ErrReplayRequestAborted + } + }() + lp.lggr.Debugf("Replaying from block %d", fromBlock) latest, err := lp.ec.HeadByNumber(ctx, nil) if err != nil { @@ -385,6 +392,27 @@ func (lp *logPoller) Replay(ctx context.Context, fromBlock int64) error { if fromBlock < 1 || fromBlock > latest.Number { return pkgerrors.Errorf("Invalid replay block number %v, acceptable range [1, %v]", fromBlock, latest.Number) } + + // Backfill all logs up to the latest saved finalized block outside the LogPoller's main loop. + // This is safe, because the chain cannot be rewound deeper than that, so there cannot be any race conditions.
+ savedFinalizedBlockNumber, err := lp.savedFinalizedBlockNumber(ctx) + if err != nil { + return err + } + if fromBlock <= savedFinalizedBlockNumber { + err = lp.backfill(ctx, fromBlock, savedFinalizedBlockNumber) + if err != nil { + return err + } + } + + // Poll everything after latest finalized block in main loop to avoid concurrent writes during reorg + // We assume that number of logs between saved finalized block and current head is small enough to be processed in main loop + fromBlock = mathutil.Max(fromBlock, savedFinalizedBlockNumber+1) + // Don't continue if latest block number is the same as saved finalized block number + if fromBlock > latest.Number { + return nil + } // Block until replay notification accepted or cancelled. select { case lp.replayStart <- fromBlock: @@ -403,6 +431,20 @@ func (lp *logPoller) Replay(ctx context.Context, fromBlock int64) error { } } +// savedFinalizedBlockNumber returns the FinalizedBlockNumber saved with the last processed block in the db +// (latestFinalizedBlock at the time the last processed block was saved) +// If this is the first poll and no blocks are in the db, it returns 0 +func (lp *logPoller) savedFinalizedBlockNumber(ctx context.Context) (int64, error) { + latestProcessed, err := lp.LatestBlock(pg.WithParentCtx(ctx)) + if err == nil { + return latestProcessed.FinalizedBlockNumber, nil + } + if errors.Is(err, sql.ErrNoRows) { + return 0, nil + } + return 0, err +} + func (lp *logPoller) recvReplayComplete() { err := <-lp.replayComplete if err != nil { diff --git a/core/chains/evm/logpoller/log_poller_internal_test.go b/core/chains/evm/logpoller/log_poller_internal_test.go index af2d9a558e..9c28dfd5c4 100644 --- a/core/chains/evm/logpoller/log_poller_internal_test.go +++ b/core/chains/evm/logpoller/log_poller_internal_test.go @@ -27,6 +27,7 @@ import ( evmclimocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" 
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" + ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/log_emitter" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" @@ -253,6 +254,7 @@ func TestLogPoller_Replay(t *testing.T) { chainID := testutils.FixtureChainID db := pgtest.NewSqlxDB(t) orm := NewORM(chainID, db, lggr, pgtest.NewQConfig(true)) + ctx := testutils.Context(t) head := evmtypes.Head{Number: 4} events := []common.Hash{EmitterABI.Events["Log1"].ID} @@ -268,7 +270,7 @@ func TestLogPoller_Replay(t *testing.T) { ec := evmclimocks.NewClient(t) ec.On("HeadByNumber", mock.Anything, mock.Anything).Return(&head, nil) - ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil).Once() + ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil).Twice() ec.On("ConfiguredChainID").Return(chainID, nil) lpOpts := Opts{ PollPeriod: time.Hour, @@ -285,12 +287,13 @@ func TestLogPoller_Replay(t *testing.T) { latest, err := lp.LatestBlock() require.NoError(t, err) require.Equal(t, int64(4), latest.BlockNumber) + require.Equal(t, int64(1), latest.FinalizedBlockNumber) t.Run("abort before replayStart received", func(t *testing.T) { // Replay() should abort immediately if caller's context is cancelled before request signal is read - ctx, cancel := context.WithCancel(testutils.Context(t)) + cancelCtx, cancel := context.WithCancel(testutils.Context(t)) cancel() - err = lp.Replay(ctx, 3) + err = lp.Replay(cancelCtx, 3) assert.ErrorIs(t, err, ErrReplayRequestAborted) }) @@ -305,12 +308,11 @@ func TestLogPoller_Replay(t *testing.T) { // Replay() should return error code received from replayComplete t.Run("returns error code on replay complete", func(t *testing.T) { - ctx := testutils.Context(t) anyErr := pkgerrors.New("any error") done := 
make(chan struct{}) go func() { defer close(done) - recvStartReplay(ctx, 1) + recvStartReplay(ctx, 2) lp.replayComplete <- anyErr }() assert.ErrorIs(t, lp.Replay(ctx, 1), anyErr) @@ -319,14 +321,14 @@ func TestLogPoller_Replay(t *testing.T) { // Replay() should return ErrReplayInProgress if caller's context is cancelled after replay has begun t.Run("late abort returns ErrReplayInProgress", func(t *testing.T) { - ctx, cancel := context.WithTimeout(testutils.Context(t), time.Second) // Intentionally abort replay after 1s + cancelCtx, cancel := context.WithTimeout(testutils.Context(t), time.Second) // Intentionally abort replay after 1s done := make(chan struct{}) go func() { defer close(done) - recvStartReplay(ctx, 4) + recvStartReplay(cancelCtx, 4) cancel() }() - assert.ErrorIs(t, lp.Replay(ctx, 4), ErrReplayInProgress) + assert.ErrorIs(t, lp.Replay(cancelCtx, 4), ErrReplayInProgress) <-done lp.replayComplete <- nil lp.wg.Wait() @@ -336,8 +338,6 @@ func TestLogPoller_Replay(t *testing.T) { t.Run("client abort doesnt hang run loop", func(t *testing.T) { lp.backupPollerNextBlock = 0 - ctx := testutils.Context(t) - pass := make(chan struct{}) cancelled := make(chan struct{}) @@ -392,7 +392,6 @@ func TestLogPoller_Replay(t *testing.T) { done := make(chan struct{}) defer func() { <-done }() - ctx := testutils.Context(t) ec.On("FilterLogs", mock.Anything, mock.Anything).Once().Return([]types.Log{log1}, nil).Run(func(args mock.Arguments) { go func() { defer close(done) @@ -425,7 +424,7 @@ func TestLogPoller_Replay(t *testing.T) { lp.ReplayAsync(1) - recvStartReplay(testutils.Context(t), 1) + recvStartReplay(testutils.Context(t), 2) }) t.Run("ReplayAsync error", func(t *testing.T) { @@ -447,6 +446,32 @@ func TestLogPoller_Replay(t *testing.T) { require.Equal(t, 1, observedLogs.Len()) assert.Equal(t, observedLogs.All()[0].Message, anyErr.Error()) }) + + t.Run("run regular replay when there are not blocks in db", func(t *testing.T) { + err := 
lp.orm.DeleteLogsAndBlocksAfter(0) + require.NoError(t, err) + + lp.ReplayAsync(1) + recvStartReplay(testutils.Context(t), 1) + }) + + t.Run("run only backfill when everything is finalized", func(t *testing.T) { + err := lp.orm.DeleteLogsAndBlocksAfter(0) + require.NoError(t, err) + + err = lp.orm.InsertLogsWithBlock([]Log{}, LogPollerBlock{ + EvmChainId: ubig.New(chainID), + BlockHash: head.Hash, + BlockNumber: head.Number, + BlockTimestamp: head.Timestamp, + FinalizedBlockNumber: head.Number, + CreatedAt: time.Time{}, + }) + require.NoError(t, err) + + err = lp.Replay(ctx, 1) + require.NoError(t, err) + }) } func (lp *logPoller) reset() { diff --git a/core/chains/evm/logpoller/log_poller_test.go b/core/chains/evm/logpoller/log_poller_test.go index 665817af58..02ba326510 100644 --- a/core/chains/evm/logpoller/log_poller_test.go +++ b/core/chains/evm/logpoller/log_poller_test.go @@ -1502,7 +1502,7 @@ func TestLogPoller_DBErrorHandling(t *testing.T) { time.Sleep(100 * time.Millisecond) require.NoError(t, lp.Start(ctx)) require.Eventually(t, func() bool { - return observedLogs.Len() >= 4 + return observedLogs.Len() >= 1 }, 2*time.Second, 20*time.Millisecond) lp.Close() @@ -1518,7 +1518,6 @@ func TestLogPoller_DBErrorHandling(t *testing.T) { assert.Contains(t, logMsgs, "SQL ERROR") assert.Contains(t, logMsgs, "Failed loading filters in main logpoller loop, retrying later") - assert.Contains(t, logMsgs, "Error executing replay, could not get fromBlock") } type getLogErrData struct {