Support pending writes within CachedCounterValue #299

Merged: 10 commits, Apr 30, 2024
1 change: 1 addition & 0 deletions Cargo.lock

1 change: 1 addition & 0 deletions limitador/Cargo.toml
@@ -22,6 +22,7 @@ lenient_conditions = []

[dependencies]
moka = { version = "0.12", features = ["sync"] }
dashmap = "5.5.3"
getrandom = { version = "0.2", features = ["js"] }
serde = { version = "1", features = ["derive"] }
postcard = { version = "1.0.4", features = ["use-std"] }
205 changes: 192 additions & 13 deletions limitador/src/storage/redis/counters_cache.rs
@@ -4,26 +4,132 @@
DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC,
DEFAULT_TTL_RATIO_CACHED_COUNTERS,
};
use dashmap::DashMap;
use moka::sync::Cache;
use std::collections::HashMap;
use std::future::Future;
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::select;
use tokio::sync::Notify;
use tokio::time::interval;

pub struct CachedCounterValue {
value: AtomicExpiringValue,
initial_value: AtomicI64,
expiry: AtomicExpiryTime,
}

pub struct Batcher {
updates: DashMap<Counter, Arc<CachedCounterValue>>,
Contributor: DashMap makes it much cleaner and easier to consume <3

notifier: Notify,
interval: Duration,
priority_flush: AtomicBool,
}

impl Batcher {
fn new(period: Duration) -> Self {
Self {
updates: Default::default(),
notifier: Default::default(),
interval: period,
priority_flush: AtomicBool::new(false),
}
}

pub fn is_empty(&self) -> bool {
self.updates.is_empty()
}

pub async fn consume<F, Fut, O>(&self, min: usize, consumer: F) -> O
where
F: FnOnce(HashMap<Counter, Arc<CachedCounterValue>>) -> Fut,
Fut: Future<Output = O>,
{
let mut interval = interval(self.interval);
let mut ready = self.updates.len() >= min;
loop {
if ready {
let mut batch = Vec::with_capacity(min);
let mut probably_fake = Vec::with_capacity(min);
for entry in &self.updates {
if entry.value().value.ttl() < self.interval {
batch.push(entry.key().clone());
if batch.len() == min {
break;
}
}
if entry.value().expiry.duration() == Duration::from_secs(entry.key().seconds())
{
probably_fake.push(entry.key().clone());
if probably_fake.len() == min {
break;
}
}
}
if let Some(remaining) = min.checked_sub(batch.len()) {
let take = probably_fake.into_iter().take(remaining);
batch.append(&mut take.collect());
}
if let Some(remaining) = min.checked_sub(batch.len()) {
let take = self.updates.iter().take(remaining);
batch.append(&mut take.map(|e| e.key().clone()).collect());
}
let mut result = HashMap::new();
for counter in &batch {
let value = self.updates.get(counter).unwrap().clone();
result.insert(counter.clone(), value);
}
let result = consumer(result).await;
for counter in &batch {
self.updates
.remove_if(counter, |_, v| v.no_pending_writes());
}
Member Author (comment on lines +75 to +78): If after having flushed to Redis no additional pending writes were added, we can remove these entries from the queue.
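As an illustration of that drain step, here is a hypothetical, simplified sketch — `Value`, `pending`, and `drain` are invented stand-ins, not code from this PR:

use dashmap::DashMap;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;

struct Value {
    pending: AtomicI64, // writes not yet flushed to Redis
}

impl Value {
    fn no_pending_writes(&self) -> bool {
        self.pending.load(Ordering::SeqCst) == 0
    }
}

fn drain(updates: &DashMap<String, Arc<Value>>, flushed: &[String]) {
    for counter in flushed {
        // remove_if evaluates the predicate under the shard lock, so an
        // increment racing with the flush keeps its entry queued for the
        // next batch instead of being silently dropped.
        updates.remove_if(counter, |_, v| v.no_pending_writes());
    }
}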

return result;
} else {
ready = select! {
_ = self.notifier.notified() => {
self.updates.len() >= min ||
self.priority_flush
.compare_exchange(true, false, Ordering::Release, Ordering::Acquire)
.is_ok()
},
_ = interval.tick() => true,
}
}
}
}

pub fn add(&self, counter: Counter, value: Arc<CachedCounterValue>, priority: bool) {
let priority = priority || value.value.ttl() < self.interval;
self.updates.entry(counter).or_insert(value);
if priority {
Contributor: if we start constraining the batch size sent to Redis, we may want to add the priority flag in the value too, so that the first batch we execute contains the priority items first.

Member Author (@alexsnaps, Apr 26, 2024): Yeah, I do this "hack" where I check the TTL on entry… to decide the things that are "important", but I think I should maybe just waste the additional bit…

(A hypothetical sketch of that idea appears after the impl block below.)

self.priority_flush.store(true, Ordering::Release);
}
self.notifier.notify_one();
}
}
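
A hypothetical sketch of "wasting the additional bit" discussed in the review thread above — the `PrioritizedValue` type and its helpers are invented, not part of this PR:

use std::sync::atomic::{AtomicBool, Ordering};

struct PrioritizedValue {
    priority: AtomicBool, // one extra bit per cached value
}

impl PrioritizedValue {
    fn mark_priority(&self) {
        self.priority.store(true, Ordering::Release);
    }

    // Returns true at most once per marking, so an entry is only
    // front-loaded into a single batch.
    fn take_priority(&self) -> bool {
        self.priority.swap(false, Ordering::AcqRel)
    }
}

A size-constrained flush could then partition the queue on take_priority() before falling back to the TTL heuristic used in consume().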

impl Default for Batcher {
fn default() -> Self {
Self::new(Duration::from_millis(100))
}
}
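
For readers skimming the diff, a consumer of this API could look roughly like the following — a hypothetical sketch (`flush_loop` is invented and assumes the types in this file are in scope; a real caller would pipeline the writes to Redis):

async fn flush_loop(batcher: &Batcher) {
    loop {
        // Wakes when the min batch size is reached, a priority write
        // arrives, or the flush interval elapses, whichever comes first.
        batcher
            .consume(50, |batch| async move {
                for (_counter, value) in &batch {
                    // e.g. accumulate value.pending_writes() into a
                    // Redis pipeline here
                    let _ = value.pending_writes();
                }
            })
            .await;
    }
}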

pub struct CountersCache {
max_ttl_cached_counters: Duration,
pub ttl_ratio_cached_counters: u64,
cache: Cache<Counter, Arc<CachedCounterValue>>,
batcher: Batcher,
}

impl CachedCounterValue {
pub fn from(counter: &Counter, value: i64, ttl: Duration) -> Self {
let now = SystemTime::now();
Self {
value: AtomicExpiringValue::new(value, now + Duration::from_secs(counter.seconds())),
initial_value: AtomicI64::new(value),
expiry: AtomicExpiryTime::from_now(ttl),
}
}
@@ -34,13 +140,57 @@

pub fn set_from_authority(&self, counter: &Counter, value: i64, expiry: Duration) {
let time_window = Duration::from_secs(counter.seconds());
self.initial_value.store(value, Ordering::SeqCst);
self.value.set(value, time_window);
self.expiry.update(expiry);
}

pub fn delta(&self, counter: &Counter, delta: i64) -> i64 {
self.value
.update(delta, counter.seconds(), SystemTime::now())
let value = self
.value
.update(delta, counter.seconds(), SystemTime::now());
if value == delta {
// new window, invalidate initial value
self.initial_value.store(0, Ordering::SeqCst);
}
value
}

pub fn pending_writes(&self) -> Result<i64, ()> {
let start = self.initial_value.load(Ordering::SeqCst);
let value = self.value.value_at(SystemTime::now());
let offset = if start == 0 {
value
} else {
let writes = value - start;
if writes > 0 {
writes
} else {
value
}
};
match self
.initial_value
.compare_exchange(start, value, Ordering::SeqCst, Ordering::SeqCst)
{
Ok(_) => Ok(offset),
Err(newer) => {
if newer == 0 {
// We got expired in the meantime, this fresh value can wait the next iteration
Ok(0)
} else {
// Concurrent call to this method?
// We could support that with a CAS loop in the future if needed
Err(())
}
}
}
}
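
The CAS loop mentioned in the comment above could look like this — a hypothetical standalone sketch that omits the new-window special cases handled here:

use std::sync::atomic::{AtomicI64, Ordering};

// Each caller claims the delta between its snapshot and the current
// value, retrying if another flush swapped initial_value first, so
// concurrent callers each account for a disjoint slice of the delta.
fn claim_pending(initial: &AtomicI64, current: i64) -> i64 {
    loop {
        let start = initial.load(Ordering::SeqCst);
        if initial
            .compare_exchange(start, current, Ordering::SeqCst, Ordering::SeqCst)
            .is_ok()
        {
            return current - start;
        }
    }
}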

fn no_pending_writes(&self) -> bool {
let start = self.initial_value.load(Ordering::SeqCst);
let value = self.value.value_at(SystemTime::now());
value - start == 0
}

pub fn hits(&self, _: &Counter) -> i64 {
@@ -90,18 +240,31 @@
self
}

pub fn build(&self) -> CountersCache {
pub fn build(&self, period: Duration) -> CountersCache {
CountersCache {
max_ttl_cached_counters: self.max_ttl_cached_counters,
ttl_ratio_cached_counters: self.ttl_ratio_cached_counters,
cache: Cache::new(self.max_cached_counters as u64),
batcher: Batcher::new(period),
}
}
}

impl CountersCache {
pub fn get(&self, counter: &Counter) -> Option<Arc<CachedCounterValue>> {
self.cache.get(counter)
let option = self.cache.get(counter);
if option.is_none() {
let from_queue = self.batcher.updates.get(counter);

if let Some(entry) = from_queue {
self.cache.insert(counter.clone(), entry.value().clone());
return Some(entry.value().clone());
}
}
option
}

pub fn batcher(&self) -> &Batcher {
&self.batcher
}

pub fn insert(
@@ -122,7 +285,11 @@
if let Some(ttl) = cache_ttl.checked_sub(ttl_margin) {
if ttl > Duration::ZERO {
let previous = self.cache.get_with(counter.clone(), || {
Arc::new(CachedCounterValue::from(&counter, counter_val, cache_ttl))
if let Some(entry) = self.batcher.updates.get(&counter) {
entry.value().clone()
} else {
Arc::new(CachedCounterValue::from(&counter, counter_val, cache_ttl))
}
});
if previous.expired_at(now) || previous.value.value() < counter_val {
previous.set_from_authority(&counter, counter_val, cache_ttl);
@@ -138,9 +305,21 @@
}

pub fn increase_by(&self, counter: &Counter, delta: i64) {
if let Some(val) = self.cache.get(counter) {
val.delta(counter, delta);
};
let mut priority = false;
let val = self.cache.get_with_by_ref(counter, || {
if let Some(entry) = self.batcher.updates.get(counter) {
entry.value().clone()
} else {
priority = true;
Arc::new(
// this TTL is wrong, it needs to be the cache's TTL, not the time window of our limit
// todo fix when introducing the Batcher type!
CachedCounterValue::from(counter, 0, Duration::from_secs(counter.seconds())),
)
}
});
val.delta(counter, delta);
self.batcher.add(counter.clone(), val.clone(), priority);
}

fn ttl_from_redis_ttl(
Expand Down Expand Up @@ -209,7 +388,7 @@
values,
);

let cache = CountersCacheBuilder::new().build();
let cache = CountersCacheBuilder::new().build(Duration::default());
cache.insert(
counter.clone(),
Some(10),
@@ -236,7 +415,7 @@
values,
);

let cache = CountersCacheBuilder::new().build();
let cache = CountersCacheBuilder::new().build(Duration::default());

assert!(cache.get(&counter).is_none());
}
@@ -258,7 +437,7 @@
values,
);

let cache = CountersCacheBuilder::new().build();
let cache = CountersCacheBuilder::new().build(Duration::default());
cache.insert(
counter.clone(),
Some(current_value),
@@ -289,7 +468,7 @@
values,
);

let cache = CountersCacheBuilder::new().build();
let cache = CountersCacheBuilder::new().build(Duration::default());
cache.insert(
counter.clone(),
None,
@@ -318,7 +497,7 @@
values,
);

let cache = CountersCacheBuilder::new().build();
let cache = CountersCacheBuilder::new().build(Duration::default());
cache.insert(
counter.clone(),
Some(current_val),