node: Infallible pdu encoding #2079

Closed
wants to merge 2 commits

node-data/src/encoding.rs (2 additions, 4 deletions)
@@ -424,11 +424,9 @@ mod tests {
     /// Asserts if encoding/decoding of a serializable type runs properly.
     fn assert_serializable<S: Dummy<Faker> + Eq + Serializable>() {
         let obj: S = Faker.fake();
-        let mut buf = vec![];
-        obj.write(&mut buf).expect("should be writable");
+        let data = obj.write_to_vec();

-        assert!(obj
-            .eq(&S::read(&mut &buf.to_vec()[..]).expect("should be readable")));
+        assert!(obj.eq(&S::read(&mut &data[..]).expect("should be readable")));
     }

     #[test]

node-data/src/ledger/faults.rs (2 additions, 3 deletions)
@@ -65,9 +65,8 @@ impl From<BlsSigError> for InvalidFault {
 impl Fault {
     /// Hash the serialized form
     pub fn hash(&self) -> [u8; 32] {
-        let mut b = vec![];
-        self.write(&mut b).expect("Write to a vec shall not fail");
-        BlsScalar::hash_to_scalar(&b[..]).to_bytes()
+        let data = self.write_to_vec();
+        BlsScalar::hash_to_scalar(&data).to_bytes()
     }

     pub fn same(&self, other: &Fault) -> bool {

node-data/src/lib.rs (7 additions, 0 deletions)
@@ -32,6 +32,13 @@ pub trait Serializable {
     where
         Self: Sized;

+    fn write_to_vec(&self) -> Vec<u8> {
+        let mut buffer = vec![];
+        self.write(&mut buffer)
+            .expect("Writing to vec should succeed");

Contributor commented on this line:
Are we sure that there are no cases where an encoding may fail due to a programmer mistake or edge case, thus panicking?

Member (author) replied:
I believe you're right. I was so tempted to remove those `if let`s that I didn't take it into consideration.

I'm going to move this back to draft and tackle it later in a better way.

+        buffer
+    }
+
     fn read_bytes<R: Read, const N: usize>(r: &mut R) -> io::Result<[u8; N]> {
         let mut buffer = [0u8; N];
         r.read_exact(&mut buffer)?;
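
Note on the review thread above: writing into a Vec<u8> never fails at the I/O layer, so write_to_vec() panics only when a Serializable::write implementation returns an error of its own (a domain check, an invariant violation, and so on), which the expect() then converts into a panic. The sketch below is not code from this PR; the trait is reduced to the two methods relevant here, and the Demo type with its one-byte length prefix is purely hypothetical.

```rust
use std::io::{self, Write};

// Reduced form of the Serializable trait as touched by this PR.
pub trait Serializable {
    fn write<W: Write>(&self, w: &mut W) -> io::Result<()>;

    // Infallible helper added in this PR: panics if `write` errors out.
    fn write_to_vec(&self) -> Vec<u8> {
        let mut buffer = vec![];
        self.write(&mut buffer)
            .expect("Writing to vec should succeed");
        buffer
    }
}

// Hypothetical type whose encoder enforces a domain invariant.
struct Demo {
    payload: Vec<u8>,
}

impl Serializable for Demo {
    fn write<W: Write>(&self, w: &mut W) -> io::Result<()> {
        // Writing into a Vec<u8> cannot fail, but the encoder itself can
        // still reject the value it is asked to serialize.
        if self.payload.len() > u8::MAX as usize {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "payload too long for a 1-byte length prefix",
            ));
        }
        w.write_all(&[self.payload.len() as u8])?;
        w.write_all(&self.payload)
    }
}

fn main() {
    let ok = Demo { payload: vec![1, 2, 3] };
    assert_eq!(ok.write_to_vec(), vec![3, 1, 2, 3]);

    let too_big = Demo { payload: vec![0; 300] };
    // Panics inside write_to_vec(), which is the reviewer's concern.
    let _ = too_big.write_to_vec();
}
```

Whether that trade-off is acceptable is exactly what the author chose to revisit by moving the PR back to draft.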

node-data/src/message.rs (3 additions, 5 deletions)
@@ -332,9 +332,7 @@ impl ConsensusHeader {
     }

     pub fn signable(&self) -> Vec<u8> {
-        let mut buf = vec![];
-        self.write(&mut buf).expect("Writing to vec should succeed");
-        buf
+        self.write_to_vec()
     }
 }

@@ -1387,8 +1385,8 @@
     }

     fn assert_serialize<S: Serializable + PartialEq + core::fmt::Debug>(v: S) {
-        let mut buf = vec![];
-        assert!(v.write(&mut buf).is_ok());
+        let buf = v.write_to_vec();
+
         let dup = S::read(&mut &buf[..]).expect("deserialize is ok");
         assert_eq!(
             v,

node/src/chain.rs (1 addition, 3 deletions)
@@ -210,9 +210,7 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>
                 };
             }

-            if let Err(e) = network.read().await.broadcast(&msg).await {
-                warn!("Unable to re-route message {e}");
-            }
+            network.read().await.broadcast(&msg).await;
         },
         // Handles accept_block_timeout event
         _ = sleep_until(timeout) => {

node/src/chain/acceptor.rs (1 addition, 3 deletions)
@@ -1009,9 +1009,7 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
 }

 async fn broadcast<N: Network>(network: &Arc<RwLock<N>>, msg: &Message) {
-    let _ = network.read().await.broadcast(msg).await.map_err(|err| {
-        warn!("Unable to broadcast msg: {:?} {err} ", msg.topic())
-    });
+    network.read().await.broadcast(msg).await
 }

 /// Performs full verification of block header against prev_block header where

node/src/chain/consensus.rs (1 addition, 4 deletions)
@@ -352,10 +352,7 @@ impl<DB: database::DB, VM: vm::VMExecution> Operations for Executor<DB, VM> {
                 metric.push_back(elapsed);
                 debug!(event = "avg_updated", ?step_name, metric = ?metric);

-                let mut bytes = Vec::new();
-                metric.write(&mut bytes)?;
-
-                t.op_write(db_key, bytes)
+                t.op_write(db_key, metric.write_to_vec())
             })
             .map_err(Error::MetricsUpdate)?;


node/src/chain/fsm.rs (8 additions, 22 deletions)
@@ -24,7 +24,7 @@ use std::time::Duration;
 use std::{sync::Arc, time::SystemTime};
 use tokio::sync::RwLock;
 use tokio::time::Instant;
-use tracing::{debug, error, info, warn};
+use tracing::{debug, error, info};

 const MAX_BLOCKS_TO_REQUEST: i16 = 50;
 const EXPIRY_TIMEOUT_MILLIS: i16 = 5000;
@@ -122,15 +122,11 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution> SimpleFSM<N, DB, VM> {
             let get_blocks = Message::new_get_blocks(GetBlocks {
                 locator: last_finalized.header().hash,
             });
-            if let Err(e) = self
-                .network
+            self.network
                 .read()
                 .await
                 .send_to_alive_peers(&get_blocks, REDUNDANCY_PEER_FACTOR)
-                .await
-            {
-                warn!("Unable to request GetBlocks {e}");
-            }
+                .await;
         } else {
             error!("could not request blocks");
         }
@@ -674,14 +670,11 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> InSyncImpl<DB, VM, N> {
         let req = GetResource::new(inv, this_peer, u64::MAX, 1);
         debug!(event = "request block by height", ?req, ?peer_addr);

-        if let Err(err) = network
+        network
             .read()
             .await
             .send_to_peer(&Message::new_get_resource(req), peer_addr)
             .await
-        {
-            warn!("could not request block {err}")
-        }
     }

     async fn on_heartbeat(&mut self) -> anyhow::Result<bool> {
@@ -747,15 +740,11 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network>
         // Request missing blocks from source peer
         let gb_msg = Message::new_get_blocks(GetBlocks { locator });

-        if let Err(e) = self
-            .network
+        self.network
             .read()
             .await
             .send_to_peer(&gb_msg, dest_addr)
-            .await
-        {
-            warn!("Unable to send GetBlocks: {e}")
-        };
+            .await;

         // add to the pool
         let key = blk.header().height;
@@ -873,12 +862,9 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network>
 async fn flood_request<N: Network>(network: &Arc<RwLock<N>>, inv: &Inv) {
     debug!(event = "flood_request", ?inv);

-    if let Err(err) = network
+    network
         .read()
         .await
         .flood_request(inv, None, DEFAULT_HOPS_LIMIT)
-        .await
-    {
-        warn!("could not request block {err}")
-    };
+        .await;
 }

node/src/chain/metrics.rs (1 addition, 2 deletions)
@@ -82,8 +82,7 @@ mod tests {
         assert_eq!(metric.average().expect("positive number"), expected);

         // Marshal/Unmarshal
-        let mut buf = Vec::new();
-        metric.write(&mut buf).expect("all written");
+        let buf = metric.write_to_vec();

         assert_eq!(
             AverageElapsedTime::read(&mut &buf[..])

node/src/database/rocksdb.rs (11 additions, 16 deletions)
@@ -290,19 +290,17 @@ impl<'db, DB: DBAccess> Ledger for DBTransaction<'db, DB> {
     {
         let cf = self.ledger_cf;

-        let mut buf = vec![];
-        LightBlock {
+        let block = LightBlock {
             header: header.clone(),
             transactions_ids: txs
                 .iter()
                 .map(|t| t.inner.id())
                 .collect::<Vec<[u8; 32]>>(),

-            faults_ids: faults.iter().map(|f| f.hash()).collect::<Vec<_>>(),
-        }
-        .write(&mut buf)?;
+            faults_ids: faults.iter().map(|f| f.hash()).collect(),
+        };

-        self.put_cf(cf, header.hash, buf)?;
+        self.put_cf(cf, header.hash, block.write_to_vec())?;
     }

     // Update metadata values
@@ -315,8 +313,7 @@

             // store all block transactions
             for tx in txs {
-                let mut d = vec![];
-                tx.write(&mut d)?;
+                let d = tx.write_to_vec();
                 self.put_cf(cf, tx.inner.id(), d)?;
             }
         }
@@ -327,9 +324,9 @@ impl<'db, DB: DBAccess> Ledger for DBTransaction<'db, DB> {

             // store all block faults
             for f in faults {
-                let mut d = vec![];
-                f.write(&mut d)?;
-                self.put_cf(cf, f.hash(), d)?;
+                let key = f.hash();
+                let value = f.write_to_vec();
+                self.put_cf(cf, key, value)?;
             }
         }
         self.store_block_label(header.height, &header.hash, label)?;
@@ -570,8 +567,7 @@ impl<'db, DB: DBAccess> Candidate for DBTransaction<'db, DB> {
     /// Returns `Ok(())` if the block is successfully stored, or an error if the
     /// operation fails.
     fn store_candidate_block(&self, b: ledger::Block) -> Result<()> {
-        let mut serialized = vec![];
-        b.write(&mut serialized)?;
+        let serialized = b.write_to_vec();

         self.inner
             .put_cf(self.candidates_cf, b.header().hash, serialized)?;
@@ -681,10 +677,9 @@ impl<'db, DB: DBAccess> Persist for DBTransaction<'db, DB> {
 impl<'db, DB: DBAccess> Mempool for DBTransaction<'db, DB> {
     fn add_tx(&self, tx: &ledger::Transaction) -> Result<()> {
         // Map Hash to serialized transaction
-        let mut tx_data = vec![];
-        tx.write(&mut tx_data)?;
-
+        let tx_data = tx.write_to_vec();
         let hash = tx.id();
+
         self.put_cf(self.mempool_cf, hash, tx_data)?;

         // Add Secondary indexes //

node/src/databroker.rs (1 addition, 4 deletions)
@@ -140,10 +140,7 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>
             // Send response
             let net = network.read().await;
             for msg in resp.msgs {
-                let send = net.send_to_peer(&msg, resp.recv_peer);
-                if let Err(e) = send.await {
-                    warn!("Unable to send_to_peer {e}")
-                };
+                net.send_to_peer(&msg, resp.recv_peer).await;

                 // Mitigate pressure on UDP buffers.
                 // Needed only in localnet.

node/src/lib.rs (4 additions, 8 deletions)
@@ -40,29 +40,25 @@ pub type BoxedFilter = Box<dyn Filter + Sync + Send>;
 #[async_trait]
 pub trait Network: Send + Sync + 'static {
     /// Broadcasts a fire-and-forget message.
-    async fn broadcast(&self, msg: &Message) -> anyhow::Result<()>;
+    async fn broadcast(&self, msg: &Message);

     /// Broadcasts a request message
     async fn flood_request(
         &self,
         msg_inv: &Inv,
         ttl_as_sec: Option<u64>,
         hops_limit: u16,
-    ) -> anyhow::Result<()>;
+    );

     /// Sends a message to a specified peer.
     async fn send_to_peer(
         &self,
         msg: &Message,
         peer_addr: std::net::SocketAddr,
-    ) -> anyhow::Result<()>;
+    );

     /// Sends to random set of alive peers.
-    async fn send_to_alive_peers(
-        &self,
-        msg: &Message,
-        amount: usize,
-    ) -> anyhow::Result<()>;
+    async fn send_to_alive_peers(&self, msg: &Message, amount: usize);

     /// Routes any message of the specified type to this queue.
     async fn add_route(
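
A consequence of dropping anyhow::Result<()> from these signatures is that failures can no longer be handled at the call sites; each Network implementation has to deal with them internally, typically by logging. The sketch below is not code from this PR: the trait is cut down to a single method, and ChannelNetwork with its raw byte payload is a hypothetical stand-in for the real Message-based implementation.

```rust
use async_trait::async_trait;

// Cut-down, illustrative version of the trait after this change:
// the caller gets no Result back.
#[async_trait]
pub trait Network: Send + Sync + 'static {
    /// Broadcasts a fire-and-forget message.
    async fn broadcast(&self, msg: &[u8]);
}

// Hypothetical transport used only for this sketch.
pub struct ChannelNetwork {
    tx: tokio::sync::mpsc::Sender<Vec<u8>>,
}

#[async_trait]
impl Network for ChannelNetwork {
    async fn broadcast(&self, msg: &[u8]) {
        // The error is logged and swallowed here, instead of being wrapped
        // in an `if let Err(..)` at every call site.
        if let Err(err) = self.tx.send(msg.to_vec()).await {
            tracing::warn!("unable to broadcast message: {err}");
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::mpsc::channel(8);
    let net = ChannelNetwork { tx };
    net.broadcast(b"hello").await;
    assert_eq!(rx.recv().await, Some(b"hello".to_vec()));
}
```

Under that reading, the if let Err(..) { warn!(...) } blocks removed throughout this diff are not lost error handling; the equivalent logging would presumably live once, inside the concrete implementation.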

node/src/mempool.rs (2 additions, 5 deletions)
@@ -15,7 +15,7 @@ use node_data::message::{AsyncQueue, Payload, Topics};
 use std::sync::Arc;
 use thiserror::Error;
 use tokio::sync::RwLock;
-use tracing::{error, info, warn};
+use tracing::{error, info};

 const TOPICS: &[u8] = &[Topics::Tx as u8];

@@ -83,10 +83,7 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>
                         continue;
                     }

-                    let network = network.read().await;
-                    if let Err(e) = network.broadcast(&msg).await {
-                        warn!("Unable to broadcast accepted tx: {e}")
-                    };
+                    network.read().await.broadcast(&msg).await;
                 }
                 _ => error!("invalid inbound message payload"),
             }