Skip to content

Commit

Permalink
fix(gossip): bug vn might not be subscribed to consensus messages (#1191
Browse files Browse the repository at this point in the history
)

Description
---
fix(gossip): bug vn might not be subscribed to consensus messages
Removed unused `ThisValidatorIsRegistered` epoch manager event
Added `registered_shard_group` to `EpochChanged` event that is Some when
local VN is registered for the epoch

Motivation and Context
---
Bug introduced in #1190 where VNs might not be subscribed to consensus
messages.

How Has This Been Tested?
---
Manually, validator node that has been shutdown and restarted without
the epoch changing was not subscribed to consensus messages

What process can a PR reviewer use to test or verify this change?
---
As above

Breaking Changes
---

- [x] None
- [ ] Requires data directory to be deleted
- [ ] Other - Please specify
  • Loading branch information
sdbondi authored Oct 29, 2024
1 parent f45c0c1 commit d1c8422
Show file tree
Hide file tree
Showing 11 changed files with 58 additions and 56 deletions.
13 changes: 6 additions & 7 deletions applications/tari_indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,13 +224,12 @@ pub async fn run_indexer(config: ApplicationConfig, mut shutdown_signal: Shutdow
}

async fn handle_epoch_manager_event(services: &Services, event: EpochManagerEvent) -> Result<(), anyhow::Error> {
if let EpochManagerEvent::EpochChanged(epoch) = event {
let all_vns = services.epoch_manager.get_all_validator_nodes(epoch).await?;
services
.networking
.set_want_peers(all_vns.into_iter().map(|vn| vn.address.as_peer_id()))
.await?;
}
let EpochManagerEvent::EpochChanged { epoch, .. } = event;
let all_vns = services.epoch_manager.get_all_validator_nodes(epoch).await?;
services
.networking
.set_want_peers(all_vns.into_iter().map(|vn| vn.address.as_peer_id()))
.await?;

Ok(())
}
Expand Down
13 changes: 6 additions & 7 deletions applications/tari_validator_node/src/dan_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,12 @@ impl DanNode {
}

