Skip to content

Commit

Permalink
feat: add minimal support for streaming plugin
Browse files Browse the repository at this point in the history
We currently only support:
* create an RTP mountpoint
  LIVE,ONDEMAND or RTSP are not yet supported
* list available mountpoints
* destroy mountpoint
  • Loading branch information
fcx-mrogez authored and Ghamza-Jd committed Sep 29, 2024
1 parent af05f91 commit 466b70e
Show file tree
Hide file tree
Showing 12 changed files with 625 additions and 3 deletions.
3 changes: 2 additions & 1 deletion jarust_plugins/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ tokio = { workspace = true, features = ["sync"] }
tracing.workspace = true

[features]
default = ["echo_test", "audio_bridge", "video_room"]
default = ["echo_test", "audio_bridge", "video_room", "streaming"]
echo_test = []
audio_bridge = []
video_room = []
streaming = []
__experimental = []

[dev-dependencies]
Expand Down
1 change: 1 addition & 0 deletions jarust_plugins/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Current plugins:
- [x] Echo test
- [x] Audio bridge
- [ ] Video room
- [ ] Streaming

## EchoTest Example

Expand Down
74 changes: 74 additions & 0 deletions jarust_plugins/examples/streaming.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use jarust::jaconfig::JaConfig;
use jarust::jaconfig::JanusAPI;
use jarust::jaconnection::CreateConnectionParams;
use jarust_plugins::streaming::jahandle_ext::Streaming;
use jarust_plugins::streaming::msg_options::*;
use jarust_plugins::JanusId;
use jarust_transport::tgenerator::RandomTransactionGenerator;
use std::path::Path;
use tracing_subscriber::EnvFilter;

#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
let filename = Path::new(file!()).file_stem().unwrap().to_str().unwrap();
let env_filter = EnvFilter::from_default_env()
.add_directive("jarust=trace".parse()?)
.add_directive("jarust_plugins=trace".parse()?)
.add_directive("jarust_transport=trace".parse()?)
.add_directive("jarust_rt=trace".parse()?)
.add_directive(format!("{filename}=trace").parse()?);
tracing_subscriber::fmt().with_env_filter(env_filter).init();

let timeout = std::time::Duration::from_secs(10);
let config = JaConfig::builder()
.url("ws://localhost:8188/ws")
.capacity(32)
.build();
let mut connection =
jarust::connect(config, JanusAPI::WebSocket, RandomTransactionGenerator).await?;
let session = connection
.create_session(CreateConnectionParams {
ka_interval: 10,
timeout,
})
.await?;
let (handle, mut events) = session.attach_streaming(timeout).await?;

tokio::spawn(async move {
while let Some(e) = events.recv().await {
tracing::info!("{e:#?}");
}
});

let mountpoint_id = handle
.create_mountpoint_with_config(
StreamingCreateOptions {
id: Some(JanusId::Uint(1337)),
name: Some("stream name".to_string()),
description: Some("stream description".to_string()),
mountpoint_type: "rtp".to_string(),
media: Some(Vec::from([StreamingRtpMedia {
media_type: "video".to_string(),
mid: "v".to_string(),
port: 5005,
pt: Some(100),
codec: Some("vp8".to_string()),
..Default::default()
}])),
..Default::default()
},
timeout,
)
.await?
.stream
.id;

let mountpoints = handle.list(timeout).await?;
tracing::info!("Mountpoints {:#?}", mountpoints);

handle
.destroy_mountpoint(mountpoint_id, Default::default(), timeout)
.await?;

Ok(())
}
2 changes: 1 addition & 1 deletion jarust_plugins/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use serde::Serialize;

impl_tryfrom_serde_value!(JanusId);

/// Rooms and Participants Identifier.
/// Mountpoints, Rooms and Participants Identifier.
///
/// Identifier should be by default unsigned integer, unless configured otherwise in the plugin config.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Serialize, Deserialize)]
Expand Down
2 changes: 1 addition & 1 deletion jarust_plugins/src/from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
///
/// ### Example:
/// ```rust
/// tryfrom_serde_value!(ChangeRoomOptions EditRoomOptions DestroyRoomMsg JoinRoomOptions);
/// impl_tryfrom_serde_value!(ChangeRoomOptions EditRoomOptions DestroyRoomMsg JoinRoomOptions);
/// ```
#[macro_export]
macro_rules! impl_tryfrom_serde_value {
Expand Down
4 changes: 4 additions & 0 deletions jarust_plugins/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//! - EchoTest plugin
//! - AudioBridge plugin
//! - VideoRoom plugin
//! - Streaming plugin (minimal support)
//!
//! All of the plugins are hidden behind feature flags to allow you to cherry-pick your dependencies. By default, all plugins are enabled.
//!
Expand All @@ -25,5 +26,8 @@ pub mod audio_bridge;
#[cfg(feature = "video_room")]
pub mod video_room;

#[cfg(feature = "streaming")]
pub mod streaming;

pub mod common;
pub use common::JanusId;
183 changes: 183 additions & 0 deletions jarust_plugins/src/streaming/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
use crate::JanusId;
use jarust::error::JaError;
use jarust::prelude::JaResponse;
use jarust_transport::error::JaTransportError;
use jarust_transport::japrotocol::GenericEvent;
use jarust_transport::japrotocol::JaHandleEvent;
use jarust_transport::japrotocol::ResponseType;
use serde::Deserialize;
use serde_json::from_value;

#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub enum PluginEvent {
StreamingEvent(StreamingEvent),
GenericEvent(GenericEvent),
}

