From d97adf05af821e63555e7b607d111e7b1b0b526a Mon Sep 17 00:00:00 2001
From: Alex Snaps
Date: Tue, 30 Apr 2024 07:16:34 -0400
Subject: [PATCH 1/5] Refactored the Batcher.consume method

---
 limitador/src/storage/redis/counters_cache.rs | 47 +++++++++----------
 1 file changed, 23 insertions(+), 24 deletions(-)

diff --git a/limitador/src/storage/redis/counters_cache.rs b/limitador/src/storage/redis/counters_cache.rs
index c9af9091..938bcad8 100644
--- a/limitador/src/storage/redis/counters_cache.rs
+++ b/limitador/src/storage/redis/counters_cache.rs
@@ -44,27 +44,25 @@ impl Batcher {
         self.updates.is_empty()
     }
 
-    pub async fn consume<F, Fut, O>(&self, min: usize, consumer: F) -> O
+    pub async fn consume<F, Fut, O>(&self, max: usize, consumer: F) -> O
     where
         F: FnOnce(HashMap<Counter, Arc<CachedCounterValue>>) -> Fut,
         Fut: Future<Output = O>,
     {
         let mut interval = interval(self.interval);
-        let mut ready = self.updates.len() >= min;
+        let mut ready = self.batch_ready(max);
         loop {
             if ready {
-                let mut batch = Vec::with_capacity(min);
-                for entry in &self.updates {
-                    if entry.value().requires_fast_flush(&self.interval) {
-                        batch.push(entry.key().clone());
-                        if batch.len() == min {
-                            break;
-                        }
-                    }
-                }
-                if let Some(remaining) = min.checked_sub(batch.len()) {
-                    let take = self.updates.iter().take(remaining);
-                    batch.append(&mut take.map(|e| e.key().clone()).collect());
+                let mut batch = Vec::with_capacity(max);
+                batch.extend(
+                    self.updates
+                        .iter()
+                        .filter(|entry| entry.value().requires_fast_flush(&self.interval))
+                        .take(max)
+                        .map(|e| e.key().clone()),
+                );
+                if let Some(remaining) = max.checked_sub(batch.len()) {
+                    batch.extend(self.updates.iter().take(remaining).map(|e| e.key().clone()));
                 }
                 let mut result = HashMap::new();
                 for counter in &batch {
@@ -72,19 +70,13 @@ impl Batcher {
                     result.insert(counter.clone(), value);
                 }
                 let result = consumer(result).await;
-                for counter in &batch {
-                    self.updates
-                        .remove_if(counter, |_, v| v.no_pending_writes());
-                }
+                batch.iter().for_each(|counter| {
+                    self.updates.remove_if(counter, |_, v| v.no_pending_writes());
+                });
                 return result;
             } else {
                 ready = select! {
-                    _ = self.notifier.notified() => {
-                        self.updates.len() >= min ||
-                        self.priority_flush
-                            .compare_exchange(true, false, Ordering::Release, Ordering::Acquire)
-                            .is_ok()
-                    },
+                    _ = self.notifier.notified() => self.batch_ready(max),
                     _ = interval.tick() => true,
                 }
             }
@@ -109,6 +101,13 @@ impl Batcher {
         }
         self.notifier.notify_one();
     }
+
+    fn batch_ready(&self, size: usize) -> bool {
+        self.updates.len() >= size ||
+            self.priority_flush
+                .compare_exchange(true, false, Ordering::Release, Ordering::Acquire)
+                .is_ok()
+    }
 }
 
 impl Default for Batcher {

From f393527e1ce3f939e5eb5e9b7a05149c684d27cc Mon Sep 17 00:00:00 2001
From: Alex Snaps
Date: Tue, 30 Apr 2024 07:20:40 -0400
Subject: [PATCH 2/5] Reordered things to make navigating the code easier

---
 limitador/src/storage/redis/counters_cache.rs | 252 +++++++++---------
 1 file changed, 127 insertions(+), 125 deletions(-)

diff --git a/limitador/src/storage/redis/counters_cache.rs b/limitador/src/storage/redis/counters_cache.rs
index 938bcad8..49f7f6c6 100644
--- a/limitador/src/storage/redis/counters_cache.rs
+++ b/limitador/src/storage/redis/counters_cache.rs
@@ -23,106 +23,6 @@ pub struct CachedCounterValue {
     from_authority: AtomicBool,
 }
 
-pub struct Batcher {
-    updates: DashMap<Counter, Arc<CachedCounterValue>>,
-    notifier: Notify,
-    interval: Duration,
-    priority_flush: AtomicBool,
-}
-
-impl Batcher {
-    fn new(period: Duration) -> Self {
-        Self {
-            updates: Default::default(),
-            notifier: Default::default(),
-            interval: period,
-            priority_flush: AtomicBool::new(false),
-        }
-    }
-
-    pub fn is_empty(&self) -> bool {
-        self.updates.is_empty()
-    }
-
-    pub async fn consume<F, Fut, O>(&self, max: usize, consumer: F) -> O
-    where
-        F: FnOnce(HashMap<Counter, Arc<CachedCounterValue>>) -> Fut,
-        Fut: Future<Output = O>,
-    {
-        let mut interval = interval(self.interval);
-        let mut ready = self.batch_ready(max);
-        loop {
-            if ready {
-                let mut batch = Vec::with_capacity(max);
-                batch.extend(
-                    self.updates
-                        .iter()
-                        .filter(|entry| entry.value().requires_fast_flush(&self.interval))
-                        .take(max)
-                        .map(|e| e.key().clone()),
-                );
-                if let Some(remaining) = max.checked_sub(batch.len()) {
-                    batch.extend(self.updates.iter().take(remaining).map(|e| e.key().clone()));
-                }
-                let mut result = HashMap::new();
-                for counter in &batch {
-                    let value = self.updates.get(counter).unwrap().clone();
-                    result.insert(counter.clone(), value);
-                }
-                let result = consumer(result).await;
-                batch.iter().for_each(|counter| {
-                    self.updates.remove_if(counter, |_, v| v.no_pending_writes());
-                });
-                return result;
-            } else {
-                ready = select! {
-                    _ = self.notifier.notified() => self.batch_ready(max),
-                    _ = interval.tick() => true,
-                }
-            }
-        }
-    }
-
-    pub fn add(&self, counter: Counter, value: Arc<CachedCounterValue>) {
-        let priority = value.requires_fast_flush(&self.interval);
-        match self.updates.entry(counter.clone()) {
-            Entry::Occupied(needs_merge) => {
-                let arc = needs_merge.get();
-                if !Arc::ptr_eq(arc, &value) {
-                    arc.delta(&counter, value.pending_writes().unwrap());
-                }
-            }
-            Entry::Vacant(miss) => {
-                miss.insert_entry(value);
-            }
-        };
-        if priority {
-            self.priority_flush.store(true, Ordering::Release);
-        }
-        self.notifier.notify_one();
-    }
-
-    fn batch_ready(&self, size: usize) -> bool {
-        self.updates.len() >= size ||
-            self.priority_flush
-                .compare_exchange(true, false, Ordering::Release, Ordering::Acquire)
-                .is_ok()
-    }
-}
-
-impl Default for Batcher {
-    fn default() -> Self {
-        Self::new(Duration::from_millis(100))
-    }
-}
-
-pub struct CountersCache {
-    max_ttl_cached_counters: Duration,
-    pub ttl_ratio_cached_counters: u64,
-    cache: Cache<Counter, Arc<CachedCounterValue>>,
-    batcher: Batcher,
-}
-
 impl CachedCounterValue {
     pub fn from_authority(counter: &Counter, value: i64, ttl: Duration) -> Self {
         let now = SystemTime::now();
@@ -228,46 +128,108 @@ impl CachedCounterValue {
     }
 }
 
-pub struct CountersCacheBuilder {
-    max_cached_counters: usize,
-    max_ttl_cached_counters: Duration,
-    ttl_ratio_cached_counters: u64,
+pub struct Batcher {
+    updates: DashMap<Counter, Arc<CachedCounterValue>>,
+    notifier: Notify,
+    interval: Duration,
+    priority_flush: AtomicBool,
 }
 
-impl CountersCacheBuilder {
-    pub fn new() -> Self {
+impl Batcher {
+    fn new(period: Duration) -> Self {
         Self {
-            max_cached_counters: DEFAULT_MAX_CACHED_COUNTERS,
-            max_ttl_cached_counters: Duration::from_secs(DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC),
-            ttl_ratio_cached_counters: DEFAULT_TTL_RATIO_CACHED_COUNTERS,
+            updates: Default::default(),
+            notifier: Default::default(),
+            interval: period,
+            priority_flush: AtomicBool::new(false),
         }
     }
 
-    pub fn max_cached_counters(mut self, max_cached_counters: usize) -> Self {
-        self.max_cached_counters = max_cached_counters;
-        self
+    pub fn add(&self, counter: Counter, value: Arc<CachedCounterValue>) {
+        let priority = value.requires_fast_flush(&self.interval);
+        match self.updates.entry(counter.clone()) {
+            Entry::Occupied(needs_merge) => {
+                let arc = needs_merge.get();
+                if !Arc::ptr_eq(arc, &value) {
+                    arc.delta(&counter, value.pending_writes().unwrap());
+                }
+            }
+            Entry::Vacant(miss) => {
+                miss.insert_entry(value);
+            }
+        };
+        if priority {
+            self.priority_flush.store(true, Ordering::Release);
+        }
+        self.notifier.notify_one();
     }
 
-    pub fn max_ttl_cached_counter(mut self, max_ttl_cached_counter: Duration) -> Self {
-        self.max_ttl_cached_counters = max_ttl_cached_counter;
-        self
+    pub async fn consume<F, Fut, O>(&self, max: usize, consumer: F) -> O
+    where
+        F: FnOnce(HashMap<Counter, Arc<CachedCounterValue>>) -> Fut,
+        Fut: Future<Output = O>,
+    {
+        let mut interval = interval(self.interval);
+        let mut ready = self.batch_ready(max);
+        loop {
+            if ready {
+                let mut batch = Vec::with_capacity(max);
+                batch.extend(
+                    self.updates
+                        .iter()
+                        .filter(|entry| entry.value().requires_fast_flush(&self.interval))
+                        .take(max)
+                        .map(|e| e.key().clone()),
+                );
+                if let Some(remaining) = max.checked_sub(batch.len()) {
+                    batch.extend(self.updates.iter().take(remaining).map(|e| e.key().clone()));
+                }
+                let mut result = HashMap::new();
+                for counter in &batch {
+                    let value = self.updates.get(counter).unwrap().clone();
+                    result.insert(counter.clone(), value);
+                }
+                let result = consumer(result).await;
+                batch.iter().for_each(|counter| {
+                    self.updates
+                        .remove_if(counter, |_, v| v.no_pending_writes());
+                });
+                return result;
+            } else {
+                ready = select! {
+                    _ = self.notifier.notified() => self.batch_ready(max),
+                    _ = interval.tick() => true,
+                }
+            }
+        }
+    }
 
-    pub fn ttl_ratio_cached_counter(mut self, ttl_ratio_cached_counter: u64) -> Self {
-        self.ttl_ratio_cached_counters = ttl_ratio_cached_counter;
-        self
+    pub fn is_empty(&self) -> bool {
+        self.updates.is_empty()
     }
 
-    pub fn build(&self, period: Duration) -> CountersCache {
-        CountersCache {
-            max_ttl_cached_counters: self.max_ttl_cached_counters,
-            ttl_ratio_cached_counters: self.ttl_ratio_cached_counters,
-            cache: Cache::new(self.max_cached_counters as u64),
-            batcher: Batcher::new(period),
-        }
+    fn batch_ready(&self, size: usize) -> bool {
+        self.updates.len() >= size
+            || self
+                .priority_flush
+                .compare_exchange(true, false, Ordering::Release, Ordering::Acquire)
+                .is_ok()
     }
 }
 
+impl Default for Batcher {
+    fn default() -> Self {
+        Self::new(Duration::from_millis(100))
+    }
+}
+
+pub struct CountersCache {
+    max_ttl_cached_counters: Duration,
+    pub ttl_ratio_cached_counters: u64,
+    cache: Cache<Counter, Arc<CachedCounterValue>>,
+    batcher: Batcher,
+}
+
 impl CountersCache {
     pub fn get(&self, counter: &Counter) -> Option<Arc<CachedCounterValue>> {
         let option = self.cache.get(counter);
@@ -382,6 +344,46 @@ impl CountersCache {
     }
 }
 
+pub struct CountersCacheBuilder {
+    max_cached_counters: usize,
+    max_ttl_cached_counters: Duration,
+    ttl_ratio_cached_counters: u64,
+}
+
+impl CountersCacheBuilder {
+    pub fn new() -> Self {
+        Self {
+            max_cached_counters: DEFAULT_MAX_CACHED_COUNTERS,
+            max_ttl_cached_counters: Duration::from_secs(DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC),
+            ttl_ratio_cached_counters: DEFAULT_TTL_RATIO_CACHED_COUNTERS,
+        }
+    }
+
+    pub fn max_cached_counters(mut self, max_cached_counters: usize) -> Self {
+        self.max_cached_counters = max_cached_counters;
+        self
+    }
+
+    pub fn max_ttl_cached_counter(mut self, max_ttl_cached_counter: Duration) -> Self {
+        self.max_ttl_cached_counters = max_ttl_cached_counter;
+        self
+    }
+
+    pub fn ttl_ratio_cached_counter(mut self, ttl_ratio_cached_counter: u64) -> Self {
+        self.ttl_ratio_cached_counters = ttl_ratio_cached_counter;
+        self
+    }
+
+    pub fn build(&self, period: Duration) -> CountersCache {
+        CountersCache {
+            max_ttl_cached_counters: self.max_ttl_cached_counters,
+            ttl_ratio_cached_counters: self.ttl_ratio_cached_counters,
+            cache: Cache::new(self.max_cached_counters as u64),
+            batcher: Batcher::new(period),
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;

From 65d34f15a35240c9ffdf2a0d0609b68b74dde45e Mon Sep 17 00:00:00 2001
From: Alex Snaps
Date: Tue, 30 Apr 2024 08:38:47 -0400
Subject: [PATCH 3/5] Sleep rather than tick

---
 limitador/src/storage/redis/counters_cache.rs | 4 +---
 limitador/tests/integration_tests.rs          | 2 +-
 2 files changed, 2 insertions(+), 4 deletions(-)

diff --git a/limitador/src/storage/redis/counters_cache.rs b/limitador/src/storage/redis/counters_cache.rs
index 49f7f6c6..3754a46e 100644
--- a/limitador/src/storage/redis/counters_cache.rs
+++ b/limitador/src/storage/redis/counters_cache.rs
@@ -14,7 +14,6 @@ use std::sync::Arc;
 use std::time::{Duration, SystemTime};
 use tokio::select;
 use tokio::sync::Notify;
-use tokio::time::interval;
 
 pub struct CachedCounterValue {
     value: AtomicExpiringValue,
@@ -169,7 +168,6 @@ impl Batcher {
         F: FnOnce(HashMap<Counter, Arc<CachedCounterValue>>) -> Fut,
         Fut: Future<Output = O>,
     {
-        let mut interval = interval(self.interval);
         let mut ready = self.batch_ready(max);
         loop {
             if ready {
@@ -198,7 +196,7 @@ impl Batcher {
             } else {
                 ready = select! {
                     _ = self.notifier.notified() => self.batch_ready(max),
-                    _ = interval.tick() => true,
+                    _ = tokio::time::sleep(self.interval) => true,
                 }
             }
diff --git a/limitador/tests/integration_tests.rs b/limitador/tests/integration_tests.rs
index 844a90ed..96392ba7 100644
--- a/limitador/tests/integration_tests.rs
+++ b/limitador/tests/integration_tests.rs
@@ -537,7 +537,7 @@ mod test {
         }
 
         // We wait for the flushing period to pass so the counters are flushed in the cached storage
-        tokio::time::sleep(Duration::from_millis(3)).await;
+        tokio::time::sleep(Duration::from_millis(4)).await;
 
         assert!(rate_limiter
             .is_rate_limited(namespace, &get_values, 1)

From f7b4b9915e79f34d1d7fad55d33756bcd57cd648 Mon Sep 17 00:00:00 2001
From: Alex Snaps
Date: Tue, 30 Apr 2024 08:39:00 -0400
Subject: [PATCH 4/5] Default batch size of 100

---
 limitador/src/storage/redis/redis_cached.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs
index 20dedcc0..f1d3d29e 100644
--- a/limitador/src/storage/redis/redis_cached.rs
+++ b/limitador/src/storage/redis/redis_cached.rs
@@ -339,7 +339,7 @@ async fn flush_batcher_and_update_counters(
     } else {
         let updated_counters = cached_counters
             .batcher()
-            .consume(1, |counters| update_counters(&mut redis_conn, counters))
+            .consume(100, |counters| update_counters(&mut redis_conn, counters))
             .await
             .or_else(|err| {
                 if err.is_transient() {

From c5212cf080c827c4347046405b397f452dadfe27 Mon Sep 17 00:00:00 2001
From: Alex Snaps
Date: Tue, 30 Apr 2024 08:51:55 -0400
Subject: [PATCH 5/5] Avoid invoking the script with an empty batch

---
 limitador/src/storage/redis/redis_cached.rs | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs
index f1d3d29e..0f5c36ff 100644
--- a/limitador/src/storage/redis/redis_cached.rs
+++ b/limitador/src/storage/redis/redis_cached.rs
@@ -293,7 +293,11 @@ async fn update_counters(
     let redis_script = redis::Script::new(BATCH_UPDATE_COUNTERS);
     let mut script_invocation = redis_script.prepare_invoke();
 
-    let mut res: Vec<(Counter, i64, i64)> = Vec::new();
+    let mut res: Vec<(Counter, i64, i64)> = Vec::with_capacity(counters_and_deltas.len());
+    if counters_and_deltas.is_empty() {
+        return Ok(res);
+    }
+
     for (counter, delta) in counters_and_deltas {
         let delta = delta.pending_writes().expect("State machine is wrong!");
         if delta > 0 {
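
A note on the shape these commits converge on: Batcher::consume now waits until the backlog reaches the requested batch size, a priority flush has been signalled, or the flush period elapses, then hands at most `max` pending entries to the consumer and prunes what it flushed. The sketch below is a minimal, self-contained illustration of that pattern, not the crate's actual code: MiniBatcher, PendingValue and the String keys are hypothetical stand-ins for Limitador's Counter and CachedCounterValue, and it assumes the tokio (with time, sync and macros features) and dashmap crates as dependencies.

use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::sync::Arc;
use std::time::Duration;

use dashmap::DashMap;
use tokio::sync::Notify;

// Hypothetical stand-ins for Limitador's Counter and CachedCounterValue.
type Key = String;

#[derive(Default)]
struct PendingValue {
    delta: AtomicI64,
}

struct MiniBatcher {
    updates: DashMap<Key, Arc<PendingValue>>,
    notifier: Notify,
    interval: Duration,
    priority_flush: AtomicBool,
}

impl MiniBatcher {
    fn new(interval: Duration) -> Self {
        Self {
            updates: DashMap::new(),
            notifier: Notify::new(),
            interval,
            priority_flush: AtomicBool::new(false),
        }
    }

    // Ready when the backlog reaches `size`, or when a priority flush was
    // requested (the compare_exchange also resets the flag, as in the patch).
    fn batch_ready(&self, size: usize) -> bool {
        self.updates.len() >= size
            || self
                .priority_flush
                .compare_exchange(true, false, Ordering::Release, Ordering::Acquire)
                .is_ok()
    }

    // Record a pending write and wake the flush loop.
    fn add(&self, key: Key, delta: i64, priority: bool) {
        self.updates.entry(key).or_default().delta.fetch_add(delta, Ordering::AcqRel);
        if priority {
            self.priority_flush.store(true, Ordering::Release);
        }
        self.notifier.notify_one();
    }

    // Wait until a batch is ready (or the period elapses), hand at most `max`
    // entries to `consumer`, then drop what was flushed. The real patch uses
    // remove_if so entries that gained new writes during the flush survive.
    async fn consume<F, Fut, O>(&self, max: usize, consumer: F) -> O
    where
        F: FnOnce(HashMap<Key, Arc<PendingValue>>) -> Fut,
        Fut: std::future::Future<Output = O>,
    {
        let mut ready = self.batch_ready(max);
        loop {
            if ready {
                let batch: HashMap<_, _> = self
                    .updates
                    .iter()
                    .take(max)
                    .map(|e| (e.key().clone(), e.value().clone()))
                    .collect();
                let keys: Vec<Key> = batch.keys().cloned().collect();
                let out = consumer(batch).await;
                for key in keys {
                    self.updates.remove(&key);
                }
                return out;
            }
            ready = tokio::select! {
                _ = self.notifier.notified() => self.batch_ready(max),
                _ = tokio::time::sleep(self.interval) => true,
            };
        }
    }
}

#[tokio::main]
async fn main() {
    let batcher = MiniBatcher::new(Duration::from_millis(100));
    batcher.add("counter-a".into(), 1, false);
    batcher.add("counter-b".into(), 2, true); // priority: flush as soon as possible
    let flushed = batcher
        .consume(100, |batch| async move { batch.len() })
        .await;
    println!("flushed {flushed} pending counters");
}

With the values these patches settle on (a 100ms flush period and a batch size of 100), a full backlog or a priority entry is flushed immediately, while everything else goes out once the sleep expires; the empty-batch guard in PATCH 5 then keeps an empty flush from invoking the Redis script at all.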