Skip to content

Commit

Permalink
refactor: automatically handle cleanup of session resources through d…
Browse files Browse the repository at this point in the history
…rop impl
  • Loading branch information
jacobtread committed Oct 13, 2024
1 parent 77f3cd2 commit dd865b1
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 108 deletions.
12 changes: 2 additions & 10 deletions src/routes/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,10 @@ pub async fn upgrade(
IpAddress(addr): IpAddress,
Association(association_id): Association,
Extension(router): Extension<Arc<BlazeRouter>>,
Extension(sessions): Extension<Arc<Sessions>>,
Upgrade(upgrade): Upgrade,
) -> Response {
// Spawn the upgrading process to its own task
tokio::spawn(handle_upgrade(
upgrade,
addr,
association_id,
router,
sessions,
));
tokio::spawn(handle_upgrade(upgrade, addr, association_id, router));

// Let the client know to upgrade its connection
(
Expand All @@ -118,7 +111,6 @@ pub async fn handle_upgrade(
addr: Ipv4Addr,
association_id: Option<AssociationId>,
router: Arc<BlazeRouter>,
sessions: Arc<Sessions>,
) {
let upgraded = match upgrade.await {
Ok(upgraded) => upgraded,
Expand All @@ -128,7 +120,7 @@ pub async fn handle_upgrade(
}
};

Session::start(upgraded, addr, association_id, router, sessions).await;
Session::start(upgraded, addr, association_id, router).await;
}

/// GET /api/server/tunnel
Expand Down
47 changes: 41 additions & 6 deletions src/services/sessions.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Service for storing links to all the currenly active
//! Service for storing links to all the currently active
//! authenticated sessions on the server
use crate::database::entities::Player;
use crate::session::{SessionLink, WeakSessionLink};
use crate::utils::hashing::IntHashMap;
use crate::utils::signing::SigningKey;
Expand All @@ -11,6 +12,7 @@ use parking_lot::Mutex;
use rand::distributions::Distribution;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use thiserror::Error;
use uuid::Uuid;
Expand Down Expand Up @@ -222,14 +224,29 @@ impl Sessions {
Ok(id)
}

pub fn remove_session(&self, player_id: PlayerID) {
let sessions = &mut *self.sessions.lock();
sessions.remove(&player_id);
/// Creates an association between a session and a player, returning a
/// [SessionPlayerAssociation] which will release the
pub fn add_session(
self: &Arc<Self>,
player: Player,
link: WeakSessionLink,
) -> SessionPlayerAssociation {
// Add the session mapping
{
let sessions = &mut *self.sessions.lock();
sessions.insert(player.id, link);
}

SessionPlayerAssociation {
player: Arc::new(player),
sessions: self.clone(),
}
}

pub fn add_session(&self, player_id: PlayerID, link: WeakSessionLink) {
/// Removes an association between a session and player
fn remove_session(&self, player_id: PlayerID) {
let sessions = &mut *self.sessions.lock();
sessions.insert(player_id, link);
sessions.remove(&player_id);
}

pub fn lookup_session(&self, player_id: PlayerID) -> Option<SessionLink> {
Expand All @@ -248,6 +265,24 @@ impl Sessions {
}
}

/// Association between a session and a player, as long as this is held the
/// sessions service will maintain the link between the session and the player
///
/// Upon dropping this the session will remove the association
pub struct SessionPlayerAssociation {
/// Player belonging to the session
pub player: Arc<Player>,

// Access to the session service to remove on drop
sessions: Arc<Sessions>,
}

impl Drop for SessionPlayerAssociation {
fn drop(&mut self) {
self.sessions.remove_session(self.player.id);
}
}

/// Errors that can occur while verifying a token
#[derive(Debug, Error)]
pub enum VerifyError {
Expand Down
147 changes: 70 additions & 77 deletions src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::{
database::entities::Player,
services::{
game::{GameRef, WeakGameRef},
sessions::{AssociationId, Sessions},
sessions::{AssociationId, SessionPlayerAssociation},
},
session::models::{NetworkAddress, QosNetworkData},
utils::{
Expand Down Expand Up @@ -74,9 +74,6 @@ pub struct Session {

/// Mutable data associated with the session
data: Mutex<Option<SessionExtData>>,

/// Access to the sessions store for removing the session -> player association
sessions: Arc<Sessions>,
}

#[derive(Clone)]
Expand All @@ -101,21 +98,22 @@ impl SessionNotifyHandle {
}

pub struct SessionExtData {
player: Arc<Player>,
/// Session -> Player association, currently authenticated player
player_assoc: Arc<SessionPlayerAssociation>,
/// Networking information for current session
net: Arc<NetData>,
/// Currently connected game for the session
game: Option<SessionGameData>,
/// Subscribers listening for changes to this session
///
/// They will be notified when information about this session changes.
subscribers: Vec<(PlayerID, SessionNotifyHandle)>,
}

struct SessionGameData {
game_id: GameID,
game_ref: WeakGameRef,
}

impl SessionExtData {
pub fn new(player: Player) -> Self {
pub fn new(player: SessionPlayerAssociation) -> Self {
Self {
player: Arc::new(player),
player_assoc: Arc::new(player),
net: Default::default(),
game: Default::default(),
subscribers: Default::default(),
Expand All @@ -136,7 +134,7 @@ impl SessionExtData {
user_sessions::USER_ADDED,
NotifyUserAdded {
session_data: self.ext(),
user: UserIdentification::from_player(&self.player),
user: UserIdentification::from_player(&self.player_assoc.player),
},
));

Expand All @@ -146,7 +144,7 @@ impl SessionExtData {
user_sessions::USER_UPDATED,
NotifyUserUpdated {
flags: UserDataFlags::SUBSCRIBED | UserDataFlags::ONLINE,
player_id: self.player.id,
player_id: self.player_assoc.player.id,
},
));

Expand Down Expand Up @@ -180,7 +178,7 @@ impl SessionExtData {
user_sessions::COMPONENT,
user_sessions::USER_SESSION_EXTENDED_DATA_UPDATE,
UserSessionExtendedDataUpdate {
user_id: self.player.id,
user_id: self.player_assoc.player.id,
data: self.ext(),
},
);
Expand All @@ -191,6 +189,40 @@ impl SessionExtData {
}
}

impl Drop for SessionExtData {
fn drop(&mut self) {}
}

/// When dropped if the player is still connected to the game they will
/// be disconnected from the game
struct SessionGameData {
/// ID of the player session when they joined the game
player_id: PlayerID,
/// ID of the game that was joined
game_id: GameID,
/// Reference for accessing the game
game_ref: WeakGameRef,
}

impl Drop for SessionGameData {
fn drop(&mut self) {
// Attempt to access the game
let game_ref = match self.game_ref.upgrade() {
Some(value) => value,
// Game doesn't exist anymore
None => return,
};

let player_id = self.player_id;

// Spawn an async task to handle removing the player
tokio::spawn(async move {
let game = &mut *game_ref.write().await;
game.remove_player(player_id, RemoveReason::PlayerLeft);
});
}
}

#[derive(Debug, Default, Clone, Serialize)]
pub struct NetData {
pub addr: NetworkAddress,
Expand Down Expand Up @@ -234,7 +266,6 @@ impl Session {
addr: Ipv4Addr,
association: Option<AssociationId>,
router: Arc<BlazeRouter>,
sessions: Arc<Sessions>,
) {
// Obtain a session ID
let id = SESSION_IDS.fetch_add(1, Ordering::AcqRel);
Expand All @@ -248,7 +279,6 @@ impl Session {
tx,
data: Default::default(),
addr,
sessions,
});

SessionFuture {
Expand All @@ -275,8 +305,8 @@ impl Session {
/// Called when the session is considered stopped (Reader/Writer future has completed)
/// in order to clean up any remaining references to the session before dropping
fn stop(self: Arc<Self>) {
// Clear authentication
self.clear_player();
// Clear session data
self.clear_data();

// Session should now be the sole owner
let session = match Arc::try_unwrap(self) {
Expand All @@ -300,78 +330,36 @@ impl Session {
data.add_subscriber(player_id, subscriber);
}
}

pub fn remove_subscriber(&self, player_id: PlayerID) {
let data = &mut *self.data.lock();
if let Some(data) = data {
data.remove_subscriber(player_id)
}
}

pub fn set_player(&self, player: Player) -> Arc<Player> {
// Clear the current authentication
self.clear_player();

pub fn set_player(&self, player: SessionPlayerAssociation) -> Arc<Player> {
let data = &mut *self.data.lock();
let data = data.insert(SessionExtData::new(player));

data.player.clone()
data.player_assoc.player.clone()
}

/// Clears the current game returning the game data if
/// the player was in a game
///
/// Called by the game itself when the player has been removed
pub fn clear_game(&self) -> Option<(PlayerID, WeakGameRef)> {
let mut game: Option<SessionGameData> = None;
let mut player_id: Option<PlayerID> = None;

pub fn clear_game(&self) {
self.update_data(|data| {
game = data.game.take();
player_id = Some(data.player.id);
});

let game = game?;
let player_id = player_id?;

Some((player_id, game.game_ref))
}

/// Called to remove the player from its current game
pub fn remove_from_game(&self) {
let (player_id, game_ref) = match self.clear_game() {
Some(value) => value,
// Player isn't in a game
None => return,
};

let game_ref = match game_ref.upgrade() {
Some(value) => value,
// Game doesn't exist anymore
None => return,
};

// Spawn an async task to handle removing the player
tokio::spawn(async move {
let game = &mut *game_ref.write().await;
game.remove_player(player_id, RemoveReason::PlayerLeft);
// Clear active game
data.game = None;
});
}

pub fn clear_player(&self) {
self.remove_from_game();

// Check that theres authentication
let data = &mut *self.data.lock();
let data = match data {
Some(value) => value,
None => return,
};

// Existing sessions must be unsubscribed
data.subscribers.clear();

// Remove the session from the sessions service
self.sessions.remove_session(data.player.id);
/// Clears data associated with the session (Authentication/Current game/Networking)
pub fn clear_data(&self) {
// Take the current data and drop it
self.data.lock().take();
}

pub fn get_game(&self) -> Option<(GameID, GameRef)> {
Expand All @@ -390,7 +378,7 @@ impl Session {
pub fn get_lookup(&self) -> Option<LookupResponse> {
let data = &*self.data.lock();
data.as_ref().map(|data| LookupResponse {
player: data.player.clone(),
player: data.player_assoc.player.clone(),
extended_data: data.ext(),
})
}
Expand All @@ -408,12 +396,13 @@ impl Session {
}

pub fn set_game(&self, game_id: GameID, game_ref: WeakGameRef) {
// Remove the player from the game if they are already present in one
self.remove_from_game();

// Set the current game
self.update_data(|data| {
data.game = Some(SessionGameData { game_id, game_ref });
data.game = Some(SessionGameData {
player_id: data.player_assoc.player.id,
game_id,
game_ref,
});
});
}

Expand Down Expand Up @@ -461,7 +450,11 @@ impl Session {
}

// Get the authenticated player to include in the debug message
let auth = self.data.lock().as_ref().map(|data| data.player.clone());
let auth = self
.data
.lock()
.as_ref()
.map(|data| data.player_assoc.player.clone());

let debug_data = DebugSessionData {
action,
Expand Down
Loading

0 comments on commit dd865b1

Please sign in to comment.