Skip to content

Commit

Permalink
feat(webcam): add real-time audio streaming support
Browse files Browse the repository at this point in the history
  • Loading branch information
HoKim98 committed Oct 3, 2024
1 parent 03b6ad1 commit a3d1d47
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 82 deletions.
22 changes: 11 additions & 11 deletions crates/cassette-plugin-webcam-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,20 @@ js-sys = { workspace = true, optional = true }
serde = { workspace = true }
wasm-bindgen = { workspace = true, optional = true }
web-sys = { workspace = true, optional = true, features = [
"MediaStream",
"MediaRecorder",
"MediaDevices",
"MediaStreamConstraints",
"MediaTrackConstraints",
"HtmlElement",
"Window",
"console",
"Url",
"Blob",
"BlobEvent",
"console",
"Event",
"EventTarget",
"HtmlAnchorElement",
"Document",
"HtmlElement",
"MediaDevices",
"MediaRecorder",
"MediaRecorderOptions",
"MediaStream",
"MediaStreamConstraints",
"MediaTrackConstraints",
"Navigator",
"WebSocket",
"Window",
] }
yew = { workspace = true, optional = true }
163 changes: 95 additions & 68 deletions crates/cassette-plugin-webcam-core/src/hooks.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
use std::{
rc::Rc,
sync::atomic::{AtomicU64, Ordering},
};
use std::rc::Rc;

use anyhow::{anyhow, Result};
use cassette_core::cassette::{CassetteContext, CassetteTaskHandle};
use wasm_bindgen::{prelude::Closure, JsCast, JsValue};
use web_sys::{
window, Blob, BlobEvent, Document, HtmlAnchorElement, MediaDevices, MediaRecorder, MediaStream,
MediaStreamConstraints, Navigator, Url, Window,
window, Blob, BlobEvent, ErrorEvent, Event, MediaDevices, MediaRecorder, MediaRecorderOptions,
MediaStream, MediaStreamConstraints, WebSocket, Window,
};

