Skip to content

Commit

Permalink
Merge pull request #65 from PocketRelay/dedicated-test
Browse files Browse the repository at this point in the history
Tunneling Support
  • Loading branch information
jacobtread authored Jan 1, 2024
2 parents 456cf33 + 25d288b commit 9519515
Show file tree
Hide file tree
Showing 20 changed files with 961 additions and 147 deletions.
212 changes: 107 additions & 105 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ hyper = "0.14.25"
tower = "0.4"

bitflags = { version = "2.3.1", features = ["serde"] }
tdf = { version = "0.1" }
tdf = { version = "0.4" }
bytes = "1.4.0"

indoc = "2"
Expand All @@ -63,6 +63,8 @@ hashbrown = { version = "0.14.3", default-features = false, features = [
"inline-more",
] }

uuid = { version = "^1", features = ["v4", "serde", "fast-rng"] }

# SeaORM
[dependencies.sea-orm]
version = "^0"
Expand Down
24 changes: 23 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@ use crate::session::models::Port;
/// The server version extracted from the Cargo.toml
pub const VERSION: &str = env!("CARGO_PKG_VERSION");

/// Config variables that are required to always exist during
/// runtime for various tasks
#[derive(Default)]
pub struct RuntimeConfig {
pub qos: QosServerConfig,
pub reverse_proxy: bool,
pub galaxy_at_war: GalaxyAtWarConfig,
pub menu_message: String,
pub dashboard: DashboardConfig,
pub tunnel: TunnelConfig,
}

/// Environment variable key to load the config from
Expand Down Expand Up @@ -74,6 +77,7 @@ pub struct Config {
pub galaxy_at_war: GalaxyAtWarConfig,
pub logging: LevelFilter,
pub retriever: RetrieverConfig,
pub tunnel: TunnelConfig,
}

impl Default for Config {
Expand All @@ -88,16 +92,34 @@ impl Default for Config {
galaxy_at_war: Default::default(),
logging: LevelFilter::Info,
retriever: Default::default(),
tunnel: Default::default()
}
}
}

/// Configuration for how the server should use tunneling
#[derive(Debug, Default, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum TunnelConfig {
/// Only tunnel players with non "Open" NAT types if the QoS
/// server is set to [`QosServerConfig::Disabled`] this is
/// equivilent to [`TunnelConfig::Always`]
#[default]
Stricter,
/// Always tunnel connections through the server regardless
/// of NAT type
Always,
/// Never tunnel connections through the server
Disabled,
}

/// Configuration for the server QoS setup
#[derive(Debug, Default, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum QosServerConfig {
/// Use the official QoS server
Official,
/// Use the local QoS server (might cause issues)
/// Use the local QoS server (might cause issues with WAN)
#[default]
Local,
/// Use a custom QoS server
Expand Down
11 changes: 8 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

use crate::{
config::{RuntimeConfig, VERSION},
services::{game::manager::GameManager, retriever::Retriever, sessions::Sessions},
services::{
game::manager::GameManager, retriever::Retriever, sessions::Sessions, tunnel::TunnelService,
},
utils::signing::SigningKey,
};
use axum::{Extension, Server};
Expand Down Expand Up @@ -42,6 +44,7 @@ async fn main() {
menu_message: config.menu_message,
dashboard: config.dashboard,
qos: config.qos,
tunnel: config.tunnel,
};

debug!("QoS server: {:?}", &runtime_config.qos);
Expand All @@ -55,9 +58,10 @@ async fn main() {
SigningKey::global()
);

let game_manager = Arc::new(GameManager::new());
let sessions = Arc::new(Sessions::new(signing_key));
let config = Arc::new(runtime_config);
let tunnel_service = Arc::new(TunnelService::default());
let game_manager = Arc::new(GameManager::new(tunnel_service.clone(), config.clone()));
let sessions = Arc::new(Sessions::new(signing_key));
let retriever = Arc::new(retriever);

