Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drive audio directly using cpal, position sounds precisely within callbacks #36

Merged
merged 4 commits into from
Apr 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/composer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ bincode = "1"
clap = { version = "4.2", features = ["derive"] }
color-eyre = "0.6"
composer_api = { path = "../composer_api" }
cpal = "0.15"
eyre = "0.6"
rodio = { version = "0.17", features = ["symphonia-wav"] }
150 changes: 150 additions & 0 deletions crates/composer/src/audio_output.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
use cpal::{
    traits::{DeviceTrait, HostTrait, StreamTrait},
    OutputCallbackInfo, OutputStreamTimestamp, SampleFormat,
};
use eyre::{bail, eyre, Result};
use rodio::{
    dynamic_mixer::{DynamicMixer, DynamicMixerController},
    Source,
};
use std::{
    sync::{
        atomic::{AtomicU64, Ordering},
        mpsc::{channel, Receiver, Sender, TryRecvError},
        Arc,
    },
    time::{Duration, SystemTime, UNIX_EPOCH},
};

pub(crate) struct AudioOutput {
    // Hands newly scheduled sources (with their desired play timestamp) over to the
    // audio thread; the receiving end lives inside the cpal callback.
    source_tx: Sender<TimedSource>,
    // Sounds are scheduled this far into the future (see `play()`), to absorb audio
    // buffering and sound-card latency.
    play_delay: Duration,
    // Counter of sources that arrived too late to be played at their requested time;
    // incremented by the audio callback, drained by `fetch_too_early_plays()`.
    too_early_plays: Arc<AtomicU64>,
    // Held only to keep the cpal stream alive; playback stops when this is dropped.
    // NOTE(review): struct fields drop in declaration order, so `source_tx` is dropped
    // before `_stream` — the audio callback may briefly observe a disconnected channel
    // during teardown; verify the callback tolerates that without panicking.
    _stream: cpal::Stream,
}

/// Abstraction to actually produce sound using the [AudioOutput::play()] method.
/// Uses `cpal` and `rodio` behind the curtains. Great care is taken to position played samples
/// precisely in time so that sound superposition works well even at high frequencies.
/// Playback stops when this struct is dropped.
impl AudioOutput {
    /// Open the default cpal output device, verify it produces `f32` samples, build an output
    /// stream whose callback mixes sources submitted via [AudioOutput::play()], and start it.
    ///
    /// # Errors
    /// Fails when no output device is found, when the device only supports non-`f32` sample
    /// formats, or when building/starting the stream fails.
    pub(crate) fn new(play_delay: Duration) -> Result<Self> {
        let cpal_device = cpal::default_host()
            .default_output_device()
            .ok_or_else(|| eyre!("no cpal audio output device found"))?;
        let supported_config = cpal_device.default_output_config()?;
        let stream_config = supported_config.config();
        println!(
            "Using audio device '{}', supported config {:?}, stream config {:?}.",
            cpal_device.name()?,
            supported_config,
            stream_config,
        );
        if supported_config.sample_format() != SampleFormat::F32 {
            bail!("Only F32 sample format supported for now.");
        }

        let (mixer_controller, mixer) =
            rodio::dynamic_mixer::mixer::<f32>(stream_config.channels, stream_config.sample_rate.0);

        let (source_tx, source_rx) = channel();

        let too_early_plays = Arc::default();

        // The mixer_controller can be shared between threads, but we want to precisely control
        // when we add new sources w.r.t. the audio callback, so we move it to the audio thread and
        // use a mpsc channel to send new sources to the audio thread.
        let mut audio_callback =
            AudioCallback::new(mixer_controller, mixer, source_rx, &too_early_plays);
        let stream = cpal_device.build_output_stream::<f32, _, _>(
            &stream_config,
            move |data_out, info| audio_callback.fill_data(data_out, info),
            |err| eprintln!("Got cpal stream error callback: {err}."),
            None,
        )?;
        // Bug fix: cpal does not guarantee that a freshly built stream runs automatically on
        // every host, so start it explicitly (needs `cpal::traits::StreamTrait` in scope).
        stream.play()?;

        Ok(Self { source_tx, play_delay, too_early_plays, _stream: stream })
    }

    /// Schedule `source` to start playing `self.play_delay` from now, by sending it to the
    /// audio thread. Panics only if the audio callback has gone away (a programmer error).
    pub(crate) fn play<S: Source<Item = f32> + Send + 'static>(&self, source: S) {
        // TODO(Matej): use timestamp from the event itself once we have it.
        let play_at_timestamp = current_timestamp() + self.play_delay;

        // TODO(Matej): we are in fact double-boxing because DynamicMixerController internally adds
        // another box. But we need a sized type to send it through threads. We could make this
        // method non-generic, but that would be less flexible, so just accept it for now.
        let source = Box::new(source);

        self.source_tx
            .send(TimedSource { source, play_at_timestamp })
            .expect("source receiver should be still alive");
    }

    /// Get "too early plays" counter since the last call of this method.
    pub(crate) fn fetch_too_early_plays(&self) -> u64 {
        self.too_early_plays.swap(0, Ordering::SeqCst)
    }
}

/// An f32 [rodio::source::Source] with UNIX timestamp of desired play time attached.
struct TimedSource {
    // Boxed so that the generic source becomes a sized type that can be sent over the
    // mpsc channel to the audio thread.
    source: Box<dyn Source<Item = f32> + Send + 'static>,
    // Desired play time, expressed as a duration since the UNIX epoch (matches the
    // convention of `current_timestamp()`).
    play_at_timestamp: Duration,
}

/// A sort of manual implementation of the closure used as cpal audio data callback, for tidiness.
struct AudioCallback {
    // Used to add newly received sources into the mixer.
    mixer_controller: Arc<DynamicMixerController<f32>>,
    // The mixer itself; iterated to produce the output samples.
    mixer: DynamicMixer<f32>,
    // Receiving end of the channel fed by `AudioOutput::play()`.
    source_rx: Receiver<TimedSource>,
    // Shared counter, bumped whenever a source arrives too late to honor its timestamp;
    // read and reset by `AudioOutput::fetch_too_early_plays()`.
    too_early_plays: Arc<AtomicU64>,
}

impl AudioCallback {
    /// Bundle the mixer, its controller, the source channel receiver and the shared
    /// "too early plays" counter into a callback state struct.
    fn new(
        mixer_controller: Arc<DynamicMixerController<f32>>,
        mixer: DynamicMixer<f32>,
        source_rx: Receiver<TimedSource>,
        too_early_plays: &Arc<AtomicU64>,
    ) -> Self {
        let too_early_plays = Arc::clone(too_early_plays);
        Self { mixer_controller, mixer, source_rx, too_early_plays }
    }

    /// The cpal data callback: drain newly arrived sources, schedule each with a delay so it
    /// starts at its requested UNIX timestamp, then fill `data_out` from the mixer.
    fn fill_data(&mut self, data_out: &mut [f32], info: &OutputCallbackInfo) {
        let now = current_timestamp();
        // cpal gives us two timestamps that cannot be compared to unix time directly as they have a different epoch.
        // However subtracting them gives us the duration between when we were called (i.e. now) and when the buffer
        // we produce will be played.
        let OutputStreamTimestamp { playback, callback } = info.timestamp();
        let playback_delay =
            playback.duration_since(&callback).expect("playback shouldn't be planned in past");
        // ...and by adding it to current unix timestamp we get a unix timestamp of the instant the buffer will be played.
        let playback_unix_timestamp = now + playback_delay;

        // Add possible new sources to the list
        loop {
            match self.source_rx.try_recv() {
                Ok(timed_source) => {
                    let delay = timed_source
                        .play_at_timestamp
                        .checked_sub(playback_unix_timestamp)
                        .unwrap_or_else(|| {
                            // Requested play time is already in the past: play immediately and
                            // record the miss so it can be reported by the stats.
                            self.too_early_plays.fetch_add(1, Ordering::SeqCst);
                            Duration::ZERO
                        });
                    self.mixer_controller.add(timed_source.source.delay(delay));
                },
                Err(TryRecvError::Empty) => break,
                Err(TryRecvError::Disconnected) => {
                    // Bug fix: previously this was a panic, but a disconnected channel is
                    // reachable during normal teardown — `AudioOutput` drops its fields in
                    // declaration order, so `source_tx` is dropped before `_stream`, and this
                    // callback can still run once more and observe the closed channel.
                    // Panicking on the audio thread in that window is wrong; just stop
                    // draining and keep producing (eventually silent) samples.
                    break
                },
            }
        }

        // Fill the buffer from the mixer; once it runs out of samples, emit silence (0.0).
        data_out.iter_mut().for_each(|d| *d = self.mixer.next().unwrap_or(0f32))
    }
}

/// Get the current timestamp as the `Duration` elapsed since the UNIX epoch.
fn current_timestamp() -> Duration {
    // `UNIX_EPOCH.elapsed()` is shorthand for `SystemTime::now().duration_since(UNIX_EPOCH)`;
    // it only errs if the system clock reads before 1970, which we treat as a fatal bug.
    UNIX_EPOCH.elapsed().expect("Unable to get current UNIX time")
}
10 changes: 4 additions & 6 deletions crates/composer/src/jukebox.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::audio_output::AudioOutput;
use eyre::{Context, Result};
use rodio::{
source::{Buffered, SamplesConverter},
Decoder, OutputStreamHandle, Source,
Decoder, Source,
};
use std::{collections::HashMap, fs::File, io::BufReader, path::Path};

