diff --git a/applications/tari_dan_app_utilities/src/template_manager/implementation/initializer.rs b/applications/tari_dan_app_utilities/src/template_manager/implementation/initializer.rs index b8ab34eaa..e11f21f66 100644 --- a/applications/tari_dan_app_utilities/src/template_manager/implementation/initializer.rs +++ b/applications/tari_dan_app_utilities/src/template_manager/implementation/initializer.rs @@ -32,11 +32,12 @@ use tokio::{ use super::{downloader::TemplateDownloadWorker, service::TemplateManagerService, TemplateManager}; use crate::template_manager::interface::TemplateManagerHandle; -pub fn spawn( +pub fn spawn( manager: TemplateManager, epoch_manager: EpochManagerHandle, client_factory: TariValidatorNodeRpcClientFactory, rx_template_sync: broadcast::Receiver, + state_store: S, shutdown: ShutdownSignal, ) -> (TemplateManagerHandle, JoinHandle>) { let (tx_request, rx_request) = mpsc::channel(1); @@ -53,6 +54,7 @@ pub fn spawn( rx_completed_downloads, rx_template_sync, client_factory, + state_store, shutdown, ); TemplateDownloadWorker::new(rx_download_queue, tx_completed_downloads).spawn(); diff --git a/applications/tari_dan_app_utilities/src/template_manager/implementation/manager.rs b/applications/tari_dan_app_utilities/src/template_manager/implementation/manager.rs index 5846158c5..afce891a2 100644 --- a/applications/tari_dan_app_utilities/src/template_manager/implementation/manager.rs +++ b/applications/tari_dan_app_utilities/src/template_manager/implementation/manager.rs @@ -25,6 +25,7 @@ use std::{ convert::TryFrom, fs, sync::Arc, + thread, time::{SystemTime, UNIX_EPOCH}, }; @@ -369,6 +370,8 @@ impl TemplateProvider for Templa if let Some(TemplateResult::Template(fetched_template)) = self.fetch_template(address).optional()? { template = Some(fetched_template); } + // sleeping here to not overload the local database while waiting for the template to be ready + thread::sleep(std::time::Duration::from_millis(100)); } debug!(target: LOG_TARGET, "Failed to fetch template {} within {:?}", address, self.config.pending_templates_wait_timeout()); template.ok_or(Self::Error::TemplateUnavailable)? diff --git a/applications/tari_dan_app_utilities/src/template_manager/implementation/service.rs b/applications/tari_dan_app_utilities/src/template_manager/implementation/service.rs index ae2dd8e57..154888510 100644 --- a/applications/tari_dan_app_utilities/src/template_manager/implementation/service.rs +++ b/applications/tari_dan_app_utilities/src/template_manager/implementation/service.rs @@ -27,6 +27,7 @@ use std::{string::FromUtf8Error, sync::Arc}; use log::*; use tari_common_types::types::{FixedHash, FixedHashSizeError, PublicKey}; +use tari_consensus::traits::{ReadableSubstateStore, SubstateStore}; use tari_crypto::tari_utilities::{ByteArray, ByteArrayError}; use tari_dan_common_types::{ services::template_provider::TemplateProvider, @@ -40,7 +41,12 @@ use tari_dan_p2p::proto::{ rpc as proto, rpc::{SyncTemplateResponse, TemplateType}, }; -use tari_dan_storage::global::{DbTemplateType, DbTemplateUpdate, TemplateStatus}; +use tari_dan_storage::{ + consensus_models::SubstateRecord, + global::{DbTemplateType, DbTemplateUpdate, TemplateStatus}, + StateStore, + StorageError, +}; use tari_engine_types::{ calculate_template_binary_hash, hashing::template_hasher32, @@ -50,6 +56,7 @@ use tari_engine_types::{ use tari_epoch_manager::{base_layer::EpochManagerHandle, EpochManagerError, EpochManagerReader}; use tari_rpc_framework::RpcError; use tari_shutdown::ShutdownSignal; +use tari_state_store_sqlite::SqliteStateStore; use tari_template_lib::{models::TemplateAddress, Hash}; use tari_validator_node_client::types::{ArgDef, FunctionDef, TemplateAbi}; use tari_validator_node_rpc::{ @@ -59,7 +66,7 @@ use tari_validator_node_rpc::{ }; use thiserror::Error; use tokio::{ - sync::{broadcast, mpsc, mpsc::Receiver, oneshot}, + sync::{broadcast, mpsc, mpsc::Receiver, oneshot, Mutex}, task::JoinHandle, }; @@ -80,21 +87,19 @@ pub enum TemplateManagerServiceError { TemplateManager(#[from] TemplateManagerError), #[error("Epoch manager error: {0}")] EpochManager(#[from] EpochManagerError), - #[error("Invalid substate ID: {0}")] - InvalidSubstateId(&'static str), #[error("VN RPC client error: {0}")] VnRpcClient(#[from] ValidatorNodeRpcClientError), #[error("RPC error: {0}")] Rpc(#[from] RpcError), - #[error("Byte array error: {0}")] - ByteArray(ByteArrayError), #[error("UTF-8 bytes to string conversion error: {0}")] BytesUtf8Conversion(#[from] FromUtf8Error), #[error("Fixed hash conversion error: {0}")] FixedHashConversion(#[from] FixedHashSizeError), + #[error("Storage error: {0}")] + Storage(#[from] StorageError), } -pub struct TemplateManagerService { +pub struct TemplateManagerService { rx_request: Receiver, manager: TemplateManager, epoch_manager: EpochManagerHandle, @@ -102,9 +107,12 @@ pub struct TemplateManagerService { download_queue: mpsc::Sender, rx_template_sync: broadcast::Receiver, client_factory: Arc, + state_store: S, } -impl TemplateManagerService { +impl + TemplateManagerService +{ pub fn spawn( rx_request: Receiver, manager: TemplateManager, @@ -113,6 +121,7 @@ impl TemplateManagerService { completed_downloads: mpsc::Receiver, rx_template_sync: broadcast::Receiver, client_factory: TariValidatorNodeRpcClientFactory, + state_store: S, shutdown: ShutdownSignal, ) -> JoinHandle> { tokio::spawn(async move { @@ -124,6 +133,7 @@ impl TemplateManagerService { completed_downloads, rx_template_sync, client_factory: Arc::new(client_factory), + state_store, } .run(shutdown) .await?; @@ -179,10 +189,7 @@ impl TemplateManagerService { /// Handles request to check if a template by address (substate ID and version) is present locally or needs syncing. /// If needs syncing, just does it. - async fn handle_template_sync_request( - &mut self, - req: TemplateSyncRequest, - ) -> Result<(), TemplateManagerServiceError> { + async fn handle_template_sync_request(&self, req: TemplateSyncRequest) -> Result<(), TemplateManagerServiceError> { warn!(target: LOG_TARGET, "New template sync request: {req:?}"); // TODO: remove, only for testing let template_address = req.address(); @@ -202,14 +209,13 @@ impl TemplateManagerService { self.manager.add_pending_template(template_address)?; // sync + let substate_id = SubstateId::from(PublishedTemplateAddress::from_hash(req.address())); + let version = self.state_store.get_latest_version(&substate_id).unwrap_or(0); let mut owner_committee = self .epoch_manager .get_committee_for_substate( self.epoch_manager.current_epoch().await?, - SubstateAddress::from_substate_id( - &SubstateId::from(PublishedTemplateAddress::from_hash(req.address())), - 0, - ), // TODO: get version somehow + SubstateAddress::from_substate_id(&substate_id, version), ) .await?; owner_committee.shuffle(); diff --git a/applications/tari_indexer/src/bootstrap.rs b/applications/tari_indexer/src/bootstrap.rs index c42b592f2..a1b9ff11e 100644 --- a/applications/tari_indexer/src/bootstrap.rs +++ b/applications/tari_indexer/src/bootstrap.rs @@ -150,6 +150,7 @@ pub async fn spawn_services( epoch_manager.clone(), validator_node_client_factory.clone(), rx_template_sync, + substate_store.clone(), shutdown.clone(), ); diff --git a/applications/tari_indexer/src/substate_storage_sqlite/sqlite_substate_store_factory.rs b/applications/tari_indexer/src/substate_storage_sqlite/sqlite_substate_store_factory.rs index 00c2c9ac4..fd2b5a993 100644 --- a/applications/tari_indexer/src/substate_storage_sqlite/sqlite_substate_store_factory.rs +++ b/applications/tari_indexer/src/substate_storage_sqlite/sqlite_substate_store_factory.rs @@ -40,7 +40,10 @@ use diesel_migrations::{EmbeddedMigrations, MigrationHarness}; use log::*; use tari_crypto::tari_utilities::hex::to_hex; use tari_dan_common_types::{substate_type::SubstateType, Epoch, ShardGroup}; -use tari_dan_storage::{consensus_models::BlockId, StorageError}; +use tari_dan_storage::{ + consensus_models::{BlockId, SubstateRecord}, + StorageError, +}; use tari_dan_storage_sqlite::{error::SqliteStorageError, SqliteTransaction}; use tari_engine_types::substate::SubstateId; use tari_indexer_client::types::ListSubstateItem; @@ -135,6 +138,18 @@ pub trait SubstateStore { } } +impl tari_state_store_sqlite::SubstateStore for SqliteSubstateStore { + fn get_latest_version(&self, substate_id: &SubstateId) -> Result { + let mut tx = self.create_read_tx()?; + tx.get_latest_version_for_substate(substate_id)? + .map(|version| version as u32) + .ok_or(StorageError::NotFound { + item: "substate", + key: substate_id.to_string(), + }) + } +} + #[derive(Debug, Error)] pub enum StoreError { #[error("Storage error: {details}")] diff --git a/applications/tari_validator_node/src/bootstrap.rs b/applications/tari_validator_node/src/bootstrap.rs index c10d3aeb0..3566a362d 100644 --- a/applications/tari_validator_node/src/bootstrap.rs +++ b/applications/tari_validator_node/src/bootstrap.rs @@ -263,6 +263,7 @@ pub async fn spawn_services( epoch_manager.clone(), validator_node_client_factory.clone(), rx_template_sync, + state_store.clone(), shutdown.clone(), ); handles.push(join_handle); diff --git a/bindings/src/index.ts b/bindings/src/index.ts deleted file mode 100644 index fadd8abd1..000000000 --- a/bindings/src/index.ts +++ /dev/null @@ -1,132 +0,0 @@ -// Copyright 2023 The Tari Project -// SPDX-License-Identifier: BSD-3-Clause - -export * from "./types/AbortReason"; -export * from "./types/AccessRule"; -export * from "./types/Account"; -export * from "./types/Amount"; -export * from "./types/Arg"; -export * from "./types/ArgDef"; -export * from "./types/AuthHook"; -export * from "./types/Block"; -export * from "./types/BlockHeader"; -export * from "./types/BucketId"; -export * from "./types/Claims"; -export * from "./types/Command"; -export * from "./types/Committee"; -export * from "./types/CommitteeInfo"; -export * from "./types/CommitteeShardInfo"; -export * from "./types/ComponentAccessRules"; -export * from "./types/ComponentAddress"; -export * from "./types/ComponentBody"; -export * from "./types/ComponentHeader"; -export * from "./types/ComponentKey"; -export * from "./types/ConfidentialClaim"; -export * from "./types/ConfidentialOutput"; -export * from "./types/ConfidentialOutputStatement"; -export * from "./types/ConfidentialStatement"; -export * from "./types/ConfidentialTransferInputSelection"; -export * from "./types/ConfidentialWithdrawProof"; -export * from "./types/Decision"; -export * from "./types/ElgamalVerifiableBalance"; -export * from "./types/EntityId"; -export * from "./types/Epoch"; -export * from "./types/Era"; -export * from "./types/Event"; -export * from "./types/EvictNodeAtom"; -export * from "./types/Evidence"; -export * from "./types/ExecuteResult"; -export * from "./types/ExecutedTransaction"; -export * from "./types/ExtraData"; -export * from "./types/FeeBreakdown"; -export * from "./types/FeeClaim"; -export * from "./types/FeeClaimAddress"; -export * from "./types/FeeCostBreakdown"; -export * from "./types/FeeReceipt"; -export * from "./types/FeeSource"; -export * from "./types/FinalizeResult"; -export * from "./types/ForeignProposalAtom"; -export * from "./types/FunctionDef"; -export * from "./types/IndexedValue"; -export * from "./types/IndexedWellKnownTypes"; -export * from "./types/Instruction"; -export * from "./types/InstructionResult"; -export * from "./types/JrpcPermission"; -export * from "./types/JrpcPermissions"; -export * from "./types/LeaderFee"; -export * from "./types/LockFlag"; -export * from "./types/LogEntry"; -export * from "./types/LogLevel"; -export * from "./types/Metadata"; -export * from "./types/MintConfidentialOutputAtom"; -export * from "./types/NetworkCommitteeInfo"; -export * from "./types/NodeHeight"; -export * from "./types/NonFungible"; -export * from "./types/NonFungibleAddress"; -export * from "./types/NonFungibleAddressContents"; -export * from "./types/NonFungibleContainer"; -export * from "./types/NonFungibleId"; -export * from "./types/NonFungibleIndex"; -export * from "./types/NonFungibleIndexAddress"; -export * from "./types/NonFungibleToken"; -export * from "./types/NumPreshards"; -export * from "./types/Ordering"; -export * from "./types/OwnerRule"; -export * from "./types/PeerAddress"; -export * from "./types/ProofId"; -export * from "./types/PublishedTemplate"; -export * from "./types/PublishedTemplateAddress"; -export * from "./types/QuorumCertificate"; -export * from "./types/QuorumDecision"; -export * from "./types/RejectReason"; -export * from "./types/RequireRule"; -export * from "./types/Resource"; -export * from "./types/ResourceAccessRules"; -export * from "./types/ResourceAddress"; -export * from "./types/ResourceContainer"; -export * from "./types/ResourceType"; -export * from "./types/RestrictedAccessRule"; -export * from "./types/ResumeNodeAtom"; -export * from "./types/RuleRequirement"; -export * from "./types/Shard"; -export * from "./types/ShardEvidence"; -export * from "./types/ShardGroup"; -export * from "./types/ShardGroupEvidence"; -export * from "./types/Substate"; -export * from "./types/SubstateAddress"; -export * from "./types/SubstateDestroyed"; -export * from "./types/SubstateDiff"; -export * from "./types/SubstateId"; -export * from "./types/SubstateLockType"; -export * from "./types/SubstateRecord"; -export * from "./types/SubstateRequirement"; -export * from "./types/SubstateRequirementLockIntent"; -export * from "./types/SubstateType"; -export * from "./types/SubstateValue"; -export * from "./types/SuspendNodeAtom"; -export * from "./types/TemplateDef"; -export * from "./types/TemplateDefV1"; -export * from "./types/Transaction"; -export * from "./types/TransactionAtom"; -export * from "./types/TransactionPoolRecord"; -export * from "./types/TransactionPoolStage"; -export * from "./types/TransactionReceipt"; -export * from "./types/TransactionReceiptAddress"; -export * from "./types/TransactionResult"; -export * from "./types/TransactionSignature"; -export * from "./types/TransactionStatus"; -export * from "./types/Type"; -export * from "./types/UnclaimedConfidentialOutput"; -export * from "./types/UnclaimedConfidentialOutputAddress"; -export * from "./types/UnsignedTransaction"; -export * from "./types/ValidatorSignature"; -export * from "./types/Vault"; -export * from "./types/VaultId"; -export * from "./types/VersionedSubstateId"; -export * from "./types/VersionedSubstateIdLockIntent"; -export * from "./types/ViewableBalanceProof"; -export * from "./base-node-client"; -export * from "./tari-indexer-client"; -export * from "./validator-node-client"; -export * from "./wallet-daemon-client"; -export * from "./helpers/helpers"; diff --git a/dan_layer/consensus/src/hotstuff/substate_store/pending_store.rs b/dan_layer/consensus/src/hotstuff/substate_store/pending_store.rs index d1fdc9040..2a5874f0b 100644 --- a/dan_layer/consensus/src/hotstuff/substate_store/pending_store.rs +++ b/dan_layer/consensus/src/hotstuff/substate_store/pending_store.rs @@ -92,6 +92,11 @@ impl<'store, 'tx, TStore: StateStore + 'store + 'tx> ReadableSubstateStore } Ok(substate.into_substate()) } + + fn get_latest_version(&self, substate_id: &SubstateId) -> Result { + let (version, _) = SubstateRecord::get_latest_version(self.read_transaction(), substate_id)?; + Ok(version) + } } impl<'a, 'tx, TStore: StateStore + 'a + 'tx> WriteableSubstateStore for PendingSubstateStore<'a, 'tx, TStore> { diff --git a/dan_layer/consensus/src/traits/substate_store.rs b/dan_layer/consensus/src/traits/substate_store.rs index 53c66f527..4047b63f6 100644 --- a/dan_layer/consensus/src/traits/substate_store.rs +++ b/dan_layer/consensus/src/traits/substate_store.rs @@ -6,7 +6,7 @@ use tari_dan_storage::{ consensus_models::{SubstateChange, SubstateRecord}, StateStoreReadTransaction, }; -use tari_engine_types::substate::{Substate, SubstateDiff}; +use tari_engine_types::substate::{Substate, SubstateDiff, SubstateId}; use tari_transaction::TransactionId; use crate::hotstuff::substate_store::SubstateStoreError; @@ -15,6 +15,8 @@ pub trait ReadableSubstateStore { type Error; fn get(&self, id: VersionedSubstateIdRef<'_>) -> Result; + + fn get_latest_version(&self, substate_id: &SubstateId) -> Result; } pub trait WriteableSubstateStore: ReadableSubstateStore { @@ -34,4 +36,9 @@ impl ReadableSubstateStore for &T { let substate = SubstateRecord::get(*self, &id.to_substate_address())?; Ok(substate.into_substate()) } + + fn get_latest_version(&self, substate_id: &SubstateId) -> Result { + let (version, _) = SubstateRecord::get_latest_version(*self, substate_id)?; + Ok(version) + } } diff --git a/dan_layer/state_store_sqlite/src/lib.rs b/dan_layer/state_store_sqlite/src/lib.rs index 677be0292..32b348e4e 100644 --- a/dan_layer/state_store_sqlite/src/lib.rs +++ b/dan_layer/state_store_sqlite/src/lib.rs @@ -9,6 +9,8 @@ mod sql_models; mod sqlite_transaction; mod store; // mod tree_store; +mod substate; mod writer; pub use store::SqliteStateStore; +pub use substate::SubstateStore; diff --git a/dan_layer/state_store_sqlite/src/store.rs b/dan_layer/state_store_sqlite/src/store.rs index 394d524d9..5bd3cda9a 100644 --- a/dan_layer/state_store_sqlite/src/store.rs +++ b/dan_layer/state_store_sqlite/src/store.rs @@ -13,7 +13,8 @@ use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; use log::log; use serde::{de::DeserializeOwned, Serialize}; use tari_dan_common_types::NodeAddressable; -use tari_dan_storage::{StateStore, StorageError}; +use tari_dan_storage::{StateStore, StateStoreReadTransaction, StorageError}; +use tari_engine_types::substate::SubstateId; use crate::{ error::SqliteStorageError, @@ -69,14 +70,24 @@ impl fmt::Debug for SqliteStateStore { } } +impl crate::SubstateStore for SqliteStateStore { + fn get_latest_version(&self, substate_id: &SubstateId) -> Result { + let mut tx = self.create_read_tx()?; + let (version, _) = tx.substates_get_max_version_for_substate(substate_id)?; + Ok(version) + } +} + impl StateStore for SqliteStateStore { type Addr = TAddr; type ReadTransaction<'a> - = SqliteStateStoreReadTransaction<'a, Self::Addr> - where TAddr: 'a; + = SqliteStateStoreReadTransaction<'a, Self::Addr> + where + TAddr: 'a; type WriteTransaction<'a> - = SqliteStateStoreWriteTransaction<'a, Self::Addr> - where TAddr: 'a; + = SqliteStateStoreWriteTransaction<'a, Self::Addr> + where + TAddr: 'a; fn create_read_tx(&self) -> Result, StorageError> { let tx = SqliteTransaction::begin(self.connection.lock().unwrap())?; diff --git a/dan_layer/state_store_sqlite/src/substate.rs b/dan_layer/state_store_sqlite/src/substate.rs new file mode 100644 index 000000000..b727452b9 --- /dev/null +++ b/dan_layer/state_store_sqlite/src/substate.rs @@ -0,0 +1,10 @@ +// Copyright 2024 The Tari Project +// SPDX-License-Identifier: BSD-3-Clause + +use tari_dan_storage::StorageError; +use tari_engine_types::substate::SubstateId; + +/// General trait for different state stores to do operations on substates. +pub trait SubstateStore { + fn get_latest_version(&self, substate_id: &SubstateId) -> Result; +}