async fn handle_epoch_manager_event(&mut self, event: EpochManagerEvent) -> Result<(), anyhow::Error> {
if let EpochManagerEvent::EpochChanged(epoch) = event {
let all_vns = self.services.epoch_manager.get_all_validator_nodes(epoch).await?;
self.services
.networking
.set_want_peers(all_vns.into_iter().map(|vn| vn.address.as_peer_id()))
.await?;
}
let EpochManagerEvent::EpochChanged { epoch, .. } = event;
let all_vns = self.services.epoch_manager.get_all_validator_nodes(epoch).await?;
self.services
.networking
.set_want_peers(all_vns.into_iter().map(|vn| vn.address.as_peer_id()))
.await?;

Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ impl ConsensusGossipService {
}
},
Ok(event) = self.epoch_manager_events.recv() => {
if let EpochManagerEvent::ThisValidatorIsRegistered{shard_group, ..} = event {
let EpochManagerEvent::EpochChanged{ registered_shard_group, ..} = event ;
if let Some(shard_group) = registered_shard_group{
self.subscribe(shard_group).await?;
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,9 @@ impl MempoolGossip<PeerAddress> {
}
}

pub async fn subscribe(&mut self, epoch: Epoch) -> Result<(), MempoolError> {
let committee_shard = self.epoch_manager.get_local_committee_info(epoch).await?;
pub async fn subscribe(&mut self, shard_group: ShardGroup) -> Result<(), MempoolError> {
match self.is_subscribed {
Some(b) if b == committee_shard.shard_group() => {
Some(b) if b == shard_group => {
return Ok(());
},
Some(_) => {
Expand All @@ -100,9 +99,9 @@ impl MempoolGossip<PeerAddress> {
}

self.networking
.subscribe_topic(shard_group_to_topic(committee_shard.shard_group()))
.subscribe_topic(shard_group_to_topic(shard_group))
.await?;
self.is_subscribed = Some(committee_shard.shard_group());
self.is_subscribed = Some(shard_group);
Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,10 @@ where TValidator: Validator<Transaction, Context = (), Error = TransactionValida
}
}
Ok(event) = events.recv() => {
if let EpochManagerEvent::EpochChanged(epoch) = event {
if self.epoch_manager.is_this_validator_registered_for_epoch(epoch).await?{
info!(target: LOG_TARGET, "Mempool service subscribing transaction messages for epoch {}", epoch);
self.gossip.subscribe(epoch).await?;
}
let EpochManagerEvent::EpochChanged { epoch, registered_shard_group} = event;
if let Some(shard_group) = registered_shard_group {
info!(target: LOG_TARGET, "Mempool service subscribing transaction messages for {shard_group} in {epoch}");
self.gossip.subscribe(shard_group).await?;
}
},

Expand Down
16 changes: 7 additions & 9 deletions dan_layer/consensus/src/hotstuff/state_machine/idle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ where TSpec: ConsensusSpec
event = epoch_events.recv() => {
match event {
Ok(event) => {
if let Some(event) = self.on_epoch_event(context, event).await? {
if let Some(event) = self.on_epoch_event( event).await? {
return Ok(event);
}
},
Expand Down Expand Up @@ -78,20 +78,18 @@ where TSpec: ConsensusSpec
Ok(is_registered)
}

async fn on_epoch_event(
&self,
context: &mut ConsensusWorkerContext<TSpec>,
event: EpochManagerEvent,
) -> Result<Option<ConsensusStateEvent>, HotStuffError> {
async fn on_epoch_event(&self, event: EpochManagerEvent) -> Result<Option<ConsensusStateEvent>, HotStuffError> {
match event {
EpochManagerEvent::EpochChanged(epoch) => {
if self.is_registered_for_epoch(context, epoch).await? {
EpochManagerEvent::EpochChanged {
epoch,
registered_shard_group,
} => {
if registered_shard_group.is_some() {
Ok(Some(ConsensusStateEvent::RegisteredForEpoch { epoch }))
} else {
Ok(None)
}
},
EpochManagerEvent::ThisValidatorIsRegistered { .. } => Ok(None),
}
}
}
Expand Down
8 changes: 5 additions & 3 deletions dan_layer/consensus/src/hotstuff/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,8 +534,11 @@ impl<TConsensusSpec: ConsensusSpec> HotstuffWorker<TConsensusSpec> {

async fn on_epoch_manager_event(&mut self, event: EpochManagerEvent) -> Result<(), HotStuffError> {
match event {
EpochManagerEvent::EpochChanged(epoch) => {
if !self.epoch_manager.is_this_validator_registered_for_epoch(epoch).await? {
EpochManagerEvent::EpochChanged {
epoch,
registered_shard_group,
} => {
if registered_shard_group.is_none() {
info!(
target: LOG_TARGET,
"💤 This validator is not registered for epoch {}. Going to sleep.", epoch
Expand All @@ -554,7 +557,6 @@ impl<TConsensusSpec: ConsensusSpec> HotstuffWorker<TConsensusSpec> {
// If we can propose a block end, let's not wait for the block time to do it
// self.pacemaker.beat();
},
EpochManagerEvent::ThisValidatorIsRegistered { .. } => {},
}

Ok(())
Expand Down
9 changes: 5 additions & 4 deletions dan_layer/consensus_tests/src/support/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,18 @@ impl TestEpochManager {
}
}

pub async fn set_current_epoch(&mut self, current_epoch: Epoch) -> &Self {
pub async fn set_current_epoch(&mut self, current_epoch: Epoch, shard_group: ShardGroup) -> &Self {
self.current_epoch = current_epoch;
{
let mut lock = self.inner.lock().await;
lock.current_epoch = current_epoch;
lock.is_epoch_active = true;
}

let _ = self
.tx_epoch_events
.send(EpochManagerEvent::EpochChanged(current_epoch));
let _ = self.tx_epoch_events.send(EpochManagerEvent::EpochChanged {
epoch: current_epoch,
registered_shard_group: Some(shard_group),
});

self
}
Expand Down
5 changes: 4 additions & 1 deletion dan_layer/consensus_tests/src/support/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,10 @@ impl Test {
info!("🌟 Starting {epoch}");
for validator in self.validators.values_mut() {
// Fire off initial epoch change event so that the pacemaker starts
validator.epoch_manager.set_current_epoch(epoch).await;
validator
.epoch_manager
.set_current_epoch(epoch, validator.shard_group)
.await;
}

self.wait_for_all_validators_to_start_consensus().await;
Expand Down
20 changes: 11 additions & 9 deletions dan_layer/epoch_manager/src/base_layer/base_layer_epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl<TAddr: NodeAddressable + DerivableFromPublicKey>
let vns = validator_nodes.get_all_within_epoch(epoch, self.config.validator_node_sidechain_id.as_ref())?;

let num_committees = calculate_num_committees(vns.len() as u64, self.config.committee_size);
for vn in &vns {
for vn in vns {
validator_nodes.set_committee_shard(
vn.shard_key,
vn.shard_key.to_shard_group(self.config.num_preshards, num_committees),
Expand All @@ -149,13 +149,6 @@ impl<TAddr: NodeAddressable + DerivableFromPublicKey>
}

tx.commit()?;
if let Some(vn) = vns.iter().find(|vn| vn.public_key == self.node_public_key) {
self.publish_event(EpochManagerEvent::ThisValidatorIsRegistered {
epoch,
shard_group: vn.shard_key.to_shard_group(self.config.num_preshards, num_committees),
shard_key: vn.shard_key,
});
}

Ok(())
}
Expand Down Expand Up @@ -550,7 +543,16 @@ impl<TAddr: NodeAddressable + DerivableFromPublicKey>
}
}

self.publish_event(EpochManagerEvent::EpochChanged(self.current_epoch));
let num_committees = self.get_number_of_committees(self.current_epoch)?;
let shard_group = self
.get_our_validator_node(self.current_epoch)
.optional()?
.map(|vn| vn.shard_key.to_shard_group(self.config.num_preshards, num_committees));

self.publish_event(EpochManagerEvent::EpochChanged {
epoch: self.current_epoch,
registered_shard_group: shard_group,
});

Ok(())
}
Expand Down
9 changes: 4 additions & 5 deletions dan_layer/epoch_manager/src/event.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
// Copyright 2023 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use tari_dan_common_types::{Epoch, ShardGroup, SubstateAddress};
use tari_dan_common_types::{Epoch, ShardGroup};

#[derive(Debug, Clone)]
pub enum EpochManagerEvent {
EpochChanged(Epoch),
ThisValidatorIsRegistered {
EpochChanged {
epoch: Epoch,
shard_group: ShardGroup,
shard_key: SubstateAddress,
/// Some if the local validator is registered for the epoch, otherwise None
registered_shard_group: Option<ShardGroup>,
},
}

0 comments on commit d1c8422

Please sign in to comment.