Skip to content

Commit

Permalink
Allow customizing BackupPollerDelayBlock or disabling Backup LogPol…
Browse files Browse the repository at this point in the history
…ler entirely for tests (#11850)

* Refactor NewLogPoller() params into logpoller.Opts struct, add BackupPollerBlockDela

* Fix chain reader test & logpoller test

This failing chainreader test had FinalityDepth set to 0, so LogPoller was starting after
the block where the logs were emitted. It was only passing due to
Backup LogPoller.  With Backup LogPoller disabled, we have to increase
the finality depth so that LogPoller will start a few blocks back (at
the first unfinalized block).

The failing logpoller test was looking for an extra log entry emitted by BackupLogPoller.
Now we can run it with BackupLogPoller disabled (leading to a more robust test) and stop
looking for that

* Fix LogPoller_Replay test

* Address SonarQube CodeSmell: reduce cognitive complexity of lp.run()

* Add BackupLogPollerBlockDelay to toml config

* Fix some merge conflicts

* Restore KeepFinalizedBlocksDepth: 1000

* Use default BackupPollerBlockDelay: 0 instead of explicitly setting to 0

* disable backup log poller in lp smoke tests

* Fix generated docs

* Add missing web resolver testdata

---------

Co-authored-by: Bartek Tofel <[email protected]>
  • Loading branch information
reductionista and Tofel authored Feb 28, 2024
1 parent c033e02 commit 014e842
Show file tree
Hide file tree
Showing 40 changed files with 584 additions and 192 deletions.
4 changes: 4 additions & 0 deletions core/chains/evm/config/chain_scoped.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ func (e *evmConfig) LogKeepBlocksDepth() uint32 {
return *e.c.LogKeepBlocksDepth
}

func (e *evmConfig) BackupLogPollerBlockDelay() uint64 {
return *e.c.BackupLogPollerBlockDelay
}

func (e *evmConfig) NonceAutoSync() bool {
return *e.c.NonceAutoSync
}
Expand Down
1 change: 1 addition & 0 deletions core/chains/evm/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type EVM interface {
LinkContractAddress() string
LogBackfillBatchSize() uint32
LogKeepBlocksDepth() uint32
BackupLogPollerBlockDelay() uint64
LogPollInterval() time.Duration
LogPrunePageSize() uint32
MinContractPayment() *commonassets.Link
Expand Down
39 changes: 20 additions & 19 deletions core/chains/evm/config/toml/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,25 +341,26 @@ func (c *EVMConfig) TOMLString() (string, error) {
}

type Chain struct {
AutoCreateKey *bool
BlockBackfillDepth *uint32
BlockBackfillSkip *bool
ChainType *string
FinalityDepth *uint32
FinalityTagEnabled *bool
FlagsContractAddress *ethkey.EIP55Address
LinkContractAddress *ethkey.EIP55Address
LogBackfillBatchSize *uint32
LogPollInterval *commonconfig.Duration
LogKeepBlocksDepth *uint32
LogPrunePageSize *uint32
MinIncomingConfirmations *uint32
MinContractPayment *commonassets.Link
NonceAutoSync *bool
NoNewHeadsThreshold *commonconfig.Duration
OperatorFactoryAddress *ethkey.EIP55Address
RPCDefaultBatchSize *uint32
RPCBlockQueryDelay *uint16
AutoCreateKey *bool
BlockBackfillDepth *uint32
BlockBackfillSkip *bool
ChainType *string
FinalityDepth *uint32
FinalityTagEnabled *bool
FlagsContractAddress *ethkey.EIP55Address
LinkContractAddress *ethkey.EIP55Address
LogBackfillBatchSize *uint32
LogPollInterval *commonconfig.Duration
LogKeepBlocksDepth *uint32
LogPrunePageSize *uint32
BackupLogPollerBlockDelay *uint64
MinIncomingConfirmations *uint32
MinContractPayment *commonassets.Link
NonceAutoSync *bool
NoNewHeadsThreshold *commonconfig.Duration
OperatorFactoryAddress *ethkey.EIP55Address
RPCDefaultBatchSize *uint32
RPCBlockQueryDelay *uint16

Transactions Transactions `toml:",omitempty"`
BalanceMonitor BalanceMonitor `toml:",omitempty"`
Expand Down
3 changes: 3 additions & 0 deletions core/chains/evm/config/toml/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ func (c *Chain) SetFrom(f *Chain) {
if v := f.LogPrunePageSize; v != nil {
c.LogPrunePageSize = v
}
if v := f.BackupLogPollerBlockDelay; v != nil {
c.BackupLogPollerBlockDelay = v
}
if v := f.MinIncomingConfirmations; v != nil {
c.MinIncomingConfirmations = v
}
Expand Down
1 change: 1 addition & 0 deletions core/chains/evm/config/toml/defaults/fallback.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ LogBackfillBatchSize = 1000
LogPollInterval = '15s'
LogKeepBlocksDepth = 100000
LogPrunePageSize = 0
BackupLogPollerBlockDelay = 100
MinContractPayment = '.00001 link'
MinIncomingConfirmations = 3
NonceAutoSync = true
Expand Down
19 changes: 17 additions & 2 deletions core/chains/evm/forwarders/forwarder_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,15 @@ func TestFwdMgr_MaybeForwardTransaction(t *testing.T) {
t.Log(authorized)

evmClient := client.NewSimulatedBackendClient(t, ec, testutils.FixtureChainID)
lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), evmClient, lggr, 100*time.Millisecond, false, 2, 3, 2, 1000, 0)

lpOpts := logpoller.Opts{
PollPeriod: 100 * time.Millisecond,
FinalityDepth: 2,
BackfillBatchSize: 3,
RpcBatchSize: 2,
KeepFinalizedBlocksDepth: 1000,
}
lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), evmClient, lggr, lpOpts)
fwdMgr := forwarders.NewFwdMgr(db, evmClient, lp, lggr, evmcfg.EVM(), evmcfg.Database())
fwdMgr.ORM = forwarders.NewORM(db, logger.Test(t), cfg.Database())

Expand Down Expand Up @@ -113,7 +121,14 @@ func TestFwdMgr_AccountUnauthorizedToForward_SkipsForwarding(t *testing.T) {
ec.Commit()

evmClient := client.NewSimulatedBackendClient(t, ec, testutils.FixtureChainID)
lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), evmClient, lggr, 100*time.Millisecond, false, 2, 3, 2, 1000, 0)
lpOpts := logpoller.Opts{
PollPeriod: 100 * time.Millisecond,
FinalityDepth: 2,
BackfillBatchSize: 3,
RpcBatchSize: 2,
KeepFinalizedBlocksDepth: 1000,
}
lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), evmClient, lggr, lpOpts)
fwdMgr := forwarders.NewFwdMgr(db, evmClient, lp, lggr, evmcfg.EVM(), evmcfg.Database())
fwdMgr.ORM = forwarders.NewORM(db, logger.Test(t), cfg.Database())

