Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract sequence tracking from the Broadcaster #12353

Merged
merged 11 commits into from
Mar 19, 2024
172 changes: 14 additions & 158 deletions common/txmgr/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"database/sql"
"errors"
"fmt"
"slices"
"sync"
"time"

Expand Down Expand Up @@ -112,13 +111,13 @@ type Broadcaster[
txStore txmgrtypes.TransactionStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, SEQ, FEE]
client txmgrtypes.TransactionClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
sequenceSyncer SequenceSyncer[ADDR, TX_HASH, BLOCK_HASH, SEQ]
resumeCallback ResumeCallback
chainID CHAIN_ID
config txmgrtypes.BroadcasterChainConfig
feeConfig txmgrtypes.BroadcasterFeeConfig
txConfig txmgrtypes.BroadcasterTransactionsConfig
listenerConfig txmgrtypes.BroadcasterListenerConfig
sequenceTracker txmgrtypes.SequenceTracker[ADDR, SEQ]
resumeCallback ResumeCallback
chainID CHAIN_ID
config txmgrtypes.BroadcasterChainConfig
feeConfig txmgrtypes.BroadcasterFeeConfig
txConfig txmgrtypes.BroadcasterTransactionsConfig
listenerConfig txmgrtypes.BroadcasterListenerConfig

// autoSyncSequence, if set, will cause Broadcaster to fast-forward the sequence
// when Start is called
Expand All @@ -141,10 +140,6 @@ type Broadcaster[

initSync sync.Mutex
isStarted bool

sequenceLock sync.RWMutex
nextSequenceMap map[ADDR]SEQ
generateNextSequence types.GenerateNextSequenceFunc[SEQ]
}

func NewBroadcaster[
Expand All @@ -164,19 +159,17 @@ func NewBroadcaster[
listenerConfig txmgrtypes.BroadcasterListenerConfig,
keystore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ],
txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
sequenceSyncer SequenceSyncer[ADDR, TX_HASH, BLOCK_HASH, SEQ],
sequenceTracker txmgrtypes.SequenceTracker[ADDR, SEQ],
lggr logger.Logger,
checkerFactory TransmitCheckerFactory[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
autoSyncSequence bool,
generateNextSequence types.GenerateNextSequenceFunc[SEQ],
) *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
lggr = logger.Named(lggr, "Broadcaster")
b := &Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{
lggr: logger.Sugared(lggr),
txStore: txStore,
client: client,
TxAttemptBuilder: txAttemptBuilder,
sequenceSyncer: sequenceSyncer,
chainID: client.ConfiguredChainID(),
config: config,
feeConfig: feeConfig,
Expand All @@ -185,10 +178,10 @@ func NewBroadcaster[
ks: keystore,
checkerFactory: checkerFactory,
autoSyncSequence: autoSyncSequence,
sequenceTracker: sequenceTracker,
}

b.processUnstartedTxsImpl = b.processUnstartedTxs
b.generateNextSequence = generateNextSequence
return b
}

Expand Down Expand Up @@ -222,9 +215,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) star
eb.wg = sync.WaitGroup{}
eb.wg.Add(len(eb.enabledAddresses))
eb.triggers = make(map[ADDR]chan struct{})
eb.sequenceLock.Lock()
eb.nextSequenceMap = eb.loadNextSequenceMap(ctx, eb.enabledAddresses)
eb.sequenceLock.Unlock()
eb.sequenceTracker.LoadNextSequences(ctx, eb.enabledAddresses)
for _, addr := range eb.enabledAddresses {
triggerCh := make(chan struct{}, 1)
eb.triggers[addr] = triggerCh
Expand Down Expand Up @@ -284,46 +275,6 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Trig
}
}

