Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP Broadcasting #22

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ categories = ["development-tools::testing", "network-programming", "simulation"]
readme = "README.md"

[dependencies]
libc = "=0.2.41"
libc = "=0.2.42"
unwrap = "1.1"
get_if_addrs = "0.5"
future-utils = "0.8.0"
Expand Down
51 changes: 51 additions & 0 deletions examples/broadcast.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
//! This example demonstrates how to simulate LAN and broadcast packets to all machines on it.

extern crate env_logger;
extern crate futures;
#[macro_use]
extern crate net_literals;
extern crate netsim;
extern crate tokio_core;
#[macro_use]
extern crate unwrap;

use netsim::{node, Ipv4Range, Network};
use tokio_core::reactor::Core;

use std::net::{SocketAddr, SocketAddrV4, UdpSocket};

fn main() {
unwrap!(env_logger::init());

let mut evloop = unwrap!(Core::new());
let network = Network::new(&evloop.handle());

let recipe1 = node::ipv4::machine(|ip| {
println!("[machine1] ip = {}", ip);

let bind_addr = SocketAddr::V4(SocketAddrV4::new(ip, 5000));
let sock = unwrap!(UdpSocket::bind(bind_addr));

let mut buf = [0; 4096];
let (_bytes_received, addr) = unwrap!(sock.recv_from(&mut buf));
println!(
"[machine1] received: {}, from: {}",
unwrap!(String::from_utf8(buf.to_vec())), addr
);
});

let recipe2 = node::ipv4::machine(|ip| {
println!("[machine2] ip = {}", ip);

let sock = unwrap!(UdpSocket::bind("0.0.0.0:0"));
unwrap!(sock.set_broadcast(true));
let broadcast_addr = addr!("255.255.255.255:5000");
let _ = unwrap!(sock.send_to(b"hello world!", broadcast_addr));
});

let router_recipe = node::ipv4::router((recipe1, recipe2));
let (spawn_complete, _ipv4_plug) =
network.spawn_ipv4_tree(Ipv4Range::local_subnet_192(1), router_recipe);

evloop.run(spawn_complete).unwrap();
}
5 changes: 2 additions & 3 deletions examples/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ extern crate futures;

use futures::Future;
use futures::sync::oneshot;
use netsim::{node, spawn, Ipv4Range, Network};
use netsim::{node, Ipv4Range, Network};
use tokio_core::reactor::Core;

use std::net::{SocketAddr, SocketAddrV4, UdpSocket};
Expand Down Expand Up @@ -40,8 +40,7 @@ fn main() {
});

let router_recipe = node::ipv4::router((server_recipe, client_recipe));
let (spawn_complete, _ipv4_plug) =
spawn::ipv4_tree(&network.handle(), Ipv4Range::global(), router_recipe);
let (spawn_complete, _ipv4_plug) = network.spawn_ipv4_tree(Ipv4Range::global(), router_recipe);

