From d07f229804ab73eb1b9050f97fa22b677da282aa Mon Sep 17 00:00:00 2001 From: aldenhu Date: Thu, 21 Nov 2024 23:00:17 +0000 Subject: [PATCH] Move payload_manager.notify_commit to after commit --- consensus/src/pipeline/pipeline_builder.rs | 13 ++--- consensus/src/state_computer.rs | 47 ++++++++++++------- .../src/vm_test_harness.rs | 2 +- 3 files changed, 37 insertions(+), 25 deletions(-) diff --git a/consensus/src/pipeline/pipeline_builder.rs b/consensus/src/pipeline/pipeline_builder.rs index abd3884d8c22b..07cbf4673b49a 100644 --- a/consensus/src/pipeline/pipeline_builder.rs +++ b/consensus/src/pipeline/pipeline_builder.rs @@ -325,7 +325,6 @@ impl PipelineBuilder { pre_commit_fut.clone(), parent.post_pre_commit_fut.clone(), self.state_sync_notifier.clone(), - self.payload_manager.clone(), block.clone(), ), &mut abort_handles, @@ -335,6 +334,7 @@ impl PipelineBuilder { pre_commit_fut.clone(), commit_ledger_fut.clone(), parent.post_commit_fut.clone(), + self.payload_manager.clone(), block_store_callback, block.clone(), ), @@ -618,15 +618,12 @@ impl PipelineBuilder { pre_commit: TaskFuture, parent_post_pre_commit: TaskFuture, state_sync_notifier: Arc, - payload_manager: Arc, block: Arc, ) -> TaskResult { let compute_result = pre_commit.await?; parent_post_pre_commit.await?; let _tracker = Tracker::new("post_pre_commit", &block); - let payload = block.payload().cloned(); - let timestamp = block.timestamp_usecs(); let _timer = counters::OP_COUNTERS.timer("pre_commit_notify"); let txns = compute_result.transactions_to_commit().to_vec(); @@ -640,8 +637,6 @@ impl PipelineBuilder { error!(error = ?e, "Failed to notify state synchronizer"); } - let payload_vec = payload.into_iter().collect(); - payload_manager.notify_commit(timestamp, payload_vec); Ok(()) } @@ -684,6 +679,7 @@ impl PipelineBuilder { pre_commit_fut: TaskFuture, commit_ledger_fut: TaskFuture, parent_post_commit: TaskFuture, + payload_manager: Arc, block_store_callback: Box, block: Arc, ) -> TaskResult { @@ -695,6 +691,11 @@ impl PipelineBuilder { update_counters_for_block(&block); update_counters_for_compute_result(&compute_result); + let payload = block.payload().cloned(); + let timestamp = block.timestamp_usecs(); + let payload_vec = payload.into_iter().collect(); + payload_manager.notify_commit(timestamp, payload_vec); + if let Some(ledger_info_with_sigs) = maybe_ledger_info_with_sigs { block_store_callback(ledger_info_with_sigs); } diff --git a/consensus/src/state_computer.rs b/consensus/src/state_computer.rs index 5db63850cd6d6..498be4e87ea9a 100644 --- a/consensus/src/state_computer.rs +++ b/consensus/src/state_computer.rs @@ -131,15 +131,9 @@ impl ExecutionProxy { tx } - fn pre_commit_hook( - &self, - block: &Block, - payload_manager: Arc, - ) -> PreCommitHook { + fn pre_commit_hook(&self) -> PreCommitHook { let mut pre_commit_notifier = self.pre_commit_notifier.clone(); let state_sync_notifier = self.state_sync_notifier.clone(); - let payload = block.payload().cloned(); - let timestamp = block.timestamp_usecs(); Box::new(move |state_compute_result: &StateComputeResult| { let state_compute_result = state_compute_result.clone(); Box::pin(async move { @@ -158,9 +152,6 @@ impl ExecutionProxy { ) { error!(error = ?e, "Failed to notify state synchronizer"); } - - let payload_vec = payload.into_iter().collect(); - payload_manager.notify_commit(timestamp, payload_vec); })) .await .expect("Failed to send pre-commit notification"); @@ -168,6 +159,31 @@ impl ExecutionProxy { }) } + fn commit_hook( + &self, + blocks: &[Arc], + callback: StateComputerCommitCallBackType, + finality_proof: LedgerInfoWithSignatures, + ) -> NotificationType { + let payload_manager = self + .state + .read() + .as_ref() + .expect("must be set within an epoch") + .payload_manager + .clone(); + let blocks = blocks.to_vec(); + Box::pin(async move { + for block in blocks.iter() { + let payload = block.payload().cloned(); + let payload_vec = payload.into_iter().collect(); + let timestamp = block.timestamp_usecs(); + payload_manager.notify_commit(timestamp, payload_vec); + } + callback(&blocks, finality_proof); + }) + } + pub fn pipeline_builder(&self, commit_signer: Arc) -> PipelineBuilder { let MutableState { validators, @@ -236,7 +252,7 @@ impl StateComputer for ExecutionProxy { let txn_notifier = self.txn_notifier.clone(); let transaction_generator = BlockPreparer::new( - payload_manager.clone(), + payload_manager, self.transaction_filter.clone(), transaction_deduper.clone(), transaction_shuffler.clone(), @@ -260,7 +276,7 @@ impl StateComputer for ExecutionProxy { parent_block_id, transaction_generator, block_executor_onchain_config, - self.pre_commit_hook(block, payload_manager), + self.pre_commit_hook(), lifetime_guard, ) .await; @@ -343,14 +359,9 @@ impl StateComputer for ExecutionProxy { ) .expect("spawn_blocking failed"); - let blocks = blocks.to_vec(); - let callback_fut = Box::pin(async move { - callback(&blocks, finality_proof); - }); - self.commit_notifier .clone() - .send(callback_fut) + .send(self.commit_hook(blocks, callback, finality_proof)) .await .expect("Failed to send commit notification"); diff --git a/third_party/move/testing-infra/transactional-test-runner/src/vm_test_harness.rs b/third_party/move/testing-infra/transactional-test-runner/src/vm_test_harness.rs index 00730088b70a6..0cf1f792eb255 100644 --- a/third_party/move/testing-infra/transactional-test-runner/src/vm_test_harness.rs +++ b/third_party/move/testing-infra/transactional-test-runner/src/vm_test_harness.rs @@ -302,7 +302,7 @@ impl<'a> MoveTestAdapter<'a> for SimpleVMTestAdapter<'a> { Compatibility::new( !extra_args.skip_check_struct_layout, !extra_args.skip_check_friend_linking, - false + false, ) }; if vm.vm_config().use_loader_v2 {