Skip to content

Commit

Permalink
feat: improve throughput by unblocking datagram recv (#33)
Browse files Browse the repository at this point in the history
  • Loading branch information
M0dEx authored Dec 6, 2023
1 parent 015a16d commit 13e7f48
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 27 deletions.
60 changes: 42 additions & 18 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
use crate::auth::client::AuthClient;

use crate::config::ClientConfig;
use crate::constants::QUINN_RUNTIME;
use crate::constants::{PACKET_BUFFER_SIZE, QUINN_RUNTIME};
use crate::utils::socket::bind_socket;
use anyhow::{anyhow, Result};
use quinn::{Connection, Endpoint};

use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs};

use crate::interface::{Interface, InterfaceRead, InterfaceWrite};
use bytes::Bytes;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use std::sync::Arc;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tracing::{debug, info, warn};

/// Represents a Quincy client that connects to a server and relays packets between the server and a TUN interface.
Expand Down Expand Up @@ -133,25 +137,29 @@ impl QuincyClient {
interface_mtu: usize,
) -> Result<()> {
let connection = Arc::new(connection);
let (read, write) = tokio::io::split(interface);
let (tun_queue_send, tun_queue_recv) = unbounded_channel();
let (tun_read, tun_write) = tokio::io::split(interface);

let inbound_task = tokio::spawn(Self::process_inbound_traffic(connection.clone(), write));
let outgoing_task = tokio::spawn(Self::process_outgoing_traffic(
let mut client_tasks = FuturesUnordered::new();

client_tasks.push(tokio::spawn(Self::process_inbound_traffic(
connection.clone(),
tun_queue_send,
)));
client_tasks.push(tokio::spawn(Self::process_tun_queue(
tun_queue_recv,
tun_write,
)));
client_tasks.push(tokio::spawn(Self::process_outgoing_traffic(
connection.clone(),
read,
tun_read,
interface_mtu,
));

tokio::select! {
inbound_result = inbound_task => {
inbound_result??;
}
outgoing_result = outgoing_task => {
outgoing_result??;
}
}
)));

Ok(())
client_tasks
.next()
.await
.expect("Client tasks are not empty")?
}

/// Handles incoming packets from the TUN interface and relays them to the Quincy server.
Expand Down Expand Up @@ -193,14 +201,30 @@ impl QuincyClient {
}
}

async fn process_tun_queue(
mut tun_queue: UnboundedReceiver<Bytes>,
mut tun_write: impl InterfaceWrite,
) -> Result<()> {
debug!("Started TUN queue task (interface -> QUIC tunnel)");

let mut packets = Vec::with_capacity(PACKET_BUFFER_SIZE);

loop {
packets.clear();
tun_queue.recv_many(&mut packets, PACKET_BUFFER_SIZE).await;

tun_write.write_packets(&packets).await?;
}
}

/// Handles incoming packets from the Quincy server and relays them to the TUN interface.
///
/// ### Arguments
/// - `connection` - a Quinn connection representing the connection to the Quincy server
/// - `write_interface` - the write half of the TUN interface
async fn process_inbound_traffic(
connection: Arc<Connection>,
mut write_interface: impl InterfaceWrite,
tun_queue: UnboundedSender<Bytes>,
) -> Result<()> {
debug!("Started inbound traffic task (QUIC tunnel -> interface)");

Expand All @@ -213,7 +237,7 @@ impl QuincyClient {
connection.remote_address()
);

write_interface.write_packet(data).await?;
tun_queue.send(data)?;
}
}
}
3 changes: 3 additions & 0 deletions src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ pub const AUTH_TIMEOUT_MESSAGE: &str = "Authentication timed out";
/// Buffer size for authentication messages.
pub const AUTH_MESSAGE_BUFFER_SIZE: usize = 1024;

/// Packet buffer size for operations on the TUN interface.
pub const PACKET_BUFFER_SIZE: usize = 4;

/// Represents the size of the packet info header on UNIX systems.
#[cfg(target_os = "macos")]
pub const DARWIN_PI_HEADER_LENGTH: usize = 4;
Expand Down
16 changes: 13 additions & 3 deletions src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,21 @@ pub trait InterfaceRead: AsyncReadExt + Sized + Unpin + Send + 'static {

pub trait InterfaceWrite: AsyncWriteExt + Sized + Unpin + Send + 'static {
#[inline]
async fn write_packet(&mut self, packet_data: Bytes) -> Result<()> {
async fn write_packet(&mut self, packet_data: &Bytes) -> Result<()> {
#[cfg(target_os = "macos")]
let packet_data = prepend_packet_info_header(&packet_data)?;
let packet_data = &prepend_packet_info_header(packet_data)?;

self.write_all(&packet_data).await?;
self.write_all(packet_data).await?;

Ok(())
}

#[inline]
async fn write_packets(&mut self, packets: &[Bytes]) -> Result<()> {
// TODO: Implement this using write_vectored when it actually works
for packet in packets {
self.write_packet(packet).await?;
}

Ok(())
}
Expand Down
16 changes: 10 additions & 6 deletions src/server/tunnel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use quinn::Endpoint;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};

use crate::constants::QUINN_RUNTIME;
use crate::constants::{PACKET_BUFFER_SIZE, QUINN_RUNTIME};
use crate::interface::{Interface, InterfaceRead, InterfaceWrite};
use crate::utils::tasks::join_or_abort_all;
use tracing::{debug, info, warn};
Expand Down Expand Up @@ -250,11 +250,15 @@ impl QuincyTunnel {
) -> Result<()> {
debug!("Started tunnel inbound traffic task (tunnel queue -> interface)");

while let Some(buf) = ingress_queue.recv().await {
debug!("Sending {} bytes to tunnel", buf.len());
tun_write.write_packet(buf).await?;
}
let mut packets = Vec::with_capacity(PACKET_BUFFER_SIZE);

loop {
packets.clear();
ingress_queue
.recv_many(&mut packets, PACKET_BUFFER_SIZE)
.await;

Ok(())
tun_write.write_packets(&packets).await?;
}
}
}

0 comments on commit 13e7f48

Please sign in to comment.