Skip to content

Commit

Permalink
feat: CachedRedisStorage will now default to flushing a batch ASAP to…
Browse files Browse the repository at this point in the history
… reduce the length of time the cache is out of sync with Redis counters.

Setting a flushing_period is still supported to introduce a delay before flushing, which can be useful to reduce the load on the Redis server in exchange for having staler counters in limitador.

Signed-off-by: Hiram Chirino <[email protected]>
  • Loading branch information
chirino committed Apr 24, 2024
1 parent b7c748a commit 9a580b1
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 4 deletions.
2 changes: 1 addition & 1 deletion limitador/src/storage/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ mod redis_cached;
mod redis_sync;
mod scripts;

pub const DEFAULT_FLUSHING_PERIOD_SEC: u64 = 1;
pub const DEFAULT_FLUSHING_PERIOD_SEC: u64 = 0;
pub const DEFAULT_MAX_CACHED_COUNTERS: usize = 10000;
pub const DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC: u64 = 5;
pub const DEFAULT_TTL_RATIO_CACHED_COUNTERS: u64 = 10;
Expand Down
25 changes: 22 additions & 3 deletions limitador/src/storage/redis/redis_cached.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub struct CachedRedisStorage {
async_redis_storage: AsyncRedisStorage,
redis_conn_manager: ConnectionManager,
partitioned: Arc<AtomicBool>,
flush_tx: tokio::sync::watch::Sender<()>,
}

#[async_trait]
Expand Down Expand Up @@ -154,10 +155,10 @@ impl AsyncCounterStorage for CachedRedisStorage {
self.cached_counters.increase_by(counter, delta);
}

// Batch or update depending on configuration
let mut batcher = self.batcher_counter_updates.lock().unwrap();
let now = SystemTime::now();
for counter in counters.iter() {
// Update an existing batch entry or add a new batch entry
match batcher.get_mut(counter) {
Some(val) => {
val.update(delta, counter.seconds(), now);
Expand All @@ -174,6 +175,9 @@ impl AsyncCounterStorage for CachedRedisStorage {
}
}

// ask the flusher to flush the batch
self.flush_tx.send(()).unwrap();

Ok(Authorization::Ok)
}

Expand Down Expand Up @@ -240,15 +244,30 @@ impl CachedRedisStorage {
let batcher: Arc<Mutex<HashMap<Counter, AtomicExpiringValue>>> =
Arc::new(Mutex::new(Default::default()));

let (flush_tx, mut flush_rx) = tokio::sync::watch::channel(());

{
let storage = async_redis_storage.clone();
let counters_cache_clone = counters_cache.clone();
let conn = redis_conn_manager.clone();
let p = Arc::clone(&partitioned);
let batcher_flusher = batcher.clone();
let mut interval = tokio::time::interval(flushing_period);

tokio::spawn(async move {
loop {
                    // Wait for a new flush request.
flush_rx.changed().await.unwrap();

if flushing_period != Duration::ZERO {
                        // Setting a flushing_period reduces the load on Redis; the downside to
                        // setting it is that the cached counters will not be as accurate as when it's zero.
tokio::time::sleep(flushing_period).await
}

                    // Even if flushing_period is zero, the next batch will be growing while the
                    // current batch is being executed against Redis. When under load, the flush
                    // frequency will be proportional to the batch execution latency.

flush_batcher_and_update_counters(
conn.clone(),
batcher_flusher.clone(),
Expand All @@ -257,7 +276,6 @@ impl CachedRedisStorage {
p.clone(),
)
.await;
interval.tick().await;
}
});
}
Expand All @@ -268,6 +286,7 @@ impl CachedRedisStorage {
redis_conn_manager,
async_redis_storage,
partitioned,
flush_tx,
})
}

Expand Down

0 comments on commit 9a580b1

Please sign in to comment.