Skip to content

Commit

Permalink
Always use the batch writes to redis
Browse files (browse the repository at this point in the history)
  • Loading branch information
alexsnaps committed Mar 28, 2024
1 parent 01977a4 commit 1d92ae1
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 71 deletions.
13 changes: 2 additions & 11 deletions limitador-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,17 +137,8 @@ impl Limiter {
) -> CachedRedisStorage {
// TODO: Not all the options are configurable via ENV. Add them as needed.

let mut cached_redis_storage = CachedRedisStorageBuilder::new(redis_url);

if cache_cfg.flushing_period < 0 {
cached_redis_storage = cached_redis_storage.flushing_period(None)
} else {
cached_redis_storage = cached_redis_storage.flushing_period(Some(
Duration::from_millis(cache_cfg.flushing_period as u64),
))
}

cached_redis_storage = cached_redis_storage
let cached_redis_storage = CachedRedisStorageBuilder::new(redis_url)
.flushing_period(Duration::from_millis(cache_cfg.flushing_period as u64))
.max_ttl_cached_counters(Duration::from_millis(cache_cfg.max_ttl))
.ttl_ratio_cached_counters(cache_cfg.ttl_ratio)
.max_cached_counters(cache_cfg.max_counters)
Expand Down
99 changes: 39 additions & 60 deletions limitador/src/storage/redis/redis_cached.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ pub struct CachedRedisStorage {
batcher_counter_updates: Arc<Mutex<HashMap<Counter, i64>>>,
async_redis_storage: AsyncRedisStorage,
redis_conn_manager: ConnectionManager,
batching_is_enabled: bool,
partitioned: Arc<AtomicBool>,
}

Expand Down Expand Up @@ -139,9 +138,7 @@ impl AsyncCounterStorage for CachedRedisStorage {
counter_ttls_msecs[i],
ttl_margin,
);
let remaining = counter.max_value()
- counter_vals[i].unwrap_or(0)
- delta;
let remaining = counter.max_value() - counter_vals[i].unwrap_or(0) - delta;
if first_limited.is_none() && remaining < 0 {
first_limited = Some(Authorization::Limited(
counter.limit().name().map(|n| n.to_owned()),
Expand All @@ -168,24 +165,9 @@ impl AsyncCounterStorage for CachedRedisStorage {
}

// Batch or update depending on configuration
if self.is_partitioned() || self.batching_is_enabled {
let mut batcher = self.batcher_counter_updates.lock().unwrap();
for counter in counters.iter() {
Self::batch_counter(delta, &mut batcher, counter);
}
} else {
for counter in counters.iter() {
self.update_counter(counter, delta).await.or_else(|err| {
if err.is_transient() {
self.partitioned(true);
let mut batcher = self.batcher_counter_updates.lock().unwrap();
Self::batch_counter(delta, &mut batcher, counter);
Ok(())
} else {
Err(err)
}
})?
}
let mut batcher = self.batcher_counter_updates.lock().unwrap();
for counter in counters.iter() {
Self::batch_counter(delta, &mut batcher, counter);
}

Ok(Authorization::Ok)
Expand All @@ -211,7 +193,7 @@ impl CachedRedisStorage {
pub async fn new(redis_url: &str) -> Result<Self, RedisError> {
Self::new_with_options(
redis_url,
Some(Duration::from_secs(DEFAULT_FLUSHING_PERIOD_SEC)),
Duration::from_secs(DEFAULT_FLUSHING_PERIOD_SEC),
DEFAULT_MAX_CACHED_COUNTERS,
Duration::from_secs(DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC),
DEFAULT_TTL_RATIO_CACHED_COUNTERS,
Expand All @@ -222,7 +204,7 @@ impl CachedRedisStorage {

async fn new_with_options(
redis_url: &str,
flushing_period: Option<Duration>,
flushing_period: Duration,
max_cached_counters: usize,
ttl_cached_counters: Duration,
ttl_ratio_cached_counters: u64,
Expand All @@ -247,40 +229,38 @@ impl CachedRedisStorage {
let storage = async_redis_storage.clone();
let batcher: Arc<Mutex<HashMap<Counter, i64>>> = Arc::new(Mutex::new(Default::default()));
let p = Arc::clone(&partitioned);
if let Some(flushing_period) = flushing_period {
let batcher_flusher = batcher.clone();
let mut interval = tokio::time::interval(flushing_period);
tokio::spawn(async move {
loop {
if p.load(Ordering::Acquire) {
if storage.is_alive().await {
warn!("Partition to Redis resolved!");
p.store(false, Ordering::Release);
}
} else {
let counters = {
let mut batch = batcher_flusher.lock().unwrap();
std::mem::take(&mut *batch)
};
for (counter, delta) in counters {
storage
.update_counter(&counter, delta)
.await
.or_else(|err| {
if err.is_transient() {
p.store(true, Ordering::Release);
Ok(())
} else {
Err(err)
}
})
.expect("Unrecoverable Redis error!");
}
let batcher_flusher = batcher.clone();
let mut interval = tokio::time::interval(flushing_period);
tokio::spawn(async move {
loop {
if p.load(Ordering::Acquire) {
if storage.is_alive().await {
warn!("Partition to Redis resolved!");
p.store(false, Ordering::Release);
}
} else {
let counters = {
let mut batch = batcher_flusher.lock().unwrap();
std::mem::take(&mut *batch)
};
for (counter, delta) in counters {
storage
.update_counter(&counter, delta)
.await
.or_else(|err| {
if err.is_transient() {
p.store(true, Ordering::Release);
Ok(())
} else {
Err(err)
}
})
.expect("Unrecoverable Redis error!");
}
interval.tick().await;
}
});
}
interval.tick().await;
}
});

let cached_counters = CountersCacheBuilder::new()
.max_cached_counters(max_cached_counters)
Expand All @@ -293,7 +273,6 @@ impl CachedRedisStorage {
batcher_counter_updates: batcher,
redis_conn_manager,
async_redis_storage,
batching_is_enabled: flushing_period.is_some(),
partitioned,
})
}
Expand Down Expand Up @@ -379,7 +358,7 @@ impl CachedRedisStorage {

pub struct CachedRedisStorageBuilder {
redis_url: String,
flushing_period: Option<Duration>,
flushing_period: Duration,
max_cached_counters: usize,
max_ttl_cached_counters: Duration,
ttl_ratio_cached_counters: u64,
Expand All @@ -390,15 +369,15 @@ impl CachedRedisStorageBuilder {
pub fn new(redis_url: &str) -> Self {
Self {
redis_url: redis_url.to_string(),
flushing_period: Some(Duration::from_secs(DEFAULT_FLUSHING_PERIOD_SEC)),
flushing_period: Duration::from_secs(DEFAULT_FLUSHING_PERIOD_SEC),
max_cached_counters: DEFAULT_MAX_CACHED_COUNTERS,
max_ttl_cached_counters: Duration::from_secs(DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC),
ttl_ratio_cached_counters: DEFAULT_TTL_RATIO_CACHED_COUNTERS,
response_timeout: Duration::from_millis(DEFAULT_RESPONSE_TIMEOUT_MS),
}
}

pub fn flushing_period(mut self, flushing_period: Option<Duration>) -> Self {
pub fn flushing_period(mut self, flushing_period: Duration) -> Self {
self.flushing_period = flushing_period;
self
}
Expand Down
10 changes: 10 additions & 0 deletions limitador/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,17 @@ mod test {

for counter in result.counters.iter() {
if let Some(ttl) = counter.expires_in() {
<<<<<<< HEAD

[CI annotations on line 686 of limitador/tests/integration_tests.rs — the GitHub Actions jobs "Clippy", "Rustfmt", and "Test Suite" each failed here with the message "encountered diff marker", i.e. this unresolved merge-conflict marker was committed.]

assert!(ttl.as_secs() <= 60);
=======
let ttl_secs = ttl.as_secs();
assert!(
ttl_secs <= 60,
"Unexpected ttl of {} seconds after {} hits",
ttl_secs,
hit
);
>>>>>>> 59d1296 (Always use the batch writes to redis)
}
assert_eq!(counter.remaining().unwrap(), 3 - (hit + 1));
}
Expand Down

0 comments on commit 1d92ae1

Please sign in to comment.