Skip to content

Commit

Permalink
chore(server): Set nodelay and tcp timeout on constructing TcpIncoming
Browse files Browse the repository at this point in the history
  • Loading branch information
tottoto committed Nov 6, 2024
1 parent 1449bd5 commit 6e8af8b
Showing 1 changed file with 12 additions and 37 deletions.
49 changes: 12 additions & 37 deletions tonic/src/transport/server/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
net::{SocketAddr, TcpListener as StdTcpListener},
ops::ControlFlow,
pin::{pin, Pin},
task::{ready, Context, Poll},
task::{Context, Poll},
time::Duration,
};

Expand All @@ -13,7 +13,6 @@ use tokio::{
};
use tokio_stream::wrappers::TcpListenerStream;
use tokio_stream::{Stream, StreamExt};
use tracing::warn;

use super::service::ServerIo;
#[cfg(feature = "_tls-any")]
Expand Down Expand Up @@ -163,8 +162,6 @@ enum SelectOutput<A> {
#[derive(Debug)]
pub struct TcpIncoming {
inner: TcpListenerStream,
nodelay: bool,
keepalive: Option<Duration>,
}

impl TcpIncoming {
Expand Down Expand Up @@ -207,12 +204,7 @@ impl TcpIncoming {
let std_listener = StdTcpListener::bind(addr)?;
std_listener.set_nonblocking(true)?;

let inner = TcpListenerStream::new(TcpListener::from_std(std_listener)?);
Ok(Self {
inner,
nodelay,
keepalive,
})
Self::from_listener(std_listener.try_into()?, nodelay, keepalive)
}

/// Creates a new `TcpIncoming` from an existing `tokio::net::TcpListener`.
Expand All @@ -221,10 +213,17 @@ impl TcpIncoming {
nodelay: bool,
keepalive: Option<Duration>,
) -> Result<Self, crate::BoxError> {
let sock_ref = socket2::SockRef::from(&listener);

sock_ref.set_nodelay(nodelay)?;

if let Some(timeout) = keepalive {
let sock_keepalive = socket2::TcpKeepalive::new().with_time(timeout);
sock_ref.set_tcp_keepalive(&sock_keepalive)?;
}

Ok(Self {
inner: TcpListenerStream::new(listener),
nodelay,
keepalive,
})
}
}
Expand All @@ -233,31 +232,7 @@ impl Stream for TcpIncoming {
type Item = Result<TcpStream, std::io::Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match ready!(Pin::new(&mut self.inner).poll_next(cx)) {
Some(Ok(stream)) => {
set_accepted_socket_options(&stream, self.nodelay, self.keepalive);
Some(Ok(stream)).into()
}
other => Poll::Ready(other),
}
}
}

// Consistent with hyper-0.14, this function does not return an error.
fn set_accepted_socket_options(stream: &TcpStream, nodelay: bool, keepalive: Option<Duration>) {
if nodelay {
if let Err(e) = stream.set_nodelay(true) {
warn!("error trying to set TCP nodelay: {}", e);
}
}

if let Some(timeout) = keepalive {
let sock_ref = socket2::SockRef::from(&stream);
let sock_keepalive = socket2::TcpKeepalive::new().with_time(timeout);

if let Err(e) = sock_ref.set_tcp_keepalive(&sock_keepalive) {
warn!("error trying to set TCP keepalive: {}", e);
}
Pin::new(&mut self.inner).poll_next(cx)
}
}

Expand Down

0 comments on commit 6e8af8b

Please sign in to comment.