From 16e18ae75b9120b1eeb533b2ba5fd1a4dd05cc05 Mon Sep 17 00:00:00 2001 From: Goshawk Date: Thu, 8 Aug 2024 10:34:02 +0300 Subject: [PATCH 1/9] node: Ensure frame checksum is correct --- node/src/network/frame.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/node/src/network/frame.rs b/node/src/network/frame.rs index 342cf817b9..12d485e2c4 100644 --- a/node/src/network/frame.rs +++ b/node/src/network/frame.rs @@ -6,7 +6,7 @@ use node_data::message::Message; use node_data::Serializable; -use std::io::{self, Read, Write}; +use std::io::{self, ErrorKind, Read, Write}; const PROTOCOL_VERSION: [u8; 8] = [0, 0, 0, 0, 1, 0, 0, 0]; @@ -46,8 +46,15 @@ impl Pdu { Self: Sized, { let header = Header::read(r)?; - let payload = Message::read(r)?; + let mut payload_buf = vec![]; + r.read_to_end(&mut payload_buf)?; + + if header.checksum != calc_checksum(&payload_buf[..]) { + return Err(io::Error::new(ErrorKind::Other, "Checksum is wrong")); + } + + let payload = Message::read(&mut &payload_buf[..])?; Ok(Pdu { header, payload }) } } From d23ea01955178ede352206368a1d649b3b8549fa Mon Sep 17 00:00:00 2001 From: Goshawk Date: Thu, 8 Aug 2024 10:36:41 +0300 Subject: [PATCH 2/9] node: Use contains_key in bits --- consensus/src/user/committee.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/consensus/src/user/committee.rs b/consensus/src/user/committee.rs index e3f404a8ea..125efe0395 100644 --- a/consensus/src/user/committee.rs +++ b/consensus/src/user/committee.rs @@ -102,12 +102,9 @@ impl Committee { debug_assert!(self.members.len() <= mem::size_of_val(&bits) * 8); - for (pk, _) in voters.iter() { - for (pos, (member_pk, _)) in self.members.iter().enumerate() { - if member_pk.eq(pk) { - bits |= 1 << pos; // flip the i-th bit to 1 - break; - } + for (pos, (member_pk, _)) in self.members.iter().enumerate() { + if voters.contains_key(member_pk) { + bits |= 1 << pos; // flip the i-th bit to 1 } } From 40ba5f9accf6106950334b436e97a9652e0776ec Mon Sep 17 00:00:00 2001 From: Goshawk Date: Thu, 8 Aug 2024 10:39:00 +0300 Subject: [PATCH 3/9] consensus: Check expected generator in the first place --- consensus/src/proposal/handler.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/consensus/src/proposal/handler.rs b/consensus/src/proposal/handler.rs index 56ad6df31d..7d2500a3cc 100644 --- a/consensus/src/proposal/handler.rs +++ b/consensus/src/proposal/handler.rs @@ -83,6 +83,11 @@ impl ProposalHandler { expected_generator: &PublicKeyBytes, ) -> Result<(), ConsensusError> { let p = Self::unwrap_msg(msg)?; + + if expected_generator != p.sign_info.signer.bytes() { + return Err(ConsensusError::NotCommitteeMember); + } + // Verify new_block msg signature p.verify_signature()?; @@ -104,10 +109,6 @@ impl ProposalHandler { return Err(ConsensusError::InvalidBlock); } - if expected_generator != p.sign_info.signer.bytes() { - return Err(ConsensusError::NotCommitteeMember); - } - Ok(()) } From 336c0bf3baf64d96402c45ed794759648c1c270f Mon Sep 17 00:00:00 2001 From: Goshawk Date: Thu, 8 Aug 2024 10:39:41 +0300 Subject: [PATCH 4/9] node: Use Inv::max_entries accordingly --- node/src/databroker.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/node/src/databroker.rs b/node/src/databroker.rs index 3377721973..94b30186b3 100644 --- a/node/src/databroker.rs +++ b/node/src/databroker.rs @@ -332,6 +332,11 @@ impl DataBrokerSrv { max_entries: usize, requester_addr: SocketAddr, ) -> Result { + let mut max_entries = max_entries; + if m.max_entries > 0 { + max_entries = min(max_entries, m.max_entries as usize); + } + let inv = db.read().await.view(|t| { let mut inv = payload::Inv::default(); for i in &m.inv_list { From 8110ccff85bf03d045af6989c54ac2d4bc657084 Mon Sep 17 00:00:00 2001 From: Goshawk Date: Thu, 8 Aug 2024 11:09:34 +0300 Subject: [PATCH 5/9] node: Delete a mempool txn in read-write db operation --- node/src/mempool.rs | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/node/src/mempool.rs b/node/src/mempool.rs index 45c1e7ef72..ba98672df2 100644 --- a/node/src/mempool.rs +++ b/node/src/mempool.rs @@ -117,11 +117,20 @@ impl MempoolSrv { // Perform basic checks on the transaction db.read().await.view(|view| { // ensure transaction does not exist in the mempool - if view.get_tx_exists(tx_id)? { return Err(TxAcceptanceError::AlreadyExistsInMempool); } + // ensure transaction does not exist in the blockchain + if view.get_ledger_tx_exists(&tx_id)? { + return Err(TxAcceptanceError::AlreadyExistsInLedger); + } + + Ok(()) + })?; + + // Try to add the transaction to the mempool + db.read().await.update(|db| { let nullifiers: Vec<_> = tx .inner .nullifiers() @@ -130,24 +139,20 @@ impl MempoolSrv { .collect(); // ensure nullifiers do not exist in the mempool - for m_tx_id in view.get_txs_by_nullifiers(&nullifiers) { - if let Some(m_tx) = view.get_tx(m_tx_id)? { + for m_tx_id in db.get_txs_by_nullifiers(&nullifiers) { + if let Some(m_tx) = db.get_tx(m_tx_id)? { if m_tx.inner.gas_price() < tx.inner.gas_price() { - view.delete_tx(m_tx_id)?; + db.delete_tx(m_tx_id)?; } else { return Err( - TxAcceptanceError::NullifierExistsInMempool, + TxAcceptanceError::NullifierExistsInMempool.into(), ); } } } - // ensure transaction does not exist in the blockchain - if view.get_ledger_tx_exists(&tx_id)? { - return Err(TxAcceptanceError::AlreadyExistsInLedger); - } - - Ok(()) + // Persist transaction in mempool storage + db.add_tx(tx) })?; tracing::info!( @@ -155,9 +160,6 @@ impl MempoolSrv { hash = hex::encode(tx_id) ); - // Add transaction to the mempool - db.read().await.update(|db| db.add_tx(tx))?; - Ok(()) } } From d35870026d5bff67d8837542a381cc7c2bc7bf08 Mon Sep 17 00:00:00 2001 From: Goshawk Date: Thu, 8 Aug 2024 11:52:13 +0300 Subject: [PATCH 6/9] node: Remove warning triggered by pending_senders threshold --- node/src/network.rs | 30 +++--------------------------- 1 file changed, 3 insertions(+), 27 deletions(-) diff --git a/node/src/network.rs b/node/src/network.rs index 676c6fce3b..d4d91758e2 100644 --- a/node/src/network.rs +++ b/node/src/network.rs @@ -18,12 +18,10 @@ use node_data::message::Metadata; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{SystemTime, UNIX_EPOCH}; use tokio::sync::RwLock; -use tracing::{error, info, trace, warn}; +use tracing::{error, info, trace}; mod frame; -const MAX_PENDING_SENDERS: u64 = 1000; - /// Number of alive peers randomly selected which a `flood_request` is sent to const REDUNDANCY_PEER_COUNT: usize = 8; @@ -33,35 +31,16 @@ type FilterList = [Option; N]; pub struct Listener { routes: Arc>>, filters: Arc>>, - - /// Number of awaiting senders. - pending_senders: Arc, } impl Listener { - fn reroute(&self, topic: u8, msg: Message) -> anyhow::Result<()> { - if self.pending_senders.fetch_add(1, Ordering::Relaxed) - >= MAX_PENDING_SENDERS - { - // High value of this field means either a message consumer is - // blocked or it's too slow on processing a wire msg - self.pending_senders.store(0, Ordering::Relaxed); - warn!("too many sender jobs: {}", MAX_PENDING_SENDERS); - } - - let counter = self.pending_senders.clone(); + fn reroute(&self, topic: u8, msg: Message) { let routes = self.routes.clone(); - - // Sender task tokio::spawn(async move { if let Some(Some(queue)) = routes.read().await.get(topic as usize) { queue.try_send(msg); }; - - counter.fetch_sub(1, Ordering::Relaxed); }); - - Ok(()) } fn call_filters( @@ -104,9 +83,7 @@ impl kadcast::NetworkListen for Listener { } // Reroute message to the upper layer - if let Err(e) = self.reroute(msg.topic().into(), msg) { - error!("could not reroute due to {e}"); - } + self.reroute(msg.topic().into(), msg); } Err(err) => { // Dump message blob and topic number @@ -144,7 +121,6 @@ impl Kadcast { let listener = Listener { routes: routes.clone(), filters: filters.clone(), - pending_senders: Arc::new(AtomicU64::new(0)), }; let peer = Peer::new(conf.clone(), listener)?; let public_addr = conf From 392f610ddf9e6260f6160d2d048a4b39408dd9bb Mon Sep 17 00:00:00 2001 From: Goshawk Date: Thu, 8 Aug 2024 16:37:26 +0300 Subject: [PATCH 7/9] node: Impl Mempool::txs_count method --- node/src/database.rs | 3 +++ node/src/database/rocksdb.rs | 50 ++++++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+) diff --git a/node/src/database.rs b/node/src/database.rs index f237fae60e..ecc0cfda03 100644 --- a/node/src/database.rs +++ b/node/src/database.rs @@ -146,6 +146,9 @@ pub trait Mempool { /// Get all transactions hashes. fn get_txs_ids(&self) -> Result>; + + /// Number of persisted transactions + fn txs_count(&self) -> usize; } pub trait Metadata { diff --git a/node/src/database/rocksdb.rs b/node/src/database/rocksdb.rs index d1cd6937ea..bcf03bc761 100644 --- a/node/src/database/rocksdb.rs +++ b/node/src/database/rocksdb.rs @@ -790,6 +790,12 @@ impl<'db, DB: DBAccess> Mempool for DBTransaction<'db, DB> { Ok(txs_list) } + + fn txs_count(&self) -> usize { + self.inner + .iterator_cf(self.mempool_cf, IteratorMode::Start) + .count() + } } pub struct MemPoolIterator<'db, DB: DBAccess, M: Mempool> { @@ -1217,6 +1223,50 @@ mod tests { }); } + #[test] + fn test_txs_count() { + TestWrapper::new("test_txs_count").run(|path| { + let db: Backend = + Backend::create_or_open(path, DatabaseOptions::default()); + + const N: usize = 100; + const D: usize = 50; + + let txs: Vec<_> = (0..N) + .map(|i| ledger::faker::gen_dummy_tx(i as u64)) + .collect(); + + db.update(|db| { + assert_eq!(db.txs_count(), 0); + txs.iter() + .for_each(|t| db.add_tx(&t).expect("tx should be added")); + Ok(()) + }) + .unwrap(); + + db.update(|db| { + // Ensure txs count is equal to the number of added tx + assert_eq!(db.txs_count(), N); + + txs.iter().take(D).for_each(|tx| { + assert!(db + .delete_tx(tx.id()) + .expect("transaction should be deleted")); + }); + + Ok(()) + }) + .unwrap(); + + // Ensure txs count is updated after the deletion + db.update(|db| { + assert_eq!(db.txs_count(), N - D); + Ok(()) + }) + .unwrap(); + }); + } + #[test] fn test_max_gas_limit() { TestWrapper::new("test_block_size_limit").run(|path| { From 22a32de3bc6ba4ca1ecf49f8481d31b46e1ce90e Mon Sep 17 00:00:00 2001 From: Goshawk Date: Thu, 8 Aug 2024 16:39:42 +0300 Subject: [PATCH 8/9] node: Limit number of txs in mempool with conf param --- node/src/mempool.rs | 9 +++++++++ node/src/mempool/conf.rs | 5 +++++ 2 files changed, 14 insertions(+) diff --git a/node/src/mempool.rs b/node/src/mempool.rs index ba98672df2..cc60f864e0 100644 --- a/node/src/mempool.rs +++ b/node/src/mempool.rs @@ -29,6 +29,8 @@ enum TxAcceptanceError { NullifierExistsInMempool, #[error("this transaction is invalid {0}")] VerificationFailed(String), + #[error("Maximum count of transactions exceeded {0}")] + MaxTxnCountExceeded(usize), #[error("A generic error occurred {0}")] Generic(anyhow::Error), } @@ -41,6 +43,7 @@ impl From for TxAcceptanceError { pub struct MempoolSrv { inbound: AsyncQueue, + conf: Params, } impl MempoolSrv { @@ -51,6 +54,7 @@ impl MempoolSrv { conf.max_queue_size, "mempool_inbound", ), + conf, } } } @@ -116,6 +120,11 @@ impl MempoolSrv { // Perform basic checks on the transaction db.read().await.view(|view| { + let count = view.txs_count(); + if count >= self.conf.max_mempool_txn_count { + return Err(TxAcceptanceError::MaxTxnCountExceeded(count)); + } + // ensure transaction does not exist in the mempool if view.get_tx_exists(tx_id)? { return Err(TxAcceptanceError::AlreadyExistsInMempool); diff --git a/node/src/mempool/conf.rs b/node/src/mempool/conf.rs index 3c0fd453f3..22bbf2c754 100644 --- a/node/src/mempool/conf.rs +++ b/node/src/mempool/conf.rs @@ -9,13 +9,18 @@ use std::fmt::Formatter; #[derive(Serialize, Deserialize, Copy, Clone)] pub struct Params { + /// Number of pending to be processed transactions pub max_queue_size: usize, + + /// Maximum number of transactions that can be accepted/stored in mempool + pub max_mempool_txn_count: usize, } impl Default for Params { fn default() -> Self { Self { max_queue_size: 1000, + max_mempool_txn_count: 10_000, } } } From a9dd36a0f763b8b77ea6996e3abc995ae989dba0 Mon Sep 17 00:00:00 2001 From: Goshawk Date: Thu, 8 Aug 2024 16:40:53 +0300 Subject: [PATCH 9/9] node: Call preverify(tx) after basic checks pass --- node/src/mempool.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/node/src/mempool.rs b/node/src/mempool.rs index cc60f864e0..62497a2b11 100644 --- a/node/src/mempool.rs +++ b/node/src/mempool.rs @@ -111,11 +111,6 @@ impl MempoolSrv { vm: &Arc>, tx: &Transaction, ) -> Result<(), TxAcceptanceError> { - // VM Preverify call - if let Err(e) = vm.read().await.preverify(tx) { - Err(TxAcceptanceError::VerificationFailed(format!("{e:?}")))?; - } - let tx_id = tx.id(); // Perform basic checks on the transaction @@ -138,6 +133,11 @@ impl MempoolSrv { Ok(()) })?; + // VM Preverify call + if let Err(e) = vm.read().await.preverify(tx) { + Err(TxAcceptanceError::VerificationFailed(format!("{e:?}")))?; + } + // Try to add the transaction to the mempool db.read().await.update(|db| { let nullifiers: Vec<_> = tx