Skip to content

Commit

Permalink
feat: add configurable client isolation
Browse files Browse the repository at this point in the history
  • Loading branch information
M0dEx committed Mar 18, 2024
1 parent 7d4ac60 commit e099dc3
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 63 deletions.
27 changes: 7 additions & 20 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ use quinn::{Connection, Endpoint, VarInt};

use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs};

use crate::interface::{Interface, InterfaceRead, InterfaceWrite};
use bytes::Bytes;
use crate::interface::{Interface, InterfaceRead, InterfaceWrite, Packet};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use std::sync::Arc;
Expand Down Expand Up @@ -183,15 +182,9 @@ impl QuincyClient {
debug!("Started outgoing traffic task (interface -> QUIC tunnel)");

loop {
let data = read_interface.read_packet(interface_mtu).await?;
let packet = read_interface.read_packet(interface_mtu).await?;

debug!(
"Sending {} bytes to {:?}",
data.len(),
connection.remote_address()
);

connection.send_datagram(data)?;
connection.send_datagram(packet.into())?;
}
}

Expand All @@ -201,7 +194,7 @@ impl QuincyClient {
/// - `tun_queue` - the TUN queue
/// - `tun_write` - the write half of the TUN interface
async fn process_tun_queue(
mut tun_queue: Receiver<Bytes>,
mut tun_queue: Receiver<Packet>,
mut tun_write: impl InterfaceWrite,
) -> Result<()> {
debug!("Started TUN queue task (interface -> QUIC tunnel)");
Expand All @@ -223,20 +216,14 @@ impl QuincyClient {
/// - `tun_queue` - the TUN queue
async fn process_inbound_traffic(
connection: Arc<Connection>,
tun_queue: Sender<Bytes>,
tun_queue: Sender<Packet>,
) -> Result<()> {
debug!("Started inbound traffic task (QUIC tunnel -> interface)");

loop {
let data = connection.read_datagram().await?;

debug!(
"Received {} bytes from {:?}",
data.len(),
connection.remote_address()
);
let packet = connection.read_datagram().await?.into();

tun_queue.send(data).await?;
tun_queue.send(packet).await?;
}
}
}
7 changes: 7 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ pub struct ServerConfig {
pub address_tunnel: Ipv4Addr,
/// The address mask for this tunnel
pub address_mask: Ipv4Addr,
/// Whether to isolate clients from each other
#[serde(default = "default_true_fn")]
pub isolate_clients: bool,
/// Authentication configuration
pub authentication: ServerAuthenticationConfig,
/// Miscellaneous connection configuration
Expand Down Expand Up @@ -184,6 +187,10 @@ fn default_auth_type() -> AuthType {
AuthType::UsersFile
}

fn default_true_fn() -> bool {
true
}

impl ClientConfig {
/// Creates Quinn client configuration from this Quincy client configuration.
///
Expand Down
50 changes: 45 additions & 5 deletions src/interface.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
#![allow(async_fn_in_trait)]
use anyhow::Result;
use std::net::IpAddr;

use anyhow::{anyhow, Context, Result};
use bytes::{Bytes, BytesMut};
use etherparse::{NetHeaders, PacketHeaders};
use ipnet::IpNet;
use tokio::io::{AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf};
use tun2::{AsyncDevice, Configuration};

pub trait InterfaceRead: AsyncReadExt + Sized + Unpin + Send + 'static {
#[inline]
async fn read_packet(&mut self, buf_size: usize) -> Result<Bytes> {
async fn read_packet(&mut self, buf_size: usize) -> Result<Packet> {
let mut buf = BytesMut::with_capacity(buf_size);
self.read_buf(&mut buf).await?;

Expand All @@ -17,14 +20,14 @@ pub trait InterfaceRead: AsyncReadExt + Sized + Unpin + Send + 'static {

pub trait InterfaceWrite: AsyncWriteExt + Sized + Unpin + Send + 'static {
#[inline]
async fn write_packet(&mut self, packet_data: &Bytes) -> Result<()> {
self.write_all(packet_data).await?;
async fn write_packet(&mut self, packet: &Packet) -> Result<()> {
self.write_all(&packet.0).await?;

Ok(())
}

#[inline]
async fn write_packets(&mut self, packets: &[Bytes]) -> Result<()> {
async fn write_packets(&mut self, packets: &[Packet]) -> Result<()> {
// TODO: Implement this using write_vectored when it actually works
for packet in packets {
self.write_packet(packet).await?;
Expand Down Expand Up @@ -65,3 +68,40 @@ impl Interface for AsyncDevice {
Ok(interface)
}
}

#[derive(Debug, Clone)]

Check warning on line 72 in src/interface.rs

View check run for this annotation

Codecov / codecov/patch

src/interface.rs#L72

Added line #L72 was not covered by tests
pub struct Packet(Bytes);

impl Packet {
pub fn new(data: Bytes) -> Self {
Self(data)
}

pub fn destination(&self) -> Result<IpAddr> {
let headers = PacketHeaders::from_ip_slice(&self.0).context("failed to parse IP packet")?;
let net_header = headers.net.ok_or(anyhow!("no network header"))?;

match net_header {
NetHeaders::Ipv4(header, _) => Ok(header.destination.into()),
NetHeaders::Ipv6(header, _) => Ok(header.destination.into()),

Check warning on line 86 in src/interface.rs

View check run for this annotation

Codecov / codecov/patch

src/interface.rs#L86

Added line #L86 was not covered by tests
}
}
}

impl From<BytesMut> for Packet {
fn from(data: BytesMut) -> Self {
Self::new(data.freeze())
}
}

impl From<Bytes> for Packet {
fn from(data: Bytes) -> Self {
Self::new(data)
}
}

impl From<Packet> for Bytes {
fn from(packet: Packet) -> Self {
packet.0
}
}
12 changes: 6 additions & 6 deletions src/server/connection.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{auth::server::AuthServer, utils::tasks::abort_all};
use crate::{auth::server::AuthServer, interface::Packet, utils::tasks::abort_all};
use anyhow::{anyhow, Error, Result};
use bytes::Bytes;
use futures::stream::FuturesUnordered;
Expand All @@ -15,7 +15,7 @@ pub struct QuincyConnection {
connection: Connection,
username: Option<String>,
client_address: Option<IpNet>,
ingress_queue: Sender<Bytes>,
ingress_queue: Sender<Packet>,
}

impl QuincyConnection {
Expand All @@ -24,7 +24,7 @@ impl QuincyConnection {
/// ### Arguments
/// - `connection` - the underlying QUIC connection
/// - `tun_queue` - the queue to send data to the TUN interface
pub fn new(connection: Connection, tun_queue: Sender<Bytes>) -> Self {
pub fn new(connection: Connection, tun_queue: Sender<Packet>) -> Self {
Self {
connection,
username: None,
Expand Down Expand Up @@ -106,12 +106,12 @@ impl QuincyConnection {
/// Processes incoming data and sends it to the TUN interface queue.
async fn process_incoming_data(
connection: Connection,
ingress_queue: Sender<Bytes>,
ingress_queue: Sender<Packet>,
) -> Result<()> {
loop {
let data = connection.read_datagram().await?;
let packet = connection.read_datagram().await?.into();

ingress_queue.send(data).await?;
ingress_queue.send(packet).await?;
}
}

Expand Down
104 changes: 72 additions & 32 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,14 @@ use crate::utils::socket::bind_socket;
use anyhow::{Context, Result};
use bytes::Bytes;
use dashmap::DashMap;
use etherparse::{NetHeaders, PacketHeaders};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use ipnet::Ipv4Net;
use quinn::{Endpoint, VarInt};
use tokio::sync::mpsc::{channel, Receiver, Sender};

use crate::constants::{PACKET_BUFFER_SIZE, PACKET_CHANNEL_SIZE, QUINN_RUNTIME};
use crate::interface::{Interface, InterfaceRead, InterfaceWrite};
use crate::interface::{Interface, InterfaceRead, InterfaceWrite, Packet};
use crate::utils::tasks::abort_all;
use tracing::{debug, info, warn};

Expand Down Expand Up @@ -54,7 +53,7 @@ impl QuincyServer {
}

/// Starts the tasks for this instance of Quincy tunnel and listens for incoming connections.
pub async fn run<I: Interface>(self) -> Result<()> {
pub async fn run<I: Interface>(&self) -> Result<()> {
let interface_address =
Ipv4Net::with_netmask(self.config.address_tunnel, self.config.address_mask)?.into();

Expand All @@ -76,7 +75,12 @@ impl QuincyServer {
self.connection_queues.clone(),
self.config.connection.mtu as usize,
)),
tokio::spawn(Self::process_inbound_traffic(tun_write, receiver)),
tokio::spawn(Self::process_inbound_traffic(
self.connection_queues.clone(),
tun_write,
receiver,
self.config.isolate_clients,
)),
]);

let handler_task = self.handle_connections(auth_server, sender);
Expand All @@ -99,7 +103,7 @@ impl QuincyServer {
async fn handle_connections(
&self,
auth_server: AuthServer,
ingress_queue: Sender<Bytes>,
ingress_queue: Sender<Packet>,
) -> Result<()> {
let endpoint = self.create_quinn_endpoint()?;

Expand Down Expand Up @@ -218,60 +222,96 @@ impl QuincyServer {
debug!("Started tunnel outbound traffic task (interface -> connection queue)");

loop {
let buf = tun_read.read_packet(buffer_size).await?;

let headers = match PacketHeaders::from_ip_slice(&buf) {
Ok(headers) => headers,
let packet = tun_read.read_packet(buffer_size).await?;
let dest_addr = match packet.destination() {
Ok(addr) => addr,
Err(e) => {
warn!("Failed to parse IP packet: {e}");
continue;
}
};

let net_header = match headers.net {
Some(net) => net,
None => {
warn!("Received a packet with invalid IP header");
warn!("Received packet with malformed header structure: {e}");

Check warning on line 229 in src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/server/mod.rs#L229

Added line #L229 was not covered by tests
continue;
}
};

let dest_addr: IpAddr = match net_header {
NetHeaders::Ipv4(header, _) => header.destination.into(),
NetHeaders::Ipv6(header, _) => header.destination.into(),
};
debug!("Destination address for packet: {dest_addr}");

let connection_queue = match connection_queues.get(&dest_addr) {
Some(connection_queue) => connection_queue,
None => continue,
};

debug!("Found connection for IP {dest_addr}");

connection_queue.send(buf).await?;
connection_queue.send(packet.into()).await?;
}
}

/// Reads data from the QUIC connection and sends it to the TUN interface worker.
///
/// ### Arguments
/// - `connection_queues` - the queues for sending data to the QUIC connections
/// - `tun_write` - the write half of the TUN interface
/// - `ingress_queue` - the queue for sending data to the TUN interface
async fn process_inbound_traffic(
mut tun_write: impl InterfaceWrite,
mut ingress_queue: Receiver<Bytes>,
connection_queues: ConnectionQueues,
tun_write: impl InterfaceWrite,
ingress_queue: Receiver<Packet>,
isolate_clients: bool,
) -> Result<()> {
debug!("Started tunnel inbound traffic task (tunnel queue -> interface)");

let mut packets = Vec::with_capacity(PACKET_BUFFER_SIZE);
if isolate_clients {
relay_isolated(tun_write, ingress_queue).await
} else {
relay_unisolated(connection_queues, tun_write, ingress_queue).await

Check warning on line 264 in src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/server/mod.rs#L264

Added line #L264 was not covered by tests
}
}

Check warning on line 266 in src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/server/mod.rs#L266

Added line #L266 was not covered by tests
}

#[inline]
async fn relay_isolated(
mut tun_write: impl InterfaceWrite,
mut ingress_queue: Receiver<Packet>,
) -> Result<()> {
let mut packets = Vec::with_capacity(PACKET_BUFFER_SIZE);

loop {
packets.clear();
ingress_queue
.recv_many(&mut packets, PACKET_BUFFER_SIZE)
.await;
loop {
packets.clear();
ingress_queue
.recv_many(&mut packets, PACKET_BUFFER_SIZE)
.await;

tun_write.write_packets(&packets).await?;
tun_write.write_packets(&packets).await?;
}
}

Check warning on line 284 in src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/server/mod.rs#L283-L284

Added lines #L283 - L284 were not covered by tests

#[inline]
async fn relay_unisolated(
connection_queues: ConnectionQueues,
mut tun_write: impl InterfaceWrite,
mut ingress_queue: Receiver<Packet>,
) -> Result<()> {
let mut packets = Vec::with_capacity(PACKET_BUFFER_SIZE);

Check warning on line 292 in src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/server/mod.rs#L287-L292

Added lines #L287 - L292 were not covered by tests

loop {
packets.clear();
ingress_queue
.recv_many(&mut packets, PACKET_BUFFER_SIZE)
.await;

Check warning on line 298 in src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/server/mod.rs#L294-L298

Added lines #L294 - L298 were not covered by tests

for packet in &packets {
let dest_addr = match packet.destination() {
Ok(addr) => addr,
Err(e) => {
warn!("Received packet with malformed header structure: {e}");
continue;

Check warning on line 305 in src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/server/mod.rs#L300-L305

Added lines #L300 - L305 were not covered by tests
}
};

match connection_queues.get(&dest_addr) {

Check warning on line 309 in src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/server/mod.rs#L309

Added line #L309 was not covered by tests
// Send the packet to the appropriate QUIC connection
Some(connection_queue) => connection_queue.send(packet.clone().into()).await?,

Check warning on line 311 in src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/server/mod.rs#L311

Added line #L311 was not covered by tests
// Send the packet to the TUN interface
None => tun_write.write_packet(packet).await?,

Check warning on line 313 in src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/server/mod.rs#L313

Added line #L313 was not covered by tests
}
}
}
}

0 comments on commit e099dc3

Please sign in to comment.