Skip to content

Commit

Permalink
fixes for global transaction, added new consensus test
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Dec 27, 2024
1 parent 20726d7 commit d436226
Show file tree
Hide file tree
Showing 49 changed files with 452 additions and 242 deletions.
3 changes: 3 additions & 0 deletions applications/tari_dan_app_utilities/src/base_layer_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,8 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
target: LOG_TARGET,
"🌠 new template found with address {} at height {}", template_address, block_info.height
);
let consensus_constants = self.epoch_manager.get_base_layer_consensus_constants().await?;
let epoch = consensus_constants.height_to_epoch(block_info.height);
self.template_manager
.add_template(
registration.author_public_key,
Expand All @@ -484,6 +486,7 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
registration.binary_sha,
),
Some(template_name),
epoch,
)
.await?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ use chrono::Utc;
use log::*;
use tari_common_types::types::{FixedHash, PublicKey};
use tari_crypto::tari_utilities::ByteArray;
use tari_dan_common_types::{optional::Optional, services::template_provider::TemplateProvider, NodeAddressable};
use tari_dan_common_types::{
optional::Optional,
services::template_provider::TemplateProvider,
Epoch,
NodeAddressable,
};
use tari_dan_engine::{
flow::FlowFactory,
function_definitions::FlowFunctionDefinition,
Expand Down Expand Up @@ -192,6 +197,7 @@ impl<TAddr: NodeAddressable> TemplateManager<TAddr> {
template: TemplateExecutable,
template_name: Option<String>,
template_status: Option<TemplateStatus>,
epoch: Epoch,
) -> Result<(), TemplateManagerError> {
enum TemplateHash {
Hash(Hash),
Expand Down Expand Up @@ -244,6 +250,7 @@ impl<TAddr: NodeAddressable> TemplateManager<TAddr> {
flow_json,
manifest,
url: template_url,
epoch,
};

let mut tx = self.global_db.create_transaction()?;
Expand Down Expand Up @@ -326,21 +333,6 @@ impl<TAddr: NodeAddressable + Send + Sync + 'static> TemplateProvider for Templa

Ok(Some(loaded))
}

fn add_wasm_template(
&self,
author_public_key: PublicKey,
template_address: tari_engine_types::TemplateAddress,
template: &[u8],
) -> Result<(), Self::Error> {
self.add_template(
author_public_key,
template_address,
TemplateExecutable::CompiledWasm(template.to_vec()),
None,
Some(TemplateStatus::Active),
)
}
}

impl<TAddr> Clone for TemplateManager<TAddr> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

use log::*;
use tari_common_types::types::PublicKey;
use tari_dan_common_types::{services::template_provider::TemplateProvider, NodeAddressable};
use tari_dan_common_types::{services::template_provider::TemplateProvider, Epoch, NodeAddressable};
use tari_dan_engine::function_definitions::FlowFunctionDefinition;
use tari_dan_storage::global::{DbTemplateType, DbTemplateUpdate, TemplateStatus};
use tari_engine_types::calculate_template_binary_hash;
Expand Down Expand Up @@ -124,11 +124,12 @@ impl<TAddr: NodeAddressable + 'static> TemplateManagerService<TAddr> {
template_address,
template,
template_name,
epoch,
reply,
} => {
handle(
reply,
self.handle_add_template(author_public_key, template_address, template, template_name)
self.handle_add_template(author_public_key, template_address, template, template_name, epoch)
.await,
);
},
Expand Down Expand Up @@ -243,6 +244,7 @@ impl<TAddr: NodeAddressable + 'static> TemplateManagerService<TAddr> {
template_address: tari_engine_types::TemplateAddress,
template: TemplateExecutable,
template_name: Option<String>,
epoch: Epoch,
) -> Result<(), TemplateManagerError> {
let template_status = if matches!(template, TemplateExecutable::DownloadableWasm(_, _)) {
TemplateStatus::New
Expand All @@ -255,6 +257,7 @@ impl<TAddr: NodeAddressable + 'static> TemplateManagerService<TAddr> {
template.clone(),
template_name,
Some(template_status),
epoch,
)?;

// TODO: remove when we remove support for base layer template registration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use tari_common_types::types::PublicKey;
use tari_dan_common_types::Epoch;
use tari_template_lib::models::TemplateAddress;
use tari_validator_node_client::types::TemplateAbi;
use tokio::sync::{mpsc, oneshot};
Expand Down Expand Up @@ -70,6 +71,7 @@ impl TemplateManagerHandle {
template_address: TemplateAddress,
template: TemplateExecutable,
template_name: Option<String>,
epoch: Epoch,
) -> Result<(), TemplateManagerError> {
let (tx, rx) = oneshot::channel();
self.request_tx
Expand All @@ -78,6 +80,7 @@ impl TemplateManagerHandle {
template_address,
template,
template_name,
epoch,
reply: tx,
})
.await
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

use reqwest::Url;
use tari_common_types::types::{FixedHash, PublicKey};
use tari_dan_common_types::Epoch;
use tari_dan_storage::global::{DbTemplate, DbTemplateType};
use tari_template_lib::models::TemplateAddress;
use tari_validator_node_client::types::TemplateAbi;
Expand Down Expand Up @@ -91,6 +92,7 @@ pub enum TemplateManagerRequest {
template_address: tari_engine_types::TemplateAddress,
template: TemplateExecutable,
template_name: Option<String>,
epoch: Epoch,
reply: oneshot::Sender<Result<(), TemplateManagerError>>,
},
GetTemplate {
Expand Down
1 change: 0 additions & 1 deletion applications/tari_validator_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,6 @@ pub async fn spawn_services(
handles.push(consensus_join_handle);

let (mempool, join_handle) = mempool::spawn(
consensus_constants.num_preshards,
epoch_manager.clone(),
create_mempool_transaction_validator(template_manager.clone()),
state_store.clone(),
Expand Down
4 changes: 2 additions & 2 deletions applications/tari_validator_node/src/dan_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ impl DanNode {
.services
.state_store
.with_read_tx(|tx| block.get_committing_transactions(tx))?;

let templates = transactions
.into_iter()
.filter_map(|record| {
Expand All @@ -115,6 +114,7 @@ impl DanNode {
})
});

let epoch = self.services.consensus_handle.current_epoch();
// adding templates to template manager
let mut template_counter = 0;
for (author_pub_key, template_address, template) in templates {
Expand All @@ -127,7 +127,7 @@ impl DanNode {
if let Err(err) = self
.services
.template_manager
.add_template(author_pub_key, template_address, template, None)
.add_template(author_pub_key, template_address, template, None, epoch)
.await
{
error!(target: LOG_TARGET, "🚨Failed to add template: {}", err);
Expand Down
58 changes: 32 additions & 26 deletions applications/tari_validator_node/src/p2p/services/mempool/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{collections::HashSet, iter};

use libp2p::{gossipsub, PeerId};
use log::*;
use tari_dan_common_types::{Epoch, NumPreshards, PeerAddress, ShardGroup, ToSubstateAddress};
use tari_dan_common_types::{Epoch, PeerAddress, ShardGroup, ToSubstateAddress};
use tari_dan_p2p::{proto, DanMessage, NewTransactionMessage, TariMessagingSpec};
use tari_epoch_manager::{base_layer::EpochManagerHandle, EpochManagerReader};
use tari_networking::{NetworkingHandle, NetworkingService};
Expand Down Expand Up @@ -47,7 +47,6 @@ impl MempoolGossipCodec {

#[derive(Debug)]
pub(super) struct MempoolGossip<TAddr> {
num_preshards: NumPreshards,
epoch_manager: EpochManagerHandle<TAddr>,
is_subscribed: Option<ShardGroup>,
networking: NetworkingHandle<TariMessagingSpec>,
Expand All @@ -57,13 +56,11 @@ pub(super) struct MempoolGossip<TAddr> {

impl MempoolGossip<PeerAddress> {
pub fn new(
num_preshards: NumPreshards,
epoch_manager: EpochManagerHandle<PeerAddress>,
networking: NetworkingHandle<TariMessagingSpec>,
rx_gossip: mpsc::UnboundedReceiver<(PeerId, gossipsub::Message)>,
) -> Self {
Self {
num_preshards,
epoch_manager,
is_subscribed: None,
networking,
Expand Down Expand Up @@ -143,28 +140,37 @@ impl MempoolGossip<PeerAddress> {
exclude_shard_group: Option<ShardGroup>,
) -> Result<(), MempoolError> {
let n = self.epoch_manager.get_num_committees(epoch).await?;
let committee_shard = self.epoch_manager.get_local_committee_info(epoch).await?;
let local_shard_group = committee_shard.shard_group();
let shard_groups = msg
.transaction
.all_inputs_iter()
.map(|s| {
s.or_zero_version()
.to_substate_address()
.to_shard_group(self.num_preshards, n)
})
.chain(iter::once(
msg.transaction
.id()
.to_substate_address()
.to_shard_group(self.num_preshards, n),
))
.filter(|sg| exclude_shard_group.as_ref() != Some(sg) && sg != &local_shard_group)
.collect::<HashSet<_>>();
// If the only shard group involved is the excluded one.
if shard_groups.is_empty() {
return Ok(());
}
let committee_info = self.epoch_manager.get_local_committee_info(epoch).await?;
let local_shard_group = committee_info.shard_group();
let shard_groups = if msg.transaction.is_global() {
Box::new(
committee_info
.all_shard_groups_iter()
.filter(|sg| exclude_shard_group.as_ref() != Some(sg) && sg != &local_shard_group),
)
} else {
let shard_groups = msg
.transaction
.all_inputs_iter()
.map(|s| {
s.or_zero_version()
.to_substate_address()
.to_shard_group(committee_info.num_preshards(), n)
})
.chain(iter::once(
msg.transaction
.id()
.to_substate_address()
.to_shard_group(committee_info.num_preshards(), n),
))
.filter(|sg| exclude_shard_group.as_ref() != Some(sg) && sg != &local_shard_group)
.collect::<HashSet<_>>();
// If the only shard group involved is the excluded one.
if shard_groups.is_empty() {
return Ok(());
}
Box::new(shard_groups.into_iter()) as Box<dyn Iterator<Item = ShardGroup> + Send>
};

let msg = self
.codec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

use libp2p::{gossipsub, PeerId};
use log::*;
use tari_dan_common_types::{NumPreshards, PeerAddress};
use tari_dan_common_types::PeerAddress;
use tari_dan_p2p::TariMessagingSpec;
use tari_epoch_manager::base_layer::EpochManagerHandle;
use tari_networking::NetworkingHandle;
Expand All @@ -42,7 +42,6 @@ use crate::{
const LOG_TARGET: &str = "tari::dan::validator_node::mempool";

pub fn spawn<TValidator>(
num_preshards: NumPreshards,
epoch_manager: EpochManagerHandle<PeerAddress>,
transaction_validator: TValidator,
state_store: SqliteStateStore<PeerAddress>,
Expand All @@ -61,7 +60,6 @@ where
#[cfg(feature = "metrics")]
let metrics = PrometheusMempoolMetrics::new(metrics_registry);
let mempool = MempoolService::new(
num_preshards,
rx_mempool_request,
epoch_manager,
transaction_validator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::{collections::HashSet, fmt::Display, iter};
use libp2p::{gossipsub, PeerId};
use log::*;
use tari_consensus::hotstuff::HotstuffEvent;
use tari_dan_common_types::{optional::Optional, NumPreshards, PeerAddress, ShardGroup, ToSubstateAddress};
use tari_dan_common_types::{optional::Optional, PeerAddress, ShardGroup, ToSubstateAddress};
use tari_dan_p2p::{DanMessage, NewTransactionMessage, TariMessagingSpec};
use tari_dan_storage::{consensus_models::TransactionRecord, StateStore};
use tari_engine_types::commit_result::RejectReason;
Expand Down Expand Up @@ -67,7 +67,6 @@ impl<TValidator> MempoolService<TValidator>
where TValidator: Validator<Transaction, Context = (), Error = TransactionValidationError>
{
pub(super) fn new(
num_preshards: NumPreshards,
mempool_requests: mpsc::Receiver<MempoolRequest>,
epoch_manager: EpochManagerHandle<PeerAddress>,
before_execute_validator: TValidator,
Expand All @@ -78,7 +77,7 @@ where TValidator: Validator<Transaction, Context = (), Error = TransactionValida
#[cfg(feature = "metrics")] metrics: PrometheusMempoolMetrics,
) -> Self {
Self {
gossip: MempoolGossip::new(num_preshards, epoch_manager.clone(), networking, rx_gossip),
gossip: MempoolGossip::new(epoch_manager.clone(), networking, rx_gossip),
transactions: Default::default(),
mempool_requests,
epoch_manager,
Expand Down
7 changes: 7 additions & 0 deletions dan_layer/common_types/src/committee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,9 @@ impl CommitteeInfo {
}

pub fn includes_substate_id(&self, substate_id: &SubstateId) -> bool {
if substate_id.is_global() {
return true;
}
// version doesnt affect shard
let addr = SubstateAddress::from_substate_id(substate_id, 0);
let shard = addr.to_shard(self.num_shards);
Expand Down Expand Up @@ -265,6 +268,10 @@ impl CommitteeInfo {
.into_iter()
.filter(|substate_address| self.includes_substate_address(substate_address.borrow()))
}

pub fn all_shard_groups_iter(&self) -> impl Iterator<Item = ShardGroup> {
self.num_shards.all_shard_groups_iter(self.num_committees)
}
}

#[derive(Debug, Clone, Serialize)]
Expand Down
4 changes: 4 additions & 0 deletions dan_layer/common_types/src/lock_intent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ impl SubstateLockType {
pub fn is_output(&self) -> bool {
matches!(self, Self::Output)
}

pub fn is_input(&self) -> bool {
!self.is_output()
}
}

impl fmt::Display for SubstateLockType {
Expand Down
Loading

0 comments on commit d436226

Please sign in to comment.