diff --git a/Cargo.lock b/Cargo.lock index d7affe69c..3bf791ae9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1566,7 +1566,7 @@ dependencies = [ "fern", "futures 0.3.30", "humantime 2.1.0", - "indexmap 2.2.6", + "indexmap 2.5.0", "itertools 0.11.0", "log", "rand", @@ -3262,7 +3262,7 @@ dependencies = [ "futures-sink", "futures-util", "http", - "indexmap 2.2.6", + "indexmap 2.5.0", "slab", "tokio", "tokio-util 0.7.11", @@ -3898,9 +3898,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.2.6" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" +checksum = "68b900aa2f7301e21c36462b170ee99994de34dff39a4a6a528e80e7376d07e5" dependencies = [ "equivalent", "hashbrown 0.14.5", @@ -3953,7 +3953,7 @@ dependencies = [ "config", "cucumber", "httpmock", - "indexmap 2.2.6", + "indexmap 2.5.0", "libp2p", "log", "log4rs", @@ -6481,7 +6481,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" dependencies = [ "fixedbitset", - "indexmap 2.2.6", + "indexmap 2.5.0", ] [[package]] @@ -8055,7 +8055,7 @@ version = "0.9.34+deprecated" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" dependencies = [ - "indexmap 2.2.6", + "indexmap 2.5.0", "itoa", "ryu", "serde", @@ -8902,7 +8902,7 @@ version = "0.7.0" dependencies = [ "anyhow", "async-trait", - "indexmap 2.2.6", + "indexmap 2.5.0", "log", "serde", "tari_common", @@ -9095,7 +9095,7 @@ version = "0.7.0" dependencies = [ "blake2", "ethnum", - "indexmap 2.2.6", + "indexmap 2.5.0", "libp2p-identity", "newtype-ops", "prost 0.12.6", @@ -9123,7 +9123,7 @@ dependencies = [ "cargo_toml 0.11.8", "d3ne", "env_logger 0.10.2", - "indexmap 2.2.6", + "indexmap 2.5.0", "log", "rand", "semver", @@ -9174,7 +9174,7 @@ version = "0.7.0" dependencies = [ "anyhow", "chrono", - "indexmap 2.2.6", + "indexmap 2.5.0", "log", "rand", "serde", @@ -9381,7 +9381,7 @@ dependencies = [ "blake2", "digest", "hex", - "indexmap 2.2.6", + "indexmap 2.5.0", "lazy_static", "rand", "serde", @@ -9806,7 +9806,7 @@ dependencies = [ "diesel", "diesel_migrations", "hex", - "indexmap 2.2.6", + "indexmap 2.5.0", "log", "rand", "serde", @@ -9827,7 +9827,7 @@ name = "tari_state_tree" version = "0.7.0" dependencies = [ "hex", - "indexmap 2.2.6", + "indexmap 2.5.0", "itertools 0.11.0", "log", "serde", @@ -9986,7 +9986,7 @@ dependencies = [ name = "tari_transaction" version = "0.7.0" dependencies = [ - "indexmap 2.2.6", + "indexmap 2.5.0", "rand", "serde", "tari_common_types", @@ -10041,7 +10041,7 @@ dependencies = [ "config", "futures 0.3.30", "include_dir", - "indexmap 2.2.6", + "indexmap 2.5.0", "json5", "libp2p", "libsqlite3-sys", @@ -10573,7 +10573,7 @@ version = "0.19.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ - "indexmap 2.2.6", + "indexmap 2.5.0", "serde", "serde_spanned", "toml_datetime", @@ -10586,7 +10586,7 @@ version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1" dependencies = [ - "indexmap 2.2.6", + "indexmap 2.5.0", "toml_datetime", "winnow 0.5.40", ] @@ -10597,7 +10597,7 @@ version = "0.22.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "278f3d518e152219c994ce877758516bca5e118eaed6996192a774fb9fbf0788" dependencies = [ - "indexmap 2.2.6", + "indexmap 2.5.0", "serde", "serde_spanned", "toml_datetime", @@ -10817,7 +10817,7 @@ dependencies = [ "bincode 2.0.0-rc.3", "bytes 1.6.1", "clap 4.5.10", - "indexmap 2.2.6", + "indexmap 2.5.0", "once_cell", "rand", "rayon", @@ -10916,7 +10916,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc2cae1fc5d05d47aa24b64f9a4f7cba24cdc9187a2084dd97ac57bef5eccae6" dependencies = [ "chrono", - "indexmap 2.2.6", + "indexmap 2.5.0", "thiserror", "ts-rs-macros", ] diff --git a/Cargo.toml b/Cargo.toml index 91671533c..cc5262886 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -181,7 +181,7 @@ httpmock = "0.6.8" humantime = "2.1.0" humantime-serde = "1.1.1" include_dir = "0.7.2" -indexmap = "2.2.6" +indexmap = "2.5.0" indoc = "1.0.6" itertools = "0.11.0" lazy_static = "1.4.0" diff --git a/dan_layer/common_types/src/substate_address.rs b/dan_layer/common_types/src/substate_address.rs index ccaf2035d..650f42b4b 100644 --- a/dan_layer/common_types/src/substate_address.rs +++ b/dan_layer/common_types/src/substate_address.rs @@ -117,6 +117,11 @@ impl SubstateAddress { &self.0[..ObjectKey::LENGTH] } + pub fn to_object_key(&self) -> ObjectKey { + ObjectKey::try_from(self.object_key_bytes()) + .expect("SubstateAddress: object_key_bytes must return valid ObjectKey bytes") + } + pub fn to_version(&self) -> u32 { let mut buf = [0u8; size_of::()]; buf.copy_from_slice(&self.0[ObjectKey::LENGTH..]); diff --git a/dan_layer/consensus/src/hotstuff/block_change_set.rs b/dan_layer/consensus/src/hotstuff/block_change_set.rs index 37c02bbb2..4039544fa 100644 --- a/dan_layer/consensus/src/hotstuff/block_change_set.rs +++ b/dan_layer/consensus/src/hotstuff/block_change_set.rs @@ -82,6 +82,7 @@ impl ProposedBlockChangeSet { } pub fn no_vote(&mut self) -> &mut Self { + // TODO: store a NO-VOTE reason for debugging purposes so that we can review it easily // This means no vote self.quorum_decision = None; // The remaining info discarded (not strictly necessary) @@ -198,7 +199,7 @@ impl ProposedBlockChangeSet { ) -> Result, TransactionPoolError> { self.transaction_changes .get(transaction_id) - .and_then(|change| change.next_update.as_ref().map(|u| &u.transaction)) + .and_then(|change| change.next_update.as_ref().map(|u| u.transaction())) .cloned() .map(Ok) .or_else(|| { @@ -218,7 +219,8 @@ impl ProposedBlockChangeSet { .entry(*transaction.transaction_id()) .or_default(); - change_mut.next_update = Some(TransactionPoolStatusUpdate { transaction }); + let ready_now = transaction.is_ready_for_next_stage(); + change_mut.next_update = Some(TransactionPoolStatusUpdate::new(transaction, ready_now)); Ok(self) } } diff --git a/dan_layer/consensus/src/hotstuff/foreign_proposal_processor.rs b/dan_layer/consensus/src/hotstuff/foreign_proposal_processor.rs index 2be32896c..d3bdfce5b 100644 --- a/dan_layer/consensus/src/hotstuff/foreign_proposal_processor.rs +++ b/dan_layer/consensus/src/hotstuff/foreign_proposal_processor.rs @@ -12,7 +12,6 @@ use tari_dan_storage::{ ForeignProposal, LeafBlock, LockedBlock, - ShardGroupEvidence, TransactionAtom, TransactionPoolRecord, TransactionPoolStage, @@ -121,30 +120,15 @@ pub fn process_foreign_block( proposed_block_change_set.add_transaction_execution(exec)?; } - // We need to add the justify QC to the evidence because the all prepare block could not include it - // yet - let foreign_evidence = atom - .evidence - .get(&foreign_committee_info.shard_group()) - .ok_or_else(|| ProposalValidationError::ForeignInvalidPledge { - block_id: *block.id(), - transaction_id: atom.id, - details: format!( - "Foreign proposal did not contain evidence for {}", - foreign_committee_info.shard_group() - ), - })?; - // Update the transaction record with any new information provided by this foreign block tx_rec .evidence_mut() - .update(foreign_committee_info.shard_group(), foreign_evidence) + .update(&atom.evidence) .add_prepare_qc_evidence(foreign_committee_info, *justify_qc.id()); tx_rec.set_remote_decision(remote_decision); validate_and_add_pledges( &tx_rec, - foreign_evidence, block.id(), atom, &mut block_pledge, @@ -279,30 +263,15 @@ pub fn process_foreign_block( proposed_block_change_set.add_transaction_execution(exec)?; } - // We need to add the justify QC to the evidence because the all prepare block could not include it - // yet - let foreign_evidence = atom - .evidence - .get(&foreign_committee_info.shard_group()) - .ok_or_else(|| ProposalValidationError::ForeignInvalidPledge { - block_id: *block.id(), - transaction_id: atom.id, - details: format!( - "Foreign proposal did not contain evidence for {}", - foreign_committee_info.shard_group() - ), - })?; - // Update the transaction record with any new information provided by this foreign block tx_rec .evidence_mut() - .update(foreign_committee_info.shard_group(), foreign_evidence) + .update(&atom.evidence) .add_accept_qc_evidence(foreign_committee_info, *justify_qc.id()); tx_rec.set_remote_decision(remote_decision); validate_and_add_pledges( &tx_rec, - foreign_evidence, block.id(), atom, &mut block_pledge, @@ -312,11 +281,11 @@ pub fn process_foreign_block( )?; // Good debug info - // tx_rec.evidence().iter().for_each(|(addr, ev)| { - // let includes_local = local_committee_info.includes_substate_address(addr); + // tx_rec.evidence().iter().for_each(|(sg, ev)| { + // let is_local = local_committee_info.shard_group() == *sg; // log::error!( // target: LOG_TARGET, - // "🐞 LOCALACCEPT EVIDENCE (l={}, f={}) {}: {}", includes_local, !includes_local, addr, ev + // "🐞 LOCALACCEPT EVIDENCE (l={}, f={}) {}: {}", is_local, !is_local, sg, ev // ); // }); @@ -354,6 +323,17 @@ pub fn process_foreign_block( } proposed_block_change_set.set_next_transaction_update(tx_rec)?; } + } else if tx_rec.current_stage().is_local_prepared() && tx_rec.is_ready_for_next_stage() { + info!( + target: LOG_TARGET, + "🧩 FOREIGN PROPOSAL: Transaction is ready for propose ALL_PREPARED({}, {}) Local Stage: {}", + tx_rec.transaction_id(), + tx_rec.current_decision(), + tx_rec.current_stage() + ); + + tx_rec.set_next_stage(TransactionPoolStage::LocalPrepared)?; + proposed_block_change_set.set_next_transaction_update(tx_rec)?; } else if tx_rec.current_stage().is_local_accepted() && tx_rec.is_ready_for_next_stage() { info!( target: LOG_TARGET, @@ -364,7 +344,6 @@ pub fn process_foreign_block( ); tx_rec.set_next_stage(TransactionPoolStage::LocalAccepted)?; - proposed_block_change_set.set_next_transaction_update(tx_rec)?; } else { info!( @@ -376,7 +355,6 @@ pub fn process_foreign_block( tx_rec.evidence().all_addresses_accepted() ); // Still need to update the evidence - proposed_block_change_set.set_next_transaction_update(tx_rec)?; } }, @@ -423,7 +401,6 @@ pub fn process_foreign_block( fn validate_and_add_pledges( transaction: &TransactionPoolRecord, - evidence: &ShardGroupEvidence, foreign_block_id: &BlockId, atom: &TransactionAtom, block_pledge: &mut BlockPledge, @@ -431,6 +408,20 @@ fn validate_and_add_pledges( proposed_block_change_set: &mut ProposedBlockChangeSet, is_prepare_phase: bool, ) -> Result<(), HotStuffError> { + // We need to add the justify QC to the evidence because the prepare block should not include it + // yet + let evidence = atom + .evidence + .get(&foreign_committee_info.shard_group()) + .ok_or_else(|| ProposalValidationError::ForeignInvalidPledge { + block_id: *foreign_block_id, + transaction_id: atom.id, + details: format!( + "Foreign proposal did not contain evidence for {}", + foreign_committee_info.shard_group() + ), + })?; + #[allow(clippy::mutable_key_type)] match atom.decision { Decision::Commit => { diff --git a/dan_layer/consensus/src/hotstuff/on_inbound_message.rs b/dan_layer/consensus/src/hotstuff/on_inbound_message.rs index 41fef3bba..f2b5187a1 100644 --- a/dan_layer/consensus/src/hotstuff/on_inbound_message.rs +++ b/dan_layer/consensus/src/hotstuff/on_inbound_message.rs @@ -148,8 +148,9 @@ pub struct NeedsSync { fn msg_epoch_and_height(msg: &HotstuffMessage) -> Option { match msg { HotstuffMessage::Proposal(msg) => Some((msg.block.epoch(), msg.block.height())), - // Votes for block 2, occur at current height 3 + // Votes for block 2 occur in view 3 HotstuffMessage::Vote(msg) => Some((msg.epoch, msg.block_height.saturating_add(NodeHeight(1)))), + HotstuffMessage::NewView(msg) => Some((msg.high_qc.epoch(), msg.new_height)), _ => None, } } diff --git a/dan_layer/consensus/src/hotstuff/on_propose.rs b/dan_layer/consensus/src/hotstuff/on_propose.rs index 95719da1e..80ff69f5b 100644 --- a/dan_layer/consensus/src/hotstuff/on_propose.rs +++ b/dan_layer/consensus/src/hotstuff/on_propose.rs @@ -374,16 +374,6 @@ where TConsensusSpec: ConsensusSpec high_qc.qc_id ); - // if !pool_tx.is_ready() { - // if pool_tx.current_stage().is_local_prepared() && pool_tx.can_continue() { - // pool_tx.set_next_stage(TransactionPoolStage::LocalPrepared, true)?; - // } else if pool_tx.current_stage().is_local_accepted() && pool_tx.can_continue() { - // pool_tx.set_next_stage(TransactionPoolStage::LocalAccepted, true)?; - // } else { - // // Nothing - // } - // } - change_set.set_next_transaction_update(pool_tx)?; } @@ -535,7 +525,7 @@ where TConsensusSpec: ConsensusSpec commands.insert(command); } } - timer.finish(); + timer.done(); // This relies on the UTXO commands being ordered last for utxo in burnt_utxos { @@ -568,7 +558,7 @@ where TConsensusSpec: ConsensusSpec pending_tree_diffs, substate_store.diff(), )?; - timer.finish(); + timer.done(); let non_local_shards = get_non_local_shards(substate_store.diff(), local_committee_info); @@ -677,21 +667,7 @@ where TConsensusSpec: ConsensusSpec tx_rec.transaction_id(), )) })?; - for up in diff.up_iter() { - log::error!( - target: LOG_TARGET, - "PROPOSE UP: {}v{}", - up.0, up.1.version() - ); - } - for down in diff.down_iter() { - log::error!( - target: LOG_TARGET, - "PROPOSE DOWN: {}v{}", - down.0, down.1 - ); - } if let Err(err) = substate_store.put_diff(*tx_rec.transaction_id(), diff) { error!( target: LOG_TARGET, diff --git a/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs b/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs index f8444a568..10c386ba6 100644 --- a/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs +++ b/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs @@ -606,21 +606,6 @@ where TConsensusSpec: ConsensusSpec if tx_rec.current_decision().is_commit() { if let Some(diff) = execution.result().finalize.accept() { - for up in diff.up_iter() { - log::error!( - target: LOG_TARGET, - "VOTE UP: {}v{}", - up.0, up.1.version() - ); - } - - for down in diff.down_iter() { - log::error!( - target: LOG_TARGET, - "VOTE DOWN: {}v{}", - down.0, down.1 - ); - } if let Err(err) = substate_store.put_diff(atom.id, diff) { warn!( target: LOG_TARGET, @@ -1262,8 +1247,7 @@ where TConsensusSpec: ConsensusSpec if !tx_rec.current_stage().is_local_accepted() { warn!( target: LOG_TARGET, - "{} ❌ NO VOTE: AllAccept Stage disagreement in block {} for transaction {}. Leader proposed AllAccept, but local stage is {}", - self.local_validator_pk, + "❌ NO VOTE: AllAccept Stage disagreement in block {} for transaction {}. Leader proposed AllAccept, but local stage is {}", block, tx_rec.transaction_id(), tx_rec.current_stage() diff --git a/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs b/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs index d86ad904a..7b935533c 100644 --- a/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs +++ b/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs @@ -303,9 +303,12 @@ impl OnReceiveLocalProposalHandler OnReceiveLocalProposalHandler (TransactionRecord, Vec, Vec) { let (transaction, inputs) = self.build_transaction(decision, num_inputs); - let new_outputs = (0..num_outputs) + let new_outputs = (0..num_new_outputs) .map(|i| build_substate_id_for_committee(i as u32 % self.num_committees, self.num_committees)) .collect::>(); diff --git a/dan_layer/state_store_sqlite/src/sql_models/transaction_pool.rs b/dan_layer/state_store_sqlite/src/sql_models/transaction_pool.rs index eaab534a1..24893274b 100644 --- a/dan_layer/state_store_sqlite/src/sql_models/transaction_pool.rs +++ b/dan_layer/state_store_sqlite/src/sql_models/transaction_pool.rs @@ -51,7 +51,7 @@ impl TransactionPoolRecord { let mut transaction_fee = self.transaction_fee; if let Some(update) = update { - evidence.union(deserialize_json(&update.evidence)?); + evidence.merge(deserialize_json(&update.evidence)?); is_ready = update.is_ready; pending_stage = Some(parse_from_string(&update.stage)?); local_decision = Some(update.local_decision); diff --git a/dan_layer/state_store_sqlite/src/writer.rs b/dan_layer/state_store_sqlite/src/writer.rs index 055ab95dc..03fd2bc19 100644 --- a/dan_layer/state_store_sqlite/src/writer.rs +++ b/dan_layer/state_store_sqlite/src/writer.rs @@ -1019,12 +1019,12 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta source: e, })?; - // Set is_ready to the last value we set here. Bit of a hack to get has_uncommitted_transactions to return a - // more accurate value without querying the updates table + // Set is_ready and pending_stage to the updated values. This allows has_uncommitted_transactions to return an + // accurate value without querying records in the updates table. diesel::update(transaction_pool::table) .filter(transaction_pool::transaction_id.eq(&transaction_id)) .set(( - transaction_pool::is_ready.eq(update.is_ready()), + transaction_pool::is_ready.eq(update.is_ready_now()), transaction_pool::pending_stage.eq(update.stage().to_string()), )) .execute(self.connection()) diff --git a/dan_layer/state_store_sqlite/tests/tests.rs b/dan_layer/state_store_sqlite/tests/tests.rs index 2903e8a45..0c73bf891 100644 --- a/dan_layer/state_store_sqlite/tests/tests.rs +++ b/dan_layer/state_store_sqlite/tests/tests.rs @@ -98,11 +98,11 @@ mod confirm_all_transitions { tx_2.set_next_stage(TransactionPoolStage::Prepared).unwrap(); tx_3.set_next_stage(TransactionPoolStage::Prepared).unwrap(); - tx.transaction_pool_add_pending_update(&block_id, &TransactionPoolStatusUpdate { transaction: tx_1 }) + tx.transaction_pool_add_pending_update(&block_id, &TransactionPoolStatusUpdate::new(tx_1, true)) .unwrap(); - tx.transaction_pool_add_pending_update(&block_id, &TransactionPoolStatusUpdate { transaction: tx_2 }) + tx.transaction_pool_add_pending_update(&block_id, &TransactionPoolStatusUpdate::new(tx_2, true)) .unwrap(); - tx.transaction_pool_add_pending_update(&block_id, &TransactionPoolStatusUpdate { transaction: tx_3 }) + tx.transaction_pool_add_pending_update(&block_id, &TransactionPoolStatusUpdate::new(tx_3, true)) .unwrap(); let rec = tx diff --git a/dan_layer/storage/src/consensus_models/evidence.rs b/dan_layer/storage/src/consensus_models/evidence.rs index 68cbc593c..ce472ed2a 100644 --- a/dan_layer/storage/src/consensus_models/evidence.rs +++ b/dan_layer/storage/src/consensus_models/evidence.rs @@ -3,7 +3,7 @@ use std::fmt::{Display, Formatter}; -use indexmap::{map::Entry, IndexMap}; +use indexmap::IndexMap; use log::*; use serde::{Deserialize, Serialize}; use tari_dan_common_types::{ @@ -27,6 +27,7 @@ const LOG_TARGET: &str = "tari::dan::consensus_models::evidence"; ts(export, export_to = "../../bindings/src/types/") )] pub struct Evidence { + // Serialize JSON as an array of objects since ShardGroup is a non-string key #[serde(with = "serde_with::vec")] evidence: IndexMap, } @@ -113,10 +114,10 @@ impl Evidence { } pub fn add_prepare_qc_evidence(&mut self, committee_info: &CommitteeInfo, qc_id: QcId) -> &mut Self { - for (shard_group, evidence_mut) in self.evidence_in_committee_iter_mut(committee_info) { + for evidence_mut in self.evidence_in_committee_iter_mut(committee_info) { debug!( target: LOG_TARGET, - "add_prepare_qc_evidence {} {shard_group} QC[{qc_id}]", + "add_prepare_qc_evidence {} QC[{qc_id}]", committee_info.shard_group(), ); evidence_mut.prepare_qc = Some(qc_id); @@ -126,10 +127,10 @@ impl Evidence { } pub fn add_accept_qc_evidence(&mut self, committee_info: &CommitteeInfo, qc_id: QcId) -> &mut Self { - for (address, evidence_mut) in self.evidence_in_committee_iter_mut(committee_info) { + for evidence_mut in self.evidence_in_committee_iter_mut(committee_info) { debug!( target: LOG_TARGET, - "add_accept_qc_evidence {} {address} QC[{qc_id}]", + "add_accept_qc_evidence {} QC[{qc_id}]", committee_info.shard_group(), ); evidence_mut.accept_qc = Some(qc_id); @@ -141,10 +142,11 @@ impl Evidence { fn evidence_in_committee_iter_mut<'a>( &'a mut self, committee_info: &'a CommitteeInfo, - ) -> impl Iterator { + ) -> impl Iterator { self.evidence .iter_mut() .filter(|(sg, _)| committee_info.shard_group() == **sg) + .map(|(_, e)| e) } /// Returns an iterator over the substate addresses in this Evidence object. @@ -171,27 +173,32 @@ impl Evidence { self } - /// Add or update substate addresses and locks into Evidence - pub fn update(&mut self, shard_group: ShardGroup, evidence: &ShardGroupEvidence) -> &mut Self { - match self.evidence.entry(shard_group) { - Entry::Occupied(mut entry) => { - entry.get_mut().substates = evidence.substates.clone(); - }, - Entry::Vacant(entry) => { - entry.insert(ShardGroupEvidence { - substates: evidence.substates.clone(), - prepare_qc: None, - accept_qc: None, - }); - self.evidence.sort_keys(); - }, + /// Add or update shard groups, substates and locks into Evidence. Existing prepare/accept QC IDs are not changed. + pub fn update(&mut self, other: &Evidence) -> &mut Self { + for (sg, evidence) in other.iter() { + let evidence_mut = self.evidence.entry(*sg).or_default(); + evidence_mut + .substates + .extend(evidence.substates.iter().map(|(addr, lock)| (*addr, *lock))); + evidence_mut.sort_substates(); } + self.evidence.sort_keys(); self } /// Merges the other Evidence into this Evidence. - pub fn union(&mut self, other: Evidence) -> &mut Self { - self.evidence.extend(other.evidence); + pub fn merge(&mut self, other: Evidence) -> &mut Self { + for (sg, evidence) in other.evidence { + let evidence_mut = self.evidence.entry(sg).or_default(); + evidence_mut.substates.extend(evidence.substates); + evidence_mut.sort_substates(); + if let Some(qc_id) = evidence.prepare_qc { + evidence_mut.prepare_qc = Some(qc_id); + } + if let Some(qc_id) = evidence.accept_qc { + evidence_mut.accept_qc = Some(qc_id); + } + } self.evidence.sort_keys(); self } @@ -218,47 +225,6 @@ impl Display for Evidence { } } -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -#[cfg_attr( - feature = "ts", - derive(ts_rs::TS), - ts(export, export_to = "../../bindings/src/types/") -)] -pub struct ShardEvidence { - #[cfg_attr(feature = "ts", ts(type = "string | null"))] - pub prepare_justify: Option, - #[cfg_attr(feature = "ts", ts(type = "string | null"))] - pub accept_justify: Option, - pub lock: SubstateLockType, -} - -impl ShardEvidence { - pub fn is_prepare_justified(&self) -> bool { - self.prepare_justify.is_some() - } - - pub fn is_accept_justified(&self) -> bool { - self.accept_justify.is_some() - } -} - -impl Display for ShardEvidence { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "({}, ", self.lock)?; - if let Some(qc_id) = self.prepare_justify { - write!(f, "Prepare[{}]", qc_id)?; - } else { - write!(f, "Prepare[NONE]")?; - } - if let Some(qc_id) = self.accept_justify { - write!(f, ", Accept[{}]", qc_id)?; - } else { - write!(f, ", Accept[NONE]")?; - } - write!(f, ")") - } -} - #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] #[cfg_attr( feature = "ts", @@ -286,6 +252,10 @@ impl ShardGroupEvidence { &self.substates } + pub fn sort_substates(&mut self) { + self.substates.sort_keys(); + } + pub fn contains(&self, substate_address: &SubstateAddress) -> bool { self.substates.contains_key(substate_address) } @@ -325,31 +295,47 @@ mod tests { #[test] fn it_merges_two_evidences_together() { + let sg1 = ShardGroup::new(0, 1); + let sg2 = ShardGroup::new(2, 3); + let sg3 = ShardGroup::new(4, 5); + let mut evidence1 = Evidence::empty(); - evidence1.update(vec![ - (seed_substate_address(1), SubstateLockType::Write), - (seed_substate_address(2), SubstateLockType::Read), - ]); + evidence1.add_shard_group_evidence(sg1, seed_substate_address(1), SubstateLockType::Write); + evidence1.add_shard_group_evidence(sg1, seed_substate_address(2), SubstateLockType::Read); let mut evidence2 = Evidence::empty(); - evidence2.update(vec![ - (seed_substate_address(2), SubstateLockType::Output), - (seed_substate_address(3), SubstateLockType::Output), - ]); + evidence2.add_shard_group_evidence(sg1, seed_substate_address(2), SubstateLockType::Output); + evidence2.add_shard_group_evidence(sg2, seed_substate_address(3), SubstateLockType::Output); + evidence2.add_shard_group_evidence(sg3, seed_substate_address(4), SubstateLockType::Output); - evidence1.union(evidence2); + evidence1.merge(evidence2); assert_eq!(evidence1.len(), 3); assert_eq!( - evidence1.get(&seed_substate_address(1)).unwrap().lock, + *evidence1 + .get(&sg1) + .unwrap() + .substates + .get(&seed_substate_address(1)) + .unwrap(), SubstateLockType::Write ); assert_eq!( - evidence1.get(&seed_substate_address(2)).unwrap().lock, + *evidence1 + .get(&sg1) + .unwrap() + .substates + .get(&seed_substate_address(2)) + .unwrap(), SubstateLockType::Output ); assert_eq!( - evidence1.get(&seed_substate_address(3)).unwrap().lock, + *evidence1 + .get(&sg1) + .unwrap() + .substates + .get(&seed_substate_address(2)) + .unwrap(), SubstateLockType::Output ); } diff --git a/dan_layer/storage/src/consensus_models/transaction_pool_status_update.rs b/dan_layer/storage/src/consensus_models/transaction_pool_status_update.rs index fe5a198d7..02d6ae223 100644 --- a/dan_layer/storage/src/consensus_models/transaction_pool_status_update.rs +++ b/dan_layer/storage/src/consensus_models/transaction_pool_status_update.rs @@ -11,9 +11,18 @@ use crate::{ #[derive(Debug, Clone)] pub struct TransactionPoolStatusUpdate { pub transaction: TransactionPoolRecord, + ready_now: bool, } impl TransactionPoolStatusUpdate { + pub fn new(transaction: TransactionPoolRecord, ready_now: bool) -> Self { + Self { transaction, ready_now } + } + + pub fn transaction(&self) -> &TransactionPoolRecord { + &self.transaction + } + pub fn transaction_id(&self) -> &TransactionId { self.transaction.transaction_id() } @@ -30,6 +39,10 @@ impl TransactionPoolStatusUpdate { self.transaction.is_ready() } + pub fn is_ready_now(&self) -> bool { + self.ready_now + } + pub fn decision(&self) -> Decision { self.transaction.current_decision() }