Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resilient cache #279

Merged
merged 10 commits into from
Apr 3, 2024
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion limitador-server/examples/limits.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@
conditions:
- "req.method == 'POST'"
variables:
- user_id
- user_id
1 change: 1 addition & 0 deletions limitador-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ pub struct RedisStorageCacheConfiguration {
pub max_ttl: u64,
pub ttl_ratio: u64,
pub max_counters: usize,
pub response_timeout: u64,
}

#[derive(PartialEq, Eq, Debug)]
Expand Down
45 changes: 22 additions & 23 deletions limitador-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use limitador::storage::disk::DiskStorage;
use limitador::storage::infinispan::{Consistency, InfinispanStorageBuilder};
use limitador::storage::redis::{
AsyncRedisStorage, CachedRedisStorage, CachedRedisStorageBuilder, DEFAULT_FLUSHING_PERIOD_SEC,
DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC,
DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC, DEFAULT_RESPONSE_TIMEOUT_MS,
DEFAULT_TTL_RATIO_CACHED_COUNTERS,
};
use limitador::storage::{AsyncCounterStorage, AsyncStorage, Storage};
Expand Down Expand Up @@ -137,29 +137,17 @@ impl Limiter {
) -> CachedRedisStorage {
// TODO: Not all the options are configurable via ENV. Add them as needed.

let mut cached_redis_storage = CachedRedisStorageBuilder::new(redis_url);
let cached_redis_storage = CachedRedisStorageBuilder::new(redis_url)
.flushing_period(Duration::from_millis(cache_cfg.flushing_period as u64))
.max_ttl_cached_counters(Duration::from_millis(cache_cfg.max_ttl))
.ttl_ratio_cached_counters(cache_cfg.ttl_ratio)
.max_cached_counters(cache_cfg.max_counters)
.response_timeout(Duration::from_millis(cache_cfg.response_timeout));

if cache_cfg.flushing_period < 0 {
cached_redis_storage = cached_redis_storage.flushing_period(None)
} else {
cached_redis_storage = cached_redis_storage.flushing_period(Some(
Duration::from_millis(cache_cfg.flushing_period as u64),
))
}

cached_redis_storage =
cached_redis_storage.max_ttl_cached_counters(Duration::from_millis(cache_cfg.max_ttl));

cached_redis_storage = cached_redis_storage.ttl_ratio_cached_counters(cache_cfg.ttl_ratio);
cached_redis_storage = cached_redis_storage.max_cached_counters(cache_cfg.max_counters);

match cached_redis_storage.build().await {
Ok(storage) => storage,
Err(err) => {
eprintln!("Failed to connect to Redis at {redis_url}: {err}");
process::exit(1)
}
}
cached_redis_storage.build().await.unwrap_or_else(|err| {
eprintln!("Failed to connect to Redis at {redis_url}: {err}");
process::exit(1)
})
}

#[cfg(feature = "infinispan")]
Expand Down Expand Up @@ -653,6 +641,15 @@ fn create_config() -> (Configuration, &'static str) {
.default_value(leak(DEFAULT_MAX_CACHED_COUNTERS))
.display_order(5)
.help("Maximum amount of counters cached"),
)
.arg(
Arg::new("timeout")
.long("response-timeout")
.action(ArgAction::Set)
.value_parser(clap::value_parser!(u64))
.default_value(leak(DEFAULT_RESPONSE_TIMEOUT_MS))
.display_order(6)
.help("Timeout for Redis commands in milliseconds"),
),
);

