Skip to content

Commit

Permalink
almost fully done syncing implementation, small changes left
Browse files Browse the repository at this point in the history
  • Loading branch information
ksrichard committed Dec 20, 2024
1 parent 584c069 commit 037035a
Show file tree
Hide file tree
Showing 12 changed files with 87 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ use tokio::{
use super::{downloader::TemplateDownloadWorker, service::TemplateManagerService, TemplateManager};
use crate::template_manager::interface::TemplateManagerHandle;

pub fn spawn<TAddr: NodeAddressable + 'static>(
pub fn spawn<TAddr: NodeAddressable + 'static, S: tari_state_store_sqlite::SubstateStore + Send + Sync + 'static>(
manager: TemplateManager<TAddr>,
epoch_manager: EpochManagerHandle<PeerAddress>,
client_factory: TariValidatorNodeRpcClientFactory,
rx_template_sync: broadcast::Receiver<TemplateSyncRequest>,
state_store: S,
shutdown: ShutdownSignal,
) -> (TemplateManagerHandle, JoinHandle<anyhow::Result<()>>) {
let (tx_request, rx_request) = mpsc::channel(1);
Expand All @@ -53,6 +54,7 @@ pub fn spawn<TAddr: NodeAddressable + 'static>(
rx_completed_downloads,
rx_template_sync,
client_factory,
state_store,
shutdown,
);
TemplateDownloadWorker::new(rx_download_queue, tx_completed_downloads).spawn();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::{
convert::TryFrom,
fs,
sync::Arc,
thread,
time::{SystemTime, UNIX_EPOCH},
};

Expand Down Expand Up @@ -369,6 +370,8 @@ impl<TAddr: NodeAddressable + Send + Sync + 'static> TemplateProvider for Templa
if let Some(TemplateResult::Template(fetched_template)) = self.fetch_template(address).optional()? {
template = Some(fetched_template);
}
// sleeping here to not overload the local database while waiting for the template to be ready
thread::sleep(std::time::Duration::from_millis(100));
}
debug!(target: LOG_TARGET, "Failed to fetch template {} within {:?}", address, self.config.pending_templates_wait_timeout());
template.ok_or(Self::Error::TemplateUnavailable)?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use std::{string::FromUtf8Error, sync::Arc};

