From 9a580b11b62a1b7377d2d1eaa159576e428f9d8e Mon Sep 17 00:00:00 2001 From: Hiram Chirino Date: Tue, 23 Apr 2024 20:53:32 -0400 Subject: [PATCH 1/2] feat: CachedRedisStorage will now default to flushing a batch ASAP to reduce the length of time the cache is out of sync with Redis counters. Setting a flushing_period is still supported to have a delay before flushing which can be useful to reduce the load on the Redis server in exchange for having staler counters in limitador. Signed-off-by: Hiram Chirino --- limitador/src/storage/redis/mod.rs | 2 +- limitador/src/storage/redis/redis_cached.rs | 25 ++++++++++++++++++--- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/limitador/src/storage/redis/mod.rs b/limitador/src/storage/redis/mod.rs index a60ab99a..951c58c0 100644 --- a/limitador/src/storage/redis/mod.rs +++ b/limitador/src/storage/redis/mod.rs @@ -7,7 +7,7 @@ mod redis_cached; mod redis_sync; mod scripts; -pub const DEFAULT_FLUSHING_PERIOD_SEC: u64 = 1; +pub const DEFAULT_FLUSHING_PERIOD_SEC: u64 = 0; pub const DEFAULT_MAX_CACHED_COUNTERS: usize = 10000; pub const DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC: u64 = 5; pub const DEFAULT_TTL_RATIO_CACHED_COUNTERS: u64 = 10; diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs index 3f9e3da6..a7c40a9d 100644 --- a/limitador/src/storage/redis/redis_cached.rs +++ b/limitador/src/storage/redis/redis_cached.rs @@ -44,6 +44,7 @@ pub struct CachedRedisStorage { async_redis_storage: AsyncRedisStorage, redis_conn_manager: ConnectionManager, partitioned: Arc, + flush_tx: tokio::sync::watch::Sender<()>, } #[async_trait] @@ -154,10 +155,10 @@ impl AsyncCounterStorage for CachedRedisStorage { self.cached_counters.increase_by(counter, delta); } - // Batch or update depending on configuration let mut batcher = self.batcher_counter_updates.lock().unwrap(); let now = SystemTime::now(); for counter in counters.iter() { + // Update an existing batch entry or add a new batch 
entry match batcher.get_mut(counter) { Some(val) => { val.update(delta, counter.seconds(), now); @@ -174,6 +175,9 @@ impl AsyncCounterStorage for CachedRedisStorage { } } + // ask the flusher to flush the batch + self.flush_tx.send(()).unwrap(); + Ok(Authorization::Ok) } @@ -240,15 +244,30 @@ impl CachedRedisStorage { let batcher: Arc>> = Arc::new(Mutex::new(Default::default())); + let (flush_tx, mut flush_rx) = tokio::sync::watch::channel(()); + { let storage = async_redis_storage.clone(); let counters_cache_clone = counters_cache.clone(); let conn = redis_conn_manager.clone(); let p = Arc::clone(&partitioned); let batcher_flusher = batcher.clone(); - let mut interval = tokio::time::interval(flushing_period); + tokio::spawn(async move { loop { + // Wait for a new flush request, + flush_rx.changed().await.unwrap(); + + if flushing_period != Duration::ZERO { + // Set the flushing_period to reduce the load on Redis the downside to + // setting it the cached counters will not be as accurate as when it's zero. + tokio::time::sleep(flushing_period).await + } + + // even if flushing_period is zero, the next batch will be growing while + // current batch is being executed against Redis. When under load, the flush + // frequency will be proportional to batch execution latency. 
+ flush_batcher_and_update_counters( conn.clone(), batcher_flusher.clone(), @@ -257,7 +276,6 @@ impl CachedRedisStorage { p.clone(), ) .await; - interval.tick().await; } }); } @@ -268,6 +286,7 @@ impl CachedRedisStorage { redis_conn_manager, async_redis_storage, partitioned, + flush_tx, }) } From e935047eddbebb678620f53cf550be4f64b0ec27 Mon Sep 17 00:00:00 2001 From: Hiram Chirino Date: Thu, 25 Apr 2024 09:17:24 -0400 Subject: [PATCH 2/2] Replace watch::channel with Notify Signed-off-by: Hiram Chirino --- limitador/src/storage/redis/redis_cached.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs index a7c40a9d..4eaf14d4 100644 --- a/limitador/src/storage/redis/redis_cached.rs +++ b/limitador/src/storage/redis/redis_cached.rs @@ -18,6 +18,7 @@ use std::str::FromStr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant, SystemTime}; +use tokio::sync::Notify; use tracing::{debug_span, error, warn, Instrument}; // This is just a first version. 
@@ -44,7 +45,7 @@ pub struct CachedRedisStorage { async_redis_storage: AsyncRedisStorage, redis_conn_manager: ConnectionManager, partitioned: Arc, - flush_tx: tokio::sync::watch::Sender<()>, + pub flush_notify: Arc, } #[async_trait] @@ -176,7 +177,7 @@ impl AsyncCounterStorage for CachedRedisStorage { } // ask the flusher to flush the batch - self.flush_tx.send(()).unwrap(); + self.flush_notify.notify_one(); Ok(Authorization::Ok) } @@ -244,7 +245,7 @@ impl CachedRedisStorage { let batcher: Arc>> = Arc::new(Mutex::new(Default::default())); - let (flush_tx, mut flush_rx) = tokio::sync::watch::channel(()); + let flush_notify = Arc::new(Notify::new()); { let storage = async_redis_storage.clone(); @@ -252,11 +253,12 @@ impl CachedRedisStorage { let conn = redis_conn_manager.clone(); let p = Arc::clone(&partitioned); let batcher_flusher = batcher.clone(); + let flush_notify = flush_notify.clone(); tokio::spawn(async move { loop { // Wait for a new flush request, - flush_rx.changed().await.unwrap(); + flush_notify.notified().await; if flushing_period != Duration::ZERO { // Set the flushing_period to reduce the load on Redis the downside to @@ -286,7 +288,7 @@ impl CachedRedisStorage { redis_conn_manager, async_redis_storage, partitioned, - flush_tx, + flush_notify, }) }