Skip to content

Commit

Permalink
Merge pull request #199 from Kuadrant/normalize-counter-values
Browse files Browse the repository at this point in the history
Normalize counter values
  • Loading branch information
didierofrivia authored Aug 14, 2023
2 parents 58758c2 + c96f478 commit 490826c
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 16 deletions.
4 changes: 4 additions & 0 deletions limitador/src/storage/disk/rocksdb_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ impl CounterStorage for RocksDbStorage {
Ok(counter.max_value() >= value.value() + delta)
}

fn add_counter(&self, _limit: &Limit) -> Result<(), StorageErr> {
Ok(())
}

fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr> {
let key = key_for_counter(counter);
self.insert_or_update(&key, counter, delta)?;
Expand Down
23 changes: 13 additions & 10 deletions limitador/src/storage/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,18 @@ impl CounterStorage for InMemoryStorage {
Ok(counter.max_value() >= value + delta)
}

fn add_counter(&self, limit: &Limit) -> Result<(), StorageErr> {
if limit.variables().is_empty() {
let mut limits_by_namespace = self.limits_for_namespace.write().unwrap();
limits_by_namespace
.entry(limit.namespace().clone())
.or_insert_with(HashMap::new)
.entry(limit.clone())
.or_insert_with(AtomicExpiringValue::default);
}
Ok(())
}

fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr> {
let mut limits_by_namespace = self.limits_for_namespace.write().unwrap();
let now = SystemTime::now();
Expand Down Expand Up @@ -84,7 +96,7 @@ impl CounterStorage for InMemoryStorage {
delta: i64,
load_counters: bool,
) -> Result<Authorization, StorageErr> {
let mut limits_by_namespace = self.limits_for_namespace.write().unwrap();
let limits_by_namespace = self.limits_for_namespace.write().unwrap();
let mut first_limited = None;
let mut counter_values_to_update: Vec<(&AtomicExpiringValue, u64)> = Vec::new();
let mut qualified_counter_values_to_updated: Vec<(Arc<AtomicExpiringValue>, u64)> =
Expand All @@ -110,15 +122,6 @@ impl CounterStorage for InMemoryStorage {
None
};

// Normalize counters and values
for counter in counters.iter().filter(|c| !c.is_qualified()) {
limits_by_namespace
.entry(counter.limit().namespace().clone())
.or_insert_with(HashMap::new)
.entry(counter.limit().clone())
.or_insert_with(AtomicExpiringValue::default);
}

// Process simple counters
for counter in counters.iter_mut().filter(|c| !c.is_qualified()) {
let atomic_expiring_value: &AtomicExpiringValue = limits_by_namespace
Expand Down
10 changes: 4 additions & 6 deletions limitador/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,9 @@ impl Storage {

pub fn add_limit(&self, limit: Limit) -> bool {
let namespace = limit.namespace().clone();
self.limits
.write()
.unwrap()
.entry(namespace)
.or_default()
.insert(limit)
let mut limits = self.limits.write().unwrap();
self.counters.add_counter(&limit).unwrap();
limits.entry(namespace).or_default().insert(limit)
}

pub fn update_limit(&self, update: &Limit) -> bool {
Expand Down Expand Up @@ -263,6 +260,7 @@ impl AsyncStorage {

pub trait CounterStorage: Sync + Send {
fn is_within_limits(&self, counter: &Counter, delta: i64) -> Result<bool, StorageErr>;
fn add_counter(&self, limit: &Limit) -> Result<(), StorageErr>;
fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr>;
fn check_and_update(
&self,
Expand Down
4 changes: 4 additions & 0 deletions limitador/src/storage/redis/redis_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ impl CounterStorage for RedisStorage {
}
}

fn add_counter(&self, _limit: &Limit) -> Result<(), StorageErr> {
Ok(())
}

fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr> {
let mut con = self.conn_pool.get()?;

Expand Down
4 changes: 4 additions & 0 deletions limitador/src/storage/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ impl CounterStorage for WasmStorage {
Ok(self.counter_is_within_limits(counter, stored_counters.get(counter), delta))
}

fn add_counter(&self, _limit: &Limit) -> Result<(), StorageErr> {
Ok(())
}

fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr> {
let mut counters = self.counters.write().unwrap();
self.insert_or_update_counter(&mut counters, counter, delta);
Expand Down

0 comments on commit 490826c

Please sign in to comment.