Skip to content

Commit

Permalink
feat: foreign proposal command (#792)
Browse files Browse the repository at this point in the history
Description
---
It's build on top of #757 
In current state everyone propose the foreign proposal command all the
time.
If I build on top of block where the foreign proposal is, then I can not
propose it and mark it as proposed.
But everyone else needs to check if there is a block on top of the block
where the command is to mark it as well. But the command is not in the
locked block. So it can be rollbacked, right?
For the transactions we have something similar where we keep track of
the changes until they are locked, in the
`transaction_pool_state_updates` table.

Motivation and Context
---

How Has This Been Tested?
---

What process can a PR reviewer use to test or verify this change?
---


Breaking Changes
---

- [x] None
- [ ] Requires data directory to be deleted
- [ ] Other - Please specify
  • Loading branch information
Cifko authored Nov 29, 2023
1 parent f0dc999 commit 186b20d
Show file tree
Hide file tree
Showing 17 changed files with 720 additions and 291 deletions.
3 changes: 2 additions & 1 deletion applications/tari_validator_node/src/p2p/rpc/sync_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ impl<TStateStore: StateStore> BlockSyncTask<TStateStore> {
let all_qcs = child
.commands()
.iter()
.flat_map(|cmd| cmd.evidence().qc_ids_iter())
.filter_map(|cmd| cmd.transaction())
.flat_map(|transaction| transaction.evidence.qc_ids_iter())
.collect::<HashSet<_>>();
let certificates = QuorumCertificate::get_all(tx, all_qcs)?;
let updates = child.get_substate_updates(tx)?;
Expand Down
32 changes: 24 additions & 8 deletions dan_layer/consensus/src/hotstuff/on_propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ use tari_dan_storage::{
consensus_models::{
Block,
Command,
ForeignProposal,
ForeignSendCounters,
HighQc,
LastProposed,
LeafBlock,
LockedBlock,
QuorumCertificate,
TransactionPool,
TransactionPoolStage,
Expand Down Expand Up @@ -197,17 +199,31 @@ where TConsensusSpec: ConsensusSpec
};

let mut total_leader_fee = 0;
let commands = batch
let locked_block = LockedBlock::get(tx)?;
let pending_proposals = ForeignProposal::get_all_pending(tx, locked_block.block_id(), parent_block.block_id())?;
let commands = ForeignProposal::get_all_new(tx)?
.into_iter()
.map(|t| match t.current_stage() {
.filter_map(|foreign_proposal| {
if pending_proposals.iter().any(|pending_proposal| {
pending_proposal.bucket == foreign_proposal.bucket &&
pending_proposal.block_id == foreign_proposal.block_id
}) {
None
} else {
Some(Ok(Command::ForeignProposal(
foreign_proposal.set_mined_at(parent_block.height().saturating_add(NodeHeight(1))),
)))
}
})
.chain(batch.into_iter().map(|t| match t.current_stage() {
// If the transaction is New, propose to Prepare it
TransactionPoolStage::New => Ok(Command::Prepare(t.get_local_transaction_atom())),
// The transaction is Prepared, this stage is only _ready_ once we know that all local nodes
// accepted Prepared so we propose LocalPrepared
TransactionPoolStage::Prepared => Ok(Command::LocalPrepared(t.get_local_transaction_atom())),
// The transaction is LocalPrepared, meaning that we know that all foreign and local nodes have
// prepared. We can now propose to Accept it. We also propose the decision change which everyone should
// agree with if they received the same foreign LocalPrepare.
// prepared. We can now propose to Accept it. We also propose the decision change which everyone
// should agree with if they received the same foreign LocalPrepare.
TransactionPoolStage::LocalPrepared => {
let involved = local_committee_shard.count_distinct_buckets(t.transaction().evidence.shards_iter());
let involved = NonZeroU64::new(involved as u64).ok_or_else(|| {
Expand All @@ -220,16 +236,16 @@ where TConsensusSpec: ConsensusSpec
total_leader_fee += leader_fee;
Ok(Command::Accept(t.get_final_transaction_atom(leader_fee)))
},
// Not reachable as there is nothing to propose for these stages. To confirm that all local nodes agreed
// with the Accept, more (possibly empty) blocks with QCs will be proposed and accepted,
// otherwise the Accept block will not be committed.
// Not reachable as there is nothing to propose for these stages. To confirm that all local nodes
// agreed with the Accept, more (possibly empty) blocks with QCs will be
// proposed and accepted, otherwise the Accept block will not be committed.
TransactionPoolStage::AllPrepared | TransactionPoolStage::SomePrepared => {
unreachable!(
"It is invalid for TransactionPoolStage::{} to be ready to propose",
t.current_stage()
)
},
})
}))
.collect::<Result<BTreeSet<_>, HotStuffError>>()?;

debug!(
Expand Down
530 changes: 275 additions & 255 deletions dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs

Large diffs are not rendered by default.

10 changes: 9 additions & 1 deletion dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,14 @@ use std::ops::DerefMut;
use log::*;
use tari_dan_common_types::{committee::CommitteeShard, optional::Optional, shard_bucket::ShardBucket, NodeHeight};
use tari_dan_storage::{
consensus_models::{Block, ForeignReceiveCounters, LeafBlock, TransactionPool, TransactionPoolStage},
consensus_models::{
Block,
ForeignProposal,
ForeignReceiveCounters,
LeafBlock,
TransactionPool,
TransactionPoolStage,
},
StateStore,
};
use tari_epoch_manager::EpochManagerReader;
Expand Down Expand Up @@ -72,6 +79,7 @@ where TConsensusSpec: ConsensusSpec
self.foreign_receive_counter.increment(&committee_shard.bucket());
self.store.with_write_tx(|tx| {
self.foreign_receive_counter.save(tx)?;
ForeignProposal::new(committee_shard.bucket(), *block.id()).upsert(tx)?;
self.on_receive_foreign_block(tx, &block, &committee_shard)
})?;

Expand Down
31 changes: 17 additions & 14 deletions dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use tari_dan_common_types::{
NodeHeight,
};
use tari_dan_storage::{
consensus_models::{Block, ForeignSendCounters, HighQc, TransactionPool, ValidBlock},
consensus_models::{Block, BlockId, ForeignSendCounters, HighQc, TransactionPool, ValidBlock},
StateStore,
};
use tari_epoch_manager::EpochManagerReader;
Expand Down Expand Up @@ -157,8 +157,9 @@ impl<TConsensusSpec: ConsensusSpec> OnReceiveProposalHandler<TConsensusSpec> {
num_committees: u32,
local_bucket: ShardBucket,
block: &Block<TConsensusSpec::Addr>,
justify_block: &BlockId,
) -> Result<bool, HotStuffError> {
let mut foreign_counters = ForeignSendCounters::get(tx.deref_mut(), block.parent())?;
let mut foreign_counters = ForeignSendCounters::get(tx.deref_mut(), justify_block)?;
let non_local_buckets = proposer::get_non_local_buckets(tx.deref_mut(), block, num_committees, local_bucket)?;
let mut foreign_indexes = HashMap::new();
for non_local_bucket in non_local_buckets {
Expand Down Expand Up @@ -226,6 +227,20 @@ impl<TConsensusSpec: ConsensusSpec> OnReceiveProposalHandler<TConsensusSpec> {

let justify_block_height = justify_block.height();

if !self.check_foreign_indexes(
tx,
local_committee_shard.num_committees(),
local_committee_shard.bucket(),
&candidate_block,
justify_block.id(),
)? {
return Err(ProposalValidationError::InvalidForeignCounters {
proposed_by: candidate_block.proposed_by().to_string(),
hash: *candidate_block.id(),
}
.into());
}

if justify_block.id() != candidate_block.parent() {
let mut dummy_blocks =
Vec::with_capacity((candidate_block.height().as_u64() - justify_block_height.as_u64() - 1) as usize);
Expand Down Expand Up @@ -276,18 +291,6 @@ impl<TConsensusSpec: ConsensusSpec> OnReceiveProposalHandler<TConsensusSpec> {
}
.into());
}
if !self.check_foreign_indexes(
tx,
local_committee_shard.num_committees(),
local_committee_shard.bucket(),
&candidate_block,
)? {
return Err(ProposalValidationError::InvalidForeignCounters {
proposed_by: candidate_block.proposed_by().to_string(),
hash: *candidate_block.id(),
}
.into());
}

Ok(ValidBlock::new(candidate_block))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,17 @@ CREATE TABLE missing_transactions
FOREIGN KEY (block_id) REFERENCES parked_blocks (block_id)
);

CREATE TABLE foreign_proposals
(
id integer not NULL primary key AUTOINCREMENT,
bucket bigint not NULL,
block_id text not NULL,
state text not NULL,
mined_at bigint NULL,
created_at timestamp not NULL DEFAULT CURRENT_TIMESTAMP,
UNIQUE (bucket, block_id)
);

CREATE TABLE foreign_send_counters
(
id integer not NULL primary key AUTOINCREMENT,
Expand Down
62 changes: 62 additions & 0 deletions dan_layer/state_store_sqlite/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ use tari_dan_storage::{
consensus_models::{
Block,
BlockId,
Command,
ForeignProposal,
ForeignReceiveCounters,
ForeignSendCounters,
HighQc,
Expand Down Expand Up @@ -372,6 +374,66 @@ impl<TAddr: NodeAddressable + Serialize + DeserializeOwned> StateStoreReadTransa
high_qc.try_into()
}

fn foreign_proposal_exists(&mut self, foreign_proposal: &ForeignProposal) -> Result<bool, StorageError> {
use crate::schema::foreign_proposals;

let foreign_proposals = foreign_proposals::table
.filter(foreign_proposals::bucket.eq(foreign_proposal.bucket.as_u32() as i32))
.filter(foreign_proposals::block_id.eq(serialize_hex(foreign_proposal.block_id)))
.count()
.limit(1)
.get_result::<i64>(self.connection())
.map_err(|e| SqliteStorageError::DieselError {
operation: "foreign_proposal_exists",
source: e,
})?;

Ok(foreign_proposals > 0)
}

fn foreign_proposal_get_all_new(&mut self) -> Result<Vec<ForeignProposal>, StorageError> {
use crate::schema::foreign_proposals;

let foreign_proposals = foreign_proposals::table
.filter(foreign_proposals::state.eq("New"))
.load::<sql_models::ForeignProposal>(self.connection())
.map_err(|e| SqliteStorageError::DieselError {
operation: "foreign_proposal_get_all",
source: e,
})?;

foreign_proposals.into_iter().map(|p| p.try_into()).collect()
}

fn foreign_proposal_get_all_pending(
&mut self,
from_block_id: &BlockId,
to_block_id: &BlockId,
) -> Result<Vec<ForeignProposal>, StorageError> {
use crate::schema::blocks;

let blocks = self.get_block_ids_that_change_state_between(from_block_id, to_block_id)?;

let all_commands: Vec<String> = blocks::table
.select(blocks::commands)
.filter(blocks::command_count.gt(0)) // if there is no command, then there is definitely no foreign proposal command
.filter(blocks::block_id.eq_any(blocks))
.load::<String>(self.connection())
.map_err(|e| SqliteStorageError::DieselError {
operation: "foreign_proposal_get_all",
source: e,
})?;
let all_commands = all_commands
.into_iter()
.map(|commands| deserialize_json(commands.as_str()))
.collect::<Result<Vec<Vec<Command>>, _>>()?;
let all_commands = all_commands.into_iter().flatten().collect::<Vec<_>>();
Ok(all_commands
.into_iter()
.filter_map(|command| command.foreign_proposal().cloned())
.collect::<Vec<ForeignProposal>>())
}

fn foreign_send_counters_get(&mut self, block_id: &BlockId) -> Result<ForeignSendCounters, StorageError> {
use crate::schema::foreign_send_counters;

Expand Down
11 changes: 11 additions & 0 deletions dan_layer/state_store_sqlite/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,17 @@ diesel::table! {
}
}

diesel::table! {
foreign_proposals (id) {
id -> Integer,
bucket -> Integer,
block_id -> Text,
state -> Text,
mined_at -> Nullable<BigInt>,
created_at -> Timestamp,
}
}

diesel::table! {
foreign_receive_counters (id) {
id -> Integer,
Expand Down
27 changes: 25 additions & 2 deletions dan_layer/state_store_sqlite/src/sql_models/bookkeeping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: BSD-3-Clause

use diesel::Queryable;
use tari_dan_common_types::{Epoch, NodeAddressable, NodeHeight};
use tari_dan_common_types::{shard_bucket::ShardBucket, Epoch, NodeAddressable, NodeHeight};
use tari_dan_storage::{
consensus_models::{self, QuorumDecision},
StorageError,
Expand All @@ -11,7 +11,7 @@ use time::PrimitiveDateTime;

use crate::{
error::SqliteStorageError,
serialization::{deserialize_hex_try_from, deserialize_json},
serialization::{deserialize_hex_try_from, deserialize_json, parse_from_string},
};

#[derive(Debug, Clone, Queryable)]
Expand All @@ -35,6 +35,29 @@ impl TryFrom<HighQc> for consensus_models::HighQc {
}
}

#[derive(Debug, Clone, Queryable)]
pub struct ForeignProposal {
pub id: i32,
pub bucket: i32,
pub block_id: String,
pub state: String,
pub mined_at: Option<i64>,
pub created_at: PrimitiveDateTime,
}

impl TryFrom<ForeignProposal> for consensus_models::ForeignProposal {
type Error = StorageError;

fn try_from(value: ForeignProposal) -> Result<Self, Self::Error> {
Ok(Self {
bucket: ShardBucket::from(value.bucket as u32),
block_id: deserialize_hex_try_from(&value.block_id)?,
state: parse_from_string(&value.state)?,
mined_at: value.mined_at.map(|mined_at| NodeHeight(mined_at as u64)),
})
}
}

#[derive(Debug, Clone, Queryable)]
pub struct ForeignSendCounters {
pub id: i32,
Expand Down
39 changes: 39 additions & 0 deletions dan_layer/state_store_sqlite/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use tari_dan_storage::{
BlockId,
Decision,
Evidence,
ForeignProposal,
ForeignReceiveCounters,
ForeignSendCounters,
HighQc,
Expand Down Expand Up @@ -567,6 +568,44 @@ impl<TAddr: NodeAddressable> StateStoreWriteTransaction for SqliteStateStoreWrit
Ok(())
}

fn foreign_proposal_upsert(&mut self, foreign_proposal: &ForeignProposal) -> Result<(), StorageError> {
use crate::schema::foreign_proposals;

let values = (
foreign_proposals::bucket.eq(foreign_proposal.bucket.as_u32() as i32),
foreign_proposals::block_id.eq(serialize_hex(foreign_proposal.block_id)),
foreign_proposals::state.eq(foreign_proposal.state.to_string()),
foreign_proposals::mined_at.eq(foreign_proposal.mined_at.map(|h| h.as_u64() as i64)),
);

diesel::insert_into(foreign_proposals::table)
.values(&values)
.on_conflict((foreign_proposals::bucket, foreign_proposals::block_id))
.do_update()
.set(values.clone())
.execute(self.connection())
.map_err(|e| SqliteStorageError::DieselError {
operation: "foreign_proposal_set",
source: e,
})?;
Ok(())
}

fn foreign_proposal_delete(&mut self, foreign_proposal: &ForeignProposal) -> Result<(), StorageError> {
use crate::schema::foreign_proposals;

diesel::delete(foreign_proposals::table)
.filter(foreign_proposals::bucket.eq(foreign_proposal.bucket.as_u32() as i32))
.filter(foreign_proposals::block_id.eq(serialize_hex(foreign_proposal.block_id)))
.execute(self.connection())
.map_err(|e| SqliteStorageError::DieselError {
operation: "foreign_proposal_delete",
source: e,
})?;

Ok(())
}

fn foreign_send_counters_set(
&mut self,
foreign_send_counter: &ForeignSendCounters,
Expand Down
Loading

0 comments on commit 186b20d

Please sign in to comment.