From 281b26887043a8105cb649500a006129393d5649 Mon Sep 17 00:00:00 2001 From: Cifko Date: Fri, 27 Oct 2023 12:06:13 +0200 Subject: [PATCH] feat: add reliability counter --- .../tari_validator_node/src/bootstrap.rs | 4 +- .../tari_validator_node/src/consensus/mod.rs | 4 +- dan_layer/consensus/src/hotstuff/error.rs | 4 ++ .../consensus/src/hotstuff/on_propose.rs | 24 +++++++- .../hotstuff/on_receive_foreign_proposal.rs | 41 +++++++++++-- .../src/hotstuff/on_receive_local_proposal.rs | 58 ++++++++++++++++--- dan_layer/consensus/src/hotstuff/proposer.rs | 19 +++--- dan_layer/consensus/src/hotstuff/worker.rs | 4 +- .../src/support/validator/builder.rs | 3 +- .../up.sql | 17 ++++++ dan_layer/state_store_sqlite/src/reader.rs | 41 +++++++++++++ dan_layer/state_store_sqlite/src/schema.rs | 19 ++++++ .../src/sql_models/block.rs | 4 ++ .../src/sql_models/bookkeeping.rs | 35 +++++++++++ dan_layer/state_store_sqlite/src/writer.rs | 46 +++++++++++++++ dan_layer/state_store_sqlite/tests/tests.rs | 3 + .../storage/src/consensus_models/block.rs | 48 ++++++++++++++- .../foreign_receive_counters.rs | 47 +++++++++++++++ .../consensus_models/foreign_send_counters.rs | 50 ++++++++++++++++ dan_layer/storage/src/consensus_models/mod.rs | 4 ++ dan_layer/storage/src/state_store/mod.rs | 13 +++++ .../validator_node_rpc/proto/consensus.proto | 3 +- .../src/conversions/consensus.rs | 2 + 23 files changed, 459 insertions(+), 34 deletions(-) create mode 100644 dan_layer/storage/src/consensus_models/foreign_receive_counters.rs create mode 100644 dan_layer/storage/src/consensus_models/foreign_send_counters.rs diff --git a/applications/tari_validator_node/src/bootstrap.rs b/applications/tari_validator_node/src/bootstrap.rs index 715c0e833..080e0900a 100644 --- a/applications/tari_validator_node/src/bootstrap.rs +++ b/applications/tari_validator_node/src/bootstrap.rs @@ -46,7 +46,7 @@ use tari_dan_app_utilities::{ use tari_dan_common_types::{Epoch, NodeAddressable, NodeHeight, ShardId}; use tari_dan_engine::fees::FeeTable; use tari_dan_storage::{ - consensus_models::{Block, BlockId, ExecutedTransaction, SubstateRecord}, + consensus_models::{Block, BlockId, ExecutedTransaction, ForeignReceiveCounters, SubstateRecord}, global::GlobalDb, StateStore, StateStoreReadTransaction, @@ -206,6 +206,7 @@ pub async fn spawn_services( // Consensus let (tx_executed_transaction, rx_executed_transaction) = mpsc::channel(10); + let foreign_receive_counter = state_store.with_read_tx(|tx| ForeignReceiveCounters::get(tx))?; let (consensus_join_handle, consensus_handle, rx_consensus_to_mempool) = consensus::spawn( state_store.clone(), node_identity.clone(), @@ -214,6 +215,7 @@ pub async fn spawn_services( rx_consensus_message, outbound_messaging.clone(), validator_node_client_factory.clone(), + foreign_receive_counter, shutdown.clone(), ) .await; diff --git a/applications/tari_validator_node/src/consensus/mod.rs b/applications/tari_validator_node/src/consensus/mod.rs index a497f3090..2d9c8004c 100644 --- a/applications/tari_validator_node/src/consensus/mod.rs +++ b/applications/tari_validator_node/src/consensus/mod.rs @@ -12,7 +12,7 @@ use tari_consensus::{ }; use tari_dan_common_types::committee::Committee; use tari_dan_p2p::{Message, OutboundService}; -use tari_dan_storage::consensus_models::TransactionPool; +use tari_dan_storage::consensus_models::{ForeignReceiveCounters, TransactionPool}; use tari_epoch_manager::base_layer::EpochManagerHandle; use tari_shutdown::ShutdownSignal; use tari_state_store_sqlite::SqliteStateStore; @@ -50,6 +50,7 @@ pub async fn spawn( rx_hs_message: mpsc::Receiver<(CommsPublicKey, HotstuffMessage)>, outbound_messaging: OutboundMessaging, client_factory: TariCommsValidatorNodeClientFactory, + foreign_receive_counter: ForeignReceiveCounters, shutdown_signal: ShutdownSignal, ) -> ( JoinHandle>, @@ -81,6 +82,7 @@ pub async fn spawn( tx_leader, tx_hotstuff_events.clone(), tx_mempool, + foreign_receive_counter, shutdown_signal.clone(), ); diff --git a/dan_layer/consensus/src/hotstuff/error.rs b/dan_layer/consensus/src/hotstuff/error.rs index f094e6724..972d3ca0c 100644 --- a/dan_layer/consensus/src/hotstuff/error.rs +++ b/dan_layer/consensus/src/hotstuff/error.rs @@ -92,6 +92,10 @@ pub enum ProposalValidationError { }, #[error("Node proposed by {proposed_by} with hash {hash} did not satisfy the safeNode predicate")] NotSafeBlock { proposed_by: String, hash: BlockId }, + #[error("Node proposed by {proposed_by} with hash {hash} is missing foreign index")] + MissingForeignCounters { proposed_by: String, hash: BlockId }, + #[error("Node proposed by {proposed_by} with hash {hash} has invalid foreign counters")] + InvalidForeignCounters { proposed_by: String, hash: BlockId }, #[error("Node proposed by {proposed_by} with hash {hash} is the genesis block")] ProposingGenesisBlock { proposed_by: String, hash: BlockId }, #[error("Justification block {justify_block} for proposed block {block_description} by {proposed_by} not found")] diff --git a/dan_layer/consensus/src/hotstuff/on_propose.rs b/dan_layer/consensus/src/hotstuff/on_propose.rs index e03e3435a..9ab9bea0d 100644 --- a/dan_layer/consensus/src/hotstuff/on_propose.rs +++ b/dan_layer/consensus/src/hotstuff/on_propose.rs @@ -1,7 +1,7 @@ // Copyright 2023 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use std::{collections::BTreeSet, num::NonZeroU64}; +use std::{collections::BTreeSet, num::NonZeroU64, ops::DerefMut}; use log::*; use tari_dan_common_types::{ @@ -14,6 +14,7 @@ use tari_dan_storage::{ consensus_models::{ Block, Command, + ForeignSendCounters, HighQc, LastProposed, LeafBlock, @@ -29,7 +30,7 @@ use tokio::sync::mpsc; use super::common::CommitteeAndMessage; use crate::{ - hotstuff::{common::EXHAUST_DIVISOR, error::HotStuffError}, + hotstuff::{common::EXHAUST_DIVISOR, error::HotStuffError, proposer}, messages::{HotstuffMessage, ProposalMessage}, traits::ConsensusSpec, }; @@ -109,7 +110,7 @@ where TConsensusSpec: ConsensusSpec let mut tx = self.store.create_write_tx()?; let high_qc = HighQc::get(&mut *tx)?; let high_qc = high_qc.get_quorum_certificate(&mut *tx)?; - + let mut foreign_counters = ForeignSendCounters::get(tx.deref_mut(), leaf_block.block_id())?; next_block = self.build_next_block( &mut tx, epoch, @@ -120,6 +121,7 @@ where TConsensusSpec: ConsensusSpec // TODO: This just avoids issues with proposed transactions causing leader failures. Not sure if this // is a good idea. is_newview_propose, + &mut foreign_counters, )?; next_block.as_last_proposed().set(&mut tx)?; @@ -184,6 +186,7 @@ where TConsensusSpec: ConsensusSpec proposed_by: ::Addr, local_committee_shard: &CommitteeShard, empty_block: bool, + foreign_counters: &mut ForeignSendCounters, ) -> Result, HotStuffError> { // TODO: Configure const TARGET_BLOCK_SIZE: usize = 1000; @@ -235,6 +238,20 @@ where TConsensusSpec: ConsensusSpec commands.iter().map(|c| c.to_string()).collect::>().join(",") ); + let non_local_buckets = proposer::get_non_local_buckets_from_commands( + tx, + &commands, + local_committee_shard.num_committees(), + local_committee_shard.bucket(), + )?; + + // let mut foreign_indexes = HashMap::new(); + + let foreign_indexes = non_local_buckets + .iter() + .map(|bucket| (*bucket, foreign_counters.increment_counter(*bucket))) + .collect(); + let next_block = Block::new( *parent_block.block_id(), high_qc, @@ -243,6 +260,7 @@ where TConsensusSpec: ConsensusSpec proposed_by, commands, total_leader_fee, + foreign_indexes, ); Ok(next_block) diff --git a/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs b/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs index 0304dd9c7..9ee1098c1 100644 --- a/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs +++ b/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs @@ -3,9 +3,9 @@ use std::ops::DerefMut; use log::*; -use tari_dan_common_types::{committee::CommitteeShard, optional::Optional, NodeHeight}; +use tari_dan_common_types::{committee::CommitteeShard, optional::Optional, shard_bucket::ShardBucket, NodeHeight}; use tari_dan_storage::{ - consensus_models::{Block, LeafBlock, TransactionPool, TransactionPoolStage}, + consensus_models::{Block, ForeignReceiveCounters, LeafBlock, TransactionPool, TransactionPoolStage}, StateStore, }; use tari_epoch_manager::EpochManagerReader; @@ -23,6 +23,7 @@ pub struct OnReceiveForeignProposalHandler { epoch_manager: TConsensusSpec::EpochManager, transaction_pool: TransactionPool, pacemaker: PaceMakerHandle, + foreign_receive_counter: ForeignReceiveCounters, } impl OnReceiveForeignProposalHandler @@ -33,17 +34,19 @@ where TConsensusSpec: ConsensusSpec epoch_manager: TConsensusSpec::EpochManager, transaction_pool: TransactionPool, pacemaker: PaceMakerHandle, + foreign_receive_counter: ForeignReceiveCounters, ) -> Self { Self { store, epoch_manager, transaction_pool, pacemaker, + foreign_receive_counter, } } pub async fn handle( - &self, + &mut self, from: TConsensusSpec::Addr, message: ProposalMessage, ) -> Result<(), HotStuffError> { @@ -63,9 +66,14 @@ where TConsensusSpec: ConsensusSpec .epoch_manager .get_committee_shard(block.epoch(), vn.shard_key) .await?; - self.validate_proposed_block(&from, &block)?; - self.store - .with_write_tx(|tx| self.on_receive_foreign_block(tx, &block, &committee_shard))?; + let local_shard = self.epoch_manager.get_local_committee_shard(block.epoch()).await?; + self.validate_proposed_block(&from, &block, committee_shard.bucket(), local_shard.bucket())?; + // Is this ok? Can foreign node send invalid block that should still increment the counter? + self.foreign_receive_counter.increment(&committee_shard.bucket()); + self.store.with_write_tx(|tx| { + self.foreign_receive_counter.save(tx)?; + self.on_receive_foreign_block(tx, &block, &committee_shard) + })?; // We could have ready transactions at this point, so if we're the leader for the next block we can propose self.pacemaker.beat(); @@ -141,7 +149,28 @@ where TConsensusSpec: ConsensusSpec &self, from: &TConsensusSpec::Addr, candidate_block: &Block, + foreign_bucket: ShardBucket, + local_bucket: ShardBucket, ) -> Result<(), ProposalValidationError> { + let incoming_index = match candidate_block.get_foreign_index(&local_bucket) { + Some(i) => *i, + None => { + debug!(target:LOG_TARGET, "Our bucket {local_bucket:?} is missing reliability index in the proposed block {candidate_block:?}"); + return Err(ProposalValidationError::MissingForeignCounters { + proposed_by: from.to_string(), + hash: *candidate_block.id(), + }); + }, + }; + let current_index = self.foreign_receive_counter.get_index(&foreign_bucket); + if current_index + 1 != incoming_index { + debug!(target:LOG_TARGET, "We were expecting the index to be {expected_index}, but the index was + {incoming_index}", expected_index = current_index + 1); + return Err(ProposalValidationError::InvalidForeignCounters { + proposed_by: from.to_string(), + hash: *candidate_block.id(), + }); + } if candidate_block.height() == NodeHeight::zero() || candidate_block.id().is_genesis() { return Err(ProposalValidationError::ProposingGenesisBlock { proposed_by: from.to_string(), diff --git a/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs b/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs index bef36edd5..f7b95c360 100644 --- a/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs +++ b/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs @@ -5,16 +5,23 @@ // ----[foreign:LocalPrepared]--->(LocalPrepared, true) ----cmd:AllPrepare ---> (AllPrepared, true) ---cmd:Accept ---> // Complete +use std::{collections::HashMap, ops::DerefMut}; + use log::*; -use tari_dan_common_types::{committee::Committee, optional::Optional, NodeHeight}; +use tari_dan_common_types::{ + committee::{Committee, CommitteeShard}, + optional::Optional, + shard_bucket::ShardBucket, + NodeHeight, +}; use tari_dan_storage::{ - consensus_models::{Block, HighQc, TransactionPool, ValidBlock}, + consensus_models::{Block, ForeignSendCounters, HighQc, TransactionPool, ValidBlock}, StateStore, }; use tari_epoch_manager::EpochManagerReader; use tokio::sync::{broadcast, mpsc}; -use super::proposer::Proposer; +use super::proposer::{self, Proposer}; use crate::{ hotstuff::{ error::HotStuffError, @@ -128,9 +135,10 @@ impl OnReceiveProposalHandler { .epoch_manager .get_committee_by_validator_address(block.epoch(), block.proposed_by()) .await?; + let local_committee_shard = self.epoch_manager.get_local_committee_shard(block.epoch()).await?; // First save the block in one db transaction - self.store.with_read_tx(|tx| { - match self.validate_local_proposed_block(tx, block, &local_committee) { + self.store.with_write_tx(|tx| { + match self.validate_local_proposed_block(tx, block, &local_committee, &local_committee_shard) { Ok(validated) => Ok(Some(validated)), // Validation errors should not cause a FAILURE state transition Err(HotStuffError::ProposalValidationError(err)) => { @@ -143,15 +151,35 @@ impl OnReceiveProposalHandler { }) } + fn check_foreign_indexes( + &self, + tx: &mut ::WriteTransaction<'_>, + num_committees: u32, + local_bucket: ShardBucket, + block: &Block, + ) -> Result { + let mut foreign_counters = ForeignSendCounters::get(tx.deref_mut(), block.parent())?; + let non_local_buckets = proposer::get_non_local_buckets(tx.deref_mut(), block, num_committees, local_bucket)?; + let mut foreign_indexes = HashMap::new(); + for non_local_bucket in non_local_buckets { + foreign_indexes + .entry(non_local_bucket) + .or_insert(foreign_counters.increment_counter(non_local_bucket)); + } + foreign_counters.set(tx, block.id())?; + Ok(foreign_indexes == *block.get_foreign_indexes()) + } + /// Perform final block validations (TODO: implement all validations) /// We assume at this point that initial stateless validations have been done (in inbound messages) fn validate_local_proposed_block( &self, - tx: &mut ::ReadTransaction<'_>, + tx: &mut ::WriteTransaction<'_>, candidate_block: Block, local_committee: &Committee, + local_committee_shard: &CommitteeShard, ) -> Result, HotStuffError> { - if Block::has_been_processed(tx, candidate_block.id())? { + if Block::has_been_processed(tx.deref_mut(), candidate_block.id())? { return Err(ProposalValidationError::BlockAlreadyProcessed { block_id: *candidate_block.id(), height: candidate_block.height(), @@ -160,7 +188,7 @@ impl OnReceiveProposalHandler { } // Check that details included in the justify match previously added blocks - let Some(justify_block) = candidate_block.justify().get_block(tx).optional()? else { + let Some(justify_block) = candidate_block.justify().get_block(tx.deref_mut()).optional()? else { // This will trigger a sync return Err(ProposalValidationError::JustifyBlockNotFound { proposed_by: candidate_block.proposed_by().to_string(), @@ -241,13 +269,25 @@ impl OnReceiveProposalHandler { // Now that we have all dummy blocks (if any) in place, we can check if the candidate block is safe. // Specifically, it should extend the locked block via the dummy blocks. - if !candidate_block.is_safe(tx)? { + if !candidate_block.is_safe(tx.deref_mut())? { return Err(ProposalValidationError::NotSafeBlock { proposed_by: candidate_block.proposed_by().to_string(), hash: *candidate_block.id(), } .into()); } + if !self.check_foreign_indexes( + tx, + local_committee_shard.num_committees(), + local_committee_shard.bucket(), + &candidate_block, + )? { + return Err(ProposalValidationError::InvalidForeignCounters { + proposed_by: candidate_block.proposed_by().to_string(), + hash: *candidate_block.id(), + } + .into()); + } Ok(ValidBlock::new(candidate_block)) } diff --git a/dan_layer/consensus/src/hotstuff/proposer.rs b/dan_layer/consensus/src/hotstuff/proposer.rs index 418339cac..a9f50d439 100644 --- a/dan_layer/consensus/src/hotstuff/proposer.rs +++ b/dan_layer/consensus/src/hotstuff/proposer.rs @@ -1,12 +1,12 @@ // Copyright 2023 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use std::collections::HashSet; +use std::collections::{BTreeSet, HashSet}; use log::{debug, info}; use tari_dan_common_types::shard_bucket::ShardBucket; use tari_dan_storage::{ - consensus_models::{Block, ExecutedTransaction}, + consensus_models::{Block, Command, ExecutedTransaction}, StateStore, StateStoreReadTransaction, }; @@ -95,11 +95,16 @@ pub fn get_non_local_buckets( num_committees: u32, local_bucket: ShardBucket, ) -> Result, HotStuffError> { - let prepared_iter = block - .commands() - .iter() - .filter_map(|cmd| cmd.local_prepared()) - .map(|t| &t.id); + get_non_local_buckets_from_commands(tx, block.commands(), num_committees, local_bucket) +} + +pub fn get_non_local_buckets_from_commands( + tx: &mut TTx, + commands: &BTreeSet, + num_committees: u32, + local_bucket: ShardBucket, +) -> Result, HotStuffError> { + let prepared_iter = commands.iter().filter_map(|cmd| cmd.local_prepared()).map(|t| &t.id); let prepared_txs = ExecutedTransaction::get_involved_shards(tx, prepared_iter)?; let non_local_buckets = prepared_txs .into_iter() diff --git a/dan_layer/consensus/src/hotstuff/worker.rs b/dan_layer/consensus/src/hotstuff/worker.rs index 6e7e7a6f6..635afff0f 100644 --- a/dan_layer/consensus/src/hotstuff/worker.rs +++ b/dan_layer/consensus/src/hotstuff/worker.rs @@ -10,7 +10,7 @@ use std::{ use log::*; use tari_dan_common_types::{optional::Optional, NodeHeight}; use tari_dan_storage::{ - consensus_models::{Block, HighQc, LastSentVote, LastVoted, LeafBlock, TransactionPool}, + consensus_models::{Block, ForeignReceiveCounters, HighQc, LastSentVote, LastVoted, LeafBlock, TransactionPool}, StateStore, StateStoreWriteTransaction, }; @@ -86,6 +86,7 @@ impl HotstuffWorker { tx_leader: mpsc::Sender<(TConsensusSpec::Addr, HotstuffMessage)>, tx_events: broadcast::Sender, tx_mempool: mpsc::UnboundedSender, + foreign_receive_counter: ForeignReceiveCounters, shutdown: ShutdownSignal, ) -> Self { let pacemaker = PaceMaker::new(); @@ -137,6 +138,7 @@ impl HotstuffWorker { epoch_manager.clone(), transaction_pool.clone(), pacemaker.clone_handle(), + foreign_receive_counter, ), on_receive_vote: OnReceiveVoteHandler::new(vote_receiver.clone()), on_receive_new_view: OnReceiveNewViewHandler::new( diff --git a/dan_layer/consensus_tests/src/support/validator/builder.rs b/dan_layer/consensus_tests/src/support/validator/builder.rs index cb5122280..85ef1a918 100644 --- a/dan_layer/consensus_tests/src/support/validator/builder.rs +++ b/dan_layer/consensus_tests/src/support/validator/builder.rs @@ -3,7 +3,7 @@ use tari_consensus::hotstuff::{ConsensusWorker, ConsensusWorkerContext, HotstuffWorker}; use tari_dan_common_types::{shard_bucket::ShardBucket, ShardId}; -use tari_dan_storage::consensus_models::TransactionPool; +use tari_dan_storage::consensus_models::{ForeignReceiveCounters, TransactionPool}; use tari_shutdown::ShutdownSignal; use tari_state_store_sqlite::SqliteStateStore; use tokio::sync::{broadcast, mpsc, watch}; @@ -103,6 +103,7 @@ impl ValidatorBuilder { tx_leader, tx_events.clone(), tx_mempool, + ForeignReceiveCounters::default(), shutdown_signal.clone(), ); diff --git a/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql b/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql index 738a0cf1d..ab909b974 100644 --- a/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql +++ b/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql @@ -25,6 +25,7 @@ create table blocks is_committed boolean not NULL default '0', is_processed boolean not NULL, is_dummy boolean not NULL, + foreign_indexes text not NULL, created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (qc_id) REFERENCES quorum_certificates (qc_id) ); @@ -44,6 +45,7 @@ create table parked_blocks command_count bigint not NULL, commands text not NULL, total_leader_fee bigint not NULL, + foreign_indexes text not NULL, created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ); @@ -243,6 +245,21 @@ CREATE TABLE missing_transactions FOREIGN KEY (block_id) REFERENCES parked_blocks (block_id) ); +CREATE TABLE foreign_send_counters +( + id integer not NULL primary key AUTOINCREMENT, + block_id text not NULL, + counters text not NULL, + created_at timestamp not NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE TABLE foreign_receive_counters +( + id integer not NULL primary key AUTOINCREMENT, + counters text not NULL, + created_at timestamp not NULL DEFAULT CURRENT_TIMESTAMP +); + -- Debug Triggers CREATE TABLE transaction_pool_history ( diff --git a/dan_layer/state_store_sqlite/src/reader.rs b/dan_layer/state_store_sqlite/src/reader.rs index b1dfcce8b..c0b787f44 100644 --- a/dan_layer/state_store_sqlite/src/reader.rs +++ b/dan_layer/state_store_sqlite/src/reader.rs @@ -17,6 +17,7 @@ use diesel::{ ExpressionMethods, JoinOnDsl, NullableExpressionMethods, + OptionalExtension, QueryDsl, QueryableByName, RunQueryDsl, @@ -30,6 +31,8 @@ use tari_dan_storage::{ consensus_models::{ Block, BlockId, + ForeignReceiveCounters, + ForeignSendCounters, HighQc, LastExecuted, LastProposed, @@ -369,6 +372,44 @@ impl StateStoreReadTransa high_qc.try_into() } + fn foreign_send_counters_get(&mut self, block_id: &BlockId) -> Result { + use crate::schema::foreign_send_counters; + + let counter = foreign_send_counters::table + .filter(foreign_send_counters::block_id.eq(serialize_hex(block_id))) + .first::(self.connection()) + .optional() + .map_err(|e| SqliteStorageError::DieselError { + operation: "foreign_send_counters_get", + source: e, + })?; + + if let Some(counter) = counter { + counter.try_into() + } else { + Ok(ForeignSendCounters::default()) + } + } + + fn foreign_receive_counters_get(&mut self) -> Result { + use crate::schema::foreign_receive_counters; + + let counter = foreign_receive_counters::table + .order_by(foreign_receive_counters::id.desc()) + .first::(self.connection()) + .optional() + .map_err(|e| SqliteStorageError::DieselError { + operation: "foreign_receive_counters_get", + source: e, + })?; + + if let Some(counter) = counter { + counter.try_into() + } else { + Ok(ForeignReceiveCounters::default()) + } + } + fn transactions_get(&mut self, tx_id: &TransactionId) -> Result { use crate::schema::transactions; diff --git a/dan_layer/state_store_sqlite/src/schema.rs b/dan_layer/state_store_sqlite/src/schema.rs index 0032a90c6..5a289cc96 100644 --- a/dan_layer/state_store_sqlite/src/schema.rs +++ b/dan_layer/state_store_sqlite/src/schema.rs @@ -15,6 +15,24 @@ diesel::table! { is_committed -> Bool, is_processed -> Bool, is_dummy -> Bool, + foreign_indexes -> Text, + created_at -> Timestamp, + } +} + +diesel::table! { + foreign_receive_counters (id) { + id -> Integer, + counters -> Text, + created_at -> Timestamp, + } +} + +diesel::table! { + foreign_send_counters (id) { + id -> Integer, + block_id -> Text, + counters -> Text, created_at -> Timestamp, } } @@ -119,6 +137,7 @@ diesel::table! { command_count -> BigInt, commands -> Text, total_leader_fee -> BigInt, + foreign_indexes -> Text, created_at -> Timestamp, } } diff --git a/dan_layer/state_store_sqlite/src/sql_models/block.rs b/dan_layer/state_store_sqlite/src/sql_models/block.rs index 7a4f007a7..5cba3c9fe 100644 --- a/dan_layer/state_store_sqlite/src/sql_models/block.rs +++ b/dan_layer/state_store_sqlite/src/sql_models/block.rs @@ -28,6 +28,7 @@ pub struct Block { pub is_committed: bool, pub is_processed: bool, pub is_dummy: bool, + pub foreign_indexes: String, pub created_at: PrimitiveDateTime, } @@ -52,6 +53,7 @@ impl Block { self.is_dummy, self.is_processed, self.is_committed, + deserialize_json(&self.foreign_indexes)?, self.created_at, )) } @@ -69,6 +71,7 @@ pub struct ParkedBlock { pub command_count: i64, pub commands: String, pub total_leader_fee: i64, + pub foreign_indexes: String, pub created_at: PrimitiveDateTime, } @@ -92,6 +95,7 @@ impl TryFrom for consensus_models::Block for consensus_models::HighQc { } } +#[derive(Debug, Clone, Queryable)] +pub struct ForeignSendCounters { + pub id: i32, + pub block_id: String, + pub counters: String, + pub created_at: PrimitiveDateTime, +} + +impl TryFrom for consensus_models::ForeignSendCounters { + type Error = StorageError; + + fn try_from(value: ForeignSendCounters) -> Result { + Ok(Self { + counters: deserialize_json(&value.counters)?, + }) + } +} + +#[derive(Debug, Clone, Queryable)] +pub struct ForeignReceiveCounters { + pub id: i32, + pub counters: String, + pub created_at: PrimitiveDateTime, +} + +impl TryFrom for consensus_models::ForeignReceiveCounters { + type Error = StorageError; + + fn try_from(value: ForeignReceiveCounters) -> Result { + Ok(Self { + counters: deserialize_json(&value.counters)?, + }) + } +} + #[derive(Debug, Clone, Queryable)] pub struct LockedBlock { pub id: i32, diff --git a/dan_layer/state_store_sqlite/src/writer.rs b/dan_layer/state_store_sqlite/src/writer.rs index 39489a01f..ee53cc4ff 100644 --- a/dan_layer/state_store_sqlite/src/writer.rs +++ b/dan_layer/state_store_sqlite/src/writer.rs @@ -16,6 +16,8 @@ use tari_dan_storage::{ BlockId, Decision, Evidence, + ForeignReceiveCounters, + ForeignSendCounters, HighQc, LastExecuted, LastProposed, @@ -142,6 +144,7 @@ impl<'a, TAddr: NodeAddressable> SqliteStateStoreWriteTransaction<'a, TAddr> { parked_blocks::commands.eq(serialize_json(block.commands())?), parked_blocks::total_leader_fee.eq(block.total_leader_fee() as i64), parked_blocks::justify.eq(serialize_json(block.justify())?), + parked_blocks::foreign_indexes.eq(serialize_json(block.get_foreign_indexes())?), ); diesel::insert_into(parked_blocks::table) @@ -186,6 +189,7 @@ impl StateStoreWriteTransaction for SqliteStateStoreWrit blocks::qc_id.eq(serialize_hex(block.justify().id())), blocks::is_dummy.eq(block.is_dummy()), blocks::is_processed.eq(block.is_processed()), + blocks::foreign_indexes.eq(serialize_json(block.get_foreign_indexes())?), ); diesel::insert_into(blocks::table) @@ -563,6 +567,48 @@ impl StateStoreWriteTransaction for SqliteStateStoreWrit Ok(()) } + fn foreign_send_counters_set( + &mut self, + foreign_send_counter: &ForeignSendCounters, + block_id: &BlockId, + ) -> Result<(), StorageError> { + use crate::schema::foreign_send_counters; + + let insert = ( + foreign_send_counters::block_id.eq(serialize_hex(block_id)), + foreign_send_counters::counters.eq(serialize_json(&foreign_send_counter.counters)?), + ); + + diesel::insert_into(foreign_send_counters::table) + .values(insert) + .execute(self.connection()) + .map_err(|e| SqliteStorageError::DieselError { + operation: "foreign_send_counters_set", + source: e, + })?; + + Ok(()) + } + + fn foreign_receive_counters_set( + &mut self, + foreign_receive_counter: &ForeignReceiveCounters, + ) -> Result<(), StorageError> { + use crate::schema::foreign_receive_counters; + + let insert = (foreign_receive_counters::counters.eq(serialize_json(&foreign_receive_counter.counters)?),); + + diesel::insert_into(foreign_receive_counters::table) + .values(insert) + .execute(self.connection()) + .map_err(|e| SqliteStorageError::DieselError { + operation: "foreign_receive_counters_set", + source: e, + })?; + + Ok(()) + } + fn transactions_insert(&mut self, transaction: &Transaction) -> Result<(), StorageError> { use crate::schema::transactions; diff --git a/dan_layer/state_store_sqlite/tests/tests.rs b/dan_layer/state_store_sqlite/tests/tests.rs index 26ed1a5ae..4573adb7d 100644 --- a/dan_layer/state_store_sqlite/tests/tests.rs +++ b/dan_layer/state_store_sqlite/tests/tests.rs @@ -30,6 +30,8 @@ fn create_tx_atom() -> TransactionAtom { mod confirm_all_transitions { + use std::collections::HashMap; + use super::*; #[test] @@ -55,6 +57,7 @@ mod confirm_all_transitions { // cannot cause a state change without any commands [Command::Prepare(atom1.clone())].into_iter().collect(), Default::default(), + HashMap::new(), ); block1.insert(&mut tx).unwrap(); diff --git a/dan_layer/storage/src/consensus_models/block.rs b/dan_layer/storage/src/consensus_models/block.rs index 6fc3849d3..81d656a44 100644 --- a/dan_layer/storage/src/consensus_models/block.rs +++ b/dan_layer/storage/src/consensus_models/block.rs @@ -2,15 +2,25 @@ // SPDX-License-Identifier: BSD-3-Clause use std::{ - collections::{BTreeSet, HashSet}, + collections::{BTreeSet, HashMap, HashSet}, fmt::{Debug, Display, Formatter}, + hash::Hash, ops::{DerefMut, RangeInclusive}, }; use log::*; use serde::{Deserialize, Serialize}; use tari_common_types::types::{FixedHash, FixedHashSizeError}; -use tari_dan_common_types::{hashing, optional::Optional, serde_with, Epoch, NodeAddressable, NodeHeight, ShardId}; +use tari_dan_common_types::{ + hashing, + optional::Optional, + serde_with, + shard_bucket::ShardBucket, + Epoch, + NodeAddressable, + NodeHeight, + ShardId, +}; use tari_transaction::TransactionId; use time::PrimitiveDateTime; @@ -59,6 +69,8 @@ pub struct Block { is_processed: bool, /// Flag that indicates that the block has been committed. is_committed: bool, + /// Counter for each foreign shard for reliable broadcast. + foreign_indexes: HashMap, /// Timestamp when was this stored. stored_at: Option, } @@ -72,6 +84,7 @@ impl Block { proposed_by: TAddr, commands: BTreeSet, total_leader_fee: u64, + foreign_indexes: HashMap, ) -> Self { let mut block = Self { id: BlockId::genesis(), @@ -87,6 +100,7 @@ impl Block { is_dummy: false, is_processed: false, is_committed: false, + foreign_indexes, stored_at: None, }; block.id = block.calculate_hash().into(); @@ -105,6 +119,7 @@ impl Block { is_dummy: bool, is_processed: bool, is_committed: bool, + foreign_indexes: HashMap, created_at: PrimitiveDateTime, ) -> Self { Self { @@ -121,6 +136,7 @@ impl Block { is_dummy, is_processed, is_committed, + foreign_indexes, stored_at: Some(created_at), } } @@ -134,6 +150,7 @@ impl Block { TAddr::zero(), Default::default(), 0, + HashMap::new(), ) } @@ -152,6 +169,7 @@ impl Block { is_dummy: false, is_processed: false, is_committed: true, + foreign_indexes: HashMap::new(), stored_at: None, } } @@ -163,7 +181,16 @@ impl Block { high_qc: QuorumCertificate, epoch: Epoch, ) -> Self { - let mut block = Self::new(parent, high_qc, node_height, epoch, proposed_by, Default::default(), 0); + let mut block = Self::new( + parent, + high_qc, + node_height, + epoch, + proposed_by, + Default::default(), + 0, + HashMap::new(), + ); block.is_dummy = true; block.is_processed = false; block @@ -178,6 +205,13 @@ impl Block { .chain(&self.proposed_by) .chain(&self.merkle_root) .chain(&self.commands) + .chain( + &self + .foreign_indexes + .iter() + .collect::>() + .sort(), + ) .result() } } @@ -281,6 +315,14 @@ impl Block { pub fn is_committed(&self) -> bool { self.is_committed } + + pub fn get_foreign_index(&self, bucket: &ShardBucket) -> Option<&u64> { + self.foreign_indexes.get(bucket) + } + + pub fn get_foreign_indexes(&self) -> &HashMap { + &self.foreign_indexes + } } impl Block { diff --git a/dan_layer/storage/src/consensus_models/foreign_receive_counters.rs b/dan_layer/storage/src/consensus_models/foreign_receive_counters.rs new file mode 100644 index 000000000..436d2c730 --- /dev/null +++ b/dan_layer/storage/src/consensus_models/foreign_receive_counters.rs @@ -0,0 +1,47 @@ +// Copyright 2023 The Tari Project +// SPDX-License-Identifier: BSD-3-Clause + +use std::collections::HashMap; + +use tari_dan_common_types::shard_bucket::ShardBucket; + +use crate::{StateStoreReadTransaction, StateStoreWriteTransaction, StorageError}; + +#[derive(Debug, Clone)] +pub struct ForeignReceiveCounters { + pub counters: HashMap, +} + +impl Default for ForeignReceiveCounters { + fn default() -> Self { + Self::new() + } +} + +impl ForeignReceiveCounters { + pub fn new() -> Self { + Self { + counters: HashMap::new(), + } + } + + pub fn increment(&mut self, bucket: &ShardBucket) { + *self.counters.entry(*bucket).or_default() += 1; + } + + // If we haven't received any messages from this shard yet, return 0 + pub fn get_index(&self, bucket: &ShardBucket) -> u64 { + self.counters.get(bucket).copied().unwrap_or_default() + } +} + +impl ForeignReceiveCounters { + pub fn save(&self, tx: &mut TTx) -> Result<(), StorageError> { + tx.foreign_receive_counters_set(self)?; + Ok(()) + } + + pub fn get(tx: &mut TTx) -> Result { + tx.foreign_receive_counters_get() + } +} diff --git a/dan_layer/storage/src/consensus_models/foreign_send_counters.rs b/dan_layer/storage/src/consensus_models/foreign_send_counters.rs new file mode 100644 index 000000000..9072029fc --- /dev/null +++ b/dan_layer/storage/src/consensus_models/foreign_send_counters.rs @@ -0,0 +1,50 @@ +// Copyright 2023 The Tari Project +// SPDX-License-Identifier: BSD-3-Clause + +use std::collections::HashMap; + +use tari_dan_common_types::shard_bucket::ShardBucket; + +use super::BlockId; +use crate::{StateStoreReadTransaction, StateStoreWriteTransaction, StorageError}; + +#[derive(Debug, Clone)] +pub struct ForeignSendCounters { + pub counters: HashMap, +} + +impl Default for ForeignSendCounters { + fn default() -> Self { + Self::new() + } +} + +impl ForeignSendCounters { + pub fn new() -> Self { + Self { + counters: HashMap::new(), + } + } + + pub fn increment_counter(&mut self, bucket: ShardBucket) -> u64 { + *self.counters.entry(bucket).and_modify(|v| *v += 1).or_insert(1) + } +} + +impl ForeignSendCounters { + pub fn set( + &self, + tx: &mut TTx, + block_id: &BlockId, + ) -> Result<(), StorageError> { + tx.foreign_send_counters_set(self, block_id)?; + Ok(()) + } + + pub fn get( + tx: &mut TTx, + block_id: &BlockId, + ) -> Result { + tx.foreign_send_counters_get(block_id) + } +} diff --git a/dan_layer/storage/src/consensus_models/mod.rs b/dan_layer/storage/src/consensus_models/mod.rs index 5cb8c097c..3a4d43d11 100644 --- a/dan_layer/storage/src/consensus_models/mod.rs +++ b/dan_layer/storage/src/consensus_models/mod.rs @@ -4,6 +4,8 @@ mod block; mod command; mod executed_transaction; +mod foreign_receive_counters; +mod foreign_send_counters; mod high_qc; mod last_executed; mod last_proposed; @@ -26,6 +28,8 @@ mod vote_signature; pub use block::*; pub use command::*; pub use executed_transaction::*; +pub use foreign_receive_counters::*; +pub use foreign_send_counters::*; pub use high_qc::*; pub use last_executed::*; pub use last_proposed::*; diff --git a/dan_layer/storage/src/state_store/mod.rs b/dan_layer/storage/src/state_store/mod.rs index 07b49bf45..d81037f92 100644 --- a/dan_layer/storage/src/state_store/mod.rs +++ b/dan_layer/storage/src/state_store/mod.rs @@ -18,6 +18,8 @@ use crate::{ BlockId, Decision, Evidence, + ForeignReceiveCounters, + ForeignSendCounters, HighQc, LastExecuted, LastProposed, @@ -90,6 +92,8 @@ pub trait StateStoreReadTransaction { fn locked_block_get(&mut self) -> Result; fn leaf_block_get(&mut self) -> Result; fn high_qc_get(&mut self) -> Result; + fn foreign_send_counters_get(&mut self, block_id: &BlockId) -> Result; + fn foreign_receive_counters_get(&mut self) -> Result; fn transactions_get(&mut self, tx_id: &TransactionId) -> Result; fn transactions_exists(&mut self, tx_id: &TransactionId) -> Result; @@ -251,6 +255,15 @@ pub trait StateStoreWriteTransaction { fn leaf_block_set(&mut self, leaf_node: &LeafBlock) -> Result<(), StorageError>; fn locked_block_set(&mut self, locked_block: &LockedBlock) -> Result<(), StorageError>; fn high_qc_set(&mut self, high_qc: &HighQc) -> Result<(), StorageError>; + fn foreign_send_counters_set( + &mut self, + foreign_send_counter: &ForeignSendCounters, + block_id: &BlockId, + ) -> Result<(), StorageError>; + fn foreign_receive_counters_set( + &mut self, + foreign_send_counter: &ForeignReceiveCounters, + ) -> Result<(), StorageError>; // -------------------------------- Transaction -------------------------------- // fn transactions_insert(&mut self, transaction: &Transaction) -> Result<(), StorageError>; diff --git a/dan_layer/validator_node_rpc/proto/consensus.proto b/dan_layer/validator_node_rpc/proto/consensus.proto index 5dfa78f51..96f3bb5e4 100644 --- a/dan_layer/validator_node_rpc/proto/consensus.proto +++ b/dan_layer/validator_node_rpc/proto/consensus.proto @@ -49,6 +49,7 @@ message Block { bytes merkle_root = 7; repeated Command commands = 8; uint64 total_leader_fee = 9; + bytes foreign_indexes = 10; } message Command { @@ -117,7 +118,6 @@ message TariDanPayload { tari.dan.transaction.Transaction transaction = 1; } - enum QuorumDecision { QUORUM_DECISION_UNKNOWN = 0; QUORUM_DECISION_ACCEPT = 1; @@ -204,4 +204,3 @@ message FullBlock { repeated QuorumCertificate qcs = 2; repeated tari.dan.transaction.Transaction transactions = 3; } - diff --git a/dan_layer/validator_node_rpc/src/conversions/consensus.rs b/dan_layer/validator_node_rpc/src/conversions/consensus.rs index fb3add433..b362fd8c0 100644 --- a/dan_layer/validator_node_rpc/src/conversions/consensus.rs +++ b/dan_layer/validator_node_rpc/src/conversions/consensus.rs @@ -256,6 +256,7 @@ impl From<&tari_dan_storage::consensus_models::Block TryFrom .map(TryInto::try_into) .collect::>()?, value.total_leader_fee, + decode_exact(&value.foreign_indexes)?, )) } }