Skip to content

Commit

Permalink
BCF-3059 Hardening LogPoller replay (#12484) (#649)
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusz-sekara authored Mar 29, 2024
2 parents 4cb46c1 + 014899e commit 07d8d0b
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 15 deletions.
5 changes: 5 additions & 0 deletions .changeset/small-beers-perform.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Making LogPoller's replay more robust by backfilling up to finalized block and processing rest in the main loop
44 changes: 43 additions & 1 deletion core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"database/sql"
"encoding/binary"
"errors"
"fmt"
"math/big"
"sort"
Expand Down Expand Up @@ -376,7 +377,13 @@ func (lp *logPoller) Filter(from, to *big.Int, bh *common.Hash) ethereum.FilterQ
// If ctx is cancelled before the replay request has been initiated, ErrReplayRequestAborted is returned. If the replay
// is already in progress, the replay will continue and ErrReplayInProgress will be returned. If the client needs a
// guarantee that the replay is complete before proceeding, it should either avoid cancelling or retry until nil is returned
func (lp *logPoller) Replay(ctx context.Context, fromBlock int64) error {
func (lp *logPoller) Replay(ctx context.Context, fromBlock int64) (err error) {
defer func() {
if errors.Is(err, context.Canceled) {
err = ErrReplayRequestAborted
}
}()

lp.lggr.Debugf("Replaying from block %d", fromBlock)
latest, err := lp.ec.HeadByNumber(ctx, nil)
if err != nil {
Expand All @@ -385,6 +392,27 @@ func (lp *logPoller) Replay(ctx context.Context, fromBlock int64) error {
if fromBlock < 1 || fromBlock > latest.Number {
return pkgerrors.Errorf("Invalid replay block number %v, acceptable range [1, %v]", fromBlock, latest.Number)
}

// Backfill all logs up to the latest saved finalized block outside the LogPoller's main loop.
// This is safe, because chain cannot be rewinded deeper than that, so there must not be any race conditions.
savedFinalizedBlockNumber, err := lp.savedFinalizedBlockNumber(ctx)
if err != nil {
return err
}
if fromBlock <= savedFinalizedBlockNumber {
err = lp.backfill(ctx, fromBlock, savedFinalizedBlockNumber)
if err != nil {
return err
}
}

// Poll everything after latest finalized block in main loop to avoid concurrent writes during reorg
// We assume that number of logs between saved finalized block and current head is small enough to be processed in main loop
fromBlock = mathutil.Max(fromBlock, savedFinalizedBlockNumber+1)
// Don't continue if latest block number is the same as saved finalized block number
if fromBlock > latest.Number {
return nil
}
// Block until replay notification accepted or cancelled.
select {
case lp.replayStart <- fromBlock:
Expand All @@ -403,6 +431,20 @@ func (lp *logPoller) Replay(ctx context.Context, fromBlock int64) error {
}
}

// savedFinalizedBlockNumber returns the FinalizedBlockNumber saved with the last processed block in the db
// (latestFinalizedBlock at the time the last processed block was saved)
// If this is the first poll and no blocks are in the db, it returns 0
func (lp *logPoller) savedFinalizedBlockNumber(ctx context.Context) (int64, error) {
latestProcessed, err := lp.LatestBlock(pg.WithParentCtx(ctx))
if err == nil {
return latestProcessed.FinalizedBlockNumber, nil
}
if errors.Is(err, sql.ErrNoRows) {
return 0, nil
}
return 0, err
}

func (lp *logPoller) recvReplayComplete() {
err := <-lp.replayComplete
if err != nil {
Expand Down
49 changes: 37 additions & 12 deletions core/chains/evm/logpoller/log_poller_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
evmclimocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils"
ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/log_emitter"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
Expand Down Expand Up @@ -253,6 +254,7 @@ func TestLogPoller_Replay(t *testing.T) {
chainID := testutils.FixtureChainID
db := pgtest.NewSqlxDB(t)
orm := NewORM(chainID, db, lggr, pgtest.NewQConfig(true))
ctx := testutils.Context(t)

head := evmtypes.Head{Number: 4}
events := []common.Hash{EmitterABI.Events["Log1"].ID}
Expand All @@ -268,7 +270,7 @@ func TestLogPoller_Replay(t *testing.T) {

ec := evmclimocks.NewClient(t)
ec.On("HeadByNumber", mock.Anything, mock.Anything).Return(&head, nil)
ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil).Once()
ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil).Twice()
ec.On("ConfiguredChainID").Return(chainID, nil)
lpOpts := Opts{
PollPeriod: time.Hour,
Expand All @@ -285,12 +287,13 @@ func TestLogPoller_Replay(t *testing.T) {
latest, err := lp.LatestBlock()
require.NoError(t, err)
require.Equal(t, int64(4), latest.BlockNumber)
require.Equal(t, int64(1), latest.FinalizedBlockNumber)

t.Run("abort before replayStart received", func(t *testing.T) {
// Replay() should abort immediately if caller's context is cancelled before request signal is read
ctx, cancel := context.WithCancel(testutils.Context(t))
cancelCtx, cancel := context.WithCancel(testutils.Context(t))
cancel()
err = lp.Replay(ctx, 3)
err = lp.Replay(cancelCtx, 3)
assert.ErrorIs(t, err, ErrReplayRequestAborted)
})

Expand All @@ -305,12 +308,11 @@ func TestLogPoller_Replay(t *testing.T) {

// Replay() should return error code received from replayComplete
t.Run("returns error code on replay complete", func(t *testing.T) {
ctx := testutils.Context(t)
anyErr := pkgerrors.New("any error")
done := make(chan struct{})
go func() {
defer close(done)
recvStartReplay(ctx, 1)
recvStartReplay(ctx, 2)
lp.replayComplete <- anyErr
}()
assert.ErrorIs(t, lp.Replay(ctx, 1), anyErr)
Expand All @@ -319,14 +321,14 @@ func TestLogPoller_Replay(t *testing.T) {

// Replay() should return ErrReplayInProgress if caller's context is cancelled after replay has begun
t.Run("late abort returns ErrReplayInProgress", func(t *testing.T) {
ctx, cancel := context.WithTimeout(testutils.Context(t), time.Second) // Intentionally abort replay after 1s
cancelCtx, cancel := context.WithTimeout(testutils.Context(t), time.Second) // Intentionally abort replay after 1s
done := make(chan struct{})
go func() {
defer close(done)
recvStartReplay(ctx, 4)
recvStartReplay(cancelCtx, 4)
cancel()
}()
assert.ErrorIs(t, lp.Replay(ctx, 4), ErrReplayInProgress)
assert.ErrorIs(t, lp.Replay(cancelCtx, 4), ErrReplayInProgress)
<-done
lp.replayComplete <- nil
lp.wg.Wait()
Expand All @@ -336,8 +338,6 @@ func TestLogPoller_Replay(t *testing.T) {
t.Run("client abort doesnt hang run loop", func(t *testing.T) {
lp.backupPollerNextBlock = 0

ctx := testutils.Context(t)

pass := make(chan struct{})
cancelled := make(chan struct{})

Expand Down Expand Up @@ -392,7 +392,6 @@ func TestLogPoller_Replay(t *testing.T) {
done := make(chan struct{})
defer func() { <-done }()

ctx := testutils.Context(t)
ec.On("FilterLogs", mock.Anything, mock.Anything).Once().Return([]types.Log{log1}, nil).Run(func(args mock.Arguments) {
go func() {
defer close(done)
Expand Down Expand Up @@ -425,7 +424,7 @@ func TestLogPoller_Replay(t *testing.T) {

lp.ReplayAsync(1)

recvStartReplay(testutils.Context(t), 1)
recvStartReplay(testutils.Context(t), 2)
})

t.Run("ReplayAsync error", func(t *testing.T) {
Expand All @@ -447,6 +446,32 @@ func TestLogPoller_Replay(t *testing.T) {
require.Equal(t, 1, observedLogs.Len())
assert.Equal(t, observedLogs.All()[0].Message, anyErr.Error())
})

t.Run("run regular replay when there are not blocks in db", func(t *testing.T) {
err := lp.orm.DeleteLogsAndBlocksAfter(0)
require.NoError(t, err)

lp.ReplayAsync(1)
recvStartReplay(testutils.Context(t), 1)
})

t.Run("run only backfill when everything is finalized", func(t *testing.T) {
err := lp.orm.DeleteLogsAndBlocksAfter(0)
require.NoError(t, err)

err = lp.orm.InsertLogsWithBlock([]Log{}, LogPollerBlock{
EvmChainId: ubig.New(chainID),
BlockHash: head.Hash,
BlockNumber: head.Number,
BlockTimestamp: head.Timestamp,
FinalizedBlockNumber: head.Number,
CreatedAt: time.Time{},
})
require.NoError(t, err)

err = lp.Replay(ctx, 1)
require.NoError(t, err)
})
}

func (lp *logPoller) reset() {
Expand Down
3 changes: 1 addition & 2 deletions core/chains/evm/logpoller/log_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1502,7 +1502,7 @@ func TestLogPoller_DBErrorHandling(t *testing.T) {
time.Sleep(100 * time.Millisecond)
require.NoError(t, lp.Start(ctx))
require.Eventually(t, func() bool {
return observedLogs.Len() >= 4
return observedLogs.Len() >= 1
}, 2*time.Second, 20*time.Millisecond)
lp.Close()

Expand All @@ -1518,7 +1518,6 @@ func TestLogPoller_DBErrorHandling(t *testing.T) {

assert.Contains(t, logMsgs, "SQL ERROR")
assert.Contains(t, logMsgs, "Failed loading filters in main logpoller loop, retrying later")
assert.Contains(t, logMsgs, "Error executing replay, could not get fromBlock")
}

type getLogErrData struct {
Expand Down

0 comments on commit 07d8d0b

Please sign in to comment.