Skip to content

Commit

Permalink
feat(client, server): use bounded queues for packet transfer
Browse files Browse the repository at this point in the history
  • Loading branch information
M0dEx committed Feb 25, 2024
1 parent 22754c4 commit 9e904d3
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 21 deletions.
12 changes: 6 additions & 6 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::auth::client::AuthClient;

use crate::config::ClientConfig;
use crate::constants::{PACKET_BUFFER_SIZE, QUINN_RUNTIME};
use crate::constants::{PACKET_BUFFER_SIZE, PACKET_CHANNEL_SIZE, QUINN_RUNTIME};
use crate::utils::signal_handler::handle_ctrl_c;
use crate::utils::socket::bind_socket;
use crate::utils::tasks::abort_all;
Expand All @@ -15,7 +15,7 @@ use bytes::Bytes;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use std::sync::Arc;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tracing::{debug, info};

/// Represents a Quincy client that connects to a server and relays packets between the server and a TUN interface.
Expand Down Expand Up @@ -131,7 +131,7 @@ impl QuincyClient {
interface_mtu: usize,
) -> Result<()> {
let connection = Arc::new(connection);
let (tun_queue_send, tun_queue_recv) = unbounded_channel();
let (tun_queue_send, tun_queue_recv) = channel(PACKET_CHANNEL_SIZE);
let (tun_read, tun_write) = interface.split();

let mut tasks = FuturesUnordered::new();
Expand Down Expand Up @@ -198,7 +198,7 @@ impl QuincyClient {
/// - `tun_queue` - the TUN queue
/// - `tun_write` - the write half of the TUN interface
async fn process_tun_queue(
mut tun_queue: UnboundedReceiver<Bytes>,
mut tun_queue: Receiver<Bytes>,
mut tun_write: impl InterfaceWrite,
) -> Result<()> {
debug!("Started TUN queue task (interface -> QUIC tunnel)");
Expand All @@ -220,7 +220,7 @@ impl QuincyClient {
/// - `tun_queue` - the TUN queue
async fn process_inbound_traffic(
connection: Arc<Connection>,
tun_queue: UnboundedSender<Bytes>,
tun_queue: Sender<Bytes>,
) -> Result<()> {
debug!("Started inbound traffic task (QUIC tunnel -> interface)");

Expand All @@ -233,7 +233,7 @@ impl QuincyClient {
connection.remote_address()
);

tun_queue.send(data)?;
tun_queue.send(data).await?;
}
}
}
3 changes: 3 additions & 0 deletions src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ pub const AUTH_MESSAGE_BUFFER_SIZE: usize = 1024;
/// Packet buffer size for operations on the TUN interface.
pub const PACKET_BUFFER_SIZE: usize = 4;

/// Packet channel size used for communication between the TUN interface and QUIC tunnels.
pub const PACKET_CHANNEL_SIZE: usize = 1024 * 1024;

/// Represents the supported TLS cipher suites for Quincy.
pub static QUINCY_CIPHER_SUITES: &[rustls::SupportedCipherSuite] = &[
rustls::cipher_suite::TLS13_AES_256_GCM_SHA384,
Expand Down
17 changes: 8 additions & 9 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,16 @@ use futures::stream::FuturesUnordered;
use futures::StreamExt;
use ipnet::Ipv4Net;
use quinn::{Endpoint, VarInt};
use tokio::sync::mpsc;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::mpsc::{channel, Receiver, Sender};

use crate::constants::{PACKET_BUFFER_SIZE, QUINN_RUNTIME};
use crate::constants::{PACKET_BUFFER_SIZE, PACKET_CHANNEL_SIZE, QUINN_RUNTIME};
use crate::interface::{Interface, InterfaceRead, InterfaceWrite};
use crate::utils::tasks::abort_all;
use tracing::{debug, info, warn};

use self::address_pool::AddressPool;

type ConnectionQueues = Arc<DashMap<IpAddr, UnboundedSender<Bytes>>>;
type ConnectionQueues = Arc<DashMap<IpAddr, Sender<Bytes>>>;

/// Represents a Quincy server encapsulating Quincy connections and TUN interface IO.
pub struct QuincyServer {
Expand Down Expand Up @@ -64,7 +63,7 @@ impl QuincyServer {
let interface = I::create(interface_address, self.config.connection.mtu)?;

let (tun_read, tun_write) = interface.split();
let (sender, receiver) = mpsc::unbounded_channel();
let (sender, receiver) = channel(PACKET_CHANNEL_SIZE);

let mut tasks = FuturesUnordered::new();

Expand Down Expand Up @@ -94,7 +93,7 @@ impl QuincyServer {
/// ### Arguments
/// - `ingress_queue` - the queue to send data to the TUN interface
/// - `endpoint` - the QUIC endpoint
async fn handle_connections(&self, ingress_queue: UnboundedSender<Bytes>) -> Result<()> {
async fn handle_connections(&self, ingress_queue: Sender<Bytes>) -> Result<()> {
let endpoint = self.create_quinn_endpoint()?;

info!(
Expand Down Expand Up @@ -137,7 +136,7 @@ impl QuincyServer {
};

let client_address = connection.client_address()?.addr();
let (connection_sender, connection_receiver) = mpsc::unbounded_channel();
let (connection_sender, connection_receiver) = channel(PACKET_CHANNEL_SIZE);

connection_tasks.push(tokio::spawn(connection.run(connection_receiver)));
self.connection_queues.insert(client_address, connection_sender);
Expand Down Expand Up @@ -234,7 +233,7 @@ impl QuincyServer {
};
debug!("Found connection for IP {dest_addr}");

Check warning on line 234 in src/server.rs

View check run for this annotation

Codecov / codecov/patch

src/server.rs#L234

Added line #L234 was not covered by tests

connection_queue.send(buf)?;
connection_queue.send(buf).await?;
}
}

Check warning on line 238 in src/server.rs

View check run for this annotation

Codecov / codecov/patch

src/server.rs#L238

Added line #L238 was not covered by tests

Expand All @@ -245,7 +244,7 @@ impl QuincyServer {
/// - `ingress_queue` - the queue for sending data to the TUN interface
async fn process_inbound_traffic(
mut tun_write: impl InterfaceWrite,
mut ingress_queue: UnboundedReceiver<Bytes>,
mut ingress_queue: Receiver<Bytes>,
) -> Result<()> {
debug!("Started tunnel inbound traffic task (tunnel queue -> interface)");

Expand Down
12 changes: 6 additions & 6 deletions src/server/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::server::address_pool::AddressPool;
use quinn::Connection;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::mpsc::{Receiver, Sender};
use tracing::{debug, info};

/// Represents a Quincy connection encapsulating authentication and IO.
Expand All @@ -19,7 +19,7 @@ pub struct QuincyConnection {
connection: Arc<Connection>,
username: Option<String>,
client_address: Option<IpNet>,
ingress_queue: UnboundedSender<Bytes>,
ingress_queue: Sender<Bytes>,
}

impl QuincyConnection {
Expand All @@ -28,7 +28,7 @@ impl QuincyConnection {
/// ### Arguments
/// - `connection` - the underlying QUIC connection
/// - `tun_queue` - the queue to send data to the TUN interface
pub fn new(connection: Connection, tun_queue: UnboundedSender<Bytes>) -> Self {
pub fn new(connection: Connection, tun_queue: Sender<Bytes>) -> Self {
Self {
connection: Arc::new(connection),
username: None,
Expand Down Expand Up @@ -67,7 +67,7 @@ impl QuincyConnection {
}

/// Starts the tasks for this instance of Quincy connection.
pub async fn run(self, egress_queue: UnboundedReceiver<Bytes>) -> (Self, Error) {
pub async fn run(self, egress_queue: Receiver<Bytes>) -> (Self, Error) {
if self.username.is_none() {
let client_address = self.connection.remote_address();
return (
Expand Down Expand Up @@ -105,7 +105,7 @@ impl QuincyConnection {
/// - `egress_queue` - the queue to receive data from the TUN interface
async fn process_outgoing_data(
self: Arc<Self>,
mut egress_queue: UnboundedReceiver<Bytes>,
mut egress_queue: Receiver<Bytes>,
) -> Result<()> {
loop {
let data = egress_queue
Expand Down Expand Up @@ -134,7 +134,7 @@ impl QuincyConnection {
self.client_address()?.addr()
);

self.ingress_queue.send(data)?;
self.ingress_queue.send(data).await?;
}
}

Expand Down

0 comments on commit 9e904d3

Please sign in to comment.