Skip to content

Commit

Permalink
Merge pull request #2091 from dusk-network/report_issues
Browse files Browse the repository at this point in the history
node: Address a subset of reported issues
  • Loading branch information
goshawk-3 authored Aug 9, 2024
2 parents 904ca44 + a9dd36a commit dc0e3fc
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 58 deletions.
9 changes: 5 additions & 4 deletions consensus/src/proposal/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ impl<D: Database> ProposalHandler<D> {
expected_generator: &PublicKeyBytes,
) -> Result<(), ConsensusError> {
let p = Self::unwrap_msg(msg)?;

if expected_generator != p.sign_info.signer.bytes() {
return Err(ConsensusError::NotCommitteeMember);
}

// Verify new_block msg signature
p.verify_signature()?;

Expand All @@ -104,10 +109,6 @@ impl<D: Database> ProposalHandler<D> {
return Err(ConsensusError::InvalidBlock);
}

if expected_generator != p.sign_info.signer.bytes() {
return Err(ConsensusError::NotCommitteeMember);
}

Ok(())
}

Expand Down
9 changes: 3 additions & 6 deletions consensus/src/user/committee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,9 @@ impl Committee {

debug_assert!(self.members.len() <= mem::size_of_val(&bits) * 8);

for (pk, _) in voters.iter() {
for (pos, (member_pk, _)) in self.members.iter().enumerate() {
if member_pk.eq(pk) {
bits |= 1 << pos; // flip the i-th bit to 1
break;
}
for (pos, (member_pk, _)) in self.members.iter().enumerate() {
if voters.contains_key(member_pk) {
bits |= 1 << pos; // flip the i-th bit to 1
}
}

Expand Down
3 changes: 3 additions & 0 deletions node/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ pub trait Mempool {

/// Get all transactions hashes.
fn get_txs_ids(&self) -> Result<Vec<[u8; 32]>>;

/// Number of persisted transactions
fn txs_count(&self) -> usize;
}

pub trait Metadata {
Expand Down
50 changes: 50 additions & 0 deletions node/src/database/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,12 @@ impl<'db, DB: DBAccess> Mempool for DBTransaction<'db, DB> {

Ok(txs_list)
}

fn txs_count(&self) -> usize {
    // Walk the mempool column family from the start and tally every entry.
    // NOTE(review): this is O(n) in the number of stored transactions —
    // acceptable for a bounded mempool, but worth confirming for large stores.
    let mempool_entries =
        self.inner.iterator_cf(self.mempool_cf, IteratorMode::Start);
    mempool_entries.count()
}
}

pub struct MemPoolIterator<'db, DB: DBAccess, M: Mempool> {
Expand Down Expand Up @@ -1217,6 +1223,50 @@ mod tests {
});
}

#[test]
fn test_txs_count() {
    TestWrapper::new("test_txs_count").run(|path| {
        let db: Backend =
            Backend::create_or_open(path, DatabaseOptions::default());

        // N: total transactions inserted; D: number subsequently deleted.
        const N: usize = 100;
        const D: usize = 50;

        let txs: Vec<_> = (0..N)
            .map(|i| ledger::faker::gen_dummy_tx(i as u64))
            .collect();

        db.update(|db| {
            // A freshly created database must report an empty mempool.
            assert_eq!(db.txs_count(), 0);
            // `t` is already a `&Transaction` from `.iter()`; pass it
            // directly instead of the needless extra borrow `&t`.
            txs.iter()
                .for_each(|t| db.add_tx(t).expect("tx should be added"));
            Ok(())
        })
        .unwrap();

        db.update(|db| {
            // Ensure txs count is equal to the number of added tx
            assert_eq!(db.txs_count(), N);

            // Delete the first D transactions.
            txs.iter().take(D).for_each(|tx| {
                assert!(db
                    .delete_tx(tx.id())
                    .expect("transaction should be deleted"));
            });

            Ok(())
        })
        .unwrap();

        // Ensure txs count is updated after the deletion
        db.update(|db| {
            assert_eq!(db.txs_count(), N - D);
            Ok(())
        })
        .unwrap();
    });
}

#[test]
fn test_max_gas_limit() {
TestWrapper::new("test_block_size_limit").run(|path| {
Expand Down
5 changes: 5 additions & 0 deletions node/src/databroker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,11 @@ impl DataBrokerSrv {
max_entries: usize,
requester_addr: SocketAddr,
) -> Result<Message> {
let mut max_entries = max_entries;
if m.max_entries > 0 {
max_entries = min(max_entries, m.max_entries as usize);
}

let inv = db.read().await.view(|t| {
let mut inv = payload::Inv::default();
for i in &m.inv_list {
Expand Down
49 changes: 30 additions & 19 deletions node/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ enum TxAcceptanceError {
NullifierExistsInMempool,
#[error("this transaction is invalid {0}")]
VerificationFailed(String),
#[error("Maximum count of transactions exceeded {0}")]
MaxTxnCountExceeded(usize),
#[error("A generic error occurred {0}")]
Generic(anyhow::Error),
}
Expand All @@ -41,6 +43,7 @@ impl From<anyhow::Error> for TxAcceptanceError {

pub struct MempoolSrv {
    // Queue of inbound messages awaiting processing; bounded by
    // `conf.max_queue_size` (see the constructor below).
    inbound: AsyncQueue<Message>,
    // Mempool configuration parameters, retained so limits such as
    // `max_mempool_txn_count` can be enforced on transaction acceptance.
    conf: Params,
}

impl MempoolSrv {
Expand All @@ -51,6 +54,7 @@ impl MempoolSrv {
conf.max_queue_size,
"mempool_inbound",
),
conf,
}
}
}
Expand Down Expand Up @@ -107,21 +111,35 @@ impl MempoolSrv {
vm: &Arc<RwLock<VM>>,
tx: &Transaction,
) -> Result<(), TxAcceptanceError> {
// VM Preverify call
if let Err(e) = vm.read().await.preverify(tx) {
Err(TxAcceptanceError::VerificationFailed(format!("{e:?}")))?;
}

let tx_id = tx.id();

// Perform basic checks on the transaction
db.read().await.view(|view| {
// ensure transaction does not exist in the mempool
let count = view.txs_count();
if count >= self.conf.max_mempool_txn_count {
return Err(TxAcceptanceError::MaxTxnCountExceeded(count));
}

// ensure transaction does not exist in the mempool
if view.get_tx_exists(tx_id)? {
return Err(TxAcceptanceError::AlreadyExistsInMempool);
}

// ensure transaction does not exist in the blockchain
if view.get_ledger_tx_exists(&tx_id)? {
return Err(TxAcceptanceError::AlreadyExistsInLedger);
}

Ok(())
})?;

// VM Preverify call
if let Err(e) = vm.read().await.preverify(tx) {
Err(TxAcceptanceError::VerificationFailed(format!("{e:?}")))?;
}

// Try to add the transaction to the mempool
db.read().await.update(|db| {
let nullifiers: Vec<_> = tx
.inner
.nullifiers()
Expand All @@ -130,34 +148,27 @@ impl MempoolSrv {
.collect();

// ensure nullifiers do not exist in the mempool
for m_tx_id in view.get_txs_by_nullifiers(&nullifiers) {
if let Some(m_tx) = view.get_tx(m_tx_id)? {
for m_tx_id in db.get_txs_by_nullifiers(&nullifiers) {
if let Some(m_tx) = db.get_tx(m_tx_id)? {
if m_tx.inner.gas_price() < tx.inner.gas_price() {
view.delete_tx(m_tx_id)?;
db.delete_tx(m_tx_id)?;
} else {
return Err(
TxAcceptanceError::NullifierExistsInMempool,
TxAcceptanceError::NullifierExistsInMempool.into(),
);
}
}
}

// ensure transaction does not exist in the blockchain
if view.get_ledger_tx_exists(&tx_id)? {
return Err(TxAcceptanceError::AlreadyExistsInLedger);
}

Ok(())
// Persist transaction in mempool storage
db.add_tx(tx)
})?;

tracing::info!(
event = "transaction accepted",
hash = hex::encode(tx_id)
);

// Add transaction to the mempool
db.read().await.update(|db| db.add_tx(tx))?;

Ok(())
}
}
5 changes: 5 additions & 0 deletions node/src/mempool/conf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,18 @@ use std::fmt::Formatter;

#[derive(Serialize, Deserialize, Copy, Clone)]
pub struct Params {
    /// Maximum number of pending (yet-to-be-processed) transactions held in
    /// the inbound queue.
    pub max_queue_size: usize,

    /// Maximum number of transactions that can be accepted/stored in mempool
    pub max_mempool_txn_count: usize,
}

impl Default for Params {
    // Defaults: a 1_000-deep inbound queue and a 10_000-transaction mempool
    // cap; both can be overridden via deserialized configuration.
    fn default() -> Self {
        Self {
            max_queue_size: 1000,
            max_mempool_txn_count: 10_000,
        }
    }
}
Expand Down
30 changes: 3 additions & 27 deletions node/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@ use node_data::message::Metadata;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::RwLock;
use tracing::{error, info, trace, warn};
use tracing::{error, info, trace};

mod frame;

const MAX_PENDING_SENDERS: u64 = 1000;

/// Number of alive peers randomly selected which a `flood_request` is sent to
const REDUNDANCY_PEER_COUNT: usize = 8;

Expand All @@ -33,35 +31,16 @@ type FilterList<const N: usize> = [Option<BoxedFilter>; N];
pub struct Listener<const N: usize> {
    // Per-topic message routes, indexed by topic id when rerouting
    // inbound messages.
    routes: Arc<RwLock<RoutesList<N>>>,
    // Per-topic filters; presumably consulted before rerouting — confirm
    // against `call_filters`.
    filters: Arc<RwLock<FilterList<N>>>,

    /// Number of awaiting senders.
    pending_senders: Arc<AtomicU64>,
}

impl<const N: usize> Listener<N> {
fn reroute(&self, topic: u8, msg: Message) -> anyhow::Result<()> {
if self.pending_senders.fetch_add(1, Ordering::Relaxed)
>= MAX_PENDING_SENDERS
{
// High value of this field means either a message consumer is
// blocked or it's too slow on processing a wire msg
self.pending_senders.store(0, Ordering::Relaxed);
warn!("too many sender jobs: {}", MAX_PENDING_SENDERS);
}

let counter = self.pending_senders.clone();
fn reroute(&self, topic: u8, msg: Message) {
let routes = self.routes.clone();

// Sender task
tokio::spawn(async move {
if let Some(Some(queue)) = routes.read().await.get(topic as usize) {
queue.try_send(msg);
};

counter.fetch_sub(1, Ordering::Relaxed);
});

Ok(())
}

fn call_filters(
Expand Down Expand Up @@ -104,9 +83,7 @@ impl<const N: usize> kadcast::NetworkListen for Listener<N> {
}

// Reroute message to the upper layer
if let Err(e) = self.reroute(msg.topic().into(), msg) {
error!("could not reroute due to {e}");
}
self.reroute(msg.topic().into(), msg);
}
Err(err) => {
// Dump message blob and topic number
Expand Down Expand Up @@ -144,7 +121,6 @@ impl<const N: usize> Kadcast<N> {
let listener = Listener {
routes: routes.clone(),
filters: filters.clone(),
pending_senders: Arc::new(AtomicU64::new(0)),
};
let peer = Peer::new(conf.clone(), listener)?;
let public_addr = conf
Expand Down
11 changes: 9 additions & 2 deletions node/src/network/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use node_data::message::Message;
use node_data::Serializable;
use std::io::{self, Read, Write};
use std::io::{self, ErrorKind, Read, Write};

const PROTOCOL_VERSION: [u8; 8] = [0, 0, 0, 0, 1, 0, 0, 0];

Expand Down Expand Up @@ -46,8 +46,15 @@ impl Pdu {
Self: Sized,
{
let header = Header::read(r)?;
let payload = Message::read(r)?;

let mut payload_buf = vec![];
r.read_to_end(&mut payload_buf)?;

if header.checksum != calc_checksum(&payload_buf[..]) {
return Err(io::Error::new(ErrorKind::Other, "Checksum is wrong"));
}

let payload = Message::read(&mut &payload_buf[..])?;
Ok(Pdu { header, payload })
}
}
Expand Down

0 comments on commit dc0e3fc

Please sign in to comment.