Skip to content

Commit

Permalink
Use a default small keepalive value for Redis connections
Browse files Browse the repository at this point in the history
  • Loading branch information
benashford committed Feb 12, 2024
1 parent 0070a76 commit fc61e8a
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 2 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ futures-util = { version = "^0.3.7", features = ["sink"] }
log = "^0.4.11"
native-tls = { version = "0.2", optional = true }
pin-project = "1.0"
socket2 = "0.5"
tokio = { version = "1.0", features = ["rt", "net", "time"] }
tokio-native-tls = { version = "0.3.0", optional = true }
tokio-rustls = { version = "0.24", optional = true }
tokio-util = { version = "0.7", features = ["codec"] }
webpki-roots = {version = "0.23", optional = true }
webpki-roots = { version = "0.23", optional = true }

[features]
default = []
Expand Down
20 changes: 19 additions & 1 deletion src/client/connect.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2021 Ben Ashford
* Copyright 2017-2024 Ben Ashford
*
* Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
* http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
Expand All @@ -8,6 +8,8 @@
* except according to those terms.
*/

use std::time::Duration;

use futures_util::{SinkExt, StreamExt};
use pin_project::pin_project;
use tokio::{
Expand Down Expand Up @@ -95,6 +97,8 @@ impl AsyncRead for RespConnectionInner {

pub type RespConnection = Framed<RespConnectionInner, RespCodec>;

const DEFAULT_KEEPALIVE_DURATION: Duration = Duration::from_secs(5);

/// Connect to a Redis server and return a Future that resolves to a
/// `RespConnection` for reading and writing asynchronously.
///
Expand All @@ -112,6 +116,7 @@ pub type RespConnection = Framed<RespConnectionInner, RespCodec>;
/// single result, this library also implements `paired_connect`.
pub async fn connect(host: &str, port: u16) -> Result<RespConnection, error::Error> {
let tcp_stream = TcpStream::connect((host, port)).await?;
apply_keepalive(&tcp_stream, DEFAULT_KEEPALIVE_DURATION)?;
Ok(RespCodec.framed(RespConnectionInner::Plain { stream: tcp_stream }))
}

Expand Down Expand Up @@ -216,6 +221,19 @@ pub async fn connect_with_auth(
Ok(connection)
}

/// Apply a custom keep-alive value to the connection
fn apply_keepalive(stream: &TcpStream, interval: Duration) -> Result<(), error::Error> {
let sock_ref = socket2::SockRef::from(stream);

let keep_alive = socket2::TcpKeepalive::new()
.with_time(interval)
.with_interval(interval);

sock_ref.set_tcp_keepalive(&keep_alive)?;

Ok(())
}

#[cfg(test)]
mod test {
use futures_util::{
Expand Down

0 comments on commit fc61e8a

Please sign in to comment.