diff --git a/.changelog/v0.38.6/features/3008-mempool-async-update.md b/.changelog/v0.38.6/features/3008-mempool-async-update.md new file mode 100644 index 0000000000..1667aefb63 --- /dev/null +++ b/.changelog/v0.38.6/features/3008-mempool-async-update.md @@ -0,0 +1,3 @@ +- `[consensus]` Make mempool updates asynchronous from consensus Commits, + reducing latency for reaching consensus timeouts. + ([#3008](https://github.com/cometbft/cometbft/pull/3008)) diff --git a/spec/abci/abci++_app_requirements.md b/spec/abci/abci++_app_requirements.md index 3c5c5e1c77..6d016dc677 100644 --- a/spec/abci/abci++_app_requirements.md +++ b/spec/abci/abci++_app_requirements.md @@ -297,7 +297,9 @@ will be received on the mempool connection during this processing step, providin update all four connection states to the latest committed state at the same time. -When `Commit` returns, CometBFT unlocks the mempool. +CometBFT unlocks the mempool after it has finished updating for the new block, +which occurs asynchronously from `Commit`. +See [Mempool Update](../mempool/mempool.md) for more information on what the `update` task does. WARNING: if the ABCI app logic processing the `Commit` message sends a `/broadcast_tx_sync` or `/broadcast_tx` and waits for the response diff --git a/state/execution.go b/state/execution.go index 2d78524906..686326f40b 100644 --- a/state/execution.go +++ b/state/execution.go @@ -381,34 +381,38 @@ func (blockExec *BlockExecutor) VerifyVoteExtension(ctx context.Context, vote *t return nil } -// Commit locks the mempool, runs the ABCI Commit message, and updates the +// Commit locks the mempool, runs the ABCI Commit message, and asynchronously starts updating the // mempool. -// It returns the result of calling abci.Commit which is the height to retain (if any)). +// Commit returns the result of calling abci.Commit, which is the height to retain (if any). 
// The application is expected to have persisted its state (if any) before returning // from the ABCI Commit call. This is the only place where the application should // persist its state. // The Mempool must be locked during commit and update because state is // typically reset on Commit and old txs must be replayed against committed // state before new txs are run in the mempool, lest they be invalid. +// The mempool is unlocked when the Update routine completes, which is +// asynchronous from Commit. func (blockExec *BlockExecutor) Commit( state State, block *types.Block, abciResponse *abci.ResponseFinalizeBlock, ) (int64, error) { blockExec.mempool.Lock() - defer blockExec.mempool.Unlock() + unlockMempool := func() { blockExec.mempool.Unlock() } // while mempool is Locked, flush to ensure all async requests have completed // in the ABCI app before Commit. err := blockExec.mempool.FlushAppConn() if err != nil { - blockExec.logger.Error("client error during mempool.FlushAppConn", "err", err) + unlockMempool() + blockExec.logger.Error("client error during mempool.FlushAppConn, flushing mempool", "err", err) return 0, err } // Commit block, get hash back res, err := blockExec.proxyApp.Commit(context.TODO()) if err != nil { + unlockMempool() blockExec.logger.Error("client error during proxyAppConn.CommitSync", "err", err) return 0, err } @@ -421,15 +425,36 @@ func (blockExec *BlockExecutor) Commit( ) // Update mempool. - err = blockExec.mempool.Update( + go blockExec.asyncUpdateMempool(unlockMempool, block, state.Copy(), abciResponse) + + return res.RetainHeight, nil +} + +// asyncUpdateMempool updates the mempool for the committed block and unlocks it when done; Commit runs it in its own goroutine. 
+func (blockExec *BlockExecutor) asyncUpdateMempool( + unlockMempool func(), + block *types.Block, + state State, + abciResponse *abci.ResponseFinalizeBlock, +) { + defer unlockMempool() + + err := blockExec.mempool.Update( block.Height, block.Txs, abciResponse.TxResults, TxPreCheck(state), TxPostCheck(state), ) - - return res.RetainHeight, err + if err != nil { + // We panic in this case to preserve legacy behavior. Before we made the mempool + // update complete asynchronously from Commit, we would panic if the mempool + // update failed. This is because we panic on any error within commit. + // We should consider changing this behavior in the future, as there is no + // need to panic if the mempool update fails. At worst, we would need to + // dump the mempool and restart it. + panic(fmt.Sprintf("client error during mempool.Update; error %v", err)) + } } //---------------------------------------------------------