From e5b39a7f90e7fee2b71ddac2aeefae8bfa203efa Mon Sep 17 00:00:00 2001 From: Stan Bondi Date: Mon, 19 Aug 2024 18:28:23 +0400 Subject: [PATCH] feat(consensus): sequence transaction from foreign LocalPrepare/Accept --- .../src/process_manager/manager.rs | 2 +- dan_layer/common_types/src/committee.rs | 11 +- .../src/hotstuff/on_message_validate.rs | 123 ++++++++++---- .../consensus/src/hotstuff/on_propose.rs | 16 +- .../hotstuff/on_receive_foreign_proposal.rs | 150 +++++++++++------- .../hotstuff/transaction_manager/manager.rs | 26 +-- dan_layer/consensus/src/hotstuff/worker.rs | 79 +++++---- dan_layer/consensus/src/messages/proposal.rs | 18 ++- dan_layer/consensus_tests/src/consensus.rs | 29 ++-- .../src/support/executions_store.rs | 10 +- .../consensus_tests/src/support/harness.rs | 34 ++-- .../src/support/transaction.rs | 26 +-- .../up.sql | 46 ++++-- dan_layer/state_store_sqlite/src/schema.rs | 24 +++ .../src/sql_models/foreign_parked_block.rs | 34 ++++ .../state_store_sqlite/src/sql_models/mod.rs | 2 + dan_layer/state_store_sqlite/src/writer.rs | 113 ++++++++++++- .../storage/src/consensus_models/block.rs | 22 ++- .../src/consensus_models/block_pledges.rs | 10 +- .../foreign_parked_proposal.rs | 77 +++++++++ dan_layer/storage/src/consensus_models/mod.rs | 2 + .../src/consensus_models/transaction_pool.rs | 2 +- dan_layer/storage/src/state_store/mod.rs | 16 ++ 23 files changed, 647 insertions(+), 225 deletions(-) create mode 100644 dan_layer/state_store_sqlite/src/sql_models/foreign_parked_block.rs create mode 100644 dan_layer/storage/src/consensus_models/foreign_parked_proposal.rs diff --git a/applications/tari_swarm_daemon/src/process_manager/manager.rs b/applications/tari_swarm_daemon/src/process_manager/manager.rs index f81463bbde..fc05ad78e8 100644 --- a/applications/tari_swarm_daemon/src/process_manager/manager.rs +++ b/applications/tari_swarm_daemon/src/process_manager/manager.rs @@ -228,7 +228,7 @@ impl ProcessManager { // inputs for a transaction. sleep(Duration::from_secs(2)).await; } - self.mine(10).await?; + self.mine(20).await?; Ok(()) } diff --git a/dan_layer/common_types/src/committee.rs b/dan_layer/common_types/src/committee.rs index 91255925dd..79dac7d11b 100644 --- a/dan_layer/common_types/src/committee.rs +++ b/dan_layer/common_types/src/committee.rs @@ -6,6 +6,7 @@ use std::{borrow::Borrow, cmp, ops::RangeInclusive}; use rand::{rngs::OsRng, seq::SliceRandom}; use serde::{Deserialize, Serialize}; use tari_common_types::types::PublicKey; +use tari_engine_types::substate::SubstateId; use crate::{shard::Shard, Epoch, NumPreshards, ShardGroup, SubstateAddress}; @@ -203,7 +204,7 @@ impl CommitteeInfo { (len - 1) / 3 } - pub fn num_shards(&self) -> NumPreshards { + pub fn num_preshards(&self) -> NumPreshards { self.num_shards } @@ -215,12 +216,18 @@ impl CommitteeInfo { self.shard_group.to_substate_address_range(self.num_shards) } - // TODO: change these to take in a SubstateId pub fn includes_substate_address(&self, substate_address: &SubstateAddress) -> bool { let s = substate_address.to_shard(self.num_shards); self.shard_group.contains(&s) } + pub fn includes_substate_id(&self, substate_id: &SubstateId) -> bool { + // version doesnt affect shard + let addr = SubstateAddress::from_substate_id(substate_id, 0); + let shard = addr.to_shard(self.num_shards); + self.shard_group.contains(&shard) + } + pub fn includes_all_substate_addresses, B: Borrow>( &self, substate_addresses: I, diff --git a/dan_layer/consensus/src/hotstuff/on_message_validate.rs b/dan_layer/consensus/src/hotstuff/on_message_validate.rs index 8901dca557..920688cc81 100644 --- a/dan_layer/consensus/src/hotstuff/on_message_validate.rs +++ b/dan_layer/consensus/src/hotstuff/on_message_validate.rs @@ -5,13 +5,12 @@ use std::collections::HashSet; use log::*; use tari_common_types::types::PublicKey; -use tari_dan_common_types::{Epoch, NodeHeight}; +use tari_dan_common_types::{committee::CommitteeInfo, Epoch, NodeHeight}; use tari_dan_storage::{ - consensus_models::{Block, BlockId, TransactionRecord}, + consensus_models::{Block, BlockId, ForeignParkedProposal, TransactionRecord}, StateStore, StateStoreWriteTransaction, }; -use tari_epoch_manager::EpochManagerReader; use tari_transaction::TransactionId; use tokio::sync::broadcast; @@ -19,7 +18,7 @@ use super::config::HotstuffConfig; use crate::{ block_validations, hotstuff::{error::HotStuffError, HotstuffEvent}, - messages::{HotstuffMessage, MissingTransactionsRequest, ProposalMessage}, + messages::{ForeignProposalMessage, HotstuffMessage, MissingTransactionsRequest, ProposalMessage}, traits::{ConsensusSpec, OutboundMessaging}, }; @@ -64,23 +63,15 @@ impl OnMessageValidate { pub async fn handle( &mut self, current_height: NodeHeight, + local_committee_info: &CommitteeInfo, from: TConsensusSpec::Addr, msg: HotstuffMessage, ) -> Result, HotStuffError> { match msg { HotstuffMessage::Proposal(msg) => self.process_local_proposal(current_height, from, msg).await, HotstuffMessage::ForeignProposal(proposal) => { - if let Err(err) = self.check_proposal(&proposal.block).await { - return Ok(MessageValidationResult::Invalid { - from, - message: HotstuffMessage::ForeignProposal(proposal), - err, - }); - } - Ok(MessageValidationResult::Ready { - from, - message: HotstuffMessage::ForeignProposal(proposal), - }) + self.process_foreign_proposal(local_committee_info, from, proposal) + .await }, HotstuffMessage::MissingTransactionsResponse(msg) => { if !self.active_missing_transaction_requests.remove_element(&msg.request_id) { @@ -163,14 +154,14 @@ impl OnMessageValidate { }); } - self.handle_missing_transactions(from, block).await + self.handle_missing_transactions_local_block(from, block).await } - pub async fn update_parked_blocks( + pub fn update_local_parked_blocks( &self, current_height: NodeHeight, transaction_id: &TransactionId, - ) -> Result, HotStuffError> { + ) -> Result, HotStuffError> { let maybe_unparked_block = self .store .with_write_tx(|tx| tx.missing_transactions_remove(current_height, transaction_id))?; @@ -181,19 +172,28 @@ impl OnMessageValidate { info!(target: LOG_TARGET, "♻️ all transactions for block {unparked_block} are ready for consensus"); - let vn = self - .epoch_manager - .get_validator_node_by_public_key(unparked_block.epoch(), unparked_block.proposed_by()) - .await?; - let _ignore = self.tx_events.send(HotstuffEvent::ParkedBlockReady { block: unparked_block.as_leaf_block(), }); - Ok(Some(( - vn.address, - HotstuffMessage::Proposal(ProposalMessage { block: unparked_block }), - ))) + Ok(Some(ProposalMessage { block: unparked_block })) + } + + pub fn update_foreign_parked_blocks( + &self, + transaction_id: &TransactionId, + ) -> Result, HotStuffError> { + let unparked_foreign_blocks = self + .store + .with_write_tx(|tx| ForeignParkedProposal::remove_by_transaction_id(tx, transaction_id))?; + + if unparked_foreign_blocks.is_empty() { + return Ok(vec![]); + }; + + info!(target: LOG_TARGET, "♻️ all transactions for {} foreign block(s) are ready for consensus", unparked_foreign_blocks.len()); + + Ok(unparked_foreign_blocks) } async fn check_proposal(&self, block: &Block) -> Result<(), HotStuffError> { @@ -208,7 +208,7 @@ impl OnMessageValidate { Ok(()) } - async fn handle_missing_transactions( + async fn handle_missing_transactions_local_block( &mut self, from: TConsensusSpec::Addr, block: Block, @@ -270,6 +270,73 @@ impl OnMessageValidate { Ok(missing_tx_ids) } + + async fn process_foreign_proposal( + &mut self, + local_committee_info: &CommitteeInfo, + from: TConsensusSpec::Addr, + msg: ForeignProposalMessage, + ) -> Result, HotStuffError> { + info!( + target: LOG_TARGET, + "🧩 new unvalidated FOREIGN PROPOSAL message {} from {}", + msg, + from + ); + + if let Err(err) = self.check_proposal(&msg.block).await { + return Ok(MessageValidationResult::Invalid { + from, + message: HotstuffMessage::ForeignProposal(msg), + err, + }); + } + + if msg.block.commands().is_empty() { + debug!( + target: LOG_TARGET, + "✅ Block {} is empty (no missing transactions)", msg.block + ); + return Ok(MessageValidationResult::Ready { + from, + message: HotstuffMessage::ForeignProposal(msg), + }); + } + + self.store.with_write_tx(|tx| { + let missing_tx_ids = TransactionRecord::get_missing( + &**tx, + msg.block.all_transaction_ids_in_committee(local_committee_info), + )?; + + if missing_tx_ids.is_empty() { + debug!( + target: LOG_TARGET, + "✅ Foreign Block {} has no missing transactions", msg.block + ); + return Ok(MessageValidationResult::Ready { + from, + message: HotstuffMessage::ForeignProposal(msg), + }); + } + + info!( + target: LOG_TARGET, + "⏳ Foreign Block {} has {} missing transactions", msg.block, missing_tx_ids.len(), + ); + + let parked_block = ForeignParkedProposal::from(msg); + parked_block.insert(tx)?; + parked_block.add_missing_transactions(tx, &missing_tx_ids)?; + + Ok(MessageValidationResult::ParkedProposal { + block_id: *parked_block.block.id(), + epoch: parked_block.block.epoch(), + proposed_by: parked_block.block.proposed_by().clone(), + missing_txs: missing_tx_ids, + }) + }) + } } #[derive(Debug)] diff --git a/dan_layer/consensus/src/hotstuff/on_propose.rs b/dan_layer/consensus/src/hotstuff/on_propose.rs index 17eb596230..9aa060283e 100644 --- a/dan_layer/consensus/src/hotstuff/on_propose.rs +++ b/dan_layer/consensus/src/hotstuff/on_propose.rs @@ -171,12 +171,14 @@ where TConsensusSpec: ConsensusSpec )?; // Add executions for this block - debug!( - target: LOG_TARGET, - "Adding {} executed transaction(s) to block {}", - executed_transactions.len(), - next_block.id() - ); + if !executed_transactions.is_empty() { + debug!( + target: LOG_TARGET, + "Saving {} executed transaction(s) for block {}", + executed_transactions.len(), + next_block.id() + ); + } for executed in executed_transactions.into_values() { executed.for_block(*next_block.id()).insert_if_required(tx)?; } @@ -645,7 +647,7 @@ pub fn get_non_local_shards(diff: &[SubstateChange], local_committee_info: &Comm .map(|ch| { ch.versioned_substate_id() .to_substate_address() - .to_shard(local_committee_info.num_shards()) + .to_shard(local_committee_info.num_preshards()) }) .filter(|shard| local_committee_info.shard_group().contains(shard)) .collect() diff --git a/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs b/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs index 9e03e1fea0..b5ae2b7a86 100644 --- a/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs +++ b/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs @@ -6,13 +6,16 @@ use tari_dan_common_types::{committee::CommitteeInfo, optional::Optional, ShardG use tari_dan_storage::{ consensus_models::{ Block, + BlockId, BlockPledge, Command, ForeignProposal, ForeignReceiveCounters, LeafBlock, QuorumCertificate, + TransactionAtom, TransactionPool, + TransactionPoolRecord, TransactionPoolStage, }, StateStore, @@ -218,61 +221,6 @@ where TConsensusSpec: ConsensusSpec is_qc_saved = true; } - #[allow(clippy::mutable_key_type)] - let maybe_pledges = if remote_decision.is_commit() { - let pledges = block_pledge.remove_transaction_pledges(&atom.id).ok_or_else(|| { - HotStuffError::ForeignNodeOmittedTransactionPledges { - foreign_block_id: *block.id(), - transaction_id: atom.id, - } - })?; - - // Validate that provided evidence is correct - // TODO: there are a lot of validations to be done on evidence and the foreign block in general, - // this is here as a sanity check and should change to not be a fatal error in consensus - for pledge in &pledges { - let address = pledge.versioned_substate_id().to_substate_address(); - let evidence = atom.evidence.get(&address).ok_or_else(|| { - ProposalValidationError::ForeignInvalidPledge { - block_id: *block.id(), - transaction_id: atom.id, - details: format!("Pledge {pledge} for address {address} not found in evidence"), - } - })?; - if evidence.lock.is_output() && pledge.is_input() { - return Err(ProposalValidationError::ForeignInvalidPledge { - block_id: *block.id(), - transaction_id: atom.id, - details: format!( - "Pledge {pledge} is an input but evidence is an output for address {address}" - ), - } - .into()); - } - if !evidence.lock.is_output() && pledge.is_output() { - return Err(ProposalValidationError::ForeignInvalidPledge { - block_id: *block.id(), - transaction_id: atom.id, - details: format!( - "Pledge {pledge} is an output but evidence is an input for address {address}" - ), - } - .into()); - } - } - Some(pledges) - } else { - if block_pledge.remove_transaction_pledges(&atom.id).is_some() { - return Err(ProposalValidationError::ForeignInvalidPledge { - block_id: *block.id(), - transaction_id: atom.id, - details: "Remote decided ABORT but provided pledges".to_string(), - } - .into()); - } - None - }; - // Update the transaction record with any new information provided by this foreign block tx_rec.update_remote_data( tx, @@ -282,11 +230,14 @@ where TConsensusSpec: ConsensusSpec atom.evidence.clone(), )?; - if let Some(pledges) = maybe_pledges { - // If the foreign shard has committed the transaction, we can add the pledges to the transaction - // record - tx_rec.add_foreign_pledges(tx, foreign_committee_info.shard_group(), pledges)?; - } + self.validate_and_add_pledges( + tx, + &tx_rec, + block.id(), + atom, + &mut block_pledge, + foreign_committee_info, + )?; if tx_rec.current_stage().is_new() { info!( @@ -420,6 +371,15 @@ where TConsensusSpec: ConsensusSpec atom.evidence.clone(), )?; + self.validate_and_add_pledges( + tx, + &tx_rec, + block.id(), + atom, + &mut block_pledge, + foreign_committee_info, + )?; + // Good debug info // tx_rec.evidence().iter().for_each(|(addr, ev)| { // let includes_local = local_committee_info.includes_substate_address(addr); @@ -517,6 +477,76 @@ where TConsensusSpec: ConsensusSpec Ok(()) } + fn validate_and_add_pledges( + &self, + tx: &mut ::WriteTransaction<'_>, + tx_rec: &TransactionPoolRecord, + block_id: &BlockId, + atom: &TransactionAtom, + block_pledge: &mut BlockPledge, + foreign_committee_info: &CommitteeInfo, + ) -> Result<(), HotStuffError> { + #[allow(clippy::mutable_key_type)] + let maybe_pledges = if atom.decision.is_commit() { + let pledges = block_pledge.remove_transaction_pledges(&atom.id).ok_or_else(|| { + HotStuffError::ForeignNodeOmittedTransactionPledges { + foreign_block_id: *block_id, + transaction_id: atom.id, + } + })?; + + // Validate that provided evidence is correct + // TODO: there are a lot of validations to be done on evidence and the foreign block in general, + // this is here as a sanity check and should change to not be a fatal error in consensus + for pledge in &pledges { + let address = pledge.versioned_substate_id().to_substate_address(); + let evidence = + atom.evidence + .get(&address) + .ok_or_else(|| ProposalValidationError::ForeignInvalidPledge { + block_id: *block_id, + transaction_id: atom.id, + details: format!("Pledge {pledge} for address {address} not found in evidence"), + })?; + if evidence.lock.is_output() && pledge.is_input() { + return Err(ProposalValidationError::ForeignInvalidPledge { + block_id: *block_id, + transaction_id: atom.id, + details: format!("Pledge {pledge} is an input but evidence is an output for address {address}"), + } + .into()); + } + if !evidence.lock.is_output() && pledge.is_output() { + return Err(ProposalValidationError::ForeignInvalidPledge { + block_id: *block_id, + transaction_id: atom.id, + details: format!("Pledge {pledge} is an output but evidence is an input for address {address}"), + } + .into()); + } + } + Some(pledges) + } else { + if block_pledge.remove_transaction_pledges(&atom.id).is_some() { + return Err(ProposalValidationError::ForeignInvalidPledge { + block_id: *block_id, + transaction_id: atom.id, + details: "Remote decided ABORT but provided pledges".to_string(), + } + .into()); + } + None + }; + + if let Some(pledges) = maybe_pledges { + // If the foreign shard has committed the transaction, we can add the pledges to the transaction + // record + tx_rec.add_foreign_pledges(tx, foreign_committee_info.shard_group(), pledges)?; + } + + Ok(()) + } + fn validate_proposed_block( &self, from: &TConsensusSpec::Addr, diff --git a/dan_layer/consensus/src/hotstuff/transaction_manager/manager.rs b/dan_layer/consensus/src/hotstuff/transaction_manager/manager.rs index 6d9f1dfb3a..0143e579a9 100644 --- a/dan_layer/consensus/src/hotstuff/transaction_manager/manager.rs +++ b/dan_layer/consensus/src/hotstuff/transaction_manager/manager.rs @@ -5,11 +5,7 @@ use std::{collections::HashSet, marker::PhantomData}; use indexmap::IndexMap; use log::*; -use tari_dan_common_types::{ - committee::CommitteeInfo, - optional::{IsNotFoundError, Optional}, - Epoch, -}; +use tari_dan_common_types::{committee::CommitteeInfo, optional::IsNotFoundError, Epoch}; use tari_dan_storage::{ consensus_models::{ Decision, @@ -60,28 +56,20 @@ impl> let mut non_local_inputs = HashSet::new(); for input in transaction.all_inputs_iter() { + if !local_committee_info.includes_substate_id(&input.substate_id) { + non_local_inputs.insert(input); + continue; + } + match input.version() { Some(version) => { - if !local_committee_info.includes_substate_address( - &input.to_substate_address().expect("succeeds because version is Some"), - ) { - non_local_inputs.insert(input); - continue; - } - let id = VersionedSubstateId::new(input.substate_id, version); let substate = store.get(&id)?; info!(target: LOG_TARGET, "Resolved LOCAL substate: {id}"); resolved_substates.insert(id.substate_id, substate); }, None => { - let substate = match store.get_latest(&input.substate_id).optional()? { - Some(substate) => substate, - None => { - non_local_inputs.insert(input); - continue; - }, - }; + let substate = store.get_latest(&input.substate_id)?; info!(target: LOG_TARGET, "Resolved LOCAL unversioned substate: {input}"); resolved_substates.insert(input.substate_id, substate); }, diff --git a/dan_layer/consensus/src/hotstuff/worker.rs b/dan_layer/consensus/src/hotstuff/worker.rs index a4733ef2ac..b21a1a0abb 100644 --- a/dan_layer/consensus/src/hotstuff/worker.rs +++ b/dan_layer/consensus/src/hotstuff/worker.rs @@ -346,7 +346,7 @@ impl HotstuffWorker { match self .on_message_validate - .handle(current_height, from.clone(), msg) + .handle(current_height, local_committee_info, from.clone(), msg) .await? { MessageValidationResult::Ready { from, message: msg } => { @@ -368,17 +368,10 @@ impl HotstuffWorker { } => { let mut request_from_address = from; if request_from_address == self.local_validator_addr { - // let vn = self - // .epoch_manager - // .get_validator_node_by_public_key(epoch, &proposed_by) - // .await?; - // - // let mut request_from_address = vn.address; - // - // // (Yet another) Edge case: If we're catching up, we could be the proposer but we no longer have - // the // transaction (we deleted our database) In this case, request from - // another random VN // (TODO: not 100% reliable) - // if request_from_address == self.local_validator_addr { + // Edge case: If we're catching up, we could be the proposer but we no longer have + // the transaction (we deleted our database likely during development testing). + // In this case, request from another random VN. + // (TODO: not 100% reliable since we're just asking a single random committee member) let mut local_committee = self.epoch_manager.get_local_committee(epoch).await?; local_committee.shuffle(); @@ -463,24 +456,56 @@ impl HotstuffWorker { tx_id: &TransactionId, local_committee_info: &CommitteeInfo, ) -> Result { - match self + let mut is_any_block_unparked = false; + if let Some(msg) = self .on_message_validate - .update_parked_blocks(current_height, tx_id) - .await? + .update_local_parked_blocks(current_height, tx_id)? { - Some((from, msg)) => { - if let Err(e) = self - .dispatch_hotstuff_message(current_epoch, from, msg, local_committee_info) - .await - { - self.on_failure("on_new_transaction -> dispatch_hotstuff_message", &e) - .await; - return Err(e); - } - Ok(true) - }, - None => Ok(false), + let vn = self + .epoch_manager + .get_validator_node_by_public_key(msg.block.epoch(), msg.block.proposed_by()) + .await?; + + if let Err(e) = self + .dispatch_hotstuff_message( + current_epoch, + vn.address, + HotstuffMessage::Proposal(msg), + local_committee_info, + ) + .await + { + self.on_failure("on_new_transaction -> dispatch_hotstuff_message", &e) + .await; + return Err(e); + } + is_any_block_unparked = true; } + + let unparked_foreign_blocks = self.on_message_validate.update_foreign_parked_blocks(tx_id)?; + is_any_block_unparked |= !unparked_foreign_blocks.is_empty(); + for parked in unparked_foreign_blocks { + let vn = self + .epoch_manager + .get_validator_node_by_public_key(parked.block().epoch(), parked.block().proposed_by()) + .await?; + + if let Err(e) = self + .dispatch_hotstuff_message( + current_epoch, + vn.address, + HotstuffMessage::ForeignProposal(parked.into()), + local_committee_info, + ) + .await + { + self.on_failure("on_new_transaction -> dispatch_hotstuff_message", &e) + .await; + return Err(e); + } + } + + Ok(is_any_block_unparked) } async fn on_epoch_manager_event(&mut self, event: EpochManagerEvent) -> Result<(), HotStuffError> { diff --git a/dan_layer/consensus/src/messages/proposal.rs b/dan_layer/consensus/src/messages/proposal.rs index fe756103f8..b3f32d781b 100644 --- a/dan_layer/consensus/src/messages/proposal.rs +++ b/dan_layer/consensus/src/messages/proposal.rs @@ -4,7 +4,7 @@ use std::fmt::{Display, Formatter}; use serde::Serialize; -use tari_dan_storage::consensus_models::{Block, BlockPledge, QuorumCertificate}; +use tari_dan_storage::consensus_models::{Block, BlockPledge, ForeignParkedProposal, QuorumCertificate}; #[derive(Debug, Clone, Serialize)] pub struct ProposalMessage { @@ -24,6 +24,22 @@ pub struct ForeignProposalMessage { pub block_pledge: BlockPledge, } +impl From for ForeignParkedProposal { + fn from(msg: ForeignProposalMessage) -> Self { + ForeignParkedProposal::new(msg.block, msg.justify_qc, msg.block_pledge) + } +} + +impl From for ForeignProposalMessage { + fn from(block: ForeignParkedProposal) -> Self { + ForeignProposalMessage { + block: block.block, + justify_qc: block.justify_qc, + block_pledge: block.block_pledge, + } + } +} + impl Display for ForeignProposalMessage { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( diff --git a/dan_layer/consensus_tests/src/consensus.rs b/dan_layer/consensus_tests/src/consensus.rs index ba8efb026b..57eabb8437 100644 --- a/dan_layer/consensus_tests/src/consensus.rs +++ b/dan_layer/consensus_tests/src/consensus.rs @@ -180,17 +180,7 @@ async fn node_requests_missing_transaction_from_local_leader() { .send_transaction_to(&TestAddress::new("2"), Decision::Commit, 1, 5) .await; // All VNs will decide the same thing - test.create_execution_at_destination( - TestVnDestination::All, - create_execution_result_for_transaction( - BlockId::zero(), - *transaction.id(), - transaction.current_decision(), - 0, - transaction.resolved_inputs.clone().unwrap_or_default(), - transaction.resulting_outputs.clone().unwrap_or_default(), - ), - ); + test.create_execution_at_destination_for_transaction(TestVnDestination::All, &transaction); } test.start_epoch(Epoch(1)).await; loop { @@ -448,7 +438,6 @@ async fn multishard_local_inputs_foreign_outputs() { } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] -#[ignore = "TODO: this test does not work because more work is needed on multi-sharded output-only involvement"] async fn multishard_local_inputs_and_outputs_foreign_outputs() { setup_logger(); let mut test = Test::builder() @@ -477,8 +466,13 @@ async fn multishard_local_inputs_and_outputs_foreign_outputs() { .collect(), outputs_0.into_iter().chain(outputs_2).collect(), ); - test.send_transaction_to_destination(TestVnDestination::All, tx1.clone()) + test.send_transaction_to_destination(TestVnDestination::Committee(0), tx1.clone()) + .await; + test.send_transaction_to_destination(TestVnDestination::Committee(1), tx1.clone()) .await; + // Just add the result for executing the transaction to committee 2, the transaction itself will be requested by + // consensus. + test.create_execution_at_destination_for_transaction(TestVnDestination::Committee(2), &tx1); test.start_epoch(Epoch(1)).await; @@ -722,10 +716,9 @@ async fn single_shard_input_conflict() { .build(); let tx2 = TransactionRecord::new(tx2); - test.create_execution_at_destination( + test.add_execution_at_destination( TestVnDestination::All, create_execution_result_for_transaction( - BlockId::zero(), *tx1.id(), Decision::Commit, 0, @@ -733,10 +726,9 @@ async fn single_shard_input_conflict() { vec![], ), ) - .create_execution_at_destination( + .add_execution_at_destination( TestVnDestination::All, create_execution_result_for_transaction( - BlockId::zero(), *tx2.id(), Decision::Commit, 0, @@ -961,10 +953,9 @@ async fn single_shard_unversioned_inputs() { test.send_transaction_to_destination(TestVnDestination::All, tx.clone()) .await; - test.create_execution_at_destination( + test.add_execution_at_destination( TestVnDestination::All, create_execution_result_for_transaction( - BlockId::zero(), *tx.id(), Decision::Commit, 0, diff --git a/dan_layer/consensus_tests/src/support/executions_store.rs b/dan_layer/consensus_tests/src/support/executions_store.rs index 6a58bdfffc..e7611c866a 100644 --- a/dan_layer/consensus_tests/src/support/executions_store.rs +++ b/dan_layer/consensus_tests/src/support/executions_store.rs @@ -6,10 +6,10 @@ use std::{ sync::{Arc, RwLock}, }; -use tari_dan_storage::consensus_models::BlockTransactionExecution; +use tari_dan_storage::consensus_models::TransactionExecution; use tari_transaction::TransactionId; -type TestExecutionStore = HashMap; +type TestExecutionStore = HashMap; #[derive(Debug, Clone, Default)] pub struct TestTransactionExecutionsStore { @@ -23,15 +23,15 @@ impl TestTransactionExecutionsStore { } } - pub fn insert(&self, execution: BlockTransactionExecution) -> &Self { + pub fn insert(&self, execution: TransactionExecution) -> &Self { self.transactions .write() .unwrap() - .insert(*execution.transaction_id(), execution); + .insert(execution.transaction_id, execution); self } - pub fn get(&self, transaction_id: &TransactionId) -> Option { + pub fn get(&self, transaction_id: &TransactionId) -> Option { self.transactions.read().unwrap().get(transaction_id).cloned() } } diff --git a/dan_layer/consensus_tests/src/support/harness.rs b/dan_layer/consensus_tests/src/support/harness.rs index b7e01957d5..4180c118a4 100644 --- a/dan_layer/consensus_tests/src/support/harness.rs +++ b/dan_layer/consensus_tests/src/support/harness.rs @@ -13,11 +13,11 @@ use tari_dan_common_types::{committee::Committee, shard::Shard, Epoch, NodeHeigh use tari_dan_storage::{ consensus_models::{ BlockId, - BlockTransactionExecution, Decision, QcId, SubstateLockType, SubstateRecord, + TransactionExecution, TransactionRecord, VersionedSubstateIdLockIntent, }, @@ -114,25 +114,31 @@ impl Test { } pub async fn send_transaction_to_destination(&self, dest: TestVnDestination, transaction: TransactionRecord) { - self.create_execution_at_destination( - dest.clone(), - create_execution_result_for_transaction( - BlockId::zero(), - *transaction.id(), - transaction.current_decision(), - 0, - transaction.resolved_inputs.clone().unwrap_or_default(), - transaction.resulting_outputs.clone().unwrap_or_default(), - ), - ); + self.create_execution_at_destination_for_transaction(dest.clone(), &transaction); self.network.send_transaction(dest, transaction).await; } - pub fn create_execution_at_destination( + pub fn add_execution_at_destination(&self, dest: TestVnDestination, execution: TransactionExecution) -> &Self { + for vn in self.validators.values() { + if dest.is_for(&vn.address, vn.shard_group, vn.num_committees) { + vn.transaction_executions.insert(execution.clone()); + } + } + self + } + + pub fn create_execution_at_destination_for_transaction( &self, dest: TestVnDestination, - execution: BlockTransactionExecution, + transaction: &TransactionRecord, ) -> &Self { + let execution = create_execution_result_for_transaction( + *transaction.id(), + transaction.current_decision(), + 0, + transaction.resolved_inputs.clone().unwrap_or_default(), + transaction.resulting_outputs.clone().unwrap_or_default(), + ); for vn in self.validators.values() { if dest.is_for(&vn.address, vn.shard_group, vn.num_committees) { vn.transaction_executions.insert(execution.clone()); diff --git a/dan_layer/consensus_tests/src/support/transaction.rs b/dan_layer/consensus_tests/src/support/transaction.rs index c61babdf4b..670eff726b 100644 --- a/dan_layer/consensus_tests/src/support/transaction.rs +++ b/dan_layer/consensus_tests/src/support/transaction.rs @@ -6,10 +6,9 @@ use std::{iter, time::Duration}; use rand::{distributions::Alphanumeric, rngs::OsRng, Rng}; use tari_common_types::types::PrivateKey; use tari_dan_storage::consensus_models::{ - BlockId, - BlockTransactionExecution, Decision, ExecutedTransaction, + TransactionExecution, TransactionRecord, VersionedSubstateIdLockIntent, }; @@ -35,30 +34,22 @@ pub fn build_transaction_from( tx.set_abort_reason(RejectReason::ExecutionFailure("Test aborted".to_string())); } - let execution = create_execution_result_for_transaction( - // We're just building the execution here for DRY purposes, so genesis block id isn't used - BlockId::zero(), - *tx.id(), - decision, - fee, - resolved_inputs, - resulting_outputs.clone(), - ); + let execution = + create_execution_result_for_transaction(*tx.id(), decision, fee, resolved_inputs, resulting_outputs.clone()); - tx.execution_result = Some(execution.execution.result); - tx.resulting_outputs = Some(execution.execution.resulting_outputs); - tx.resolved_inputs = Some(execution.execution.resolved_inputs); + tx.execution_result = Some(execution.result); + tx.resulting_outputs = Some(execution.resulting_outputs); + tx.resolved_inputs = Some(execution.resolved_inputs); tx } pub fn create_execution_result_for_transaction( - block_id: BlockId, tx_id: TransactionId, decision: Decision, fee: u64, resolved_inputs: Vec, resulting_outputs: Vec, -) -> BlockTransactionExecution { +) -> TransactionExecution { let result = if decision.is_commit() { let mut diff = SubstateDiff::new(); for output in &resulting_outputs { @@ -89,8 +80,7 @@ pub fn create_execution_result_for_transaction( )) }; - BlockTransactionExecution::new( - block_id, + TransactionExecution::new( tx_id, ExecuteResult { finalize: FinalizeResult::new(tx_id.into_array().into(), vec![], vec![], result, FeeReceipt { diff --git a/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql b/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql index 9c8358f05f..5aedae350d 100644 --- a/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql +++ b/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql @@ -68,6 +68,39 @@ create table parked_blocks -- block_id must be unique. Optimise fetching by block_id create unique index parked_blocks_uniq_idx_id on parked_blocks (block_id); +CREATE TABLE missing_transactions +( + id integer not NULL primary key AUTOINCREMENT, + block_id text not NULL, + block_height bigint not NULL, + transaction_id text not NULL, + is_awaiting_execution boolean not NULL, + created_at timestamp not NULL DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (block_id) REFERENCES parked_blocks (block_id) +); + +create table foreign_parked_blocks +( + id integer not null primary key AUTOINCREMENT, + block_id text not NULL, + block text not NULL, + block_pledges text not NULL, + justify_qc text not NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +-- block_id must be unique. Optimise fetching by block_id +create unique index foreign_parked_blocks_uniq_idx_id on parked_blocks (block_id); + +CREATE TABLE foreign_missing_transactions +( + id integer not NULL primary key AUTOINCREMENT, + parked_block_id integer not NULL, + transaction_id text not NULL, + created_at timestamp not NULL DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (parked_block_id) REFERENCES foreign_parked_blocks (id) +); + create table leaf_blocks ( id integer not null primary key AUTOINCREMENT, @@ -313,18 +346,6 @@ create table votes created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ); - -CREATE TABLE missing_transactions -( - id integer not NULL primary key AUTOINCREMENT, - block_id text not NULL, - block_height bigint not NULL, - transaction_id text not NULL, - is_awaiting_execution boolean not NULL, - created_at timestamp not NULL DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (block_id) REFERENCES parked_blocks (block_id) -); - CREATE TABLE foreign_proposals ( id integer not NULL primary key AUTOINCREMENT, @@ -394,7 +415,6 @@ CREATE TABLE pending_state_tree_diffs CREATE UNIQUE INDEX pending_state_tree_diffs_uniq_idx_block_id_shard on pending_state_tree_diffs (block_id, shard); - CREATE TABLE epoch_checkpoints ( id integer not NULL primary key AUTOINCREMENT, diff --git a/dan_layer/state_store_sqlite/src/schema.rs b/dan_layer/state_store_sqlite/src/schema.rs index ad95259a70..087837abbc 100644 --- a/dan_layer/state_store_sqlite/src/schema.rs +++ b/dan_layer/state_store_sqlite/src/schema.rs @@ -53,6 +53,26 @@ diesel::table! { } } +diesel::table! { + foreign_missing_transactions (id) { + id -> Integer, + parked_block_id -> Integer, + transaction_id -> Text, + created_at -> Timestamp, + } +} + +diesel::table! { + foreign_parked_blocks (id) { + id -> Integer, + block_id -> Text, + block -> Text, + block_pledges -> Text, + justify_qc -> Text, + created_at -> Timestamp, + } +} + diesel::table! { foreign_proposals (id) { id -> Integer, @@ -408,10 +428,14 @@ diesel::table! { } } +diesel::joinable!(foreign_missing_transactions -> foreign_parked_blocks (parked_block_id)); + diesel::allow_tables_to_appear_in_same_query!( block_diffs, blocks, epoch_checkpoints, + foreign_missing_transactions, + foreign_parked_blocks, foreign_proposals, foreign_receive_counters, foreign_send_counters, diff --git a/dan_layer/state_store_sqlite/src/sql_models/foreign_parked_block.rs b/dan_layer/state_store_sqlite/src/sql_models/foreign_parked_block.rs new file mode 100644 index 0000000000..a7643f6127 --- /dev/null +++ b/dan_layer/state_store_sqlite/src/sql_models/foreign_parked_block.rs @@ -0,0 +1,34 @@ +// Copyright 2024 The Tari Project +// SPDX-License-Identifier: BSD-3-Clause + +use diesel::Queryable; +use tari_dan_storage::{consensus_models, StorageError}; +use time::PrimitiveDateTime; + +use crate::serialization::deserialize_json; + +#[derive(Debug, Clone, Queryable)] +pub struct ForeignParkedBlock { + pub id: i32, + pub block_id: String, + pub block: String, + pub block_pledges: String, + pub justify_qc: String, + pub created_at: PrimitiveDateTime, +} + +impl TryFrom for consensus_models::ForeignParkedProposal { + type Error = StorageError; + + fn try_from(value: ForeignParkedBlock) -> Result { + let block = deserialize_json(&value.block)?; + let block_pledge = deserialize_json(&value.block_pledges)?; + let justify_qc = deserialize_json(&value.justify_qc)?; + + Ok(consensus_models::ForeignParkedProposal::new( + block, + justify_qc, + block_pledge, + )) + } +} diff --git a/dan_layer/state_store_sqlite/src/sql_models/mod.rs b/dan_layer/state_store_sqlite/src/sql_models/mod.rs index b91b44b682..7c232f530f 100644 --- a/dan_layer/state_store_sqlite/src/sql_models/mod.rs +++ b/dan_layer/state_store_sqlite/src/sql_models/mod.rs @@ -5,6 +5,7 @@ mod block; mod block_diff; mod bookkeeping; mod epoch_checkpoint; +mod foreign_parked_block; mod foreign_substate_pledge; mod leaf_block; mod pending_state_tree_diff; @@ -21,6 +22,7 @@ pub use block::*; pub use block_diff::*; pub use bookkeeping::*; pub use epoch_checkpoint::*; +pub use foreign_parked_block::*; pub use foreign_substate_pledge::*; pub use leaf_block::*; pub use pending_state_tree_diff::*; diff --git a/dan_layer/state_store_sqlite/src/writer.rs b/dan_layer/state_store_sqlite/src/writer.rs index 687767e210..9be6b2b753 100644 --- a/dan_layer/state_store_sqlite/src/writer.rs +++ b/dan_layer/state_store_sqlite/src/writer.rs @@ -26,6 +26,7 @@ use tari_dan_storage::{ Decision, EpochCheckpoint, Evidence, + ForeignParkedProposal, ForeignProposal, ForeignReceiveCounters, ForeignSendCounters, @@ -1195,16 +1196,16 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta operation: "missing_transactions_remove", source: e, })?; - let missing_transactions = missing_transactions::table - .select(missing_transactions::transaction_id) + let num_remaining = missing_transactions::table .filter(missing_transactions::block_id.eq(&block_id)) - .get_results::(self.connection()) + .count() + .get_result::(self.connection()) .map_err(|e| SqliteStorageError::DieselError { operation: "missing_transactions_remove", source: e, })?; - if missing_transactions.is_empty() { + if num_remaining == 0 { // delete all entries that are for previous heights diesel::delete(missing_transactions::table) .filter(missing_transactions::block_height.lt(current_height.as_u64() as i64)) @@ -1220,6 +1221,110 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta Ok(None) } + fn foreign_parked_blocks_insert(&mut self, park_block: &ForeignParkedProposal) -> Result<(), StorageError> { + use crate::schema::foreign_parked_blocks; + + let values = ( + foreign_parked_blocks::block_id.eq(serialize_hex(park_block.block().id())), + foreign_parked_blocks::block.eq(serialize_json(park_block.block())?), + foreign_parked_blocks::block_pledges.eq(serialize_json(park_block.block_pledge())?), + foreign_parked_blocks::justify_qc.eq(serialize_json(park_block.justify_qc())?), + ); + + diesel::insert_into(foreign_parked_blocks::table) + .values(values) + .execute(self.connection()) + .map_err(|e| SqliteStorageError::DieselError { + operation: "foreign_parked_blocks_insert", + source: e, + })?; + + Ok(()) + } + + fn foreign_parked_blocks_insert_missing_transactions<'a, I: IntoIterator>( + &mut self, + park_block_id: &BlockId, + missing_transaction_ids: I, + ) -> Result<(), StorageError> { + use crate::schema::{foreign_missing_transactions, foreign_parked_blocks}; + + let parked_block_id = foreign_parked_blocks::table + .select(foreign_parked_blocks::id) + .filter(foreign_parked_blocks::block_id.eq(serialize_hex(park_block_id))) + .first::(self.connection()) + .map_err(|e| SqliteStorageError::DieselError { + operation: "foreign_parked_blocks_insert_missing_transactions", + source: e, + })?; + + let values = missing_transaction_ids + .into_iter() + .map(|tx_id| { + ( + foreign_missing_transactions::parked_block_id.eq(parked_block_id), + foreign_missing_transactions::transaction_id.eq(serialize_hex(tx_id)), + ) + }) + .collect::>(); + + diesel::insert_into(foreign_missing_transactions::table) + .values(values) + .execute(self.connection()) + .map_err(|e| SqliteStorageError::DieselError { + operation: "foreign_parked_blocks_insert_missing_transactions", + source: e, + })?; + + Ok(()) + } + + fn foreign_parked_blocks_remove_all_by_transaction( + &mut self, + transaction_id: &TransactionId, + ) -> Result, StorageError> { + use crate::schema::{foreign_missing_transactions, foreign_parked_blocks}; + + let transaction_id = serialize_hex(transaction_id); + + let removed_ids = diesel::delete(foreign_missing_transactions::table) + .filter(foreign_missing_transactions::transaction_id.eq(&transaction_id)) + .returning(foreign_missing_transactions::parked_block_id) + .get_results::(self.connection()) + .map_err(|e| SqliteStorageError::DieselError { + operation: "foreign_parked_blocks_remove_all_by_transaction", + source: e, + })?; + + if removed_ids.is_empty() { + return Ok(vec![]); + } + + let num_remaining = foreign_missing_transactions::table + .filter(foreign_missing_transactions::parked_block_id.eq_any(&removed_ids)) + .count() + .get_result::(self.connection()) + .map_err(|e| SqliteStorageError::DieselError { + operation: "foreign_parked_blocks_remove_all_by_transaction", + source: e, + })?; + + // If there are still missing transactions for the parked block, it is not yet unparked + if num_remaining > 0 { + return Ok(vec![]); + } + + let blocks = diesel::delete(foreign_parked_blocks::table) + .filter(foreign_parked_blocks::id.eq_any(&removed_ids)) + .get_results::(self.connection()) + .map_err(|e| SqliteStorageError::DieselError { + operation: "foreign_parked_blocks_remove_all_by_transaction", + source: e, + })?; + + blocks.into_iter().map(TryInto::try_into).collect() + } + fn votes_insert(&mut self, vote: &Vote) -> Result<(), StorageError> { use crate::schema::votes; diff --git a/dan_layer/storage/src/consensus_models/block.rs b/dan_layer/storage/src/consensus_models/block.rs index e08d5f14ee..8fe1c5ac78 100644 --- a/dan_layer/storage/src/consensus_models/block.rs +++ b/dan_layer/storage/src/consensus_models/block.rs @@ -15,6 +15,7 @@ use tari_common::configuration::Network; use tari_common_types::types::{FixedHash, FixedHashSizeError, PublicKey}; use tari_crypto::tari_utilities::epoch_time::EpochTime; use tari_dan_common_types::{ + committee::CommitteeInfo, hashing, optional::Optional, serde_with, @@ -344,6 +345,21 @@ impl Block { self.commands.iter().filter_map(|d| d.transaction().map(|t| t.id())) } + pub fn all_transaction_ids_in_committee<'a>( + &'a self, + committee_info: &'a CommitteeInfo, + ) -> impl Iterator + 'a { + self.commands + .iter() + .filter_map(|cmd| cmd.transaction()) + .filter(|t| { + t.evidence + .substate_addresses_iter() + .any(|addr| committee_info.includes_substate_address(addr)) + }) + .map(|t| t.id()) + } + pub fn all_committing_transactions_ids(&self) -> impl Iterator + '_ { self.commands.iter().filter_map(|d| d.committing()).map(|t| t.id()) } @@ -940,7 +956,11 @@ impl Block { tx: &TTx, ) -> Result { let mut pledges = BlockPledge::new(); - for atom in self.commands().iter().filter_map(|cmd| cmd.local_prepare()) { + for atom in self + .commands() + .iter() + .filter_map(|cmd| cmd.local_prepare().or_else(|| cmd.local_accept())) + { // No pledges for aborted transactions if atom.decision.is_abort() { continue; diff --git a/dan_layer/storage/src/consensus_models/block_pledges.rs b/dan_layer/storage/src/consensus_models/block_pledges.rs index e93dd914c1..8c19bd4fd1 100644 --- a/dan_layer/storage/src/consensus_models/block_pledges.rs +++ b/dan_layer/storage/src/consensus_models/block_pledges.rs @@ -7,14 +7,14 @@ use std::{ hash::Hash, }; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use tari_engine_types::substate::{SubstateId, SubstateValue}; use tari_transaction::{SubstateRequirement, TransactionId, VersionedSubstateId}; use crate::consensus_models::{SubstateLockType, VersionedSubstateIdLockIntent}; #[allow(clippy::mutable_key_type)] pub type SubstatePledges = HashSet; -#[derive(Debug, Clone, Serialize, Default)] +#[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct BlockPledge { pledges: HashMap, } @@ -59,7 +59,7 @@ impl FromIterator<(TransactionId, SubstatePledges)> for BlockPledge { } } -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub enum SubstatePledge { Input { substate_id: VersionedSubstateId, @@ -149,9 +149,9 @@ impl Display for SubstatePledge { substate_id, is_write, .. } => { if *is_write { - write!(f, "Write input: {}", substate_id) + write!(f, "Write: {}", substate_id) } else { - write!(f, "Read input: {}", substate_id) + write!(f, "Read: {}", substate_id) } }, SubstatePledge::Output { substate_id } => write!(f, "Output: {}", substate_id), diff --git a/dan_layer/storage/src/consensus_models/foreign_parked_proposal.rs b/dan_layer/storage/src/consensus_models/foreign_parked_proposal.rs new file mode 100644 index 0000000000..15f9a64f1b --- /dev/null +++ b/dan_layer/storage/src/consensus_models/foreign_parked_proposal.rs @@ -0,0 +1,77 @@ +// Copyright 2024 The Tari Project +// SPDX-License-Identifier: BSD-3-Clause + +use std::fmt::Display; + +use tari_transaction::TransactionId; + +use crate::{ + consensus_models::{Block, BlockPledge, QuorumCertificate}, + StateStoreWriteTransaction, + StorageError, +}; + +#[derive(Debug, Clone)] +pub struct ForeignParkedProposal { + pub block: Block, + pub block_pledge: BlockPledge, + pub justify_qc: QuorumCertificate, +} + +impl ForeignParkedProposal { + pub fn new(block: Block, justify_qc: QuorumCertificate, block_pledge: BlockPledge) -> Self { + Self { + block, + block_pledge, + justify_qc, + } + } + + pub fn block(&self) -> &Block { + &self.block + } + + pub fn block_pledge(&self) -> &BlockPledge { + &self.block_pledge + } + + pub fn justify_qc(&self) -> &QuorumCertificate { + &self.justify_qc + } +} + +impl ForeignParkedProposal { + pub fn insert(&self, tx: &mut TTx) -> Result<(), StorageError> { + tx.foreign_parked_blocks_insert(self) + } + + pub fn add_missing_transactions<'a, TTx: StateStoreWriteTransaction, I: IntoIterator>( + &self, + tx: &mut TTx, + transaction_ids: I, + ) -> Result<(), StorageError> { + tx.foreign_parked_blocks_insert_missing_transactions(self.block.id(), transaction_ids) + } + + pub fn remove_by_transaction_id( + tx: &mut TTx, + transaction_id: &TransactionId, + ) -> Result, StorageError> { + tx.foreign_parked_blocks_remove_all_by_transaction(transaction_id) + } +} + +impl Display for ForeignParkedProposal { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "ForeignParkedBlock: block={}, qcs=", self.block)?; + for (_tx_id, pledges) in self.block_pledge().iter() { + write!(f, "{_tx_id}:[")?; + for pledge in pledges { + write!(f, "{pledge}, ")?; + } + write!(f, "],")?; + } + write!(f, "justify_qc={}", self.justify_qc)?; + Ok(()) + } +} diff --git a/dan_layer/storage/src/consensus_models/mod.rs b/dan_layer/storage/src/consensus_models/mod.rs index b2d9f3ba44..7c7161c59b 100644 --- a/dan_layer/storage/src/consensus_models/mod.rs +++ b/dan_layer/storage/src/consensus_models/mod.rs @@ -7,6 +7,7 @@ mod block_pledges; mod command; mod epoch_checkpoint; mod executed_transaction; +mod foreign_parked_proposal; mod foreign_proposal; mod foreign_receive_counters; mod foreign_send_counters; @@ -40,6 +41,7 @@ pub use block_pledges::*; pub use command::*; pub use epoch_checkpoint::*; pub use executed_transaction::*; +pub use foreign_parked_proposal::*; pub use foreign_proposal::*; pub use foreign_receive_counters::*; pub use foreign_send_counters::*; diff --git a/dan_layer/storage/src/consensus_models/transaction_pool.rs b/dan_layer/storage/src/consensus_models/transaction_pool.rs index 22649ef3a6..01d2e95a9f 100644 --- a/dan_layer/storage/src/consensus_models/transaction_pool.rs +++ b/dan_layer/storage/src/consensus_models/transaction_pool.rs @@ -606,7 +606,7 @@ impl TransactionPoolRecord { #[allow(clippy::mutable_key_type)] pub fn add_foreign_pledges( - &mut self, + &self, tx: &mut TTx, shard_group: ShardGroup, foreign_pledges: SubstatePledges, diff --git a/dan_layer/storage/src/state_store/mod.rs b/dan_layer/storage/src/state_store/mod.rs index 0d54114aef..c3b5457a8d 100644 --- a/dan_layer/storage/src/state_store/mod.rs +++ b/dan_layer/storage/src/state_store/mod.rs @@ -27,6 +27,7 @@ use crate::{ Decision, EpochCheckpoint, Evidence, + ForeignParkedProposal, ForeignProposal, ForeignReceiveCounters, ForeignSendCounters, @@ -397,6 +398,8 @@ pub trait StateStoreWriteTransaction { tx_ids: I, ) -> Result<(), StorageError>; + // -------------------------------- Missing Transactions -------------------------------- // + fn missing_transactions_insert< 'a, IMissing: IntoIterator, @@ -414,6 +417,19 @@ pub trait StateStoreWriteTransaction { transaction_id: &TransactionId, ) -> Result, StorageError>; + fn foreign_parked_blocks_insert(&mut self, park_block: &ForeignParkedProposal) -> Result<(), StorageError>; + + fn foreign_parked_blocks_insert_missing_transactions<'a, I: IntoIterator>( + &mut self, + park_block_id: &BlockId, + missing_transaction_ids: I, + ) -> Result<(), StorageError>; + + fn foreign_parked_blocks_remove_all_by_transaction( + &mut self, + transaction_id: &TransactionId, + ) -> Result, StorageError>; + // -------------------------------- Votes -------------------------------- // fn votes_insert(&mut self, vote: &Vote) -> Result<(), StorageError>;