feat: integrate streaming with consensus proposals
guy-starkware committed Nov 11, 2024
1 parent 1097d48 commit e0de25f
Showing 10 changed files with 334 additions and 190 deletions.
33 changes: 31 additions & 2 deletions crates/consensus_manager/src/consensus_manager.rs
@@ -38,14 +38,42 @@ impl ConsensusManager {
let mut network_manager =
NetworkManager::new(self.config.consensus_config.network_config.clone(), None);
let proposals_broadcast_channels = network_manager
.register_broadcast_topic::<ProposalPart>(
.register_broadcast_topic::<StreamMessage<ProposalPart>>(
Topic::new(NETWORK_TOPIC),
BROADCAST_BUFFER_SIZE,
)
.expect("Failed to register broadcast topic");
let BroadcastTopicChannels {
broadcasted_messages_receiver: inbound_proposal_channels,
broadcast_topic_client: outbound_proposal_channels,
} = proposals_broadcast_channels;

// The inbound proposals come into StreamHandler via inbound_proposal_channels,
// and are forwarded to the consensus via inbound_proposal_receiver.
let (inbound_proposal_sender, mut inbound_proposal_receiver) = mpsc::channel(10);
// The outbound proposal messages that the context would like to send are:
// 1. Sent into outbound_proposal_sender as tuples of (StreamId, Receiver).
// 2. Ingested by StreamHandler via its outbound_proposal_receiver.
// 3. Broadcast to the network using the broadcast_topic_client.
let (outbound_proposal_sender, outbound_proposal_receiver): (
Sender<(StreamId, Receiver<ContextT::ProposalPart>)>,
Receiver<(StreamId, Receiver<ContextT::ProposalPart>)>,
) = mpsc::channel(10);

let mut stream_handler = StreamHandler::<ContextT::ProposalPart>::new(
inbound_proposal_sender, // Sender<Receiver<T>>,
inbound_proposal_channels, // BroadcastTopicServer<StreamMessage<ProposalPart>>,
outbound_proposal_receiver, // Receiver<(StreamId, Receiver<T>)>,
outbound_proposal_channels, // BroadcastTopicClient<StreamMessage<ProposalPart>>
);
tokio::spawn(async move {
stream_handler.run().await;
});

let context = SequencerConsensusContext::new(
Arc::clone(&self.batcher_client),
proposals_broadcast_channels.broadcast_topic_client.clone(),
outbound_proposal_sender,
// proposals_broadcast_channels.broadcast_topic_client.clone(),
self.config.consensus_config.num_validators,
);

@@ -57,6 +85,7 @@ impl ConsensusManager {
self.config.consensus_config.consensus_delay,
self.config.consensus_config.timeouts.clone(),
create_fake_network_channels(),
inbound_proposal_receiver,
futures::stream::pending(),
);

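The wiring above follows a channel-of-channels pattern: each proposal stream is itself an mpsc receiver, handed to the StreamHandler over an outer channel. Below is a minimal, self-contained sketch of the outbound half, using hypothetical stand-in types and assuming the futures and tokio crates; the real StreamHandler additionally frames parts as StreamMessage before broadcasting, which this sketch skips.

use futures::channel::mpsc;
use futures::{SinkExt, StreamExt};

type StreamId = u64;

#[tokio::main]
async fn main() {
    // Outer channel: the context hands (stream_id, parts_receiver) pairs to the handler.
    let (mut outbound_sender, mut outbound_receiver) =
        mpsc::channel::<(StreamId, mpsc::Receiver<String>)>(10);

    // Stand-in for StreamHandler::run: drain each new stream and "broadcast" its parts.
    let handler = tokio::spawn(async move {
        while let Some((stream_id, mut parts)) = outbound_receiver.next().await {
            while let Some(part) = parts.next().await {
                println!("broadcast stream {stream_id}: {part}");
            }
        }
    });

    // Stand-in for the context proposing: open a stream, send its parts, close it.
    let (mut parts_sender, parts_receiver) = mpsc::channel(10);
    outbound_sender.send((0, parts_receiver)).await.unwrap();
    parts_sender.send("init".to_string()).await.unwrap();
    parts_sender.send("fin".to_string()).await.unwrap();
    drop(parts_sender);
    drop(outbound_sender);
    handler.await.unwrap();
}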
123 changes: 72 additions & 51 deletions crates/papyrus_protobuf/src/consensus.rs
@@ -1,7 +1,6 @@
use futures::channel::{mpsc, oneshot};
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::core::ContractAddress;
use starknet_api::executable_transaction::Transaction as ExecutableTransaction;
// use starknet_api::executable_transaction::Transaction as ExecutableTransaction;
use starknet_api::transaction::Transaction;

use crate::converters::ProtobufConversionError;
@@ -34,7 +33,7 @@ pub struct Vote {

#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub enum ConsensusMessage {
Proposal(Proposal),
Proposal(Proposal), // To be deprecated
Vote(Vote),
}

@@ -99,6 +98,28 @@ pub enum ProposalPart {
Fin(ProposalFin),
}

impl TryInto<ProposalInit> for ProposalPart {
type Error = ProtobufConversionError;

fn try_into(self) -> Result<ProposalInit, Self::Error> {
match self {
ProposalPart::Init(init) => Ok(init),
_ => Err(ProtobufConversionError::WrongEnumVariant {
type_description: "ProposalPart",
value_as_str: format!("{:?}", self),
expected: "Init",
got: "Transactions or Fin",
}),
}
}
}

impl From<ProposalInit> for ProposalPart {
fn from(value: ProposalInit) -> Self {
ProposalPart::Init(value)
}
}

impl<T> std::fmt::Display for StreamMessage<T>
where
T: Clone + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>,
@@ -125,51 +146,51 @@ where
}

// TODO(Guy): Remove after implementing broadcast streams.
#[allow(missing_docs)]
pub struct ProposalWrapper(pub Proposal);

impl From<ProposalWrapper>
for (ProposalInit, mpsc::Receiver<Transaction>, oneshot::Receiver<BlockHash>)
{
fn from(val: ProposalWrapper) -> Self {
let transactions: Vec<Transaction> = val.0.transactions.into_iter().collect();
let proposal_init = ProposalInit {
height: BlockNumber(val.0.height),
round: val.0.round,
proposer: val.0.proposer,
valid_round: val.0.valid_round,
};
let (mut content_sender, content_receiver) = mpsc::channel(transactions.len());
for tx in transactions {
content_sender.try_send(tx).expect("Send should succeed");
}
content_sender.close_channel();

let (fin_sender, fin_receiver) = oneshot::channel();
fin_sender.send(val.0.block_hash).expect("Send should succeed");

(proposal_init, content_receiver, fin_receiver)
}
}

impl From<ProposalWrapper>
for (ProposalInit, mpsc::Receiver<Vec<ExecutableTransaction>>, oneshot::Receiver<BlockHash>)
{
fn from(val: ProposalWrapper) -> Self {
let proposal_init = ProposalInit {
height: BlockNumber(val.0.height),
round: val.0.round,
proposer: val.0.proposer,
valid_round: val.0.valid_round,
};

let (_, content_receiver) = mpsc::channel(0);
// This should only be used for Milestone 1, and then removed once streaming is supported.
println!("Cannot build ExecutableTransaction from Transaction.");

let (fin_sender, fin_receiver) = oneshot::channel();
fin_sender.send(val.0.block_hash).expect("Send should succeed");

(proposal_init, content_receiver, fin_receiver)
}
}
// #[allow(missing_docs)]
// pub struct ProposalWrapper(pub Proposal);

// impl From<ProposalWrapper>
// for (ProposalInit, mpsc::Receiver<Transaction>, oneshot::Receiver<BlockHash>)
// {
// fn from(val: ProposalWrapper) -> Self {
// let transactions: Vec<Transaction> = val.0.transactions.into_iter().collect();
// let proposal_init = ProposalInit {
// height: BlockNumber(val.0.height),
// round: val.0.round,
// proposer: val.0.proposer,
// valid_round: val.0.valid_round,
// };
// let (mut content_sender, content_receiver) = mpsc::channel(transactions.len());
// for tx in transactions {
// content_sender.try_send(tx).expect("Send should succeed");
// }
// content_sender.close_channel();

// let (fin_sender, fin_receiver) = oneshot::channel();
// fin_sender.send(val.0.block_hash).expect("Send should succeed");

// (proposal_init, content_receiver, fin_receiver)
// }
// }

// impl From<ProposalWrapper>
// for (ProposalInit, mpsc::Receiver<Vec<ExecutableTransaction>>, oneshot::Receiver<BlockHash>)
// {
// fn from(val: ProposalWrapper) -> Self {
// let proposal_init = ProposalInit {
// height: BlockNumber(val.0.height),
// round: val.0.round,
// proposer: val.0.proposer,
// valid_round: val.0.valid_round,
// };

// let (_, content_receiver) = mpsc::channel(0);
// // This should only be used for Milestone 1, and then removed once streaming is supported.
// println!("Cannot build ExecutableTransaction from Transaction.");

// let (fin_sender, fin_receiver) = oneshot::channel();
// fin_sender.send(val.0.block_hash).expect("Send should succeed");

// (proposal_init, content_receiver, fin_receiver)
// }
// }
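With the conversions above, a receiver can insist that the first part of a proposal stream is the Init header. A minimal usage sketch, with module paths assumed from the crate layout shown in this diff:

use papyrus_protobuf::consensus::{ProposalInit, ProposalPart};
use papyrus_protobuf::converters::ProtobufConversionError;

// The first message of a proposal stream must be the Init header;
// any other variant surfaces as WrongEnumVariant.
fn expect_init(part: ProposalPart) -> Result<ProposalInit, ProtobufConversionError> {
    part.try_into()
}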
7 changes: 7 additions & 0 deletions crates/papyrus_protobuf/src/converters/mod.rs
@@ -22,6 +22,13 @@ pub enum ProtobufConversionError {
MissingField { field_description: &'static str },
#[error("Type `{type_description}` should be {num_expected} bytes but it got {value:?}.")]
BytesDataLengthMismatch { type_description: &'static str, num_expected: usize, value: Vec<u8> },
#[error("Type `{type_description}` got unexpected enum variant {value_as_str}")]
WrongEnumVariant {
type_description: &'static str,
value_as_str: String,
expected: &'static str,
got: &'static str,
},
#[error(transparent)]
DecodeError(#[from] DecodeError),
/// For CompressionError and serde_json::Error we put the string of the error instead of the
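Note that the #[error(...)] attribute interpolates only type_description and value_as_str; the expected and got fields ride along for programmatic inspection. A sketch of the rendered message, assuming the variant as declared above:

use papyrus_protobuf::converters::ProtobufConversionError;

fn main() {
    let err = ProtobufConversionError::WrongEnumVariant {
        type_description: "ProposalPart",
        value_as_str: "Fin(..)".to_string(),
        expected: "Init",
        got: "Transactions or Fin",
    };
    // Only the first two fields appear in the Display output.
    assert_eq!(err.to_string(), "Type `ProposalPart` got unexpected enum variant Fin(..)");
}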
92 changes: 57 additions & 35 deletions crates/sequencing/papyrus_consensus/src/manager.rs
@@ -7,13 +7,13 @@ mod manager_test
use std::collections::BTreeMap;
use std::time::Duration;

use futures::channel::{mpsc, oneshot};
use futures::channel::mpsc;
use futures::stream::FuturesUnordered;
use futures::{Stream, StreamExt};
use papyrus_common::metrics::{PAPYRUS_CONSENSUS_HEIGHT, PAPYRUS_CONSENSUS_SYNC_COUNT};
use papyrus_network::network_manager::BroadcastTopicClientTrait;
use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit, ProposalWrapper};
use starknet_api::block::{BlockHash, BlockNumber};
use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit};
use starknet_api::block::BlockNumber;
use tracing::{debug, info, instrument};

use crate::config::TimeoutsConfig;
@@ -37,13 +37,12 @@ pub async fn run_consensus<ContextT, SyncReceiverT>(
consensus_delay: Duration,
timeouts: TimeoutsConfig,
mut broadcast_channels: BroadcastConsensusMessageChannel,
mut inbound_proposal_receiver: mpsc::Receiver<mpsc::Receiver<ContextT::ProposalPart>>,
mut sync_receiver: SyncReceiverT,
) -> Result<(), ConsensusError>
where
ContextT: ConsensusContext,
ContextT: ConsensusContext + 'static,
SyncReceiverT: Stream<Item = BlockNumber> + Unpin,
ProposalWrapper:
Into<(ProposalInit, mpsc::Receiver<ContextT::ProposalChunk>, oneshot::Receiver<BlockHash>)>,
{
info!(
"Running consensus, start_height={}, validator_id={}, consensus_delay={}, timeouts={:?}",
@@ -57,11 +56,17 @@ where
tokio::time::sleep(consensus_delay).await;
let mut current_height = start_height;
let mut manager = MultiHeightManager::new(validator_id, timeouts);

#[allow(clippy::as_conversions)] // FIXME: use int metrics so `as f64` may be removed.
loop {
metrics::gauge!(PAPYRUS_CONSENSUS_HEIGHT, current_height.0 as f64);

let run_height = manager.run_height(&mut context, current_height, &mut broadcast_channels);
let run_height = manager.run_height(
&mut context,
current_height,
&mut broadcast_channels,
&mut inbound_proposal_receiver,
);

// `run_height` is not cancel safe. Our implementation doesn't enable us to start and stop
// it. We also cannot restart the height; when we dropped the future we dropped the state it
@@ -101,20 +106,13 @@ impl MultiHeightManager {
/// Assumes that `height` is monotonically increasing across calls for the sake of filtering
/// `cached_messaged`.
#[instrument(skip(self, context, broadcast_channels), level = "info")]
pub async fn run_height<ContextT>(
pub async fn run_height<ContextT: ConsensusContext>(
&mut self,
context: &mut ContextT,
height: BlockNumber,
broadcast_channels: &mut BroadcastConsensusMessageChannel,
) -> Result<Decision, ConsensusError>
where
ContextT: ConsensusContext,
ProposalWrapper: Into<(
ProposalInit,
mpsc::Receiver<ContextT::ProposalChunk>,
oneshot::Receiver<BlockHash>,
)>,
{
proposal_receiver: &mut mpsc::Receiver<mpsc::Receiver<ContextT::ProposalPart>>,
) -> Result<Decision, ConsensusError> {
let validators = context.validators(height).await;
info!("running consensus for height {height:?} with validator set {validators:?}");
let mut shc = SingleHeightConsensus::new(
@@ -140,6 +138,9 @@ impl MultiHeightManager {
message = next_message(&mut current_height_messages, broadcast_channels) => {
self.handle_message(context, height, &mut shc, message?).await?
},
Some(content_receiver) = proposal_receiver.next() => {
self.handle_proposal(context, height, &mut shc, content_receiver).await?
},
Some(shc_event) = shc_events.next() => {
shc.handle_event(context, shc_event).await?
},
@@ -156,22 +157,41 @@ impl MultiHeightManager {
}
}

// Handle a new proposal receiver from the network.
async fn handle_proposal<ContextT: ConsensusContext>(
&mut self,
context: &mut ContextT,
height: BlockNumber,
shc: &mut SingleHeightConsensus,
// proposal_init: (BlockNumber, u32, ContractAddress, Option<u32>),
mut content_receiver: mpsc::Receiver<ContextT::ProposalPart>,
) -> Result<ShcReturn, ConsensusError> {
let Some(first_part) = content_receiver.next().await else {
return Err(ConsensusError::InternalNetworkError(
"Proposal receiver closed".to_string(),
));
};
let proposal_init: ProposalInit = first_part.into().try_into()?;

// TODO(guyn): What is the right thing to do if the proposal's height doesn't match?
if proposal_init.height != height {
debug!("Received a proposal for a different height. {:?}", proposal_init);
// if message.height() > height.0 {
// self.cached_messages.entry(message.height()).or_default().push(message);
// }
return Ok(ShcReturn::Tasks(Vec::new()));
}
shc.handle_proposal(context, proposal_init.into(), content_receiver).await
}

// Handle a single consensus message.
async fn handle_message<ContextT>(
async fn handle_message<ContextT: ConsensusContext>(
&mut self,
context: &mut ContextT,
height: BlockNumber,
shc: &mut SingleHeightConsensus,
message: ConsensusMessage,
) -> Result<ShcReturn, ConsensusError>
where
ContextT: ConsensusContext,
ProposalWrapper: Into<(
ProposalInit,
mpsc::Receiver<ContextT::ProposalChunk>,
oneshot::Receiver<BlockHash>,
)>,
{
) -> Result<ShcReturn, ConsensusError> {
// TODO(matan): We need to figure out an actual cacheing strategy under 2 constraints:
// 1. Malicious - must be capped so a malicious peer can't DoS us.
// 2. Parallel proposals - we may send/receive a proposal for (H+1, 0).
@@ -184,14 +204,16 @@ impl MultiHeightManager {
return Ok(ShcReturn::Tasks(Vec::new()));
}
match message {
ConsensusMessage::Proposal(proposal) => {
// Special case due to fake streaming.
let (proposal_init, content_receiver, fin_receiver) =
ProposalWrapper(proposal).into();
let res = shc
.handle_proposal(context, proposal_init, content_receiver, fin_receiver)
.await?;
Ok(res)
ConsensusMessage::Proposal(_proposal) => {
// Special case due to fake streaming.
// TODO(guyn): We can eliminate this option and leave handle_message.
// let (proposal_init, content_receiver, fin_receiver) =
//     ProposalWrapper(proposal).into();
// let res = shc
//     .handle_proposal(context, proposal_init.into(), content_receiver, fin_receiver)
//     .await?;
Err(ConsensusError::InternalNetworkError(
"Proposal variant of ConsensusMessage no longer supported".to_string(),
))
}
_ => {
let res = shc.handle_message(context, message).await?;
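On the inbound side, handle_proposal peels the first message off each freshly delivered stream and requires it to parse as ProposalInit before handing the rest of the stream to SingleHeightConsensus. A stripped-down sketch of that handshake, with hypothetical stand-in types and assuming the futures and tokio crates:

use futures::channel::mpsc;
use futures::{SinkExt, StreamExt};

#[derive(Debug)]
enum Part {
    Init(u64), // carries the height, like ProposalInit
    Fin,
}

// Take the next proposal stream and peel off its Init header.
async fn next_proposal(
    proposals: &mut mpsc::Receiver<mpsc::Receiver<Part>>,
) -> Result<(u64, mpsc::Receiver<Part>), String> {
    let mut content_receiver =
        proposals.next().await.ok_or_else(|| "proposal channel closed".to_string())?;
    match content_receiver.next().await {
        Some(Part::Init(height)) => Ok((height, content_receiver)),
        other => Err(format!("expected Init as first part, got {other:?}")),
    }
}

#[tokio::main]
async fn main() {
    let (mut streams_tx, mut streams_rx) = mpsc::channel(1);
    let (mut parts_tx, parts_rx) = mpsc::channel(2);
    parts_tx.send(Part::Init(7)).await.unwrap();
    parts_tx.send(Part::Fin).await.unwrap();
    streams_tx.send(parts_rx).await.unwrap();
    let (height, _rest) = next_proposal(&mut streams_rx).await.unwrap();
    assert_eq!(height, 7);
}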