feat: integrate streaming with consensus proposals
guy-starkware committed Oct 28, 2024
1 parent a3b26a1 commit 93d5db2
Showing 5 changed files with 181 additions and 71 deletions.
8 changes: 8 additions & 0 deletions crates/papyrus_protobuf/src/consensus.rs
@@ -73,6 +73,14 @@ pub struct ProposalInit {
pub proposer: ContractAddress,
}

// TODO(guyn): after uniting this struct with that in sequencing/papyrus_consensus/src/types.rs,
// we'll be able to remove this and the needless conversions back and forth to a tuple.
impl From<ProposalInit> for (BlockNumber, u32, ContractAddress, Option<u32>) {
fn from(val: ProposalInit) -> Self {
(BlockNumber(val.height), val.round, val.proposer, val.valid_round)
}
}

/// There are one or more batches of transactions in a proposed block.
#[derive(Debug, Clone, PartialEq)]
pub struct TransactionBatch {
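For reference, a minimal sketch of the conversion above at a call site. The helper function is hypothetical; only the types come from the diff:

```rust
use papyrus_protobuf::consensus::ProposalInit;
use starknet_api::block::BlockNumber;
use starknet_api::core::ContractAddress;

// Hypothetical helper: unpack a ProposalInit into the tuple form that
// MultiHeightManager::handle_proposal currently takes.
fn unpack(init: ProposalInit) -> (BlockNumber, u32, ContractAddress, Option<u32>) {
    init.into()
}
```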
76 changes: 65 additions & 11 deletions crates/sequencing/papyrus_consensus/src/manager.rs
@@ -11,14 +11,15 @@ use futures::channel::{mpsc, oneshot};
use futures::stream::FuturesUnordered;
use futures::{Stream, StreamExt};
use papyrus_common::metrics::{PAPYRUS_CONSENSUS_HEIGHT, PAPYRUS_CONSENSUS_SYNC_COUNT};
use papyrus_network::network_manager::BroadcastTopicClientTrait;
use papyrus_protobuf::consensus::{ConsensusMessage, ProposalWrapper};
use papyrus_network::network_manager::{BroadcastTopicClientTrait, BroadcastTopicServer};
use papyrus_protobuf::consensus::{ConsensusMessage, ProposalPart, ProposalWrapper, StreamMessage};
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::core::ContractAddress;
use tracing::{debug, info, instrument};

use crate::config::TimeoutsConfig;
use crate::single_height_consensus::{ShcReturn, ShcTask, SingleHeightConsensus};
use crate::stream_handler::StreamHandler;
use crate::types::{
BroadcastConsensusMessageChannel,
ConsensusContext,
@@ -37,6 +38,7 @@ pub async fn run_consensus<ContextT, SyncReceiverT>(
consensus_delay: Duration,
timeouts: TimeoutsConfig,
mut broadcast_channels: BroadcastConsensusMessageChannel,
proposal_broadcast_channels: BroadcastTopicServer<StreamMessage<ProposalPart>>,
mut sync_receiver: SyncReceiverT,
) -> Result<(), ConsensusError>
where
@@ -60,10 +62,22 @@ where
tokio::time::sleep(consensus_delay).await;
let mut current_height = start_height;
let mut manager = MultiHeightManager::new(validator_id, timeouts);
let (proposal_sender, mut proposal_receiver) = mpsc::channel(10);
let mut stream_handler =
StreamHandler::<ProposalPart>::new(proposal_sender, proposal_broadcast_channels);
tokio::spawn(async move {
stream_handler.run().await;
});

loop {
metrics::gauge!(PAPYRUS_CONSENSUS_HEIGHT, current_height.0 as f64);

let run_height = manager.run_height(&mut context, current_height, &mut broadcast_channels);
let run_height = manager.run_height(
&mut context,
current_height,
&mut broadcast_channels,
&mut proposal_receiver,
);

// `run_height` is not cancel safe. Our implementation doesn't enable us to start and stop
// it. We also cannot restart the height; when we dropped the future we dropped the state it
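The hunk above wires up a channel of channels: StreamHandler holds the outer sender and emits one inner receiver per incoming proposal stream, which run_height then drains. A self-contained sketch of the pattern, with u32 standing in for ProposalPart and a tokio runtime assumed:

```rust
use futures::channel::mpsc;
use futures::{SinkExt, StreamExt};

#[tokio::main]
async fn main() {
    // Outer channel: one item per proposal stream (mirrors the channel(10) above).
    let (mut proposal_sender, mut proposal_receiver) =
        mpsc::channel::<mpsc::Receiver<u32>>(10);

    // Producer side (StreamHandler's role): open an inner channel for a new
    // proposal, hand the receiver to the consumer, then stream the parts.
    let (mut part_sender, part_receiver) = mpsc::channel::<u32>(10);
    proposal_sender.send(part_receiver).await.unwrap();
    part_sender.send(1).await.unwrap();
    part_sender.send(2).await.unwrap();
    drop(part_sender); // closing the inner channel ends this proposal's stream

    // Consumer side (run_height's role): take the next inner receiver,
    // then drain that single proposal's parts.
    let mut content_receiver = proposal_receiver.next().await.unwrap();
    while let Some(part) = content_receiver.next().await {
        println!("got part {part}");
    }
}
```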
@@ -108,6 +122,7 @@ impl MultiHeightManager {
context: &mut ContextT,
height: BlockNumber,
broadcast_channels: &mut BroadcastConsensusMessageChannel,
proposal_receiver: &mut mpsc::Receiver<mpsc::Receiver<ProposalPart>>,
) -> Result<Decision, ConsensusError>
where
ContextT: ConsensusContext,
@@ -142,6 +157,20 @@ impl MultiHeightManager {
message = next_message(&mut current_height_messages, broadcast_channels) => {
self.handle_message(context, height, &mut shc, message?).await?
},
Some(mut content_receiver) = proposal_receiver.next() => {
let Some(first_part) = content_receiver.next().await else {
return Err(ConsensusError::InternalNetworkError("Proposal receiver closed".to_string()));
};
let proposal_init = match first_part {
ProposalPart::Init(init) => init,
_ => {
return Err(ConsensusError::InternalNetworkError(
"Expected first part of proposal to be Init variant".to_string(),
));
}
};
self.handle_proposal(context, height, &mut shc, proposal_init.into(), content_receiver).await?
},
Some(shc_task) = shc_tasks.next() => {
shc.handle_event(context, shc_task.event).await?
},
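The new select arm enforces a stream-shape invariant: the first item on every inner channel must be ProposalPart::Init, and anything else aborts run_height with an InternalNetworkError. A hedged sketch of a sender that upholds the invariant; send_proposal and the simplified enum are assumptions, not the crate's API:

```rust
use futures::channel::mpsc;
use futures::SinkExt;

// Simplified stand-in for papyrus_protobuf's ProposalPart.
enum ProposalPart {
    Init(u64),             // height, reduced to a bare number here
    Transactions(Vec<u8>), // one serialized batch
    Fin([u8; 32]),         // proposed block id
}

// Hypothetical sender: Init first, then content batches, then Fin.
async fn send_proposal(
    sender: &mut mpsc::Sender<ProposalPart>,
    height: u64,
    batches: Vec<Vec<u8>>,
    block_id: [u8; 32],
) -> Result<(), mpsc::SendError> {
    sender.send(ProposalPart::Init(height)).await?;
    for batch in batches {
        sender.send(ProposalPart::Transactions(batch)).await?;
    }
    sender.send(ProposalPart::Fin(block_id)).await
}
```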
@@ -158,6 +187,29 @@ impl MultiHeightManager {
}
}

// Handle a new proposal receiver from the network.
async fn handle_proposal<ContextT>(
&mut self,
context: &mut ContextT,
height: BlockNumber,
shc: &mut SingleHeightConsensus,
proposal_init: (BlockNumber, u32, ContractAddress, Option<u32>),
content_receiver: mpsc::Receiver<ProposalPart>,
) -> Result<ShcReturn, ConsensusError>
where
ContextT: ConsensusContext,
{
// TODO(guyn): what is the right thing to do if proposal's height doesn't match?
if proposal_init.0 != height {
debug!("Received a proposal for a different height. {:?}", proposal_init);
// if message.height() > height.0 {
// self.cached_messages.entry(message.height()).or_default().push(message);
// }
return Ok(ShcReturn::Tasks(Vec::new()));
}
shc.handle_proposal(context, proposal_init.into(), content_receiver).await
}

// Handle a single consensus message.
async fn handle_message<ContextT>(
&mut self,
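The commented-out lines in handle_proposal above gesture at one answer to the TODO: keep streams that arrive for future heights instead of dropping them, by analogy with the existing cached_messages. A standalone sketch of that idea; the cache type and its placement on MultiHeightManager are assumptions:

```rust
use std::collections::BTreeMap;

use futures::channel::mpsc;

// Sketch: content receivers parked until consensus reaches their height.
#[derive(Default)]
struct ProposalCache<T> {
    by_height: BTreeMap<u64, Vec<mpsc::Receiver<T>>>,
}

impl<T> ProposalCache<T> {
    // Park a stream only if it is for a height we have not reached yet;
    // streams for past heights are stale and can be dropped.
    fn store_if_future(&mut self, proposal_height: u64, current: u64, rx: mpsc::Receiver<T>) {
        if proposal_height > current {
            self.by_height.entry(proposal_height).or_default().push(rx);
        }
    }

    // When run_height starts a new height, drain anything parked for it.
    fn take(&mut self, height: u64) -> Vec<mpsc::Receiver<T>> {
        self.by_height.remove(&height).unwrap_or_default()
    }
}
```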
@@ -186,14 +238,16 @@ impl MultiHeightManager {
return Ok(ShcReturn::Tasks(Vec::new()));
}
match message {
ConsensusMessage::Proposal(proposal) => {
// Special case due to fake streaming.
let (proposal_init, content_receiver, fin_receiver) =
ProposalWrapper(proposal).into();
let res = shc
.handle_proposal(context, proposal_init.into(), content_receiver, fin_receiver)
.await?;
Ok(res)
ConsensusMessage::Proposal(_proposal) => {
// Special case due to fake streaming.
// TODO(guyn): We can eliminate this option and leave handle_message:
// let (proposal_init, content_receiver, fin_receiver) =
//     ProposalWrapper(proposal).into();
// let res = shc
//     .handle_proposal(context, proposal_init.into(), content_receiver, fin_receiver)
//     .await?;
Err(ConsensusError::InternalNetworkError(
"Proposal variant of ConsensusMessage no longer supported".to_string(),
))
}
_ => {
let res = shc.handle_message(context, message).await?;
29 changes: 15 additions & 14 deletions crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
@@ -6,8 +6,8 @@ use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
use std::time::Duration;

use futures::channel::{mpsc, oneshot};
use papyrus_protobuf::consensus::{ConsensusMessage, Vote, VoteType};
use futures::channel::mpsc;
use papyrus_protobuf::consensus::{ConsensusMessage, ProposalPart, Vote, VoteType};
use starknet_api::block::BlockNumber;
use tracing::{debug, info, instrument, trace, warn};

@@ -97,8 +97,8 @@ impl SingleHeightConsensus {
&mut self,
context: &mut ContextT,
init: ProposalInit,
p2p_messages_receiver: mpsc::Receiver<ContextT::ProposalChunk>,
fin_receiver: oneshot::Receiver<ProposalContentId>,
p2p_messages_receiver: mpsc::Receiver<ProposalPart>,
// fin_receiver: oneshot::Receiver<ProposalContentId>,
) -> Result<ShcReturn, ConsensusError> {
debug!(
"Received proposal: height={}, round={}, proposer={:?}",
@@ -123,23 +123,24 @@ impl SingleHeightConsensus {
.validate_proposal(self.height, self.timeouts.proposal_timeout, p2p_messages_receiver)
.await;

let block = match block_receiver.await {
Ok(block) => block,
// TODO(guyn): I think we should rename "block" and "fin" to be more descriptive!
let (block, fin) = match block_receiver.await {
Ok((block, fin)) => (block, fin),
// ProposalFin never received from peer.
Err(_) => {
proposal_entry.insert(None);
return self.process_inbound_proposal(context, &init, None).await;
}
};

let fin = match fin_receiver.await {
Ok(fin) => fin,
// ProposalFin never received from peer.
Err(_) => {
proposal_entry.insert(None);
return self.process_inbound_proposal(context, &init, None).await;
}
};
// let fin = match fin_receiver.await {
// Ok(fin) => fin,
// // ProposalFin never received from peer.
// Err(_) => {
// proposal_entry.insert(None);
// return self.process_inbound_proposal(context, &init, None).await;
// }
// };
// TODO(matan): Switch to signature validation.
if block != fin {
proposal_entry.insert(None);
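Taken together, the hunks in this file replace the old two-channel dance (await the block id, then separately await the fin id) with one oneshot carrying both ids. A reduced sketch of the resulting control flow; the function name is an assumption and ProposalContentId is simplified to a hash-like array:

```rust
use futures::channel::oneshot;

type ProposalContentId = [u8; 32];

// `block` is the id the context computed from the streamed content; `fin` is
// the id the proposer claimed. Both arrive on a single oneshot now.
async fn await_validation(
    block_receiver: oneshot::Receiver<(ProposalContentId, ProposalContentId)>,
) -> Option<ProposalContentId> {
    match block_receiver.await {
        // TODO in the diff: signature validation will replace this equality check.
        Ok((block, fin)) if block == fin => Some(block),
        // A mismatch, or a dropped sender (ProposalFin never received from
        // peer), both invalidate the proposal.
        _ => None,
    }
}
```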
12 changes: 7 additions & 5 deletions crates/sequencing/papyrus_consensus/src/types.rs
@@ -9,7 +9,7 @@ use papyrus_network::network_manager::{
GenericReceiver,
};
use papyrus_network_types::network_types::BroadcastedMessageMetadata;
use papyrus_protobuf::consensus::{ConsensusMessage, Vote};
use papyrus_protobuf::consensus::{ConsensusMessage, ProposalPart, Vote};
use papyrus_protobuf::converters::ProtobufConversionError;
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::core::ContractAddress;
@@ -63,14 +63,15 @@ pub trait ConsensusContext {
/// - `content`: A receiver for the stream of the block's content.
///
/// Returns:
/// - A receiver for the block id. If a valid block cannot be built the Sender will be dropped
/// by ConsensusContext.
/// - A receiver for a tuple with two block ids, one calculated by the context, one sent from
/// the network. If a valid block cannot be built the Sender will be dropped by
/// ConsensusContext.
async fn validate_proposal(
&mut self,
height: BlockNumber,
timeout: Duration,
content: mpsc::Receiver<Self::ProposalChunk>,
) -> oneshot::Receiver<ProposalContentId>;
content: mpsc::Receiver<ProposalPart>,
) -> oneshot::Receiver<(ProposalContentId, ProposalContentId)>;

/// This function is called by consensus to retrieve the content of a previously built or
/// validated proposal. It broadcasts the proposal to the network.
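A hedged sketch of the shape an implementation of the revised validate_proposal might take; the id folding is a placeholder, the stand-in ProposalPart only mirrors the real enum, and a tokio runtime is assumed:

```rust
use futures::channel::{mpsc, oneshot};
use futures::StreamExt;

type ProposalContentId = [u8; 32];

enum ProposalPart {
    Init,
    Transactions(Vec<u8>),
    Fin(ProposalContentId),
}

// Consume the content stream, rebuild our own id, and report both ids on one
// oneshot. Dropping the sender without sending signals "Fin never arrived".
fn validate_proposal_sketch(
    mut content: mpsc::Receiver<ProposalPart>,
) -> oneshot::Receiver<(ProposalContentId, ProposalContentId)> {
    let (sender, receiver) = oneshot::channel();
    tokio::spawn(async move {
        let mut built_id: ProposalContentId = [0; 32];
        while let Some(part) = content.next().await {
            match part {
                ProposalPart::Init => {}
                ProposalPart::Transactions(batch) => {
                    // Placeholder for real content hashing.
                    for (i, &b) in batch.iter().enumerate() {
                        built_id[i % 32] ^= b;
                    }
                }
                ProposalPart::Fin(fin_id) => {
                    // (computed, received); the caller compares the two.
                    let _ = sender.send((built_id, fin_id));
                    return;
                }
            }
        }
        // Stream ended without Fin: drop the sender to signal failure.
    });
    receiver
}
```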
@@ -118,6 +119,7 @@ impl Debug for Decision {
}
}

// TODO(guyn): The same struct is defined in papyrus_protobuf/src/consensus.rs...
#[derive(PartialEq, Debug, Default, Clone)]
pub struct ProposalInit {
pub height: BlockNumber,