// Load the next sequence map using the tx table or on-chain (if not found in tx table)
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) loadNextSequenceMap(ctx context.Context, addresses []ADDR) map[ADDR]SEQ {
nextSequenceMap := make(map[ADDR]SEQ)
for _, address := range addresses {
seq, err := eb.getSequenceForAddr(ctx, address)
if err == nil {
nextSequenceMap[address] = seq
}
}

return nextSequenceMap
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) getSequenceForAddr(ctx context.Context, address ADDR) (seq SEQ, err error) {
// Get the highest sequence from the tx table
// Will need to be incremented since this sequence is already used
seq, err = eb.txStore.FindLatestSequence(ctx, address, eb.chainID)
if err == nil {
seq = eb.generateNextSequence(seq)
return seq, nil
}
// Look for nonce on-chain if no tx found for address in TxStore or if error occurred
// Returns the nonce that should be used for the next transaction so no need to increment
seq, err = eb.client.PendingSequenceAt(ctx, address)
if err == nil {
return seq, nil
}
eb.lggr.Criticalw("failed to retrieve next sequence from on-chain for address: ", "address", address.String())
return seq, err

}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) newSequenceSyncBackoff() backoff.Backoff {
return backoff.Backoff{
Min: 100 * time.Millisecond,
Max: 5 * time.Second,
Jitter: true,
}
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) newResendBackoff() backoff.Backoff {
return backoff.Backoff{
Min: 1 * time.Second,
Expand All @@ -340,7 +291,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) moni

if eb.autoSyncSequence {
eb.lggr.Debugw("Auto-syncing sequence", "address", addr.String())
eb.SyncSequence(ctx, addr)
eb.sequenceTracker.SyncSequence(ctx, addr, eb.chStop)
if ctx.Err() != nil {
return
}
Expand Down Expand Up @@ -393,46 +344,6 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) moni
}
}

// syncSequence tries to sync the key sequence, retrying indefinitely until success or stop signal is sent
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) SyncSequence(ctx context.Context, addr ADDR) {
sequenceSyncRetryBackoff := eb.newSequenceSyncBackoff()
localSequence, err := eb.GetNextSequence(ctx, addr)
// Address not found in map so skip sync
if err != nil {
eb.lggr.Criticalw("Failed to retrieve local next sequence for address", "address", addr.String(), "err", err)
return
}

// Enter loop with retries
var attempt int
for {
select {
case <-eb.chStop:
return
case <-time.After(sequenceSyncRetryBackoff.Duration()):
attempt++
newNextSequence, err := eb.sequenceSyncer.Sync(ctx, addr, localSequence)
if err != nil {
if attempt > 5 {
eb.lggr.Criticalw("Failed to sync with on-chain sequence", "address", addr.String(), "attempt", attempt, "err", err)
eb.SvcErrBuffer.Append(err)
} else {
eb.lggr.Warnw("Failed to sync with on-chain sequence", "address", addr.String(), "attempt", attempt, "err", err)
}
continue
}
// Found new sequence to use from on-chain
if localSequence.String() != newNextSequence.String() {
eb.lggr.Infow("Fast-forward sequence", "address", addr, "newNextSequence", newNextSequence, "oldNextSequence", localSequence)
// Set new sequence in the map
eb.SetNextSequence(addr, newNextSequence)
}
return

}
}
}

// ProcessUnstartedTxs picks up and handles all txes in the queue
// revive:disable:error-return
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) ProcessUnstartedTxs(ctx context.Context, addr ADDR) (retryable bool, err error) {
Expand Down Expand Up @@ -619,18 +530,12 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand
// and hand off to the confirmer to get the receipt (or mark as
// failed).
observeTimeUntilBroadcast(eb.chainID, etx.CreatedAt, time.Now())
// Check if from_address exists in map to ensure it is valid before broadcasting
var sequence SEQ
sequence, err = eb.GetNextSequence(ctx, etx.FromAddress)
if err != nil {
return err, true
}
err = eb.txStore.UpdateTxAttemptInProgressToBroadcast(ctx, &etx, attempt, txmgrtypes.TxAttemptBroadcast)
if err != nil {
return err, true
}
// Increment sequence if successfully broadcasted
eb.IncrementNextSequence(etx.FromAddress, sequence)
eb.sequenceTracker.GenerateNextSequence(etx.FromAddress, *etx.Sequence)
return err, true
case client.Underpriced:
return eb.tryAgainBumpingGas(ctx, lgr, err, etx, attempt, initialBroadcastAt)
Expand Down Expand Up @@ -677,18 +582,12 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand
// transaction to have been accepted. In this case, the right thing to
// do is assume success and hand off to Confirmer

// Check if from_address exists in map to ensure it is valid before broadcasting
var sequence SEQ
sequence, err = eb.GetNextSequence(ctx, etx.FromAddress)
if err != nil {
return err, true
}
err = eb.txStore.UpdateTxAttemptInProgressToBroadcast(ctx, &etx, attempt, txmgrtypes.TxAttemptBroadcast)
if err != nil {
return err, true
}
// Increment sequence if successfully broadcasted
eb.IncrementNextSequence(etx.FromAddress, sequence)
eb.sequenceTracker.GenerateNextSequence(etx.FromAddress, *etx.Sequence)
return err, true
}
// Either the unknown error prevented the transaction from being mined, or
Expand Down Expand Up @@ -716,7 +615,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) next
return nil, fmt.Errorf("findNextUnstartedTransactionFromAddress failed: %w", err)
}

