From f0f18b1e48572d2360f1e13598c0ab0c23958af7 Mon Sep 17 00:00:00 2001
From: Oliver Browne
Date: Thu, 29 Aug 2024 01:07:19 +0300
Subject: [PATCH] feat(propdefs): make fast good (#24660)

---
 rust/Cargo.lock                     |   2 +-
 rust/Cargo.toml                     |   3 +-
 rust/capture/tests/common.rs        |   6 +-
 rust/property-defs-rs/Cargo.toml    |   2 +-
 .../src/bin/benchmark_1million.rs   | 147 ++++++++++++++++++
 .../src/bin/generate_test_data.rs   |   8 +-
 rust/property-defs-rs/src/config.rs |  20 ++-
 rust/property-defs-rs/src/lib.rs    |  37 +++++
 rust/property-defs-rs/src/main.rs   |  85 ++++------
 rust/property-defs-rs/src/types.rs  |  15 +-
 10 files changed, 243 insertions(+), 82 deletions(-)
 create mode 100644 rust/property-defs-rs/src/bin/benchmark_1million.rs

diff --git a/rust/Cargo.lock b/rust/Cargo.lock
index 4e86fa5ab92b4..470c2641e3b47 100644
--- a/rust/Cargo.lock
+++ b/rust/Cargo.lock
@@ -2820,6 +2820,7 @@ dependencies = [
 name = "property-defs-rs"
 version = "0.1.0"
 dependencies = [
+ "ahash",
  "axum 0.7.5",
  "chrono",
  "common-metrics",
@@ -2837,7 +2838,6 @@ dependencies = [
  "tokio",
  "tracing",
  "tracing-subscriber",
- "uuid",
 ]
 
 [[package]]
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index 4a78b1a8dfa02..39fbcb8c48449 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -88,4 +88,5 @@ tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
 url = { version = "2.5.0 " }
 uuid = { version = "1.6.1", features = ["v7", "serde"] }
 neon = "1"
-quick_cache = "0.6.5"
\ No newline at end of file
+quick_cache = "0.6.5"
+ahash = "0.8.11"
\ No newline at end of file
diff --git a/rust/capture/tests/common.rs b/rust/capture/tests/common.rs
index ee0b8659b42e5..c5671f12af939 100644
--- a/rust/capture/tests/common.rs
+++ b/rust/capture/tests/common.rs
@@ -153,7 +153,7 @@ impl EphemeralTopic {
         // TODO: check for name collision?
         let topic_name = random_string("events_", 16);
         let admin = AdminClient::from_config(&config).expect("failed to create admin client");
-        admin
+        let created = admin
             .create_topics(
                 &[NewTopic {
                     name: &topic_name,
@@ -166,6 +166,10 @@ impl EphemeralTopic {
             .await
             .expect("failed to create topic");
 
+        for result in created {
+            result.expect("failed to create topic");
+        }
+
         let consumer: BaseConsumer = config.create().expect("failed to create consumer");
         let mut assignment = TopicPartitionList::new();
         assignment.add_partition(&topic_name, 0);
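The test fix above matters because rdkafka's `create_topics` only fails outright when the admin request itself cannot be sent; per-topic outcomes come back inside the returned `Vec<TopicResult>`, which the old code discarded. A minimal sketch of the same check as a standalone helper — the partition count, replication factor and options here are assumptions for a test harness, not values taken from this file:

    use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
    use rdkafka::client::DefaultClientContext;

    // Sketch only: create a topic and surface per-topic errors, mirroring the
    // check added to EphemeralTopic above.
    async fn create_topic_checked(admin: &AdminClient<DefaultClientContext>, name: &str) {
        let created = admin
            .create_topics(
                &[NewTopic {
                    name,
                    num_partitions: 1,                       // assumption: single test partition
                    replication: TopicReplication::Fixed(1), // assumption: no replication in tests
                    config: vec![],
                }],
                &AdminOptions::new(),
            )
            .await
            .expect("failed to send create_topics request"); // only covers the request itself

        // Each element is a Result<String, (String, RDKafkaErrorCode)> for one topic;
        // this is where "topic already exists" and broker-side policy errors surface.
        for result in created {
            result.expect("failed to create topic");
        }
    }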
diff --git a/rust/property-defs-rs/Cargo.toml b/rust/property-defs-rs/Cargo.toml
index 8a210dcd163e6..534977b5fedd8 100644
--- a/rust/property-defs-rs/Cargo.toml
+++ b/rust/property-defs-rs/Cargo.toml
@@ -4,7 +4,6 @@ version = "0.1.0"
 edition = "2021"
 
 [dependencies]
-uuid = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
 rdkafka = { workspace = true }
@@ -22,6 +21,7 @@ metrics = { workspace = true }
 chrono = { workspace = true }
 quick_cache = { workspace = true }
 common-metrics = { path = "../common/metrics" }
+ahash = { workspace = true }
 
 [lints]
 workspace = true
diff --git a/rust/property-defs-rs/src/bin/benchmark_1million.rs b/rust/property-defs-rs/src/bin/benchmark_1million.rs
new file mode 100644
index 0000000000000..cbb5b1f74d75b
--- /dev/null
+++ b/rust/property-defs-rs/src/bin/benchmark_1million.rs
@@ -0,0 +1,147 @@
+use std::{collections::HashMap, sync::Arc, time::Duration};
+
+use property_defs_rs::types::{Event, Update};
+use quick_cache::sync::Cache;
+use tokio::sync::mpsc::{
+    self,
+    error::{TryRecvError, TrySendError},
+};
+
+// This is a bad hack to just copy the function like this, but I'll refactor later
+async fn spawn_producer_loop(
+    mut consumer: mpsc::Receiver<Event>,
+    channel: mpsc::Sender<Update>,
+    shared_cache: Arc<Cache<Update, ()>>,
+    skip_threshold: usize,
+    compaction_batch_size: usize,
+    total_updates_received: Arc<std::sync::atomic::AtomicUsize>,
+) {
+    let mut batch = ahash::AHashSet::with_capacity(compaction_batch_size);
+    let mut last_send = tokio::time::Instant::now();
+    loop {
+        let event = match consumer.try_recv() {
+            Ok(event) => event,
+            Err(TryRecvError::Empty) => {
+                println!("Empty");
+                consumer.recv().await.unwrap()
+            }
+            Err(TryRecvError::Disconnected) => {
+                return;
+            }
+        };
+
+        let updates = event.into_updates(skip_threshold);
+        total_updates_received.fetch_add(updates.len(), std::sync::atomic::Ordering::Relaxed);
+
+        for update in updates {
+            if batch.contains(&update) {
+                continue;
+            }
+            batch.insert(update);
+
+            if batch.len() >= compaction_batch_size || last_send.elapsed() > Duration::from_secs(10)
+            {
+                last_send = tokio::time::Instant::now();
+                for update in batch.drain() {
+                    if shared_cache.get(&update).is_some() {
+                        continue;
+                    }
+                    shared_cache.insert(update.clone(), ());
+                    match channel.try_send(update) {
+                        Ok(_) => {}
+                        Err(TrySendError::Full(update)) => {
+                            println!("Worker blocked");
+                            channel.send(update).await.unwrap();
+                        }
+                        Err(e) => {
+                            panic!("Coordinator send failed: {:?}", e);
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
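The loop above leans on `quick_cache::sync::Cache` being cheaply shareable across tasks behind an `Arc`, with plain `get`/`insert` calls doing the cross-worker dedup. A toy sketch of that contract, using a placeholder string key rather than the real `Update` type:

    use std::sync::Arc;

    use quick_cache::sync::Cache;

    // Toy illustration of the shared-cache dedup used by spawn_producer_loop.
    #[tokio::main]
    async fn main() {
        let cache: Arc<Cache<String, ()>> = Arc::new(Cache::new(10_000));

        let handles: Vec<_> = (0..4)
            .map(|worker| {
                let cache = cache.clone();
                tokio::spawn(async move {
                    let key = "team_1:event:test_event_0".to_string();
                    // First miss wins and inserts; later workers skip the key. Two
                    // workers can race past the check, so the dedup is best-effort,
                    // just like in the loop above.
                    if cache.get(&key).is_none() {
                        cache.insert(key, ());
                        println!("worker {worker} forwarded the update");
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.await.unwrap();
        }
    }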
+
+const EVENT_COUNT: usize = 1_000_000;
+const COMPACTION_BATCH_SIZE: usize = 10_000;
+const SKIP_THRESHOLD: usize = 10_000;
+const CACHE_SIZE: usize = 5_000_000;
+const CHANNEL_SIZE: usize = 50_000;
+
+#[tokio::main]
+async fn main() {
+    let (in_tx, in_rx) = mpsc::channel(CHANNEL_SIZE);
+    let (out_tx, mut out_rx) = mpsc::channel(CHANNEL_SIZE);
+    let cache = Arc::new(Cache::new(CACHE_SIZE));
+    let total_updates_received = Arc::new(std::sync::atomic::AtomicUsize::new(0));
+
+    let test_handle = tokio::spawn(spawn_producer_loop(
+        in_rx,
+        out_tx,
+        cache.clone(),
+        SKIP_THRESHOLD,
+        COMPACTION_BATCH_SIZE,
+        total_updates_received.clone(),
+    ));
+
+    let test_events = (0..EVENT_COUNT)
+        .map(generate_test_event)
+        .collect::<Vec<_>>();
+
+    let total_updates_issued: Arc<std::sync::atomic::AtomicUsize> =
+        Arc::new(std::sync::atomic::AtomicUsize::new(0));
+    let total_updates_issued_mv = total_updates_issued.clone();
+    let return_handle = tokio::spawn(async move {
+        let mut batch = Vec::with_capacity(CHANNEL_SIZE);
+        while out_rx.recv_many(&mut batch, CHANNEL_SIZE).await > 0 {
+            total_updates_issued_mv.fetch_add(batch.len(), std::sync::atomic::Ordering::Relaxed);
+            batch.clear()
+        }
+    });
+
+    let sender_handle = tokio::spawn(async move {
+        for event in test_events {
+            in_tx.send(event).await.unwrap();
+        }
+    });
+
+    // Give that a second to run
+    tokio::time::sleep(Duration::from_secs(1)).await;
+
+    let start = tokio::time::Instant::now();
+    test_handle.await.unwrap();
+    let elapsed = start.elapsed();
+    println!(
+        "Processed {} events in {}s, {} events/s, issued {} updates, {} total updates ({} ratio)",
+        EVENT_COUNT,
+        elapsed.as_secs_f64(),
+        EVENT_COUNT as f64 / elapsed.as_secs_f64(),
+        total_updates_issued.load(std::sync::atomic::Ordering::Relaxed),
+        total_updates_received.load(std::sync::atomic::Ordering::Relaxed),
+        total_updates_issued.load(std::sync::atomic::Ordering::Relaxed) as f64
+            / total_updates_received.load(std::sync::atomic::Ordering::Relaxed) as f64
+    );
+
+    sender_handle.await.unwrap();
+    return_handle.await.unwrap();
+}
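For context on the printed ratio: with the generator described below (1000 teams x 8 events x 100 properties), each event expands to roughly one event definition plus a property definition and an event-property per key, so the received count is ~201x the event count while the distinct universe is much smaller. A back-of-envelope check under that assumed expansion — an illustration, not part of the benchmark:

    // Rough expectation for the issued/received ratio printed above, assuming
    // into_updates emits one event definition plus a property definition and an
    // event-property per key (an assumption based on the comment below).
    fn main() {
        let teams = 1_000u64;
        let events_per_team = 8u64;
        let props = 100u64;
        let events_sent = 1_000_000u64;

        let updates_per_event = 1 + props + props; // event def + prop defs + event-props
        let received = events_sent * updates_per_event;

        // Distinct updates the generator can ever produce.
        let distinct = teams * events_per_team // event definitions
            + teams * props                    // property definitions
            + teams * events_per_team * props; // event-properties
        println!("received ~{received}, at most {distinct} distinct");
        println!("best-case ratio ~{:.4}", distinct as f64 / received as f64);
    }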
+
+// This generates "random" events, in a world where we have N teams, each sending 8 different events, each with 100 properties
+// That means we have N * 8 * 100 = N*800 EventProperties, as well as N*8 event definitions and N*100 properties
+// in the universe of possible updates to generate. Setting N to 1000 gives 800_000 possible EventProperties,
+// 8000 event definitions and 100_000 properties.
+fn generate_test_event(seed: usize) -> Event {
+    let team_id = (seed % 1000) as i32;
+    let event_name = format!("test_event_{}", seed % 8); // Imagine each team sends about 8 different events
+    let properties: HashMap<String, String> =
+        (0..100) // The average event has 100 properties
+            .map(|i| (format!("key_{}", i), format!("val_{}", i)))
+            .collect();
+
+    Event {
+        team_id,
+        event: event_name,
+        properties: Some(serde_json::to_string(&properties).unwrap()),
+    }
+}
diff --git a/rust/property-defs-rs/src/bin/generate_test_data.rs b/rust/property-defs-rs/src/bin/generate_test_data.rs
index 24c0492de223e..f8f226082f048 100644
--- a/rust/property-defs-rs/src/bin/generate_test_data.rs
+++ b/rust/property-defs-rs/src/bin/generate_test_data.rs
@@ -10,9 +10,11 @@ use rdkafka::{
 fn generate_test_event(seed: usize) -> Event {
     let team_id = (seed % 100) as i32;
     let event_name = format!("test_event_{}", seed % 8);
-    let properties: HashMap<String, String> = (0..200)
-        .map(|i| (format!("prop_{}", i), format!("val_{}", i)))
-        .collect();
+    let prop_key = format!("prop_{}", seed % 1000);
+    let properties: HashMap<String, String> =
+        (0..100) // The average event has 100 properties
+            .map(|i| (prop_key.clone(), format!("val_{}", i)))
+            .collect();
 
     Event {
         team_id,
diff --git a/rust/property-defs-rs/src/config.rs b/rust/property-defs-rs/src/config.rs
index 6c93436067514..81864156a3c35 100644
--- a/rust/property-defs-rs/src/config.rs
+++ b/rust/property-defs-rs/src/config.rs
@@ -12,7 +12,6 @@ pub struct Config {
     #[envconfig(nested = true)]
     pub kafka: KafkaConfig,
 
-    // Update sets are batches into at least min_batch_size (unless we haven't sent a batch in more than a few seconds)
     #[envconfig(default = "10")]
     pub max_concurrent_transactions: usize,
 
@@ -27,22 +26,27 @@ pub struct Config {
     pub max_issue_period: u64,
 
     // Propdefs spawns N workers to pull events from kafka,
-    // marshal, and convert ot updates. The number of
+    // marshal, and convert to updates. The number of
     // concurrent update batches sent to postgres is controlled
     // by max_concurrent_transactions
-    #[envconfig(default = "10")]
+    #[envconfig(default = "4")]
     pub worker_loop_count: usize,
 
     // We maintain an internal cache, to avoid sending the same UPSERT multiple times. This is it's size.
-    #[envconfig(default = "100000")]
+    #[envconfig(default = "1000000")]
     pub cache_capacity: usize,
 
-    // Each worker thread internally batches up to this number, then de-dupes
-    // across this batch before releasing to the main thread
-    #[envconfig(default = "50000")]
+    // Each worker maintains a small local batch of updates, which it
+    // flushes to the main thread (updating/filtering by the
+    // cross-thread cache while it does). This is that batch size.
+    #[envconfig(default = "10000")]
     pub compaction_batch_size: usize,
 
-    #[envconfig(default = "100")]
+    // Workers send updates back to the main thread over a channel,
+    // which has a depth of this many slots. If the main thread slows,
+    // which usually means if postgres is slow, the workers will block
+    // after filling this channel.
+    #[envconfig(default = "1000")]
     pub channel_slots_per_worker: usize,
 
     // If an event has some ridiculous number of updates, we skip it
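Since all of these knobs are ordinary envconfig fields, they can be tuned per deployment through environment variables (envconfig's default naming is the upper-cased field name, e.g. WORKER_LOOP_COUNT). A minimal sketch of how the service resolves them at startup; the printed fields are just the ones touched in this change, and the nested KafkaConfig may still require its own variables to be present:

    use envconfig::Envconfig;
    use property_defs_rs::config::Config;

    // Sketch: resolve config from the environment, falling back to the defaults
    // above, e.g. WORKER_LOOP_COUNT=8 CACHE_CAPACITY=2000000 ./property-defs-rs
    fn main() -> Result<(), envconfig::Error> {
        let config = Config::init_from_env()?;
        println!(
            "workers={} cache_capacity={} compaction_batch={} channel_slots_per_worker={}",
            config.worker_loop_count,
            config.cache_capacity,
            config.compaction_batch_size,
            config.channel_slots_per_worker,
        );
        Ok(())
    }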
diff --git a/rust/property-defs-rs/src/lib.rs b/rust/property-defs-rs/src/lib.rs
index 7c639d72efa90..b871a5a97738d 100644
--- a/rust/property-defs-rs/src/lib.rs
+++ b/rust/property-defs-rs/src/lib.rs
@@ -1,4 +1,41 @@
+use metrics_consts::{EMPTY_EVENTS, EVENT_PARSE_ERROR};
+use rdkafka::{message::BorrowedMessage, Message};
+use tracing::warn;
+use types::Event;
+
 pub mod app_context;
 pub mod config;
 pub mod metrics_consts;
 pub mod types;
+
+// This copies event properties, which means the total resident memory usage is higher than we'd like, and that constrains
+// our batch size. serde_json provides no zero-copy way to parse a JSON object, so we're stuck with this for now.
+pub fn message_to_event(msg: BorrowedMessage) -> Option<Event> {
+    let Some(payload) = msg.payload() else {
+        warn!("Received empty event");
+        metrics::counter!(EMPTY_EVENTS).increment(1);
+        return None;
+    };
+
+    let event = serde_json::from_slice::<Event>(payload);
+    let event = match event {
+        Ok(e) => e,
+        Err(e) => {
+            metrics::counter!(EVENT_PARSE_ERROR).increment(1);
+            warn!("Failed to parse event: {:?}", e);
+            return None;
+        }
+    };
+    Some(event)
+}
+
+pub fn retain_from<T>(buffer: &mut Vec<T>, from: usize, predicate: impl Fn(&T) -> bool) {
+    let mut i = from;
+    while i < buffer.len() {
+        if !predicate(&buffer[i]) {
+            buffer.swap_remove(i);
+        } else {
+            i += 1;
+        }
+    }
+}
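`retain_from` is worth pinning down, since its contract is easy to misread: elements before `from` are kept unconditionally, elements from `from` onwards are filtered, and because it uses `swap_remove` the relative order of the tail is not preserved. A small illustrative usage (not a test from the PR):

    use property_defs_rs::retain_from;

    // Illustration of retain_from's contract.
    fn main() {
        let mut buffer = vec![1, 2, 3, 4, 5, 6];

        // Only filter indices 2..; keep even numbers there.
        retain_from(&mut buffer, 2, |x| x % 2 == 0);

        // 1 and 2 survive untouched; of [3, 4, 5, 6] only 4 and 6 remain,
        // in unspecified order (this run yields [1, 2, 6, 4]).
        assert_eq!(buffer[..2], [1, 2]);
        let mut tail = buffer[2..].to_vec();
        tail.sort();
        assert_eq!(tail, vec![4, 6]);
        println!("{:?}", buffer);
    }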
diff --git a/rust/property-defs-rs/src/main.rs b/rust/property-defs-rs/src/main.rs
index fad0af68cfd6c..b3155a4207aed 100644
--- a/rust/property-defs-rs/src/main.rs
+++ b/rust/property-defs-rs/src/main.rs
@@ -1,24 +1,24 @@
-use std::{collections::HashSet, sync::Arc, time::Duration};
+use std::{sync::Arc, time::Duration};
 
+use ahash::AHashSet;
 use axum::{routing::get, Router};
 use envconfig::Envconfig;
 use futures::future::ready;
 use property_defs_rs::{
     app_context::AppContext,
     config::Config,
+    message_to_event,
     metrics_consts::{
-        BATCH_ACQUIRE_TIME, CACHE_CONSUMED, COMPACTED_UPDATES, EMPTY_EVENTS, EVENTS_RECEIVED,
-        EVENT_PARSE_ERROR, FORCED_SMALL_BATCH, PERMIT_WAIT_TIME, RECV_DEQUEUED,
-        TRANSACTION_LIMIT_SATURATION, UPDATES_FILTERED_BY_CACHE, UPDATES_PER_EVENT, UPDATES_SEEN,
-        UPDATE_ISSUE_TIME, WORKER_BLOCKED,
+        BATCH_ACQUIRE_TIME, CACHE_CONSUMED, COMPACTED_UPDATES, EVENTS_RECEIVED, FORCED_SMALL_BATCH,
+        PERMIT_WAIT_TIME, RECV_DEQUEUED, TRANSACTION_LIMIT_SATURATION, UPDATES_FILTERED_BY_CACHE,
+        UPDATES_PER_EVENT, UPDATES_SEEN, UPDATE_ISSUE_TIME, WORKER_BLOCKED,
     },
-    types::{Event, Update},
+    types::Update,
 };
 use quick_cache::sync::Cache;
 use rdkafka::{
     consumer::{Consumer, StreamConsumer},
-    message::BorrowedMessage,
-    ClientConfig, Message,
+    ClientConfig,
 };
 use serve_metrics::{serve, setup_metrics_routes};
 use tokio::{
@@ -65,11 +65,12 @@ fn start_health_liveness_server(config: &Config, context: Arc<AppContext>) -> Jo
 async fn spawn_producer_loop(
     consumer: Arc<StreamConsumer>,
     channel: mpsc::Sender<Update>,
+    shared_cache: Arc<Cache<Update, ()>>,
     skip_threshold: usize,
     compaction_batch_size: usize,
 ) {
-    let mut batch = Vec::with_capacity(compaction_batch_size);
-    let mut sent = HashSet::with_capacity(compaction_batch_size);
+    let mut batch = AHashSet::with_capacity(compaction_batch_size);
+    let mut last_send = tokio::time::Instant::now();
     loop {
         let message = consumer
             .recv()
@@ -87,14 +88,21 @@ async fn spawn_producer_loop(
         metrics::histogram!(UPDATES_PER_EVENT).record(updates.len() as f64);
 
         for update in updates {
-            batch.push(update);
-            if batch.len() >= compaction_batch_size {
-                for update in batch.drain(..) {
-                    if sent.contains(&update) {
-                        metrics::counter!(COMPACTED_UPDATES).increment(1);
+            if batch.contains(&update) {
+                metrics::counter!(COMPACTED_UPDATES).increment(1);
+                continue;
+            }
+            batch.insert(update);
+
+            if batch.len() >= compaction_batch_size || last_send.elapsed() > Duration::from_secs(10)
+            {
+                last_send = tokio::time::Instant::now();
+                for update in batch.drain() {
+                    if shared_cache.get(&update).is_some() {
+                        metrics::counter!(UPDATES_FILTERED_BY_CACHE).increment(1);
                         continue;
                     }
-                    sent.insert(update.clone());
+                    shared_cache.insert(update.clone(), ());
                     match channel.try_send(update) {
                         Ok(_) => {}
                         Err(TrySendError::Full(update)) => {
@@ -108,7 +116,6 @@ async fn spawn_producer_loop(
                         }
                     }
                 }
-                sent.clear();
             }
         }
     }
@@ -135,12 +142,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let (tx, mut rx) = mpsc::channel(config.update_batch_size * config.channel_slots_per_worker);
     let transaction_limit = Arc::new(Semaphore::new(config.max_concurrent_transactions));
-    let cache = Cache::new(config.cache_capacity);
+    let cache = Arc::new(Cache::new(config.cache_capacity));
 
     for _ in 0..config.worker_loop_count {
         tokio::spawn(spawn_producer_loop(
             consumer.clone(),
             tx.clone(),
+            cache.clone(),
             config.update_count_skip_threshold,
             config.compaction_batch_size,
         ));
     }
@@ -165,15 +173,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
                 warn!("Coordinator recv failed, dying");
                 return Ok(());
             }
-            let before_recv = batch.len() - got;
-            let before_filter = batch.len();
-            retain_from(&mut batch, before_recv, |u| cache.get(u).is_none());
-            batch[..before_recv].iter().for_each(|u| {
-                cache.insert(u.clone(), ());
-            });
-
-            let filtered = before_filter - batch.len();
-            metrics::counter!(UPDATES_FILTERED_BY_CACHE).increment(filtered as u64);
             metrics::gauge!(RECV_DEQUEUED).set(got as f64);
             continue;
         }
@@ -210,35 +209,3 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
         });
     }
 }
-
-// This copies event properties, which means the total resident memory usage is higher than we'd like, and that constrains
-// our batch size. serde_json provides no zero-copy way to parse a JSON object, so we're stuck with this for now.
-fn message_to_event(msg: BorrowedMessage) -> Option<Event> {
-    let Some(payload) = msg.payload() else {
-        warn!("Received empty event");
-        metrics::counter!(EMPTY_EVENTS).increment(1);
-        return None;
-    };
-
-    let event = serde_json::from_slice::<Event>(payload);
-    let event = match event {
-        Ok(e) => e,
-        Err(e) => {
-            metrics::counter!(EVENT_PARSE_ERROR).increment(1);
-            warn!("Failed to parse event: {:?}", e);
-            return None;
-        }
-    };
-    Some(event)
-}
-
-pub fn retain_from<T>(buffer: &mut Vec<T>, from: usize, predicate: impl Fn(&T) -> bool) {
-    let mut i = from;
-    while i < buffer.len() {
-        if !predicate(&buffer[i]) {
-            buffer.swap_remove(i);
-        } else {
-            i += 1;
-        }
-    }
-}
diff --git a/rust/property-defs-rs/src/types.rs b/rust/property-defs-rs/src/types.rs
index bc0c694fdf8cd..5242ec921b67b 100644
--- a/rust/property-defs-rs/src/types.rs
+++ b/rust/property-defs-rs/src/types.rs
@@ -4,7 +4,6 @@ use chrono::{DateTime, Duration, DurationRound, RoundingError, Utc};
 use serde::{Deserialize, Serialize};
 use serde_json::{Map, Value};
 use tracing::warn;
-use uuid::Uuid;
 
 use crate::metrics_consts::EVENTS_SKIPPED;
 
@@ -69,7 +68,6 @@ pub enum GroupType {
 
 #[derive(Clone, Debug, Eq, PartialEq)]
 pub struct PropertyDefinition {
-    pub id: Uuid,
     pub team_id: i32,
     pub name: String,
     pub is_numerical: bool,
@@ -83,7 +81,6 @@ pub struct EventDefinition {
-    pub id: Uuid,
     pub name: String,
     pub team_id: i32,
     pub last_seen_at: DateTime<Utc>,
@@ -115,7 +112,6 @@ pub struct Event {
 impl From<&Event> for EventDefinition {
     fn from(event: &Event) -> Self {
         EventDefinition {
-            id: Uuid::now_v7(),
             name: sanitize_event_name(&event.event),
             team_id: event.team_id,
             // We round last seen to the nearest day, as per the TS impl. Unwrap is safe here because we
@@ -223,7 +219,6 @@ impl Event {
         let is_numerical = matches!(property_type, Some(PropertyValueType::Numeric));
 
         let def = PropertyDefinition {
-            id: Uuid::now_v7(),
             team_id: self.team_id,
             name: key.clone(),
             is_numerical,
@@ -274,8 +269,8 @@ fn detect_property_type(key: &str, value: &Value) -> Option<PropertyValueType> {
     match value {
         Value::String(s) => {
-            let s = &s.trim().to_lowercase();
-            if s == "true" || s == "false" {
+            let s = &s.trim();
+            if *s == "true" || *s == "false" || *s == "TRUE" || *s == "FALSE" {
                 Some(PropertyValueType::Boolean)
             } else {
                 // TODO - we should try to auto-detect datetime strings here, but I'm skipping the chunk of regex necessary to do it for v0
@@ -287,7 +282,11 @@ fn detect_property_type(key: &str, value: &Value) -> Option<PropertyValueType> {
             // "likely" to be a unix timestamp on the basis of the number of characters. I have mixed feelings about this,
             // so I'm going to leave it as just checking the key for now. This means we're being /less/ strict with datetime
             // detection here than in the TS
-            if key.to_lowercase().contains("timestamp") || key.to_lowercase().contains("time") {
+            if key.contains("timestamp")
+                || key.contains("TIMESTAMP")
+                || key.contains("time")
+                || key.contains("TIME")
+            {
                 Some(PropertyValueType::DateTime)
             } else {
                 Some(PropertyValueType::Numeric)
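One behavioural nuance of dropping the `to_lowercase()` calls above: detection is now exact-case. Trimmed `true`/`false`/`TRUE`/`FALSE` still map to Boolean, and numeric values are only treated as DateTime when the key contains `time`, `TIME`, `timestamp` or `TIMESTAMP` verbatim, so a mixed-case key such as `createdTime` now falls through to Numeric. A unit-test sketch capturing that; it would have to live in types.rs itself because `detect_property_type` is private:

    // Sketch only; assumes it is placed inside types.rs alongside the function.
    #[cfg(test)]
    mod detect_property_type_sketch {
        use super::*;
        use serde_json::json;

        #[test]
        fn boolean_and_datetime_detection() {
            // Exact-case booleans, whitespace trimmed.
            assert!(matches!(
                detect_property_type("flag", &json!(" TRUE ")),
                Some(PropertyValueType::Boolean)
            ));
            // "True" is no longer lower-cased first, so it is not detected as Boolean.
            assert!(!matches!(
                detect_property_type("flag", &json!("True")),
                Some(PropertyValueType::Boolean)
            ));

            // Numbers become DateTime only on a verbatim key match...
            assert!(matches!(
                detect_property_type("timestamp", &json!(1724889600)),
                Some(PropertyValueType::DateTime)
            ));
            assert!(matches!(
                detect_property_type("session_TIME", &json!(12.5)),
                Some(PropertyValueType::DateTime)
            ));
            // ...so mixed-case keys now fall through to Numeric.
            assert!(matches!(
                detect_property_type("createdTime", &json!(1724889600)),
                Some(PropertyValueType::Numeric)
            ));
        }
    }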