Expand Down
8 changes: 6 additions & 2 deletions core/chains/evm/logpoller/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type TestHarness struct {
EthDB ethdb.Database
}

func SetupTH(t testing.TB, useFinalityTag bool, finalityDepth, backfillBatchSize, rpcBatchSize, keepFinalizedBlocksDepth int64) TestHarness {
func SetupTH(t testing.TB, opts logpoller.Opts) TestHarness {
lggr := logger.Test(t)
chainID := testutils.NewRandomEVMChainID()
chainID2 := testutils.NewRandomEVMChainID()
Expand All @@ -67,7 +67,11 @@ func SetupTH(t testing.TB, useFinalityTag bool, finalityDepth, backfillBatchSize
// Mark genesis block as finalized to avoid any nulls in the tests
head := esc.Backend().Blockchain().CurrentHeader()
esc.Backend().Blockchain().SetFinalized(head)
lp := logpoller.NewLogPoller(o, esc, lggr, 1*time.Hour, useFinalityTag, finalityDepth, backfillBatchSize, rpcBatchSize, keepFinalizedBlocksDepth, 0)

if opts.PollPeriod == 0 {
opts.PollPeriod = 1 * time.Hour
}
lp := logpoller.NewLogPoller(o, esc, lggr, opts)
emitterAddress1, _, emitter1, err := log_emitter.DeployLogEmitter(owner, ec)
require.NoError(t, err)
emitterAddress2, _, emitter2, err := log_emitter.DeployLogEmitter(owner, ec)
Expand Down
144 changes: 81 additions & 63 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ const (
type LogPollerTest interface {
LogPoller
PollAndSaveLogs(ctx context.Context, currentBlockNumber int64)
BackupPollAndSaveLogs(ctx context.Context, backupPollerBlockDelay int64)
BackupPollAndSaveLogs(ctx context.Context)
Filter(from, to *big.Int, bh *common.Hash) ethereum.FilterQuery
GetReplayFromBlock(ctx context.Context, requested int64) (int64, error)
PruneOldBlocks(ctx context.Context) (bool, error)
Expand Down Expand Up @@ -105,8 +105,9 @@ type logPoller struct {
keepFinalizedBlocksDepth int64 // the number of blocks behind the last finalized block we keep in database
backfillBatchSize int64 // batch size to use when backfilling finalized logs
rpcBatchSize int64 // batch size to use for fallback RPC calls made in GetBlocks
backupPollerNextBlock int64
logPrunePageSize int64
backupPollerNextBlock int64 // next block to be processed by Backup LogPoller
backupPollerBlockDelay int64 // how far behind regular LogPoller should BackupLogPoller run. 0 = disabled

filterMu sync.RWMutex
filters map[string]Filter
Expand All @@ -121,6 +122,17 @@ type logPoller struct {
wg sync.WaitGroup
}

type Opts struct {
PollPeriod time.Duration
UseFinalityTag bool
FinalityDepth int64
BackfillBatchSize int64
RpcBatchSize int64
KeepFinalizedBlocksDepth int64
BackupPollerBlockDelay int64
LogPrunePageSize int64
}

// NewLogPoller creates a log poller. Note there is an assumption
// that blocks can be processed faster than they are produced for the given chain, or the poller will fall behind.
// Block processing involves the following calls in steady state (without reorgs):
Expand All @@ -131,7 +143,7 @@ type logPoller struct {
//
// How fast that can be done depends largely on network speed and DB, but even for the fastest
// support chain, polygon, which has 2s block times, we need RPCs roughly with <= 500ms latency
func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, pollPeriod time.Duration, useFinalityTag bool, finalityDepth int64, backfillBatchSize int64, rpcBatchSize int64, keepFinalizedBlocksDepth int64, logsPrunePageSize int64) *logPoller {
func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, opts Opts) *logPoller {
ctx, cancel := context.WithCancel(context.Background())
return &logPoller{
ctx: ctx,
Expand All @@ -141,13 +153,14 @@ func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, pollPeriod time.Durati
lggr: logger.Sugared(logger.Named(lggr, "LogPoller")),
replayStart: make(chan int64),
replayComplete: make(chan error),
pollPeriod: pollPeriod,
finalityDepth: finalityDepth,
useFinalityTag: useFinalityTag,
backfillBatchSize: backfillBatchSize,
rpcBatchSize: rpcBatchSize,
keepFinalizedBlocksDepth: keepFinalizedBlocksDepth,
logPrunePageSize: logsPrunePageSize,
pollPeriod: opts.PollPeriod,
backupPollerBlockDelay: opts.BackupPollerBlockDelay,
finalityDepth: opts.FinalityDepth,
useFinalityTag: opts.UseFinalityTag,
backfillBatchSize: opts.BackfillBatchSize,
rpcBatchSize: opts.RpcBatchSize,
keepFinalizedBlocksDepth: opts.KeepFinalizedBlocksDepth,
logPrunePageSize: opts.LogPrunePageSize,
filters: make(map[string]Filter),
filterDirty: true, // Always build Filter on first call to cache an empty filter if nothing registered yet.
}
Expand Down Expand Up @@ -436,67 +449,41 @@ func (lp *logPoller) GetReplayFromBlock(ctx context.Context, requested int64) (i
return mathutil.Min(requested, lastProcessed.BlockNumber), nil
}

func (lp *logPoller) loadFilters() error {
lp.filterMu.Lock()
defer lp.filterMu.Unlock()
filters, err := lp.orm.LoadFilters(pg.WithParentCtx(lp.ctx))

if err != nil {
return pkgerrors.Wrapf(err, "Failed to load initial filters from db, retrying")
}

lp.filters = filters
lp.filterDirty = true
return nil
}

func (lp *logPoller) run() {
defer lp.wg.Done()
logPollTick := time.After(0)
// stagger these somewhat, so they don't all run back-to-back
backupLogPollTick := time.After(100 * time.Millisecond)
filtersLoaded := false

loadFilters := func() error {
lp.filterMu.Lock()
defer lp.filterMu.Unlock()
filters, err := lp.orm.LoadFilters(pg.WithParentCtx(lp.ctx))

if err != nil {
return pkgerrors.Wrapf(err, "Failed to load initial filters from db, retrying")
}

lp.filters = filters
lp.filterDirty = true
filtersLoaded = true
return nil
}

for {
select {
case <-lp.ctx.Done():
return
case fromBlockReq := <-lp.replayStart:
fromBlock, err := lp.GetReplayFromBlock(lp.ctx, fromBlockReq)
if err == nil {
if !filtersLoaded {
lp.lggr.Warnw("Received replayReq before filters loaded", "fromBlock", fromBlock, "requested", fromBlockReq)
if err = loadFilters(); err != nil {
lp.lggr.Errorw("Failed loading filters during Replay", "err", err, "fromBlock", fromBlock)
}
}
if err == nil {
// Serially process replay requests.
lp.lggr.Infow("Executing replay", "fromBlock", fromBlock, "requested", fromBlockReq)
lp.PollAndSaveLogs(lp.ctx, fromBlock)
lp.lggr.Infow("Executing replay finished", "fromBlock", fromBlock, "requested", fromBlockReq)
}
} else {
lp.lggr.Errorw("Error executing replay, could not get fromBlock", "err", err)
}
select {
case <-lp.ctx.Done():
// We're shutting down, notify client and exit
select {
case lp.replayComplete <- ErrReplayRequestAborted:
default:
}
return
case lp.replayComplete <- err:
}
lp.handleReplayRequest(fromBlockReq, filtersLoaded)
case <-logPollTick:
logPollTick = time.After(utils.WithJitter(lp.pollPeriod))
if !filtersLoaded {
if err := loadFilters(); err != nil {
if err := lp.loadFilters(); err != nil {
lp.lggr.Errorw("Failed loading filters in main logpoller loop, retrying later", "err", err)
continue
}
filtersLoaded = true
}

// Always start from the latest block in the db.
Expand Down Expand Up @@ -529,22 +516,23 @@ func (lp *logPoller) run() {
}
lp.PollAndSaveLogs(lp.ctx, start)
case <-backupLogPollTick:
if lp.backupPollerBlockDelay == 0 {
continue // backup poller is disabled
}
// Backup log poller: this serves as an emergency backup to protect against eventual-consistency behavior
// of an rpc node (seen occasionally on optimism, but possibly could happen on other chains?). If the first
// time we request a block, no logs or incomplete logs come back, this ensures that every log is eventually
// re-requested after it is finalized. This doesn't add much overhead, because we can request all of them
// in one shot, since we don't need to worry about re-orgs after finality depth, and it runs 100x less
// frequently than the primary log poller.

// If pollPeriod is set to 1 block time, backup log poller will run once every 100 blocks
const backupPollerBlockDelay = 100
// re-requested after it is finalized. This doesn't add much overhead, because we can request all of them
// in one shot, since we don't need to worry about re-orgs after finality depth, and it runs far less
// frequently than the primary log poller (instead of roughly once per block it runs once roughly once every
// lp.backupPollerDelay blocks--with default settings about 100x less frequently).

backupLogPollTick = time.After(utils.WithJitter(backupPollerBlockDelay * lp.pollPeriod))
backupLogPollTick = time.After(utils.WithJitter(time.Duration(lp.backupPollerBlockDelay) * lp.pollPeriod))
if !filtersLoaded {
lp.lggr.Warnw("Backup log poller ran before filters loaded, skipping")
continue
}
lp.BackupPollAndSaveLogs(lp.ctx, backupPollerBlockDelay)
lp.BackupPollAndSaveLogs(lp.ctx)
}
}
}
Expand Down Expand Up @@ -582,7 +570,37 @@ func (lp *logPoller) backgroundWorkerRun() {
}
}

func (lp *logPoller) BackupPollAndSaveLogs(ctx context.Context, backupPollerBlockDelay int64) {
func (lp *logPoller) handleReplayRequest(fromBlockReq int64, filtersLoaded bool) {
fromBlock, err := lp.GetReplayFromBlock(lp.ctx, fromBlockReq)
if err == nil {
if !filtersLoaded {
lp.lggr.Warnw("Received replayReq before filters loaded", "fromBlock", fromBlock, "requested", fromBlockReq)
if err = lp.loadFilters(); err != nil {
lp.lggr.Errorw("Failed loading filters during Replay", "err", err, "fromBlock", fromBlock)
}
}
if err == nil {
// Serially process replay requests.
lp.lggr.Infow("Executing replay", "fromBlock", fromBlock, "requested", fromBlockReq)
lp.PollAndSaveLogs(lp.ctx, fromBlock)
lp.lggr.Infow("Executing replay finished", "fromBlock", fromBlock, "requested", fromBlockReq)
}
} else {
lp.lggr.Errorw("Error executing replay, could not get fromBlock", "err", err)
}
select {
case <-lp.ctx.Done():
// We're shutting down, notify client and exit
select {
case lp.replayComplete <- ErrReplayRequestAborted:
default:
}
return
case lp.replayComplete <- err:
}
}

func (lp *logPoller) BackupPollAndSaveLogs(ctx context.Context) {
if lp.backupPollerNextBlock == 0 {
lastProcessed, err := lp.orm.SelectLatestBlock(pg.WithParentCtx(ctx))
if err != nil {
Expand All @@ -594,7 +612,7 @@ func (lp *logPoller) BackupPollAndSaveLogs(ctx context.Context, backupPollerBloc
return
}
// If this is our first run, start from block min(lastProcessed.FinalizedBlockNumber-1, lastProcessed.BlockNumber-backupPollerBlockDelay)
backupStartBlock := mathutil.Min(lastProcessed.FinalizedBlockNumber-1, lastProcessed.BlockNumber-backupPollerBlockDelay)
backupStartBlock := mathutil.Min(lastProcessed.FinalizedBlockNumber-1, lastProcessed.BlockNumber-lp.backupPollerBlockDelay)
// (or at block 0 if whole blockchain is too short)
lp.backupPollerNextBlock = mathutil.Max(backupStartBlock, 0)
}
Expand Down
Loading

0 comments on commit 014e842

Please sign in to comment.