#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Deserialize)]
#[serde(tag = "streaming")]
enum StreamingEventDto {
#[serde(rename = "destroyed")]
DestroyMountpoint { id: JanusId },

#[serde(rename = "created")]
CreateMountpoint {
id: JanusId,
/// <live|on demand>
#[serde(rename = "type")]
mountpoint_type: String,
},

#[serde(rename = "event")]
Event(StreamingEventEventType),
}

#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Deserialize)]
#[serde(untagged)]
enum StreamingEventEventType {
#[serde(rename = "error")]
ErrorEvent { error_code: u16, error: String },
}

#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub enum StreamingEvent {
MountpointDestroyed {
id: JanusId,
},
MountpointCreated {
id: JanusId,
mountpoint_type: String,
},
}

impl TryFrom<JaResponse> for PluginEvent {
type Error = JaError;

fn try_from(value: JaResponse) -> Result<Self, Self::Error> {
match value.janus {
ResponseType::Event(JaHandleEvent::PluginEvent { plugin_data }) => {
let streaming_event = from_value::<StreamingEventDto>(plugin_data.data)?;
match streaming_event {
StreamingEventDto::DestroyMountpoint { id } => Ok(PluginEvent::StreamingEvent(
StreamingEvent::MountpointDestroyed { id },
)),
StreamingEventDto::CreateMountpoint {
id,
mountpoint_type,
} => Ok(PluginEvent::StreamingEvent(
StreamingEvent::MountpointCreated {
id,
mountpoint_type,
},
)),
StreamingEventDto::Event(e) => match e {
StreamingEventEventType::ErrorEvent { error_code, error } => {
Err(JaError::JanusTransport(JaTransportError::JanusError {
code: error_code,
reason: error,
}))
}
},
}
}
ResponseType::Event(JaHandleEvent::GenericEvent(event)) => {
Ok(PluginEvent::GenericEvent(event))
}
_ => Err(JaError::IncompletePacket),
}
}
}

#[cfg(test)]
mod tests {
use serde_json::json;

use jarust::error::JaError;
use jarust_transport::japrotocol::{JaHandleEvent, JaResponse, PluginData, ResponseType};

use super::PluginEvent;
use crate::streaming::events::StreamingEvent;
use crate::JanusId;

#[test]
fn it_parse_mountpoint_created() {
let rsp = JaResponse {
janus: ResponseType::Event(JaHandleEvent::PluginEvent {
plugin_data: PluginData {
plugin: "janus.plugin.streaming".to_string(),
data: json!({
"streaming": "created",
"id": 6380744183070564u64,
"type": "live",
}),
},
}),
establishment_protocol: None,
transaction: None,
session_id: None,
sender: None,
};
let event: PluginEvent = rsp.try_into().unwrap();
assert_eq!(
event,
PluginEvent::StreamingEvent(StreamingEvent::MountpointCreated {
id: JanusId::Uint(6380744183070564u64),
mountpoint_type: "live".to_string(),
})
);
}

#[test]
fn it_parse_mountpoint_destroyed() {
let rsp = JaResponse {
janus: ResponseType::Event(JaHandleEvent::PluginEvent {
plugin_data: PluginData {
plugin: "janus.plugin.streaming".to_string(),
data: json!({
"streaming": "destroyed",
"id": 6380744183070564u64,
}),
},
}),
establishment_protocol: None,
transaction: None,
session_id: None,
sender: None,
};
let event: PluginEvent = rsp.try_into().unwrap();
assert_eq!(
event,
PluginEvent::StreamingEvent(StreamingEvent::MountpointDestroyed {
id: JanusId::Uint(6380744183070564u64),
})
);
}

#[test]
fn it_parse_error() {
let rsp = JaResponse {
janus: ResponseType::Event(JaHandleEvent::PluginEvent {
plugin_data: PluginData {
plugin: "janus.plugin.streaming".to_string(),
data: json!({
"streaming": "event",
"error_code": 456,
"error": "Can't add 'rtp' stream, error creating data source stream"
}),
},
}),
establishment_protocol: None,
transaction: None,
session_id: None,
sender: None,
};

let result: Result<PluginEvent, JaError> = rsp.try_into();
assert!(result.is_err());
let ja_error = result.err();
assert!(ja_error.is_some());
assert_eq!(
ja_error.unwrap().to_string(),
"Transport: Janus error { code: 456, reason: Can't add 'rtp' stream, error creating data source stream}");
}
}
Loading

0 comments on commit 466b70e

Please sign in to comment.