Skip to content

Commit

Permalink
[cp 1.23] Move payload_manager.notify_commit() to after commit (#15359)
Browse files Browse the repository at this point in the history
  • Loading branch information
msmouse authored Nov 22, 2024
1 parent 78db503 commit 527870e
Showing 1 changed file with 29 additions and 18 deletions.
47 changes: 29 additions & 18 deletions consensus/src/state_computer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,9 @@ impl ExecutionProxy {
tx
}

fn pre_commit_hook(
&self,
block: &Block,
payload_manager: Arc<dyn TPayloadManager>,
) -> PreCommitHook {
fn pre_commit_hook(&self) -> PreCommitHook {
let mut pre_commit_notifier = self.pre_commit_notifier.clone();
let state_sync_notifier = self.state_sync_notifier.clone();
let payload = block.payload().cloned();
let timestamp = block.timestamp_usecs();
Box::new(move |state_compute_result: &StateComputeResult| {
let state_compute_result = state_compute_result.clone();
Box::pin(async move {
Expand All @@ -157,15 +151,37 @@ impl ExecutionProxy {
) {
error!(error = ?e, "Failed to notify state synchronizer");
}

let payload_vec = payload.into_iter().collect();
payload_manager.notify_commit(timestamp, payload_vec);
}))
.await
.expect("Failed to send pre-commit notification");
})
})
}

fn commit_hook(
&self,
blocks: &[Arc<PipelinedBlock>],
callback: StateComputerCommitCallBackType,
finality_proof: LedgerInfoWithSignatures,
) -> NotificationType {
let payload_manager = self
.state
.read()
.as_ref()
.expect("must be set within an epoch")
.payload_manager
.clone();
let blocks = blocks.to_vec();
Box::pin(async move {
for block in blocks.iter() {
let payload = block.payload().cloned();
let payload_vec = payload.into_iter().collect();
let timestamp = block.timestamp_usecs();
payload_manager.notify_commit(timestamp, payload_vec);
}
callback(&blocks, finality_proof);
})
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -201,7 +217,7 @@ impl StateComputer for ExecutionProxy {

let txn_notifier = self.txn_notifier.clone();
let transaction_generator = BlockPreparer::new(
payload_manager.clone(),
payload_manager,
self.transaction_filter.clone(),
transaction_deduper.clone(),
transaction_shuffler.clone(),
Expand All @@ -225,7 +241,7 @@ impl StateComputer for ExecutionProxy {
parent_block_id,
transaction_generator,
block_executor_onchain_config,
self.pre_commit_hook(block, payload_manager),
self.pre_commit_hook(),
lifetime_guard,
)
.await;
Expand Down Expand Up @@ -308,14 +324,9 @@ impl StateComputer for ExecutionProxy {
)
.expect("spawn_blocking failed");

let blocks = blocks.to_vec();
let callback_fut = Box::pin(async move {
callback(&blocks, finality_proof);
});

self.commit_notifier
.clone()
.send(callback_fut)
.send(self.commit_hook(blocks, callback, finality_proof))
.await
.expect("Failed to send commit notification");

Expand Down

0 comments on commit 527870e

Please sign in to comment.