Expand Down Expand Up @@ -48,15 +49,12 @@ impl Jukebox {
Ok(Self { samples })
}

pub(crate) fn play(&self, output_stream: &OutputStreamHandle, sample: Sample) -> Result<()> {
pub(crate) fn play(&self, audio_output: &AudioOutput, sample: Sample) {
let buffer = self
.samples
.get(&sample)
.expect("programmer error, all possible samples should be loaded");

output_stream
.play_raw(buffer.clone())
.with_context(|| format!("playing sample {sample:?}"))?;
Ok(())
audio_output.play(buffer.clone());
}
}
30 changes: 20 additions & 10 deletions crates/composer/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,30 @@
#![warn(clippy::all, clippy::clone_on_ref_ptr)]

use crate::jukebox::{Jukebox, Sample};
use crate::{
audio_output::AudioOutput,
jukebox::{Jukebox, Sample},
};
use clap::Parser;
use composer_api::{Event, DEFAULT_SERVER_ADDRESS};
use eyre::{Context, Result};
use rodio::{OutputStream, OutputStreamHandle};
use std::{
net::UdpSocket,
time::{Duration, Instant},
};

mod audio_output;
mod jukebox;

// Command-line arguments, parsed by clap's derive API. The `///` doc comments below double
// as the user-visible `--help` text, so they are left untouched here.
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
    /// the address to listen on for incoming events
    // Optional: main() falls back to DEFAULT_SERVER_ADDRESS when absent.
    address: Option<String>,

    /// Delay event timestamps by this amount during playback. Should be larger than audio buffer
    /// period time plus the sound card latency.
    // Milliseconds; converted to a Duration and passed to AudioOutput::new().
    #[arg(short, long, default_value_t = 200)]
    delay_ms: u64,
}

fn main() -> Result<()> {
Expand All @@ -27,13 +35,13 @@ fn main() -> Result<()> {
let socket = UdpSocket::bind(args.address.as_deref().unwrap_or(DEFAULT_SERVER_ADDRESS))?;
println!("Listening on {}", socket.local_addr()?);

let (_stream, stream_handle) = OutputStream::try_default()?;
let audio_output = AudioOutput::new(Duration::from_millis(args.delay_ms))?;

let jukebox = Jukebox::new().context("creating jukebox")?;
let mut stats = Stats { since: Instant::now(), events: 0, total_bytes: 0 };
loop {
match handle_datagram(&socket, &stream_handle, &jukebox) {
Ok(bytes_received) => stats.record_event(bytes_received),
match handle_datagram(&socket, &audio_output, &jukebox) {
Ok(bytes_received) => stats.record_event(bytes_received, &audio_output),
Err(err) => eprintln!("Could not process datagram. Ignoring and continuing. {:?}", err),
}
}
Expand All @@ -42,7 +50,7 @@ fn main() -> Result<()> {
/// Block until next datagram is received and handle it. Returns its size in bytes.
fn handle_datagram(
socket: &UdpSocket,
output_stream: &OutputStreamHandle,
audio_output: &AudioOutput,
jukebox: &Jukebox,
) -> Result<usize> {
// Size up to max normal network packet size
Expand All @@ -57,7 +65,7 @@ fn handle_datagram(
// TODO(Matej): add different sounds for these, and vary some their quality based on length.
Event::StderrWrite { length: _ } | Event::StdoutWrite { length: _ } => Sample::Click,
};
jukebox.play(output_stream, sample)?;
jukebox.play(audio_output, sample);

Ok(number_of_bytes)
}
Expand All @@ -71,15 +79,17 @@ struct Stats {
impl Stats {
const REPORT_EVERY: Duration = Duration::from_secs(1);

fn record_event(&mut self, bytes_received: usize) {
fn record_event(&mut self, bytes_received: usize, audio_output: &AudioOutput) {
self.events += 1;
self.total_bytes += bytes_received;

let elapsed = self.since.elapsed();
if elapsed >= Self::REPORT_EVERY {
println!(
"Received {} events ({} bytes) in last {elapsed:.2?}.",
self.events, self.total_bytes
"Received {} events ({} bytes) in last {elapsed:.2?}, {} too early plays.",
self.events,
self.total_bytes,
audio_output.fetch_too_early_plays(),
);

self.since = Instant::now();
Expand Down