From 466b70e3a8676d3374e8f526b49c3093cbb28ab7 Mon Sep 17 00:00:00 2001 From: Matthieu Rogez Date: Fri, 27 Sep 2024 09:30:01 +0000 Subject: [PATCH] feat: add minimal support for streaming plugin We currently only support: * create an RTP mountpoint LIVE,ONDEMAND or RTSP are not yet supported * list available mountpoints * destroy mountpoint --- jarust_plugins/Cargo.toml | 3 +- jarust_plugins/README.md | 1 + jarust_plugins/examples/streaming.rs | 74 ++++++++ jarust_plugins/src/common.rs | 2 +- jarust_plugins/src/from.rs | 2 +- jarust_plugins/src/lib.rs | 4 + jarust_plugins/src/streaming/events.rs | 183 +++++++++++++++++++ jarust_plugins/src/streaming/handle.rs | 127 +++++++++++++ jarust_plugins/src/streaming/jahandle_ext.rs | 41 +++++ jarust_plugins/src/streaming/mod.rs | 5 + jarust_plugins/src/streaming/msg_options.rs | 116 ++++++++++++ jarust_plugins/src/streaming/responses.rs | 70 +++++++ 12 files changed, 625 insertions(+), 3 deletions(-) create mode 100644 jarust_plugins/examples/streaming.rs create mode 100644 jarust_plugins/src/streaming/events.rs create mode 100644 jarust_plugins/src/streaming/handle.rs create mode 100644 jarust_plugins/src/streaming/jahandle_ext.rs create mode 100644 jarust_plugins/src/streaming/mod.rs create mode 100644 jarust_plugins/src/streaming/msg_options.rs create mode 100644 jarust_plugins/src/streaming/responses.rs diff --git a/jarust_plugins/Cargo.toml b/jarust_plugins/Cargo.toml index 9751d6b..e8b9589 100644 --- a/jarust_plugins/Cargo.toml +++ b/jarust_plugins/Cargo.toml @@ -24,10 +24,11 @@ tokio = { workspace = true, features = ["sync"] } tracing.workspace = true [features] -default = ["echo_test", "audio_bridge", "video_room"] +default = ["echo_test", "audio_bridge", "video_room", "streaming"] echo_test = [] audio_bridge = [] video_room = [] +streaming = [] __experimental = [] [dev-dependencies] diff --git a/jarust_plugins/README.md b/jarust_plugins/README.md index 371ad22..eb6757e 100644 --- a/jarust_plugins/README.md +++ b/jarust_plugins/README.md @@ -7,6 +7,7 @@ Current plugins: - [x] Echo test - [x] Audio bridge - [ ] Video room +- [ ] Streaming ## EchoTest Example diff --git a/jarust_plugins/examples/streaming.rs b/jarust_plugins/examples/streaming.rs new file mode 100644 index 0000000..0cbf33f --- /dev/null +++ b/jarust_plugins/examples/streaming.rs @@ -0,0 +1,74 @@ +use jarust::jaconfig::JaConfig; +use jarust::jaconfig::JanusAPI; +use jarust::jaconnection::CreateConnectionParams; +use jarust_plugins::streaming::jahandle_ext::Streaming; +use jarust_plugins::streaming::msg_options::*; +use jarust_plugins::JanusId; +use jarust_transport::tgenerator::RandomTransactionGenerator; +use std::path::Path; +use tracing_subscriber::EnvFilter; + +#[tokio::main(flavor = "current_thread")] +async fn main() -> anyhow::Result<()> { + let filename = Path::new(file!()).file_stem().unwrap().to_str().unwrap(); + let env_filter = EnvFilter::from_default_env() + .add_directive("jarust=trace".parse()?) + .add_directive("jarust_plugins=trace".parse()?) + .add_directive("jarust_transport=trace".parse()?) + .add_directive("jarust_rt=trace".parse()?) + .add_directive(format!("{filename}=trace").parse()?); + tracing_subscriber::fmt().with_env_filter(env_filter).init(); + + let timeout = std::time::Duration::from_secs(10); + let config = JaConfig::builder() + .url("ws://localhost:8188/ws") + .capacity(32) + .build(); + let mut connection = + jarust::connect(config, JanusAPI::WebSocket, RandomTransactionGenerator).await?; + let session = connection + .create_session(CreateConnectionParams { + ka_interval: 10, + timeout, + }) + .await?; + let (handle, mut events) = session.attach_streaming(timeout).await?; + + tokio::spawn(async move { + while let Some(e) = events.recv().await { + tracing::info!("{e:#?}"); + } + }); + + let mountpoint_id = handle + .create_mountpoint_with_config( + StreamingCreateOptions { + id: Some(JanusId::Uint(1337)), + name: Some("stream name".to_string()), + description: Some("stream description".to_string()), + mountpoint_type: "rtp".to_string(), + media: Some(Vec::from([StreamingRtpMedia { + media_type: "video".to_string(), + mid: "v".to_string(), + port: 5005, + pt: Some(100), + codec: Some("vp8".to_string()), + ..Default::default() + }])), + ..Default::default() + }, + timeout, + ) + .await? + .stream + .id; + + let mountpoints = handle.list(timeout).await?; + tracing::info!("Mountpoints {:#?}", mountpoints); + + handle + .destroy_mountpoint(mountpoint_id, Default::default(), timeout) + .await?; + + Ok(()) +} diff --git a/jarust_plugins/src/common.rs b/jarust_plugins/src/common.rs index a4cae0b..bb816ea 100644 --- a/jarust_plugins/src/common.rs +++ b/jarust_plugins/src/common.rs @@ -3,7 +3,7 @@ use serde::Serialize; impl_tryfrom_serde_value!(JanusId); -/// Rooms and Participants Identifier. +/// Mountpoints, Rooms and Participants Identifier. /// /// Identifier should be by default unsigned integer, unless configured otherwise in the plugin config. #[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Serialize, Deserialize)] diff --git a/jarust_plugins/src/from.rs b/jarust_plugins/src/from.rs index 875c199..72a8fa1 100644 --- a/jarust_plugins/src/from.rs +++ b/jarust_plugins/src/from.rs @@ -6,7 +6,7 @@ /// /// ### Example: /// ```rust -/// tryfrom_serde_value!(ChangeRoomOptions EditRoomOptions DestroyRoomMsg JoinRoomOptions); +/// impl_tryfrom_serde_value!(ChangeRoomOptions EditRoomOptions DestroyRoomMsg JoinRoomOptions); /// ``` #[macro_export] macro_rules! impl_tryfrom_serde_value { diff --git a/jarust_plugins/src/lib.rs b/jarust_plugins/src/lib.rs index 85d3def..0c1ce05 100644 --- a/jarust_plugins/src/lib.rs +++ b/jarust_plugins/src/lib.rs @@ -6,6 +6,7 @@ //! - EchoTest plugin //! - AudioBridge plugin //! - VideoRoom plugin +//! - Streaming plugin (minimal support) //! //! All of the plugins are hidden behind feature flags to allow you to cherry-pick your dependencies. By default, all plugins are enabled. //! @@ -25,5 +26,8 @@ pub mod audio_bridge; #[cfg(feature = "video_room")] pub mod video_room; +#[cfg(feature = "streaming")] +pub mod streaming; + pub mod common; pub use common::JanusId; diff --git a/jarust_plugins/src/streaming/events.rs b/jarust_plugins/src/streaming/events.rs new file mode 100644 index 0000000..8396a18 --- /dev/null +++ b/jarust_plugins/src/streaming/events.rs @@ -0,0 +1,183 @@ +use crate::JanusId; +use jarust::error::JaError; +use jarust::prelude::JaResponse; +use jarust_transport::error::JaTransportError; +use jarust_transport::japrotocol::GenericEvent; +use jarust_transport::japrotocol::JaHandleEvent; +use jarust_transport::japrotocol::ResponseType; +use serde::Deserialize; +use serde_json::from_value; + +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)] +pub enum PluginEvent { + StreamingEvent(StreamingEvent), + GenericEvent(GenericEvent), +} + +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Deserialize)] +#[serde(tag = "streaming")] +enum StreamingEventDto { + #[serde(rename = "destroyed")] + DestroyMountpoint { id: JanusId }, + + #[serde(rename = "created")] + CreateMountpoint { + id: JanusId, + /// + #[serde(rename = "type")] + mountpoint_type: String, + }, + + #[serde(rename = "event")] + Event(StreamingEventEventType), +} + +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Deserialize)] +#[serde(untagged)] +enum StreamingEventEventType { + #[serde(rename = "error")] + ErrorEvent { error_code: u16, error: String }, +} + +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)] +pub enum StreamingEvent { + MountpointDestroyed { + id: JanusId, + }, + MountpointCreated { + id: JanusId, + mountpoint_type: String, + }, +} + +impl TryFrom for PluginEvent { + type Error = JaError; + + fn try_from(value: JaResponse) -> Result { + match value.janus { + ResponseType::Event(JaHandleEvent::PluginEvent { plugin_data }) => { + let streaming_event = from_value::(plugin_data.data)?; + match streaming_event { + StreamingEventDto::DestroyMountpoint { id } => Ok(PluginEvent::StreamingEvent( + StreamingEvent::MountpointDestroyed { id }, + )), + StreamingEventDto::CreateMountpoint { + id, + mountpoint_type, + } => Ok(PluginEvent::StreamingEvent( + StreamingEvent::MountpointCreated { + id, + mountpoint_type, + }, + )), + StreamingEventDto::Event(e) => match e { + StreamingEventEventType::ErrorEvent { error_code, error } => { + Err(JaError::JanusTransport(JaTransportError::JanusError { + code: error_code, + reason: error, + })) + } + }, + } + } + ResponseType::Event(JaHandleEvent::GenericEvent(event)) => { + Ok(PluginEvent::GenericEvent(event)) + } + _ => Err(JaError::IncompletePacket), + } + } +} + +#[cfg(test)] +mod tests { + use serde_json::json; + + use jarust::error::JaError; + use jarust_transport::japrotocol::{JaHandleEvent, JaResponse, PluginData, ResponseType}; + + use super::PluginEvent; + use crate::streaming::events::StreamingEvent; + use crate::JanusId; + + #[test] + fn it_parse_mountpoint_created() { + let rsp = JaResponse { + janus: ResponseType::Event(JaHandleEvent::PluginEvent { + plugin_data: PluginData { + plugin: "janus.plugin.streaming".to_string(), + data: json!({ + "streaming": "created", + "id": 6380744183070564u64, + "type": "live", + }), + }, + }), + establishment_protocol: None, + transaction: None, + session_id: None, + sender: None, + }; + let event: PluginEvent = rsp.try_into().unwrap(); + assert_eq!( + event, + PluginEvent::StreamingEvent(StreamingEvent::MountpointCreated { + id: JanusId::Uint(6380744183070564u64), + mountpoint_type: "live".to_string(), + }) + ); + } + + #[test] + fn it_parse_mountpoint_destroyed() { + let rsp = JaResponse { + janus: ResponseType::Event(JaHandleEvent::PluginEvent { + plugin_data: PluginData { + plugin: "janus.plugin.streaming".to_string(), + data: json!({ + "streaming": "destroyed", + "id": 6380744183070564u64, + }), + }, + }), + establishment_protocol: None, + transaction: None, + session_id: None, + sender: None, + }; + let event: PluginEvent = rsp.try_into().unwrap(); + assert_eq!( + event, + PluginEvent::StreamingEvent(StreamingEvent::MountpointDestroyed { + id: JanusId::Uint(6380744183070564u64), + }) + ); + } + + #[test] + fn it_parse_error() { + let rsp = JaResponse { + janus: ResponseType::Event(JaHandleEvent::PluginEvent { + plugin_data: PluginData { + plugin: "janus.plugin.streaming".to_string(), + data: json!({ + "streaming": "event", + "error_code": 456, + "error": "Can't add 'rtp' stream, error creating data source stream" + }), + }, + }), + establishment_protocol: None, + transaction: None, + session_id: None, + sender: None, + }; + + let result: Result = rsp.try_into(); + assert!(result.is_err()); + let ja_error = result.err(); + assert!(ja_error.is_some()); + assert_eq!( + ja_error.unwrap().to_string(), + "Transport: Janus error { code: 456, reason: Can't add 'rtp' stream, error creating data source stream}"); + } +} diff --git a/jarust_plugins/src/streaming/handle.rs b/jarust_plugins/src/streaming/handle.rs new file mode 100644 index 0000000..9fd60e8 --- /dev/null +++ b/jarust_plugins/src/streaming/handle.rs @@ -0,0 +1,127 @@ +use crate::streaming::msg_options::*; +use crate::streaming::responses::*; +use crate::JanusId; +use jarust::prelude::*; +use jarust_rt::JaTask; +use serde_json::json; +use serde_json::Value; +use std::ops::Deref; +use std::time::Duration; + +pub struct StreamingHandle { + handle: JaHandle, + task: Option, +} + +// +// synchronous methods +// +impl StreamingHandle { + #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] + pub async fn create_mountpoint( + &self, + mountpoint: Option, + timeout: Duration, + ) -> JaResult { + self.create_mountpoint_with_config( + StreamingCreateOptions { + id: mountpoint, + ..Default::default() + }, + timeout, + ) + .await + } + + #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] + pub async fn create_mountpoint_with_config( + &self, + options: StreamingCreateOptions, + timeout: Duration, + ) -> JaResult { + tracing::info!(plugin = "streaming", "Sending create"); + let mut message: Value = options.try_into()?; + message["request"] = "create".into(); + + self.handle + .send_waiton_rsp::(message, timeout) + .await + } + + #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] + pub async fn destroy_mountpoint( + &self, + mountpoint: JanusId, + options: StreamingDestroyOptions, + timeout: Duration, + ) -> JaResult { + tracing::info!(plugin = "streaming", "Sending destroy"); + let mut message: Value = options.try_into()?; + message["request"] = "destroy".into(); + message["id"] = mountpoint.try_into()?; + + self.handle + .send_waiton_rsp::(message, timeout) + .await + } + + #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] + pub async fn list(&self, timeout: Duration) -> JaResult> { + tracing::info!(plugin = "streaming", "Sending list"); + let response = self + .handle + .send_waiton_rsp::( + json!({ + "request": "list" + }), + timeout, + ) + .await?; + + Ok(response.list) + } + + // TODO: + // info + // edit + // enable + // disable + // recording +} + +// +// asynchronous methods +// +// TODO + +impl PluginTask for StreamingHandle { + fn assign_task(&mut self, task: JaTask) { + self.task = Some(task); + } + + fn cancel_task(&mut self) { + if let Some(task) = self.task.take() { + task.cancel() + }; + } +} + +impl From for StreamingHandle { + fn from(handle: JaHandle) -> Self { + Self { handle, task: None } + } +} + +impl Deref for StreamingHandle { + type Target = JaHandle; + + fn deref(&self) -> &Self::Target { + &self.handle + } +} + +impl Drop for StreamingHandle { + fn drop(&mut self) { + self.cancel_task(); + } +} diff --git a/jarust_plugins/src/streaming/jahandle_ext.rs b/jarust_plugins/src/streaming/jahandle_ext.rs new file mode 100644 index 0000000..96959ae --- /dev/null +++ b/jarust_plugins/src/streaming/jahandle_ext.rs @@ -0,0 +1,41 @@ +use super::events::PluginEvent; +use super::handle::StreamingHandle; +use jarust::japlugin::AttachHandleParams; +use jarust::prelude::*; +use std::ops::Deref; +use std::time::Duration; +use tokio::sync::mpsc; + +#[async_trait::async_trait] +pub trait Streaming: Attach { + type Event: TryFrom + Send + Sync + 'static; + type Handle: From + Deref + PluginTask; + + async fn attach_streaming( + &self, + timeout: Duration, + ) -> JaResult<(Self::Handle, mpsc::UnboundedReceiver)> { + let (handle, mut receiver) = self + .attach(AttachHandleParams { + plugin_id: "janus.plugin.streaming".to_string(), + timeout, + }) + .await?; + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + let task = jarust_rt::spawn("streaming listener", async move { + while let Some(rsp) = receiver.recv().await { + if let Ok(event) = rsp.try_into() { + let _ = tx.send(event); + }; + } + }); + let mut handle: Self::Handle = handle.into(); + handle.assign_task(task); + Ok((handle, rx)) + } +} + +impl Streaming for JaSession { + type Event = PluginEvent; + type Handle = StreamingHandle; +} diff --git a/jarust_plugins/src/streaming/mod.rs b/jarust_plugins/src/streaming/mod.rs new file mode 100644 index 0000000..07f6921 --- /dev/null +++ b/jarust_plugins/src/streaming/mod.rs @@ -0,0 +1,5 @@ +pub mod events; +pub mod handle; +pub mod jahandle_ext; +pub mod msg_options; +pub mod responses; diff --git a/jarust_plugins/src/streaming/msg_options.rs b/jarust_plugins/src/streaming/msg_options.rs new file mode 100644 index 0000000..49a0d0b --- /dev/null +++ b/jarust_plugins/src/streaming/msg_options.rs @@ -0,0 +1,116 @@ +use crate::JanusId; +use serde::Serialize; + +impl_tryfrom_serde_value!( + StreamingCreateOptions StreamingDestroyOptions +); + +// +// Create Message +// https://github.com/meetecho/janus-gateway/blob/v1.2.4/src/plugins/janus_streaming.c#L3311-L4175 +// TODO: only RTP type is supported +// + +#[derive(Serialize, Default)] +pub struct StreamingCreateOptions { + #[serde(skip_serializing_if = "Option::is_none")] + pub admin_key: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub id: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub name: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub description: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub metadata: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub is_private: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub secret: Option, + + /// pin required for viewers to access mountpoint + #[serde(skip_serializing_if = "Option::is_none")] + pub pin: Option, + + /// whether the mountpoint should be saved to the configuration file or not, default=false + #[serde(skip_serializing_if = "Option::is_none")] + pub permanent: Option, + + /// + #[serde(rename = "type")] + pub mountpoint_type: String, + + // RTP only + #[serde(skip_serializing_if = "Option::is_none")] + pub media: Option>, +} + +#[derive(Serialize)] +#[serde(rename_all = "lowercase")] +pub enum StreamingMountpointType { + RTP, + LIVE, + ONDEMAND, + RTSP, +} + +// https://github.com/meetecho/janus-gateway/blob/v1.2.4/src/plugins/janus_streaming.c#L1100 +#[derive(Serialize, Default)] +pub struct StreamingRtpMedia { + /// audio|video|data + #[serde(rename = "type")] + pub media_type: String, + + /// Unique mid to assign to this stream in negociated PeerConnections + pub mid: String, + + #[serde(skip_serializing_if = "Option::is_none")] + pub label: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub msid: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub mcast: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub iface: Option, + pub port: u16, + #[serde(skip_serializing_if = "Option::is_none")] + pub rtcpport: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub pt: Option, // payload type is restricted to 0-127 + #[serde(skip_serializing_if = "Option::is_none")] + pub codec: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub fmtp: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub skew: Option, + // missing video only and data only parameters ? +} + +#[derive(Serialize)] +#[serde(rename_all = "lowercase")] +pub enum StreamingRtpMediaType { + AUDIO, + VIDEO, + DATA, +} + +// +// Destroy Message +// + +#[derive(Serialize, Default)] +pub struct StreamingDestroyOptions { + /// mountpoint secret, mandatory if configured + #[serde(skip_serializing_if = "Option::is_none")] + pub secret: Option, + + /// whether the mountpoint should be also removed from the config file, default=false + #[serde(skip_serializing_if = "Option::is_none")] + pub permanent: Option, +} diff --git a/jarust_plugins/src/streaming/responses.rs b/jarust_plugins/src/streaming/responses.rs new file mode 100644 index 0000000..c13992e --- /dev/null +++ b/jarust_plugins/src/streaming/responses.rs @@ -0,0 +1,70 @@ +use crate::JanusId; +use serde::Deserialize; + +// https://github.com/meetecho/janus-gateway/blob/v1.2.4/src/plugins/janus_streaming.c#L4335-L4414 +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Deserialize)] +pub struct MountpointCreatedRsp { + pub created: String, + pub permanent: bool, + pub stream: MountpointCreated, +} + +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Deserialize)] +pub struct MountpointCreated { + pub id: JanusId, + /// + #[serde(rename = "type")] + pub mountpoint_type: String, + pub description: String, + pub is_private: bool, + // RTP only + pub host: Option, + pub ports: Option>, +} + +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Deserialize)] +pub struct RtpMediaCreated { + /// + #[serde(rename = "type")] + pub media_type: String, + pub mid: String, + pub msid: Option, + pub port: Option, + pub rtcp_port: Option, + pub port_2: Option, + pub port_3: Option, +} + +// https://github.com/meetecho/janus-gateway/blob/v1.2.4/src/plugins/janus_streaming.c#L4994-L4997 +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Deserialize)] +pub struct MountpointDestroyedRsp { + pub destroyed: JanusId, +} + +// https://github.com/meetecho/janus-gateway/blob/v1.2.4/src/plugins/janus_streaming.c#L3058-L3127 +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Deserialize)] +pub struct ListMountpointsRsp { + pub list: Vec, +} + +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Deserialize)] +pub struct MountpointListed { + pub id: JanusId, + /// + #[serde(rename = "type")] + pub mountpoint_type: String, + pub description: String, + pub metadata: Option, + pub enabled: bool, + pub media: Option>, +} + +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Deserialize)] +pub struct RtpMediaListed { + #[serde(rename = "type")] + pub media_type: String, + pub mid: String, + pub label: String, + pub msid: Option, + pub age_ms: Option, +}