Skip to content

Commit

Permalink
Async flush either periodically or on batch size being reached
Browse files Browse the repository at this point in the history
  • Loading branch information
alexsnaps committed Apr 25, 2024
1 parent d4eed7b commit d5c6e4a
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 12 deletions.
41 changes: 32 additions & 9 deletions limitador/src/storage/redis/counters_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
use tokio::select;
use tokio::sync::Notify;
use tokio::time::interval;

pub struct CachedCounterValue {
value: AtomicExpiringValue,
Expand All @@ -18,32 +21,52 @@ pub struct CachedCounterValue {

pub struct Batcher {
updates: Mutex<HashMap<Counter, Arc<CachedCounterValue>>>,
notifier: Notify,
interval: Duration,
}

impl Batcher {
fn new() -> Self {
fn new(period: Duration) -> Self {
Self {
updates: Mutex::new(Default::default()),
notifier: Default::default(),
interval: period,
}
}

pub fn is_empty(&self) -> bool {
self.updates.lock().unwrap().is_empty()
}

pub async fn consume(&self, min: usize) -> HashMap<Counter, Arc<CachedCounterValue>> {
let mut interval = interval(self.interval);
let mut ready = self.updates.lock().unwrap().len() >= min;
loop {
if ready {
return self.consume_all();
} else {
ready = select! {
_ = self.notifier.notified() => self.updates.lock().unwrap().len() >= min,
_ = interval.tick() => true,
}
}
}
}

pub fn consume_all(&self) -> HashMap<Counter, Arc<CachedCounterValue>> {
let mut batch = self.updates.lock().unwrap();
std::mem::take(&mut *batch)
}

pub fn add(&self, counter: Counter, value: Arc<CachedCounterValue>) {
self.updates.lock().unwrap().entry(counter).or_insert(value);
self.notifier.notify_one();
}
}

impl Default for Batcher {
fn default() -> Self {
Self::new()
Self::new(Duration::from_millis(100))
}
}

Expand Down Expand Up @@ -164,12 +187,12 @@ impl CountersCacheBuilder {
self
}

pub fn build(&self) -> CountersCache {
pub fn build(&self, period: Duration) -> CountersCache {
CountersCache {
max_ttl_cached_counters: self.max_ttl_cached_counters,
ttl_ratio_cached_counters: self.ttl_ratio_cached_counters,
cache: Cache::new(self.max_cached_counters as u64),
batcher: Default::default(),
batcher: Batcher::new(period),
}
}
}
Expand Down Expand Up @@ -294,7 +317,7 @@ mod tests {
values,
);

let cache = CountersCacheBuilder::new().build();
let cache = CountersCacheBuilder::new().build(Duration::default());
cache.insert(
counter.clone(),
Some(10),
Expand All @@ -321,7 +344,7 @@ mod tests {
values,
);

let cache = CountersCacheBuilder::new().build();
let cache = CountersCacheBuilder::new().build(Duration::default());

assert!(cache.get(&counter).is_none());
}
Expand All @@ -343,7 +366,7 @@ mod tests {
values,
);

let cache = CountersCacheBuilder::new().build();
let cache = CountersCacheBuilder::new().build(Duration::default());
cache.insert(
counter.clone(),
Some(current_value),
Expand Down Expand Up @@ -374,7 +397,7 @@ mod tests {
values,
);

let cache = CountersCacheBuilder::new().build();
let cache = CountersCacheBuilder::new().build(Duration::default());
cache.insert(
counter.clone(),
None,
Expand Down Expand Up @@ -403,7 +426,7 @@ mod tests {
values,
);

let cache = CountersCacheBuilder::new().build();
let cache = CountersCacheBuilder::new().build(Duration::default());
cache.insert(
counter.clone(),
Some(current_val),
Expand Down
6 changes: 3 additions & 3 deletions limitador/src/storage/redis/redis_cached.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ impl CachedRedisStorage {
.max_cached_counters(max_cached_counters)
.max_ttl_cached_counter(ttl_cached_counters)
.ttl_ratio_cached_counter(ttl_ratio_cached_counters)
.build();
.build(flushing_period);

let counters_cache = Arc::new(cached_counters);
let partitioned = Arc::new(AtomicBool::new(false));
Expand Down Expand Up @@ -422,7 +422,7 @@ async fn flush_batcher_and_update_counters<C: ConnectionLike>(
flip_partitioned(&partitioned, false);
}
} else {
let counters = cached_counters.batcher().consume_all();
let counters = cached_counters.batcher().consume(1).await;

let time_start_update_counters = Instant::now();

Expand Down Expand Up @@ -560,7 +560,7 @@ mod tests {
Ok(mock_response.clone()),
)]);

let cache = CountersCacheBuilder::new().build();
let cache = CountersCacheBuilder::new().build(Duration::from_millis(1));
cache.batcher().add(
counter.clone(),
Arc::new(CachedCounterValue::from(
Expand Down

0 comments on commit d5c6e4a

Please sign in to comment.