From 552e23ff1fa558d40943cd5b18fba4a1cab7edfc Mon Sep 17 00:00:00 2001 From: Miguel Naveira <47919901+mrnaveira@users.noreply.github.com> Date: Wed, 9 Oct 2024 20:35:47 +0100 Subject: [PATCH] feat!: use gossipsub for consensus broadcasts (#1156) Description --- * Both the mempool and HotStuff send broadcast messages using the same gossipsub infrastructure. * The networking layer allows to specify a map (`tx_gossip_messages_by_topic`) that maps topic prefixes to a channel. On startup we register a channel for mempool (topic prefix `transactions`) and another for consensus (topic prefix `consensus`). Then, on gossip message received, the networking layer simply relays it to the appropriate channel based on the topic prefix. This design allows us to easily use gossip for other purposes in the future. * Message encoding and decoding is done in each service (mempool or consensus) separately. * New `ConsensusGossipService` in the Validator node, that listens for epoch events and subscribes to the appropriate gossip topics. It also does message encoding/decoding. * Updated the `MempoolGossip` module to adapt it to the new gossip design, by implementing message encoding/decoding and to receive the messages from networking. * The consensus layer is independent of how communication is done. * `ConsensusInboundMessaging` now also listens to consensus messages coming from the new `ConsensusGossipService` . * `ConsensusOutboundMessaging` uses the new `ConsensusGossipService` for broadcasting. * The `OutboundMessaging` trait for the `multicast` function now expects a `ShardGroup` instead of a committee. Motivation and Context --- Hotstuff and cerberus are message based protocols. Currently we implement a message protocol that requires nodes to connect to every other node in the local shard. For cross shard messaging, we implement a strategy that limits the number of messages sent but relies on multiple connections per peer across shards. We want to leverage libp2p's gossipsub for all consensus broadcasts to local/foreign shards. * Each shard subscribes to their topic `consensus-{start}-{end}` (`start` and `end` are the start/end shards in the `ShardGroup` type, similar to the mempool service) * Ambient peer discovery required by gossipsub is already performed by the Tari-implemented peer sync protocol and L1 registrations How Has This Been Tested? --- Manually by starting a local network using `tari_spawn`, performing transactions and inspecting the logs. What process can a PR reviewer use to test or verify this change? --- See previous section Breaking Changes --- - [ ] None - [ ] Requires data directory to be deleted - [x] Other - Requires network reset, as multicast communications between VNs are now done via gossip --- Cargo.lock | 1 + applications/tari_validator_node/Cargo.toml | 1 + .../tari_validator_node/src/bootstrap.rs | 38 ++- .../p2p/services/consensus_gossip/error.rs | 51 ++++ .../p2p/services/consensus_gossip/handle.rs | 83 ++++++ .../services/consensus_gossip/initializer.rs | 60 +++++ .../src/p2p/services/consensus_gossip/mod.rs | 33 +++ .../p2p/services/consensus_gossip/service.rs | 239 ++++++++++++++++++ .../src/p2p/services/mempool/error.rs | 1 - .../src/p2p/services/mempool/gossip.rs | 88 +++++-- .../src/p2p/services/mempool/initializer.rs | 14 +- .../src/p2p/services/mempool/mod.rs | 1 + .../src/p2p/services/mempool/service.rs | 14 +- .../src/p2p/services/messaging/gossip.rs | 52 ---- .../src/p2p/services/messaging/inbound.rs | 48 ++-- .../src/p2p/services/messaging/mod.rs | 3 - .../src/p2p/services/messaging/outbound.rs | 37 +-- .../src/p2p/services/mod.rs | 1 + dan_layer/common_types/src/committee.rs | 4 + .../consensus/src/hotstuff/on_propose.rs | 9 +- .../src/hotstuff/on_receive_local_proposal.rs | 43 ++-- dan_layer/consensus/src/traits/messaging.rs | 5 +- .../src/support/messaging_impls.rs | 30 ++- .../src/support/validator/builder.rs | 15 +- dan_layer/p2p/src/message.rs | 25 ++ dan_layer/p2p/src/message_spec.rs | 3 +- networking/core/src/handle.rs | 4 +- networking/core/src/lib.rs | 2 +- networking/core/src/message.rs | 3 +- networking/core/src/spawn.rs | 39 ++- networking/core/src/worker.rs | 50 ++-- networking/libp2p-messaging/src/codec/mod.rs | 4 +- .../libp2p-messaging/src/codec/prost.rs | 4 +- networking/libp2p-messaging/src/handler.rs | 4 +- 34 files changed, 774 insertions(+), 235 deletions(-) create mode 100644 applications/tari_validator_node/src/p2p/services/consensus_gossip/error.rs create mode 100644 applications/tari_validator_node/src/p2p/services/consensus_gossip/handle.rs create mode 100644 applications/tari_validator_node/src/p2p/services/consensus_gossip/initializer.rs create mode 100644 applications/tari_validator_node/src/p2p/services/consensus_gossip/mod.rs create mode 100644 applications/tari_validator_node/src/p2p/services/consensus_gossip/service.rs delete mode 100644 applications/tari_validator_node/src/p2p/services/messaging/gossip.rs diff --git a/Cargo.lock b/Cargo.lock index 642fd158f..20b3db50c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10063,6 +10063,7 @@ dependencies = [ "tari_rpc_state_sync", "tari_shutdown", "tari_state_store_sqlite", + "tari_swarm", "tari_template_builtin", "tari_template_lib", "tari_transaction", diff --git a/applications/tari_validator_node/Cargo.toml b/applications/tari_validator_node/Cargo.toml index 003b3df19..b5a7c4e55 100644 --- a/applications/tari_validator_node/Cargo.toml +++ b/applications/tari_validator_node/Cargo.toml @@ -42,6 +42,7 @@ tari_state_store_sqlite = { workspace = true } tari_networking = { workspace = true } tari_rpc_framework = { workspace = true } tari_template_builtin = { workspace = true } +tari_swarm = { workspace = true } sqlite_message_logger = { workspace = true } diff --git a/applications/tari_validator_node/src/bootstrap.rs b/applications/tari_validator_node/src/bootstrap.rs index 7bedc8ae1..e16b10a97 100644 --- a/applications/tari_validator_node/src/bootstrap.rs +++ b/applications/tari_validator_node/src/bootstrap.rs @@ -20,7 +20,7 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::{fs, io, ops::Deref, str::FromStr}; +use std::{collections::HashMap, fs, io, ops::Deref, str::FromStr}; use anyhow::{anyhow, Context}; use futures::{future, FutureExt}; @@ -106,8 +106,9 @@ use crate::{ p2p::{ create_tari_validator_node_rpc_service, services::{ + consensus_gossip::{self, ConsensusGossipHandle}, mempool::{self, MempoolHandle}, - messaging::{ConsensusInboundMessaging, ConsensusOutboundMessaging, Gossip}, + messaging::{ConsensusInboundMessaging, ConsensusOutboundMessaging}, }, NopLogger, }, @@ -137,7 +138,14 @@ pub async fn spawn_services( // Networking let (tx_consensus_messages, rx_consensus_messages) = mpsc::unbounded_channel(); - let (tx_gossip_messages, rx_gossip_messages) = mpsc::unbounded_channel(); + + // gossip channels + let (tx_transaction_gossip_messages, rx_transaction_gossip_messages) = mpsc::unbounded_channel(); + let (tx_consensus_gossip_messages, rx_consensus_gossip_messages) = mpsc::unbounded_channel(); + let mut tx_gossip_messages_by_topic = HashMap::new(); + tx_gossip_messages_by_topic.insert(mempool::TOPIC_PREFIX.to_string(), tx_transaction_gossip_messages); + tx_gossip_messages_by_topic.insert(consensus_gossip::TOPIC_PREFIX.to_string(), tx_consensus_gossip_messages); + let identity = identity::Keypair::sr25519_from_bytes(keypair.secret_key().as_bytes().to_vec()).map_err(|e| { ExitError::new( ExitCode::ConfigError, @@ -157,11 +165,12 @@ pub async fn spawn_services( p.addresses.into_iter().map(move |a| (peer_id, a)) }) .collect(); + let (mut networking, join_handle) = tari_networking::spawn( identity, MessagingMode::Enabled { tx_messages: tx_consensus_messages, - tx_gossip_messages, + tx_gossip_messages_by_topic, }, tari_networking::Config { listener_port: config.validator_node.p2p.listener_port, @@ -239,6 +248,11 @@ pub async fn spawn_services( per_log_cost: 1, }; + // Consensus gossip + let (consensus_gossip_service, join_handle, rx_consensus_gossip_messages) = + consensus_gossip::spawn(epoch_manager.clone(), networking.clone(), rx_consensus_gossip_messages); + handles.push(join_handle); + // Messaging let message_logger = NopLogger; // SqliteMessageLogger::new(config.validator_node.data_dir.join("message_log.sqlite")); let local_address = PeerAddress::from(keypair.public_key().clone()); @@ -246,11 +260,16 @@ pub async fn spawn_services( let inbound_messaging = ConsensusInboundMessaging::new( local_address, rx_consensus_messages, + rx_consensus_gossip_messages, loopback_receiver, message_logger.clone(), ); - let outbound_messaging = - ConsensusOutboundMessaging::new(loopback_sender, networking.clone(), message_logger.clone()); + let outbound_messaging = ConsensusOutboundMessaging::new( + loopback_sender, + consensus_gossip_service.clone(), + networking.clone(), + message_logger.clone(), + ); // Consensus let payload_processor = TariDanTransactionProcessor::new(config.network, template_manager.clone(), fee_table); @@ -284,15 +303,14 @@ pub async fn spawn_services( .await; handles.push(consensus_join_handle); - let gossip = Gossip::new(networking.clone(), rx_gossip_messages); - let (mempool, join_handle) = mempool::spawn( consensus_constants.num_preshards, - gossip, epoch_manager.clone(), create_mempool_transaction_validator(template_manager.clone()), state_store.clone(), consensus_handle.clone(), + networking.clone(), + rx_transaction_gossip_messages, #[cfg(feature = "metrics")] metrics_registry, ); @@ -363,6 +381,7 @@ pub async fn spawn_services( dry_run_transaction_processor, handles, validator_node_client_factory, + consensus_gossip_service, }) } @@ -414,6 +433,7 @@ pub struct Services { pub global_db: GlobalDb>, pub dry_run_transaction_processor: DryRunTransactionProcessor, pub validator_node_client_factory: TariValidatorNodeRpcClientFactory, + pub consensus_gossip_service: ConsensusGossipHandle, pub state_store: SqliteStateStore, pub handles: Vec>>, diff --git a/applications/tari_validator_node/src/p2p/services/consensus_gossip/error.rs b/applications/tari_validator_node/src/p2p/services/consensus_gossip/error.rs new file mode 100644 index 000000000..42138c077 --- /dev/null +++ b/applications/tari_validator_node/src/p2p/services/consensus_gossip/error.rs @@ -0,0 +1,51 @@ +// Copyright 2024. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use tari_epoch_manager::EpochManagerError; +use tari_networking::NetworkingError; +use tokio::sync::{mpsc, oneshot}; + +use super::ConsensusGossipRequest; + +#[derive(thiserror::Error, Debug)] +pub enum ConsensusGossipError { + #[error("Invalid message: {0}")] + InvalidMessage(#[from] anyhow::Error), + #[error("Epoch Manager Error: {0}")] + EpochManagerError(#[from] EpochManagerError), + #[error("Internal service request cancelled")] + RequestCancelled, + #[error("Network error: {0}")] + NetworkingError(#[from] NetworkingError), +} + +impl From> for ConsensusGossipError { + fn from(_: mpsc::error::SendError) -> Self { + Self::RequestCancelled + } +} + +impl From for ConsensusGossipError { + fn from(_: oneshot::error::RecvError) -> Self { + Self::RequestCancelled + } +} diff --git a/applications/tari_validator_node/src/p2p/services/consensus_gossip/handle.rs b/applications/tari_validator_node/src/p2p/services/consensus_gossip/handle.rs new file mode 100644 index 000000000..21c8dd204 --- /dev/null +++ b/applications/tari_validator_node/src/p2p/services/consensus_gossip/handle.rs @@ -0,0 +1,83 @@ +// Copyright 2024. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use tari_consensus::messages::HotstuffMessage; +use tari_dan_common_types::ShardGroup; +use tokio::sync::{mpsc, oneshot}; + +use super::ConsensusGossipError; + +pub enum ConsensusGossipRequest { + Multicast { + shard_group: ShardGroup, + message: HotstuffMessage, + reply: oneshot::Sender>, + }, + GetLocalShardGroup { + reply: oneshot::Sender, ConsensusGossipError>>, + }, +} + +#[derive(Debug)] +pub struct ConsensusGossipHandle { + tx_consensus_request: mpsc::Sender, +} + +impl Clone for ConsensusGossipHandle { + fn clone(&self) -> Self { + ConsensusGossipHandle { + tx_consensus_request: self.tx_consensus_request.clone(), + } + } +} + +impl ConsensusGossipHandle { + pub(super) fn new(tx_consensus_request: mpsc::Sender) -> Self { + Self { tx_consensus_request } + } + + pub async fn multicast( + &self, + shard_group: ShardGroup, + message: HotstuffMessage, + ) -> Result<(), ConsensusGossipError> { + let (tx, rx) = oneshot::channel(); + self.tx_consensus_request + .send(ConsensusGossipRequest::Multicast { + shard_group, + message, + reply: tx, + }) + .await?; + + rx.await? + } + + pub async fn get_local_shard_group(&self) -> Result, ConsensusGossipError> { + let (tx, rx) = oneshot::channel(); + self.tx_consensus_request + .send(ConsensusGossipRequest::GetLocalShardGroup { reply: tx }) + .await?; + + rx.await? + } +} diff --git a/applications/tari_validator_node/src/p2p/services/consensus_gossip/initializer.rs b/applications/tari_validator_node/src/p2p/services/consensus_gossip/initializer.rs new file mode 100644 index 000000000..c0056d310 --- /dev/null +++ b/applications/tari_validator_node/src/p2p/services/consensus_gossip/initializer.rs @@ -0,0 +1,60 @@ +// Copyright 2024. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use libp2p::{gossipsub, PeerId}; +use log::*; +use tari_dan_common_types::PeerAddress; +use tari_dan_p2p::{proto, TariMessagingSpec}; +use tari_epoch_manager::base_layer::EpochManagerHandle; +use tari_networking::NetworkingHandle; +use tokio::{sync::mpsc, task, task::JoinHandle}; + +use crate::p2p::services::consensus_gossip::{service::ConsensusGossipService, ConsensusGossipHandle}; + +const LOG_TARGET: &str = "tari::dan::validator_node::mempool"; + +pub fn spawn( + epoch_manager: EpochManagerHandle, + networking: NetworkingHandle, + rx_gossip: mpsc::UnboundedReceiver<(PeerId, gossipsub::Message)>, +) -> ( + ConsensusGossipHandle, + JoinHandle>, + mpsc::Receiver<(PeerId, proto::consensus::HotStuffMessage)>, +) { + let (tx_consensus_request, rx_consensus_request) = mpsc::channel(10); + let (tx_consensus_gossip, rx_consensus_gossip) = mpsc::channel(10); + + let consensus_gossip = ConsensusGossipService::new( + rx_consensus_request, + epoch_manager, + networking, + rx_gossip, + tx_consensus_gossip, + ); + let handle = ConsensusGossipHandle::new(tx_consensus_request); + + let join_handle = task::spawn(consensus_gossip.run()); + debug!(target: LOG_TARGET, "Spawning consensus gossip service (task: {:?})", join_handle); + + (handle, join_handle, rx_consensus_gossip) +} diff --git a/applications/tari_validator_node/src/p2p/services/consensus_gossip/mod.rs b/applications/tari_validator_node/src/p2p/services/consensus_gossip/mod.rs new file mode 100644 index 000000000..85d886082 --- /dev/null +++ b/applications/tari_validator_node/src/p2p/services/consensus_gossip/mod.rs @@ -0,0 +1,33 @@ +// Copyright 2024. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +mod error; +pub use error::*; + +mod handle; +pub use handle::{ConsensusGossipHandle, ConsensusGossipRequest}; + +mod initializer; +pub use initializer::spawn; + +mod service; +pub use service::TOPIC_PREFIX; diff --git a/applications/tari_validator_node/src/p2p/services/consensus_gossip/service.rs b/applications/tari_validator_node/src/p2p/services/consensus_gossip/service.rs new file mode 100644 index 000000000..4ce900e3d --- /dev/null +++ b/applications/tari_validator_node/src/p2p/services/consensus_gossip/service.rs @@ -0,0 +1,239 @@ +// Copyright 2024. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::fmt::Display; + +use libp2p::{gossipsub, PeerId}; +use log::*; +use tari_consensus::messages::HotstuffMessage; +use tari_dan_common_types::{Epoch, PeerAddress, ShardGroup}; +use tari_dan_p2p::{proto, TariMessagingSpec}; +use tari_epoch_manager::{base_layer::EpochManagerHandle, EpochManagerEvent, EpochManagerReader}; +use tari_networking::{NetworkingHandle, NetworkingService}; +use tari_swarm::messaging::{prost::ProstCodec, Codec}; +use tokio::sync::{mpsc, oneshot}; + +use super::{ConsensusGossipError, ConsensusGossipRequest}; + +const LOG_TARGET: &str = "tari::validator_node::consensus_gossip::service"; + +pub const TOPIC_PREFIX: &str = "consensus"; + +#[derive(Debug)] +pub(super) struct ConsensusGossipService { + requests: mpsc::Receiver, + epoch_manager: EpochManagerHandle, + is_subscribed: Option, + networking: NetworkingHandle, + codec: ProstCodec, + rx_gossip: mpsc::UnboundedReceiver<(PeerId, gossipsub::Message)>, + tx_consensus_gossip: mpsc::Sender<(PeerId, proto::consensus::HotStuffMessage)>, +} + +impl ConsensusGossipService { + pub fn new( + requests: mpsc::Receiver, + epoch_manager: EpochManagerHandle, + networking: NetworkingHandle, + rx_gossip: mpsc::UnboundedReceiver<(PeerId, gossipsub::Message)>, + tx_consensus_gossip: mpsc::Sender<(PeerId, proto::consensus::HotStuffMessage)>, + ) -> Self { + Self { + requests, + epoch_manager, + is_subscribed: None, + networking, + codec: ProstCodec::default(), + rx_gossip, + tx_consensus_gossip, + } + } + + pub async fn run(mut self) -> anyhow::Result<()> { + let mut events = self.epoch_manager.subscribe().await?; + + loop { + tokio::select! { + Some(req) = self.requests.recv() => self.handle_request(req).await, + Some(msg) = self.rx_gossip.recv() => { + if let Err(err) = self.handle_incoming_gossip_message(msg).await { + warn!(target: LOG_TARGET, "Consensus gossip service error: {}", err); + } + }, + Ok(event) = events.recv() => { + if let EpochManagerEvent::EpochChanged(epoch) = event { + if self.epoch_manager.is_this_validator_registered_for_epoch(epoch).await?{ + info!(target: LOG_TARGET, "Consensus gossip service subscribing messages for epoch {}", epoch); + self.subscribe(epoch).await?; + + // TODO: unsubscribe older epoch shards? + } + } + }, + else => { + info!(target: LOG_TARGET, "Consensus gossip service shutting down"); + break; + } + } + } + + self.unsubscribe().await?; + + Ok(()) + } + + async fn handle_request(&mut self, request: ConsensusGossipRequest) { + match request { + ConsensusGossipRequest::Multicast { + shard_group, + message, + reply, + } => { + handle(reply, self.multicast(shard_group, message).await); + }, + ConsensusGossipRequest::GetLocalShardGroup { reply } => { + handle(reply, self.get_local_shard_group().await); + }, + } + } + + async fn handle_incoming_gossip_message( + &mut self, + msg: (PeerId, gossipsub::Message), + ) -> Result<(), ConsensusGossipError> { + let (from, msg) = msg; + + let (_, msg) = self + .codec + .decode_from(&mut msg.data.as_slice()) + .await + .map_err(|e| ConsensusGossipError::InvalidMessage(e.into()))?; + + self.tx_consensus_gossip + .send((from, msg)) + .await + .map_err(|e| ConsensusGossipError::InvalidMessage(e.into()))?; + + Ok(()) + } + + async fn subscribe(&mut self, epoch: Epoch) -> Result<(), ConsensusGossipError> { + let committee_shard = self.epoch_manager.get_local_committee_info(epoch).await?; + let shard_group = committee_shard.shard_group(); + + match self.is_subscribed { + Some(sg) if sg == shard_group => { + return Ok(()); + }, + Some(_) => { + self.unsubscribe().await?; + }, + None => {}, + } + + let topic = shard_group_to_topic(shard_group); + self.networking.subscribe_topic(topic).await?; + self.is_subscribed = Some(committee_shard.shard_group()); + + Ok(()) + } + + async fn unsubscribe(&mut self) -> Result<(), ConsensusGossipError> { + if let Some(sg) = self.is_subscribed { + let topic = shard_group_to_topic(sg); + self.networking.unsubscribe_topic(topic).await?; + self.is_subscribed = None; + } + + Ok(()) + } + + pub async fn multicast( + &mut self, + shard_group: ShardGroup, + message: HotstuffMessage, + ) -> Result<(), ConsensusGossipError> { + // if we are alone in the local shard group, no need to broadcast + if self.num_shard_group_members().await? < 2 { + return Ok(()); + } + + let topic = shard_group_to_topic(shard_group); + + debug!( + target: LOG_TARGET, + "multicast: topic: {}", topic, + ); + + let message = proto::consensus::HotStuffMessage::from(&message); + let mut buf = Vec::with_capacity(1024); + self.codec + .encode_to(&mut buf, message) + .await + .map_err(|e| ConsensusGossipError::InvalidMessage(e.into()))?; + + self.networking.publish_gossip(topic, buf).await?; + + Ok(()) + } + + async fn num_shard_group_members(&self) -> Result { + let epoch = self.epoch_manager.current_epoch().await?; + + if self.epoch_manager.is_this_validator_registered_for_epoch(epoch).await? { + let committee_shard = self.epoch_manager.get_local_committee_info(epoch).await?; + return Ok(committee_shard.num_shard_group_members()); + } + + // default value if the VN is not registered + Ok(0) + } + + pub async fn get_local_shard_group(&self) -> Result, ConsensusGossipError> { + let epoch = self.epoch_manager.current_epoch().await?; + + if !self.epoch_manager.is_this_validator_registered_for_epoch(epoch).await? { + return Ok(None); + } + + let committee_shard = self.epoch_manager.get_local_committee_info(epoch).await?; + Ok(Some(committee_shard.shard_group())) + } +} + +fn shard_group_to_topic(shard_group: ShardGroup) -> String { + format!( + "{}-{}-{}", + TOPIC_PREFIX, + shard_group.start().as_u32(), + shard_group.end().as_u32() + ) +} + +fn handle(reply: oneshot::Sender>, result: Result) { + if let Err(ref e) = result { + error!(target: LOG_TARGET, "Request failed with error: {}", e); + } + if reply.send(result).is_err() { + error!(target: LOG_TARGET, "Requester abandoned request"); + } +} diff --git a/applications/tari_validator_node/src/p2p/services/mempool/error.rs b/applications/tari_validator_node/src/p2p/services/mempool/error.rs index 4168c044a..c80aecc5d 100644 --- a/applications/tari_validator_node/src/p2p/services/mempool/error.rs +++ b/applications/tari_validator_node/src/p2p/services/mempool/error.rs @@ -28,7 +28,6 @@ pub enum MempoolError { StorageError(#[from] StorageError), #[error("Transaction validation error: {0}")] TransactionValidationError(#[from] TransactionValidationError), - #[error("Network error: {0}")] NetworkingError(#[from] NetworkingError), } diff --git a/applications/tari_validator_node/src/p2p/services/mempool/gossip.rs b/applications/tari_validator_node/src/p2p/services/mempool/gossip.rs index d84b71a89..89e21b6f5 100644 --- a/applications/tari_validator_node/src/p2p/services/mempool/gossip.rs +++ b/applications/tari_validator_node/src/p2p/services/mempool/gossip.rs @@ -3,38 +3,81 @@ use std::{collections::HashSet, iter}; +use libp2p::{gossipsub, PeerId}; use log::*; use tari_dan_common_types::{Epoch, NumPreshards, PeerAddress, ShardGroup, ToSubstateAddress}; -use tari_dan_p2p::{proto, DanMessage, NewTransactionMessage}; +use tari_dan_p2p::{proto, DanMessage, NewTransactionMessage, TariMessagingSpec}; use tari_epoch_manager::{base_layer::EpochManagerHandle, EpochManagerReader}; +use tari_networking::{NetworkingHandle, NetworkingService}; +use tari_swarm::messaging::{prost::ProstCodec, Codec}; +use tokio::sync::mpsc; -use crate::p2p::services::{mempool::MempoolError, messaging::Gossip}; +use crate::p2p::services::mempool::MempoolError; const LOG_TARGET: &str = "tari::validator_node::mempool::gossip"; +pub const TOPIC_PREFIX: &str = "transactions"; + +#[derive(Debug)] +pub struct MempoolGossipCodec { + codec: ProstCodec, +} + +impl MempoolGossipCodec { + pub fn new() -> Self { + Self { + codec: ProstCodec::default(), + } + } + + pub async fn encode(&self, message: DanMessage) -> std::io::Result> { + let mut buf = Vec::with_capacity(1024); + let message = proto::network::DanMessage::from(&message); + self.codec.encode_to(&mut buf, message).await?; + Ok(buf) + } + + pub async fn decode(&self, message: gossipsub::Message) -> std::io::Result<(usize, DanMessage)> { + let (length, message) = self.codec.decode_from(&mut message.data.as_slice()).await?; + let message = DanMessage::try_from(message).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + + Ok((length, message)) + } +} + #[derive(Debug)] pub(super) struct MempoolGossip { num_preshards: NumPreshards, epoch_manager: EpochManagerHandle, - gossip: Gossip, is_subscribed: Option, + networking: NetworkingHandle, + rx_gossip: mpsc::UnboundedReceiver<(PeerId, gossipsub::Message)>, + codec: MempoolGossipCodec, } impl MempoolGossip { - pub fn new(num_preshards: NumPreshards, epoch_manager: EpochManagerHandle, outbound: Gossip) -> Self { + pub fn new( + num_preshards: NumPreshards, + epoch_manager: EpochManagerHandle, + networking: NetworkingHandle, + rx_gossip: mpsc::UnboundedReceiver<(PeerId, gossipsub::Message)>, + ) -> Self { Self { num_preshards, epoch_manager, - gossip: outbound, is_subscribed: None, + networking, + rx_gossip, + codec: MempoolGossipCodec::new(), } } pub async fn next_message(&mut self) -> Option> { - self.gossip - .next_message() - .await - .map(|result| result.map_err(MempoolError::InvalidMessage)) + let (from, msg) = self.rx_gossip.recv().await?; + match self.codec.decode(msg).await { + Ok((len, msg)) => Some(Ok((from.into(), msg, len))), + Err(e) => Some(Err(MempoolError::InvalidMessage(e.into()))), + } } pub async fn subscribe(&mut self, epoch: Epoch) -> Result<(), MempoolError> { @@ -49,7 +92,7 @@ impl MempoolGossip { None => {}, } - self.gossip + self.networking .subscribe_topic(shard_group_to_topic(committee_shard.shard_group())) .await?; self.is_subscribed = Some(committee_shard.shard_group()); @@ -58,7 +101,7 @@ impl MempoolGossip { pub async fn unsubscribe(&mut self) -> Result<(), MempoolError> { if let Some(sg) = self.is_subscribed { - self.gossip.unsubscribe_topic(shard_group_to_topic(sg)).await?; + self.networking.unsubscribe_topic(shard_group_to_topic(sg)).await?; self.is_subscribed = None; } Ok(()) @@ -73,14 +116,18 @@ impl MempoolGossip { "forward_to_local_replicas: topic: {}", topic, ); - let msg = proto::network::DanMessage::from(&msg); - self.gossip.publish_message(topic, msg).await?; + let msg = self + .codec + .encode(msg) + .await + .map_err(|e| MempoolError::InvalidMessage(e.into()))?; + self.networking.publish_gossip(topic, msg).await?; Ok(()) } pub fn get_num_incoming_messages(&self) -> usize { - self.gossip.get_num_incoming_messages() + self.rx_gossip.len() } pub async fn forward_to_foreign_replicas( @@ -109,15 +156,19 @@ impl MempoolGossip { .filter(|sg| exclude_shard_group.as_ref() != Some(sg) && sg != &local_shard_group) .collect::>(); - let msg = proto::network::DanMessage::from(&msg.into()); + let msg = self + .codec + .encode(msg.into()) + .await + .map_err(|e| MempoolError::InvalidMessage(e.into()))?; + for sg in shard_groups { let topic = shard_group_to_topic(sg); debug!( target: LOG_TARGET, "forward_to_foreign_replicas: topic: {}", topic, ); - - self.gossip.publish_message(topic, msg.clone()).await?; + self.networking.publish_gossip(topic, msg.clone()).await?; } Ok(()) @@ -126,7 +177,8 @@ impl MempoolGossip { fn shard_group_to_topic(shard_group: ShardGroup) -> String { format!( - "transactions-{}-{}", + "{}-{}-{}", + TOPIC_PREFIX, shard_group.start().as_u32(), shard_group.end().as_u32() ) diff --git a/applications/tari_validator_node/src/p2p/services/mempool/initializer.rs b/applications/tari_validator_node/src/p2p/services/mempool/initializer.rs index 735fe9518..2bb4d622e 100644 --- a/applications/tari_validator_node/src/p2p/services/mempool/initializer.rs +++ b/applications/tari_validator_node/src/p2p/services/mempool/initializer.rs @@ -20,9 +20,12 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +use libp2p::{gossipsub, PeerId}; use log::*; use tari_dan_common_types::{NumPreshards, PeerAddress}; +use tari_dan_p2p::TariMessagingSpec; use tari_epoch_manager::base_layer::EpochManagerHandle; +use tari_networking::NetworkingHandle; use tari_state_store_sqlite::SqliteStateStore; use tari_transaction::Transaction; use tokio::{sync::mpsc, task, task::JoinHandle}; @@ -31,10 +34,7 @@ use tokio::{sync::mpsc, task, task::JoinHandle}; use super::metrics::PrometheusMempoolMetrics; use crate::{ consensus::ConsensusHandle, - p2p::services::{ - mempool::{handle::MempoolHandle, service::MempoolService}, - messaging::Gossip, - }, + p2p::services::mempool::{handle::MempoolHandle, service::MempoolService}, transaction_validators::TransactionValidationError, validator::Validator, }; @@ -43,11 +43,12 @@ const LOG_TARGET: &str = "tari::dan::validator_node::mempool"; pub fn spawn( num_preshards: NumPreshards, - gossip: Gossip, epoch_manager: EpochManagerHandle, transaction_validator: TValidator, state_store: SqliteStateStore, consensus_handle: ConsensusHandle, + networking: NetworkingHandle, + rx_gossip: mpsc::UnboundedReceiver<(PeerId, gossipsub::Message)>, #[cfg(feature = "metrics")] metrics_registry: &prometheus::Registry, ) -> (MempoolHandle, JoinHandle>) where @@ -62,11 +63,12 @@ where let mempool = MempoolService::new( num_preshards, rx_mempool_request, - gossip, epoch_manager, transaction_validator, state_store, consensus_handle, + networking, + rx_gossip, #[cfg(feature = "metrics")] metrics, ); diff --git a/applications/tari_validator_node/src/p2p/services/mempool/mod.rs b/applications/tari_validator_node/src/p2p/services/mempool/mod.rs index 799ff4f8f..7f2b66e72 100644 --- a/applications/tari_validator_node/src/p2p/services/mempool/mod.rs +++ b/applications/tari_validator_node/src/p2p/services/mempool/mod.rs @@ -28,6 +28,7 @@ pub use initializer::spawn; mod error; mod gossip; +pub use gossip::TOPIC_PREFIX; #[cfg(feature = "metrics")] mod metrics; mod service; diff --git a/applications/tari_validator_node/src/p2p/services/mempool/service.rs b/applications/tari_validator_node/src/p2p/services/mempool/service.rs index 2485b360b..15d4fbb3c 100644 --- a/applications/tari_validator_node/src/p2p/services/mempool/service.rs +++ b/applications/tari_validator_node/src/p2p/services/mempool/service.rs @@ -22,12 +22,14 @@ use std::{collections::HashSet, fmt::Display, iter}; +use libp2p::{gossipsub, PeerId}; use log::*; use tari_dan_common_types::{optional::Optional, NumPreshards, PeerAddress, ShardGroup, ToSubstateAddress}; -use tari_dan_p2p::{DanMessage, NewTransactionMessage}; +use tari_dan_p2p::{DanMessage, NewTransactionMessage, TariMessagingSpec}; use tari_dan_storage::{consensus_models::TransactionRecord, StateStore}; use tari_engine_types::commit_result::RejectReason; use tari_epoch_manager::{base_layer::EpochManagerHandle, EpochManagerEvent, EpochManagerReader}; +use tari_networking::NetworkingHandle; use tari_state_store_sqlite::SqliteStateStore; use tari_transaction::{Transaction, TransactionId}; use tokio::sync::{mpsc, oneshot}; @@ -37,10 +39,7 @@ use super::metrics::PrometheusMempoolMetrics; use super::MempoolError; use crate::{ consensus::ConsensusHandle, - p2p::services::{ - mempool::{gossip::MempoolGossip, handle::MempoolRequest}, - messaging::Gossip, - }, + p2p::services::mempool::{gossip::MempoolGossip, handle::MempoolRequest}, transaction_validators::TransactionValidationError, validator::Validator, }; @@ -66,15 +65,16 @@ where TValidator: Validator, - gossip: Gossip, epoch_manager: EpochManagerHandle, before_execute_validator: TValidator, state_store: SqliteStateStore, consensus_handle: ConsensusHandle, + networking: NetworkingHandle, + rx_gossip: mpsc::UnboundedReceiver<(PeerId, gossipsub::Message)>, #[cfg(feature = "metrics")] metrics: PrometheusMempoolMetrics, ) -> Self { Self { - gossip: MempoolGossip::new(num_preshards, epoch_manager.clone(), gossip), + gossip: MempoolGossip::new(num_preshards, epoch_manager.clone(), networking, rx_gossip), transactions: Default::default(), mempool_requests, epoch_manager, diff --git a/applications/tari_validator_node/src/p2p/services/messaging/gossip.rs b/applications/tari_validator_node/src/p2p/services/messaging/gossip.rs deleted file mode 100644 index 3a24258e3..000000000 --- a/applications/tari_validator_node/src/p2p/services/messaging/gossip.rs +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright 2024 The Tari Project -// SPDX-License-Identifier: BSD-3-Clause - -use libp2p::PeerId; -use tari_dan_common_types::PeerAddress; -use tari_dan_p2p::{proto, DanMessage, TariMessagingSpec}; -use tari_networking::{MessageSpec, NetworkingError, NetworkingHandle, NetworkingService}; -use tokio::sync::mpsc; - -#[derive(Debug)] -pub struct Gossip { - networking: NetworkingHandle, - rx_gossip: mpsc::UnboundedReceiver<(PeerId, proto::network::DanMessage)>, -} - -impl Gossip { - pub fn new( - networking: NetworkingHandle, - rx_gossip: mpsc::UnboundedReceiver<(PeerId, proto::network::DanMessage)>, - ) -> Self { - Self { networking, rx_gossip } - } - - pub fn get_num_incoming_messages(&self) -> usize { - self.rx_gossip.len() - } - - pub async fn next_message(&mut self) -> Option> { - let (from, msg) = self.rx_gossip.recv().await?; - let len = self.rx_gossip.len(); - match msg.try_into() { - Ok(msg) => Some(Ok((from.into(), msg, len))), - Err(e) => Some(Err(e)), - } - } - - pub async fn subscribe_topic + Send>(&mut self, topic: T) -> Result<(), NetworkingError> { - self.networking.subscribe_topic(topic).await - } - - pub async fn unsubscribe_topic + Send>(&mut self, topic: T) -> Result<(), NetworkingError> { - self.networking.unsubscribe_topic(topic).await - } - - pub async fn publish_message + Send>( - &mut self, - topic: T, - message: ::GossipMessage, - ) -> Result<(), NetworkingError> { - self.networking.publish_gossip(topic, message).await - } -} diff --git a/applications/tari_validator_node/src/p2p/services/messaging/inbound.rs b/applications/tari_validator_node/src/p2p/services/messaging/inbound.rs index fb6775e2d..f8104b6a8 100644 --- a/applications/tari_validator_node/src/p2p/services/messaging/inbound.rs +++ b/applications/tari_validator_node/src/p2p/services/messaging/inbound.rs @@ -13,6 +13,7 @@ use crate::p2p::logging::MessageLogger; pub struct ConsensusInboundMessaging { local_address: PeerAddress, rx_inbound_msg: mpsc::UnboundedReceiver<(PeerId, proto::consensus::HotStuffMessage)>, + rx_gossip: mpsc::Receiver<(PeerId, proto::consensus::HotStuffMessage)>, rx_loopback: mpsc::UnboundedReceiver, msg_logger: TMsgLogger, } @@ -21,16 +22,35 @@ impl ConsensusInboundMessaging { pub fn new( local_address: PeerAddress, rx_inbound_msg: mpsc::UnboundedReceiver<(PeerId, proto::consensus::HotStuffMessage)>, + rx_gossip: mpsc::Receiver<(PeerId, proto::consensus::HotStuffMessage)>, rx_loopback: mpsc::UnboundedReceiver, msg_logger: TMsgLogger, ) -> Self { Self { local_address, rx_inbound_msg, + rx_gossip, rx_loopback, msg_logger, } } + + fn handle_message( + &self, + from: PeerId, + msg: proto::consensus::HotStuffMessage, + ) -> Option> { + match HotstuffMessage::try_from(msg) { + Ok(msg) => { + self.msg_logger + .log_inbound_message(&from.to_string(), msg.as_type_str(), "", &msg); + Some(Ok((from.into(), msg))) + }, + Err(err) => Some(Err(InboundMessagingError::InvalidMessage { + reason: err.to_string(), + })), + } + } } #[async_trait] @@ -41,9 +61,9 @@ impl tari_consensus::traits::InboundMessaging async fn next_message(&mut self) -> Option> { tokio::select! { - // BIASED: messaging priority is loopback, then other - biased; - maybe_msg = self.rx_loopback.recv() => maybe_msg.map(|msg| { + // BIASED: messaging priority is loopback, then other + biased; + maybe_msg = self.rx_loopback.recv() => maybe_msg.map(|msg| { self.msg_logger.log_inbound_message( &self.local_address.to_string(), msg.as_type_str(), @@ -52,22 +72,14 @@ impl tari_consensus::traits::InboundMessaging ); Ok((self.local_address, msg)) }), - maybe_msg = self.rx_inbound_msg.recv() => { + maybe_msg = self.rx_inbound_msg.recv() => { let (from, msg) = maybe_msg?; - match HotstuffMessage::try_from(msg) { - Ok(msg) => { - self.msg_logger.log_inbound_message( - &from.to_string(), - msg.as_type_str(), - "", - &msg, - ); - Some(Ok((from.into(), msg))) - } - Err(err) => return Some(Err(InboundMessagingError::InvalidMessage{ reason: err.to_string() } )), - } - - }, + self.handle_message(from, msg) + }, + maybe_msg = self.rx_gossip.recv() => { + let (from, msg) = maybe_msg?; + self.handle_message(from, msg) + }, } } } diff --git a/applications/tari_validator_node/src/p2p/services/messaging/mod.rs b/applications/tari_validator_node/src/p2p/services/messaging/mod.rs index fe3ec7e0f..3517b25e7 100644 --- a/applications/tari_validator_node/src/p2p/services/messaging/mod.rs +++ b/applications/tari_validator_node/src/p2p/services/messaging/mod.rs @@ -4,8 +4,5 @@ mod inbound; pub use inbound::*; -mod gossip; -pub use gossip::*; - mod outbound; pub use outbound::*; diff --git a/applications/tari_validator_node/src/p2p/services/messaging/outbound.rs b/applications/tari_validator_node/src/p2p/services/messaging/outbound.rs index 3cb36f3da..1251b0c9a 100644 --- a/applications/tari_validator_node/src/p2p/services/messaging/outbound.rs +++ b/applications/tari_validator_node/src/p2p/services/messaging/outbound.rs @@ -25,12 +25,12 @@ use async_trait::async_trait; use tari_consensus::{messages::HotstuffMessage, traits::OutboundMessagingError}; -use tari_dan_common_types::PeerAddress; +use tari_dan_common_types::{PeerAddress, ShardGroup}; use tari_dan_p2p::{proto, TariMessagingSpec}; use tari_networking::{NetworkingHandle, NetworkingService}; use tokio::sync::mpsc; -use crate::p2p::logging::MessageLogger; +use crate::p2p::{logging::MessageLogger, services::consensus_gossip::ConsensusGossipHandle}; const _LOG_TARGET: &str = "tari::dan::messages::outbound::validator_node"; @@ -38,6 +38,7 @@ const _LOG_TARGET: &str = "tari::dan::messages::outbound::validator_node"; pub struct ConsensusOutboundMessaging { our_node_addr: PeerAddress, loopback_sender: mpsc::UnboundedSender, + consensus_gossip: ConsensusGossipHandle, networking: NetworkingHandle, msg_logger: TMsgLogger, } @@ -45,12 +46,14 @@ pub struct ConsensusOutboundMessaging { impl ConsensusOutboundMessaging { pub fn new( loopback_sender: mpsc::UnboundedSender, + consensus_gossip: ConsensusGossipHandle, networking: NetworkingHandle, msg_logger: TMsgLogger, ) -> Self { Self { our_node_addr: (*networking.local_peer_id()).into(), loopback_sender, + consensus_gossip, networking, msg_logger, } @@ -101,37 +104,25 @@ impl tari_consensus::traits::OutboundMessaging Ok(()) } - async fn multicast<'a, I, T>(&mut self, committee: I, message: T) -> Result<(), OutboundMessagingError> + async fn multicast<'a, T>(&mut self, shard_group: ShardGroup, message: T) -> Result<(), OutboundMessagingError> where Self::Addr: 'a, - I: IntoIterator + Send, T: Into + Send, { let message = message.into(); - let (ours, theirs) = committee - .into_iter() - .partition::, _>(|x| **x == self.our_node_addr); - - if ours.is_empty() && theirs.is_empty() { - return Ok(()); - } - // send it once to ourselves - if !ours.is_empty() { + let local_shard_group = self + .consensus_gossip + .get_local_shard_group() + .await + .map_err(OutboundMessagingError::from_error)?; + if local_shard_group == Some(shard_group) { self.send_self(message.clone()).await?; } - for to in &theirs { - self.msg_logger - .log_outbound_message("multicast", &to.to_string(), message.as_type_str(), "", &message); - } - - self.networking - .send_multicast( - theirs.into_iter().map(|a| a.as_peer_id()).collect::>(), - proto::consensus::HotStuffMessage::from(&message), - ) + self.consensus_gossip + .multicast(shard_group, message) .await .map_err(OutboundMessagingError::from_error)?; diff --git a/applications/tari_validator_node/src/p2p/services/mod.rs b/applications/tari_validator_node/src/p2p/services/mod.rs index 85097ac35..f9768316d 100644 --- a/applications/tari_validator_node/src/p2p/services/mod.rs +++ b/applications/tari_validator_node/src/p2p/services/mod.rs @@ -20,5 +20,6 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +pub mod consensus_gossip; pub mod mempool; pub mod messaging; diff --git a/dan_layer/common_types/src/committee.rs b/dan_layer/common_types/src/committee.rs index b2fddc899..cf7399173 100644 --- a/dan_layer/common_types/src/committee.rs +++ b/dan_layer/common_types/src/committee.rs @@ -204,6 +204,10 @@ impl CommitteeInfo { (len - 1) / 3 } + pub fn num_shard_group_members(&self) -> u32 { + self.num_shard_group_members + } + pub fn num_preshards(&self) -> NumPreshards { self.num_shards } diff --git a/dan_layer/consensus/src/hotstuff/on_propose.rs b/dan_layer/consensus/src/hotstuff/on_propose.rs index a97b64d42..74e345d14 100644 --- a/dan_layer/consensus/src/hotstuff/on_propose.rs +++ b/dan_layer/consensus/src/hotstuff/on_propose.rs @@ -214,7 +214,7 @@ where TConsensusSpec: ConsensusSpec next_block.parent() ); - self.broadcast_local_proposal(next_block, foreign_proposals, local_committee) + self.broadcast_local_proposal(next_block, foreign_proposals, &local_committee_info) .await?; Ok(()) @@ -224,19 +224,18 @@ where TConsensusSpec: ConsensusSpec &mut self, next_block: Block, foreign_proposals: Vec, - local_committee: &Committee, + local_committee_info: &CommitteeInfo, ) -> Result<(), HotStuffError> { info!( target: LOG_TARGET, - "🌿 Broadcasting local proposal {} to {} local committees", + "🌿 Broadcasting local proposal {} to local committee", next_block, - local_committee.len(), ); // Broadcast to local and foreign committees self.outbound_messaging .multicast( - local_committee.iter().map(|(addr, _)| addr), + local_committee_info.shard_group(), HotstuffMessage::Proposal(ProposalMessage { block: next_block, foreign_proposals, diff --git a/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs b/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs index 376299f7b..51f8d0158 100644 --- a/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs +++ b/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs @@ -822,34 +822,29 @@ async fn broadcast_foreign_proposal_if_required( // substates we send to validators to only those that are applicable to the transactions that involve // them. - let mut addresses = HashSet::new(); // TODO(perf): fetch only applicable committee addresses - let mut committees = epoch_manager.get_committees(block.epoch()).await?; + for shard_group in non_local_shard_groups { - addresses.extend( - committees - .remove(&shard_group) - .into_iter() - .flat_map(|c| c.into_iter().map(|(addr, _)| addr)), + info!( + target: LOG_TARGET, + "🌐 FOREIGN PROPOSE: Broadcasting locked block {} with {} pledge(s) to shard group {}.", + &block, + &block_pledge.num_substates_pledged(), + shard_group, ); + + outbound_messaging + .multicast( + shard_group, + HotstuffMessage::ForeignProposal(ForeignProposalMessage { + block: block.clone(), + block_pledge: block_pledge.clone(), + justify_qc: justify_qc.clone(), + }), + ) + .await?; } - info!( - target: LOG_TARGET, - "🌐 FOREIGN PROPOSE: Broadcasting locked block {} with {} pledge(s) to {} foreign committees.", - block, - block_pledge.num_substates_pledged(), - addresses.len(), - ); - outbound_messaging - .multicast( - &addresses, - HotstuffMessage::ForeignProposal(ForeignProposalMessage { - block, - block_pledge, - justify_qc, - }), - ) - .await?; + Ok(()) } diff --git a/dan_layer/consensus/src/traits/messaging.rs b/dan_layer/consensus/src/traits/messaging.rs index 919b2fb13..f3fb1f056 100644 --- a/dan_layer/consensus/src/traits/messaging.rs +++ b/dan_layer/consensus/src/traits/messaging.rs @@ -21,7 +21,7 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use async_trait::async_trait; -use tari_dan_common_types::NodeAddressable; +use tari_dan_common_types::{NodeAddressable, ShardGroup}; use crate::messages::HotstuffMessage; @@ -37,10 +37,9 @@ pub trait OutboundMessaging { message: T, ) -> Result<(), OutboundMessagingError>; - async fn multicast<'a, I, T>(&mut self, committee: I, message: T) -> Result<(), OutboundMessagingError> + async fn multicast<'a, T>(&mut self, shard_group: ShardGroup, message: T) -> Result<(), OutboundMessagingError> where Self::Addr: 'a, - I: IntoIterator + Send, T: Into + Send; } diff --git a/dan_layer/consensus_tests/src/support/messaging_impls.rs b/dan_layer/consensus_tests/src/support/messaging_impls.rs index 751945840..be0cec58b 100644 --- a/dan_layer/consensus_tests/src/support/messaging_impls.rs +++ b/dan_layer/consensus_tests/src/support/messaging_impls.rs @@ -6,12 +6,16 @@ use tari_consensus::{ messages::HotstuffMessage, traits::{InboundMessaging, InboundMessagingError, OutboundMessaging, OutboundMessagingError}, }; +use tari_dan_common_types::ShardGroup; +use tari_epoch_manager::EpochManagerReader; use tokio::sync::mpsc; +use super::epoch_manager::TestEpochManager; use crate::support::TestAddress; #[derive(Debug, Clone)] pub struct TestOutboundMessaging { + epoch_manager: TestEpochManager, tx_leader: mpsc::Sender<(TestAddress, HotstuffMessage)>, tx_broadcast: mpsc::Sender<(Vec, HotstuffMessage)>, loopback_sender: mpsc::Sender, @@ -19,12 +23,14 @@ pub struct TestOutboundMessaging { impl TestOutboundMessaging { pub fn create( + epoch_manager: TestEpochManager, tx_leader: mpsc::Sender<(TestAddress, HotstuffMessage)>, tx_broadcast: mpsc::Sender<(Vec, HotstuffMessage)>, ) -> (Self, mpsc::Receiver) { let (loopback_sender, loopback_receiver) = mpsc::channel(100); ( Self { + epoch_manager, tx_leader, tx_broadcast, loopback_sender, @@ -60,18 +66,30 @@ impl OutboundMessaging for TestOutboundMessaging { }) } - async fn multicast<'a, I, T>(&mut self, committee: I, message: T) -> Result<(), OutboundMessagingError> + async fn multicast<'a, T>(&mut self, shard_group: ShardGroup, message: T) -> Result<(), OutboundMessagingError> where Self::Addr: 'a, - I: IntoIterator + Send, T: Into + Send, { - self.tx_broadcast - .send((committee.into_iter().cloned().collect(), message.into())) + let epoch = self + .epoch_manager + .current_epoch() .await - .map_err(|_| OutboundMessagingError::FailedToEnqueueMessage { + .map_err(|e| OutboundMessagingError::UpstreamError(e.into()))?; + let peers: Vec = self + .epoch_manager + .get_committees_by_shard_group(epoch, shard_group) + .await + .map_err(|e| OutboundMessagingError::UpstreamError(e.into()))? + .values() + .flat_map(|c| c.addresses().cloned()) + .collect(); + + self.tx_broadcast.send((peers, message.into())).await.map_err(|_| { + OutboundMessagingError::FailedToEnqueueMessage { reason: "broadcast channel closed".to_string(), - }) + } + }) } } diff --git a/dan_layer/consensus_tests/src/support/validator/builder.rs b/dan_layer/consensus_tests/src/support/validator/builder.rs index 638f0ea13..e46f6679b 100644 --- a/dan_layer/consensus_tests/src/support/validator/builder.rs +++ b/dan_layer/consensus_tests/src/support/validator/builder.rs @@ -117,7 +117,14 @@ impl ValidatorBuilder { let (tx_hs_message, rx_hs_message) = mpsc::channel(100); let (tx_leader, rx_leader) = mpsc::channel(100); - let (outbound_messaging, rx_loopback) = TestOutboundMessaging::create(tx_leader, tx_broadcast); + let epoch_manager = self.epoch_manager.as_ref().unwrap().clone_for( + self.address.clone(), + self.public_key.clone(), + self.shard_address, + ); + + let (outbound_messaging, rx_loopback) = + TestOutboundMessaging::create(epoch_manager.clone(), tx_leader, tx_broadcast); let inbound_messaging = TestInboundMessaging::new(self.address.clone(), rx_hs_message, rx_loopback); let store = SqliteStateStore::connect(&self.sql_url).unwrap(); @@ -125,12 +132,6 @@ impl ValidatorBuilder { let transaction_pool = TransactionPool::new(); let (tx_events, _) = broadcast::channel(100); - let epoch_manager = self.epoch_manager.as_ref().unwrap().clone_for( - self.address.clone(), - self.public_key.clone(), - self.shard_address, - ); - let transaction_executor = TestBlockTransactionProcessor::new(self.transaction_executions.clone()); let worker = HotstuffWorker::::new( diff --git a/dan_layer/p2p/src/message.rs b/dan_layer/p2p/src/message.rs index c006882a6..9ac19b148 100644 --- a/dan_layer/p2p/src/message.rs +++ b/dan_layer/p2p/src/message.rs @@ -3,6 +3,7 @@ use std::fmt::{Display, Formatter}; +use anyhow::bail; use serde::Serialize; use tari_consensus::messages::HotstuffMessage; use tari_transaction::Transaction; @@ -61,6 +62,30 @@ impl DanMessage { } } +impl TryFrom for DanMessage { + type Error = anyhow::Error; + + fn try_from(msg: Message) -> Result { + if let Message::Dan(msg) = msg { + Ok(msg) + } else { + bail!("Invalid variant") + } + } +} + +impl TryFrom for HotstuffMessage { + type Error = anyhow::Error; + + fn try_from(msg: Message) -> Result { + if let Message::Consensus(msg) = msg { + Ok(msg) + } else { + bail!("Invalid variant") + } + } +} + impl From for DanMessage { fn from(value: NewTransactionMessage) -> Self { Self::NewTransaction(Box::new(value)) diff --git a/dan_layer/p2p/src/message_spec.rs b/dan_layer/p2p/src/message_spec.rs index 73f94b26a..fcd1c131f 100644 --- a/dan_layer/p2p/src/message_spec.rs +++ b/dan_layer/p2p/src/message_spec.rs @@ -9,6 +9,7 @@ use crate::proto; pub struct TariMessagingSpec; impl MessageSpec for TariMessagingSpec { - type GossipMessage = proto::network::DanMessage; + type ConsensusGossipMessage = proto::consensus::HotStuffMessage; type Message = proto::consensus::HotStuffMessage; + type TransactionGossipMessage = proto::network::DanMessage; } diff --git a/networking/core/src/handle.rs b/networking/core/src/handle.rs index 6449ac274..fa5266978 100644 --- a/networking/core/src/handle.rs +++ b/networking/core/src/handle.rs @@ -69,7 +69,7 @@ pub enum NetworkingRequest { }, PublishGossip { topic: IdentTopic, - message: TMsg::GossipMessage, + message: Vec, reply_tx: oneshot::Sender>, }, SubscribeTopic { @@ -337,7 +337,7 @@ impl NetworkingService for NetworkingH async fn publish_gossip + Send>( &mut self, topic: TTopic, - message: TMsg::GossipMessage, + message: Vec, ) -> Result<(), NetworkingError> { let (tx, rx) = oneshot::channel(); self.tx_request diff --git a/networking/core/src/lib.rs b/networking/core/src/lib.rs index 01c14100a..9a738f233 100644 --- a/networking/core/src/lib.rs +++ b/networking/core/src/lib.rs @@ -61,7 +61,7 @@ pub trait NetworkingService { async fn publish_gossip + Send>( &mut self, topic: TTopic, - message: TMsg::GossipMessage, + message: Vec, ) -> Result<(), NetworkingError>; async fn subscribe_topic + Send>(&mut self, topic: T) -> Result<(), NetworkingError>; diff --git a/networking/core/src/message.rs b/networking/core/src/message.rs index 22076f549..2b07a44b1 100644 --- a/networking/core/src/message.rs +++ b/networking/core/src/message.rs @@ -5,5 +5,6 @@ use core::fmt; pub trait MessageSpec { type Message: fmt::Debug + Send; - type GossipMessage: fmt::Debug + Send; + type TransactionGossipMessage: fmt::Debug + Send; + type ConsensusGossipMessage: fmt::Debug + Send; } diff --git a/networking/core/src/spawn.rs b/networking/core/src/spawn.rs index 5340d9a0c..77b816698 100644 --- a/networking/core/src/spawn.rs +++ b/networking/core/src/spawn.rs @@ -1,10 +1,10 @@ // Copyright 2023 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use anyhow::anyhow; -use libp2p::{identity::Keypair, Multiaddr, PeerId}; +use libp2p::{gossipsub, identity::Keypair, Multiaddr, PeerId}; use tari_shutdown::ShutdownSignal; use tari_swarm::{is_supported_multiaddr, messaging, messaging::prost::ProstCodec}; use tokio::{ @@ -14,6 +14,16 @@ use tokio::{ use crate::{message::MessageSpec, worker::NetworkingWorker, NetworkingHandle}; +pub const TOPIC_DELIMITER: &str = "-"; + +#[derive(Debug, thiserror::Error)] +pub enum GossipSendError { + #[error("Invalid token topic: {0}")] + InvalidToken(String), + #[error("Send gossip error: {0}")] + SendError(#[from] mpsc::error::SendError<(PeerId, gossipsub::Message)>), +} + pub fn spawn( identity: Keypair, messaging_mode: MessagingMode, @@ -24,7 +34,8 @@ pub fn spawn( where TMsg: MessageSpec + 'static, TMsg::Message: messaging::prost::Message + Default + Clone + 'static, - TMsg::GossipMessage: messaging::prost::Message + Default + Clone + 'static, + TMsg::TransactionGossipMessage: messaging::prost::Message + Default + Clone + 'static, + TMsg::ConsensusGossipMessage: messaging::prost::Message + Default + Clone + 'static, TMsg: MessageSpec, { for (_, addr) in &seed_peers { @@ -59,7 +70,7 @@ where pub enum MessagingMode { Enabled { tx_messages: mpsc::UnboundedSender<(PeerId, TMsg::Message)>, - tx_gossip_messages: mpsc::UnboundedSender<(PeerId, TMsg::GossipMessage)>, + tx_gossip_messages_by_topic: HashMap>, }, Disabled, } @@ -82,12 +93,20 @@ impl MessagingMode { Ok(()) } - pub fn send_gossip_message( - &self, - peer_id: PeerId, - msg: TMsg::GossipMessage, - ) -> Result<(), mpsc::error::SendError<(PeerId, TMsg::GossipMessage)>> { - if let MessagingMode::Enabled { tx_gossip_messages, .. } = self { + pub fn send_gossip_message(&self, peer_id: PeerId, msg: gossipsub::Message) -> Result<(), GossipSendError> { + if let MessagingMode::Enabled { + tx_gossip_messages_by_topic, + .. + } = self + { + let (prefix, _) = msg + .topic + .as_str() + .split_once(TOPIC_DELIMITER) + .ok_or(GossipSendError::InvalidToken(msg.topic.clone().into_string()))?; + let tx_gossip_messages = tx_gossip_messages_by_topic + .get(prefix) + .ok_or(GossipSendError::InvalidToken(msg.topic.clone().into_string()))?; tx_gossip_messages.send((peer_id, msg))?; } Ok(()) diff --git a/networking/core/src/worker.rs b/networking/core/src/worker.rs index bb806fd7e..fe569af0c 100644 --- a/networking/core/src/worker.rs +++ b/networking/core/src/worker.rs @@ -38,7 +38,7 @@ use tari_shutdown::ShutdownSignal; use tari_swarm::{ is_supported_multiaddr, messaging, - messaging::{prost, prost::ProstCodec, Codec}, + messaging::{prost, prost::ProstCodec}, peersync, substream, substream::{NegotiatedSubstream, ProtocolNotification, StreamId}, @@ -72,7 +72,8 @@ pub struct NetworkingWorker where TMsg: MessageSpec, TMsg::Message: prost::Message + Default + Clone + 'static, - TMsg::GossipMessage: prost::Message + Default + Clone + 'static, + TMsg::TransactionGossipMessage: prost::Message + Default + Clone + 'static, + TMsg::ConsensusGossipMessage: prost::Message + Default + Clone + 'static, { _keypair: identity::Keypair, rx_request: mpsc::Receiver>, @@ -81,7 +82,6 @@ where active_connections: HashMap>, pending_substream_requests: HashMap>>, pending_dial_requests: HashMap>>, - gossip_message_codec: ProstCodec, substream_notifiers: Notifiers, swarm: TariSwarm>, config: crate::Config, @@ -95,7 +95,8 @@ impl NetworkingWorker where TMsg: MessageSpec, TMsg::Message: prost::Message + Default + Clone + 'static, - TMsg::GossipMessage: prost::Message + Default + Clone + 'static, + TMsg::TransactionGossipMessage: prost::Message + Default + Clone + 'static, + TMsg::ConsensusGossipMessage: prost::Message + Default + Clone + 'static, { pub(crate) fn new( keypair: identity::Keypair, @@ -116,7 +117,6 @@ where active_connections: HashMap::new(), pending_substream_requests: HashMap::new(), pending_dial_requests: HashMap::new(), - gossip_message_codec: ProstCodec::default(), relays: RelayState::new(known_relay_nodes), swarm, config, @@ -264,22 +264,15 @@ where topic, message, reply_tx, - } => { - let mut buf = Vec::with_capacity(1024); - self.gossip_message_codec - .encode_to(&mut buf, message) - .await - .map_err(NetworkingError::CodecError)?; - match self.swarm.behaviour_mut().gossipsub.publish(topic, buf) { - Ok(msg_id) => { - debug!(target: LOG_TARGET, "📢 Published gossipsub message: {}", msg_id); - let _ignore = reply_tx.send(Ok(())); - }, - Err(err) => { - debug!(target: LOG_TARGET, "🚨 Failed to publish gossipsub message: {}", err); - let _ignore = reply_tx.send(Err(err.into())); - }, - } + } => match self.swarm.behaviour_mut().gossipsub.publish(topic, message) { + Ok(msg_id) => { + debug!(target: LOG_TARGET, "📢 Published gossipsub message: {}", msg_id); + let _ignore = reply_tx.send(Ok(())); + }, + Err(err) => { + debug!(target: LOG_TARGET, "🚨 Failed to publish gossipsub message: {}", err); + let _ignore = reply_tx.send(Err(err.into())); + }, }, NetworkingRequest::SubscribeTopic { topic, reply_tx } => { match self.swarm.behaviour_mut().gossipsub.subscribe(&topic) { @@ -636,28 +629,21 @@ where }, } } else { - match self - .gossip_message_codec - .decode_from(&mut message.data.as_slice()) - .await - { - Ok((length, msg)) => { - info!(target: LOG_TARGET, "📢 Rx Gossipsub: {length} bytes from {source}"); - let _ignore = self.messaging_mode.send_gossip_message(source, msg); + match self.messaging_mode.send_gossip_message(source, message) { + Ok(_) => { self.swarm.behaviour_mut().gossipsub.report_message_validation_result( &message_id, &propagation_source, gossipsub::MessageAcceptance::Accept, )?; }, - Err(err) => { - warn!(target: LOG_TARGET, "📢 Gossipsub message failed to decode: {}", err); + Err(e) => { + warn!(target: LOG_TARGET, "📢 Gossipsub message failed to be handled: {}", e); self.swarm.behaviour_mut().gossipsub.report_message_validation_result( &message_id, &propagation_source, gossipsub::MessageAcceptance::Reject, )?; - return Err(NetworkingError::CodecError(err)); }, } } diff --git a/networking/libp2p-messaging/src/codec/mod.rs b/networking/libp2p-messaging/src/codec/mod.rs index 0be9637f1..e54d404f7 100644 --- a/networking/libp2p-messaging/src/codec/mod.rs +++ b/networking/libp2p-messaging/src/codec/mod.rs @@ -18,11 +18,11 @@ pub trait Codec: Default { /// Reads a message from the given I/O stream according to the /// negotiated protocol. - async fn decode_from(&mut self, reader: &mut R) -> io::Result<(usize, Self::Message)> + async fn decode_from(&self, reader: &mut R) -> io::Result<(usize, Self::Message)> where R: AsyncRead + Unpin + Send; /// Writes a request to the given I/O stream according to the /// negotiated protocol. - async fn encode_to(&mut self, writer: &mut W, message: Self::Message) -> io::Result<()> + async fn encode_to(&self, writer: &mut W, message: Self::Message) -> io::Result<()> where W: AsyncWrite + Unpin + Send; } diff --git a/networking/libp2p-messaging/src/codec/prost.rs b/networking/libp2p-messaging/src/codec/prost.rs index 6b0af47e1..65ab9fd04 100644 --- a/networking/libp2p-messaging/src/codec/prost.rs +++ b/networking/libp2p-messaging/src/codec/prost.rs @@ -26,7 +26,7 @@ where TMsg: prost::Message + Default { type Message = TMsg; - async fn decode_from(&mut self, reader: &mut R) -> std::io::Result<(usize, Self::Message)> + async fn decode_from(&self, reader: &mut R) -> std::io::Result<(usize, Self::Message)> where R: AsyncRead + Unpin + Send { let mut len_buf = [0u8; 4]; reader.read_exact(&mut len_buf).await?; @@ -49,7 +49,7 @@ where TMsg: prost::Message + Default Ok((len, message)) } - async fn encode_to(&mut self, writer: &mut W, message: Self::Message) -> std::io::Result<()> + async fn encode_to(&self, writer: &mut W, message: Self::Message) -> std::io::Result<()> where W: AsyncWrite + Unpin + Send { let mut buf = Vec::new(); message diff --git a/networking/libp2p-messaging/src/handler.rs b/networking/libp2p-messaging/src/handler.rs index d29ebb9ee..39ae455c3 100644 --- a/networking/libp2p-messaging/src/handler.rs +++ b/networking/libp2p-messaging/src/handler.rs @@ -120,7 +120,7 @@ where TCodec: Codec + Send + Clone + 'static } fn on_fully_negotiated_outbound(&mut self, outbound: FullyNegotiatedOutbound, ()>) { - let mut codec = self.codec.clone(); + let codec = self.codec.clone(); let (mut peer_stream, _protocol) = outbound.protocol; let mut msg_stream = self @@ -167,7 +167,7 @@ where TCodec: Codec + Send + Clone + 'static } fn on_fully_negotiated_inbound(&mut self, inbound: FullyNegotiatedInbound, ()>) { - let mut codec = self.codec.clone(); + let codec = self.codec.clone(); let peer_id = self.peer_id; let (mut stream, _protocol) = inbound.protocol; let mut events = self.pending_events_sender.clone();