Skip to content

Commit

Permalink
feat(consensus): use protobuf/SMEvent round
Browse files Browse the repository at this point in the history
  • Loading branch information
asmaastarkware committed Jul 31, 2024
1 parent 24a818d commit 8ee1b00
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ impl ConsensusContext for PapyrusConsensusContext {
fin_receiver.await.expect("Failed to get block hash from fin receiver");
let proposal = Proposal {
height: init.height.0,
round: 0, // TODO(Asmaa): add round to the proposal.
round: init.round,
proposer: init.proposer,
transactions,
block_hash,
Expand Down Expand Up @@ -234,8 +234,11 @@ impl From<ProposalWrapper>
{
fn from(val: ProposalWrapper) -> Self {
let transactions: Vec<Transaction> = val.0.transactions.into_iter().collect();
let proposal_init =
ProposalInit { height: BlockNumber(val.0.height), proposer: val.0.proposer };
let proposal_init = ProposalInit {
height: BlockNumber(val.0.height),
round: val.0.round,
proposer: val.0.proposer,
};
let (mut content_sender, content_receiver) = mpsc::channel(transactions.len());
for tx in transactions {
content_sender.try_send(tx).expect("Send should succeed");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ async fn propose() {
let (fin_sender, fin_receiver) = oneshot::channel();
fin_sender.send(block.header.block_hash).unwrap();

let proposal_init = ProposalInit { height: block_number, proposer: ContractAddress::default() };
let proposal_init =
ProposalInit { height: block_number, round: 0, proposer: ContractAddress::default() };
papyrus_context.propose(proposal_init.clone(), content_receiver, fin_receiver).await.unwrap();

let expected_message = ConsensusMessage::Proposal(Proposal {
Expand Down
49 changes: 28 additions & 21 deletions crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
#[path = "single_height_consensus_test.rs"]
mod single_height_consensus_test;

use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};

use futures::channel::{mpsc, oneshot};
use papyrus_protobuf::consensus::{ConsensusMessage, Vote, VoteType};
use starknet_api::block::{BlockHash, BlockNumber};
use tracing::{debug, info, instrument, trace};
use tracing::{debug, info, instrument, trace, warn};

use crate::state_machine::{StateMachine, StateMachineEvent};
use crate::types::{
Expand All @@ -20,8 +21,6 @@ use crate::types::{
ValidatorId,
};

const ROUND_ZERO: Round = 0;

/// Struct which represents a single height of consensus. Each height is expected to be begun with a
/// call to `start`, which is relevant if we are the proposer for this height's first round.
/// SingleHeightConsensus receives messages directly as parameters to function calls. It can send
Expand Down Expand Up @@ -91,6 +90,10 @@ impl<BlockT: ConsensusBlock> SingleHeightConsensus<BlockT> {
format!("invalid proposer: expected {:?}, got {:?}", proposer_id, init.proposer);
return Err(ConsensusError::InvalidProposal(proposer_id, self.height, msg));
}
let Entry::Vacant(proposal_entry) = self.proposals.entry(init.round) else {
warn!("Round {} already has a proposal, ignoring", init.round);
return Ok(None);
};

let block_receiver = context.validate_proposal(self.height, p2p_messages_receiver).await;
// TODO(matan): Actual Tendermint should handle invalid proposals.
Expand All @@ -117,9 +120,8 @@ impl<BlockT: ConsensusBlock> SingleHeightConsensus<BlockT> {
"block signature doesn't match expected block hash".into(),
));
}
let sm_proposal = StateMachineEvent::Proposal(Some(block.id()), ROUND_ZERO);
// TODO(matan): Handle multiple rounds.
self.proposals.insert(ROUND_ZERO, block);
let sm_proposal = StateMachineEvent::Proposal(Some(block.id()), init.round);
proposal_entry.insert(block);
let leader_fn =
|_round: Round| -> ValidatorId { context.proposer(&self.validators, self.height) };
let sm_events = self.state_machine.handle_event(sm_proposal, &leader_fn);
Expand Down Expand Up @@ -150,26 +152,31 @@ impl<BlockT: ConsensusBlock> SingleHeightConsensus<BlockT> {
) -> Result<Option<Decision<BlockT>>, ConsensusError> {
let (votes, sm_vote) = match vote.vote_type {
VoteType::Prevote => {
(&mut self.prevotes, StateMachineEvent::Prevote(vote.block_hash, ROUND_ZERO))
(&mut self.prevotes, StateMachineEvent::Prevote(vote.block_hash, vote.round))
}
VoteType::Precommit => {
(&mut self.precommits, StateMachineEvent::Precommit(vote.block_hash, ROUND_ZERO))
(&mut self.precommits, StateMachineEvent::Precommit(vote.block_hash, vote.round))
}
};
if let Some(old) = votes.get(&(ROUND_ZERO, vote.voter)) {
if old.block_hash != vote.block_hash {
return Err(ConsensusError::Equivocation(
self.height,
ConsensusMessage::Vote(old.clone()),
ConsensusMessage::Vote(vote),
));
} else {
// Replay, ignore.
return Ok(None);

match votes.entry((vote.round, vote.voter)) {
Entry::Vacant(entry) => {
entry.insert(vote.clone());
}
Entry::Occupied(entry) => {
let old = entry.get();
if old.block_hash != vote.block_hash {
return Err(ConsensusError::Equivocation(
self.height,
ConsensusMessage::Vote(old.clone()),
ConsensusMessage::Vote(vote),
));
} else {
// Replay, ignore.
return Ok(None);
}
}
}

votes.insert((ROUND_ZERO, vote.voter), vote);
let leader_fn =
|_round: Round| -> ValidatorId { context.proposer(&self.validators, self.height) };
let sm_events = self.state_machine.handle_event(sm_vote, &leader_fn);
Expand Down Expand Up @@ -229,7 +236,7 @@ impl<BlockT: ConsensusBlock> SingleHeightConsensus<BlockT> {

let (p2p_messages_receiver, block_receiver) = context.build_proposal(self.height).await;
let (fin_sender, fin_receiver) = oneshot::channel();
let init = ProposalInit { height: self.height, proposer: self.id };
let init = ProposalInit { height: self.height, round, proposer: self.id };
// Peering is a permanent component, so if sending to it fails we cannot continue.
context
.propose(init, p2p_messages_receiver, fin_receiver)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,25 @@ use super::SingleHeightConsensus;
use crate::test_utils::{MockTestContext, TestBlock};
use crate::types::{ConsensusBlock, ProposalInit, ValidatorId};

fn prevote(block_hash: Option<BlockHash>, height: u64, voter: ValidatorId) -> ConsensusMessage {
ConsensusMessage::Vote(Vote {
vote_type: VoteType::Prevote,
height,
round: 0,
block_hash,
voter,
})
fn prevote(
block_hash: Option<BlockHash>,
height: u64,
round: u32,
voter: ValidatorId,
) -> ConsensusMessage {
ConsensusMessage::Vote(Vote { vote_type: VoteType::Prevote, height, round, block_hash, voter })
}

fn precommit(block_hash: Option<BlockHash>, height: u64, voter: ValidatorId) -> ConsensusMessage {
fn precommit(
block_hash: Option<BlockHash>,
height: u64,
round: u32,
voter: ValidatorId,
) -> ConsensusMessage {
ConsensusMessage::Vote(Vote {
vote_type: VoteType::Precommit,
height,
round: 0,
round,
block_hash,
voter,
})
Expand All @@ -44,17 +48,17 @@ async fn proposer() {
vec![node_id, 2_u32.into(), 3_u32.into(), 4_u32.into()],
);

context.expect_proposer().returning(move |_, _| node_id);
context.expect_proposer().times(1).returning(move |_, _| node_id);
let block_clone = block.clone();
context.expect_build_proposal().returning(move |_| {
context.expect_build_proposal().times(1).returning(move |_| {
let (_, content_receiver) = mpsc::channel(1);
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(block_clone.clone()).unwrap();
(content_receiver, block_receiver)
});
let fin_receiver = Arc::new(OnceLock::new());
let fin_receiver_clone = Arc::clone(&fin_receiver);
context.expect_propose().return_once(move |init, _, fin_receiver| {
context.expect_propose().times(1).return_once(move |init, _, fin_receiver| {
// Ignore content receiver, since this is the context's responsibility.
assert_eq!(init.height, BlockNumber(0));
assert_eq!(init.proposer, node_id);
Expand All @@ -63,30 +67,32 @@ async fn proposer() {
});
context
.expect_broadcast()
.withf(move |msg: &ConsensusMessage| msg == &prevote(Some(block_id), 0, node_id))
.times(1)
.withf(move |msg: &ConsensusMessage| msg == &prevote(Some(block_id), 0, 0, node_id))
.returning(move |_| Ok(()));
// Sends proposal and prevote.
assert!(matches!(shc.start(&mut context).await, Ok(None)));

assert_eq!(
shc.handle_message(&mut context, prevote(Some(block.id()), 0, 2_u32.into())).await,
shc.handle_message(&mut context, prevote(Some(block.id()), 0, 0, 2_u32.into())).await,
Ok(None)
);
// 3 of 4 Prevotes is enough to send a Precommit.
context
.expect_broadcast()
.withf(move |msg: &ConsensusMessage| msg == &precommit(Some(block_id), 0, node_id))
.times(1)
.withf(move |msg: &ConsensusMessage| msg == &precommit(Some(block_id), 0, 0, node_id))
.returning(move |_| Ok(()));
assert_eq!(
shc.handle_message(&mut context, prevote(Some(block.id()), 0, 3_u32.into())).await,
shc.handle_message(&mut context, prevote(Some(block.id()), 0, 0, 3_u32.into())).await,
Ok(None)
);

let precommits = vec![
precommit(Some(block.id()), 0, 1_u32.into()),
precommit(Some(BlockHash(Felt::TWO)), 0, 4_u32.into()), // Ignores since disagrees.
precommit(Some(block.id()), 0, 2_u32.into()),
precommit(Some(block.id()), 0, 3_u32.into()),
precommit(Some(block.id()), 0, 0, 1_u32.into()),
precommit(Some(BlockHash(Felt::TWO)), 0, 0, 4_u32.into()), // Ignores since disagrees.
precommit(Some(block.id()), 0, 0, 2_u32.into()),
precommit(Some(block.id()), 0, 0, 3_u32.into()),
];
assert_eq!(shc.handle_message(&mut context, precommits[1].clone()).await, Ok(None));
assert_eq!(shc.handle_message(&mut context, precommits[2].clone()).await, Ok(None));
Expand Down Expand Up @@ -124,45 +130,47 @@ async fn validator() {
let (fin_sender, fin_receiver) = oneshot::channel();
fin_sender.send(block.id()).unwrap();

context.expect_proposer().returning(move |_, _| proposer);
context.expect_proposer().times(1).returning(move |_, _| proposer);
let block_clone = block.clone();
context.expect_validate_proposal().returning(move |_, _| {
context.expect_validate_proposal().times(1).returning(move |_, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(block_clone.clone()).unwrap();
block_receiver
});
context
.expect_broadcast()
.withf(move |msg: &ConsensusMessage| msg == &prevote(Some(block_id), 0, node_id))
.times(1)
.withf(move |msg: &ConsensusMessage| msg == &prevote(Some(block_id), 0, 0, node_id))
.returning(move |_| Ok(()));
let res = shc
.handle_proposal(
&mut context,
ProposalInit { height: BlockNumber(0), proposer },
ProposalInit { height: BlockNumber(0), round: 0, proposer },
mpsc::channel(1).1, // content - ignored by SHC.
fin_receiver,
)
.await;
assert_eq!(res, Ok(None));

assert_eq!(
shc.handle_message(&mut context, prevote(Some(block.id()), 0, 2_u32.into())).await,
shc.handle_message(&mut context, prevote(Some(block.id()), 0, 0, 2_u32.into())).await,
Ok(None)
);
// 3 of 4 Prevotes is enough to send a Precommit.
context
.expect_broadcast()
.withf(move |msg: &ConsensusMessage| msg == &precommit(Some(block_id), 0, node_id))
.times(1)
.withf(move |msg: &ConsensusMessage| msg == &precommit(Some(block_id), 0, 0, node_id))
.returning(move |_| Ok(()));
assert_eq!(
shc.handle_message(&mut context, prevote(Some(block.id()), 0, 3_u32.into())).await,
shc.handle_message(&mut context, prevote(Some(block.id()), 0, 0, 3_u32.into())).await,
Ok(None)
);

let precommits = vec![
precommit(Some(block.id()), 0, 2_u32.into()),
precommit(Some(block.id()), 0, 3_u32.into()),
precommit(Some(block.id()), 0, node_id),
precommit(Some(block.id()), 0, 0, 2_u32.into()),
precommit(Some(block.id()), 0, 0, 3_u32.into()),
precommit(Some(block.id()), 0, 0, node_id),
];
assert_eq!(shc.handle_message(&mut context, precommits[0].clone()).await, Ok(None));
let decision = shc.handle_message(&mut context, precommits[1].clone()).await.unwrap().unwrap();
Expand Down
1 change: 1 addition & 0 deletions crates/sequencing/papyrus_consensus/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ impl<BlockT: ConsensusBlock> Debug for Decision<BlockT> {
#[derive(PartialEq, Debug, Clone)]
pub struct ProposalInit {
pub height: BlockNumber,
pub round: Round,
pub proposer: ValidatorId,
}

Expand Down

0 comments on commit 8ee1b00

Please sign in to comment.