diff --git a/tarpc/Cargo.toml b/tarpc/Cargo.toml index e7af6b30..5264741d 100644 --- a/tarpc/Cargo.toml +++ b/tarpc/Cargo.toml @@ -28,6 +28,7 @@ travis-ci = { repository = "google/tarpc" } [dependencies] fnv = "1.0" futures = "0.3" +futures-timer = "3.0" humantime = "1.0" log = "0.4" pin-project = "0.4" diff --git a/tarpc/src/rpc/client/channel.rs b/tarpc/src/rpc/client/channel.rs index 290602e5..4a6d2546 100644 --- a/tarpc/src/rpc/client/channel.rs +++ b/tarpc/src/rpc/client/channel.rs @@ -18,6 +18,7 @@ use futures::{ stream::Fuse, task::*, }; +use futures_timer::Delay; use log::{debug, info, trace}; use pin_project::{pin_project, pinned_drop, project}; use std::{ @@ -78,7 +79,7 @@ impl<'a, Req, Resp> Future for Send<'a, Req, Resp> { #[must_use = "futures do nothing unless polled"] pub struct Call<'a, Req, Resp> { #[pin] - fut: tokio::time::Timeout, DispatchResponse>>, + fut: future::Select, DispatchResponse>>>, Delay>, } impl<'a, Req, Resp> Future for Call<'a, Req, Resp> { @@ -87,8 +88,8 @@ impl<'a, Req, Resp> Future for Call<'a, Req, Resp> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let resp = ready!(self.as_mut().project().fut.poll(cx)); Poll::Ready(match resp { - Ok(resp) => resp, - Err(tokio::time::Elapsed { .. }) => Err(io::Error::new( + future::Either::Left((resp, _)) => resp, + future::Either::Right(_) => Err(io::Error::new( io::ErrorKind::TimedOut, "Client dropped expired request.".to_string(), )), @@ -137,7 +138,10 @@ impl Channel { ); Call { - fut: tokio::time::timeout(timeout, AndThenIdent::new(self.send(ctx, request))), + fut: future::select( + Box::pin(AndThenIdent::new(self.send(ctx, request))), + Delay::new(timeout), + ), } } } diff --git a/tarpc/src/rpc/server/mod.rs b/tarpc/src/rpc/server/mod.rs index d2aa9ebe..745a18d0 100644 --- a/tarpc/src/rpc/server/mod.rs +++ b/tarpc/src/rpc/server/mod.rs @@ -19,11 +19,11 @@ use futures::{ stream::Fuse, task::*, }; +use futures_timer::Delay; use humantime::format_rfc3339; use log::{debug, trace}; use pin_project::pin_project; use std::{fmt, hash::Hash, io, marker::PhantomData, pin::Pin, time::SystemTime}; -use tokio::time::Timeout; mod filter; #[cfg(test)] @@ -487,7 +487,7 @@ where request_id, ctx, deadline, - f: tokio::time::timeout(timeout, response), + f: future::select(Box::pin(response), Delay::new(timeout)), response: None, response_tx: self.as_mut().project().responses_tx.clone(), }; @@ -526,7 +526,7 @@ struct Resp { ctx: context::Context, deadline: SystemTime, #[pin] - f: Timeout, + f: future::Select>, Delay>, response: Option>, #[pin] response_tx: mpsc::Sender<(context::Context, Response)>, @@ -554,8 +554,8 @@ where *self.as_mut().project().response = Some(Response { request_id: self.request_id, message: match result { - Ok(message) => Ok(message), - Err(tokio::time::Elapsed { .. }) => { + future::Either::Left((message, _)) => Ok(message), + future::Either::Right(_) => { debug!( "[{}] Response did not complete before deadline of {}s.", self.ctx.trace_id(),