diff --git a/applications/tari_indexer/src/lib.rs b/applications/tari_indexer/src/lib.rs index 8bec9cb34..290994c01 100644 --- a/applications/tari_indexer/src/lib.rs +++ b/applications/tari_indexer/src/lib.rs @@ -224,13 +224,12 @@ pub async fn run_indexer(config: ApplicationConfig, mut shutdown_signal: Shutdow } async fn handle_epoch_manager_event(services: &Services, event: EpochManagerEvent) -> Result<(), anyhow::Error> { - if let EpochManagerEvent::EpochChanged(epoch) = event { - let all_vns = services.epoch_manager.get_all_validator_nodes(epoch).await?; - services - .networking - .set_want_peers(all_vns.into_iter().map(|vn| vn.address.as_peer_id())) - .await?; - } + let EpochManagerEvent::EpochChanged { epoch, .. } = event; + let all_vns = services.epoch_manager.get_all_validator_nodes(epoch).await?; + services + .networking + .set_want_peers(all_vns.into_iter().map(|vn| vn.address.as_peer_id())) + .await?; Ok(()) } diff --git a/applications/tari_validator_node/src/dan_node.rs b/applications/tari_validator_node/src/dan_node.rs index e57d7206c..f9b498945 100644 --- a/applications/tari_validator_node/src/dan_node.rs +++ b/applications/tari_validator_node/src/dan_node.rs @@ -75,13 +75,12 @@ impl DanNode { } async fn handle_epoch_manager_event(&mut self, event: EpochManagerEvent) -> Result<(), anyhow::Error> { - if let EpochManagerEvent::EpochChanged(epoch) = event { - let all_vns = self.services.epoch_manager.get_all_validator_nodes(epoch).await?; - self.services - .networking - .set_want_peers(all_vns.into_iter().map(|vn| vn.address.as_peer_id())) - .await?; - } + let EpochManagerEvent::EpochChanged { epoch, .. } = event; + let all_vns = self.services.epoch_manager.get_all_validator_nodes(epoch).await?; + self.services + .networking + .set_want_peers(all_vns.into_iter().map(|vn| vn.address.as_peer_id())) + .await?; Ok(()) } diff --git a/applications/tari_validator_node/src/p2p/services/consensus_gossip/service.rs b/applications/tari_validator_node/src/p2p/services/consensus_gossip/service.rs index fa492738a..874717cfe 100644 --- a/applications/tari_validator_node/src/p2p/services/consensus_gossip/service.rs +++ b/applications/tari_validator_node/src/p2p/services/consensus_gossip/service.rs @@ -71,7 +71,8 @@ impl ConsensusGossipService { } }, Ok(event) = self.epoch_manager_events.recv() => { - if let EpochManagerEvent::ThisValidatorIsRegistered{shard_group, ..} = event { + let EpochManagerEvent::EpochChanged{ registered_shard_group, ..} = event ; + if let Some(shard_group) = registered_shard_group{ self.subscribe(shard_group).await?; } }, diff --git a/applications/tari_validator_node/src/p2p/services/mempool/gossip.rs b/applications/tari_validator_node/src/p2p/services/mempool/gossip.rs index 1d3c2a1ba..d106611b0 100644 --- a/applications/tari_validator_node/src/p2p/services/mempool/gossip.rs +++ b/applications/tari_validator_node/src/p2p/services/mempool/gossip.rs @@ -87,10 +87,9 @@ impl MempoolGossip { } } - pub async fn subscribe(&mut self, epoch: Epoch) -> Result<(), MempoolError> { - let committee_shard = self.epoch_manager.get_local_committee_info(epoch).await?; + pub async fn subscribe(&mut self, shard_group: ShardGroup) -> Result<(), MempoolError> { match self.is_subscribed { - Some(b) if b == committee_shard.shard_group() => { + Some(b) if b == shard_group => { return Ok(()); }, Some(_) => { @@ -100,9 +99,9 @@ impl MempoolGossip { } self.networking - .subscribe_topic(shard_group_to_topic(committee_shard.shard_group())) + .subscribe_topic(shard_group_to_topic(shard_group)) .await?; - self.is_subscribed = Some(committee_shard.shard_group()); + self.is_subscribed = Some(shard_group); Ok(()) } diff --git a/applications/tari_validator_node/src/p2p/services/mempool/service.rs b/applications/tari_validator_node/src/p2p/services/mempool/service.rs index a8d5b821f..45a026c22 100644 --- a/applications/tari_validator_node/src/p2p/services/mempool/service.rs +++ b/applications/tari_validator_node/src/p2p/services/mempool/service.rs @@ -101,11 +101,10 @@ where TValidator: Validator { - if let EpochManagerEvent::EpochChanged(epoch) = event { - if self.epoch_manager.is_this_validator_registered_for_epoch(epoch).await?{ - info!(target: LOG_TARGET, "Mempool service subscribing transaction messages for epoch {}", epoch); - self.gossip.subscribe(epoch).await?; - } + let EpochManagerEvent::EpochChanged { epoch, registered_shard_group} = event; + if let Some(shard_group) = registered_shard_group { + info!(target: LOG_TARGET, "Mempool service subscribing transaction messages for {shard_group} in {epoch}"); + self.gossip.subscribe(shard_group).await?; } }, diff --git a/dan_layer/consensus/src/hotstuff/state_machine/idle.rs b/dan_layer/consensus/src/hotstuff/state_machine/idle.rs index 633e17666..c5577aab3 100644 --- a/dan_layer/consensus/src/hotstuff/state_machine/idle.rs +++ b/dan_layer/consensus/src/hotstuff/state_machine/idle.rs @@ -45,7 +45,7 @@ where TSpec: ConsensusSpec event = epoch_events.recv() => { match event { Ok(event) => { - if let Some(event) = self.on_epoch_event(context, event).await? { + if let Some(event) = self.on_epoch_event( event).await? { return Ok(event); } }, @@ -78,20 +78,18 @@ where TSpec: ConsensusSpec Ok(is_registered) } - async fn on_epoch_event( - &self, - context: &mut ConsensusWorkerContext, - event: EpochManagerEvent, - ) -> Result, HotStuffError> { + async fn on_epoch_event(&self, event: EpochManagerEvent) -> Result, HotStuffError> { match event { - EpochManagerEvent::EpochChanged(epoch) => { - if self.is_registered_for_epoch(context, epoch).await? { + EpochManagerEvent::EpochChanged { + epoch, + registered_shard_group, + } => { + if registered_shard_group.is_some() { Ok(Some(ConsensusStateEvent::RegisteredForEpoch { epoch })) } else { Ok(None) } }, - EpochManagerEvent::ThisValidatorIsRegistered { .. } => Ok(None), } } } diff --git a/dan_layer/consensus/src/hotstuff/worker.rs b/dan_layer/consensus/src/hotstuff/worker.rs index cd25d2a7d..e1ff6fa29 100644 --- a/dan_layer/consensus/src/hotstuff/worker.rs +++ b/dan_layer/consensus/src/hotstuff/worker.rs @@ -534,8 +534,11 @@ impl HotstuffWorker { async fn on_epoch_manager_event(&mut self, event: EpochManagerEvent) -> Result<(), HotStuffError> { match event { - EpochManagerEvent::EpochChanged(epoch) => { - if !self.epoch_manager.is_this_validator_registered_for_epoch(epoch).await? { + EpochManagerEvent::EpochChanged { + epoch, + registered_shard_group, + } => { + if registered_shard_group.is_none() { info!( target: LOG_TARGET, "💤 This validator is not registered for epoch {}. Going to sleep.", epoch @@ -554,7 +557,6 @@ impl HotstuffWorker { // If we can propose a block end, let's not wait for the block time to do it // self.pacemaker.beat(); }, - EpochManagerEvent::ThisValidatorIsRegistered { .. } => {}, } Ok(()) diff --git a/dan_layer/consensus_tests/src/support/epoch_manager.rs b/dan_layer/consensus_tests/src/support/epoch_manager.rs index c4a5d133b..863853a02 100644 --- a/dan_layer/consensus_tests/src/support/epoch_manager.rs +++ b/dan_layer/consensus_tests/src/support/epoch_manager.rs @@ -37,7 +37,7 @@ impl TestEpochManager { } } - pub async fn set_current_epoch(&mut self, current_epoch: Epoch) -> &Self { + pub async fn set_current_epoch(&mut self, current_epoch: Epoch, shard_group: ShardGroup) -> &Self { self.current_epoch = current_epoch; { let mut lock = self.inner.lock().await; @@ -45,9 +45,10 @@ impl TestEpochManager { lock.is_epoch_active = true; } - let _ = self - .tx_epoch_events - .send(EpochManagerEvent::EpochChanged(current_epoch)); + let _ = self.tx_epoch_events.send(EpochManagerEvent::EpochChanged { + epoch: current_epoch, + registered_shard_group: Some(shard_group), + }); self } diff --git a/dan_layer/consensus_tests/src/support/harness.rs b/dan_layer/consensus_tests/src/support/harness.rs index 712c53e8b..f4533cff5 100644 --- a/dan_layer/consensus_tests/src/support/harness.rs +++ b/dan_layer/consensus_tests/src/support/harness.rs @@ -289,7 +289,10 @@ impl Test { info!("🌟 Starting {epoch}"); for validator in self.validators.values_mut() { // Fire off initial epoch change event so that the pacemaker starts - validator.epoch_manager.set_current_epoch(epoch).await; + validator + .epoch_manager + .set_current_epoch(epoch, validator.shard_group) + .await; } self.wait_for_all_validators_to_start_consensus().await; diff --git a/dan_layer/epoch_manager/src/base_layer/base_layer_epoch_manager.rs b/dan_layer/epoch_manager/src/base_layer/base_layer_epoch_manager.rs index 2c75d37d7..2f3bbc0dc 100644 --- a/dan_layer/epoch_manager/src/base_layer/base_layer_epoch_manager.rs +++ b/dan_layer/epoch_manager/src/base_layer/base_layer_epoch_manager.rs @@ -139,7 +139,7 @@ impl let vns = validator_nodes.get_all_within_epoch(epoch, self.config.validator_node_sidechain_id.as_ref())?; let num_committees = calculate_num_committees(vns.len() as u64, self.config.committee_size); - for vn in &vns { + for vn in vns { validator_nodes.set_committee_shard( vn.shard_key, vn.shard_key.to_shard_group(self.config.num_preshards, num_committees), @@ -149,13 +149,6 @@ impl } tx.commit()?; - if let Some(vn) = vns.iter().find(|vn| vn.public_key == self.node_public_key) { - self.publish_event(EpochManagerEvent::ThisValidatorIsRegistered { - epoch, - shard_group: vn.shard_key.to_shard_group(self.config.num_preshards, num_committees), - shard_key: vn.shard_key, - }); - } Ok(()) } @@ -550,7 +543,16 @@ impl } } - self.publish_event(EpochManagerEvent::EpochChanged(self.current_epoch)); + let num_committees = self.get_number_of_committees(self.current_epoch)?; + let shard_group = self + .get_our_validator_node(self.current_epoch) + .optional()? + .map(|vn| vn.shard_key.to_shard_group(self.config.num_preshards, num_committees)); + + self.publish_event(EpochManagerEvent::EpochChanged { + epoch: self.current_epoch, + registered_shard_group: shard_group, + }); Ok(()) } diff --git a/dan_layer/epoch_manager/src/event.rs b/dan_layer/epoch_manager/src/event.rs index 66d61ec93..5ecc96f3e 100644 --- a/dan_layer/epoch_manager/src/event.rs +++ b/dan_layer/epoch_manager/src/event.rs @@ -1,14 +1,13 @@ // Copyright 2023 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use tari_dan_common_types::{Epoch, ShardGroup, SubstateAddress}; +use tari_dan_common_types::{Epoch, ShardGroup}; #[derive(Debug, Clone)] pub enum EpochManagerEvent { - EpochChanged(Epoch), - ThisValidatorIsRegistered { + EpochChanged { epoch: Epoch, - shard_group: ShardGroup, - shard_key: SubstateAddress, + /// Some if the local validator is registered for the epoch, otherwise None + registered_shard_group: Option, }, }