Skip to content

Commit

Permalink
Merge pull request #89 from dusk-network/fix-87
Browse files Browse the repository at this point in the history
Handle and try to decode chunks in a separate tokio task
  • Loading branch information
goshawk-3 authored Jan 7, 2022
2 parents c794ead + 13f1d77 commit 8a0124e
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 39 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

## [0.3.0] - 07-01-22
### Added

- Add network transport configuration [#72] [#76]
Expand All @@ -15,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add configurable FEC redundancy [#82]
- Add configurable UDP send interval [#83]
- Add UDP network tweak configuration [#86]
- Add dedicated tokio task to handle and decode chunks [#87]
- Add logs to pending RwLock [#92]

### Fixed
Expand Down Expand Up @@ -64,5 +64,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
[#80]: https://github.com/dusk-network/kadcast/issues/80
[#82]: https://github.com/dusk-network/kadcast/issues/82
[#83]: https://github.com/dusk-network/kadcast/issues/83
[#87]: https://github.com/dusk-network/kadcast/issues/87
[#90]: https://github.com/dusk-network/kadcast/issues/90
[#92]: https://github.com/dusk-network/kadcast/issues/92
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ impl Peer {
listen_address,
outbound_channel_rx,
transport_conf,
channel_size,
);
tokio::spawn(TableMantainer::start(
bootstrapping_nodes,
Expand Down
110 changes: 73 additions & 37 deletions src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use socket2::SockRef;
use tokio::{
io,
net::UdpSocket,
sync::mpsc::{Receiver, Sender},
sync::mpsc::{self, Receiver, Sender},
time::{self},
};
use tracing::*;
Expand All @@ -27,6 +27,7 @@ use crate::{
};
pub(crate) type MessageBeanOut = (Message, Vec<SocketAddr>);
pub(crate) type MessageBeanIn = (Message, SocketAddr);
type UDPChunk = (Vec<u8>, SocketAddr);

const MAX_DATAGRAM_SIZE: usize = 65_507;
pub(crate) struct WireNetwork {}
Expand All @@ -40,76 +41,111 @@ impl WireNetwork {
listen_address: String,
outbound_channel_rx: Receiver<MessageBeanOut>,
conf: HashMap<String, String>,
channel_size: usize,
) {
let listen_address = listen_address
.parse()
.expect("Unable to parse public_ip address");
let c = conf.clone();
tokio::spawn(async move {
WireNetwork::listen_out(outbound_channel_rx, &c)
WireNetwork::listen_out(outbound_channel_rx, &conf)
.await
.unwrap_or_else(|op| error!("Error in listen_out {:?}", op));
});

let (dec_chan_tx, dec_chan_rx) = mpsc::channel(channel_size);

let c1 = c.clone();
tokio::spawn(async move {
WireNetwork::listen_in(
listen_address,
inbound_channel_tx.clone(),
&conf,
)
.await
.unwrap_or_else(|op| error!("Error in listen_in {:?}", op));
WireNetwork::decode(inbound_channel_tx.clone(), dec_chan_rx, &c)
.await
.unwrap_or_else(|op| error!("Error in decode {:?}", op));
});

tokio::spawn(async move {
WireNetwork::listen_in(listen_address, dec_chan_tx.clone(), &c1)
.await
.unwrap_or_else(|op| error!("Error in listen_in {:?}", op));
});
}

async fn listen_in(
listen_address: SocketAddr,
inbound_channel_tx: Sender<MessageBeanIn>,
dec_chan_tx: Sender<UDPChunk>,
conf: &HashMap<String, String>,
) -> io::Result<()> {
debug!("WireNetwork::listen_in started");
let mut decoder = TransportDecoder::configure(conf);

let socket = UdpSocket::bind(listen_address)
.await
.expect("Unable to bind address");
WireNetwork::configure_socket(&socket, conf)?;
info!("Listening on: {}", socket.local_addr()?);

// Try to extend socket recv buffer size
WireNetwork::configure_socket(&socket, conf)?;

// Read UDP socket recv buffer and delegate the processing to decode
// task
loop {
let mut bytes = [0; MAX_DATAGRAM_SIZE];
let (_, remote_address) =
let (len, remote_address) =
socket.recv_from(&mut bytes).await.map_err(|e| {
error!("Error receiving from socket {}", e);
e
})?;

match Message::unmarshal_binary(&mut &bytes[..]) {
Ok(deser) => {
debug!("> Received raw message {}", deser.type_byte());
let to_process = decoder.decode(deser);
if let Some(message) = to_process {
let valid_header = PeerNode::verify_header(
message.header(),
&remote_address.ip(),
);
match valid_header {
true => {
inbound_channel_tx
.send((message, remote_address))
.await
.unwrap_or_else(
|op| error!("Unable to send to inbound channel {:?}", op),
dec_chan_tx
.send((bytes[0..len].to_vec(), remote_address))
.await
.unwrap_or_else(|op| {
error!("Unable to send to dec_chan_tx channel {:?}", op)
});
}
}

async fn decode(
inbound_channel_tx: Sender<MessageBeanIn>,
mut dec_chan_rx: Receiver<UDPChunk>,
conf: &HashMap<String, String>,
) -> io::Result<()> {
debug!("WireNetwork::decode started");
let mut decoder = TransportDecoder::configure(conf);

loop {
if let Some((message, remote_address)) = dec_chan_rx.recv().await {
match Message::unmarshal_binary(&mut &message[..]) {
Ok(deser) => {
debug!("> Received raw message {}", deser.type_byte());
let to_process = decoder.decode(deser);
if let Some(message) = to_process {
let valid_header = PeerNode::verify_header(
message.header(),
&remote_address.ip(),
);
match valid_header {
true => {
inbound_channel_tx
.send((message, remote_address))
.await
.unwrap_or_else(
|op| error!("Unable to send to inbound channel {:?}", op),
);
}
false => {
error!(
"Invalid Id {:?} - {}",
message.header(),
&remote_address.ip()
);
}
false => {
error!(
"Invalid Id {:?} - {}",
message.header(),
&remote_address.ip()
);
}
}
}
}
Err(e) => error!(
"Error deser from {:?} - {} - {}",
message, remote_address, e
),
}
Err(e) => error!("Error deser from {} - {}", remote_address, e),
}
}
}
Expand Down

0 comments on commit 8a0124e

Please sign in to comment.