feat(propdefs): make fast good (#24660)
oliverb123 authored Aug 28, 2024
1 parent c6bbc98 commit f0f18b1
Showing 10 changed files with 243 additions and 82 deletions.
2 changes: 1 addition & 1 deletion rust/Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion rust/Cargo.toml
@@ -88,4 +88,5 @@ tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
url = { version = "2.5.0 " }
uuid = { version = "1.6.1", features = ["v7", "serde"] }
neon = "1"
quick_cache = "0.6.5"
quick_cache = "0.6.5"
ahash = "0.8.11"
6 changes: 5 additions & 1 deletion rust/capture/tests/common.rs
@@ -153,7 +153,7 @@ impl EphemeralTopic {
// TODO: check for name collision?
let topic_name = random_string("events_", 16);
let admin = AdminClient::from_config(&config).expect("failed to create admin client");
admin
let created = admin
.create_topics(
&[NewTopic {
name: &topic_name,
@@ -166,6 +166,10 @@
.await
.expect("failed to create topic");

for result in created {
result.expect("failed to create topic");
}

let consumer: BaseConsumer = config.create().expect("failed to create consumer");
let mut assignment = TopicPartitionList::new();
assignment.add_partition(&topic_name, 0);
2 changes: 1 addition & 1 deletion rust/property-defs-rs/Cargo.toml
@@ -4,7 +4,6 @@ version = "0.1.0"
edition = "2021"

[dependencies]
uuid = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
rdkafka = { workspace = true }
@@ -22,6 +21,7 @@ metrics = { workspace = true }
chrono = { workspace = true }
quick_cache = { workspace = true }
common-metrics = { path = "../common/metrics" }
ahash = { workspace = true }

[lints]
workspace = true
147 changes: 147 additions & 0 deletions rust/property-defs-rs/src/bin/benchmark_1million.rs
@@ -0,0 +1,147 @@
use std::{collections::HashMap, sync::Arc, time::Duration};

use property_defs_rs::types::{Event, Update};
use quick_cache::sync::Cache;
use tokio::sync::mpsc::{
self,
error::{TryRecvError, TrySendError},
};

// This is a bad hack to just copy the function like this, but I'll refactor later
async fn spawn_producer_loop(
mut consumer: mpsc::Receiver<Event>,
channel: mpsc::Sender<Update>,
shared_cache: Arc<Cache<Update, ()>>,
skip_threshold: usize,
compaction_batch_size: usize,
total_updates_received: Arc<std::sync::atomic::AtomicUsize>,
) {
let mut batch = ahash::AHashSet::with_capacity(compaction_batch_size);
let mut last_send = tokio::time::Instant::now();
loop {
let event = match consumer.try_recv() {
Ok(event) => event,
Err(TryRecvError::Empty) => {
println!("Empty");
consumer.recv().await.unwrap()
}
Err(TryRecvError::Disconnected) => {
return;
}
};

let updates = event.into_updates(skip_threshold);
total_updates_received.fetch_add(updates.len(), std::sync::atomic::Ordering::Relaxed);

for update in updates {
if batch.contains(&update) {
continue;
}
batch.insert(update);

if batch.len() >= compaction_batch_size || last_send.elapsed() > Duration::from_secs(10)
{
last_send = tokio::time::Instant::now();
for update in batch.drain() {
if shared_cache.get(&update).is_some() {
continue;
}
shared_cache.insert(update.clone(), ());
match channel.try_send(update) {
Ok(_) => {}
Err(TrySendError::Full(update)) => {
println!("Worker blocked");
channel.send(update).await.unwrap();
}
Err(e) => {
panic!("Coordinator send failed: {:?}", e);
}
}
}
}
}
}
}

const EVENT_COUNT: usize = 1_000_000;
const COMPACTION_BATCH_SIZE: usize = 10_000;
const SKIP_THRESHOLD: usize = 10_000;
const CACHE_SIZE: usize = 5_000_000;
const CHANNEL_SIZE: usize = 50_000;

#[tokio::main]
async fn main() {
let (in_tx, in_rx) = mpsc::channel(CHANNEL_SIZE);
let (out_tx, mut out_rx) = mpsc::channel(CHANNEL_SIZE);
let cache = Arc::new(Cache::new(CACHE_SIZE));
let total_updates_received = Arc::new(std::sync::atomic::AtomicUsize::new(0));

let test_handle = tokio::spawn(spawn_producer_loop(
in_rx,
out_tx,
cache.clone(),
SKIP_THRESHOLD,
COMPACTION_BATCH_SIZE,
total_updates_received.clone(),
));

let test_events = (0..EVENT_COUNT)
.map(generate_test_event)
.collect::<Vec<_>>();

let total_updates_issued: Arc<std::sync::atomic::AtomicUsize> =
Arc::new(std::sync::atomic::AtomicUsize::new(0));
let total_updates_issued_mv = total_updates_issued.clone();
let return_handle = tokio::spawn(async move {
let mut batch = Vec::with_capacity(CHANNEL_SIZE);
while out_rx.recv_many(&mut batch, CHANNEL_SIZE).await > 0 {
total_updates_issued_mv.fetch_add(batch.len(), std::sync::atomic::Ordering::Relaxed);
batch.clear()
}
});

let sender_handle = tokio::spawn(async move {
for event in test_events {
in_tx.send(event).await.unwrap();
}
});

// Give that a second to run
tokio::time::sleep(Duration::from_secs(1)).await;

let start = tokio::time::Instant::now();
test_handle.await.unwrap();
let elapsed = start.elapsed();
println!(
"Processed {} events in {}s, {} events/s, issued {} updates, {} total updates ({} ratio)",
EVENT_COUNT,
elapsed.as_secs_f64(),
EVENT_COUNT as f64 / elapsed.as_secs_f64(),
total_updates_issued.load(std::sync::atomic::Ordering::Relaxed),
total_updates_received.load(std::sync::atomic::Ordering::Relaxed),
total_updates_issued.load(std::sync::atomic::Ordering::Relaxed) as f64
/ total_updates_received.load(std::sync::atomic::Ordering::Relaxed) as f64
);

sender_handle.await.unwrap();
return_handle.await.unwrap();
}

// This generates "random" events, in a world where we have N teams, each sending 8 different events, each with 100 properties
// That means we have N * 8 * 100 = N*800 EventProperties, as well as N*8 event definitions and N*100 properties
// in the universe of possible updates to generate. Setting N to 1000 gives 800_000 possible EventProperties,
// 8000 event definitions and 100_000 properties.
fn generate_test_event(seed: usize) -> Event {
let team_id = (seed % 1000) as i32;
let event_name = format!("test_event_{}", seed % 8); // Imagine each team sends about 8 different events
let properties: HashMap<String, String> =
(0..100) // The average event has 100 properties
.map(|i| (format!("key_{}", i), format!("val_{}", i)))
.collect();

Event {
team_id,
event: event_name,
properties: Some(serde_json::to_string(&properties).unwrap()),
}
}
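
(Usage note, inferred from Cargo's src/bin/ naming convention rather than stated in this diff: the benchmark should be runnable on its own with cargo run --release --bin benchmark_1million from the rust/property-defs-rs crate; a release build matters here since the loop is dominated by hashing and cloning.)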
8 changes: 5 additions & 3 deletions rust/property-defs-rs/src/bin/generate_test_data.rs
@@ -10,9 +10,11 @@ use rdkafka::{
fn generate_test_event(seed: usize) -> Event {
let team_id = (seed % 100) as i32;
let event_name = format!("test_event_{}", seed % 8);
let properties: HashMap<String, String> = (0..200)
.map(|i| (format!("prop_{}", i), format!("val_{}", i)))
.collect();
let prop_key = format!("prop_{}", seed % 1000);
let properties: HashMap<String, String> =
(0..100) // The average event has 100 properties
.map(|i| (prop_key.clone(), format!("val_{}", i)))
.collect();

Event {
team_id,
20 changes: 12 additions & 8 deletions rust/property-defs-rs/src/config.rs
@@ -12,7 +12,6 @@ pub struct Config {
#[envconfig(nested = true)]
pub kafka: KafkaConfig,

// Update sets are batches into at least min_batch_size (unless we haven't sent a batch in more than a few seconds)
#[envconfig(default = "10")]
pub max_concurrent_transactions: usize,

@@ -27,22 +26,27 @@
pub max_issue_period: u64,

// Propdefs spawns N workers to pull events from kafka,
// marshal, and convert ot updates. The number of
// marshal, and convert to updates. The number of
// concurrent update batches sent to postgres is controlled
// by max_concurrent_transactions
#[envconfig(default = "10")]
#[envconfig(default = "4")]
pub worker_loop_count: usize,

// We maintain an internal cache, to avoid sending the same UPSERT multiple times. This is it's size.
#[envconfig(default = "100000")]
#[envconfig(default = "1000000")]
pub cache_capacity: usize,

// Each worker thread internally batches up to this number, then de-dupes
// across this batch before releasing to the main thread
#[envconfig(default = "50000")]
// Each worker maintains a small local batch of updates, which it
// flushes to the main thread (updating/filtering by the
// cross-thread cache while it does). This is that batch size.
#[envconfig(default = "10000")]
pub compaction_batch_size: usize,

#[envconfig(default = "100")]
// Workers send updates back to the main thread over a channel,
// which has a depth of this many slots. If the main thread slows,
// which usually means if postgres is slow, the workers will block
// after filling this channel.
#[envconfig(default = "1000")]
pub channel_slots_per_worker: usize,

// If an event has some ridiculous number of updates, we skip it
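
The comments above sketch the pipeline these settings tune: workers batch and de-dupe locally, then push updates over a bounded channel and block when the consumer (ultimately postgres) falls behind. Below is a minimal standalone sketch of that back-pressure behaviour, not the service's actual main loop; sizing the channel as workers x slots is an assumption here, and the slow consumer merely stands in for the postgres write path.

use std::time::Duration;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let worker_loop_count: usize = 4;
    let channel_slots_per_worker: usize = 1000;
    // Bounded channel: once it fills, send().await parks the worker until the
    // consumer drains some slots -- this is the back-pressure described above.
    let (tx, mut rx) = mpsc::channel::<u64>(worker_loop_count * channel_slots_per_worker);

    for worker in 0..worker_loop_count as u64 {
        let tx = tx.clone();
        tokio::spawn(async move {
            for update in 0..10_000u64 {
                tx.send(worker * 1_000_000 + update).await.unwrap();
            }
        });
    }
    drop(tx); // the channel closes once every worker finishes and drops its sender

    // Deliberately slow consumer, standing in for the postgres writer.
    let mut received = 0usize;
    while rx.recv().await.is_some() {
        received += 1;
        if received % 10_000 == 0 {
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    }
    println!("received {} updates", received);
}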
37 changes: 37 additions & 0 deletions rust/property-defs-rs/src/lib.rs
@@ -1,4 +1,41 @@
use metrics_consts::{EMPTY_EVENTS, EVENT_PARSE_ERROR};
use rdkafka::{message::BorrowedMessage, Message};
use tracing::warn;
use types::Event;

pub mod app_context;
pub mod config;
pub mod metrics_consts;
pub mod types;

// This copies event properties, which means the total resident memory usage is higher than we'd like, and that constrains
// our batch size. serde_json provides no zero-copy way to parse a JSON object, so we're stuck with this for now.
pub fn message_to_event(msg: BorrowedMessage) -> Option<Event> {
let Some(payload) = msg.payload() else {
warn!("Received empty event");
metrics::counter!(EMPTY_EVENTS).increment(1);
return None;
};

let event = serde_json::from_slice::<Event>(payload);
let event = match event {
Ok(e) => e,
Err(e) => {
metrics::counter!(EVENT_PARSE_ERROR).increment(1);
warn!("Failed to parse event: {:?}", e);
return None;
}
};
Some(event)
}

pub fn retain_from<T>(buffer: &mut Vec<T>, from: usize, predicate: impl Fn(&T) -> bool) {
let mut i = from;
while i < buffer.len() {
if !predicate(&buffer[i]) {
buffer.swap_remove(i);
} else {
i += 1;
}
}
}
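
retain_from lands here without a doc comment, so a small usage sketch (illustrative only, not part of the diff): it behaves like Vec::retain restricted to indices at or beyond from, and because it relies on swap_remove it does not preserve the relative order of the retained tail.

use property_defs_rs::retain_from;

fn main() {
    let mut buf = vec![1, 2, 3, 4, 5, 6];
    // Leave the first two elements alone; keep only even numbers from index 2 on.
    retain_from(&mut buf, 2, |x| x % 2 == 0);
    assert_eq!(&buf[..2], &[1, 2]); // prefix is untouched
    let mut tail = buf[2..].to_vec();
    tail.sort(); // swap_remove reorders, so compare the tail order-insensitively
    assert_eq!(tail, vec![4, 6]);
    println!("{:?}", buf); // e.g. [1, 2, 6, 4]
}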