diff --git a/node-data/src/encoding.rs b/node-data/src/encoding.rs index 0f9cafa486..75677b9994 100644 --- a/node-data/src/encoding.rs +++ b/node-data/src/encoding.rs @@ -424,11 +424,9 @@ mod tests { /// Asserts if encoding/decoding of a serializable type runs properly. fn assert_serializable + Eq + Serializable>() { let obj: S = Faker.fake(); - let mut buf = vec![]; - obj.write(&mut buf).expect("should be writable"); + let data = obj.write_to_vec(); - assert!(obj - .eq(&S::read(&mut &buf.to_vec()[..]).expect("should be readable"))); + assert!(obj.eq(&S::read(&mut &data[..]).expect("should be readable"))); } #[test] diff --git a/node-data/src/ledger/faults.rs b/node-data/src/ledger/faults.rs index b6ed207831..a23e7a18fb 100644 --- a/node-data/src/ledger/faults.rs +++ b/node-data/src/ledger/faults.rs @@ -65,9 +65,8 @@ impl From for InvalidFault { impl Fault { /// Hash the serialized form pub fn hash(&self) -> [u8; 32] { - let mut b = vec![]; - self.write(&mut b).expect("Write to a vec shall not fail"); - BlsScalar::hash_to_scalar(&b[..]).to_bytes() + let data = self.write_to_vec(); + BlsScalar::hash_to_scalar(&data).to_bytes() } pub fn same(&self, other: &Fault) -> bool { diff --git a/node-data/src/lib.rs b/node-data/src/lib.rs index 53927bc807..6089625c5e 100644 --- a/node-data/src/lib.rs +++ b/node-data/src/lib.rs @@ -32,6 +32,13 @@ pub trait Serializable { where Self: Sized; + fn write_to_vec(&self) -> Vec { + let mut buffer = vec![]; + self.write(&mut buffer) + .expect("Writing to vec should succeed"); + buffer + } + fn read_bytes(r: &mut R) -> io::Result<[u8; N]> { let mut buffer = [0u8; N]; r.read_exact(&mut buffer)?; diff --git a/node-data/src/message.rs b/node-data/src/message.rs index aa40815f9f..53765c6610 100644 --- a/node-data/src/message.rs +++ b/node-data/src/message.rs @@ -332,9 +332,7 @@ impl ConsensusHeader { } pub fn signable(&self) -> Vec { - let mut buf = vec![]; - self.write(&mut buf).expect("Writing to vec should succeed"); - buf + self.write_to_vec() } } @@ -1387,8 +1385,8 @@ mod tests { } fn assert_serialize(v: S) { - let mut buf = vec![]; - assert!(v.write(&mut buf).is_ok()); + let buf = v.write_to_vec(); + let dup = S::read(&mut &buf[..]).expect("deserialize is ok"); assert_eq!( v, diff --git a/node/src/chain.rs b/node/src/chain.rs index d25e4ad8ca..ff3d322641 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -210,9 +210,7 @@ impl }; } - if let Err(e) = network.read().await.broadcast(&msg).await { - warn!("Unable to re-route message {e}"); - } + network.read().await.broadcast(&msg).await; }, // Handles accept_block_timeout event _ = sleep_until(timeout) => { diff --git a/node/src/chain/acceptor.rs b/node/src/chain/acceptor.rs index f06c4599e7..5bc7fc2e0f 100644 --- a/node/src/chain/acceptor.rs +++ b/node/src/chain/acceptor.rs @@ -1009,9 +1009,7 @@ impl Acceptor { } async fn broadcast(network: &Arc>, msg: &Message) { - let _ = network.read().await.broadcast(msg).await.map_err(|err| { - warn!("Unable to broadcast msg: {:?} {err} ", msg.topic()) - }); + network.read().await.broadcast(msg).await } /// Performs full verification of block header against prev_block header where diff --git a/node/src/chain/consensus.rs b/node/src/chain/consensus.rs index d2838cad01..d9fce85ce0 100644 --- a/node/src/chain/consensus.rs +++ b/node/src/chain/consensus.rs @@ -352,10 +352,7 @@ impl Operations for Executor { metric.push_back(elapsed); debug!(event = "avg_updated", ?step_name, metric = ?metric); - let mut bytes = Vec::new(); - metric.write(&mut bytes)?; - - t.op_write(db_key, bytes) + t.op_write(db_key, metric.write_to_vec()) }) .map_err(Error::MetricsUpdate)?; diff --git a/node/src/chain/fsm.rs b/node/src/chain/fsm.rs index 0d86668390..ca58de67a5 100644 --- a/node/src/chain/fsm.rs +++ b/node/src/chain/fsm.rs @@ -24,7 +24,7 @@ use std::time::Duration; use std::{sync::Arc, time::SystemTime}; use tokio::sync::RwLock; use tokio::time::Instant; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info}; const MAX_BLOCKS_TO_REQUEST: i16 = 50; const EXPIRY_TIMEOUT_MILLIS: i16 = 5000; @@ -122,15 +122,11 @@ impl SimpleFSM { let get_blocks = Message::new_get_blocks(GetBlocks { locator: last_finalized.header().hash, }); - if let Err(e) = self - .network + self.network .read() .await .send_to_alive_peers(&get_blocks, REDUNDANCY_PEER_FACTOR) - .await - { - warn!("Unable to request GetBlocks {e}"); - } + .await; } else { error!("could not request blocks"); } @@ -674,14 +670,11 @@ impl InSyncImpl { let req = GetResource::new(inv, this_peer, u64::MAX, 1); debug!(event = "request block by height", ?req, ?peer_addr); - if let Err(err) = network + network .read() .await .send_to_peer(&Message::new_get_resource(req), peer_addr) .await - { - warn!("could not request block {err}") - } } async fn on_heartbeat(&mut self) -> anyhow::Result { @@ -747,15 +740,11 @@ impl // Request missing blocks from source peer let gb_msg = Message::new_get_blocks(GetBlocks { locator }); - if let Err(e) = self - .network + self.network .read() .await .send_to_peer(&gb_msg, dest_addr) - .await - { - warn!("Unable to send GetBlocks: {e}") - }; + .await; // add to the pool let key = blk.header().height; @@ -873,12 +862,9 @@ impl async fn flood_request(network: &Arc>, inv: &Inv) { debug!(event = "flood_request", ?inv); - if let Err(err) = network + network .read() .await .flood_request(inv, None, DEFAULT_HOPS_LIMIT) - .await - { - warn!("could not request block {err}") - }; + .await; } diff --git a/node/src/chain/metrics.rs b/node/src/chain/metrics.rs index 5e8044bdd6..bb4d1ce28d 100644 --- a/node/src/chain/metrics.rs +++ b/node/src/chain/metrics.rs @@ -82,8 +82,7 @@ mod tests { assert_eq!(metric.average().expect("positive number"), expected); // Marshal/Unmarshal - let mut buf = Vec::new(); - metric.write(&mut buf).expect("all written"); + let buf = metric.write_to_vec(); assert_eq!( AverageElapsedTime::read(&mut &buf[..]) diff --git a/node/src/database/rocksdb.rs b/node/src/database/rocksdb.rs index d1cd6937ea..f2a33664a4 100644 --- a/node/src/database/rocksdb.rs +++ b/node/src/database/rocksdb.rs @@ -290,19 +290,17 @@ impl<'db, DB: DBAccess> Ledger for DBTransaction<'db, DB> { { let cf = self.ledger_cf; - let mut buf = vec![]; - LightBlock { + let block = LightBlock { header: header.clone(), transactions_ids: txs .iter() .map(|t| t.inner.id()) .collect::>(), - faults_ids: faults.iter().map(|f| f.hash()).collect::>(), - } - .write(&mut buf)?; + faults_ids: faults.iter().map(|f| f.hash()).collect(), + }; - self.put_cf(cf, header.hash, buf)?; + self.put_cf(cf, header.hash, block.write_to_vec())?; } // Update metadata values @@ -315,8 +313,7 @@ impl<'db, DB: DBAccess> Ledger for DBTransaction<'db, DB> { // store all block transactions for tx in txs { - let mut d = vec![]; - tx.write(&mut d)?; + let d = tx.write_to_vec(); self.put_cf(cf, tx.inner.id(), d)?; } } @@ -327,9 +324,9 @@ impl<'db, DB: DBAccess> Ledger for DBTransaction<'db, DB> { // store all block faults for f in faults { - let mut d = vec![]; - f.write(&mut d)?; - self.put_cf(cf, f.hash(), d)?; + let key = f.hash(); + let value = f.write_to_vec(); + self.put_cf(cf, key, value)?; } } self.store_block_label(header.height, &header.hash, label)?; @@ -570,8 +567,7 @@ impl<'db, DB: DBAccess> Candidate for DBTransaction<'db, DB> { /// Returns `Ok(())` if the block is successfully stored, or an error if the /// operation fails. fn store_candidate_block(&self, b: ledger::Block) -> Result<()> { - let mut serialized = vec![]; - b.write(&mut serialized)?; + let serialized = b.write_to_vec(); self.inner .put_cf(self.candidates_cf, b.header().hash, serialized)?; @@ -681,10 +677,9 @@ impl<'db, DB: DBAccess> Persist for DBTransaction<'db, DB> { impl<'db, DB: DBAccess> Mempool for DBTransaction<'db, DB> { fn add_tx(&self, tx: &ledger::Transaction) -> Result<()> { // Map Hash to serialized transaction - let mut tx_data = vec![]; - tx.write(&mut tx_data)?; - + let tx_data = tx.write_to_vec(); let hash = tx.id(); + self.put_cf(self.mempool_cf, hash, tx_data)?; // Add Secondary indexes // diff --git a/node/src/databroker.rs b/node/src/databroker.rs index 3377721973..18af68b6b5 100644 --- a/node/src/databroker.rs +++ b/node/src/databroker.rs @@ -140,10 +140,7 @@ impl // Send response let net = network.read().await; for msg in resp.msgs { - let send = net.send_to_peer(&msg, resp.recv_peer); - if let Err(e) = send.await { - warn!("Unable to send_to_peer {e}") - }; + net.send_to_peer(&msg, resp.recv_peer).await; // Mitigate pressure on UDP buffers. // Needed only in localnet. diff --git a/node/src/lib.rs b/node/src/lib.rs index 240a6ccbdb..9d6a7bb7d2 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -40,7 +40,7 @@ pub type BoxedFilter = Box; #[async_trait] pub trait Network: Send + Sync + 'static { /// Broadcasts a fire-and-forget message. - async fn broadcast(&self, msg: &Message) -> anyhow::Result<()>; + async fn broadcast(&self, msg: &Message); /// Broadcasts a request message async fn flood_request( @@ -48,21 +48,17 @@ pub trait Network: Send + Sync + 'static { msg_inv: &Inv, ttl_as_sec: Option, hops_limit: u16, - ) -> anyhow::Result<()>; + ); /// Sends a message to a specified peer. async fn send_to_peer( &self, msg: &Message, peer_addr: std::net::SocketAddr, - ) -> anyhow::Result<()>; + ); /// Sends to random set of alive peers. - async fn send_to_alive_peers( - &self, - msg: &Message, - amount: usize, - ) -> anyhow::Result<()>; + async fn send_to_alive_peers(&self, msg: &Message, amount: usize); /// Routes any message of the specified type to this queue. async fn add_route( diff --git a/node/src/mempool.rs b/node/src/mempool.rs index 45c1e7ef72..2f08ded5dc 100644 --- a/node/src/mempool.rs +++ b/node/src/mempool.rs @@ -15,7 +15,7 @@ use node_data::message::{AsyncQueue, Payload, Topics}; use std::sync::Arc; use thiserror::Error; use tokio::sync::RwLock; -use tracing::{error, info, warn}; +use tracing::{error, info}; const TOPICS: &[u8] = &[Topics::Tx as u8]; @@ -83,10 +83,7 @@ impl continue; } - let network = network.read().await; - if let Err(e) = network.broadcast(&msg).await { - warn!("Unable to broadcast accepted tx: {e}") - }; + network.read().await.broadcast(&msg).await; } _ => error!("invalid inbound message payload"), } diff --git a/node/src/network.rs b/node/src/network.rs index 676c6fce3b..e03a0a99ca 100644 --- a/node/src/network.rs +++ b/node/src/network.rs @@ -189,17 +189,14 @@ impl Kadcast { #[async_trait] impl crate::Network for Kadcast { - async fn broadcast(&self, msg: &Message) -> anyhow::Result<()> { + async fn broadcast(&self, msg: &Message) { let height = match msg.metadata { - Some(Metadata { height: 0, .. }) => return Ok(()), + Some(Metadata { height: 0, .. }) => return, Some(Metadata { height, .. }) => Some(height - 1), None => None, }; - let encoded = frame::Pdu::encode(msg, 0).map_err(|err| { - error!("could not encode message {msg:?}: {err}"); - anyhow::anyhow!("failed to broadcast: {err}") - })?; + let encoded = frame::Pdu::encode(msg, 0); counter!("dusk_bytes_cast").increment(encoded.len() as u64); counter!(format!("dusk_outbound_{:?}_size", msg.topic())) @@ -207,8 +204,6 @@ impl crate::Network for Kadcast { trace!("broadcasting msg ({:?})", msg.topic()); self.peer.broadcast(&encoded, height).await; - - Ok(()) } /// Broadcast a GetResource request. @@ -229,7 +224,7 @@ impl crate::Network for Kadcast { msg_inv: &Inv, ttl_as_sec: Option, hops_limit: u16, - ) -> anyhow::Result<()> { + ) { let ttl_as_sec = ttl_as_sec.map_or_else( || u64::MAX, |v| { @@ -254,31 +249,19 @@ impl crate::Network for Kadcast { } /// Sends an encoded message to a given peer. - async fn send_to_peer( - &self, - msg: &Message, - recv_addr: SocketAddr, - ) -> anyhow::Result<()> { + async fn send_to_peer(&self, msg: &Message, recv_addr: SocketAddr) { // rnd_count is added to bypass kadcast dupemap let rnd_count = self.counter.fetch_add(1, Ordering::SeqCst); - let encoded = frame::Pdu::encode(msg, rnd_count) - .map_err(|err| anyhow::anyhow!("failed to send_to_peer: {err}"))?; + let encoded = frame::Pdu::encode(msg, rnd_count); let topic = msg.topic(); info!("sending msg ({topic:?}) to peer {recv_addr}"); - self.send_with_metrics(&encoded, recv_addr).await; - - Ok(()) + self.send_with_metrics(&encoded, recv_addr).await } /// Sends to random set of alive peers. - async fn send_to_alive_peers( - &self, - msg: &Message, - amount: usize, - ) -> anyhow::Result<()> { - let encoded = frame::Pdu::encode(msg, 0) - .map_err(|err| anyhow::anyhow!("failed to encode: {err}"))?; + async fn send_to_alive_peers(&self, msg: &Message, amount: usize) { + let encoded = frame::Pdu::encode(msg, 0); let topic = msg.topic(); counter!(format!("dusk_requests_{:?}", topic)).increment(1); @@ -287,8 +270,6 @@ impl crate::Network for Kadcast { trace!("sending msg ({topic:?}) to peer {recv_addr}"); self.send_with_metrics(&encoded, recv_addr).await; } - - Ok(()) } /// Route any message of the specified type to this queue. diff --git a/node/src/network/frame.rs b/node/src/network/frame.rs index 342cf817b9..edc60bc88e 100644 --- a/node/src/network/frame.rs +++ b/node/src/network/frame.rs @@ -26,19 +26,17 @@ pub struct Header { } impl Pdu { - pub fn encode(msg: &Message, reserved: u64) -> io::Result> { - let mut payload_buf = vec![]; - msg.write(&mut payload_buf)?; + pub fn encode(msg: &Message, reserved: u64) -> Vec { + let payload_buf = msg.write_to_vec(); - let mut header_buf = vec![]; - Header { - checksum: calc_checksum(&payload_buf[..]), + let header_buf = Header { + checksum: calc_checksum(&payload_buf), version: PROTOCOL_VERSION, reserved, } - .write(&mut header_buf)?; + .write_to_vec(); - Ok([header_buf, payload_buf].concat()) + [header_buf, payload_buf].concat() } pub fn decode(r: &mut R) -> io::Result