Handle timeout in the client IO task (#1109)
This PR moves the timeout into the client IO task, for consistency with other
forms of disconnection.

Previously, the main Upstairs event loop would notice the timeout, then tell the
client task to stop; this roundabout approach led to issues like the panic seen
in #1103.

For symmetry, we also move the ping task into the client IO task, since there's
no reason to bother the main event loop about it: we always want to be pinging
periodically, as long as the client IO task is running.
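
In outline, the change makes the client IO task own both timers. The sketch below condenses the idea into a single `tokio::select!` loop; it is not the actual Crucible code (assumed tokio 1.x; `Message`, `RunResult`, the channel shapes, and the interval values are simplified stand-ins, and the real change splits reading into a separate `rx_loop` task, as the diff shows):

```rust
use std::time::Duration;
use tokio::{
    sync::mpsc,
    time::{sleep_until, Instant},
};

const PING_INTERVAL: Duration = Duration::from_secs(5); // illustrative value
const TIMEOUT: Duration = Duration::from_secs(45); // illustrative value

#[derive(Debug)]
enum Message {
    Ruok,
    Data(u64),
}

#[derive(Debug)]
enum RunResult {
    Finished,
    QueueClosed,
    Timeout,
}

async fn io_task(
    mut rx: mpsc::Receiver<Message>, // frames from the downstairs
    tx: mpsc::Sender<Message>,       // frames (including pings) to the downstairs
) -> RunResult {
    let mut ping_deadline = Instant::now() + PING_INTERVAL;
    let mut timeout_deadline = Instant::now() + TIMEOUT;
    loop {
        tokio::select! {
            m = rx.recv() => match m {
                Some(m) => {
                    // Any inbound message proves the peer is alive, so the
                    // timeout resets here, inside the IO task itself.
                    timeout_deadline = Instant::now() + TIMEOUT;
                    println!("received {m:?}");
                }
                None => break RunResult::Finished,
            },
            _ = sleep_until(ping_deadline) => {
                // Ping periodically for as long as the task runs; the main
                // event loop is never consulted.
                ping_deadline = Instant::now() + PING_INTERVAL;
                if tx.send(Message::Ruok).await.is_err() {
                    break RunResult::QueueClosed;
                }
            }
            _ = sleep_until(timeout_deadline) => {
                // A timeout is reported exactly like any other reason for
                // the task to stop.
                break RunResult::Timeout;
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx_in, rx_in) = mpsc::channel(8);
    let (tx_out, mut rx_out) = mpsc::channel(8);
    let task = tokio::spawn(io_task(rx_in, tx_out));
    tx_in.send(Message::Data(1)).await.unwrap();
    drop(tx_in); // closing the inbound channel stops the task
    while let Some(m) = rx_out.recv().await {
        println!("sent {m:?}");
    }
    println!("{:?}", task.await.unwrap());
}
```

With the deadlines local to the IO task, a timeout surfaces the same way as a failed read or write — as a `ClientRunResult` when the task stops — instead of being bounced through the main event loop.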
mkeeter authored Jan 24, 2024
1 parent a61bbb0 commit da60a6d
Showing 2 changed files with 66 additions and 100 deletions.
143 changes: 64 additions & 79 deletions upstairs/src/client.rs
@@ -17,7 +17,7 @@ use slog::{debug, error, info, o, warn, Logger};
use tokio::{
net::{TcpSocket, TcpStream},
sync::{mpsc, oneshot},
time::{sleep_until, Instant},
time::sleep_until,
};
use tokio_util::codec::{FramedRead, FramedWrite};
use uuid::Uuid;
@@ -135,15 +135,6 @@ pub(crate) struct DownstairsClient {
*/
pub(crate) repair_info: Option<ExtentInfo>,

/// Deadline for the next ping
ping_interval: Instant,

/// Deadline until we mark the client as dead
timeout_deadline: Instant,

/// Ping every 10 seconds if things are idle
ping_count: u64,

/// Accumulated statistics
pub(crate) stats: DownstairsStats,

@@ -175,11 +166,8 @@ impl DownstairsClient {
client_id,
region_uuid: None,
negotiation_state: NegotiationState::Start,
ping_count: 0,
ping_interval: deadline_secs(PING_INTERVAL_SECS),
tls_context,
promote_state: None,
timeout_deadline: deadline_secs(TIMEOUT_SECS),
log,
target_addr,
repair_addr: None,
@@ -214,11 +202,8 @@ impl DownstairsClient {
client_id: ClientId::new(0),
region_uuid: None,
negotiation_state: NegotiationState::Start,
ping_count: 0,
ping_interval: deadline_secs(PING_INTERVAL_SECS),
tls_context: None,
promote_state: None,
timeout_deadline: deadline_secs(TIMEOUT_SECS),
log: crucible_common::build_logger(),
target_addr: None,
repair_addr: None,
@@ -256,12 +241,6 @@ impl DownstairsClient {
None => ClientAction::ChannelClosed,
}
}
_ = sleep_until(self.ping_interval) => {
ClientAction::Ping
}
_ = sleep_until(self.timeout_deadline) => {
ClientAction::Timeout
}
}
}

@@ -279,19 +258,6 @@
.await;
}

/// If the client task is running, send a `Message::Ruok`
///
/// If the client task is **not** running, log a warning to that effect.
pub(crate) async fn send_ping(&mut self) {
self.ping_interval = deadline_secs(PING_INTERVAL_SECS);
// It's possible for the client task to have stopped after we requested
// the ping. If that's the case, then we'll catch it on the next
// go-around, and should just log an error here.
self.send(Message::Ruok).await;
self.ping_count += 1;
cdt::ds__ping__sent!(|| (self.ping_count, self.client_id.get()));
}

pub(crate) fn halt_io_task(&mut self, r: ClientStopReason) {
if let Some(t) = self.client_task.client_stop_tx.take() {
if let Err(_e) = t.send(r) {
@@ -601,7 +567,6 @@ impl DownstairsClient {
self.tls_context.clone(),
&self.log,
);
self.reset_timeout();
}

fn new_io_task(
@@ -1162,11 +1127,6 @@ impl DownstairsClient {
self.stats.live_repair_completed += 1;
}

/// Resets our timeout deadline
pub(crate) fn reset_timeout(&mut self) {
self.timeout_deadline = deadline_secs(TIMEOUT_SECS);
}

/// Handles a single IO operation
///
/// Returns `true` if the job is now ackable, `false` otherwise
@@ -2203,12 +2163,6 @@ pub(crate) enum ClientAction {
/// The client task has stopped
TaskStopped(ClientRunResult),

/// It's time to ping the client
Ping,

/// The client has hit a (Crucible) timeout
Timeout,

/// The client IO channel has returned `None`
///
/// This should never happen during normal operation, because
@@ -2256,9 +2210,6 @@ pub(crate) struct DownstairsStats {
/// When the upstairs halts the IO client task, it must provide a reason
#[derive(Debug)]
pub(crate) enum ClientStopReason {
/// Crucible-level timeout (i.e. no packets received in too long)
Timeout,

/// We are about to replace the client task
Replacing,

@@ -2317,6 +2268,8 @@ pub(crate) enum ClientRunResult {
ConnectionTimeout,
/// We failed to make the initial connection
ConnectionFailed(std::io::Error),
/// We experienced a timeout after connecting
Timeout,
/// A socket write failed
WriteFailed(anyhow::Error),
/// We received an error while reading from the connection
@@ -2461,10 +2414,11 @@ async fn client_run_inner(
WrappedStream::Http(tcp)
}
};
proc_stream(tcp, rx, stop, tx, log).await
proc_stream(client_id, tcp, rx, stop, tx, log).await
}

async fn proc_stream(
client_id: ClientId,
stream: WrappedStream,
rx: &mut mpsc::Receiver<Message>,
stop: &mut oneshot::Receiver<ClientStopReason>,
@@ -2478,43 +2432,35 @@
let fr = FramedRead::new(read, CrucibleDecoder::new());
let fw = FramedWrite::new(write, CrucibleEncoder::new());

cmd_loop(rx, stop, tx, fr, fw, log).await
cmd_loop(client_id, rx, stop, tx, fr, fw, log).await
}
WrappedStream::Https(stream) => {
let (read, write) = tokio::io::split(stream);

let fr = FramedRead::new(read, CrucibleDecoder::new());
let fw = FramedWrite::new(write, CrucibleEncoder::new());

cmd_loop(rx, stop, tx, fr, fw, log).await
cmd_loop(client_id, rx, stop, tx, fr, fw, log).await
}
}
}

async fn cmd_loop<R, W>(
rx: &mut mpsc::Receiver<Message>,
stop: &mut oneshot::Receiver<ClientStopReason>,
tx: &mpsc::Sender<ClientResponse>,
async fn rx_loop<R>(
tx: mpsc::Sender<ClientResponse>,
mut fr: FramedRead<R, crucible_protocol::CrucibleDecoder>,
mut fw: FramedWrite<W, crucible_protocol::CrucibleEncoder>,
log: &Logger,
log: Logger,
) -> ClientRunResult
where
R: tokio::io::AsyncRead + std::marker::Unpin + std::marker::Send + 'static,
W: tokio::io::AsyncWrite + std::marker::Unpin + std::marker::Send + 'static,
{
tx.send(ClientResponse::Connected)
.await
.expect("client_response_tx closed unexpectedly");

let mut recv_task = {
let tx = tx.clone();
let log = log.clone();

tokio::spawn(async move {
while let Some(f) = fr.next().await {
let mut timeout = deadline_secs(TIMEOUT_SECS);
loop {
tokio::select! {
f = fr.next() => {
match f {
Ok(m) => {
Some(Ok(m)) => {
// reset the timeout, since we've received a message
timeout = deadline_secs(TIMEOUT_SECS);
if let Err(e) =
tx.send(ClientResponse::Message(m)).await
{
@@ -2523,21 +2469,50 @@ where
"client response queue closed unexpectedly: \
{e}; is the program exiting?"
);
return ClientRunResult::QueueClosed;
break ClientRunResult::QueueClosed;
}
}
Err(e) => {
Some(Err(e)) => {
warn!(log, "downstairs client error {e}");
return ClientRunResult::ReadFailed(e);
break ClientRunResult::ReadFailed(e);
}
None => {
warn!(log, "downstairs disconnected");
break ClientRunResult::Finished;
}
}
}
// Downstairs disconnected
warn!(log, "downstairs disconnected");
ClientRunResult::Finished
})
};
_ = sleep_until(timeout) => {
warn!(log, "downstairs timed out");
break ClientRunResult::Timeout;
}
}
}
}

async fn cmd_loop<R, W>(
client_id: ClientId,
rx: &mut mpsc::Receiver<Message>,
stop: &mut oneshot::Receiver<ClientStopReason>,
tx: &mpsc::Sender<ClientResponse>,
fr: FramedRead<R, crucible_protocol::CrucibleDecoder>,
mut fw: FramedWrite<W, crucible_protocol::CrucibleEncoder>,
log: &Logger,
) -> ClientRunResult
where
R: tokio::io::AsyncRead + std::marker::Unpin + std::marker::Send + 'static,
W: tokio::io::AsyncWrite + std::marker::Unpin + std::marker::Send + 'static,
{
tx.send(ClientResponse::Connected)
.await
.expect("client_response_tx closed unexpectedly");

// Spawn a separate task to receive data over the network, so that we can
// always make progress and keep the socket buffer from filling up.
let mut recv_task = tokio::spawn(rx_loop(tx.clone(), fr, log.clone()));

let mut ping_interval = deadline_secs(PING_INTERVAL_SECS);
let mut ping_count = 0u64;
loop {
tokio::select! {
join_result = &mut recv_task => {
@@ -2559,6 +2534,16 @@ where
}
}

_ = sleep_until(ping_interval) => {
ping_interval = deadline_secs(PING_INTERVAL_SECS);
ping_count += 1;
cdt::ds__ping__sent!(|| (ping_count, client_id.get()));

if let Err(e) = fw.send(Message::Ruok).await {
break ClientRunResult::WriteFailed(e);
}
}

s = &mut *stop => {
match s {
Ok(s) => {
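The comment in the new `cmd_loop` above — receive on a dedicated task so the socket buffer can't fill up — is a pattern worth seeing in isolation. Below is a minimal, hypothetical sketch: an in-memory `tokio::io::duplex` pipe stands in for the real socket, and `LinesCodec` stands in for `CrucibleDecoder`/`CrucibleEncoder` (assumed tokio 1.x, tokio-util with the `codec` feature, and futures 0.3):

```rust
use futures::{SinkExt, StreamExt};
use tokio::sync::mpsc;
use tokio_util::codec::{FramedRead, FramedWrite, LinesCodec};

#[tokio::main]
async fn main() {
    // An in-memory pipe stands in for the TCP/TLS socket.
    let (ours, theirs) = tokio::io::duplex(64);

    // "Downstairs" peer: echo every line back until EOF.
    tokio::spawn(async move {
        let (r, w) = tokio::io::split(theirs);
        let mut fr = FramedRead::new(r, LinesCodec::new());
        let mut fw = FramedWrite::new(w, LinesCodec::new());
        while let Some(Ok(line)) = fr.next().await {
            if fw.send(line).await.is_err() {
                break;
            }
        }
    });

    let (r, w) = tokio::io::split(ours);
    let mut fw = FramedWrite::new(w, LinesCodec::new());

    // Dedicated reader task: drain the socket into a channel no matter what
    // the writer side is doing, so the peer can always make progress.
    let (tx, mut rx) = mpsc::channel::<String>(8);
    let reader = tokio::spawn(async move {
        let mut fr = FramedRead::new(r, LinesCodec::new());
        while let Some(Ok(frame)) = fr.next().await {
            if tx.send(frame).await.is_err() {
                break; // the main loop is gone; stop reading
            }
        }
    });

    fw.send("ping".to_string()).await.unwrap();
    println!("echoed: {:?}", rx.recv().await);
    drop(fw); // EOF to the peer; both tasks then wind down
    reader.await.unwrap();
}
```

If one loop handled both directions, a writer blocked on a full outbound buffer would also stop reading, and two peers in that state can deadlock; a dedicated reader keeps the inbound side draining regardless of what the writer is doing.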
23 changes: 2 additions & 21 deletions upstairs/src/upstairs.rs
@@ -2,7 +2,7 @@
//! Data structures specific to Crucible's `struct Upstairs`
use crate::{
cdt,
client::{ClientAction, ClientRunResult, ClientStopReason},
client::{ClientAction, ClientRunResult},
control::ControlRequest,
deadline_secs,
deferred::{
@@ -1518,27 +1518,7 @@ impl Upstairs {
self.downstairs.clients[client_id].stats.connected += 1;
self.downstairs.clients[client_id].send_here_i_am().await;
}
ClientAction::Ping => {
self.downstairs.clients[client_id].send_ping().await;
}
ClientAction::Timeout => {
// Ask the downstairs client task to stop, because the client
// has hit a Crucible timeout.
//
// This will come back to `TaskStopped`, at which point we'll
// clear out the task and restart it.
//
// We need to reset the timeout, because otherwise it will keep
// firing and will monopolize the future.
let c = &mut self.downstairs.clients[client_id];
c.reset_timeout();
c.halt_io_task(ClientStopReason::Timeout);
}
ClientAction::Response(m) => {
// We have received a message, so reset the timeout watchdog for
// this particular client.
self.downstairs.clients[client_id].reset_timeout();

// Defer the message if it's a (large) read that needs
// decryption, or there are other deferred messages in the queue
// (to preserve order). Otherwise, handle it immediately.
@@ -2075,6 +2055,7 @@ impl Upstairs {
pub(crate) mod test {
use super::*;
use crate::{
client::ClientStopReason,
downstairs::test::set_all_active,
test::{make_encrypted_upstairs, make_upstairs},
BlockContext, BlockReq, BlockReqWaiter, DsState, JobId,
