diff --git a/node/src/chain.rs b/node/src/chain.rs index 1a99ef6ddf..3c2e946ffc 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -89,7 +89,7 @@ impl // NB. After restart, state_root returned by VM is always the last // finalized one. - let mut state_root = vm.read().await.get_state_root()?; + let state_root = vm.read().await.get_state_root()?; info!( event = "VM state loaded", @@ -168,7 +168,9 @@ impl Payload::NewBlock(_) | Payload::Reduction(_) | Payload::Agreement(_) => { - acc.read().await.reroute_msg(msg).await; + if let Err(e) = acc.read().await.reroute_msg(msg).await { + warn!("Unable to reroute_msg to the acceptor: {e}"); + } } _ => warn!("invalid inbound message"), } @@ -176,7 +178,9 @@ impl // Re-routes messages originated from Consensus (upper) layer to the network layer. recv = &mut outbound_chan.recv() => { let msg = recv?; - network.read().await.broadcast(&msg).await; + if let Err(e) = network.read().await.broadcast(&msg).await { + warn!("Unable to re-route message {e}"); + } }, // Handles accept_block_timeout event _ = sleep_until(timeout) => { @@ -220,8 +224,8 @@ impl ChainSrv { // either malformed or empty. let genesis_blk = genesis::generate_state(); - /// Persist genesis block - t.store_block(genesis_blk.header(), &[]); + // Persist genesis block + t.store_block(genesis_blk.header(), &[])?; genesis_blk } }; @@ -246,7 +250,7 @@ impl ChainSrv { } Ok(()) - }); + })?; tracing::info!( event = "Ledger block loaded", diff --git a/node/src/chain/acceptor.rs b/node/src/chain/acceptor.rs index 89a232591d..8d408c7b0d 100644 --- a/node/src/chain/acceptor.rs +++ b/node/src/chain/acceptor.rs @@ -25,6 +25,7 @@ use dusk_consensus::config::{self, SELECTION_COMMITTEE_SIZE}; use super::consensus::Task; +#[allow(dead_code)] pub(crate) enum RevertTarget { LastFinalizedState = 0, LastEpoch = 1, @@ -69,7 +70,7 @@ impl Acceptor { network: Arc>, vm: Arc>, ) -> Self { - let mut acc = Self { + let acc = Self { mrb: RwLock::new(mrb.clone()), provisioners_list: RwLock::new(provisioners_list.clone()), db: db.clone(), @@ -90,16 +91,20 @@ impl Acceptor { acc } // Re-route message to consensus task - pub(crate) async fn reroute_msg(&self, msg: Message) { + pub(crate) async fn reroute_msg( + &self, + msg: Message, + ) -> Result<(), async_channel::SendError> { match &msg.payload { Payload::NewBlock(_) | Payload::Reduction(_) => { - self.task.read().await.main_inbound.send(msg).await; + self.task.read().await.main_inbound.send(msg).await?; } Payload::Agreement(_) => { - self.task.read().await.agreement_inbound.send(msg).await; + self.task.read().await.agreement_inbound.send(msg).await?; } _ => warn!("invalid inbound message"), } + Ok(()) } pub fn needs_update(blk: &Block, txs: &[SpentTransaction]) -> bool { @@ -127,7 +132,6 @@ impl Acceptor { blk: &Block, ) -> anyhow::Result<()> { let mut task = self.task.write().await; - let (_, public_key) = task.keys.clone(); let mut mrb = self.mrb.write().await; let mut provisioners_list = self.provisioners_list.write().await; @@ -206,7 +210,7 @@ impl Acceptor { } pub(crate) async fn try_accept_block( - &self, + &mut self, blk: &Block, enable_consensus: bool, ) -> anyhow::Result<()> { @@ -280,7 +284,7 @@ impl Acceptor { // Delete from mempool any transaction already included in the block self.db.read().await.update(|update| { for tx in blk.txs().iter() { - database::Mempool::delete_tx(update, tx.hash()); + database::Mempool::delete_tx(update, tx.hash())?; } Ok(()) })?; @@ -322,7 +326,6 @@ impl Acceptor { /// This incorporates both VM state revert and Ledger state revert. pub async fn try_revert(&self, target: RevertTarget) -> Result<()> { let curr_height = self.get_curr_height().await; - let curr_iteration = self.get_curr_iteration().await; let target_state_hash = match target { RevertTarget::LastFinalizedState => { @@ -336,7 +339,7 @@ impl Acceptor { anyhow::Ok(state_hash) } - RevertTarget::LastEpoch => panic!("not implemented"), + _ => unimplemented!(), }?; // Delete any block until we reach the target_state_hash, the @@ -369,9 +372,9 @@ impl Acceptor { // Attempt to resubmit transactions back to mempool. // An error here is not considered critical. for tx in blk.txs().iter() { - Mempool::add_tx(t, tx).map_err(|err| { - error!("failed to resubmit transactions") - }); + if let Err(e) = Mempool::add_tx(t, tx) { + warn!("failed to resubmit transactions: {e}") + }; } height -= 1; @@ -407,10 +410,6 @@ impl Acceptor { self.last_finalized.read().await.clone() } - pub(crate) async fn get_curr_timestamp(&self) -> i64 { - self.mrb.read().await.header().timestamp - } - pub(crate) async fn get_curr_iteration(&self) -> u8 { self.mrb.read().await.header().iteration } diff --git a/node/src/chain/consensus.rs b/node/src/chain/consensus.rs index 1b2b041738..6ae78feead 100644 --- a/node/src/chain/consensus.rs +++ b/node/src/chain/consensus.rs @@ -106,15 +106,16 @@ impl Task { ); let id = self.task_id; - let mut result_queue = self.result.clone(); + let result_queue = self.result.clone(); let provisioners = provisioners.clone(); let (cancel_tx, cancel_rx) = oneshot::channel::(); self.running_task = Some(( tokio::spawn(async move { - result_queue - .send(c.spin(ru, provisioners, cancel_rx).await) - .await; + let cons_result = c.spin(ru, provisioners, cancel_rx).await; + if let Err(e) = result_queue.send(cons_result).await { + error!("Unable to send consensus result to queue {e}") + } trace!("terminate consensus task: {}", id); id @@ -126,14 +127,20 @@ impl Task { /// Aborts the running consensus task and waits for its termination. pub(crate) async fn abort_with_wait(&mut self) { if let Some((handle, cancel_chan)) = self.running_task.take() { - cancel_chan.send(0); - handle.await; + if cancel_chan.send(0).is_err() { + warn!("Unable to send cancel for abort_with_wait") + } + if let Err(e) = handle.await { + warn!("Unable to wait for abort {e}") + } } } pub(crate) fn abort(&mut self) { - if let Some((handle, cancel_chan)) = self.running_task.take() { - cancel_chan.send(0); + if let Some((_, cancel_chan)) = self.running_task.take() { + if cancel_chan.send(0).is_err() { + warn!("Unable to send cancel for abort") + }; } } } @@ -159,8 +166,15 @@ impl dusk_consensus::commons::Database fn store_candidate_block(&mut self, b: Block) { tracing::trace!("store candidate block: {:?}", b); - if let Ok(db) = self.db.try_read() { - db.update(|t| t.store_candidate_block(b)); + match self.db.try_read() { + Ok(db) => { + if let Err(e) = db.update(|t| t.store_candidate_block(b)) { + warn!("Unable to store candidate block: {e}"); + }; + } + Err(e) => { + warn!("Cannot acquire lock to store candidate block: {e}"); + } } } @@ -220,8 +234,15 @@ impl dusk_consensus::commons::Database } fn delete_candidate_blocks(&mut self) { - if let Ok(db) = self.db.try_read() { - db.update(|t| t.clear_candidates()); + match self.db.try_read() { + Ok(db) => { + if let Err(e) = db.update(|t| t.clear_candidates()) { + warn!("Unable to cleare candidates: {e}"); + }; + } + Err(e) => { + warn!("Cannot acquire lock to clear_candidate: {e}"); + } } } } diff --git a/node/src/chain/fsm.rs b/node/src/chain/fsm.rs index 29e8bbae89..cd667d3912 100644 --- a/node/src/chain/fsm.rs +++ b/node/src/chain/fsm.rs @@ -59,7 +59,7 @@ impl SimpleFSM { } } - pub async fn on_idle(&mut self, timeout: Duration) -> anyhow::Result<()> { + pub async fn on_idle(&mut self, timeout: Duration) { let acc = self.acc.read().await; let height = acc.get_curr_height().await; let iter = acc.get_curr_iteration().await; @@ -77,18 +77,18 @@ impl SimpleFSM { self.blacklisted_blocks.write().await.clear(); // Request missing blocks since my last finalized block - self.network - .write() + let get_blocks = Message::new_get_blocks(GetBlocks { + locator: last_finalized.header().hash, + }); + if let Err(e) = self + .network + .read() .await - .send_to_alive_peers( - &Message::new_get_blocks(GetBlocks { - locator: last_finalized.header().hash, - }), - REDUNDANCY_PEER_FACTOR, - ) - .await; - - Ok(()) + .send_to_alive_peers(&get_blocks, REDUNDANCY_PEER_FACTOR) + .await + { + warn!("Unable to request GetBlocks {e}"); + } } pub async fn on_event( @@ -117,8 +117,8 @@ impl SimpleFSM { match &mut self.curr { State::InSync(ref mut curr) => { if curr.on_event(blk, msg).await? { - /// Transition from InSync to OutOfSync state - curr.on_exiting(); + // Transition from InSync to OutOfSync state + curr.on_exiting().await; // Enter new state let mut next = OutOfSyncImpl::new( @@ -131,8 +131,8 @@ impl SimpleFSM { } State::OutOfSync(ref mut curr) => { if curr.on_event(blk, msg).await? { - /// Transition from OutOfSync to InSync state - curr.on_exiting(); + // Transition from OutOfSync to InSync state + curr.on_exiting().await; // Enter new state let mut next = InSyncImpl::new( @@ -140,7 +140,10 @@ impl SimpleFSM { self.network.clone(), self.blacklisted_blocks.clone(), ); - next.on_entering(blk).await; + next.on_entering(blk).await.map_err(|e| { + error!("Unable to enter in_sync state: {e}"); + e + })?; self.curr = State::InSync(next); } } @@ -171,7 +174,7 @@ impl InSyncImpl { /// performed when entering the state async fn on_entering(&mut self, blk: &Block) -> anyhow::Result<()> { - let acc = self.acc.write().await; + let mut acc = self.acc.write().await; let curr_h = acc.get_curr_height().await; if blk.header().height == curr_h + 1 { @@ -191,7 +194,7 @@ impl InSyncImpl { blk: &Block, msg: &Message, ) -> anyhow::Result { - let acc = self.acc.write().await; + let mut acc = self.acc.write().await; let h = blk.header().height; let curr_h = acc.get_curr_height().await; let iter = acc.get_curr_iteration().await; @@ -263,7 +266,9 @@ impl InSyncImpl { // When accepting block from the wire in inSync state, we // rebroadcast it - self.network.write().await.broadcast(msg).await; + if let Err(e) = self.network.write().await.broadcast(msg).await { + warn!("Unable to broadcast accepted block: {e}"); + } return Ok(false); } @@ -273,7 +278,7 @@ impl InSyncImpl { } fn allow_transition(&self, msg: &Message) -> anyhow::Result<()> { - let recv_peer = msg + let _recv_peer = msg .metadata .as_ref() .map(|m| m.src_addr) @@ -334,11 +339,15 @@ impl // Request missing blocks from source peer let gb_msg = Message::new_get_blocks(GetBlocks { locator }); - self.network - .write() + if let Err(e) = self + .network + .read() .await .send_to_peer(&gb_msg, dest_addr) - .await; + .await + { + warn!("Unable to send GetBlocks: {e}") + }; // add to the pool let key = blk.header().height; @@ -364,7 +373,7 @@ impl blk: &Block, msg: &Message, ) -> anyhow::Result { - let acc = self.acc.write().await; + let mut acc = self.acc.write().await; let h = blk.header().height; if self diff --git a/node/src/database/rocksdb.rs b/node/src/database/rocksdb.rs index 95f68d5c58..9b13836274 100644 --- a/node/src/database/rocksdb.rs +++ b/node/src/database/rocksdb.rs @@ -28,11 +28,6 @@ use std::vec; use tracing::info; -enum TxType { - ReadWrite, - ReadOnly, -} - const CF_LEDGER_HEADER: &str = "cf_ledger_header"; const CF_LEDGER_TXS: &str = "cf_ledger_txs"; const CF_LEDGER_HEIGHT: &str = "cf_ledger_height"; @@ -52,13 +47,10 @@ pub struct Backend { } impl Backend { - fn begin_tx( - &self, - access_type: TxType, - ) -> DBTransaction<'_, OptimisticTransactionDB> { + fn begin_tx(&self) -> DBTransaction<'_, OptimisticTransactionDB> { // Create a new RocksDB transaction let write_options = WriteOptions::default(); - let mut tx_options = OptimisticTransactionOptions::default(); + let tx_options = OptimisticTransactionOptions::default(); let inner = self.rocksdb.transaction_opt(&write_options, &tx_options); @@ -102,7 +94,6 @@ impl Backend { DBTransaction::<'_, OptimisticTransactionDB> { inner, - access_type, candidates_cf, ledger_cf, ledger_txs_cf, @@ -166,7 +157,7 @@ impl DB for Backend { F: for<'a> FnOnce(Self::P<'a>) -> T, { // Create a new read-only transaction - let tx = self.begin_tx(TxType::ReadOnly); + let tx = self.begin_tx(); // Execute all read-only transactions in isolation f(tx) @@ -177,7 +168,7 @@ impl DB for Backend { F: for<'a> FnOnce(&Self::P<'a>) -> Result, { // Create read-write transaction - let tx = self.begin_tx(TxType::ReadWrite); + let tx = self.begin_tx(); // If f returns err, no commit will be applied into backend // storage @@ -194,7 +185,6 @@ impl DB for Backend { pub struct DBTransaction<'db, DB: DBAccess> { inner: rocksdb_lib::Transaction<'db, DB>, - access_type: TxType, // TODO: pack all column families into a single array // Candidates column family @@ -233,7 +223,7 @@ impl<'db, DB: DBAccess> Ledger for DBTransaction<'db, DB> { .map(|t| t.inner.hash()) .collect::>(), } - .write(&mut buf); + .write(&mut buf)?; self.inner.put_cf(cf, header.hash, buf)?; @@ -305,7 +295,7 @@ impl<'db, DB: DBAccess> Ledger for DBTransaction<'db, DB> { let mut txs = vec![]; for buf in txs_buffers { - let mut buf = buf?.unwrap(); + let buf = buf?.unwrap(); let tx = ledger::SpentTransaction::read(&mut &buf.to_vec()[..])?; txs.push(tx.inner); @@ -371,7 +361,7 @@ impl<'db, DB: DBAccess> Ledger for DBTransaction<'db, DB> { /// Returns stored register data fn get_register(&self) -> Result> { - if let Some(mut data) = + if let Some(data) = self.snapshot.get_cf(self.ledger_cf, REGISTER_KEY)? { return Ok(Some(Register::read(&mut &data[..])?)); @@ -436,11 +426,9 @@ impl<'db, DB: DBAccess> Candidate for DBTransaction<'db, DB> { .iterator_cf(self.candidates_cf, IteratorMode::Start); // Iterate through the CF_CANDIDATES column family and delete all items - iter.map(Result::unwrap) - .map(|(key, _)| { - self.inner.delete_cf(self.candidates_cf, key); - }) - .collect::>(); + for (key, _) in iter.map(Result::unwrap) { + self.inner.delete_cf(self.candidates_cf, key)?; + } Ok(()) } @@ -453,11 +441,9 @@ impl<'db, DB: DBAccess> Persist for DBTransaction<'db, DB> { let iter = self.inner.iterator_cf(self.ledger_cf, IteratorMode::Start); // Iterate through the CF_LEDGER column family and delete all items - iter.map(Result::unwrap) - .map(|(key, _)| { - self.inner.delete_cf(self.ledger_cf, key); - }) - .collect::>(); + for (key, _) in iter.map(Result::unwrap) { + self.inner.delete_cf(self.ledger_cf, key)?; + } self.clear_candidates()?; Ok(()) @@ -567,8 +553,7 @@ impl<'db, DB: DBAccess> Mempool for DBTransaction<'db, DB> { // Iterate all keys from the end in reverse lexicographic order while iter.valid() { if let Some(key) = iter.key() { - let (read_gp, tx_hash) = - deserialize_fee_key(&mut &key.to_vec()[..])?; + let (_, tx_hash) = deserialize_fee_key(&mut &key.to_vec()[..])?; txs_list.push(tx_hash); } @@ -582,7 +567,6 @@ impl<'db, DB: DBAccess> Mempool for DBTransaction<'db, DB> { pub struct MemPoolIterator<'db, DB: DBAccess, M: Mempool> { iter: DBRawIteratorWithThreadMode<'db, rocksdb_lib::Transaction<'db, DB>>, - db: &'db Transaction<'db, DB>, mempool: &'db M, } @@ -594,7 +578,7 @@ impl<'db, DB: DBAccess, M: Mempool> MemPoolIterator<'db, DB, M> { ) -> Self { let mut iter = db.raw_iterator_cf(fees_cf); iter.seek_to_last(); - MemPoolIterator { db, iter, mempool } + MemPoolIterator { iter, mempool } } } @@ -687,7 +671,7 @@ struct HeaderRecord { transactions_ids: Vec<[u8; 32]>, } -impl Serializable for HeaderRecord { +impl node_data::Serializable for HeaderRecord { fn write(&self, w: &mut W) -> io::Result<()> { // Write block header self.header.write(w)?; @@ -719,7 +703,7 @@ impl Serializable for HeaderRecord { // Read transactions hashes let mut transactions_ids = vec![]; - for pos in 0..len { + for _ in 0..len { let mut tx_id = [0u8; 32]; r.read_exact(&mut tx_id[..])?; diff --git a/node/src/databroker.rs b/node/src/databroker.rs index 4f0b868416..9b8a6aa6b2 100644 --- a/node/src/databroker.rs +++ b/node/src/databroker.rs @@ -97,7 +97,7 @@ impl &mut self, network: Arc>, db: Arc>, - vm: Arc>, + _vm: Arc>, ) -> anyhow::Result { if self.conf.max_ongoing_requests == 0 { return Err(anyhow!("max_ongoing_requests must be greater than 0")); @@ -115,8 +115,8 @@ impl info!("data_broker service started"); loop { - /// Wait until we can process a new request. We limit the number of - /// concurrent requests to mitigate a DoS attack. + // Wait until we can process a new request. We limit the number of + // concurrent requests to mitigate a DoS attack. let permit = self.limit_ongoing_requests.clone().acquire_owned().await?; @@ -129,15 +129,15 @@ impl // Spawn a task to handle the request asynchronously. tokio::spawn(async move { - match Self::handle_request(&network, &db, &msg, &conf).await { + match Self::handle_request(&db, &msg, &conf).await { Ok(resp) => { // Send response + let net = network.read().await; for msg in resp.msgs { - network - .read() - .await - .send_to_peer(&msg, resp.recv_peer) - .await; + let send = net.send_to_peer(&msg, resp.recv_peer); + if let Err(e) = send.await { + warn!("Unable to send_to_peer {e}") + }; // Mitigate pressure on UDP buffers. // Needed only in localnet. @@ -168,14 +168,13 @@ impl impl DataBrokerSrv { /// Handles inbound messages. - async fn handle_request( - network: &Arc>, + async fn handle_request( db: &Arc>, msg: &Message, conf: &conf::Params, ) -> anyhow::Result { - /// source address of the request becomes the receiver address of the - /// response + // source address of the request becomes the receiver address of the + // response let recv_peer = msg .metadata .as_ref() @@ -185,37 +184,29 @@ impl DataBrokerSrv { match &msg.payload { // Handle GetCandidate requests Payload::GetCandidate(m) => { - let msg = Self::handle_get_candidate(network, db, m).await?; + let msg = Self::handle_get_candidate(db, m).await?; Ok(Response::new_from_msg(msg, recv_peer)) } // Handle GetBlocks requests Payload::GetBlocks(m) => { - let msg = Self::handle_get_blocks( - network, - db, - m, - conf.max_inv_entries, - ) - .await?; + let msg = Self::handle_get_blocks(db, m, conf.max_inv_entries) + .await?; Ok(Response::new_from_msg(msg, recv_peer)) } // Handle GetMempool requests - Payload::GetMempool(m) => { - let msg = Self::handle_get_mempool(network, db, m).await?; + Payload::GetMempool(_) => { + let msg = Self::handle_get_mempool(db).await?; Ok(Response::new_from_msg(msg, recv_peer)) } // Handle GetInv requests Payload::GetInv(m) => { - let msg = - Self::handle_inv(network, db, m, conf.max_inv_entries) - .await?; + let msg = Self::handle_inv(db, m, conf.max_inv_entries).await?; Ok(Response::new_from_msg(msg, recv_peer)) } // Handle GetData requests Payload::GetData(m) => { let msgs = - Self::handle_get_data(network, db, m, conf.max_inv_entries) - .await?; + Self::handle_get_data(db, m, conf.max_inv_entries).await?; Ok(Response::new(msgs, recv_peer)) } _ => Err(anyhow::anyhow!("unhandled message payload")), @@ -225,8 +216,7 @@ impl DataBrokerSrv { /// Handles GetCandidate requests. /// /// Message flow: GetCandidate -> CandidateResp - async fn handle_get_candidate( - network: &Arc>, + async fn handle_get_candidate( db: &Arc>, m: &payload::GetCandidate, ) -> Result { @@ -248,10 +238,8 @@ impl DataBrokerSrv { /// Handles GetMempool requests. /// Message flow: GetMempool -> Inv -> GetData -> Tx - async fn handle_get_mempool( - network: &Arc>, + async fn handle_get_mempool( db: &Arc>, - _m: &payload::GetMempool, ) -> Result { let mut inv = payload::Inv::default(); @@ -276,8 +264,7 @@ impl DataBrokerSrv { /// Handles GetBlocks message request. /// /// Message flow: GetBlocks -> Inv -> GetData -> Block - async fn handle_get_blocks( - network: &Arc>, + async fn handle_get_blocks( db: &Arc>, m: &payload::GetBlocks, max_entries: usize, @@ -329,8 +316,7 @@ impl DataBrokerSrv { /// wire message, and sends it back to request the items in full. /// /// An item is a block or a transaction. - async fn handle_inv( - network: &Arc>, + async fn handle_inv( db: &Arc>, m: &node_data::message::payload::Inv, max_entries: usize, @@ -370,8 +356,7 @@ impl DataBrokerSrv { /// /// The response to a GetData message is a vector of messages, each of which /// could be either topics.Block or topics.Tx. - async fn handle_get_data( - network: &Arc>, + async fn handle_get_data( db: &Arc>, m: &node_data::message::payload::GetData, max_entries: usize, diff --git a/node/src/lib.rs b/node/src/lib.rs index e74228a563..e6e8467886 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -4,7 +4,6 @@ // // Copyright (c) DUSK NETWORK. All rights reserved. - pub mod chain; pub mod database; pub mod databroker; diff --git a/node/src/mempool.rs b/node/src/mempool.rs index fc6bc5e887..6bd9151a33 100644 --- a/node/src/mempool.rs +++ b/node/src/mempool.rs @@ -12,7 +12,7 @@ use node_data::ledger::Transaction; use node_data::message::{AsyncQueue, Payload, Topics}; use std::sync::Arc; use tokio::sync::RwLock; -use tracing::warn; +use tracing::{error, warn}; const TOPICS: &[u8] = &[Topics::Tx as u8]; @@ -21,7 +21,7 @@ enum TxAcceptanceError { AlreadyExistsInMempool, AlreadyExistsInLedger, NullifierExistsInMempool, - VerificationFailed, + VerificationFailed(String), } impl std::error::Error for TxAcceptanceError {} @@ -35,8 +35,8 @@ impl std::fmt::Display for TxAcceptanceError { Self::AlreadyExistsInLedger => { write!(f, "this transaction exists in the ledger") } - Self::VerificationFailed => { - write!(f, "this transaction is invalid") + Self::VerificationFailed(inner) => { + write!(f, "this transaction is invalid {inner}") } Self::NullifierExistsInMempool => { write!(f, "this transaction's input(s) exists in the mempool") @@ -52,7 +52,7 @@ pub struct MempoolSrv { pub struct TxFilter {} impl crate::Filter for TxFilter { - fn filter(&mut self, msg: &Message) -> anyhow::Result<()> { + fn filter(&mut self, _msg: &Message) -> anyhow::Result<()> { // TODO: Ensure transaction does not exist in the mempool state // TODO: Ensure transaction does not exist in blockchain // TODO: Check Nullifier @@ -92,15 +92,18 @@ impl if let Ok(msg) = self.inbound.recv().await { match &msg.payload { Payload::Transaction(tx) => { - if let Err(e) = - self.accept_tx::(&db, &vm, tx).await - { - tracing::error!("{}", e); - } else { - network.read().await.broadcast(&msg).await; + let accept = self.accept_tx::(&db, &vm, tx); + if let Err(e) = accept.await { + error!("{}", e); + continue; } + + let network = network.read().await; + if let Err(e) = network.broadcast(&msg).await { + warn!("Unable to broadcast accepted tx: {e}") + }; } - _ => tracing::error!("invalid inbound message payload"), + _ => error!("invalid inbound message payload"), } } } @@ -152,7 +155,11 @@ impl MempoolSrv { })?; // VM Preverify call - vm.read().await.preverify(tx)?; + vm.read().await.preverify(tx).map_err(|e| { + anyhow::anyhow!(TxAcceptanceError::VerificationFailed(format!( + "{e:?}" + ))) + })?; tracing::info!( event = "transaction accepted", diff --git a/node/src/network.rs b/node/src/network.rs index 2717b7f3e7..40aaece9bb 100644 --- a/node/src/network.rs +++ b/node/src/network.rs @@ -35,7 +35,9 @@ impl Listener { tokio::spawn(async move { if let Some(Some(queue)) = routes.read().await.get(topic as usize) { - queue.send(msg.clone()).await; + if let Err(e) = queue.send(msg.clone()).await { + error!("Unable to reroute message with topic {topic}: {e}"); + }; }; }); @@ -124,7 +126,9 @@ impl Kadcast { tokio::spawn(async move { if let Some(Some(queue)) = routes.read().await.get(topic) { - queue.send(msg.clone()).await; + if let Err(e) = queue.send(msg.clone()).await { + error!("Unable to route_internal message with topic {topic}: {e}"); + }; }; }); } @@ -225,7 +229,7 @@ impl crate::Network for Kadcast { ) -> anyhow::Result<()> { let mut guard = self.routes.write().await; - let mut route = guard + let route = guard .get_mut(topic as usize) .ok_or_else(|| anyhow::anyhow!("topic out of range: {}", topic))?; @@ -243,13 +247,13 @@ impl crate::Network for Kadcast { timeout_millis: u64, recv_peers_count: usize, ) -> anyhow::Result { - self.remove_route(response_msg_topic.into()).await; + self.remove_route(response_msg_topic.into()).await?; let res = { let queue = AsyncQueue::default(); // register a temporary route that will be unregister on drop self.add_route(response_msg_topic.into(), queue.clone()) - .await; + .await?; self.send_to_alive_peers(request_msg, recv_peers_count) .await?; @@ -268,7 +272,7 @@ impl crate::Network for Kadcast { } }; - self.remove_route(response_msg_topic.into()).await; + self.remove_route(response_msg_topic.into()).await?; res } @@ -279,7 +283,7 @@ impl crate::Network for Kadcast { ) -> anyhow::Result<()> { let mut guard = self.filters.write().await; - let mut filter = guard + let filter = guard .get_mut(msg_type as usize) .expect("should be valid type");