From 15f51ba7031135166caf857a3a3d4b57c7705dc9 Mon Sep 17 00:00:00 2001 From: Alex Snaps Date: Thu, 23 May 2024 12:11:13 -0400 Subject: [PATCH] Got rid of the namespace indirection Signed-off-by: Alex Snaps --- limitador/src/storage/in_memory.rs | 114 ++++++++++------------------- 1 file changed, 39 insertions(+), 75 deletions(-) diff --git a/limitador/src/storage/in_memory.rs b/limitador/src/storage/in_memory.rs index 2fedb7e8..afa15d49 100644 --- a/limitador/src/storage/in_memory.rs +++ b/limitador/src/storage/in_memory.rs @@ -9,29 +9,26 @@ use std::ops::Deref; use std::sync::{Arc, RwLock}; use std::time::{Duration, SystemTime}; -type NamespacedLimitCounters = HashMap>; - pub struct InMemoryStorage { - limits_for_namespace: RwLock>, + simple_limits: RwLock>, qualified_counters: Cache>, } impl CounterStorage for InMemoryStorage { #[tracing::instrument(skip_all)] fn is_within_limits(&self, counter: &Counter, delta: u64) -> Result { - let limits_by_namespace = self.limits_for_namespace.read().unwrap(); - - let mut value = 0; - - if counter.is_qualified() { - if let Some(counter) = self.qualified_counters.get(counter) { - value = counter.value(); - } - } else if let Some(limits) = limits_by_namespace.get(counter.limit().namespace()) { - if let Some(counter) = limits.get(counter.limit()) { - value = counter.value(); - } - } + let value = if counter.is_qualified() { + self.qualified_counters + .get(counter) + .map(|c| c.value()) + .unwrap_or_default() + } else { + let limits_by_namespace = self.simple_limits.read().unwrap(); + limits_by_namespace + .get(counter.limit()) + .map(|c| c.value()) + .unwrap_or_default() + }; Ok(counter.max_value() >= value + delta) } @@ -39,19 +36,15 @@ impl CounterStorage for InMemoryStorage { #[tracing::instrument(skip_all)] fn add_counter(&self, limit: &Limit) -> Result<(), StorageErr> { if limit.variables().is_empty() { - let mut limits_by_namespace = self.limits_for_namespace.write().unwrap(); - limits_by_namespace - .entry(limit.namespace().clone()) - .or_default() - .entry(limit.clone()) - .or_default(); + let mut limits_by_namespace = self.simple_limits.write().unwrap(); + limits_by_namespace.entry(limit.clone()).or_default(); } Ok(()) } #[tracing::instrument(skip_all)] fn update_counter(&self, counter: &Counter, delta: u64) -> Result<(), StorageErr> { - let mut limits_by_namespace = self.limits_for_namespace.write().unwrap(); + let mut counters = self.simple_limits.write().unwrap(); let now = SystemTime::now(); if counter.is_qualified() { let value = match self.qualified_counters.get(counter) { @@ -62,23 +55,13 @@ impl CounterStorage for InMemoryStorage { }; value.update(delta, counter.window(), now); } else { - match limits_by_namespace.entry(counter.limit().namespace().clone()) { + match counters.entry(counter.limit().clone()) { Entry::Vacant(v) => { - let mut limits = HashMap::new(); - limits.insert( - counter.limit().clone(), - AtomicExpiringValue::new(delta, now + counter.window()), - ); - v.insert(limits); + v.insert(AtomicExpiringValue::new(delta, now + counter.window())); + } + Entry::Occupied(o) => { + o.get().update(delta, counter.window(), now); } - Entry::Occupied(mut o) => match o.get_mut().entry(counter.limit().clone()) { - Entry::Vacant(v) => { - v.insert(AtomicExpiringValue::new(delta, now + counter.window())); - } - Entry::Occupied(o) => { - o.get().update(delta, counter.window(), now); - } - }, } } Ok(()) @@ -91,7 +74,7 @@ impl CounterStorage for InMemoryStorage { delta: u64, load_counters: bool, ) -> Result { - let limits_by_namespace = self.limits_for_namespace.read().unwrap(); + let limits_by_namespace = self.simple_limits.read().unwrap(); let mut first_limited = None; let mut counter_values_to_update: Vec<(&AtomicExpiringValue, Duration)> = Vec::new(); let mut qualified_counter_values_to_updated: Vec<(Arc, Duration)> = @@ -119,10 +102,8 @@ impl CounterStorage for InMemoryStorage { // Process simple counters for counter in counters.iter_mut().filter(|c| !c.is_qualified()) { - let atomic_expiring_value: &AtomicExpiringValue = limits_by_namespace - .get(counter.limit().namespace()) - .and_then(|limits| limits.get(counter.limit())) - .unwrap(); + let atomic_expiring_value: &AtomicExpiringValue = + limits_by_namespace.get(counter.limit()).unwrap(); if let Some(limited) = process_counter(counter, atomic_expiring_value.value(), delta) { if !load_counters { @@ -135,7 +116,7 @@ impl CounterStorage for InMemoryStorage { // Process qualified counters for counter in counters.iter_mut().filter(|c| c.is_qualified()) { let value = match self.qualified_counters.get(counter) { - None => self.qualified_counters.get_with(counter.clone(), || { + None => self.qualified_counters.get_with_by_ref(counter, || { Arc::new(AtomicExpiringValue::new(0, now + counter.window())) }), Some(counter) => counter, @@ -171,24 +152,14 @@ impl CounterStorage for InMemoryStorage { fn get_counters(&self, limits: &HashSet>) -> Result, StorageErr> { let mut res = HashSet::new(); - let namespaces: HashSet<&Namespace> = limits.iter().map(|l| l.namespace()).collect(); - let limits_by_namespace = self.limits_for_namespace.read().unwrap(); - - for namespace in namespaces { - if let Some(limits) = limits_by_namespace.get(namespace) { - for limit in limits.keys() { - if limits.contains_key(limit) { - for (counter, expiring_value) in self.counters_in_namespace(namespace) { - let mut counter_with_val = counter.clone(); - counter_with_val.set_remaining( - counter_with_val.max_value() - expiring_value.value(), - ); - counter_with_val.set_expires_in(expiring_value.ttl()); - if counter_with_val.expires_in().unwrap() > Duration::ZERO { - res.insert(counter_with_val); - } - } - } + for limit in limits { + for (counter, expiring_value) in self.counters_in_namespace(limit.namespace()) { + let mut counter_with_val = counter.clone(); + counter_with_val + .set_remaining(counter_with_val.max_value() - expiring_value.value()); + counter_with_val.set_expires_in(expiring_value.ttl()); + if counter_with_val.expires_in().unwrap() > Duration::ZERO { + res.insert(counter_with_val); } } } @@ -218,7 +189,7 @@ impl CounterStorage for InMemoryStorage { #[tracing::instrument(skip_all)] fn clear(&self) -> Result<(), StorageErr> { - self.limits_for_namespace.write().unwrap().clear(); + self.simple_limits.write().unwrap().clear(); Ok(()) } } @@ -226,7 +197,7 @@ impl CounterStorage for InMemoryStorage { impl InMemoryStorage { pub fn new(cache_size: u64) -> Self { Self { - limits_for_namespace: RwLock::new(HashMap::new()), + simple_limits: RwLock::new(HashMap::new()), qualified_counters: Cache::new(cache_size), } } @@ -237,11 +208,11 @@ impl InMemoryStorage { ) -> HashMap { let mut res: HashMap = HashMap::new(); - if let Some(counters_by_limit) = self.limits_for_namespace.read().unwrap().get(namespace) { - for (limit, value) in counters_by_limit { + for (limit, counter) in self.simple_limits.read().unwrap().iter() { + if limit.namespace() == namespace { res.insert( Counter::new(limit.clone(), HashMap::default()), - value.clone(), + counter.clone(), ); } } @@ -256,14 +227,7 @@ impl InMemoryStorage { } fn delete_counters_of_limit(&self, limit: &Limit) { - if let Some(counters_by_limit) = self - .limits_for_namespace - .write() - .unwrap() - .get_mut(limit.namespace()) - { - counters_by_limit.remove(limit); - } + self.simple_limits.write().unwrap().remove(limit); } fn counter_is_within_limits(counter: &Counter, current_val: Option<&u64>, delta: u64) -> bool {