Skip to content

Commit

Permalink
Refactor the way we deal with partitions
Browse files Browse the repository at this point in the history
  • Loading branch information
alexsnaps committed Apr 22, 2024
1 parent 6d40864 commit 9d5eed7
Showing 1 changed file with 23 additions and 19 deletions.
42 changes: 23 additions & 19 deletions limitador/src/storage/redis/redis_cached.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use async_trait::async_trait;
use redis::aio::{ConnectionLike, ConnectionManager};
use redis::{ConnectionInfo, RedisError};
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -252,7 +251,7 @@ impl CachedRedisStorage {
flush_batcher_and_update_counters(
conn.clone(),
batcher_flusher.clone(),
storage.is_alive(),
storage.is_alive().await,
counters_cache_clone.clone(),
p.clone(),
)
Expand All @@ -276,12 +275,7 @@ impl CachedRedisStorage {
}

fn partitioned(&self, partition: bool) -> bool {
if partition {
error!("Partition to Redis detected!")
}
self.partitioned
.compare_exchange(!partition, partition, Ordering::Release, Ordering::Acquire)
.is_ok()
flip_partitioned(&self.partitioned, partition)
}

fn fallback_vals_ttls(&self, counters: &Vec<&mut Counter>) -> (Vec<Option<i64>>, Vec<i64>) {
Expand Down Expand Up @@ -328,6 +322,20 @@ impl CachedRedisStorage {
}
}

fn flip_partitioned(storage: &AtomicBool, partition: bool) -> bool {
let we_flipped = storage
.compare_exchange(!partition, partition, Ordering::Release, Ordering::Acquire)
.is_ok();
if we_flipped {
if partition {
error!("Partition to Redis detected!")
} else {
warn!("Partition to Redis resolved!");
}
}
we_flipped
}

pub struct CachedRedisStorageBuilder {
redis_url: String,
flushing_period: Duration,
Expand Down Expand Up @@ -431,14 +439,14 @@ async fn update_counters<C: ConnectionLike>(
async fn flush_batcher_and_update_counters<C: ConnectionLike>(
mut redis_conn: C,
batcher: Arc<Mutex<HashMap<Counter, AtomicExpiringValue>>>,
storage_is_alive: impl Future<Output = bool>,
storage_is_alive: bool,
cached_counters: Arc<CountersCache>,
partitioned: Arc<AtomicBool>,
) {
if partitioned.load(Ordering::Acquire) {
if storage_is_alive.await {
warn!("Partition to Redis resolved!");
partitioned.store(false, Ordering::Release);
if partitioned.load(Ordering::Acquire) || !storage_is_alive {
let batch = batcher.lock().unwrap();
if !batch.is_empty() {
flip_partitioned(&partitioned, false);
}
} else {
let counters = {
Expand All @@ -452,7 +460,7 @@ async fn flush_batcher_and_update_counters<C: ConnectionLike>(
.await
.or_else(|err| {
if err.is_transient() {
partitioned.store(true, Ordering::Release);
flip_partitioned(&partitioned, true);
Ok(Vec::new())
} else {
Err(err)
Expand Down Expand Up @@ -596,18 +604,14 @@ mod tests {
let cached_counters: Arc<CountersCache> = Arc::new(cache);
let partitioned = Arc::new(AtomicBool::new(false));

async fn future_true() -> bool {
true
}

if let Some(c) = cached_counters.get(&counter) {
assert_eq!(c.hits(&counter), 1);
}

flush_batcher_and_update_counters(
mock_client,
batcher,
future_true(),
true,
cached_counters.clone(),
partitioned,
)
Expand Down

0 comments on commit 9d5eed7

Please sign in to comment.