Skip to content

Commit

Permalink
support for base layer registration
Browse files Browse the repository at this point in the history
  • Loading branch information
ksrichard committed Dec 12, 2024
1 parent 890188f commit 83f117c
Show file tree
Hide file tree
Showing 35 changed files with 702 additions and 560 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions applications/tari_dan_app_utilities/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,4 @@ tokio = { workspace = true, features = [
] }
tokio-stream = { workspace = true, features = ["sync"] }
config = { workspace = true }
url = { workspace = true }
72 changes: 69 additions & 3 deletions applications/tari_dan_app_utilities/src/base_layer_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
use std::time::Duration;

use log::*;
use reqwest::Url;
use tari_base_node_client::{
grpc::GrpcBaseNodeClient,
types::{BaseLayerMetadata, BlockInfo},
Expand All @@ -35,7 +36,12 @@ use tari_core::{
base_node::comms_interface::ValidatorNodeChange,
transactions::{
tari_amount::MicroMinotari,
transaction_components::{SideChainFeatureData, TransactionOutput, ValidatorNodeRegistration},
transaction_components::{
CodeTemplateRegistration,
SideChainFeatureData,
TransactionOutput,
ValidatorNodeRegistration,
},
},
};
use tari_crypto::{
Expand All @@ -56,12 +62,15 @@ use tari_dan_storage::{
StorageError,
};
use tari_dan_storage_sqlite::{error::SqliteStorageError, global::SqliteGlobalDbAdapter};
use tari_engine_types::{confidential::UnclaimedConfidentialOutput, substate::SubstateId};
use tari_engine_types::{confidential::UnclaimedConfidentialOutput, substate::SubstateId, TemplateAddress};
use tari_epoch_manager::{base_layer::EpochManagerHandle, EpochManagerError, EpochManagerReader};
use tari_shutdown::ShutdownSignal;
use tari_state_store_sqlite::SqliteStateStore;
use tari_template_lib::models::{EncryptedData, UnclaimedConfidentialOutputAddress};
use tokio::{task, task::JoinHandle, time};
use url::ParseError;

use crate::template_manager::interface::{TemplateExecutable, TemplateManagerError, TemplateManagerHandle};

const LOG_TARGET: &str = "tari::dan::base_layer_scanner";

Expand All @@ -76,6 +85,9 @@ pub fn spawn<TAddr: NodeAddressable + 'static>(
base_layer_scanning_interval: Duration,
validator_node_sidechain_id: Option<RistrettoPublicKey>,
burnt_utxo_sidechain_id: Option<RistrettoPublicKey>,
// TODO: remove when base layer template registration is removed too
template_manager: TemplateManagerHandle,
template_sidechain_id: Option<PublicKey>,
) -> JoinHandle<anyhow::Result<()>> {
task::spawn(async move {
let base_layer_scanner = BaseLayerScanner::new(
Expand All @@ -89,6 +101,8 @@ pub fn spawn<TAddr: NodeAddressable + 'static>(
base_layer_scanning_interval,
validator_node_sidechain_id,
burnt_utxo_sidechain_id,
template_manager,
template_sidechain_id,
);

base_layer_scanner.start().await?;
Expand All @@ -113,6 +127,9 @@ pub struct BaseLayerScanner<TAddr> {
has_attempted_scan: bool,
validator_node_sidechain_id: Option<PublicKey>,
burnt_utxo_sidechain_id: Option<PublicKey>,
// TODO: remove template related data, when removed base layer template registration support
template_manager: TemplateManagerHandle,
template_sidechain_id: Option<PublicKey>,
}

impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
Expand All @@ -127,6 +144,8 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
base_layer_scanning_interval: Duration,
validator_node_sidechain_id: Option<PublicKey>,
burnt_utxo_sidechain_id: Option<PublicKey>,
template_manager: TemplateManagerHandle,
template_sidechain_id: Option<PublicKey>,
) -> Self {
Self {
global_db,
Expand All @@ -145,6 +164,8 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
has_attempted_scan: false,
validator_node_sidechain_id,
burnt_utxo_sidechain_id,
template_manager,
template_sidechain_id,
}
}

Expand Down Expand Up @@ -336,7 +357,22 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
},
// TODO: remove completely SideChainFeature::CodeTemplateRegistration at some point
SideChainFeatureData::CodeTemplateRegistration(reg) => {
trace!(target: LOG_TARGET, "New code template registration scanned: {reg:?}");
if sidechain_feature.sidechain_public_key() != self.template_sidechain_id.as_ref() {
debug!(
target: LOG_TARGET,
"Ignoring code template registration for sidechain ID {:?}. Local node's sidechain ID: {:?}",
sidechain_feature.sidechain_public_key(),
self.template_sidechain_id,
);
continue;
}
self.register_code_template_registration(
reg.template_name.to_string(),
(*output_hash).into(),
reg.clone(),
&block_info,
)
.await?;
},
SideChainFeatureData::ConfidentialOutput(_) => {
// Should be checked by the base layer
Expand Down Expand Up @@ -428,6 +464,32 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
Ok(())
}

async fn register_code_template_registration(
&mut self,
template_name: String,
template_address: TemplateAddress,
registration: CodeTemplateRegistration,
block_info: &BlockInfo,
) -> Result<(), BaseLayerScannerError> {
info!(
target: LOG_TARGET,
"🌠 new template found with address {} at height {}", template_address, block_info.height
);
self.template_manager
.add_template(
registration.author_public_key,
template_address,
TemplateExecutable::DownloadableWasm(
Url::parse(registration.binary_url.as_str())?,
registration.binary_sha,
),
Some(template_name),
)
.await?;

Ok(())
}

async fn update_validators(&mut self, epoch: Epoch) -> Result<(), BaseLayerScannerError> {
info!(
target: LOG_TARGET,
Expand Down Expand Up @@ -610,6 +672,10 @@ pub enum BaseLayerScannerError {
PublicKeyConversion(ByteArrayError),
#[error("GRPC conversion error: {0}")]
GrpcConversion(String),
#[error("Template manager error: {0}")]
TemplateManagerError(#[from] TemplateManagerError),
#[error("URL parse error: {0}")]
UrlParse(#[from] ParseError),
}

enum BlockchainProgression {
Expand Down
2 changes: 2 additions & 0 deletions applications/tari_dan_app_utilities/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

extern crate core;

pub mod base_layer_scanner;
pub mod common;
pub mod configuration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,42 +193,57 @@ impl<TAddr: NodeAddressable> TemplateManager<TAddr> {
template_name: Option<String>,
template_status: Option<TemplateStatus>,
) -> Result<(), TemplateManagerError> {
enum TemplateHash {
Hash(Hash),
FixedHash(FixedHash),
}

let mut compiled_code = None;
let mut flow_json = None;
let mut manifest = None;
let mut template_type = DbTemplateType::Wasm;
let template_hash: Hash;
let template_hash: TemplateHash;
let mut template_name = template_name.unwrap_or(String::from("default"));
let mut template_url = None;
match template {
TemplateExecutable::CompiledWasm(binary) => {
let loaded_template = WasmModule::load_template_from_code(binary.as_slice())?;
template_hash = template_hasher32().chain(binary.as_slice()).result();
template_hash = TemplateHash::Hash(template_hasher32().chain(binary.as_slice()).result());
compiled_code = Some(binary);
template_name = loaded_template.template_name().to_string();
},
TemplateExecutable::Manifest(curr_manifest) => {
template_hash = template_hasher32().chain(curr_manifest.as_str()).result();
template_hash = TemplateHash::Hash(template_hasher32().chain(curr_manifest.as_str()).result());
manifest = Some(curr_manifest);
template_type = DbTemplateType::Manifest;
},
TemplateExecutable::Flow(curr_flow_json) => {
template_hash = template_hasher32().chain(curr_flow_json.as_str()).result();
template_hash = TemplateHash::Hash(template_hasher32().chain(curr_flow_json.as_str()).result());
flow_json = Some(curr_flow_json);
template_type = DbTemplateType::Flow;
},
TemplateExecutable::DownloadableWasm(url, hash) => {
template_url = Some(url.to_string());
template_type = DbTemplateType::Wasm;
template_hash = TemplateHash::FixedHash(hash);
},
}

let template = DbTemplate {
author_public_key: FixedHash::try_from(author_public_key.to_vec().as_slice())?,
template_name,
template_address,
expected_hash: FixedHash::from(template_hash.into_array()),
expected_hash: match template_hash {
TemplateHash::Hash(hash) => FixedHash::from(hash.into_array()),
TemplateHash::FixedHash(hash) => hash,
},
status: template_status.unwrap_or(TemplateStatus::New),
compiled_code,
added_at: Utc::now().naive_utc(),
template_type,
flow_json,
manifest,
url: template_url,
};

let mut tx = self.global_db.create_transaction()?;
Expand Down Expand Up @@ -301,6 +316,10 @@ impl<TAddr: NodeAddressable + Send + Sync + 'static> TemplateProvider for Templa
let factory = FlowFactory::try_create::<Self>(definition)?;
LoadedTemplate::Flow(factory)
},
TemplateExecutable::DownloadableWasm(_, _) => {
// impossible case, since there is no separate downloadable wasm type in DB level
return Err(Self::Error::UnsupportedTemplateType);
},
};

self.cache.insert(*address, loaded.clone());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use tari_dan_engine::function_definitions::FlowFunctionDefinition;
use tari_dan_storage::global::{DbTemplateType, DbTemplateUpdate, TemplateStatus};
use tari_engine_types::calculate_template_binary_hash;
use tari_shutdown::ShutdownSignal;
use tari_template_lib::models::TemplateAddress;
use tari_template_lib::{models::TemplateAddress, Hash};
use tari_validator_node_client::types::{ArgDef, FunctionDef, TemplateAbi};
use tokio::{
sync::{mpsc, mpsc::Receiver, oneshot},
Expand All @@ -49,22 +49,22 @@ pub struct TemplateManagerService<TAddr> {
rx_request: Receiver<TemplateManagerRequest>,
manager: TemplateManager<TAddr>,
completed_downloads: mpsc::Receiver<DownloadResult>,
// download_queue: mpsc::Sender<DownloadRequest>,
download_queue: mpsc::Sender<DownloadRequest>,
}

impl<TAddr: NodeAddressable + 'static> TemplateManagerService<TAddr> {
pub fn spawn(
rx_request: Receiver<TemplateManagerRequest>,
manager: TemplateManager<TAddr>,
_download_queue: mpsc::Sender<DownloadRequest>,
download_queue: mpsc::Sender<DownloadRequest>,
completed_downloads: mpsc::Receiver<DownloadResult>,
shutdown: ShutdownSignal,
) -> JoinHandle<anyhow::Result<()>> {
tokio::spawn(async move {
Self {
rx_request,
manager,
// download_queue,
download_queue,
completed_downloads,
}
.run(shutdown)
Expand Down Expand Up @@ -97,19 +97,19 @@ impl<TAddr: NodeAddressable + 'static> TemplateManagerService<TAddr> {
let templates = self.manager.fetch_pending_templates()?;
for template in templates {
if template.status == TemplateStatus::Pending {
// TODO: handle
// let _ignore = self
// .download_queue
// .send(DownloadRequest {
// template_type: template.template_type,
// address: Hash::try_from(template.template_address.as_slice()).unwrap(),
// expected_binary_hash: template.expected_hash,
// })
// .await;
// info!(
// target: LOG_TARGET,
// "⏳️️ Template {} queued for download", template.template_address
// );
let _ignore = self
.download_queue
.send(DownloadRequest {
template_type: template.template_type,
address: Hash::try_from(template.template_address.into_array()).unwrap(),
expected_binary_hash: template.expected_hash,
url: template.url.unwrap(),
})
.await;
info!(
target: LOG_TARGET,
"⏳️️ Template {} queued for download", template.template_address
);
}
}
Ok(())
Expand Down Expand Up @@ -244,13 +244,39 @@ impl<TAddr: NodeAddressable + 'static> TemplateManagerService<TAddr> {
template: TemplateExecutable,
template_name: Option<String>,
) -> Result<(), TemplateManagerError> {
let template_status = if matches!(template, TemplateExecutable::DownloadableWasm(_, _)) {
TemplateStatus::New
} else {
TemplateStatus::Active
};
self.manager.add_template(
author_public_key,
template_address,
template,
template.clone(),
template_name,
Some(TemplateStatus::Active),
Some(template_status),
)?;

// TODO: remove when we remove support for base layer template registration
// update template status and add to download queue if it's a downloadable template
if let TemplateExecutable::DownloadableWasm(url, expected_binary_hash) = template {
// We could queue this up much later, at which point we'd update to pending
self.manager.update_template(template_address, DbTemplateUpdate {
status: Some(TemplateStatus::Pending),
..Default::default()
})?;

let _ignore = self
.download_queue
.send(DownloadRequest {
address: template_address,
template_type: DbTemplateType::Wasm,
url: url.to_string(),
expected_binary_hash,
})
.await;
}

Ok(())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,15 @@ impl TemplateManagerHandle {
author_public_key: PublicKey,
template_address: TemplateAddress,
template: TemplateExecutable,
template_name: Option<String>,
) -> Result<(), TemplateManagerError> {
let (tx, rx) = oneshot::channel();
self.request_tx
.send(TemplateManagerRequest::AddTemplate {
author_public_key,
template_address,
template,
template_name: None,
template_name,
reply: tx,
})
.await
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use reqwest::Url;
use tari_common_types::types::{FixedHash, PublicKey};
use tari_dan_storage::global::{DbTemplate, DbTemplateType};
use tari_template_lib::models::TemplateAddress;
Expand Down Expand Up @@ -52,6 +53,9 @@ pub enum TemplateExecutable {
CompiledWasm(Vec<u8>),
Manifest(String),
Flow(String),
// TODO: remove this when base layer template registration is removed
/// WASM binary download URL and binary hash
DownloadableWasm(Url, FixedHash),
}

#[derive(Debug, Clone)]
Expand Down
Loading

0 comments on commit 83f117c

Please sign in to comment.