diff --git a/applications/tari_indexer/src/lib.rs b/applications/tari_indexer/src/lib.rs
index e54e8b2ec..8bec9cb34 100644
--- a/applications/tari_indexer/src/lib.rs
+++ b/applications/tari_indexer/src/lib.rs
@@ -96,12 +96,7 @@ pub async fn run_indexer(config: ApplicationConfig, mut shutdown_signal: Shutdow
     )
     .await?;
 
-    let mut epoch_manager_events = services.epoch_manager.subscribe().await.map_err(|e| {
-        ExitError::new(
-            ExitCode::ConfigError,
-            format!("Epoch manager crashed on startup: {}", e),
-        )
-    })?;
+    let mut epoch_manager_events = services.epoch_manager.subscribe();
 
     let substate_cache_dir = config.common.base_path.join("substate_cache");
     let substate_cache = SubstateFileCache::new(substate_cache_dir)
diff --git a/applications/tari_swarm_daemon/src/process_manager/manager.rs b/applications/tari_swarm_daemon/src/process_manager/manager.rs
index 19edfa0d6..cfd1d7101 100644
--- a/applications/tari_swarm_daemon/src/process_manager/manager.rs
+++ b/applications/tari_swarm_daemon/src/process_manager/manager.rs
@@ -1,7 +1,13 @@
 // Copyright 2024 The Tari Project
 // SPDX-License-Identifier: BSD-3-Clause
 
-use std::{collections::HashMap, fs::File, path::PathBuf, str::FromStr, time::Duration};
+use std::{
+    collections::{HashMap, HashSet},
+    fs::File,
+    path::PathBuf,
+    str::FromStr,
+    time::Duration,
+};
 
 use anyhow::{anyhow, Context};
 use log::info;
@@ -70,43 +76,57 @@ impl ProcessManager {
         sleep(Duration::from_secs(self.instance_manager.num_instances() as u64)).await;
         self.check_instances_running()?;
 
-        if !self.skip_registration {
-            let num_vns = self.instance_manager.num_validator_nodes();
-            // Mine some initial funds, guessing 10 blocks to allow for coinbase maturity
-            self.mine(num_vns + 10).await.context("mining failed")?;
-            self.wait_for_wallet_funds(num_vns)
-                .await
-                .context("waiting for wallet funds")?;
-
-            self.register_all_validator_nodes()
-                .await
-                .context("registering validator node via GRPC")?;
-        }
-
+        let mut templates_to_register = vec![];
         if !self.disable_template_auto_register {
             let registered_templates = self.registered_templates().await?;
-            let registered_template_names: Vec<String> = registered_templates
+            let registered_template_names = registered_templates
                 .iter()
-                .map(|template_data| format!("{}-{}", template_data.name, template_data.version))
-                .collect();
+                .map(|template_data| template_data.name.as_str())
+                .collect::<HashSet<_>>();
             let fs_templates = self.file_system_templates().await?;
-            for template_data in fs_templates.iter().filter(|fs_template_data| {
-                !registered_template_names.contains(&format!("{}-{}", fs_template_data.name, fs_template_data.version))
-            }) {
+            for template_data in fs_templates
+                .iter()
+                .filter(|fs_template_data| !registered_template_names.contains(fs_template_data.name.as_str()))
+            {
                 info!(
                     "🟡 Register missing template from local file system: {}",
                     template_data.name
                 );
-                self.register_template(TemplateData {
+                templates_to_register.push(TemplateData {
                     name: template_data.name.clone(),
                     version: template_data.version,
                     contents_hash: template_data.contents_hash,
                     contents_url: template_data.contents_url.clone(),
-                })
-                .await?;
+                });
             }
         }
 
+        let num_vns = if self.skip_registration {
+            0
+        } else {
+            self.instance_manager.num_validator_nodes()
+        };
+        let num_blocks = num_vns + u64::try_from(templates_to_register.len()).unwrap();
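+        // One block per pending registration: each validator node and template registration below is funded from a mined coinbase.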
.context("waiting for wallet funds")?; + + if !self.skip_registration { + self.register_all_validator_nodes() + .await + .context("registering validator node via GRPC")?; + } + for templates in templates_to_register { + self.register_template(templates).await?; + } + + if num_blocks > 0 { + self.mine(20).await?; + } + Ok(()) } @@ -310,9 +330,15 @@ impl ProcessManager { } }, RegisterTemplate { data, reply } => { - let result = self.register_template(data).await; - if reply.send(result).is_err() { - log::warn!("Request cancelled before response could be sent") + if let Err(err) = self.register_template(data).await { + if reply.send(Err(err)).is_err() { + log::warn!("Request cancelled before response could be sent") + } + } else { + let result = self.mine(10).await; + if reply.send(result).is_err() { + log::warn!("Request cancelled before response could be sent") + } } }, RegisterValidatorNode { instance_id, reply } => { @@ -421,7 +447,6 @@ impl ProcessManager { // inputs for a transaction. sleep(Duration::from_secs(2)).await; } - self.mine(20).await?; Ok(()) } @@ -462,6 +487,9 @@ impl ProcessManager { } async fn mine(&mut self, blocks: u64) -> anyhow::Result<()> { + if blocks == 0 { + return Ok(()); + } let executable = self .executable_manager .get_executable(InstanceType::MinoTariMiner) @@ -510,13 +538,15 @@ impl ProcessManager { .await? .into_inner(); let template_address = TemplateAddress::try_from_vec(resp.template_address).unwrap(); - info!("🟢 Registered template {template_address}. Mining some blocks"); - self.mine(10).await?; + info!("🟢 Registered template {template_address}."); Ok(()) } async fn wait_for_wallet_funds(&mut self, min_expected_blocks: u64) -> anyhow::Result<()> { + if min_expected_blocks == 0 { + return Ok(()); + } // WARN: Assumes one wallet let wallet = self.instance_manager.minotari_wallets().next().ok_or_else(|| { anyhow!("No MinoTariConsoleWallet instances found. 
+            return Ok(());
+        }
         // WARN: Assumes one wallet
         let wallet = self.instance_manager.minotari_wallets().next().ok_or_else(|| {
             anyhow!("No MinoTariConsoleWallet instances found. Please start a wallet before waiting for funds")
diff --git a/applications/tari_validator_node/src/bootstrap.rs b/applications/tari_validator_node/src/bootstrap.rs
index 7624c7802..78dc90b15 100644
--- a/applications/tari_validator_node/src/bootstrap.rs
+++ b/applications/tari_validator_node/src/bootstrap.rs
@@ -76,7 +76,10 @@ use tari_engine_types::{
     substate::{SubstateId, SubstateValue},
     vault::Vault,
 };
-use tari_epoch_manager::base_layer::{EpochManagerConfig, EpochManagerHandle};
+use tari_epoch_manager::{
+    base_layer::{EpochManagerConfig, EpochManagerHandle},
+    EpochManagerReader,
+};
 use tari_indexer_lib::substate_scanner::SubstateScanner;
 use tari_networking::{MessagingMode, NetworkingHandle, RelayCircuitLimits, RelayReservationLimits, SwarmConfig};
 use tari_rpc_framework::RpcServer;
@@ -250,8 +253,11 @@ pub async fn spawn_services(
     };
 
     // Consensus gossip
-    let (consensus_gossip_service, join_handle, rx_consensus_gossip_messages) =
-        consensus_gossip::spawn(epoch_manager.clone(), networking.clone(), rx_consensus_gossip_messages);
+    let (consensus_gossip_service, join_handle, rx_consensus_gossip_messages) = consensus_gossip::spawn(
+        epoch_manager.subscribe(),
+        networking.clone(),
+        rx_consensus_gossip_messages,
+    );
     handles.push(join_handle);
 
     // Messaging
diff --git a/applications/tari_validator_node/src/dan_node.rs b/applications/tari_validator_node/src/dan_node.rs
index 18fb06f77..e57d7206c 100644
--- a/applications/tari_validator_node/src/dan_node.rs
+++ b/applications/tari_validator_node/src/dan_node.rs
@@ -42,7 +42,7 @@ impl DanNode {
     pub async fn start(mut self, mut shutdown: ShutdownSignal) -> Result<(), anyhow::Error> {
         let mut hotstuff_events = self.services.consensus_handle.subscribe_to_hotstuff_events();
 
-        let mut epoch_manager_events = self.services.epoch_manager.subscribe().await?;
+        let mut epoch_manager_events = self.services.epoch_manager.subscribe();
 
         // if let Err(err) = self.dial_local_shard_peers().await {
         //     error!(target: LOG_TARGET, "Failed to dial local shard peers: {}", err);
diff --git a/applications/tari_validator_node/src/p2p/services/consensus_gossip/error.rs b/applications/tari_validator_node/src/p2p/services/consensus_gossip/error.rs
index 42138c077..d7802590a 100644
--- a/applications/tari_validator_node/src/p2p/services/consensus_gossip/error.rs
+++ b/applications/tari_validator_node/src/p2p/services/consensus_gossip/error.rs
@@ -22,9 +22,7 @@
 
 use tari_epoch_manager::EpochManagerError;
 use tari_networking::NetworkingError;
-use tokio::sync::{mpsc, oneshot};
-
-use super::ConsensusGossipRequest;
+use tokio::sync::oneshot;
 
 #[derive(thiserror::Error, Debug)]
 pub enum ConsensusGossipError {
@@ -38,12 +36,6 @@ pub enum ConsensusGossipError {
     NetworkingError(#[from] NetworkingError),
 }
 
-impl From<mpsc::error::SendError<ConsensusGossipRequest>> for ConsensusGossipError {
-    fn from(_: mpsc::error::SendError<ConsensusGossipRequest>) -> Self {
-        Self::RequestCancelled
-    }
-}
-
 impl From<oneshot::error::RecvError> for ConsensusGossipError {
     fn from(_: oneshot::error::RecvError) -> Self {
         Self::RequestCancelled
diff --git a/applications/tari_validator_node/src/p2p/services/consensus_gossip/handle.rs b/applications/tari_validator_node/src/p2p/services/consensus_gossip/handle.rs
index 21c8dd204..a700c32b9 100644
--- a/applications/tari_validator_node/src/p2p/services/consensus_gossip/handle.rs
+++ b/applications/tari_validator_node/src/p2p/services/consensus_gossip/handle.rs
@@ -20,64 +20,56 @@
 // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
 // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
+use log::*;
 use tari_consensus::messages::HotstuffMessage;
 use tari_dan_common_types::ShardGroup;
-use tokio::sync::{mpsc, oneshot};
+use tari_dan_p2p::{proto, TariMessagingSpec};
+use tari_networking::{NetworkingHandle, NetworkingService};
+use tari_swarm::messaging::{
+    prost::{Message, ProstCodec},
+    Codec,
+};
 
 use super::ConsensusGossipError;
+use crate::p2p::services::consensus_gossip::service::shard_group_to_topic;
 
-pub enum ConsensusGossipRequest {
-    Multicast {
-        shard_group: ShardGroup,
-        message: HotstuffMessage,
-        reply: oneshot::Sender<Result<(), ConsensusGossipError>>,
-    },
-    GetLocalShardGroup {
-        reply: oneshot::Sender<Result<Option<ShardGroup>, ConsensusGossipError>>,
-    },
-}
+const LOG_TARGET: &str = "tari::validator_node::consensus_gossip";
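+// The handle encodes and publishes gossip messages directly instead of queueing requests to the service task, so it can derive Clone.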
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct ConsensusGossipHandle {
-    tx_consensus_request: mpsc::Sender<ConsensusGossipRequest>,
-}
-
-impl Clone for ConsensusGossipHandle {
-    fn clone(&self) -> Self {
-        ConsensusGossipHandle {
-            tx_consensus_request: self.tx_consensus_request.clone(),
-        }
-    }
+    networking: NetworkingHandle<TariMessagingSpec>,
+    codec: ProstCodec<proto::consensus::HotStuffMessage>,
 }
 
 impl ConsensusGossipHandle {
-    pub(super) fn new(tx_consensus_request: mpsc::Sender<ConsensusGossipRequest>) -> Self {
-        Self { tx_consensus_request }
+    pub(super) fn new(networking: NetworkingHandle<TariMessagingSpec>) -> Self {
+        Self {
+            networking,
+            codec: ProstCodec::default(),
+        }
     }
 
     pub async fn multicast(
-        &self,
+        &mut self,
         shard_group: ShardGroup,
         message: HotstuffMessage,
     ) -> Result<(), ConsensusGossipError> {
-        let (tx, rx) = oneshot::channel();
-        self.tx_consensus_request
-            .send(ConsensusGossipRequest::Multicast {
-                shard_group,
-                message,
-                reply: tx,
-            })
-            .await?;
+        let topic = shard_group_to_topic(shard_group);
 
-        rx.await?
-    }
+        let message = proto::consensus::HotStuffMessage::from(&message);
+        let mut buf = Vec::with_capacity(message.encoded_len());
+        self.codec
+            .encode_to(&mut buf, message)
+            .await
+            .map_err(|e| ConsensusGossipError::InvalidMessage(e.into()))?;
+
+        debug!(
+            target: LOG_TARGET,
+            "multicast: topic: {} Message size: {} bytes", topic, buf.len()
+        );
 
-    pub async fn get_local_shard_group(&self) -> Result<Option<ShardGroup>, ConsensusGossipError> {
-        let (tx, rx) = oneshot::channel();
-        self.tx_consensus_request
-            .send(ConsensusGossipRequest::GetLocalShardGroup { reply: tx })
-            .await?;
+        self.networking.publish_gossip(topic, buf).await?;
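+        // Gossip publishing reaches remote subscribers only; the local copy of a proposal is sent via send_self (see on_propose.rs).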
 
-        rx.await?
+        Ok(())
     }
 }
diff --git a/applications/tari_validator_node/src/p2p/services/consensus_gossip/initializer.rs b/applications/tari_validator_node/src/p2p/services/consensus_gossip/initializer.rs
index c0056d310..07e0ec77c 100644
--- a/applications/tari_validator_node/src/p2p/services/consensus_gossip/initializer.rs
+++ b/applications/tari_validator_node/src/p2p/services/consensus_gossip/initializer.rs
@@ -22,18 +22,21 @@
 
 use libp2p::{gossipsub, PeerId};
 use log::*;
-use tari_dan_common_types::PeerAddress;
 use tari_dan_p2p::{proto, TariMessagingSpec};
-use tari_epoch_manager::base_layer::EpochManagerHandle;
+use tari_epoch_manager::EpochManagerEvent;
 use tari_networking::NetworkingHandle;
-use tokio::{sync::mpsc, task, task::JoinHandle};
+use tokio::{
+    sync::{broadcast, mpsc},
+    task,
+    task::JoinHandle,
+};
 
 use crate::p2p::services::consensus_gossip::{service::ConsensusGossipService, ConsensusGossipHandle};
 
-const LOG_TARGET: &str = "tari::dan::validator_node::mempool";
+const LOG_TARGET: &str = "tari::validator_node::consensus_gossip::initializer";
 
 pub fn spawn(
-    epoch_manager: EpochManagerHandle<PeerAddress>,
+    epoch_manager_events: broadcast::Receiver<EpochManagerEvent>,
     networking: NetworkingHandle<TariMessagingSpec>,
     rx_gossip: mpsc::UnboundedReceiver<(PeerId, gossipsub::Message)>,
 ) -> (
@@ -41,17 +44,11 @@ pub fn spawn(
     JoinHandle<anyhow::Result<()>>,
     mpsc::Receiver<(PeerId, proto::consensus::HotStuffMessage)>,
 ) {
-    let (tx_consensus_request, rx_consensus_request) = mpsc::channel(10);
     let (tx_consensus_gossip, rx_consensus_gossip) = mpsc::channel(10);
 
-    let consensus_gossip = ConsensusGossipService::new(
-        rx_consensus_request,
-        epoch_manager,
-        networking,
-        rx_gossip,
-        tx_consensus_gossip,
-    );
-    let handle = ConsensusGossipHandle::new(tx_consensus_request);
+    let consensus_gossip =
+        ConsensusGossipService::new(epoch_manager_events, networking.clone(), rx_gossip, tx_consensus_gossip);
+    let handle = ConsensusGossipHandle::new(networking);
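+    // The service now handles only inbound gossip and epoch events; outbound publishing lives on the handle.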
 
     let join_handle = task::spawn(consensus_gossip.run());
     debug!(target: LOG_TARGET, "Spawning consensus gossip service (task: {:?})", join_handle);
diff --git a/applications/tari_validator_node/src/p2p/services/consensus_gossip/mod.rs b/applications/tari_validator_node/src/p2p/services/consensus_gossip/mod.rs
index 85d886082..bb5f405bf 100644
--- a/applications/tari_validator_node/src/p2p/services/consensus_gossip/mod.rs
+++ b/applications/tari_validator_node/src/p2p/services/consensus_gossip/mod.rs
@@ -24,7 +24,7 @@ mod error;
 pub use error::*;
 
 mod handle;
-pub use handle::{ConsensusGossipHandle, ConsensusGossipRequest};
+pub use handle::ConsensusGossipHandle;
 
 mod initializer;
 pub use initializer::spawn;
diff --git a/applications/tari_validator_node/src/p2p/services/consensus_gossip/service.rs b/applications/tari_validator_node/src/p2p/services/consensus_gossip/service.rs
index 4ce900e3d..f6cfae986 100644
--- a/applications/tari_validator_node/src/p2p/services/consensus_gossip/service.rs
+++ b/applications/tari_validator_node/src/p2p/services/consensus_gossip/service.rs
@@ -20,28 +20,24 @@
 // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
 // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
-use std::fmt::Display;
-
 use libp2p::{gossipsub, PeerId};
 use log::*;
-use tari_consensus::messages::HotstuffMessage;
-use tari_dan_common_types::{Epoch, PeerAddress, ShardGroup};
+use tari_dan_common_types::ShardGroup;
 use tari_dan_p2p::{proto, TariMessagingSpec};
-use tari_epoch_manager::{base_layer::EpochManagerHandle, EpochManagerEvent, EpochManagerReader};
+use tari_epoch_manager::EpochManagerEvent;
 use tari_networking::{NetworkingHandle, NetworkingService};
 use tari_swarm::messaging::{prost::ProstCodec, Codec};
-use tokio::sync::{mpsc, oneshot};
+use tokio::sync::{broadcast, mpsc};
 
-use super::{ConsensusGossipError, ConsensusGossipRequest};
+use super::ConsensusGossipError;
 
 const LOG_TARGET: &str = "tari::validator_node::consensus_gossip::service";
 
 pub const TOPIC_PREFIX: &str = "consensus";
 
 #[derive(Debug)]
-pub(super) struct ConsensusGossipService<TAddr> {
-    requests: mpsc::Receiver<ConsensusGossipRequest>,
-    epoch_manager: EpochManagerHandle<TAddr>,
+pub(super) struct ConsensusGossipService {
+    epoch_manager_events: broadcast::Receiver<EpochManagerEvent>,
     is_subscribed: Option<ShardGroup>,
     networking: NetworkingHandle<TariMessagingSpec>,
     codec: ProstCodec<proto::consensus::HotStuffMessage>,
@@ -49,17 +45,15 @@ pub(super) struct ConsensusGossipService<TAddr> {
     tx_consensus_gossip: mpsc::Sender<(PeerId, proto::consensus::HotStuffMessage)>,
 }
 
-impl ConsensusGossipService<PeerAddress> {
+impl ConsensusGossipService {
     pub fn new(
-        requests: mpsc::Receiver<ConsensusGossipRequest>,
-        epoch_manager: EpochManagerHandle<PeerAddress>,
+        epoch_manager_events: broadcast::Receiver<EpochManagerEvent>,
         networking: NetworkingHandle<TariMessagingSpec>,
         rx_gossip: mpsc::UnboundedReceiver<(PeerId, gossipsub::Message)>,
         tx_consensus_gossip: mpsc::Sender<(PeerId, proto::consensus::HotStuffMessage)>,
     ) -> Self {
         Self {
-            requests,
-            epoch_manager,
+            epoch_manager_events,
            is_subscribed: None,
             networking,
             codec: ProstCodec::default(),
@@ -69,24 +63,17 @@
     }
 
     pub async fn run(mut self) -> anyhow::Result<()> {
-        let mut events = self.epoch_manager.subscribe().await?;
-
         loop {
             tokio::select! {
-                Some(req) = self.requests.recv() => self.handle_request(req).await,
                 Some(msg) = self.rx_gossip.recv() => {
                     if let Err(err) = self.handle_incoming_gossip_message(msg).await {
                         warn!(target: LOG_TARGET, "Consensus gossip service error: {}", err);
                     }
                 },
-                Ok(event) = events.recv() => {
-                    if let EpochManagerEvent::EpochChanged(epoch) = event {
-                        if self.epoch_manager.is_this_validator_registered_for_epoch(epoch).await? {
-                            info!(target: LOG_TARGET, "Consensus gossip service subscribing messages for epoch {}", epoch);
-                            self.subscribe(epoch).await?;
-
-                            // TODO: unsubscribe older epoch shards?
-                        }
-                    }
+                Ok(event) = self.epoch_manager_events.recv() => {
+                    if let EpochManagerEvent::ThisValidatorIsRegistered { epoch, shard_group, .. } = event {
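+                        // The event carries the shard group computed by the epoch manager, so no extra lookup is needed here.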
+                        info!(target: LOG_TARGET, "Consensus gossip service subscribing messages for epoch {}", epoch);
+                        self.subscribe(shard_group).await?;
+                    }
                 },
                 else => {
@@ -101,21 +88,6 @@ impl ConsensusGossipService<PeerAddress> {
         Ok(())
     }
 
-    async fn handle_request(&mut self, request: ConsensusGossipRequest) {
-        match request {
-            ConsensusGossipRequest::Multicast {
-                shard_group,
-                message,
-                reply,
-            } => {
-                handle(reply, self.multicast(shard_group, message).await);
-            },
-            ConsensusGossipRequest::GetLocalShardGroup { reply } => {
-                handle(reply, self.get_local_shard_group().await);
-            },
-        }
-    }
-
     async fn handle_incoming_gossip_message(
         &mut self,
         msg: (PeerId, gossipsub::Message),
@@ -136,10 +108,7 @@
         Ok(())
     }
 
-    async fn subscribe(&mut self, epoch: Epoch) -> Result<(), ConsensusGossipError> {
-        let committee_shard = self.epoch_manager.get_local_committee_info(epoch).await?;
-        let shard_group = committee_shard.shard_group();
-
+    async fn subscribe(&mut self, shard_group: ShardGroup) -> Result<(), ConsensusGossipError> {
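+        // Idempotent: the match below returns early when already subscribed to this shard group.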
         match self.is_subscribed {
             Some(sg) if sg == shard_group => {
                 return Ok(());
@@ -152,7 +121,7 @@
         let topic = shard_group_to_topic(shard_group);
         self.networking.subscribe_topic(topic).await?;
 
-        self.is_subscribed = Some(committee_shard.shard_group());
+        self.is_subscribed = Some(shard_group);
 
         Ok(())
     }
@@ -166,61 +135,9 @@
 
         Ok(())
     }
-
-    pub async fn multicast(
-        &mut self,
-        shard_group: ShardGroup,
-        message: HotstuffMessage,
-    ) -> Result<(), ConsensusGossipError> {
-        // if we are alone in the local shard group, no need to broadcast
-        if self.num_shard_group_members().await? < 2 {
-            return Ok(());
-        }
-
-        let topic = shard_group_to_topic(shard_group);
-
-        debug!(
-            target: LOG_TARGET,
-            "multicast: topic: {}", topic,
-        );
-
-        let message = proto::consensus::HotStuffMessage::from(&message);
-        let mut buf = Vec::with_capacity(1024);
-        self.codec
-            .encode_to(&mut buf, message)
-            .await
-            .map_err(|e| ConsensusGossipError::InvalidMessage(e.into()))?;
-
-        self.networking.publish_gossip(topic, buf).await?;
-
-        Ok(())
-    }
-
-    async fn num_shard_group_members(&self) -> Result<u32, ConsensusGossipError> {
-        let epoch = self.epoch_manager.current_epoch().await?;
-
-        if self.epoch_manager.is_this_validator_registered_for_epoch(epoch).await? {
-            let committee_shard = self.epoch_manager.get_local_committee_info(epoch).await?;
-            return Ok(committee_shard.num_shard_group_members());
-        }
-
-        // default value if the VN is not registered
-        Ok(0)
-    }
-
-    pub async fn get_local_shard_group(&self) -> Result<Option<ShardGroup>, ConsensusGossipError> {
-        let epoch = self.epoch_manager.current_epoch().await?;
-
-        if !self.epoch_manager.is_this_validator_registered_for_epoch(epoch).await? {
-            return Ok(None);
-        }
-
-        let committee_shard = self.epoch_manager.get_local_committee_info(epoch).await?;
-        Ok(Some(committee_shard.shard_group()))
-    }
 }
 
-fn shard_group_to_topic(shard_group: ShardGroup) -> String {
+pub(super) fn shard_group_to_topic(shard_group: ShardGroup) -> String {
     format!(
         "{}-{}-{}",
         TOPIC_PREFIX,
@@ -228,12 +145,3 @@ fn shard_group_to_topic(shard_group: ShardGroup) -> String {
         shard_group.end().as_u32()
     )
 }
-
-fn handle<T>(reply: oneshot::Sender<Result<T, ConsensusGossipError>>, result: Result<T, ConsensusGossipError>) {
-    if let Err(ref e) = result {
-        error!(target: LOG_TARGET, "Request failed with error: {}", e);
-    }
-    if reply.send(result).is_err() {
-        error!(target: LOG_TARGET, "Requester abandoned request");
-    }
-}
diff --git a/applications/tari_validator_node/src/p2p/services/mempool/service.rs b/applications/tari_validator_node/src/p2p/services/mempool/service.rs
index 345b97fef..a8d5b821f 100644
--- a/applications/tari_validator_node/src/p2p/services/mempool/service.rs
+++ b/applications/tari_validator_node/src/p2p/services/mempool/service.rs
@@ -90,7 +90,7 @@ where TValidator: Validator
     pub async fn run(&mut self) -> anyhow::Result<()> {
-        let mut events = self.epoch_manager.subscribe().await?;
+        let mut events = self.epoch_manager.subscribe();
 
         loop {
             tokio::select! {
diff --git a/applications/tari_validator_node/src/p2p/services/messaging/outbound.rs b/applications/tari_validator_node/src/p2p/services/messaging/outbound.rs
index 1251b0c9a..f822acd02 100644
--- a/applications/tari_validator_node/src/p2p/services/messaging/outbound.rs
+++ b/applications/tari_validator_node/src/p2p/services/messaging/outbound.rs
@@ -80,7 +80,7 @@ impl tari_consensus::traits::OutboundMessaging
             .map_err(|_| OutboundMessagingError::FailedToEnqueueMessage {
                 reason: "loopback sender closed".to_string(),
             })?;
-        return Ok(());
+        Ok(())
     }
 
     async fn send<T: Into<HotstuffMessage> + Send>(
@@ -111,16 +111,6 @@
     {
         let message = message.into();
 
-        // send it once to ourselves
-        let local_shard_group = self
-            .consensus_gossip
-            .get_local_shard_group()
-            .await
-            .map_err(OutboundMessagingError::from_error)?;
-        if local_shard_group == Some(shard_group) {
-            self.send_self(message.clone()).await?;
-        }
-
         self.consensus_gossip
             .multicast(shard_group, message)
             .await
diff --git a/dan_layer/consensus/src/hotstuff/on_propose.rs b/dan_layer/consensus/src/hotstuff/on_propose.rs
index 9f68a2659..0811f1705 100644
--- a/dan_layer/consensus/src/hotstuff/on_propose.rs
+++ b/dan_layer/consensus/src/hotstuff/on_propose.rs
@@ -238,17 +238,18 @@ where TConsensusSpec: ConsensusSpec
             "🌿 Broadcasting local proposal {} to local committee",
             next_block,
         );
-
+        let msg = HotstuffMessage::Proposal(ProposalMessage {
+            block: next_block,
+            foreign_proposals,
+        });
         // Broadcast to local and foreign committees
-        self.outbound_messaging
-            .multicast(
-                local_committee_info.shard_group(),
-                HotstuffMessage::Proposal(ProposalMessage {
-                    block: next_block,
-                    foreign_proposals,
-                }),
-            )
-            .await?;
+        self.outbound_messaging.send_self(msg.clone()).await?;
+        // If we are the only VN in this committee, no need to multicast
+        if local_committee_info.num_shard_group_members() > 1 {
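+            // Remote committee members receive the proposal via gossip; the local copy was already delivered by send_self above.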
+            self.outbound_messaging
+                .multicast(local_committee_info.shard_group(), msg)
+                .await?;
+        }
 
         Ok(())
     }
diff --git a/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs b/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs
index 8e63ff406..5f91f5781 100644
--- a/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs
+++ b/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs
@@ -914,6 +914,9 @@ async fn broadcast_foreign_proposal_if_required<TConsensusSpec: ConsensusSpec>(
         shard_group,
     );
 
+    // TODO: This message can be much larger than the default maximum for gossipsub (64KiB). For now, the limit is
+    // increased. We also need to allow committees that are stuck on LocalAccept/LocalPrepare to request the
+    // foreign proposal through messaging.
     outbound_messaging
         .multicast(
             shard_group,
diff --git a/dan_layer/consensus/src/hotstuff/state_machine/idle.rs b/dan_layer/consensus/src/hotstuff/state_machine/idle.rs
index 59a605b74..633e17666 100644
--- a/dan_layer/consensus/src/hotstuff/state_machine/idle.rs
+++ b/dan_layer/consensus/src/hotstuff/state_machine/idle.rs
@@ -33,7 +33,7 @@ where TSpec: ConsensusSpec
         context: &mut ConsensusWorkerContext<TSpec>,
     ) -> Result<Transition, HotStuffError> {
         // Subscribe before checking if we're registered to eliminate the chance that we miss the epoch event
-        let mut epoch_events = context.epoch_manager.subscribe().await?;
+        let mut epoch_events = context.epoch_manager.subscribe();
         context.epoch_manager.wait_for_initial_scanning_to_complete().await?;
         let current_epoch = context.epoch_manager.current_epoch().await?;
         if self.is_registered_for_epoch(context, current_epoch).await? {
diff --git a/dan_layer/consensus/src/hotstuff/worker.rs b/dan_layer/consensus/src/hotstuff/worker.rs
index 4f6abd0ed..cd25d2a7d 100644
--- a/dan_layer/consensus/src/hotstuff/worker.rs
+++ b/dan_layer/consensus/src/hotstuff/worker.rs
@@ -258,7 +258,7 @@ impl<TConsensusSpec: ConsensusSpec> HotstuffWorker<TConsensusSpec> {
         let mut on_force_beat = self.pacemaker.get_on_force_beat();
         let mut on_leader_timeout = self.pacemaker.get_on_leader_timeout();
 
-        let mut epoch_manager_events = self.epoch_manager.subscribe().await?;
+        let mut epoch_manager_events = self.epoch_manager.subscribe();
 
         let mut prev_height = self.pacemaker.current_view().get_height();
         let current_epoch = self.pacemaker.current_view().get_epoch();
diff --git a/dan_layer/consensus_tests/src/support/epoch_manager.rs b/dan_layer/consensus_tests/src/support/epoch_manager.rs
index 74b98c99e..c4a5d133b 100644
--- a/dan_layer/consensus_tests/src/support/epoch_manager.rs
+++ b/dan_layer/consensus_tests/src/support/epoch_manager.rs
@@ -149,8 +149,8 @@ impl TestEpochManager {
 impl EpochManagerReader for TestEpochManager {
     type Addr = TestAddress;
 
-    async fn subscribe(&self) -> Result<broadcast::Receiver<EpochManagerEvent>, EpochManagerError> {
-        Ok(self.tx_epoch_events.subscribe())
+    fn subscribe(&self) -> broadcast::Receiver<EpochManagerEvent> {
+        self.tx_epoch_events.subscribe()
     }
 
     async fn get_committee_for_substate(
diff --git a/dan_layer/epoch_manager/src/base_layer/base_layer_epoch_manager.rs b/dan_layer/epoch_manager/src/base_layer/base_layer_epoch_manager.rs
index e8c029225..2c75d37d7 100644
--- a/dan_layer/epoch_manager/src/base_layer/base_layer_epoch_manager.rs
+++ b/dan_layer/epoch_manager/src/base_layer/base_layer_epoch_manager.rs
@@ -152,6 +152,7 @@ impl
         if let Some(vn) = vns.iter().find(|vn| vn.public_key == self.node_public_key) {
             self.publish_event(EpochManagerEvent::ThisValidatorIsRegistered {
                 epoch,
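+                // Include the shard group so subscribers don't have to query the epoch manager again.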
+                shard_group: vn.shard_key.to_shard_group(self.config.num_preshards, num_committees),
                 shard_key: vn.shard_key,
             });
         }
diff --git a/dan_layer/epoch_manager/src/base_layer/epoch_manager_service.rs b/dan_layer/epoch_manager/src/base_layer/epoch_manager_service.rs
index 99949ac6d..84c474c41 100644
--- a/dan_layer/epoch_manager/src/base_layer/epoch_manager_service.rs
+++ b/dan_layer/epoch_manager/src/base_layer/epoch_manager_service.rs
@@ -47,7 +47,6 @@ const LOG_TARGET: &str = "tari::validator_node::epoch_manager";
 pub struct EpochManagerService<TAddr: NodeAddressable, TGlobalStore, TBaseNodeClient> {
     rx_request: Receiver<EpochManagerRequest<TAddr>>,
     inner: BaseLayerEpochManager<TGlobalStore, TBaseNodeClient>,
-    events: broadcast::Sender<EpochManagerEvent>,
 }
 
 impl<TAddr: NodeAddressable + DerivableFromPublicKey>
@@ -55,6 +54,7 @@
 {
     pub fn spawn(
         config: EpochManagerConfig,
+        events: broadcast::Sender<EpochManagerEvent>,
         rx_request: Receiver<EpochManagerRequest<TAddr>>,
         shutdown: ShutdownSignal,
         global_db: GlobalDb<SqliteGlobalDbAdapter<TAddr>>,
@@ -62,11 +62,9 @@
         node_public_key: PublicKey,
     ) -> JoinHandle<anyhow::Result<()>> {
         tokio::spawn(async move {
-            let (tx, _) = broadcast::channel(100);
             EpochManagerService {
                 rx_request,
-                inner: BaseLayerEpochManager::new(config, global_db, base_node_client, tx.clone(), node_public_key),
-                events: tx,
+                inner: BaseLayerEpochManager::new(config, global_db, base_node_client, events, node_public_key),
             }
             .run(shutdown)
             .await?;
@@ -191,7 +189,6 @@
                     context,
                 );
             },
-            EpochManagerRequest::Subscribe { reply } => handle(reply, Ok(self.events.subscribe()), context),
             EpochManagerRequest::GetValidatorNodesPerEpoch { epoch, reply } => {
                 handle(reply, self.inner.get_validator_nodes_per_epoch(epoch), context)
             },
diff --git a/dan_layer/epoch_manager/src/base_layer/handle.rs b/dan_layer/epoch_manager/src/base_layer/handle.rs
index 21b11ec68..5717ca78d 100644
--- a/dan_layer/epoch_manager/src/base_layer/handle.rs
+++ b/dan_layer/epoch_manager/src/base_layer/handle.rs
@@ -27,11 +27,15 @@ use crate::{
 #[derive(Clone, Debug)]
 pub struct EpochManagerHandle<TAddr> {
     tx_request: mpsc::Sender<EpochManagerRequest<TAddr>>,
+    events: broadcast::Sender<EpochManagerEvent>,
 }
 
 impl<TAddr: NodeAddressable> EpochManagerHandle<TAddr> {
-    pub fn new(tx_request: mpsc::Sender<EpochManagerRequest<TAddr>>) -> Self {
-        Self { tx_request }
+    pub fn new(
+        tx_request: mpsc::Sender<EpochManagerRequest<TAddr>>,
+        events: broadcast::Sender<EpochManagerEvent>,
+    ) -> Self {
+        Self { tx_request, events }
     }
 
     pub async fn add_block_hash(&self, block_height: u64, block_hash: FixedHash) -> Result<(), EpochManagerError> {
@@ -175,13 +179,8 @@ impl<TAddr: NodeAddressable> EpochManagerHandle<TAddr> {
 impl<TAddr: NodeAddressable + DerivableFromPublicKey> EpochManagerReader for EpochManagerHandle<TAddr> {
     type Addr = TAddr;
 
-    async fn subscribe(&self) -> Result<broadcast::Receiver<EpochManagerEvent>, EpochManagerError> {
-        let (tx, rx) = oneshot::channel();
-        self.tx_request
-            .send(EpochManagerRequest::Subscribe { reply: tx })
-            .await
-            .map_err(|_| EpochManagerError::SendError)?;
-        rx.await.map_err(|_| EpochManagerError::ReceiveError)?
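+    // The handle owns the broadcast sender, so handing out receivers is synchronous and cannot fail.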
+    fn subscribe(&self) -> broadcast::Receiver<EpochManagerEvent> {
+        self.events.subscribe()
     }
 
     async fn wait_for_initial_scanning_to_complete(&self) -> Result<(), EpochManagerError> {
diff --git a/dan_layer/epoch_manager/src/base_layer/initializer.rs b/dan_layer/epoch_manager/src/base_layer/initializer.rs
index a47bc3e2e..7df98a863 100644
--- a/dan_layer/epoch_manager/src/base_layer/initializer.rs
+++ b/dan_layer/epoch_manager/src/base_layer/initializer.rs
@@ -26,7 +26,10 @@ use tari_dan_common_types::{DerivableFromPublicKey, NodeAddressable};
 use tari_dan_storage::global::GlobalDb;
 use tari_dan_storage_sqlite::global::SqliteGlobalDbAdapter;
 use tari_shutdown::ShutdownSignal;
-use tokio::{sync::mpsc, task::JoinHandle};
+use tokio::{
+    sync::{broadcast, mpsc},
+    task::JoinHandle,
+};
 
 use crate::base_layer::{config::EpochManagerConfig, epoch_manager_service::EpochManagerService, EpochManagerHandle};
 
@@ -38,9 +41,11 @@ pub fn spawn_service(
     shutdown: ShutdownSignal,
 ) -> (EpochManagerHandle<TAddr>, JoinHandle<anyhow::Result<()>>) {
     let (tx_request, rx_request) = mpsc::channel(10);
-    let epoch_manager = EpochManagerHandle::new(tx_request);
+    let (events, _) = broadcast::channel(100);
+    let epoch_manager = EpochManagerHandle::new(tx_request, events.clone());
     let handle = EpochManagerService::spawn(
         config,
+        events,
         rx_request,
         shutdown,
         global_db,
diff --git a/dan_layer/epoch_manager/src/base_layer/types.rs b/dan_layer/epoch_manager/src/base_layer/types.rs
index b11ef3c3a..951c73e0b 100644
--- a/dan_layer/epoch_manager/src/base_layer/types.rs
+++ b/dan_layer/epoch_manager/src/base_layer/types.rs
@@ -13,9 +13,9 @@ use tari_dan_common_types::{
     SubstateAddress,
 };
 use tari_dan_storage::global::models::ValidatorNode;
-use tokio::sync::{broadcast, oneshot};
+use tokio::sync::oneshot;
 
-use crate::{error::EpochManagerError, EpochManagerEvent};
+use crate::error::EpochManagerError;
 
 type Reply<T> = oneshot::Sender<Result<T, EpochManagerError>>;
 
@@ -93,9 +93,6 @@ pub enum EpochManagerRequest<TAddr> {
         epoch: Epoch,
         reply: Reply<Vec<ValidatorNode<TAddr>>>,
     },
-    Subscribe {
-        reply: Reply<broadcast::Receiver<EpochManagerEvent>>,
-    },
     NotifyScanningComplete {
         reply: Reply<()>,
     },
diff --git a/dan_layer/epoch_manager/src/event.rs b/dan_layer/epoch_manager/src/event.rs
index d5c8b6e7f..66d61ec93 100644
--- a/dan_layer/epoch_manager/src/event.rs
+++ b/dan_layer/epoch_manager/src/event.rs
@@ -1,10 +1,14 @@
 // Copyright 2023 The Tari Project
 // SPDX-License-Identifier: BSD-3-Clause
 
-use tari_dan_common_types::{Epoch, SubstateAddress};
+use tari_dan_common_types::{Epoch, ShardGroup, SubstateAddress};
 
 #[derive(Debug, Clone)]
 pub enum EpochManagerEvent {
     EpochChanged(Epoch),
-    ThisValidatorIsRegistered { epoch: Epoch, shard_key: SubstateAddress },
+    ThisValidatorIsRegistered {
+        epoch: Epoch,
+        shard_group: ShardGroup,
+        shard_key: SubstateAddress,
+    },
 }
diff --git a/dan_layer/epoch_manager/src/traits.rs b/dan_layer/epoch_manager/src/traits.rs
index b37371868..0c256a6f9 100644
--- a/dan_layer/epoch_manager/src/traits.rs
+++ b/dan_layer/epoch_manager/src/traits.rs
@@ -40,7 +40,7 @@ use crate::{EpochManagerError, EpochManagerEvent};
 pub trait EpochManagerReader: Send + Sync {
     type Addr: NodeAddressable;
 
-    async fn subscribe(&self) -> Result<broadcast::Receiver<EpochManagerEvent>, EpochManagerError>;
+    fn subscribe(&self) -> broadcast::Receiver<EpochManagerEvent>;
 
     async fn wait_for_initial_scanning_to_complete(&self) -> Result<(), EpochManagerError>;
diff --git a/networking/swarm/src/config.rs b/networking/swarm/src/config.rs
index 6e7917412..298397583 100644
--- a/networking/swarm/src/config.rs
+++ b/networking/swarm/src/config.rs
@@ -40,8 +40,9 @@ impl Default for Config {
             relay_reservation_limits: RelayReservationLimits::default(),
             // This is the default for identify
             identify_interval: Duration::from_secs(5 * 60),
-            // 128k, double the libp2p default
-            gossip_sub_max_message_size: 128 * 1024,
+            // 1MiB, 16 times the libp2p default
+            // TODO: change this to a lower limit when foreign proposal messages are smaller
+            gossip_sub_max_message_size: 1024 * 1024,
         }
     }
 }