diff --git a/crates/composer/Cargo.toml b/crates/composer/Cargo.toml
index 09383d2..d415a28 100644
--- a/crates/composer/Cargo.toml
+++ b/crates/composer/Cargo.toml
@@ -8,5 +8,6 @@ bincode = "1"
 clap = { version = "4.2", features = ["derive"] }
 color-eyre = "0.6"
 composer_api = { path = "../composer_api" }
+cpal = "0.15"
 eyre = "0.6"
 rodio = { version = "0.17", features = ["symphonia-wav"] }
diff --git a/crates/composer/src/audio_output.rs b/crates/composer/src/audio_output.rs
new file mode 100644
index 0000000..24e95b5
--- /dev/null
+++ b/crates/composer/src/audio_output.rs
@@ -0,0 +1,150 @@
+use cpal::{
+    traits::{DeviceTrait, HostTrait},
+    OutputCallbackInfo, OutputStreamTimestamp, SampleFormat,
+};
+use eyre::{bail, eyre, Result};
+use rodio::{
+    dynamic_mixer::{DynamicMixer, DynamicMixerController},
+    Source,
+};
+use std::{
+    sync::{
+        atomic::{AtomicU64, Ordering},
+        mpsc::{channel, Receiver, Sender, TryRecvError},
+        Arc,
+    },
+    time::{Duration, SystemTime, UNIX_EPOCH},
+};
+
+pub(crate) struct AudioOutput {
+    // NOTE(review): generic parameters below were stripped by sanitization in the pasted
+    // patch; restored from usage (the channel carries TimedSource, counter is AtomicU64).
+    source_tx: Sender<TimedSource>,
+    play_delay: Duration,
+    too_early_plays: Arc<AtomicU64>,
+    _stream: cpal::Stream,
+}
+
+/// Abstraction to actually produce sound using the [AudioOutput::play()] method.
+/// Uses `cpal` and `rodio` behind the curtains. Great care is taken to position played samples
+/// precisely in time so that sound superposition works well even at high frequencies.
+/// Playback stops when this struct is dropped.
+impl AudioOutput { + pub(crate) fn new(play_delay: Duration) -> Result { + let cpal_device = cpal::default_host() + .default_output_device() + .ok_or_else(|| eyre!("no cpal audio output device found"))?; + let supported_config = cpal_device.default_output_config()?; + let stream_config = supported_config.config(); + println!( + "Using audio device '{}', supported config {:?}, stream config {:?}.", + cpal_device.name()?, + supported_config, + stream_config, + ); + if supported_config.sample_format() != SampleFormat::F32 { + bail!("Only F32 sample format supported for now."); + } + + let (mixer_controller, mixer) = + rodio::dynamic_mixer::mixer::(stream_config.channels, stream_config.sample_rate.0); + + let (source_tx, source_rx) = channel(); + + let too_early_plays = Arc::default(); + + // The mixer_controller can be shared between threads, but we want to precisely control + // when we add new sources w.r.t. the audio callback, so we move it to the audio thread and + // use a mpsc channel to send new sources to the audio thread. + let mut audio_callback = + AudioCallback::new(mixer_controller, mixer, source_rx, &too_early_plays); + let _stream = cpal_device.build_output_stream::( + &stream_config, + move |data_out, info| audio_callback.fill_data(data_out, info), + |err| eprintln!("Got cpal stream error callback: {err}."), + None, + )?; + + Ok(Self { source_tx, play_delay, too_early_plays, _stream }) + } + + pub(crate) fn play + Send + 'static>(&self, source: S) { + // TODO(Matej): use timestamp from the event itself once we have it. + let play_at_timestamp = current_timestamp() + self.play_delay; + + // TODO(Matej): we are in fact double-boxing because DynamicMixerController internally adds + // another box. But we need a sized type to send it through threads. We could make this + // method non-generic, but that would be less flexible, so just accept it for now. 
+        let source = Box::new(source);
+
+        self.source_tx
+            .send(TimedSource { source, play_at_timestamp })
+            .expect("source receiver should be still alive");
+    }
+
+    /// Get "too early plays" counter since the last call of this method.
+    pub(crate) fn fetch_too_early_plays(&self) -> u64 {
+        self.too_early_plays.swap(0, Ordering::SeqCst)
+    }
+}
+
+/// An f32 [rodio::source::Source] with UNIX timestamp of desired play time attached.
+struct TimedSource {
+    source: Box<dyn Source<Item = f32> + Send + 'static>,
+    play_at_timestamp: Duration,
+}
+
+/// A sort of manual implementation of the closure used as cpal audio data callback, for tidiness.
+struct AudioCallback {
+    mixer_controller: Arc<DynamicMixerController<f32>>,
+    mixer: DynamicMixer<f32>,
+    source_rx: Receiver<TimedSource>,
+    too_early_plays: Arc<AtomicU64>,
+}
+
+impl AudioCallback {
+    fn new(
+        mixer_controller: Arc<DynamicMixerController<f32>>,
+        mixer: DynamicMixer<f32>,
+        source_rx: Receiver<TimedSource>,
+        too_early_plays: &Arc<AtomicU64>,
+    ) -> Self {
+        let too_early_plays = Arc::clone(too_early_plays);
+        Self { mixer_controller, mixer, source_rx, too_early_plays }
+    }
+
+    /// Fill one cpal output buffer, first draining any newly scheduled sources into the mixer.
+    fn fill_data(&mut self, data_out: &mut [f32], info: &OutputCallbackInfo) {
+        let now = current_timestamp();
+        // cpal gives us two timestamps that cannot be compared to unix time directly as they
+        // have a different epoch. However subtracting them gives us the duration between when
+        // we were called (i.e. now) and when the buffer we produce will be played.
+        let OutputStreamTimestamp { playback, callback } = info.timestamp();
+        let playback_delay =
+            playback.duration_since(&callback).expect("playback shouldn't be planned in past");
+        // ...and by adding it to current unix timestamp we get a unix timestamp of the instant
+        // the buffer will be played.
+        let playback_unix_timestamp = now + playback_delay;
+
+        // Add possible new sources to the list
+        loop {
+            match self.source_rx.try_recv() {
+                Ok(timed_source) => {
+                    // checked_sub returns None when the desired play time is already in the
+                    // past; count it and play immediately (delay of zero).
+                    let delay = timed_source
+                        .play_at_timestamp
+                        .checked_sub(playback_unix_timestamp)
+                        .unwrap_or_else(|| {
+                            self.too_early_plays.fetch_add(1, Ordering::SeqCst);
+                            Duration::ZERO
+                        });
+                    self.mixer_controller.add(timed_source.source.delay(delay));
+                },
+                Err(TryRecvError::Empty) => break,
+                Err(TryRecvError::Disconnected) => panic!("source sender should be still alive"),
+            }
+        }
+
+        data_out.iter_mut().for_each(|d| *d = self.mixer.next().unwrap_or(0f32))
+    }
+}
+
+// Get current timestamp as the `Duration` since the UNIX epoch.
+fn current_timestamp() -> Duration {
+    SystemTime::now().duration_since(UNIX_EPOCH).expect("Unable to get current UNIX time")
+}
diff --git a/crates/composer/src/jukebox.rs b/crates/composer/src/jukebox.rs
index e77d7ee..0c05f27 100644
--- a/crates/composer/src/jukebox.rs
+++ b/crates/composer/src/jukebox.rs
@@ -1,7 +1,8 @@
+use crate::audio_output::AudioOutput;
 use eyre::{Context, Result};
 use rodio::{
     source::{Buffered, SamplesConverter},
-    Decoder, OutputStreamHandle, Source,
+    Decoder, Source,
 };
 use std::{collections::HashMap, fs::File, io::BufReader, path::Path};

