Skip to content

Commit

Permalink
feat: added static channel sizes
Browse files Browse the repository at this point in the history
  • Loading branch information
Ghamza-Jd committed Dec 30, 2023
1 parent 1f955b2 commit 670a001
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 9 deletions.
10 changes: 6 additions & 4 deletions jarust/src/jaconfig.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
pub(crate) const CHANNEL_BUFFER_SIZE: usize = 32;

#[derive(Debug)]
pub struct JaConfig {
pub uri: String,
pub apisecret: Option<String>,
pub transport_type: TransportType,
pub root_namespace: String,
pub(crate) uri: String,
pub(crate) apisecret: Option<String>,
pub(crate) transport_type: TransportType,
pub(crate) root_namespace: String,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down
5 changes: 3 additions & 2 deletions jarust/src/jahandle.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::jaconfig::CHANNEL_BUFFER_SIZE;
use crate::japrotocol::JaHandleRequestProtocol;
use crate::japrotocol::JaResponse;
use crate::japrotocol::JaResponseProtocol;
Expand Down Expand Up @@ -52,8 +53,8 @@ impl JaHandle {
mut receiver: mpsc::Receiver<JaResponse>,
id: u64,
) -> (Self, mpsc::Receiver<JaResponse>) {
let (ack_sender, ack_receiver) = mpsc::channel(100);
let (event_sender, event_receiver) = mpsc::channel(100);
let (ack_sender, ack_receiver) = mpsc::channel(CHANNEL_BUFFER_SIZE);
let (event_sender, event_receiver) = mpsc::channel(CHANNEL_BUFFER_SIZE);

let join_handle = tokio::spawn(async move {
while let Some(item) = receiver.recv().await {
Expand Down
3 changes: 2 additions & 1 deletion jarust/src/nsp_registry.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::jaconfig::CHANNEL_BUFFER_SIZE;
use crate::japrotocol::JaResponse;
use crate::prelude::*;
use std::collections::HashMap;
Expand Down Expand Up @@ -34,7 +35,7 @@ impl NamespaceRegistry {
}

pub(crate) fn create_namespace(&mut self, namespace: &str) -> mpsc::Receiver<JaResponse> {
let (tx, rx) = mpsc::channel(10);
let (tx, rx) = mpsc::channel(CHANNEL_BUFFER_SIZE);
{
self.write()
.unwrap()
Expand Down
3 changes: 2 additions & 1 deletion jarust/src/plugins/echotest/handle.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::events::EchoTestPluginData;
use super::messages::EchoTestStartMsg;
use crate::jaconfig::CHANNEL_BUFFER_SIZE;
use crate::jahandle::JaHandle;
use crate::japrotocol::JaEventProtocol;
use crate::japrotocol::JaResponseProtocol;
Expand Down Expand Up @@ -33,7 +34,7 @@ impl EchoTest for JaSession {
&self,
) -> JaResult<(EchoTestHandle, mpsc::Receiver<EchoTestPluginData>)> {
let (handle, mut receiver) = self.attach(PLUGIN_ID).await?;
let (tx, rx) = mpsc::channel(100);
let (tx, rx) = mpsc::channel(CHANNEL_BUFFER_SIZE);
tokio::spawn(async move {
while let Some(msg) = receiver.recv().await {
let msg = match msg.janus {
Expand Down
3 changes: 2 additions & 1 deletion jarust/src/transport/wss.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::trans::Transport;
use crate::jaconfig::CHANNEL_BUFFER_SIZE;
use crate::prelude::*;
use async_trait::async_trait;
use futures_util::stream::SplitSink;
Expand Down Expand Up @@ -35,7 +36,7 @@ impl Transport for WebsocketTransport {
headers.insert("Sec-Websocket-Protocol", "janus-protocol".parse()?);
let (stream, _) = connect_async(request).await?;
let (sender, mut receiver) = stream.split();
let (tx, rx) = mpsc::channel(32);
let (tx, rx) = mpsc::channel(CHANNEL_BUFFER_SIZE);

let forward_join_handle = tokio::spawn(async move {
while let Some(Ok(message)) = receiver.next().await {
Expand Down

0 comments on commit 670a001

Please sign in to comment.