diff --git a/jarust/src/jaconfig.rs b/jarust/src/jaconfig.rs index a706cf1..4bedd29 100644 --- a/jarust/src/jaconfig.rs +++ b/jarust/src/jaconfig.rs @@ -1,5 +1,3 @@ -pub const BUFFER_SIZE: usize = 32; - #[derive(Debug)] pub struct JaConfig { pub(crate) uri: String, diff --git a/jarust/src/jaconnection.rs b/jarust/src/jaconnection.rs index aa3f463..e115123 100644 --- a/jarust/src/jaconnection.rs +++ b/jarust/src/jaconnection.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use tokio::sync::mpsc; use tokio::sync::Mutex; -pub type JaResponseStream = mpsc::Receiver; +pub type JaResponseStream = mpsc::UnboundedReceiver; #[derive(Debug)] struct Shared { @@ -51,7 +51,7 @@ pub struct JaConnection { impl JaConnection { pub(crate) async fn open(config: JaConfig, transport: impl Transport) -> JaResult { let (router, root_channel) = JaRouter::new(&config.namespace).await; - let transaction_manager = TransactionManager::new(BUFFER_SIZE); + let transaction_manager = TransactionManager::new(32); let (transport_protocol, receiver) = TransportProtocol::connect(transport, &config.uri).await?; diff --git a/jarust/src/jahandle.rs b/jarust/src/jahandle.rs index 765f95e..5dc56ec 100644 --- a/jarust/src/jahandle.rs +++ b/jarust/src/jahandle.rs @@ -1,4 +1,3 @@ -use crate::jaconfig::BUFFER_SIZE; use crate::japrotocol::EstablishmentProtocol; use crate::japrotocol::JaHandleRequestProtocol; use crate::japrotocol::JaResponse; @@ -44,7 +43,7 @@ impl JaHandle { inbound_stream: JaResponseStream, ack_map: Arc>, result_map: Arc>, - event_sender: mpsc::Sender, + event_sender: mpsc::UnboundedSender, ) { let mut stream = inbound_stream; while let Some(item) = stream.recv().await { @@ -55,7 +54,7 @@ impl JaHandle { } } JaResponseProtocol::Event { .. } => { - event_sender.send(item).await.expect("Event channel closed"); + event_sender.send(item).expect("Event channel closed"); } JaResponseProtocol::Success(JaSuccessProtocol::Plugin { .. }) => { if let Some(transaction) = item.transaction.clone() { @@ -65,7 +64,6 @@ impl JaHandle { JaResponseProtocol::Error { .. } => { event_sender .send(item) - .await .expect("Result channel closed"); } _ => {} @@ -78,7 +76,7 @@ impl JaHandle { receiver: JaResponseStream, id: u64, ) -> (Self, JaResponseStream) { - let (event_sender, event_receiver) = mpsc::channel(BUFFER_SIZE); + let (event_sender, event_receiver) = mpsc::unbounded_channel(); let ack_map = Arc::new(napmap::unbounded::()); let result_map = Arc::new(napmap::unbounded::()); diff --git a/jarust/src/jarouter.rs b/jarust/src/jarouter.rs index c18eed3..a09c9d6 100644 --- a/jarust/src/jarouter.rs +++ b/jarust/src/jarouter.rs @@ -1,4 +1,3 @@ -use crate::jaconfig::BUFFER_SIZE; use crate::japrotocol::JaResponse; use crate::prelude::*; use serde_json::Value; @@ -14,7 +13,7 @@ pub struct Shared { #[derive(Debug)] pub struct Exclusive { - routes: HashMap>, + routes: HashMap>, } #[derive(Debug)] @@ -49,7 +48,7 @@ impl JaRouter { #[tracing::instrument(level = tracing::Level::TRACE, skip(self))] async fn make_route(&mut self, path: &str) -> JaResponseStream { - let (tx, rx) = mpsc::channel(BUFFER_SIZE); + let (tx, rx) = mpsc::unbounded_channel(); { self.inner .exclusive @@ -79,7 +78,7 @@ impl JaRouter { guard.routes.get(path).cloned() }; if let Some(channel) = channel { - if channel.send(message.clone()).await.is_err() { + if channel.send(message.clone()).is_err() { return Err(JaError::SendError); } } diff --git a/jarust/src/prelude.rs b/jarust/src/prelude.rs index 1940484..66e624f 100644 --- a/jarust/src/prelude.rs +++ b/jarust/src/prelude.rs @@ -1,5 +1,4 @@ pub use crate::error::JaError; -pub use crate::jaconfig::BUFFER_SIZE; pub use crate::jaconnection::JaResponseStream; pub use crate::jahandle::JaHandle; pub use crate::japlugin::Attach; diff --git a/jarust/src/transport/trans.rs b/jarust/src/transport/trans.rs index 5a60aa3..ce766a4 100644 --- a/jarust/src/transport/trans.rs +++ b/jarust/src/transport/trans.rs @@ -3,7 +3,7 @@ use async_trait::async_trait; use std::fmt::Debug; use tokio::sync::mpsc; -pub type MessageStream = mpsc::Receiver; +pub type MessageStream = mpsc::UnboundedReceiver; #[async_trait] pub trait Transport: Debug + Send + Sync + 'static { diff --git a/jarust/src/transport/web_socket.rs b/jarust/src/transport/web_socket.rs index f9a247d..d5a509d 100644 --- a/jarust/src/transport/web_socket.rs +++ b/jarust/src/transport/web_socket.rs @@ -5,7 +5,6 @@ compile_error!("Feature \"rustls\" and feature \"native-tls\" cannot be enabled compile_error!("Either feature \"rustls\" or \"native-tls\" must be enabled for this crate"); use super::trans::Transport; -use crate::jaconfig::BUFFER_SIZE; use crate::jatask; use crate::jatask::AbortHandle; use crate::prelude::*; @@ -56,12 +55,12 @@ impl Transport for WebsocketTransport { let stream = Self::connect_async(request).await?; let (sender, mut receiver) = stream.split(); - let (tx, rx) = mpsc::channel(BUFFER_SIZE); + let (tx, rx) = mpsc::unbounded_channel(); let abort_handle = jatask::spawn(async move { while let Some(Ok(message)) = receiver.next().await { if let Message::Text(text) = message { - let _ = tx.send(text).await; + let _ = tx.send(text); } } }); @@ -109,7 +108,7 @@ impl WebsocketTransport { #[cfg(feature = "use-native-tls")] { let (stream, ..) = connect_async_with_config(request, None, false).await?; - return Ok(stream); + Ok(stream) } } } diff --git a/jarust/tests/mocks/mock_transport.rs b/jarust/tests/mocks/mock_transport.rs index 6ee8044..2a92d86 100644 --- a/jarust/tests/mocks/mock_transport.rs +++ b/jarust/tests/mocks/mock_transport.rs @@ -7,12 +7,12 @@ use std::fmt::Debug; use tokio::sync::mpsc; pub struct MockServer { - tx: mpsc::Sender, + tx: mpsc::UnboundedSender, } impl MockServer { pub async fn mock_send_to_client(&self, msg: &str) { - self.tx.send(msg.to_string()).await.unwrap(); + self.tx.send(msg.to_string()).unwrap(); } } @@ -34,7 +34,7 @@ impl Transport for MockTransport { where Self: Sized, { - let (tx, rx) = mpsc::channel(32); + let (tx, rx) = mpsc::unbounded_channel(); Self { rx: Some(rx), server: Some(MockServer { tx }), @@ -43,12 +43,12 @@ impl Transport for MockTransport { } async fn connect(&mut self, _: &str) -> JaResult { - let (tx, rx) = mpsc::channel(32); + let (tx, rx) = mpsc::unbounded_channel(); if let Some(mut receiver) = self.rx.take() { let abort_handle = jatask::spawn(async move { while let Some(msg) = receiver.recv().await { - tx.send(msg).await.unwrap(); + tx.send(msg).unwrap(); } }); self.abort_handle = Some(abort_handle); diff --git a/jarust_make_plugin/src/lib.rs b/jarust_make_plugin/src/lib.rs index b785898..7fc308a 100644 --- a/jarust_make_plugin/src/lib.rs +++ b/jarust_make_plugin/src/lib.rs @@ -47,13 +47,13 @@ pub fn make_plugin(input: TokenStream) -> TokenStream { async fn #attach_fn_name( &self, - ) -> JaResult<(Self::Handle, tokio::sync::mpsc::Receiver)> { + ) -> JaResult<(Self::Handle, tokio::sync::mpsc::UnboundedReceiver)> { let (handle, mut receiver) = self.attach(#id).await?; - let (tx, rx) = tokio::sync::mpsc::channel(BUFFER_SIZE); + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); let abort_handle = jatask::spawn(async move { while let Some(msg) = receiver.recv().await { let msg = Self::#parse_fn_name(msg)?; - let _ = tx.send(msg).await; + let _ = tx.send(msg); } Ok::<(), JaError>(()) });