diff --git a/limitador-server/src/main.rs b/limitador-server/src/main.rs index 232ba1a5..dcc44569 100644 --- a/limitador-server/src/main.rs +++ b/limitador-server/src/main.rs @@ -137,17 +137,8 @@ impl Limiter { ) -> CachedRedisStorage { // TODO: Not all the options are configurable via ENV. Add them as needed. - let mut cached_redis_storage = CachedRedisStorageBuilder::new(redis_url); - - if cache_cfg.flushing_period < 0 { - cached_redis_storage = cached_redis_storage.flushing_period(None) - } else { - cached_redis_storage = cached_redis_storage.flushing_period(Some( - Duration::from_millis(cache_cfg.flushing_period as u64), - )) - } - - cached_redis_storage = cached_redis_storage + let cached_redis_storage = CachedRedisStorageBuilder::new(redis_url) + .flushing_period(Duration::from_millis(cache_cfg.flushing_period as u64)) .max_ttl_cached_counters(Duration::from_millis(cache_cfg.max_ttl)) .ttl_ratio_cached_counters(cache_cfg.ttl_ratio) .max_cached_counters(cache_cfg.max_counters) diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs index 3b13c281..6d725c0e 100644 --- a/limitador/src/storage/redis/redis_cached.rs +++ b/limitador/src/storage/redis/redis_cached.rs @@ -43,7 +43,6 @@ pub struct CachedRedisStorage { batcher_counter_updates: Arc>>, async_redis_storage: AsyncRedisStorage, redis_conn_manager: ConnectionManager, - batching_is_enabled: bool, partitioned: Arc, } @@ -139,9 +138,7 @@ impl AsyncCounterStorage for CachedRedisStorage { counter_ttls_msecs[i], ttl_margin, ); - let remaining = counter.max_value() - - counter_vals[i].unwrap_or(0) - - delta; + let remaining = counter.max_value() - counter_vals[i].unwrap_or(0) - delta; if first_limited.is_none() && remaining < 0 { first_limited = Some(Authorization::Limited( counter.limit().name().map(|n| n.to_owned()), @@ -168,24 +165,9 @@ impl AsyncCounterStorage for CachedRedisStorage { } // Batch or update depending on configuration - if self.is_partitioned() || 
self.batching_is_enabled { - let mut batcher = self.batcher_counter_updates.lock().unwrap(); - for counter in counters.iter() { - Self::batch_counter(delta, &mut batcher, counter); - } - } else { - for counter in counters.iter() { - self.update_counter(counter, delta).await.or_else(|err| { - if err.is_transient() { - self.partitioned(true); - let mut batcher = self.batcher_counter_updates.lock().unwrap(); - Self::batch_counter(delta, &mut batcher, counter); - Ok(()) - } else { - Err(err) - } - })? - } + let mut batcher = self.batcher_counter_updates.lock().unwrap(); + for counter in counters.iter() { + Self::batch_counter(delta, &mut batcher, counter); } Ok(Authorization::Ok) @@ -211,7 +193,7 @@ impl CachedRedisStorage { pub async fn new(redis_url: &str) -> Result { Self::new_with_options( redis_url, - Some(Duration::from_secs(DEFAULT_FLUSHING_PERIOD_SEC)), + Duration::from_secs(DEFAULT_FLUSHING_PERIOD_SEC), DEFAULT_MAX_CACHED_COUNTERS, Duration::from_secs(DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC), DEFAULT_TTL_RATIO_CACHED_COUNTERS, @@ -222,7 +204,7 @@ impl CachedRedisStorage { async fn new_with_options( redis_url: &str, - flushing_period: Option, + flushing_period: Duration, max_cached_counters: usize, ttl_cached_counters: Duration, ttl_ratio_cached_counters: u64, @@ -247,40 +229,38 @@ impl CachedRedisStorage { let storage = async_redis_storage.clone(); let batcher: Arc>> = Arc::new(Mutex::new(Default::default())); let p = Arc::clone(&partitioned); - if let Some(flushing_period) = flushing_period { - let batcher_flusher = batcher.clone(); - let mut interval = tokio::time::interval(flushing_period); - tokio::spawn(async move { - loop { - if p.load(Ordering::Acquire) { - if storage.is_alive().await { - warn!("Partition to Redis resolved!"); - p.store(false, Ordering::Release); - } - } else { - let counters = { - let mut batch = batcher_flusher.lock().unwrap(); - std::mem::take(&mut *batch) - }; - for (counter, delta) in counters { - storage - .update_counter(&counter, 
delta) - .await - .or_else(|err| { - if err.is_transient() { - p.store(true, Ordering::Release); - Ok(()) - } else { - Err(err) - } - }) - .expect("Unrecoverable Redis error!"); - } + let batcher_flusher = batcher.clone(); + let mut interval = tokio::time::interval(flushing_period); + tokio::spawn(async move { + loop { + if p.load(Ordering::Acquire) { + if storage.is_alive().await { + warn!("Partition to Redis resolved!"); + p.store(false, Ordering::Release); + } + } else { + let counters = { + let mut batch = batcher_flusher.lock().unwrap(); + std::mem::take(&mut *batch) + }; + for (counter, delta) in counters { + storage + .update_counter(&counter, delta) + .await + .or_else(|err| { + if err.is_transient() { + p.store(true, Ordering::Release); + Ok(()) + } else { + Err(err) + } + }) + .expect("Unrecoverable Redis error!"); } - interval.tick().await; } - }); - } + interval.tick().await; + } + }); let cached_counters = CountersCacheBuilder::new() .max_cached_counters(max_cached_counters) @@ -293,7 +273,6 @@ impl CachedRedisStorage { batcher_counter_updates: batcher, redis_conn_manager, async_redis_storage, - batching_is_enabled: flushing_period.is_some(), partitioned, }) } @@ -379,7 +358,7 @@ impl CachedRedisStorage { pub struct CachedRedisStorageBuilder { redis_url: String, - flushing_period: Option, + flushing_period: Duration, max_cached_counters: usize, max_ttl_cached_counters: Duration, ttl_ratio_cached_counters: u64, @@ -390,7 +369,7 @@ impl CachedRedisStorageBuilder { pub fn new(redis_url: &str) -> Self { Self { redis_url: redis_url.to_string(), - flushing_period: Some(Duration::from_secs(DEFAULT_FLUSHING_PERIOD_SEC)), + flushing_period: Duration::from_secs(DEFAULT_FLUSHING_PERIOD_SEC), max_cached_counters: DEFAULT_MAX_CACHED_COUNTERS, max_ttl_cached_counters: Duration::from_secs(DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC), ttl_ratio_cached_counters: DEFAULT_TTL_RATIO_CACHED_COUNTERS, @@ -398,7 +377,7 @@ impl CachedRedisStorageBuilder { } } - pub fn 
flushing_period(mut self, flushing_period: Option<Duration>) -> Self { + pub fn flushing_period(mut self, flushing_period: Duration) -> Self { self.flushing_period = flushing_period; self } diff --git a/limitador/tests/integration_tests.rs b/limitador/tests/integration_tests.rs index d300c46f..47634674 100644 --- a/limitador/tests/integration_tests.rs +++ b/limitador/tests/integration_tests.rs @@ -683,7 +683,12 @@ mod test { for counter in result.counters.iter() { if let Some(ttl) = counter.expires_in() { - assert!(ttl.as_secs() <= 60); + let ttl_secs = ttl.as_secs(); + assert!( + ttl_secs <= 60, + "Unexpected ttl of {} seconds after {} hits", + ttl_secs, + hit + ); } assert_eq!(counter.remaining().unwrap(), 3 - (hit + 1)); }