Skip to content

Commit

Permalink
chore: renamed SafeShared -> Exclusive
Browse files Browse the repository at this point in the history
  • Loading branch information
Ghamza-Jd committed Jan 13, 2024
1 parent 38215fb commit 710ef79
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 23 deletions.
18 changes: 9 additions & 9 deletions jarust/src/jaconnection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ struct Shared {
config: JaConfig,
}

struct SafeShared {
struct Exclusive {
nsp_registry: NamespaceRegistry,
transport_protocol: TransportProtocol,
receiver: mpsc::Receiver<JaResponse>,
Expand All @@ -37,7 +37,7 @@ struct SafeShared {

pub struct InnerConnection {
shared: Shared,
safe: Mutex<SafeShared>,
exclusive: Mutex<Exclusive>,
}

#[derive(Clone)]
Expand Down Expand Up @@ -120,7 +120,7 @@ impl JaConnection {
demux_abort_handle: demux_join_handle.abort_handle(),
config,
};
let safe = SafeShared {
let safe = Exclusive {
nsp_registry,
transport_protocol,
receiver: namespace_receiver,
Expand All @@ -129,7 +129,7 @@ impl JaConnection {
};
let connection = Arc::new(InnerConnection {
shared,
safe: Mutex::new(safe),
exclusive: Mutex::new(safe),
});
Ok(Self(connection))
}
Expand All @@ -143,7 +143,7 @@ impl JaConnection {
});

self.send_request(request).await?;
let response = { self.safe.lock().await.receiver.recv().await.unwrap() };
let response = { self.exclusive.lock().await.receiver.recv().await.unwrap() };
let session_id = match response.janus {
JaResponseProtocol::Success { data } => data.id,
JaResponseProtocol::Error { error } => {
Expand All @@ -163,7 +163,7 @@ impl JaConnection {
let channel = self.create_subnamespace(&format!("{session_id}")).await;

let session = JaSession::new(self.clone(), channel, session_id, ka_interval).await;
self.safe
self.exclusive
.lock()
.await
.sessions
Expand All @@ -180,7 +180,7 @@ impl JaConnection {
});

self.send_request(request).await?;
let response = { self.safe.lock().await.receiver.recv().await.unwrap() };
let response = { self.exclusive.lock().await.receiver.recv().await.unwrap() };
Ok(response)
}

Expand All @@ -201,7 +201,7 @@ impl JaConnection {
None => root_namespace,
};

let mut guard = self.safe.lock().await;
let mut guard = self.exclusive.lock().await;
guard
.transaction_manager
.create_transaction(transaction, janus_request, &namespace);
Expand All @@ -216,7 +216,7 @@ impl JaConnection {
}

pub(crate) async fn create_subnamespace(&self, namespace: &str) -> mpsc::Receiver<JaResponse> {
self.safe
self.exclusive
.lock()
.await
.nsp_registry
Expand Down
10 changes: 5 additions & 5 deletions jarust/src/jahandle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ struct Shared {
abort_handle: AbortHandle,
}

struct SafeShared {
struct Exclusive {
ack_receiver: mpsc::Receiver<JaResponse>,
}

pub struct InnerHandle {
shared: Shared,
safe: Mutex<SafeShared>,
exclusive: Mutex<Exclusive>,
}

#[derive(Clone)]
Expand Down Expand Up @@ -76,12 +76,12 @@ impl JaHandle {
session,
abort_handle: join_handle.abort_handle(),
};
let safe = SafeShared { ack_receiver };
let safe = Exclusive { ack_receiver };

(
Self(Arc::new(InnerHandle {
shared,
safe: Mutex::new(safe),
exclusive: Mutex::new(safe),
})),
event_receiver,
)
Expand Down Expand Up @@ -109,7 +109,7 @@ impl JaHandle {
});
self.send_request(request).await?;
let response = {
let mut guard = self.safe.lock().await;
let mut guard = self.exclusive.lock().await;
guard.ack_receiver.recv().await.unwrap()
};
Ok(response)
Expand Down
18 changes: 9 additions & 9 deletions jarust/src/jasession.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ pub struct Shared {
connection: JaConnection,
}

pub struct SafeShared {
pub struct Exclusive {
receiver: mpsc::Receiver<JaResponse>,
handles: HashMap<u64, WeakJaHandle>,
abort_handle: Option<AbortHandle>,
}

pub struct InnerSession {
shared: Shared,
safe: Mutex<SafeShared>,
exclusive: Mutex<Exclusive>,
}

#[derive(Clone)]
Expand Down Expand Up @@ -61,15 +61,15 @@ impl JaSession {
ka_interval: u32,
) -> Self {
let shared = Shared { id, connection };
let safe = SafeShared {
let safe = Exclusive {
receiver,
handles: HashMap::new(),
abort_handle: None,
};

let session = Self(Arc::new(InnerSession {
shared,
safe: Mutex::new(safe),
exclusive: Mutex::new(safe),
}));

let this = session.clone();
Expand All @@ -78,7 +78,7 @@ impl JaSession {
let _ = this.keep_alive(ka_interval).await;
});

session.safe.lock().await.abort_handle = Some(join_handle.abort_handle());
session.exclusive.lock().await.abort_handle = Some(join_handle.abort_handle());

session
}
Expand All @@ -99,7 +99,7 @@ impl JaSession {
"janus": JaSessionRequestProtocol::KeepAlive,
}))
.await?;
self.safe.lock().await.receiver.recv().await.unwrap();
self.exclusive.lock().await.receiver.recv().await.unwrap();
log::trace!("keep-alive OK {{ id: {id} }}");
}
}
Expand All @@ -109,7 +109,7 @@ impl JaSession {
}
}

impl Drop for SafeShared {
impl Drop for Exclusive {
fn drop(&mut self) {
if let Some(join_handle) = self.abort_handle.take() {
log::trace!("Keepalive task aborted");
Expand All @@ -130,7 +130,7 @@ impl Attach for JaSession {

self.send_request(request).await?;
let response = {
let mut guard = self.safe.lock().await;
let mut guard = self.exclusive.lock().await;
guard.receiver.recv().await.unwrap()
};

Expand Down Expand Up @@ -158,7 +158,7 @@ impl Attach for JaSession {

let (handle, event_receiver) = JaHandle::new(self.clone(), receiver, handle_id);

self.safe
self.exclusive
.lock()
.await
.handles
Expand Down

0 comments on commit 710ef79

Please sign in to comment.