Skip to content

Commit

Permalink
Lookup the write behing queue on miss
Browse files Browse the repository at this point in the history
  • Loading branch information
alexsnaps committed Apr 26, 2024
1 parent 87f69a7 commit 9d2b2c9
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 9 deletions.
32 changes: 24 additions & 8 deletions limitador/src/storage/redis/counters_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,15 @@ impl CountersCacheBuilder {

impl CountersCache {
pub fn get(&self, counter: &Counter) -> Option<Arc<CachedCounterValue>> {
self.cache.get(counter)
let option = self.cache.get(counter);
if option.is_none() {
let from_queue = self.batcher.updates.get(counter);

Check warning on line 257 in limitador/src/storage/redis/counters_cache.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/limitador/limitador/limitador/src/storage/redis/counters_cache.rs

Check warning on line 257 in limitador/src/storage/redis/counters_cache.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/limitador/limitador/limitador/src/storage/redis/counters_cache.rs
if let Some(entry) = from_queue {
self.cache.insert(counter.clone(), entry.value().clone());
return Some(entry.value().clone())
}
}
option
}

pub fn batcher(&self) -> &Batcher {
Expand All @@ -277,7 +285,11 @@ impl CountersCache {
if let Some(ttl) = cache_ttl.checked_sub(ttl_margin) {
if ttl > Duration::ZERO {
let previous = self.cache.get_with(counter.clone(), || {
Arc::new(CachedCounterValue::from(&counter, counter_val, cache_ttl))
if let Some(entry) = self.batcher.updates.get(&counter) {
entry.value().clone()
} else {
Arc::new(CachedCounterValue::from(&counter, counter_val, cache_ttl))
}
});
if previous.expired_at(now) || previous.value.value() < counter_val {
previous.set_from_authority(&counter, counter_val, cache_ttl);
Expand All @@ -295,12 +307,16 @@ impl CountersCache {
pub fn increase_by(&self, counter: &Counter, delta: i64) {
let mut priority = false;
let val = self.cache.get_with_by_ref(counter, || {
priority = true;
Arc::new(
// this TTL is wrong, it needs to be the cache's TTL, not the time window of our limit
// todo fix when introducing the Batcher type!
CachedCounterValue::from(counter, 0, Duration::from_secs(counter.seconds())),
)
if let Some(entry) = self.batcher.updates.get(&counter) {

Check failure on line 310 in limitador/src/storage/redis/counters_cache.rs

View workflow job for this annotation

GitHub Actions / Clippy

this expression creates a reference which is immediately dereferenced by the compiler

Check failure on line 310 in limitador/src/storage/redis/counters_cache.rs

View workflow job for this annotation

GitHub Actions / Clippy

this expression creates a reference which is immediately dereferenced by the compiler
entry.value().clone()
} else {
priority = true;
Arc::new(
// this TTL is wrong, it needs to be the cache's TTL, not the time window of our limit
// todo fix when introducing the Batcher type!
CachedCounterValue::from(counter, 0, Duration::from_secs(counter.seconds())),
)
}
});
val.delta(counter, delta);
self.batcher.add(counter.clone(), val.clone(), priority);
Expand Down
2 changes: 1 addition & 1 deletion limitador/src/storage/redis/redis_cached.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ mod tests {
let partitioned = Arc::new(AtomicBool::new(false));

if let Some(c) = cached_counters.get(&counter) {
assert_eq!(c.hits(&counter), 1);
assert_eq!(c.hits(&counter), 2);
}

flush_batcher_and_update_counters(mock_client, true, cached_counters.clone(), partitioned)
Expand Down

0 comments on commit 9d2b2c9

Please sign in to comment.