Skip to content

Commit

Permalink
impl in progress, removing all occurences of end_epoch in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
ksrichard committed Nov 5, 2024
1 parent b60016e commit 5c8d44d
Show file tree
Hide file tree
Showing 15 changed files with 112 additions and 12,843 deletions.
12,748 changes: 0 additions & 12,748 deletions Cargo.lock

This file was deleted.

1 change: 1 addition & 0 deletions applications/tari_dan_app_utilities/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ tari_bor = { workspace = true, default-features = true }
tari_indexer_lib = { workspace = true }
tari_networking = { workspace = true }
tari_validator_node_rpc = { workspace = true }
minotari_app_grpc = { workspace = true }

anyhow = { workspace = true }
async-trait = { workspace = true }
Expand Down
61 changes: 47 additions & 14 deletions applications/tari_dan_app_utilities/src/base_layer_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
use std::time::Duration;

use log::*;
use minotari_app_grpc::tari_rpc::ValidatorNodeChangeState;
use tari_base_node_client::{
grpc::GrpcBaseNodeClient,
types::{BaseLayerMetadata, BlockInfo},
Expand Down Expand Up @@ -106,6 +107,7 @@ pub struct BaseLayerScanner<TAddr> {
last_scanned_height: u64,
last_scanned_tip: Option<FixedHash>,
last_scanned_hash: Option<FixedHash>,
last_scanned_validator_node_mr: Option<FixedHash>,
next_block_hash: Option<FixedHash>,
base_node_client: GrpcBaseNodeClient,
epoch_manager: EpochManagerHandle<TAddr>,
Expand Down Expand Up @@ -141,6 +143,7 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
last_scanned_tip: None,
last_scanned_height: 0,
last_scanned_hash: None,
last_scanned_validator_node_mr: None,
next_block_hash: None,
base_node_client,
epoch_manager,
Expand Down Expand Up @@ -223,6 +226,7 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
);
// TODO: we need to figure out where the fork happened, and delete data after the fork.
self.last_scanned_hash = None;
self.last_scanned_validator_node_mr = None;
self.last_scanned_height = 0;
self.sync_blockchain().await?;
},
Expand Down Expand Up @@ -278,6 +282,7 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
Some(end_height) => end_height,
};
let mut scan = tip.tip_hash;
let mut current_last_validator_nodes_mr = self.last_scanned_validator_node_mr;
loop {
let header = self.base_node_client.get_header_by_hash(scan).await?;
if let Some(last_tip) = self.last_scanned_tip {
Expand All @@ -290,9 +295,43 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
// This will be processed down below.
break;
}
current_last_validator_nodes_mr = Some(header.validator_node_mr.clone());
self.epoch_manager.add_block_hash(header.height, scan).await?;
scan = header.prev_hash;
}

// syncing validator node changes
let mut validator_nodes_to_register: Vec<PublicKey> = vec![];
if current_last_validator_nodes_mr != self.last_scanned_validator_node_mr {
info!(target: LOG_TARGET,
"⛓️ Syncing validator nodes (sidechain ID: {:?}) from base node (height range: {}-{})",
self.validator_node_sidechain_id,
start_scan_height,
end_height,
);

let node_changes = self
.base_node_client
.get_validator_node_changes(start_scan_height, end_height, self.validator_node_sidechain_id.as_ref())
.await
.map_err(BaseLayerScannerError::BaseNodeError)?;

for node_change in node_changes {
match node_change.state() {
ValidatorNodeChangeState::Add => {
let node_public_key =
PublicKey::from_canonical_bytes(&node_change.public_key).map_err(|error| {
// TODO: convert error
})?;
validator_nodes_to_register.push(node_public_key);
},
ValidatorNodeChangeState::Remove => {},
}
}

self.last_scanned_validator_node_mr = current_last_validator_nodes_mr;
}

