From da60a6d9cd242c1cb0688968263b8c840a3145e8 Mon Sep 17 00:00:00 2001
From: Matt Keeter
Date: Wed, 24 Jan 2024 14:53:42 -0500
Subject: [PATCH] Handle timeout in the client IO task (#1109)

This PR moves the timeout into the client IO task, for consistency with
other forms of disconnection.  Previously, the main Upstairs event loop
would notice the timeout, then tell the client task to stop; this
roundabout approach led to issues like the panic seen in #1103.

For symmetry, we also move pings into the client IO task, since there's
no reason to bother the main event loop about them: we always want to
ping periodically for as long as the client IO task is running.
---
 upstairs/src/client.rs   | 143 ++++++++++++++++++---------------------
 upstairs/src/upstairs.rs |  23 +------
 2 files changed, 66 insertions(+), 100 deletions(-)

diff --git a/upstairs/src/client.rs b/upstairs/src/client.rs
index 05e45b763..5e77b0042 100644
--- a/upstairs/src/client.rs
+++ b/upstairs/src/client.rs
@@ -17,7 +17,7 @@ use slog::{debug, error, info, o, warn, Logger};
 use tokio::{
     net::{TcpSocket, TcpStream},
     sync::{mpsc, oneshot},
-    time::{sleep_until, Instant},
+    time::sleep_until,
 };
 use tokio_util::codec::{FramedRead, FramedWrite};
 use uuid::Uuid;
@@ -135,15 +135,6 @@ pub(crate) struct DownstairsClient {
      */
     pub(crate) repair_info: Option<ExtentInfo>,
 
-    /// Deadline for the next ping
-    ping_interval: Instant,
-
-    /// Deadline until we mark the client as dead
-    timeout_deadline: Instant,
-
-    /// Ping every 10 seconds if things are idle
-    ping_count: u64,
-
     /// Accumulated statistics
     pub(crate) stats: DownstairsStats,
 
@@ -175,11 +166,8 @@ impl DownstairsClient {
             client_id,
             region_uuid: None,
             negotiation_state: NegotiationState::Start,
-            ping_count: 0,
-            ping_interval: deadline_secs(PING_INTERVAL_SECS),
             tls_context,
             promote_state: None,
-            timeout_deadline: deadline_secs(TIMEOUT_SECS),
             log,
             target_addr,
             repair_addr: None,
@@ -214,11 +202,8 @@ impl DownstairsClient {
             client_id: ClientId::new(0),
             region_uuid: None,
             negotiation_state: NegotiationState::Start,
-            ping_count: 0,
-            ping_interval: deadline_secs(PING_INTERVAL_SECS),
             tls_context: None,
             promote_state: None,
-            timeout_deadline: deadline_secs(TIMEOUT_SECS),
             log: crucible_common::build_logger(),
             target_addr: None,
             repair_addr: None,
@@ -256,12 +241,6 @@ impl DownstairsClient {
                 None => ClientAction::ChannelClosed,
             }
         }
-            _ = sleep_until(self.ping_interval) => {
-                ClientAction::Ping
-            }
-            _ = sleep_until(self.timeout_deadline) => {
-                ClientAction::Timeout
-            }
         }
     }
 
@@ -279,19 +258,6 @@ impl DownstairsClient {
             .await;
     }
 
-    /// If the client task is running, send a `Message::Ruok`
-    ///
-    /// If the client task is **not** running, log a warning to that effect.
-    pub(crate) async fn send_ping(&mut self) {
-        self.ping_interval = deadline_secs(PING_INTERVAL_SECS);
-        // It's possible for the client task to have stopped after we requested
-        // the ping. If that's the case, then we'll catch it on the next
-        // go-around, and should just log an error here.
-        self.send(Message::Ruok).await;
-        self.ping_count += 1;
-        cdt::ds__ping__sent!(|| (self.ping_count, self.client_id.get()));
-    }
-
     pub(crate) fn halt_io_task(&mut self, r: ClientStopReason) {
         if let Some(t) = self.client_task.client_stop_tx.take() {
             if let Err(_e) = t.send(r) {
@@ -601,7 +567,6 @@ impl DownstairsClient {
             self.tls_context.clone(),
             &self.log,
         );
-        self.reset_timeout();
     }
 
     fn new_io_task(
@@ -1162,11 +1127,6 @@ impl DownstairsClient {
         self.stats.live_repair_completed += 1;
     }
 
-    /// Resets our timeout deadline
-    pub(crate) fn reset_timeout(&mut self) {
-        self.timeout_deadline = deadline_secs(TIMEOUT_SECS);
-    }
-
     /// Handles a single IO operation
     ///
     /// Returns `true` if the job is now ackable, `false` otherwise
@@ -2203,12 +2163,6 @@ pub(crate) enum ClientAction {
     /// The client task has stopped
     TaskStopped(ClientRunResult),
 
-    /// It's time to ping the client
-    Ping,
-
-    /// The client has hit a (Crucible) timeout
-    Timeout,
-
     /// The client IO channel has returned `None`
     ///
     /// This should never happen during normal operation, because
@@ -2256,9 +2210,6 @@ pub(crate) struct DownstairsStats {
 /// When the upstairs halts the IO client task, it must provide a reason
 #[derive(Debug)]
 pub(crate) enum ClientStopReason {
-    /// Crucible-level timeout (i.e. no packets received in too long)
-    Timeout,
-
     /// We are about to replace the client task
     Replacing,
 
@@ -2317,6 +2268,8 @@ pub(crate) enum ClientRunResult {
     ConnectionTimeout,
     /// We failed to make the initial connection
     ConnectionFailed(std::io::Error),
+    /// We experienced a timeout after connecting
+    Timeout,
     /// A socket write failed
     WriteFailed(anyhow::Error),
     /// We received an error while reading from the connection
@@ -2461,10 +2414,11 @@ async fn client_run_inner(
                 WrappedStream::Http(tcp)
             }
         };
-    proc_stream(tcp, rx, stop, tx, log).await
+    proc_stream(client_id, tcp, rx, stop, tx, log).await
 }
 
 async fn proc_stream(
+    client_id: ClientId,
     stream: WrappedStream,
     rx: &mut mpsc::Receiver<Message>,
     stop: &mut oneshot::Receiver<ClientStopReason>,
     tx: &mpsc::Sender<ClientResponse>,
@@ -2478,7 +2432,7 @@ async fn proc_stream(
             let fr = FramedRead::new(read, CrucibleDecoder::new());
             let fw = FramedWrite::new(write, CrucibleEncoder::new());
 
-            cmd_loop(rx, stop, tx, fr, fw, log).await
+            cmd_loop(client_id, rx, stop, tx, fr, fw, log).await
         }
         WrappedStream::Https(stream) => {
             let (read, write) = tokio::io::split(stream);
@@ -2486,35 +2440,27 @@ async fn proc_stream(
             let fr = FramedRead::new(read, CrucibleDecoder::new());
             let fw = FramedWrite::new(write, CrucibleEncoder::new());
 
-            cmd_loop(rx, stop, tx, fr, fw, log).await
+            cmd_loop(client_id, rx, stop, tx, fr, fw, log).await
         }
     }
 }
 
-async fn cmd_loop<R, W>(
-    rx: &mut mpsc::Receiver<Message>,
-    stop: &mut oneshot::Receiver<ClientStopReason>,
-    tx: &mpsc::Sender<ClientResponse>,
+async fn rx_loop<R>(
+    tx: mpsc::Sender<ClientResponse>,
     mut fr: FramedRead<R, CrucibleDecoder>,
-    mut fw: FramedWrite<W, CrucibleEncoder>,
-    log: &Logger,
+    log: Logger,
 ) -> ClientRunResult
 where
     R: tokio::io::AsyncRead + std::marker::Unpin + std::marker::Send + 'static,
-    W: tokio::io::AsyncWrite + std::marker::Unpin + std::marker::Send + 'static,
 {
-    tx.send(ClientResponse::Connected)
-        .await
-        .expect("client_response_tx closed unexpectedly");
-
-    let mut recv_task = {
-        let tx = tx.clone();
-        let log = log.clone();
-
-        tokio::spawn(async move {
-            while let Some(f) = fr.next().await {
+    let mut timeout = deadline_secs(TIMEOUT_SECS);
+    loop {
+        tokio::select! {
+            f = fr.next() => {
                 match f {
-                    Ok(m) => {
+                    Some(Ok(m)) => {
+                        // reset the timeout, since we've received a message
+                        timeout = deadline_secs(TIMEOUT_SECS);
                         if let Err(e) =
                             tx.send(ClientResponse::Message(m)).await
                         {
@@ -2523,21 +2469,50 @@ where
                                 "client response queue closed unexpectedly: \
                                  {e}; is the program exiting?"
                             );
-                            return ClientRunResult::QueueClosed;
+                            break ClientRunResult::QueueClosed;
                         }
                     }
-                    Err(e) => {
+                    Some(Err(e)) => {
                         warn!(log, "downstairs client error {e}");
-                        return ClientRunResult::ReadFailed(e);
+                        break ClientRunResult::ReadFailed(e);
+                    }
+                    None => {
+                        warn!(log, "downstairs disconnected");
+                        break ClientRunResult::Finished;
                     }
                 }
             }
-            // Downstairs disconnected
-            warn!(log, "downstairs disconnected");
-            ClientRunResult::Finished
-        })
-    };
+            _ = sleep_until(timeout) => {
+                warn!(log, "downstairs timed out");
+                break ClientRunResult::Timeout;
+            }
+        }
+    }
+}
+
+async fn cmd_loop<R, W>(
+    client_id: ClientId,
+    rx: &mut mpsc::Receiver<Message>,
+    stop: &mut oneshot::Receiver<ClientStopReason>,
+    tx: &mpsc::Sender<ClientResponse>,
+    fr: FramedRead<R, CrucibleDecoder>,
+    mut fw: FramedWrite<W, CrucibleEncoder>,
+    log: &Logger,
+) -> ClientRunResult
+where
+    R: tokio::io::AsyncRead + std::marker::Unpin + std::marker::Send + 'static,
+    W: tokio::io::AsyncWrite + std::marker::Unpin + std::marker::Send + 'static,
+{
+    tx.send(ClientResponse::Connected)
+        .await
+        .expect("client_response_tx closed unexpectedly");
+
+    // Spawn a separate task to receive data over the network, so that we can
+    // always make progress and keep the socket buffer from filling up.
+    let mut recv_task = tokio::spawn(rx_loop(tx.clone(), fr, log.clone()));
+
+    let mut ping_interval = deadline_secs(PING_INTERVAL_SECS);
+    let mut ping_count = 0u64;
 
     loop {
         tokio::select! {
             join_result = &mut recv_task => {
@@ -2559,6 +2534,16 @@ where
             }
         }
 
+            _ = sleep_until(ping_interval) => {
+                ping_interval = deadline_secs(PING_INTERVAL_SECS);
+                ping_count += 1;
+                cdt::ds__ping__sent!(|| (ping_count, client_id.get()));
+
+                if let Err(e) = fw.send(Message::Ruok).await {
+                    break ClientRunResult::WriteFailed(e);
+                }
+            }
+
             s = &mut *stop => {
                 match s {
                     Ok(s) => {
diff --git a/upstairs/src/upstairs.rs b/upstairs/src/upstairs.rs
index 4522e2c7a..a06ac05aa 100644
--- a/upstairs/src/upstairs.rs
+++ b/upstairs/src/upstairs.rs
@@ -2,7 +2,7 @@
 //! Data structures specific to Crucible's `struct Upstairs`
 use crate::{
     cdt,
-    client::{ClientAction, ClientRunResult, ClientStopReason},
+    client::{ClientAction, ClientRunResult},
     control::ControlRequest,
     deadline_secs,
     deferred::{
@@ -1518,27 +1518,7 @@ impl Upstairs {
                 self.downstairs.clients[client_id].stats.connected += 1;
                 self.downstairs.clients[client_id].send_here_i_am().await;
             }
-            ClientAction::Ping => {
-                self.downstairs.clients[client_id].send_ping().await;
-            }
-            ClientAction::Timeout => {
-                // Ask the downstairs client task to stop, because the client
-                // has hit a Crucible timeout.
-                //
-                // This will come back to `TaskStopped`, at which point we'll
-                // clear out the task and restart it.
-                //
-                // We need to reset the timeout, because otherwise it will keep
-                // firing and will monopolize the future.
-                let c = &mut self.downstairs.clients[client_id];
-                c.reset_timeout();
-                c.halt_io_task(ClientStopReason::Timeout);
-            }
             ClientAction::Response(m) => {
-                // We have received a message, so reset the timeout watchdog for
-                // this particular client.
-                self.downstairs.clients[client_id].reset_timeout();
-
                 // Defer the message if it's a (large) read that needs
                 // decryption, or there are other deferred messages in the queue
                 // (to preserve order).  Otherwise, handle it immediately.
@@ -2075,6 +2055,7 @@ impl Upstairs {
 pub(crate) mod test {
     use super::*;
     use crate::{
+        client::ClientStopReason,
        downstairs::test::set_all_active,
         test::{make_encrypted_upstairs, make_upstairs},
         BlockContext, BlockReq, BlockReqWaiter, DsState, JobId,
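
Note (outside the patch): the core pattern above -- an IO task that owns both
its ping cadence and its timeout deadline inside a single `tokio::select!`
loop -- can be sketched standalone. The sketch below is illustrative only:
the channel payload type (`String`), the constant values, and the
`deadline_secs` helper mirror the shape of the patch but are not Crucible's
actual definitions, and it collapses `rx_loop`/`cmd_loop` into one loop for
brevity.

    use tokio::{
        sync::mpsc,
        time::{sleep_until, Duration, Instant},
    };

    // Illustrative stand-ins for the constants the patch references.
    const PING_INTERVAL_SECS: u64 = 5;
    const TIMEOUT_SECS: u64 = 45;

    // Same shape as the `deadline_secs` helper the patch calls:
    // "now plus N seconds".
    fn deadline_secs(secs: u64) -> Instant {
        Instant::now() + Duration::from_secs(secs)
    }

    // An IO task that owns both timers: it pings on a fixed cadence and
    // declares the peer dead if nothing arrives before the timeout deadline.
    // The main event loop never sees either timer; it only learns that the
    // task exited, the same as any other disconnection.
    async fn io_task(mut rx: mpsc::Receiver<String>, tx: mpsc::Sender<String>) {
        let mut ping_deadline = deadline_secs(PING_INTERVAL_SECS);
        let mut timeout_deadline = deadline_secs(TIMEOUT_SECS);
        loop {
            tokio::select! {
                m = rx.recv() => match m {
                    Some(msg) => {
                        // Any inbound message proves the peer is alive, so
                        // push the timeout deadline forward (the patch does
                        // this in rx_loop when a frame decodes successfully).
                        timeout_deadline = deadline_secs(TIMEOUT_SECS);
                        println!("received {msg}");
                    }
                    None => break, // channel closed: peer is gone
                },
                _ = sleep_until(ping_deadline) => {
                    // Ping unconditionally for as long as the task runs.
                    ping_deadline = deadline_secs(PING_INTERVAL_SECS);
                    if tx.send("ping".to_owned()).await.is_err() {
                        break; // write side is gone
                    }
                }
                _ = sleep_until(timeout_deadline) => {
                    // The IO task itself decides the peer timed out and
                    // exits; no round trip through the main event loop.
                    println!("peer timed out");
                    break;
                }
            }
        }
    }

The patch goes one step further than this single loop: it moves the read half
into a spawned `rx_loop` task (which owns the timeout), so inbound frames keep
draining the socket even while `cmd_loop` is blocked on a write, and only the
ping timer lives in `cmd_loop` next to the write half.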