Skip to content

Commit

Permalink
Isolate transaction broadcasting latency from regas logic (Layr-Labs#465
Browse files Browse the repository at this point in the history
)
  • Loading branch information
ian-shim authored Apr 18, 2024
1 parent 66d87bf commit 13cf70c
Show file tree
Hide file tree
Showing 11 changed files with 177 additions and 65 deletions.
1 change: 1 addition & 0 deletions disperser/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type TimeoutConfig struct {
ChainWriteTimeout time.Duration
ChainStateTimeout time.Duration
FireblocksAPITimeout time.Duration
TxnBroadcastTimeout time.Duration
}

type Config struct {
Expand Down
9 changes: 5 additions & 4 deletions disperser/batcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,11 @@ func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher, func() []time.
FinalizationBlockDelay: finalizationBlockDelay,
}
timeoutConfig := bat.TimeoutConfig{
EncodingTimeout: 10 * time.Second,
AttestationTimeout: 10 * time.Second,
ChainReadTimeout: 10 * time.Second,
ChainWriteTimeout: 10 * time.Second,
EncodingTimeout: 10 * time.Second,
AttestationTimeout: 10 * time.Second,
ChainReadTimeout: 10 * time.Second,
ChainWriteTimeout: 10 * time.Second,
TxnBroadcastTimeout: 10 * time.Second,
}

metrics := bat.NewMetrics("9100", logger)
Expand Down
9 changes: 5 additions & 4 deletions disperser/batcher/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type EncodingStreamerMetrics struct {
}