evloop.run(spawn_complete).unwrap();
}
271 changes: 236 additions & 35 deletions src/device/ipv4/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,22 @@ impl Ipv4RouterBuilder {

/// Connects a bunch of Ipv4 networks and routes packets between them.
pub struct Ipv4Router {
rxs: Vec<Ipv4Receiver>,
// the only reason it's optional, is to make the borrow checker happy
rxs: Option<Vec<Ipv4Receiver>>,
txs: Vec<(Ipv4Sender, Vec<Ipv4Route>)>,
ipv4_addr: Ipv4Addr,
}

impl Ipv4Router {
/// Create a new router with the given connections. You can also use `Ipv4RouterBuilder` to add
/// connections individually.
/// Create a new router with the given IP address and connections. You can also use
/// `Ipv4RouterBuilder` to add connections individually.
pub fn new(ipv4_addr: Ipv4Addr, connections: Vec<(Ipv4Plug, Vec<Ipv4Route>)>) -> Ipv4Router {
let mut rxs = Vec::with_capacity(connections.len());
let mut txs = Vec::with_capacity(connections.len());
for (plug, routes) in connections {
let (tx, rx) = plug.split();
rxs.push(rx);
txs.push((tx, routes));
let (rxs, txs) = split_conn_plugs(connections);
Ipv4Router {
rxs: Some(rxs),
txs,
ipv4_addr,
}

Ipv4Router { rxs, txs, ipv4_addr }
}

/// Create a new `Ipv4Router`, spawning it directly onto the tokio event loop.
Expand All @@ -66,6 +64,56 @@ impl Ipv4Router {
let router_v4 = Ipv4Router::new(ipv4_addr, connections);
handle.spawn(router_v4.infallible());
}

/// Checks if given packet, is destined to the router itself.
fn is_packet_to_me(&self, packet: &Ipv4Packet) -> bool {
packet.dest_ip() == self.ipv4_addr
}

/// Find a plug for given packet by it's destination address and send the packet.
/// Returns true if packet was sent, false otherwise.
fn send_packet(&mut self, packet: Ipv4Packet) -> bool {
let mut packets_sent = 0;

'all_tx_loop: for &mut (ref mut tx, ref routes) in &mut self.txs {
for route in routes {
let route_dest = route.destination();
let packet_is_broadcast = route_dest.is_broadcast(packet.dest_ip());
if route_dest.contains(packet.dest_ip()) || packet_is_broadcast {
info!(
"router {} routing packet on route {:?} {:?}",
self.ipv4_addr, route, packet,
);
tx.unbounded_send(packet.clone());
packets_sent += 1;

// Terminate only, if this is a regular packet. Broadcast packets are sent to
// multiple targets.
if !packet_is_broadcast {
break 'all_tx_loop;
}
}
}
}

if packets_sent == 0 {
info!("router {} dropping unroutable packet {:?}", self.ipv4_addr, packet);
}
packets_sent > 0
}
}

fn split_conn_plugs(
connections: Vec<(Ipv4Plug, Vec<Ipv4Route>)>,
) -> (Vec<Ipv4Receiver>, Vec<(Ipv4Sender, Vec<Ipv4Route>)>) {
let mut rxs = Vec::with_capacity(connections.len());
let mut txs = Vec::with_capacity(connections.len());
for (plug, routes) in connections {
let (tx, rx) = plug.split();
rxs.push(rx);
txs.push((tx, routes));
}
(rxs, txs)
}

impl Future for Ipv4Router {
Expand All @@ -74,34 +122,21 @@ impl Future for Ipv4Router {

fn poll(&mut self) -> Result<Async<()>, Void> {
let mut all_disconnected = true;
for rx in &mut self.rxs {
all_disconnected &= {
'next_packet: loop {
match rx.poll().void_unwrap() {
Async::NotReady => break false,
Async::Ready(None) => break true,
Async::Ready(Some(packet)) => {
let dest_ip = packet.dest_ip();
if dest_ip == self.ipv4_addr {
continue;
}

for &mut (ref mut tx, ref routes) in &mut self.txs {
for route in routes {
if route.destination().contains(dest_ip) {
info!("router {} routing packet on route {:?} {:?}", self.ipv4_addr, route, packet);
tx.unbounded_send(packet);
continue 'next_packet;
}
}
}

info!("router {} dropping unroutable packet {:?}", self.ipv4_addr, packet);
},
let mut rxs = unwrap!(self.rxs.take());
for rx in &mut rxs {
all_disconnected &= loop {
match rx.poll().void_unwrap() {
Async::NotReady => break false,
Async::Ready(None) => break true,
Async::Ready(Some(packet)) => {
if !self.is_packet_to_me(&packet) {
let _ = self.send_packet(packet);
}
}
}
};
}
self.rxs = Some(rxs);

if all_disconnected {
return Ok(Async::Ready(()));
Expand All @@ -111,3 +146,169 @@ impl Future for Ipv4Router {
}
}

#[cfg(test)]
mod tests {
use super::*;

mod ipv4_router {
use super::*;

mod send_packet {
use super::*;

fn udp_packet_v4(src: SocketAddrV4, dst: SocketAddrV4) -> Ipv4Packet {
Ipv4Packet::new_from_fields(
Ipv4Fields {
source_ip: src.ip().clone(),
dest_ip: dst.ip().clone(),
ttl: 10,
},
&Ipv4Payload::Udp(UdpPacket::new_from_fields_v4(
&UdpFields {
source_port: src.port(),
dest_port: dst.port(),
},
src.ip().clone(),
dst.ip().clone(),
&Bytes::new(),
)),
)
}

#[test]
fn it_returns_false_when_packet_sender_is_not_found_for_packet_destination_ip() {
let mut router = Ipv4Router::new(ipv4!("192.168.1.1"), vec![]);
let packet =
udp_packet_v4(addrv4!("192.168.1.100:5000"), addrv4!("192.168.1.200:6000"));

let sent = router.send_packet(packet);

assert!(!sent);
}

#[test]
fn it_sends_packet_to_the_channel_associated_with_packet_destination_address() {
let (plug1_a, mut plug1_b) = Ipv4Plug::new_pair();
let conns = vec![
(
plug1_a,
vec![Ipv4Route::new(Ipv4Range::new(ipv4!("192.168.1.0"), 24), None)],
),
];
let mut router = Ipv4Router::new(ipv4!("192.168.1.1"), conns);
let packet =
udp_packet_v4(addrv4!("192.168.1.100:5000"), addrv4!("192.168.1.200:6000"));

let sent = router.send_packet(packet.clone());

assert!(sent);
let received_packet = plug1_b.poll();
assert_eq!(received_packet, Ok(Async::Ready(Some(packet))));
}

#[test]
fn it_sends_broadcast_packet_to_the_machine_on_the_same_subnet() {
let (plug1_a, mut plug1_b) = Ipv4Plug::new_pair();
let conns = vec![
(
plug1_a,
vec![Ipv4Route::new(Ipv4Range::new(ipv4!("192.168.1.0"), 24), None)],
),
];
let mut router = Ipv4Router::new(ipv4!("192.168.1.1"), conns);
let packet =
udp_packet_v4(addrv4!("192.168.1.100:5000"), addrv4!("192.168.1.255:6000"));

let sent = router.send_packet(packet.clone());

assert!(sent);
let received_packet = plug1_b.poll();
assert_eq!(received_packet, Ok(Async::Ready(Some(packet))));
}

#[test]
fn it_sends_broadcast_packet_to_all_machines_on_the_same_subnet() {
let (plug1_a, mut plug1_b) = Ipv4Plug::new_pair();
let (plug2_a, mut plug2_b) = Ipv4Plug::new_pair();
let (plug3_a, mut plug3_b) = Ipv4Plug::new_pair();
let conns = vec![
(
plug1_a,
vec![Ipv4Route::new(Ipv4Range::new(ipv4!("192.168.1.0"), 24), None)],
),
(
plug2_a,
vec![Ipv4Route::new(Ipv4Range::new(ipv4!("10.0.0.0"), 24), None)],
),
(
plug3_a,
vec![Ipv4Route::new(Ipv4Range::new(ipv4!("192.168.1.0"), 24), None)],
),
];
let mut router = Ipv4Router::new(ipv4!("192.168.1.1"), conns);
let packet =
udp_packet_v4(addrv4!("192.168.1.100:5000"), addrv4!("192.168.1.255:6000"));

let sent = router.send_packet(packet.clone());
assert!(sent);

let mut evloop = unwrap!(Core::new());
let task = future::lazy(|| {
let received_packet = plug1_b.poll();
assert_eq!(received_packet, Ok(Async::Ready(Some(packet.clone()))));

let received_packet = plug2_b.poll();
assert_eq!(received_packet, Ok(Async::NotReady));

let received_packet = plug3_b.poll();
assert_eq!(received_packet, Ok(Async::Ready(Some(packet))));

future::ok::<(), ()>(())
});
evloop.run(task);
}

#[test]
fn it_sends_255_255_255_255_packet_to_all_connected_machines() {
let (plug1_a, mut plug1_b) = Ipv4Plug::new_pair();
let (plug2_a, mut plug2_b) = Ipv4Plug::new_pair();
let (plug3_a, mut plug3_b) = Ipv4Plug::new_pair();
let conns = vec![
(
plug1_a,
vec![Ipv4Route::new(Ipv4Range::new(ipv4!("192.168.1.0"), 24), None)],
),
(
plug2_a,
vec![Ipv4Route::new(Ipv4Range::new(ipv4!("10.0.0.0"), 24), None)],
),
(
plug3_a,
vec![Ipv4Route::new(Ipv4Range::new(ipv4!("192.168.1.0"), 24), None)],
),
];
let mut router = Ipv4Router::new(ipv4!("192.168.1.1"), conns);
let packet =
udp_packet_v4(addrv4!("192.168.1.100:5000"), addrv4!("255.255.255.255:6000"));

let sent = router.send_packet(packet.clone());
assert!(sent);

let mut evloop = unwrap!(Core::new());
let task = future::lazy(|| {
let received_packet = plug1_b.poll();
assert_eq!(received_packet, Ok(Async::Ready(Some(packet.clone()))));

let received_packet = plug2_b.poll();
assert_eq!(received_packet, Ok(Async::Ready(Some(packet.clone()))));

let received_packet = plug3_b.poll();
assert_eq!(received_packet, Ok(Async::Ready(Some(packet))));

future::ok::<(), ()>(())
});
evloop.run(task);
}
}
}
}
4 changes: 2 additions & 2 deletions src/plug/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ mod packet_loss;
pub use self::latency::*;
pub use self::packet_loss::*;

#[derive(Debug)]
/// Bidirectional network plug that can be used to exchange data between two devices.
/// Anything written to the plub will be readable on the other side.
/// Anything written to the plug will be readable on the other side.
#[derive(Debug)]
pub struct Plug<T: fmt::Debug + 'static> {
/// The sender
pub tx: UnboundedSender<T>,
Expand Down
Loading