Skip to content

Commit

Permalink
Fix 'reserved' field compatibility
Browse files Browse the repository at this point in the history
Resolves #137
  • Loading branch information
herr-seppia committed Apr 9, 2024
1 parent d434062 commit dedc023
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 16 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Change `BinaryID::from_nonce` to return result [#136]
- Change maintainer to ping nodes while removal [#138]

### Fixed

- Fix 'reserved' field compatibility [#137]

## [0.6.0] - 2023-11-01

### Added
Expand Down Expand Up @@ -138,6 +142,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
[#123]: https://github.com/dusk-network/kadcast/issues/123
[#135]: https://github.com/dusk-network/kadcast/issues/135
[#136]: https://github.com/dusk-network/kadcast/issues/136
[#137]: https://github.com/dusk-network/kadcast/issues/137
[#138]: https://github.com/dusk-network/kadcast/issues/138

<!-- Releases -->
Expand Down
17 changes: 13 additions & 4 deletions src/encoding/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ use std::io::{self, Error, ErrorKind, Read, Write};
use super::Marshallable;
use crate::{kbucket::BinaryID, K_ID_LEN_BYTES, K_NONCE_LEN};

#[derive(Debug, PartialEq, Clone, Copy)]
#[derive(Debug, PartialEq, Clone)]
pub struct Header {
pub(crate) binary_id: BinaryID,
pub(crate) sender_port: u16,
pub(crate) network_id: u8,
pub(crate) reserved: [u8; 2],
pub(crate) reserved: Vec<u8>,
}

impl Header {
Expand All @@ -32,6 +32,7 @@ impl Marshallable for Header {
writer.write_all(self.binary_id.nonce())?;
writer.write_all(&self.sender_port.to_le_bytes())?;
writer.write_all(&[self.network_id])?;
writer.write_all(&(self.reserved.len() as u16).to_le_bytes())?;
writer.write_all(&self.reserved)?;
Ok(())
}
Expand All @@ -55,8 +56,16 @@ impl Marshallable for Header {
reader.read_exact(&mut network_id)?;
let network_id = network_id[0];

let mut reserved = [0; 2];
reader.read_exact(&mut reserved)?;
let mut reserved_len = [0; 2];
reader.read_exact(&mut reserved_len)?;
let reserved_len = u16::from_le_bytes(reserved_len);
let reserved = if reserved_len > 0 {
let mut reserved = vec![0u8; reserved_len as usize];
reader.read_exact(&mut reserved)?;
reserved
} else {
vec![]
};

Ok(Header {
binary_id,
Expand Down
14 changes: 9 additions & 5 deletions src/handling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ pub(crate) struct MessageHandler {
}

impl MessageHandler {
fn my_header(&self) -> Header {
self.my_header.clone()
}

async fn new(
ktable: RwLock<Tree<PeerInfo>>,
outbound_sender: Sender<MessageBeanOut>,
Expand Down Expand Up @@ -142,7 +146,7 @@ impl MessageHandler {
if let Some(pending) = result.pending_eviction() {
self.outbound_sender
.send((
Message::Ping(self.my_header),
Message::Ping(self.my_header()),
vec![*pending.value().address()],
))
.await
Expand Down Expand Up @@ -173,7 +177,7 @@ impl MessageHandler {

async fn handle_ping(&self, remote_node_addr: SocketAddr) {
self.outbound_sender
.send((Message::Pong(self.my_header), vec![remote_node_addr]))
.send((Message::Pong(self.my_header()), vec![remote_node_addr]))
.await
.unwrap_or_else(|e| error!("Unable to send Pong {e}"));
}
Expand All @@ -190,7 +194,7 @@ impl MessageHandler {
.closest_peers::<K_K>(target)
.map(|p| p.as_peer_info())
.collect();
let message = Message::Nodes(self.my_header, NodePayload { peers });
let message = Message::Nodes(self.my_header(), NodePayload { peers });
self.outbound_sender
.send((message, vec![remote_node_addr]))
.await
Expand Down Expand Up @@ -221,7 +225,7 @@ impl MessageHandler {
})
.map(|n| {
(
(self.nodes_reply_fn)(self.my_header, n.id),
(self.nodes_reply_fn)(self.my_header(), n.id),
vec![n.to_socket_address()],
)
})
Expand Down Expand Up @@ -271,7 +275,7 @@ impl MessageHandler {
height,
gossip_frame,
};
let msg = Message::Broadcast(self.my_header, payload);
let msg = Message::Broadcast(self.my_header(), payload);
let targets =
nodes.map(|node| *node.value().address()).collect();
(msg, targets)
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ impl Peer {
.extract(height)
.map(|(height, nodes)| {
let msg = Message::Broadcast(
self.header,
self.header.clone(),
BroadcastPayload {
height,
gossip_frame: message.to_vec(), //FIX_ME: avoid clone
Expand Down Expand Up @@ -241,7 +241,7 @@ impl Peer {
// We use the Broadcast message type while setting height to 0
// to prevent further propagation at the receiver
let msg = Message::Broadcast(
self.header,
self.header.clone(),
BroadcastPayload {
height: 0,
gossip_frame: message.to_vec(), //FIX_ME: avoid clone
Expand Down
10 changes: 7 additions & 3 deletions src/maintainer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ impl TableMaintainer {
info!("TableMaintainer::contact_bootstrappers");
let bootstrapping_nodes_addr = self.bootstrapping_nodes_addr();
let binary_key = self.header.binary_id().as_binary();
let find_nodes = Message::FindNodes(self.header, *binary_key);
let find_nodes =
Message::FindNodes(self.header.clone(), *binary_key);
self.send((find_nodes, bootstrapping_nodes_addr)).await;
tokio::time::sleep(Duration::from_secs(30)).await;
}
Expand Down Expand Up @@ -119,7 +120,7 @@ impl TableMaintainer {
.idle_nodes()
.map(|n| *n.value().address())
.collect();
self.send((Message::Ping(self.header), idles)).await;
self.send((Message::Ping(self.header.clone()), idles)).await;
self.ktable.write().await.remove_idle_nodes();
}

Expand All @@ -133,7 +134,10 @@ impl TableMaintainer {
.flat_map(|(_, idle_nodes)| idle_nodes)
.map(|target| {
(
Message::FindNodes(self.header, *target.id().as_binary()),
Message::FindNodes(
self.header.clone(),
*target.id().as_binary(),
),
//TODO: Extract alpha nodes
vec![*target.value().address()],
)
Expand Down
2 changes: 1 addition & 1 deletion src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl PeerNode {
Header {
binary_id: *self.id(),
sender_port: self.value().address.port(),
reserved: [0; 2],
reserved: vec![],
network_id: self.network_id,
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/transport/encoding/raptorq/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl Encoder for RaptorQEncoder {
let mut packet_with_uid = base_packet.clone();
packet_with_uid.append(&mut encoded_packet.serialize());
Message::Broadcast(
header,
header.clone(),
BroadcastPayload {
height: payload.height,
gossip_frame: packet_with_uid,
Expand Down

0 comments on commit dedc023

Please sign in to comment.