Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Jul 30, 2024
1 parent f70287a commit 9059ee2
Show file tree
Hide file tree
Showing 16 changed files with 117 additions and 95 deletions.
2 changes: 1 addition & 1 deletion applications/tari_validator_node/src/substate_resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ where
if missing_substate_ids.contains(requested_input.substate_id()) {
// TODO/NOTE: This assumes that consensus is up to date (i.e. doesnt need to sync, or catch up). We need
// to check the if the substate is in our shard range. The best action then may be to
// let consensus handle it (deferred) which is what happens currently anyway.
// let consensus handle it which is what happens currently anyway.
missing_substates.insert(requested_input);
// Not a local substate, so we will need to fetch it remotely
continue;
Expand Down
5 changes: 3 additions & 2 deletions dan_layer/consensus/src/hotstuff/block_change_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use tari_dan_storage::{
TransactionPoolRecord,
TransactionPoolStage,
TransactionPoolStatusUpdate,
VersionedStateHashTreeDiff,
},
StateStoreReadTransaction,
StateStoreWriteTransaction,
Expand All @@ -42,7 +43,7 @@ pub struct ProposedBlockChangeSet {
block: LeafBlock,
quorum_decision: Option<QuorumDecision>,
block_diff: Vec<SubstateChange>,
state_tree_diffs: IndexMap<Shard, StateHashTreeDiff>,
state_tree_diffs: IndexMap<Shard, VersionedStateHashTreeDiff>,
substate_locks: IndexMap<SubstateId, Vec<LockedSubstate>>,
transaction_changes: IndexMap<TransactionId, TransactionChangeSet>,
}
Expand All @@ -66,7 +67,7 @@ impl ProposedBlockChangeSet {
self
}

pub fn set_state_tree_diffs(&mut self, diffs: IndexMap<Shard, StateHashTreeDiff>) -> &mut Self {
pub fn set_state_tree_diffs(&mut self, diffs: IndexMap<Shard, VersionedStateHashTreeDiff>) -> &mut Self {
self.state_tree_diffs = diffs;
self
}
Expand Down
15 changes: 11 additions & 4 deletions dan_layer/consensus/src/hotstuff/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@ use tari_common::configuration::Network;
use tari_common_types::types::FixedHash;
use tari_dan_common_types::{committee::Committee, shard::Shard, Epoch, NodeAddressable, NodeHeight, ShardGroup};
use tari_dan_storage::{
consensus_models::{Block, LeafBlock, PendingStateTreeDiff, QuorumCertificate, SubstateChange},
consensus_models::{
Block,
LeafBlock,
PendingStateTreeDiff,
QuorumCertificate,
SubstateChange,
VersionedStateHashTreeDiff,
},
StateStoreReadTransaction,
};
use tari_state_tree::{Hash, StateHashTreeDiff, StateTreeError};
Expand Down Expand Up @@ -165,9 +172,9 @@ fn with_dummy_blocks<TAddr, TLeaderStrategy, F>(
pub fn calculate_state_merkle_root<TTx: StateStoreReadTransaction>(
tx: &TTx,
local_shard_group: ShardGroup,
pending_tree_diffs: HashMap<Shard, PendingStateTreeDiff>,
pending_tree_diffs: HashMap<Shard, Vec<PendingStateTreeDiff>>,
changes: &[SubstateChange],
) -> Result<(Hash, IndexMap<Shard, StateHashTreeDiff>), StateTreeError> {
) -> Result<(Hash, IndexMap<Shard, VersionedStateHashTreeDiff>), StateTreeError> {
let mut change_map = IndexMap::with_capacity(changes.len());

changes
Expand All @@ -179,5 +186,5 @@ pub fn calculate_state_merkle_root<TTx: StateStoreReadTransaction>(

let mut sharded_tree = ShardedStateTree::new(tx).with_pending_diffs(pending_tree_diffs);
let state_root = sharded_tree.put_substate_tree_changes(change_map)?;
Ok((state_root, sharded_tree.into_tree_diffs()))
Ok((state_root, sharded_tree.into_versioned_tree_diffs()))
}
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ where TConsensusSpec: ConsensusSpec
Command::LocalOnly(t) => {
info!(
target: LOG_TARGET,
"👨‍🔧 LOCAL-ONLY: Executing deferred transaction {} in block {}",
"👨‍🔧 LOCAL-ONLY: Executing transaction {} in block {}",
tx_rec.transaction_id(),
block,
);
Expand Down Expand Up @@ -433,7 +433,7 @@ where TConsensusSpec: ConsensusSpec
Command::Prepare(t) => {
info!(
target: LOG_TARGET,
"👨‍🔧 PREPARE: Executing deferred transaction {} in block {}",
"👨‍🔧 PREPARE: Executing transaction {} in block {}",
tx_rec.transaction_id(),
block,
);
Expand Down
4 changes: 2 additions & 2 deletions dan_layer/consensus/src/hotstuff/pacemaker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ impl PaceMaker {
let delta = self.delta_time();
leader_timeout.as_mut().reset(tokio::time::Instant::now() + delta);
info!(target: LOG_TARGET, "⚠️ Leader timeout! Current view: {}, Delta: {:.2?}", self.current_view, delta);
self.current_view.set_next_height();
on_leader_timeout.leader_timed_out(self.current_view.get_height());
// self.current_view.set_next_height();
// on_leader_timeout.leader_timed_out(self.current_view.get_height());
},

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ use std::collections::HashMap;
use indexmap::IndexMap;
use log::debug;
use tari_dan_common_types::shard::Shard;
use tari_dan_storage::{consensus_models::PendingStateTreeDiff, StateStoreReadTransaction, StateStoreWriteTransaction};
use tari_dan_storage::{
consensus_models::{PendingStateTreeDiff, VersionedStateHashTreeDiff},
StateStoreReadTransaction,
StateStoreWriteTransaction,
};
use tari_state_tree::{
Hash,
JmtStorageError,
Expand All @@ -26,7 +30,7 @@ const LOG_TARGET: &str = "tari::dan::consensus::sharded_state_tree";
pub struct ShardedStateTree<TTx> {
tx: TTx,
pending_diffs: HashMap<Shard, Vec<PendingStateTreeDiff>>,
current_tree_diffs: IndexMap<Shard, StateHashTreeDiff>,
current_tree_diffs: IndexMap<Shard, VersionedStateHashTreeDiff>,
}

impl<TTx> ShardedStateTree<TTx> {
Expand Down Expand Up @@ -61,7 +65,7 @@ impl<TTx: StateStoreReadTransaction> ShardedStateTree<&TTx> {
Ok(maybe_version)
}

pub fn into_tree_diffs(self) -> IndexMap<Shard, StateHashTreeDiff> {
pub fn into_versioned_tree_diffs(self) -> IndexMap<Shard, VersionedStateHashTreeDiff> {
self.current_tree_diffs
}

Expand All @@ -72,23 +76,27 @@ impl<TTx: StateStoreReadTransaction> ShardedStateTree<&TTx> {
let mut state_roots = Vec::with_capacity(changes.len());
for (shard, changes) in changes {
let current_version = self.get_current_version(shard)?;
let next_version = current_version.map(|v| v + 1).unwrap_or(0);
let next_version = current_version.unwrap_or(0) + 1;

// Read only state store that is scoped to the shard
let scoped_store = ShardScopedTreeStoreReader::new(self.tx, shard);
// Staged store that tracks changes to the state tree
let mut store = StagedTreeStore::new(&scoped_store);
// Apply pending (not yet committed) diffs to the staged store
if let Some(diff) = self.pending_diffs.remove(&shard) {
store.apply_pending_diff(diff.diff);
if let Some(diffs) = self.pending_diffs.remove(&shard) {
debug!(target: LOG_TARGET, "Applying {num_diffs} pending diff(s) to shard {shard} (version={version})", num_diffs = diffs.len(), version = diffs.last().map(|d| d.version).unwrap_or(0));
for diff in diffs {
store.apply_pending_diff(diff.diff);
}
}

// Apply state updates to the state tree that is backed by the staged shard-scoped store
let mut state_tree = SpreadPrefixStateTree::new(&mut store);
debug!(target: LOG_TARGET, "v{next_version} contains {} tree change(s) for shard {shard}", changes.len());
let state_root = state_tree.put_substate_changes(current_version, next_version, changes)?;
state_roots.push(state_root);
self.current_tree_diffs.insert(shard, store.into_diff());
self.current_tree_diffs
.insert(shard, VersionedStateHashTreeDiff::new(next_version, store.into_diff()));
}

// TODO:
Expand All @@ -100,25 +108,27 @@ impl<TTx: StateStoreReadTransaction> ShardedStateTree<&TTx> {
}

impl<TTx: StateStoreWriteTransaction> ShardedStateTree<&mut TTx> {
pub fn commit_diff(&mut self, diffs: IndexMap<Shard, PendingStateTreeDiff>) -> Result<(), StateTreeError> {
for (shard, pending_diff) in diffs {
let diff = pending_diff.diff;
let mut store = ShardScopedTreeStoreWriter::new(self.tx, shard);

for stale_tree_node in diff.stale_tree_nodes {
log::debug!(
"(shard={shard}) Recording stale tree node: {}",
stale_tree_node.as_node_key()
);
store.record_stale_tree_node(stale_tree_node)?;
}

for (key, node) in diff.new_nodes {
log::debug!("(shard={shard}) Inserting node: {}", key);
store.insert_node(key, node)?;
pub fn commit_diff(&mut self, diffs: IndexMap<Shard, Vec<PendingStateTreeDiff>>) -> Result<(), StateTreeError> {
for (shard, pending_diffs) in diffs {
for pending_diff in pending_diffs {
let diff = pending_diff.diff;
let mut store = ShardScopedTreeStoreWriter::new(self.tx, shard);

for stale_tree_node in diff.stale_tree_nodes {
debug!(
"(shard={shard}) Recording stale tree node: {}",
stale_tree_node.as_node_key()
);
store.record_stale_tree_node(stale_tree_node)?;
}

for (key, node) in diff.new_nodes {
debug!("(shard={shard}) Inserting node: {}", key);
store.insert_node(key, node)?;
}

store.increment_version()?;
}

store.increment_version()?;
}

Ok(())
Expand Down
5 changes: 3 additions & 2 deletions dan_layer/consensus_tests/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,9 @@ async fn node_requests_missing_transaction_from_local_leader() {
async fn multi_validator_propose_blocks_with_new_transactions_until_all_committed() {
setup_logger();
let mut test = Test::builder()
.disable_timeout()
.debug_sql("/tmp/test{}.db")
.add_committee(0, vec!["1"])//, "2", "3", "4", "5"])
.add_committee(0, vec!["1", "2", "3", "4", "5"])
.start()
.await;
let mut remaining_txs = 10u32;
Expand Down Expand Up @@ -761,7 +762,7 @@ async fn single_shard_unversioned_inputs() {
let mut test = Test::builder().add_committee(0, vec!["1", "2"]).start().await;
// First get transaction in the mempool
let inputs = test.create_substates_on_all_vns(1);
// Remove versions from inputs to allow deferred transactions
// Remove versions from inputs to test substate version resolution
let unversioned_inputs = inputs
.iter()
.map(|i| SubstateRequirement::new(i.substate_id.clone(), None));
Expand Down
4 changes: 2 additions & 2 deletions dan_layer/consensus_tests/src/support/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use tari_dan_storage::global::models::ValidatorNode;
use tari_epoch_manager::{EpochManagerError, EpochManagerEvent, EpochManagerReader};
use tokio::sync::{broadcast, Mutex, MutexGuard};

use crate::support::{address::TestAddress, helpers::random_substate_in_shard, TEST_NUM_PRESHARDS};
use crate::support::{address::TestAddress, helpers::random_substate_in_shard_group, TEST_NUM_PRESHARDS};

#[derive(Debug, Clone)]
pub struct TestEpochManager {
Expand Down Expand Up @@ -84,7 +84,7 @@ impl TestEpochManager {
let mut state = self.state_lock().await;
for (shard_group, committee) in committees {
for (address, pk) in &committee.members {
let substate_address = random_substate_in_shard(shard_group, TEST_NUM_PRESHARDS);
let substate_address = random_substate_in_shard_group(shard_group, TEST_NUM_PRESHARDS);
state.validator_shards.insert(
address.clone(),
(
Expand Down
5 changes: 3 additions & 2 deletions dan_layer/consensus_tests/src/support/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ use tari_transaction::VersionedSubstateId;

use crate::support::TestAddress;

pub(crate) fn random_substate_in_shard(shard: ShardGroup, num_shards: NumPreshards) -> VersionedSubstateId {
let range = shard.to_substate_address_range(num_shards);
pub(crate) fn random_substate_in_shard_group(shard_group: ShardGroup, num_shards: NumPreshards) -> VersionedSubstateId {
let range = shard_group.to_substate_address_range(num_shards);
let size = range.end().to_u256() - range.start().to_u256();
let a = OsRng.gen_range(*range.start().array()..*range.end().array());
let middlish = range.start().to_u256() + size / 2;
let entity_id = EntityId::new(copy_fixed(&middlish.to_be_bytes()[0..EntityId::LENGTH]));
let rand_bytes = OsRng.gen::<[u8; ComponentKey::LENGTH]>();
Expand Down
8 changes: 4 additions & 4 deletions dan_layer/consensus_tests/src/support/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use tari_engine_types::{
};
use tari_transaction::{Transaction, TransactionId, VersionedSubstateId};

use crate::support::{committee_number_to_shard_group, helpers::random_substate_in_shard, TEST_NUM_PRESHARDS};
use crate::support::{committee_number_to_shard_group, helpers::random_substate_in_shard_group, TEST_NUM_PRESHARDS};

pub fn build_transaction_from(
tx: Transaction,
Expand Down Expand Up @@ -111,10 +111,10 @@ pub fn build_transaction(
// We create these outputs so that the test VNs dont have to have any UP substates
// Equal potion of shards to each committee
let outputs = (0..num_committees)
.flat_map(|shard| {
.flat_map(|group_no| {
iter::repeat_with(move || {
random_substate_in_shard(
committee_number_to_shard_group(TEST_NUM_PRESHARDS, shard, num_committees),
random_substate_in_shard_group(
committee_number_to_shard_group(TEST_NUM_PRESHARDS, group_no, num_committees),
TEST_NUM_PRESHARDS,
)
})
Expand Down
44 changes: 16 additions & 28 deletions dan_layer/state_store_sqlite/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use tari_dan_storage::{
TransactionPoolStage,
TransactionPoolStatusUpdate,
TransactionRecord,
VersionedStateHashTreeDiff,
Vote,
},
StateStoreReadTransaction,
Expand Down Expand Up @@ -1451,11 +1452,12 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta
fn pending_state_tree_diffs_remove_by_block(
&mut self,
block_id: &BlockId,
) -> Result<IndexMap<Shard, PendingStateTreeDiff>, StorageError> {
) -> Result<IndexMap<Shard, Vec<PendingStateTreeDiff>>, StorageError> {
use crate::schema::pending_state_tree_diffs;

let diff_recs = pending_state_tree_diffs::table
.filter(pending_state_tree_diffs::block_id.eq(serialize_hex(block_id)))
.order_by(pending_state_tree_diffs::block_height.asc())
.get_results::<sql_models::PendingStateTreeDiff>(self.connection())
.map_err(|e| SqliteStorageError::DieselError {
operation: "pending_state_tree_diffs_remove_by_block",
Expand All @@ -1474,10 +1476,7 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta
for diff in diff_recs {
let shard = Shard::from(diff.shard as u32);
let diff = PendingStateTreeDiff::try_from(diff)?;
diffs
.entry(shard)
.or_insert_with(PendingStateTreeDiff::default)
.merge(diff);
diffs.entry(shard).or_insert_with(Vec::new).push(diff);
}

Ok(diffs)
Expand All @@ -1487,28 +1486,10 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta
&mut self,
block_id: BlockId,
shard: Shard,
diff: StateHashTreeDiff,
diff: VersionedStateHashTreeDiff,
) -> Result<(), StorageError> {
use crate::schema::{blocks, pending_state_tree_diffs};

let version = self.state_tree_versions_get_latest(shard)?;

// Fetch the number of pending between this block and the current commit block
let commit_block = self.get_commit_block_id()?;
let block_ids = self.get_block_ids_that_change_state_between(&commit_block, &block_id)?;

let num_pending = pending_state_tree_diffs::table
.select(count(pending_state_tree_diffs::id))
.filter(pending_state_tree_diffs::block_id.eq_any(block_ids))
.filter(pending_state_tree_diffs::shard.eq(shard.as_u32() as i32))
.first::<i64>(self.connection())
.map_err(|e| SqliteStorageError::DieselError {
operation: "pending_state_tree_diffs_insert (count)",
source: e,
})?;

let next_version = version.unwrap_or(0) + (num_pending as u64);

let insert = (
pending_state_tree_diffs::block_id.eq(serialize_hex(block_id)),
pending_state_tree_diffs::shard.eq(shard.as_u32() as i32),
Expand All @@ -1517,8 +1498,8 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta
.filter(blocks::block_id.eq(serialize_hex(block_id)))
.single_value()
.assume_not_null()),
pending_state_tree_diffs::version.eq(next_version as i64),
pending_state_tree_diffs::diff_json.eq(serialize_json(&diff)?),
pending_state_tree_diffs::version.eq(diff.version as i64),
pending_state_tree_diffs::diff_json.eq(serialize_json(&diff.diff)?),
);

diesel::insert_into(pending_state_tree_diffs::table)
Expand Down Expand Up @@ -1566,7 +1547,7 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta
use crate::schema::state_tree;

let key = node.as_node_key();
diesel::update(state_tree::table)
let num_effected = diesel::update(state_tree::table)
.filter(state_tree::shard.eq(shard.as_u32() as i32))
.filter(state_tree::key.eq(key.to_string()))
.set(state_tree::is_stale.eq(true))
Expand All @@ -1576,6 +1557,13 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta
source: e,
})?;

if num_effected == 0 {
return Err(StorageError::NotFound {
item: "state_tree_node".to_string(),
key: key.to_string(),
});
}

Ok(())
}

Expand All @@ -1584,7 +1572,7 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta

let values = (
state_tree_shard_versions::shard.eq(shard.as_u32() as i32),
state_tree_shard_versions::version.eq(0),
state_tree_shard_versions::version.eq(1),
);

diesel::insert_into(state_tree_shard_versions::table)
Expand Down
3 changes: 3 additions & 0 deletions dan_layer/state_tree/src/jellyfish/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1243,6 +1243,9 @@ pub enum JmtStorageError {

#[error("Unexpected error: {0}")]
UnexpectedError(String),

#[error("Attempted to insert node {0} that already exists")]
Conflict(NodeKey),
}

impl IsNotFoundError for JmtStorageError {
Expand Down
Loading

0 comments on commit 9059ee2

Please sign in to comment.