From 767c6bc7482cb6d2190656dffa4d789c31cbbbac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Kub=C3=ADk?= Date: Sun, 25 Feb 2024 20:33:52 +0100 Subject: [PATCH] feat(client, server): use bounded queues for packet transfer --- src/client.rs | 12 ++++++------ src/constants.rs | 3 +++ src/server.rs | 17 ++++++++--------- src/server/connection.rs | 12 ++++++------ 4 files changed, 23 insertions(+), 21 deletions(-) diff --git a/src/client.rs b/src/client.rs index 8235368..151775d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,7 +1,7 @@ use crate::auth::client::AuthClient; use crate::config::ClientConfig; -use crate::constants::{PACKET_BUFFER_SIZE, QUINN_RUNTIME}; +use crate::constants::{PACKET_BUFFER_SIZE, PACKET_CHANNEL_SIZE, QUINN_RUNTIME}; use crate::utils::signal_handler::handle_ctrl_c; use crate::utils::socket::bind_socket; use crate::utils::tasks::abort_all; @@ -15,7 +15,7 @@ use bytes::Bytes; use futures::stream::FuturesUnordered; use futures::StreamExt; use std::sync::Arc; -use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio::sync::mpsc::{channel, Receiver, Sender}; use tracing::{debug, info}; /// Represents a Quincy client that connects to a server and relays packets between the server and a TUN interface. @@ -131,7 +131,7 @@ impl QuincyClient { interface_mtu: usize, ) -> Result<()> { let connection = Arc::new(connection); - let (tun_queue_send, tun_queue_recv) = unbounded_channel(); + let (tun_queue_send, tun_queue_recv) = channel(PACKET_CHANNEL_SIZE); let (tun_read, tun_write) = interface.split(); let mut tasks = FuturesUnordered::new(); @@ -198,7 +198,7 @@ impl QuincyClient { /// - `tun_queue` - the TUN queue /// - `tun_write` - the write half of the TUN interface async fn process_tun_queue( - mut tun_queue: UnboundedReceiver, + mut tun_queue: Receiver, mut tun_write: impl InterfaceWrite, ) -> Result<()> { debug!("Started TUN queue task (interface -> QUIC tunnel)"); @@ -220,7 +220,7 @@ impl QuincyClient { /// - `tun_queue` - the TUN queue async fn process_inbound_traffic( connection: Arc, - tun_queue: UnboundedSender, + tun_queue: Sender, ) -> Result<()> { debug!("Started inbound traffic task (QUIC tunnel -> interface)"); @@ -233,7 +233,7 @@ impl QuincyClient { connection.remote_address() ); - tun_queue.send(data)?; + tun_queue.send(data).await?; } } } diff --git a/src/constants.rs b/src/constants.rs index 430edd4..8205c05 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -18,6 +18,9 @@ pub const AUTH_MESSAGE_BUFFER_SIZE: usize = 1024; /// Packet buffer size for operations on the TUN interface. pub const PACKET_BUFFER_SIZE: usize = 4; +/// Packet channel size used for communication between the TUN interface and QUIC tunnels. +pub const PACKET_CHANNEL_SIZE: usize = 1024 * 1024; + /// Represents the supported TLS cipher suites for Quincy. pub static QUINCY_CIPHER_SUITES: &[rustls::SupportedCipherSuite] = &[ rustls::cipher_suite::TLS13_AES_256_GCM_SHA384, diff --git a/src/server.rs b/src/server.rs index 61d33d8..2382eb2 100644 --- a/src/server.rs +++ b/src/server.rs @@ -17,17 +17,16 @@ use futures::stream::FuturesUnordered; use futures::StreamExt; use ipnet::Ipv4Net; use quinn::{Endpoint, VarInt}; -use tokio::sync::mpsc; -use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +use tokio::sync::mpsc::{channel, Receiver, Sender}; -use crate::constants::{PACKET_BUFFER_SIZE, QUINN_RUNTIME}; +use crate::constants::{PACKET_BUFFER_SIZE, PACKET_CHANNEL_SIZE, QUINN_RUNTIME}; use crate::interface::{Interface, InterfaceRead, InterfaceWrite}; use crate::utils::tasks::abort_all; use tracing::{debug, info, warn}; use self::address_pool::AddressPool; -type ConnectionQueues = Arc>>; +type ConnectionQueues = Arc>>; /// Represents a Quincy server encapsulating Quincy connections and TUN interface IO. pub struct QuincyServer { @@ -64,7 +63,7 @@ impl QuincyServer { let interface = I::create(interface_address, self.config.connection.mtu)?; let (tun_read, tun_write) = interface.split(); - let (sender, receiver) = mpsc::unbounded_channel(); + let (sender, receiver) = channel(PACKET_CHANNEL_SIZE); let mut tasks = FuturesUnordered::new(); @@ -94,7 +93,7 @@ impl QuincyServer { /// ### Arguments /// - `ingress_queue` - the queue to send data to the TUN interface /// - `endpoint` - the QUIC endpoint - async fn handle_connections(&self, ingress_queue: UnboundedSender) -> Result<()> { + async fn handle_connections(&self, ingress_queue: Sender) -> Result<()> { let endpoint = self.create_quinn_endpoint()?; info!( @@ -137,7 +136,7 @@ impl QuincyServer { }; let client_address = connection.client_address()?.addr(); - let (connection_sender, connection_receiver) = mpsc::unbounded_channel(); + let (connection_sender, connection_receiver) = channel(PACKET_CHANNEL_SIZE); connection_tasks.push(tokio::spawn(connection.run(connection_receiver))); self.connection_queues.insert(client_address, connection_sender); @@ -234,7 +233,7 @@ impl QuincyServer { }; debug!("Found connection for IP {dest_addr}"); - connection_queue.send(buf)?; + connection_queue.send(buf).await?; } } @@ -245,7 +244,7 @@ impl QuincyServer { /// - `ingress_queue` - the queue for sending data to the TUN interface async fn process_inbound_traffic( mut tun_write: impl InterfaceWrite, - mut ingress_queue: UnboundedReceiver, + mut ingress_queue: Receiver, ) -> Result<()> { debug!("Started tunnel inbound traffic task (tunnel queue -> interface)"); diff --git a/src/server/connection.rs b/src/server/connection.rs index 0ab309a..ebec5ef 100644 --- a/src/server/connection.rs +++ b/src/server/connection.rs @@ -10,7 +10,7 @@ use crate::server::address_pool::AddressPool; use quinn::Connection; use std::sync::Arc; use std::time::Duration; -use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +use tokio::sync::mpsc::{Receiver, Sender}; use tracing::{debug, info}; /// Represents a Quincy connection encapsulating authentication and IO. @@ -19,7 +19,7 @@ pub struct QuincyConnection { connection: Arc, username: Option, client_address: Option, - ingress_queue: UnboundedSender, + ingress_queue: Sender, } impl QuincyConnection { @@ -28,7 +28,7 @@ impl QuincyConnection { /// ### Arguments /// - `connection` - the underlying QUIC connection /// - `tun_queue` - the queue to send data to the TUN interface - pub fn new(connection: Connection, tun_queue: UnboundedSender) -> Self { + pub fn new(connection: Connection, tun_queue: Sender) -> Self { Self { connection: Arc::new(connection), username: None, @@ -67,7 +67,7 @@ impl QuincyConnection { } /// Starts the tasks for this instance of Quincy connection. - pub async fn run(self, egress_queue: UnboundedReceiver) -> (Self, Error) { + pub async fn run(self, egress_queue: Receiver) -> (Self, Error) { if self.username.is_none() { let client_address = self.connection.remote_address(); return ( @@ -105,7 +105,7 @@ impl QuincyConnection { /// - `egress_queue` - the queue to receive data from the TUN interface async fn process_outgoing_data( self: Arc, - mut egress_queue: UnboundedReceiver, + mut egress_queue: Receiver, ) -> Result<()> { loop { let data = egress_queue @@ -134,7 +134,7 @@ impl QuincyConnection { self.client_address()?.addr() ); - self.ingress_queue.send(data)?; + self.ingress_queue.send(data).await?; } }