From 7a0ac0c0461697bcc81bfb1d2c54464e8111ab2a Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Tue, 27 Feb 2024 16:32:44 +0200 Subject: [PATCH 1/7] consensus: Broadcast any consensus FutureEvent --- consensus/src/execution_ctx.rs | 4 ++++ consensus/src/quorum/task.rs | 2 ++ 2 files changed, 6 insertions(+) diff --git a/consensus/src/execution_ctx.rs b/consensus/src/execution_ctx.rs index 3289b0df5a..30c0469cdd 100644 --- a/consensus/src/execution_ctx.rs +++ b/consensus/src/execution_ctx.rs @@ -299,6 +299,10 @@ impl<'a, DB: Database, T: Operations + 'static> ExecutionCtx<'a, DB, T> { Err(ConsensusError::FutureEvent) => { trace!("future msg {:?}", msg); + self.outbound.send(msg.clone()).await.unwrap_or_else(|err| { + error!("unable to re-publish a handled msg {:?}", err) + }); + self.future_msgs.lock().await.put_event( msg.header.round, msg.get_step(), diff --git a/consensus/src/quorum/task.rs b/consensus/src/quorum/task.rs index f217de3edb..117108faa7 100644 --- a/consensus/src/quorum/task.rs +++ b/consensus/src/quorum/task.rs @@ -121,6 +121,8 @@ impl<'p, D: Database> Executor<'p, D> { 0, msg.clone(), ); + + self.publish(msg.clone()).await; } Status::Present => { if let Some(block) = self.collect_inbound_msg(msg).await From c6c19930ab8c5831adeb96c0f125e6db05280d81 Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Tue, 27 Feb 2024 16:33:50 +0200 Subject: [PATCH 2/7] node: Broadcast any future block --- node/src/chain/fsm.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/node/src/chain/fsm.rs b/node/src/chain/fsm.rs index bce980f26c..94fac09dd4 100644 --- a/node/src/chain/fsm.rs +++ b/node/src/chain/fsm.rs @@ -133,6 +133,21 @@ impl SimpleFSM { blk: &Block, msg: &Message, ) -> anyhow::Result<()> { + if blk.header().height + > self.acc.read().await.get_curr_height().await + 1 + { + // Rebroadcast a block from future + let _ = + self.network + .read() + .await + .broadcast(msg) + .await + .map_err(|err| { + warn!("Unable to broadcast accepted block: {err}") + }); + } + // Filter out blocks that have already been marked as // blacklisted upon successful fallback execution. if self From 72c1abaee73a7d0f949fccaea5f936cff0bf47c8 Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Wed, 28 Feb 2024 13:00:36 +0200 Subject: [PATCH 3/7] node: Broadcast any consensus msg if consensus task is not running --- node/src/chain/acceptor.rs | 30 ++++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/node/src/chain/acceptor.rs b/node/src/chain/acceptor.rs index 801497af29..1cdd297c07 100644 --- a/node/src/chain/acceptor.rs +++ b/node/src/chain/acceptor.rs @@ -187,10 +187,36 @@ impl Acceptor { Payload::Candidate(_) | Payload::Validation(_) | Payload::Ratification(_) => { - self.task.read().await.main_inbound.send(msg).await?; + let task = self.task.read().await; + if !task.is_running() { + let _ = self + .network + .read() + .await + .broadcast(&msg) + .await + .map_err(|err| { + warn!("Unable to broadcast accepted block: {err}") + }); + } + + task.main_inbound.send(msg).await?; } Payload::Quorum(_) => { - self.task.read().await.quorum_inbound.send(msg).await?; + let task = self.task.read().await; + if !task.is_running() { + let _ = self + .network + .read() + .await + .broadcast(&msg) + .await + .map_err(|err| { + warn!("Unable to broadcast accepted block: {err}") + }); + } + + task.quorum_inbound.send(msg).await?; } _ => warn!("invalid inbound message"), } From f606519df9ef81a391873b84ba6298a176022503 Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Wed, 28 Feb 2024 13:02:15 +0200 Subject: [PATCH 4/7] node-data: Add methods for bounded and unbounded AsyncQueue --- node-data/src/message.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/node-data/src/message.rs b/node-data/src/message.rs index e7cd145b37..aa1f6370c1 100644 --- a/node-data/src/message.rs +++ b/node-data/src/message.rs @@ -972,8 +972,13 @@ pub struct AsyncQueue { sender: async_channel::Sender, } -impl Default for AsyncQueue { - fn default() -> Self { +impl AsyncQueue { + pub fn bounded(cap: usize) -> Self { + let (sender, receiver) = async_channel::bounded(cap); + Self { receiver, sender } + } + + pub fn unbounded() -> Self { let (sender, receiver) = async_channel::unbounded(); Self { receiver, sender } } From 4942aca004eec27f7d55c3fa2acb3e1d97ebe0d4 Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Wed, 28 Feb 2024 13:23:34 +0200 Subject: [PATCH 5/7] node: Use bounded AsyncQueue instead of unbounded --- node/src/chain.rs | 4 ++-- node/src/chain/consensus.rs | 14 ++++++++++---- node/src/databroker.rs | 2 +- node/src/mempool.rs | 10 +++++++++- node/src/network.rs | 2 +- 5 files changed, 23 insertions(+), 9 deletions(-) diff --git a/node/src/chain.rs b/node/src/chain.rs index 086db131a4..55ffe21a42 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -177,7 +177,7 @@ impl | Payload::Ratification(_) | Payload::Quorum(_) => { if let Err(e) = acc.read().await.reroute_msg(msg).await { - warn!("Unable to reroute_msg to the acceptor: {e}"); + warn!("msg discarded: {e}"); } } _ => warn!("invalid inbound message"), @@ -216,7 +216,7 @@ impl impl ChainSrv { pub fn new(keys_path: String) -> Self { Self { - inbound: Default::default(), + inbound: AsyncQueue::bounded(consensus::QUEUE_LIMIT), keys_path, acceptor: None, } diff --git a/node/src/chain/consensus.rs b/node/src/chain/consensus.rs index 5952c2962e..3b296a1fe5 100644 --- a/node/src/chain/consensus.rs +++ b/node/src/chain/consensus.rs @@ -30,6 +30,8 @@ use node_data::{ledger, Serializable, StepName}; use std::sync::Arc; use std::time::Duration; +pub(crate) const QUEUE_LIMIT: usize = 10_000; + /// Consensus Service Task is responsible for running the consensus layer. /// /// It manages consensus lifecycle and provides a way to interact with it. @@ -66,10 +68,10 @@ impl Task { ); Self { - quorum_inbound: AsyncQueue::default(), - main_inbound: AsyncQueue::default(), - outbound: AsyncQueue::default(), - result: AsyncQueue::default(), + quorum_inbound: AsyncQueue::bounded(QUEUE_LIMIT), + main_inbound: AsyncQueue::bounded(QUEUE_LIMIT), + outbound: AsyncQueue::bounded(QUEUE_LIMIT), + result: AsyncQueue::bounded(QUEUE_LIMIT), running_task: None, task_id: 0, keys, @@ -157,6 +159,10 @@ impl Task { }; } } + + pub(crate) fn is_running(&self) -> bool { + self.running_task.is_some() + } } #[derive(Debug, Default)] diff --git a/node/src/databroker.rs b/node/src/databroker.rs index 6706523a5e..c4b883e96d 100644 --- a/node/src/databroker.rs +++ b/node/src/databroker.rs @@ -83,7 +83,7 @@ impl DataBrokerSrv { let permits = conf.max_ongoing_requests; Self { conf, - requests: AsyncQueue::default(), + requests: AsyncQueue::bounded(permits), limit_ongoing_requests: Arc::new(Semaphore::new(permits)), } } diff --git a/node/src/mempool.rs b/node/src/mempool.rs index 0f85e466dd..08e39d15c3 100644 --- a/node/src/mempool.rs +++ b/node/src/mempool.rs @@ -15,6 +15,7 @@ use tokio::sync::RwLock; use tracing::{error, warn}; const TOPICS: &[u8] = &[Topics::Tx as u8]; +const MAX_PENDING_TX: usize = 10000; #[derive(Debug, Error)] enum TxAcceptanceError { @@ -36,11 +37,18 @@ impl From for TxAcceptanceError { } } -#[derive(Default)] pub struct MempoolSrv { inbound: AsyncQueue, } +impl Default for MempoolSrv { + fn default() -> Self { + Self { + inbound: AsyncQueue::bounded(MAX_PENDING_TX), + } + } +} + pub struct TxFilter {} impl crate::Filter for TxFilter { fn filter(&mut self, _msg: &Message) -> anyhow::Result<()> { diff --git a/node/src/network.rs b/node/src/network.rs index 222752393a..fb73508f7e 100644 --- a/node/src/network.rs +++ b/node/src/network.rs @@ -235,7 +235,7 @@ impl crate::Network for Kadcast { self.remove_route(response_msg_topic.into()).await; let res = { - let queue = AsyncQueue::default(); + let queue = AsyncQueue::bounded(1000); // register a temporary route that will be unregister on drop self.add_route(response_msg_topic.into(), queue.clone()) .await?; From f2ba3ce9eeae1507e4983aa516101232b1945e7e Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Wed, 28 Feb 2024 14:08:30 +0200 Subject: [PATCH 6/7] node: Fix error message in broadcast call --- node/src/chain/acceptor.rs | 30 +++++++++--------------------- 1 file changed, 9 insertions(+), 21 deletions(-) diff --git a/node/src/chain/acceptor.rs b/node/src/chain/acceptor.rs index 1cdd297c07..be8c9b722b 100644 --- a/node/src/chain/acceptor.rs +++ b/node/src/chain/acceptor.rs @@ -189,15 +189,7 @@ impl Acceptor { | Payload::Ratification(_) => { let task = self.task.read().await; if !task.is_running() { - let _ = self - .network - .read() - .await - .broadcast(&msg) - .await - .map_err(|err| { - warn!("Unable to broadcast accepted block: {err}") - }); + broadcast(&self.network, &msg).await; } task.main_inbound.send(msg).await?; @@ -205,15 +197,7 @@ impl Acceptor { Payload::Quorum(_) => { let task = self.task.read().await; if !task.is_running() { - let _ = self - .network - .read() - .await - .broadcast(&msg) - .await - .map_err(|err| { - warn!("Unable to broadcast accepted block: {err}") - }); + broadcast(&self.network, &msg).await; } task.quorum_inbound.send(msg).await?; @@ -481,9 +465,7 @@ impl Acceptor { // (Re)broadcast a fully valid block before any call to // get_provisioners to speed up its propagation if let Some(msg) = msg { - let _ = self.network.read().await.broadcast(msg).await.map_err( - |err| warn!("Unable to broadcast accepted block: {err}"), - ); + broadcast(&self.network, msg).await; } self.log_missing_iterations( @@ -788,6 +770,12 @@ impl Acceptor { } } +async fn broadcast(network: &Arc>, msg: &Message) { + let _ = network.read().await.broadcast(msg).await.map_err(|err| { + warn!("Unable to broadcast msg: {:?} {err} ", msg.topic()) + }); +} + /// Performs full verification of block header against prev_block header where /// prev_block is usually the blockchain tip /// From 844347f7b0f3a4681c618c39c36bf2564553d97a Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Wed, 28 Feb 2024 15:37:01 +0200 Subject: [PATCH 7/7] node: Use const for the temporary route --- node/src/network.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/node/src/network.rs b/node/src/network.rs index fb73508f7e..9245454852 100644 --- a/node/src/network.rs +++ b/node/src/network.rs @@ -21,6 +21,8 @@ use tracing::{error, info, trace}; mod frame; +const MAX_QUEUE_SIZE: usize = 1000; + type RoutesList = [Option>; N]; type FilterList = [Option; N]; @@ -235,7 +237,7 @@ impl crate::Network for Kadcast { self.remove_route(response_msg_topic.into()).await; let res = { - let queue = AsyncQueue::bounded(1000); + let queue = AsyncQueue::bounded(MAX_QUEUE_SIZE); // register a temporary route that will be unregister on drop self.add_route(response_msg_topic.into(), queue.clone()) .await?;