Skip to content

Commit

Permalink
fix(consensus)!: read-only resource + other fixes (tari-project#1134)
Browse files Browse the repository at this point in the history
Description
---
- Resources are locked as read-only in consensus
- Greatly reduce the size of the fee breakdown in transaction receipt
- Fix mempool incorrect involvement detection
- Fix consensus crash if versioned substate requirement is already DOWN
- Fix off-by-one error when sending state transitions in state sync

Motivation and Context
---
Resources are highly contended and therefore are read-only after
creation. Any transaction that mutates the resource will fail however
the template API has not changed in this PR to reflect that.

Any mints/burns after the initial mint will succeed only in the
local-only case, multi-committee mints/burns will fail.

Ideas for improvements:
1. Easy way: Allow user to specify read/write for resources (or perhaps
substates in general). Cons: poorer UX
2. Complex: initially locked as read-only until AllPrepared phase.
Transaction is executed and if the transaction results in a write for
the resource. The write lock is queued and no further progress can be
made on the transaction until all preceding write/read locks are
released. Once this occurs the transaction is re-executed and
`LocalAccept` proposed with as a write.
3. To improve concurrency, a resource issuer may lock their resource for
a period guaranteeing that no minting/burning or access rule changes can
occur until that period expires.
4. Automatically and implicitly read lock a resource for a guaranteed
period of time (epochs). A user may submit a transaction that updates
the resource, but that transaction will not be sequenced until the lock
expires.
5. Add a resource mint/burn instruction which will only involve a single
shard. This will naturally "park" the transaction until any multishard
locks are released and a local-only mint/burn/access rule update is
performed.
6. Eventual consistency: Total supply and access rule changes will
eventually reflect. All resource operations are logged and eventually
applied in an atomic way across shard groups, perhaps with a special
signed cross-shard message that will be locally proposed.
7. Apply point (1) and check access rules for the resource. If the
signer is not permitted to write the resource _at locking/pledging time
(before execution)_, the transaction is aborted. This is more related to
security, not concurrency.

How Has This Been Tested?
---
Manually

What process can a PR reviewer use to test or verify this change?
---
Submit transactions that concurrently read from the resource address. 

Breaking Changes
---

- [ ] None
- [x] Requires data directory to be deleted
- [ ] Other - Please specify
  • Loading branch information
sdbondi authored Sep 16, 2024
1 parent b603394 commit f80d338
Show file tree
Hide file tree
Showing 51 changed files with 692 additions and 797 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ impl ExecutionOutput {
inputs
.iter()
.map(|(substate_req, substate)| {
let requested_specific_version = substate_req.version().is_some();
let lock_flag = if diff.down_iter().any(|(id, _)| id == substate_req.substate_id()) {
// Update all inputs that were DOWNed to be write locked
SubstateLockType::Write
Expand All @@ -64,6 +65,7 @@ impl ExecutionOutput {
VersionedSubstateIdLockIntent::new(
VersionedSubstateId::new(substate_req.substate_id().clone(), substate.version()),
lock_flag,
requested_specific_version,
)
})
.collect()
Expand All @@ -76,6 +78,7 @@ impl ExecutionOutput {
VersionedSubstateIdLockIntent::new(
VersionedSubstateId::new(substate_req.substate_id().clone(), substate.version()),
SubstateLockType::Read,
true,
)
})
.collect()
Expand Down
2 changes: 1 addition & 1 deletion applications/tari_swarm_daemon/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use fern::FormatCallback;
pub fn init_logger() -> Result<(), log::SetLoggerError> {
fn should_skip(target: &str) -> bool {
const SKIP: [&str; 3] = ["hyper::", "h2::", "tower::"];
SKIP.iter().any(|s| target.starts_with(s))
target.is_empty() || SKIP.iter().any(|s| target.starts_with(s))
}

let colors = fern::colors::ColoredLevelConfig::new().info(fern::colors::Color::Green);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ const LOG_TARGET: &str = "tari::validator_node::mempool::service";

#[derive(Debug)]
pub struct MempoolService<TValidator> {
num_preshards: NumPreshards,
transactions: HashSet<TransactionId>,
mempool_requests: mpsc::Receiver<MempoolRequest>,
epoch_manager: EpochManagerHandle<PeerAddress>,
Expand All @@ -82,7 +81,6 @@ where TValidator: Validator<Transaction, Context = (), Error = TransactionValida
#[cfg(feature = "metrics")] metrics: PrometheusMempoolMetrics,
) -> Self {
Self {
num_preshards,
gossip: MempoolGossip::new(num_preshards, epoch_manager.clone(), gossip),
transactions: Default::default(),
mempool_requests,
Expand Down Expand Up @@ -158,9 +156,7 @@ where TValidator: Validator<Transaction, Context = (), Error = TransactionValida
}
info!(
target: LOG_TARGET,
"🎱 Received NEW transaction from local: {} {:?}",
transaction.id(),
transaction
"🎱 Received NEW transaction from local: {transaction}",
);

self.handle_new_transaction(transaction, vec![], None).await?;
Expand Down Expand Up @@ -200,34 +196,34 @@ where TValidator: Validator<Transaction, Context = (), Error = TransactionValida
);

let current_epoch = self.consensus_handle.current_view().get_epoch();
let maybe_sender_shard_group = self
let maybe_sender_committee_info = self
.epoch_manager
.get_committee_info_by_validator_address(current_epoch, &from)
.await
.optional()?
.map(|c| c.shard_group());
.optional()?;

// Only input shards propagate transactions to output shards. Check that this is true.
if !unverified_output_shards.is_empty() {
let Some(sender_shard) = maybe_sender_shard_group else {
let Some(sender_committee_info) = maybe_sender_committee_info else {
debug!(target: LOG_TARGET, "Sender {from} isn't registered but tried to send a new transaction with
output shards");
return Ok(());
};

let is_input_shard = transaction
.all_inputs_iter()
.filter_map(|s| s.to_shard(self.num_preshards))
.any(|s| sender_shard.contains(&s));
let is_input_shard = transaction.is_involved_inputs(&sender_committee_info);
if !is_input_shard {
warn!(target: LOG_TARGET, "Sender {from} sent a message with output shards but was not an input
shard. Ignoring message.");
return Ok(());
}
}

self.handle_new_transaction(transaction, unverified_output_shards, maybe_sender_shard_group)
.await?;
self.handle_new_transaction(
transaction,
unverified_output_shards,
maybe_sender_committee_info.map(|c| c.shard_group()),
)
.await?;

Ok(())
}
Expand Down Expand Up @@ -276,8 +272,7 @@ where TValidator: Validator<Transaction, Context = (), Error = TransactionValida
let tx_substate_address = SubstateAddress::for_transaction_receipt(transaction.id().into_receipt_address());

let local_committee_shard = self.epoch_manager.get_local_committee_info(current_epoch).await?;
let transaction_inputs = transaction.all_inputs_iter().filter_map(|i| i.to_substate_address());
let is_input_shard = local_committee_shard.includes_any_address(transaction_inputs);
let is_input_shard = transaction.is_involved_inputs(&local_committee_shard);
let is_output_shard = local_committee_shard.includes_any_address(
// Known output shards
// This is to allow for the txreceipt output
Expand Down
3 changes: 1 addition & 2 deletions bindings/dist/types/FeeBreakdown.d.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import type { FeeSource } from "./FeeSource";
export interface FeeBreakdown {
source: FeeSource;
amount: number;
breakdown: Record<FeeSource, bigint>;
}
2 changes: 1 addition & 1 deletion bindings/dist/types/FeeCostBreakdown.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ import type { Amount } from "./Amount";
import type { FeeBreakdown } from "./FeeBreakdown";
export interface FeeCostBreakdown {
total_fees_charged: Amount;
breakdown: Array<FeeBreakdown>;
breakdown: FeeBreakdown;
}
2 changes: 1 addition & 1 deletion bindings/dist/types/FeeReceipt.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ import type { FeeBreakdown } from "./FeeBreakdown";
export interface FeeReceipt {
total_fee_payment: Amount;
total_fees_paid: Amount;
cost_breakdown: Array<FeeBreakdown>;
cost_breakdown: FeeBreakdown;
}
1 change: 1 addition & 0 deletions bindings/dist/types/VersionedSubstateIdLockIntent.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ import type { VersionedSubstateId } from "./VersionedSubstateId";
export interface VersionedSubstateIdLockIntent {
versioned_substate_id: VersionedSubstateId;
lock_type: SubstateLockType;
require_version: boolean;
}
3 changes: 1 addition & 2 deletions bindings/src/types/FeeBreakdown.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,5 @@
import type { FeeSource } from "./FeeSource";

export interface FeeBreakdown {
source: FeeSource;
amount: number;
breakdown: Record<FeeSource, bigint>;
}
2 changes: 1 addition & 1 deletion bindings/src/types/FeeCostBreakdown.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ import type { FeeBreakdown } from "./FeeBreakdown";

export interface FeeCostBreakdown {
total_fees_charged: Amount;
breakdown: Array<FeeBreakdown>;
breakdown: FeeBreakdown;
}
2 changes: 1 addition & 1 deletion bindings/src/types/FeeReceipt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ import type { FeeBreakdown } from "./FeeBreakdown";
export interface FeeReceipt {
total_fee_payment: Amount;
total_fees_paid: Amount;
cost_breakdown: Array<FeeBreakdown>;
cost_breakdown: FeeBreakdown;
}
1 change: 1 addition & 0 deletions bindings/src/types/VersionedSubstateIdLockIntent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ import type { VersionedSubstateId } from "./VersionedSubstateId";
export interface VersionedSubstateIdLockIntent {
versioned_substate_id: VersionedSubstateId;
lock_type: SubstateLockType;
require_version: boolean;
}
7 changes: 4 additions & 3 deletions dan_layer/consensus/src/hotstuff/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ pub fn calculate_dummy_blocks<TAddr: NodeAddressable, TLeaderStrategy: LeaderStr
let mut dummies = Vec::new();
with_dummy_blocks(
candidate_block.network(),
justify_block.epoch(),
candidate_block.epoch(),
justify_block.shard_group(),
candidate_block.justify(),
*justify_block.merkle_root(),
Expand Down Expand Up @@ -141,7 +141,7 @@ fn with_dummy_blocks<TAddr, TLeaderStrategy, F>(
let mut parent_block = high_qc.as_leaf_block();
let mut current_height = high_qc.block_height() + NodeHeight(1);
if current_height > new_height {
warn!(
error!(
target: LOG_TARGET,
"BUG: 🍼 no dummy blocks to calculate. current height {} is greater than new height {}",
current_height,
Expand All @@ -152,7 +152,8 @@ fn with_dummy_blocks<TAddr, TLeaderStrategy, F>(

debug!(
target: LOG_TARGET,
"🍼 calculating dummy blocks from {} to {}",
"🍼 calculating dummy blocks in epoch {} from {} to {}",
epoch,
current_height,
new_height,
);
Expand Down
11 changes: 11 additions & 0 deletions dan_layer/consensus/src/hotstuff/foreign_proposal_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ use tari_dan_storage::{
TransactionAtom,
TransactionPoolRecord,
TransactionPoolStage,
TransactionRecord,
},
StateStoreReadTransaction,
};
use tari_engine_types::commit_result::RejectReason;
use tari_transaction::TransactionId;

use crate::hotstuff::{block_change_set::ProposedBlockChangeSet, error::HotStuffError, ProposalValidationError};
Expand Down Expand Up @@ -112,6 +114,15 @@ pub fn process_foreign_block<TTx: StateStoreReadTransaction>(
"⚠️ Foreign committee ABORT transaction {}. Update overall decision to ABORT. Local stage: {}, Leaf: {}",
tx_rec.transaction_id(), tx_rec.current_stage(), local_leaf
);

// Add an abort execution since we previously decided to commit
let mut transaction = TransactionRecord::get(tx, tx_rec.transaction_id())?;
transaction.set_abort_reason(RejectReason::ForeignShardGroupDecidedToAbort(format!(
"Foreign shard group {} decided to abort the transaction",
foreign_committee_info.shard_group()
)));
let exec = transaction.into_execution().expect("ABORT set above");
proposed_block_change_set.add_transaction_execution(exec)?;
}

// We need to add the justify QC to the evidence because the all prepare block could not include it
Expand Down
72 changes: 39 additions & 33 deletions dan_layer/consensus/src/hotstuff/on_propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use tari_dan_storage::{
use tari_engine_types::{commit_result::RejectReason, substate::Substate};
use tari_epoch_manager::EpochManagerReader;
use tari_transaction::TransactionId;
use tokio::task;

use crate::{
hotstuff::{
Expand Down Expand Up @@ -78,6 +79,7 @@ type NextBlock = (
HashMap<TransactionId, TransactionExecution>,
);

#[derive(Debug, Clone)]
pub struct OnPropose<TConsensusSpec: ConsensusSpec> {
config: HotstuffConfig,
store: TConsensusSpec::StateStore,
Expand Down Expand Up @@ -119,7 +121,7 @@ where TConsensusSpec: ConsensusSpec
&mut self,
epoch: Epoch,
local_committee: &Committee<TConsensusSpec::Addr>,
local_committee_info: &CommitteeInfo,
local_committee_info: CommitteeInfo,
leaf_block: LeafBlock,
is_newview_propose: bool,
propose_epoch_end: bool,
Expand Down Expand Up @@ -168,41 +170,45 @@ where TConsensusSpec: ConsensusSpec
let base_layer_block_hash = current_base_layer_block_hash;
let base_layer_block_height = current_base_layer_block_height;

let (next_block, foreign_proposals) = self.store.with_write_tx(|tx| {
let high_qc = HighQc::get(&**tx, epoch)?;
let high_qc_cert = high_qc.get_quorum_certificate(&**tx)?;
let on_propose = self.clone();
let (next_block, foreign_proposals) = task::spawn_blocking(move || {
on_propose.store.with_write_tx(|tx| {
let high_qc = HighQc::get(&**tx, epoch)?;
let high_qc_cert = high_qc.get_quorum_certificate(&**tx)?;

let (next_block, foreign_proposals, executed_transactions) = self.build_next_block(
tx,
epoch,
&leaf_block,
high_qc_cert,
validator.public_key,
local_committee_info,
// TODO: This just avoids issues with proposed transactions causing leader failures. Not sure if this
// is a good idea.
is_newview_propose,
base_layer_block_height,
base_layer_block_hash,
propose_epoch_end,
)?;
let (next_block, foreign_proposals, executed_transactions) = on_propose.build_next_block(
tx,
epoch,
&leaf_block,
high_qc_cert,
validator.public_key,
&local_committee_info,
// TODO: This just avoids issues with proposed transactions causing leader failures. Not sure if
// this is a good idea.
is_newview_propose,
base_layer_block_height,
base_layer_block_hash,
propose_epoch_end,
)?;

// Add executions for this block
if !executed_transactions.is_empty() {
debug!(
target: LOG_TARGET,
"Saving {} executed transaction(s) for block {}",
executed_transactions.len(),
next_block.id()
);
}
for executed in executed_transactions.into_values() {
executed.for_block(*next_block.id()).insert_if_required(tx)?;
}
// Add executions for this block
if !executed_transactions.is_empty() {
debug!(
target: LOG_TARGET,
"Saving {} executed transaction(s) for block {}",
executed_transactions.len(),
next_block.id()
);
}
for executed in executed_transactions.into_values() {
executed.for_block(*next_block.id()).insert_if_required(tx)?;
}

next_block.as_last_proposed().set(tx)?;
Ok::<_, HotStuffError>((next_block, foreign_proposals))
})?;
next_block.as_last_proposed().set(tx)?;
Ok::<_, HotStuffError>((next_block, foreign_proposals))
})
})
.await??;

info!(
target: LOG_TARGET,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -894,6 +894,8 @@ where TConsensusSpec: ConsensusSpec
let execution = self.execute_transaction(tx, block.id(), block.epoch(), tx_rec.transaction_id())?;
let mut execution = execution.into_transaction_execution();

// TODO: check the diff is valid against the provided input evidence (correct locks etc).

// TODO: can we modify the locks at this point? For multi-shard input transactions, we locked all inputs
// as Write due to lack of information. We now know what locks are necessary, and this
// block has the correct evidence (TODO: verify the atom) so this should be fine.
Expand Down
Loading

0 comments on commit f80d338

Please sign in to comment.