// Initialize session router
Expand All @@ -79,6 +83,7 @@ async fn main() {
.layer(Extension(router))
.layer(Extension(game_manager))
.layer(Extension(sessions))
.layer(Extension(tunnel_service))
.into_make_service_with_connect_info::<SocketAddr>();

info!("Starting server on {} (v{})", addr, VERSION);
Expand Down
47 changes: 47 additions & 0 deletions src/middleware/association.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use crate::services::sessions::{AssociationId, Sessions};
use axum::{extract::FromRequestParts, response::IntoResponse};
use futures_util::future::BoxFuture;
use hyper::StatusCode;
use std::{future::ready, sync::Arc};

/// Extractor for retireving the association token from a request headers
pub struct Association(pub Option<AssociationId>);

/// The HTTP header that contains the association token
const TOKEN_HEADER: &str = "x-association";

impl<S> FromRequestParts<S> for Association {
type Rejection = InvalidAssociation;

fn from_request_parts<'a, 'b, 'c>(
parts: &'a mut axum::http::request::Parts,
_state: &'b S,
) -> BoxFuture<'c, Result<Self, Self::Rejection>>
where
'a: 'c,
'b: 'c,
Self: 'c,
{
let sessions = parts
.extensions
.get::<Arc<Sessions>>()
.expect("Sessions extension missing");

let assocation_id = parts
.headers
.get(TOKEN_HEADER)
.and_then(|value| value.to_str().ok())
.and_then(|token| sessions.verify_assoc_token(token).ok());

Box::pin(ready(Ok(Self(assocation_id))))
}
}

/// Association token was invalid
pub struct InvalidAssociation;

