-
Notifications
You must be signed in to change notification settings - Fork 21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: integrate streaming with consensus proposals #1609
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 5 files reviewed, 6 unresolved discussions (waiting on @asmaastarkware and @guy-starkware)
crates/sequencing/papyrus_consensus/src/manager.rs
line 172 at r1 (raw file):
} }; self.handle_proposal(context, height, &mut shc, proposal_init.into(), content_receiver).await?
Minor style point, I prefer to put as little code as possible within macro blocks since it's harder for the IDE to work with
Code quote:
let Some(first_part) = content_receiver.next().await else {
return Err(ConsensusError::InternalNetworkError("Proposal receiver closed".to_string()));
};
let proposal_init = match first_part {
ProposalPart::Init(init) => init,
_ => {
return Err(ConsensusError::InternalNetworkError(
"Expected first part of proposal to be Init variant".to_string(),
));
}
};
self.handle_proposal(context, height, &mut shc, proposal_init.into(), content_receiver).await?
crates/sequencing/papyrus_consensus/src/manager.rs
line 202 at r1 (raw file):
ContextT: ConsensusContext, { // TODO(guyn): what is the right thing to do if proposal's height doesn't match?
We should do the same as before. Cache the channel and when we get to the height send it to SHC (create a new cache for this type)
Code quote:
// TODO(guyn): what is the right thing to do if proposal's height doesn't match?
crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
line 127 at r1 (raw file):
// TODO(guyn): I think we should rename "block" and "fin" to be more descriptive! let (block, fin) = match block_receiver.await {
WDYT?
Suggestion:
block_built_from_content, fin_sent_by_proposer
crates/sequencing/papyrus_consensus/src/types.rs
line 72 at r1 (raw file):
height: BlockNumber, timeout: Duration, content: mpsc::Receiver<Self::ProposalChunk>,
I see that you are explicitly using the protobuf ProposalPart
throughout. I do not want this to be the case. What I want is a type which is generic and implements Into/From ProposalInit/ProposalFin
. Let's rename ProposalChunk
here to ProposalPart
actually. Nowhere in the papyrus_consensus
crate should the actual protobuf type ProposalPart
appear, since the crate should remain generic on the proposal's content.
Code quote:
Self::ProposalChunk
crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs
line 139 at r1 (raw file):
height: BlockNumber, timeout: Duration, content_receiver: mpsc::Receiver<ProposalPart>,
Only here in this implementation should we actually reference the protobuf ProposalPart struct, instead of being generic.
Code quote:
content_receiver: mpsc::Receiver<ProposalPart>,
crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs
line 342 at r1 (raw file):
} } ProposalPart::Fin(ProposalFin { proposal_content_id: id }) => {
let's break out of the loop here and move this code out of the loop. And then we can explicitly check that the receiver is closed with no other content.
Code quote:
ProposalPart::Fin(ProposalFin { proposal_content_id: id }) => {
93d5db2
to
425eed4
Compare
ddba9e3
to
e0de25f
Compare
e0de25f
to
c994b3b
Compare
Benchmark movements: |
This is still a draft and does not compile! Please look only at the flow and don't worry about the details.
This PR combines the StreamHandler and ProposalParts structs into the flow of Consensus accepting proposals.
The Consensus is given a receiver from the StreamHandler, which, in turn, listens to the network and collects messages, sending them as soon as they arrive in the correct order.
The manager's
run_height
functions takes an additional receiver, and listens separately to regular messages and for proposals (both inside the sameselect
statement),The new receiver is produced at the begining of the call to
run_consensus
by making a channel and giving one side of it to a newly created StreamHandler and the other side to therun_height
function. The StreamHandler gets a separate spawned thread to run on, listening to the network. In the future it would also be listening to the consensus context for outgoing proposals.TODO: