Skip to content

Commit

Permalink
Merge pull request #1234 from lelloman/lelloman/tunable-audio-fetch-p…
Browse files Browse the repository at this point in the history
…arams

Make audio fetch parameters tunable
  • Loading branch information
roderickvd authored Dec 24, 2023
2 parents a245a3c + e175a88 commit ccd1a72
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 59 deletions.
114 changes: 76 additions & 38 deletions audio/src/fetch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
io::{self, Read, Seek, SeekFrom},
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
Arc, OnceLock,
},
time::Duration,
};
Expand Down Expand Up @@ -55,42 +55,75 @@ impl From<AudioFileError> for Error {
}
}

/// The minimum size of a block that is requested from the Spotify servers in one request.
/// This is the block size that is typically requested while doing a `seek()` on a file.
/// The Symphonia decoder requires this to be a power of 2 and > 32 kB.
/// Note: smaller requests can happen if part of the block is downloaded already.
pub const MINIMUM_DOWNLOAD_SIZE: usize = 64 * 1024;

/// The minimum network throughput that we expect. Together with the minimum download size,
/// this will determine the time we will wait for a response.
pub const MINIMUM_THROUGHPUT: usize = 8 * 1024;

/// The ping time that is used for calculations before a ping time was actually measured.
pub const INITIAL_PING_TIME_ESTIMATE: Duration = Duration::from_millis(500);

/// If the measured ping time to the Spotify server is larger than this value, it is capped
/// to avoid run-away block sizes and pre-fetching.
pub const MAXIMUM_ASSUMED_PING_TIME: Duration = Duration::from_millis(1500);
#[derive(Clone)]
pub struct AudioFetchParams {
/// The minimum size of a block that is requested from the Spotify servers in one request.
/// This is the block size that is typically requested while doing a `seek()` on a file.
/// The Symphonia decoder requires this to be a power of 2 and > 32 kB.
/// Note: smaller requests can happen if part of the block is downloaded already.
pub minimum_download_size: usize,

/// The minimum network throughput that we expect. Together with the minimum download size,
/// this will determine the time we will wait for a response.
pub minimum_throughput: usize,

/// The ping time that is used for calculations before a ping time was actually measured.
pub initial_ping_time_estimate: Duration,

/// If the measured ping time to the Spotify server is larger than this value, it is capped
/// to avoid run-away block sizes and pre-fetching.
pub maximum_assumed_ping_time: Duration,

/// Before playback starts, this many seconds of data must be present.
/// Note: the calculations are done using the nominal bitrate of the file. The actual amount
/// of audio data may be larger or smaller.
pub read_ahead_before_playback: Duration,

/// While playing back, this many seconds of data ahead of the current read position are
/// requested.
/// Note: the calculations are done using the nominal bitrate of the file. The actual amount
/// of audio data may be larger or smaller.
pub read_ahead_during_playback: Duration,

/// If the amount of data that is pending (requested but not received) is less than a certain amount,
/// data is pre-fetched in addition to the read ahead settings above. The threshold for requesting more
/// data is calculated as `<pending bytes> < PREFETCH_THRESHOLD_FACTOR * <ping time> * <nominal data rate>`
pub prefetch_threshold_factor: f32,

/// The time we will wait to obtain status updates on downloading.
pub download_timeout: Duration,
}

/// Before playback starts, this many seconds of data must be present.
/// Note: the calculations are done using the nominal bitrate of the file. The actual amount
/// of audio data may be larger or smaller.
pub const READ_AHEAD_BEFORE_PLAYBACK: Duration = Duration::from_secs(1);
impl Default for AudioFetchParams {
fn default() -> Self {
let minimum_download_size = 64 * 1024;
let minimum_throughput = 8 * 1024;
Self {
minimum_download_size,
minimum_throughput,
initial_ping_time_estimate: Duration::from_millis(500),
maximum_assumed_ping_time: Duration::from_millis(1500),
read_ahead_before_playback: Duration::from_secs(1),
read_ahead_during_playback: Duration::from_secs(5),
prefetch_threshold_factor: 4.0,
download_timeout: Duration::from_secs(
(minimum_download_size / minimum_throughput) as u64,
),
}
}
}

/// While playing back, this many seconds of data ahead of the current read position are
/// requested.
/// Note: the calculations are done using the nominal bitrate of the file. The actual amount
/// of audio data may be larger or smaller.
pub const READ_AHEAD_DURING_PLAYBACK: Duration = Duration::from_secs(5);
static AUDIO_FETCH_PARAMS: OnceLock<AudioFetchParams> = OnceLock::new();

