Skip to content

Commit

Permalink
fix: remove epoch manager calls from consensus gossipsub
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Oct 28, 2024
1 parent d1d0d88 commit 1701a01
Show file tree
Hide file tree
Showing 24 changed files with 181 additions and 263 deletions.
7 changes: 1 addition & 6 deletions applications/tari_indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,7 @@ pub async fn run_indexer(config: ApplicationConfig, mut shutdown_signal: Shutdow
)
.await?;

let mut epoch_manager_events = services.epoch_manager.subscribe().await.map_err(|e| {
ExitError::new(
ExitCode::ConfigError,
format!("Epoch manager crashed on startup: {}", e),
)
})?;
let mut epoch_manager_events = services.epoch_manager.subscribe();

let substate_cache_dir = config.common.base_path.join("substate_cache");
let substate_cache = SubstateFileCache::new(substate_cache_dir)
Expand Down
88 changes: 59 additions & 29 deletions applications/tari_swarm_daemon/src/process_manager/manager.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
// Copyright 2024 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::{collections::HashMap, fs::File, path::PathBuf, str::FromStr, time::Duration};
use std::{
collections::{HashMap, HashSet},
fs::File,
path::PathBuf,
str::FromStr,
time::Duration,
};

use anyhow::{anyhow, Context};
use log::info;
Expand Down Expand Up @@ -70,43 +76,57 @@ impl ProcessManager {
sleep(Duration::from_secs(self.instance_manager.num_instances() as u64)).await;
self.check_instances_running()?;

if !self.skip_registration {
let num_vns = self.instance_manager.num_validator_nodes();
// Mine some initial funds, guessing 10 blocks to allow for coinbase maturity
self.mine(num_vns + 10).await.context("mining failed")?;
self.wait_for_wallet_funds(num_vns)
.await
.context("waiting for wallet funds")?;

self.register_all_validator_nodes()
.await
.context("registering validator node via GRPC")?;
}

let mut templates_to_register = vec![];
if !self.disable_template_auto_register {
let registered_templates = self.registered_templates().await?;
let registered_template_names: Vec<String> = registered_templates
let registered_template_names = registered_templates
.iter()
.map(|template_data| format!("{}-{}", template_data.name, template_data.version))
.collect();
.map(|template_data| template_data.name.as_str())
.collect::<HashSet<_>>();
let fs_templates = self.file_system_templates().await?;
for template_data in fs_templates.iter().filter(|fs_template_data| {
!registered_template_names.contains(&format!("{}-{}", fs_template_data.name, fs_template_data.version))
}) {
for template_data in fs_templates
.iter()
.filter(|fs_template_data| !registered_template_names.contains(fs_template_data.name.as_str()))
{
info!(
"🟡 Register missing template from local file system: {}",
template_data.name
);
self.register_template(TemplateData {
templates_to_register.push(TemplateData {
name: template_data.name.clone(),
version: template_data.version,
contents_hash: template_data.contents_hash,
contents_url: template_data.contents_url.clone(),
})
.await?;
});
}
}

let num_vns = if self.skip_registration {
0
} else {
self.instance_manager.num_validator_nodes()
};
let num_blocks = num_vns + u64::try_from(templates_to_register.len()).unwrap();

// Mine some initial funds, guessing 10 blocks extra to allow for coinbase maturity
self.mine(num_blocks + 10).await.context("initial mining failed")?;
self.wait_for_wallet_funds(num_blocks)
.await
.context("waiting for wallet funds")?;

if !self.skip_registration {
self.register_all_validator_nodes()
.await
.context("registering validator node via GRPC")?;
}
for templates in templates_to_register {
self.register_template(templates).await?;
}

if num_blocks > 0 {
self.mine(20).await?;
}

Ok(())
}

Expand Down Expand Up @@ -310,9 +330,15 @@ impl ProcessManager {
}
},
RegisterTemplate { data, reply } => {
let result = self.register_template(data).await;
if reply.send(result).is_err() {
log::warn!("Request cancelled before response could be sent")
if let Err(err) = self.register_template(data).await {
if reply.send(Err(err)).is_err() {
log::warn!("Request cancelled before response could be sent")
}
} else {
let result = self.mine(10).await;
if reply.send(result).is_err() {
log::warn!("Request cancelled before response could be sent")
}
}
},
RegisterValidatorNode { instance_id, reply } => {
Expand Down Expand Up @@ -421,7 +447,6 @@ impl ProcessManager {
// inputs for a transaction.
sleep(Duration::from_secs(2)).await;
}
self.mine(20).await?;
Ok(())
}

Expand Down Expand Up @@ -462,6 +487,9 @@ impl ProcessManager {
}

async fn mine(&mut self, blocks: u64) -> anyhow::Result<()> {
if blocks == 0 {
return Ok(());
}
let executable = self
.executable_manager
.get_executable(InstanceType::MinoTariMiner)
Expand Down Expand Up @@ -510,13 +538,15 @@ impl ProcessManager {
.await?
.into_inner();
let template_address = TemplateAddress::try_from_vec(resp.template_address).unwrap();
info!("🟢 Registered template {template_address}. Mining some blocks");
self.mine(10).await?;
info!("🟢 Registered template {template_address}.");

Ok(())
}

async fn wait_for_wallet_funds(&mut self, min_expected_blocks: u64) -> anyhow::Result<()> {
if min_expected_blocks == 0 {
return Ok(());
}
// WARN: Assumes one wallet
let wallet = self.instance_manager.minotari_wallets().next().ok_or_else(|| {
anyhow!("No MinoTariConsoleWallet instances found. Please start a wallet before waiting for funds")
Expand Down
12 changes: 9 additions & 3 deletions applications/tari_validator_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,10 @@ use tari_engine_types::{
substate::{SubstateId, SubstateValue},
vault::Vault,
};
use tari_epoch_manager::base_layer::{EpochManagerConfig, EpochManagerHandle};
use tari_epoch_manager::{
base_layer::{EpochManagerConfig, EpochManagerHandle},
EpochManagerReader,
};
use tari_indexer_lib::substate_scanner::SubstateScanner;
use tari_networking::{MessagingMode, NetworkingHandle, RelayCircuitLimits, RelayReservationLimits, SwarmConfig};
use tari_rpc_framework::RpcServer;
Expand Down Expand Up @@ -250,8 +253,11 @@ pub async fn spawn_services(
};

// Consensus gossip
let (consensus_gossip_service, join_handle, rx_consensus_gossip_messages) =
consensus_gossip::spawn(epoch_manager.clone(), networking.clone(), rx_consensus_gossip_messages);
let (consensus_gossip_service, join_handle, rx_consensus_gossip_messages) = consensus_gossip::spawn(
epoch_manager.subscribe(),
networking.clone(),
rx_consensus_gossip_messages,
);
handles.push(join_handle);

// Messaging
Expand Down
2 changes: 1 addition & 1 deletion applications/tari_validator_node/src/dan_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl DanNode {

pub async fn start(mut self, mut shutdown: ShutdownSignal) -> Result<(), anyhow::Error> {
let mut hotstuff_events = self.services.consensus_handle.subscribe_to_hotstuff_events();
let mut epoch_manager_events = self.services.epoch_manager.subscribe().await?;
let mut epoch_manager_events = self.services.epoch_manager.subscribe();

// if let Err(err) = self.dial_local_shard_peers().await {
// error!(target: LOG_TARGET, "Failed to dial local shard peers: {}", err);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@

use tari_epoch_manager::EpochManagerError;
use tari_networking::NetworkingError;
use tokio::sync::{mpsc, oneshot};

use super::ConsensusGossipRequest;
use tokio::sync::oneshot;

#[derive(thiserror::Error, Debug)]
pub enum ConsensusGossipError {
Expand All @@ -38,12 +36,6 @@ pub enum ConsensusGossipError {
NetworkingError(#[from] NetworkingError),
}

impl From<mpsc::error::SendError<ConsensusGossipRequest>> for ConsensusGossipError {
fn from(_: mpsc::error::SendError<ConsensusGossipRequest>) -> Self {
Self::RequestCancelled
}
}

impl From<oneshot::error::RecvError> for ConsensusGossipError {
fn from(_: oneshot::error::RecvError) -> Self {
Self::RequestCancelled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,64 +20,56 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use log::*;
use tari_consensus::messages::HotstuffMessage;
use tari_dan_common_types::ShardGroup;
use tokio::sync::{mpsc, oneshot};
use tari_dan_p2p::{proto, TariMessagingSpec};
use tari_networking::{NetworkingHandle, NetworkingService};
use tari_swarm::messaging::{
prost::{Message, ProstCodec},
Codec,
};

use super::ConsensusGossipError;
use crate::p2p::services::consensus_gossip::service::shard_group_to_topic;

pub enum ConsensusGossipRequest {
Multicast {
shard_group: ShardGroup,
message: HotstuffMessage,
reply: oneshot::Sender<Result<(), ConsensusGossipError>>,
},
GetLocalShardGroup {
reply: oneshot::Sender<Result<Option<ShardGroup>, ConsensusGossipError>>,
},
}
const LOG_TARGET: &str = "tari::validator_node::consensus_gossip";

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ConsensusGossipHandle {
tx_consensus_request: mpsc::Sender<ConsensusGossipRequest>,
}

impl Clone for ConsensusGossipHandle {
fn clone(&self) -> Self {
ConsensusGossipHandle {
tx_consensus_request: self.tx_consensus_request.clone(),
}
}
networking: NetworkingHandle<TariMessagingSpec>,
codec: ProstCodec<proto::consensus::HotStuffMessage>,
}

impl ConsensusGossipHandle {
pub(super) fn new(tx_consensus_request: mpsc::Sender<ConsensusGossipRequest>) -> Self {
Self { tx_consensus_request }
pub(super) fn new(networking: NetworkingHandle<TariMessagingSpec>) -> Self {
Self {
networking,
codec: ProstCodec::default(),
}
}

pub async fn multicast(
&self,
&mut self,
shard_group: ShardGroup,
message: HotstuffMessage,
) -> Result<(), ConsensusGossipError> {
let (tx, rx) = oneshot::channel();
self.tx_consensus_request
.send(ConsensusGossipRequest::Multicast {
shard_group,
message,
reply: tx,
})
.await?;
let topic = shard_group_to_topic(shard_group);

rx.await?
}
// Convert the domain-level HotStuff message into its protobuf representation.
let message = proto::consensus::HotStuffMessage::from(&message);
// Reserve capacity up front using the encoded length reported by the message.
let mut buf = Vec::with_capacity(message.encoded_len());

// Encode BEFORE logging: `buf` stays empty until `encode_to` writes into it, so
// logging `buf.len()` any earlier would always report a message size of 0.
self.codec
    .encode_to(&mut buf, message)
    .await
    .map_err(|e| ConsensusGossipError::InvalidMessage(e.into()))?;
debug!(
    target: LOG_TARGET,
    "multicast: topic: {} Message size: {}bytes", topic, buf.len()
);

pub async fn get_local_shard_group(&self) -> Result<Option<ShardGroup>, ConsensusGossipError> {
let (tx, rx) = oneshot::channel();
self.tx_consensus_request
.send(ConsensusGossipRequest::GetLocalShardGroup { reply: tx })
.await?;
self.networking.publish_gossip(topic, buf).await?;

rx.await?
Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,36 +22,33 @@

use libp2p::{gossipsub, PeerId};
use log::*;
use tari_dan_common_types::PeerAddress;
use tari_dan_p2p::{proto, TariMessagingSpec};
use tari_epoch_manager::base_layer::EpochManagerHandle;
use tari_epoch_manager::EpochManagerEvent;
use tari_networking::NetworkingHandle;
use tokio::{sync::mpsc, task, task::JoinHandle};
use tokio::{
sync::{broadcast, mpsc},
task,
task::JoinHandle,
};

use crate::p2p::services::consensus_gossip::{service::ConsensusGossipService, ConsensusGossipHandle};

const LOG_TARGET: &str = "tari::dan::validator_node::mempool";
const LOG_TARGET: &str = "tari::validator_node::consensus_gossip::initializer";

pub fn spawn(
epoch_manager: EpochManagerHandle<PeerAddress>,
epoch_manager_events: broadcast::Receiver<EpochManagerEvent>,
networking: NetworkingHandle<TariMessagingSpec>,
rx_gossip: mpsc::UnboundedReceiver<(PeerId, gossipsub::Message)>,
) -> (
ConsensusGossipHandle,
JoinHandle<anyhow::Result<()>>,
mpsc::Receiver<(PeerId, proto::consensus::HotStuffMessage)>,
) {
let (tx_consensus_request, rx_consensus_request) = mpsc::channel(10);
let (tx_consensus_gossip, rx_consensus_gossip) = mpsc::channel(10);

let consensus_gossip = ConsensusGossipService::new(
rx_consensus_request,
epoch_manager,
networking,
rx_gossip,
tx_consensus_gossip,
);
let handle = ConsensusGossipHandle::new(tx_consensus_request);
let consensus_gossip =
ConsensusGossipService::new(epoch_manager_events, networking.clone(), rx_gossip, tx_consensus_gossip);
let handle = ConsensusGossipHandle::new(networking);

let join_handle = task::spawn(consensus_gossip.run());
debug!(target: LOG_TARGET, "Spawning consensus gossip service (task: {:?})", join_handle);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ mod error;
pub use error::*;

mod handle;
pub use handle::{ConsensusGossipHandle, ConsensusGossipRequest};
pub use handle::ConsensusGossipHandle;

mod initializer;
pub use initializer::spawn;
Expand Down
Loading

0 comments on commit 1701a01

Please sign in to comment.