Skip to content

Commit

Permalink
[refactor] Using new AtomicExpiringValue update fn
Browse files Browse the repository at this point in the history
  • Loading branch information
didierofrivia committed Jul 26, 2023
1 parent 1748bba commit 53ae626
Showing 1 changed file with 28 additions and 38 deletions.
66 changes: 28 additions & 38 deletions limitador/src/storage/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ impl CounterStorage for InMemoryStorage {
) -> Result<Authorization, StorageErr> {
let mut limits_by_namespace = self.limits_for_namespace.write().unwrap();
let mut first_limited = None;
let mut counter_values_to_update: Vec<(&AtomicExpiringValue, u64)> = Vec::new();
let now = SystemTime::now();

let mut process_counter =
|counter: &mut Counter, value: i64, delta: i64| -> Option<Authorization> {
Expand All @@ -123,56 +125,44 @@ impl CounterStorage for InMemoryStorage {
None
};

for counter in counters.iter_mut() {
if counter.max_value() < delta {
if let Some(limited) = process_counter(counter, 0, delta) {
if !load_counters {
return Ok(limited);
}
}
continue;
}
// Normalize counters and values
for counter in counters.iter() {
limits_by_namespace
.entry(counter.limit().namespace().clone())
.or_insert_with(HashMap::new)
.entry(counter.limit().clone())
.or_insert_with(HashMap::new)
.entry(counter.into())
.or_insert_with(|| {
AtomicExpiringValue::new(0, now + Duration::from_secs(counter.seconds()))
});
}

let value = Some(
limits_by_namespace
.get(counter.limit().namespace())
.and_then(|limits| limits.get(counter.limit()))
.and_then(|counters| counters.get(&counter.into()))
.map(|expiring_value| expiring_value.value())
.unwrap_or(0),
);
// Process counters
for counter in counters.iter_mut() {
let atomic_expiring_value: &AtomicExpiringValue = limits_by_namespace
.get(counter.limit().namespace())
.and_then(|limits| limits.get(counter.limit()))
.and_then(|counters| counters.get(&counter.into()))
.unwrap();

if let Some(limited) = process_counter(counter, value.unwrap(), delta) {
if let Some(limited) = process_counter(counter, atomic_expiring_value.value(), delta) {
if !load_counters {
return Ok(limited);
}
}

counter_values_to_update.push((atomic_expiring_value, counter.seconds()));
}

if let Some(limited) = first_limited {
return Ok(limited);
}

for counter in counters.iter_mut() {
let now = SystemTime::now();
match limits_by_namespace
.entry(counter.limit().namespace().clone())
.or_insert_with(HashMap::new)
.entry(counter.limit().clone())
.or_insert_with(HashMap::new)
.entry(counter.into())
{
Entry::Vacant(v) => {
v.insert(AtomicExpiringValue::new(
delta,
now + Duration::from_secs(counter.seconds()),
));
}
Entry::Occupied(o) => {
o.get().update(delta, counter.seconds(), now);
}
}
}
// Update counters
counter_values_to_update
.iter()
.for_each(|(v, ttl)| v.update(delta, *ttl, now));

Ok(Authorization::Ok)
}
Expand Down

0 comments on commit 53ae626

Please sign in to comment.