From 78bbb4f55f4b4411a498cf4c6ce9cebec23e1470 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Kub=C3=ADk?= Date: Sun, 25 Feb 2024 12:22:09 +0100 Subject: [PATCH] refactor!: remove tunnel, add signal handlers --- Cargo.lock | 12 +- Cargo.toml | 3 +- README.md | 18 ++- examples/client.toml | 2 +- examples/server.toml | 4 +- src/auth/server.rs | 6 +- src/client.rs | 113 ++++++++------- src/config.rs | 71 ++-------- src/constants.rs | 4 - src/interface.rs | 4 + src/server.rs | 271 ++++++++++++++++++++++++++++++----- src/server/address_pool.rs | 18 +-- src/server/connection.rs | 34 +++-- src/server/tunnel.rs | 273 ------------------------------------ src/utils.rs | 1 + src/utils/signal_handler.rs | 8 ++ src/utils/tasks.rs | 43 ++---- tests/static/server.toml | 1 - 18 files changed, 395 insertions(+), 491 deletions(-) delete mode 100644 src/server/tunnel.rs create mode 100644 src/utils/signal_handler.rs diff --git a/Cargo.lock b/Cargo.lock index 995fee5..c86ab67 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -790,7 +790,7 @@ dependencies = [ [[package]] name = "quincy" -version = "0.7.1" +version = "0.8.0" dependencies = [ "anyhow", "argon2", @@ -1218,6 +1218,15 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "signal-hook-registry" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" +dependencies = [ + "libc", +] + [[package]] name = "slab" version = "0.4.9" @@ -1365,6 +1374,7 @@ dependencies = [ "mio", "num_cpus", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.48.0", diff --git a/Cargo.toml b/Cargo.toml index 087f9bb..16a59d0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "quincy" -version = "0.7.1" +version = "0.8.0" authors = ["Jakub KubĂ­k "] license = "MIT" description = "QUIC-based VPN" @@ -51,6 +51,7 @@ tokio = { version = "^1.25", features = [ "macros", "sync", "io-util", + "signal", ] } dashmap = "^5.4" futures = "^0.3.17" diff --git a/README.md b/README.md index f525602..0921f52 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,7 @@ $ cargo build --release The resulting binaries can be found in the `target/debug` and `target/release` directories. ## Usage -Quincy is split into 3 binaries: +Quincy is split into 3 binaries: - `client`: The VPN client - `server`: The VPN server - `users`: A utility binary meant for managing the `users` file @@ -85,9 +85,7 @@ Any additional routes now have to be set up manually. ### Server The Quincy server requires a separate configuration file, an example of which can be found in `examples/server.toml`: ```toml -# Section representing tunnel configuration -[tunnels.tun0] -# Name of the tunnel (currently not used as the name of the interface) +# Name of the server instance (currently not used as the name of the interface) name = "tun0" # Path to the certificate used for TLS certificate_file = "examples/cert/server_cert.pem" @@ -115,7 +113,7 @@ $ quincy-server --config-path examples/server.toml ``` ### Users -The users utility can be used to manage entries in the `users` file. +The users utility can be used to manage entries in the `users` file. The `users` file contains usernames and password hashes in the following format (`examples/users`): ``` test:$argon2id$v=19$m=19456,t=2,p=1$S9rMLOcz/dnYN4cnyc/TJg$ES0p+DErLfcWoUJ2tvZlxZSSIGYNUEe0ZpKBDz7MOj0 @@ -128,9 +126,9 @@ $ quincy-users --add examples/users The prompts will look something like this: ``` -Enter the username: test -Enter password for user 'test': -Confirm password for user 'test': +Enter the username: test +Enter password for user 'test': +Confirm password for user 'test': ``` A similar command can be used to remove users from the file: @@ -140,7 +138,7 @@ $ quincy-users --remove examples/users The prompt will again look something like this: ``` -Enter the username: test +Enter the username: test ``` ## Certificate management @@ -168,7 +166,7 @@ $ openssl genpkey -algorithm EC -pkeyopt ec_paramgen_curve:secp384r1 -out -out +$ openssl req -new -key -out You are about to be asked to enter information that will be incorporated into your certificate request. What you are about to enter is what is called a Distinguished Name or a DN. diff --git a/examples/client.toml b/examples/client.toml index bf5ad2b..abc27cb 100644 --- a/examples/client.toml +++ b/examples/client.toml @@ -15,4 +15,4 @@ mtu = 1400 [log] # The log level -level = "info" \ No newline at end of file +level = "info" diff --git a/examples/server.toml b/examples/server.toml index 6f53569..dd0b4ce 100644 --- a/examples/server.toml +++ b/examples/server.toml @@ -1,6 +1,4 @@ -# Section representing tunnel configuration -[tunnels.tun0] -# Name of the tunnel (currently not used as the name of the interface) +# Name of the server instance (currently not used as the name of the interface) name = "tun0" # Path to the certificate used for TLS certificate_file = "examples/cert/server_cert.pem" diff --git a/src/auth/server.rs b/src/auth/server.rs index 7d60b21..6e8f0a3 100644 --- a/src/auth/server.rs +++ b/src/auth/server.rs @@ -1,7 +1,9 @@ use std::{net::IpAddr, sync::Arc, time::Duration}; -use crate::constants::{AUTH_FAILED_MESSAGE, AUTH_MESSAGE_BUFFER_SIZE, AUTH_TIMEOUT_MESSAGE}; -use crate::server::address_pool::AddressPool; +use crate::{ + constants::{AUTH_FAILED_MESSAGE, AUTH_MESSAGE_BUFFER_SIZE, AUTH_TIMEOUT_MESSAGE}, + server::address_pool::AddressPool, +}; use anyhow::{anyhow, Context, Result}; use bytes::BytesMut; use ipnet::IpNet; diff --git a/src/client.rs b/src/client.rs index bb421f5..8235368 100644 --- a/src/client.rs +++ b/src/client.rs @@ -2,9 +2,11 @@ use crate::auth::client::AuthClient; use crate::config::ClientConfig; use crate::constants::{PACKET_BUFFER_SIZE, QUINN_RUNTIME}; +use crate::utils::signal_handler::handle_ctrl_c; use crate::utils::socket::bind_socket; +use crate::utils::tasks::abort_all; use anyhow::{anyhow, Result}; -use quinn::{Connection, Endpoint}; +use quinn::{Connection, Endpoint, VarInt}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs}; @@ -18,7 +20,7 @@ use tracing::{debug, info}; /// Represents a Quincy client that connects to a server and relays packets between the server and a TUN interface. pub struct QuincyClient { - client_config: ClientConfig, + config: ClientConfig, } impl QuincyClient { @@ -26,29 +28,25 @@ impl QuincyClient { /// /// ### Arguments /// - `client_config` - the configuration for the client - pub fn new(client_config: ClientConfig) -> Self { - Self { client_config } + pub fn new(config: ClientConfig) -> Self { + Self { config } } /// Connects to the Quincy server and starts the workers for this instance of the Quincy client. pub async fn run(&self) -> Result<()> { let connection = self.connect_to_server().await?; - let mut auth_client = - AuthClient::new(&connection, &self.client_config.authentication).await?; + let mut auth_client = AuthClient::new(&connection, &self.config.authentication).await?; let assigned_address = auth_client.authenticate().await?; info!("Successfully authenticated"); info!("Received client address: {assigned_address}"); - let interface = I::create(assigned_address, self.client_config.connection.mtu)?; + let mtu = self.config.connection.mtu; + let interface = I::create(assigned_address, mtu)?; - self.relay_packets( - connection, - interface, - self.client_config.connection.mtu as usize, - ) - .await + self.relay_packets(connection, interface, mtu as usize) + .await } /// Connects to the Quincy server. @@ -56,43 +54,40 @@ impl QuincyClient { /// ### Returns /// - `Connection` - a Quinn connection representing the connection to the Quincy server async fn connect_to_server(&self) -> Result { - let quinn_config = self.client_config.as_quinn_client_config()?; + let quinn_config = self.config.as_quinn_client_config()?; let server_hostname = self - .client_config + .config .connection_string .split(':') .next() .ok_or_else(|| { anyhow!( "Could not parse hostname from connection string '{}'", - self.client_config.connection_string + self.config.connection_string ) })?; let server_addr = self - .client_config + .config .connection_string .to_socket_addrs()? .next() .ok_or_else(|| { anyhow!( "Connection string '{}' is invalid", - self.client_config.connection_string + self.config.connection_string ) })?; - info!("Connecting: {}", self.client_config.connection_string); + info!("Connecting: {}", self.config.connection_string); let endpoint = self.create_quinn_endpoint(server_addr)?; let connection = endpoint .connect_with(quinn_config, server_addr, server_hostname)? .await?; - info!( - "Connection established: {}", - self.client_config.connection_string - ); + info!("Connection established: {}", self.config.connection_string); Ok(connection) } @@ -113,21 +108,22 @@ impl QuincyClient { let socket = bind_socket( bind_addr, - self.client_config.connection.send_buffer_size as usize, - self.client_config.connection.recv_buffer_size as usize, + self.config.connection.send_buffer_size as usize, + self.config.connection.recv_buffer_size as usize, )?; - let endpoint_config = self.client_config.connection.as_endpoint_config()?; + let endpoint_config = self.config.connection.as_endpoint_config()?; let endpoint = Endpoint::new(endpoint_config, None, socket, QUINN_RUNTIME.clone())?; Ok(endpoint) } - /// Relays packets between the TUN interface and the Quincy server. + /// Relays packets between the TUN interface and the Quincy clients. /// /// ### Arguments /// - `connection` - a Quinn connection representing the connection to the Quincy server /// - `interface` - the TUN interface + /// - `interface_mtu` - the MTU of the TUN interface async fn relay_packets( &self, connection: Connection, @@ -136,28 +132,38 @@ impl QuincyClient { ) -> Result<()> { let connection = Arc::new(connection); let (tun_queue_send, tun_queue_recv) = unbounded_channel(); - let (tun_read, tun_write) = tokio::io::split(interface); - - let mut client_tasks = FuturesUnordered::new(); - - client_tasks.push(tokio::spawn(Self::process_inbound_traffic( - connection.clone(), - tun_queue_send, - ))); - client_tasks.push(tokio::spawn(Self::process_tun_queue( - tun_queue_recv, - tun_write, - ))); - client_tasks.push(tokio::spawn(Self::process_outgoing_traffic( - connection.clone(), - tun_read, - interface_mtu, - ))); - - client_tasks - .next() - .await - .expect("Client tasks are not empty")? + let (tun_read, tun_write) = interface.split(); + + let mut tasks = FuturesUnordered::new(); + + tasks.extend([ + tokio::spawn(Self::process_inbound_traffic( + connection.clone(), + tun_queue_send, + )), + tokio::spawn(Self::process_tun_queue(tun_queue_recv, tun_write)), + tokio::spawn(Self::process_outgoing_traffic( + connection.clone(), + tun_read, + interface_mtu, + )), + ]); + + let result = tokio::select! { + Some(task_result) = tasks.next() => task_result?, + signal_res = handle_ctrl_c() => { + info!("Received shutdown signal, shutting down"); + signal_res + }, + }; + + // Stop all running tasks + let _ = abort_all(tasks).await; + + // Close the QUIC connection + connection.close(VarInt::from_u32(0x01), "Client shutdown".as_bytes()); + + result } /// Handles incoming packets from the TUN interface and relays them to the Quincy server. @@ -186,6 +192,11 @@ impl QuincyClient { } } + /// Handles incoming packets from the Quincy clients and relays them to the TUN interface. + /// + /// ### Arguments + /// - `tun_queue` - the TUN queue + /// - `tun_write` - the write half of the TUN interface async fn process_tun_queue( mut tun_queue: UnboundedReceiver, mut tun_write: impl InterfaceWrite, @@ -202,11 +213,11 @@ impl QuincyClient { } } - /// Handles incoming packets from the Quincy server and relays them to the TUN interface. + /// Handles incoming packets from the Quincy server and relays them to the TUN interface queue. /// /// ### Arguments /// - `connection` - a Quinn connection representing the connection to the Quincy server - /// - `write_interface` - the write half of the TUN interface + /// - `tun_queue` - the TUN queue async fn process_inbound_traffic( connection: Arc, tun_queue: UnboundedSender, diff --git a/src/config.rs b/src/config.rs index 1ba1b2b..4c16158 100644 --- a/src/config.rs +++ b/src/config.rs @@ -7,33 +7,20 @@ use quinn::{EndpointConfig, TransportConfig}; use rustls::{Certificate, RootCertStore}; use serde::de::DeserializeOwned; use serde::Deserialize; -use std::collections::HashMap; use std::net::{IpAddr, Ipv4Addr}; use std::path::{Path, PathBuf}; use std::sync::Arc; -use std::{collections::hash_map::Entry, time::Duration}; +use std::time::Duration; use crate::constants::{ QUIC_MTU_OVERHEAD, QUINCY_CIPHER_SUITES, TLS_ALPN_PROTOCOLS, TLS_PROTOCOL_VERSIONS, }; use crate::utils::certificates::{load_certificates_from_file, load_private_key_from_file}; -use tracing::{error, warn}; +use tracing::error; /// Represents the configuration for a Quincy server. #[derive(Clone, Debug, PartialEq, Deserialize)] pub struct ServerConfig { - tunnel_path: Option, - /// Configuration for the tunnels associated with this server - pub tunnels: HashMap, - /// Miscellaneous connection configuration - pub connection: ConnectionConfig, - /// Logging configuration - pub log: LogConfig, -} - -/// Represents the configuration for a Quincy tunnel. -#[derive(Clone, Debug, PartialEq, Deserialize)] -pub struct TunnelConfig { /// The name of the tunnel pub name: String, /// The certificate to use for the tunnel @@ -52,6 +39,10 @@ pub struct TunnelConfig { pub address_mask: Ipv4Addr, /// A path to a file containing a list of users and their password hashes pub users_file: PathBuf, + /// Miscellaneous connection configuration + pub connection: ConnectionConfig, + /// Logging configuration + pub log: LogConfig, } /// Represents the configuration for a Quincy client. @@ -131,46 +122,11 @@ pub trait FromPath> { } } -impl ConfigInit for ServerConfig { - fn init(figment: Figment, env_prefix: &str) -> Result { - let mut config: ServerConfig = figment.extract()?; - - let tunnel_configs: Vec = match &config.tunnel_path { - Some(tunnel_path) => { - if tunnel_path.is_dir() { - tunnel_path - .read_dir()? - .flatten() - .filter_map(|config_file| { - TunnelConfig::from_path(&config_file.path(), env_prefix).ok() - }) - .collect() - } else { - warn!("Failed to load tunnel configuration files from '{tunnel_path:?}' - the folder does not exist"); - vec![] - } - } - None => vec![], - }; - - for tunnel in tunnel_configs { - match config.tunnels.entry(tunnel.name.clone()) { - Entry::Occupied(_) => warn!("Tunnel with the name {} already exists", tunnel.name), - Entry::Vacant(slot) => { - slot.insert(tunnel); - } - } - } - - Ok(config) - } -} +impl ConfigInit for ServerConfig {} impl ConfigInit for ClientConfig {} -impl ConfigInit for TunnelConfig {} impl FromPath for ServerConfig {} impl FromPath for ClientConfig {} -impl FromPath for TunnelConfig {} fn default_log_level() -> String { "info".to_string() @@ -249,7 +205,7 @@ impl ClientConfig { } } -impl TunnelConfig { +impl ServerConfig { /// Creates Quinn server configuration from this Quincy tunnel configuration. /// /// ### Arguments @@ -257,10 +213,7 @@ impl TunnelConfig { /// /// ### Returns /// - `quinn::ServerConfig` - the Quinn server configuration - pub fn as_quinn_server_config( - &self, - connection_config: &ConnectionConfig, - ) -> Result { + pub fn as_quinn_server_config(&self) -> Result { let certificate_file_path = self.certificate_file.clone(); let certificate_key_path = self.certificate_key_file.clone(); let key = load_private_key_from_file(&certificate_key_path)?; @@ -278,9 +231,9 @@ impl TunnelConfig { let mut quinn_config = quinn::ServerConfig::with_crypto(Arc::new(rustls_config)); let mut transport_config = TransportConfig::default(); - transport_config.max_idle_timeout(Some(connection_config.timeout.try_into()?)); - transport_config.initial_mtu(connection_config.mtu_with_overhead()); - transport_config.min_mtu(connection_config.mtu_with_overhead()); + transport_config.max_idle_timeout(Some(self.connection.timeout.try_into()?)); + transport_config.initial_mtu(self.connection.mtu_with_overhead()); + transport_config.min_mtu(self.connection.mtu_with_overhead()); quinn_config.transport_config(Arc::new(transport_config)); diff --git a/src/constants.rs b/src/constants.rs index a6025b8..430edd4 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -1,5 +1,4 @@ use std::sync::Arc; -use std::time::Duration; use once_cell::sync::Lazy; use quinn::Runtime; @@ -7,9 +6,6 @@ use quinn::Runtime; /// Represents the maximum MTU overhead for QUIC, since the QUIC header is variable in size. pub const QUIC_MTU_OVERHEAD: u16 = 50; -/// Represents the interval used by various cleanup tasks. -pub const CLEANUP_INTERVAL: Duration = Duration::from_secs(1); - /// Error message when authentication fails. pub const AUTH_FAILED_MESSAGE: &str = "Authentication failed"; diff --git a/src/interface.rs b/src/interface.rs index 3bac75f..89741d0 100644 --- a/src/interface.rs +++ b/src/interface.rs @@ -36,6 +36,10 @@ pub trait InterfaceWrite: AsyncWriteExt + Sized + Unpin + Send + 'static { pub trait Interface: InterfaceRead + InterfaceWrite { fn create(interface_address: IpNet, mtu: u16) -> Result; + + fn split(self) -> (ReadHalf, WriteHalf) { + tokio::io::split(self) + } } impl InterfaceRead for ReadHalf {} diff --git a/src/server.rs b/src/server.rs index e648ca0..61d33d8 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,60 +1,263 @@ +pub mod address_pool; +mod connection; + +use std::net::{IpAddr, SocketAddr}; +use std::sync::Arc; + +use crate::auth::user::{load_users_file, UserDatabase}; use crate::config::ServerConfig; -use crate::interface::Interface; -use crate::server::tunnel::QuincyTunnel; -use anyhow::{anyhow, Result}; +use crate::server::connection::QuincyConnection; +use crate::utils::signal_handler::handle_ctrl_c; +use crate::utils::socket::bind_socket; +use anyhow::Result; +use bytes::Bytes; +use dashmap::DashMap; +use etherparse::{NetHeaders, PacketHeaders}; use futures::stream::FuturesUnordered; use futures::StreamExt; -use tracing::error; +use ipnet::Ipv4Net; +use quinn::{Endpoint, VarInt}; +use tokio::sync::mpsc; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; -pub mod address_pool; -pub mod connection; -pub mod tunnel; +use crate::constants::{PACKET_BUFFER_SIZE, QUINN_RUNTIME}; +use crate::interface::{Interface, InterfaceRead, InterfaceWrite}; +use crate::utils::tasks::abort_all; +use tracing::{debug, info, warn}; -/// Represents a Quincy server with multiple underlying Quincy tunnels. +use self::address_pool::AddressPool; + +type ConnectionQueues = Arc>>; + +/// Represents a Quincy server encapsulating Quincy connections and TUN interface IO. pub struct QuincyServer { - active_tunnels: Vec, + config: ServerConfig, + connection_queues: ConnectionQueues, + user_database: UserDatabase, + address_pool: AddressPool, } impl QuincyServer { - /// Creates a new instance of a Quincy server. + /// Creates a new instance of the Quincy tunnel. /// /// ### Arguments - /// - `config` - the configuration for the server + /// - `config` - the server configuration pub fn new(config: ServerConfig) -> Result { - let tunnels = config - .tunnels - .into_iter() - .flat_map(|(name, tunnel_config)| { - QuincyTunnel::new(name, tunnel_config, &config.connection) - }) - .collect(); + let interface_address = + Ipv4Net::with_netmask(config.address_tunnel, config.address_mask)?.into(); + + let user_database = UserDatabase::new(load_users_file(&config.users_file)?); + let address_pool = AddressPool::new(interface_address); Ok(Self { - active_tunnels: tunnels, + config, + connection_queues: Arc::new(DashMap::new()), + user_database, + address_pool, }) } - /// Starts the Quincy server and all of its underlying tunnels. + /// Starts the tasks for this instance of Quincy tunnel and listens for incoming connections. pub async fn run(self) -> Result<()> { - let mut tunnel_tasks = self - .active_tunnels - .into_iter() - .map(|tunnel| tokio::spawn(tunnel.run::())) - .collect::>(); + let interface_address = + Ipv4Net::with_netmask(self.config.address_tunnel, self.config.address_mask)?.into(); + let interface = I::create(interface_address, self.config.connection.mtu)?; + + let (tun_read, tun_write) = interface.split(); + let (sender, receiver) = mpsc::unbounded_channel(); + + let mut tasks = FuturesUnordered::new(); + + tasks.extend([ + tokio::spawn(Self::process_outbound_traffic( + tun_read, + self.connection_queues.clone(), + self.config.connection.mtu as usize, + )), + tokio::spawn(Self::process_inbound_traffic(tun_write, receiver)), + ]); + + let handler_task = self.handle_connections(sender); + + let result = tokio::select! { + handler_task_result = handler_task => handler_task_result, + Some(task_result) = tasks.next() => task_result?, + }; + + let _ = abort_all(tasks).await; + + result + } + + /// Handles incoming connections by spawning a new QuincyConnection instance for them. + /// + /// ### Arguments + /// - `ingress_queue` - the queue to send data to the TUN interface + /// - `endpoint` - the QUIC endpoint + async fn handle_connections(&self, ingress_queue: UnboundedSender) -> Result<()> { + let endpoint = self.create_quinn_endpoint()?; + + info!( + "Starting connection handler: {}", + endpoint.local_addr().expect("Endpoint has a local address") + ); + + let mut authentication_tasks = FuturesUnordered::new(); + let mut connection_tasks = FuturesUnordered::new(); + + loop { + tokio::select! { + // New connections + Some(handshake) = endpoint.accept() => { + debug!( + "Received incoming connection from '{}'", + handshake.remote_address().ip() + ); + + let quic_connection = handshake.await?; + + let connection = QuincyConnection::new( + quic_connection, + ingress_queue.clone(), + ); + + authentication_tasks.push( + connection.authenticate(&self.user_database, &self.address_pool, self.config.connection.timeout) + ); + } + + // Authentication tasks + Some(connection) = authentication_tasks.next() => { + let connection = match connection { + Ok(connection) => connection, + Err(e) => { + warn!("Failed to authenticate client: {e}"); + continue; + } + }; + + let client_address = connection.client_address()?.addr(); + let (connection_sender, connection_receiver) = mpsc::unbounded_channel(); + + connection_tasks.push(tokio::spawn(connection.run(connection_receiver))); + self.connection_queues.insert(client_address, connection_sender); + } + + // Connection tasks + Some(connection) = connection_tasks.next() => { + let (connection, err) = connection?; + let client_address = &connection.client_address()?.addr(); + + self.connection_queues.remove(client_address); + self.address_pool.release_address(client_address); + warn!("Connection with client {client_address} has encountered an error: {err}"); + } + + // Shutdown + signal_res = handle_ctrl_c() => { + info!("Received shutdown signal, shutting down"); + let _ = abort_all(connection_tasks).await; + + endpoint.close(VarInt::from_u32(0x01), "Server shutdown".as_bytes()); + + return signal_res; + } + } + } + } + + /// Creates a Quinn QUIC endpoint that clients can connect to. + /// + /// ### Arguments + /// - `quinn_config` - the Quinn server configuration to use + fn create_quinn_endpoint(&self) -> Result { + let quinn_config = self.config.as_quinn_server_config()?; + + let socket = bind_socket( + SocketAddr::new(self.config.bind_address, self.config.bind_port), + self.config.connection.send_buffer_size as usize, + self.config.connection.recv_buffer_size as usize, + )?; + + let endpoint_config = self.config.connection.as_endpoint_config()?; + let endpoint = Endpoint::new( + endpoint_config, + Some(quinn_config), + socket, + QUINN_RUNTIME.clone(), + )?; + + Ok(endpoint) + } + + /// Reads data from the TUN interface and sends it to the appropriate client. + /// + /// ### Arguments + /// - `tun_read` - the read half of the TUN interface + /// - `connection_queues` - the queues for sending data to the QUIC connections + /// - `buffer_size` - the size of the buffer to use when reading from the TUN interface + async fn process_outbound_traffic( + mut tun_read: impl InterfaceRead, + connection_queues: ConnectionQueues, + buffer_size: usize, + ) -> Result<()> { + debug!("Started tunnel outbound traffic task (interface -> connection queue)"); loop { - let (tunnel, task_result) = match tunnel_tasks.next().await { - Some(tunnel) => tunnel??, - None => return Err(anyhow!("No tunnels are running")), + let buf = tun_read.read_packet(buffer_size).await?; + + let headers = match PacketHeaders::from_ip_slice(&buf) { + Ok(headers) => headers, + Err(e) => { + warn!("Failed to parse IP packet: {e}"); + continue; + } + }; + + let net_header = match headers.net { + Some(net) => net, + None => { + warn!("Received a packet with invalid IP header"); + continue; + } + }; + + let dest_addr: IpAddr = match net_header { + NetHeaders::Ipv4(header, _) => header.destination.into(), + NetHeaders::Ipv6(header, _) => header.destination.into(), + }; + debug!("Destination address for packet: {dest_addr}"); + + let connection_queue = match connection_queues.get(&dest_addr) { + Some(connection_queue) => connection_queue, + None => continue, }; + debug!("Found connection for IP {dest_addr}"); - error!( - "Tunnel {} has encountered an error: {:?}", - tunnel.name, - task_result.expect_err("Tunnel task always returns an error") - ); + connection_queue.send(buf)?; + } + } + + /// Reads data from the QUIC connection and sends it to the TUN interface worker. + /// + /// ### Arguments + /// - `tun_write` - the write half of the TUN interface + /// - `ingress_queue` - the queue for sending data to the TUN interface + async fn process_inbound_traffic( + mut tun_write: impl InterfaceWrite, + mut ingress_queue: UnboundedReceiver, + ) -> Result<()> { + debug!("Started tunnel inbound traffic task (tunnel queue -> interface)"); + + let mut packets = Vec::with_capacity(PACKET_BUFFER_SIZE); + + loop { + packets.clear(); + ingress_queue + .recv_many(&mut packets, PACKET_BUFFER_SIZE) + .await; - tunnel_tasks.push(tokio::spawn(tunnel.run::())); + tun_write.write_packets(&packets).await?; } } } diff --git a/src/server/address_pool.rs b/src/server/address_pool.rs index a005ee0..053240c 100644 --- a/src/server/address_pool.rs +++ b/src/server/address_pool.rs @@ -1,4 +1,3 @@ -use anyhow::Result; use dashmap::DashSet; use ipnet::{IpAddrRange, IpNet, Ipv4AddrRange, Ipv6AddrRange}; use std::net::IpAddr; @@ -14,13 +13,15 @@ impl AddressPool { /// /// ### Arguments /// - `network` - the network address and mask - pub fn new(network: IpNet) -> Result { - let used = DashSet::from_iter(vec![network.network(), network.addr(), network.broadcast()]); - - Ok(Self { + pub fn new(network: IpNet) -> Self { + let pool = Self { network, - used_addresses: used, - }) + used_addresses: DashSet::new(), + }; + + pool.reset(); + + pool } /// Returns the next available address if such an address exists. @@ -74,8 +75,7 @@ mod tests { Ipv4Addr::new(255, 255, 255, 252), ) .unwrap(), - )) - .unwrap(); + )); assert_eq!( pool.next_available_address().unwrap(), diff --git a/src/server/connection.rs b/src/server/connection.rs index e34e3b4..0ab309a 100644 --- a/src/server/connection.rs +++ b/src/server/connection.rs @@ -1,7 +1,9 @@ -use crate::auth::server::AuthServer; use crate::auth::user::UserDatabase; +use crate::{auth::server::AuthServer, utils::tasks::abort_all}; use anyhow::{anyhow, Error, Result}; use bytes::Bytes; +use futures::stream::FuturesUnordered; +use futures::StreamExt; use ipnet::IpNet; use crate::server::address_pool::AddressPool; @@ -74,20 +76,27 @@ impl QuincyConnection { ); } - let connection = Arc::new(self.clone()); + let connection = Arc::new(self); - let outgoing_data_task = - tokio::spawn(connection.clone().process_outgoing_data(egress_queue)); - let incoming_data_task = tokio::spawn(connection.clone().process_incoming_data()); + let mut tasks = FuturesUnordered::new(); - let err = tokio::select! { - outgoing_data_err = outgoing_data_task => outgoing_data_err, - incoming_data_err = incoming_data_task => incoming_data_err, - } - .expect("joining tasks never fails") - .expect_err("connection tasks always return an error"); + tasks.extend([ + tokio::spawn(connection.clone().process_outgoing_data(egress_queue)), + tokio::spawn(connection.clone().process_incoming_data()), + ]); + + let res = tasks + .next() + .await + .expect("tasks is not empty") + .expect("task is joinable"); + + let _ = abort_all(tasks).await; - (self, err) + ( + Arc::into_inner(connection).expect("there is exactly one Arc instance at this point"), + res.expect_err("task failed"), + ) } /// Processes outgoing data and sends it to the QUIC connection. @@ -130,6 +139,7 @@ impl QuincyConnection { } /// Returns the username associated with this connection. + #[allow(dead_code)] pub fn username(&self) -> Result<&str> { self.username .as_deref() diff --git a/src/server/tunnel.rs b/src/server/tunnel.rs deleted file mode 100644 index d2bc911..0000000 --- a/src/server/tunnel.rs +++ /dev/null @@ -1,273 +0,0 @@ -use std::net::{IpAddr, SocketAddr}; -use std::sync::Arc; -use std::time::Duration; - -use crate::auth::user::{load_users_file, UserDatabase}; -use crate::config::{ConnectionConfig, TunnelConfig}; -use crate::server::address_pool::AddressPool; -use crate::server::connection::QuincyConnection; -use crate::utils::socket::bind_socket; -use anyhow::Result; -use bytes::Bytes; -use dashmap::DashMap; -use etherparse::{NetHeaders, PacketHeaders}; -use futures::stream::FuturesUnordered; -use futures::StreamExt; -use ipnet::Ipv4Net; -use quinn::Endpoint; -use tokio::sync::mpsc; -use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; - -use crate::constants::{PACKET_BUFFER_SIZE, QUINN_RUNTIME}; -use crate::interface::{Interface, InterfaceRead, InterfaceWrite}; -use crate::utils::tasks::join_or_abort_all; -use tracing::{debug, info, warn}; - -type ConnectionQueues = Arc>>; - -/// Represents a Quincy tunnel encapsulating Quincy connections and TUN interface IO. -pub struct QuincyTunnel { - pub name: String, - tunnel_config: TunnelConfig, - connection_config: ConnectionConfig, - connection_queues: ConnectionQueues, - user_database: UserDatabase, - address_pool: AddressPool, - buffer_size: usize, -} - -impl QuincyTunnel { - /// Creates a new instance of the Quincy tunnel. - /// - /// ### Arguments - /// = `name` - the name of the tunnel - /// - `tunnel_config` - the tunnel configuration - /// - `connection_config` - the connection configuration - pub fn new( - name: String, - tunnel_config: TunnelConfig, - connection_config: &ConnectionConfig, - ) -> Result { - let interface_address = - Ipv4Net::with_netmask(tunnel_config.address_tunnel, tunnel_config.address_mask)?.into(); - - let user_database = UserDatabase::new(load_users_file(&tunnel_config.users_file)?); - let address_pool = AddressPool::new(interface_address)?; - - Ok(Self { - name, - tunnel_config, - connection_config: connection_config.clone(), - connection_queues: Arc::new(DashMap::new()), - user_database, - address_pool, - buffer_size: connection_config.mtu as usize, - }) - } - - /// Starts the tasks for this instance of Quincy tunnel and listens for incoming connections. - pub async fn run(self) -> Result<(Self, Result<()>)> { - let interface_address = Ipv4Net::with_netmask( - self.tunnel_config.address_tunnel, - self.tunnel_config.address_mask, - )? - .into(); - let interface = I::create(interface_address, self.connection_config.mtu)?; - - let (tun_read, tun_write) = tokio::io::split(interface); - let (sender, receiver) = mpsc::unbounded_channel(); - - let quinn_configuration = self - .tunnel_config - .as_quinn_server_config(&self.connection_config)?; - let endpoint = self.create_quinn_endpoint(quinn_configuration)?; - - let mut tunnel_tasks = FuturesUnordered::new(); - - tunnel_tasks.push(tokio::spawn(Self::process_outbound_traffic( - tun_read, - self.connection_queues.clone(), - self.buffer_size, - ))); - - tunnel_tasks.push(tokio::spawn(Self::process_inbound_traffic( - tun_write, receiver, - ))); - - let handler_task = self.handle_incoming_connections(sender, endpoint); - - let result = tokio::select! { - handler_task_result = handler_task => handler_task_result, - Some(task_result) = tunnel_tasks.next() => task_result?, - }; - - join_or_abort_all(tunnel_tasks, Duration::from_secs(1)).await?; - - Ok((self, result)) - } - - /// Handles incoming connections by spawning a new QuincyConnection instance for them. - /// - /// ### Arguments - /// - `ingress_queue` - the queue to send data to the TUN interface - /// - `endpoint` - the QUIC endpoint - async fn handle_incoming_connections( - &self, - ingress_queue: UnboundedSender, - endpoint: Endpoint, - ) -> Result<()> { - info!( - "Starting connection handler: {}", - endpoint.local_addr().expect("Endpoint has a local address") - ); - - let mut authentication_tasks = FuturesUnordered::new(); - let mut connection_tasks = FuturesUnordered::new(); - - loop { - tokio::select! { - // New connections - Some(handshake) = endpoint.accept() => { - debug!( - "Received incoming connection from '{}'", - handshake.remote_address().ip() - ); - - let quic_connection = handshake.await?; - - let connection = QuincyConnection::new( - quic_connection, - ingress_queue.clone(), - ); - - authentication_tasks.push( - connection.authenticate(&self.user_database, &self.address_pool, self.connection_config.timeout) - ); - } - - // Authentication tasks - Some(connection) = authentication_tasks.next() => { - let connection = match connection { - Ok(connection) => connection, - Err(e) => { - warn!("Failed to authenticate client: {e}"); - continue; - } - }; - - let client_address = connection.client_address()?.addr(); - let (connection_sender, connection_receiver) = mpsc::unbounded_channel(); - - connection_tasks.push(tokio::spawn(connection.run(connection_receiver))); - self.connection_queues.insert(client_address, connection_sender); - } - - // Connection tasks - Some(connection) = connection_tasks.next() => { - let (connection, err) = connection?; - let client_address = &connection.client_address()?.addr(); - - self.connection_queues.remove(client_address); - self.address_pool.release_address(client_address); - warn!("Connection with client {client_address} has encountered an error: {err}"); - } - } - } - } - - /// Creates a Quinn QUIC endpoint that clients can connect to. - /// - /// ### Arguments - /// - `quinn_config` - the Quinn server configuration to use - fn create_quinn_endpoint(&self, quinn_config: quinn::ServerConfig) -> Result { - let socket = bind_socket( - SocketAddr::new( - self.tunnel_config.bind_address, - self.tunnel_config.bind_port, - ), - self.connection_config.send_buffer_size as usize, - self.connection_config.recv_buffer_size as usize, - )?; - - let endpoint_config = self.connection_config.as_endpoint_config()?; - let endpoint = Endpoint::new( - endpoint_config, - Some(quinn_config), - socket, - QUINN_RUNTIME.clone(), - )?; - - Ok(endpoint) - } - - /// Reads data from the TUN interface and sends it to the appropriate client. - /// - /// ### Arguments - /// - `tun_read` - the read half of the TUN interface - /// - `connection_queues` - the queues for sending data to the QUIC connections - /// - `buffer_size` - the size of the buffer to use when reading from the TUN interface - async fn process_outbound_traffic( - mut tun_read: impl InterfaceRead, - connection_queues: ConnectionQueues, - buffer_size: usize, - ) -> Result<()> { - debug!("Started tunnel outbound traffic task (interface -> connection queue)"); - - loop { - let buf = tun_read.read_packet(buffer_size).await?; - - let headers = match PacketHeaders::from_ip_slice(&buf) { - Ok(headers) => headers, - Err(e) => { - warn!("Failed to parse IP packet: {e}"); - continue; - } - }; - - let net_header = match headers.net { - Some(net) => net, - None => { - warn!("Received a packet with invalid IP header"); - continue; - } - }; - - let dest_addr: IpAddr = match net_header { - NetHeaders::Ipv4(header, _) => header.destination.into(), - NetHeaders::Ipv6(header, _) => header.destination.into(), - }; - debug!("Destination address for packet: {dest_addr}"); - - let connection_queue = match connection_queues.get(&dest_addr) { - Some(connection_queue) => connection_queue, - None => continue, - }; - debug!("Found connection for IP {dest_addr}"); - - connection_queue.send(buf)?; - } - } - - /// Reads data from the QUIC connection and sends it to the TUN interface worker. - /// - /// ### Arguments - /// - `tun_write` - the write half of the TUN interface - /// - `ingress_queue` - the queue for sending data to the TUN interface - async fn process_inbound_traffic( - mut tun_write: impl InterfaceWrite, - mut ingress_queue: UnboundedReceiver, - ) -> Result<()> { - debug!("Started tunnel inbound traffic task (tunnel queue -> interface)"); - - let mut packets = Vec::with_capacity(PACKET_BUFFER_SIZE); - - loop { - packets.clear(); - ingress_queue - .recv_many(&mut packets, PACKET_BUFFER_SIZE) - .await; - - tun_write.write_packets(&packets).await?; - } - } -} diff --git a/src/utils.rs b/src/utils.rs index 337a430..1957e1a 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,5 +1,6 @@ pub mod certificates; pub mod cli; +pub mod signal_handler; pub mod socket; pub mod tasks; pub mod tracing; diff --git a/src/utils/signal_handler.rs b/src/utils/signal_handler.rs new file mode 100644 index 0000000..04e869a --- /dev/null +++ b/src/utils/signal_handler.rs @@ -0,0 +1,8 @@ +use anyhow::{anyhow, Result}; +use tokio::signal; + +pub(crate) async fn handle_ctrl_c() -> Result<()> { + signal::ctrl_c().await?; + + Err(anyhow!("Received stop signal")) +} diff --git a/src/utils/tasks.rs b/src/utils/tasks.rs index f78b997..b141a20 100644 --- a/src/utils/tasks.rs +++ b/src/utils/tasks.rs @@ -1,46 +1,29 @@ use anyhow::Result; use futures::stream::FuturesUnordered; -use std::time::Duration; -use tokio::task::{AbortHandle, JoinError, JoinHandle}; -use tokio::time::sleep; -use tokio::try_join; +use futures::StreamExt; +use tokio::task::JoinHandle; -/// Aborts a task after a specified duration. -/// -/// ### Arguments -/// - `abort_handle` - the abort handle of the task to be aborted -/// - `duration` - the duration after which the task should be aborted -async fn abort_after(abort_handle: AbortHandle, duration: Duration) -> Result<(), JoinError> { - sleep(duration).await; - abort_handle.abort(); - - Ok(()) -} - -/// Joins a task or aborts it after a specified duration. +/// Aborts a given task. /// /// ### Arguments /// - `task` - the task to be joined or aborted -/// - `duration` - the duration after which the task should be aborted /// /// ### Returns /// - `R` - the result of the task -pub async fn join_or_abort_task(task: &mut JoinHandle, duration: Duration) -> Result { - let abort_handle = task.abort_handle(); +pub async fn abort_task(task: &mut JoinHandle) -> Result { + task.abort(); - let (_, result) = try_join!(abort_after(abort_handle, duration), task)?; - - Ok(result) + Ok(task.await?) } /// Joins all tasks in a FuturesUnordered or aborts them after a specified duration. -pub async fn join_or_abort_all( - mut tasks: FuturesUnordered>, - duration: Duration, -) -> Result<()> { - for task in tasks.iter_mut() { - join_or_abort_task(task, duration).await?; - } +pub async fn abort_all(mut tasks: FuturesUnordered>) -> Result<()> { + let mut aborted_tasks = tasks + .iter_mut() + .map(|handle| abort_task(handle)) + .collect::>(); + + while (aborted_tasks.next().await).is_some() {} Ok(()) } diff --git a/tests/static/server.toml b/tests/static/server.toml index dd4b929..10f8215 100644 --- a/tests/static/server.toml +++ b/tests/static/server.toml @@ -1,4 +1,3 @@ -[tunnels.tun0] name = "tun0" certificate_file = "tests/static/cert.pem" certificate_key_file = "tests/static/key.pem"