Skip to content

Commit

Permalink
Do the priority dance
Browse files Browse the repository at this point in the history
  • Loading branch information
alexsnaps committed Apr 26, 2024
1 parent 620c021 commit 826b8a9
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 22 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions limitador/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ lenient_conditions = []

[dependencies]
moka = { version = "0.12", features = ["sync"] }
dashmap = "5.5.3"
getrandom = { version = "0.2", features = ["js"] }
serde = { version = "1", features = ["derive"] }
postcard = { version = "1.0.4", features = ["use-std"] }
Expand Down
89 changes: 72 additions & 17 deletions limitador/src/storage/redis/counters_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ use crate::storage::redis::{
DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC,
DEFAULT_TTL_RATIO_CACHED_COUNTERS,
};
use dashmap::DashMap;
use moka::sync::Cache;
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::{Arc, Mutex};
use std::future::Future;
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::select;
use tokio::sync::Notify;
Expand All @@ -20,46 +22,91 @@ pub struct CachedCounterValue {
}

pub struct Batcher {
updates: Mutex<HashMap<Counter, Arc<CachedCounterValue>>>,
updates: DashMap<Counter, Arc<CachedCounterValue>>,
notifier: Notify,
interval: Duration,
priority_flush: AtomicBool,
}

impl Batcher {
fn new(period: Duration) -> Self {
Self {
updates: Mutex::new(Default::default()),
updates: Default::default(),
notifier: Default::default(),
interval: period,
priority_flush: AtomicBool::new(false),
}
}

pub fn is_empty(&self) -> bool {
self.updates.lock().unwrap().is_empty()
self.updates.is_empty()
}

pub async fn consume(&self, min: usize) -> HashMap<Counter, Arc<CachedCounterValue>> {
pub async fn consume<F, Fut, O>(&self, min: usize, consumer: F) -> O
where
F: FnOnce(HashMap<Counter, Arc<CachedCounterValue>>) -> Fut,
Fut: Future<Output = O>,
{
let mut interval = interval(self.interval);
let mut ready = self.updates.lock().unwrap().len() >= min;
let mut ready = self.updates.len() >= min;
loop {
if ready {
return self.consume_all();
let mut batch = Vec::with_capacity(min);
let mut probably_fake = Vec::with_capacity(min);
for entry in &self.updates {
if entry.value().value.ttl() < self.interval {
batch.push(entry.key().clone());
if batch.len() == min {
break;
}
}
if entry.value().expiry.duration() == Duration::from_secs(entry.key().seconds())
{
probably_fake.push(entry.key().clone());
if probably_fake.len() == min {
break;
}
}
}
if let Some(remaining) = min.checked_sub(batch.len()) {
let take = probably_fake.into_iter().take(remaining);
batch.append(&mut take.collect());
}
if let Some(remaining) = min.checked_sub(batch.len()) {
let take = self.updates.iter().take(remaining);
batch.append(&mut take.map(|e| e.key().clone()).collect());
}
let mut result = HashMap::new();
for counter in &batch {
let value = self.updates.get(counter).unwrap().clone();
result.insert(counter.clone(), value);
}
let result = consumer(result).await;
for counter in &batch {
self.updates
.remove_if(counter, |_, v| v.no_pending_writes());
}
return result;
} else {
ready = select! {
_ = self.notifier.notified() => self.updates.lock().unwrap().len() >= min,
_ = self.notifier.notified() => {
self.updates.len() >= min ||
self.priority_flush
.compare_exchange(true, false, Ordering::Release, Ordering::Acquire)
.is_ok()
},
_ = interval.tick() => true,
}
}
}
}

pub fn consume_all(&self) -> HashMap<Counter, Arc<CachedCounterValue>> {
let mut batch = self.updates.lock().unwrap();
std::mem::take(&mut *batch)
}

pub fn add(&self, counter: Counter, value: Arc<CachedCounterValue>) {
self.updates.lock().unwrap().entry(counter).or_insert(value);
pub fn add(&self, counter: Counter, value: Arc<CachedCounterValue>, priority: bool) {
let priority = priority || value.value.ttl() < self.interval;
self.updates.entry(counter).or_insert(value);
if priority {
self.priority_flush.store(true, Ordering::Release);
}
self.notifier.notify_one();
}
}
Expand Down Expand Up @@ -140,6 +187,12 @@ impl CachedCounterValue {
}
}

fn no_pending_writes(&self) -> bool {
let start = self.initial_value.load(Ordering::SeqCst);
let value = self.value.value_at(SystemTime::now());
value - start == 0
}

pub fn hits(&self, _: &Counter) -> i64 {
self.value.value_at(SystemTime::now())
}
Expand Down Expand Up @@ -240,15 +293,17 @@ impl CountersCache {
}

pub fn increase_by(&self, counter: &Counter, delta: i64) {
let mut priority = false;
let val = self.cache.get_with_by_ref(counter, || {
priority = true;
Arc::new(
// this TTL is wrong, it needs to be the cache's TTL, not the time window of our limit
// todo fix when introducing the Batcher type!
CachedCounterValue::from(counter, 0, Duration::from_secs(counter.seconds())),
)
});
val.delta(counter, delta);
self.batcher.add(counter.clone(), val.clone());
self.batcher.add(counter.clone(), val.clone(), priority);
}

fn ttl_from_redis_ttl(
Expand Down
11 changes: 6 additions & 5 deletions limitador/src/storage/redis/redis_cached.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,11 +422,9 @@ async fn flush_batcher_and_update_counters<C: ConnectionLike>(
flip_partitioned(&partitioned, false);
}
} else {
let counters = cached_counters.batcher().consume(1).await;

let time_start_update_counters = Instant::now();

let updated_counters = update_counters(&mut redis_conn, counters)
let updated_counters = cached_counters
.batcher()
.consume(1, |counters| update_counters(&mut redis_conn, counters))
.await
.or_else(|err| {
if err.is_transient() {
Expand All @@ -438,6 +436,8 @@ async fn flush_batcher_and_update_counters<C: ConnectionLike>(
})
.expect("Unrecoverable Redis error!");

let time_start_update_counters = Instant::now();

for (counter, value, ttl) in updated_counters {
cached_counters.insert(
counter,
Expand Down Expand Up @@ -568,6 +568,7 @@ mod tests {
2,
Duration::from_secs(60),
)),
false,
);
cache.insert(
counter.clone(),
Expand Down

0 comments on commit 826b8a9

Please sign in to comment.