Skip to content

Commit

Permalink
feat: request missing foreign proposal
Browse files Browse the repository at this point in the history
  • Loading branch information
Cifko committed Nov 23, 2023
1 parent 7434f4b commit 09a1129
Show file tree
Hide file tree
Showing 15 changed files with 257 additions and 10 deletions.
2 changes: 2 additions & 0 deletions dan_layer/consensus/src/hotstuff/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,4 +146,6 @@ pub enum ProposalValidationError {
},
#[error("Proposed block {block_id} {height} already has been processed")]
BlockAlreadyProcessed { block_id: BlockId, height: NodeHeight },
#[error("Internal channel send error when {context}")]
InternalChannelClosed { context: &'static str },
}
1 change: 1 addition & 0 deletions dan_layer/consensus/src/hotstuff/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ mod on_ready_to_vote_on_local_block;
mod on_receive_foreign_proposal;
mod on_receive_local_proposal;
mod on_receive_new_view;
mod on_receive_request_missing_foreign_blocks;
mod on_receive_request_missing_transactions;
mod on_receive_requested_transactions;
mod on_receive_vote;
Expand Down
29 changes: 24 additions & 5 deletions dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ use tari_dan_storage::{
StateStore,
};
use tari_epoch_manager::EpochManagerReader;
use tokio::sync::mpsc;

use crate::{
hotstuff::{error::HotStuffError, pacemaker_handle::PaceMakerHandle, ProposalValidationError},
messages::ProposalMessage,
messages::{HotstuffMessage, ProposalMessage, RequestMissingForeignBlocksMessage},
traits::ConsensusSpec,
};

Expand All @@ -24,6 +25,7 @@ pub struct OnReceiveForeignProposalHandler<TConsensusSpec: ConsensusSpec> {
transaction_pool: TransactionPool<TConsensusSpec::StateStore>,
pacemaker: PaceMakerHandle,
foreign_receive_counter: ForeignReceiveCounters,
tx_leader: mpsc::Sender<(TConsensusSpec::Addr, HotstuffMessage<TConsensusSpec::Addr>)>,
}

impl<TConsensusSpec> OnReceiveForeignProposalHandler<TConsensusSpec>
Expand All @@ -35,13 +37,15 @@ where TConsensusSpec: ConsensusSpec
transaction_pool: TransactionPool<TConsensusSpec::StateStore>,
pacemaker: PaceMakerHandle,
foreign_receive_counter: ForeignReceiveCounters,
tx_leader: mpsc::Sender<(TConsensusSpec::Addr, HotstuffMessage<TConsensusSpec::Addr>)>,
) -> Self {
Self {
store,
epoch_manager,
transaction_pool,
pacemaker,
foreign_receive_counter,
tx_leader,
}
}