sequence, err := eb.GetNextSequence(ctx, etx.FromAddress)
sequence, err := eb.sequenceTracker.GetNextSequence(ctx, etx.FromAddress)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -805,49 +704,6 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) save
return eb.txStore.UpdateTxFatalError(ctx, etx)
}

// Used to get the next usable sequence for a transaction
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetNextSequence(ctx context.Context, address ADDR) (seq SEQ, err error) {
eb.sequenceLock.Lock()
defer eb.sequenceLock.Unlock()
// Get next sequence from map
seq, exists := eb.nextSequenceMap[address]
if exists {
return seq, nil
}

eb.lggr.Infow("address not found in local next sequence map. Attempting to search and populate sequence.", "address", address.String())
// Check if address is in the enabled address list
if !slices.Contains(eb.enabledAddresses, address) {
return seq, fmt.Errorf("address disabled: %s", address)
}

// Try to retrieve next sequence from tx table or on-chain to load the map
// A scenario could exist where loading the map during startup failed (e.g. All configured RPC's are unreachable at start)
// The expectation is that the node does not fail startup so sequences need to be loaded during runtime
foundSeq, err := eb.getSequenceForAddr(ctx, address)
if err != nil {
return seq, fmt.Errorf("failed to find next sequence for address: %s", address)
}

// Set sequence in map
eb.nextSequenceMap[address] = foundSeq
return foundSeq, nil
}

// Used to increment the sequence in the mapping to have the next usable one available for the next transaction
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) IncrementNextSequence(address ADDR, seq SEQ) {
eb.sequenceLock.Lock()
defer eb.sequenceLock.Unlock()
eb.nextSequenceMap[address] = eb.generateNextSequence(seq)
}

// Used to set the next sequence explicitly to a certain value
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) SetNextSequence(address ADDR, seq SEQ) {
eb.sequenceLock.Lock()
defer eb.sequenceLock.Unlock()
eb.nextSequenceMap[address] = seq
}

func observeTimeUntilBroadcast[CHAIN_ID types.ID](chainID CHAIN_ID, createdAt, broadcastAt time.Time) {
duration := float64(broadcastAt.Sub(createdAt))
promTimeUntilBroadcast.WithLabelValues(chainID.String()).Observe(duration)
Expand Down
11 changes: 0 additions & 11 deletions common/txmgr/sequence_syncer.go

This file was deleted.

3 changes: 0 additions & 3 deletions common/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ type Txm[
tracker *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
fwdMgr txmgrtypes.ForwarderManager[ADDR]
txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
sequenceSyncer SequenceSyncer[ADDR, TX_HASH, BLOCK_HASH, SEQ]
}

func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RegisterResumeCallback(fn ResumeCallback) {
Expand Down Expand Up @@ -136,7 +135,6 @@ func NewTxm[
fwdMgr txmgrtypes.ForwarderManager[ADDR],
txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
txStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE],
sequenceSyncer SequenceSyncer[ADDR, TX_HASH, BLOCK_HASH, SEQ],
broadcaster *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
confirmer *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE],
resender *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE],
Expand All @@ -157,7 +155,6 @@ func NewTxm[
reset: make(chan reset),
fwdMgr: fwdMgr,
txAttemptBuilder: txAttemptBuilder,
sequenceSyncer: sequenceSyncer,
broadcaster: broadcaster,
confirmer: confirmer,
resender: resender,
Expand Down
26 changes: 26 additions & 0 deletions common/txmgr/types/sequence_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package types

import (
"context"

"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink/v2/common/types"
)

type SequenceTracker[
// Represents an account address, in native chain format.
ADDR types.Hashable,
// Represents the sequence type for a chain. For example, nonce for EVM.
SEQ types.Sequence,
] interface {
// Load the next sequence needed for transactions for all enabled addresses
LoadNextSequences(context.Context, []ADDR)
// Get the next sequence to assign to a transaction
GetNextSequence(context.Context, ADDR) (SEQ, error)
// Signals the existing sequence has been used so generates and stores the next sequence
// Can be a no-op depending on the chain
GenerateNextSequence(ADDR, SEQ)
// Syncs the local sequence with the one on-chain in case the address as been used externally
// Can be a no-op depending on the chain
SyncSequence(context.Context, ADDR, services.StopChan)
}
Loading
Loading