diff --git a/src/client.rs b/src/client.rs index c39cc98..4aaf8b1 100644 --- a/src/client.rs +++ b/src/client.rs @@ -10,8 +10,7 @@ use quinn::{Connection, Endpoint, VarInt}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs}; -use crate::interface::{Interface, InterfaceRead, InterfaceWrite}; -use bytes::Bytes; +use crate::interface::{Interface, InterfaceRead, InterfaceWrite, Packet}; use futures::stream::FuturesUnordered; use futures::StreamExt; use std::sync::Arc; @@ -183,15 +182,9 @@ impl QuincyClient { debug!("Started outgoing traffic task (interface -> QUIC tunnel)"); loop { - let data = read_interface.read_packet(interface_mtu).await?; + let packet = read_interface.read_packet(interface_mtu).await?; - debug!( - "Sending {} bytes to {:?}", - data.len(), - connection.remote_address() - ); - - connection.send_datagram(data)?; + connection.send_datagram(packet.into())?; } } @@ -201,7 +194,7 @@ impl QuincyClient { /// - `tun_queue` - the TUN queue /// - `tun_write` - the write half of the TUN interface async fn process_tun_queue( - mut tun_queue: Receiver, + mut tun_queue: Receiver, mut tun_write: impl InterfaceWrite, ) -> Result<()> { debug!("Started TUN queue task (interface -> QUIC tunnel)"); @@ -223,20 +216,14 @@ impl QuincyClient { /// - `tun_queue` - the TUN queue async fn process_inbound_traffic( connection: Arc, - tun_queue: Sender, + tun_queue: Sender, ) -> Result<()> { debug!("Started inbound traffic task (QUIC tunnel -> interface)"); loop { - let data = connection.read_datagram().await?; - - debug!( - "Received {} bytes from {:?}", - data.len(), - connection.remote_address() - ); + let packet = connection.read_datagram().await?.into(); - tun_queue.send(data).await?; + tun_queue.send(packet).await?; } } } diff --git a/src/config.rs b/src/config.rs index 7f4201c..9bfcf9a 100644 --- a/src/config.rs +++ b/src/config.rs @@ -37,6 +37,9 @@ pub struct ServerConfig { pub address_tunnel: Ipv4Addr, /// The address mask for this tunnel pub address_mask: Ipv4Addr, + /// Whether to isolate clients from each other + #[serde(default = "default_true_fn")] + pub isolate_clients: bool, /// Authentication configuration pub authentication: ServerAuthenticationConfig, /// Miscellaneous connection configuration @@ -184,6 +187,10 @@ fn default_auth_type() -> AuthType { AuthType::UsersFile } +fn default_true_fn() -> bool { + true +} + impl ClientConfig { /// Creates Quinn client configuration from this Quincy client configuration. /// diff --git a/src/interface.rs b/src/interface.rs index 89741d0..b12ed75 100644 --- a/src/interface.rs +++ b/src/interface.rs @@ -1,13 +1,16 @@ #![allow(async_fn_in_trait)] -use anyhow::Result; +use std::net::IpAddr; + +use anyhow::{anyhow, Context, Result}; use bytes::{Bytes, BytesMut}; +use etherparse::{NetHeaders, PacketHeaders}; use ipnet::IpNet; use tokio::io::{AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf}; use tun2::{AsyncDevice, Configuration}; pub trait InterfaceRead: AsyncReadExt + Sized + Unpin + Send + 'static { #[inline] - async fn read_packet(&mut self, buf_size: usize) -> Result { + async fn read_packet(&mut self, buf_size: usize) -> Result { let mut buf = BytesMut::with_capacity(buf_size); self.read_buf(&mut buf).await?; @@ -17,14 +20,14 @@ pub trait InterfaceRead: AsyncReadExt + Sized + Unpin + Send + 'static { pub trait InterfaceWrite: AsyncWriteExt + Sized + Unpin + Send + 'static { #[inline] - async fn write_packet(&mut self, packet_data: &Bytes) -> Result<()> { - self.write_all(packet_data).await?; + async fn write_packet(&mut self, packet: &Packet) -> Result<()> { + self.write_all(&packet.0).await?; Ok(()) } #[inline] - async fn write_packets(&mut self, packets: &[Bytes]) -> Result<()> { + async fn write_packets(&mut self, packets: &[Packet]) -> Result<()> { // TODO: Implement this using write_vectored when it actually works for packet in packets { self.write_packet(packet).await?; @@ -65,3 +68,40 @@ impl Interface for AsyncDevice { Ok(interface) } } + +#[derive(Debug, Clone)] +pub struct Packet(Bytes); + +impl Packet { + pub fn new(data: Bytes) -> Self { + Self(data) + } + + pub fn destination(&self) -> Result { + let headers = PacketHeaders::from_ip_slice(&self.0).context("failed to parse IP packet")?; + let net_header = headers.net.ok_or(anyhow!("no network header"))?; + + match net_header { + NetHeaders::Ipv4(header, _) => Ok(header.destination.into()), + NetHeaders::Ipv6(header, _) => Ok(header.destination.into()), + } + } +} + +impl From for Packet { + fn from(data: BytesMut) -> Self { + Self::new(data.freeze()) + } +} + +impl From for Packet { + fn from(data: Bytes) -> Self { + Self::new(data) + } +} + +impl From for Bytes { + fn from(packet: Packet) -> Self { + packet.0 + } +} diff --git a/src/server/connection.rs b/src/server/connection.rs index bb31131..c9c439f 100644 --- a/src/server/connection.rs +++ b/src/server/connection.rs @@ -1,4 +1,4 @@ -use crate::{auth::server::AuthServer, utils::tasks::abort_all}; +use crate::{auth::server::AuthServer, interface::Packet, utils::tasks::abort_all}; use anyhow::{anyhow, Error, Result}; use bytes::Bytes; use futures::stream::FuturesUnordered; @@ -15,7 +15,7 @@ pub struct QuincyConnection { connection: Connection, username: Option, client_address: Option, - ingress_queue: Sender, + ingress_queue: Sender, } impl QuincyConnection { @@ -24,7 +24,7 @@ impl QuincyConnection { /// ### Arguments /// - `connection` - the underlying QUIC connection /// - `tun_queue` - the queue to send data to the TUN interface - pub fn new(connection: Connection, tun_queue: Sender) -> Self { + pub fn new(connection: Connection, tun_queue: Sender) -> Self { Self { connection, username: None, @@ -106,12 +106,12 @@ impl QuincyConnection { /// Processes incoming data and sends it to the TUN interface queue. async fn process_incoming_data( connection: Connection, - ingress_queue: Sender, + ingress_queue: Sender, ) -> Result<()> { loop { - let data = connection.read_datagram().await?; + let packet = connection.read_datagram().await?.into(); - ingress_queue.send(data).await?; + ingress_queue.send(packet).await?; } } diff --git a/src/server/mod.rs b/src/server/mod.rs index 320cf6a..fc306d8 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -12,7 +12,6 @@ use crate::utils::socket::bind_socket; use anyhow::{Context, Result}; use bytes::Bytes; use dashmap::DashMap; -use etherparse::{NetHeaders, PacketHeaders}; use futures::stream::FuturesUnordered; use futures::StreamExt; use ipnet::Ipv4Net; @@ -20,7 +19,7 @@ use quinn::{Endpoint, VarInt}; use tokio::sync::mpsc::{channel, Receiver, Sender}; use crate::constants::{PACKET_BUFFER_SIZE, PACKET_CHANNEL_SIZE, QUINN_RUNTIME}; -use crate::interface::{Interface, InterfaceRead, InterfaceWrite}; +use crate::interface::{Interface, InterfaceRead, InterfaceWrite, Packet}; use crate::utils::tasks::abort_all; use tracing::{debug, info, warn}; @@ -54,7 +53,7 @@ impl QuincyServer { } /// Starts the tasks for this instance of Quincy tunnel and listens for incoming connections. - pub async fn run(self) -> Result<()> { + pub async fn run(&self) -> Result<()> { let interface_address = Ipv4Net::with_netmask(self.config.address_tunnel, self.config.address_mask)?.into(); @@ -76,7 +75,12 @@ impl QuincyServer { self.connection_queues.clone(), self.config.connection.mtu as usize, )), - tokio::spawn(Self::process_inbound_traffic(tun_write, receiver)), + tokio::spawn(Self::process_inbound_traffic( + self.connection_queues.clone(), + tun_write, + receiver, + self.config.isolate_clients, + )), ]); let handler_task = self.handle_connections(auth_server, sender); @@ -99,7 +103,7 @@ impl QuincyServer { async fn handle_connections( &self, auth_server: AuthServer, - ingress_queue: Sender, + ingress_queue: Sender, ) -> Result<()> { let endpoint = self.create_quinn_endpoint()?; @@ -218,60 +222,96 @@ impl QuincyServer { debug!("Started tunnel outbound traffic task (interface -> connection queue)"); loop { - let buf = tun_read.read_packet(buffer_size).await?; - - let headers = match PacketHeaders::from_ip_slice(&buf) { - Ok(headers) => headers, + let packet = tun_read.read_packet(buffer_size).await?; + let dest_addr = match packet.destination() { + Ok(addr) => addr, Err(e) => { - warn!("Failed to parse IP packet: {e}"); - continue; - } - }; - - let net_header = match headers.net { - Some(net) => net, - None => { - warn!("Received a packet with invalid IP header"); + warn!("Received packet with malformed header structure: {e}"); continue; } }; - let dest_addr: IpAddr = match net_header { - NetHeaders::Ipv4(header, _) => header.destination.into(), - NetHeaders::Ipv6(header, _) => header.destination.into(), - }; debug!("Destination address for packet: {dest_addr}"); let connection_queue = match connection_queues.get(&dest_addr) { Some(connection_queue) => connection_queue, None => continue, }; + debug!("Found connection for IP {dest_addr}"); - connection_queue.send(buf).await?; + connection_queue.send(packet.into()).await?; } } /// Reads data from the QUIC connection and sends it to the TUN interface worker. /// /// ### Arguments + /// - `connection_queues` - the queues for sending data to the QUIC connections /// - `tun_write` - the write half of the TUN interface /// - `ingress_queue` - the queue for sending data to the TUN interface async fn process_inbound_traffic( - mut tun_write: impl InterfaceWrite, - mut ingress_queue: Receiver, + connection_queues: ConnectionQueues, + tun_write: impl InterfaceWrite, + ingress_queue: Receiver, + isolate_clients: bool, ) -> Result<()> { debug!("Started tunnel inbound traffic task (tunnel queue -> interface)"); - let mut packets = Vec::with_capacity(PACKET_BUFFER_SIZE); + if isolate_clients { + relay_isolated(tun_write, ingress_queue).await + } else { + relay_unisolated(connection_queues, tun_write, ingress_queue).await + } + } +} + +#[inline] +async fn relay_isolated( + mut tun_write: impl InterfaceWrite, + mut ingress_queue: Receiver, +) -> Result<()> { + let mut packets = Vec::with_capacity(PACKET_BUFFER_SIZE); - loop { - packets.clear(); - ingress_queue - .recv_many(&mut packets, PACKET_BUFFER_SIZE) - .await; + loop { + packets.clear(); + ingress_queue + .recv_many(&mut packets, PACKET_BUFFER_SIZE) + .await; - tun_write.write_packets(&packets).await?; + tun_write.write_packets(&packets).await?; + } +} + +#[inline] +async fn relay_unisolated( + connection_queues: ConnectionQueues, + mut tun_write: impl InterfaceWrite, + mut ingress_queue: Receiver, +) -> Result<()> { + let mut packets = Vec::with_capacity(PACKET_BUFFER_SIZE); + + loop { + packets.clear(); + ingress_queue + .recv_many(&mut packets, PACKET_BUFFER_SIZE) + .await; + + for packet in &packets { + let dest_addr = match packet.destination() { + Ok(addr) => addr, + Err(e) => { + warn!("Received packet with malformed header structure: {e}"); + continue; + } + }; + + match connection_queues.get(&dest_addr) { + // Send the packet to the appropriate QUIC connection + Some(connection_queue) => connection_queue.send(packet.clone().into()).await?, + // Send the packet to the TUN interface + None => tun_write.write_packet(packet).await?, + } } } }