Skip to content

Commit

Permalink
Merge pull request #1465 from dusk-network/fix-1456
Browse files Browse the repository at this point in the history
Any message that cannot be validated should be repropagated
  • Loading branch information
goshawk-3 authored Feb 29, 2024
2 parents 6c863ea + 5a254d0 commit 8e3f87c
Show file tree
Hide file tree
Showing 10 changed files with 72 additions and 16 deletions.
4 changes: 4 additions & 0 deletions consensus/src/execution_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,10 @@ impl<'a, DB: Database, T: Operations + 'static> ExecutionCtx<'a, DB, T> {
Err(ConsensusError::FutureEvent) => {
trace!("future msg {:?}", msg);

self.outbound.send(msg.clone()).await.unwrap_or_else(|err| {
error!("unable to re-publish a handled msg {:?}", err)
});

self.future_msgs.lock().await.put_event(
msg.header.round,
msg.get_step(),
Expand Down
2 changes: 2 additions & 0 deletions consensus/src/quorum/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ impl<'p, D: Database> Executor<'p, D> {
0,
msg.clone(),
);

self.publish(msg.clone()).await;
}
Status::Present => {
if let Some(block) = self.collect_inbound_msg(msg).await
Expand Down
9 changes: 7 additions & 2 deletions node-data/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -972,8 +972,13 @@ pub struct AsyncQueue<M: Clone> {
sender: async_channel::Sender<M>,
}

impl<M: Clone> Default for AsyncQueue<M> {
fn default() -> Self {
impl<M: Clone> AsyncQueue<M> {
pub fn bounded(cap: usize) -> Self {
let (sender, receiver) = async_channel::bounded(cap);
Self { receiver, sender }
}

pub fn unbounded() -> Self {
let (sender, receiver) = async_channel::unbounded();
Self { receiver, sender }
}
Expand Down
4 changes: 2 additions & 2 deletions node/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>
| Payload::Ratification(_)
| Payload::Quorum(_) => {
if let Err(e) = acc.read().await.reroute_msg(msg).await {
warn!("Unable to reroute_msg to the acceptor: {e}");
warn!("msg discarded: {e}");
}
}
_ => warn!("invalid inbound message"),
Expand Down Expand Up @@ -216,7 +216,7 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>
impl<N: Network, DB: database::DB, VM: vm::VMExecution> ChainSrv<N, DB, VM> {
pub fn new(keys_path: String) -> Self {
Self {
inbound: Default::default(),
inbound: AsyncQueue::bounded(consensus::QUEUE_LIMIT),
keys_path,
acceptor: None,
}
Expand Down
24 changes: 19 additions & 5 deletions node/src/chain/acceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,20 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
Payload::Candidate(_)
| Payload::Validation(_)
| Payload::Ratification(_) => {
self.task.read().await.main_inbound.send(msg).await?;
let task = self.task.read().await;
if !task.is_running() {
broadcast(&self.network, &msg).await;
}

task.main_inbound.send(msg).await?;
}
Payload::Quorum(_) => {
self.task.read().await.quorum_inbound.send(msg).await?;
let task = self.task.read().await;
if !task.is_running() {
broadcast(&self.network, &msg).await;
}

task.quorum_inbound.send(msg).await?;
}
_ => warn!("invalid inbound message"),
}
Expand Down Expand Up @@ -455,9 +465,7 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
// (Re)broadcast a fully valid block before any call to
// get_provisioners to speed up its propagation
if let Some(msg) = msg {
let _ = self.network.read().await.broadcast(msg).await.map_err(
|err| warn!("Unable to broadcast accepted block: {err}"),
);
broadcast(&self.network, msg).await;
}

self.log_missing_iterations(
Expand Down Expand Up @@ -762,6 +770,12 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
}
}

async fn broadcast<N: Network>(network: &Arc<RwLock<N>>, msg: &Message) {
let _ = network.read().await.broadcast(msg).await.map_err(|err| {
warn!("Unable to broadcast msg: {:?} {err} ", msg.topic())
});
}

/// Performs full verification of block header against prev_block header where
/// prev_block is usually the blockchain tip
///
Expand Down
14 changes: 10 additions & 4 deletions node/src/chain/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ use node_data::{ledger, Serializable, StepName};
use std::sync::Arc;
use std::time::Duration;

pub(crate) const QUEUE_LIMIT: usize = 10_000;

/// Consensus Service Task is responsible for running the consensus layer.
///
/// It manages consensus lifecycle and provides a way to interact with it.
Expand Down Expand Up @@ -66,10 +68,10 @@ impl Task {
);

Ok(Self {
quorum_inbound: AsyncQueue::default(),
main_inbound: AsyncQueue::default(),
outbound: AsyncQueue::default(),
result: AsyncQueue::default(),
quorum_inbound: AsyncQueue::bounded(QUEUE_LIMIT),
main_inbound: AsyncQueue::bounded(QUEUE_LIMIT),
outbound: AsyncQueue::bounded(QUEUE_LIMIT),
result: AsyncQueue::bounded(QUEUE_LIMIT),
running_task: None,
task_id: 0,
keys,
Expand Down Expand Up @@ -157,6 +159,10 @@ impl Task {
};
}
}

pub(crate) fn is_running(&self) -> bool {
self.running_task.is_some()
}
}

#[derive(Debug, Default)]
Expand Down
15 changes: 15 additions & 0 deletions node/src/chain/fsm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,21 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution> SimpleFSM<N, DB, VM> {
blk: &Block,
msg: &Message,
) -> anyhow::Result<()> {
if blk.header().height
> self.acc.read().await.get_curr_height().await + 1
{
// Rebroadcast a block from future
let _ =
self.network
.read()
.await
.broadcast(msg)
.await
.map_err(|err| {
warn!("Unable to broadcast accepted block: {err}")
});
}

// Filter out blocks that have already been marked as
// blacklisted upon successful fallback execution.
if self
Expand Down
2 changes: 1 addition & 1 deletion node/src/databroker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl DataBrokerSrv {
let permits = conf.max_ongoing_requests;
Self {
conf,
requests: AsyncQueue::default(),
requests: AsyncQueue::bounded(permits),
limit_ongoing_requests: Arc::new(Semaphore::new(permits)),
}
}
Expand Down
10 changes: 9 additions & 1 deletion node/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use tokio::sync::RwLock;
use tracing::{error, warn};

const TOPICS: &[u8] = &[Topics::Tx as u8];
const MAX_PENDING_TX: usize = 10000;

#[derive(Debug, Error)]
enum TxAcceptanceError {
Expand All @@ -36,11 +37,18 @@ impl From<anyhow::Error> for TxAcceptanceError {
}
}

#[derive(Default)]
pub struct MempoolSrv {
inbound: AsyncQueue<Message>,
}

impl Default for MempoolSrv {
fn default() -> Self {
Self {
inbound: AsyncQueue::bounded(MAX_PENDING_TX),
}
}
}

pub struct TxFilter {}
impl crate::Filter for TxFilter {
fn filter(&mut self, _msg: &Message) -> anyhow::Result<()> {
Expand Down
4 changes: 3 additions & 1 deletion node/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use tracing::{error, info, trace};

mod frame;

const MAX_QUEUE_SIZE: usize = 1000;

type RoutesList<const N: usize> = [Option<AsyncQueue<Message>>; N];
type FilterList<const N: usize> = [Option<BoxedFilter>; N];

Expand Down Expand Up @@ -244,7 +246,7 @@ impl<const N: usize> crate::Network for Kadcast<N> {
self.remove_route(response_msg_topic.into()).await;

let res = {
let queue = AsyncQueue::default();
let queue = AsyncQueue::bounded(MAX_QUEUE_SIZE);
// register a temporary route that will be unregister on drop
self.add_route(response_msg_topic.into(), queue.clone())
.await?;
Expand Down

0 comments on commit 8e3f87c

Please sign in to comment.