diff --git a/Cargo.lock b/Cargo.lock index d1451e2939..a5a03ab47c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3758,6 +3758,7 @@ name = "relay-redis" version = "24.11.0" dependencies = [ "r2d2", + "rayon", "redis", "relay-log", "serde", diff --git a/relay-redis/Cargo.toml b/relay-redis/Cargo.toml index b66483cb84..ccc9cbd1f3 100644 --- a/relay-redis/Cargo.toml +++ b/relay-redis/Cargo.toml @@ -24,6 +24,7 @@ redis = { workspace = true, optional = true, features = [ relay-log = { workspace = true } serde = { workspace = true } thiserror = { workspace = true } +rayon = { workspace = true } [features] default = [] diff --git a/relay-redis/src/real.rs b/relay-redis/src/real.rs index 30fa44f773..aafa4450a8 100644 --- a/relay-redis/src/real.rs +++ b/relay-redis/src/real.rs @@ -1,14 +1,25 @@ use r2d2::{Builder, ManageConnection, Pool, PooledConnection}; +use rayon::prelude::*; +use rayon::ThreadPool; +use rayon::ThreadPoolBuilder; pub use redis; use redis::ConnectionLike; use std::error::Error; -use std::thread::Scope; +use std::fmt; +use std::sync::LazyLock; use std::time::Duration; -use std::{fmt, thread}; use thiserror::Error; use crate::config::RedisConfigOptions; +static REDIS_SECONDARIES_THREAD_POOL: LazyLock = LazyLock::new(|| { + ThreadPoolBuilder::new() + .num_threads(10) + .thread_name(|i| format!("redis-secondary-{}", i)) + .build() + .expect("Failed to create Redis secondaries thread pool") +}); + /// An error returned from `RedisPool`. #[derive(Debug, Error)] pub enum RedisError { @@ -34,21 +45,6 @@ fn log_secondary_redis_error(result: redis::RedisResult) { } } -fn spawn_secondary_thread<'scope, 'env: 'scope, T>( - scope: &'scope Scope<'scope, 'env>, - block: impl FnOnce() -> redis::RedisResult + Send + 'scope, -) { - let result = thread::Builder::new().spawn_scoped(scope, move || { - log_secondary_redis_error(block()); - }); - if let Err(error) = result { - relay_log::error!( - error = &error as &dyn Error, - "spawning the thread for the secondary Redis connection failed", - ); - } -} - enum ConnectionInner<'a> { Cluster(&'a mut redis::cluster::ClusterConnection), MultiWrite { @@ -66,12 +62,14 @@ impl ConnectionLike for ConnectionInner<'_> { ConnectionInner::MultiWrite { primary, secondaries, - } => thread::scope(|s| { - for connection in secondaries { - spawn_secondary_thread(s, || connection.req_packed_command(cmd)) - } + } => { + REDIS_SECONDARIES_THREAD_POOL.install(|| { + secondaries.par_iter_mut().for_each(|connection| { + log_secondary_redis_error(connection.req_packed_command(cmd)); + }); + }); primary.req_packed_command(cmd) - }), + } } } @@ -87,12 +85,16 @@ impl ConnectionLike for ConnectionInner<'_> { ConnectionInner::MultiWrite { primary, secondaries, - } => thread::scope(|s| { - for connection in secondaries { - spawn_secondary_thread(s, || connection.req_packed_commands(cmd, offset, count)) - } + } => { + REDIS_SECONDARIES_THREAD_POOL.install(|| { + secondaries.par_iter_mut().for_each(|connection| { + log_secondary_redis_error( + connection.req_packed_commands(cmd, offset, count), + ); + }); + }); primary.req_packed_commands(cmd, offset, count) - }), + } } }