diff --git a/limitador/src/storage/disk/rocksdb_storage.rs b/limitador/src/storage/disk/rocksdb_storage.rs index 9b03e141..0e4a34d0 100644 --- a/limitador/src/storage/disk/rocksdb_storage.rs +++ b/limitador/src/storage/disk/rocksdb_storage.rs @@ -24,6 +24,10 @@ impl CounterStorage for RocksDbStorage { Ok(counter.max_value() >= value.value() + delta) } + fn add_counter(&self, _limit: &Limit) -> Result<(), StorageErr> { + Ok(()) + } + fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr> { let key = key_for_counter(counter); self.insert_or_update(&key, counter, delta)?; diff --git a/limitador/src/storage/in_memory.rs b/limitador/src/storage/in_memory.rs index 244d8b86..485a576c 100644 --- a/limitador/src/storage/in_memory.rs +++ b/limitador/src/storage/in_memory.rs @@ -35,6 +35,18 @@ impl CounterStorage for InMemoryStorage { Ok(counter.max_value() >= value + delta) } + fn add_counter(&self, limit: &Limit) -> Result<(), StorageErr> { + if limit.variables().is_empty() { + let mut limits_by_namespace = self.limits_for_namespace.write().unwrap(); + limits_by_namespace + .entry(limit.namespace().clone()) + .or_insert_with(HashMap::new) + .entry(limit.clone()) + .or_insert_with(AtomicExpiringValue::default); + } + Ok(()) + } + fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr> { let mut limits_by_namespace = self.limits_for_namespace.write().unwrap(); let now = SystemTime::now(); @@ -84,7 +96,7 @@ impl CounterStorage for InMemoryStorage { delta: i64, load_counters: bool, ) -> Result { - let mut limits_by_namespace = self.limits_for_namespace.write().unwrap(); + let limits_by_namespace = self.limits_for_namespace.write().unwrap(); let mut first_limited = None; let mut counter_values_to_update: Vec<(&AtomicExpiringValue, u64)> = Vec::new(); let mut qualified_counter_values_to_updated: Vec<(Arc, u64)> = @@ -110,15 +122,6 @@ impl CounterStorage for InMemoryStorage { None }; - // Normalize counters and values - for counter in counters.iter().filter(|c| !c.is_qualified()) { - limits_by_namespace - .entry(counter.limit().namespace().clone()) - .or_insert_with(HashMap::new) - .entry(counter.limit().clone()) - .or_insert_with(AtomicExpiringValue::default); - } - // Process simple counters for counter in counters.iter_mut().filter(|c| !c.is_qualified()) { let atomic_expiring_value: &AtomicExpiringValue = limits_by_namespace diff --git a/limitador/src/storage/mod.rs b/limitador/src/storage/mod.rs index a269615c..97b702e0 100644 --- a/limitador/src/storage/mod.rs +++ b/limitador/src/storage/mod.rs @@ -57,12 +57,9 @@ impl Storage { pub fn add_limit(&self, limit: Limit) -> bool { let namespace = limit.namespace().clone(); - self.limits - .write() - .unwrap() - .entry(namespace) - .or_default() - .insert(limit) + let mut limits = self.limits.write().unwrap(); + self.counters.add_counter(&limit).unwrap(); + limits.entry(namespace).or_default().insert(limit) } pub fn update_limit(&self, update: &Limit) -> bool { @@ -263,6 +260,7 @@ impl AsyncStorage { pub trait CounterStorage: Sync + Send { fn is_within_limits(&self, counter: &Counter, delta: i64) -> Result; + fn add_counter(&self, limit: &Limit) -> Result<(), StorageErr>; fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr>; fn check_and_update( &self, diff --git a/limitador/src/storage/redis/redis_sync.rs b/limitador/src/storage/redis/redis_sync.rs index eeb654bd..8d0861e5 100644 --- a/limitador/src/storage/redis/redis_sync.rs +++ b/limitador/src/storage/redis/redis_sync.rs @@ -32,6 +32,10 @@ impl CounterStorage for RedisStorage { } } + fn add_counter(&self, _limit: &Limit) -> Result<(), StorageErr> { + Ok(()) + } + fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr> { let mut con = self.conn_pool.get()?; diff --git a/limitador/src/storage/wasm.rs b/limitador/src/storage/wasm.rs index 1d4237c1..b62c4a66 100644 --- a/limitador/src/storage/wasm.rs +++ b/limitador/src/storage/wasm.rs @@ -85,6 +85,10 @@ impl CounterStorage for WasmStorage { Ok(self.counter_is_within_limits(counter, stored_counters.get(counter), delta)) } + fn add_counter(&self, _limit: &Limit) -> Result<(), StorageErr> { + Ok(()) + } + fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr> { let mut counters = self.counters.write().unwrap(); self.insert_or_update_counter(&mut counters, counter, delta);