@@ -48,15 +49,12 @@ impl Jukebox {
         Ok(Self { samples })
     }

-    pub(crate) fn play(&self, output_stream: &OutputStreamHandle, sample: Sample) -> Result<()> {
+    pub(crate) fn play(&self, audio_output: &AudioOutput, sample: Sample) {
         let buffer = self
             .samples
             .get(&sample)
             .expect("programmer error, all possible samples should be loaded");

-        output_stream
-            .play_raw(buffer.clone())
-            .with_context(|| format!("playing sample {sample:?}"))?;
-        Ok(())
+        audio_output.play(buffer.clone());
     }
 }
diff --git a/crates/composer/src/main.rs b/crates/composer/src/main.rs
index 0034e72..6051a33 100644
--- a/crates/composer/src/main.rs
+++ b/crates/composer/src/main.rs
@@ -1,15 +1,18 @@
 #![warn(clippy::all,
clippy::clone_on_ref_ptr)] -use crate::jukebox::{Jukebox, Sample}; +use crate::{ + audio_output::AudioOutput, + jukebox::{Jukebox, Sample}, +}; use clap::Parser; use composer_api::{Event, DEFAULT_SERVER_ADDRESS}; use eyre::{Context, Result}; -use rodio::{OutputStream, OutputStreamHandle}; use std::{ net::UdpSocket, time::{Duration, Instant}, }; +mod audio_output; mod jukebox; #[derive(Parser, Debug)] @@ -17,6 +20,11 @@ mod jukebox; struct Args { /// the address to listen on for incoming events address: Option, + + /// Delay event timestamps by this amount during playback. Should be larger than audio buffer + /// period time plus the sound card latency. + #[arg(short, long, default_value_t = 200)] + delay_ms: u64, } fn main() -> Result<()> { @@ -27,13 +35,13 @@ fn main() -> Result<()> { let socket = UdpSocket::bind(args.address.as_deref().unwrap_or(DEFAULT_SERVER_ADDRESS))?; println!("Listening on {}", socket.local_addr()?); - let (_stream, stream_handle) = OutputStream::try_default()?; + let audio_output = AudioOutput::new(Duration::from_millis(args.delay_ms))?; let jukebox = Jukebox::new().context("creating jukebox")?; let mut stats = Stats { since: Instant::now(), events: 0, total_bytes: 0 }; loop { - match handle_datagram(&socket, &stream_handle, &jukebox) { - Ok(bytes_received) => stats.record_event(bytes_received), + match handle_datagram(&socket, &audio_output, &jukebox) { + Ok(bytes_received) => stats.record_event(bytes_received, &audio_output), Err(err) => eprintln!("Could not process datagram. Ignoring and continuing. {:?}", err), } } @@ -42,7 +50,7 @@ fn main() -> Result<()> { /// Block until next datagram is received and handle it. Returns its size in bytes. 
fn handle_datagram( socket: &UdpSocket, - output_stream: &OutputStreamHandle, + audio_output: &AudioOutput, jukebox: &Jukebox, ) -> Result { // Size up to max normal network packet size @@ -57,7 +65,7 @@ fn handle_datagram( // TODO(Matej): add different sounds for these, and vary some their quality based on length. Event::StderrWrite { length: _ } | Event::StdoutWrite { length: _ } => Sample::Click, }; - jukebox.play(output_stream, sample)?; + jukebox.play(audio_output, sample); Ok(number_of_bytes) } @@ -71,15 +79,17 @@ struct Stats { impl Stats { const REPORT_EVERY: Duration = Duration::from_secs(1); - fn record_event(&mut self, bytes_received: usize) { + fn record_event(&mut self, bytes_received: usize, audio_output: &AudioOutput) { self.events += 1; self.total_bytes += bytes_received; let elapsed = self.since.elapsed(); if elapsed >= Self::REPORT_EVERY { println!( - "Received {} events ({} bytes) in last {elapsed:.2?}.", - self.events, self.total_bytes + "Received {} events ({} bytes) in last {elapsed:.2?}, {} too early plays.", + self.events, + self.total_bytes, + audio_output.fetch_too_early_plays(), ); self.since = Instant::now();