Skip to content

Commit

Permalink
Merge branch 'development' into swarm-list-all-instances
Browse files Browse the repository at this point in the history
* development:
  feat(consensus): proposed block parked and ready event (#1055)
  fix(swarm): reuse allocated ports after restart (#1056)
  fix(indexer): reuse validator RPC sessions (#1057)
  fix(indexer): clippy fixes (#1052)
  • Loading branch information
sdbondi committed Jun 24, 2024
2 parents b156f44 + f61dcd4 commit aebd5e0
Show file tree
Hide file tree
Showing 17 changed files with 118 additions and 47 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 12 additions & 12 deletions applications/tari_indexer/src/event_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,18 @@ impl EventManager {
version: u64,
timestamp: u64,
) -> Result<(), anyhow::Error> {
let mut tx = self.substate_store.create_write_tx()?;
let new_event = NewEvent {
substate_id: Some(substate_id.to_string()),
template_address: template_address.to_string(),
tx_hash: tx_hash.to_string(),
topic,
payload: payload.to_json().expect("Failed to convert to JSON"),
version: version as i32,
timestamp: timestamp as i64,
};
tx.save_event(new_event)?;
tx.commit()?;
self.substate_store.with_write_tx(|tx| {
let new_event = NewEvent {
substate_id: Some(substate_id.to_string()),
template_address: template_address.to_string(),
tx_hash: tx_hash.to_string(),
topic,
payload: payload.to_json().expect("Failed to convert to JSON"),
version: version as i32,
timestamp: timestamp as i64,
};
tx.save_event(new_event)
})?;
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion applications/tari_indexer/src/event_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ impl EventScanner {
module_name,
timestamp: transaction.timestamp as i64,
};
info!(
debug!(
target: LOG_TARGET,
"Saving substate: {:?}",
substate_row
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ impl<'a> SqliteSubstateStoreReadTransaction<'a> {
}
}

// TODO: remove the allow dead_code attributes as these become used.
pub trait SubstateStoreReadTransaction {
fn list_substates(
&mut self,
Expand All @@ -186,13 +187,17 @@ pub trait SubstateStoreReadTransaction {
offset: Option<u64>,
) -> Result<Vec<ListSubstateItem>, StorageError>;
fn get_substate(&mut self, address: &SubstateId) -> Result<Option<Substate>, StorageError>;
#[allow(dead_code)]
fn get_latest_version_for_substate(&mut self, address: &SubstateId) -> Result<Option<i64>, StorageError>;
#[allow(dead_code)]
fn get_all_addresses(&mut self) -> Result<Vec<(String, i64)>, StorageError>;
#[allow(dead_code)]
fn get_all_substates(&mut self) -> Result<Vec<Substate>, StorageError>;
fn get_non_fungible_collections(&mut self) -> Result<Vec<(String, i64)>, StorageError>;
fn get_non_fungible_count(&mut self, resource_address: String) -> Result<i64, StorageError>;
#[allow(dead_code)]
fn get_non_fungible_latest_index(&mut self, resource_address: String) -> Result<Option<i32>, StorageError>;
#[allow(dead_code)]
fn get_non_fungibles(
&mut self,
resource_address: String,
Expand Down Expand Up @@ -629,12 +634,16 @@ impl<'a> SqliteSubstateStoreWriteTransaction<'a> {
}
}

// TODO: remove the allow dead_code attributes as these become used.
pub trait SubstateStoreWriteTransaction {
fn commit(self) -> Result<(), StorageError>;
fn rollback(self) -> Result<(), StorageError>;
fn set_substate(&mut self, new_substate: NewSubstate) -> Result<(), StorageError>;
#[allow(dead_code)]
fn delete_substate(&mut self, address: String) -> Result<(), StorageError>;
#[allow(dead_code)]
fn clear_substates(&mut self) -> Result<(), StorageError>;
#[allow(dead_code)]
fn add_non_fungible_index(&mut self, new_nft_index: NewNonFungibleIndex) -> Result<(), StorageError>;
fn save_event(&mut self, new_event: NewEvent) -> Result<(), StorageError>;
fn save_scanned_block_id(&mut self, new_scanned_block_id: NewScannedBlockId) -> Result<(), StorageError>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl<'a> ProcessContext<'a> {
}

pub async fn get_free_port(&mut self, name: &'static str) -> anyhow::Result<u16> {
Ok(self.port_allocator.next_port(name).await)
Ok(self.port_allocator.get_or_next_port(name).await)
}

pub fn local_ip(&self) -> &IpAddr {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::{
executables::{Executable, Executables},
port_allocator::PortAllocator,
processes::{MinoTariMinerProcess, MinoTariNodeProcess, MinoTariWalletProcess, ValidatorNodeProcess},
AllocatedPorts,
IndexerProcess,
Instance,
SignalingServerProcess,
Expand Down Expand Up @@ -98,7 +99,7 @@ impl InstanceManager {
extra_args: HashMap<String, String>,
) -> anyhow::Result<InstanceId> {
let instance_id = self.next_instance_id();
self.fork(instance_id, executable, instance_type, instance_name, extra_args)
self.fork(instance_id, executable, instance_type, instance_name, extra_args, None)
.await
}

Expand All @@ -110,6 +111,7 @@ impl InstanceManager {
instance_type: InstanceType,
instance_name: String,
extra_args: HashMap<String, String>,
ports: Option<AllocatedPorts>,
) -> anyhow::Result<InstanceId> {
let local_ip = IpAddr::V4(Ipv4Addr::from([127, 0, 0, 1]));
let definition = get_definition(instance_type);
Expand All @@ -121,7 +123,7 @@ impl InstanceManager {
executable.path.display()
);

let mut allocated_ports = self.port_allocator.create();
let mut allocated_ports = ports.unwrap_or_else(|| self.port_allocator.create());

let base_path = self
.base_path
Expand Down Expand Up @@ -279,9 +281,10 @@ impl InstanceManager {
let instance_type = instance.instance_type();
let instance_name = instance.name().to_string();
let extra_args = instance.extra_args().clone();
let ports = instance.allocated_ports().clone();

// This will just overwrite the previous instance
self.fork(id, executable, instance_type, instance_name, extra_args)
self.fork(id, executable, instance_type, instance_name, extra_args, Some(ports))
.await?;

Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ impl AllocatedPorts {
self.ports
}

pub async fn next_port(&mut self, name: &'static str) -> u16 {
pub async fn get_or_next_port(&mut self, name: &'static str) -> u16 {
if let Some(port) = self.ports.get(name) {
return *port;
}
loop {
let port = self.current_port;
self.current_port += 1;
Expand Down
2 changes: 2 additions & 0 deletions applications/tari_validator_node/src/dan_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ impl DanNode {
}

async fn handle_hotstuff_event(&self, event: HotstuffEvent) -> Result<(), anyhow::Error> {
info!(target: LOG_TARGET, "🔥 consensus event: {event}");

let HotstuffEvent::BlockCommitted { block_id, .. } = event else {
return Ok(());
};
Expand Down
18 changes: 13 additions & 5 deletions dan_layer/consensus/src/hotstuff/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,22 @@
// SPDX-License-Identifier: BSD-3-Clause

use tari_dan_common_types::NodeHeight;
use tari_dan_storage::consensus_models::BlockId;
use tari_dan_storage::consensus_models::{BlockId, LeafBlock};

#[derive(Debug, Clone)]
#[derive(Debug, Clone, thiserror::Error)]
pub enum HotstuffEvent {
/// A block has been committed
#[error("Block {block_id} has been committed at height {height}")]
BlockCommitted { block_id: BlockId, height: NodeHeight },
/// A critical failure occurred in consensus
#[error("Consensus failure: {message}")]
Failure { message: String },
/// A leader has timed out
#[error("Leader timeout: new height {new_height}")]
LeaderTimeout { new_height: NodeHeight },
#[error("Block {block} has been parked ({num_missing_txs} missing, {num_awaiting_txs} awaiting execution)")]
ProposedBlockParked {
block: LeafBlock,
num_missing_txs: usize,
num_awaiting_txs: usize,
},
#[error("Parked block {block} is ready")]
ParkedBlockReady { block: LeafBlock },
}
20 changes: 18 additions & 2 deletions dan_layer/consensus/src/hotstuff/on_inbound_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ use tari_dan_storage::{
};
use tari_epoch_manager::EpochManagerReader;
use tari_transaction::TransactionId;
use tokio::{sync::mpsc, time};
use tokio::{
sync::{broadcast, mpsc},
time,
};

use super::config::HotstuffConfig;
use crate::{
Expand All @@ -32,7 +35,7 @@ use crate::{
check_quorum_certificate,
check_signature,
},
hotstuff::error::HotStuffError,
hotstuff::{error::HotStuffError, HotstuffEvent},
messages::{HotstuffMessage, ProposalMessage, RequestMissingTransactionsMessage},
traits::{ConsensusSpec, OutboundMessaging},
};
Expand All @@ -52,6 +55,7 @@ pub struct OnInboundMessage<TConsensusSpec: ConsensusSpec> {
tx_msg_ready: mpsc::UnboundedSender<(TConsensusSpec::Addr, HotstuffMessage)>,
message_buffer: MessageBuffer<TConsensusSpec::Addr>,
transaction_pool: TransactionPool<TConsensusSpec::StateStore>,
tx_events: broadcast::Sender<HotstuffEvent>,
}

impl<TConsensusSpec> OnInboundMessage<TConsensusSpec>
Expand All @@ -66,6 +70,7 @@ where TConsensusSpec: ConsensusSpec
vote_signing_service: TConsensusSpec::SignatureService,
outbound_messaging: TConsensusSpec::OutboundMessaging,
transaction_pool: TransactionPool<TConsensusSpec::StateStore>,
tx_events: broadcast::Sender<HotstuffEvent>,
) -> Self {
let (tx_msg_ready, rx_msg_ready) = mpsc::unbounded_channel();
Self {
Expand All @@ -79,6 +84,7 @@ where TConsensusSpec: ConsensusSpec
tx_msg_ready,
message_buffer: MessageBuffer::new(rx_msg_ready),
transaction_pool,
tx_events,
}
}

Expand Down Expand Up @@ -223,6 +229,10 @@ where TConsensusSpec: ConsensusSpec
.get_validator_node_by_public_key(unparked_block.epoch(), unparked_block.proposed_by())
.await?;

let _ignore = self.tx_events.send(HotstuffEvent::ParkedBlockReady {
block: unparked_block.as_leaf_block(),
});

self.report_message_ready(
vn.address,
HotstuffMessage::Proposal(ProposalMessage { block: unparked_block }),
Expand Down Expand Up @@ -250,6 +260,12 @@ where TConsensusSpec: ConsensusSpec
"🔥 Block {} has {} missing transactions and {} awaiting execution", block, missing_tx_ids.len(), awaiting_execution.len(),
);

let _ignore = self.tx_events.send(HotstuffEvent::ProposedBlockParked {
block: block.as_leaf_block(),
num_missing_txs: missing_tx_ids.len(),
num_awaiting_txs: awaiting_execution.len(),
});

if !missing_tx_ids.is_empty() {
let block_id = *block.id();
let epoch = block.epoch();
Expand Down
2 changes: 1 addition & 1 deletion dan_layer/consensus/src/hotstuff/on_propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ where TConsensusSpec: ConsensusSpec
if let Some(next_block) = self.store.with_read_tx(|tx| last_proposed.get_block(tx)).optional()? {
info!(
target: LOG_TARGET,
"🌿 RE-BROADCASTING locally block {}({}) to {} validators. {} command(s), justify: {} ({}), parent: {}",
"🌿 RE-BROADCASTING local block {}({}) to {} validators. {} command(s), justify: {} ({}), parent: {}",
next_block.id(),
next_block.height(),
local_committee.len(),
Expand Down
1 change: 1 addition & 0 deletions dan_layer/consensus/src/hotstuff/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ impl<TConsensusSpec: ConsensusSpec> HotstuffWorker<TConsensusSpec> {
signing_service.clone(),
outbound_messaging.clone(),
transaction_pool.clone(),
tx_events.clone(),
),

on_next_sync_view: OnNextSyncViewHandler::new(
Expand Down
4 changes: 2 additions & 2 deletions dan_layer/consensus_tests/src/support/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ impl Test {
match event {
HotstuffEvent::BlockCommitted { block_id, height } => return (address, block_id, height),
HotstuffEvent::Failure { message } => panic!("[{}] Consensus failure: {}", address, message),
HotstuffEvent::LeaderTimeout { new_height } => {
log::info!("[{address}] Leader timeout. New height {new_height}");
other => {
log::info!("[{}] Ignoring event: {:?}", address, other);
continue;
},
}
Expand Down
1 change: 1 addition & 0 deletions dan_layer/validator_node_rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ async-trait = { workspace = true }
prost = { workspace = true }
serde = { workspace = true, default-features = true }
thiserror = { workspace = true }
tokio = { workspace = true, default-features = false, features = ["sync"] }

[build-dependencies]
proto_builder = { workspace = true }
Expand Down
Loading

0 comments on commit aebd5e0

Please sign in to comment.