From fc61e8acd1a9e969c983876c6ed684007d79cab5 Mon Sep 17 00:00:00 2001 From: Ben Ashford Date: Mon, 12 Feb 2024 15:36:53 +0000 Subject: [PATCH] Use a default small keepalive value for Redis connections --- Cargo.toml | 3 ++- src/client/connect.rs | 20 +++++++++++++++++++- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f9cdfee..d52526d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,11 +17,12 @@ futures-util = { version = "^0.3.7", features = ["sink"] } log = "^0.4.11" native-tls = { version = "0.2", optional = true } pin-project = "1.0" +socket2 = "0.5" tokio = { version = "1.0", features = ["rt", "net", "time"] } tokio-native-tls = { version = "0.3.0", optional = true } tokio-rustls = { version = "0.24", optional = true } tokio-util = { version = "0.7", features = ["codec"] } -webpki-roots = {version = "0.23", optional = true } +webpki-roots = { version = "0.23", optional = true } [features] default = [] diff --git a/src/client/connect.rs b/src/client/connect.rs index 3c359ac..3cb5736 100644 --- a/src/client/connect.rs +++ b/src/client/connect.rs @@ -1,5 +1,5 @@ /* - * Copyright 2017-2021 Ben Ashford + * Copyright 2017-2024 Ben Ashford * * Licensed under the Apache License, Version 2.0 or the MIT license @@ -8,6 +8,8 @@ * except according to those terms. */ +use std::time::Duration; + use futures_util::{SinkExt, StreamExt}; use pin_project::pin_project; use tokio::{ @@ -95,6 +97,8 @@ impl AsyncRead for RespConnectionInner { pub type RespConnection = Framed; +const DEFAULT_KEEPALIVE_DURATION: Duration = Duration::from_secs(5); + /// Connect to a Redis server and return a Future that resolves to a /// `RespConnection` for reading and writing asynchronously. /// @@ -112,6 +116,7 @@ pub type RespConnection = Framed; /// single result, this library also implements `paired_connect`. pub async fn connect(host: &str, port: u16) -> Result { let tcp_stream = TcpStream::connect((host, port)).await?; + apply_keepalive(&tcp_stream, DEFAULT_KEEPALIVE_DURATION)?; Ok(RespCodec.framed(RespConnectionInner::Plain { stream: tcp_stream })) } @@ -216,6 +221,19 @@ pub async fn connect_with_auth( Ok(connection) } +/// Apply a custom keep-alive value to the connection +fn apply_keepalive(stream: &TcpStream, interval: Duration) -> Result<(), error::Error> { + let sock_ref = socket2::SockRef::from(stream); + + let keep_alive = socket2::TcpKeepalive::new() + .with_time(interval) + .with_interval(interval); + + sock_ref.set_tcp_keepalive(&keep_alive)?; + + Ok(()) +} + #[cfg(test)] mod test { use futures_util::{