Skip to content

Commit

Permalink
feat: added plugable api
Browse files Browse the repository at this point in the history
  • Loading branch information
Ghamza-Jd committed Jan 2, 2024
1 parent 670a001 commit e28e58a
Show file tree
Hide file tree
Showing 24 changed files with 284 additions and 145 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[workspace]
resolver = "2"
members = ["jarust", "client"]
members = ["jarust", "jarust_make_plugin", "jarust_plugins", "client"]
1 change: 1 addition & 0 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = "2021"
[dependencies]
anyhow = "1.0.75"
jarust = { version = "*", path = "../jarust" }
jarust_plugins = { version = "*", path = "../jarust_plugins" }
log = "0.4.20"
tokio = { version = "1.34.0", features = ["time", "macros", "rt-multi-thread"] }
simple_logger = "4.3.0"
Expand Down
28 changes: 13 additions & 15 deletions client/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use jarust::jaconfig::JaConfig;
use jarust::jaconfig::TransportType;
use jarust::plugins::echotest::events::EchoTestPluginEvent;
use jarust::plugins::echotest::handle::EchoTest;
use jarust::plugins::echotest::messages::EchoTestStartMsg;
use jarust_plugins::echotest::events::EchoTestPluginEvent;
use jarust_plugins::echotest::messages::EchoTestStartMsg;
use jarust_plugins::echotest::EchoTest;
use log::LevelFilter;
use log::SetLoggerError;
use simple_logger::SimpleLogger;
Expand All @@ -12,17 +12,15 @@ async fn main() -> anyhow::Result<()> {
init_logger()?;

// To make sure handle is working even after dropping the session and the connection
let (handle, mut event_receiver) = {
let mut connection = jarust::connect(JaConfig::new(
"wss://janus.conf.meetecho.com/ws",
None,
TransportType::Wss,
"janus",
))
.await?;
let session = connection.create(10).await?;
session.attach_echotest().await?
};
let mut connection = jarust::connect(JaConfig::new(
"wss://janus.conf.meetecho.com/ws",
None,
TransportType::Wss,
"janus",
))
.await?;
let session = connection.create(10).await?;
let (handle, mut event_receiver) = session.attach_echo_test().await?;

handle
.start(EchoTestStartMsg {
Expand All @@ -32,7 +30,7 @@ async fn main() -> anyhow::Result<()> {
.await?;

while let Some(event) = event_receiver.recv().await {
match event.event {
match event {
EchoTestPluginEvent::Result { result, .. } => {
log::info!("result: {result}");
}
Expand Down
4 changes: 0 additions & 4 deletions jarust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,3 @@ serde_json = "1.0.108"

[dev-dependencies]
tokio = { version = "1.35.1", features = ["macros"] }

[features]
default = ["echotest"]
echotest = []
2 changes: 1 addition & 1 deletion jarust/src/jaconfig.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pub(crate) const CHANNEL_BUFFER_SIZE: usize = 32;
pub const CHANNEL_BUFFER_SIZE: usize = 32;

#[derive(Debug)]
pub struct JaConfig {
Expand Down
6 changes: 4 additions & 2 deletions jarust/src/jaconnection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use crate::utils::get_subnamespace_from_response;
use serde_json::json;
use serde_json::Value;
use std::collections::HashMap;
use std::ops::Deref;
use std::ops::DerefMut;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::Mutex;
Expand All @@ -41,15 +43,15 @@ pub struct InnerConnection {
#[derive(Clone)]
pub struct JaConnection(Arc<InnerConnection>);

impl std::ops::Deref for JaConnection {
impl Deref for JaConnection {
type Target = Arc<InnerConnection>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl std::ops::DerefMut for JaConnection {
impl DerefMut for JaConnection {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
Expand Down
3 changes: 2 additions & 1 deletion jarust/src/jahandle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::jasession::JaSession;
use crate::prelude::*;
use serde_json::json;
use serde_json::Value;
use std::ops::Deref;
use std::sync::Arc;
use std::sync::Weak;
use tokio::sync::mpsc;
Expand Down Expand Up @@ -39,7 +40,7 @@ impl WeakJaHandle {
}
}

impl std::ops::Deref for JaHandle {
impl Deref for JaHandle {
type Target = Arc<InnerHandle>;

fn deref(&self) -> &Self::Target {
Expand Down
16 changes: 16 additions & 0 deletions jarust/src/japlugin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use crate::japrotocol::JaResponse;
use crate::prelude::JaHandle;
use crate::prelude::JaResult;
use async_trait::async_trait;
use tokio::sync::mpsc;
use tokio::task::AbortHandle;

pub trait PluginTask {
fn assign_abort(&mut self, abort_handle: AbortHandle);
fn abort_plugin(&mut self);
}

#[async_trait]
pub trait Attach {
async fn attach(&self, plugin_id: &str) -> JaResult<(JaHandle, mpsc::Receiver<JaResponse>)>;
}
80 changes: 41 additions & 39 deletions jarust/src/jasession.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ use crate::japrotocol::JaResponse;
use crate::japrotocol::JaResponseProtocol;
use crate::japrotocol::JaSessionRequestProtocol;
use crate::prelude::*;
use async_trait::async_trait;
use serde_json::json;
use serde_json::Value;
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::Arc;
use std::sync::Weak;
use std::time::Duration;
Expand Down Expand Up @@ -43,7 +45,7 @@ impl WeakJaSession {
}
}

impl std::ops::Deref for JaSession {
impl Deref for JaSession {
type Target = Arc<InnerSession>;

fn deref(&self) -> &Self::Target {
Expand Down Expand Up @@ -81,10 +83,44 @@ impl JaSession {
session
}

pub async fn attach(
&self,
plugin_id: &str,
) -> JaResult<(JaHandle, mpsc::Receiver<JaResponse>)> {
pub(crate) async fn send_request(&self, mut request: Value) -> JaResult<()> {
let mut connection = self.shared.connection.clone();
request["session_id"] = self.shared.id.into();
connection.send_request(request).await
}

async fn keep_alive(self, ka_interval: u32) -> JaResult<()> {
let mut interval = time::interval(Duration::from_secs(ka_interval.into()));
let id = { self.shared.id };
loop {
interval.tick().await;
log::trace!("Sending keep-alive {{ id: {id}, timeout: {ka_interval}s }}");
self.send_request(json!({
"janus": JaSessionRequestProtocol::KeepAlive,
}))
.await?;
self.safe.lock().await.receiver.recv().await.unwrap();
log::trace!("keep-alive OK {{ id: {id} }}");
}
}

pub(crate) fn downgrade(&self) -> WeakJaSession {
WeakJaSession(Arc::downgrade(self))
}
}

impl Drop for SafeShared {
fn drop(&mut self) {
if let Some(join_handle) = self.join_handle.take() {
log::trace!("Keepalive task aborted");
join_handle.abort();
}
}
}

#[async_trait]
impl Attach for JaSession {
async fn attach(&self, plugin_id: &str) -> JaResult<(JaHandle, mpsc::Receiver<JaResponse>)> {
log::info!("Attaching new handle {{ id: {} }}", self.shared.id);

let request = json!({
Expand Down Expand Up @@ -132,38 +168,4 @@ impl JaSession {

Ok((handle, event_receiver))
}

pub(crate) async fn send_request(&self, mut request: Value) -> JaResult<()> {
let mut connection = self.shared.connection.clone();
request["session_id"] = self.shared.id.into();
connection.send_request(request).await
}

async fn keep_alive(self, ka_interval: u32) -> JaResult<()> {
let mut interval = time::interval(Duration::from_secs(ka_interval.into()));
let id = { self.shared.id };
loop {
interval.tick().await;
log::trace!("Sending keep-alive {{ id: {id}, timeout: {ka_interval}s }}");
self.send_request(json!({
"janus": JaSessionRequestProtocol::KeepAlive,
}))
.await?;
self.safe.lock().await.receiver.recv().await.unwrap();
log::trace!("keep-alive OK {{ id: {id} }}");
}
}

pub(crate) fn downgrade(&self) -> WeakJaSession {
WeakJaSession(Arc::downgrade(self))
}
}

impl Drop for SafeShared {
fn drop(&mut self) {
if let Some(join_handle) = self.join_handle.take() {
log::trace!("Keepalive task aborted");
join_handle.abort();
}
}
}
6 changes: 3 additions & 3 deletions jarust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ use jaconnection::JaConnection;
use prelude::JaResult;

pub mod jaconfig;
pub mod jahandle;
pub mod japlugin;
pub mod japrotocol;
pub mod plugins;
pub mod jasession;
pub mod prelude;
pub mod transport;

mod error;
mod jaconnection;
mod jahandle;
mod jasession;
mod nsp_registry;
mod tmanager;
mod utils;
Expand Down
6 changes: 4 additions & 2 deletions jarust/src/nsp_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use crate::jaconfig::CHANNEL_BUFFER_SIZE;
use crate::japrotocol::JaResponse;
use crate::prelude::*;
use std::collections::HashMap;
use std::ops::Deref;
use std::ops::DerefMut;
use std::sync::Arc;
use std::sync::RwLock;
use tokio::sync::mpsc;
Expand All @@ -13,15 +15,15 @@ pub(crate) struct Inner {
#[derive(Clone)]
pub(crate) struct NamespaceRegistry(Arc<RwLock<Inner>>);

impl std::ops::Deref for NamespaceRegistry {
impl Deref for NamespaceRegistry {
type Target = Arc<RwLock<Inner>>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl std::ops::DerefMut for NamespaceRegistry {
impl DerefMut for NamespaceRegistry {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
Expand Down
65 changes: 0 additions & 65 deletions jarust/src/plugins/echotest/handle.rs

This file was deleted.

3 changes: 0 additions & 3 deletions jarust/src/plugins/echotest/mod.rs

This file was deleted.

7 changes: 0 additions & 7 deletions jarust/src/plugins/mod.rs

This file was deleted.

6 changes: 6 additions & 0 deletions jarust/src/prelude.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
pub use crate::error::JaError;
pub use crate::jaconfig::CHANNEL_BUFFER_SIZE;
pub use crate::jahandle::JaHandle;
pub use crate::japlugin::Attach;
pub use crate::japlugin::PluginTask;
pub use crate::japrotocol::JaResponse;
pub use crate::jasession::JaSession;

pub type JaResult<T> = core::result::Result<T, JaError>;
Loading

0 comments on commit e28e58a

Please sign in to comment.