Skip to content

Commit

Permalink
Support pending writes within CachedCounterValue
Browse files Browse the repository at this point in the history
  • Loading branch information
alexsnaps committed Apr 25, 2024
1 parent b7c748a commit 90bdcfb
Showing 1 changed file with 42 additions and 2 deletions.
44 changes: 42 additions & 2 deletions limitador/src/storage/redis/counters_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ use crate::storage::redis::{
DEFAULT_TTL_RATIO_CACHED_COUNTERS,
};
use moka::sync::Cache;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use std::time::{Duration, SystemTime};

pub struct CachedCounterValue {
value: AtomicExpiringValue,
initial_value: AtomicI64,
expiry: AtomicExpiryTime,
}

Expand All @@ -24,6 +26,7 @@ impl CachedCounterValue {
let now = SystemTime::now();
Self {
value: AtomicExpiringValue::new(value, now + Duration::from_secs(counter.seconds())),
initial_value: AtomicI64::new(value),
expiry: AtomicExpiryTime::from_now(ttl),
}
}
Expand All @@ -39,8 +42,45 @@ impl CachedCounterValue {
}

pub fn delta(&self, counter: &Counter, delta: i64) -> i64 {
self.value
.update(delta, counter.seconds(), SystemTime::now())
let value = self
.value
.update(delta, counter.seconds(), SystemTime::now());
if value == delta {
// new window, invalidate initial value
self.initial_value.store(0, Ordering::SeqCst);
}
value
}

pub fn pending_writes(&self) -> Result<i64, ()> {

Check failure on line 55 in limitador/src/storage/redis/counters_cache.rs

View workflow job for this annotation

GitHub Actions / Check

method `pending_writes` is never used

Check failure on line 55 in limitador/src/storage/redis/counters_cache.rs

View workflow job for this annotation

GitHub Actions / Clippy

method `pending_writes` is never used

Check failure on line 55 in limitador/src/storage/redis/counters_cache.rs

View workflow job for this annotation

GitHub Actions / Test Suite

method `pending_writes` is never used

Check failure on line 55 in limitador/src/storage/redis/counters_cache.rs

View workflow job for this annotation

GitHub Actions / Bench

method `pending_writes` is never used

Check failure on line 55 in limitador/src/storage/redis/counters_cache.rs

View workflow job for this annotation

GitHub Actions / Check

method `pending_writes` is never used

Check failure on line 55 in limitador/src/storage/redis/counters_cache.rs

View workflow job for this annotation

GitHub Actions / Bench

method `pending_writes` is never used

Check failure on line 55 in limitador/src/storage/redis/counters_cache.rs

View workflow job for this annotation

GitHub Actions / Clippy

method `pending_writes` is never used

Check failure on line 55 in limitador/src/storage/redis/counters_cache.rs

View workflow job for this annotation

GitHub Actions / Test Suite

method `pending_writes` is never used
let start = self.initial_value.load(Ordering::SeqCst);
let value = self.value.value_at(SystemTime::now());
let offset = if start == 0 {
value
} else {
let writes = value - start;
if writes > 0 {
writes
} else {
value
}
};
match self
.initial_value
.compare_exchange(start, offset, Ordering::SeqCst, Ordering::SeqCst)
{
Ok(_) => Ok(offset),
Err(newer) => {
if newer == 0 {
// We got expired in the meantime, this fresh value can wait the next iteration
Ok(0)
} else {
// Concurrent call to this method?
// We could support that with a CAS loop in the future if needed
Err(())
}
}
}
}

pub fn hits(&self, _: &Counter) -> i64 {
Expand Down

0 comments on commit 90bdcfb

Please sign in to comment.