type TxnManagerMetrics struct {
Latency prometheus.Summary
Latency *prometheus.SummaryVec
GasUsed prometheus.Gauge
SpeedUps prometheus.Gauge
TxQueue prometheus.Gauge
Expand Down Expand Up @@ -89,13 +89,14 @@ func NewMetrics(httpPort string, logger logging.Logger) *Metrics {
}

txnManagerMetrics := TxnManagerMetrics{
Latency: promauto.With(reg).NewSummary(
Latency: promauto.With(reg).NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Name: "txn_manager_latency_ms",
Help: "transaction confirmation latency summary in milliseconds",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001},
},
[]string{"stage"},
),
GasUsed: promauto.With(reg).NewGauge(
prometheus.GaugeOpts{
Expand Down Expand Up @@ -313,8 +314,8 @@ func (e *EncodingStreamerMetrics) UpdateEncodedBlobs(count int, size uint64) {
e.EncodedBlobs.WithLabelValues("number").Set(float64(count))
}

func (t *TxnManagerMetrics) ObserveLatency(latencyMs float64) {
t.Latency.Observe(latencyMs)
func (t *TxnManagerMetrics) ObserveLatency(stage string, latencyMs float64) {
t.Latency.WithLabelValues(stage).Observe(latencyMs)
}

func (t *TxnManagerMetrics) UpdateGasUsed(gasUsed uint64) {
Expand Down
142 changes: 104 additions & 38 deletions disperser/batcher/txn_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ var (
gasPricePercentageMultiplier = big.NewInt(10)
hundred = big.NewInt(100)
maxSendTransactionRetry = 3
queryTickerDuration = 3 * time.Second
ErrTransactionNotBroadcasted = errors.New("transaction not broadcasted")
)

// TxnManager receives transactions from the caller, sends them to the chain, and monitors their status.
Expand All @@ -36,7 +38,8 @@ type TxnManager interface {

type transaction struct {
*types.Transaction
TxID walletsdk.TxID
TxID walletsdk.TxID
requestedAt time.Time
}

type TxnRequest struct {
Expand Down Expand Up @@ -70,25 +73,27 @@ type txnManager struct {
requestChan chan *TxnRequest
logger logging.Logger

receiptChan chan *ReceiptOrErr
queueSize int
txnRefreshInterval time.Duration
metrics *TxnManagerMetrics
receiptChan chan *ReceiptOrErr
queueSize int
txnBroadcastTimeout time.Duration
txnRefreshInterval time.Duration
metrics *TxnManagerMetrics
}

var _ TxnManager = (*txnManager)(nil)

func NewTxnManager(ethClient common.EthClient, wallet walletsdk.Wallet, numConfirmations, queueSize int, txnRefreshInterval time.Duration, logger logging.Logger, metrics *TxnManagerMetrics) TxnManager {
func NewTxnManager(ethClient common.EthClient, wallet walletsdk.Wallet, numConfirmations, queueSize int, txnBroadcastTimeout time.Duration, txnRefreshInterval time.Duration, logger logging.Logger, metrics *TxnManagerMetrics) TxnManager {
return &txnManager{
ethClient: ethClient,
wallet: wallet,
numConfirmations: numConfirmations,
requestChan: make(chan *TxnRequest, queueSize),
logger: logger.With("component", "TxnManager"),
receiptChan: make(chan *ReceiptOrErr, queueSize),
queueSize: queueSize,
txnRefreshInterval: txnRefreshInterval,
metrics: metrics,
ethClient: ethClient,
wallet: wallet,
numConfirmations: numConfirmations,
requestChan: make(chan *TxnRequest, queueSize),
logger: logger.With("component", "TxnManager"),
receiptChan: make(chan *ReceiptOrErr, queueSize),
queueSize: queueSize,
txnBroadcastTimeout: txnBroadcastTimeout,
txnRefreshInterval: txnRefreshInterval,
metrics: metrics,
}
}

Expand Down Expand Up @@ -128,7 +133,7 @@ func (t *txnManager) Start(ctx context.Context) {
t.metrics.UpdateGasUsed(receipt.GasUsed)
}
}
t.metrics.ObserveLatency(float64(time.Since(req.requestedAt).Milliseconds()))
t.metrics.ObserveLatency("total", float64(time.Since(req.requestedAt).Milliseconds()))
}
}
}()
Expand All @@ -141,7 +146,7 @@ func (t *txnManager) Start(ctx context.Context) {
func (t *txnManager) ProcessTransaction(ctx context.Context, req *TxnRequest) error {
t.mu.Lock()
defer t.mu.Unlock()
t.logger.Debug("new transaction", "component", "TxnManager", "method", "ProcessTransaction", "tag", req.Tag, "nonce", req.Tx.Nonce(), "gasFeeCap", req.Tx.GasFeeCap(), "gasTipCap", req.Tx.GasTipCap())
t.logger.Debug("new transaction", "tag", req.Tag, "nonce", req.Tx.Nonce(), "gasFeeCap", req.Tx.GasFeeCap(), "gasTipCap", req.Tx.GasTipCap())

var txn *types.Transaction
var txID walletsdk.TxID
Expand All @@ -164,13 +169,13 @@ func (t *txnManager) ProcessTransaction(ctx context.Context, req *TxnRequest) er
didTimeout = urlErr.Timeout()
}
if didTimeout || errors.Is(err, context.DeadlineExceeded) {
t.logger.Warn("failed to send txn due to timeout", "tag", req.Tag, "hash", req.Tx.Hash().Hex(), "numRetries", retryFromFailure, "maxRetry", maxSendTransactionRetry, "err", err)
t.logger.Warn("failed to send txn due to timeout", "tag", req.Tag, "hash", txn.Hash().Hex(), "numRetries", retryFromFailure, "maxRetry", maxSendTransactionRetry, "err", err)
retryFromFailure++
continue
} else if err != nil {
return fmt.Errorf("failed to send txn (%s) %s: %w", req.Tag, req.Tx.Hash().Hex(), err)
return fmt.Errorf("failed to send txn (%s) %s: %w", req.Tag, txn.Hash().Hex(), err)
} else {
t.logger.Debug("successfully sent txn", "component", "TxnManager", "method", "ProcessTransaction", "tag", req.Tag, "txID", txID, "txHash", txn.Hash().Hex())
t.logger.Debug("successfully sent txn", "tag", req.Tag, "txID", txID, "txHash", txn.Hash().Hex())
break
}
}
Expand All @@ -183,6 +188,7 @@ func (t *txnManager) ProcessTransaction(ctx context.Context, req *TxnRequest) er
req.txAttempts = append(req.txAttempts, &transaction{
TxID: txID,
Transaction: txn,
requestedAt: time.Now(),
})

t.requestChan <- req
Expand All @@ -194,8 +200,31 @@ func (t *txnManager) ReceiptChan() chan *ReceiptOrErr {
return t.receiptChan
}

// ensureAnyTransactionBroadcasted waits until all given transactions are broadcasted to the network.
func (t *txnManager) ensureAnyTransactionBroadcasted(ctx context.Context, txs []*transaction) error {
queryTicker := time.NewTicker(queryTickerDuration)
defer queryTicker.Stop()

for {
for _, tx := range txs {
_, err := t.wallet.GetTransactionReceipt(ctx, tx.TxID)
if err == nil || errors.Is(err, walletsdk.ErrReceiptNotYetAvailable) {
t.metrics.ObserveLatency("broadcasted", float64(time.Since(tx.requestedAt).Milliseconds()))
return nil
}
}

// Wait for the next round.
select {
case <-ctx.Done():
return ctx.Err()
case <-queryTicker.C:
}
}
}

func (t *txnManager) ensureAnyTransactionEvaled(ctx context.Context, txs []*transaction) (*types.Receipt, error) {
queryTicker := time.NewTicker(3 * time.Second)
queryTicker := time.NewTicker(queryTickerDuration)
defer queryTicker.Stop()
var receipt *types.Receipt
var err error
Expand All @@ -212,25 +241,32 @@ func (t *txnManager) ensureAnyTransactionEvaled(ctx context.Context, txs []*tran
chainTip, err := t.ethClient.BlockNumber(ctx)
if err == nil {
if receipt.BlockNumber.Uint64()+uint64(t.numConfirmations) > chainTip {
t.logger.Debug("transaction has been mined but don't have enough confirmations at current chain tip", "component", "TxnManager", "method", "ensureAnyTransactionEvaled", "txnBlockNumber", receipt.BlockNumber.Uint64(), "numConfirmations", t.numConfirmations, "chainTip", chainTip)
t.logger.Debug("transaction has been mined but don't have enough confirmations at current chain tip", "txnBlockNumber", receipt.BlockNumber.Uint64(), "numConfirmations", t.numConfirmations, "chainTip", chainTip)
break
} else {
return receipt, nil
}
} else {
t.logger.Debug("failed to get chain tip while waiting for transaction to mine", "component", "TxnManager", "method", "ensureAnyTransactionEvaled", "err", err)
t.logger.Debug("failed to get chain tip while waiting for transaction to mine", "err", err)
}
}

if errors.Is(err, ethereum.NotFound) || errors.Is(err, walletsdk.ErrReceiptNotYetAvailable) {
t.logger.Debug("Transaction not yet mined", "component", "TxnManager", "method", "ensureAnyTransactionEvaled", "txID", txID, "txHash", tx.Hash().Hex(), "err", err)
t.logger.Debug("Transaction not yet mined", "txID", txID, "txHash", tx.Hash().Hex(), "err", err)
} else if errors.Is(err, walletsdk.ErrTransactionFailed) {
t.logger.Debug("Transaction failed", "component", "TxnManager", "method", "ensureAnyTransactionEvaled", "txID", txID, "txHash", tx.Hash().Hex(), "err", err)
t.logger.Debug("Transaction failed", "txID", txID, "txHash", tx.Hash().Hex(), "err", err)
delete(txnsToQuery, txID)
} else if err != nil {
t.logger.Debug("Transaction receipt retrieval failed", "component", "TxnManager", "method", "ensureAnyTransactionEvaled", "err", err)
} else if errors.Is(err, walletsdk.ErrNotYetBroadcasted) {
t.logger.Error("Transaction has not been broadcasted to network but attempted to retrieve receipt", "err", err)
} else {
t.logger.Debug("Transaction receipt retrieval failed", "err", err)
}
}

if len(txnsToQuery) == 0 {
return nil, fmt.Errorf("all transactions failed")
}

// Wait for the next round.
select {
case <-ctx.Done():
Expand All @@ -251,10 +287,40 @@ func (t *txnManager) monitorTransaction(ctx context.Context, req *TxnRequest) (*
var err error

rpcCallAttempt := func() error {
ctxWithTimeout, cancel := context.WithTimeout(ctx, t.txnRefreshInterval)
defer cancel()
t.logger.Debug("monitoring transaction", "component", "TxnManager", "method", "monitorTransaction", "txHash", req.Tx.Hash().Hex(), "tag", req.Tag, "nonce", req.Tx.Nonce())
t.logger.Debug("monitoring transaction", "txHash", req.Tx.Hash().Hex(), "tag", req.Tag, "nonce", req.Tx.Nonce())

ctxWithTimeout, cancelBroadcastTimeout := context.WithTimeout(ctx, t.txnBroadcastTimeout)
defer cancelBroadcastTimeout()

// Ensure transactions are broadcasted to the network before querying the receipt.
// This is to avoid querying the receipt of a transaction that hasn't been broadcasted yet.
// For example, when Fireblocks wallet is used, there may be delays in broadcasting the transaction due to latency from cosigning and MPC operations.
err = t.ensureAnyTransactionBroadcasted(ctxWithTimeout, req.txAttempts)
if err != nil && errors.Is(err, context.DeadlineExceeded) {
t.logger.Warn("transaction not broadcasted within timeout", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "nonce", req.Tx.Nonce())
fireblocksWallet, ok := t.wallet.(interface {
CancelTransactionBroadcast(ctx context.Context, txID walletsdk.TxID) (bool, error)
})
if ok {
// Consider these transactions failed as they haven't been broadcasted within timeout.
// Cancel these transactions to avoid blocking the next transactions.
for _, tx := range req.txAttempts {
cancelled, err := fireblocksWallet.CancelTransactionBroadcast(ctx, tx.TxID)
if err != nil {
t.logger.Warn("failed to cancel Fireblocks transaction broadcast", "txID", tx.TxID, "err", err)
} else if cancelled {
t.logger.Info("cancelled Fireblocks transaction broadcast because it didn't get broadcasted within timeout", "txID", tx.TxID, "timeout", t.txnBroadcastTimeout.String())
}
}
}
return ErrTransactionNotBroadcasted
} else if err != nil {
t.logger.Error("unexpected error while waiting for Fireblocks transaction to broadcast", "txHash", req.Tx.Hash().Hex(), "err", err)
return err
}

ctxWithTimeout, cancelEvaluationTimeout := context.WithTimeout(ctx, t.txnRefreshInterval)
defer cancelEvaluationTimeout()
receipt, err = t.ensureAnyTransactionEvaled(
ctxWithTimeout,
req.txAttempts,
Expand All @@ -272,38 +338,38 @@ func (t *txnManager) monitorTransaction(ctx context.Context, req *TxnRequest) (*

if errors.Is(err, context.DeadlineExceeded) {
if receipt != nil {
t.logger.Warn("transaction has been mined, but hasn't accumulated the required number of confirmations", "component", "TxnManager", "method", "monitorTransaction", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "nonce", req.Tx.Nonce())
t.logger.Warn("transaction has been mined, but hasn't accumulated the required number of confirmations", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "nonce", req.Tx.Nonce())
continue
}
t.logger.Warn("transaction not mined within timeout, resending with higher gas price", "component", "TxnManager", "method", "monitorTransaction", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "nonce", req.Tx.Nonce())
t.logger.Warn("transaction not mined within timeout, resending with higher gas price", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "nonce", req.Tx.Nonce())
newTx, err := t.speedUpTxn(ctx, req.Tx, req.Tag)
if err != nil {
t.logger.Error("failed to speed up transaction", "component", "TxnManager", "method", "monitorTransaction", "err", err)
t.logger.Error("failed to speed up transaction", "err", err)
t.metrics.IncrementTxnCount("failure")
return nil, err
}
txID, err := t.wallet.SendTransaction(ctx, newTx)
if err != nil {
if retryFromFailure >= maxSendTransactionRetry {
t.logger.Warn("failed to send txn - retries exhausted", "component", "TxnManager", "method", "monitorTransaction", "tag", req.Tag, "txn", req.Tx.Hash().Hex(), "attempt", retryFromFailure, "maxRetry", maxSendTransactionRetry, "err", err)
t.logger.Warn("failed to send txn - retries exhausted", "tag", req.Tag, "txn", req.Tx.Hash().Hex(), "attempt", retryFromFailure, "maxRetry", maxSendTransactionRetry, "err", err)
t.metrics.IncrementTxnCount("failure")
return nil, err
} else {
t.logger.Warn("failed to send txn - retrying", "component", "TxnManager", "method", "monitorTransaction", "tag", req.Tag, "txn", req.Tx.Hash().Hex(), "attempt", retryFromFailure, "maxRetry", maxSendTransactionRetry, "err", err)
t.logger.Warn("failed to send txn - retrying", "tag", req.Tag, "txn", req.Tx.Hash().Hex(), "attempt", retryFromFailure, "maxRetry", maxSendTransactionRetry, "err", err)
}
retryFromFailure++
continue
}

t.logger.Debug("successfully sent txn", "component", "TxnManager", "method", "monitorTransaction", "tag", req.Tag, "txID", txID, "txHash", newTx.Hash().Hex())
t.logger.Debug("successfully sent txn", "tag", req.Tag, "txID", txID, "txHash", newTx.Hash().Hex())
req.Tx = newTx
req.txAttempts = append(req.txAttempts, &transaction{
TxID: txID,
Transaction: newTx,
})
numSpeedUps++
} else {
t.logger.Error("transaction failed", "component", "TxnManager", "method", "monitorTransaction", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "err", err)
t.logger.Error("transaction failed", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "err", err)
t.metrics.IncrementTxnCount("failure")
return nil, err
}
Expand Down Expand Up @@ -335,7 +401,7 @@ func (t *txnManager) speedUpTxn(ctx context.Context, tx *types.Transaction, tag
newGasFeeCap = increasedGasFeeCap
}

t.logger.Info("increasing gas price", "component", "TxnManager", "method", "speedUpTxn", "tag", tag, "txHash", tx.Hash().Hex(), "nonce", tx.Nonce(), "prevGasTipCap", prevGasTipCap, "prevGasFeeCap", prevGasFeeCap, "newGasTipCap", newGasTipCap, "newGasFeeCap", newGasFeeCap)
t.logger.Info("increasing gas price", "tag", tag, "txHash", tx.Hash().Hex(), "nonce", tx.Nonce(), "prevGasTipCap", prevGasTipCap, "prevGasFeeCap", prevGasFeeCap, "newGasTipCap", newGasTipCap, "newGasFeeCap", newGasFeeCap)
return t.ethClient.UpdateGas(ctx, tx, tx.Value(), newGasTipCap, newGasFeeCap)
}

Expand Down
Loading

0 comments on commit 13cf70c

Please sign in to comment.