use crate::recorder::WebcamRecorder;
Expand All @@ -33,7 +30,7 @@ fn build_webcam(

// Load a global window object
let window = window().ok_or_else(|| anyhow!("Global window object not found"))?;
let navigator: Navigator = window.navigator();
let navigator = window.navigator();

// Load media devices
let media_devices: MediaDevices = navigator
Expand Down Expand Up @@ -88,100 +85,130 @@ fn handle_stream(media_stream: JsValue, window: &Window, handler: &crate::Handle
url,
} = handler;

let stream: MediaStream = media_stream.unchecked_into();
// Configure WebSocket connection
let ws = WebSocket::new(url).expect("Failed to create WebSocket");

// Configure Session
let session = Rc::new(Session::new(ws));

// Configure MediaRecorder
let recorder =
MediaRecorder::new_with_media_stream(&stream).expect("MediaRecorder creation failed");
let stream: MediaStream = media_stream.unchecked_into();
let options = MediaRecorderOptions::new();
options.set_mime_type("audio/webm;codecs=opus");

// Configure Session
let session = Rc::new(Session::new(url.clone()));
let recorder =
MediaRecorder::new_with_media_stream_and_media_recorder_options(&stream, &options)
.expect("MediaRecorder creation failed");

// Configure the event handler: `ondataavailable`
{
let session = session.clone();
let ondataavailable = Closure::wrap(Box::new(move |event: BlobEvent| {
let blob = event.data().expect("No data available");
session.commit(blob)
session.commit(&blob)
}) as Box<dyn FnMut(_)>);

recorder.set_ondataavailable(Some(ondataavailable.as_ref().unchecked_ref()));
ondataavailable.forget();
ondataavailable.forget()
}

// Start recording!
// Configure the event handler: `onstop`
{
let session = session.clone();
let onstop = Closure::wrap(Box::new(move || session.finalize()) as Box<dyn Fn()>);

recorder.set_onstop(Some(onstop.as_ref().unchecked_ref()));
onstop.forget()
}

// Configure the main loop
let start_recording = {
let duration = *duration;
let time_slice = (*interval).try_into().expect("Too large interval");
recorder
.start_with_time_slice(time_slice)
.expect("Failed to start recorder")
let window = window.clone();
move || {
// Start recording!
recorder
.start_with_time_slice(time_slice)
.expect("Failed to start recorder");

// Set the timeout to break
if let Some(timeout) = duration {
let recorder = recorder.clone();
let stop_recorder = Closure::wrap(Box::new(move || {
recorder.stop().expect("Failed to stop recorder")
}) as Box<dyn Fn()>);

window
.set_timeout_with_callback_and_timeout_and_arguments_0(
stop_recorder.as_ref().unchecked_ref(),
timeout.try_into().expect("Too large interval"),
)
.expect("Failed to set timeout");
stop_recorder.forget()
}
}
};

// Set the timeout to break
if let Some(timeout) = *duration {
let recorder = recorder.clone();
let stop_recorder =
Closure::wrap(
Box::new(move || recorder.stop().expect("Failed to stop recorder"))
as Box<dyn Fn()>,
);

window
.set_timeout_with_callback_and_timeout_and_arguments_0(
stop_recorder.as_ref().unchecked_ref(),
timeout.try_into().expect("Too large interval"),
)
.expect("Failed to set timeout");
stop_recorder.forget();
// Configure the event handler: `onopen`
{
let onopen = Closure::wrap(Box::new(move |_: Event| {
::web_sys::console::log_1(&"WebSocket connection opened".into());
start_recording()
}) as Box<dyn FnMut(_)>);

session.ws.set_onopen(Some(onopen.as_ref().unchecked_ref()));
onopen.forget()
}

// Configure the event handler: `onstop`
// Configure the event handler: `onerror`
{
let onstop = Closure::wrap(Box::new(move || session.finalize()) as Box<dyn Fn()>);
let onerror = Closure::wrap(Box::new(move |e: ErrorEvent| {
::web_sys::console::error_1(&format!("WebSocket error: {e:?}").into());
}) as Box<dyn FnMut(_)>);

recorder.set_onstop(Some(onstop.as_ref().unchecked_ref()));
onstop.forget();
session
.ws
.set_onerror(Some(onerror.as_ref().unchecked_ref()));
onerror.forget()
}

// Configure the event handler: `onclose`
{
let onclose = Closure::wrap(Box::new(move |_: Event| {
::web_sys::console::log_1(&"WebSocket connection closed".into());
}) as Box<dyn FnMut(_)>);

session
.ws
.set_onclose(Some(onclose.as_ref().unchecked_ref()));
onclose.forget()
}
}

struct Session {
index: AtomicU64,
url: String,
ws: WebSocket,
}

impl Session {
fn new(url: String) -> Self {
Self {
index: AtomicU64::default(),
url,
}
fn new(ws: WebSocket) -> Self {
Self { ws }
}

fn commit(&self, blob: Blob) {
let Self { index, url } = self;

// Generate an index
let index = index.fetch_add(1, Ordering::SeqCst);
fn commit(&self, blob: &Blob) {
let Self { ws } = self;

// Create a download link using Blob
let url = Url::create_object_url_with_blob(&blob).expect("Failed to create object URL");
let window = window().expect("Global window object not found");
let document: Document = window.document().expect("No document on window");
let a: HtmlAnchorElement = document
.create_element("a")
.expect("Failed to create anchor element")
.unchecked_into();

// Force to download the recorded data
a.set_href(&url);
a.set_download("audio.webm");
a.click();
// Start sending Blob
if ws.ready_state() == WebSocket::OPEN {
ws.send_with_blob(&blob)

Check failure on line 203 in crates/cassette-plugin-webcam-core/src/hooks.rs

View workflow job for this annotation

GitHub Actions / clippy-app

this expression creates a reference which is immediately dereferenced by the compiler
.expect("Failed to send data via WebSocket")
}
}

// Release the URL after use
Url::revoke_object_url(&url).expect("Failed to revoke object URL");
fn finalize(&self) {
let Self { ws } = self;

// todo!()
// Close websocket
ws.close().expect("Failed to close WebSocket")
}

fn finalize(&self) {}
}
2 changes: 1 addition & 1 deletion crates/cassette-plugin-webcam-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ impl Default for Handler {

impl Handler {
const fn default_interval() -> u32 {
1000 // 1 second
20 // 20 ms
}
}
4 changes: 2 additions & 2 deletions examples/webcam_record_audio.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ spec:
- name: audio
kind: WebcamAudio
spec:
duration: 3000 # 3 seconds
url: http://localhost:9090/
duration: 60000 # 60 seconds
url: ws://localhost:9090

0 comments on commit a3d1d47

Please sign in to comment.