diff --git a/applications/tari_validator_node/src/consensus/block_transaction_executor.rs b/applications/tari_validator_node/src/consensus/block_transaction_executor.rs index a7cd42bab8..d6bad7cd5a 100644 --- a/applications/tari_validator_node/src/consensus/block_transaction_executor.rs +++ b/applications/tari_validator_node/src/consensus/block_transaction_executor.rs @@ -1,7 +1,7 @@ // Copyright 2024 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use indexmap::IndexMap; use log::info; @@ -13,10 +13,11 @@ use tari_dan_app_utilities::transaction_executor::TransactionExecutor; use tari_dan_common_types::{optional::Optional, Epoch}; use tari_dan_engine::state_store::{memory::MemoryStateStore, new_memory_store, AtomicDb, StateWriter}; use tari_dan_storage::{ - consensus_models::{ExecutedTransaction, TransactionRecord}, + consensus_models::{ExecutedTransaction, SubstateLockFlag, TransactionRecord, VersionedSubstateIdLockIntent}, StateStore, }; use tari_engine_types::{ + commit_result::{ExecuteResult, FinalizeResult, RejectReason, TransactionResult}, substate::{Substate, SubstateId}, virtual_substate::{VirtualSubstate, VirtualSubstateId, VirtualSubstates}, }; @@ -138,10 +139,38 @@ where store: &PendingSubstateStore, current_epoch: Epoch, ) -> Result { - let id: tari_transaction::TransactionId = *transaction.id(); + let id = *transaction.id(); // Get the latest input substates - let inputs = self.resolve_substates::(&transaction, store)?; + let inputs = match self.resolve_substates::(&transaction, store) { + Ok(inputs) => inputs, + Err(err) => { + // TODO: Hacky - if a transaction uses DOWNed/non-existent inputs we error here. This changes the hard + // error to a propose REJECT. So that we have involved shards, we use the inputs as resolved inputs and + // assume v0 if version is not provided. + let inputs = transaction + .all_inputs_iter() + .map(|input| VersionedSubstateId::new(input.substate_id, input.version.unwrap_or(0))) + .map(|id| VersionedSubstateIdLockIntent::new(id, SubstateLockFlag::Write)) + .collect(); + return Ok(ExecutedTransaction::new( + transaction, + ExecuteResult { + finalize: FinalizeResult { + transaction_hash: id.into_array().into(), + events: vec![], + logs: vec![], + execution_results: vec![], + result: TransactionResult::Reject(RejectReason::ExecutionFailure(err.to_string())), + fee_receipt: Default::default(), + }, + }, + inputs, + vec![], + Duration::from_secs(0), + )); + }, + }; info!(target: LOG_TARGET, "Transaction {} executing. Inputs: {:?}", id, inputs); // Create a memory db with all the input substates, needed for the transaction execution diff --git a/applications/tari_validator_node/src/p2p/rpc/service_impl.rs b/applications/tari_validator_node/src/p2p/rpc/service_impl.rs index 50ae896b8f..a865f28fe5 100644 --- a/applications/tari_validator_node/src/p2p/rpc/service_impl.rs +++ b/applications/tari_validator_node/src/p2p/rpc/service_impl.rs @@ -24,7 +24,7 @@ use std::convert::{TryFrom, TryInto}; use log::*; use tari_bor::{decode_exact, encode}; -use tari_dan_common_types::{optional::Optional, shard::Shard, Epoch, PeerAddress, ShardGroup, SubstateAddress}; +use tari_dan_common_types::{optional::Optional, shard::Shard, Epoch, PeerAddress, SubstateAddress}; use tari_dan_p2p::{ proto, proto::rpc::{ @@ -371,21 +371,19 @@ impl ValidatorNodeRpcService for ValidatorNodeRpcServiceImpl { let (sender, receiver) = mpsc::channel(10); - let last_state_transition_for_chain = - StateTransitionId::new(Epoch(req.start_epoch), Shard::from(req.start_shard), req.start_seq); + let start_epoch = Epoch(req.start_epoch); + let start_shard = Shard::from(req.start_shard); + let last_state_transition_for_chain = StateTransitionId::new(start_epoch, start_shard, req.start_seq); - // TODO: validate that we can provide the required sync data - let current_shard = ShardGroup::decode_from_u32(req.current_shard_group) - .ok_or_else(|| RpcStatus::bad_request("Invalid shard group"))?; - let current_epoch = Epoch(req.current_epoch); - info!(target: LOG_TARGET, "🌍peer initiated sync with this node ({current_epoch}, {current_shard})"); + let end_epoch = Epoch(req.current_epoch); + info!(target: LOG_TARGET, "🌍peer initiated sync with this node ({}, {}, seq={}) to {}", start_epoch, start_shard, req.start_seq, end_epoch); task::spawn( StateSyncTask::new( self.shard_state_store.clone(), sender, last_state_transition_for_chain, - current_epoch, + end_epoch, ) .run(), ); diff --git a/applications/tari_validator_node/src/p2p/rpc/state_sync_task.rs b/applications/tari_validator_node/src/p2p/rpc/state_sync_task.rs index 24322886a8..494d0baea7 100644 --- a/applications/tari_validator_node/src/p2p/rpc/state_sync_task.rs +++ b/applications/tari_validator_node/src/p2p/rpc/state_sync_task.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: BSD-3-Clause use log::*; -use tari_dan_common_types::Epoch; +use tari_dan_common_types::{optional::Optional, Epoch}; use tari_dan_p2p::proto::rpc::SyncStateResponse; use tari_dan_storage::{ consensus_models::{StateTransition, StateTransitionId}, @@ -43,7 +43,7 @@ impl StateSyncTask { pub async fn run(mut self) -> Result<(), ()> { let mut buffer = Vec::with_capacity(BATCH_SIZE); let mut current_state_transition_id = self.start_state_transition_id; - let mut counter = 0; + let mut counter = 0usize; loop { match self.fetch_next_batch(&mut buffer, current_state_transition_id) { Ok(Some(last_state_transition_id)) => { @@ -57,6 +57,7 @@ impl StateSyncTask { // )))) // .await?; + info!(target: LOG_TARGET, "🌍sync complete ({}). {} update(s) sent.", current_state_transition_id, counter); // Finished return Ok(()); }, @@ -92,7 +93,9 @@ impl StateSyncTask { ) -> Result, StorageError> { self.store.with_read_tx(|tx| { let state_transitions = - StateTransition::get_n_after(tx, BATCH_SIZE, current_state_transition_id, self.current_epoch)?; + StateTransition::get_n_after(tx, BATCH_SIZE, current_state_transition_id, self.current_epoch) + .optional()? + .unwrap_or_default(); let Some(last) = state_transitions.last() else { return Ok(None); diff --git a/dan_layer/common_types/src/shard.rs b/dan_layer/common_types/src/shard.rs index d11374a5de..5e57524776 100644 --- a/dan_layer/common_types/src/shard.rs +++ b/dan_layer/common_types/src/shard.rs @@ -113,7 +113,7 @@ impl PartialEq for u32 { impl Display for Shard { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.as_u32()) + write!(f, "Shard({})", self.as_u32()) } } diff --git a/dan_layer/consensus/src/hotstuff/block_change_set.rs b/dan_layer/consensus/src/hotstuff/block_change_set.rs index 7c1e2dcce1..fa7a55326b 100644 --- a/dan_layer/consensus/src/hotstuff/block_change_set.rs +++ b/dan_layer/consensus/src/hotstuff/block_change_set.rs @@ -63,6 +63,8 @@ impl ProposedBlockChangeSet { self.quorum_decision = None; self.block_diff = Vec::new(); self.transaction_changes.clear(); + self.state_tree_diffs.clear(); + self.substate_locks.clear(); self } diff --git a/dan_layer/consensus/src/hotstuff/on_propose.rs b/dan_layer/consensus/src/hotstuff/on_propose.rs index d22e60a655..4c890731e4 100644 --- a/dan_layer/consensus/src/hotstuff/on_propose.rs +++ b/dan_layer/consensus/src/hotstuff/on_propose.rs @@ -242,7 +242,7 @@ where TConsensusSpec: ConsensusSpec ) -> Result { let transaction = TransactionRecord::get(store.read_transaction(), transaction_id)?; - // TODO: this can fail due to unknown inputs. Need to return an ABORT executed transaction + // TODO: check the failure cases for this. Some failures should not cause consensus to fail let executed = self .transaction_executor .execute(transaction.into_transaction(), store, current_epoch) diff --git a/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs b/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs index 06a61b08cd..3883cc483c 100644 --- a/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs +++ b/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs @@ -195,7 +195,6 @@ where TConsensusSpec: ConsensusSpec // Store used for transactions that have inputs without specific versions. // It lives through the entire block so multiple transactions can be sequenced together in the same block - // let tree_store = ChainScopedTreeStore::new(block.epoch(), block.shard_group(), tx); let mut substate_store = PendingSubstateStore::new(tx, *block.parent(), self.config.num_preshards); let mut proposed_block_change_set = ProposedBlockChangeSet::new(block.as_leaf_block()); @@ -719,8 +718,7 @@ where TConsensusSpec: ConsensusSpec block.total_leader_fee(), total_leader_fee ); - // TODO: investigate - // return Ok(proposed_block_change_set.no_vote()); + return Ok(proposed_block_change_set.no_vote()); } let pending = PendingStateTreeDiff::get_all_up_to_commit_block(tx, block.justify().block_id())?; @@ -883,7 +881,7 @@ where TConsensusSpec: ConsensusSpec // NOTE: this must happen before we commit the diff because the state transitions use this version let pending = PendingStateTreeDiff::remove_by_block(tx, block.id())?; let mut state_tree = ShardedStateTree::new(tx); - state_tree.commit_diff(pending)?; + state_tree.commit_diffs(pending)?; let tx = state_tree.into_transaction(); let local_diff = diff.into_filtered(local_committee_info); diff --git a/dan_layer/consensus/src/hotstuff/substate_store/mod.rs b/dan_layer/consensus/src/hotstuff/substate_store/mod.rs index 86e15a2f09..b3831edd37 100644 --- a/dan_layer/consensus/src/hotstuff/substate_store/mod.rs +++ b/dan_layer/consensus/src/hotstuff/substate_store/mod.rs @@ -9,3 +9,4 @@ mod sharded_store; pub use error::*; pub use pending_store::*; pub use sharded_state_tree::*; +pub use sharded_store::*; diff --git a/dan_layer/consensus/src/hotstuff/substate_store/pending_store.rs b/dan_layer/consensus/src/hotstuff/substate_store/pending_store.rs index 3e0477af3b..5d93696a6f 100644 --- a/dan_layer/consensus/src/hotstuff/substate_store/pending_store.rs +++ b/dan_layer/consensus/src/hotstuff/substate_store/pending_store.rs @@ -151,35 +151,6 @@ impl<'a, 'tx, TStore: StateStore + 'a + 'tx> PendingSubstateStore<'a, 'tx, TStor Ok(substate.into_substate()) } - // pub fn calculate_jmt_diff_for_block( - // &mut self, - // block: &Block, - // ) -> Result<(FixedHash, StateHashTreeDiff), SubstateStoreError> { - // let current_version = block.justify().block_height().as_u64(); - // let next_version = block.height().as_u64(); - // - // let pending = PendingStateTreeDiff::get_all_up_to_commit_block( - // self.read_transaction(), - // block.epoch(), - // block.shard_group(), - // block.justify().block_id(), - // )?; - // - // let changes = self.diff.iter().map(|ch| match ch { - // SubstateChange::Up { id, substate, .. } => SubstateTreeChange::Up { - // id: id.substate_id.clone(), - // value_hash: hash_substate(substate.substate_value(), substate.version()), - // }, - // SubstateChange::Down { id, .. } => SubstateTreeChange::Down { - // id: id.substate_id.clone(), - // }, - // }); - // let (state_root, state_tree_diff) = - // calculate_state_merkle_diff(&self.store, current_version, next_version, pending, changes)?; - // - // Ok((state_root, state_tree_diff)) - // } - pub fn try_lock_all>( &mut self, transaction_id: TransactionId, @@ -311,6 +282,7 @@ impl<'a, 'tx, TStore: StateStore + 'a + 'tx> PendingSubstateStore<'a, 'tx, TStor // - it MUST NOT be locked as READ, WRITE or OUTPUT, unless // - if Same-Transaction OR Local-Only-Rules: // - it MAY be locked as WRITE or READ + // - it MUST NOT be locked as OUTPUT SubstateLockFlag::Output => { if !same_transaction && !has_local_only_rules { warn!( diff --git a/dan_layer/consensus/src/hotstuff/substate_store/sharded_state_tree.rs b/dan_layer/consensus/src/hotstuff/substate_store/sharded_state_tree.rs index 5d40a790d0..6f23872f3b 100644 --- a/dan_layer/consensus/src/hotstuff/substate_store/sharded_state_tree.rs +++ b/dan_layer/consensus/src/hotstuff/substate_store/sharded_state_tree.rs @@ -16,6 +16,7 @@ use tari_state_tree::{ JmtStorageError, SpreadPrefixStateTree, StagedTreeStore, + StateHashTreeDiff, StateTreeError, SubstateTreeChange, TreeStoreWriter, @@ -29,7 +30,7 @@ const LOG_TARGET: &str = "tari::dan::consensus::sharded_state_tree"; pub struct ShardedStateTree { tx: TTx, pending_diffs: HashMap>, - current_tree_diffs: IndexMap, + sharded_tree_diffs: IndexMap, } impl ShardedStateTree { @@ -37,7 +38,7 @@ impl ShardedStateTree { Self { tx, pending_diffs: HashMap::new(), - current_tree_diffs: IndexMap::new(), + sharded_tree_diffs: IndexMap::new(), } } @@ -45,6 +46,10 @@ impl ShardedStateTree { Self { pending_diffs, ..self } } + pub fn transaction(&self) -> &TTx { + &self.tx + } + pub fn into_transaction(self) -> TTx { self.tx } @@ -69,7 +74,7 @@ impl ShardedStateTree<&TTx> { } pub fn into_versioned_tree_diffs(self) -> IndexMap { - self.current_tree_diffs + self.sharded_tree_diffs } pub fn put_substate_tree_changes( @@ -104,7 +109,7 @@ impl ShardedStateTree<&TTx> { debug!(target: LOG_TARGET, "v{next_version} contains {} tree change(s) for shard {shard}", changes.len()); let state_root = state_tree.put_substate_changes(current_version, next_version, changes)?; state_roots.update(&state_root); - self.current_tree_diffs + self.sharded_tree_diffs .insert(shard, VersionedStateHashTreeDiff::new(next_version, store.into_diff())); } @@ -114,30 +119,40 @@ impl ShardedStateTree<&TTx> { } impl ShardedStateTree<&mut TTx> { - pub fn commit_diff(&mut self, diffs: IndexMap>) -> Result<(), StateTreeError> { + pub fn commit_diffs(&mut self, diffs: IndexMap>) -> Result<(), StateTreeError> { for (shard, pending_diffs) in diffs { for pending_diff in pending_diffs { let version = pending_diff.version; let diff = pending_diff.diff; - let mut store = ShardScopedTreeStoreWriter::new(self.tx, shard); - - for stale_tree_node in diff.stale_tree_nodes { - debug!( - "(shard={shard}) Recording stale tree node: {}", - stale_tree_node.as_node_key() - ); - store.record_stale_tree_node(stale_tree_node)?; - } + self.commit_diff(shard, version, diff)?; + } + } - for (key, node) in diff.new_nodes { - debug!("(shard={shard}) Inserting node: {}", key); - store.insert_node(key, node)?; - } + Ok(()) + } - store.set_version(version)?; - } + pub fn commit_diff( + &mut self, + shard: Shard, + version: Version, + diff: StateHashTreeDiff, + ) -> Result<(), StateTreeError> { + let mut store = ShardScopedTreeStoreWriter::new(self.tx, shard); + + for stale_tree_node in diff.stale_tree_nodes { + debug!( + "(shard={shard}) Recording stale tree node: {}", + stale_tree_node.as_node_key() + ); + store.record_stale_tree_node(stale_tree_node)?; + } + + for (key, node) in diff.new_nodes { + debug!("(shard={shard}) Inserting node: {}", key); + store.insert_node(key, node)?; } + store.set_version(version)?; Ok(()) } } diff --git a/dan_layer/consensus/src/hotstuff/substate_store/sharded_store.rs b/dan_layer/consensus/src/hotstuff/substate_store/sharded_store.rs index 456b3a04e5..2fa8a54c09 100644 --- a/dan_layer/consensus/src/hotstuff/substate_store/sharded_store.rs +++ b/dan_layer/consensus/src/hotstuff/substate_store/sharded_store.rs @@ -46,6 +46,10 @@ impl<'a, TTx: StateStoreWriteTransaction> ShardScopedTreeStoreWriter<'a, TTx> { .state_tree_shard_versions_set(self.shard, version) .map_err(|e| tari_state_tree::JmtStorageError::UnexpectedError(e.to_string())) } + + pub fn transaction(&mut self) -> &mut TTx { + self.tx + } } impl<'a, TTx> TreeStoreReader for ShardScopedTreeStoreWriter<'a, TTx> diff --git a/dan_layer/p2p/proto/rpc.proto b/dan_layer/p2p/proto/rpc.proto index 7264507aa9..49503860c1 100644 --- a/dan_layer/p2p/proto/rpc.proto +++ b/dan_layer/p2p/proto/rpc.proto @@ -240,7 +240,6 @@ message SyncStateRequest { // The shard in the current shard-epoch that is requested. // This will limit the state transitions returned to those that fall within this shard-epoch. uint64 current_epoch = 4; - uint32 current_shard_group = 5; } message SyncStateResponse { diff --git a/dan_layer/rpc_state_sync/src/error.rs b/dan_layer/rpc_state_sync/src/error.rs index 3e297d828e..e98542bdb0 100644 --- a/dan_layer/rpc_state_sync/src/error.rs +++ b/dan_layer/rpc_state_sync/src/error.rs @@ -8,6 +8,7 @@ use tari_dan_storage::{ }; use tari_epoch_manager::EpochManagerError; use tari_rpc_framework::{RpcError, RpcStatus}; +use tari_state_tree::JmtStorageError; use tari_validator_node_rpc::ValidatorNodeRpcClientError; #[derive(Debug, thiserror::Error)] @@ -34,12 +35,27 @@ pub enum CommsRpcConsensusSyncError { StateTreeError(#[from] tari_state_tree::StateTreeError), } +impl CommsRpcConsensusSyncError { + pub fn error_at_remote(self) -> Result { + match &self { + CommsRpcConsensusSyncError::InvalidResponse(_) | CommsRpcConsensusSyncError::RpcError(_) => Err(self), + _ => Ok(self), + } + } +} + impl From for HotStuffError { fn from(value: CommsRpcConsensusSyncError) -> Self { HotStuffError::SyncError(value.into()) } } +impl From for CommsRpcConsensusSyncError { + fn from(value: JmtStorageError) -> Self { + Self::StateTreeError(value.into()) + } +} + impl From for CommsRpcConsensusSyncError { fn from(value: RpcStatus) -> Self { Self::RpcError(value.into()) diff --git a/dan_layer/rpc_state_sync/src/manager.rs b/dan_layer/rpc_state_sync/src/manager.rs index b90df309b8..1eb89c4100 100644 --- a/dan_layer/rpc_state_sync/src/manager.rs +++ b/dan_layer/rpc_state_sync/src/manager.rs @@ -1,13 +1,16 @@ // Copyright 2023 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use std::collections::HashMap; +use std::cmp; use anyhow::anyhow; use async_trait::async_trait; use futures::StreamExt; use log::*; -use tari_consensus::traits::{ConsensusSpec, SyncManager, SyncStatus}; +use tari_consensus::{ + hotstuff::substate_store::ShardScopedTreeStoreWriter, + traits::{ConsensusSpec, SyncManager, SyncStatus}, +}; use tari_dan_common_types::{committee::Committee, optional::Optional, shard::Shard, Epoch, NodeHeight, PeerAddress}; use tari_dan_p2p::proto::rpc::{GetCheckpointRequest, GetCheckpointResponse, SyncStateRequest}; use tari_dan_storage::{ @@ -17,16 +20,20 @@ use tari_dan_storage::{ LeafBlock, QcId, StateTransition, + StateTransitionId, SubstateCreatedProof, SubstateDestroyedProof, SubstateRecord, SubstateUpdate, }, StateStore, + StateStoreReadTransaction, StateStoreWriteTransaction, StorageError, }; +use tari_engine_types::substate::hash_substate; use tari_epoch_manager::EpochManagerReader; +use tari_state_tree::{Hash, SpreadPrefixStateTree, SubstateTreeChange}; use tari_transaction::VersionedSubstateId; use tari_validator_node_rpc::{ client::{TariValidatorNodeRpcClientFactory, ValidatorNodeClientFactory}, @@ -89,20 +96,32 @@ where TConsensusSpec: ConsensusSpec } } + #[allow(clippy::too_many_lines)] async fn start_state_sync( &self, client: &mut ValidatorNodeRpcClient, + shard: Shard, checkpoint: EpochCheckpoint, ) -> Result<(), CommsRpcConsensusSyncError> { let current_epoch = self.epoch_manager.current_epoch().await?; - let committee_info = self.epoch_manager.get_local_committee_info(current_epoch).await?; - let last_state_transition_id = self.state_store.with_read_tx(|tx| StateTransition::get_last_id(tx))?; + let last_state_transition_id = self + .state_store + .with_read_tx(|tx| StateTransition::get_last_id(tx, shard)) + .optional()? + .unwrap_or_else(|| StateTransitionId::initial(shard)); if current_epoch == last_state_transition_id.epoch() { info!(target: LOG_TARGET, "🛜Already up to date. No need to sync."); return Ok(()); } + // Minimum epoch we should request is 1 since Epoch(0) is the genesis epoch. + let last_state_transition_id = StateTransitionId::new( + cmp::max(last_state_transition_id.epoch(), Epoch(1)), + last_state_transition_id.shard(), + last_state_transition_id.seq(), + ); + info!( target: LOG_TARGET, "🛜Syncing from state transition {last_state_transition_id}" @@ -114,7 +133,6 @@ where TConsensusSpec: ConsensusSpec start_shard: last_state_transition_id.shard().as_u32(), start_seq: last_state_transition_id.seq(), current_epoch: current_epoch.as_u64(), - current_shard_group: committee_info.shard_group().encode_as_u32(), }) .await?; @@ -129,13 +147,56 @@ where TConsensusSpec: ConsensusSpec }, }; - info!(target: LOG_TARGET, "🛜 Next state updates batch of size {}", msg.transitions.len()); + if msg.transitions.is_empty() { + return Err(CommsRpcConsensusSyncError::InvalidResponse(anyhow!( + "Received empty state transition batch." + ))); + } self.state_store.with_write_tx(|tx| { + let persisted_version = tx.state_tree_versions_get_latest(shard)?.unwrap_or(0); + let mut current_version =persisted_version; + let mut next_version = msg.transitions.first().expect("non-empty batch already checked").state_tree_version; + + info!( + target: LOG_TARGET, + "🛜 Next state updates batch of size {} (v{}-v{})", + msg.transitions.len(), + current_version, + msg.transitions.last().unwrap().state_tree_version, + ); + + let mut store = ShardScopedTreeStoreWriter::new(tx, shard); + let mut tree_changes = vec![]; + let mut merkle_root = Hash::zero(); + + for transition in msg.transitions { let transition = StateTransition::try_from(transition).map_err(CommsRpcConsensusSyncError::InvalidResponse)?; - info!(target: LOG_TARGET, "🛜 Applied state update {transition}"); + if transition.id.shard() != shard { + return Err(CommsRpcConsensusSyncError::InvalidResponse(anyhow!( + "Received state transition for shard {} which is not the expected shard {}.", + transition.id.shard(), + shard + ))); + } + + if transition.state_tree_version < current_version { + return Err(CommsRpcConsensusSyncError::InvalidResponse(anyhow!( + "Received state transition with version {} that is not monotonically increasing (expected \ + >= {})", + transition.state_tree_version, + persisted_version + ))); + } + + if transition.id.epoch().is_zero() { + return Err(CommsRpcConsensusSyncError::InvalidResponse(anyhow!( + "Received state transition with epoch 0." + ))); + } + if transition.id.epoch() >= current_epoch { return Err(CommsRpcConsensusSyncError::InvalidResponse(anyhow!( "Received state transition for epoch {} which is at or ahead of our current epoch {}.", @@ -144,27 +205,39 @@ where TConsensusSpec: ConsensusSpec ))); } - self.commit_update(tx, &checkpoint, transition)?; + let change = match &transition.update { + SubstateUpdate::Create(create) => SubstateTreeChange::Up { + id: create.substate.substate_id.clone(), + value_hash: hash_substate(&create.substate.substate_value, create.substate.version), + }, + SubstateUpdate::Destroy(destroy) => SubstateTreeChange::Down { + id: destroy.substate_id.clone(), + }, + }; + + info!(target: LOG_TARGET, "🛜 Applying state update {transition} (v{} to v{})", current_version, transition.state_tree_version); + if next_version != transition.state_tree_version { + let mut state_tree = SpreadPrefixStateTree::new(&mut store); + merkle_root = state_tree.put_substate_changes(Some(current_version).filter(|v| *v > 0), next_version, tree_changes.drain(..))?; + current_version = next_version; + next_version = transition.state_tree_version; + } + tree_changes.push(change); + + self.commit_update(store.transaction(), &checkpoint, transition)?; + } + + if !tree_changes.is_empty() { + let mut state_tree = SpreadPrefixStateTree::new(&mut store); + merkle_root = state_tree.put_substate_changes(Some(current_version).filter(|v| *v > 0), next_version, tree_changes.drain(..))?; } + current_version = next_version; - // let current_version = block.justify().block_height().as_u64(); - // let next_version = block.height().as_u64(); - // - // let changes = updates.iter().map(|update| match update { - // SubstateUpdate::Create(create) => SubstateTreeChange::Up { - // id: create.substate.substate_id.clone(), - // value_hash: hash_substate(&create.substate.substate_value, create.substate.version), - // }, - // SubstateUpdate::Destroy(destroy) => SubstateTreeChange::Down { - // id: destroy.substate_id.clone(), - // }, - // }); - // - // let mut store = ChainScopedTreeStore::new(epoch, shard, tx); - // let mut tree = tari_state_tree::SpreadPrefixStateTree::new(&mut store); - // let _state_root = tree.put_substate_changes(current_version, next_version, changes)?; - - Ok::<_, CommsRpcConsensusSyncError>(()) + if current_version > 0 { + store.set_version(current_version)?; + } + + Ok::<_, CommsRpcConsensusSyncError>(merkle_root) })?; } @@ -211,13 +284,14 @@ where TConsensusSpec: ConsensusSpec )?; }, } + Ok(()) } async fn get_sync_committees( &self, current_epoch: Epoch, - ) -> Result>, CommsRpcConsensusSyncError> { + ) -> Result)>, CommsRpcConsensusSyncError> { // We are behind at least one epoch. // We get the current substate range, and we asks committees from previous epoch in this range to give us // data. @@ -228,6 +302,10 @@ where TConsensusSpec: ConsensusSpec .epoch_manager .get_committees_by_shard_group(prev_epoch, local_info.shard_group()) .await?; + + // TODO: not strictly necessary to sort by shard but easier on the eyes in logs + let mut committees = committees.into_iter().collect::>(); + committees.sort_by_key(|(k, _)| *k); Ok(committees) } } @@ -269,7 +347,7 @@ where TConsensusSpec: ConsensusSpec + Send + Sync + 'static // Sync data from each committee in range of the committee we're joining. // NOTE: we don't have to worry about substates in address range because shard boundaries are fixed. for (shard, mut committee) in prev_epoch_committees { - info!(target: LOG_TARGET, "🛜Syncing state for shard {shard} for epoch {}", current_epoch.saturating_sub(Epoch(1))); + info!(target: LOG_TARGET, "🛜Syncing state for {shard} and {}", current_epoch.saturating_sub(Epoch(1))); committee.shuffle(); for (addr, public_key) in committee { if our_vn.public_key == public_key { @@ -309,7 +387,8 @@ where TConsensusSpec: ConsensusSpec + Send + Sync + 'static }; info!(target: LOG_TARGET, "🛜 Checkpoint: {checkpoint}"); - if let Err(err) = self.start_state_sync(&mut client, checkpoint).await { + if let Err(err) = self.start_state_sync(&mut client, shard, checkpoint).await { + let err = err.error_at_remote()?; warn!( target: LOG_TARGET, "⚠️Failed to sync state from {addr}: {err}. Attempting another peer if available" @@ -317,6 +396,7 @@ where TConsensusSpec: ConsensusSpec + Send + Sync + 'static last_error = Some(err); continue; } + break; } } diff --git a/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql b/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql index 0377725e09..2913003cc0 100644 --- a/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql +++ b/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql @@ -359,6 +359,22 @@ create table state_tree_shard_versions created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ); +CREATE TABLE shard_group_state_tree +( + id integer not NULL primary key AUTOINCREMENT, + epoch bigint not NULL, + key text not NULL, + node text not NULL, + is_stale boolean not null default '0' +); + +-- Scoping by shard +CREATE INDEX shard_group_state_tree_idx_shard_key on shard_group_state_tree (epoch) WHERE is_stale = false; +-- Duplicate keys are not allowed +CREATE UNIQUE INDEX shard_group_state_tree_uniq_idx_key on shard_group_state_tree (epoch, key) WHERE is_stale = false; +-- filtering out or by is_stale is used in every query +CREATE INDEX shard_group_state_tree_idx_is_stale on shard_group_state_tree (is_stale); + -- One entry per shard CREATE UNIQUE INDEX state_tree_uniq_shard_versions_shard on state_tree_shard_versions (shard); @@ -395,6 +411,8 @@ CREATE TABLE state_transitions created_at timestamp not NULL DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (substate_address) REFERENCES substates (address) ); +CREATE UNIQUE INDEX state_transitions_shard_seq on state_transitions (shard, seq); +CREATE INDEX state_transitions_epoch on state_transitions (epoch); -- Debug Triggers CREATE TABLE transaction_pool_history diff --git a/dan_layer/state_store_sqlite/src/reader.rs b/dan_layer/state_store_sqlite/src/reader.rs index 83168eb7b6..1b120aefb8 100644 --- a/dan_layer/state_store_sqlite/src/reader.rs +++ b/dan_layer/state_store_sqlite/src/reader.rs @@ -3,6 +3,7 @@ use std::{ borrow::Borrow, + cmp, collections::{HashMap, HashSet}, marker::PhantomData, ops::RangeInclusive, @@ -679,8 +680,6 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor // TODO: This gets slower as the chain progresses. let block_ids = self.get_block_ids_between(&BlockId::zero(), from_block_id)?; - log::error!(target: LOG_TARGET, "Block_ids = {}", block_ids.join(", ")); - let execution = transaction_executions::table .filter(transaction_executions::transaction_id.eq(serialize_hex(tx_id))) .filter(transaction_executions::block_id.eq_any(block_ids)) @@ -2017,25 +2016,29 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor ) -> Result, StorageError> { use crate::schema::{state_transitions, substates}; - let start_id = state_transitions::table - .select(state_transitions::id) - .filter(state_transitions::epoch.eq(id.epoch().as_u64() as i64)) + // Never return epoch 0 state transitions + let min_epoch = Some(id.epoch().as_u64()).filter(|e| *e > 0).unwrap_or(1) as i64; + let (start_id, seq) = state_transitions::table + .select((state_transitions::id, state_transitions::seq)) + .filter(state_transitions::epoch.ge(min_epoch)) .filter(state_transitions::shard.eq(id.shard().as_u32() as i32)) - .filter(state_transitions::seq.eq(0i64)) - .order_by(state_transitions::id.asc()) - .first::(self.connection()) + .order_by(state_transitions::seq.asc()) + .first::<(i32, i64)>(self.connection()) .map_err(|e| SqliteStorageError::DieselError { operation: "state_transitions_get_n_after", source: e, })?; - let start_id = start_id + (id.seq() as i32); + let offset = cmp::max(id.seq() as i64, seq); + let start_id = + start_id + i32::try_from(offset).expect("(likely invalid) seq no for transition is too large for SQLite"); let transitions = state_transitions::table .left_join(substates::table.on(state_transitions::substate_address.eq(substates::address))) .select((state_transitions::all_columns, substates::all_columns.nullable())) - .filter(state_transitions::id.gt(start_id)) + .filter(state_transitions::id.ge(start_id)) .filter(state_transitions::epoch.lt(end_epoch.as_u64() as i64)) + .filter(state_transitions::shard.eq(id.shard().as_u32() as i32)) .limit(n as i64) .get_results::<(sql_models::StateTransition, Option)>(self.connection()) .map_err(|e| SqliteStorageError::DieselError { @@ -2055,25 +2058,21 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor .collect() } - fn state_transitions_get_last_id(&self) -> Result { + fn state_transitions_get_last_id(&self, shard: Shard) -> Result { use crate::schema::state_transitions; - let (seq, epoch, shard) = state_transitions::table - .select(( - state_transitions::seq, - state_transitions::epoch, - state_transitions::shard, - )) + let (seq, epoch) = state_transitions::table + .select((state_transitions::seq, state_transitions::epoch)) + .filter(state_transitions::shard.eq(shard.as_u32() as i32)) .order_by(state_transitions::epoch.desc()) .then_order_by(state_transitions::seq.desc()) - .first::<(i64, i64, i32)>(self.connection()) + .first::<(i64, i64)>(self.connection()) .map_err(|e| SqliteStorageError::DieselError { operation: "state_transitions_get_last_id", source: e, })?; let epoch = Epoch(epoch as u64); - let shard = Shard::from(shard as u32); let seq = seq as u64; Ok(StateTransitionId::new(epoch, shard, seq)) diff --git a/dan_layer/state_store_sqlite/src/writer.rs b/dan_layer/state_store_sqlite/src/writer.rs index fbed1ad76f..5ba56000cc 100644 --- a/dan_layer/state_store_sqlite/src/writer.rs +++ b/dan_layer/state_store_sqlite/src/writer.rs @@ -1354,7 +1354,7 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta let seq = state_transitions::table .select(dsl::max(state_transitions::seq)) - .filter(state_transitions::epoch.eq(substate.created_at_epoch.as_u64() as i64)) + .filter(state_transitions::shard.eq(substate.created_by_shard.as_u32() as i32)) .first::>(self.connection()) .map_err(|e| SqliteStorageError::DieselError { operation: "substates_create", @@ -1420,7 +1420,7 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta let seq = state_transitions::table .select(dsl::max(state_transitions::seq)) - .filter(state_transitions::epoch.eq(epoch.as_u64() as i64)) + .filter(state_transitions::shard.eq(shard.as_u32() as i32)) .first::>(self.connection()) .map_err(|e| SqliteStorageError::DieselError { operation: "substates_create", @@ -1428,6 +1428,7 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta })?; let next_seq = seq.map(|s| s + 1).unwrap_or(0); + let version = self.state_tree_versions_get_latest(shard)?; let values = ( state_transitions::seq.eq(next_seq), state_transitions::epoch.eq(epoch.as_u64() as i64), @@ -1436,7 +1437,7 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta state_transitions::substate_id.eq(versioned_substate_id.substate_id.to_string()), state_transitions::version.eq(versioned_substate_id.version as i32), state_transitions::transition.eq("DOWN"), - state_transitions::state_version.eq(destroyed_block_height.as_u64() as i64), + state_transitions::state_version.eq(version.unwrap_or(0) as i64), ); diesel::insert_into(state_transitions::table) diff --git a/dan_layer/state_tree/Cargo.toml b/dan_layer/state_tree/Cargo.toml index 73b695bda0..01a273ab3a 100644 --- a/dan_layer/state_tree/Cargo.toml +++ b/dan_layer/state_tree/Cargo.toml @@ -17,7 +17,7 @@ hex = { workspace = true } thiserror = { workspace = true } serde = { workspace = true, features = ["derive"] } log = { workspace = true } -indexmap = { workspace = true } +indexmap = { workspace = true, features = ["serde"] } [dev-dependencies] indexmap = { workspace = true } diff --git a/dan_layer/state_tree/src/jellyfish/mod.rs b/dan_layer/state_tree/src/jellyfish/mod.rs index e4b8bb4ca9..1c6a6d89ad 100644 --- a/dan_layer/state_tree/src/jellyfish/mod.rs +++ b/dan_layer/state_tree/src/jellyfish/mod.rs @@ -11,4 +11,5 @@ mod types; pub use types::*; mod store; + pub use store::*; diff --git a/dan_layer/state_tree/src/jellyfish/types.rs b/dan_layer/state_tree/src/jellyfish/types.rs index 7ebd6914f8..b54ee30095 100644 --- a/dan_layer/state_tree/src/jellyfish/types.rs +++ b/dan_layer/state_tree/src/jellyfish/types.rs @@ -98,16 +98,16 @@ pub type Hash = tari_common_types::types::FixedHash; hash_domain!(SparseMerkleTree, "com.tari.dan.state_tree", 0); -fn hasher() -> TariHasher { - tari_hasher::("hash") +fn jmt_node_hasher() -> TariHasher { + tari_hasher::("JmtNode") } -pub fn hash(data: &T) -> Hash { - hasher().chain(data).result() +pub fn jmt_node_hash(data: &T) -> Hash { + jmt_node_hasher().chain(data).result() } -pub fn hash2(d1: &[u8], d2: &[u8]) -> Hash { - hasher().chain(d1).chain(d2).result() +pub fn jmt_node_hash2(d1: &[u8], d2: &[u8]) -> Hash { + jmt_node_hasher().chain(d1).chain(d2).result() } // SOURCE: https://github.com/aptos-labs/aptos-core/blob/1.0.4/types/src/proof/definition.rs#L182 @@ -274,7 +274,7 @@ impl SparseMerkleLeafNode { } pub fn hash(&self) -> Hash { - hash2(self.key.bytes.as_slice(), self.value_hash.as_slice()) + jmt_node_hash2(self.key.bytes.as_slice(), self.value_hash.as_slice()) } } @@ -292,7 +292,7 @@ impl SparseMerkleInternalNode { } fn hash(&self) -> Hash { - hash2(self.left_child.as_bytes(), self.right_child.as_bytes()) + jmt_node_hash2(self.left_child.as_bytes(), self.right_child.as_bytes()) } } @@ -1150,7 +1150,7 @@ impl LeafNode

