Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(consensus)!: fix block and leader fee proposals #1133

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 28 additions & 36 deletions dan_layer/consensus/src/hotstuff/on_propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,11 +281,11 @@ where TConsensusSpec: ConsensusSpec

// Leader thinks that all local nodes agree that all shard groups have prepared, we are ready to accept
// locally
TransactionPoolStage::AllPrepared => self.local_accept_transaction(local_committee_info, &mut tx_rec),
// Leader thinks local nodes are ready to accept an ABORT
TransactionPoolStage::SomePrepared => Ok(Some(Command::LocalAccept(
self.get_current_transaction_atom(local_committee_info, &mut tx_rec)?,
TransactionPoolStage::AllPrepared => Ok(Some(Command::LocalAccept(
self.get_transaction_atom_with_leader_fee(local_committee_info, &mut tx_rec)?,
))),
// Leader thinks local nodes are ready to accept an ABORT
TransactionPoolStage::SomePrepared => Ok(Some(Command::LocalAccept(tx_rec.get_current_transaction_atom()))),
// Leader thinks that all foreign ACCEPT pledges have been received and, we are ready to accept the result
// (COMMIT/ABORT)
TransactionPoolStage::LocalAccepted => {
Expand Down Expand Up @@ -641,11 +641,11 @@ where TConsensusSpec: ConsensusSpec
"🏠️ Transaction {} is local only, proposing LocalOnly",
tx_rec.transaction_id(),
);
let involved = NonZeroU64::new(1).expect("1 > 0");
let leader_fee = tx_rec.calculate_leader_fee(involved, EXHAUST_DIVISOR);
tx_rec.set_leader_fee(leader_fee);
let atom = tx_rec.get_current_transaction_atom();
if atom.decision.is_commit() {

if tx_rec.current_decision().is_commit() {
let involved = NonZeroU64::new(1).expect("1 > 0");
let leader_fee = tx_rec.calculate_leader_fee(involved, EXHAUST_DIVISOR);
tx_rec.set_leader_fee(leader_fee);
let diff = execution.result().finalize.result.accept().ok_or_else(|| {
HotStuffError::InvariantError(format!(
"prepare_transaction: Transaction {} has COMMIT decision but execution failed when \
Expand All @@ -668,6 +668,7 @@ where TConsensusSpec: ConsensusSpec

executed_transactions.insert(*tx_rec.transaction_id(), execution);

let atom = tx_rec.get_current_transaction_atom();
Command::LocalOnly(atom)
},
PreparedTransaction::LocalOnly(LocalPreparedTransaction::EarlyAbort { transaction, .. }) => {
Expand Down Expand Up @@ -743,7 +744,7 @@ where TConsensusSpec: ConsensusSpec
) -> Result<Option<Command>, HotStuffError> {
// Only set to abort if either the local or one or more foreign shards decided to ABORT
if tx_rec.current_decision().is_abort() {
return Ok(Some(Command::SomePrepare(tx_rec.get_local_transaction_atom())));
return Ok(Some(Command::SomePrepare(tx_rec.get_current_transaction_atom())));
}

let mut execution =
Expand Down Expand Up @@ -771,27 +772,15 @@ where TConsensusSpec: ConsensusSpec
tx_rec.update_from_execution(&execution);

executed_transactions.insert(*tx_rec.transaction_id(), execution);
return Ok(Some(Command::AllPrepare(
self.get_current_transaction_atom(local_committee_info, tx_rec)?,
)));
return Ok(Some(Command::AllPrepare(tx_rec.get_current_transaction_atom())));
}

tx_rec.update_from_execution(&execution);

let atom = self.get_current_transaction_atom(local_committee_info, tx_rec)?;
executed_transactions.insert(*tx_rec.transaction_id(), execution);
// If we locally decided to ABORT, we are still saying that we think all prepared. When we enter the acceptance
// phase, we will propose SomeAccept for this case.
Ok(Some(Command::AllPrepare(atom)))
}

fn local_accept_transaction(
&self,
local_committee_info: &CommitteeInfo,
tx_rec: &mut TransactionPoolRecord,
) -> Result<Option<Command>, HotStuffError> {
let atom = self.get_current_transaction_atom(local_committee_info, tx_rec)?;
Ok(Some(Command::LocalAccept(atom)))
Ok(Some(Command::AllPrepare(tx_rec.get_current_transaction_atom())))
}

fn accept_transaction(
Expand Down Expand Up @@ -825,24 +814,27 @@ where TConsensusSpec: ConsensusSpec
*tx_rec.transaction_id(),
&filter_diff_for_committee(local_committee_info, diff),
)?;
Ok(Some(Command::AllAccept(tx_rec.get_current_transaction_atom())))
let atom = self.get_transaction_atom_with_leader_fee(local_committee_info, tx_rec)?;
Ok(Some(Command::AllAccept(atom)))
}

fn get_current_transaction_atom(
fn get_transaction_atom_with_leader_fee(
&self,
local_committee_info: &CommitteeInfo,
tx_rec: &mut TransactionPoolRecord,
) -> Result<TransactionAtom, HotStuffError> {
let num_involved_shard_groups =
local_committee_info.count_distinct_shard_groups(tx_rec.evidence().substate_addresses_iter());
let involved = NonZeroU64::new(num_involved_shard_groups as u64).ok_or_else(|| {
HotStuffError::InvariantError(format!(
"PROPOSE: Transaction {} involves zero shard groups",
tx_rec.transaction_id(),
))
})?;
let leader_fee = tx_rec.calculate_leader_fee(involved, EXHAUST_DIVISOR);
tx_rec.set_leader_fee(leader_fee);
if tx_rec.current_decision().is_commit() {
let num_involved_shard_groups =
local_committee_info.count_distinct_shard_groups(tx_rec.evidence().substate_addresses_iter());
let involved = NonZeroU64::new(num_involved_shard_groups as u64).ok_or_else(|| {
HotStuffError::InvariantError(format!(
"PROPOSE: Transaction {} involves zero shard groups",
tx_rec.transaction_id(),
))
})?;
let leader_fee = tx_rec.calculate_leader_fee(involved, EXHAUST_DIVISOR);
tx_rec.set_leader_fee(leader_fee);
}
let atom = tx_rec.get_current_transaction_atom();
Ok(atom)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,14 @@ where TConsensusSpec: ConsensusSpec
}
},
Command::LocalAccept(atom) => {
if !self.evaluate_local_accept_command(tx, block, &locked_block, atom, proposed_block_change_set)? {
if !self.evaluate_local_accept_command(
tx,
block,
&locked_block,
atom,
local_committee_info,
proposed_block_change_set,
)? {
proposed_block_change_set.no_vote();
return Ok(());
}
Expand All @@ -374,6 +381,7 @@ where TConsensusSpec: ConsensusSpec
local_committee_info,
&mut substate_store,
proposed_block_change_set,
&mut total_leader_fee,
)? {
proposed_block_change_set.no_vote();
return Ok(());
Expand Down Expand Up @@ -919,10 +927,12 @@ where TConsensusSpec: ConsensusSpec
err
);

tx_rec.set_local_decision(Decision::Abort);
execution.set_abort_reason(RejectReason::FailedToLockOutputs(err.to_string()));

tx_rec.set_local_decision(Decision::Abort);
tx_rec.set_transaction_fee(0);
tx_rec.set_next_stage(TransactionPoolStage::AllPrepared, true)?;

proposed_block_change_set
.set_next_transaction_update(tx_rec)?
.add_transaction_execution(execution)?;
Expand Down Expand Up @@ -1062,6 +1072,7 @@ where TConsensusSpec: ConsensusSpec
block: &Block,
locked_block: &LockedBlock,
atom: &TransactionAtom,
local_committee_info: &CommitteeInfo,
proposed_block_change_set: &mut ProposedBlockChangeSet,
) -> Result<bool, HotStuffError> {
let Some(mut tx_rec) =
Expand Down Expand Up @@ -1115,6 +1126,39 @@ where TConsensusSpec: ConsensusSpec
return Ok(false);
}

if atom.decision.is_commit() {
let Some(ref leader_fee) = atom.leader_fee else {
warn!(
target: LOG_TARGET,
"❌ NO VOTE: Leader fee in tx {} not set for AllAccept command in block {}",
atom.id,
block,
);
return Ok(false);
};

// Check the leader fee in the local accept phase. The fee only applied (is added to the block fee) for
// AllAccept
let num_involved_shard_groups =
local_committee_info.count_distinct_shard_groups(tx_rec.evidence().substate_addresses_iter());
let involved = NonZeroU64::new(num_involved_shard_groups as u64)
.ok_or_else(|| HotStuffError::InvariantError("Number of involved shard groups is 0".to_string()))?;
let calculated_leader_fee = tx_rec.calculate_leader_fee(involved, EXHAUST_DIVISOR);
if calculated_leader_fee != *leader_fee {
warn!(
target: LOG_TARGET,
"❌ NO VOTE: LocalAccept leader fee disagreement for block {}. Leader proposed {}, we calculated {}",
block,
atom.leader_fee.as_ref().expect("None already checked"),
calculated_leader_fee
);

return Ok(false);
}

tx_rec.set_leader_fee(calculated_leader_fee);
}

tx_rec.set_next_stage(
TransactionPoolStage::LocalAccepted,
tx_rec.evidence().all_addresses_justified(),
Expand All @@ -1133,11 +1177,12 @@ where TConsensusSpec: ConsensusSpec
local_committee_info: &CommitteeInfo,
substate_store: &mut PendingSubstateStore<TConsensusSpec::StateStore>,
proposed_block_change_set: &mut ProposedBlockChangeSet,
total_leader_fee: &mut u64,
) -> Result<bool, HotStuffError> {
if atom.decision.is_abort() {
warn!(
target: LOG_TARGET,
"❌ AllAccept command received for block {} but requires that the transaction is COMMIT",
"❌ NO VOTE: AllAccept command received for block {} but requires that the transaction is COMMIT",
block.id(),
);
return Ok(false);
Expand All @@ -1148,7 +1193,7 @@ where TConsensusSpec: ConsensusSpec
else {
warn!(
target: LOG_TARGET,
"⚠️ Local proposal received ({}) for transaction {} which is not in the pool. This is likely a previous transaction that has been re-proposed. Not voting on block.",
"⚠️ NO VOTE: Local proposal received ({}) for transaction {} which is not in the pool. This is likely a previous transaction that has been re-proposed. Not voting on block.",
block,
atom.id(),
);
Expand All @@ -1158,7 +1203,7 @@ where TConsensusSpec: ConsensusSpec
if !tx_rec.current_stage().is_local_accepted() {
warn!(
target: LOG_TARGET,
"{} ❌ AllAccept Stage disagreement in block {} for transaction {}. Leader proposed AllAccept, but local stage is {}",
"{} ❌ NO VOTE: AllAccept Stage disagreement in block {} for transaction {}. Leader proposed AllAccept, but local stage is {}",
self.local_validator_pk,
block,
tx_rec.transaction_id(),
Expand All @@ -1170,7 +1215,7 @@ where TConsensusSpec: ConsensusSpec
if tx_rec.current_decision().is_abort() {
warn!(
target: LOG_TARGET,
"❌ AllAccept decision disagreement for transaction {} in block {}. Leader proposed COMMIT, we decided ABORT",
"❌ NO VOTE: AllAccept decision disagreement for transaction {} in block {}. Leader proposed COMMIT, we decided ABORT",
tx_rec.transaction_id(),
block,
);
Expand All @@ -1180,7 +1225,7 @@ where TConsensusSpec: ConsensusSpec
if tx_rec.transaction_fee() != atom.transaction_fee {
warn!(
target: LOG_TARGET,
"❌ AllAccept transaction fee disagreement tx {} in block {}. Leader proposed {}, we calculated {}",
"❌ NO VOTE: AllAccept transaction fee disagreement tx {} in block {}. Leader proposed {}, we calculated {}",
tx_rec.transaction_id(),
block,
atom.transaction_fee,
Expand All @@ -1189,6 +1234,36 @@ where TConsensusSpec: ConsensusSpec
return Ok(false);
}

let Some(ref leader_fee) = atom.leader_fee else {
warn!(
target: LOG_TARGET,
"❌ NO VOTE: Leader fee in tx {} not set for AllAccept command in block {}",
atom.id,
block,
);
return Ok(false);
};

let local_leader_fee = tx_rec.leader_fee().ok_or_else(|| {
HotStuffError::InvariantError(format!(
"evaluate_all_accept_command: Transaction {} has COMMIT decision and is at LocalAccepted stage but \
leader fee is missing",
tx_rec.transaction_id()
))
})?;

if local_leader_fee != leader_fee {
warn!(
target: LOG_TARGET,
"❌ NO VOTE: Leader fee disagreement for tx {} in block {}. Leader proposed {}, we calculated {}",
atom.id,
block,
leader_fee,
local_leader_fee
);
return Ok(false);
}

let execution = BlockTransactionExecution::get_pending_for_block(tx, tx_rec.transaction_id(), block.parent())
.optional()?
.ok_or_else(|| {
Expand All @@ -1205,6 +1280,8 @@ where TConsensusSpec: ConsensusSpec
))
})?;

*total_leader_fee += leader_fee.fee();

substate_store.put_diff(
*tx_rec.transaction_id(),
&filter_diff_for_committee(local_committee_info, diff),
Expand Down
2 changes: 0 additions & 2 deletions dan_layer/consensus_tests/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,6 @@ async fn foreign_shard_group_decides_to_abort() {
async fn multishard_local_inputs_foreign_outputs() {
setup_logger();
let mut test = Test::builder()
// Test can take 11s, this could cut it a little fine - may indicate that we need to optimise
.with_test_timeout(Duration::from_secs(60))
.add_committee(0, vec!["1", "2"])
.add_committee(1, vec!["3", "4"])
.start()
Expand Down
4 changes: 4 additions & 0 deletions dan_layer/engine_types/src/substate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,10 @@ impl SubstateId {
pub fn is_transaction_receipt(&self) -> bool {
matches!(self, Self::TransactionReceipt(_))
}

pub fn is_read_only(&self) -> bool {
matches!(self, Self::TransactionReceipt(_) | Self::Resource(_))
}
}

impl From<ComponentAddress> for SubstateId {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,21 +300,20 @@ create unique index transaction_executions_uniq_block_id_transaction_id on trans

create table transaction_pool
(
id integer not null primary key AUTOINCREMENT,
transaction_id text not null,
original_decision text not null,
local_decision text null,
remote_decision text null,
evidence text null,
transaction_fee bigint null,
leader_fee bigint null,
global_exhaust_burn bigint null,
stage text not null,
pending_stage text null,
is_ready boolean not null,
confirm_stage text null,
updated_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
id integer not null primary key AUTOINCREMENT,
transaction_id text not null,
original_decision text not null,
local_decision text null,
remote_decision text null,
evidence text null,
transaction_fee bigint not null DEFAULT 0,
leader_fee text null,
stage text not null,
pending_stage text null,
is_ready boolean not null,
confirm_stage text null,
updated_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (transaction_id) REFERENCES transactions (transaction_id)
);
create unique index transaction_pool_uniq_idx_transaction_id on transaction_pool (transaction_id);
Expand All @@ -330,6 +329,8 @@ create table transaction_pool_state_updates
evidence text not null,
is_ready boolean not null,
local_decision text not null,
transaction_fee bigint not null,
leader_fee text null,
remote_decision text null,
is_applied boolean not null DEFAULT '0',
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
Expand Down Expand Up @@ -521,7 +522,6 @@ BEGIN
new_evidence,
transaction_fee,
leader_fee,
global_exhaust_burn,
stage,
new_stage,
pending_stage,
Expand All @@ -539,9 +539,8 @@ BEGIN
OLD.remote_decision,
OLD.evidence,
NEW.evidence,
OLD.transaction_fee,
OLD.leader_fee,
OLD.global_exhaust_burn,
NEW.transaction_fee,
NEW.leader_fee,
OLD.stage,
NEW.stage,
OLD.pending_stage,
Expand Down
2 changes: 2 additions & 0 deletions dan_layer/state_store_sqlite/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ impl<'a, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'a> SqliteState
evidence,
is_ready,
local_decision,
transaction_fee,
leader_fee,
remote_decision,
is_applied,
created_at
Expand Down
Loading
Loading