Skip to content

Commit

Permalink
refactor: improve error handling (#32)
Browse files Browse the repository at this point in the history
  • Loading branch information
M0dEx authored Dec 6, 2023
1 parent d57c212 commit 015a16d
Show file tree
Hide file tree
Showing 13 changed files with 391 additions and 404 deletions.
207 changes: 131 additions & 76 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "quincy"
version = "0.4.3"
version = "0.5.0"
authors = ["Jakub Kubík <[email protected]>"]
license = "MIT"
description = "QUIC-based VPN"
Expand Down Expand Up @@ -42,14 +42,15 @@ libc = "^0.2.147"
# Tokio
tokio = { version = "^1.25", features = ["rt-multi-thread", "macros", "sync", "io-util"] }
dashmap = "^5.4"
futures = "^0.3.17"

# Configuration
figment = { version = "^0.10.8", features = ["toml", "env"] }
serde = { version = "^1.0", features = ["derive"] }
serde_json = "^1.0"

# TLS
rustls = "^0.21.1"
rustls = "^0.21.0"
rustls-pemfile = "^1.0"

# Authentication
Expand All @@ -63,7 +64,6 @@ tracing-subscriber = { version = "^0.3.17", features = ["env-filter"] }
# Utils
time = "^0.3.23"
anyhow = "^1.0"
delegate = "^0.10.0"
clap = { version = "^4.1", features = ["derive"] }
once_cell = "^1.17"

Expand Down
47 changes: 25 additions & 22 deletions src/auth/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,68 +19,72 @@ pub enum AuthServerMessage {
}

/// Represents an authentication server handling initial authentication and session management.
pub struct AuthServer {
user_database: Arc<UserDatabase>,
username: Option<String>,
pub struct AuthServer<'a> {
user_database: &'a UserDatabase,
client_address: IpNet,
connection: Arc<Connection>,
auth_timeout: Duration,
}

impl AuthServer {
impl<'a> AuthServer<'a> {
pub fn new(
user_database: Arc<UserDatabase>,
user_database: &'a UserDatabase,
connection: Arc<Connection>,
client_address: IpNet,
auth_timeout: Duration,
) -> Self {
Self {
user_database,
username: None,
client_address,
connection,
auth_timeout,
}
}

/// Handles authentication for a client.
pub async fn handle_authentication(&mut self) -> Result<()> {
pub async fn handle_authentication(&self) -> Result<String> {
let (send_stream, mut recv_stream) =
match timeout(self.auth_timeout, self.connection.accept_bi()).await {
Ok(Ok(streams)) => streams,
Ok(Err(e)) => return Err(e.into()),
Err(_) => return self.handle_failure(AUTH_TIMEOUT_MESSAGE, None).await,
Err(_) => {
return Err(self
.handle_failure(AUTH_TIMEOUT_MESSAGE, None)
.await
.expect_err("Handle failure always returns an error"))
}
};

match timeout(self.auth_timeout, Self::recv_message(&mut recv_stream)).await {
Ok(Ok(AuthClientMessage::Authentication(username, password))) => {
self.authenticate_user(send_stream, username, password)
.await
}
Ok(Err(_)) => {
self.handle_failure(AUTH_FAILED_MESSAGE, Some(send_stream))
.await
}
Err(_) => {
self.handle_failure(AUTH_TIMEOUT_MESSAGE, Some(send_stream))
.await
}
Ok(Err(_)) => Err(self
.handle_failure(AUTH_FAILED_MESSAGE, Some(send_stream))
.await
.expect_err("Handle failure always returns an error")),
Err(_) => Err(self
.handle_failure(AUTH_TIMEOUT_MESSAGE, Some(send_stream))
.await
.expect_err("Handle failure always returns an error")),
}
}

/// Authenticates a user with the given username and password.
async fn authenticate_user(
&mut self,
&self,
mut send_stream: SendStream,
username: String,
password: String,
) -> Result<()> {
) -> Result<String> {
let auth_result = self.user_database.authenticate(&username, password).await;

if auth_result.is_err() {
return self
return Err(self
.handle_failure(AUTH_FAILED_MESSAGE, Some(send_stream))
.await;
.await
.expect_err("Handle failure always returns an error"));
}

let response = AuthServerMessage::Authenticated(
Expand All @@ -89,9 +93,8 @@ impl AuthServer {
);

Self::send_message(&mut send_stream, response).await?;
self.username.replace(username);

Ok(())
Ok(username)
}

/// Handles a failure during authentication.
Expand Down
1 change: 1 addition & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ impl QuincyClient {

let assigned_address = auth_client.authenticate().await?;

info!("Successfully authenticated");
info!("Received client address: {assigned_address}");

let interface = I::create(assigned_address, self.client_config.connection.mtu)?;
Expand Down
9 changes: 3 additions & 6 deletions src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ pub trait InterfaceWrite: AsyncWriteExt + Sized + Unpin + Send + 'static {
#[inline]
async fn write_packet(&mut self, packet_data: Bytes) -> Result<()> {
#[cfg(target_os = "macos")]
let packet_data = prepend_packet_info_header(packet_data)?;

#[cfg(not(target_os = "macos"))]
let packet_data = packet_data;
let packet_data = prepend_packet_info_header(&packet_data)?;

self.write_all(&packet_data).await?;

Expand All @@ -45,14 +42,14 @@ pub fn truncate_packet_info_header(data: Bytes) -> Bytes {

#[cfg(target_os = "macos")]
#[inline]
pub fn prepend_packet_info_header(data: Bytes) -> Result<Bytes> {
pub fn prepend_packet_info_header(data: &Bytes) -> Result<Bytes> {
use crate::constants::DARWIN_PI_HEADER_IPV4;
use crate::constants::DARWIN_PI_HEADER_IPV6;
use anyhow::anyhow;
use etherparse::IpHeader;
use etherparse::PacketHeaders;

let packet_headers = PacketHeaders::from_ip_slice(&data)?;
let packet_headers = PacketHeaders::from_ip_slice(data)?;
let ip_header = packet_headers
.ip
.ok_or_else(|| anyhow!("Received packet with invalid IP header"))?;
Expand Down
68 changes: 31 additions & 37 deletions src/server.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
use crate::config::ServerConfig;
use crate::interface::Interface;
use crate::server::tunnel::QuincyTunnel;
use crate::{config::ServerConfig, constants::CLEANUP_INTERVAL};
use anyhow::Result;
use dashmap::DashMap;
use tokio::time::sleep;
use tracing::{error, info};
use anyhow::{anyhow, Result};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use tracing::error;

pub mod address_pool;
pub mod connection;
pub mod tunnel;

/// Represents a Quincy server with multiple underlying Quincy tunnels.
pub struct QuincyServer {
active_tunnels: DashMap<String, QuincyTunnel>,
active_tunnels: Vec<QuincyTunnel>,
}

impl QuincyServer {
Expand All @@ -21,46 +21,40 @@ impl QuincyServer {
/// ### Arguments
/// - `config` - the configuration for the server
pub fn new(config: ServerConfig) -> Result<Self> {
let tunnels = DashMap::new();

for (name, tunnel_config) in config.tunnels.iter() {
let tunnel =
QuincyTunnel::new(name.clone(), tunnel_config.clone(), &config.connection)?;

tunnels.insert(name.clone(), tunnel);
}
let tunnels = config
.tunnels
.into_iter()
.flat_map(|(name, tunnel_config)| {
QuincyTunnel::new(name, tunnel_config, &config.connection)
})
.collect();

Ok(Self {
active_tunnels: tunnels,
})
}

/// Starts the Quincy server and all of its underlying tunnels.
pub async fn run<I: Interface>(&self) -> Result<()> {
for mut entry in self.active_tunnels.iter_mut() {
let tunnel = entry.value_mut();

tunnel.start::<I>().await?;
}
pub async fn run<I: Interface>(self) -> Result<()> {
let mut tunnel_tasks = self
.active_tunnels
.into_iter()
.map(|tunnel| tokio::spawn(tunnel.run::<I>()))
.collect::<FuturesUnordered<_>>();

loop {
for mut entry in self.active_tunnels.iter_mut() {
let tunnel_name = entry.key().to_owned();
let tunnel = entry.value_mut();

if tunnel.is_ok() {
continue;
}

error!("Tunnel '{tunnel_name}' encountered an error, restarting...");

tunnel.stop().await?;
tunnel.start::<I>().await?;

info!("Tunnel '{tunnel_name}' restarted successfully");
}

sleep(CLEANUP_INTERVAL).await;
let (tunnel, task_result) = match tunnel_tasks.next().await {
Some(tunnel) => tunnel??,
None => return Err(anyhow!("No tunnels are running")),
};

error!(
"Tunnel {} has encountered an error: {:?}",
tunnel.name,
task_result.expect_err("Tunnel task always returns an error")
);

tunnel_tasks.push(tokio::spawn(tunnel.run::<I>()));
}
}
}
6 changes: 3 additions & 3 deletions src/server/address_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ impl AddressPool {
///
/// ### Arguments
/// - `address` - the address to release
pub fn release_address(&self, address: IpAddr) {
self.used_addresses.remove(&address);
pub fn release_address(&self, address: &IpAddr) {
self.used_addresses.remove(address);
}

/// Resets the address pool by releasing all addresses.
Expand Down Expand Up @@ -89,7 +89,7 @@ mod tests {
);

assert_eq!(pool.next_available_address(), None);
pool.release_address(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)));
pool.release_address(&IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)));

assert_eq!(
pool.next_available_address().unwrap(),
Expand Down
Loading

0 comments on commit 015a16d

Please sign in to comment.