Per client, queue-based backpressure (#1186)
This PR adds per-client delays to prevent queues from becoming arbitrarily long
during read-heavy workflows. These delays are based on either per-client queue
length or bytes in flight (whichever is worse when compared to the slowest
client), which is analogous to the guest backpressure implementation.

Compared to main, this keeps the queues relatively bounded, and performance is
at least similar (hard to tell, because there is significant run-to-run
variability).

Fixes #1167, and is an alternative to #1181
mkeeter authored Mar 2, 2024
1 parent fbe6f12 commit 1c1574f
Showing 5 changed files with 268 additions and 6 deletions.
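To make the mechanism concrete before reading the diff, here is a rough sketch of how such a per-client delay could be derived. This is an illustration of the idea described above, not the code from this commit: the function name, scaling constants, and exact formula are all hypothetical.

```rust
/// Hypothetical sketch of per-client delay selection (not this PR's code).
/// Each client is delayed in proportion to how far it has run ahead of the
/// slowest client, measured both in queued jobs and in bytes in flight,
/// taking whichever gap is worse.
fn per_client_delay_us(jobs: [u64; 3], bytes_in_flight: [u64; 3]) -> [u64; 3] {
    const DELAY_PER_JOB_US: u64 = 10; // made-up scale factors
    const DELAY_PER_KIB_US: u64 = 1;

    // The slowest client is the one with the most work outstanding.
    let max_jobs = *jobs.iter().max().unwrap();
    let max_bytes = *bytes_in_flight.iter().max().unwrap();

    let mut delay = [0u64; 3];
    for i in 0..3 {
        let job_gap = max_jobs - jobs[i];
        let byte_gap = max_bytes - bytes_in_flight[i];
        // Whichever measure says this client is furthest ahead wins.
        delay[i] = (job_gap * DELAY_PER_JOB_US)
            .max(byte_gap / 1024 * DELAY_PER_KIB_US);
    }
    delay
}
```

The intent, per the description above, is that clients running ahead of the slowest one are delayed the most, keeping the three downstairs relatively in sync.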
100 changes: 94 additions & 6 deletions upstairs/src/client.rs
@@ -10,15 +10,22 @@
use crucible_common::x509::TLSContext;
use crucible_protocol::{ReconciliationId, CRUCIBLE_MESSAGE_VERSION};

use std::{collections::BTreeSet, net::SocketAddr, sync::Arc};
use std::{
    collections::BTreeSet,
    net::SocketAddr,
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    },
};

use futures::StreamExt;
use slog::{debug, error, info, o, warn, Logger};
use tokio::{
    io::AsyncWriteExt,
    net::{TcpSocket, TcpStream},
    sync::{mpsc, oneshot},
    time::sleep_until,
    time::{sleep_until, Duration},
};
use tokio_util::codec::{Encoder, FramedRead};
use uuid::Uuid;
@@ -154,6 +161,12 @@
    /// IO state counters
    pub(crate) io_state_count: ClientIOStateCount,

    /// Bytes in queues for this client
    ///
    /// This includes read, write, and write-unwritten jobs, and is used to
    /// estimate per-client backpressure to keep the 3x downstairs in sync.
    pub(crate) bytes_outstanding: u64,

    /// UUID for this downstairs region
    ///
    /// Unpopulated until provided by `Message::RegionInfo`
@@ -216,6 +229,9 @@

    /// Session ID for a clients connection to a downstairs.
    connection_id: ConnectionId,

    /// Per-client delay, shared with the [`DownstairsClient`]
    client_delay_us: Arc<AtomicU64>,
}

impl DownstairsClient {
@@ -226,6 +242,7 @@
        log: Logger,
        tls_context: Option<Arc<crucible_common::x509::TLSContext>>,
    ) -> Self {
        let client_delay_us = Arc::new(AtomicU64::new(0));
        Self {
            cfg,
            client_task: Self::new_io_task(
@@ -234,6 +251,7 @@
                false, // do not start the task until GoActive
                client_id,
                tls_context.clone(),
                client_delay_us.clone(),
                &log,
            ),
            client_id,
@@ -252,7 +270,9 @@
            region_metadata: None,
            repair_info: None,
            io_state_count: ClientIOStateCount::new(),
            bytes_outstanding: 0,
            connection_id: ConnectionId(0),
            client_delay_us,
        }
    }

@@ -262,6 +282,7 @@
    /// client will disappear into the void.
    #[cfg(test)]
    fn test_default() -> Self {
        let client_delay_us = Arc::new(AtomicU64::new(0));
        let cfg = Arc::new(UpstairsConfig {
            encryption_context: None,
            upstairs_id: Uuid::new_v4(),
@@ -289,7 +310,9 @@
            region_metadata: None,
            repair_info: None,
            io_state_count: ClientIOStateCount::new(),
            bytes_outstanding: 0,
            connection_id: ConnectionId(0),
            client_delay_us,
        }
    }

@@ -407,9 +430,26 @@
        job: &mut DownstairsIO,
        new_state: IOState,
    ) -> IOState {
        let is_running =
            matches!(new_state, IOState::New | IOState::InProgress);
        self.io_state_count.incr(&new_state);
        let old_state = job.state.insert(self.client_id, new_state);
        let was_running =
            matches!(old_state, IOState::New | IOState::InProgress);
        self.io_state_count.decr(&old_state);

        // Update our bytes-in-flight counter
        if was_running && !is_running {
            self.bytes_outstanding = self
                .bytes_outstanding
                .checked_sub(job.work.job_bytes())
                .unwrap();
        } else if is_running && !was_running {
            // This should only happen if a job is replayed, but that still
            // counts!
            self.bytes_outstanding += job.work.job_bytes();
        }

        old_state
    }

@@ -471,8 +511,12 @@
    }

    /// Sets this job as skipped and moves it to `skipped_jobs`
    ///
    /// # Panics
    /// If the job is not new or in-progress
    pub(crate) fn skip_job(&mut self, job: &mut DownstairsIO) {
        self.set_job_state(job, IOState::Skipped);
        let prev_state = self.set_job_state(job, IOState::Skipped);
        assert!(matches!(prev_state, IOState::New | IOState::InProgress));
        self.skipped_jobs.insert(job.ds_id);
    }

@@ -628,6 +672,7 @@
        }

        self.connection_id.update();

        // Restart with a short delay
        self.start_task(true, auto_promote);
    }
@@ -652,6 +697,7 @@
            connect,
            self.client_id,
            self.tls_context.clone(),
            self.client_delay_us.clone(),
            &self.log,
        );
    }
@@ -662,6 +708,7 @@
        connect: bool,
        client_id: ClientId,
        tls_context: Option<Arc<TLSContext>>,
        client_delay_us: Arc<AtomicU64>,
        log: &Logger,
    ) -> ClientTaskHandle {
        #[cfg(test)]
@@ -672,6 +719,7 @@
                connect,
                client_id,
                tls_context,
                client_delay_us,
                log,
            )
        } else {
@@ -685,6 +733,7 @@
                connect,
                client_id,
                tls_context,
                client_delay_us,
                log,
            )
        }
@@ -695,6 +744,7 @@
        connect: bool,
        client_id: ClientId,
        tls_context: Option<Arc<TLSContext>>,
        client_delay_us: Arc<AtomicU64>,
        log: &Logger,
    ) -> ClientTaskHandle {
        // These channels must support at least MAX_ACTIVE_COUNT messages;
@@ -730,6 +780,7 @@
                    log: log.clone(),
                },
                delay,
                client_delay_us,
                log,
            };
            c.run().await
@@ -945,6 +996,9 @@
                IOState::New
            }
        };
        if r == IOState::New {
            self.bytes_outstanding += io.work.job_bytes();
        }
        self.io_state_count.incr(&r);
        r
    }
@@ -1261,9 +1315,8 @@
            }
        };

        let old_state = job.state.insert(self.client_id, new_state.clone());
        self.io_state_count.decr(&old_state);
        self.io_state_count.incr(&new_state);
        // Update the state, maintaining various counters
        let old_state = self.set_job_state(job, new_state.clone());

        /*
         * Verify the job was InProgress
@@ -2209,6 +2262,10 @@
        (self.io_state_count.new + self.io_state_count.in_progress) as usize
    }

    pub(crate) fn total_bytes_outstanding(&self) -> usize {
        self.bytes_outstanding as usize
    }

    /// Returns a unique ID for the current connection, or `None`
    ///
    /// This can be used to disambiguate between messages returned from
@@ -2220,6 +2277,16 @@
            None
        }
    }

    /// Sets the per-client delay
    pub(crate) fn set_delay_us(&self, delay: u64) {
        self.client_delay_us.store(delay, Ordering::Relaxed);
    }

    /// Looks up the per-client delay
    pub(crate) fn get_delay_us(&self) -> u64 {
        self.client_delay_us.load(Ordering::Relaxed)
    }
}

/// How to handle "promote to active" requests
@@ -2420,6 +2487,9 @@
    /// Handle for the rx task
    recv_task: ClientRxTask,

    /// Shared handle to receive per-client backpressure delay
    client_delay_us: Arc<AtomicU64>,

    log: Logger,
}

@@ -2692,6 +2762,24 @@
        + std::marker::Send
        + 'static,
    {
        // Delay communication with this client based on backpressure, to keep
        // the three clients relatively in sync with each other.
        //
        // We don't need to delay writes, because they're already constrained by
        // the global backpressure system and cannot build up an unbounded
        // queue. This is admittedly quite subtle; see crucible#1167 for
        // discussions and graphs.
        if !matches!(
            m,
            ClientRequest::Message(Message::Write { .. })
                | ClientRequest::RawMessage(RawMessage::Write { .. }, ..)
        ) {
            let d = self.client_delay_us.load(Ordering::Relaxed);
            if d > 0 {
                tokio::time::sleep(Duration::from_micros(d)).await;
            }
        }

        // There's some duplication between this function and `cmd_loop` above,
        // but it's not obvious whether there's a cleaner way to organize stuff.
        tokio::select! {
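A closing note on the plumbing: the diff shares a single `Arc<AtomicU64>` between `DownstairsClient` (which publishes a value via `set_delay_us`) and the client IO task (which reads it before each non-write send). Below is a minimal, self-contained model of that publish/consume pattern, assuming a tokio runtime; the names and values here are made up for illustration.

```rust
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};
use std::time::Duration;

#[tokio::main]
async fn main() {
    // Shared handle: one side writes the delay, the other reads it.
    let delay_us = Arc::new(AtomicU64::new(0));

    let reader = delay_us.clone();
    let io_task = tokio::spawn(async move {
        for msg in 0..3 {
            // Pick up the latest delay before each send, lock-free.
            let d = reader.load(Ordering::Relaxed);
            if d > 0 {
                tokio::time::sleep(Duration::from_micros(d)).await;
            }
            println!("sending message {msg}");
        }
    });

    // Publish a new delay; the task sees it on its next send.
    delay_us.store(500, Ordering::Relaxed);
    io_task.await.unwrap();
}
```

Using a relaxed atomic rather than a lock keeps the hot send path cheap, and a slightly stale delay value is harmless for backpressure purposes.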