Expand All @@ -67,7 +71,8 @@ where TConsensusSpec: ConsensusSpec
.get_committee_shard(block.epoch(), vn.shard_key)
.await?;
let local_shard = self.epoch_manager.get_local_committee_shard(block.epoch()).await?;
self.validate_proposed_block(&from, &block, committee_shard.bucket(), local_shard.bucket())?;
self.validate_proposed_block(&from, &block, committee_shard.bucket(), local_shard.bucket())
.await?;
// Is this ok? Can foreign node send invalid block that should still increment the counter?
self.foreign_receive_counter.increment(&committee_shard.bucket());
self.store.with_write_tx(|tx| {
Expand Down Expand Up @@ -145,7 +150,7 @@ where TConsensusSpec: ConsensusSpec
Ok(())
}

fn validate_proposed_block(
async fn validate_proposed_block(
&self,
from: &TConsensusSpec::Addr,
candidate_block: &Block<TConsensusSpec::Addr>,
Expand All @@ -164,8 +169,22 @@ where TConsensusSpec: ConsensusSpec
};
let current_index = self.foreign_receive_counter.get_index(&foreign_bucket);
if current_index + 1 != incoming_index {
debug!(target:LOG_TARGET, "We were expecting the index to be {expected_index}, but the index was
{incoming_index}", expected_index = current_index + 1);
debug!(target:LOG_TARGET, "We were expecting the index to be {expected_index}, but the index was {incoming_index}", expected_index = current_index + 1);
if current_index < incoming_index {
self.tx_leader
.send((
from.clone(),
HotstuffMessage::RequestMissingForeignBlocks(RequestMissingForeignBlocksMessage {
epoch: candidate_block.epoch(),
from: incoming_index + 1,
to: current_index,
}),
))
.await
.map_err(|_| ProposalValidationError::InternalChannelClosed {
context: "tx_leader in OnNextSyncViewHandler::send_to_leader",
})?;
}
return Err(ProposalValidationError::InvalidForeignCounters {
proposed_by: from.to_string(),
hash: *candidate_block.id(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2023 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use log::*;
use tari_dan_storage::{StateStore, StateStoreReadTransaction};
use tari_epoch_manager::EpochManagerReader;
use tokio::sync::mpsc;

use crate::{
hotstuff::error::HotStuffError,
messages::{HotstuffMessage, ProposalMessage, RequestMissingForeignBlocksMessage},
traits::ConsensusSpec,
};

const LOG_TARGET: &str = "tari::dan::consensus::hotstuff::on_receive_request_missing_transactions";

pub struct OnReceiveRequestMissingForeignBlocksHandler<TConsensusSpec: ConsensusSpec> {
store: TConsensusSpec::StateStore,
epoch_manager: TConsensusSpec::EpochManager,
tx_request_missing_foreign_blocks: mpsc::Sender<(TConsensusSpec::Addr, HotstuffMessage<TConsensusSpec::Addr>)>,
}

impl<TConsensusSpec> OnReceiveRequestMissingForeignBlocksHandler<TConsensusSpec>
where TConsensusSpec: ConsensusSpec
{
pub fn new(
store: TConsensusSpec::StateStore,
epoch_manager: TConsensusSpec::EpochManager,
tx_request_missing_foreign_blocks: mpsc::Sender<(TConsensusSpec::Addr, HotstuffMessage<TConsensusSpec::Addr>)>,
) -> Self {
Self {
store,
epoch_manager,
tx_request_missing_foreign_blocks,
}
}

pub async fn handle(
&mut self,
from: TConsensusSpec::Addr,
msg: RequestMissingForeignBlocksMessage,
) -> Result<(), HotStuffError> {
debug!(target: LOG_TARGET, "{} is requesting {}..{} missing blocks from epoch {}", from, msg.from, msg.to, msg.epoch);
let foreign_shard = self
.epoch_manager
.get_committee_shard_by_validator_address(msg.epoch, &from)
.await?;
let missing_blocks = self
.store
.with_read_tx(|tx| tx.blocks_get_foreign_ids(foreign_shard.bucket(), msg.from, msg.to))?;
for block in missing_blocks {
// We send the proposal back to the requester via hotstuff, so they follow the normal path including
// validation.
self.tx_request_missing_foreign_blocks
.send((
from.clone(),
HotstuffMessage::ForeignProposal(ProposalMessage { block: block.clone() }),
))
.await
.map_err(|_| HotStuffError::InternalChannelClosed {
context: "tx_leader in OnReceiveRequestMissingForeignBlocksHandler::handle",
})?;
}
Ok(())
}
}
17 changes: 16 additions & 1 deletion dan_layer/consensus/src/hotstuff/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ use tari_shutdown::ShutdownSignal;
use tari_transaction::{Transaction, TransactionId};
use tokio::sync::{broadcast, mpsc};

use super::{on_receive_requested_transactions::OnReceiveRequestedTransactions, proposer::Proposer};
use super::{
on_receive_request_missing_foreign_blocks::OnReceiveRequestMissingForeignBlocksHandler,
on_receive_requested_transactions::OnReceiveRequestedTransactions,
proposer::Proposer,
};
use crate::{
hotstuff::{
common::CommitteeAndMessage,
Expand Down Expand Up @@ -56,6 +60,7 @@ pub struct HotstuffWorker<TConsensusSpec: ConsensusSpec> {
on_receive_foreign_proposal: OnReceiveForeignProposalHandler<TConsensusSpec>,
on_receive_vote: OnReceiveVoteHandler<TConsensusSpec>,
on_receive_new_view: OnReceiveNewViewHandler<TConsensusSpec>,
on_receive_request_missing_foreign_blocks: OnReceiveRequestMissingForeignBlocksHandler<TConsensusSpec>,
on_receive_request_missing_txs: OnReceiveRequestMissingTransactions<TConsensusSpec>,
on_receive_requested_txs: OnReceiveRequestedTransactions<TConsensusSpec>,
on_propose: OnPropose<TConsensusSpec>,
Expand Down Expand Up @@ -139,6 +144,7 @@ impl<TConsensusSpec: ConsensusSpec> HotstuffWorker<TConsensusSpec> {
transaction_pool.clone(),
pacemaker.clone_handle(),
foreign_receive_counter,
tx_leader.clone(),
),
on_receive_vote: OnReceiveVoteHandler::new(vote_receiver.clone()),
on_receive_new_view: OnReceiveNewViewHandler::new(
Expand All @@ -148,6 +154,11 @@ impl<TConsensusSpec: ConsensusSpec> HotstuffWorker<TConsensusSpec> {
pacemaker.clone_handle(),
vote_receiver,
),
on_receive_request_missing_foreign_blocks: OnReceiveRequestMissingForeignBlocksHandler::new(
state_store.clone(),
epoch_manager.clone(),
tx_leader.clone(),
),
on_receive_request_missing_txs: OnReceiveRequestMissingTransactions::new(
state_store.clone(),
tx_leader.clone(),
Expand Down Expand Up @@ -474,6 +485,10 @@ where TConsensusSpec: ConsensusSpec
);
Ok(())
},
HotstuffMessage::RequestMissingForeignBlocks(msg) => log_err(
"on_receive_request_missing_foreign_blocks",
self.on_receive_request_missing_foreign_blocks.handle(from, msg).await,
),
}
}

Expand Down
14 changes: 13 additions & 1 deletion dan_layer/consensus/src/messages/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@ use std::fmt::Display;
use serde::Serialize;
use tari_dan_common_types::Epoch;

use super::{NewViewMessage, ProposalMessage, RequestedTransactionMessage, VoteMessage};
use super::{
NewViewMessage,
ProposalMessage,
RequestMissingForeignBlocksMessage,
RequestedTransactionMessage,
VoteMessage,
};
use crate::messages::{RequestMissingTransactionsMessage, SyncRequestMessage, SyncResponseMessage};

// Serialize is implemented for the message logger
Expand All @@ -20,6 +26,7 @@ pub enum HotstuffMessage<TAddr> {
RequestedTransaction(RequestedTransactionMessage),
SyncRequest(SyncRequestMessage),
SyncResponse(SyncResponseMessage<TAddr>),
RequestMissingForeignBlocks(RequestMissingForeignBlocksMessage),
}

impl<TAddr> HotstuffMessage<TAddr> {
Expand All @@ -33,6 +40,7 @@ impl<TAddr> HotstuffMessage<TAddr> {
HotstuffMessage::RequestedTransaction(_) => "RequestedTransaction",
HotstuffMessage::SyncRequest(_) => "SyncRequest",
HotstuffMessage::SyncResponse(_) => "SyncResponse",
HotstuffMessage::RequestMissingForeignBlocks(_) => "RequestMissingForeignBlocks",
}
}

Expand All @@ -46,6 +54,7 @@ impl<TAddr> HotstuffMessage<TAddr> {
Self::RequestedTransaction(msg) => msg.epoch,
Self::SyncRequest(msg) => msg.epoch,
Self::SyncResponse(msg) => msg.epoch,
Self::RequestMissingForeignBlocks(msg) => msg.epoch,
}
}

Expand All @@ -70,6 +79,9 @@ impl<TAddr> Display for HotstuffMessage<TAddr> {
HotstuffMessage::RequestedTransaction(msg) => write!(f, "RequestedTransaction({})", msg.transactions.len()),
HotstuffMessage::SyncRequest(msg) => write!(f, "SyncRequest({})", msg.high_qc),
HotstuffMessage::SyncResponse(msg) => write!(f, "SyncResponse({} block(s))", msg.blocks.len()),
HotstuffMessage::RequestMissingForeignBlocks(msg) => {
write!(f, "RequestMissingForeignBlocks({}..{})", msg.from, msg.to)
},
}
}
}
3 changes: 3 additions & 0 deletions dan_layer/consensus/src/messages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ pub use proposal::*;
mod vote;
pub use vote::*;

mod request_missing_foreign_blocks;
pub use request_missing_foreign_blocks::*;

mod request_missing_transaction;
pub use request_missing_transaction::*;

Expand Down
12 changes: 12 additions & 0 deletions dan_layer/consensus/src/messages/request_missing_foreign_blocks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright 2023 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use serde::Serialize;
use tari_dan_common_types::Epoch;

#[derive(Debug, Clone, Serialize)]
pub struct RequestMissingForeignBlocksMessage {
pub epoch: Epoch,
pub from: u64,
pub to: u64,
}
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,16 @@ CREATE TABLE foreign_receive_counters
created_at timestamp not NULL DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE blocks_foreign_id_mapping
(
id integer not NULL primary key AUTOINCREMENT,
foreign_bucket integer not NULL,
foreign_index integer not NULL,
block_id text not NULL,
created_at timestamp not NULL DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (block_id) REFERENCES blocks (block_id)
);

-- Debug Triggers
CREATE TABLE transaction_pool_history
(
Expand Down
40 changes: 39 additions & 1 deletion dan_layer/state_store_sqlite/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use diesel::{
use log::*;
use serde::{de::DeserializeOwned, Serialize};
use tari_common_types::types::FixedHash;
use tari_dan_common_types::{Epoch, NodeAddressable, NodeHeight, ShardId};
use tari_dan_common_types::{shard_bucket::ShardBucket, Epoch, NodeAddressable, NodeHeight, ShardId};
use tari_dan_storage::{
consensus_models::{
Block,
Expand Down Expand Up @@ -518,6 +518,44 @@ impl<TAddr: NodeAddressable + Serialize + DeserializeOwned> StateStoreReadTransa
block.try_convert(qc)
}

fn blocks_get_foreign_ids(
&mut self,
bucket: ShardBucket,
from: u64,
to: u64,
) -> Result<Vec<Block<TAddr>>, StorageError> {
use crate::schema::{blocks, blocks_foreign_id_mapping, quorum_certificates};
// TODO: how slow is this? is it worth splitting into 2 queries?
let results = blocks::table
.left_join(blocks_foreign_id_mapping::table.on(blocks::block_id.eq(blocks_foreign_id_mapping::block_id)))
.left_join(quorum_certificates::table.on(blocks::qc_id.eq(quorum_certificates::qc_id)))
.filter(blocks_foreign_id_mapping::foreign_bucket.eq(i64::from(bucket.as_u32())))
.filter(blocks_foreign_id_mapping::foreign_index.ge(from as i64))
.filter(blocks_foreign_id_mapping::foreign_index.le(to as i64))
.select((blocks::all_columns, quorum_certificates::all_columns.nullable()))
.order_by(blocks_foreign_id_mapping::foreign_index.asc())
.get_results::<(sql_models::Block, Option<sql_models::QuorumCertificate>)>(self.connection())
.map_err(|e| SqliteStorageError::DieselError {
operation: "blocks_all_after_height",
source: e,
})?;

results
.into_iter()
.map(|(block, qc)| {
let qc = qc.ok_or_else(|| SqliteStorageError::DbInconsistency {
operation: "blocks_get_foreign_ids",
details: format!(
"block {} references non-existent quorum certificate {}",
block.block_id, block.qc_id
),
})?;

block.try_convert(qc)
})
.collect()
}

fn blocks_get_tip(&mut self) -> Result<Block<TAddr>, StorageError> {
use crate::schema::{blocks, quorum_certificates};

Expand Down
11 changes: 11 additions & 0 deletions dan_layer/state_store_sqlite/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ diesel::table! {
}
}

diesel::table! {
blocks_foreign_id_mapping (id) {
id -> Integer,
foreign_bucket -> BigInt,
foreign_index -> BigInt,
block_id -> Text,
created_at -> Timestamp,
}
}

diesel::table! {
last_executed (id) {
id -> Integer,
Expand Down Expand Up @@ -267,6 +277,7 @@ diesel::table! {

diesel::allow_tables_to_appear_in_same_query!(
blocks,
blocks_foreign_id_mapping,
high_qcs,
last_executed,
last_proposed,
Expand Down
Loading

0 comments on commit 09a1129

Please sign in to comment.