/// If the amount of data that is pending (requested but not received) is less than a certain amount,
/// data is pre-fetched in addition to the read ahead settings above. The threshold for requesting more
/// data is calculated as `<pending bytes> < PREFETCH_THRESHOLD_FACTOR * <ping time> * <nominal data rate>`
pub const PREFETCH_THRESHOLD_FACTOR: f32 = 4.0;
impl AudioFetchParams {
pub fn set(params: AudioFetchParams) -> Result<(), AudioFetchParams> {
AUDIO_FETCH_PARAMS.set(params)
}

/// The time we will wait to obtain status updates on downloading.
pub const DOWNLOAD_TIMEOUT: Duration =
Duration::from_secs((MINIMUM_DOWNLOAD_SIZE / MINIMUM_THROUGHPUT) as u64);
pub fn get() -> &'static AudioFetchParams {
AUDIO_FETCH_PARAMS.get_or_init(AudioFetchParams::default)
}
}

pub enum AudioFile {
Cached(fs::File),
Expand Down Expand Up @@ -183,6 +216,7 @@ impl StreamLoaderController {

if let Some(ref shared) = self.stream_shared {
let mut download_status = shared.download_status.lock();
let download_timeout = AudioFetchParams::get().download_timeout;

while range.length
> download_status
Expand All @@ -191,7 +225,7 @@ impl StreamLoaderController {
{
if shared
.cond
.wait_for(&mut download_status, DOWNLOAD_TIMEOUT)
.wait_for(&mut download_status, download_timeout)
.timed_out()
{
return Err(AudioFileError::WaitTimeout.into());
Expand Down Expand Up @@ -297,7 +331,7 @@ impl AudioFileShared {
if ping_time_ms > 0 {
Duration::from_millis(ping_time_ms as u64)
} else {
INITIAL_PING_TIME_ESTIMATE
AudioFetchParams::get().initial_ping_time_estimate
}
}

Expand Down Expand Up @@ -395,14 +429,16 @@ impl AudioFileStreaming {
trace!("Streaming from {}", url);
}

let minimum_download_size = AudioFetchParams::get().minimum_download_size;

// When the audio file is really small, this `download_size` may turn out to be
// larger than the audio file we're going to stream later on. This is OK; requesting
// `Content-Range` > `Content-Length` will return the complete file with status code
// 206 Partial Content.
let mut streamer =
session
.spclient()
.stream_from_cdn(&cdn_url, 0, MINIMUM_DOWNLOAD_SIZE)?;
.stream_from_cdn(&cdn_url, 0, minimum_download_size)?;

// Get the first chunk with the headers to get the file size.
// The remainder of that chunk with possibly also a response body is then
Expand Down Expand Up @@ -490,9 +526,10 @@ impl Read for AudioFileStreaming {
return Ok(0);
}

let read_ahead_during_playback = AudioFetchParams::get().read_ahead_during_playback;
let length_to_request = if self.shared.is_download_streaming() {
let length_to_request = length
+ (READ_AHEAD_DURING_PLAYBACK.as_secs_f32() * self.shared.bytes_per_second as f32)
+ (read_ahead_during_playback.as_secs_f32() * self.shared.bytes_per_second as f32)
as usize;

// Due to the read-ahead stuff, we potentially request more than the actual request demanded.
Expand All @@ -515,11 +552,12 @@ impl Read for AudioFileStreaming {
.map_err(|err| io::Error::new(io::ErrorKind::BrokenPipe, err))?;
}

let download_timeout = AudioFetchParams::get().download_timeout;
while !download_status.downloaded.contains(offset) {
if self
.shared
.cond
.wait_for(&mut download_status, DOWNLOAD_TIMEOUT)
.wait_for(&mut download_status, download_timeout)
.timed_out()
{
return Err(io::Error::new(
Expand Down
29 changes: 17 additions & 12 deletions audio/src/fetch/receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ use librespot_core::{http_client::HttpClient, session::Session, Error};
use crate::range_set::{Range, RangeSet};

use super::{
AudioFileError, AudioFileResult, AudioFileShared, StreamLoaderCommand, StreamingRequest,
MAXIMUM_ASSUMED_PING_TIME, MINIMUM_DOWNLOAD_SIZE, MINIMUM_THROUGHPUT,
PREFETCH_THRESHOLD_FACTOR,
AudioFetchParams, AudioFileError, AudioFileResult, AudioFileShared, StreamLoaderCommand,
StreamingRequest,
};

struct PartialFileData {
Expand Down Expand Up @@ -151,6 +150,8 @@ struct AudioFileFetch {
file_data_tx: mpsc::UnboundedSender<ReceivedData>,
complete_tx: Option<oneshot::Sender<NamedTempFile>>,
network_response_times: Vec<Duration>,

params: AudioFetchParams,
}

// Might be replaced by enum from std once stable
Expand All @@ -166,8 +167,8 @@ impl AudioFileFetch {
}

fn download_range(&mut self, offset: usize, mut length: usize) -> AudioFileResult {
if length < MINIMUM_DOWNLOAD_SIZE {
length = MINIMUM_DOWNLOAD_SIZE;
if length < self.params.minimum_download_size {
length = self.params.minimum_download_size;
}

// If we are in streaming mode (so not seeking) then start downloading as large
Expand Down Expand Up @@ -258,13 +259,13 @@ impl AudioFileFetch {
fn handle_file_data(&mut self, data: ReceivedData) -> Result<ControlFlow, Error> {
match data {
ReceivedData::Throughput(mut throughput) => {
if throughput < MINIMUM_THROUGHPUT {
if throughput < self.params.minimum_throughput {
warn!(
"Throughput {} kbps lower than minimum {}, setting to minimum",
throughput / 1000,
MINIMUM_THROUGHPUT / 1000,
self.params.minimum_throughput / 1000,
);
throughput = MINIMUM_THROUGHPUT;
throughput = self.params.minimum_throughput;
}

let old_throughput = self.shared.throughput();
Expand All @@ -287,13 +288,13 @@ impl AudioFileFetch {
self.shared.set_throughput(avg_throughput);
}
ReceivedData::ResponseTime(mut response_time) => {
if response_time > MAXIMUM_ASSUMED_PING_TIME {
if response_time > self.params.maximum_assumed_ping_time {
warn!(
"Time to first byte {} ms exceeds maximum {}, setting to maximum",
response_time.as_millis(),
MAXIMUM_ASSUMED_PING_TIME.as_millis()
self.params.maximum_assumed_ping_time.as_millis()
);
response_time = MAXIMUM_ASSUMED_PING_TIME;
response_time = self.params.maximum_assumed_ping_time;
}

let old_ping_time_ms = self.shared.ping_time().as_millis();
Expand Down Expand Up @@ -423,6 +424,8 @@ pub(super) async fn audio_file_fetch(
initial_request,
));

let params = AudioFetchParams::get();

let mut fetch = AudioFileFetch {
session: session.clone(),
shared,
Expand All @@ -431,6 +434,8 @@ pub(super) async fn audio_file_fetch(
file_data_tx,
complete_tx: Some(complete_tx),
network_response_times: Vec::with_capacity(3),

params: params.clone(),
};

loop {
Expand Down Expand Up @@ -472,7 +477,7 @@ pub(super) async fn audio_file_fetch(
let throughput = fetch.shared.throughput();

let desired_pending_bytes = max(
(PREFETCH_THRESHOLD_FACTOR
(params.prefetch_threshold_factor
* ping_time_seconds
* fetch.shared.bytes_per_second as f32) as usize,
(ping_time_seconds * throughput as f32) as usize,
Expand Down
3 changes: 1 addition & 2 deletions audio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,4 @@ mod fetch;
mod range_set;

pub use decrypt::AudioDecrypt;
pub use fetch::{AudioFile, AudioFileError, StreamLoaderController};
pub use fetch::{MINIMUM_DOWNLOAD_SIZE, READ_AHEAD_BEFORE_PLAYBACK, READ_AHEAD_DURING_PLAYBACK};
pub use fetch::{AudioFetchParams, AudioFile, AudioFileError, StreamLoaderController};
2 changes: 1 addition & 1 deletion playback/src/decoder/symphonia_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl SymphoniaDecoder {
R: MediaSource + 'static,
{
let mss_opts = MediaSourceStreamOptions {
buffer_len: librespot_audio::MINIMUM_DOWNLOAD_SIZE,
buffer_len: librespot_audio::AudioFetchParams::get().minimum_download_size,
};
let mss = MediaSourceStream::new(Box::new(input), mss_opts);

Expand Down
10 changes: 4 additions & 6 deletions playback/src/player.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@ use symphonia::core::io::MediaSource;
use tokio::sync::{mpsc, oneshot};

use crate::{
audio::{
AudioDecrypt, AudioFile, StreamLoaderController, READ_AHEAD_BEFORE_PLAYBACK,
READ_AHEAD_DURING_PLAYBACK,
},
audio::{AudioDecrypt, AudioFetchParams, AudioFile, StreamLoaderController},
audio_backend::Sink,
config::{Bitrate, NormalisationMethod, NormalisationType, PlayerConfig},
convert::Converter,
Expand Down Expand Up @@ -2223,13 +2220,14 @@ impl PlayerInternal {
..
} = self.state
{
let read_ahead_during_playback = AudioFetchParams::get().read_ahead_during_playback;
// Request our read ahead range
let request_data_length =
(READ_AHEAD_DURING_PLAYBACK.as_secs_f32() * bytes_per_second as f32) as usize;
(read_ahead_during_playback.as_secs_f32() * bytes_per_second as f32) as usize;

// Request the part we want to wait for blocking. This effectively means we wait for the previous request to partially complete.
let wait_for_data_length =
(READ_AHEAD_BEFORE_PLAYBACK.as_secs_f32() * bytes_per_second as f32) as usize;
(read_ahead_during_playback.as_secs_f32() * bytes_per_second as f32) as usize;

stream_loader_controller
.fetch_next_and_wait(request_data_length, wait_for_data_length)
Expand Down

0 comments on commit ccd1a72

Please sign in to comment.