use log::*;
use tari_common_types::types::{FixedHash, FixedHashSizeError, PublicKey};
use tari_consensus::traits::{ReadableSubstateStore, SubstateStore};
use tari_crypto::tari_utilities::{ByteArray, ByteArrayError};
use tari_dan_common_types::{
services::template_provider::TemplateProvider,
Expand All @@ -40,7 +41,12 @@ use tari_dan_p2p::proto::{
rpc as proto,
rpc::{SyncTemplateResponse, TemplateType},
};
use tari_dan_storage::global::{DbTemplateType, DbTemplateUpdate, TemplateStatus};
use tari_dan_storage::{
consensus_models::SubstateRecord,
global::{DbTemplateType, DbTemplateUpdate, TemplateStatus},
StateStore,
StorageError,
};
use tari_engine_types::{
calculate_template_binary_hash,
hashing::template_hasher32,
Expand All @@ -50,6 +56,7 @@ use tari_engine_types::{
use tari_epoch_manager::{base_layer::EpochManagerHandle, EpochManagerError, EpochManagerReader};
use tari_rpc_framework::RpcError;
use tari_shutdown::ShutdownSignal;
use tari_state_store_sqlite::SqliteStateStore;
use tari_template_lib::{models::TemplateAddress, Hash};
use tari_validator_node_client::types::{ArgDef, FunctionDef, TemplateAbi};
use tari_validator_node_rpc::{
Expand All @@ -59,7 +66,7 @@ use tari_validator_node_rpc::{
};
use thiserror::Error;
use tokio::{
sync::{broadcast, mpsc, mpsc::Receiver, oneshot},
sync::{broadcast, mpsc, mpsc::Receiver, oneshot, Mutex},
task::JoinHandle,
};

Expand All @@ -80,31 +87,32 @@ pub enum TemplateManagerServiceError {
TemplateManager(#[from] TemplateManagerError),
#[error("Epoch manager error: {0}")]
EpochManager(#[from] EpochManagerError),
#[error("Invalid substate ID: {0}")]
InvalidSubstateId(&'static str),
#[error("VN RPC client error: {0}")]
VnRpcClient(#[from] ValidatorNodeRpcClientError),
#[error("RPC error: {0}")]
Rpc(#[from] RpcError),
#[error("Byte array error: {0}")]
ByteArray(ByteArrayError),
#[error("UTF-8 bytes to string conversion error: {0}")]
BytesUtf8Conversion(#[from] FromUtf8Error),
#[error("Fixed hash conversion error: {0}")]
FixedHashConversion(#[from] FixedHashSizeError),
#[error("Storage error: {0}")]
Storage(#[from] StorageError),
}

pub struct TemplateManagerService<TAddr> {
pub struct TemplateManagerService<TAddr, S> {
rx_request: Receiver<TemplateManagerRequest>,
manager: TemplateManager<TAddr>,
epoch_manager: EpochManagerHandle<PeerAddress>,
completed_downloads: mpsc::Receiver<DownloadResult>,
download_queue: mpsc::Sender<DownloadRequest>,
rx_template_sync: broadcast::Receiver<TemplateSyncRequest>,
client_factory: Arc<TariValidatorNodeRpcClientFactory>,
state_store: S,
}

impl<TAddr: NodeAddressable + 'static> TemplateManagerService<TAddr> {
impl<TAddr: NodeAddressable + 'static, S: tari_state_store_sqlite::SubstateStore + Send + Sync + 'static>
TemplateManagerService<TAddr, S>
{
pub fn spawn(
rx_request: Receiver<TemplateManagerRequest>,
manager: TemplateManager<TAddr>,
Expand All @@ -113,6 +121,7 @@ impl<TAddr: NodeAddressable + 'static> TemplateManagerService<TAddr> {
completed_downloads: mpsc::Receiver<DownloadResult>,
rx_template_sync: broadcast::Receiver<TemplateSyncRequest>,
client_factory: TariValidatorNodeRpcClientFactory,
state_store: S,
shutdown: ShutdownSignal,
) -> JoinHandle<anyhow::Result<()>> {
tokio::spawn(async move {
Expand All @@ -124,6 +133,7 @@ impl<TAddr: NodeAddressable + 'static> TemplateManagerService<TAddr> {
completed_downloads,
rx_template_sync,
client_factory: Arc::new(client_factory),
state_store,
}
.run(shutdown)
.await?;
Expand Down Expand Up @@ -179,10 +189,7 @@ impl<TAddr: NodeAddressable + 'static> TemplateManagerService<TAddr> {

/// Handles request to check if a template by address (substate ID and version) is present locally or needs syncing.
/// If needs syncing, just does it.
async fn handle_template_sync_request(
&mut self,
req: TemplateSyncRequest,
) -> Result<(), TemplateManagerServiceError> {
async fn handle_template_sync_request(&self, req: TemplateSyncRequest) -> Result<(), TemplateManagerServiceError> {
warn!(target: LOG_TARGET, "New template sync request: {req:?}"); // TODO: remove, only for testing

let template_address = req.address();
Expand All @@ -202,14 +209,13 @@ impl<TAddr: NodeAddressable + 'static> TemplateManagerService<TAddr> {
self.manager.add_pending_template(template_address)?;

// sync
let substate_id = SubstateId::from(PublishedTemplateAddress::from_hash(req.address()));
let version = self.state_store.get_latest_version(&substate_id).unwrap_or(0);
let mut owner_committee = self
.epoch_manager
.get_committee_for_substate(
self.epoch_manager.current_epoch().await?,
SubstateAddress::from_substate_id(
&SubstateId::from(PublishedTemplateAddress::from_hash(req.address())),
0,
), // TODO: get version somehow
SubstateAddress::from_substate_id(&substate_id, version),
)
.await?;
owner_committee.shuffle();
Expand Down
1 change: 1 addition & 0 deletions applications/tari_indexer/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ pub async fn spawn_services(
epoch_manager.clone(),
validator_node_client_factory.clone(),
rx_template_sync,
substate_store.clone(),
shutdown.clone(),
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ use diesel_migrations::{EmbeddedMigrations, MigrationHarness};
use log::*;
use tari_crypto::tari_utilities::hex::to_hex;
use tari_dan_common_types::{substate_type::SubstateType, Epoch, ShardGroup};
use tari_dan_storage::{consensus_models::BlockId, StorageError};
use tari_dan_storage::{
consensus_models::{BlockId, SubstateRecord},
StorageError,
};
use tari_dan_storage_sqlite::{error::SqliteStorageError, SqliteTransaction};
use tari_engine_types::substate::SubstateId;
use tari_indexer_client::types::ListSubstateItem;
Expand Down Expand Up @@ -135,6 +138,18 @@ pub trait SubstateStore {
}
}

impl tari_state_store_sqlite::SubstateStore for SqliteSubstateStore {
fn get_latest_version(&self, substate_id: &SubstateId) -> Result<u32, StorageError> {
let mut tx = self.create_read_tx()?;
tx.get_latest_version_for_substate(substate_id)?
.map(|version| version as u32)
.ok_or(StorageError::NotFound {
item: "substate",
key: substate_id.to_string(),
})
}
}

#[derive(Debug, Error)]
pub enum StoreError {
#[error("Storage error: {details}")]
Expand Down
1 change: 1 addition & 0 deletions applications/tari_validator_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ pub async fn spawn_services(
epoch_manager.clone(),
validator_node_client_factory.clone(),
rx_template_sync,
state_store.clone(),
shutdown.clone(),
);
handles.push(join_handle);
Expand Down
132 changes: 0 additions & 132 deletions bindings/src/index.ts

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ impl<'store, 'tx, TStore: StateStore + 'store + 'tx> ReadableSubstateStore
}
Ok(substate.into_substate())
}

fn get_latest_version(&self, substate_id: &SubstateId) -> Result<u32, Self::Error> {
let (version, _) = SubstateRecord::get_latest_version(self.read_transaction(), substate_id)?;
Ok(version)
}
}

impl<'a, 'tx, TStore: StateStore + 'a + 'tx> WriteableSubstateStore for PendingSubstateStore<'a, 'tx, TStore> {
Expand Down
9 changes: 8 additions & 1 deletion dan_layer/consensus/src/traits/substate_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tari_dan_storage::{
consensus_models::{SubstateChange, SubstateRecord},
StateStoreReadTransaction,
};
use tari_engine_types::substate::{Substate, SubstateDiff};
use tari_engine_types::substate::{Substate, SubstateDiff, SubstateId};
use tari_transaction::TransactionId;

use crate::hotstuff::substate_store::SubstateStoreError;
Expand All @@ -15,6 +15,8 @@ pub trait ReadableSubstateStore {
type Error;

fn get(&self, id: VersionedSubstateIdRef<'_>) -> Result<Substate, Self::Error>;

fn get_latest_version(&self, substate_id: &SubstateId) -> Result<u32, Self::Error>;
}

pub trait WriteableSubstateStore: ReadableSubstateStore {
Expand All @@ -34,4 +36,9 @@ impl<T: StateStoreReadTransaction> ReadableSubstateStore for &T {
let substate = SubstateRecord::get(*self, &id.to_substate_address())?;
Ok(substate.into_substate())
}

fn get_latest_version(&self, substate_id: &SubstateId) -> Result<u32, Self::Error> {
let (version, _) = SubstateRecord::get_latest_version(*self, substate_id)?;
Ok(version)
}
}
2 changes: 2 additions & 0 deletions dan_layer/state_store_sqlite/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ mod sql_models;
mod sqlite_transaction;
mod store;
// mod tree_store;
mod substate;
mod writer;

pub use store::SqliteStateStore;
pub use substate::SubstateStore;
Loading

0 comments on commit 037035a

Please sign in to comment.