diff --git a/upstairs/src/client.rs b/upstairs/src/client.rs index 05e45b763..5e77b0042 100644 --- a/upstairs/src/client.rs +++ b/upstairs/src/client.rs @@ -17,7 +17,7 @@ use slog::{debug, error, info, o, warn, Logger}; use tokio::{ net::{TcpSocket, TcpStream}, sync::{mpsc, oneshot}, - time::{sleep_until, Instant}, + time::sleep_until, }; use tokio_util::codec::{FramedRead, FramedWrite}; use uuid::Uuid; @@ -135,15 +135,6 @@ pub(crate) struct DownstairsClient { */ pub(crate) repair_info: Option, - /// Deadline for the next ping - ping_interval: Instant, - - /// Deadline until we mark the client as dead - timeout_deadline: Instant, - - /// Ping every 10 seconds if things are idle - ping_count: u64, - /// Accumulated statistics pub(crate) stats: DownstairsStats, @@ -175,11 +166,8 @@ impl DownstairsClient { client_id, region_uuid: None, negotiation_state: NegotiationState::Start, - ping_count: 0, - ping_interval: deadline_secs(PING_INTERVAL_SECS), tls_context, promote_state: None, - timeout_deadline: deadline_secs(TIMEOUT_SECS), log, target_addr, repair_addr: None, @@ -214,11 +202,8 @@ impl DownstairsClient { client_id: ClientId::new(0), region_uuid: None, negotiation_state: NegotiationState::Start, - ping_count: 0, - ping_interval: deadline_secs(PING_INTERVAL_SECS), tls_context: None, promote_state: None, - timeout_deadline: deadline_secs(TIMEOUT_SECS), log: crucible_common::build_logger(), target_addr: None, repair_addr: None, @@ -256,12 +241,6 @@ impl DownstairsClient { None => ClientAction::ChannelClosed, } } - _ = sleep_until(self.ping_interval) => { - ClientAction::Ping - } - _ = sleep_until(self.timeout_deadline) => { - ClientAction::Timeout - } } } @@ -279,19 +258,6 @@ impl DownstairsClient { .await; } - /// If the client task is running, send a `Message::Ruok` - /// - /// If the client task is **not** running, log a warning to that effect. - pub(crate) async fn send_ping(&mut self) { - self.ping_interval = deadline_secs(PING_INTERVAL_SECS); - // It's possible for the client task to have stopped after we requested - // the ping. If that's the case, then we'll catch it on the next - // go-around, and should just log an error here. - self.send(Message::Ruok).await; - self.ping_count += 1; - cdt::ds__ping__sent!(|| (self.ping_count, self.client_id.get())); - } - pub(crate) fn halt_io_task(&mut self, r: ClientStopReason) { if let Some(t) = self.client_task.client_stop_tx.take() { if let Err(_e) = t.send(r) { @@ -601,7 +567,6 @@ impl DownstairsClient { self.tls_context.clone(), &self.log, ); - self.reset_timeout(); } fn new_io_task( @@ -1162,11 +1127,6 @@ impl DownstairsClient { self.stats.live_repair_completed += 1; } - /// Resets our timeout deadline - pub(crate) fn reset_timeout(&mut self) { - self.timeout_deadline = deadline_secs(TIMEOUT_SECS); - } - /// Handles a single IO operation /// /// Returns `true` if the job is now ackable, `false` otherwise @@ -2203,12 +2163,6 @@ pub(crate) enum ClientAction { /// The client task has stopped TaskStopped(ClientRunResult), - /// It's time to ping the client - Ping, - - /// The client has hit a (Crucible) timeout - Timeout, - /// The client IO channel has returned `None` /// /// This should never happen during normal operation, because @@ -2256,9 +2210,6 @@ pub(crate) struct DownstairsStats { /// When the upstairs halts the IO client task, it must provide a reason #[derive(Debug)] pub(crate) enum ClientStopReason { - /// Crucible-level timeout (i.e. no packets received in too long) - Timeout, - /// We are about to replace the client task Replacing, @@ -2317,6 +2268,8 @@ pub(crate) enum ClientRunResult { ConnectionTimeout, /// We failed to make the initial connection ConnectionFailed(std::io::Error), + /// We experienced a timeout after connecting + Timeout, /// A socket write failed WriteFailed(anyhow::Error), /// We received an error while reading from the connection @@ -2461,10 +2414,11 @@ async fn client_run_inner( WrappedStream::Http(tcp) } }; - proc_stream(tcp, rx, stop, tx, log).await + proc_stream(client_id, tcp, rx, stop, tx, log).await } async fn proc_stream( + client_id: ClientId, stream: WrappedStream, rx: &mut mpsc::Receiver, stop: &mut oneshot::Receiver, @@ -2478,7 +2432,7 @@ async fn proc_stream( let fr = FramedRead::new(read, CrucibleDecoder::new()); let fw = FramedWrite::new(write, CrucibleEncoder::new()); - cmd_loop(rx, stop, tx, fr, fw, log).await + cmd_loop(client_id, rx, stop, tx, fr, fw, log).await } WrappedStream::Https(stream) => { let (read, write) = tokio::io::split(stream); @@ -2486,35 +2440,27 @@ async fn proc_stream( let fr = FramedRead::new(read, CrucibleDecoder::new()); let fw = FramedWrite::new(write, CrucibleEncoder::new()); - cmd_loop(rx, stop, tx, fr, fw, log).await + cmd_loop(client_id, rx, stop, tx, fr, fw, log).await } } } -async fn cmd_loop( - rx: &mut mpsc::Receiver, - stop: &mut oneshot::Receiver, - tx: &mpsc::Sender, +async fn rx_loop( + tx: mpsc::Sender, mut fr: FramedRead, - mut fw: FramedWrite, - log: &Logger, + log: Logger, ) -> ClientRunResult where R: tokio::io::AsyncRead + std::marker::Unpin + std::marker::Send + 'static, - W: tokio::io::AsyncWrite + std::marker::Unpin + std::marker::Send + 'static, { - tx.send(ClientResponse::Connected) - .await - .expect("client_response_tx closed unexpectedly"); - - let mut recv_task = { - let tx = tx.clone(); - let log = log.clone(); - - tokio::spawn(async move { - while let Some(f) = fr.next().await { + let mut timeout = deadline_secs(TIMEOUT_SECS); + loop { + tokio::select! { + f = fr.next() => { match f { - Ok(m) => { + Some(Ok(m)) => { + // reset the timeout, since we've received a message + timeout = deadline_secs(TIMEOUT_SECS); if let Err(e) = tx.send(ClientResponse::Message(m)).await { @@ -2523,21 +2469,50 @@ where "client response queue closed unexpectedly: \ {e}; is the program exiting?" ); - return ClientRunResult::QueueClosed; + break ClientRunResult::QueueClosed; } } - Err(e) => { + Some(Err(e)) => { warn!(log, "downstairs client error {e}"); - return ClientRunResult::ReadFailed(e); + break ClientRunResult::ReadFailed(e); + } + None => { + warn!(log, "downstairs disconnected"); + break ClientRunResult::Finished; } } } - // Downstairs disconnected - warn!(log, "downstairs disconnected"); - ClientRunResult::Finished - }) - }; + _ = sleep_until(timeout) => { + warn!(log, "downstairs timed out"); + break ClientRunResult::Timeout; + } + } + } +} + +async fn cmd_loop( + client_id: ClientId, + rx: &mut mpsc::Receiver, + stop: &mut oneshot::Receiver, + tx: &mpsc::Sender, + fr: FramedRead, + mut fw: FramedWrite, + log: &Logger, +) -> ClientRunResult +where + R: tokio::io::AsyncRead + std::marker::Unpin + std::marker::Send + 'static, + W: tokio::io::AsyncWrite + std::marker::Unpin + std::marker::Send + 'static, +{ + tx.send(ClientResponse::Connected) + .await + .expect("client_response_tx closed unexpectedly"); + // Spawn a separate task to receive data over the network, so that we can + // always make progress and keep the socket buffer from filling up. + let mut recv_task = tokio::spawn(rx_loop(tx.clone(), fr, log.clone())); + + let mut ping_interval = deadline_secs(PING_INTERVAL_SECS); + let mut ping_count = 0u64; loop { tokio::select! { join_result = &mut recv_task => { @@ -2559,6 +2534,16 @@ where } } + _ = sleep_until(ping_interval) => { + ping_interval = deadline_secs(PING_INTERVAL_SECS); + ping_count += 1; + cdt::ds__ping__sent!(|| (ping_count, client_id.get())); + + if let Err(e) = fw.send(Message::Ruok).await { + break ClientRunResult::WriteFailed(e); + } + } + s = &mut *stop => { match s { Ok(s) => { diff --git a/upstairs/src/upstairs.rs b/upstairs/src/upstairs.rs index 4522e2c7a..a06ac05aa 100644 --- a/upstairs/src/upstairs.rs +++ b/upstairs/src/upstairs.rs @@ -2,7 +2,7 @@ //! Data structures specific to Crucible's `struct Upstairs` use crate::{ cdt, - client::{ClientAction, ClientRunResult, ClientStopReason}, + client::{ClientAction, ClientRunResult}, control::ControlRequest, deadline_secs, deferred::{ @@ -1518,27 +1518,7 @@ impl Upstairs { self.downstairs.clients[client_id].stats.connected += 1; self.downstairs.clients[client_id].send_here_i_am().await; } - ClientAction::Ping => { - self.downstairs.clients[client_id].send_ping().await; - } - ClientAction::Timeout => { - // Ask the downstairs client task to stop, because the client - // has hit a Crucible timeout. - // - // This will come back to `TaskStopped`, at which point we'll - // clear out the task and restart it. - // - // We need to reset the timeout, because otherwise it will keep - // firing and will monopolize the future. - let c = &mut self.downstairs.clients[client_id]; - c.reset_timeout(); - c.halt_io_task(ClientStopReason::Timeout); - } ClientAction::Response(m) => { - // We have received a message, so reset the timeout watchdog for - // this particular client. - self.downstairs.clients[client_id].reset_timeout(); - // Defer the message if it's a (large) read that needs // decryption, or there are other deferred messages in the queue // (to preserve order). Otherwise, handle it immediately. @@ -2075,6 +2055,7 @@ impl Upstairs { pub(crate) mod test { use super::*; use crate::{ + client::ClientStopReason, downstairs::test::set_all_active, test::{make_encrypted_upstairs, make_upstairs}, BlockContext, BlockReq, BlockReqWaiter, DsState, JobId,