impl IntoResponse for InvalidAssociation {
fn into_response(self) -> axum::response::Response {
(StatusCode::BAD_REQUEST, "Invalid association token").into_response()
}
}
2 changes: 1 addition & 1 deletion src/middleware/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl<S> FromRequestParts<S> for Auth {
let sessions = parts
.extensions
.get::<Arc<Sessions>>()
.expect("Database connection extension missing");
.expect("Sessions extension missing");

// Extract the token from the headers and verify it as a player id
let player_id = parts
Expand Down
2 changes: 2 additions & 0 deletions src/middleware/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
/// Extractor for association tokens
pub mod association;
/// Middleware functions an enums related to token authentication
pub mod auth;
/// Middleware functions related to CORS implementation
Expand Down
1 change: 1 addition & 0 deletions src/routes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pub fn router() -> Router {
.route("/", get(server::server_details))
.route("/log", get(server::get_log).delete(clear_log))
.route("/upgrade", get(server::upgrade))
.route("/tunnel", get(server::tunnel))
.route("/telemetry", post(server::submit_telemetry))
.route("/dashboard", get(server::dashboard_details)),
)
Expand Down
81 changes: 76 additions & 5 deletions src/routes/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,13 @@
use crate::{
config::{RuntimeConfig, VERSION},
database::entities::players::PlayerRole,
middleware::{auth::AdminAuth, ip_address::IpAddress, upgrade::Upgrade},
services::sessions::Sessions,
middleware::{
association::Association, auth::AdminAuth, ip_address::IpAddress, upgrade::Upgrade,
},
services::{
sessions::{AssociationId, Sessions},
tunnel::{Tunnel, TunnelService},
},
session::{router::BlazeRouter, Session},
utils::logging::LOG_FILE_NAME,
};
Expand All @@ -22,23 +27,30 @@ use tokio::fs::{read_to_string, OpenOptions};

/// Response detailing the information about this Pocket Relay server
/// contains the version information as well as the server information
///
/// As of v0.6.0 it also includes an association token for the client
/// to use in order to associate multiple connections
#[derive(Serialize)]
pub struct ServerDetails {
/// Identifier used to ensure the server is a Pocket Relay server
ident: &'static str,
/// The server version
version: &'static str,
/// Random association token for the client to use
association: String,
}

/// GET /api/server
///
/// Handles providing the server details. The Pocket Relay client tool
/// uses this endpoint to validate that the provided host is a valid
/// Pocket Relay server.
pub async fn server_details() -> Json<ServerDetails> {
pub async fn server_details(Extension(sessions): Extension<Arc<Sessions>>) -> Json<ServerDetails> {
let association = sessions.create_assoc_token();
Json(ServerDetails {
ident: "POCKET_RELAY_SERVER",
version: VERSION,
association,
})
}

Expand Down Expand Up @@ -69,12 +81,19 @@ pub async fn dashboard_details(
/// as blaze sessions using HTTP Upgrade
pub async fn upgrade(
IpAddress(addr): IpAddress,
Association(association_id): Association,
Extension(router): Extension<Arc<BlazeRouter>>,
Extension(sessions): Extension<Arc<Sessions>>,
Upgrade(upgrade): Upgrade,
) -> Response {
// Spawn the upgrading process to its own task
tokio::spawn(handle_upgrade(upgrade, addr, router, sessions));
tokio::spawn(handle_upgrade(
upgrade,
addr,
association_id,
router,
sessions,
));

// Let the client know to upgrade its connection
(
Expand All @@ -91,6 +110,7 @@ pub async fn upgrade(
pub async fn handle_upgrade(
upgrade: OnUpgrade,
addr: Ipv4Addr,
association_id: Option<AssociationId>,
router: Arc<BlazeRouter>,
sessions: Arc<Sessions>,
) {
Expand All @@ -102,7 +122,58 @@ pub async fn handle_upgrade(
}
};

Session::start(upgraded, addr, router, sessions).await;
Session::start(upgraded, addr, association_id, router, sessions).await;
}

/// GET /api/server/tunnel
///
/// Handles upgrading connections from the Pocket Relay Client tool
/// from HTTP over to the Blaze protocol for proxing the game traffic
/// as blaze sessions using HTTP Upgrade
pub async fn tunnel(
Association(association_id): Association,
Extension(tunnel_service): Extension<Arc<TunnelService>>,
Upgrade(upgrade): Upgrade,
) -> Response {
// Handle missing token
let Some(association_id) = association_id else {
return (StatusCode::BAD_REQUEST, "Missing association token").into_response();
};

// Spawn the upgrading process to its own task
tokio::spawn(handle_upgrade_tunnel(
upgrade,
association_id,
tunnel_service,
));

// Let the client know to upgrade its connection
(
// Switching protocols status code
StatusCode::SWITCHING_PROTOCOLS,
// Headers required for upgrading
[(header::CONNECTION, "upgrade"), (header::UPGRADE, "tunnel")],
)
.into_response()
}

/// Handles upgrading a connection and starting a new session
/// from the connection
pub async fn handle_upgrade_tunnel(
upgrade: OnUpgrade,
association: AssociationId,
tunnel_service: Arc<TunnelService>,
) {
let upgraded = match upgrade.await {
Ok(upgraded) => upgraded,
Err(err) => {
error!("Failed to upgrade client connection: {}", err);
return;
}
};

let tunnel_id = Tunnel::start(tunnel_service.clone(), upgraded);
tunnel_service.associate_tunnel(association, tunnel_id);
}

/// GET /api/server/log
Expand Down
30 changes: 25 additions & 5 deletions src/services/game/manager.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use super::{rules::RuleSet, AttrMap, Game, GameJoinableState, GamePlayer, GameRef, GameSnapshot};
use crate::{
config::RuntimeConfig,
services::tunnel::TunnelService,
session::{
models::game_manager::{
AsyncMatchmakingStatus, GameSettings, GameSetupContext, MatchmakingResult,
Expand Down Expand Up @@ -37,6 +39,10 @@ pub struct GameManager {
next_id: AtomicU32,
/// Matchmaking entry queue
queue: Mutex<VecDeque<MatchmakingEntry>>,
/// Tunneling service
tunnel_service: Arc<TunnelService>,
/// Runtime configuration
config: Arc<RuntimeConfig>,
}

/// Entry into the matchmaking queue
Expand All @@ -56,11 +62,13 @@ impl GameManager {
const MAX_RELEASE_ATTEMPTS: u8 = 20;

/// Starts a new game manager service returning its link
pub fn new() -> Self {
pub fn new(tunnel_service: Arc<TunnelService>, config: Arc<RuntimeConfig>) -> Self {
Self {
games: Default::default(),
next_id: AtomicU32::new(1),
queue: Default::default(),
tunnel_service,
config,
}
}

Expand Down Expand Up @@ -140,12 +148,18 @@ impl GameManager {
context: GameSetupContext,
) {
// Add the player to the game
let game_id = {
let (game_id, index) = {
let game = &mut *game_ref.write().await;
game.add_player(player, context);
game.id
let slot = game.add_player(player, context, &self.config);
(game.id, slot)
};

// Allocate tunnel if supported by client
if let Some(association) = session.association {
self.tunnel_service
.associate_pool(association, game_id, index as u8);
}

// Update the player current game
session.set_game(game_id, Arc::downgrade(&game_ref));
}
Expand Down Expand Up @@ -188,7 +202,13 @@ impl GameManager {
setting: GameSettings,
) -> (GameRef, GameID) {
let id = self.next_id.fetch_add(1, Ordering::AcqRel);
let game = Game::new(id, attributes, setting, self.clone());
let game = Game::new(
id,
attributes,
setting,
self.clone(),
self.tunnel_service.clone(),
);
let link = Arc::new(RwLock::new(game));
{
let games = &mut *self.games.write().await;
Expand Down
Loading

0 comments on commit 9519515

Please sign in to comment.