for current_height in start_scan_height..=end_height {
let utxos = self
.base_node_client
Expand Down Expand Up @@ -329,26 +368,20 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
};
match sidechain_feature {
SideChainFeature::ValidatorNodeRegistration(reg) => {
info!(
target: LOG_TARGET,
"⛓️ Validator node registration UTXO for {} sidechain {} found at height {}",
reg.public_key(),
reg.sidechain_id().map(|v| v.to_hex()).unwrap_or("None".to_string()),
current_height,
);
if reg.sidechain_id() == self.validator_node_sidechain_id.as_ref() {
if validator_nodes_to_register.contains(reg.public_key()) {
info!(
target: LOG_TARGET,
"⛓️ Validator node registration UTXO for {} sidechain {} found at height {}",
reg.public_key(),
reg.sidechain_id().map(|v| v.to_hex()).unwrap_or("None".to_string()),
current_height,
);
self.register_validator_node_registration(
current_height,
reg.clone(),
output.minimum_value_promise,
)
.await?;
} else {
warn!(
target: LOG_TARGET,
"Ignoring validator node registration for sidechain ID {:?}. Expected sidechain ID: {:?}",
reg.sidechain_id().map(|v| v.to_hex()),
self.validator_node_sidechain_id.as_ref().map(|v| v.to_hex()));
}
},
SideChainFeature::CodeTemplateRegistration(reg) => {
Expand Down
55 changes: 41 additions & 14 deletions clients/base_node_client/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ use std::convert::TryInto;

use async_trait::async_trait;
use log::*;
use minotari_app_grpc::tari_rpc::{self as grpc, GetShardKeyRequest};
use minotari_app_grpc::tari_rpc::{
self as grpc,
GetShardKeyRequest,
GetValidatorNodeChangesRequest,
ValidatorNodeChange,
};
use minotari_node_grpc_client::BaseNodeGrpcClient;
use tari_common_types::types::{FixedHash, PublicKey};
use tari_core::{blocks::BlockHeader, transactions::transaction_components::CodeTemplateRegistration};
Expand Down Expand Up @@ -79,14 +84,14 @@ impl GrpcBaseNodeClient {
match stream.message().await {
Ok(Some(_val)) => {
count += 1;
}
},
Ok(None) => {
break;
}
},
Err(e) => {
warn!(target: LOG_TARGET, "Error getting mempool transaction count: {}", e);
return Err(BaseNodeClientError::ConnectionError);
}
},
}
}
Ok(count)
Expand Down Expand Up @@ -115,6 +120,28 @@ impl BaseNodeClient for GrpcBaseNodeClient {
})
}

async fn get_validator_node_changes(
&mut self,
start_height: u64,
end_height: u64,
sidechain_id: Option<&PublicKey>,
) -> Result<Vec<ValidatorNodeChange>, BaseNodeClientError> {
let client = self.connection().await?;
let result = client
.get_validator_node_changes(GetValidatorNodeChangesRequest {
start_height,
end_height,
sidechain_id: match sidechain_id {
None => vec![],
Some(sidechain_id) => sidechain_id.to_vec(),
},
})
.await?
.into_inner();

Ok(result.changes)
}

async fn get_validator_nodes(&mut self, height: u64) -> Result<Vec<BaseLayerValidatorNode>, BaseNodeClientError> {
let inner = self.connection().await?;

Expand Down Expand Up @@ -150,18 +177,18 @@ impl BaseNodeClient for GrpcBaseNodeClient {
)
}))
}
.transpose()?,
.transpose()?,
});
}
},
Ok(None) => {
break;
}
},
Err(e) => {
return Err(BaseNodeClientError::InvalidPeerMessage(format!(
"Error reading stream: {}",
e
)));
}
},
}
}

Expand Down Expand Up @@ -221,16 +248,16 @@ impl BaseNodeClient for GrpcBaseNodeClient {
BaseNodeClientError::InvalidPeerMessage("invalid template registration".to_string())
})?;
templates.push(template_registration);
}
},
Ok(None) => {
break;
}
},
Err(e) => {
return Err(BaseNodeClientError::InvalidPeerMessage(format!(
"Error reading stream: {}",
e
)));
}
},
}
}
Ok(templates)
Expand Down Expand Up @@ -302,16 +329,16 @@ impl BaseNodeClient for GrpcBaseNodeClient {
.map_err(BaseNodeClientError::InvalidPeerMessage)?,
};
responses.push(resp);
}
},
Ok(None) => {
break;
}
},
Err(e) => {
return Err(BaseNodeClientError::InvalidPeerMessage(format!(
"Error reading stream: {}",
e
)));
}
},
}
}

Expand Down
7 changes: 7 additions & 0 deletions clients/base_node_client/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: BSD-3-Clause

