Skip to content

Commit

Permalink
refactor(core): rename TransportProtocol -> TransportSession & Tranps…
Browse files Browse the repository at this point in the history
…ort -> TransportProtocol
  • Loading branch information
Ghamza-Jd committed May 7, 2024
1 parent 22a970c commit 9aaa8b4
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 19 deletions.
17 changes: 10 additions & 7 deletions jarust/src/jaconnection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use crate::jasession::WeakJaSession;
use crate::jatask;
use crate::prelude::*;
use crate::tmanager::TransactionManager;
use crate::transport::trans::Transport;
use crate::transport::trans::TransportProtocol;
use crate::transport::trans::TransportSession;
use jatask::AbortHandle;
use serde_json::json;
use serde_json::Value;
Expand All @@ -31,7 +31,7 @@ struct Shared {
#[derive(Debug)]
struct Exclusive {
router: JaRouter,
transport_protocol: TransportProtocol,
transport_session: TransportSession,
receiver: JaResponseStream,
sessions: HashMap<u64, WeakJaSession>,
transaction_manager: TransactionManager,
Expand All @@ -49,12 +49,15 @@ pub struct JaConnection {
}

impl JaConnection {
pub(crate) async fn open(config: JaConfig, transport: impl Transport) -> JaResult<Self> {
pub(crate) async fn open(
config: JaConfig,
transport: impl TransportProtocol,
) -> JaResult<Self> {
let (router, root_channel) = JaRouter::new(&config.namespace).await;
let transaction_manager = TransactionManager::new(32);

let (transport_protocol, receiver) =
TransportProtocol::connect(transport, &config.uri).await?;
let (transport_session, receiver) =
TransportSession::connect(transport, &config.uri).await?;

let demux_abort_handle = jatask::spawn({
let router = router.clone();
Expand All @@ -68,7 +71,7 @@ impl JaConnection {
};
let safe = Exclusive {
router,
transport_protocol,
transport_session,
receiver: root_channel,
sessions: HashMap::new(),
transaction_manager,
Expand Down Expand Up @@ -151,7 +154,7 @@ impl JaConnection {
.create_transaction(transaction, &path)
.await;
tracing::debug!("Sending {message}");
guard.transport_protocol.send(message.as_bytes()).await?;
guard.transport_session.send(message.as_bytes()).await?;
Ok(transaction.into())
}

Expand Down
4 changes: 2 additions & 2 deletions jarust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ mod demuxer;
mod jarouter;
mod tmanager;

use crate::transport::trans::Transport;
use crate::transport::trans::TransportProtocol;
use jaconfig::JaConfig;
use jaconfig::TransportType;
use jaconnection::JaConnection;
Expand Down Expand Up @@ -56,7 +56,7 @@ pub async fn connect(jaconfig: JaConfig, transport_type: TransportType) -> JaRes
#[tracing::instrument(level = Level::TRACE)]
pub async fn connect_with_transport(
jaconfig: JaConfig,
transport: impl Transport,
transport: impl TransportProtocol,
) -> JaResult<JaConnection> {
tracing::info!("Creating new connection");
JaConnection::open(jaconfig, transport).await
Expand Down
10 changes: 5 additions & 5 deletions jarust/src/transport/trans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tokio::sync::mpsc;
pub type MessageStream = mpsc::UnboundedReceiver<String>;

#[async_trait]
pub trait Transport: Debug + Send + Sync + 'static {
pub trait TransportProtocol: Debug + Send + Sync + 'static {
/// Creates a new transport
fn create_transport() -> Self
where
Expand All @@ -19,11 +19,11 @@ pub trait Transport: Debug + Send + Sync + 'static {
async fn send(&mut self, data: &[u8]) -> JaResult<()>;
}

pub struct TransportProtocol(Box<dyn Transport + Send + Sync>);
pub struct TransportSession(Box<dyn TransportProtocol + Send + Sync>);

impl TransportProtocol {
impl TransportSession {
pub async fn connect(
mut transport: impl Transport,
mut transport: impl TransportProtocol,
uri: &str,
) -> JaResult<(Self, MessageStream)> {
let rx = transport.connect(uri).await?;
Expand All @@ -36,7 +36,7 @@ impl TransportProtocol {
}
}

impl Debug for TransportProtocol {
impl Debug for TransportSession {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("TransportProtocol").finish()
}
Expand Down
4 changes: 2 additions & 2 deletions jarust/src/transport/web_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ compile_error!("Feature \"rustls\" and feature \"native-tls\" cannot be enabled
#[cfg(not(any(feature = "use-rustls", feature = "use-native-tls")))]
compile_error!("Either feature \"rustls\" or \"native-tls\" must be enabled for this crate");

use super::trans::Transport;
use super::trans::TransportProtocol;
use crate::jatask;
use crate::jatask::AbortHandle;
use crate::prelude::*;
Expand Down Expand Up @@ -40,7 +40,7 @@ pub struct WebsocketTransport {
}

#[async_trait]
impl Transport for WebsocketTransport {
impl TransportProtocol for WebsocketTransport {
fn create_transport() -> Self {
Self {
sender: None,
Expand Down
2 changes: 1 addition & 1 deletion jarust/tests/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use jarust::japrotocol::JaData;
use jarust::japrotocol::JaResponse;
use jarust::japrotocol::JaSuccessProtocol;
use jarust::japrotocol::ResponseType;
use jarust::transport::trans::Transport;
use jarust::transport::trans::TransportProtocol;

#[tokio::test]
async fn test_connection() {
Expand Down
4 changes: 2 additions & 2 deletions jarust/tests/mocks/mock_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use async_trait::async_trait;
use jarust::jatask;
use jarust::jatask::AbortHandle;
use jarust::prelude::*;
use jarust::transport::trans::Transport;
use jarust::transport::trans::TransportProtocol;
use std::fmt::Debug;
use tokio::sync::mpsc;

Expand All @@ -29,7 +29,7 @@ impl MockTransport {
}

#[async_trait]
impl Transport for MockTransport {
impl TransportProtocol for MockTransport {
fn create_transport() -> Self
where
Self: Sized,
Expand Down

0 comments on commit 9aaa8b4

Please sign in to comment.