From 4b9f64ce13034038246b15da37bf444b478b6369 Mon Sep 17 00:00:00 2001 From: Jacobtread Date: Thu, 7 Dec 2023 15:50:17 +1300 Subject: [PATCH 01/19] Testing dedicated host --- src/session/models/game_manager.rs | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/src/session/models/game_manager.rs b/src/session/models/game_manager.rs index 742a047..78e2566 100644 --- a/src/session/models/game_manager.rs +++ b/src/session/models/game_manager.rs @@ -1,3 +1,5 @@ +use std::net::Ipv4Addr; + use bitflags::bitflags; use serde::Serialize; use tdf::{ @@ -798,14 +800,22 @@ impl TdfSerialize for GameSetupResponse<'_> { // Topology host network list (The heat bug is present so this encoded as a group even though its a union) w.tag_list_start(b"HNET", TdfType::Group, 1); - if let NetworkAddress::AddressPair(pair) = &host.net.addr { - w.write_byte(2 /* Address pair type */); - TdfSerialize::serialize(pair, w) - } else { - // Uh oh.. host networking is missing...? - w.write_byte(TAGGED_UNSET_KEY); - w.write_byte(0); - } + // Forced local host for test dedicated server + w.write_byte(3); + let v = super::PairAddress { + addr: Ipv4Addr::LOCALHOST, + port: 5679, + }; + TdfSerialize::serialize(&v, w); + + // if let NetworkAddress::AddressPair(pair) = &host.net.addr { + // w.write_byte(2 /* Address pair type */); + // TdfSerialize::serialize(pair, w) + // } else { + // // Uh oh.. host networking is missing...? + // w.write_byte(TAGGED_UNSET_KEY); + // w.write_byte(0); + // } } // Host session ID @@ -822,7 +832,7 @@ impl TdfSerialize for GameSetupResponse<'_> { w.tag_bool(b"NRES", false); // Game network topology - w.tag_alt(b"NTOP", GameNetworkTopology::PeerHosted); + w.tag_alt(b"NTOP", GameNetworkTopology::Dedicated); // Persisted Game id for the game, used only when game setting's enablePersistedGameIds is true. w.tag_str_empty(b"PGID"); From 1d8490906944cfa0804a8eaa279e2ecc971ca7d7 Mon Sep 17 00:00:00 2001 From: Jacobtread Date: Fri, 8 Dec 2023 13:35:40 +1300 Subject: [PATCH 02/19] Experimental test tunneling --- src/main.rs | 8 +- src/routes/mod.rs | 1 + src/routes/server.rs | 53 ++++++- src/services/game/manager.rs | 25 ++- src/services/game/mod.rs | 10 +- src/services/mod.rs | 1 + src/services/retriever/mod.rs | 2 +- src/services/tunnel/mod.rs | 238 +++++++++++++++++++++++++++++ src/session/mod.rs | 2 +- src/session/models/game_manager.rs | 2 +- 10 files changed, 330 insertions(+), 12 deletions(-) create mode 100644 src/services/tunnel/mod.rs diff --git a/src/main.rs b/src/main.rs index 6fb96d0..ebf0bb2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,7 +2,9 @@ use crate::{ config::{RuntimeConfig, VERSION}, - services::{game::manager::GameManager, retriever::Retriever, sessions::Sessions}, + services::{ + game::manager::GameManager, retriever::Retriever, sessions::Sessions, tunnel::TunnelService, + }, utils::signing::SigningKey, }; use axum::{Extension, Server}; @@ -55,7 +57,8 @@ async fn main() { SigningKey::global() ); - let game_manager = Arc::new(GameManager::new()); + let tunnel_service = Arc::new(TunnelService::new()); + let game_manager = Arc::new(GameManager::new(tunnel_service.clone())); let sessions = Arc::new(Sessions::new(signing_key)); let config = Arc::new(runtime_config); let retriever = Arc::new(retriever); @@ -79,6 +82,7 @@ async fn main() { .layer(Extension(router)) .layer(Extension(game_manager)) .layer(Extension(sessions)) + .layer(Extension(tunnel_service)) .into_make_service_with_connect_info::(); info!("Starting server on {} (v{})", addr, VERSION); diff --git a/src/routes/mod.rs b/src/routes/mod.rs index 11b5106..00e0c34 100644 --- a/src/routes/mod.rs +++ b/src/routes/mod.rs @@ -96,6 +96,7 @@ pub fn router() -> Router { .route("/", get(server::server_details)) .route("/log", get(server::get_log).delete(clear_log)) .route("/upgrade", get(server::upgrade)) + .route("/tunnel", get(server::tunnel)) .route("/telemetry", post(server::submit_telemetry)) .route("/dashboard", get(server::dashboard_details)), ) diff --git a/src/routes/server.rs b/src/routes/server.rs index ee14355..4813798 100644 --- a/src/routes/server.rs +++ b/src/routes/server.rs @@ -5,7 +5,10 @@ use crate::{ config::{RuntimeConfig, VERSION}, database::entities::players::PlayerRole, middleware::{auth::AdminAuth, ip_address::IpAddress, upgrade::Upgrade}, - services::sessions::Sessions, + services::{ + sessions::Sessions, + tunnel::{Tunnel, TunnelCodec, TunnelService}, + }, session::{router::BlazeRouter, Session}, utils::logging::LOG_FILE_NAME, }; @@ -19,6 +22,7 @@ use log::{debug, error}; use serde::{Deserialize, Serialize}; use std::{net::Ipv4Addr, sync::Arc}; use tokio::fs::{read_to_string, OpenOptions}; +use tokio_util::codec::Framed; /// Response detailing the information about this Pocket Relay server /// contains the version information as well as the server information @@ -105,6 +109,53 @@ pub async fn handle_upgrade( Session::start(upgraded, addr, router, sessions).await; } +/// GET /api/server/tunnel +/// +/// Handles upgrading connections from the Pocket Relay Client tool +/// from HTTP over to the Blaze protocol for proxing the game traffic +/// as blaze sessions using HTTP Upgrade +pub async fn tunnel( + IpAddress(addr): IpAddress, + Extension(router): Extension>, + Extension(tunnel_service): Extension>, + Upgrade(upgrade): Upgrade, +) -> Response { + // Spawn the upgrading process to its own task + tokio::spawn(handle_upgrade_tunnel(upgrade, addr, router, tunnel_service)); + + // Let the client know to upgrade its connection + ( + // Switching protocols status code + StatusCode::SWITCHING_PROTOCOLS, + // Headers required for upgrading + [(header::CONNECTION, "upgrade"), (header::UPGRADE, "tunnel")], + ) + .into_response() +} + +/// Handles upgrading a connection and starting a new session +/// from the connection +pub async fn handle_upgrade_tunnel( + upgrade: OnUpgrade, + addr: Ipv4Addr, + router: Arc, + tunnel_service: Arc, +) { + let upgraded = match upgrade.await { + Ok(upgraded) => upgraded, + Err(err) => { + error!("Failed to upgrade client connection: {}", err); + return; + } + }; + + let handle = Tunnel::start( + tunnel_service.clone(), + Framed::new(upgraded, TunnelCodec::default()), + ); + tunnel_service.set_tunnel(addr, handle); +} + /// GET /api/server/log /// /// Responds with the server log file contents diff --git a/src/services/game/manager.rs b/src/services/game/manager.rs index 612eea3..04fc527 100644 --- a/src/services/game/manager.rs +++ b/src/services/game/manager.rs @@ -1,5 +1,6 @@ use super::{rules::RuleSet, AttrMap, Game, GameJoinableState, GamePlayer, GameRef, GameSnapshot}; use crate::{ + services::tunnel::TunnelService, session::{ models::game_manager::{ AsyncMatchmakingStatus, GameSettings, GameSetupContext, MatchmakingResult, @@ -37,6 +38,7 @@ pub struct GameManager { next_id: AtomicU32, /// Matchmaking entry queue queue: Mutex>, + tunnel_service: Arc, } /// Entry into the matchmaking queue @@ -56,11 +58,12 @@ impl GameManager { const MAX_RELEASE_ATTEMPTS: u8 = 20; /// Starts a new game manager service returning its link - pub fn new() -> Self { + pub fn new(tunnel_service: Arc) -> Self { Self { games: Default::default(), next_id: AtomicU32::new(1), queue: Default::default(), + tunnel_service, } } @@ -140,10 +143,16 @@ impl GameManager { context: GameSetupContext, ) { // Add the player to the game - let game_id = { + let (game_id, index) = { let game = &mut *game_ref.write().await; - game.add_player(player, context); - game.id + let slot = game.add_player(player, context); + (game.id, slot) + }; + + let addr = session.addr.clone(); + let handle = self.tunnel_service.get_tunnel(addr); + if let Some(handle) = handle { + self.tunnel_service.set_pool_handle(game_id, index, handle); }; // Update the player current game @@ -188,7 +197,13 @@ impl GameManager { setting: GameSettings, ) -> (GameRef, GameID) { let id = self.next_id.fetch_add(1, Ordering::AcqRel); - let game = Game::new(id, attributes, setting, self.clone()); + let game = Game::new( + id, + attributes, + setting, + self.clone(), + self.tunnel_service.clone(), + ); let link = Arc::new(RwLock::new(game)); { let games = &mut *self.games.write().await; diff --git a/src/services/game/mod.rs b/src/services/game/mod.rs index 606916d..bbc1e25 100644 --- a/src/services/game/mod.rs +++ b/src/services/game/mod.rs @@ -27,6 +27,8 @@ use std::sync::{Arc, Weak}; use tdf::{ObjectId, TdfMap, TdfSerializer}; use tokio::sync::RwLock; +use super::tunnel::TunnelService; + pub mod manager; pub mod rules; @@ -49,6 +51,8 @@ pub struct Game { /// Services access pub game_manager: Arc, + + pub tunnel_service: Arc, } /// Snapshot of the current game state and players @@ -211,6 +215,7 @@ impl Game { attributes: AttrMap, settings: GameSettings, game_manager: Arc, + tunnel_service: Arc, ) -> Game { Game { id, @@ -219,6 +224,7 @@ impl Game { state: Default::default(), players: Default::default(), game_manager, + tunnel_service, } } @@ -227,7 +233,7 @@ impl Game { data.into() } - pub fn add_player(&mut self, player: GamePlayer, context: GameSetupContext) { + pub fn add_player(&mut self, player: GamePlayer, context: GameSetupContext) -> usize { let slot = self.players.len(); // Update other players with the client details @@ -261,6 +267,8 @@ impl Game { context, }, )); + + slot } pub fn add_admin_player(&mut self, target_id: PlayerID) { diff --git a/src/services/mod.rs b/src/services/mod.rs index b497ce6..32d4f8a 100644 --- a/src/services/mod.rs +++ b/src/services/mod.rs @@ -1,3 +1,4 @@ pub mod game; pub mod retriever; pub mod sessions; +pub mod tunnel; diff --git a/src/services/retriever/mod.rs b/src/services/retriever/mod.rs index 8b0bdb8..1e5f74b 100644 --- a/src/services/retriever/mod.rs +++ b/src/services/retriever/mod.rs @@ -204,7 +204,7 @@ impl Retriever { match OfficialInstance::obtain().await { Ok(value) => Some(value), Err(error) => { - error!("Failed to setup redirector: {}", error); + error!("Failed to setup retriever: {}", error); None } } diff --git a/src/services/tunnel/mod.rs b/src/services/tunnel/mod.rs new file mode 100644 index 0000000..9b02392 --- /dev/null +++ b/src/services/tunnel/mod.rs @@ -0,0 +1,238 @@ +//! Clients connect through this service in order to form a connection +//! with a host player without the possible NAT restrictions that may +//! occur on stricter NATs +//! +//! +//! +//! +//! Client(s) -- Sends packets to -> Local host socket +//! +//! Local host socket -- Sends packet with index 0 --> Server +//! +//! Server -- Forwards packets to --> Host local pool +//! +//! Host local pool -- Sends packet pretending to be the other client --> Host +//! +//! Host -- Sends reply to --> Host local pool +//! +//! Host local pool -- Sends reply with index --> Server +//! +//! Server -- Forwards packets to index --> Client +//! + +use std::{ + io, + net::Ipv4Addr, + sync::{ + atomic::{AtomicU32, AtomicUsize}, + Arc, + }, +}; + +use bytes::{Buf, BufMut, Bytes}; +use futures_util::{SinkExt, StreamExt}; +use hashbrown::HashMap; +use hyper::upgrade::Upgraded; +use log::debug; +use parking_lot::Mutex; +use tokio::{select, sync::mpsc}; +use tokio_util::codec::{Decoder, Encoder, Framed}; + +use crate::utils::types::GameID; + +pub struct TunnelService { + /// Mapping between host addreses and access to their tunnel + pub tunnels: Mutex>, + /// Tunnel pooling allocated for games + pub pools: Mutex>>>, + /// Mapping for which game a tunnel is connected to + pub mapping: Mutex>, +} + +impl TunnelService { + pub fn new() -> Self { + Self { + tunnels: Mutex::new(Default::default()), + pools: Mutex::new(Default::default()), + mapping: Mutex::new(Default::default()), + } + } +} + +static TUNNEL_ID: AtomicU32 = AtomicU32::new(1); + +/// Represents a pool +pub struct TunnelPool { + pub handles: [Option; 4], +} + +/// Handle for sending messages to a tunnel +#[derive(Clone)] +pub struct TunnelHandle(mpsc::UnboundedSender); + +pub struct Tunnel { + service: Arc, + id: u32, + /// The IO tunnel used to send information to the host and recieve + /// respones + io: Framed, + /// Reciever for messages that should be written to the tunnel + rx: mpsc::UnboundedReceiver, +} + +impl Tunnel { + pub fn start(service: Arc, io: Framed) -> TunnelHandle { + let (tx, rx) = mpsc::unbounded_channel(); + + let id = TUNNEL_ID.fetch_add(1, std::sync::atomic::Ordering::AcqRel); + let tunnel = Tunnel { + service, + id, + io, + rx, + }; + + tokio::spawn(async move { + tunnel.handle().await; + }); + + TunnelHandle(tx) + } + + pub async fn handle(mut self) { + loop { + select! { + message = self.io.next() => { + if let Some(Ok(message)) = message { + + let index =message.index as usize; + debug!("Message for {index}"); + if let Some(pool) = self.service.get_pool_for(self.id) { + let pool = &mut *pool.lock(); + if let Some(Some(handle)) = pool.handles.get(index) { + handle.0.send(message).unwrap(); + } + } + } else { + debug!("Dropping tunnel"); + break; + } + } + + message = self.rx.recv() => { + if let Some(message) = message { + debug!("Outgoing message"); + self.io.send(message).await.unwrap(); + } + } + } + } + } +} + +impl TunnelService { + pub fn get_pool_for(&self, tunnel_id: u32) -> Option>> { + let game_id = *self.mapping.lock().get(&tunnel_id)?; + self.pools.lock().get(&game_id).cloned() + } + + /// Gets the tunnel for the provided IP address if one is present + pub fn get_tunnel(&self, addr: Ipv4Addr) -> Option { + let tunnels = &*self.tunnels.lock(); + tunnels.get(&addr).cloned() + } + + pub fn set_tunnel(&self, addr: Ipv4Addr, tunnel: TunnelHandle) { + let tunnels = &mut *self.tunnels.lock(); + tunnels.insert(addr, tunnel); + } + + pub fn remove_tunnel(&self, addr: Ipv4Addr) { + let tunnels = &mut *self.tunnels.lock(); + tunnels.remove(&addr); + } + + /// Sets the handle at the provided index within a pool to the provided handle + pub fn set_pool_handle(&self, game_id: GameID, index: usize, handle: TunnelHandle) { + let pools = &mut *self.pools.lock(); + + // Get the existing pool or insert a new one + let pool = pools.entry(game_id).or_insert_with(|| { + let handles = [None, None, None, None]; + Arc::new(Mutex::new(TunnelPool { handles })) + }); + + let pool = &mut *pool.lock(); + if let Some(pool_handle) = pool.handles.get_mut(index) { + *pool_handle = Some(handle); + } + } +} + +/// Partially decoded tunnnel message +pub struct TunnelMessagePartial { + pub index: u8, + pub length: u32, +} + +/// Message sent through the tunnel +pub struct TunnelMessage { + /// Socket index to use + pub index: u8, + + /// The message contents + pub message: Bytes, +} + +#[derive(Default)] +pub struct TunnelCodec { + partial: Option, +} + +impl Decoder for TunnelCodec { + type Item = TunnelMessage; + type Error = std::io::Error; + + fn decode(&mut self, src: &mut bytes::BytesMut) -> Result, Self::Error> { + let partial = match self.partial.as_mut() { + Some(value) => value, + None => { + // Not enough room for a partial frame + if src.len() < 5 { + return Ok(None); + } + let index = src.get_u8(); + let length = src.get_u32(); + + self.partial.insert(TunnelMessagePartial { index, length }) + } + }; + // Not enough data for the partial frame + if src.len() < partial.length as usize { + return Ok(None); + } + + let partial = self.partial.take().expect("Partial frame missing"); + let bytes = src.split_to(partial.length as usize); + + Ok(Some(TunnelMessage { + index: partial.index, + message: bytes.freeze(), + })) + } +} + +impl Encoder for TunnelCodec { + type Error = io::Error; + + fn encode( + &mut self, + item: TunnelMessage, + dst: &mut bytes::BytesMut, + ) -> Result<(), Self::Error> { + dst.put_u8(item.index); + dst.put_u32(item.message.len() as u32); + dst.extend_from_slice(&item.message); + Ok(()) + } +} diff --git a/src/session/mod.rs b/src/session/mod.rs index a9cd13b..bcdd49c 100644 --- a/src/session/mod.rs +++ b/src/session/mod.rs @@ -56,7 +56,7 @@ pub type WeakSessionLink = Weak; pub struct Session { id: u32, - addr: Ipv4Addr, + pub addr: Ipv4Addr, busy_lock: QueueLock, tx: mpsc::UnboundedSender, data: Mutex>, diff --git a/src/session/models/game_manager.rs b/src/session/models/game_manager.rs index 78e2566..33c57e5 100644 --- a/src/session/models/game_manager.rs +++ b/src/session/models/game_manager.rs @@ -804,7 +804,7 @@ impl TdfSerialize for GameSetupResponse<'_> { w.write_byte(3); let v = super::PairAddress { addr: Ipv4Addr::LOCALHOST, - port: 5679, + port: 42132, }; TdfSerialize::serialize(&v, w); From 7a2b543892108252d88c55d6e76c64d51fe00a45 Mon Sep 17 00:00:00 2001 From: Jacobtread Date: Sun, 10 Dec 2023 12:51:15 +1300 Subject: [PATCH 03/19] Added ids to tunnel handles for mapping, update outgoing message indexes to ensure correct sending --- src/services/tunnel/mod.rs | 64 +++++++++++++++++++++++++------------- 1 file changed, 43 insertions(+), 21 deletions(-) diff --git a/src/services/tunnel/mod.rs b/src/services/tunnel/mod.rs index 9b02392..2215f7c 100644 --- a/src/services/tunnel/mod.rs +++ b/src/services/tunnel/mod.rs @@ -68,7 +68,10 @@ pub struct TunnelPool { /// Handle for sending messages to a tunnel #[derive(Clone)] -pub struct TunnelHandle(mpsc::UnboundedSender); +pub struct TunnelHandle { + id: u32, + tx: mpsc::UnboundedSender, +} pub struct Tunnel { service: Arc, @@ -96,36 +99,47 @@ impl Tunnel { tunnel.handle().await; }); - TunnelHandle(tx) + TunnelHandle { id, tx } } pub async fn handle(mut self) { loop { select! { - message = self.io.next() => { - if let Some(Ok(message)) = message { - - let index =message.index as usize; - debug!("Message for {index}"); - if let Some(pool) = self.service.get_pool_for(self.id) { - let pool = &mut *pool.lock(); - if let Some(Some(handle)) = pool.handles.get(index) { - handle.0.send(message).unwrap(); - } + message = self.io.next() => { + if let Some(Ok(mut message)) = message { + + let index =message.index as usize; + debug!("Message for {index}"); + if let Some(pool) = self.service.get_pool_for(self.id) { + let pool = &mut *pool.lock(); + let self_handle_index = pool.handles + .iter() + .flatten() + .position(|handle| handle.id == self.id) + .unwrap(); + if let Some(Some(handle)) = pool.handles.get(index) { + debug!("Sending message as {}", self_handle_index as u8); + message.index = self_handle_index as u8; + handle.tx.send(message).unwrap(); + } else { + debug!("Handle not found"); } } else { - debug!("Dropping tunnel"); - break; - } + debug!("Pool not found"); } + } else { + debug!("Dropping tunnel"); + break; + } + } - message = self.rx.recv() => { - if let Some(message) = message { - debug!("Outgoing message"); - self.io.send(message).await.unwrap(); - } - } + message = self.rx.recv() => { + if let Some(message) = message { + debug!("Outgoing message"); + self.io.send(message).await.unwrap(); } + } + } } } } @@ -164,7 +178,15 @@ impl TunnelService { let pool = &mut *pool.lock(); if let Some(pool_handle) = pool.handles.get_mut(index) { + // Assocate the handle with the game + { + let mapping = &mut *self.mapping.lock(); + mapping.insert(handle.id, game_id); + } + *pool_handle = Some(handle); + } else { + debug!("Tried to set unknown pool handle"); } } } From ddac6d2bdf73563af1a7e8f4474774c5f40ea189 Mon Sep 17 00:00:00 2001 From: Jacobtread Date: Sun, 10 Dec 2023 16:32:53 +1300 Subject: [PATCH 04/19] Cleaned up tunneling code, manul tunnel polling, Proper tunnel cleanup --- src/main.rs | 2 +- src/routes/server.rs | 12 +- src/services/game/manager.rs | 2 +- src/services/game/mod.rs | 6 + src/services/tunnel/mod.rs | 483 +++++++++++++++++++++++------------ 5 files changed, 332 insertions(+), 173 deletions(-) diff --git a/src/main.rs b/src/main.rs index ebf0bb2..e4685da 100644 --- a/src/main.rs +++ b/src/main.rs @@ -57,7 +57,7 @@ async fn main() { SigningKey::global() ); - let tunnel_service = Arc::new(TunnelService::new()); + let tunnel_service = Arc::new(TunnelService::default()); let game_manager = Arc::new(GameManager::new(tunnel_service.clone())); let sessions = Arc::new(Sessions::new(signing_key)); let config = Arc::new(runtime_config); diff --git a/src/routes/server.rs b/src/routes/server.rs index 4813798..a091a2e 100644 --- a/src/routes/server.rs +++ b/src/routes/server.rs @@ -7,7 +7,7 @@ use crate::{ middleware::{auth::AdminAuth, ip_address::IpAddress, upgrade::Upgrade}, services::{ sessions::Sessions, - tunnel::{Tunnel, TunnelCodec, TunnelService}, + tunnel::{Tunnel, TunnelService}, }, session::{router::BlazeRouter, Session}, utils::logging::LOG_FILE_NAME, @@ -22,7 +22,6 @@ use log::{debug, error}; use serde::{Deserialize, Serialize}; use std::{net::Ipv4Addr, sync::Arc}; use tokio::fs::{read_to_string, OpenOptions}; -use tokio_util::codec::Framed; /// Response detailing the information about this Pocket Relay server /// contains the version information as well as the server information @@ -116,12 +115,11 @@ pub async fn handle_upgrade( /// as blaze sessions using HTTP Upgrade pub async fn tunnel( IpAddress(addr): IpAddress, - Extension(router): Extension>, Extension(tunnel_service): Extension>, Upgrade(upgrade): Upgrade, ) -> Response { // Spawn the upgrading process to its own task - tokio::spawn(handle_upgrade_tunnel(upgrade, addr, router, tunnel_service)); + tokio::spawn(handle_upgrade_tunnel(upgrade, addr, tunnel_service)); // Let the client know to upgrade its connection ( @@ -138,7 +136,6 @@ pub async fn tunnel( pub async fn handle_upgrade_tunnel( upgrade: OnUpgrade, addr: Ipv4Addr, - router: Arc, tunnel_service: Arc, ) { let upgraded = match upgrade.await { @@ -149,10 +146,7 @@ pub async fn handle_upgrade_tunnel( } }; - let handle = Tunnel::start( - tunnel_service.clone(), - Framed::new(upgraded, TunnelCodec::default()), - ); + let handle = Tunnel::start(tunnel_service.clone(), upgraded); tunnel_service.set_tunnel(addr, handle); } diff --git a/src/services/game/manager.rs b/src/services/game/manager.rs index 04fc527..43062aa 100644 --- a/src/services/game/manager.rs +++ b/src/services/game/manager.rs @@ -149,7 +149,7 @@ impl GameManager { (game.id, slot) }; - let addr = session.addr.clone(); + let addr = session.addr; let handle = self.tunnel_service.get_tunnel(addr); if let Some(handle) = handle { self.tunnel_service.set_pool_handle(game_id, index, handle); diff --git a/src/services/game/mod.rs b/src/services/game/mod.rs index bbc1e25..5f83f71 100644 --- a/src/services/game/mod.rs +++ b/src/services/game/mod.rs @@ -342,6 +342,9 @@ impl Game { None => return, }; + // Remove the tunnel + self.tunnel_service.remove_by_slot(self.id, index as u8); + // Remove the player let player = self.players.remove(index); @@ -381,6 +384,9 @@ impl Game { // Mark the game as stopping self.state = GameState::Destructing; + // Remove the tunnel pool + self.tunnel_service.remove_pool(self.id); + if !self.players.is_empty() { warn!("Game {} was stopped with players still present", self.id); } diff --git a/src/services/tunnel/mod.rs b/src/services/tunnel/mod.rs index 2215f7c..1e3304f 100644 --- a/src/services/tunnel/mod.rs +++ b/src/services/tunnel/mod.rs @@ -20,241 +20,400 @@ //! Server -- Forwards packets to index --> Client //! +use self::codec::{TunnelCodec, TunnelMessage}; +use crate::utils::hashing::IntHashMap; +use crate::utils::types::GameID; +use futures_util::{Sink, Stream}; +use hashbrown::HashMap; +use hyper::upgrade::Upgraded; +use parking_lot::Mutex; +use std::future::Future; use std::{ - io, net::Ipv4Addr, - sync::{ - atomic::{AtomicU32, AtomicUsize}, - Arc, - }, + pin::Pin, + sync::{atomic::AtomicU32, Arc}, + task::{ready, Context, Poll}, }; +use tokio::sync::mpsc; +use tokio_util::codec::Framed; -use bytes::{Buf, BufMut, Bytes}; -use futures_util::{SinkExt, StreamExt}; -use hashbrown::HashMap; -use hyper::upgrade::Upgraded; -use log::debug; -use parking_lot::Mutex; -use tokio::{select, sync::mpsc}; -use tokio_util::codec::{Decoder, Encoder, Framed}; +static TUNNEL_ID: AtomicU32 = AtomicU32::new(1); -use crate::utils::types::GameID; +// ID for a tunnel +type TunnelId = u32; +// Index into a pool of tunnels +type PoolIndex = u8; +#[derive(Default)] pub struct TunnelService { /// Mapping between host addreses and access to their tunnel - pub tunnels: Mutex>, + tunnels: Mutex>, /// Tunnel pooling allocated for games - pub pools: Mutex>>>, + pools: Mutex>, /// Mapping for which game a tunnel is connected to - pub mapping: Mutex>, + mapping: Mutex, } -impl TunnelService { - pub fn new() -> Self { - Self { - tunnels: Mutex::new(Default::default()), - pools: Mutex::new(Default::default()), - mapping: Mutex::new(Default::default()), +/// Stores mappings between tunnels and game slots and +/// the inverse +#[derive(Default)] +pub struct TunnelMapping { + /// Mapping from tunnel IDs to game slots + tunnel_to_slot: IntHashMap, + /// Mapping from game slots to tunnel IDs + slot_to_tunnel: HashMap<(GameID, PoolIndex), TunnelId>, +} + +impl TunnelMapping { + pub fn insert(&mut self, tunnel_id: TunnelId, game_id: GameID, pool_index: PoolIndex) { + self.tunnel_to_slot.insert(tunnel_id, (game_id, pool_index)); + self.slot_to_tunnel.insert((game_id, pool_index), tunnel_id); + } + + pub fn remove_by_slot(&mut self, game_id: GameID, pool_index: PoolIndex) -> Option { + if let Some(tunnel_id) = self.slot_to_tunnel.remove(&(game_id, pool_index)) { + self.tunnel_to_slot.remove(&tunnel_id); + + Some(tunnel_id) + } else { + None + } + } + + pub fn remove_by_tunnel(&mut self, tunnel_id: TunnelId) -> Option<(GameID, PoolIndex)> { + if let Some(key) = self.tunnel_to_slot.remove(&tunnel_id) { + self.slot_to_tunnel.remove(&key); + Some(key) + } else { + None } } + + pub fn get_by_tunnel(&self, tunnel_id: TunnelId) -> Option<(GameID, PoolIndex)> { + self.tunnel_to_slot + // Find the mapping for the tunnel + .get(&tunnel_id) + // Take a copy of the values if present + .copied() + } } -static TUNNEL_ID: AtomicU32 = AtomicU32::new(1); +impl TunnelService { + // Removes a game from the pool + #[inline] + pub fn remove_pool(&self, pool: GameID) { + self.pools.lock().remove(&pool); + } + + #[inline] + pub fn get_by_tunnel(&self, tunnel_id: TunnelId) -> Option<(GameID, PoolIndex)> { + self.mapping.lock().get_by_tunnel(tunnel_id) + } + + pub fn remove_by_slot(&self, game_id: GameID, pool_index: PoolIndex) { + self.mapping.lock().remove_by_slot(game_id, pool_index); + + // Remove the handle from its associated pool + let pools = &mut *self.pools.lock(); + if let Some(pool) = pools.get_mut(&game_id) { + if let Some(handle) = pool.handles.get_mut(pool_index as usize) { + *handle = None; + } + } + } + + pub fn remove_by_tunnel(&self, tunnel_id: TunnelId) { + if let Some((game_id, pool_index)) = self.mapping.lock().remove_by_tunnel(tunnel_id) { + // Remove the handle from its associated pool + let pools = &mut *self.pools.lock(); + if let Some(pool) = pools.get_mut(&game_id) { + if let Some(handle) = pool.handles.get_mut(pool_index as usize) { + *handle = None; + } + } + } + } + + pub fn get_pool_handle(&self, pool: GameID, index: PoolIndex) -> Option { + // Access the pools map + let pools = &*self.pools.lock(); + // Ge the pool for the `pool` + let pool = pools.get(&pool)?; + // Get the handle + pool.handles.get(index as usize)?.clone() + } + + /// Gets the tunnel for the provided IP address if one is present + pub fn get_tunnel(&self, addr: Ipv4Addr) -> Option { + let tunnels = &*self.tunnels.lock(); + tunnels.get(&addr).cloned() + } + + pub fn set_tunnel(&self, addr: Ipv4Addr, tunnel: TunnelHandle) { + let tunnels = &mut *self.tunnels.lock(); + tunnels.insert(addr, tunnel); + } + + /// Sets the handle at the provided index within a pool to the provided handle + pub fn set_pool_handle(&self, game_id: GameID, index: usize, handle: TunnelHandle) { + // Assocate the handle with the game + { + self.mapping + .lock() + // Map the handle to its game + .insert(handle.id, game_id, index as PoolIndex); + } + + let pools = &mut *self.pools.lock(); + + // Get the existing pool or insert a new one + let pool = pools.entry(game_id).or_default(); + + if let Some(pool_handle) = pool.handles.get_mut(index) { + *pool_handle = Some(handle); + } + } +} -/// Represents a pool -pub struct TunnelPool { - pub handles: [Option; 4], +/// Represents a pool of tunnel ocnnections +#[derive(Default)] +struct TunnelPool { + /// Collection of client handles + handles: [Option; 4], } /// Handle for sending messages to a tunnel #[derive(Clone)] pub struct TunnelHandle { - id: u32, + /// The ID of the tunnel + id: TunnelId, + /// The sender for sending messages to the tunnel tx: mpsc::UnboundedSender, } +/// Represents a connection to a client tunnel pub struct Tunnel { - service: Arc, - id: u32, + /// ID for this tunnel + id: TunnelId, /// The IO tunnel used to send information to the host and recieve /// respones io: Framed, /// Reciever for messages that should be written to the tunnel rx: mpsc::UnboundedReceiver, + /// The service access + service: Arc, + /// Future state for writing to the `io` + write_state: TunnelWriteState, + /// Whether the future has been stopped + stop: bool, +} + +impl Drop for Tunnel { + fn drop(&mut self) { + // Remove the tunnel from the service + self.service.remove_by_tunnel(self.id); + } +} + +enum TunnelWriteState { + /// Recieve the message to write + Recv, + /// Wait for the stream to be writable + Write { + // The message to write + message: Option, + }, + // Poll flushing the tunnel + Flush, } impl Tunnel { - pub fn start(service: Arc, io: Framed) -> TunnelHandle { + pub fn start(service: Arc, io: Upgraded) -> TunnelHandle { let (tx, rx) = mpsc::unbounded_channel(); - + let io = Framed::new(io, TunnelCodec::default()); let id = TUNNEL_ID.fetch_add(1, std::sync::atomic::Ordering::AcqRel); - let tunnel = Tunnel { + + tokio::spawn(Tunnel { service, id, io, rx, - }; - - tokio::spawn(async move { - tunnel.handle().await; + write_state: TunnelWriteState::Recv, + stop: false, }); TunnelHandle { id, tx } } - pub async fn handle(mut self) { - loop { - select! { - message = self.io.next() => { - if let Some(Ok(mut message)) = message { - - let index =message.index as usize; - debug!("Message for {index}"); - if let Some(pool) = self.service.get_pool_for(self.id) { - let pool = &mut *pool.lock(); - let self_handle_index = pool.handles - .iter() - .flatten() - .position(|handle| handle.id == self.id) - .unwrap(); - if let Some(Some(handle)) = pool.handles.get(index) { - debug!("Sending message as {}", self_handle_index as u8); - message.index = self_handle_index as u8; - handle.tx.send(message).unwrap(); - } else { - debug!("Handle not found"); - } - } else { - debug!("Pool not found"); - } - } else { - debug!("Dropping tunnel"); - break; - } + fn poll_write_state(&mut self, cx: &mut Context<'_>) -> Poll<()> { + match &mut self.write_state { + TunnelWriteState::Recv => { + // Try receive a packet from the write channel + let result = ready!(Pin::new(&mut self.rx).poll_recv(cx)); + + if let Some(message) = result { + self.write_state = TunnelWriteState::Write { + message: Some(message), + }; + } else { + // All writers have closed, session must be closed (Future end) + self.stop = true; } - - message = self.rx.recv() => { - if let Some(message) = message { - debug!("Outgoing message"); - self.io.send(message).await.unwrap(); - } + } + TunnelWriteState::Write { message } => { + // Wait until the inner is ready + if ready!(Pin::new(&mut self.io).poll_ready(cx)).is_ok() { + let message = message + .take() + .expect("Unexpected write state without message"); + + // Write the packet to the buffer + Pin::new(&mut self.io) + .start_send(message) + // Packet encoder impl shouldn't produce errors + .expect("Message encoder errored"); + + self.write_state = TunnelWriteState::Flush; + } else { + // Failed to ready, session must be closed + self.stop = true; + } + } + TunnelWriteState::Flush => { + // Wait until the flush is complete + if ready!(Pin::new(&mut self.io).poll_flush(cx)).is_ok() { + self.write_state = TunnelWriteState::Recv; + } else { + // Failed to flush, session must be closed + self.stop = true } } } - } -} -impl TunnelService { - pub fn get_pool_for(&self, tunnel_id: u32) -> Option>> { - let game_id = *self.mapping.lock().get(&tunnel_id)?; - self.pools.lock().get(&game_id).cloned() + Poll::Ready(()) } - /// Gets the tunnel for the provided IP address if one is present - pub fn get_tunnel(&self, addr: Ipv4Addr) -> Option { - let tunnels = &*self.tunnels.lock(); - tunnels.get(&addr).cloned() - } + fn poll_read_state(&mut self, cx: &mut Context<'_>) -> Poll<()> { + // Try receive a message from the `io` + let result = ready!(Pin::new(&mut self.io).poll_next(cx)); - pub fn set_tunnel(&self, addr: Ipv4Addr, tunnel: TunnelHandle) { - let tunnels = &mut *self.tunnels.lock(); - tunnels.insert(addr, tunnel); - } + if let Some(Ok(mut message)) = result { + // Get the tunnel sender details + let (game_id, index) = match self.service.get_by_tunnel(self.id) { + Some(value) => value, + None => return Poll::Ready(()), + }; - pub fn remove_tunnel(&self, addr: Ipv4Addr) { - let tunnels = &mut *self.tunnels.lock(); - tunnels.remove(&addr); + // Get the handle the message is for + let target_handle = match self.service.get_pool_handle(game_id, message.index) { + Some(value) => value, + None => return Poll::Ready(()), + }; + + // Update the message source index using the sender + message.index = index; + + // Send the message to the tunnel + _ = target_handle.tx.send(message); + } else { + // Reader has closed or reading encountered an error (Either way stop reading) + self.stop = true; + } + + Poll::Ready(()) } +} - /// Sets the handle at the provided index within a pool to the provided handle - pub fn set_pool_handle(&self, game_id: GameID, index: usize, handle: TunnelHandle) { - let pools = &mut *self.pools.lock(); +impl Future for Tunnel { + type Output = (); - // Get the existing pool or insert a new one - let pool = pools.entry(game_id).or_insert_with(|| { - let handles = [None, None, None, None]; - Arc::new(Mutex::new(TunnelPool { handles })) - }); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); - let pool = &mut *pool.lock(); - if let Some(pool_handle) = pool.handles.get_mut(index) { - // Assocate the handle with the game - { - let mapping = &mut *self.mapping.lock(); - mapping.insert(handle.id, game_id); - } + while this.poll_write_state(cx).is_ready() {} + while this.poll_read_state(cx).is_ready() {} - *pool_handle = Some(handle); + if this.stop { + Poll::Ready(()) } else { - debug!("Tried to set unknown pool handle"); + Poll::Pending } } } -/// Partially decoded tunnnel message -pub struct TunnelMessagePartial { - pub index: u8, - pub length: u32, -} +/// Encoding an decoding logic for tunnel packet messages +mod codec { + use bytes::{Buf, BufMut, Bytes}; + use tokio_util::codec::{Decoder, Encoder}; + + /// Partially decoded tunnnel message + pub struct TunnelMessagePartial { + /// Socket index to use + pub index: u8, + /// The length of the tunnel message bytes + pub length: u32, + } -/// Message sent through the tunnel -pub struct TunnelMessage { - /// Socket index to use - pub index: u8, + /// Message sent through the tunnel + pub struct TunnelMessage { + /// Socket index to use + pub index: u8, + /// The message contents + pub message: Bytes, + } - /// The message contents - pub message: Bytes, -} + /// Codec for encoding and decoding tunnel messages + #[derive(Default)] + pub struct TunnelCodec { + /// Stores a partially decoded frame if one is present + partial: Option, + } -#[derive(Default)] -pub struct TunnelCodec { - partial: Option, -} + impl Decoder for TunnelCodec { + type Item = TunnelMessage; + type Error = std::io::Error; + + fn decode(&mut self, src: &mut bytes::BytesMut) -> Result, Self::Error> { + let partial = match self.partial.as_mut() { + Some(value) => value, + None => { + // Not enough room for a partial frame + if src.len() < 5 { + return Ok(None); + } + let index = src.get_u8(); + let length = src.get_u32(); -impl Decoder for TunnelCodec { - type Item = TunnelMessage; - type Error = std::io::Error; - - fn decode(&mut self, src: &mut bytes::BytesMut) -> Result, Self::Error> { - let partial = match self.partial.as_mut() { - Some(value) => value, - None => { - // Not enough room for a partial frame - if src.len() < 5 { - return Ok(None); + self.partial.insert(TunnelMessagePartial { index, length }) } - let index = src.get_u8(); - let length = src.get_u32(); - - self.partial.insert(TunnelMessagePartial { index, length }) + }; + // Not enough data for the partial frame + if src.len() < partial.length as usize { + return Ok(None); } - }; - // Not enough data for the partial frame - if src.len() < partial.length as usize { - return Ok(None); - } - let partial = self.partial.take().expect("Partial frame missing"); - let bytes = src.split_to(partial.length as usize); + let partial = self.partial.take().expect("Partial frame missing"); + let bytes = src.split_to(partial.length as usize); - Ok(Some(TunnelMessage { - index: partial.index, - message: bytes.freeze(), - })) + Ok(Some(TunnelMessage { + index: partial.index, + message: bytes.freeze(), + })) + } } -} -impl Encoder for TunnelCodec { - type Error = io::Error; - - fn encode( - &mut self, - item: TunnelMessage, - dst: &mut bytes::BytesMut, - ) -> Result<(), Self::Error> { - dst.put_u8(item.index); - dst.put_u32(item.message.len() as u32); - dst.extend_from_slice(&item.message); - Ok(()) + impl Encoder for TunnelCodec { + type Error = std::io::Error; + + fn encode( + &mut self, + item: TunnelMessage, + dst: &mut bytes::BytesMut, + ) -> Result<(), Self::Error> { + dst.put_u8(item.index); + dst.put_u32(item.message.len() as u32); + dst.extend_from_slice(&item.message); + Ok(()) + } } } From 75bf21eedaa0b1e1227a528cce09d601737490bd Mon Sep 17 00:00:00 2001 From: Jacobtread Date: Mon, 11 Dec 2023 13:41:27 +1300 Subject: [PATCH 05/19] Use int map for tunnel addr mapping (convert addr bytes to int), Improved tunnel docs and polling --- src/routes/server.rs | 2 +- src/services/game/manager.rs | 3 +- src/services/tunnel/mod.rs | 244 ++++++++++++++++++++--------------- 3 files changed, 142 insertions(+), 107 deletions(-) diff --git a/src/routes/server.rs b/src/routes/server.rs index a091a2e..9e6880f 100644 --- a/src/routes/server.rs +++ b/src/routes/server.rs @@ -147,7 +147,7 @@ pub async fn handle_upgrade_tunnel( }; let handle = Tunnel::start(tunnel_service.clone(), upgraded); - tunnel_service.set_tunnel(addr, handle); + tunnel_service.set_tunnel(addr.into(), handle); } /// GET /api/server/log diff --git a/src/services/game/manager.rs b/src/services/game/manager.rs index 43062aa..160583e 100644 --- a/src/services/game/manager.rs +++ b/src/services/game/manager.rs @@ -149,8 +149,7 @@ impl GameManager { (game.id, slot) }; - let addr = session.addr; - let handle = self.tunnel_service.get_tunnel(addr); + let handle = self.tunnel_service.get_tunnel(session.addr.into()); if let Some(handle) = handle { self.tunnel_service.set_pool_handle(game_id, index, handle); }; diff --git a/src/services/tunnel/mod.rs b/src/services/tunnel/mod.rs index 1e3304f..9ea40ef 100644 --- a/src/services/tunnel/mod.rs +++ b/src/services/tunnel/mod.rs @@ -1,53 +1,38 @@ -//! Clients connect through this service in order to form a connection -//! with a host player without the possible NAT restrictions that may -//! occur on stricter NATs -//! -//! -//! -//! -//! Client(s) -- Sends packets to -> Local host socket -//! -//! Local host socket -- Sends packet with index 0 --> Server -//! -//! Server -- Forwards packets to --> Host local pool -//! -//! Host local pool -- Sends packet pretending to be the other client --> Host -//! -//! Host -- Sends reply to --> Host local pool -//! -//! Host local pool -- Sends reply with index --> Server -//! -//! Server -- Forwards packets to index --> Client +//! Server side portion of the tunneling implementation //! +//! Details can be found on the GitHub issue: https://github.com/PocketRelay/Server/issues/64 use self::codec::{TunnelCodec, TunnelMessage}; -use crate::utils::hashing::IntHashMap; -use crate::utils::types::GameID; +use crate::utils::{hashing::IntHashMap, types::GameID}; use futures_util::{Sink, Stream}; use hashbrown::HashMap; use hyper::upgrade::Upgraded; use parking_lot::Mutex; -use std::future::Future; use std::{ - net::Ipv4Addr, + future::Future, pin::Pin, - sync::{atomic::AtomicU32, Arc}, + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, + }, task::{ready, Context, Poll}, }; use tokio::sync::mpsc; use tokio_util::codec::Framed; -static TUNNEL_ID: AtomicU32 = AtomicU32::new(1); - -// ID for a tunnel +/// ID for a tunnel type TunnelId = u32; -// Index into a pool of tunnels +/// Index into a pool of tunnels type PoolIndex = u8; +/// Int created from an IPv4 address bytes +type Ipv4Int = u32; #[derive(Default)] pub struct TunnelService { + /// Stores the next available tunnel ID + next_tunnel_id: AtomicU32, /// Mapping between host addreses and access to their tunnel - tunnels: Mutex>, + tunnels: Mutex>, /// Tunnel pooling allocated for games pools: Mutex>, /// Mapping for which game a tunnel is connected to @@ -57,7 +42,7 @@ pub struct TunnelService { /// Stores mappings between tunnels and game slots and /// the inverse #[derive(Default)] -pub struct TunnelMapping { +struct TunnelMapping { /// Mapping from tunnel IDs to game slots tunnel_to_slot: IntHashMap, /// Mapping from game slots to tunnel IDs @@ -65,11 +50,13 @@ pub struct TunnelMapping { } impl TunnelMapping { + /// Inserts mappings for the provided `tunnel_id`, `game_id` and `pool_index` pub fn insert(&mut self, tunnel_id: TunnelId, game_id: GameID, pool_index: PoolIndex) { self.tunnel_to_slot.insert(tunnel_id, (game_id, pool_index)); self.slot_to_tunnel.insert((game_id, pool_index), tunnel_id); } + /// Removes a mapping using a `pool_index` within a `game_id` pub fn remove_by_slot(&mut self, game_id: GameID, pool_index: PoolIndex) -> Option { if let Some(tunnel_id) = self.slot_to_tunnel.remove(&(game_id, pool_index)) { self.tunnel_to_slot.remove(&tunnel_id); @@ -80,6 +67,7 @@ impl TunnelMapping { } } + /// Removes a mapping using the `tunnel_id` pub fn remove_by_tunnel(&mut self, tunnel_id: TunnelId) -> Option<(GameID, PoolIndex)> { if let Some(key) = self.tunnel_to_slot.remove(&tunnel_id) { self.slot_to_tunnel.remove(&key); @@ -89,6 +77,7 @@ impl TunnelMapping { } } + /// Gets a tunnel by its `tunnel_id` pub fn get_by_tunnel(&self, tunnel_id: TunnelId) -> Option<(GameID, PoolIndex)> { self.tunnel_to_slot // Find the mapping for the tunnel @@ -99,17 +88,21 @@ impl TunnelMapping { } impl TunnelService { - // Removes a game from the pool + /// Removes a game from the pool using its [`GameID`] #[inline] pub fn remove_pool(&self, pool: GameID) { self.pools.lock().remove(&pool); } + /// Finds the [`GameID`] and [`PoolIndex`] that are associated to + /// the provided [`TunnelId`] if one is present #[inline] pub fn get_by_tunnel(&self, tunnel_id: TunnelId) -> Option<(GameID, PoolIndex)> { self.mapping.lock().get_by_tunnel(tunnel_id) } + /// Removes a tunnel mapping and its handle from the game pool using the + /// [`GameID`] and the [`PoolIndex`] for the mapping pub fn remove_by_slot(&self, game_id: GameID, pool_index: PoolIndex) { self.mapping.lock().remove_by_slot(game_id, pool_index); @@ -121,7 +114,8 @@ impl TunnelService { } } } - + /// Removes a tunnel mapping and its handle from the game pool using the + /// [`TunnelId`] for the mapping pub fn remove_by_tunnel(&self, tunnel_id: TunnelId) { if let Some((game_id, pool_index)) = self.mapping.lock().remove_by_tunnel(tunnel_id) { // Remove the handle from its associated pool @@ -134,6 +128,8 @@ impl TunnelService { } } + /// Gets the [`TunnelHandle`] for the [`PoolIndex`] within the pool for [`GameID`] + /// if there is a [`TunnelHandle`] present at the provided index pub fn get_pool_handle(&self, pool: GameID, index: PoolIndex) -> Option { // Access the pools map let pools = &*self.pools.lock(); @@ -144,17 +140,20 @@ impl TunnelService { } /// Gets the tunnel for the provided IP address if one is present - pub fn get_tunnel(&self, addr: Ipv4Addr) -> Option { - let tunnels = &*self.tunnels.lock(); - tunnels.get(&addr).cloned() + pub fn get_tunnel(&self, addr: Ipv4Int) -> Option { + self.tunnels.lock().get(&addr).cloned() } - pub fn set_tunnel(&self, addr: Ipv4Addr, tunnel: TunnelHandle) { - let tunnels = &mut *self.tunnels.lock(); - tunnels.insert(addr, tunnel); + /// Sets the [`TunnelHandle`] for a specific [`Ipv4Addr`] updates + /// existing tunnel mappings if they are present + pub fn set_tunnel(&self, addr: Ipv4Int, tunnel: TunnelHandle) { + self.tunnels.lock().insert(addr, tunnel); } - /// Sets the handle at the provided index within a pool to the provided handle + /// Associates the provided `handle` with the `index` inside the provided + /// `game_id` poool + /// + /// Creates a mapping and stores the pool handle pub fn set_pool_handle(&self, game_id: GameID, index: usize, handle: TunnelHandle) { // Assocate the handle with the game { @@ -175,7 +174,7 @@ impl TunnelService { } } -/// Represents a pool of tunnel ocnnections +/// Represents a pool of tunnel connections #[derive(Default)] struct TunnelPool { /// Collection of client handles @@ -204,8 +203,6 @@ pub struct Tunnel { service: Arc, /// Future state for writing to the `io` write_state: TunnelWriteState, - /// Whether the future has been stopped - stop: bool, } impl Drop for Tunnel { @@ -215,53 +212,75 @@ impl Drop for Tunnel { } } +/// Holds the state for the current writing progress for a [`Tunnel`] +#[derive(Default)] enum TunnelWriteState { - /// Recieve the message to write + /// Waiting for a message to come through the [`Tunnel::rx`] + #[default] Recv, - /// Wait for the stream to be writable - Write { - // The message to write - message: Option, - }, - // Poll flushing the tunnel + /// Waiting for the [`Tunnel::io`] to be writable, then writing the + /// contained [`TunnelMessage`] + Write(Option), + /// Poll flushing the bytes written to [`Tunnel::io`] Flush, + /// The tunnnel has stopped and should not continue + Stop, +} + +/// Holds the state for the current reading progress for a [`Tunnel`] +enum TunnelReadState { + /// Continue reading + Continue, + /// The tunnnel has stopped and should not continue + Stop, } impl Tunnel { + /// Starts a new tunnel on `io` using the tunnel `service` + /// + /// ## Arguments + /// * `service` - The server to add the tunnel to + /// * `io` - The underlying tunnel IO pub fn start(service: Arc, io: Upgraded) -> TunnelHandle { let (tx, rx) = mpsc::unbounded_channel(); + + // Wrap the `io` with the [`TunnelCodec`] for framing let io = Framed::new(io, TunnelCodec::default()); - let id = TUNNEL_ID.fetch_add(1, std::sync::atomic::Ordering::AcqRel); + + // Aquire the tunnel ID + let id = service.next_tunnel_id.fetch_add(1, Ordering::AcqRel); tokio::spawn(Tunnel { service, id, io, rx, - write_state: TunnelWriteState::Recv, - stop: false, + write_state: Default::default(), }); TunnelHandle { id, tx } } - fn poll_write_state(&mut self, cx: &mut Context<'_>) -> Poll<()> { - match &mut self.write_state { + /// Polls accepting messages from [`Tunnel::rx`] then writing them to [`Tunnel::io`] and + /// flushing the underlying stream. Provides the next [`TunnelWriteState`] + /// when [`Poll::Ready`] is returned + /// + /// Should be repeatedly called until it no-longer returns [`Poll::Ready`] + fn poll_write_state(&mut self, cx: &mut Context<'_>) -> Poll { + Poll::Ready(match &mut self.write_state { TunnelWriteState::Recv => { // Try receive a packet from the write channel let result = ready!(Pin::new(&mut self.rx).poll_recv(cx)); if let Some(message) = result { - self.write_state = TunnelWriteState::Write { - message: Some(message), - }; + TunnelWriteState::Write(Some(message)) } else { - // All writers have closed, session must be closed (Future end) - self.stop = true; + // All writers have closed, tunnel must be closed (Future end) + TunnelWriteState::Stop } } - TunnelWriteState::Write { message } => { - // Wait until the inner is ready + TunnelWriteState::Write(message) => { + // Wait until the `io` is ready if ready!(Pin::new(&mut self.io).poll_ready(cx)).is_ok() { let message = message .take() @@ -273,54 +292,60 @@ impl Tunnel { // Packet encoder impl shouldn't produce errors .expect("Message encoder errored"); - self.write_state = TunnelWriteState::Flush; + TunnelWriteState::Flush } else { - // Failed to ready, session must be closed - self.stop = true; + // Failed to ready, tunnel must be closed + TunnelWriteState::Stop } } TunnelWriteState::Flush => { - // Wait until the flush is complete + // Poll flushing `io` if ready!(Pin::new(&mut self.io).poll_flush(cx)).is_ok() { - self.write_state = TunnelWriteState::Recv; + TunnelWriteState::Recv } else { - // Failed to flush, session must be closed - self.stop = true + // Failed to flush, tunnel must be closed + TunnelWriteState::Stop } } - } - Poll::Ready(()) + // Tunnel should *NOT* be polled if its already stopped + TunnelWriteState::Stop => panic!("Tunnel polled after already stopped"), + }) } - fn poll_read_state(&mut self, cx: &mut Context<'_>) -> Poll<()> { + /// Polls reading messages from [`Tunnel::io`] and sending them to the correct + /// handle within the [`Tunnel::pool`]. Provides the next [`TunnelReadState`] + /// when [`Poll::Ready`] is returned + /// + /// Should be repeatedly called until it no-longer returns [`Poll::Ready`] + fn poll_read_state(&mut self, cx: &mut Context<'_>) -> Poll { // Try receive a message from the `io` - let result = ready!(Pin::new(&mut self.io).poll_next(cx)); - - if let Some(Ok(mut message)) = result { - // Get the tunnel sender details - let (game_id, index) = match self.service.get_by_tunnel(self.id) { - Some(value) => value, - None => return Poll::Ready(()), - }; - - // Get the handle the message is for - let target_handle = match self.service.get_pool_handle(game_id, message.index) { - Some(value) => value, - None => return Poll::Ready(()), - }; - - // Update the message source index using the sender - message.index = index; - - // Send the message to the tunnel - _ = target_handle.tx.send(message); - } else { - // Reader has closed or reading encountered an error (Either way stop reading) - self.stop = true; - } - - Poll::Ready(()) + let Some(Ok(mut message)) = ready!(Pin::new(&mut self.io).poll_next(cx)) else { + // Cannot read next message stop the tunnel + return Poll::Ready(TunnelReadState::Stop); + }; + + // Get the tunnel sender details + let (game_id, index) = match self.service.get_by_tunnel(self.id) { + Some(value) => value, + // Don't have a tunnel to send the message through + None => return Poll::Ready(TunnelReadState::Continue), + }; + + // Get the handle the message is for + let target_handle = match self.service.get_pool_handle(game_id, message.index) { + Some(value) => value, + // Don't have an associated handle to send the message to + None => return Poll::Ready(TunnelReadState::Continue), + }; + + // Update the message source index using the sender + message.index = index; + + // Send the message to the tunnel + _ = target_handle.tx.send(message); + + Poll::Ready(TunnelReadState::Continue) } } @@ -330,14 +355,25 @@ impl Future for Tunnel { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); - while this.poll_write_state(cx).is_ready() {} - while this.poll_read_state(cx).is_ready() {} + // Poll the write half + while let Poll::Ready(next_state) = this.poll_write_state(cx) { + this.write_state = next_state; - if this.stop { - Poll::Ready(()) - } else { - Poll::Pending + // Tunnel has stopped + if let TunnelWriteState::Stop = this.write_state { + return Poll::Ready(()); + } } + + // Poll the read half + while let Poll::Ready(next_state) = this.poll_read_state(cx) { + // Tunnel has stopped + if let TunnelReadState::Stop = next_state { + return Poll::Ready(()); + } + } + + Poll::Pending } } From 5b7998e3e8e29a12da3933ec568095f84033b1f9 Mon Sep 17 00:00:00 2001 From: Jacobtread Date: Mon, 11 Dec 2023 13:44:05 +1300 Subject: [PATCH 06/19] Don't apply tunneling to hosts that have Open NAT --- src/session/models/game_manager.rs | 38 +++++++++++++++++------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/src/session/models/game_manager.rs b/src/session/models/game_manager.rs index 33c57e5..3adc305 100644 --- a/src/session/models/game_manager.rs +++ b/src/session/models/game_manager.rs @@ -12,7 +12,7 @@ use crate::{ utils::types::{GameID, PlayerID}, }; -use super::{util::PING_SITE_ALIAS, NetworkAddress}; +use super::{util::PING_SITE_ALIAS, NatType, NetworkAddress}; #[derive(Debug, Clone)] #[repr(u16)] @@ -800,22 +800,26 @@ impl TdfSerialize for GameSetupResponse<'_> { // Topology host network list (The heat bug is present so this encoded as a group even though its a union) w.tag_list_start(b"HNET", TdfType::Group, 1); - // Forced local host for test dedicated server - w.write_byte(3); - let v = super::PairAddress { - addr: Ipv4Addr::LOCALHOST, - port: 42132, - }; - TdfSerialize::serialize(&v, w); - - // if let NetworkAddress::AddressPair(pair) = &host.net.addr { - // w.write_byte(2 /* Address pair type */); - // TdfSerialize::serialize(pair, w) - // } else { - // // Uh oh.. host networking is missing...? - // w.write_byte(TAGGED_UNSET_KEY); - // w.write_byte(0); - // } + // Override to sever tunnel for stricter NATs + if !matches!(host.net.qos.natt, NatType::Open) { + // Forced local host for test dedicated server + w.write_byte(3); + let v = super::PairAddress { + addr: Ipv4Addr::LOCALHOST, + port: 42132, + }; + TdfSerialize::serialize(&v, w); + } else { + // Open NATs can directly have players connect normally + if let NetworkAddress::AddressPair(pair) = &host.net.addr { + w.write_byte(2 /* Address pair type */); + TdfSerialize::serialize(pair, w) + } else { + // Uh oh.. host networking is missing...? + w.write_byte(TAGGED_UNSET_KEY); + w.write_byte(0); + } + } } // Host session ID From dbd1f01c4aa51355b9a4996281397dd33f277d0e Mon Sep 17 00:00:00 2001 From: Jacobtread Date: Mon, 11 Dec 2023 13:45:39 +1300 Subject: [PATCH 07/19] Switched default NAT type to unknown, constant for tunnel host port --- src/services/tunnel/mod.rs | 3 +++ src/session/models/game_manager.rs | 17 +++++++++++------ src/session/models/mod.rs | 2 +- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/src/services/tunnel/mod.rs b/src/services/tunnel/mod.rs index 9ea40ef..f555290 100644 --- a/src/services/tunnel/mod.rs +++ b/src/services/tunnel/mod.rs @@ -20,6 +20,9 @@ use std::{ use tokio::sync::mpsc; use tokio_util::codec::Framed; +/// The port bound on clients representing the host player within the socket poool +pub const TUNNEL_HOST_LOCAL_PORT: u16 = 42132; + /// ID for a tunnel type TunnelId = u32; /// Index into a pool of tunnels diff --git a/src/session/models/game_manager.rs b/src/session/models/game_manager.rs index 3adc305..c432781 100644 --- a/src/session/models/game_manager.rs +++ b/src/session/models/game_manager.rs @@ -8,7 +8,10 @@ use tdf::{ }; use crate::{ - services::game::{rules::RuleSet, AttrMap, Game, GamePlayer}, + services::{ + game::{rules::RuleSet, AttrMap, Game, GamePlayer}, + tunnel::TUNNEL_HOST_LOCAL_PORT, + }, utils::types::{GameID, PlayerID}, }; @@ -804,11 +807,13 @@ impl TdfSerialize for GameSetupResponse<'_> { if !matches!(host.net.qos.natt, NatType::Open) { // Forced local host for test dedicated server w.write_byte(3); - let v = super::PairAddress { - addr: Ipv4Addr::LOCALHOST, - port: 42132, - }; - TdfSerialize::serialize(&v, w); + TdfSerialize::serialize( + &super::PairAddress { + addr: Ipv4Addr::LOCALHOST, + port: TUNNEL_HOST_LOCAL_PORT, + }, + w, + ); } else { // Open NATs can directly have players connect normally if let NetworkAddress::AddressPair(pair) = &host.net.addr { diff --git a/src/session/models/mod.rs b/src/session/models/mod.rs index 3d4fd50..1f54dc8 100644 --- a/src/session/models/mod.rs +++ b/src/session/models/mod.rs @@ -139,7 +139,6 @@ pub struct QosNetworkData { #[repr(u8)] pub enum NatType { /// Players behind an open NAT can usually connect to any other player and are ideal game hosts. - #[default] Open = 0x0, /// Players behind a moderate NAT can usually connect to other open or moderate players. Moderate = 0x1, @@ -148,6 +147,7 @@ pub enum NatType { /// Players behind a strict (unsequential) NAT can usually only connect to open players and are the worst game hosts. Strict = 0x3, /// unknown NAT type; possibly timed out trying to detect NAT. + #[default] #[tdf(default)] Unknown = 0x4, } From 8da5e769d1a18de81d08bcdba2e87ac8fd5eb57b Mon Sep 17 00:00:00 2001 From: Jacobtread Date: Mon, 11 Dec 2023 13:58:13 +1300 Subject: [PATCH 08/19] Added configuration for tunneling --- src/config.rs | 24 +++++++++++++++++++++++- src/main.rs | 5 +++-- src/services/game/manager.rs | 9 +++++++-- src/services/game/mod.rs | 9 ++++++++- src/session/models/game_manager.rs | 24 ++++++++++++++++++++---- 5 files changed, 61 insertions(+), 10 deletions(-) diff --git a/src/config.rs b/src/config.rs index 016eb4b..39f3fc1 100644 --- a/src/config.rs +++ b/src/config.rs @@ -12,6 +12,8 @@ use crate::session::models::Port; /// The server version extracted from the Cargo.toml pub const VERSION: &str = env!("CARGO_PKG_VERSION"); +/// Config variables that are required to always exist during +/// runtime for various tasks #[derive(Default)] pub struct RuntimeConfig { pub qos: QosServerConfig, @@ -19,6 +21,7 @@ pub struct RuntimeConfig { pub galaxy_at_war: GalaxyAtWarConfig, pub menu_message: String, pub dashboard: DashboardConfig, + pub tunnel: TunnelConfig, } /// Environment variable key to load the config from @@ -74,6 +77,7 @@ pub struct Config { pub galaxy_at_war: GalaxyAtWarConfig, pub logging: LevelFilter, pub retriever: RetrieverConfig, + pub tunnel: TunnelConfig, } impl Default for Config { @@ -88,16 +92,34 @@ impl Default for Config { galaxy_at_war: Default::default(), logging: LevelFilter::Info, retriever: Default::default(), + tunnel: Default::default() } } } +/// Configuration for how the server should use tunneling +#[derive(Debug, Default, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum TunnelConfig { + /// Only tunnel players with non "Open" NAT types if the QoS + /// server is set to [`QosServerConfig::Disabled`] this is + /// equivilent to [`TunnelConfig::Always`] + #[default] + Stricter, + /// Always tunnel connections through the server regardless + /// of NAT type + Always, + /// Never tunnel connections through the server + Disabled, +} + +/// Configuration for the server QoS setup #[derive(Debug, Default, Deserialize)] #[serde(tag = "type", rename_all = "lowercase")] pub enum QosServerConfig { /// Use the official QoS server Official, - /// Use the local QoS server (might cause issues) + /// Use the local QoS server (might cause issues with WAN) #[default] Local, /// Use a custom QoS server diff --git a/src/main.rs b/src/main.rs index e4685da..8b37c75 100644 --- a/src/main.rs +++ b/src/main.rs @@ -44,6 +44,7 @@ async fn main() { menu_message: config.menu_message, dashboard: config.dashboard, qos: config.qos, + tunnel: config.tunnel, }; debug!("QoS server: {:?}", &runtime_config.qos); @@ -57,10 +58,10 @@ async fn main() { SigningKey::global() ); + let config = Arc::new(runtime_config); let tunnel_service = Arc::new(TunnelService::default()); - let game_manager = Arc::new(GameManager::new(tunnel_service.clone())); + let game_manager = Arc::new(GameManager::new(tunnel_service.clone(), config.clone())); let sessions = Arc::new(Sessions::new(signing_key)); - let config = Arc::new(runtime_config); let retriever = Arc::new(retriever); // Initialize session router diff --git a/src/services/game/manager.rs b/src/services/game/manager.rs index 160583e..1c339f0 100644 --- a/src/services/game/manager.rs +++ b/src/services/game/manager.rs @@ -1,5 +1,6 @@ use super::{rules::RuleSet, AttrMap, Game, GameJoinableState, GamePlayer, GameRef, GameSnapshot}; use crate::{ + config::RuntimeConfig, services::tunnel::TunnelService, session::{ models::game_manager::{ @@ -38,7 +39,10 @@ pub struct GameManager { next_id: AtomicU32, /// Matchmaking entry queue queue: Mutex>, + /// Tunneling service tunnel_service: Arc, + /// Runtime configuration + config: Arc, } /// Entry into the matchmaking queue @@ -58,12 +62,13 @@ impl GameManager { const MAX_RELEASE_ATTEMPTS: u8 = 20; /// Starts a new game manager service returning its link - pub fn new(tunnel_service: Arc) -> Self { + pub fn new(tunnel_service: Arc, config: Arc) -> Self { Self { games: Default::default(), next_id: AtomicU32::new(1), queue: Default::default(), tunnel_service, + config, } } @@ -145,7 +150,7 @@ impl GameManager { // Add the player to the game let (game_id, index) = { let game = &mut *game_ref.write().await; - let slot = game.add_player(player, context); + let slot = game.add_player(player, context, self.config.clone()); (game.id, slot) }; diff --git a/src/services/game/mod.rs b/src/services/game/mod.rs index 5f83f71..175c5ab 100644 --- a/src/services/game/mod.rs +++ b/src/services/game/mod.rs @@ -1,5 +1,6 @@ use self::{manager::GameManager, rules::RuleSet}; use crate::{ + config::RuntimeConfig, database::entities::Player, session::{ models::{ @@ -233,7 +234,12 @@ impl Game { data.into() } - pub fn add_player(&mut self, player: GamePlayer, context: GameSetupContext) -> usize { + pub fn add_player( + &mut self, + player: GamePlayer, + context: GameSetupContext, + config: Arc, + ) -> usize { let slot = self.players.len(); // Update other players with the client details @@ -265,6 +271,7 @@ impl Game { GameSetupResponse { game: self, context, + config, }, )); diff --git a/src/session/models/game_manager.rs b/src/session/models/game_manager.rs index c432781..1ae65c6 100644 --- a/src/session/models/game_manager.rs +++ b/src/session/models/game_manager.rs @@ -1,4 +1,4 @@ -use std::net::Ipv4Addr; +use std::{net::Ipv4Addr, sync::Arc}; use bitflags::bitflags; use serde::Serialize; @@ -8,6 +8,7 @@ use tdf::{ }; use crate::{ + config::{RuntimeConfig, TunnelConfig}, services::{ game::{rules::RuleSet, AttrMap, Game, GamePlayer}, tunnel::TUNNEL_HOST_LOCAL_PORT, @@ -764,6 +765,7 @@ pub enum SlotType { pub struct GameSetupResponse<'a> { pub game: &'a Game, pub context: GameSetupContext, + pub config: Arc, } impl TdfSerialize for GameSetupResponse<'_> { @@ -799,12 +801,19 @@ impl TdfSerialize for GameSetupResponse<'_> { // Game Type used for game reporting as passed up in the request. w.tag_str_empty(b"GTYP"); + // Whether to tunnel the connection + let tunnel = match &self.config.tunnel { + TunnelConfig::Stricter => !matches!(host.net.qos.natt, NatType::Open), + TunnelConfig::Always => true, + TunnelConfig::Disabled => false, + }; + { // Topology host network list (The heat bug is present so this encoded as a group even though its a union) w.tag_list_start(b"HNET", TdfType::Group, 1); - // Override to sever tunnel for stricter NATs - if !matches!(host.net.qos.natt, NatType::Open) { + // Override for tunneling + if tunnel { // Forced local host for test dedicated server w.write_byte(3); TdfSerialize::serialize( @@ -841,7 +850,14 @@ impl TdfSerialize for GameSetupResponse<'_> { w.tag_bool(b"NRES", false); // Game network topology - w.tag_alt(b"NTOP", GameNetworkTopology::Dedicated); + w.tag_alt( + b"NTOP", + if tunnel { + GameNetworkTopology::Dedicated + } else { + GameNetworkTopology::PeerHosted + }, + ); // Persisted Game id for the game, used only when game setting's enablePersistedGameIds is true. w.tag_str_empty(b"PGID"); From 50c5dfc65e6623c5366eba768c4c5e24bb7264af Mon Sep 17 00:00:00 2001 From: Jacobtread Date: Mon, 11 Dec 2023 19:44:11 +1300 Subject: [PATCH 09/19] Swapped read heavy tunnel mapping mutexes with read/write locks --- src/services/tunnel/mod.rs | 51 ++++++++++++++++++++------------------ 1 file changed, 27 insertions(+), 24 deletions(-) diff --git a/src/services/tunnel/mod.rs b/src/services/tunnel/mod.rs index f555290..82cf987 100644 --- a/src/services/tunnel/mod.rs +++ b/src/services/tunnel/mod.rs @@ -7,7 +7,7 @@ use crate::utils::{hashing::IntHashMap, types::GameID}; use futures_util::{Sink, Stream}; use hashbrown::HashMap; use hyper::upgrade::Upgraded; -use parking_lot::Mutex; +use parking_lot::{Mutex, RwLock}; use std::{ future::Future, pin::Pin, @@ -37,9 +37,9 @@ pub struct TunnelService { /// Mapping between host addreses and access to their tunnel tunnels: Mutex>, /// Tunnel pooling allocated for games - pools: Mutex>, + pools: RwLock>, /// Mapping for which game a tunnel is connected to - mapping: Mutex, + mapping: RwLock, } /// Stores mappings between tunnels and game slots and @@ -91,26 +91,37 @@ impl TunnelMapping { } impl TunnelService { + /// Gets the tunnel for the provided IP address if one is present + pub fn get_tunnel(&self, addr: Ipv4Int) -> Option { + self.tunnels.lock().get(&addr).cloned() + } + + /// Sets the [`TunnelHandle`] for a specific [`Ipv4Addr`] updates + /// existing tunnel mappings if they are present + pub fn set_tunnel(&self, addr: Ipv4Int, tunnel: TunnelHandle) { + self.tunnels.lock().insert(addr, tunnel); + } + /// Removes a game from the pool using its [`GameID`] #[inline] pub fn remove_pool(&self, pool: GameID) { - self.pools.lock().remove(&pool); + self.pools.write().remove(&pool); } /// Finds the [`GameID`] and [`PoolIndex`] that are associated to /// the provided [`TunnelId`] if one is present #[inline] pub fn get_by_tunnel(&self, tunnel_id: TunnelId) -> Option<(GameID, PoolIndex)> { - self.mapping.lock().get_by_tunnel(tunnel_id) + self.mapping.read().get_by_tunnel(tunnel_id) } /// Removes a tunnel mapping and its handle from the game pool using the /// [`GameID`] and the [`PoolIndex`] for the mapping pub fn remove_by_slot(&self, game_id: GameID, pool_index: PoolIndex) { - self.mapping.lock().remove_by_slot(game_id, pool_index); + self.mapping.write().remove_by_slot(game_id, pool_index); // Remove the handle from its associated pool - let pools = &mut *self.pools.lock(); + let pools = &mut *self.pools.write(); if let Some(pool) = pools.get_mut(&game_id) { if let Some(handle) = pool.handles.get_mut(pool_index as usize) { *handle = None; @@ -120,9 +131,9 @@ impl TunnelService { /// Removes a tunnel mapping and its handle from the game pool using the /// [`TunnelId`] for the mapping pub fn remove_by_tunnel(&self, tunnel_id: TunnelId) { - if let Some((game_id, pool_index)) = self.mapping.lock().remove_by_tunnel(tunnel_id) { + if let Some((game_id, pool_index)) = self.mapping.write().remove_by_tunnel(tunnel_id) { // Remove the handle from its associated pool - let pools = &mut *self.pools.lock(); + let pools = &mut *self.pools.write(); if let Some(pool) = pools.get_mut(&game_id) { if let Some(handle) = pool.handles.get_mut(pool_index as usize) { *handle = None; @@ -135,24 +146,13 @@ impl TunnelService { /// if there is a [`TunnelHandle`] present at the provided index pub fn get_pool_handle(&self, pool: GameID, index: PoolIndex) -> Option { // Access the pools map - let pools = &*self.pools.lock(); + let pools = &*self.pools.read(); // Ge the pool for the `pool` let pool = pools.get(&pool)?; // Get the handle pool.handles.get(index as usize)?.clone() } - /// Gets the tunnel for the provided IP address if one is present - pub fn get_tunnel(&self, addr: Ipv4Int) -> Option { - self.tunnels.lock().get(&addr).cloned() - } - - /// Sets the [`TunnelHandle`] for a specific [`Ipv4Addr`] updates - /// existing tunnel mappings if they are present - pub fn set_tunnel(&self, addr: Ipv4Int, tunnel: TunnelHandle) { - self.tunnels.lock().insert(addr, tunnel); - } - /// Associates the provided `handle` with the `index` inside the provided /// `game_id` poool /// @@ -161,12 +161,12 @@ impl TunnelService { // Assocate the handle with the game { self.mapping - .lock() + .write() // Map the handle to its game .insert(handle.id, game_id, index as PoolIndex); } - let pools = &mut *self.pools.lock(); + let pools = &mut *self.pools.write(); // Get the existing pool or insert a new one let pool = pools.entry(game_id).or_default(); @@ -282,6 +282,7 @@ impl Tunnel { TunnelWriteState::Stop } } + TunnelWriteState::Write(message) => { // Wait until the `io` is ready if ready!(Pin::new(&mut self.io).poll_ready(cx)).is_ok() { @@ -301,6 +302,7 @@ impl Tunnel { TunnelWriteState::Stop } } + TunnelWriteState::Flush => { // Poll flushing `io` if ready!(Pin::new(&mut self.io).poll_flush(cx)).is_ok() { @@ -385,7 +387,7 @@ mod codec { use bytes::{Buf, BufMut, Bytes}; use tokio_util::codec::{Decoder, Encoder}; - /// Partially decoded tunnnel message + /// Partially decoded [TunnelMessage] pub struct TunnelMessagePartial { /// Socket index to use pub index: u8, @@ -426,6 +428,7 @@ mod codec { self.partial.insert(TunnelMessagePartial { index, length }) } }; + // Not enough data for the partial frame if src.len() < partial.length as usize { return Ok(None); From dc7783ff03675f76665ecb8b848eb48951917e2f Mon Sep 17 00:00:00 2001 From: Jacobtread Date: Mon, 11 Dec 2023 20:47:23 +1300 Subject: [PATCH 10/19] Cleaned up and optimized tunnel service internals, replaced pool structure with hash lookup --- src/routes/server.rs | 2 +- src/services/game/manager.rs | 6 +- src/services/game/mod.rs | 5 +- src/services/tunnel/mod.rs | 262 +++++++++++++++++------------------ 4 files changed, 132 insertions(+), 143 deletions(-) diff --git a/src/routes/server.rs b/src/routes/server.rs index 9e6880f..cd8aea6 100644 --- a/src/routes/server.rs +++ b/src/routes/server.rs @@ -147,7 +147,7 @@ pub async fn handle_upgrade_tunnel( }; let handle = Tunnel::start(tunnel_service.clone(), upgraded); - tunnel_service.set_tunnel(addr.into(), handle); + tunnel_service.associate_tunnel(addr.into(), handle); } /// GET /api/server/log diff --git a/src/services/game/manager.rs b/src/services/game/manager.rs index 1c339f0..07db42a 100644 --- a/src/services/game/manager.rs +++ b/src/services/game/manager.rs @@ -154,10 +154,8 @@ impl GameManager { (game.id, slot) }; - let handle = self.tunnel_service.get_tunnel(session.addr.into()); - if let Some(handle) = handle { - self.tunnel_service.set_pool_handle(game_id, index, handle); - }; + self.tunnel_service + .associate_pool(session.addr.into(), game_id, index as u8); // Update the player current game session.set_game(game_id, Arc::downgrade(&game_ref)); diff --git a/src/services/game/mod.rs b/src/services/game/mod.rs index 175c5ab..300f99f 100644 --- a/src/services/game/mod.rs +++ b/src/services/game/mod.rs @@ -350,7 +350,7 @@ impl Game { }; // Remove the tunnel - self.tunnel_service.remove_by_slot(self.id, index as u8); + self.tunnel_service.dissocate_by_pool(self.id, index as u8); // Remove the player let player = self.players.remove(index); @@ -391,9 +391,6 @@ impl Game { // Mark the game as stopping self.state = GameState::Destructing; - // Remove the tunnel pool - self.tunnel_service.remove_pool(self.id); - if !self.players.is_empty() { warn!("Game {} was stopped with players still present", self.id); } diff --git a/src/services/tunnel/mod.rs b/src/services/tunnel/mod.rs index 82cf987..a9f3233 100644 --- a/src/services/tunnel/mod.rs +++ b/src/services/tunnel/mod.rs @@ -5,9 +5,8 @@ use self::codec::{TunnelCodec, TunnelMessage}; use crate::utils::{hashing::IntHashMap, types::GameID}; use futures_util::{Sink, Stream}; -use hashbrown::HashMap; use hyper::upgrade::Upgraded; -use parking_lot::{Mutex, RwLock}; +use parking_lot::RwLock; use std::{ future::Future, pin::Pin, @@ -27,6 +26,9 @@ pub const TUNNEL_HOST_LOCAL_PORT: u16 = 42132; type TunnelId = u32; /// Index into a pool of tunnels type PoolIndex = u8; +/// ID of a pool +type PoolId = GameID; + /// Int created from an IPv4 address bytes type Ipv4Int = u32; @@ -34,156 +36,155 @@ type Ipv4Int = u32; pub struct TunnelService { /// Stores the next available tunnel ID next_tunnel_id: AtomicU32, - /// Mapping between host addreses and access to their tunnel - tunnels: Mutex>, - /// Tunnel pooling allocated for games - pools: RwLock>, - /// Mapping for which game a tunnel is connected to - mapping: RwLock, + /// Underlying tunnnel mappings + mappings: RwLock, } -/// Stores mappings between tunnels and game slots and -/// the inverse +/// Stores mappings between various tunnel objects #[derive(Default)] -struct TunnelMapping { - /// Mapping from tunnel IDs to game slots - tunnel_to_slot: IntHashMap, - /// Mapping from game slots to tunnel IDs - slot_to_tunnel: HashMap<(GameID, PoolIndex), TunnelId>, +struct TunnelMappings { + /// Mapping from [TunnelId]s to the actual [TunnelHandle] for communicating + /// with the tunnel + id_to_tunnel: IntHashMap, + /// Mapping from [Ipv4Int] (IPv4 addresses) to [TunnelHandle] for finding + /// the tunnel associated with an IP + ip_to_tunnel: IntHashMap, + + /// Mapping assocating a [TunnelId] with a [PoolIndex] within a [PoolId] + /// that it is apart of + tunnel_to_index: IntHashMap, + /// Inverse mapping of `tunnel_to_index` for finding the handle + /// assocated to a specific pool and slot + index_to_tunnel: IntHashMap, } -impl TunnelMapping { - /// Inserts mappings for the provided `tunnel_id`, `game_id` and `pool_index` - pub fn insert(&mut self, tunnel_id: TunnelId, game_id: GameID, pool_index: PoolIndex) { - self.tunnel_to_slot.insert(tunnel_id, (game_id, pool_index)); - self.slot_to_tunnel.insert((game_id, pool_index), tunnel_id); +/// Represents a key that is created from a [PoolId] and [PoolIndex] combined +/// into a single value. +/// +/// This allows it to be used as a key in the [IntHashMap] +#[derive(Hash, PartialEq, Eq)] +struct PoolKey(u64); + +impl PoolKey { + /// Creates a new pool key from its components + const fn new(pool_id: PoolId, pool_index: PoolIndex) -> Self { + Self(((pool_id as u64) << 32) | pool_index as u64) } +} - /// Removes a mapping using a `pool_index` within a `game_id` - pub fn remove_by_slot(&mut self, game_id: GameID, pool_index: PoolIndex) -> Option { - if let Some(tunnel_id) = self.slot_to_tunnel.remove(&(game_id, pool_index)) { - self.tunnel_to_slot.remove(&tunnel_id); - - Some(tunnel_id) - } else { - None - } +impl TunnelMappings { + /// Assocates the provided `tunnel` to the provided `address` + /// + /// Creates a mapping for the [TunnelId] to [TunnelHandle] along + /// with [Ipv4Int] to [TunnelHandle] + fn associate_tunnel(&mut self, address: Ipv4Int, tunnel: TunnelHandle) { + let tunnel_id = tunnel.id; + self.id_to_tunnel.insert(tunnel_id, tunnel); + self.ip_to_tunnel.insert(address, tunnel_id); } - /// Removes a mapping using the `tunnel_id` - pub fn remove_by_tunnel(&mut self, tunnel_id: TunnelId) -> Option<(GameID, PoolIndex)> { - if let Some(key) = self.tunnel_to_slot.remove(&tunnel_id) { - self.slot_to_tunnel.remove(&key); - Some(key) - } else { - None - } + /// Attempts to associate the tunnel from `address` to the provided + /// `pool_id` and `pool_index` if there is a tunnel connected to + /// `address` + fn associate_pool(&mut self, address: Ipv4Int, pool_id: PoolId, pool_index: PoolIndex) { + let tunnel_id = match self.ip_to_tunnel.get(&address) { + Some(value) => *value, + None => return, + }; + + self.tunnel_to_index + .insert(tunnel_id, (pool_id, pool_index)); + self.index_to_tunnel + .insert(PoolKey::new(pool_id, pool_index), tunnel_id); } - /// Gets a tunnel by its `tunnel_id` - pub fn get_by_tunnel(&self, tunnel_id: TunnelId) -> Option<(GameID, PoolIndex)> { - self.tunnel_to_slot - // Find the mapping for the tunnel - .get(&tunnel_id) - // Take a copy of the values if present - .copied() + /// Uses the lookup maps to find the [TunnelHandle] at the provided `pool_index` + /// within the current pool of the provided `tunnel_id` if it is apart of a pool + fn get_tunnel_route( + &self, + tunnel_id: TunnelId, + pool_index: PoolIndex, + ) -> Option<(TunnelHandle, PoolIndex)> { + let (game_id, self_index) = *self.tunnel_to_index.get(&tunnel_id)?; + let other_tunnel = self + .index_to_tunnel + .get(&PoolKey::new(game_id, pool_index))?; + let tunnel = self.id_to_tunnel.get(other_tunnel)?; + + Some((tunnel.clone(), self_index)) } -} -impl TunnelService { - /// Gets the tunnel for the provided IP address if one is present - pub fn get_tunnel(&self, addr: Ipv4Int) -> Option { - self.tunnels.lock().get(&addr).cloned() + /// Removes the association between the `tunnel_id` and any games + /// + /// Returns the [PoolId] and [PoolIndex] of the pool if the tunnel + /// was present in one + fn dissociate_by_tunnel(&mut self, tunnel_id: TunnelId) -> Option<(PoolId, PoolIndex)> { + let (pool_id, pool_index) = self.tunnel_to_index.remove(&tunnel_id)?; + + // Remove the inverse relationship + self.index_to_tunnel + .remove(&PoolKey::new(pool_id, pool_index)); + + Some((pool_id, pool_index)) } - /// Sets the [`TunnelHandle`] for a specific [`Ipv4Addr`] updates - /// existing tunnel mappings if they are present - pub fn set_tunnel(&self, addr: Ipv4Int, tunnel: TunnelHandle) { - self.tunnels.lock().insert(addr, tunnel); + /// Removes the association between a [PoolKey] and a [TunnelId] if + /// one is present + /// + /// Returns the [TunnelId] if one was present at the [PoolIndex] with the [PoolId] + fn dissocate_by_pool(&mut self, pool_id: PoolId, pool_index: PoolIndex) -> Option { + let tunnel_id = self + .index_to_tunnel + .remove(&PoolKey::new(pool_id, pool_index))?; + self.tunnel_to_index.remove(&tunnel_id); + + Some(tunnel_id) } +} - /// Removes a game from the pool using its [`GameID`] +impl TunnelService { + /// Wrapper around [`TunnelMappings::associate_tunnel`] that holds the service + /// write lock before operating #[inline] - pub fn remove_pool(&self, pool: GameID) { - self.pools.write().remove(&pool); + pub fn associate_tunnel(&self, address: Ipv4Int, tunnel: TunnelHandle) { + self.mappings.write().associate_tunnel(address, tunnel) } - /// Finds the [`GameID`] and [`PoolIndex`] that are associated to - /// the provided [`TunnelId`] if one is present + /// Wrapper around [`TunnelMappings::associate_pool`] that holds the service + /// write lock before operating #[inline] - pub fn get_by_tunnel(&self, tunnel_id: TunnelId) -> Option<(GameID, PoolIndex)> { - self.mapping.read().get_by_tunnel(tunnel_id) + pub fn associate_pool(&self, address: Ipv4Int, pool_id: PoolId, pool_index: PoolIndex) { + self.mappings + .write() + .associate_pool(address, pool_id, pool_index) } - /// Removes a tunnel mapping and its handle from the game pool using the - /// [`GameID`] and the [`PoolIndex`] for the mapping - pub fn remove_by_slot(&self, game_id: GameID, pool_index: PoolIndex) { - self.mapping.write().remove_by_slot(game_id, pool_index); - - // Remove the handle from its associated pool - let pools = &mut *self.pools.write(); - if let Some(pool) = pools.get_mut(&game_id) { - if let Some(handle) = pool.handles.get_mut(pool_index as usize) { - *handle = None; - } - } - } - /// Removes a tunnel mapping and its handle from the game pool using the - /// [`TunnelId`] for the mapping - pub fn remove_by_tunnel(&self, tunnel_id: TunnelId) { - if let Some((game_id, pool_index)) = self.mapping.write().remove_by_tunnel(tunnel_id) { - // Remove the handle from its associated pool - let pools = &mut *self.pools.write(); - if let Some(pool) = pools.get_mut(&game_id) { - if let Some(handle) = pool.handles.get_mut(pool_index as usize) { - *handle = None; - } - } - } + /// Wrapper around [`TunnelMappings::get_tunnel_route`] that holds the service + /// read lock before operating + #[inline] + pub fn get_tunnel_route( + &self, + tunnel_id: TunnelId, + pool_index: PoolIndex, + ) -> Option<(TunnelHandle, PoolIndex)> { + self.mappings.read().get_tunnel_route(tunnel_id, pool_index) } - /// Gets the [`TunnelHandle`] for the [`PoolIndex`] within the pool for [`GameID`] - /// if there is a [`TunnelHandle`] present at the provided index - pub fn get_pool_handle(&self, pool: GameID, index: PoolIndex) -> Option { - // Access the pools map - let pools = &*self.pools.read(); - // Ge the pool for the `pool` - let pool = pools.get(&pool)?; - // Get the handle - pool.handles.get(index as usize)?.clone() + /// Wrapper around [`TunnelMappings::dissociate_by_tunnel`] that holds the service + /// write lock before operating + #[inline] + pub fn dissociate_by_tunnel(&self, tunnel_id: TunnelId) -> Option<(PoolId, PoolIndex)> { + self.mappings.write().dissociate_by_tunnel(tunnel_id) } - /// Associates the provided `handle` with the `index` inside the provided - /// `game_id` poool - /// - /// Creates a mapping and stores the pool handle - pub fn set_pool_handle(&self, game_id: GameID, index: usize, handle: TunnelHandle) { - // Assocate the handle with the game - { - self.mapping - .write() - // Map the handle to its game - .insert(handle.id, game_id, index as PoolIndex); - } - - let pools = &mut *self.pools.write(); - - // Get the existing pool or insert a new one - let pool = pools.entry(game_id).or_default(); - - if let Some(pool_handle) = pool.handles.get_mut(index) { - *pool_handle = Some(handle); - } + /// Wrapper around [`TunnelMappings::dissocate_by_pool`] that holds the service + /// write lock before operating + #[inline] + pub fn dissocate_by_pool(&self, pool_id: PoolId, pool_index: PoolIndex) -> Option { + self.mappings.write().dissocate_by_pool(pool_id, pool_index) } } -/// Represents a pool of tunnel connections -#[derive(Default)] -struct TunnelPool { - /// Collection of client handles - handles: [Option; 4], -} - /// Handle for sending messages to a tunnel #[derive(Clone)] pub struct TunnelHandle { @@ -202,16 +203,16 @@ pub struct Tunnel { io: Framed, /// Reciever for messages that should be written to the tunnel rx: mpsc::UnboundedReceiver, - /// The service access - service: Arc, /// Future state for writing to the `io` write_state: TunnelWriteState, + /// The service access + service: Arc, } impl Drop for Tunnel { fn drop(&mut self) { // Remove the tunnel from the service - self.service.remove_by_tunnel(self.id); + self.service.dissociate_by_tunnel(self.id); } } @@ -330,20 +331,13 @@ impl Tunnel { return Poll::Ready(TunnelReadState::Stop); }; - // Get the tunnel sender details - let (game_id, index) = match self.service.get_by_tunnel(self.id) { + // Get the path through the tunnel + let (target_handle, index) = match self.service.get_tunnel_route(self.id, message.index) { Some(value) => value, // Don't have a tunnel to send the message through None => return Poll::Ready(TunnelReadState::Continue), }; - // Get the handle the message is for - let target_handle = match self.service.get_pool_handle(game_id, message.index) { - Some(value) => value, - // Don't have an associated handle to send the message to - None => return Poll::Ready(TunnelReadState::Continue), - }; - // Update the message source index using the sender message.index = index; From 14b4164c8784acf1d07c98e08b8aaf7bb64fbf9f Mon Sep 17 00:00:00 2001 From: Jacobtread Date: Mon, 11 Dec 2023 21:20:58 +1300 Subject: [PATCH 11/19] Properly handle releasing inverse address relationship, use pool key for value aswell, improved dissociate, Removed id from handle --- src/routes/server.rs | 4 +- src/services/game/mod.rs | 2 +- src/services/tunnel/mod.rs | 115 ++++++++++++++++++++++--------------- 3 files changed, 73 insertions(+), 48 deletions(-) diff --git a/src/routes/server.rs b/src/routes/server.rs index cd8aea6..a5c4605 100644 --- a/src/routes/server.rs +++ b/src/routes/server.rs @@ -146,8 +146,8 @@ pub async fn handle_upgrade_tunnel( } }; - let handle = Tunnel::start(tunnel_service.clone(), upgraded); - tunnel_service.associate_tunnel(addr.into(), handle); + let tunnel_id = Tunnel::start(tunnel_service.clone(), upgraded); + tunnel_service.associate_tunnel(addr.into(), tunnel_id); } /// GET /api/server/log diff --git a/src/services/game/mod.rs b/src/services/game/mod.rs index 300f99f..55b0500 100644 --- a/src/services/game/mod.rs +++ b/src/services/game/mod.rs @@ -350,7 +350,7 @@ impl Game { }; // Remove the tunnel - self.tunnel_service.dissocate_by_pool(self.id, index as u8); + self.tunnel_service.dissocate_pool(self.id, index as u8); // Remove the player let player = self.players.remove(index); diff --git a/src/services/tunnel/mod.rs b/src/services/tunnel/mod.rs index a9f3233..acfae59 100644 --- a/src/services/tunnel/mod.rs +++ b/src/services/tunnel/mod.rs @@ -46,13 +46,17 @@ struct TunnelMappings { /// Mapping from [TunnelId]s to the actual [TunnelHandle] for communicating /// with the tunnel id_to_tunnel: IntHashMap, + /// Mapping from [Ipv4Int] (IPv4 addresses) to [TunnelHandle] for finding /// the tunnel associated with an IP ip_to_tunnel: IntHashMap, + /// Inverse mapping of `ip_to_tunnel` for finding a IPv4 address + /// associated to a [TunnelId] + tunnel_to_ip: IntHashMap, /// Mapping assocating a [TunnelId] with a [PoolIndex] within a [PoolId] /// that it is apart of - tunnel_to_index: IntHashMap, + tunnel_to_index: IntHashMap, /// Inverse mapping of `tunnel_to_index` for finding the handle /// assocated to a specific pool and slot index_to_tunnel: IntHashMap, @@ -62,7 +66,7 @@ struct TunnelMappings { /// into a single value. /// /// This allows it to be used as a key in the [IntHashMap] -#[derive(Hash, PartialEq, Eq)] +#[derive(Hash, PartialEq, Eq, Clone, Copy)] struct PoolKey(u64); impl PoolKey { @@ -70,17 +74,28 @@ impl PoolKey { const fn new(pool_id: PoolId, pool_index: PoolIndex) -> Self { Self(((pool_id as u64) << 32) | pool_index as u64) } + + /// Converts the key into its underlying parts + const fn parts(&self) -> (PoolId, PoolIndex) { + ((self.0 >> 32) as PoolId, self.0 as PoolIndex) + } } impl TunnelMappings { - /// Assocates the provided `tunnel` to the provided `address` - /// - /// Creates a mapping for the [TunnelId] to [TunnelHandle] along - /// with [Ipv4Int] to [TunnelHandle] - fn associate_tunnel(&mut self, address: Ipv4Int, tunnel: TunnelHandle) { - let tunnel_id = tunnel.id; + // Inserts a new tunnel into the mappings + fn insert_tunnel(&mut self, tunnel_id: TunnelId, tunnel: TunnelHandle) { + // Insert the tunnel into the mappings self.id_to_tunnel.insert(tunnel_id, tunnel); + } + + /// Assocates the provided `address` to the provided `tunnel` + /// + /// Creates a mapping for the [Ipv4Int] to [TunnelHandle] along + /// with [TunnelHandle] to [Ipv4Int] + fn associate_tunnel(&mut self, address: Ipv4Int, tunnel_id: TunnelId) { + // Create the IP relationship self.ip_to_tunnel.insert(address, tunnel_id); + self.tunnel_to_ip.insert(tunnel_id, address); } /// Attempts to associate the tunnel from `address` to the provided @@ -92,10 +107,10 @@ impl TunnelMappings { None => return, }; - self.tunnel_to_index - .insert(tunnel_id, (pool_id, pool_index)); - self.index_to_tunnel - .insert(PoolKey::new(pool_id, pool_index), tunnel_id); + let key = PoolKey::new(pool_id, pool_index); + + self.tunnel_to_index.insert(tunnel_id, key); + self.index_to_tunnel.insert(key, tunnel_id); } /// Uses the lookup maps to find the [TunnelHandle] at the provided `pool_index` @@ -105,7 +120,7 @@ impl TunnelMappings { tunnel_id: TunnelId, pool_index: PoolIndex, ) -> Option<(TunnelHandle, PoolIndex)> { - let (game_id, self_index) = *self.tunnel_to_index.get(&tunnel_id)?; + let (game_id, self_index) = self.tunnel_to_index.get(&tunnel_id)?.parts(); let other_tunnel = self .index_to_tunnel .get(&PoolKey::new(game_id, pool_index))?; @@ -114,31 +129,36 @@ impl TunnelMappings { Some((tunnel.clone(), self_index)) } - /// Removes the association between the `tunnel_id` and any games + /// Removes the association between the `tunnel_id` and any games and + /// removes the tunnel itself. /// - /// Returns the [PoolId] and [PoolIndex] of the pool if the tunnel - /// was present in one - fn dissociate_by_tunnel(&mut self, tunnel_id: TunnelId) -> Option<(PoolId, PoolIndex)> { - let (pool_id, pool_index) = self.tunnel_to_index.remove(&tunnel_id)?; - - // Remove the inverse relationship - self.index_to_tunnel - .remove(&PoolKey::new(pool_id, pool_index)); + /// Used when a tunnel disconnects to remove any associations + /// related to the tunnel + fn dissociate_tunnel(&mut self, tunnel_id: TunnelId) { + // Remove tunnel itself + _ = self.id_to_tunnel.remove(&tunnel_id); + + // Remove the IP association + if let Some(ip) = self.tunnel_to_ip.remove(&tunnel_id) { + self.ip_to_tunnel.remove(&ip); + } - Some((pool_id, pool_index)) + // Remove the slot association + if let Some(key) = self.tunnel_to_index.remove(&tunnel_id) { + // Remove the inverse relationship + self.index_to_tunnel.remove(&key); + } } /// Removes the association between a [PoolKey] and a [TunnelId] if /// one is present - /// - /// Returns the [TunnelId] if one was present at the [PoolIndex] with the [PoolId] - fn dissocate_by_pool(&mut self, pool_id: PoolId, pool_index: PoolIndex) -> Option { - let tunnel_id = self + fn dissocate_pool(&mut self, pool_id: PoolId, pool_index: PoolIndex) { + if let Some(tunnel_id) = self .index_to_tunnel - .remove(&PoolKey::new(pool_id, pool_index))?; - self.tunnel_to_index.remove(&tunnel_id); - - Some(tunnel_id) + .remove(&PoolKey::new(pool_id, pool_index)) + { + self.tunnel_to_index.remove(&tunnel_id); + } } } @@ -146,8 +166,8 @@ impl TunnelService { /// Wrapper around [`TunnelMappings::associate_tunnel`] that holds the service /// write lock before operating #[inline] - pub fn associate_tunnel(&self, address: Ipv4Int, tunnel: TunnelHandle) { - self.mappings.write().associate_tunnel(address, tunnel) + pub fn associate_tunnel(&self, address: Ipv4Int, tunnel_id: TunnelId) { + self.mappings.write().associate_tunnel(address, tunnel_id) } /// Wrapper around [`TunnelMappings::associate_pool`] that holds the service @@ -170,26 +190,24 @@ impl TunnelService { self.mappings.read().get_tunnel_route(tunnel_id, pool_index) } - /// Wrapper around [`TunnelMappings::dissociate_by_tunnel`] that holds the service + /// Wrapper around [`TunnelMappings::dissociate_tunnel`] that holds the service /// write lock before operating #[inline] - pub fn dissociate_by_tunnel(&self, tunnel_id: TunnelId) -> Option<(PoolId, PoolIndex)> { - self.mappings.write().dissociate_by_tunnel(tunnel_id) + pub fn dissociate_tunnel(&self, tunnel_id: TunnelId) { + self.mappings.write().dissociate_tunnel(tunnel_id); } - /// Wrapper around [`TunnelMappings::dissocate_by_pool`] that holds the service + /// Wrapper around [`TunnelMappings::dissocate_pool`] that holds the service /// write lock before operating #[inline] - pub fn dissocate_by_pool(&self, pool_id: PoolId, pool_index: PoolIndex) -> Option { - self.mappings.write().dissocate_by_pool(pool_id, pool_index) + pub fn dissocate_pool(&self, pool_id: PoolId, pool_index: PoolIndex) { + self.mappings.write().dissocate_pool(pool_id, pool_index); } } /// Handle for sending messages to a tunnel #[derive(Clone)] pub struct TunnelHandle { - /// The ID of the tunnel - id: TunnelId, /// The sender for sending messages to the tunnel tx: mpsc::UnboundedSender, } @@ -212,7 +230,7 @@ pub struct Tunnel { impl Drop for Tunnel { fn drop(&mut self) { // Remove the tunnel from the service - self.service.dissociate_by_tunnel(self.id); + self.service.dissociate_tunnel(self.id); } } @@ -243,9 +261,9 @@ impl Tunnel { /// Starts a new tunnel on `io` using the tunnel `service` /// /// ## Arguments - /// * `service` - The server to add the tunnel to + /// * `service` - The service to add the tunnel to /// * `io` - The underlying tunnel IO - pub fn start(service: Arc, io: Upgraded) -> TunnelHandle { + pub fn start(service: Arc, io: Upgraded) -> TunnelId { let (tx, rx) = mpsc::unbounded_channel(); // Wrap the `io` with the [`TunnelCodec`] for framing @@ -254,6 +272,13 @@ impl Tunnel { // Aquire the tunnel ID let id = service.next_tunnel_id.fetch_add(1, Ordering::AcqRel); + // Store the tunnel mapping + service + .mappings + .write() + .insert_tunnel(id, TunnelHandle { tx }); + + // Spawn the tunnel task tokio::spawn(Tunnel { service, id, @@ -262,7 +287,7 @@ impl Tunnel { write_state: Default::default(), }); - TunnelHandle { id, tx } + id } /// Polls accepting messages from [`Tunnel::rx`] then writing them to [`Tunnel::io`] and From b89ab3749bdf9e6d48252fb881c1beba7e56298c Mon Sep 17 00:00:00 2001 From: Jacobtread Date: Tue, 12 Dec 2023 12:03:41 +1300 Subject: [PATCH 12/19] Updated tunnel codec docs --- src/services/tunnel/mod.rs | 54 +++++++++++++++++++++++++++++--------- 1 file changed, 42 insertions(+), 12 deletions(-) diff --git a/src/services/tunnel/mod.rs b/src/services/tunnel/mod.rs index acfae59..4fac6b5 100644 --- a/src/services/tunnel/mod.rs +++ b/src/services/tunnel/mod.rs @@ -113,8 +113,11 @@ impl TunnelMappings { self.index_to_tunnel.insert(key, tunnel_id); } - /// Uses the lookup maps to find the [TunnelHandle] at the provided `pool_index` - /// within the current pool of the provided `tunnel_id` if it is apart of a pool + /// Uses the lookup maps to find the [TunnelHandle] of another tunnel within the same + /// pool as `tunnel_id` at the provided `pool_index`. + /// + /// Returns both the [TunnelHandle] at `pool_index` and the [PoolIndex] of the + /// provided `tunnel_id` fn get_tunnel_route( &self, tunnel_id: TunnelId, @@ -212,7 +215,7 @@ pub struct TunnelHandle { tx: mpsc::UnboundedSender, } -/// Represents a connection to a client tunnel +/// Tunnel connection to a client pub struct Tunnel { /// ID for this tunnel id: TunnelId, @@ -363,7 +366,7 @@ impl Tunnel { None => return Poll::Ready(TunnelReadState::Continue), }; - // Update the message source index using the sender + // Update the message target index to be from the correct index message.index = index; // Send the message to the tunnel @@ -401,17 +404,43 @@ impl Future for Tunnel { } } -/// Encoding an decoding logic for tunnel packet messages mod codec { + //! This modules contains the codec and message structures for [TunnelMessage]s + //! + //! # Tunnel Messages + //! + //! Tunnel message frames are as follows: + //! + //! ```norun + //! 0 1 2 3 + //! 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 + //! +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + //! | Index | Length | + //! +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + //! | : + //! : Payload : + //! : | + //! +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + //! ``` + //! + //! Tunnel message frames contain the following fields: + //! + //! Index: 8 bits. Determines the destination of the message within the current pool. + //! + //! Length: 32 bits. Determines the size in bytes of the payload that follows + //! + //! Payload: Variable length. The message bytes payload of `Length` + use bytes::{Buf, BufMut, Bytes}; use tokio_util::codec::{Decoder, Encoder}; - /// Partially decoded [TunnelMessage] - pub struct TunnelMessagePartial { + /// Header portion of a [TunnelMessage] that contains the + /// index of the message and the length of the expected payload + struct TunnelMessageHeader { /// Socket index to use - pub index: u8, + index: u8, /// The length of the tunnel message bytes - pub length: u32, + length: u32, } /// Message sent through the tunnel @@ -425,8 +454,9 @@ mod codec { /// Codec for encoding and decoding tunnel messages #[derive(Default)] pub struct TunnelCodec { - /// Stores a partially decoded frame if one is present - partial: Option, + /// Stores the current message header while its waiting + /// for the full payload to become available + partial: Option, } impl Decoder for TunnelCodec { @@ -444,7 +474,7 @@ mod codec { let index = src.get_u8(); let length = src.get_u32(); - self.partial.insert(TunnelMessagePartial { index, length }) + self.partial.insert(TunnelMessageHeader { index, length }) } }; From 1e64c261d02ccb19eddd2680f464c13fb26d29b3 Mon Sep 17 00:00:00 2001 From: Jacobtread Date: Tue, 12 Dec 2023 12:34:42 +1300 Subject: [PATCH 13/19] Reduced tunnel message length from 32bit to 16bit --- src/services/tunnel/mod.rs | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/src/services/tunnel/mod.rs b/src/services/tunnel/mod.rs index 4fac6b5..1585007 100644 --- a/src/services/tunnel/mod.rs +++ b/src/services/tunnel/mod.rs @@ -412,22 +412,22 @@ mod codec { //! Tunnel message frames are as follows: //! //! ```norun - //! 0 1 2 3 - //! 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 - //! +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - //! | Index | Length | - //! +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - //! | : - //! : Payload : - //! : | - //! +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + //! 0 1 2 + //! 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 + //! +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + //! | Index | Length | + //! +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + //! | : + //! : Payload : + //! : | + //! +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ //! ``` //! //! Tunnel message frames contain the following fields: //! - //! Index: 8 bits. Determines the destination of the message within the current pool. + //! Index: 8-bits. Determines the destination of the message within the current pool. //! - //! Length: 32 bits. Determines the size in bytes of the payload that follows + //! Length: 16-bits. Determines the size in bytes of the payload that follows //! //! Payload: Variable length. The message bytes payload of `Length` @@ -440,7 +440,7 @@ mod codec { /// Socket index to use index: u8, /// The length of the tunnel message bytes - length: u32, + length: u16, } /// Message sent through the tunnel @@ -472,7 +472,7 @@ mod codec { return Ok(None); } let index = src.get_u8(); - let length = src.get_u32(); + let length = src.get_u16(); self.partial.insert(TunnelMessageHeader { index, length }) } @@ -502,7 +502,7 @@ mod codec { dst: &mut bytes::BytesMut, ) -> Result<(), Self::Error> { dst.put_u8(item.index); - dst.put_u32(item.message.len() as u32); + dst.put_u16(item.message.len() as u16); dst.extend_from_slice(&item.message); Ok(()) } From be75c1c8fbc05e9305fc5fb200989a116c1746a5 Mon Sep 17 00:00:00 2001 From: Jacobtread Date: Wed, 13 Dec 2023 12:57:23 +1300 Subject: [PATCH 14/19] Handling for missing NLMP field when QoS is disabled, bump tdf version for new heat fixes --- Cargo.lock | 6 +----- Cargo.toml | 2 +- src/session/models/user_sessions.rs | 25 +++++++++++++++++++------ src/session/routes/user_sessions.rs | 6 +++++- 4 files changed, 26 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7506fb4..6c51083 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2438,9 +2438,7 @@ dependencies = [ [[package]] name = "tdf" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "140a16f6272972ab2718a1152f5844faa7ab23c31771e6bfe8271b1e59dc7884" +version = "0.3.0" dependencies = [ "serde", "tdf-derive", @@ -2449,8 +2447,6 @@ dependencies = [ [[package]] name = "tdf-derive" version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd55cd8859658cb77bea257d1964502f12068ba6f80c88eee00acb64122bab73" dependencies = [ "darling", "proc-macro2", diff --git a/Cargo.toml b/Cargo.toml index a1071d0..3a5da5d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,7 +51,7 @@ hyper = "0.14.25" tower = "0.4" bitflags = { version = "2.3.1", features = ["serde"] } -tdf = { version = "0.1" } +tdf = { version = "0.3", path = "../../tdf" } bytes = "1.4.0" indoc = "2" diff --git a/src/session/models/user_sessions.rs b/src/session/models/user_sessions.rs index b297650..416acdc 100644 --- a/src/session/models/user_sessions.rs +++ b/src/session/models/user_sessions.rs @@ -7,7 +7,7 @@ use crate::{ }; use bitflags::bitflags; use serde::Serialize; -use tdf::{ObjectId, TdfDeserialize, TdfMap, TdfSerialize, TdfTyped}; +use tdf::{ObjectId, TdfDeserialize, TdfDeserializeOwned, TdfMap, TdfSerialize, TdfTyped}; use super::{util::PING_SITE_ALIAS, NetworkAddress, QosNetworkData}; @@ -27,19 +27,32 @@ pub struct ResumeSessionRequest { } /// Request to update the stored networking information for a session -#[derive(TdfDeserialize)] pub struct UpdateNetworkRequest { /// The client address net groups - #[tdf(tag = "ADDR")] pub address: NetworkAddress, /// Latency to the different ping sites - #[tdf(tag = "NLMP")] - pub ping_site_latency: TdfMap, + pub ping_site_latency: Option>, /// The client Quality of Service data - #[tdf(tag = "NQOS")] pub qos: QosNetworkData, } +// Contains optional field so must manually deserialize +impl TdfDeserializeOwned for UpdateNetworkRequest { + fn deserialize_owned( + r: &mut tdf::prelude::TdfDeserializer<'_>, + ) -> tdf::prelude::DecodeResult { + let address: NetworkAddress = r.tag(b"ADDR")?; + let ping_site_latency: Option> = r.try_tag(b"NLMP")?; + let qos: QosNetworkData = r.tag(b"NQOS")?; + + Ok(Self { + address, + ping_site_latency, + qos, + }) + } +} + /// Request to update the stored hardware flags for a session #[derive(TdfDeserialize)] pub struct UpdateHardwareFlagsRequest { diff --git a/src/session/routes/user_sessions.rs b/src/session/routes/user_sessions.rs index ff0a60e..a06ad80 100644 --- a/src/session/routes/user_sessions.rs +++ b/src/session/routes/user_sessions.rs @@ -156,7 +156,11 @@ pub async fn handle_update_network( } } - let ping_site_latency: Vec = ping_site_latency.values().copied().collect(); + let ping_site_latency: Vec = if let Some(ping_site_latency) = ping_site_latency { + ping_site_latency.values().copied().collect() + } else { + Vec::new() + }; session.set_network_info(address, qos, ping_site_latency); } From 8252b1f4c9835754bb3dda5c9e05f7b49e737546 Mon Sep 17 00:00:00 2001 From: Jacobtread Date: Thu, 14 Dec 2023 17:19:40 +1300 Subject: [PATCH 15/19] Pass config to game setup by reference instead of cloning --- src/services/game/manager.rs | 2 +- src/services/game/mod.rs | 2 +- src/session/models/game_manager.rs | 21 +++++++++------------ 3 files changed, 11 insertions(+), 14 deletions(-) diff --git a/src/services/game/manager.rs b/src/services/game/manager.rs index 07db42a..d783bff 100644 --- a/src/services/game/manager.rs +++ b/src/services/game/manager.rs @@ -150,7 +150,7 @@ impl GameManager { // Add the player to the game let (game_id, index) = { let game = &mut *game_ref.write().await; - let slot = game.add_player(player, context, self.config.clone()); + let slot = game.add_player(player, context, &self.config); (game.id, slot) }; diff --git a/src/services/game/mod.rs b/src/services/game/mod.rs index 55b0500..03d04dd 100644 --- a/src/services/game/mod.rs +++ b/src/services/game/mod.rs @@ -238,7 +238,7 @@ impl Game { &mut self, player: GamePlayer, context: GameSetupContext, - config: Arc, + config: &RuntimeConfig, ) -> usize { let slot = self.players.len(); diff --git a/src/session/models/game_manager.rs b/src/session/models/game_manager.rs index 1ae65c6..c70afe8 100644 --- a/src/session/models/game_manager.rs +++ b/src/session/models/game_manager.rs @@ -1,12 +1,4 @@ -use std::{net::Ipv4Addr, sync::Arc}; - -use bitflags::bitflags; -use serde::Serialize; -use tdf::{ - types::tagged_union::TAGGED_UNSET_KEY, Blob, GroupSlice, TdfDeserialize, TdfDeserializeOwned, - TdfSerialize, TdfType, TdfTyped, -}; - +use super::{util::PING_SITE_ALIAS, NatType, NetworkAddress}; use crate::{ config::{RuntimeConfig, TunnelConfig}, services::{ @@ -15,8 +7,13 @@ use crate::{ }, utils::types::{GameID, PlayerID}, }; - -use super::{util::PING_SITE_ALIAS, NatType, NetworkAddress}; +use bitflags::bitflags; +use serde::Serialize; +use std::net::Ipv4Addr; +use tdf::{ + types::tagged_union::TAGGED_UNSET_KEY, Blob, GroupSlice, TdfDeserialize, TdfDeserializeOwned, + TdfSerialize, TdfType, TdfTyped, +}; #[derive(Debug, Clone)] #[repr(u16)] @@ -765,7 +762,7 @@ pub enum SlotType { pub struct GameSetupResponse<'a> { pub game: &'a Game, pub context: GameSetupContext, - pub config: Arc, + pub config: &'a RuntimeConfig, } impl TdfSerialize for GameSetupResponse<'_> { From 79ad07df2251ce5bbc01eb92670e30b3ba583955 Mon Sep 17 00:00:00 2001 From: Jacobtread Date: Fri, 22 Dec 2023 13:35:26 +1300 Subject: [PATCH 16/19] Association tokens implementation --- Cargo.lock | 12 +++++++ Cargo.toml | 2 ++ src/middleware/association.rs | 47 +++++++++++++++++++++++++++ src/middleware/auth.rs | 2 +- src/middleware/mod.rs | 2 ++ src/routes/server.rs | 44 +++++++++++++++++++------ src/services/game/manager.rs | 7 ++-- src/services/sessions/mod.rs | 45 ++++++++++++++++++++++++++ src/services/tunnel/mod.rs | 60 +++++++++++++++++++++-------------- src/session/mod.rs | 9 +++++- 10 files changed, 193 insertions(+), 37 deletions(-) create mode 100644 src/middleware/association.rs diff --git a/Cargo.lock b/Cargo.lock index 6c51083..b380a82 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1526,6 +1526,7 @@ dependencies = [ "tokio", "tokio-util", "tower", + "uuid", ] [[package]] @@ -2730,6 +2731,17 @@ version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" +[[package]] +name = "uuid" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e395fcf16a7a3d8127ec99782007af141946b4795001f876d54fb0d55978560" +dependencies = [ + "getrandom", + "rand", + "serde", +] + [[package]] name = "vcpkg" version = "0.2.15" diff --git a/Cargo.toml b/Cargo.toml index 3a5da5d..1025642 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,6 +63,8 @@ hashbrown = { version = "0.14.3", default-features = false, features = [ "inline-more", ] } +uuid = { version = "^1", features = ["v4", "serde", "fast-rng"] } + # SeaORM [dependencies.sea-orm] version = "^0" diff --git a/src/middleware/association.rs b/src/middleware/association.rs new file mode 100644 index 0000000..3938872 --- /dev/null +++ b/src/middleware/association.rs @@ -0,0 +1,47 @@ +use crate::services::sessions::{AssociationId, Sessions}; +use axum::{extract::FromRequestParts, response::IntoResponse}; +use futures_util::future::BoxFuture; +use hyper::StatusCode; +use std::{future::ready, sync::Arc}; + +/// Extractor for retireving the association token from a request headers +pub struct Association(pub Option); + +/// The HTTP header that contains the association token +const TOKEN_HEADER: &str = "x-association"; + +impl FromRequestParts for Association { + type Rejection = InvalidAssociation; + + fn from_request_parts<'a, 'b, 'c>( + parts: &'a mut axum::http::request::Parts, + _state: &'b S, + ) -> BoxFuture<'c, Result> + where + 'a: 'c, + 'b: 'c, + Self: 'c, + { + let sessions = parts + .extensions + .get::>() + .expect("Sessions extension missing"); + + let assocation_id = parts + .headers + .get(TOKEN_HEADER) + .and_then(|value| value.to_str().ok()) + .and_then(|token| sessions.verify_assoc_token(token).ok()); + + Box::pin(ready(Ok(Self(assocation_id)))) + } +} + +/// Association token was invalid +pub struct InvalidAssociation; + +impl IntoResponse for InvalidAssociation { + fn into_response(self) -> axum::response::Response { + (StatusCode::BAD_REQUEST, "Invalid association token").into_response() + } +} diff --git a/src/middleware/auth.rs b/src/middleware/auth.rs index c221ba0..74f5302 100644 --- a/src/middleware/auth.rs +++ b/src/middleware/auth.rs @@ -65,7 +65,7 @@ impl FromRequestParts for Auth { let sessions = parts .extensions .get::>() - .expect("Database connection extension missing"); + .expect("Sessions extension missing"); // Extract the token from the headers and verify it as a player id let player_id = parts diff --git a/src/middleware/mod.rs b/src/middleware/mod.rs index d9732a2..eb5ece2 100644 --- a/src/middleware/mod.rs +++ b/src/middleware/mod.rs @@ -1,3 +1,5 @@ +/// Extractor for association tokens +pub mod association; /// Middleware functions an enums related to token authentication pub mod auth; /// Middleware functions related to CORS implementation diff --git a/src/routes/server.rs b/src/routes/server.rs index a5c4605..21516ec 100644 --- a/src/routes/server.rs +++ b/src/routes/server.rs @@ -4,9 +4,11 @@ use crate::{ config::{RuntimeConfig, VERSION}, database::entities::players::PlayerRole, - middleware::{auth::AdminAuth, ip_address::IpAddress, upgrade::Upgrade}, + middleware::{ + association::Association, auth::AdminAuth, ip_address::IpAddress, upgrade::Upgrade, + }, services::{ - sessions::Sessions, + sessions::{AssociationId, Sessions}, tunnel::{Tunnel, TunnelService}, }, session::{router::BlazeRouter, Session}, @@ -25,12 +27,17 @@ use tokio::fs::{read_to_string, OpenOptions}; /// Response detailing the information about this Pocket Relay server /// contains the version information as well as the server information +/// +/// As of v0.6.0 it also includes an association token for the client +/// to use in order to associate multiple connections #[derive(Serialize)] pub struct ServerDetails { /// Identifier used to ensure the server is a Pocket Relay server ident: &'static str, /// The server version version: &'static str, + /// Random association token for the client to use + association: String, } /// GET /api/server @@ -38,10 +45,12 @@ pub struct ServerDetails { /// Handles providing the server details. The Pocket Relay client tool /// uses this endpoint to validate that the provided host is a valid /// Pocket Relay server. -pub async fn server_details() -> Json { +pub async fn server_details(Extension(sessions): Extension>) -> Json { + let association = sessions.create_assoc_token(); Json(ServerDetails { ident: "POCKET_RELAY_SERVER", version: VERSION, + association, }) } @@ -72,12 +81,19 @@ pub async fn dashboard_details( /// as blaze sessions using HTTP Upgrade pub async fn upgrade( IpAddress(addr): IpAddress, + Association(association_id): Association, Extension(router): Extension>, Extension(sessions): Extension>, Upgrade(upgrade): Upgrade, ) -> Response { // Spawn the upgrading process to its own task - tokio::spawn(handle_upgrade(upgrade, addr, router, sessions)); + tokio::spawn(handle_upgrade( + upgrade, + addr, + association_id, + router, + sessions, + )); // Let the client know to upgrade its connection ( @@ -94,6 +110,7 @@ pub async fn upgrade( pub async fn handle_upgrade( upgrade: OnUpgrade, addr: Ipv4Addr, + association_id: Option, router: Arc, sessions: Arc, ) { @@ -105,7 +122,7 @@ pub async fn handle_upgrade( } }; - Session::start(upgraded, addr, router, sessions).await; + Session::start(upgraded, addr, association_id, router, sessions).await; } /// GET /api/server/tunnel @@ -114,12 +131,21 @@ pub async fn handle_upgrade( /// from HTTP over to the Blaze protocol for proxing the game traffic /// as blaze sessions using HTTP Upgrade pub async fn tunnel( - IpAddress(addr): IpAddress, + Association(association_id): Association, Extension(tunnel_service): Extension>, Upgrade(upgrade): Upgrade, ) -> Response { + // Handle missing token + let Some(association_id) = association_id else { + return (StatusCode::BAD_REQUEST, "Missing association token").into_response(); + }; + // Spawn the upgrading process to its own task - tokio::spawn(handle_upgrade_tunnel(upgrade, addr, tunnel_service)); + tokio::spawn(handle_upgrade_tunnel( + upgrade, + association_id, + tunnel_service, + )); // Let the client know to upgrade its connection ( @@ -135,7 +161,7 @@ pub async fn tunnel( /// from the connection pub async fn handle_upgrade_tunnel( upgrade: OnUpgrade, - addr: Ipv4Addr, + association: AssociationId, tunnel_service: Arc, ) { let upgraded = match upgrade.await { @@ -147,7 +173,7 @@ pub async fn handle_upgrade_tunnel( }; let tunnel_id = Tunnel::start(tunnel_service.clone(), upgraded); - tunnel_service.associate_tunnel(addr.into(), tunnel_id); + tunnel_service.associate_tunnel(association, tunnel_id); } /// GET /api/server/log diff --git a/src/services/game/manager.rs b/src/services/game/manager.rs index d783bff..81fdfbf 100644 --- a/src/services/game/manager.rs +++ b/src/services/game/manager.rs @@ -154,8 +154,11 @@ impl GameManager { (game.id, slot) }; - self.tunnel_service - .associate_pool(session.addr.into(), game_id, index as u8); + // Allocate tunnel if supported by client + if let Some(association) = session.association { + self.tunnel_service + .associate_pool(association, game_id, index as u8); + } // Update the player current game session.set_game(game_id, Arc::downgrade(&game_ref)); diff --git a/src/services/sessions/mod.rs b/src/services/sessions/mod.rs index 6f60d7e..da3004d 100644 --- a/src/services/sessions/mod.rs +++ b/src/services/sessions/mod.rs @@ -8,6 +8,7 @@ use crate::utils::types::PlayerID; use base64ct::{Base64UrlUnpadded, Encoding}; use parking_lot::Mutex; use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use uuid::Uuid; type SessionMap = IntHashMap; @@ -25,6 +26,11 @@ pub struct Sessions { key: SigningKey, } +/// Unique ID given to clients before connecting so that session +/// connections can be associated with network tunnels without +/// relying on IP addresses: https://github.com/PocketRelay/Server/issues/64#issuecomment-1867015578 +pub type AssociationId = Uuid; + impl Sessions { /// Expiry time for tokens const EXPIRY_TIME: Duration = Duration::from_secs(60 * 60 * 24 * 30 /* 30 Days */); @@ -37,6 +43,45 @@ impl Sessions { } } + /// Creates a new association token + pub fn create_assoc_token(&self) -> String { + let uuid = Uuid::new_v4(); + let data: &[u8; 16] = uuid.as_bytes(); + // Encode the message + let msg = Base64UrlUnpadded::encode_string(data); + + // Create a signature from the raw message bytes + let sig = self.key.sign(data); + let sig = Base64UrlUnpadded::encode_string(sig.as_ref()); + + // Join the message and signature to create the token + [msg, sig].join(".") + } + + /// Verifies an association token + pub fn verify_assoc_token(&self, token: &str) -> Result { + // Split the token parts + let (msg_raw, sig_raw) = match token.split_once('.') { + Some(value) => value, + None => return Err(VerifyError::Invalid), + }; + + // Decode the 16 byte token message + let mut msg = [0u8; 16]; + Base64UrlUnpadded::decode(msg_raw, &mut msg).map_err(|_| VerifyError::Invalid)?; + + // Decode 32byte signature (SHA256) + let mut sig = [0u8; 32]; + Base64UrlUnpadded::decode(sig_raw, &mut sig).map_err(|_| VerifyError::Invalid)?; + + // Verify the signature + if !self.key.verify(&msg, &sig) { + return Err(VerifyError::Invalid); + } + let uuid = *Uuid::from_bytes_ref(&msg); + Ok(uuid) + } + pub fn create_token(&self, player_id: PlayerID) -> String { // Compute expiry timestamp let exp = SystemTime::now() diff --git a/src/services/tunnel/mod.rs b/src/services/tunnel/mod.rs index 1585007..bdd11b6 100644 --- a/src/services/tunnel/mod.rs +++ b/src/services/tunnel/mod.rs @@ -8,6 +8,7 @@ use futures_util::{Sink, Stream}; use hyper::upgrade::Upgraded; use parking_lot::RwLock; use std::{ + collections::HashMap, future::Future, pin::Pin, sync::{ @@ -19,6 +20,8 @@ use std::{ use tokio::sync::mpsc; use tokio_util::codec::Framed; +use super::sessions::AssociationId; + /// The port bound on clients representing the host player within the socket poool pub const TUNNEL_HOST_LOCAL_PORT: u16 = 42132; @@ -29,9 +32,6 @@ type PoolIndex = u8; /// ID of a pool type PoolId = GameID; -/// Int created from an IPv4 address bytes -type Ipv4Int = u32; - #[derive(Default)] pub struct TunnelService { /// Stores the next available tunnel ID @@ -47,12 +47,11 @@ struct TunnelMappings { /// with the tunnel id_to_tunnel: IntHashMap, - /// Mapping from [Ipv4Int] (IPv4 addresses) to [TunnelHandle] for finding - /// the tunnel associated with an IP - ip_to_tunnel: IntHashMap, - /// Inverse mapping of `ip_to_tunnel` for finding a IPv4 address + /// Mapping from [AssociationId] (Client association) to [TunnelHandle] + association_to_tunnel: HashMap, + /// Inverse mapping of `association_to_tunnel` for finding a [AssociationId] /// associated to a [TunnelId] - tunnel_to_ip: IntHashMap, + tunnel_to_association: HashMap, /// Mapping assocating a [TunnelId] with a [PoolIndex] within a [PoolId] /// that it is apart of @@ -88,21 +87,26 @@ impl TunnelMappings { self.id_to_tunnel.insert(tunnel_id, tunnel); } - /// Assocates the provided `address` to the provided `tunnel` + /// Assocates the provided `association` to the provided `tunnel` /// - /// Creates a mapping for the [Ipv4Int] to [TunnelHandle] along - /// with [TunnelHandle] to [Ipv4Int] - fn associate_tunnel(&mut self, address: Ipv4Int, tunnel_id: TunnelId) { + /// Creates a mapping for the [AssociationId] to [TunnelHandle] along + /// with [TunnelHandle] to [AssociationId] + fn associate_tunnel(&mut self, association: AssociationId, tunnel_id: TunnelId) { // Create the IP relationship - self.ip_to_tunnel.insert(address, tunnel_id); - self.tunnel_to_ip.insert(tunnel_id, address); + self.association_to_tunnel.insert(association, tunnel_id); + self.tunnel_to_association.insert(tunnel_id, association); } /// Attempts to associate the tunnel from `address` to the provided /// `pool_id` and `pool_index` if there is a tunnel connected to /// `address` - fn associate_pool(&mut self, address: Ipv4Int, pool_id: PoolId, pool_index: PoolIndex) { - let tunnel_id = match self.ip_to_tunnel.get(&address) { + fn associate_pool( + &mut self, + association: AssociationId, + pool_id: PoolId, + pool_index: PoolIndex, + ) { + let tunnel_id = match self.association_to_tunnel.get(&association) { Some(value) => *value, None => return, }; @@ -142,8 +146,8 @@ impl TunnelMappings { _ = self.id_to_tunnel.remove(&tunnel_id); // Remove the IP association - if let Some(ip) = self.tunnel_to_ip.remove(&tunnel_id) { - self.ip_to_tunnel.remove(&ip); + if let Some(association) = self.tunnel_to_association.remove(&tunnel_id) { + self.association_to_tunnel.remove(&association); } // Remove the slot association @@ -169,17 +173,24 @@ impl TunnelService { /// Wrapper around [`TunnelMappings::associate_tunnel`] that holds the service /// write lock before operating #[inline] - pub fn associate_tunnel(&self, address: Ipv4Int, tunnel_id: TunnelId) { - self.mappings.write().associate_tunnel(address, tunnel_id) + pub fn associate_tunnel(&self, association: AssociationId, tunnel_id: TunnelId) { + self.mappings + .write() + .associate_tunnel(association, tunnel_id) } /// Wrapper around [`TunnelMappings::associate_pool`] that holds the service /// write lock before operating #[inline] - pub fn associate_pool(&self, address: Ipv4Int, pool_id: PoolId, pool_index: PoolIndex) { + pub fn associate_pool( + &self, + association: AssociationId, + pool_id: PoolId, + pool_index: PoolIndex, + ) { self.mappings .write() - .associate_pool(address, pool_id, pool_index) + .associate_pool(association, pool_id, pool_index) } /// Wrapper around [`TunnelMappings::get_tunnel_route`] that holds the service @@ -264,8 +275,9 @@ impl Tunnel { /// Starts a new tunnel on `io` using the tunnel `service` /// /// ## Arguments - /// * `service` - The service to add the tunnel to - /// * `io` - The underlying tunnel IO + /// * `service` - The service to add the tunnel to + /// * `io` - The underlying tunnel IO + /// * `association` - The client association ID for this tunnel pub fn start(service: Arc, io: Upgraded) -> TunnelId { let (tx, rx) = mpsc::unbounded_channel(); diff --git a/src/session/mod.rs b/src/session/mod.rs index bcdd49c..bb8490d 100644 --- a/src/session/mod.rs +++ b/src/session/mod.rs @@ -18,7 +18,7 @@ use crate::{ database::entities::Player, services::{ game::{GameRef, WeakGameRef}, - sessions::Sessions, + sessions::{AssociationId, Sessions}, }, session::models::{NetworkAddress, QosNetworkData}, utils::{ @@ -57,6 +57,11 @@ pub type WeakSessionLink = Weak; pub struct Session { id: u32, pub addr: Ipv4Addr, + + /// User will not have an association if they are using an outdated + /// client version. + pub association: Option, + busy_lock: QueueLock, tx: mpsc::UnboundedSender, data: Mutex>, @@ -214,6 +219,7 @@ impl Session { pub async fn start( io: Upgraded, addr: Ipv4Addr, + association: Option, router: Arc, sessions: Arc, ) { @@ -224,6 +230,7 @@ impl Session { let session = Arc::new(Self { id, + association, busy_lock: QueueLock::new(), tx, data: Default::default(), From b5dbedc87c271d40057199da423924e0db7bb632 Mon Sep 17 00:00:00 2001 From: Jacobtread Date: Fri, 22 Dec 2023 20:14:21 +1300 Subject: [PATCH 17/19] Swapped local tdf version for published release --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 1025642..df1e622 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,7 +51,7 @@ hyper = "0.14.25" tower = "0.4" bitflags = { version = "2.3.1", features = ["serde"] } -tdf = { version = "0.3", path = "../../tdf" } +tdf = { version = "0.3" } bytes = "1.4.0" indoc = "2" From 335f17ee5c7e31a19aaf4e595077846fc92817af Mon Sep 17 00:00:00 2001 From: Jacobtread Date: Fri, 22 Dec 2023 20:25:27 +1300 Subject: [PATCH 18/19] Bumped tdf for fixed macros --- Cargo.lock | 196 ++++++++++++++++++++++++++--------------------------- Cargo.toml | 2 +- 2 files changed, 96 insertions(+), 102 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b380a82..510c7f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -68,9 +68,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.75" +version = "1.0.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" +checksum = "59d2a3357dde987206219e78ecfbbb6e8dad06cbb65292758d3270e6254f7355" [[package]] name = "arc-swap" @@ -109,18 +109,18 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] name = "async-trait" -version = "0.1.74" +version = "0.1.75" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" +checksum = "fdf6721fb0140e4f897002dd086c06f6c27775df19cfe1fccb21181a48fd2c98" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -318,9 +318,9 @@ dependencies = [ [[package]] name = "const-oid" -version = "0.9.5" +version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28c122c3980598d243d63d9a704629a2d748d101f278052ff068be5a4423ab6f" +checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" [[package]] name = "core-foundation" @@ -373,9 +373,9 @@ dependencies = [ [[package]] name = "crossbeam-queue" -version = "0.3.8" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1cfb3ea8a53f37c40dea2c7bedcbd88bdfae54f5e2175d6ecaff1c988353add" +checksum = "b9bcf5bdbfdd6030fb4a1c497b5d5fc5921aa2f60d359a17e249c0e6df3de153" dependencies = [ "cfg-if", "crossbeam-utils", @@ -383,9 +383,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.16" +version = "0.8.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" +checksum = "c06d96137f14f244c37f989d9fff8f95e6c18b918e71f36638f8c49112e4c78f" dependencies = [ "cfg-if", ] @@ -421,7 +421,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -432,7 +432,7 @@ checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" dependencies = [ "darling_core", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -534,7 +534,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf0fab0b584e67341bbfedce7c8d59d9cebaa9088fa494338ed4f8be92130bd3" dependencies = [ "quote", - "syn 2.0.39", + "syn 2.0.42", "walkdir", ] @@ -701,7 +701,7 @@ checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -822,9 +822,9 @@ checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" [[package]] name = "hkdf" -version = "0.12.3" +version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "791a029f6b9fc27657f6f188ec6e5e43f6911f6f878e0dc5501396e09809d437" +checksum = "7b5f8eb2ad728638ea2c7d47a21db23b7b58a72ed6a38256b8a1849f15fbbdf7" dependencies = [ "hmac", ] @@ -840,11 +840,11 @@ dependencies = [ [[package]] name = "home" -version = "0.5.5" +version = "0.5.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5444c27eef6923071f7ebcc33e3444508466a76f7a2b93da00ed6e19f30c1ddb" +checksum = "e3d1354bf6b7235cb4a0576c2619fd4ed18183f689b12b006a0ee7329eeff9a5" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -860,9 +860,9 @@ dependencies = [ [[package]] name = "http-body" -version = "0.4.5" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" +checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", "http", @@ -883,9 +883,9 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "hyper" -version = "0.14.27" +version = "0.14.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffb1cfd654a8219eaef89881fdb3bb3b1cdc5fa75ded05d6933b2b382e395468" +checksum = "bf96e135eb83a2a8ddf766e426a841d8ddd7449d5f00d34ea02b41d2f19eef80" dependencies = [ "bytes", "futures-channel", @@ -898,7 +898,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2", "tokio", "tower-service", "tracing", @@ -982,7 +982,7 @@ checksum = "ce243b1bfa62ffc028f1cc3b6034ec63d649f3031bc8a4fbbb004e1ac17d1f68" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -993,18 +993,18 @@ checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" [[package]] name = "itertools" -version = "0.11.0" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57" +checksum = "25db6b064527c5d482d0423354fcd07a89a2dfe07b67892e62411946db7f07b0" dependencies = [ "either", ] [[package]] name = "itoa" -version = "1.0.9" +version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" +checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" [[package]] name = "js-sys" @@ -1026,9 +1026,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.150" +version = "0.2.151" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c" +checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4" [[package]] name = "libm" @@ -1185,9 +1185,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.9" +version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3dce281c5e46beae905d4de1870d8b1509a9142b62eedf18b443b011ca8343d0" +checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09" dependencies = [ "libc", "wasi", @@ -1309,9 +1309,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.18.0" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" [[package]] name = "ordered-float" @@ -1343,7 +1343,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -1427,7 +1427,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -1487,9 +1487,9 @@ dependencies = [ [[package]] name = "pkg-config" -version = "0.3.27" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" +checksum = "69d3587f8a9e599cc7ec2c00e331f71c4e69a5f9a4b8a6efd5b07466b9736f9a" [[package]] name = "pocket-relay" @@ -1561,9 +1561,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.70" +version = "1.0.71" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39278fbbf5fb4f646ce651690877f89d1c5811a3d4acb27700c1cb3cdb78fd3b" +checksum = "75cb1540fadbd5b8fbccc4dddad2734eba435053f725621c070711a14bb5f4b8" dependencies = [ "unicode-ident", ] @@ -1662,9 +1662,9 @@ checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" [[package]] name = "reqwest" -version = "0.11.22" +version = "0.11.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b" +checksum = "37b1ae8d9ac08420c66222fb9096fc5de435c3c48542bc5336c51892cffafb41" dependencies = [ "base64", "bytes", @@ -1702,9 +1702,9 @@ dependencies = [ [[package]] name = "ring" -version = "0.17.6" +version = "0.17.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "684d5e6e18f669ccebf64a92236bb7db9a34f07be010e3627368182027180866" +checksum = "688c63d65483050968b2a8937f7995f443e27041a0f7700aa59b0822aedebb74" dependencies = [ "cc", "getrandom", @@ -1762,9 +1762,9 @@ checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" [[package]] name = "rustix" -version = "0.38.26" +version = "0.38.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9470c4bf8246c8daf25f9598dca807fb6510347b1e1cfa55749113850c79d88a" +checksum = "72e572a5e8ca657d7366229cdde4bd14c4eb5499a9573d4d366fe1b599daa316" dependencies = [ "bitflags 2.4.1", "errno", @@ -1775,9 +1775,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.9" +version = "0.21.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "629648aced5775d558af50b2b4c7b02983a04b312126d45eeead26e7caa498b9" +checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba" dependencies = [ "log", "ring", @@ -1812,9 +1812,9 @@ checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4" [[package]] name = "ryu" -version = "1.0.15" +version = "1.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741" +checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c" [[package]] name = "same-file" @@ -1851,14 +1851,14 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] name = "sea-orm" -version = "0.12.8" +version = "0.12.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8e2dd2f8a2d129c1632ec45dcfc15c44cc3d8b969adc8ac58c5f011ca7aecee" +checksum = "cf9195a2b2a182cbee3f76cf2a97c20204022f91259bdf8a48b537788202775b" dependencies = [ "async-stream", "async-trait", @@ -1879,23 +1879,23 @@ dependencies = [ [[package]] name = "sea-orm-macros" -version = "0.12.6" +version = "0.12.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "816183a751bf9c22087679b20b6142da0b5c6d8981835ebb7b99bf1bf924640a" +checksum = "66c6acfe3d49625c679955c7e7e7cd2d72b512a5c77bcd535a74aa41590b9f28" dependencies = [ "heck", "proc-macro2", "quote", "sea-bae", - "syn 2.0.39", + "syn 2.0.42", "unicode-ident", ] [[package]] name = "sea-orm-migration" -version = "0.12.6" +version = "0.12.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d45937e5d4869a0dcf0222bb264df564c077cbe9b312265f3717401d023a633" +checksum = "b3d06dac448288ceb630994b420cd8d5410543c0b09367f40ed505c2f03b266a" dependencies = [ "async-trait", "futures", @@ -1907,9 +1907,9 @@ dependencies = [ [[package]] name = "sea-query" -version = "0.30.4" +version = "0.30.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41558fa9bb5f4d73952dac0b9d9c2ce23966493fc9ee0008037b01d709838a68" +checksum = "e40446e3c048cec0802375f52462a05cc774b9ea6af1dffba6c646b7825e4cf9" dependencies = [ "chrono", "derivative", @@ -1938,7 +1938,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", "thiserror", ] @@ -1982,7 +1982,7 @@ checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -2083,16 +2083,6 @@ version = "1.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4dccd0940a2dcdf68d092b8cbab7dc0ad8fa938bf95787e1b916b0e3d0e8e970" -[[package]] -name = "socket2" -version = "0.4.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" -dependencies = [ - "libc", - "winapi", -] - [[package]] name = "socket2" version = "0.5.5" @@ -2140,9 +2130,9 @@ dependencies = [ [[package]] name = "sqlformat" -version = "0.2.2" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b7b278788e7be4d0d29c0f39497a0eef3fba6bbc8e70d8bf7fde46edeaa9e85" +checksum = "ce81b7bd7c4493975347ef60d8c7e8b742d4694f4c49f93e0a12ea263938176c" dependencies = [ "itertools", "nom", @@ -2401,9 +2391,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.39" +version = "2.0.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23e78b90f2fcf45d3e842032ce32e3f2d1545ba6636271dcbf24fa306d87be7a" +checksum = "5b7d0a2c048d661a1a59fcd7355baa232f7ed34e0ee4df2eef3c1c1c0d3852d8" dependencies = [ "proc-macro2", "quote", @@ -2440,6 +2430,8 @@ dependencies = [ [[package]] name = "tdf" version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6672758b627bfc3a3a3c44e841b5340d9dc8420b705e3f6aa72d904da9964a5f" dependencies = [ "serde", "tdf-derive", @@ -2448,11 +2440,13 @@ dependencies = [ [[package]] name = "tdf-derive" version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd55cd8859658cb77bea257d1964502f12068ba6f80c88eee00acb64122bab73" dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -2470,22 +2464,22 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.50" +version = "1.0.51" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9a7210f5c9a7156bb50aa36aed4c95afb51df0df00713949448cf9e97d382d2" +checksum = "f11c217e1416d6f036b870f14e0413d480dbf28edbee1f877abaf0206af43bb7" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.50" +version = "1.0.51" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8" +checksum = "01742297787513b79cf8e29d1056ede1313e2420b7b3b15d0a768b4921f549df" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -2525,9 +2519,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.34.0" +version = "1.35.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0c014766411e834f7af5b8f4cf46257aab4036ca95e9d2c144a10f59ad6f5b9" +checksum = "c89b4efa943be685f629b149f53829423f8f5531ea21249408e8e2f8671ec104" dependencies = [ "backtrace", "bytes", @@ -2537,7 +2531,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2 0.5.5", + "socket2", "tokio-macros", "windows-sys 0.48.0", ] @@ -2550,7 +2544,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -2636,7 +2630,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -2665,9 +2659,9 @@ dependencies = [ [[package]] name = "try-lock" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" +checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" [[package]] name = "typenum" @@ -2677,9 +2671,9 @@ checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" [[package]] name = "unicode-bidi" -version = "0.3.13" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460" +checksum = "6f2528f27a9eb2b21e69c95319b30bd0efd85d09c379741b0f78ea1d86be2416" [[package]] name = "unicode-ident" @@ -2800,7 +2794,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", "wasm-bindgen-shared", ] @@ -2834,7 +2828,7 @@ checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -3063,22 +3057,22 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.7.28" +version = "0.7.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d6f15f7ade05d2a4935e34a457b936c23dc70a05cc1d97133dc99e7a3fe0f0e" +checksum = "74d4d3961e53fa4c9a25a8637fc2bfaf2595b3d3ae34875568a5cf64787716be" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.7.28" +version = "0.7.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbbad221e3f78500350ecbd7dfa4e63ef945c05f4c61cb7f4d3f84cd0bba649b" +checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index df1e622..1293e3c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,7 +51,7 @@ hyper = "0.14.25" tower = "0.4" bitflags = { version = "2.3.1", features = ["serde"] } -tdf = { version = "0.3" } +tdf = { version = "0.4" } bytes = "1.4.0" indoc = "2" From 25d288bb231645083c35a97bde0c44dd22574cab Mon Sep 17 00:00:00 2001 From: Jacobtread Date: Fri, 22 Dec 2023 20:28:05 +1300 Subject: [PATCH 19/19] Updated lock file --- Cargo.lock | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 510c7f2..9b587f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2429,9 +2429,9 @@ dependencies = [ [[package]] name = "tdf" -version = "0.3.0" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6672758b627bfc3a3a3c44e841b5340d9dc8420b705e3f6aa72d904da9964a5f" +checksum = "ed0edae747f9ac834784197050fb76507dab969b159a1170f3005c3c0fdf9a01" dependencies = [ "serde", "tdf-derive", @@ -2439,9 +2439,9 @@ dependencies = [ [[package]] name = "tdf-derive" -version = "0.1.1" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd55cd8859658cb77bea257d1964502f12068ba6f80c88eee00acb64122bab73" +checksum = "239ecb9a7409064b2121deffcb0afc3a21131f22418d55a4bdb907c576587ec1" dependencies = [ "darling", "proc-macro2",