diff --git a/limitador/src/storage/redis/counters_cache.rs b/limitador/src/storage/redis/counters_cache.rs index 2f2b22ca..d7ea6de7 100644 --- a/limitador/src/storage/redis/counters_cache.rs +++ b/limitador/src/storage/redis/counters_cache.rs @@ -5,11 +5,13 @@ use crate::storage::redis::{ DEFAULT_TTL_RATIO_CACHED_COUNTERS, }; use moka::sync::Cache; +use std::sync::atomic::{AtomicI64, Ordering}; use std::sync::Arc; use std::time::{Duration, SystemTime}; pub struct CachedCounterValue { value: AtomicExpiringValue, + initial_value: AtomicI64, expiry: AtomicExpiryTime, } @@ -24,6 +26,7 @@ impl CachedCounterValue { let now = SystemTime::now(); Self { value: AtomicExpiringValue::new(value, now + Duration::from_secs(counter.seconds())), + initial_value: AtomicI64::new(value), expiry: AtomicExpiryTime::from_now(ttl), } } @@ -39,8 +42,46 @@ impl CachedCounterValue { } pub fn delta(&self, counter: &Counter, delta: i64) -> i64 { - self.value - .update(delta, counter.seconds(), SystemTime::now()) + let value = self + .value + .update(delta, counter.seconds(), SystemTime::now()); + if value == delta { + // new window, invalidate initial value + self.initial_value.store(0, Ordering::SeqCst); + } + value + } + + #[allow(dead_code)] + pub fn pending_writes(&self) -> Result { + let start = self.initial_value.load(Ordering::SeqCst); + let value = self.value.value_at(SystemTime::now()); + let offset = if start == 0 { + value + } else { + let writes = value - start; + if writes > 0 { + writes + } else { + value + } + }; + match self + .initial_value + .compare_exchange(start, offset, Ordering::SeqCst, Ordering::SeqCst) + { + Ok(_) => Ok(offset), + Err(newer) => { + if newer == 0 { + // We got expired in the meantime, this fresh value can wait the next iteration + Ok(0) + } else { + // Concurrent call to this method? + // We could support that with a CAS loop in the future if needed + Err(()) + } + } + } } pub fn hits(&self, _: &Counter) -> i64 {