Skip to content

Commit

Permalink
fix: remove epoch manager calls from consensus gossipsub (#1190)
Browse files Browse the repository at this point in the history
Description
---
fix: remove epoch manager calls from consensus gossipsub

Motivation and Context
---
Several epoch manager calls were used for message broadcasts. However,
all of the information obtained from these calls is already available at
the consensus level. This PR removes the epoch manager dependency from
consensus gossip, which now only uses the epoch manager events receiver.

This PR also encodes and sends the gossip message directly in the handle,
instead of first routing the message through the consensus gossip service
and then to networking. The consensus gossip service is now only
responsible for ensuring that the node is subscribed to the correct shard
group topic.

Increased the gossipsub message size limit to 1 MiB to account for large
foreign proposal messages, which were rejected during testing. This is
temporary; improvements to the protocol should allow us to reduce the limit.

How Has This Been Tested?
---
Manually

What process can a PR reviewer use to test or verify this change?
---
This is a simplification PR; any performance improvements are slight and
almost certainly unobservable.

Breaking Changes
---

- [x] None
- [ ] Requires data directory to be deleted
- [ ] Other - Please specify
  • Loading branch information
sdbondi authored Oct 28, 2024
1 parent d1d0d88 commit f45c0c1
Show file tree
Hide file tree
Showing 24 changed files with 182 additions and 263 deletions.
7 changes: 1 addition & 6 deletions applications/tari_indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,7 @@ pub async fn run_indexer(config: ApplicationConfig, mut shutdown_signal: Shutdow
)
.await?;

let mut epoch_manager_events = services.epoch_manager.subscribe().await.map_err(|e| {
ExitError::new(
ExitCode::ConfigError,
format!("Epoch manager crashed on startup: {}", e),
)
})?;
let mut epoch_manager_events = services.epoch_manager.subscribe();

let substate_cache_dir = config.common.base_path.join("substate_cache");
let substate_cache = SubstateFileCache::new(substate_cache_dir)
Expand Down
88 changes: 59 additions & 29 deletions applications/tari_swarm_daemon/src/process_manager/manager.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
// Copyright 2024 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::{collections::HashMap, fs::File, path::PathBuf, str::FromStr, time::Duration};
use std::{
collections::{HashMap, HashSet},
fs::File,
path::PathBuf,
str::FromStr,
time::Duration,
};

use anyhow::{anyhow, Context};
use log::info;
Expand Down Expand Up @@ -70,43 +76,57 @@ impl ProcessManager {
sleep(Duration::from_secs(self.instance_manager.num_instances() as u64)).await;
self.check_instances_running()?;

if !self.skip_registration {
let num_vns = self.instance_manager.num_validator_nodes();
// Mine some initial funds, guessing 10 blocks to allow for coinbase maturity
self.mine(num_vns + 10).await.context("mining failed")?;
self.wait_for_wallet_funds(num_vns)
.await
.context("waiting for wallet funds")?;

self.register_all_validator_nodes()
.await
.context("registering validator node via GRPC")?;
}

let mut templates_to_register = vec![];
if !self.disable_template_auto_register {
let registered_templates = self.registered_templates().await?;
let registered_template_names: Vec<String> = registered_templates
let registered_template_names = registered_templates
.iter()
.map(|template_data| format!("{}-{}", template_data.name, template_data.version))
.collect();
.map(|template_data| template_data.name.as_str())
.collect::<HashSet<_>>();
let fs_templates = self.file_system_templates().await?;
for template_data in fs_templates.iter().filter(|fs_template_data| {
!registered_template_names.contains(&format!("{}-{}", fs_template_data.name, fs_template_data.version))
}) {
for template_data in fs_templates
.iter()
.filter(|fs_template_data| !registered_template_names.contains(fs_template_data.name.as_str()))
{
info!(
"🟡 Register missing template from local file system: {}",
template_data.name
);
self.register_template(TemplateData {
templates_to_register.push(TemplateData {
name: template_data.name.clone(),
version: template_data.version,
contents_hash: template_data.contents_hash,
contents_url: template_data.contents_url.clone(),
})
.await?;
});
}
}

let num_vns = if self.skip_registration {
0
} else {
self.instance_manager.num_validator_nodes()
};
let num_blocks = num_vns + u64::try_from(templates_to_register.len()).unwrap();

// Mine some initial funds, guessing 10 blocks extra to allow for coinbase maturity
self.mine(num_blocks + 10).await.context("initial mining failed")?;
self.wait_for_wallet_funds(num_blocks)
.await
.context("waiting for wallet funds")?;

if !self.skip_registration {
self.register_all_validator_nodes()
.await
.context("registering validator node via GRPC")?;
}
for templates in templates_to_register {
self.register_template(templates).await?;
}

if num_blocks > 0 {
self.mine(20).await?;
}

Ok(())
}

Expand Down Expand Up @@ -310,9 +330,15 @@ impl ProcessManager {
}
},
RegisterTemplate { data, reply } => {
let result = self.register_template(data).await;
if reply.send(result).is_err() {
log::warn!("Request cancelled before response could be sent")
if let Err(err) = self.register_template(data).await {
if reply.send(Err(err)).is_err() {
log::warn!("Request cancelled before response could be sent")
}
} else {
let result = self.mine(10).await;
if reply.send(result).is_err() {
log::warn!("Request cancelled before response could be sent")
}
}
},
RegisterValidatorNode { instance_id, reply } => {
Expand Down Expand Up @@ -421,7 +447,6 @@ impl ProcessManager {
// inputs for a transaction.
sleep(Duration::from_secs(2)).await;
}
self.mine(20).await?;
Ok(())
}

Expand Down Expand Up @@ -462,6 +487,9 @@ impl ProcessManager {
}

async fn mine(&mut self, blocks: u64) -> anyhow::Result<()> {
if blocks == 0 {
return Ok(());
}
let executable = self
.executable_manager
.get_executable(InstanceType::MinoTariMiner)
Expand Down Expand Up @@ -510,13 +538,15 @@ impl ProcessManager {
.await?
.into_inner();
let template_address = TemplateAddress::try_from_vec(resp.template_address).unwrap();
info!("🟢 Registered template {template_address}. Mining some blocks");
self.mine(10).await?;
info!("🟢 Registered template {template_address}.");

Ok(())
}

async fn wait_for_wallet_funds(&mut self, min_expected_blocks: u64) -> anyhow::Result<()> {
if min_expected_blocks == 0 {
return Ok(());
}
// WARN: Assumes one wallet
let wallet = self.instance_manager.minotari_wallets().next().ok_or_else(|| {
anyhow!("No MinoTariConsoleWallet instances found. Please start a wallet before waiting for funds")
Expand Down
12 changes: 9 additions & 3 deletions applications/tari_validator_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,10 @@ use tari_engine_types::{
substate::{SubstateId, SubstateValue},
vault::Vault,
};
use tari_epoch_manager::base_layer::{EpochManagerConfig, EpochManagerHandle};
use tari_epoch_manager::{
base_layer::{EpochManagerConfig, EpochManagerHandle},
EpochManagerReader,
};
use tari_indexer_lib::substate_scanner::SubstateScanner;
use tari_networking::{MessagingMode, NetworkingHandle, RelayCircuitLimits, RelayReservationLimits, SwarmConfig};
use tari_rpc_framework::RpcServer;
Expand Down Expand Up @@ -250,8 +253,11 @@ pub async fn spawn_services(
};

// Consensus gossip
let (consensus_gossip_service, join_handle, rx_consensus_gossip_messages) =
consensus_gossip::spawn(epoch_manager.clone(), networking.clone(), rx_consensus_gossip_messages);
let (consensus_gossip_service, join_handle, rx_consensus_gossip_messages) = consensus_gossip::spawn(
epoch_manager.subscribe(),
networking.clone(),
rx_consensus_gossip_messages,
);
handles.push(join_handle);

// Messaging
Expand Down
2 changes: 1 addition & 1 deletion applications/tari_validator_node/src/dan_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl DanNode {

pub async fn start(mut self, mut shutdown: ShutdownSignal) -> Result<(), anyhow::Error> {
let mut hotstuff_events = self.services.consensus_handle.subscribe_to_hotstuff_events();
let mut epoch_manager_events = self.services.epoch_manager.subscribe().await?;
let mut epoch_manager_events = self.services.epoch_manager.subscribe();

// if let Err(err) = self.dial_local_shard_peers().await {
// error!(target: LOG_TARGET, "Failed to dial local shard peers: {}", err);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@

use tari_epoch_manager::EpochManagerError;
use tari_networking::NetworkingError;
use tokio::sync::{mpsc, oneshot};

use super::ConsensusGossipRequest;
use tokio::sync::oneshot;

#[derive(thiserror::Error, Debug)]
pub enum ConsensusGossipError {
Expand All @@ -38,12 +36,6 @@ pub enum ConsensusGossipError {
NetworkingError(#[from] NetworkingError),
}

impl From<mpsc::error::SendError<ConsensusGossipRequest>> for ConsensusGossipError {
fn from(_: mpsc::error::SendError<ConsensusGossipRequest>) -> Self {
Self::RequestCancelled
}
}

impl From<oneshot::error::RecvError> for ConsensusGossipError {
fn from(_: oneshot::error::RecvError) -> Self {
Self::RequestCancelled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,64 +20,56 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use log::*;
use tari_consensus::messages::HotstuffMessage;
use tari_dan_common_types::ShardGroup;
use tokio::sync::{mpsc, oneshot};
use tari_dan_p2p::{proto, TariMessagingSpec};
use tari_networking::{NetworkingHandle, NetworkingService};
use tari_swarm::messaging::{
prost::{Message, ProstCodec},
Codec,
};

use super::ConsensusGossipError;
use crate::p2p::services::consensus_gossip::service::shard_group_to_topic;

pub enum ConsensusGossipRequest {
Multicast {
shard_group: ShardGroup,
message: HotstuffMessage,
reply: oneshot::Sender<Result<(), ConsensusGossipError>>,
},
GetLocalShardGroup {
reply: oneshot::Sender<Result<Option<ShardGroup>, ConsensusGossipError>>,
},
}
const LOG_TARGET: &str = "tari::validator_node::consensus_gossip";

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ConsensusGossipHandle {
tx_consensus_request: mpsc::Sender<ConsensusGossipRequest>,
}

impl Clone for ConsensusGossipHandle {
fn clone(&self) -> Self {
ConsensusGossipHandle {
tx_consensus_request: self.tx_consensus_request.clone(),
}
}
networking: NetworkingHandle<TariMessagingSpec>,
codec: ProstCodec<proto::consensus::HotStuffMessage>,
}

impl ConsensusGossipHandle {
pub(super) fn new(tx_consensus_request: mpsc::Sender<ConsensusGossipRequest>) -> Self {
Self { tx_consensus_request }
pub(super) fn new(networking: NetworkingHandle<TariMessagingSpec>) -> Self {
Self {
networking,
codec: ProstCodec::default(),
}
}

pub async fn multicast(
&self,
&mut self,
shard_group: ShardGroup,
message: HotstuffMessage,
) -> Result<(), ConsensusGossipError> {
let (tx, rx) = oneshot::channel();
self.tx_consensus_request
.send(ConsensusGossipRequest::Multicast {
shard_group,
message,
reply: tx,
})
.await?;
let topic = shard_group_to_topic(shard_group);

rx.await?
}
let message = proto::consensus::HotStuffMessage::from(&message);
let mut buf = Vec::with_capacity(message.encoded_len());

debug!(
target: LOG_TARGET,
"multicast: topic: {} Message size: {}bytes", topic, buf.len()
);
self.codec
.encode_to(&mut buf, message)
.await
.map_err(|e| ConsensusGossipError::InvalidMessage(e.into()))?;

pub async fn get_local_shard_group(&self) -> Result<Option<ShardGroup>, ConsensusGossipError> {
let (tx, rx) = oneshot::channel();
self.tx_consensus_request
.send(ConsensusGossipRequest::GetLocalShardGroup { reply: tx })
.await?;
self.networking.publish_gossip(topic, buf).await?;

rx.await?
Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,36 +22,33 @@

use libp2p::{gossipsub, PeerId};
use log::*;
use tari_dan_common_types::PeerAddress;
use tari_dan_p2p::{proto, TariMessagingSpec};
use tari_epoch_manager::base_layer::EpochManagerHandle;
use tari_epoch_manager::EpochManagerEvent;
use tari_networking::NetworkingHandle;
use tokio::{sync::mpsc, task, task::JoinHandle};
use tokio::{
sync::{broadcast, mpsc},
task,
task::JoinHandle,
};

use crate::p2p::services::consensus_gossip::{service::ConsensusGossipService, ConsensusGossipHandle};

const LOG_TARGET: &str = "tari::dan::validator_node::mempool";
const LOG_TARGET: &str = "tari::validator_node::consensus_gossip::initializer";

pub fn spawn(
epoch_manager: EpochManagerHandle<PeerAddress>,
epoch_manager_events: broadcast::Receiver<EpochManagerEvent>,
networking: NetworkingHandle<TariMessagingSpec>,
rx_gossip: mpsc::UnboundedReceiver<(PeerId, gossipsub::Message)>,
) -> (
ConsensusGossipHandle,
JoinHandle<anyhow::Result<()>>,
mpsc::Receiver<(PeerId, proto::consensus::HotStuffMessage)>,
) {
let (tx_consensus_request, rx_consensus_request) = mpsc::channel(10);
let (tx_consensus_gossip, rx_consensus_gossip) = mpsc::channel(10);

let consensus_gossip = ConsensusGossipService::new(
rx_consensus_request,
epoch_manager,
networking,
rx_gossip,
tx_consensus_gossip,
);
let handle = ConsensusGossipHandle::new(tx_consensus_request);
let consensus_gossip =
ConsensusGossipService::new(epoch_manager_events, networking.clone(), rx_gossip, tx_consensus_gossip);
let handle = ConsensusGossipHandle::new(networking);

let join_handle = task::spawn(consensus_gossip.run());
debug!(target: LOG_TARGET, "Spawning consensus gossip service (task: {:?})", join_handle);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ mod error;
pub use error::*;

mod handle;
pub use handle::{ConsensusGossipHandle, ConsensusGossipRequest};
pub use handle::ConsensusGossipHandle;

mod initializer;
pub use initializer::spawn;
Expand Down
Loading

0 comments on commit f45c0c1

Please sign in to comment.