diff --git a/rust/Cargo.lock b/rust/Cargo.lock index df595c78383ce..4e86fa5ab92b4 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -19,9 +19,9 @@ checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" [[package]] name = "ahash" -version = "0.8.7" +version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77c3a9648d43b9cd48db467b3f87fdd6e146bcc88ab0180006cef2179fe11d01" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ "cfg-if", "getrandom", @@ -2118,15 +2118,6 @@ dependencies = [ "value-bag", ] -[[package]] -name = "lru" -version = "0.12.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37ee39891760e7d94734f6f63fedc29a2e4a152f836120753a72503f09fcf904" -dependencies = [ - "hashbrown 0.14.3", -] - [[package]] name = "mach" version = "0.3.2" @@ -2835,8 +2826,8 @@ dependencies = [ "envconfig", "futures", "health", - "lru", "metrics", + "quick_cache", "rdkafka", "serde", "serde_json", @@ -2903,6 +2894,18 @@ dependencies = [ "winapi", ] +[[package]] +name = "quick_cache" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27a893a83255c587d31137bc7e350387b49267b0deac44120fd8fa8bd0d61645" +dependencies = [ + "ahash", + "equivalent", + "hashbrown 0.14.3", + "parking_lot", +] + [[package]] name = "quote" version = "1.0.35" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 64f7f1e196a50..4a78b1a8dfa02 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -88,4 +88,4 @@ tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } url = { version = "2.5.0 " } uuid = { version = "1.6.1", features = ["v7", "serde"] } neon = "1" -lru = "0.12.4" \ No newline at end of file +quick_cache = "0.6.5" \ No newline at end of file diff --git a/rust/property-defs-rs/Cargo.toml b/rust/property-defs-rs/Cargo.toml index 87aa3afa966b3..8a210dcd163e6 100644 --- a/rust/property-defs-rs/Cargo.toml +++ b/rust/property-defs-rs/Cargo.toml @@ -20,7 +20,7 @@ axum = { workspace = true } serve-metrics = { path = "../common/serve_metrics" } metrics = { workspace = true } chrono = { workspace = true } -lru = { workspace = true } +quick_cache = { workspace = true } common-metrics = { path = "../common/metrics" } [lints] diff --git a/rust/property-defs-rs/src/bin/generate_test_data.rs b/rust/property-defs-rs/src/bin/generate_test_data.rs new file mode 100644 index 0000000000000..24c0492de223e --- /dev/null +++ b/rust/property-defs-rs/src/bin/generate_test_data.rs @@ -0,0 +1,62 @@ +use std::collections::HashMap; + +use envconfig::Envconfig; +use property_defs_rs::{config::Config, types::Event}; +use rdkafka::{ + producer::{FutureProducer, FutureRecord}, + ClientConfig, +}; + +fn generate_test_event(seed: usize) -> Event { + let team_id = (seed % 100) as i32; + let event_name = format!("test_event_{}", seed % 8); + let properties: HashMap = (0..200) + .map(|i| (format!("prop_{}", i), format!("val_{}", i))) + .collect(); + + Event { + team_id, + event: event_name, + properties: Some(serde_json::to_string(&properties).unwrap()), + } +} + +// A simple kafka producer that pushes a million events into a topic +#[tokio::main] +async fn main() -> Result<(), Box> { + let config = Config::init_from_env()?; + let kafka_config: ClientConfig = (&config.kafka).into(); + let producer: FutureProducer = kafka_config.create()?; + let topic = config.kafka.event_topic.as_str(); + + let mut acks = Vec::with_capacity(1_000_000); + for i in 0..10_000_000 { + let event = generate_test_event(i); + let key = event.team_id.to_string(); + let payload = serde_json::to_string(&event)?; + let record = FutureRecord { + topic, + key: Some(&key), + payload: Some(&payload), + partition: None, + timestamp: None, + headers: None, + }; + let ack = producer.send_result(record).unwrap(); + acks.push(ack); + + if i % 1000 == 0 { + println!("Sent {} events", i); + } + } + + let mut i = 0; + for ack in acks { + ack.await?.unwrap(); + i += 1; + if i % 1000 == 0 { + println!("Received ack for {} events", i); + } + } + Ok(()) +} diff --git a/rust/property-defs-rs/src/config.rs b/rust/property-defs-rs/src/config.rs index 83054ad00e043..44b625c843d54 100644 --- a/rust/property-defs-rs/src/config.rs +++ b/rust/property-defs-rs/src/config.rs @@ -37,6 +37,13 @@ pub struct Config { #[envconfig(default = "100000")] pub cache_capacity: usize, + #[envconfig(default = "100")] + pub channel_slots_per_worker: usize, + + // If an event has some ridiculous number of updates, we skip it + #[envconfig(default = "10000")] + pub update_count_skip_threshold: usize, + #[envconfig(from = "BIND_HOST", default = "::")] pub host: String, @@ -54,7 +61,7 @@ pub struct KafkaConfig { pub kafka_tls: bool, #[envconfig(default = "false")] pub verify_ssl_certificate: bool, - #[envconfig(default = "autocomplete-rs")] + #[envconfig(default = "property-definitions-rs")] pub consumer_group: String, } diff --git a/rust/property-defs-rs/src/main.rs b/rust/property-defs-rs/src/main.rs index ae767ffe51b07..368bb09f69d56 100644 --- a/rust/property-defs-rs/src/main.rs +++ b/rust/property-defs-rs/src/main.rs @@ -1,19 +1,20 @@ -use std::{num::NonZeroUsize, sync::Arc, time::Duration}; +use std::{sync::Arc, time::Duration}; use axum::{routing::get, Router}; use envconfig::Envconfig; use futures::future::ready; -use lru::LruCache; use property_defs_rs::{ app_context::AppContext, config::Config, metrics_consts::{ - BATCH_ACQUIRE_TIME, EMPTY_EVENTS, EVENTS_RECEIVED, EVENT_PARSE_ERROR, FORCED_SMALL_BATCH, - PERMIT_WAIT_TIME, TRANSACTION_LIMIT_SATURATION, UPDATES_FILTERED_BY_CACHE, - UPDATES_PER_EVENT, UPDATES_SEEN, UPDATE_ISSUE_TIME, WORKER_BLOCKED, + BATCH_ACQUIRE_TIME, CACHE_CONSUMED, EMPTY_EVENTS, EVENTS_RECEIVED, EVENT_PARSE_ERROR, + FORCED_SMALL_BATCH, PERMIT_WAIT_TIME, RECV_DEQUEUED, TRANSACTION_LIMIT_SATURATION, + UPDATES_FILTERED_BY_CACHE, UPDATES_PER_EVENT, UPDATES_SEEN, UPDATE_ISSUE_TIME, + WORKER_BLOCKED, }, types::{Event, Update}, }; +use quick_cache::sync::Cache; use rdkafka::{ consumer::{Consumer, StreamConsumer}, message::BorrowedMessage, @@ -61,7 +62,12 @@ fn start_health_liveness_server(config: &Config, context: Arc) -> Jo }) } -async fn spawn_producer_loop(consumer: Arc, channel: mpsc::Sender) { +async fn spawn_producer_loop( + consumer: Arc, + channel: mpsc::Sender, + cache: Arc>, + skip_threshold: usize, +) { loop { let message = consumer .recv() @@ -72,13 +78,18 @@ async fn spawn_producer_loop(consumer: Arc, channel: mpsc::Sende continue; }; - let updates = event.into_updates(); + let updates = event.into_updates(skip_threshold); metrics::counter!(EVENTS_RECEIVED).increment(1); metrics::counter!(UPDATES_SEEN).increment(updates.len() as u64); metrics::histogram!(UPDATES_PER_EVENT).record(updates.len() as f64); for update in updates { + if cache.get(&update).is_some() { + metrics::counter!(UPDATES_FILTERED_BY_CACHE).increment(1); + continue; + } + cache.insert(update.clone(), ()); // We first try to non-blocking send, so we can get a metric on backpressure match channel.try_send(update) { Ok(_) => continue, @@ -117,12 +128,17 @@ async fn main() -> Result<(), Box> { start_health_liveness_server(&config, context.clone()); - let (tx, mut rx) = mpsc::channel(config.update_batch_size * 10); + let (tx, mut rx) = mpsc::channel(config.update_batch_size * config.channel_slots_per_worker); let transaction_limit = Arc::new(Semaphore::new(config.max_concurrent_transactions)); - let mut cache = LruCache::new(NonZeroUsize::new(config.cache_capacity).unwrap()); + let cache = Arc::new(Cache::new(config.cache_capacity)); for _ in 0..config.worker_loop_count { - tokio::spawn(spawn_producer_loop(consumer.clone(), tx.clone())); + tokio::spawn(spawn_producer_loop( + consumer.clone(), + tx.clone(), + cache.clone(), + config.update_count_skip_threshold, + )); } loop { @@ -133,7 +149,6 @@ async fn main() -> Result<(), Box> { while batch.len() < config.update_batch_size { context.worker_liveness.report_healthy().await; - let before_recv = batch.len(); let remaining_capacity = config.update_batch_size - batch.len(); // We race these two, so we can escape this loop and do a small batch if we've been waiting too long let recv = rx.recv_many(&mut batch, remaining_capacity); @@ -145,18 +160,7 @@ async fn main() -> Result<(), Box> { warn!("Coordinator recv failed, dying"); return Ok(()); } - assert!(batch.len() == before_recv + got); - - // It's important that we only filter /newly seen/ elements, because - // we immediately insert them into the cache, so a full-pass filter - // on cache membership would empty the batch. - retain_from(&mut batch, before_recv, |u| !cache.contains(u)); - batch[before_recv..].iter().for_each(|u| { - cache.put(u.clone(), ()); - }); - - let filtered = (before_recv + got) - batch.len(); - metrics::counter!(UPDATES_FILTERED_BY_CACHE).increment(filtered as u64); + metrics::gauge!(RECV_DEQUEUED).set(got as f64); continue; } _ = sleep => { @@ -170,6 +174,8 @@ async fn main() -> Result<(), Box> { } batch_time.fin(); + metrics::gauge!(CACHE_CONSUMED).set(cache.len() as f64); + metrics::gauge!(TRANSACTION_LIMIT_SATURATION).set( (config.max_concurrent_transactions - transaction_limit.available_permits()) as f64, ); diff --git a/rust/property-defs-rs/src/metrics_consts.rs b/rust/property-defs-rs/src/metrics_consts.rs index 4571d0be11814..52de91fb9fdeb 100644 --- a/rust/property-defs-rs/src/metrics_consts.rs +++ b/rust/property-defs-rs/src/metrics_consts.rs @@ -12,3 +12,5 @@ pub const EVENT_PARSE_ERROR: &str = "prop_defs_event_parse_error"; pub const BATCH_ACQUIRE_TIME: &str = "prop_defs_batch_acquire_time_ms"; pub const PERMIT_WAIT_TIME: &str = "prop_defs_permit_wait_time_ms"; pub const UPDATE_ISSUE_TIME: &str = "prop_defs_update_issue_time_ms"; +pub const CACHE_CONSUMED: &str = "prop_defs_cache_space"; +pub const RECV_DEQUEUED: &str = "prop_defs_recv_dequeued"; diff --git a/rust/property-defs-rs/src/types.rs b/rust/property-defs-rs/src/types.rs index 6312fc564d849..bc0c694fdf8cd 100644 --- a/rust/property-defs-rs/src/types.rs +++ b/rust/property-defs-rs/src/types.rs @@ -105,7 +105,7 @@ pub enum Update { EventProperty(EventProperty), } -#[derive(Clone, Debug, Deserialize)] +#[derive(Clone, Debug, Deserialize, Serialize)] pub struct Event { pub team_id: i32, pub event: String, @@ -126,12 +126,12 @@ impl From<&Event> for EventDefinition { } impl Event { - pub fn into_updates(self) -> Vec { + pub fn into_updates(self, skip_threshold: usize) -> Vec { let team_id = self.team_id; let event = self.event.clone(); let updates = self.into_updates_inner(); - if updates.len() > 10_000 { + if updates.len() > skip_threshold { warn!( "Event {} for team {} has more than 10,000 properties, skipping", event, team_id