{ /// changes within a sparse merkle tree (consider 2 trees, both containing a single element with /// the same value, but stored under different keys - we want their root hashes to differ). pub fn leaf_hash(&self) -> Hash { - hash2(self.leaf_key.bytes.as_slice(), self.value_hash.as_slice()) + jmt_node_hash2(self.leaf_key.bytes.as_slice(), self.value_hash.as_slice()) } } diff --git a/dan_layer/state_tree/src/key_mapper.rs b/dan_layer/state_tree/src/key_mapper.rs index 344713f360..60ac6b0b0d 100644 --- a/dan_layer/state_tree/src/key_mapper.rs +++ b/dan_layer/state_tree/src/key_mapper.rs @@ -13,7 +13,7 @@ pub struct SpreadPrefixKeyMapper; impl DbKeyMapper for SpreadPrefixKeyMapper { fn map_to_leaf_key(id: &SubstateId) -> LeafKey { - let hash = crate::jellyfish::hash(id); + let hash = crate::jellyfish::jmt_node_hash(id); LeafKey::new(hash.to_vec()) } } diff --git a/dan_layer/state_tree/src/tree.rs b/dan_layer/state_tree/src/tree.rs index 623513ef48..fa58817a2e 100644 --- a/dan_layer/state_tree/src/tree.rs +++ b/dan_layer/state_tree/src/tree.rs @@ -45,6 +45,12 @@ impl<'a, S: TreeStoreReader, M: DbKeyMapper> StateTree<'a, S, M> { let (maybe_value, proof) = smt.get_with_proof_ext(key.as_ref(), version)?; Ok((maybe_value, proof)) } + + pub fn get_root_hash(&self, version: Version) -> Result { + let smt = JellyfishMerkleTree::new(self.store); + let root_hash = smt.get_root_hash(version)?; + Ok(root_hash) + } } impl<'a, S: TreeStore, M: DbKeyMapper> StateTree<'a, S, M> { diff --git a/dan_layer/state_tree/tests/support.rs b/dan_layer/state_tree/tests/support.rs index d6f33319da..7dcf31c830 100644 --- a/dan_layer/state_tree/tests/support.rs +++ b/dan_layer/state_tree/tests/support.rs @@ -64,6 +64,16 @@ impl> HashTreeTester { .put_substate_changes(current_version, next_version, changes) .unwrap() } + + pub fn put_changes_at_version(&mut self, changes: impl IntoIterator) -> Hash { + let next_version = self + .current_version + .expect("call put_changes_at_version with None version"); + let current_version = self.current_version.unwrap(); + StateTree::<_, IdentityMapper>::new(&mut self.tree_store) + .put_substate_changes(Some(current_version), next_version, changes) + .unwrap() + } } impl HashTreeTester { diff --git a/dan_layer/state_tree/tests/test.rs b/dan_layer/state_tree/tests/test.rs index c038d736ff..7fb093d3f4 100644 --- a/dan_layer/state_tree/tests/test.rs +++ b/dan_layer/state_tree/tests/test.rs @@ -92,6 +92,18 @@ fn hash_computed_consistently_after_adding_higher_tier_sibling() { assert_eq!(root_after_adding_sibling, reference_root); } +#[test] +fn hash_allows_putting_in_same_version() { + let mut tester_1 = HashTreeTester::new_empty(); + tester_1.put_substate_changes(vec![change(1, Some(30))]); + tester_1.put_substate_changes(vec![change(2, Some(31))]); + // Append another change to the same version + let hash_1 = tester_1.put_changes_at_version(vec![change(3, Some(32))]); + let mut tester_2 = HashTreeTester::new_empty(); + let hash_2 = tester_2.put_substate_changes(vec![change(1, Some(30)), change(2, Some(31)), change(3, Some(32))]); + assert_eq!(hash_1, hash_2); +} + #[test] fn hash_differs_when_states_only_differ_by_node_key() { let mut tester_1 = HashTreeTester::new_empty(); diff --git a/dan_layer/storage/src/consensus_models/block.rs b/dan_layer/storage/src/consensus_models/block.rs index d59021608a..d3d5d1cff4 100644 --- a/dan_layer/storage/src/consensus_models/block.rs +++ b/dan_layer/storage/src/consensus_models/block.rs @@ -939,11 +939,12 @@ impl Display for Block { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "[{}, {}, {}, {} command(s)]", + "[{}, {}, {}, {} cmd(s), {}]", self.height(), self.epoch(), + self.shard_group(), + self.commands().len(), self.id(), - self.commands().len() ) } } diff --git a/dan_layer/storage/src/consensus_models/state_transition.rs b/dan_layer/storage/src/consensus_models/state_transition.rs index d5bba0600a..311676e9cb 100644 --- a/dan_layer/storage/src/consensus_models/state_transition.rs +++ b/dan_layer/storage/src/consensus_models/state_transition.rs @@ -8,6 +8,7 @@ use std::{ }; use tari_dan_common_types::{shard::Shard, Epoch}; +use tari_state_tree::Version; use crate::{consensus_models::SubstateUpdate, StateStoreReadTransaction, StorageError}; @@ -15,7 +16,7 @@ use crate::{consensus_models::SubstateUpdate, StateStoreReadTransaction, Storage pub struct StateTransition { pub id: StateTransitionId, pub update: SubstateUpdate, - pub state_tree_version: u64, + pub state_tree_version: Version, } impl StateTransition { @@ -28,8 +29,11 @@ impl StateTransition { tx.state_transitions_get_n_after(n, after_id, end_epoch) } - pub fn get_last_id(tx: &TTx) -> Result { - tx.state_transitions_get_last_id() + pub fn get_last_id( + tx: &TTx, + shard: Shard, + ) -> Result { + tx.state_transitions_get_last_id(shard) } } @@ -52,6 +56,10 @@ impl StateTransitionId { Self { epoch, shard, seq } } + pub fn initial(shard: Shard) -> Self { + Self::new(Epoch(1), shard, 0) + } + pub fn from_bytes(mut bytes: &[u8]) -> Option { if bytes.len() < Self::BYTE_SIZE { return None; @@ -89,7 +97,7 @@ impl Display for StateTransitionId { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "StateTransition(epoch = {}, shard = {}, seq = {})", + "StateTransition({}, {}, seq = {})", self.epoch(), self.shard(), self.seq() diff --git a/dan_layer/storage/src/state_store/mod.rs b/dan_layer/storage/src/state_store/mod.rs index d073eddee0..14a8480022 100644 --- a/dan_layer/storage/src/state_store/mod.rs +++ b/dan_layer/storage/src/state_store/mod.rs @@ -290,7 +290,7 @@ pub trait StateStoreReadTransaction: Sized { end_epoch: Epoch, ) -> Result, StorageError>; - fn state_transitions_get_last_id(&self) -> Result; + fn state_transitions_get_last_id(&self, shard: Shard) -> Result; fn state_tree_nodes_get(&self, shard: Shard, key: &NodeKey) -> Result, StorageError>; fn state_tree_versions_get_latest(&self, shard: Shard) -> Result, StorageError>; diff --git a/integration_tests/tests/features/state_sync.feature b/integration_tests/tests/features/state_sync.feature index 63a00187e6..46492ff703 100644 --- a/integration_tests/tests/features/state_sync.feature +++ b/integration_tests/tests/features/state_sync.feature @@ -3,7 +3,7 @@ @state_sync Feature: State Sync - @serial @fixed + @serial @fixed @doit Scenario: New validator node registers and syncs # Initialize a base node, wallet, miner and VN Given a base node BASE