Handle timeout in the client IO task #1109

Merged · 3 commits · Jan 24, 2024
143 changes: 64 additions & 79 deletions upstairs/src/client.rs
@@ -17,7 +17,7 @@ use slog::{debug, error, info, o, warn, Logger};
use tokio::{
net::{TcpSocket, TcpStream},
sync::{mpsc, oneshot},
time::{sleep_until, Instant},
time::sleep_until,
};
use tokio_util::codec::{FramedRead, FramedWrite};
use uuid::Uuid;
@@ -135,15 +135,6 @@ pub(crate) struct DownstairsClient {
*/
pub(crate) repair_info: Option<ExtentInfo>,

/// Deadline for the next ping
ping_interval: Instant,

/// Deadline until we mark the client as dead
timeout_deadline: Instant,

/// Ping every 10 seconds if things are idle
ping_count: u64,

/// Accumulated statistics
pub(crate) stats: DownstairsStats,

@@ -175,11 +166,8 @@ impl DownstairsClient {
client_id,
region_uuid: None,
negotiation_state: NegotiationState::Start,
ping_count: 0,
ping_interval: deadline_secs(PING_INTERVAL_SECS),
tls_context,
promote_state: None,
timeout_deadline: deadline_secs(TIMEOUT_SECS),
log,
target_addr,
repair_addr: None,
@@ -214,11 +202,8 @@ impl DownstairsClient {
client_id: ClientId::new(0),
region_uuid: None,
negotiation_state: NegotiationState::Start,
ping_count: 0,
ping_interval: deadline_secs(PING_INTERVAL_SECS),
tls_context: None,
promote_state: None,
timeout_deadline: deadline_secs(TIMEOUT_SECS),
log: crucible_common::build_logger(),
target_addr: None,
repair_addr: None,
@@ -256,12 +241,6 @@ impl DownstairsClient {
None => ClientAction::ChannelClosed,
}
}
_ = sleep_until(self.ping_interval) => {
ClientAction::Ping
}
_ = sleep_until(self.timeout_deadline) => {
ClientAction::Timeout
}
}
}

@@ -279,19 +258,6 @@ impl DownstairsClient {
.await;
}

/// If the client task is running, send a `Message::Ruok`
///
/// If the client task is **not** running, log a warning to that effect.
pub(crate) async fn send_ping(&mut self) {
self.ping_interval = deadline_secs(PING_INTERVAL_SECS);
// It's possible for the client task to have stopped after we requested
// the ping. If that's the case, then we'll catch it on the next
// go-around, and should just log an error here.
self.send(Message::Ruok).await;
self.ping_count += 1;
cdt::ds__ping__sent!(|| (self.ping_count, self.client_id.get()));
}

pub(crate) fn halt_io_task(&mut self, r: ClientStopReason) {
if let Some(t) = self.client_task.client_stop_tx.take() {
if let Err(_e) = t.send(r) {
@@ -601,7 +567,6 @@ impl DownstairsClient {
self.tls_context.clone(),
&self.log,
);
self.reset_timeout();
}

fn new_io_task(
@@ -1162,11 +1127,6 @@ impl DownstairsClient {
self.stats.live_repair_completed += 1;
}

/// Resets our timeout deadline
pub(crate) fn reset_timeout(&mut self) {
self.timeout_deadline = deadline_secs(TIMEOUT_SECS);
}

/// Handles a single IO operation
///
/// Returns `true` if the job is now ackable, `false` otherwise
@@ -2203,12 +2163,6 @@ pub(crate) enum ClientAction {
/// The client task has stopped
TaskStopped(ClientRunResult),

/// It's time to ping the client
Ping,

/// The client has hit a (Crucible) timeout
Timeout,

/// The client IO channel has returned `None`
///
/// This should never happen during normal operation, because
@@ -2256,9 +2210,6 @@ pub(crate) struct DownstairsStats {
/// When the upstairs halts the IO client task, it must provide a reason
#[derive(Debug)]
pub(crate) enum ClientStopReason {
/// Crucible-level timeout (i.e. no packets received in too long)
Timeout,

/// We are about to replace the client task
Replacing,

@@ -2317,6 +2268,8 @@ pub(crate) enum ClientRunResult {
ConnectionTimeout,
/// We failed to make the initial connection
ConnectionFailed(std::io::Error),
/// We experienced a timeout after connecting
Timeout,
/// A socket write failed
WriteFailed(anyhow::Error),
/// We received an error while reading from the connection
@@ -2461,10 +2414,11 @@ async fn client_run_inner(
WrappedStream::Http(tcp)
}
};
proc_stream(tcp, rx, stop, tx, log).await
proc_stream(client_id, tcp, rx, stop, tx, log).await
}

async fn proc_stream(
client_id: ClientId,
stream: WrappedStream,
rx: &mut mpsc::Receiver<Message>,
stop: &mut oneshot::Receiver<ClientStopReason>,
@@ -2478,43 +2432,35 @@ async fn proc_stream(
let fr = FramedRead::new(read, CrucibleDecoder::new());
let fw = FramedWrite::new(write, CrucibleEncoder::new());

cmd_loop(rx, stop, tx, fr, fw, log).await
cmd_loop(client_id, rx, stop, tx, fr, fw, log).await
}
WrappedStream::Https(stream) => {
let (read, write) = tokio::io::split(stream);

let fr = FramedRead::new(read, CrucibleDecoder::new());
let fw = FramedWrite::new(write, CrucibleEncoder::new());

cmd_loop(rx, stop, tx, fr, fw, log).await
cmd_loop(client_id, rx, stop, tx, fr, fw, log).await
}
}
}

async fn cmd_loop<R, W>(
rx: &mut mpsc::Receiver<Message>,
stop: &mut oneshot::Receiver<ClientStopReason>,
tx: &mpsc::Sender<ClientResponse>,
async fn rx_loop<R>(
tx: mpsc::Sender<ClientResponse>,
mut fr: FramedRead<R, crucible_protocol::CrucibleDecoder>,
mut fw: FramedWrite<W, crucible_protocol::CrucibleEncoder>,
log: &Logger,
log: Logger,
Contributor Author commented:

GitHub is butchering the diff here, sorry about that!

I moved the code from within the tokio::spawn into a separate function named rx_loop, which now includes the timeout as well as reading from the FramedRead (so it's got a select! in there).

Contributor replied:

Ah yeah, thanks for the comment, otherwise I would have puzzled over this for a while.

) -> ClientRunResult
where
R: tokio::io::AsyncRead + std::marker::Unpin + std::marker::Send + 'static,
W: tokio::io::AsyncWrite + std::marker::Unpin + std::marker::Send + 'static,
{
tx.send(ClientResponse::Connected)
.await
.expect("client_response_tx closed unexpectedly");

let mut recv_task = {
let tx = tx.clone();
let log = log.clone();

tokio::spawn(async move {
while let Some(f) = fr.next().await {
let mut timeout = deadline_secs(TIMEOUT_SECS);
loop {
tokio::select! {
f = fr.next() => {
match f {
Ok(m) => {
Some(Ok(m)) => {
// reset the timeout, since we've received a message
timeout = deadline_secs(TIMEOUT_SECS);
if let Err(e) =
tx.send(ClientResponse::Message(m)).await
{
@@ -2523,21 +2469,50 @@ where
"client response queue closed unexpectedly: \
{e}; is the program exiting?"
);
return ClientRunResult::QueueClosed;
break ClientRunResult::QueueClosed;
}
}
Err(e) => {
Some(Err(e)) => {
warn!(log, "downstairs client error {e}");
return ClientRunResult::ReadFailed(e);
break ClientRunResult::ReadFailed(e);
}
None => {
warn!(log, "downstairs disconnected");
break ClientRunResult::Finished;
}
}
}
// Downstairs disconnected
warn!(log, "downstairs disconnected");
ClientRunResult::Finished
})
};
_ = sleep_until(timeout) => {
warn!(log, "downstairs timed out");
break ClientRunResult::Timeout;
}
}
}
}

async fn cmd_loop<R, W>(
client_id: ClientId,
rx: &mut mpsc::Receiver<Message>,
stop: &mut oneshot::Receiver<ClientStopReason>,
tx: &mpsc::Sender<ClientResponse>,
fr: FramedRead<R, crucible_protocol::CrucibleDecoder>,
mut fw: FramedWrite<W, crucible_protocol::CrucibleEncoder>,
log: &Logger,
) -> ClientRunResult
where
R: tokio::io::AsyncRead + std::marker::Unpin + std::marker::Send + 'static,
W: tokio::io::AsyncWrite + std::marker::Unpin + std::marker::Send + 'static,
{
tx.send(ClientResponse::Connected)
.await
.expect("client_response_tx closed unexpectedly");

// Spawn a separate task to receive data over the network, so that we can
// always make progress and keep the socket buffer from filling up.
let mut recv_task = tokio::spawn(rx_loop(tx.clone(), fr, log.clone()));

let mut ping_interval = deadline_secs(PING_INTERVAL_SECS);
let mut ping_count = 0u64;
loop {
tokio::select! {
join_result = &mut recv_task => {
@@ -2559,6 +2534,16 @@ where
}
}

_ = sleep_until(ping_interval) => {
ping_interval = deadline_secs(PING_INTERVAL_SECS);
ping_count += 1;
cdt::ds__ping__sent!(|| (ping_count, client_id.get()));

if let Err(e) = fw.send(Message::Ruok).await {
break ClientRunResult::WriteFailed(e);
}
}

s = &mut *stop => {
match s {
Ok(s) => {
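
The author's inline comment above describes the heart of this change: receiving now lives in its own rx_loop function, whose select! races the framed reader against a deadline that is pushed forward on every message. Since GitHub's rendering of the moved code is hard to follow, here is a minimal, self-contained sketch of that pattern. The deadline_secs body, the TIMEOUT_SECS value, and the plain mpsc channel standing in for the FramedRead are illustrative assumptions, not the PR's exact code.

use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::{sleep_until, Instant};

// Plausible stand-in for the crate's `deadline_secs` helper.
fn deadline_secs(secs: u64) -> Instant {
    Instant::now() + Duration::from_secs(secs)
}

// Assumed value; the real constant is defined elsewhere in the crate.
const TIMEOUT_SECS: u64 = 45;

/// Race the next inbound message against a resettable deadline,
/// mirroring the structure of `rx_loop` in the diff above.
async fn rx_loop_sketch(mut rx: mpsc::Receiver<String>) -> &'static str {
    let mut timeout = deadline_secs(TIMEOUT_SECS);
    loop {
        tokio::select! {
            msg = rx.recv() => match msg {
                Some(m) => {
                    // Reset the timeout, since we've received a message.
                    timeout = deadline_secs(TIMEOUT_SECS);
                    println!("received: {m}");
                }
                // Channel closed: the peer went away cleanly.
                None => break "finished",
            },
            // No traffic arrived before the deadline fired.
            _ = sleep_until(timeout) => break "timeout",
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(4);
    tx.send(String::from("ping")).await.unwrap();
    drop(tx); // close the channel so the sketch exits with "finished"
    println!("{}", rx_loop_sketch(rx).await);
}

Because the deadline reset happens inside the IO task, the Upstairs no longer has to call reset_timeout on every incoming message, which is what lets the Ping and Timeout variants of ClientAction disappear.
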
23 changes: 2 additions & 21 deletions upstairs/src/upstairs.rs
@@ -2,7 +2,7 @@
//! Data structures specific to Crucible's `struct Upstairs`
use crate::{
cdt,
client::{ClientAction, ClientRunResult, ClientStopReason},
client::{ClientAction, ClientRunResult},
control::ControlRequest,
deadline_secs,
deferred::{
@@ -1518,27 +1518,7 @@ impl Upstairs {
self.downstairs.clients[client_id].stats.connected += 1;
self.downstairs.clients[client_id].send_here_i_am().await;
}
ClientAction::Ping => {
self.downstairs.clients[client_id].send_ping().await;
}
ClientAction::Timeout => {
// Ask the downstairs client task to stop, because the client
// has hit a Crucible timeout.
//
// This will come back to `TaskStopped`, at which point we'll
// clear out the task and restart it.
//
// We need to reset the timeout, because otherwise it will keep
// firing and will monopolize the future.
let c = &mut self.downstairs.clients[client_id];
c.reset_timeout();
c.halt_io_task(ClientStopReason::Timeout);
}
ClientAction::Response(m) => {
// We have received a message, so reset the timeout watchdog for
// this particular client.
self.downstairs.clients[client_id].reset_timeout();

// Defer the message if it's a (large) read that needs
// decryption, or there are other deferred messages in the queue
// (to preserve order). Otherwise, handle it immediately.
@@ -2075,6 +2055,7 @@ impl Upstairs {
pub(crate) mod test {
use super::*;
use crate::{
client::ClientStopReason,
downstairs::test::set_all_active,
test::{make_encrypted_upstairs, make_upstairs},
BlockContext, BlockReq, BlockReqWaiter, DsState, JobId,
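
With ClientAction::Ping and ClientAction::Timeout removed, the Upstairs event loop no longer observes timeouts as standalone events: a timed-out client now surfaces only through the existing TaskStopped path, carrying the new ClientRunResult::Timeout variant. A minimal sketch of that post-PR control flow follows; the trimmed-down enums and the println! handling are illustrative stand-ins, not the crate's real definitions.

// Sketch: a Crucible-level timeout now reaches the upstairs only as a
// task-exit result, never as a separate select! event.
#[derive(Debug)]
enum ClientRunResult {
    Finished,
    Timeout,
}

enum ClientAction {
    // The Ping and Timeout variants are gone; the IO task handles both.
    TaskStopped(ClientRunResult),
}

fn handle(action: ClientAction) {
    match action {
        ClientAction::TaskStopped(ClientRunResult::Timeout) => {
            // Hypothetical handling: clear out the old task and restart
            // it, as the removed comment in the diff described.
            println!("client timed out; restarting IO task");
        }
        ClientAction::TaskStopped(r) => {
            println!("client task stopped: {r:?}");
        }
    }
}

fn main() {
    handle(ClientAction::TaskStopped(ClientRunResult::Timeout));
}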