diff --git a/omniqueue/src/backends/redis/mod.rs b/omniqueue/src/backends/redis/mod.rs index 40d8cc8..6bce1ac 100644 --- a/omniqueue/src/backends/redis/mod.rs +++ b/omniqueue/src/backends/redis/mod.rs @@ -44,6 +44,7 @@ use redis::{ }; use serde::Serialize; use svix_ksuid::KsuidLike; +use thiserror::Error; use tokio::task::JoinSet; use tracing::{debug, error, trace, warn}; @@ -90,6 +91,49 @@ impl RedisConnection for RedisClusterConnectionManager { } } +#[derive(Debug, Error)] +enum EvictionCheckError { + #[error("Unable to verify eviction policy. Ensure `maxmemory-policy` set to `noeviction` or `volatile-*`")] + CheckEvictionPolicyFailed, + #[error("Unsafe eviction policy found. Your queue is at risk of data loss. Please ensure `maxmemory-policy` set to `noeviction` or `volatile-*`")] + UnsafeEvictionPolicy, +} + +async fn check_eviction_policy( + pool: bb8::Pool, +) -> std::result::Result<(), EvictionCheckError> { + let mut conn = pool + .get() + .await + .map_err(|_| EvictionCheckError::CheckEvictionPolicyFailed)?; + + let results: Vec = match redis::cmd("CONFIG") + .arg("GET") + .arg("maxmemory-policy") + .query_async::<::Connection, Vec>(&mut *conn) + .await + { + Ok(results) if results.len() == 2 => Ok(results), + _ => Err(EvictionCheckError::CheckEvictionPolicyFailed), + }?; + + let eviction_policy = results.get(1).expect("Length already validated"); + if [ + "noeviction", + "volatile-lru", + "volatile-lfu", + "volatile-random", + "volatile-ttl", + ] + .contains(&eviction_policy.as_str()) + { + tracing::debug!("Eviction policy `{eviction_policy}` found"); + Ok(()) + } else { + Err(EvictionCheckError::UnsafeEvictionPolicy) + } +} + pub struct RedisConfig { pub dsn: String, pub max_connections: u16, @@ -326,6 +370,22 @@ impl RedisBackendBuilder { } }); + join_set.spawn({ + async move { + if let Err(e) = check_eviction_policy(redis.clone()).await { + match e { + EvictionCheckError::CheckEvictionPolicyFailed => { + tracing::warn!("{e}"); + } + EvictionCheckError::UnsafeEvictionPolicy => { + tracing::error!("{e}"); + } + } + } + Ok(()) + } + }); + Arc::new(join_set) } }