From 539a26f87465d4fb99cc8f7d42abd3ffe53ca815 Mon Sep 17 00:00:00 2001 From: PotentialStyx <62217716+PotentialStyx@users.noreply.github.com> Date: Sat, 13 Apr 2024 16:53:47 -0700 Subject: [PATCH] refactor: Use special new type for session ids instead of i32 --- services/src/chat.rs | 6 +- services/src/dotreplit.rs | 4 +- services/src/exec.rs | 4 +- services/src/fsevents.rs | 4 +- services/src/gcsfiles.rs | 4 +- services/src/git.rs | 8 +-- services/src/lib.rs | 6 +- services/src/ot.rs | 6 +- services/src/output.rs | 8 +-- services/src/presence.rs | 29 ++++---- services/src/shell.rs | 8 +-- services/src/snapshot.rs | 4 +- services/src/toolchain.rs | 4 +- services/src/traits.rs | 10 +-- services/src/types/channel_info.rs | 15 +++-- services/src/types/messaging.rs | 12 ++-- services/src/types/mod.rs | 3 + services/src/types/pty.rs | 16 ++--- services/src/types/session.rs | 33 +++++++++ src/goval_server.rs | 105 ++++++++++++++--------------- src/main.rs | 3 +- src/replspace_server.rs | 11 +-- 22 files changed, 173 insertions(+), 130 deletions(-) create mode 100644 services/src/types/session.rs diff --git a/services/src/chat.rs b/services/src/chat.rs index b8abf55..019059b 100644 --- a/services/src/chat.rs +++ b/services/src/chat.rs @@ -2,7 +2,7 @@ pub struct Chat { history: Vec, } -use crate::{ClientInfo, IPCMessage, SendSessions}; +use crate::{ClientInfo, IPCMessage, SendSessions, Session}; use super::traits; use anyhow::{format_err, Result}; @@ -21,7 +21,7 @@ impl traits::Service for Chat { &mut self, info: &super::types::ChannelInfo, message: goval::Command, - session: i32, + session: Session, ) -> Result> { let body = match message.body.clone() { None => return Err(format_err!("Expected command body")), @@ -51,7 +51,7 @@ impl traits::Service for Chat { &mut self, _info: &super::types::ChannelInfo, _client: ClientInfo, - _session: i32, + _session: Session, _sender: tokio::sync::mpsc::UnboundedSender, ) -> Result> { let mut scrollback = goval::Command::default(); diff --git a/services/src/dotreplit.rs b/services/src/dotreplit.rs index ae33cbb..0e96f98 100644 --- a/services/src/dotreplit.rs +++ b/services/src/dotreplit.rs @@ -1,4 +1,6 @@ pub struct DotReplit {} +use crate::Session; + use super::traits; use anyhow::{format_err, Result}; @@ -10,7 +12,7 @@ impl traits::Service for DotReplit { &mut self, _info: &super::types::ChannelInfo, message: goval::Command, - _session: i32, + _session: Session, ) -> Result> { let body = match message.body.clone() { None => return Err(format_err!("Expected command body")), diff --git a/services/src/exec.rs b/services/src/exec.rs index 8cd0bb9..6e32253 100644 --- a/services/src/exec.rs +++ b/services/src/exec.rs @@ -4,7 +4,7 @@ pub struct Exec { current_ref: String, } -use crate::Proc; +use crate::{Proc, Session}; use super::traits; use anyhow::{format_err, Result}; @@ -16,7 +16,7 @@ impl traits::Service for Exec { &mut self, info: &super::types::ChannelInfo, message: goval::Command, - _session: i32, + _session: Session, ) -> Result> { let body = match message.body.clone() { None => return Err(format_err!("Expected command body")), diff --git a/services/src/fsevents.rs b/services/src/fsevents.rs index fe3f2ff..86fc295 100644 --- a/services/src/fsevents.rs +++ b/services/src/fsevents.rs @@ -4,7 +4,7 @@ pub struct FSEvents { } use super::traits; -use crate::{FSEvent, FSWatcher}; +use crate::{FSEvent, FSWatcher, Session}; use anyhow::{format_err, Result}; use goval::{Command, File, FileEvent}; use tracing::{error, trace, warn}; @@ -30,7 +30,7 @@ impl traits::Service for FSEvents { &mut self, _info: &super::types::ChannelInfo, message: goval::Command, - _session: i32, + _session: Session, ) -> Result> { let body = match message.body.clone() { None => return Err(format_err!("Expected command body")), diff --git a/services/src/gcsfiles.rs b/services/src/gcsfiles.rs index 0249271..19d0bcc 100644 --- a/services/src/gcsfiles.rs +++ b/services/src/gcsfiles.rs @@ -1,5 +1,7 @@ pub struct GCSFiles {} +use crate::Session; + use super::traits; use anyhow::{format_err, Result}; use async_trait::async_trait; @@ -12,7 +14,7 @@ impl traits::Service for GCSFiles { &mut self, _info: &super::types::ChannelInfo, message: goval::Command, - _session: i32, + _session: Session, ) -> Result> { let body = match message.body.clone() { None => return Err(format_err!("Expected command body")), diff --git a/services/src/git.rs b/services/src/git.rs index 0d8e273..4eae8d6 100644 --- a/services/src/git.rs +++ b/services/src/git.rs @@ -3,7 +3,7 @@ pub struct Git { } use std::collections::HashMap; -use crate::ReplspaceMessage; +use crate::{ReplspaceMessage, Session}; use super::traits; use anyhow::{format_err, Result}; @@ -17,7 +17,7 @@ impl traits::Service for Git { &mut self, _info: &super::types::ChannelInfo, message: goval::Command, - _session: i32, + _session: Session, ) -> Result> { let body = match message.body.clone() { None => return Err(format_err!("Expected command body")), @@ -61,10 +61,10 @@ impl traits::Service for Git { &mut self, info: &super::types::ChannelInfo, msg: ReplspaceMessage, - session: i32, + session: Session, respond: Option>, ) -> Result<()> { - if session == 0 { + if session == Session::zero() { warn!( ?msg, "Got replspace message from an unknown session, ignoring" diff --git a/services/src/lib.rs b/services/src/lib.rs index 6ecf37a..9da3ea9 100644 --- a/services/src/lib.rs +++ b/services/src/lib.rs @@ -115,7 +115,7 @@ impl Channel { // Private functions impl Channel { - async fn message(&mut self, message: goval::Command, session: i32) -> Result<()> { + async fn message(&mut self, message: goval::Command, session: Session) -> Result<()> { if let Some(mut msg) = self ._inner .message(&self.info, message.clone(), session) @@ -130,7 +130,7 @@ impl Channel { async fn attach( &mut self, - session: i32, + session: Session, client: ClientInfo, sender: tokio::sync::mpsc::UnboundedSender, ) -> Result<()> { @@ -149,7 +149,7 @@ impl Channel { Ok(()) } - async fn detach(&mut self, session: i32) -> Result<()> { + async fn detach(&mut self, session: Session) -> Result<()> { self.info.sessions.retain(|sess, _| sess != &session); self.info.clients.retain(|sess, _| sess != &session); self._inner.detach(&self.info, session).await?; diff --git a/services/src/ot.rs b/services/src/ot.rs index 361fead..9fa34e8 100644 --- a/services/src/ot.rs +++ b/services/src/ot.rs @@ -13,7 +13,7 @@ use std::{ time::{Duration, SystemTime, UNIX_EPOCH}, }; -use crate::{client::ClientInfo, fs_watcher::FSWatcher, FSEvent, IPCMessage}; +use crate::{client::ClientInfo, fs_watcher::FSWatcher, FSEvent, IPCMessage, Session}; use super::traits; use anyhow::{format_err, Result}; @@ -48,7 +48,7 @@ impl traits::Service for OT { &mut self, info: &super::types::ChannelInfo, message: goval::Command, - session: i32, + session: Session, ) -> Result> { let body = match message.body.clone() { None => return Err(format_err!("Expected command body")), @@ -384,7 +384,7 @@ impl traits::Service for OT { &mut self, _info: &super::types::ChannelInfo, _client: ClientInfo, - _session: i32, + _session: Session, _sender: tokio::sync::mpsc::UnboundedSender, ) -> Result> { if self.path.is_empty() { diff --git a/services/src/output.rs b/services/src/output.rs index 9054426..123c9d1 100644 --- a/services/src/output.rs +++ b/services/src/output.rs @@ -16,7 +16,7 @@ use tracing::{debug, warn}; use super::traits; use super::types::pty::Pty; -use crate::{ClientInfo, IPCMessage}; +use crate::{ClientInfo, IPCMessage, Session}; use anyhow::{format_err, Result}; #[async_trait] @@ -25,7 +25,7 @@ impl traits::Service for Output { &mut self, info: &super::types::ChannelInfo, _client: ClientInfo, - session: i32, + session: Session, sender: tokio::sync::mpsc::UnboundedSender, ) -> Result> { if let Some(pty) = &mut self.pty { @@ -58,7 +58,7 @@ impl traits::Service for Output { Ok(Some(status)) } - async fn detach(&mut self, _info: &super::types::ChannelInfo, session: i32) -> Result<()> { + async fn detach(&mut self, _info: &super::types::ChannelInfo, session: Session) -> Result<()> { if let Some(pty) = &mut self.pty { pty.session_leave(session).await?; } @@ -69,7 +69,7 @@ impl traits::Service for Output { &mut self, info: &super::types::ChannelInfo, message: goval::Command, - _session: i32, + _session: Session, ) -> Result> { let body = match message.body.clone() { None => return Err(format_err!("Expected command body")), diff --git a/services/src/presence.rs b/services/src/presence.rs index 3cdd0ad..fcb2ff7 100644 --- a/services/src/presence.rs +++ b/services/src/presence.rs @@ -1,8 +1,8 @@ pub struct Presence { users: Vec, - files: HashMap, + files: HashMap, } -use crate::{ClientInfo, IPCMessage, SendSessions}; +use crate::{ClientInfo, IPCMessage, SendSessions, Session}; use std::{ collections::HashMap, time::{SystemTime, UNIX_EPOCH}, @@ -28,7 +28,7 @@ impl traits::Service for Presence { &mut self, info: &super::types::ChannelInfo, message: goval::Command, - session: i32, + session: Session, ) -> Result> { let body = match message.body.clone() { None => return Err(format_err!("Expected command body")), @@ -39,25 +39,28 @@ impl traits::Service for Presence { goval::command::Body::FollowUser(follow) => { let follow_notif = goval::Command { body: Some(goval::command::Body::FollowUser(goval::FollowUser { - session, + session: session.into(), })), ..Default::default() }; - info.send(follow_notif, SendSessions::Only(follow.session)) + info.send(follow_notif, SendSessions::Only(Session(follow.session))) .await?; Ok(None) } goval::command::Body::UnfollowUser(unfollow) => { let unfollow_notif = goval::Command { body: Some(goval::command::Body::UnfollowUser(goval::UnfollowUser { - session, + session: session.into(), })), ..Default::default() }; - info.send(unfollow_notif, SendSessions::Only(unfollow.session)) - .await?; + info.send( + unfollow_notif, + SendSessions::Only(Session(unfollow.session)), + ) + .await?; Ok(None) } goval::command::Body::OpenFile(file) => { @@ -76,7 +79,7 @@ impl traits::Service for Presence { let _inner = goval::FileOpened { user_id: user.id, file: file.file, - session, + session: session.into(), timestamp, }; @@ -98,7 +101,7 @@ impl traits::Service for Presence { &mut self, info: &super::types::ChannelInfo, client: ClientInfo, - session: i32, + session: Session, _sender: tokio::sync::mpsc::UnboundedSender, ) -> Result> { let mut roster = goval::Command::default(); @@ -115,7 +118,7 @@ impl traits::Service for Presence { roster.body = Some(goval::command::Body::Roster(_inner)); let user = goval::User { - session, + session: session.into(), id: client.id, name: client.username, ..Default::default() @@ -134,14 +137,14 @@ impl traits::Service for Presence { Ok(Some(roster)) } - async fn detach(&mut self, info: &super::types::ChannelInfo, session: i32) -> Result<()> { + async fn detach(&mut self, info: &super::types::ChannelInfo, session: Session) -> Result<()> { self.files.remove(&session); let mut part = goval::Command::default(); let mut flag = false; let users = self.users.clone(); for (idx, user) in users.iter().enumerate() { - if user.session == session { + if user.session == session.0 { flag = true; part.body = Some(goval::command::Body::Part(user.clone())); self.users.swap_remove(idx); diff --git a/services/src/shell.rs b/services/src/shell.rs index d74f041..69523d0 100644 --- a/services/src/shell.rs +++ b/services/src/shell.rs @@ -9,7 +9,7 @@ use tracing::debug; use super::traits; use super::types::pty::Pty; -use crate::{ClientInfo, IPCMessage}; +use crate::{ClientInfo, IPCMessage, Session}; use anyhow::{format_err, Result}; #[async_trait] @@ -18,14 +18,14 @@ impl traits::Service for Shell { &mut self, _info: &super::types::ChannelInfo, _client: ClientInfo, - session: i32, + session: Session, sender: tokio::sync::mpsc::UnboundedSender, ) -> Result> { self.pty.session_join(session, sender).await?; Ok(None) } - async fn detach(&mut self, _info: &super::types::ChannelInfo, session: i32) -> Result<()> { + async fn detach(&mut self, _info: &super::types::ChannelInfo, session: Session) -> Result<()> { self.pty.session_leave(session).await?; Ok(()) } @@ -34,7 +34,7 @@ impl traits::Service for Shell { &mut self, _info: &super::types::ChannelInfo, message: goval::Command, - _session: i32, + _session: Session, ) -> Result> { let body = match message.body.clone() { None => return Err(format_err!("Expected command body")), diff --git a/services/src/snapshot.rs b/services/src/snapshot.rs index 7def841..21e568d 100644 --- a/services/src/snapshot.rs +++ b/services/src/snapshot.rs @@ -1,5 +1,7 @@ pub struct Snapshot {} +use crate::Session; + use super::traits; use anyhow::{format_err, Result}; use async_trait::async_trait; @@ -10,7 +12,7 @@ impl traits::Service for Snapshot { &mut self, _info: &super::types::ChannelInfo, message: goval::Command, - _session: i32, + _session: Session, ) -> Result> { let body = match message.body.clone() { None => return Err(format_err!("Expected command body")), diff --git a/services/src/toolchain.rs b/services/src/toolchain.rs index 9610df5..48b7217 100644 --- a/services/src/toolchain.rs +++ b/services/src/toolchain.rs @@ -1,4 +1,6 @@ pub struct Toolchain {} +use crate::Session; + use super::traits; use async_trait::async_trait; use tracing::debug; @@ -11,7 +13,7 @@ impl traits::Service for Toolchain { &mut self, _info: &super::types::ChannelInfo, // TODO: use this to give real toolchain info message: goval::Command, - _session: i32, + _session: Session, ) -> Result> { let body = match message.body.clone() { None => return Err(format_err!("Expected command body")), diff --git a/services/src/traits.rs b/services/src/traits.rs index a3ae67c..e0e19b8 100644 --- a/services/src/traits.rs +++ b/services/src/traits.rs @@ -2,7 +2,7 @@ use anyhow::Result; use async_trait::async_trait; use tokio::sync::mpsc::Sender; -use crate::{ClientInfo, FSEvent, IPCMessage, ReplspaceMessage}; +use crate::{ClientInfo, FSEvent, IPCMessage, ReplspaceMessage, Session}; #[async_trait] pub(crate) trait Service { @@ -18,7 +18,7 @@ pub(crate) trait Service { &mut self, _info: &super::types::ChannelInfo, _message: goval::Command, - _session: i32, + _session: Session, ) -> Result> { Ok(None) } @@ -35,7 +35,7 @@ pub(crate) trait Service { &mut self, _info: &super::types::ChannelInfo, _msg: ReplspaceMessage, - _session: i32, + _session: Session, _respond: Option>, ) -> Result<()> { Ok(()) @@ -49,13 +49,13 @@ pub(crate) trait Service { &mut self, _info: &super::types::ChannelInfo, _client: ClientInfo, - _session: i32, + _session: Session, _sender: tokio::sync::mpsc::UnboundedSender, ) -> Result> { Ok(None) } - async fn detach(&mut self, _info: &super::types::ChannelInfo, _session: i32) -> Result<()> { + async fn detach(&mut self, _info: &super::types::ChannelInfo, _session: Session) -> Result<()> { Ok(()) } } diff --git a/services/src/types/channel_info.rs b/services/src/types/channel_info.rs index 3b51ab9..3b93ad9 100644 --- a/services/src/types/channel_info.rs +++ b/services/src/types/channel_info.rs @@ -6,23 +6,24 @@ use tokio::sync::RwLock; use tracing::error; use crate::config::dotreplit::DotReplit; +use crate::Session; use super::client::ClientInfo; use super::messaging::IPCMessage; #[derive(Clone, Copy, Debug)] pub enum SendSessions { - Only(i32), - EveryoneExcept(i32), + Only(Session), + EveryoneExcept(Session), Everyone, } pub struct ChannelInfo { pub id: i32, - pub clients: HashMap>, + pub clients: HashMap>, pub service: String, pub name: Option, - pub sessions: HashMap, + pub sessions: HashMap, pub sender: tokio::sync::mpsc::UnboundedSender, pub dotreplit: Arc>, pub child_env_vars: Arc>>, @@ -30,7 +31,7 @@ pub struct ChannelInfo { impl ChannelInfo { pub async fn send(&self, mut message: goval::Command, sessions: SendSessions) -> Result<()> { - let clients: Vec; + let clients: Vec; message.channel = self.id; match sessions { SendSessions::Everyone => { @@ -43,7 +44,7 @@ impl ChannelInfo { clients = _clients; } SendSessions::EveryoneExcept(excluded) => { - message.session = -excluded; + message.session = (-excluded).into(); let mut _clients = vec![]; for client in self.clients.keys() { if client != &excluded { @@ -54,7 +55,7 @@ impl ChannelInfo { clients = _clients; } SendSessions::Only(session) => { - message.session = session; + message.session = session.into(); clients = vec![session] } } diff --git a/services/src/types/messaging.rs b/services/src/types/messaging.rs index 5c90f9f..ac542f1 100644 --- a/services/src/types/messaging.rs +++ b/services/src/types/messaging.rs @@ -3,7 +3,7 @@ use prost::Message; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc::Sender; -use crate::SendSessions; +use crate::{SendSessions, Session}; use super::client::ClientInfo; @@ -22,14 +22,14 @@ pub enum ReplspaceMessage { pub enum ChannelMessage { IPC(IPCMessage), Attach( - i32, + Session, ClientInfo, tokio::sync::mpsc::UnboundedSender, ), - Detach(i32), + Detach(Session), ProcessDead(i32), FSEvent(super::FSEvent), - Replspace(i32, ReplspaceMessage, Option>), // session, message + Replspace(Session, ReplspaceMessage, Option>), // session, message Shutdown, ExternalMessage(goval::Command, SendSessions), } @@ -37,7 +37,7 @@ pub enum ChannelMessage { #[derive(Debug, Clone)] pub struct IPCMessage { pub command: goval::Command, - pub session: i32, + pub session: Session, } impl IPCMessage { @@ -59,7 +59,7 @@ impl TryFrom> for IPCMessage { fn try_from(value: Vec) -> Result { Ok(Self { command: goval::Command::decode(value.as_slice())?, - session: 0, + session: Session::zero(), }) } } diff --git a/services/src/types/mod.rs b/services/src/types/mod.rs index b3ecc3d..c7fb2df 100644 --- a/services/src/types/mod.rs +++ b/services/src/types/mod.rs @@ -21,3 +21,6 @@ pub use config::dotreplit::DotReplit; pub mod proc; pub use proc::Proc; + +pub mod session; +pub use session::Session; diff --git a/services/src/types/pty.rs b/services/src/types/pty.rs index 7749ea0..443ab04 100644 --- a/services/src/types/pty.rs +++ b/services/src/types/pty.rs @@ -8,7 +8,7 @@ use std::{ }; use tracing::error; -use crate::ChannelMessage; +use crate::{ChannelMessage, Session}; use super::IPCMessage; @@ -23,7 +23,7 @@ use tokio::sync::{Mutex, RwLock}; struct PtyWriter { channel: i32, - sessions: Arc>>>, + sessions: Arc>>>, cancelled: Arc, scrollback: Arc>, } @@ -67,7 +67,7 @@ impl Write for PtyWriter { for (session, sender) in sessions.iter() { let mut to_send = cmd.clone(); - to_send.session = *session; + to_send.session = (*session).into(); match sender.send(IPCMessage { command: to_send, @@ -90,7 +90,7 @@ impl Write for PtyWriter { pub struct Pty { channel: i32, - pub sessions: Arc>>>, + pub sessions: Arc>>>, writer: Box, cancelled: Arc, child_lock: Arc>>, @@ -102,7 +102,7 @@ impl Pty { pub async fn start( _args: Vec, channel: i32, - sessions: Arc>>>, + sessions: Arc>>>, contact: tokio::sync::mpsc::UnboundedSender, _env: Option>, ) -> Result { @@ -269,7 +269,7 @@ impl Pty { pub async fn session_join( &mut self, - session: i32, + session: Session, sender: tokio::sync::mpsc::UnboundedSender, ) -> Result<()> { if self.cancelled.load(std::sync::atomic::Ordering::SeqCst) { @@ -280,7 +280,7 @@ impl Pty { body: Some(goval::command::Body::Output( self.scrollback.read().await.clone(), )), - session, + session: session.into(), channel: self.channel, ..Default::default() }; @@ -295,7 +295,7 @@ impl Pty { Ok(()) } - pub async fn session_leave(&mut self, session: i32) -> Result<()> { + pub async fn session_leave(&mut self, session: Session) -> Result<()> { if self.cancelled.load(std::sync::atomic::Ordering::SeqCst) { return Err(format_err!("Can't remove a session from a cancelled pty")); } diff --git a/services/src/types/session.rs b/services/src/types/session.rs new file mode 100644 index 0000000..2255fd1 --- /dev/null +++ b/services/src/types/session.rs @@ -0,0 +1,33 @@ +use std::{fmt::Display, ops::Neg}; + +#[repr(transparent)] +#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug, Hash)] +pub struct Session(pub i32); + +static SESSION_ZERO: Session = Session(0); + +impl Session { + pub fn zero() -> Session { + SESSION_ZERO + } +} + +impl Display for Session { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_fmt(format_args!("Session {{ id: {} }}", self.0)) + } +} + +impl Neg for Session { + type Output = Self; + + fn neg(self) -> Self::Output { + Self(-self.0) + } +} + +impl From for i32 { + fn from(session: Session) -> Self { + session.0 + } +} diff --git a/src/goval_server.rs b/src/goval_server.rs index fc5b5c6..4b856f2 100644 --- a/src/goval_server.rs +++ b/src/goval_server.rs @@ -13,7 +13,7 @@ use chrono::Datelike; use anyhow::Result; use goval::{Command, OpenChannel}; -use homeval_services::{ClientInfo, ServiceMetadata}; +use homeval_services::{ClientInfo, ServiceMetadata, Session}; use prost::Message; use std::{collections::HashMap, net::SocketAddr, sync::LazyLock}; use tokio::sync::{mpsc::UnboundedSender, Mutex, RwLock}; @@ -29,16 +29,16 @@ use crate::{ static MAX_SESSION: LazyLock> = LazyLock::new(|| Mutex::new(0)); -static SESSION_CHANNELS: LazyLock>>> = +static SESSION_CHANNELS: LazyLock>>> = LazyLock::new(|| RwLock::new(HashMap::new())); -static SESSION_CLIENT_INFO: LazyLock>> = +static SESSION_CLIENT_INFO: LazyLock>> = LazyLock::new(|| RwLock::new(HashMap::new())); static CHANNEL_METADATA: LazyLock>> = LazyLock::new(|| RwLock::new(HashMap::new())); -static CHANNEL_SESSIONS: LazyLock>>> = +static CHANNEL_SESSIONS: LazyLock>>> = LazyLock::new(|| RwLock::new(HashMap::new())); -static SESSION_MAP: LazyLock>>> = +static SESSION_MAP: LazyLock>>> = LazyLock::new(|| RwLock::new(HashMap::new())); #[derive(Clone)] @@ -90,7 +90,7 @@ async fn on_wsv2_upgrade(socket: WebSocket, token: String, state: AppState, addr debug!("Mutex acquired..."); *max_session += 1; - let session_id = *max_session; + let session_id = Session(*max_session); drop(max_session); let (send_to_session, session_recv) = mpsc::unbounded_channel::(); @@ -129,7 +129,7 @@ async fn wsv2( async fn handle_message( message: IPCMessage, session_map: &LazyLock< - tokio::sync::RwLock>>, + tokio::sync::RwLock>>, >, max_channel: &Mutex, ) { @@ -176,7 +176,7 @@ async fn handle_message( match detach_channel(close_chan.id, message.session, true).await { Ok(_) => {} Err(err) => { - error!(%err, session = message.session, channel = close_chan.id, + error!(%err, session = %message.session, channel = close_chan.id, "Error occured while detaching from channel") } } @@ -208,7 +208,7 @@ async fn open_channel( message: IPCMessage, max_channel: &Mutex, session_map: &LazyLock< - tokio::sync::RwLock>>, + tokio::sync::RwLock>>, >, ) -> Result<()> { let searcher: &str = &open_chan.service; @@ -376,8 +376,8 @@ async fn open_channel( Ok(()) } -async fn detach_channel(channel: i32, session: i32, forced: bool) -> Result<()> { - trace!(session, channel, forced, "Client is closing a channel"); +async fn detach_channel(channel: i32, session: Session, forced: bool) -> Result<()> { + trace!(%session, channel, forced, "Client is closing a channel"); SESSION_CHANNELS .write() @@ -437,7 +437,7 @@ async fn accept_connection( ws_stream: WebSocket, propagate: mpsc::UnboundedSender, mut sent: mpsc::UnboundedReceiver, - session: i32, + session: Session, client: ClientInfo, addr: SocketAddr, ) -> Result<()> { @@ -495,55 +495,48 @@ async fn accept_connection( tokio::spawn(async move { while let Some(_msg) = read.next().await { match _msg { - Ok(msg) => { - match msg { - WsMessage::Binary(buf) => { - let _message: anyhow::Result = buf.try_into(); - let message = match _message { - Ok(mut msg) => { - msg.session = session; - msg - } - Err(err) => { - error!(%err, session, "Error decoding message from client"); - continue; - } - }; - - if let Err(err) = propagate.send(message) { - error!(session = session, ?err, "An error occured when enqueing message to global message queue") + Ok(msg) => match msg { + WsMessage::Binary(buf) => { + let _message: anyhow::Result = buf.try_into(); + let message = match _message { + Ok(mut msg) => { + msg.session = session; + msg } - } - WsMessage::Close(_) => { - warn!(session, "CLOSING SESSION"); - for _channel in - SESSION_CHANNELS.read().await.get(&session).unwrap().iter() - { - let channel = *_channel; - tokio::spawn(async move { - match detach_channel(channel, session, true).await { - Ok(_) => {} - Err(err) => { - error!(%err, session, channel, "Error occured while detaching from channel") - } - } - }); + Err(err) => { + error!(%err, %session, "Error decoding message from client"); + continue; } + }; - SESSION_MAP.write().await.remove(&session); - SESSION_CLIENT_INFO.write().await.remove(&session); - SESSION_CHANNELS.write().await.remove(&session); - warn!(session, "CLOSED SESSION"); + if let Err(err) = propagate.send(message) { + error!(%session, ?err, "An error occured when enqueing message to global message queue") } - _ => {} } - } + WsMessage::Close(_) => { + warn!(%session, "CLOSING SESSION"); + for _channel in SESSION_CHANNELS.read().await.get(&session).unwrap().iter() + { + let channel = *_channel; + tokio::spawn(async move { + match detach_channel(channel, session, true).await { + Ok(_) => {} + Err(err) => { + error!(%err, %session, channel, "Error occured while detaching from channel") + } + } + }); + } + + SESSION_MAP.write().await.remove(&session); + SESSION_CLIENT_INFO.write().await.remove(&session); + SESSION_CHANNELS.write().await.remove(&session); + warn!(%session, "CLOSED SESSION"); + } + _ => {} + }, Err(err) => { - error!( - session = session, - ?err, - "An error occured while reading messages" - ); + error!(%session, ?err, "An error occured while reading messages"); } }; } @@ -554,7 +547,7 @@ async fn accept_connection( Ok(_) => {} Err(err) => { error!( - session = i.session, + session = %i.session, ?err, "An error occured while sending a message" ) diff --git a/src/main.rs b/src/main.rs index 4fc4311..dc6aff9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,6 +4,7 @@ use std::sync::LazyLock; use std::time::Instant; use std::{collections::HashMap, io::Error, sync::Arc}; +use homeval_services::Session; use tokio::sync::RwLock; use tracing::{debug, info}; @@ -37,7 +38,7 @@ static CHANNEL_MESSAGES: LazyLock< RwLock>>, > = LazyLock::new(|| RwLock::new(HashMap::new())); // Hashmap for channel id -> last session that sent it a message -static LAST_SESSION_USING_CHANNEL: LazyLock>> = +static LAST_SESSION_USING_CHANNEL: LazyLock>> = LazyLock::new(|| RwLock::new(HashMap::new())); // static REPLSPACE_CALLBACKS: LazyLock< diff --git a/src/replspace_server.rs b/src/replspace_server.rs index 1cb6188..72d248f 100644 --- a/src/replspace_server.rs +++ b/src/replspace_server.rs @@ -7,6 +7,7 @@ use axum::{ routing::{get, post}, Json, Router, }; +use homeval_services::Session; use serde::{Deserialize, Serialize}; use textnonce::TextNonce; use tokio::sync::mpsc::channel; @@ -53,10 +54,10 @@ async fn get_gh_token(_query: Option>) -> (StatusCode, Jso debug!(channel = query.channel, "Got git askpass"); let last_session = crate::LAST_SESSION_USING_CHANNEL.read().await; - session = *last_session.get(&query.channel).unwrap_or(&0); + session = *last_session.get(&query.channel).unwrap_or(&Session::zero()); } else { debug!("Got git askpass without channel id"); - session = 0; + session = Session::zero(); } let nonce = TextNonce::new().into_string(); @@ -137,14 +138,14 @@ async fn open_file(Json(query): Json) -> (StatusCode, Json