Expand Down Expand Up @@ -760,6 +757,7 @@ fn create_config() -> (Configuration, &'static str) {
max_ttl: *sub.get_one("TTL").unwrap(),
ttl_ratio: *sub.get_one("ratio").unwrap(),
max_counters: *sub.get_one("max").unwrap(),
response_timeout: *sub.get_one("timeout").unwrap(),
}),
}),
#[cfg(feature = "infinispan")]
Expand Down Expand Up @@ -851,6 +849,7 @@ fn storage_config_from_env() -> Result<StorageConfiguration, ()> {
.parse()
.expect("Expected an u64"),
max_counters: DEFAULT_MAX_CACHED_COUNTERS,
response_timeout: DEFAULT_RESPONSE_TIMEOUT_MS,
})
} else {
None
Expand Down
2 changes: 1 addition & 1 deletion limitador/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ tracing = "0.1.40"

# Optional dependencies
rocksdb = { version = "0.21.0", optional = true, features = ["multi-threaded-cf"] }
redis = { version = "0.23.1", optional = true, features = [
redis = { version = "0.25.1", optional = true, features = [
"connection-manager",
"tokio-comp",
"tls-native-tls",
Expand Down
1 change: 1 addition & 0 deletions limitador/src/storage/disk/expiring_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ impl From<TryFromSliceError> for StorageErr {
fn from(_: TryFromSliceError) -> Self {
Self {
msg: "Corrupted byte sequence while reading 8 bytes for 64-bit integer".to_owned(),
transient: false,
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions limitador/src/storage/disk/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::storage::StorageErr;
use rocksdb::ErrorKind;

mod expiring_value;
mod rocksdb_storage;
Expand All @@ -9,6 +10,7 @@ impl From<rocksdb::Error> for StorageErr {
fn from(error: rocksdb::Error) -> Self {
Self {
msg: format!("Underlying storage error: {error}"),
transient: error.kind() == ErrorKind::TimedOut || error.kind() == ErrorKind::TryAgain,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions limitador/src/storage/infinispan/dist_lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub async fn lock(
if retries >= RETRIES {
return Err(StorageErr {
msg: "can't acquire lock".into(),
transient: false,
});
}

Expand Down
10 changes: 8 additions & 2 deletions limitador/src/storage/infinispan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,19 @@ pub use infinispan_storage::InfinispanStorageBuilder;

impl From<reqwest::Error> for StorageErr {
fn from(e: reqwest::Error) -> Self {
Self { msg: e.to_string() }
Self {
msg: e.to_string(),
transient: false,
}
}
}

impl From<InfinispanError> for StorageErr {
fn from(e: InfinispanError) -> Self {
Self { msg: e.to_string() }
Self {
msg: e.to_string(),
transient: false,
}
}
}

Expand Down
5 changes: 5 additions & 0 deletions limitador/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,10 +299,15 @@ pub trait AsyncCounterStorage: Sync + Send {
#[error("error while accessing the limits storage: {msg}")]
pub struct StorageErr {
msg: String,
transient: bool,
}

impl StorageErr {
pub fn msg(&self) -> &str {
&self.msg
}

pub fn is_transient(&self) -> bool {
self.transient
}
}
4 changes: 2 additions & 2 deletions limitador/src/storage/redis/counters_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use ttl_cache::TtlCache;

pub struct CountersCache {
max_ttl_cached_counters: Duration,
ttl_ratio_cached_counters: u64,
pub ttl_ratio_cached_counters: u64,
cache: TtlCache<Counter, i64>,
}

Expand Down Expand Up @@ -204,7 +204,7 @@ mod tests {
}

#[test]
fn insert_saves_0_when_redis_val_is_none() {
fn insert_saves_zero_when_redis_val_is_none() {
let max_val = 10;
let mut values = HashMap::new();
values.insert("app_id".to_string(), "1".to_string());
Expand Down
12 changes: 5 additions & 7 deletions limitador/src/storage/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub const DEFAULT_FLUSHING_PERIOD_SEC: u64 = 1;
pub const DEFAULT_MAX_CACHED_COUNTERS: usize = 10000;
pub const DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC: u64 = 5;
pub const DEFAULT_TTL_RATIO_CACHED_COUNTERS: u64 = 10;
pub const DEFAULT_RESPONSE_TIMEOUT_MS: u64 = 350;

use crate::counter::Counter;
use crate::storage::{Authorization, StorageErr};
Expand All @@ -21,13 +22,10 @@ pub use redis_sync::RedisStorage;

impl From<RedisError> for StorageErr {
fn from(e: RedisError) -> Self {
Self { msg: e.to_string() }
}
}

impl From<::r2d2::Error> for StorageErr {
fn from(e: ::r2d2::Error) -> Self {
Self { msg: e.to_string() }
Self {
msg: e.to_string(),
transient: e.is_timeout() || e.is_connection_dropped() || e.is_cluster_error(),
}
}
}

Expand Down
8 changes: 8 additions & 0 deletions limitador/src/storage/redis/redis_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,14 @@ impl AsyncRedisStorage {
Self { conn_manager }
}

pub async fn is_alive(&self) -> bool {
self.conn_manager
.clone()
.incr::<&str, i32, u64>("LIMITADOR_LIVE_CHECK", 1)
.await
.is_ok()
}

async fn delete_counters_associated_with_limit(&self, limit: &Limit) -> Result<(), StorageErr> {
let mut con = self.conn_manager.clone();

Expand Down
Loading
Loading