Skip to content

Commit

Permalink
fix(consensus)!: fix block and leader fee proposals (#1133)
Browse files Browse the repository at this point in the history
Description
---

Fixes leader and block fee proposals 

Motivation and Context
---
total_leader_fee was always 0 in blocks.

How Has This Been Tested?
---
Manually

What process can a PR reviewer use to test or verify this change?
---
Check the total_leader fee > 0 in blocks with LocalOnly/AllAccept

Breaking Changes
---

- [ ] None
- [x] Requires data directory to be deleted
- [ ] Other - Please specify
  • Loading branch information
sdbondi authored Sep 5, 2024
1 parent d6bae3b commit 0ca3504
Show file tree
Hide file tree
Showing 12 changed files with 172 additions and 92 deletions.
64 changes: 28 additions & 36 deletions dan_layer/consensus/src/hotstuff/on_propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,11 +281,11 @@ where TConsensusSpec: ConsensusSpec

// Leader thinks that all local nodes agree that all shard groups have prepared, we are ready to accept
// locally
TransactionPoolStage::AllPrepared => self.local_accept_transaction(local_committee_info, &mut tx_rec),
// Leader thinks local nodes are ready to accept an ABORT
TransactionPoolStage::SomePrepared => Ok(Some(Command::LocalAccept(
self.get_current_transaction_atom(local_committee_info, &mut tx_rec)?,
TransactionPoolStage::AllPrepared => Ok(Some(Command::LocalAccept(
self.get_transaction_atom_with_leader_fee(local_committee_info, &mut tx_rec)?,
))),
// Leader thinks local nodes are ready to accept an ABORT
TransactionPoolStage::SomePrepared => Ok(Some(Command::LocalAccept(tx_rec.get_current_transaction_atom()))),
// Leader thinks that all foreign ACCEPT pledges have been received and, we are ready to accept the result
// (COMMIT/ABORT)
TransactionPoolStage::LocalAccepted => {
Expand Down Expand Up @@ -641,11 +641,11 @@ where TConsensusSpec: ConsensusSpec
"🏠️ Transaction {} is local only, proposing LocalOnly",
tx_rec.transaction_id(),
);
let involved = NonZeroU64::new(1).expect("1 > 0");
let leader_fee = tx_rec.calculate_leader_fee(involved, EXHAUST_DIVISOR);
tx_rec.set_leader_fee(leader_fee);
let atom = tx_rec.get_current_transaction_atom();
if atom.decision.is_commit() {

if tx_rec.current_decision().is_commit() {
let involved = NonZeroU64::new(1).expect("1 > 0");
let leader_fee = tx_rec.calculate_leader_fee(involved, EXHAUST_DIVISOR);
tx_rec.set_leader_fee(leader_fee);
let diff = execution.result().finalize.result.accept().ok_or_else(|| {
HotStuffError::InvariantError(format!(
"prepare_transaction: Transaction {} has COMMIT decision but execution failed when \
Expand All @@ -668,6 +668,7 @@ where TConsensusSpec: ConsensusSpec

executed_transactions.insert(*tx_rec.transaction_id(), execution);

let atom = tx_rec.get_current_transaction_atom();
Command::LocalOnly(atom)
},
PreparedTransaction::LocalOnly(LocalPreparedTransaction::EarlyAbort { transaction, .. }) => {
Expand Down Expand Up @@ -743,7 +744,7 @@ where TConsensusSpec: ConsensusSpec
) -> Result<Option<Command>, HotStuffError> {
// Only set to abort if either the local or one or more foreign shards decided to ABORT
if tx_rec.current_decision().is_abort() {
return Ok(Some(Command::SomePrepare(tx_rec.get_local_transaction_atom())));
return Ok(Some(Command::SomePrepare(tx_rec.get_current_transaction_atom())));
}

let mut execution =
Expand Down Expand Up @@ -771,27 +772,15 @@ where TConsensusSpec: ConsensusSpec
tx_rec.update_from_execution(&execution);

executed_transactions.insert(*tx_rec.transaction_id(), execution);
return Ok(Some(Command::AllPrepare(
self.get_current_transaction_atom(local_committee_info, tx_rec)?,
)));
return Ok(Some(Command::AllPrepare(tx_rec.get_current_transaction_atom())));
}

tx_rec.update_from_execution(&execution);

let atom = self.get_current_transaction_atom(local_committee_info, tx_rec)?;
executed_transactions.insert(*tx_rec.transaction_id(), execution);
// If we locally decided to ABORT, we are still saying that we think all prepared. When we enter the acceptance
// phase, we will propose SomeAccept for this case.
Ok(Some(Command::AllPrepare(atom)))
}

fn local_accept_transaction(
&self,
local_committee_info: &CommitteeInfo,
tx_rec: &mut TransactionPoolRecord,
) -> Result<Option<Command>, HotStuffError> {
let atom = self.get_current_transaction_atom(local_committee_info, tx_rec)?;
Ok(Some(Command::LocalAccept(atom)))
Ok(Some(Command::AllPrepare(tx_rec.get_current_transaction_atom())))
}

fn accept_transaction(
Expand Down Expand Up @@ -825,24 +814,27 @@ where TConsensusSpec: ConsensusSpec
*tx_rec.transaction_id(),
&filter_diff_for_committee(local_committee_info, diff),
)?;
Ok(Some(Command::AllAccept(tx_rec.get_current_transaction_atom())))
let atom = self.get_transaction_atom_with_leader_fee(local_committee_info, tx_rec)?;
Ok(Some(Command::AllAccept(atom)))
}

fn get_current_transaction_atom(
fn get_transaction_atom_with_leader_fee(
&self,
local_committee_info: &CommitteeInfo,
tx_rec: &mut TransactionPoolRecord,
) -> Result<TransactionAtom, HotStuffError> {
let num_involved_shard_groups =
local_committee_info.count_distinct_shard_groups(tx_rec.evidence().substate_addresses_iter());
let involved = NonZeroU64::new(num_involved_shard_groups as u64).ok_or_else(|| {
HotStuffError::InvariantError(format!(
"PROPOSE: Transaction {} involves zero shard groups",
tx_rec.transaction_id(),
))
})?;
let leader_fee = tx_rec.calculate_leader_fee(involved, EXHAUST_DIVISOR);
tx_rec.set_leader_fee(leader_fee);
if tx_rec.current_decision().is_commit() {
let num_involved_shard_groups =
local_committee_info.count_distinct_shard_groups(tx_rec.evidence().substate_addresses_iter());
let involved = NonZeroU64::new(num_involved_shard_groups as u64).ok_or_else(|| {
HotStuffError::InvariantError(format!(
"PROPOSE: Transaction {} involves zero shard groups",
tx_rec.transaction_id(),
))
})?;
let leader_fee = tx_rec.calculate_leader_fee(involved, EXHAUST_DIVISOR);
tx_rec.set_leader_fee(leader_fee);
}
let atom = tx_rec.get_current_transaction_atom();
Ok(atom)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,14 @@ where TConsensusSpec: ConsensusSpec
}
},
Command::LocalAccept(atom) => {
if !self.evaluate_local_accept_command(tx, block, &locked_block, atom, proposed_block_change_set)? {
if !self.evaluate_local_accept_command(
tx,
block,
&locked_block,
atom,
local_committee_info,
proposed_block_change_set,
)? {
proposed_block_change_set.no_vote();
return Ok(());
}
Expand All @@ -374,6 +381,7 @@ where TConsensusSpec: ConsensusSpec
local_committee_info,
&mut substate_store,
proposed_block_change_set,
&mut total_leader_fee,
)? {
proposed_block_change_set.no_vote();
return Ok(());
Expand Down Expand Up @@ -919,10 +927,12 @@ where TConsensusSpec: ConsensusSpec
err
);

tx_rec.set_local_decision(Decision::Abort);
execution.set_abort_reason(RejectReason::FailedToLockOutputs(err.to_string()));

tx_rec.set_local_decision(Decision::Abort);
tx_rec.set_transaction_fee(0);
tx_rec.set_next_stage(TransactionPoolStage::AllPrepared, true)?;

proposed_block_change_set
.set_next_transaction_update(tx_rec)?
.add_transaction_execution(execution)?;
Expand Down Expand Up @@ -1062,6 +1072,7 @@ where TConsensusSpec: ConsensusSpec
block: &Block,
locked_block: &LockedBlock,
atom: &TransactionAtom,
local_committee_info: &CommitteeInfo,
proposed_block_change_set: &mut ProposedBlockChangeSet,
) -> Result<bool, HotStuffError> {
let Some(mut tx_rec) =
Expand Down Expand Up @@ -1115,6 +1126,39 @@ where TConsensusSpec: ConsensusSpec
return Ok(false);
}

if atom.decision.is_commit() {
let Some(ref leader_fee) = atom.leader_fee else {
warn!(
target: LOG_TARGET,
"❌ NO VOTE: Leader fee in tx {} not set for AllAccept command in block {}",
atom.id,
block,
);
return Ok(false);
};

// Check the leader fee in the local accept phase. The fee only applied (is added to the block fee) for
// AllAccept
let num_involved_shard_groups =
local_committee_info.count_distinct_shard_groups(tx_rec.evidence().substate_addresses_iter());
let involved = NonZeroU64::new(num_involved_shard_groups as u64)
.ok_or_else(|| HotStuffError::InvariantError("Number of involved shard groups is 0".to_string()))?;
let calculated_leader_fee = tx_rec.calculate_leader_fee(involved, EXHAUST_DIVISOR);
if calculated_leader_fee != *leader_fee {
warn!(
target: LOG_TARGET,
"❌ NO VOTE: LocalAccept leader fee disagreement for block {}. Leader proposed {}, we calculated {}",
block,
atom.leader_fee.as_ref().expect("None already checked"),
calculated_leader_fee
);

return Ok(false);
}

tx_rec.set_leader_fee(calculated_leader_fee);
}

tx_rec.set_next_stage(
TransactionPoolStage::LocalAccepted,
tx_rec.evidence().all_addresses_justified(),
Expand All @@ -1133,11 +1177,12 @@ where TConsensusSpec: ConsensusSpec
local_committee_info: &CommitteeInfo,
substate_store: &mut PendingSubstateStore<TConsensusSpec::StateStore>,
proposed_block_change_set: &mut ProposedBlockChangeSet,
total_leader_fee: &mut u64,
) -> Result<bool, HotStuffError> {
if atom.decision.is_abort() {
warn!(
target: LOG_TARGET,
"❌ AllAccept command received for block {} but requires that the transaction is COMMIT",
"❌ NO VOTE: AllAccept command received for block {} but requires that the transaction is COMMIT",
block.id(),
);
return Ok(false);
Expand All @@ -1148,7 +1193,7 @@ where TConsensusSpec: ConsensusSpec
else {
warn!(
target: LOG_TARGET,
"⚠️ Local proposal received ({}) for transaction {} which is not in the pool. This is likely a previous transaction that has been re-proposed. Not voting on block.",
"⚠️ NO VOTE: Local proposal received ({}) for transaction {} which is not in the pool. This is likely a previous transaction that has been re-proposed. Not voting on block.",
block,
atom.id(),
);
Expand All @@ -1158,7 +1203,7 @@ where TConsensusSpec: ConsensusSpec
if !tx_rec.current_stage().is_local_accepted() {
warn!(
target: LOG_TARGET,
"{} ❌ AllAccept Stage disagreement in block {} for transaction {}. Leader proposed AllAccept, but local stage is {}",
"{} ❌ NO VOTE: AllAccept Stage disagreement in block {} for transaction {}. Leader proposed AllAccept, but local stage is {}",
self.local_validator_pk,
block,
tx_rec.transaction_id(),
Expand All @@ -1170,7 +1215,7 @@ where TConsensusSpec: ConsensusSpec
if tx_rec.current_decision().is_abort() {
warn!(
target: LOG_TARGET,
"❌ AllAccept decision disagreement for transaction {} in block {}. Leader proposed COMMIT, we decided ABORT",
"❌ NO VOTE: AllAccept decision disagreement for transaction {} in block {}. Leader proposed COMMIT, we decided ABORT",
tx_rec.transaction_id(),
block,
);
Expand All @@ -1180,7 +1225,7 @@ where TConsensusSpec: ConsensusSpec
if tx_rec.transaction_fee() != atom.transaction_fee {
warn!(
target: LOG_TARGET,
"❌ AllAccept transaction fee disagreement tx {} in block {}. Leader proposed {}, we calculated {}",
"❌ NO VOTE: AllAccept transaction fee disagreement tx {} in block {}. Leader proposed {}, we calculated {}",
tx_rec.transaction_id(),
block,
atom.transaction_fee,
Expand All @@ -1189,6 +1234,36 @@ where TConsensusSpec: ConsensusSpec
return Ok(false);
}

let Some(ref leader_fee) = atom.leader_fee else {
warn!(
target: LOG_TARGET,
"❌ NO VOTE: Leader fee in tx {} not set for AllAccept command in block {}",
atom.id,
block,
);
return Ok(false);
};

let local_leader_fee = tx_rec.leader_fee().ok_or_else(|| {
HotStuffError::InvariantError(format!(
"evaluate_all_accept_command: Transaction {} has COMMIT decision and is at LocalAccepted stage but \
leader fee is missing",
tx_rec.transaction_id()
))
})?;

if local_leader_fee != leader_fee {
warn!(
target: LOG_TARGET,
"❌ NO VOTE: Leader fee disagreement for tx {} in block {}. Leader proposed {}, we calculated {}",
atom.id,
block,
leader_fee,
local_leader_fee
);
return Ok(false);
}

let execution = BlockTransactionExecution::get_pending_for_block(tx, tx_rec.transaction_id(), block.parent())
.optional()?
.ok_or_else(|| {
Expand All @@ -1205,6 +1280,8 @@ where TConsensusSpec: ConsensusSpec
))
})?;

*total_leader_fee += leader_fee.fee();

substate_store.put_diff(
*tx_rec.transaction_id(),
&filter_diff_for_committee(local_committee_info, diff),
Expand Down
2 changes: 0 additions & 2 deletions dan_layer/consensus_tests/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,6 @@ async fn foreign_shard_group_decides_to_abort() {
async fn multishard_local_inputs_foreign_outputs() {
setup_logger();
let mut test = Test::builder()
// Test can take 11s, this could cut it a little fine - may indicate that we need to optimise
.with_test_timeout(Duration::from_secs(60))
.add_committee(0, vec!["1", "2"])
.add_committee(1, vec!["3", "4"])
.start()
Expand Down
4 changes: 4 additions & 0 deletions dan_layer/engine_types/src/substate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,10 @@ impl SubstateId {
pub fn is_transaction_receipt(&self) -> bool {
matches!(self, Self::TransactionReceipt(_))
}

pub fn is_read_only(&self) -> bool {
matches!(self, Self::TransactionReceipt(_) | Self::Resource(_))
}
}

impl From<ComponentAddress> for SubstateId {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,21 +300,20 @@ create unique index transaction_executions_uniq_block_id_transaction_id on trans

create table transaction_pool
(
id integer not null primary key AUTOINCREMENT,
transaction_id text not null,
original_decision text not null,
local_decision text null,
remote_decision text null,
evidence text null,
transaction_fee bigint null,
leader_fee bigint null,
global_exhaust_burn bigint null,
stage text not null,
pending_stage text null,
is_ready boolean not null,
confirm_stage text null,
updated_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
id integer not null primary key AUTOINCREMENT,
transaction_id text not null,
original_decision text not null,
local_decision text null,
remote_decision text null,
evidence text null,
transaction_fee bigint not null DEFAULT 0,
leader_fee text null,
stage text not null,
pending_stage text null,
is_ready boolean not null,
confirm_stage text null,
updated_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (transaction_id) REFERENCES transactions (transaction_id)
);
create unique index transaction_pool_uniq_idx_transaction_id on transaction_pool (transaction_id);
Expand All @@ -330,6 +329,8 @@ create table transaction_pool_state_updates
evidence text not null,
is_ready boolean not null,
local_decision text not null,
transaction_fee bigint not null,
leader_fee text null,
remote_decision text null,
is_applied boolean not null DEFAULT '0',
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
Expand Down Expand Up @@ -521,7 +522,6 @@ BEGIN
new_evidence,
transaction_fee,
leader_fee,
global_exhaust_burn,
stage,
new_stage,
pending_stage,
Expand All @@ -539,9 +539,8 @@ BEGIN
OLD.remote_decision,
OLD.evidence,
NEW.evidence,
OLD.transaction_fee,
OLD.leader_fee,
OLD.global_exhaust_burn,
NEW.transaction_fee,
NEW.leader_fee,
OLD.stage,
NEW.stage,
OLD.pending_stage,
Expand Down
2 changes: 2 additions & 0 deletions dan_layer/state_store_sqlite/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ impl<'a, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'a> SqliteState
evidence,
is_ready,
local_decision,
transaction_fee,
leader_fee,
remote_decision,
is_applied,
created_at
Expand Down
Loading

0 comments on commit 0ca3504

Please sign in to comment.