Skip to content

Commit

Permalink
refactor: use unbounded channels
Browse files Browse the repository at this point in the history
  • Loading branch information
aGFteg committed Apr 25, 2024
1 parent d207911 commit 90d48c2
Show file tree
Hide file tree
Showing 9 changed files with 20 additions and 27 deletions.
2 changes: 0 additions & 2 deletions jarust/src/jaconfig.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
pub const BUFFER_SIZE: usize = 32;

#[derive(Debug)]
pub struct JaConfig {
pub(crate) uri: String,
Expand Down
4 changes: 2 additions & 2 deletions jarust/src/jaconnection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::Mutex;

pub type JaResponseStream = mpsc::Receiver<JaResponse>;
pub type JaResponseStream = mpsc::UnboundedReceiver<JaResponse>;

#[derive(Debug)]
struct Shared {
Expand Down Expand Up @@ -51,7 +51,7 @@ pub struct JaConnection {
impl JaConnection {
pub(crate) async fn open(config: JaConfig, transport: impl Transport) -> JaResult<Self> {
let (router, root_channel) = JaRouter::new(&config.namespace).await;
let transaction_manager = TransactionManager::new(BUFFER_SIZE);
let transaction_manager = TransactionManager::new(32);

let (transport_protocol, receiver) =
TransportProtocol::connect(transport, &config.uri).await?;
Expand Down
8 changes: 3 additions & 5 deletions jarust/src/jahandle.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::jaconfig::BUFFER_SIZE;
use crate::japrotocol::EstablishmentProtocol;
use crate::japrotocol::JaHandleRequestProtocol;
use crate::japrotocol::JaResponse;
Expand Down Expand Up @@ -44,7 +43,7 @@ impl JaHandle {
inbound_stream: JaResponseStream,
ack_map: Arc<UnboundedNapMap<String, JaResponse>>,
result_map: Arc<UnboundedNapMap<String, JaResponse>>,
event_sender: mpsc::Sender<JaResponse>,
event_sender: mpsc::UnboundedSender<JaResponse>,
) {
let mut stream = inbound_stream;
while let Some(item) = stream.recv().await {
Expand All @@ -55,7 +54,7 @@ impl JaHandle {
}
}
JaResponseProtocol::Event { .. } => {
event_sender.send(item).await.expect("Event channel closed");
event_sender.send(item).expect("Event channel closed");
}
JaResponseProtocol::Success(JaSuccessProtocol::Plugin { .. }) => {
if let Some(transaction) = item.transaction.clone() {
Expand All @@ -65,7 +64,6 @@ impl JaHandle {
JaResponseProtocol::Error { .. } => {
event_sender
.send(item)
.await
.expect("Result channel closed");
}
_ => {}
Expand All @@ -78,7 +76,7 @@ impl JaHandle {
receiver: JaResponseStream,
id: u64,
) -> (Self, JaResponseStream) {
let (event_sender, event_receiver) = mpsc::channel(BUFFER_SIZE);
let (event_sender, event_receiver) = mpsc::unbounded_channel();

let ack_map = Arc::new(napmap::unbounded::<String, JaResponse>());
let result_map = Arc::new(napmap::unbounded::<String, JaResponse>());
Expand Down
7 changes: 3 additions & 4 deletions jarust/src/jarouter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::jaconfig::BUFFER_SIZE;
use crate::japrotocol::JaResponse;
use crate::prelude::*;
use serde_json::Value;
Expand All @@ -14,7 +13,7 @@ pub struct Shared {

#[derive(Debug)]
pub struct Exclusive {
routes: HashMap<String, mpsc::Sender<JaResponse>>,
routes: HashMap<String, mpsc::UnboundedSender<JaResponse>>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -49,7 +48,7 @@ impl JaRouter {

#[tracing::instrument(level = tracing::Level::TRACE, skip(self))]
async fn make_route(&mut self, path: &str) -> JaResponseStream {
let (tx, rx) = mpsc::channel(BUFFER_SIZE);
let (tx, rx) = mpsc::unbounded_channel();
{
self.inner
.exclusive
Expand Down Expand Up @@ -79,7 +78,7 @@ impl JaRouter {
guard.routes.get(path).cloned()
};
if let Some(channel) = channel {
if channel.send(message.clone()).await.is_err() {
if channel.send(message.clone()).is_err() {
return Err(JaError::SendError);
}
}
Expand Down
1 change: 0 additions & 1 deletion jarust/src/prelude.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
pub use crate::error::JaError;
pub use crate::jaconfig::BUFFER_SIZE;
pub use crate::jaconnection::JaResponseStream;
pub use crate::jahandle::JaHandle;
pub use crate::japlugin::Attach;
Expand Down
2 changes: 1 addition & 1 deletion jarust/src/transport/trans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use async_trait::async_trait;
use std::fmt::Debug;
use tokio::sync::mpsc;

pub type MessageStream = mpsc::Receiver<String>;
pub type MessageStream = mpsc::UnboundedReceiver<String>;

#[async_trait]
pub trait Transport: Debug + Send + Sync + 'static {
Expand Down
7 changes: 3 additions & 4 deletions jarust/src/transport/web_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ compile_error!("Feature \"rustls\" and feature \"native-tls\" cannot be enabled
compile_error!("Either feature \"rustls\" or \"native-tls\" must be enabled for this crate");

use super::trans::Transport;
use crate::jaconfig::BUFFER_SIZE;
use crate::jatask;
use crate::jatask::AbortHandle;
use crate::prelude::*;
Expand Down Expand Up @@ -56,12 +55,12 @@ impl Transport for WebsocketTransport {
let stream = Self::connect_async(request).await?;

let (sender, mut receiver) = stream.split();
let (tx, rx) = mpsc::channel(BUFFER_SIZE);
let (tx, rx) = mpsc::unbounded_channel();

let abort_handle = jatask::spawn(async move {
while let Some(Ok(message)) = receiver.next().await {
if let Message::Text(text) = message {
let _ = tx.send(text).await;
let _ = tx.send(text);
}
}
});
Expand Down Expand Up @@ -109,7 +108,7 @@ impl WebsocketTransport {
#[cfg(feature = "use-native-tls")]
{
let (stream, ..) = connect_async_with_config(request, None, false).await?;
return Ok(stream);
Ok(stream)
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions jarust/tests/mocks/mock_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ use std::fmt::Debug;
use tokio::sync::mpsc;

pub struct MockServer {
tx: mpsc::Sender<String>,
tx: mpsc::UnboundedSender<String>,
}

impl MockServer {
pub async fn mock_send_to_client(&self, msg: &str) {
self.tx.send(msg.to_string()).await.unwrap();
self.tx.send(msg.to_string()).unwrap();
}
}

Expand All @@ -34,7 +34,7 @@ impl Transport for MockTransport {
where
Self: Sized,
{
let (tx, rx) = mpsc::channel(32);
let (tx, rx) = mpsc::unbounded_channel();
Self {
rx: Some(rx),
server: Some(MockServer { tx }),
Expand All @@ -43,12 +43,12 @@ impl Transport for MockTransport {
}

async fn connect(&mut self, _: &str) -> JaResult<MessageStream> {
let (tx, rx) = mpsc::channel(32);
let (tx, rx) = mpsc::unbounded_channel();

if let Some(mut receiver) = self.rx.take() {
let abort_handle = jatask::spawn(async move {
while let Some(msg) = receiver.recv().await {
tx.send(msg).await.unwrap();
tx.send(msg).unwrap();
}
});
self.abort_handle = Some(abort_handle);
Expand Down
6 changes: 3 additions & 3 deletions jarust_make_plugin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ pub fn make_plugin(input: TokenStream) -> TokenStream {

async fn #attach_fn_name(
&self,
) -> JaResult<(Self::Handle, tokio::sync::mpsc::Receiver<Self::Event>)> {
) -> JaResult<(Self::Handle, tokio::sync::mpsc::UnboundedReceiver<Self::Event>)> {
let (handle, mut receiver) = self.attach(#id).await?;
let (tx, rx) = tokio::sync::mpsc::channel(BUFFER_SIZE);
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let abort_handle = jatask::spawn(async move {
while let Some(msg) = receiver.recv().await {
let msg = Self::#parse_fn_name(msg)?;
let _ = tx.send(msg).await;
let _ = tx.send(msg);
}
Ok::<(), JaError>(())
});
Expand Down

0 comments on commit 90d48c2

Please sign in to comment.