Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat/make plugin #19

Merged
merged 3 commits into from
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[workspace]
resolver = "2"
members = ["jarust", "client"]
members = ["jarust", "jarust_make_plugin", "jarust_plugins", "client"]
1 change: 1 addition & 0 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = "2021"
[dependencies]
anyhow = "1.0.75"
jarust = { version = "*", path = "../jarust" }
jarust_plugins = { version = "*", path = "../jarust_plugins" }
log = "0.4.20"
tokio = { version = "1.34.0", features = ["time", "macros", "rt-multi-thread"] }
simple_logger = "4.3.0"
Expand Down
28 changes: 13 additions & 15 deletions client/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use jarust::jaconfig::JaConfig;
use jarust::jaconfig::TransportType;
use jarust::plugins::echotest::events::EchoTestPluginEvent;
use jarust::plugins::echotest::handle::EchoTest;
use jarust::plugins::echotest::messages::EchoTestStartMsg;
use jarust_plugins::echotest::events::EchoTestPluginEvent;
use jarust_plugins::echotest::messages::EchoTestStartMsg;
use jarust_plugins::echotest::EchoTest;
use log::LevelFilter;
use log::SetLoggerError;
use simple_logger::SimpleLogger;
Expand All @@ -12,17 +12,15 @@ async fn main() -> anyhow::Result<()> {
init_logger()?;

// To make sure handle is working even after dropping the session and the connection
let (handle, mut event_receiver) = {
let mut connection = jarust::connect(JaConfig::new(
"wss://janus.conf.meetecho.com/ws",
None,
TransportType::Wss,
"janus",
))
.await?;
let session = connection.create(10).await?;
session.attach_echotest().await?
};
let mut connection = jarust::connect(JaConfig::new(
"wss://janus.conf.meetecho.com/ws",
None,
TransportType::Wss,
"janus",
))
.await?;
let session = connection.create(10).await?;
let (handle, mut event_receiver) = session.attach_echo_test().await?;

handle
.start(EchoTestStartMsg {
Expand All @@ -32,7 +30,7 @@ async fn main() -> anyhow::Result<()> {
.await?;

while let Some(event) = event_receiver.recv().await {
match event.event {
match event {
EchoTestPluginEvent::Result { result, .. } => {
log::info!("result: {result}");
}
Expand Down
7 changes: 3 additions & 4 deletions jarust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ name = "jarust"
version = "0.1.0"
edition = "2021"

[lib]
doctest = false

[dependencies]
async-trait = "0.1.75"
futures-util = "0.3.29"
Expand All @@ -16,7 +19,3 @@ serde_json = "1.0.108"

[dev-dependencies]
tokio = { version = "1.35.1", features = ["macros"] }

[features]
default = ["echotest"]
echotest = []
2 changes: 1 addition & 1 deletion jarust/src/jaconfig.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pub(crate) const CHANNEL_BUFFER_SIZE: usize = 32;
pub const CHANNEL_BUFFER_SIZE: usize = 32;

#[derive(Debug)]
pub struct JaConfig {
Expand Down
14 changes: 8 additions & 6 deletions jarust/src/jaconnection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ use crate::utils::get_subnamespace_from_response;
use serde_json::json;
use serde_json::Value;
use std::collections::HashMap;
use std::ops::Deref;
use std::ops::DerefMut;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio::task::AbortHandle;

struct Shared {
demux_join_handle: JoinHandle<JaResult<()>>,
demux_abort_handle: AbortHandle,
config: JaConfig,
}

Expand All @@ -41,15 +43,15 @@ pub struct InnerConnection {
#[derive(Clone)]
pub struct JaConnection(Arc<InnerConnection>);

impl std::ops::Deref for JaConnection {
impl Deref for JaConnection {
type Target = Arc<InnerConnection>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl std::ops::DerefMut for JaConnection {
impl DerefMut for JaConnection {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
Expand Down Expand Up @@ -115,7 +117,7 @@ impl JaConnection {
});

let shared = Shared {
demux_join_handle,
demux_abort_handle: demux_join_handle.abort_handle(),
config,
};
let safe = SafeShared {
Expand Down Expand Up @@ -227,6 +229,6 @@ impl JaConnection {
impl Drop for InnerConnection {
fn drop(&mut self) {
log::trace!("Connection dropped");
self.shared.demux_join_handle.abort();
self.shared.demux_abort_handle.abort();
}
}
11 changes: 6 additions & 5 deletions jarust/src/jahandle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,17 @@ use crate::jasession::JaSession;
use crate::prelude::*;
use serde_json::json;
use serde_json::Value;
use std::ops::Deref;
use std::sync::Arc;
use std::sync::Weak;
use tokio::sync::mpsc;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio::task::AbortHandle;

struct Shared {
id: u64,
session: JaSession,
join_handle: JoinHandle<()>,
abort_handle: AbortHandle,
}

struct SafeShared {
Expand All @@ -39,7 +40,7 @@ impl WeakJaHandle {
}
}

impl std::ops::Deref for JaHandle {
impl Deref for JaHandle {
type Target = Arc<InnerHandle>;

fn deref(&self) -> &Self::Target {
Expand Down Expand Up @@ -73,7 +74,7 @@ impl JaHandle {
let shared = Shared {
id,
session,
join_handle,
abort_handle: join_handle.abort_handle(),
};
let safe = SafeShared { ack_receiver };

Expand Down Expand Up @@ -132,6 +133,6 @@ impl JaHandle {
impl Drop for InnerHandle {
fn drop(&mut self) {
log::trace!("Dropping handle {{ id: {} }}", self.shared.id);
self.shared.join_handle.abort();
self.shared.abort_handle.abort();
}
}
16 changes: 16 additions & 0 deletions jarust/src/japlugin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use crate::japrotocol::JaResponse;
use crate::prelude::JaHandle;
use crate::prelude::JaResult;
use async_trait::async_trait;
use tokio::sync::mpsc;
use tokio::task::AbortHandle;

pub trait PluginTask {
fn assign_abort(&mut self, abort_handle: AbortHandle);
fn abort_plugin(&mut self);
}

#[async_trait]
pub trait Attach {
async fn attach(&self, plugin_id: &str) -> JaResult<(JaHandle, mpsc::Receiver<JaResponse>)>;
}
88 changes: 45 additions & 43 deletions jarust/src/jasession.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@ use crate::japrotocol::JaResponse;
use crate::japrotocol::JaResponseProtocol;
use crate::japrotocol::JaSessionRequestProtocol;
use crate::prelude::*;
use async_trait::async_trait;
use serde_json::json;
use serde_json::Value;
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::Arc;
use std::sync::Weak;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio::task::AbortHandle;
use tokio::time;

pub struct Shared {
Expand All @@ -24,7 +26,7 @@ pub struct Shared {
pub struct SafeShared {
receiver: mpsc::Receiver<JaResponse>,
handles: HashMap<u64, WeakJaHandle>,
join_handle: Option<JoinHandle<()>>,
abort_handle: Option<AbortHandle>,
}

pub struct InnerSession {
Expand All @@ -43,7 +45,7 @@ impl WeakJaSession {
}
}

impl std::ops::Deref for JaSession {
impl Deref for JaSession {
type Target = Arc<InnerSession>;

fn deref(&self) -> &Self::Target {
Expand All @@ -62,7 +64,7 @@ impl JaSession {
let safe = SafeShared {
receiver,
handles: HashMap::new(),
join_handle: None,
abort_handle: None,
};

let session = Self(Arc::new(InnerSession {
Expand All @@ -76,15 +78,49 @@ impl JaSession {
let _ = this.keep_alive(ka_interval).await;
});

session.safe.lock().await.join_handle = Some(join_handle);
session.safe.lock().await.abort_handle = Some(join_handle.abort_handle());

session
}

pub async fn attach(
&self,
plugin_id: &str,
) -> JaResult<(JaHandle, mpsc::Receiver<JaResponse>)> {
pub(crate) async fn send_request(&self, mut request: Value) -> JaResult<()> {
let mut connection = self.shared.connection.clone();
request["session_id"] = self.shared.id.into();
connection.send_request(request).await
}

async fn keep_alive(self, ka_interval: u32) -> JaResult<()> {
let mut interval = time::interval(Duration::from_secs(ka_interval.into()));
let id = { self.shared.id };
loop {
interval.tick().await;
log::trace!("Sending keep-alive {{ id: {id}, timeout: {ka_interval}s }}");
self.send_request(json!({
"janus": JaSessionRequestProtocol::KeepAlive,
}))
.await?;
self.safe.lock().await.receiver.recv().await.unwrap();
log::trace!("keep-alive OK {{ id: {id} }}");
}
}

pub(crate) fn downgrade(&self) -> WeakJaSession {
WeakJaSession(Arc::downgrade(self))
}
}

impl Drop for SafeShared {
fn drop(&mut self) {
if let Some(join_handle) = self.abort_handle.take() {
log::trace!("Keepalive task aborted");
join_handle.abort();
}
}
}

#[async_trait]
impl Attach for JaSession {
async fn attach(&self, plugin_id: &str) -> JaResult<(JaHandle, mpsc::Receiver<JaResponse>)> {
log::info!("Attaching new handle {{ id: {} }}", self.shared.id);

let request = json!({
Expand Down Expand Up @@ -132,38 +168,4 @@ impl JaSession {

Ok((handle, event_receiver))
}

pub(crate) async fn send_request(&self, mut request: Value) -> JaResult<()> {
let mut connection = self.shared.connection.clone();
request["session_id"] = self.shared.id.into();
connection.send_request(request).await
}

async fn keep_alive(self, ka_interval: u32) -> JaResult<()> {
let mut interval = time::interval(Duration::from_secs(ka_interval.into()));
let id = { self.shared.id };
loop {
interval.tick().await;
log::trace!("Sending keep-alive {{ id: {id}, timeout: {ka_interval}s }}");
self.send_request(json!({
"janus": JaSessionRequestProtocol::KeepAlive,
}))
.await?;
self.safe.lock().await.receiver.recv().await.unwrap();
log::trace!("keep-alive OK {{ id: {id} }}");
}
}

pub(crate) fn downgrade(&self) -> WeakJaSession {
WeakJaSession(Arc::downgrade(self))
}
}

impl Drop for SafeShared {
fn drop(&mut self) {
if let Some(join_handle) = self.join_handle.take() {
log::trace!("Keepalive task aborted");
join_handle.abort();
}
}
}
6 changes: 3 additions & 3 deletions jarust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ use jaconnection::JaConnection;
use prelude::JaResult;

pub mod jaconfig;
pub mod jahandle;
pub mod japlugin;
pub mod japrotocol;
pub mod plugins;
pub mod jasession;
pub mod prelude;
pub mod transport;

mod error;
mod jaconnection;
mod jahandle;
mod jasession;
mod nsp_registry;
mod tmanager;
mod utils;
Expand Down
6 changes: 4 additions & 2 deletions jarust/src/nsp_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use crate::jaconfig::CHANNEL_BUFFER_SIZE;
use crate::japrotocol::JaResponse;
use crate::prelude::*;
use std::collections::HashMap;
use std::ops::Deref;
use std::ops::DerefMut;
use std::sync::Arc;
use std::sync::RwLock;
use tokio::sync::mpsc;
Expand All @@ -13,15 +15,15 @@ pub(crate) struct Inner {
#[derive(Clone)]
pub(crate) struct NamespaceRegistry(Arc<RwLock<Inner>>);

impl std::ops::Deref for NamespaceRegistry {
impl Deref for NamespaceRegistry {
type Target = Arc<RwLock<Inner>>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl std::ops::DerefMut for NamespaceRegistry {
impl DerefMut for NamespaceRegistry {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
Expand Down
Loading
Loading