From f45c0c1bc5cc89adb481058b8dc840065ad62f61 Mon Sep 17 00:00:00 2001
From: Stan Bondi
Date: Mon, 28 Oct 2024 15:54:38 +0200
Subject: [PATCH] fix: remove epoch manager calls from consensus gossipsub
 (#1190)

Description
---
fix: remove epoch manager calls from consensus gossipsub

Motivation and Context
---
Several epoch manager calls were made for every message broadcast, even
though all of the information obtained from these calls is already
available at the consensus level. This PR removes the epoch manager
dependency from consensus gossip, which now only uses the epoch manager
event receiver.

The gossip message is also encoded and sent directly in the handle,
instead of first being routed through the consensus gossip service and
then on to networking. The consensus gossip service is now only
responsible for ensuring that the node is subscribed to the correct
shard-group topic.

The gossipsub message limit is increased to 1MiB to account for large
foreign proposal messages, which were rejected during testing. This is
temporary; protocol improvements should allow us to reduce it again.

How Has This Been Tested?
---
Manually

What process can a PR reviewer use to test or verify this change?
---
This is a simplification PR; any performance improvement is slight and
almost certainly unobservable.

Breaking Changes
---

- [x] None
- [ ] Requires data directory to be deleted
- [ ] Other - Please specify
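The shape of the change is easiest to see in isolation. Below is a minimal, self-contained sketch of the new flow using plain tokio primitives; `EpochManagerEvent` and `ShardGroup` here are simplified stand-ins for the real types in `dan_layer/epoch_manager`, not the actual definitions:

```rust
use tokio::sync::broadcast;

// Simplified stand-ins for the real tari types (illustration only).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ShardGroup(u32, u32);

#[derive(Debug, Clone)]
enum EpochManagerEvent {
    EpochChanged(u64),
    ThisValidatorIsRegistered { epoch: u64, shard_group: ShardGroup },
}

#[tokio::main]
async fn main() {
    // subscribe() is now a plain synchronous call on the handle rather than
    // an async request/reply round-trip to the epoch manager service.
    let (events, _) = broadcast::channel::<EpochManagerEvent>(100);
    let mut rx = events.subscribe();

    events
        .send(EpochManagerEvent::ThisValidatorIsRegistered {
            epoch: 1,
            shard_group: ShardGroup(0, 31),
        })
        .unwrap();

    // The consensus gossip service reacts only to its own registration and
    // subscribes to the matching shard-group gossip topic.
    match rx.recv().await {
        Ok(EpochManagerEvent::ThisValidatorIsRegistered { shard_group, .. }) => {
            println!("subscribe to gossip topic for {shard_group:?}");
        },
        Ok(EpochManagerEvent::EpochChanged(epoch)) => {
            println!("epoch changed to {epoch}, nothing to do here");
        },
        Err(e) => eprintln!("event stream closed: {e}"),
    }
}
```

Because the registration event now carries the shard group itself, the gossip service no longer needs to query committee info to work out which topic to join.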
---
 applications/tari_indexer/src/lib.rs               |   7 +-
 .../src/process_manager/manager.rs                 |  88 ++++++++----
 .../tari_validator_node/src/bootstrap.rs           |  12 +-
 .../tari_validator_node/src/dan_node.rs            |   2 +-
 .../p2p/services/consensus_gossip/error.rs         |  10 +-
 .../p2p/services/consensus_gossip/handle.rs        |  72 +++++-----
 .../services/consensus_gossip/initializer.rs       |  25 ++--
 .../src/p2p/services/consensus_gossip/mod.rs       |   2 +-
 .../p2p/services/consensus_gossip/service.rs       | 125 +++---------
 .../src/p2p/services/mempool/service.rs            |   2 +-
 .../src/p2p/services/messaging/outbound.rs         |  12 +-
 dan_layer/consensus/src/hotstuff/on_propose.rs     |  21 +--
 .../src/hotstuff/on_receive_local_proposal.rs      |   3 +
 .../src/hotstuff/state_machine/idle.rs             |   2 +-
 dan_layer/consensus/src/hotstuff/worker.rs         |   2 +-
 .../src/support/epoch_manager.rs                   |   4 +-
 .../base_layer/base_layer_epoch_manager.rs         |   1 +
 .../src/base_layer/epoch_manager_service.rs        |   7 +-
 dan_layer/epoch_manager/src/base_layer/handle.rs   |  17 ++-
 .../src/base_layer/initializer.rs                  |   9 +-
 dan_layer/epoch_manager/src/base_layer/types.rs    |   7 +-
 dan_layer/epoch_manager/src/event.rs               |   8 +-
 dan_layer/epoch_manager/src/traits.rs              |   2 +-
 networking/swarm/src/config.rs                     |   5 +-
 24 files changed, 182 insertions(+), 263 deletions(-)

diff --git a/applications/tari_indexer/src/lib.rs b/applications/tari_indexer/src/lib.rs
index e54e8b2ec..8bec9cb34 100644
--- a/applications/tari_indexer/src/lib.rs
+++ b/applications/tari_indexer/src/lib.rs
@@ -96,12 +96,7 @@ pub async fn run_indexer(config: ApplicationConfig, mut shutdown_signal: Shutdow
     )
     .await?;
 
-    let mut epoch_manager_events = services.epoch_manager.subscribe().await.map_err(|e| {
-        ExitError::new(
-            ExitCode::ConfigError,
-            format!("Epoch manager crashed on startup: {}", e),
-        )
-    })?;
+    let mut epoch_manager_events = services.epoch_manager.subscribe();
 
     let substate_cache_dir = config.common.base_path.join("substate_cache");
     let substate_cache = SubstateFileCache::new(substate_cache_dir)

diff --git a/applications/tari_swarm_daemon/src/process_manager/manager.rs b/applications/tari_swarm_daemon/src/process_manager/manager.rs
index 19edfa0d6..cfd1d7101 100644
--- a/applications/tari_swarm_daemon/src/process_manager/manager.rs
+++ b/applications/tari_swarm_daemon/src/process_manager/manager.rs
@@ -1,7 +1,13 @@
 // Copyright 2024 The Tari Project
 // SPDX-License-Identifier: BSD-3-Clause
 
-use std::{collections::HashMap, fs::File, path::PathBuf, str::FromStr, time::Duration};
+use std::{
+    collections::{HashMap, HashSet},
+    fs::File,
+    path::PathBuf,
+    str::FromStr,
+    time::Duration,
+};
 
 use anyhow::{anyhow, Context};
 use log::info;
@@ -70,43 +76,57 @@ impl ProcessManager {
         sleep(Duration::from_secs(self.instance_manager.num_instances() as u64)).await;
         self.check_instances_running()?;
 
-        if !self.skip_registration {
-            let num_vns = self.instance_manager.num_validator_nodes();
-            // Mine some initial funds, guessing 10 blocks to allow for coinbase maturity
-            self.mine(num_vns + 10).await.context("mining failed")?;
-            self.wait_for_wallet_funds(num_vns)
-                .await
-                .context("waiting for wallet funds")?;
-
-            self.register_all_validator_nodes()
-                .await
-                .context("registering validator node via GRPC")?;
-        }
-
+        let mut templates_to_register = vec![];
         if !self.disable_template_auto_register {
             let registered_templates = self.registered_templates().await?;
-            let registered_template_names: Vec<String> = registered_templates
+            let registered_template_names = registered_templates
                 .iter()
-                .map(|template_data| format!("{}-{}", template_data.name, template_data.version))
-                .collect();
+                .map(|template_data| template_data.name.as_str())
+                .collect::<HashSet<_>>();
             let fs_templates = self.file_system_templates().await?;
-            for template_data in fs_templates.iter().filter(|fs_template_data| {
-                !registered_template_names.contains(&format!("{}-{}", fs_template_data.name, fs_template_data.version))
-            }) {
+            for template_data in fs_templates
+                .iter()
+                .filter(|fs_template_data| !registered_template_names.contains(fs_template_data.name.as_str()))
+            {
                 info!(
                     "🟡 Register missing template from local file system: {}",
                     template_data.name
                 );
-                self.register_template(TemplateData {
+                templates_to_register.push(TemplateData {
                     name: template_data.name.clone(),
                     version: template_data.version,
                     contents_hash: template_data.contents_hash,
                     contents_url: template_data.contents_url.clone(),
-                })
-                .await?;
+                });
             }
         }
 
+        let num_vns = if self.skip_registration {
+            0
+        } else {
+            self.instance_manager.num_validator_nodes()
+        };
+        let num_blocks = num_vns + u64::try_from(templates_to_register.len()).unwrap();
+
+        // Mine some initial funds, guessing 10 blocks extra to allow for coinbase maturity
+        self.mine(num_blocks + 10).await.context("initial mining failed")?;
+        self.wait_for_wallet_funds(num_blocks)
+            .await
+            .context("waiting for wallet funds")?;
+
+        if !self.skip_registration {
+            self.register_all_validator_nodes()
+                .await
+                .context("registering validator node via GRPC")?;
+        }
+        for templates in templates_to_register {
+            self.register_template(templates).await?;
+        }
+
+        if num_blocks > 0 {
+            self.mine(20).await?;
+        }
+
         Ok(())
     }
@@ -310,9 +330,15 @@ impl ProcessManager {
                 }
             },
             RegisterTemplate { data, reply } => {
-                let result = self.register_template(data).await;
-                if reply.send(result).is_err() {
-                    log::warn!("Request cancelled before response could be sent")
+                if let Err(err) = self.register_template(data).await {
+                    if reply.send(Err(err)).is_err() {
+                        log::warn!("Request cancelled before response could be sent")
+                    }
+                } else {
+                    let result = self.mine(10).await;
+                    if reply.send(result).is_err() {
+                        log::warn!("Request cancelled before response could be sent")
+                    }
                 }
             },
            RegisterValidatorNode { instance_id, reply } => {
@@ -421,7 +447,6 @@ impl ProcessManager {
             // inputs for a transaction.
             sleep(Duration::from_secs(2)).await;
         }
-        self.mine(20).await?;
 
         Ok(())
     }
@@ -462,6 +487,9 @@ impl ProcessManager {
     }
 
     async fn mine(&mut self, blocks: u64) -> anyhow::Result<()> {
+        if blocks == 0 {
+            return Ok(());
+        }
         let executable = self
             .executable_manager
             .get_executable(InstanceType::MinoTariMiner)
@@ -510,13 +538,15 @@
             .await?
             .into_inner();
         let template_address = TemplateAddress::try_from_vec(resp.template_address).unwrap();
-        info!("🟢 Registered template {template_address}. Mining some blocks");
-        self.mine(10).await?;
+        info!("🟢 Registered template {template_address}.");
 
         Ok(())
     }
 
     async fn wait_for_wallet_funds(&mut self, min_expected_blocks: u64) -> anyhow::Result<()> {
+        if min_expected_blocks == 0 {
+            return Ok(());
+        }
         // WARN: Assumes one wallet
         let wallet = self.instance_manager.minotari_wallets().next().ok_or_else(|| {
             anyhow!("No MinoTariConsoleWallet instances found. Please start a wallet before waiting for funds")
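As context for the swarm-daemon change: the run sequence now collects all pending registrations first and mines once, sized to cover every registration plus coinbase maturity. A rough sketch of the block arithmetic (an illustrative helper, not the actual swarm-daemon code):

```rust
// One block per validator-node registration, one per template registration,
// plus 10 guessed blocks to allow for coinbase maturity (per the comment in
// the patch above).
fn blocks_to_mine(skip_registration: bool, num_vns: u64, num_templates: usize) -> u64 {
    let num_vns = if skip_registration { 0 } else { num_vns };
    let num_blocks = num_vns + num_templates as u64;
    num_blocks + 10
}

fn main() {
    // 3 VNs and 2 file-system templates to register => 15 blocks up front.
    assert_eq!(blocks_to_mine(false, 3, 2), 15);
    // Nothing to register still mines the 10 maturity blocks.
    assert_eq!(blocks_to_mine(true, 0, 0), 10);
}
```

This replaces the old pattern of mining 10 blocks after each individual template registration, which is why `register_template` no longer calls `mine` itself.

diff --git a/applications/tari_validator_node/src/bootstrap.rs b/applications/tari_validator_node/src/bootstrap.rs
index 7624c7802..78dc90b15 100644
--- a/applications/tari_validator_node/src/bootstrap.rs
+++ b/applications/tari_validator_node/src/bootstrap.rs
@@ -76,7 +76,10 @@ use tari_engine_types::{
     substate::{SubstateId, SubstateValue},
     vault::Vault,
 };
-use tari_epoch_manager::base_layer::{EpochManagerConfig, EpochManagerHandle};
+use tari_epoch_manager::{
+    base_layer::{EpochManagerConfig, EpochManagerHandle},
+    EpochManagerReader,
+};
 use tari_indexer_lib::substate_scanner::SubstateScanner;
 use tari_networking::{MessagingMode, NetworkingHandle, RelayCircuitLimits, RelayReservationLimits, SwarmConfig};
 use tari_rpc_framework::RpcServer;
@@ -250,8 +253,11 @@ pub async fn spawn_services(
     };
 
     // Consensus gossip
-    let (consensus_gossip_service, join_handle, rx_consensus_gossip_messages) =
-        consensus_gossip::spawn(epoch_manager.clone(), networking.clone(), rx_consensus_gossip_messages);
+    let (consensus_gossip_service, join_handle, rx_consensus_gossip_messages) = consensus_gossip::spawn(
+        epoch_manager.subscribe(),
+        networking.clone(),
+        rx_consensus_gossip_messages,
+    );
     handles.push(join_handle);
 
     // Messaging

diff --git a/applications/tari_validator_node/src/dan_node.rs b/applications/tari_validator_node/src/dan_node.rs
index 18fb06f77..e57d7206c 100644
--- a/applications/tari_validator_node/src/dan_node.rs
+++ b/applications/tari_validator_node/src/dan_node.rs
@@ -42,7 +42,7 @@ impl DanNode {
     pub async fn start(mut self, mut shutdown: ShutdownSignal) -> Result<(), anyhow::Error> {
         let mut hotstuff_events = self.services.consensus_handle.subscribe_to_hotstuff_events();
-        let mut epoch_manager_events = self.services.epoch_manager.subscribe().await?;
+        let mut epoch_manager_events = self.services.epoch_manager.subscribe();
 
         // if let Err(err) = self.dial_local_shard_peers().await {
         //     error!(target: LOG_TARGET, "Failed to dial local shard peers: {}", err);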
diff --git a/applications/tari_validator_node/src/p2p/services/consensus_gossip/error.rs b/applications/tari_validator_node/src/p2p/services/consensus_gossip/error.rs
index 42138c077..d7802590a 100644
--- a/applications/tari_validator_node/src/p2p/services/consensus_gossip/error.rs
+++ b/applications/tari_validator_node/src/p2p/services/consensus_gossip/error.rs
@@ -22,9 +22,7 @@
 
 use tari_epoch_manager::EpochManagerError;
 use tari_networking::NetworkingError;
-use tokio::sync::{mpsc, oneshot};
-
-use super::ConsensusGossipRequest;
+use tokio::sync::oneshot;
 
 #[derive(thiserror::Error, Debug)]
 pub enum ConsensusGossipError {
@@ -38,12 +36,6 @@ pub enum ConsensusGossipError {
     NetworkingError(#[from] NetworkingError),
 }
 
-impl From<mpsc::error::SendError<ConsensusGossipRequest>> for ConsensusGossipError {
-    fn from(_: mpsc::error::SendError<ConsensusGossipRequest>) -> Self {
-        Self::RequestCancelled
-    }
-}
-
 impl From<oneshot::error::RecvError> for ConsensusGossipError {
     fn from(_: oneshot::error::RecvError) -> Self {
         Self::RequestCancelled

diff --git a/applications/tari_validator_node/src/p2p/services/consensus_gossip/handle.rs b/applications/tari_validator_node/src/p2p/services/consensus_gossip/handle.rs
index 21c8dd204..a700c32b9 100644
--- a/applications/tari_validator_node/src/p2p/services/consensus_gossip/handle.rs
+++ b/applications/tari_validator_node/src/p2p/services/consensus_gossip/handle.rs
@@ -20,64 +20,56 @@
 // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
 // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
+use log::*;
 use tari_consensus::messages::HotstuffMessage;
 use tari_dan_common_types::ShardGroup;
-use tokio::sync::{mpsc, oneshot};
+use tari_dan_p2p::{proto, TariMessagingSpec};
+use tari_networking::{NetworkingHandle, NetworkingService};
+use tari_swarm::messaging::{
+    prost::{Message, ProstCodec},
+    Codec,
+};
 
 use super::ConsensusGossipError;
+use crate::p2p::services::consensus_gossip::service::shard_group_to_topic;
 
-pub enum ConsensusGossipRequest {
-    Multicast {
-        shard_group: ShardGroup,
-        message: HotstuffMessage,
-        reply: oneshot::Sender<Result<(), ConsensusGossipError>>,
-    },
-    GetLocalShardGroup {
-        reply: oneshot::Sender<Result<Option<ShardGroup>, ConsensusGossipError>>,
-    },
-}
+const LOG_TARGET: &str = "tari::validator_node::consensus_gossip";
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct ConsensusGossipHandle {
-    tx_consensus_request: mpsc::Sender<ConsensusGossipRequest>,
-}
-
-impl Clone for ConsensusGossipHandle {
-    fn clone(&self) -> Self {
-        ConsensusGossipHandle {
-            tx_consensus_request: self.tx_consensus_request.clone(),
-        }
-    }
+    networking: NetworkingHandle<TariMessagingSpec>,
+    codec: ProstCodec<proto::consensus::HotStuffMessage>,
 }
 
 impl ConsensusGossipHandle {
-    pub(super) fn new(tx_consensus_request: mpsc::Sender<ConsensusGossipRequest>) -> Self {
-        Self { tx_consensus_request }
+    pub(super) fn new(networking: NetworkingHandle<TariMessagingSpec>) -> Self {
+        Self {
+            networking,
+            codec: ProstCodec::default(),
+        }
     }
 
     pub async fn multicast(
-        &self,
+        &mut self,
         shard_group: ShardGroup,
         message: HotstuffMessage,
     ) -> Result<(), ConsensusGossipError> {
-        let (tx, rx) = oneshot::channel();
-        self.tx_consensus_request
-            .send(ConsensusGossipRequest::Multicast {
-                shard_group,
-                message,
-                reply: tx,
-            })
-            .await?;
+        let topic = shard_group_to_topic(shard_group);
 
-        rx.await?
-    }
+        let message = proto::consensus::HotStuffMessage::from(&message);
+        let mut buf = Vec::with_capacity(message.encoded_len());
+        self.codec
+            .encode_to(&mut buf, message)
+            .await
+            .map_err(|e| ConsensusGossipError::InvalidMessage(e.into()))?;
 
-    pub async fn get_local_shard_group(&self) -> Result<Option<ShardGroup>, ConsensusGossipError> {
-        let (tx, rx) = oneshot::channel();
-        self.tx_consensus_request
-            .send(ConsensusGossipRequest::GetLocalShardGroup { reply: tx })
-            .await?;
+        // Log after encoding so the reported size reflects the actual payload.
+        debug!(
+            target: LOG_TARGET,
+            "multicast: topic: {} message size: {} bytes", topic, buf.len()
+        );
 
-        rx.await?
+        self.networking.publish_gossip(topic, buf).await?;
+
+        Ok(())
     }
 }
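The handle can publish without consulting the epoch manager because the topic is a pure function of the shard group. A sketch of the naming scheme implemented by `shard_group_to_topic` in service.rs, with plain integers standing in for the real `ShardGroup` bounds:

```rust
const TOPIC_PREFIX: &str = "consensus";

// Mirrors shard_group_to_topic: "consensus-<start>-<end>".
fn shard_group_to_topic(start: u32, end: u32) -> String {
    format!("{}-{}-{}", TOPIC_PREFIX, start, end)
}

fn main() {
    // Every validator in shard group 0..=31 publishes on and subscribes to
    // the same topic, so no committee lookup is needed before broadcasting.
    assert_eq!(shard_group_to_topic(0, 31), "consensus-0-31");
}
```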
diff --git a/applications/tari_validator_node/src/p2p/services/consensus_gossip/initializer.rs b/applications/tari_validator_node/src/p2p/services/consensus_gossip/initializer.rs
index c0056d310..07e0ec77c 100644
--- a/applications/tari_validator_node/src/p2p/services/consensus_gossip/initializer.rs
+++ b/applications/tari_validator_node/src/p2p/services/consensus_gossip/initializer.rs
@@ -22,18 +22,21 @@
 
 use libp2p::{gossipsub, PeerId};
 use log::*;
-use tari_dan_common_types::PeerAddress;
 use tari_dan_p2p::{proto, TariMessagingSpec};
-use tari_epoch_manager::base_layer::EpochManagerHandle;
+use tari_epoch_manager::EpochManagerEvent;
 use tari_networking::NetworkingHandle;
-use tokio::{sync::mpsc, task, task::JoinHandle};
+use tokio::{
+    sync::{broadcast, mpsc},
+    task,
+    task::JoinHandle,
+};
 
 use crate::p2p::services::consensus_gossip::{service::ConsensusGossipService, ConsensusGossipHandle};
 
-const LOG_TARGET: &str = "tari::dan::validator_node::mempool";
+const LOG_TARGET: &str = "tari::validator_node::consensus_gossip::initializer";
 
 pub fn spawn(
-    epoch_manager: EpochManagerHandle<PeerAddress>,
+    epoch_manager_events: broadcast::Receiver<EpochManagerEvent>,
     networking: NetworkingHandle<TariMessagingSpec>,
     rx_gossip: mpsc::UnboundedReceiver<(PeerId, gossipsub::Message)>,
 ) -> (
     ConsensusGossipHandle,
     JoinHandle<anyhow::Result<()>>,
     mpsc::Receiver<(PeerId, proto::consensus::HotStuffMessage)>,
 ) {
-    let (tx_consensus_request, rx_consensus_request) = mpsc::channel(10);
     let (tx_consensus_gossip, rx_consensus_gossip) = mpsc::channel(10);
 
-    let consensus_gossip = ConsensusGossipService::new(
-        rx_consensus_request,
-        epoch_manager,
-        networking,
-        rx_gossip,
-        tx_consensus_gossip,
-    );
-    let handle = ConsensusGossipHandle::new(tx_consensus_request);
+    let consensus_gossip =
+        ConsensusGossipService::new(epoch_manager_events, networking.clone(), rx_gossip, tx_consensus_gossip);
+    let handle = ConsensusGossipHandle::new(networking);
 
     let join_handle = task::spawn(consensus_gossip.run());
     debug!(target: LOG_TARGET, "Spawning consensus gossip service (task: {:?})", join_handle);

diff --git a/applications/tari_validator_node/src/p2p/services/consensus_gossip/mod.rs b/applications/tari_validator_node/src/p2p/services/consensus_gossip/mod.rs
index 85d886082..bb5f405bf 100644
--- a/applications/tari_validator_node/src/p2p/services/consensus_gossip/mod.rs
+++ b/applications/tari_validator_node/src/p2p/services/consensus_gossip/mod.rs
@@ -24,7 +24,7 @@ mod error;
 pub use error::*;
 
 mod handle;
-pub use handle::{ConsensusGossipHandle, ConsensusGossipRequest};
+pub use handle::ConsensusGossipHandle;
 
 mod initializer;
 pub use initializer::spawn;

diff --git a/applications/tari_validator_node/src/p2p/services/consensus_gossip/service.rs b/applications/tari_validator_node/src/p2p/services/consensus_gossip/service.rs
index 4ce900e3d..fa492738a 100644
--- a/applications/tari_validator_node/src/p2p/services/consensus_gossip/service.rs
+++ b/applications/tari_validator_node/src/p2p/services/consensus_gossip/service.rs
@@ -20,28 +20,24 @@
 // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
 // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
-use std::fmt::Display;
-
 use libp2p::{gossipsub, PeerId};
 use log::*;
-use tari_consensus::messages::HotstuffMessage;
-use tari_dan_common_types::{Epoch, PeerAddress, ShardGroup};
+use tari_dan_common_types::ShardGroup;
 use tari_dan_p2p::{proto, TariMessagingSpec};
-use tari_epoch_manager::{base_layer::EpochManagerHandle, EpochManagerEvent, EpochManagerReader};
+use tari_epoch_manager::EpochManagerEvent;
 use tari_networking::{NetworkingHandle, NetworkingService};
 use tari_swarm::messaging::{prost::ProstCodec, Codec};
-use tokio::sync::{mpsc, oneshot};
+use tokio::sync::{broadcast, mpsc};
 
-use super::{ConsensusGossipError, ConsensusGossipRequest};
+use super::ConsensusGossipError;
 
 const LOG_TARGET: &str = "tari::validator_node::consensus_gossip::service";
 
 pub const TOPIC_PREFIX: &str = "consensus";
 
 #[derive(Debug)]
 pub(super) struct ConsensusGossipService {
-    requests: mpsc::Receiver<ConsensusGossipRequest>,
-    epoch_manager: EpochManagerHandle<PeerAddress>,
+    epoch_manager_events: broadcast::Receiver<EpochManagerEvent>,
     is_subscribed: Option<ShardGroup>,
     networking: NetworkingHandle<TariMessagingSpec>,
     codec: ProstCodec<proto::consensus::HotStuffMessage>,
     rx_gossip: mpsc::UnboundedReceiver<(PeerId, gossipsub::Message)>,
     tx_consensus_gossip: mpsc::Sender<(PeerId, proto::consensus::HotStuffMessage)>,
 }
@@ -49,17 +45,15 @@ pub(super) struct ConsensusGossipService {
 
 impl ConsensusGossipService {
     pub fn new(
-        requests: mpsc::Receiver<ConsensusGossipRequest>,
-        epoch_manager: EpochManagerHandle<PeerAddress>,
+        epoch_manager_events: broadcast::Receiver<EpochManagerEvent>,
         networking: NetworkingHandle<TariMessagingSpec>,
         rx_gossip: mpsc::UnboundedReceiver<(PeerId, gossipsub::Message)>,
         tx_consensus_gossip: mpsc::Sender<(PeerId, proto::consensus::HotStuffMessage)>,
     ) -> Self {
         Self {
-            requests,
-            epoch_manager,
+            epoch_manager_events,
             is_subscribed: None,
             networking,
             codec: ProstCodec::default(),
@@ -69,24 +63,16 @@ impl ConsensusGossipService {
     }
 
     pub async fn run(mut self) -> anyhow::Result<()> {
-        let mut events = self.epoch_manager.subscribe().await?;
-
         loop {
             tokio::select! {
-                Some(req) = self.requests.recv() => self.handle_request(req).await,
                 Some(msg) = self.rx_gossip.recv() => {
                     if let Err(err) = self.handle_incoming_gossip_message(msg).await {
                         warn!(target: LOG_TARGET, "Consensus gossip service error: {}", err);
                     }
                 },
-                Ok(event) = events.recv() => {
-                    if let EpochManagerEvent::EpochChanged(epoch) = event {
-                        if self.epoch_manager.is_this_validator_registered_for_epoch(epoch).await? {
-                            info!(target: LOG_TARGET, "Consensus gossip service subscribing messages for epoch {}", epoch);
-                            self.subscribe(epoch).await?;
-
-                            // TODO: unsubscribe older epoch shards?
-                        }
+                Ok(event) = self.epoch_manager_events.recv() => {
+                    if let EpochManagerEvent::ThisValidatorIsRegistered { shard_group, .. } = event {
+                        self.subscribe(shard_group).await?;
+                    }
                 },
                 else => {
@@ -101,21 +87,6 @@ impl ConsensusGossipService {
         Ok(())
     }
 
-    async fn handle_request(&mut self, request: ConsensusGossipRequest) {
-        match request {
-            ConsensusGossipRequest::Multicast {
-                shard_group,
-                message,
-                reply,
-            } => {
-                handle(reply, self.multicast(shard_group, message).await);
-            },
-            ConsensusGossipRequest::GetLocalShardGroup { reply } => {
-                handle(reply, self.get_local_shard_group().await);
-            },
-        }
-    }
-
     async fn handle_incoming_gossip_message(
         &mut self,
         msg: (PeerId, gossipsub::Message),
@@ -136,12 +107,10 @@ impl ConsensusGossipService {
         Ok(())
     }
 
-    async fn subscribe(&mut self, epoch: Epoch) -> Result<(), ConsensusGossipError> {
-        let committee_shard = self.epoch_manager.get_local_committee_info(epoch).await?;
-        let shard_group = committee_shard.shard_group();
-
+    async fn subscribe(&mut self, shard_group: ShardGroup) -> Result<(), ConsensusGossipError> {
         match self.is_subscribed {
             Some(sg) if sg == shard_group => {
+                debug!(target: LOG_TARGET, "🌬️ Consensus gossip already subscribed to messages for {shard_group}");
                 return Ok(());
             },
             Some(_) => {
@@ -150,9 +119,10 @@ impl ConsensusGossipService {
             None => {},
         }
 
+        info!(target: LOG_TARGET, "🌬️ Consensus gossip service subscribing messages for {shard_group}");
         let topic = shard_group_to_topic(shard_group);
         self.networking.subscribe_topic(topic).await?;
-        self.is_subscribed = Some(committee_shard.shard_group());
+        self.is_subscribed = Some(shard_group);
 
         Ok(())
     }
@@ -166,61 +136,9 @@ impl ConsensusGossipService {
 
         Ok(())
     }
-
-    pub async fn multicast(
-        &mut self,
-        shard_group: ShardGroup,
-        message: HotstuffMessage,
-    ) -> Result<(), ConsensusGossipError> {
-        // if we are alone in the local shard group, no need to broadcast
-        if self.num_shard_group_members().await? < 2 {
-            return Ok(());
-        }
-
-        let topic = shard_group_to_topic(shard_group);
-
-        debug!(
-            target: LOG_TARGET,
-            "multicast: topic: {}", topic,
-        );
-
-        let message = proto::consensus::HotStuffMessage::from(&message);
-        let mut buf = Vec::with_capacity(1024);
-        self.codec
-            .encode_to(&mut buf, message)
-            .await
-            .map_err(|e| ConsensusGossipError::InvalidMessage(e.into()))?;
-
-        self.networking.publish_gossip(topic, buf).await?;
-
-        Ok(())
-    }
-
-    async fn num_shard_group_members(&self) -> Result<u32, ConsensusGossipError> {
-        let epoch = self.epoch_manager.current_epoch().await?;
-
-        if self.epoch_manager.is_this_validator_registered_for_epoch(epoch).await? {
-            let committee_shard = self.epoch_manager.get_local_committee_info(epoch).await?;
-            return Ok(committee_shard.num_shard_group_members());
-        }
-
-        // default value if the VN is not registered
-        Ok(0)
-    }
-
-    pub async fn get_local_shard_group(&self) -> Result<Option<ShardGroup>, ConsensusGossipError> {
-        let epoch = self.epoch_manager.current_epoch().await?;
-
-        if !self.epoch_manager.is_this_validator_registered_for_epoch(epoch).await? {
-            return Ok(None);
-        }
-
-        let committee_shard = self.epoch_manager.get_local_committee_info(epoch).await?;
-        Ok(Some(committee_shard.shard_group()))
-    }
 }
 
-fn shard_group_to_topic(shard_group: ShardGroup) -> String {
+pub(super) fn shard_group_to_topic(shard_group: ShardGroup) -> String {
     format!(
         "{}-{}-{}",
         TOPIC_PREFIX,
@@ -228,12 +146,3 @@ pub(super) fn shard_group_to_topic(shard_group: ShardGroup) -> String {
         shard_group.start().as_u32(),
         shard_group.end().as_u32()
     )
 }
-
-fn handle<T>(reply: oneshot::Sender<Result<T, ConsensusGossipError>>, result: Result<T, ConsensusGossipError>) {
-    if let Err(ref e) = result {
-        error!(target: LOG_TARGET, "Request failed with error: {}", e);
-    }
-    if reply.send(result).is_err() {
-        error!(target: LOG_TARGET, "Requester abandoned request");
-    }
-}
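After this change the service keeps only one piece of state: the shard group it is currently subscribed to. A small sketch of that bookkeeping with toy types (the real code additionally awaits the networking subscribe/unsubscribe calls):

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ShardGroup(u32, u32);

#[derive(Debug, PartialEq)]
enum Action {
    AlreadySubscribed,
    Switch { unsubscribe_first: ShardGroup },
    Subscribe,
}

// On a ThisValidatorIsRegistered event: do nothing if the shard group is
// unchanged, otherwise drop the old topic before joining the new one.
fn on_registered(current: &mut Option<ShardGroup>, new: ShardGroup) -> Action {
    match current.replace(new) {
        Some(old) if old == new => Action::AlreadySubscribed,
        Some(old) => Action::Switch { unsubscribe_first: old },
        None => Action::Subscribe,
    }
}

fn main() {
    let mut current = None;
    assert_eq!(on_registered(&mut current, ShardGroup(0, 31)), Action::Subscribe);
    assert_eq!(on_registered(&mut current, ShardGroup(0, 31)), Action::AlreadySubscribed);
    // Shard group changed (e.g. a committee split): leave the old topic first.
    assert_eq!(
        on_registered(&mut current, ShardGroup(32, 63)),
        Action::Switch {
            unsubscribe_first: ShardGroup(0, 31)
        }
    );
}
```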
diff --git a/applications/tari_validator_node/src/p2p/services/mempool/service.rs b/applications/tari_validator_node/src/p2p/services/mempool/service.rs
index 345b97fef..a8d5b821f 100644
--- a/applications/tari_validator_node/src/p2p/services/mempool/service.rs
+++ b/applications/tari_validator_node/src/p2p/services/mempool/service.rs
@@ -90,7 +90,7 @@ where TValidator: Validator<Transaction>
     }
 
     pub async fn run(mut self) -> anyhow::Result<()> {
-        let mut events = self.epoch_manager.subscribe().await?;
+        let mut events = self.epoch_manager.subscribe();
 
         loop {
             tokio::select! {

diff --git a/applications/tari_validator_node/src/p2p/services/messaging/outbound.rs b/applications/tari_validator_node/src/p2p/services/messaging/outbound.rs
index 1251b0c9a..f822acd02 100644
--- a/applications/tari_validator_node/src/p2p/services/messaging/outbound.rs
+++ b/applications/tari_validator_node/src/p2p/services/messaging/outbound.rs
@@ -80,7 +80,7 @@ impl tari_consensus::traits::OutboundMessaging
             .map_err(|_| OutboundMessagingError::FailedToEnqueueMessage {
                 reason: "loopback sender closed".to_string(),
             })?;
-        return Ok(());
+        Ok(())
     }
 
     async fn send<T: Into<HotstuffMessage> + Send>(
@@ -111,16 +111,6 @@ impl tari_consensus::traits::OutboundMessaging
     {
         let message = message.into();
 
-        // send it once to ourselves
-        let local_shard_group = self
-            .consensus_gossip
-            .get_local_shard_group()
-            .await
-            .map_err(OutboundMessagingError::from_error)?;
-        if local_shard_group == Some(shard_group) {
-            self.send_self(message.clone()).await?;
-        }
-
         self.consensus_gossip
             .multicast(shard_group, message)
             .await

diff --git a/dan_layer/consensus/src/hotstuff/on_propose.rs b/dan_layer/consensus/src/hotstuff/on_propose.rs
index 9f68a2659..0811f1705 100644
--- a/dan_layer/consensus/src/hotstuff/on_propose.rs
+++ b/dan_layer/consensus/src/hotstuff/on_propose.rs
@@ -238,17 +238,18 @@ where TConsensusSpec: ConsensusSpec
             "🌿 Broadcasting local proposal {} to local committee",
             next_block,
         );
-
+        let msg = HotstuffMessage::Proposal(ProposalMessage {
+            block: next_block,
+            foreign_proposals,
+        });
         // Broadcast to local and foreign committees
-        self.outbound_messaging
-            .multicast(
-                local_committee_info.shard_group(),
-                HotstuffMessage::Proposal(ProposalMessage {
-                    block: next_block,
-                    foreign_proposals,
-                }),
-            )
-            .await?;
+        self.outbound_messaging.send_self(msg.clone()).await?;
+        // If we are the only VN in this committee, no need to multicast
+        if local_committee_info.num_shard_group_members() > 1 {
+            self.outbound_messaging
+                .multicast(local_committee_info.shard_group(), msg)
+                .await?;
+        }
 
         Ok(())
     }
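The new broadcast order in on_propose can be summarised as: loopback first, gossip only when there is someone else to hear it. A toy sketch of that decision (illustrative names, not the real messaging API):

```rust
// The committee size comes from local_committee_info, which consensus already
// holds, so the gossip layer no longer queries the epoch manager for it.
fn broadcast_targets(num_shard_group_members: u32) -> Vec<&'static str> {
    let mut targets = vec!["self (loopback)"];
    if num_shard_group_members > 1 {
        targets.push("shard-group gossip topic");
    }
    targets
}

fn main() {
    assert_eq!(broadcast_targets(1), vec!["self (loopback)"]);
    assert_eq!(broadcast_targets(4), vec!["self (loopback)", "shard-group gossip topic"]);
}
```

This also explains the deletion in outbound.rs above: the caller now sends to itself explicitly, so the gossip handle no longer needs `get_local_shard_group` to decide whether a loopback copy is required.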
diff --git a/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs b/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs
index 8e63ff406..5f91f5781 100644
--- a/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs
+++ b/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs
@@ -914,6 +914,9 @@ async fn broadcast_foreign_proposal_if_required(
         shard_group,
     );
 
+    // TODO: This message can be much larger than the default maximum for gossipsub (64KiB). For now, the limit is
+    // increased. We also need to allow committees that are stuck on LocalAccept/LocalPrepare to request the
+    // foreign proposal through messaging.
     outbound_messaging
         .multicast(
             shard_group,

diff --git a/dan_layer/consensus/src/hotstuff/state_machine/idle.rs b/dan_layer/consensus/src/hotstuff/state_machine/idle.rs
index 59a605b74..633e17666 100644
--- a/dan_layer/consensus/src/hotstuff/state_machine/idle.rs
+++ b/dan_layer/consensus/src/hotstuff/state_machine/idle.rs
@@ -33,7 +33,7 @@ where TSpec: ConsensusSpec
         context: &mut ConsensusWorkerContext<TSpec>,
     ) -> Result<Transition, HotStuffError> {
         // Subscribe before checking if we're registered to eliminate the chance that we miss the epoch event
-        let mut epoch_events = context.epoch_manager.subscribe().await?;
+        let mut epoch_events = context.epoch_manager.subscribe();
         context.epoch_manager.wait_for_initial_scanning_to_complete().await?;
         let current_epoch = context.epoch_manager.current_epoch().await?;
         if self.is_registered_for_epoch(context, current_epoch).await? {

diff --git a/dan_layer/consensus/src/hotstuff/worker.rs b/dan_layer/consensus/src/hotstuff/worker.rs
index 4f6abd0ed..cd25d2a7d 100644
--- a/dan_layer/consensus/src/hotstuff/worker.rs
+++ b/dan_layer/consensus/src/hotstuff/worker.rs
@@ -258,7 +258,7 @@ impl<TConsensusSpec: ConsensusSpec> HotstuffWorker<TConsensusSpec> {
         let mut on_force_beat = self.pacemaker.get_on_force_beat();
         let mut on_leader_timeout = self.pacemaker.get_on_leader_timeout();
 
-        let mut epoch_manager_events = self.epoch_manager.subscribe().await?;
+        let mut epoch_manager_events = self.epoch_manager.subscribe();
 
         let mut prev_height = self.pacemaker.current_view().get_height();
         let current_epoch = self.pacemaker.current_view().get_epoch();

diff --git a/dan_layer/consensus_tests/src/support/epoch_manager.rs b/dan_layer/consensus_tests/src/support/epoch_manager.rs
index 74b98c99e..c4a5d133b 100644
--- a/dan_layer/consensus_tests/src/support/epoch_manager.rs
+++ b/dan_layer/consensus_tests/src/support/epoch_manager.rs
@@ -149,8 +149,8 @@ impl TestEpochManager {
 impl EpochManagerReader for TestEpochManager {
     type Addr = TestAddress;
 
-    async fn subscribe(&self) -> Result<broadcast::Receiver<EpochManagerEvent>, EpochManagerError> {
-        Ok(self.tx_epoch_events.subscribe())
+    fn subscribe(&self) -> broadcast::Receiver<EpochManagerEvent> {
+        self.tx_epoch_events.subscribe()
     }
 
     async fn get_committee_for_substate(

diff --git a/dan_layer/epoch_manager/src/base_layer/base_layer_epoch_manager.rs b/dan_layer/epoch_manager/src/base_layer/base_layer_epoch_manager.rs
index e8c029225..2c75d37d7 100644
--- a/dan_layer/epoch_manager/src/base_layer/base_layer_epoch_manager.rs
+++ b/dan_layer/epoch_manager/src/base_layer/base_layer_epoch_manager.rs
@@ -152,6 +152,7 @@ impl BaseLayerEpochManager
         if let Some(vn) = vns.iter().find(|vn| vn.public_key == self.node_public_key) {
             self.publish_event(EpochManagerEvent::ThisValidatorIsRegistered {
                 epoch,
+                shard_group: vn.shard_key.to_shard_group(self.config.num_preshards, num_committees),
                 shard_key: vn.shard_key,
             });
         }

diff --git a/dan_layer/epoch_manager/src/base_layer/epoch_manager_service.rs b/dan_layer/epoch_manager/src/base_layer/epoch_manager_service.rs
index 99949ac6d..84c474c41 100644
--- a/dan_layer/epoch_manager/src/base_layer/epoch_manager_service.rs
+++ b/dan_layer/epoch_manager/src/base_layer/epoch_manager_service.rs
@@ -47,7 +47,6 @@ const LOG_TARGET: &str = "tari::validator_node::epoch_manager";
 pub struct EpochManagerService<TAddr, TGlobalStore, TBaseNodeClient> {
     rx_request: Receiver<EpochManagerRequest<TAddr>>,
     inner: BaseLayerEpochManager<TGlobalStore, TBaseNodeClient>,
-    events: broadcast::Sender<EpochManagerEvent>,
 }
 
 impl EpochManagerService<TAddr, SqliteGlobalDbAdapter, GrpcBaseNodeClient>
@@ -55,6 +54,7 @@
 {
     pub fn spawn(
         config: EpochManagerConfig,
+        events: broadcast::Sender<EpochManagerEvent>,
         rx_request: Receiver<EpochManagerRequest<TAddr>>,
         shutdown: ShutdownSignal,
         global_db: GlobalDb<SqliteGlobalDbAdapter>,
@@ -62,11 +62,9 @@
         node_public_key: PublicKey,
     ) -> JoinHandle<anyhow::Result<()>> {
         tokio::spawn(async move {
-            let (tx, _) = broadcast::channel(100);
             EpochManagerService {
                 rx_request,
-                inner: BaseLayerEpochManager::new(config, global_db, base_node_client, tx.clone(), node_public_key),
-                events: tx,
+                inner: BaseLayerEpochManager::new(config, global_db, base_node_client, events, node_public_key),
             }
             .run(shutdown)
             .await?;
@@ -191,7 +189,6 @@
                     context,
                 );
             },
-            EpochManagerRequest::Subscribe { reply } => handle(reply, Ok(self.events.subscribe()), context),
             EpochManagerRequest::GetValidatorNodesPerEpoch { epoch, reply } => {
                 handle(reply, self.inner.get_validator_nodes_per_epoch(epoch), context)
             },

diff --git a/dan_layer/epoch_manager/src/base_layer/handle.rs b/dan_layer/epoch_manager/src/base_layer/handle.rs
index 21b11ec68..5717ca78d 100644
--- a/dan_layer/epoch_manager/src/base_layer/handle.rs
+++ b/dan_layer/epoch_manager/src/base_layer/handle.rs
@@ -27,11 +27,15 @@ use crate::{
 #[derive(Clone, Debug)]
 pub struct EpochManagerHandle<TAddr> {
     tx_request: mpsc::Sender<EpochManagerRequest<TAddr>>,
+    events: broadcast::Sender<EpochManagerEvent>,
 }
 
 impl<TAddr: NodeAddressable> EpochManagerHandle<TAddr> {
-    pub fn new(tx_request: mpsc::Sender<EpochManagerRequest<TAddr>>) -> Self {
-        Self { tx_request }
+    pub fn new(
+        tx_request: mpsc::Sender<EpochManagerRequest<TAddr>>,
+        events: broadcast::Sender<EpochManagerEvent>,
+    ) -> Self {
+        Self { tx_request, events }
     }
 
     pub async fn add_block_hash(&self, block_height: u64, block_hash: FixedHash) -> Result<(), EpochManagerError> {
@@ -175,13 +179,8 @@ impl<TAddr: NodeAddressable> EpochManagerHandle<TAddr> {
 impl<TAddr: NodeAddressable> EpochManagerReader for EpochManagerHandle<TAddr> {
     type Addr = TAddr;
 
-    async fn subscribe(&self) -> Result<broadcast::Receiver<EpochManagerEvent>, EpochManagerError> {
-        let (tx, rx) = oneshot::channel();
-        self.tx_request
-            .send(EpochManagerRequest::Subscribe { reply: tx })
-            .await
-            .map_err(|_| EpochManagerError::SendError)?;
-        rx.await.map_err(|_| EpochManagerError::ReceiveError)?
+    fn subscribe(&self) -> broadcast::Receiver<EpochManagerEvent> {
+        self.events.subscribe()
     }
 
     async fn wait_for_initial_scanning_to_complete(&self) -> Result<(), EpochManagerError> {
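The epoch manager side mirrors the gossip change: the handle now owns the broadcast sender, so `subscribe()` needs neither `async` nor a `Result`. A self-contained sketch of the wiring, with toy `Request`/`Event` types in place of `EpochManagerRequest`/`EpochManagerEvent`:

```rust
use tokio::sync::{broadcast, mpsc};

#[derive(Debug)]
struct Request;
#[derive(Debug, Clone)]
struct Event;

#[allow(dead_code)]
struct Handle {
    tx_request: mpsc::Sender<Request>,
    events: broadcast::Sender<Event>,
}

impl Handle {
    // subscribe() is synchronous and infallible: the handle clones a receiver
    // from the sender it owns instead of sending a Subscribe request to the
    // service task and awaiting a reply.
    fn subscribe(&self) -> broadcast::Receiver<Event> {
        self.events.subscribe()
    }
}

fn main() {
    let (tx_request, _rx_request) = mpsc::channel::<Request>(10);
    // Mirrors spawn_service in this PR: create the channel once and hand the
    // sender to both the handle and the service.
    let (events, _) = broadcast::channel::<Event>(100);
    let handle = Handle {
        tx_request,
        events: events.clone(),
    };

    let mut rx = handle.subscribe();
    events.send(Event).unwrap();
    assert!(rx.try_recv().is_ok());
}
```

A side effect worth noting: the `Subscribe` request variant and its reply plumbing disappear entirely, which is most of the deletion count in types.rs below.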
diff --git a/dan_layer/epoch_manager/src/base_layer/initializer.rs b/dan_layer/epoch_manager/src/base_layer/initializer.rs
index a47bc3e2e..7df98a863 100644
--- a/dan_layer/epoch_manager/src/base_layer/initializer.rs
+++ b/dan_layer/epoch_manager/src/base_layer/initializer.rs
@@ -26,7 +26,10 @@ use tari_dan_common_types::{DerivableFromPublicKey, NodeAddressable};
 use tari_dan_storage::global::GlobalDb;
 use tari_dan_storage_sqlite::global::SqliteGlobalDbAdapter;
 use tari_shutdown::ShutdownSignal;
-use tokio::{sync::mpsc, task::JoinHandle};
+use tokio::{
+    sync::{broadcast, mpsc},
+    task::JoinHandle,
+};
 
 use crate::base_layer::{config::EpochManagerConfig, epoch_manager_service::EpochManagerService, EpochManagerHandle};
 
@@ -38,9 +41,11 @@ pub fn spawn_service(
     shutdown: ShutdownSignal,
 ) -> (EpochManagerHandle<TAddr>, JoinHandle<anyhow::Result<()>>) {
     let (tx_request, rx_request) = mpsc::channel(10);
-    let epoch_manager = EpochManagerHandle::new(tx_request);
+    let (events, _) = broadcast::channel(100);
+    let epoch_manager = EpochManagerHandle::new(tx_request, events.clone());
     let handle = EpochManagerService::spawn(
         config,
+        events,
         rx_request,
         shutdown,
         global_db,

diff --git a/dan_layer/epoch_manager/src/base_layer/types.rs b/dan_layer/epoch_manager/src/base_layer/types.rs
index b11ef3c3a..951c73e0b 100644
--- a/dan_layer/epoch_manager/src/base_layer/types.rs
+++ b/dan_layer/epoch_manager/src/base_layer/types.rs
@@ -13,9 +13,9 @@ use tari_dan_common_types::{
     SubstateAddress,
 };
 use tari_dan_storage::global::models::ValidatorNode;
-use tokio::sync::{broadcast, oneshot};
+use tokio::sync::oneshot;
 
-use crate::{error::EpochManagerError, EpochManagerEvent};
+use crate::error::EpochManagerError;
 
 type Reply<T> = oneshot::Sender<Result<T, EpochManagerError>>;
 
@@ -93,9 +93,6 @@ pub enum EpochManagerRequest<TAddr> {
         epoch: Epoch,
         reply: Reply<Vec<ValidatorNode<TAddr>>>,
     },
-    Subscribe {
-        reply: Reply<broadcast::Receiver<EpochManagerEvent>>,
-    },
     NotifyScanningComplete {
         reply: Reply<()>,
     },

diff --git a/dan_layer/epoch_manager/src/event.rs b/dan_layer/epoch_manager/src/event.rs
index d5c8b6e7f..66d61ec93 100644
--- a/dan_layer/epoch_manager/src/event.rs
+++ b/dan_layer/epoch_manager/src/event.rs
@@ -1,10 +1,14 @@
 // Copyright 2023 The Tari Project
 // SPDX-License-Identifier: BSD-3-Clause
 
-use tari_dan_common_types::{Epoch, SubstateAddress};
+use tari_dan_common_types::{Epoch, ShardGroup, SubstateAddress};
 
 #[derive(Debug, Clone)]
 pub enum EpochManagerEvent {
     EpochChanged(Epoch),
-    ThisValidatorIsRegistered { epoch: Epoch, shard_key: SubstateAddress },
+    ThisValidatorIsRegistered {
+        epoch: Epoch,
+        shard_group: ShardGroup,
+        shard_key: SubstateAddress,
+    },
 }

diff --git a/dan_layer/epoch_manager/src/traits.rs b/dan_layer/epoch_manager/src/traits.rs
index b37371868..0c256a6f9 100644
--- a/dan_layer/epoch_manager/src/traits.rs
+++ b/dan_layer/epoch_manager/src/traits.rs
@@ -40,7 +40,7 @@ use crate::{EpochManagerError, EpochManagerEvent};
 pub trait EpochManagerReader: Send + Sync {
     type Addr: NodeAddressable;
 
-    async fn subscribe(&self) -> Result<broadcast::Receiver<EpochManagerEvent>, EpochManagerError>;
+    fn subscribe(&self) -> broadcast::Receiver<EpochManagerEvent>;
 
     async fn wait_for_initial_scanning_to_complete(&self) -> Result<(), EpochManagerError>;
diff --git a/networking/swarm/src/config.rs b/networking/swarm/src/config.rs
index 6e7917412..298397583 100644
--- a/networking/swarm/src/config.rs
+++ b/networking/swarm/src/config.rs
@@ -40,8 +40,9 @@ impl Default for Config {
             relay_reservation_limits: RelayReservationLimits::default(),
             // This is the default for identify
             identify_interval: Duration::from_secs(5 * 60),
-            // 128k, double the libp2p default
-            gossip_sub_max_message_size: 128 * 1024,
+            // 1MiB, 16 times the libp2p default
+            // TODO: change this to a lower limit when foreign proposal messages are smaller
+            gossip_sub_max_message_size: 1024 * 1024,
         }
     }
 }
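For the record, the size arithmetic behind the new limit, assuming the 64KiB libp2p gossipsub default that the old comment implies ("double the libp2p default" for 128k):

```rust
fn main() {
    const LIBP2P_DEFAULT: usize = 64 * 1024; // libp2p gossipsub default max message size
    const OLD_LIMIT: usize = 128 * 1024; // previous Tari limit
    const NEW_LIMIT: usize = 1024 * 1024; // limit after this PR

    assert_eq!(OLD_LIMIT / LIBP2P_DEFAULT, 2); // "double the libp2p default"
    assert_eq!(NEW_LIMIT / LIBP2P_DEFAULT, 16); // 1MiB is 16x, not 64x
    assert_eq!(NEW_LIMIT, 1 << 20); // 1MiB exactly
}
```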