Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store qualified counters in dedicated capacity limited cache #195

Merged
merged 4 commits into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
269 changes: 229 additions & 40 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions limitador-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ notify = "6.0.1"
const_format = "0.2.31"
lazy_static = "1.4.0"
clap = "4.3"
sysinfo = "0.29.7"

[build-dependencies]
tonic-build = "0.9.2"
11 changes: 9 additions & 2 deletions limitador-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ impl Default for Configuration {
fn default() -> Self {
Configuration {
limits_file: "".to_string(),
storage: StorageConfiguration::InMemory,
storage: StorageConfiguration::InMemory(InMemoryStorageConfiguration {
cache_size: Some(10_000),
}),
rls_host: "".to_string(),
rls_port: 0,
http_host: "".to_string(),
Expand All @@ -125,13 +127,18 @@ impl Default for Configuration {

#[derive(PartialEq, Eq, Debug)]
pub enum StorageConfiguration {
InMemory,
InMemory(InMemoryStorageConfiguration),
Disk(DiskStorageConfiguration),
Redis(RedisStorageConfiguration),
#[cfg(feature = "infinispan")]
Infinispan(InfinispanStorageConfiguration),
}

#[derive(PartialEq, Eq, Debug)]
pub struct InMemoryStorageConfiguration {
pub cache_size: Option<u64>,
}

#[derive(PartialEq, Eq, Debug)]
pub struct DiskStorageConfiguration {
pub path: String,
Expand Down
8 changes: 4 additions & 4 deletions limitador-server/src/envoy_rls/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ mod tests {
vec!["app_id"],
);

let limiter = RateLimiter::default();
let limiter = RateLimiter::new(10_000);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm missing how this work, but I don't understand why is this change needed? Shouldn't invoke the default set the expected cache_size?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no more any implementation of Default for RateLimiter. I couldn't really make a case for it. Why would 10k be a valid value for something using the crate? I thought that I could do the magic of using the 70% within default() could be an option, but again... why would 70% be any more desirable than 10% in the context of the crate? So I decided to push this "up" in the server's code (and avoid adding a dependency to the crate too).

Finally, for some reason, adding a impl Default for RateLimiter that'd only be for config test somehow failed. So didn't investigate much and added the value in the tests themselves.

limiter.add_limit(limit);

let rate_limiter = MyRateLimiter::new(
Expand Down Expand Up @@ -366,7 +366,7 @@ mod tests {

#[tokio::test]
async fn test_takes_into_account_all_the_descriptors() {
let limiter = RateLimiter::default();
let limiter = RateLimiter::new(10_000);

let namespace = "test_namespace";

Expand Down Expand Up @@ -434,7 +434,7 @@ mod tests {
let namespace = "test_namespace";
let limit = Limit::new(namespace, 10, 60, vec!["x == '1'"], vec!["y"]);

let limiter = RateLimiter::default();
let limiter = RateLimiter::new(10_000);
limiter.add_limit(limit);

let rate_limiter = MyRateLimiter::new(
Expand Down Expand Up @@ -499,7 +499,7 @@ mod tests {
let namespace = "test_namespace";
let limit = Limit::new(namespace, 1, 60, vec!["x == '1'"], vec!["y"]);

let limiter = RateLimiter::default();
let limiter = RateLimiter::new(10_000);
limiter.add_limit(limit);

let rate_limiter = MyRateLimiter::new(
Expand Down
52 changes: 42 additions & 10 deletions limitador-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ extern crate clap;
#[cfg(feature = "infinispan")]
use crate::config::InfinispanStorageConfiguration;
use crate::config::{
Configuration, DiskStorageConfiguration, RedisStorageCacheConfiguration,
RedisStorageConfiguration, StorageConfiguration,
Configuration, DiskStorageConfiguration, InMemoryStorageConfiguration,
RedisStorageCacheConfiguration, RedisStorageConfiguration, StorageConfiguration,
};
use crate::envoy_rls::server::{run_envoy_rls_server, RateLimitHeaders};
use crate::http_api::server::run_http_server;
use clap::{value_parser, Arg, ArgAction, Command};
use const_format::formatcp;
use env_logger::Builder;
use limitador::counter::Counter;
use limitador::errors::LimitadorError;
use limitador::limit::Limit;
use limitador::storage::disk::DiskStorage;
Expand All @@ -38,6 +39,7 @@ use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use std::{env, process};
use sysinfo::{RefreshKind, System, SystemExt};
use thiserror::Error;
use tokio::runtime::Handle;

Expand Down Expand Up @@ -82,7 +84,9 @@ impl Limiter {
StorageConfiguration::Infinispan(cfg) => {
Self::infinispan_limiter(cfg, config.limit_name_in_labels).await
}
StorageConfiguration::InMemory => Self::in_memory_limiter(config),
StorageConfiguration::InMemory(cfg) => {
Self::in_memory_limiter(cfg, config.limit_name_in_labels)
}
StorageConfiguration::Disk(cfg) => Self::disk_limiter(cfg, config.limit_name_in_labels),
};

Expand Down Expand Up @@ -210,7 +214,7 @@ impl Limiter {
}
};
let mut rate_limiter_builder =
RateLimiterBuilder::new().storage(Storage::with_counter_storage(Box::new(storage)));
RateLimiterBuilder::with_storage(Storage::with_counter_storage(Box::new(storage)));

if limit_name_in_labels {
rate_limiter_builder = rate_limiter_builder.with_prometheus_limit_name_labels()
Expand All @@ -219,10 +223,11 @@ impl Limiter {
Self::Blocking(rate_limiter_builder.build())
}

fn in_memory_limiter(cfg: Configuration) -> Self {
let mut rate_limiter_builder = RateLimiterBuilder::new();
fn in_memory_limiter(cfg: InMemoryStorageConfiguration, limit_name_in_labels: bool) -> Self {
let mut rate_limiter_builder =
RateLimiterBuilder::new(cfg.cache_size.or_else(guess_cache_size).unwrap());

if cfg.limit_name_in_labels {
if limit_name_in_labels {
rate_limiter_builder = rate_limiter_builder.with_prometheus_limit_name_labels()
}

Expand Down Expand Up @@ -513,7 +518,16 @@ fn create_config() -> (Configuration, &'static str) {
.subcommand(
Command::new("memory")
.display_order(1)
.about("Counters are held in Limitador (ephemeral)"),
.about("Counters are held in Limitador (ephemeral)")
.arg(
Arg::new("CACHE_SIZE")
.long("cache")
.short('c')
.action(ArgAction::Set)
.value_parser(value_parser!(u64))
.display_order(1)
.help("Sets the size of the cache for 'qualified counters'"),
),
)
.subcommand(
Command::new("disk")
Expand Down Expand Up @@ -698,7 +712,9 @@ fn create_config() -> (Configuration, &'static str) {
consistency: Some(sub.get_one::<String>("consistency").unwrap().to_string()),
})
}
Some(("memory", _sub)) => StorageConfiguration::InMemory,
Some(("memory", sub)) => StorageConfiguration::InMemory(InMemoryStorageConfiguration {
cache_size: sub.get_one::<u64>("CACHE_SIZE").copied(),
}),
None => match storage_config_from_env() {
Ok(storage_cfg) => storage_cfg,
Err(_) => {
Expand Down Expand Up @@ -785,10 +801,26 @@ fn storage_config_from_env() -> Result<StorageConfiguration, ()> {
consistency: env::var("INFINISPAN_COUNTERS_CONSISTENCY").ok(),
},
)),
_ => Ok(StorageConfiguration::InMemory),
_ => Ok(StorageConfiguration::InMemory(
InMemoryStorageConfiguration { cache_size: None },
)),
}
}

fn guess_cache_size() -> Option<u64> {
let sys = System::new_with_specifics(RefreshKind::new().with_memory());
let free_mem = sys.available_memory();
let memory = free_mem as f64 * 0.7;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't the 70% too much (or too little xD)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well... this math is "wrong", the size of the entries don't account for the overhead of the memory required by the cache itself... but most importantly don't account for the size of the variables data that's on the heap... i.e. all the entries of name/value pairs... we need some overhead for that.
Finally the "rest" also needs memory, i.e. the actix & tonic & all other plumbing (if only storing the Limits themselves... So for tiny amounts of sys.available_memory() 70% might actually be way too eager, while for large ones, probably on the overly cautious end... Which is why this will trigger a WARN level log.

let size = (memory
/ (std::mem::size_of::<Counter>() + 16/* size_of::<AtomicExpiringValue>() */) as f64)
as u64;
warn!(
"No cache size provided, aiming at 70% of {}MB, i.e. {size} entries",
free_mem / 1024 / 1024
);
Some(size)
}

fn env_option_is_enabled(env_name: &str) -> bool {
match env::var(env_name) {
Ok(value) => value == "1",
Expand Down
2 changes: 2 additions & 0 deletions limitador/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ infinispan_storage = ["infinispan", "reqwest", "base64"]
lenient_conditions = []

[dependencies]
moka = "0.11.2"
getrandom = { version = "0.2", features = ["js"] }
ttl_cache = "0.5"
serde = { version = "1", features = ["derive"] }
postcard = { version = "1.0.4", features = ["use-std"] }
Expand Down
2 changes: 1 addition & 1 deletion limitador/benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ fn bench_in_mem(c: &mut Criterion) {
#[cfg(feature = "disk_storage")]
fn bench_disk(c: &mut Criterion) {
let mut group = c.benchmark_group("Disk");
for (index, scenario) in TEST_SCENARIOS.iter().enumerate() {
for scenario in TEST_SCENARIOS.iter() {
group.bench_with_input(
BenchmarkId::new("is_rate_limited", scenario),
scenario,
Expand Down
4 changes: 4 additions & 0 deletions limitador/src/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ impl Counter {
self.expires_in = Some(duration)
}

pub fn is_qualified(&self) -> bool {
!self.set_variables.is_empty()
}

#[cfg(feature = "disk_storage")]
pub(crate) fn variables_for_key(&self) -> Vec<(&str, &str)> {
let mut variables = Vec::with_capacity(self.set_variables.len());
Expand Down
37 changes: 17 additions & 20 deletions limitador/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
//! Limitador. Storing the limits in Redis is slower, but they can be shared
//! between instances.
//!
//! By default, the rate limiter is configured to store the counters in memory:
//! By default, the rate limiter is configured to store the counters in memory.
//! It'll store only a limited amount of "qualified counters", specified as a
//! `u64` value in the constructor.
//! ```
//! use limitador::RateLimiter;
//! let rate_limiter = RateLimiter::default();
//! let rate_limiter = RateLimiter::new(1000);
//! ```
//!
//! To use Redis:
Expand Down Expand Up @@ -72,7 +74,7 @@
//! vec!["req.method == 'GET'"],
//! vec!["user_id"],
//! );
//! let mut rate_limiter = RateLimiter::default();
//! let mut rate_limiter = RateLimiter::new(1000);
//!
//! // Add a limit
//! rate_limiter.add_limit(limit.clone());
Expand All @@ -95,7 +97,7 @@
//! use limitador::limit::Limit;
//! use std::collections::HashMap;
//!
//! let mut rate_limiter = RateLimiter::default();
//! let mut rate_limiter = RateLimiter::new(1000);
//!
//! let limit = Limit::new(
//! "my_namespace",
Expand Down Expand Up @@ -236,9 +238,16 @@ impl From<CheckResult> for bool {
}

impl RateLimiterBuilder {
pub fn new() -> Self {
pub fn with_storage(storage: Storage) -> Self {
Self {
storage: Storage::new(),
storage,
prometheus_limit_name_labels_enabled: false,
}
}

pub fn new(cache_size: u64) -> Self {
Self {
storage: Storage::new(cache_size),
prometheus_limit_name_labels_enabled: false,
}
}
Expand Down Expand Up @@ -267,12 +276,6 @@ impl RateLimiterBuilder {
}
}

impl Default for RateLimiterBuilder {
fn default() -> Self {
Self::new()
}
}

pub struct AsyncRateLimiterBuilder {
storage: AsyncStorage,
prometheus_limit_name_labels_enabled: bool,
Expand Down Expand Up @@ -306,9 +309,9 @@ impl AsyncRateLimiterBuilder {
}

impl RateLimiter {
pub fn new() -> Self {
pub fn new(cache_size: u64) -> Self {
didierofrivia marked this conversation as resolved.
Show resolved Hide resolved
Self {
storage: Storage::new(),
storage: Storage::new(cache_size),
prometheus_metrics: PrometheusMetrics::new(),
}
}
Expand Down Expand Up @@ -492,12 +495,6 @@ impl RateLimiter {
}
}

impl Default for RateLimiter {
fn default() -> Self {
Self::new()
}
}

// TODO: the code of this implementation is almost identical to the blocking
// one. The only exception is that the functions defined are "async" and all the
// calls to the storage need to include ".await". We'll need to think about how
Expand Down
7 changes: 7 additions & 0 deletions limitador/src/storage/atomic_expiring_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,11 @@ mod tests {
});
assert!([2i64, 3i64].contains(&atomic_expiring_value.value.load(Ordering::SeqCst)));
}

#[test]
fn size_of_struct() {
// This is ugly, but we don't have access to `AtomicExpiringValue` in the server,
didierofrivia marked this conversation as resolved.
Show resolved Hide resolved
// so this is hardcoded in main.rs
assert_eq!(16, std::mem::size_of::<AtomicExpiringValue>());
}
}
Loading
Loading