Skip to content

Commit

Permalink
refactor: made publish private and migrate to use pub_root and pub_su…
Browse files Browse the repository at this point in the history
…broute
  • Loading branch information
Ghamza-Jd committed Jan 31, 2024
1 parent 3df1c09 commit 549555b
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 46 deletions.
6 changes: 3 additions & 3 deletions jarust/src/jaconfig.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub const CHANNEL_BUFFER_SIZE: usize = 32;
pub struct JaConfig {
pub(crate) uri: String,
pub(crate) apisecret: Option<String>,
pub(crate) root_namespace: String,
pub(crate) namespace: String,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand All @@ -13,11 +13,11 @@ pub enum TransportType {
}

impl JaConfig {
pub fn new(uri: &str, apisecret: Option<String>, root_namespace: &str) -> Self {
pub fn new(uri: &str, apisecret: Option<String>, namespace: &str) -> Self {
Self {
uri: uri.into(),
apisecret,
root_namespace: root_namespace.into(),
namespace: namespace.into(),
}
}
}
51 changes: 20 additions & 31 deletions jarust/src/jaconnection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use crate::prelude::*;
use crate::tmanager::TransactionManager;
use crate::transport::trans::Transport;
use crate::transport::trans::TransportProtocol;
use crate::utils::get_subnamespace_from_request;
use crate::utils::get_subnamespace_from_response;
use crate::utils::get_route_path_from_request;
use crate::utils::get_route_path_from_response;
use serde_json::json;
use serde_json::Value;
use std::collections::HashMap;
Expand Down Expand Up @@ -65,57 +65,49 @@ impl JaConnection {
inbound_stream: mpsc::Receiver<String>,
router: JaRouter,
transaction_manager: TransactionManager,
root_namespace: &str,
) -> JaResult<()> {
let mut stream = inbound_stream;
while let Some(next) = stream.recv().await {
let message = serde_json::from_str::<JaResponse>(&next)?;

// Check if we have a pending transaction and demux to the proper namespace
// Check if we have a pending transaction and demux to the proper route
if let Some(pending) = message
.transaction
.clone()
.and_then(|x| transaction_manager.get(&x))
{
router.publish(&pending.namespace, message).await?;
// router.
if pending.path == router.root_path() {
router.pub_root(message).await?;
} else {
router.pub_subroute(&pending.path, message).await?;
}
transaction_manager.success_close(&pending.id);
continue;
}

// Try get the namespace from the response
if let Some(namespace) = get_subnamespace_from_response(message.clone()) {
let namespace = format!("{root_namespace}/{namespace}");
router.publish(&namespace, message).await?;
// Try get the route from the response
if let Some(path) = get_route_path_from_response(message.clone()) {
router.pub_subroute(&path, message).await?;
continue;
}

// Fallback to publishing on the root namespace
// Fallback to publishing on the root route
router.pub_root(message).await?;
}
Ok(())
}

pub(crate) async fn open(config: JaConfig, transport: impl Transport) -> JaResult<Self> {
let (router, root_channel) = JaRouter::new(&config.root_namespace);
let (router, root_channel) = JaRouter::new(&config.namespace);
let transaction_manager = TransactionManager::new();

let root_namespace = config.root_namespace.clone();
let (transport_protocol, receiver) =
TransportProtocol::connect(transport, &config.uri).await?;

let demux_join_handle = tokio::spawn({
let router = router.clone();
let transaction_manager = transaction_manager.clone();
async move {
JaConnection::demux_task(
receiver,
router,
transaction_manager,
&root_namespace.clone(),
)
.await
}
async move { JaConnection::demux_task(receiver, router, transaction_manager).await }
});

let shared = Shared {
Expand Down Expand Up @@ -169,7 +161,7 @@ impl JaConnection {
}
};

let channel = self.create_subnamespace(&format!("{session_id}")).await;
let channel = self.add_subroute(&format!("{session_id}")).await;

let session = JaSession::new(self.clone(), channel, session_id, ka_interval).await;
self.exclusive
Expand Down Expand Up @@ -213,16 +205,13 @@ impl JaConnection {
return Err(err);
};

let root_namespace = self.shared.config.root_namespace.clone();
let namespace = match get_subnamespace_from_request(&request) {
Some(namespace) => format!("{root_namespace}/{namespace}"),
None => root_namespace,
};
let path =
get_route_path_from_request(&request).unwrap_or(self.shared.config.namespace.clone());

let mut guard = self.exclusive.lock().await;
guard
.transaction_manager
.create_transaction(transaction, janus_request, &namespace);
.create_transaction(transaction, janus_request, &path);
guard.transport_protocol.send(message.as_bytes()).await
}

Expand All @@ -233,8 +222,8 @@ impl JaConnection {
request
}

pub(crate) async fn create_subnamespace(&self, namespace: &str) -> mpsc::Receiver<JaResponse> {
self.exclusive.lock().await.router.add_subroute(&namespace)
pub(crate) async fn add_subroute(&self, end: &str) -> mpsc::Receiver<JaResponse> {
self.exclusive.lock().await.router.add_subroute(&end)
}
}

Expand Down
8 changes: 7 additions & 1 deletion jarust/src/jarouter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl JaRouter {
self.make_route(&abs_path)
}

pub async fn publish(&self, path: &str, message: JaResponse) -> JaResult<()> {
async fn publish(&self, path: &str, message: JaResponse) -> JaResult<()> {
let channel = {
let guard = self.exclusive.read().expect("Failed to acquire read lock");
guard.routes.get(path).cloned()
Expand All @@ -105,6 +105,12 @@ impl JaRouter {
}
}

impl JaRouter {
pub fn root_path(&self) -> String {
self.shared.root_path.clone()
}
}

#[cfg(test)]
mod tests {
use super::JaRouter;
Expand Down
2 changes: 1 addition & 1 deletion jarust/src/jasession.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl Attach for JaSession {
let connection = self.shared.connection.clone();

let receiver = connection
.create_subnamespace(&format!("{}/{}", self.shared.id, handle_id))
.add_subroute(&format!("{}/{}", self.shared.id, handle_id))
.await;

let (handle, event_receiver) = JaHandle::new(self.clone(), receiver, handle_id);
Expand Down
14 changes: 6 additions & 8 deletions jarust/src/tmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::sync::RwLock;
pub(crate) struct PendingTransaction {
pub id: String,
request: String,
pub namespace: String,
pub path: String,
}

#[derive(Debug)]
Expand Down Expand Up @@ -78,31 +78,29 @@ impl TransactionManager {
.remove(id);
}

pub(crate) fn create_transaction(&self, id: &str, request: &str, namespace: &str) {
pub(crate) fn create_transaction(&self, id: &str, request: &str, path: &str) {
if self.contains(id) {
return;
}

let pending_transaction = PendingTransaction {
id: id.into(),
request: request.into(),
namespace: namespace.into(),
path: path.into(),
};

self.insert(id, pending_transaction);
log::trace!(
"Transaction created {{ id: {id}, namespace: {namespace}, request: {request} }}"
);
log::trace!("Transaction created {{ id: {id}, path: {path}, request: {request} }}");
}

pub(crate) fn success_close(&self, id: &str) {
let tx = self.get(id);
if let Some(tx) = tx {
self.remove(&tx.id);
log::trace!(
"Transaction closed successfully {{ id: {}, namespace: {}, request: {} }}",
"Transaction closed successfully {{ id: {}, path: {}, request: {} }}",
tx.id,
tx.namespace,
tx.path,
tx.request
);
}
Expand Down
4 changes: 2 additions & 2 deletions jarust/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::japrotocol::JaResponse;
use serde_json::Value;

pub fn get_subnamespace_from_request(request: &Value) -> Option<String> {
pub fn get_route_path_from_request(request: &Value) -> Option<String> {
if let (Some(session_id), Some(handle_id)) = (
request["session_id"].as_u64(),
request["handle_id"].as_u64(),
Expand All @@ -14,7 +14,7 @@ pub fn get_subnamespace_from_request(request: &Value) -> Option<String> {
}
}

pub fn get_subnamespace_from_response(response: JaResponse) -> Option<String> {
pub fn get_route_path_from_response(response: JaResponse) -> Option<String> {
let Some(session_id) = response.session_id else {
return None;
};
Expand Down

0 comments on commit 549555b

Please sign in to comment.