use async_trait::async_trait;
use minotari_app_grpc::tari_rpc::ValidatorNodeChange;
use tari_common_types::types::{FixedHash, PublicKey};
use tari_core::{blocks::BlockHeader, transactions::transaction_components::CodeTemplateRegistration};
use tari_dan_common_types::SubstateAddress;
Expand All @@ -15,6 +16,12 @@ use crate::{
pub trait BaseNodeClient: Send + Sync + Clone {
async fn test_connection(&mut self) -> Result<(), BaseNodeClientError>;
async fn get_tip_info(&mut self) -> Result<BaseLayerMetadata, BaseNodeClientError>;
async fn get_validator_node_changes(
&mut self,
start_height: u64,
end_height: u64,
sidechain_id: Option<&PublicKey>,
) -> Result<Vec<ValidatorNodeChange>, BaseNodeClientError>;
async fn get_validator_nodes(&mut self, height: u64) -> Result<Vec<BaseLayerValidatorNode>, BaseNodeClientError>;
async fn get_shard_key(
&mut self,
Expand Down
2 changes: 0 additions & 2 deletions clients/validator_node_client/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,6 @@ pub struct ValidatorNode {
pub public_key: PublicKey,
pub shard_key: SubstateAddress,
pub start_epoch: Epoch,
pub end_epoch: Epoch,
#[cfg_attr(feature = "ts", ts(type = "string"))]
pub fee_claim_public_key: PublicKey,
}
Expand All @@ -525,7 +524,6 @@ impl From<models::ValidatorNode<PeerAddress>> for ValidatorNode {
public_key: value.public_key,
shard_key: value.shard_key,
start_epoch: value.start_epoch,
end_epoch: value.end_epoch,
fee_claim_public_key: value.fee_claim_public_key,
}
}
Expand Down
2 changes: 0 additions & 2 deletions dan_layer/consensus_tests/src/support/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ impl TestEpochManager {
shard_key,
registered_at_base_height: our_validator_node.registered_at_base_height,
start_epoch: our_validator_node.start_epoch,
end_epoch: our_validator_node.end_epoch,
fee_claim_public_key: PublicKey::default(),
sidechain_id: None,
});
Expand All @@ -77,7 +76,6 @@ impl TestEpochManager {
shard_key,
registered_at_base_height: 0,
start_epoch: Epoch(0),
end_epoch: Epoch(1),
fee_claim_public_key: PublicKey::default(),
sidechain_id: None,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub struct BaseLayerEpochManager<TGlobalStore, TBaseNodeClient> {
}

impl<TAddr: NodeAddressable + DerivableFromPublicKey>
BaseLayerEpochManager<SqliteGlobalDbAdapter<TAddr>, GrpcBaseNodeClient>
BaseLayerEpochManager<SqliteGlobalDbAdapter<TAddr>, GrpcBaseNodeClient>
{
pub fn new(
config: EpochManagerConfig,
Expand Down Expand Up @@ -563,22 +563,6 @@ BaseLayerEpochManager<SqliteGlobalDbAdapter<TAddr>, GrpcBaseNodeClient>
}
}

pub async fn remaining_registration_epochs(&mut self) -> Result<Option<Epoch>, EpochManagerError> {
let last_registration_epoch = match self.last_registration_epoch()? {
Some(epoch) => epoch,
None => return Ok(None),
};

let constants = self.get_base_layer_consensus_constants().await?;
let expiry = constants.validator_node_registration_expiry();

// Note this can be negative in some cases
let num_blocks_since_last_reg = self.current_epoch.saturating_sub(last_registration_epoch);

// None indicates that we are not registered, or a previous registration has expired
Ok(expiry.checked_sub(num_blocks_since_last_reg))
}

pub fn get_our_validator_node(&self, epoch: Epoch) -> Result<ValidatorNode<TAddr>, EpochManagerError> {
let vn = self
.get_validator_node_by_public_key(epoch, &self.node_public_key)?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,6 @@ impl<TAddr: NodeAddressable + DerivableFromPublicKey + 'static>
EpochManagerRequest::WaitForInitialScanningToComplete { reply } => {
self.inner.add_notify_on_scanning_complete(reply);
},
EpochManagerRequest::RemainingRegistrationEpochs { reply } => {
handle(reply, self.inner.remaining_registration_epochs().await, context)
},
EpochManagerRequest::GetBaseLayerConsensusConstants { reply } => handle(
reply,
self.inner.get_base_layer_consensus_constants().await.cloned(),
Expand Down
10 changes: 0 additions & 10 deletions dan_layer/epoch_manager/src/base_layer/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,6 @@ impl<TAddr: NodeAddressable> EpochManagerHandle<TAddr> {
rx.await.map_err(|_| EpochManagerError::ReceiveError)?
}

/// Returns the number of epochs remaining for the current registration if registered, otherwise None
pub async fn remaining_registration_epochs(&self) -> Result<Option<Epoch>, EpochManagerError> {
let (tx, rx) = oneshot::channel();
self.tx_request
.send(EpochManagerRequest::RemainingRegistrationEpochs { reply: tx })
.await
.map_err(|_| EpochManagerError::SendError)?;
rx.await.map_err(|_| EpochManagerError::ReceiveError)?
}

pub async fn add_validator_node_registration(
&self,
block_height: u64,
Expand Down
3 changes: 0 additions & 3 deletions dan_layer/epoch_manager/src/base_layer/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,6 @@ pub enum EpochManagerRequest<TAddr> {
WaitForInitialScanningToComplete {
reply: Reply<()>,
},
RemainingRegistrationEpochs {
reply: Reply<Option<Epoch>>,
},
GetBaseLayerConsensusConstants {
reply: Reply<BaseLayerConsensusConstants>,
},
Expand Down
1 change: 0 additions & 1 deletion dan_layer/storage/src/global/models/validator_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ pub struct ValidatorNode<TAddr> {
pub shard_key: SubstateAddress,
pub registered_at_base_height: u64,
pub start_epoch: Epoch,
pub end_epoch: Epoch,
pub fee_claim_public_key: PublicKey,
pub sidechain_id: Option<PublicKey>,
}
Expand Down
Loading

0 comments on commit 5c8d44d

Please sign in to comment.