diff --git a/rust/Cargo.lock b/rust/Cargo.lock
index 6c15916314724..df595c78383ce 100644
--- a/rust/Cargo.lock
+++ b/rust/Cargo.lock
@@ -2118,6 +2118,15 @@ dependencies = [
  "value-bag",
 ]
 
+[[package]]
+name = "lru"
+version = "0.12.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "37ee39891760e7d94734f6f63fedc29a2e4a152f836120753a72503f09fcf904"
+dependencies = [
+ "hashbrown 0.14.3",
+]
+
 [[package]]
 name = "mach"
 version = "0.3.2"
@@ -2822,9 +2831,11 @@ version = "0.1.0"
 dependencies = [
  "axum 0.7.5",
  "chrono",
+ "common-metrics",
  "envconfig",
  "futures",
  "health",
+ "lru",
  "metrics",
  "rdkafka",
  "serde",
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index 347530f99d1bd..64f7f1e196a50 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -87,4 +87,5 @@ tracing-opentelemetry = "0.23.0"
 tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
 url = { version = "2.5.0 " }
 uuid = { version = "1.6.1", features = ["v7", "serde"] }
-neon = "1"
\ No newline at end of file
+neon = "1"
+lru = "0.12.4"
\ No newline at end of file
diff --git a/rust/property-defs-rs/Cargo.toml b/rust/property-defs-rs/Cargo.toml
index 177b159c9093b..87aa3afa966b3 100644
--- a/rust/property-defs-rs/Cargo.toml
+++ b/rust/property-defs-rs/Cargo.toml
@@ -20,6 +20,8 @@ axum = { workspace = true }
 serve-metrics = { path = "../common/serve_metrics" }
 metrics = { workspace = true }
 chrono = { workspace = true }
+lru = { workspace = true }
+common-metrics = { path = "../common/metrics" }
 
 [lints]
 workspace = true
diff --git a/rust/property-defs-rs/src/app_context.rs b/rust/property-defs-rs/src/app_context.rs
index 317ff2c355925..1ac2a97d4a842 100644
--- a/rust/property-defs-rs/src/app_context.rs
+++ b/rust/property-defs-rs/src/app_context.rs
@@ -1,21 +1,17 @@
-use std::collections::HashSet;
-
 use health::{HealthHandle, HealthRegistry};
-use sqlx::{postgres::PgPoolOptions, PgPool};
 
 use crate::{config::Config, metrics_consts::UPDATES_ISSUED, types::Update};
 
 pub struct AppContext {
-    pub pool: PgPool,
+    //pub pool: PgPool,
     pub liveness: HealthRegistry,
     pub worker_liveness: HealthHandle,
 }
 
 impl AppContext {
-    pub async fn new(config: &Config) -> Result<Self, sqlx::Error> {
-        let options = PgPoolOptions::new().max_connections(config.max_pg_connections);
-
-        let pool = options.connect(&config.database_url).await?;
+    pub async fn new(_config: &Config) -> Result<Self, sqlx::Error> {
+        //let options = PgPoolOptions::new().max_connections(config.max_pg_connections);
+        //let pool = options.connect(&config.database_url).await?;
 
         let liveness: HealthRegistry = HealthRegistry::new("liveness");
         let worker_liveness = liveness
@@ -23,13 +19,13 @@ impl AppContext {
             .await;
 
         Ok(Self {
-            pool,
+            //pool,
             liveness,
             worker_liveness,
         })
     }
 
-    pub async fn issue(&self, updates: HashSet<Update>) -> Result<(), sqlx::Error> {
+    pub async fn issue(&self, updates: Vec<Update>) -> Result<(), sqlx::Error> {
         metrics::counter!(UPDATES_ISSUED).increment(updates.len() as u64);
         Ok(())
     }
diff --git a/rust/property-defs-rs/src/config.rs b/rust/property-defs-rs/src/config.rs
index f74b3a90886e9..83054ad00e043 100644
--- a/rust/property-defs-rs/src/config.rs
+++ b/rust/property-defs-rs/src/config.rs
@@ -12,19 +12,30 @@ pub struct Config {
     #[envconfig(nested = true)]
     pub kafka: KafkaConfig,
 
+    // Update sets are batched into at least update_batch_size (unless we haven't sent a batch in more than max_issue_period seconds)
     #[envconfig(default = "10")]
    pub max_concurrent_transactions: usize,
 
-    #[envconfig(default = "10000")]
-    pub max_batch_size: usize,
-
-    // If a worker recieves a batch smaller than this, it will simply not commit the offset and
-    // sleep for a while, since DB ops/event scales inversely to batch size
+    // We issue writes (UPSERTs) to postgres in batches of this size.
+    // Total concurrent DB ops is max_concurrent_transactions * update_batch_size
     #[envconfig(default = "1000")]
-    pub min_batch_size: usize,
+    pub update_batch_size: usize,
+
+    // We issue updates in batches of update_batch_size, or when we haven't
+    // received a new update in this many seconds
+    #[envconfig(default = "300")]
+    pub max_issue_period: u64,
+
+    // Propdefs spawns N workers to pull events from kafka,
+    // marshal, and convert to updates. The number of
+    // concurrent update batches sent to postgres is controlled
+    // by max_concurrent_transactions
+    #[envconfig(default = "10")]
+    pub worker_loop_count: usize,
 
-    #[envconfig(default = "100")]
-    pub next_event_wait_timeout_ms: u64,
+    // We maintain an internal cache to avoid sending the same UPSERT multiple times. This is its size.
+    #[envconfig(default = "100000")]
+    pub cache_capacity: usize,
 
     #[envconfig(from = "BIND_HOST", default = "::")]
     pub host: String,
diff --git a/rust/property-defs-rs/src/main.rs b/rust/property-defs-rs/src/main.rs
index e502daae26c9f..ae767ffe51b07 100644
--- a/rust/property-defs-rs/src/main.rs
+++ b/rust/property-defs-rs/src/main.rs
@@ -1,12 +1,17 @@
-use std::{collections::HashSet, sync::Arc, time::Duration};
+use std::{num::NonZeroUsize, sync::Arc, time::Duration};
 
 use axum::{routing::get, Router};
 use envconfig::Envconfig;
 use futures::future::ready;
+use lru::LruCache;
 use property_defs_rs::{
     app_context::AppContext,
     config::Config,
-    metrics_consts::{BATCH_SKIPPED, EVENTS_RECEIVED, FORCED_SMALL_BATCH, SMALL_BATCH_SLEEP},
+    metrics_consts::{
+        BATCH_ACQUIRE_TIME, EMPTY_EVENTS, EVENTS_RECEIVED, EVENT_PARSE_ERROR, FORCED_SMALL_BATCH,
+        PERMIT_WAIT_TIME, TRANSACTION_LIMIT_SATURATION, UPDATES_FILTERED_BY_CACHE,
+        UPDATES_PER_EVENT, UPDATES_SEEN, UPDATE_ISSUE_TIME, WORKER_BLOCKED,
+    },
     types::{Event, Update},
 };
 use rdkafka::{
@@ -15,7 +20,13 @@ use rdkafka::{
     ClientConfig, Message,
 };
 use serve_metrics::{serve, setup_metrics_routes};
-use tokio::{select, task::JoinHandle, time::sleep};
+use tokio::{
+    sync::{
+        mpsc::{self, error::TrySendError},
+        Semaphore,
+    },
+    task::JoinHandle,
+};
 use tracing::{info, warn};
 use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};
 
@@ -50,6 +61,43 @@ fn start_health_liveness_server(config: &Config, context: Arc<AppContext>) -> Jo
     })
 }
 
+async fn spawn_producer_loop(consumer: Arc<StreamConsumer>, channel: mpsc::Sender<Update>) {
+    loop {
+        let message = consumer
+            .recv()
+            .await
+            .expect("TODO - workers panic on kafka recv fail");
+
+        let Some(event) = message_to_event(message) else {
+            continue;
+        };
+
+        let updates = event.into_updates();
+
+        metrics::counter!(EVENTS_RECEIVED).increment(1);
+        metrics::counter!(UPDATES_SEEN).increment(updates.len() as u64);
+        metrics::histogram!(UPDATES_PER_EVENT).record(updates.len() as f64);
+
+        for update in updates {
+            // We first try a non-blocking send, so we can get a metric on backpressure
+            match channel.try_send(update) {
+                Ok(_) => continue,
+                Err(TrySendError::Full(u)) => {
+                    metrics::counter!(WORKER_BLOCKED).increment(1);
+                    channel
+                        .send(u)
+                        .await
+                        .expect("TODO - workers panic on send fail");
+                }
+                Err(TrySendError::Closed(_)) => {
+                    warn!("Channel closed, shutting down worker");
+                    return;
+                }
+            };
+        }
+    }
+}
+
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
     setup_tracing();
@@ -59,7 +107,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
 
     let kafka_config: ClientConfig = (&config.kafka).into();
 
-    let consumer: StreamConsumer = kafka_config.create()?;
+    let consumer: Arc<StreamConsumer> = Arc::new(kafka_config.create()?);
 
     let context = Arc::new(AppContext::new(&config).await?);
 
@@ -69,50 +117,77 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
 
     start_health_liveness_server(&config, context.clone());
 
-    let mut batch = Vec::with_capacity(config.max_batch_size);
+    let (tx, mut rx) = mpsc::channel(config.update_batch_size * 10);
+    let transaction_limit = Arc::new(Semaphore::new(config.max_concurrent_transactions));
+    let mut cache = LruCache::new(NonZeroUsize::new(config.cache_capacity).unwrap());
 
-    let mut sleep_count = 0;
-    loop {
-        context.worker_liveness.report_healthy().await;
+    for _ in 0..config.worker_loop_count {
+        tokio::spawn(spawn_producer_loop(consumer.clone(), tx.clone()));
+    }
 
-        while batch.len() < config.max_batch_size {
-            // Try to grab from the consumer, but use a select! to timeout if we'd block for more than some time
-            select! {
-                res = consumer.recv() => {
-                    batch.push(res?); // Workers die on an kafka error
+    loop {
+        let mut batch = Vec::with_capacity(config.update_batch_size);
+
+        let batch_start = tokio::time::Instant::now();
+        let batch_time = common_metrics::timing_guard(BATCH_ACQUIRE_TIME, &[]);
+        while batch.len() < config.update_batch_size {
+            context.worker_liveness.report_healthy().await;
+
+            let before_recv = batch.len();
+            let remaining_capacity = config.update_batch_size - batch.len();
+            // We race these two, so we can escape this loop and do a small batch if we've been waiting too long
+            let recv = rx.recv_many(&mut batch, remaining_capacity);
+            let sleep = tokio::time::sleep(Duration::from_secs(1));
+
+            tokio::select! {
+                got = recv => {
+                    if got == 0 {
+                        warn!("Coordinator recv failed, dying");
+                        return Ok(());
+                    }
+                    assert!(batch.len() == before_recv + got);
+
+                    // It's important that we only filter /newly seen/ elements, because
+                    // we immediately insert them into the cache, so a full-pass filter
+                    // on cache membership would empty the batch.
+                    retain_from(&mut batch, before_recv, |u| !cache.contains(u));
+                    batch[before_recv..].iter().for_each(|u| {
+                        cache.put(u.clone(), ());
+                    });
+
+                    let filtered = (before_recv + got) - batch.len();
+                    metrics::counter!(UPDATES_FILTERED_BY_CACHE).increment(filtered as u64);
+                    continue;
                 }
-                _ = sleep(Duration::from_millis(config.next_event_wait_timeout_ms)) => {
-                    break;
+                _ = sleep => {
+                    if batch_start.elapsed() > Duration::from_secs(config.max_issue_period) {
+                        warn!("Forcing small batch due to time limit");
+                        metrics::counter!(FORCED_SMALL_BATCH).increment(1);
+                        break;
+                    }
                 }
             }
         }
+        batch_time.fin();
 
-        // We only process batches over a certain threshold, unless we haven't received anything in a while, to reduce DB load
-        if batch.len() < config.min_batch_size {
-            sleep_count += 1;
-            info!("Batch size is less than min_batch_size, sleeping for 2 seconds");
-            metrics::counter!(BATCH_SKIPPED).increment(1);
-            sleep(Duration::from_millis(2000)).await;
-            if sleep_count > 10 {
-                warn!("Slept too many times, continuing with a small batch");
-                metrics::counter!(FORCED_SMALL_BATCH).increment(1);
-            } else {
-                metrics::counter!(SMALL_BATCH_SLEEP).increment(1);
-                continue;
-            }
-        }
-        sleep_count = 0;
-
-        metrics::counter!(EVENTS_RECEIVED).increment(batch.len() as u64);
-
-        let updates: HashSet<Update> = batch
-            .drain(..)
-            .filter_map(message_to_event)
-            .flat_map(Event::into_updates)
-            .filter_map(filter_cached)
-            .collect();
+        metrics::gauge!(TRANSACTION_LIMIT_SATURATION).set(
+            (config.max_concurrent_transactions - transaction_limit.available_permits()) as f64,
+        );
 
-        context.issue(updates).await?;
+        // We unconditionally wait to acquire a transaction permit - this is our backpressure mechanism. If we
+        // fail to acquire a permit for long enough, we will fail liveness checks (but that implies our ongoing
+        // transactions are halted, at which point DB health is a concern).
+        let permit_acquire_time = common_metrics::timing_guard(PERMIT_WAIT_TIME, &[]);
+        let permit = transaction_limit.clone().acquire_owned().await.unwrap();
+        permit_acquire_time.fin();
+
+        let context = context.clone();
+        tokio::spawn(async move {
+            let _permit = permit;
+            let issue_time = common_metrics::timing_guard(UPDATE_ISSUE_TIME, &[]);
+            context.issue(batch).await.unwrap();
+            issue_time.fin();
+        });
     }
 }
 
@@ -121,7 +196,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
 fn message_to_event(msg: BorrowedMessage) -> Option<Event> {
     let Some(payload) = msg.payload() else {
         warn!("Received empty event");
-        metrics::counter!("empty_event").increment(1);
+        metrics::counter!(EMPTY_EVENTS).increment(1);
         return None;
     };
 
@@ -129,7 +204,7 @@ fn message_to_event(msg: BorrowedMessage) -> Option<Event> {
     let event = match event {
         Ok(e) => e,
         Err(e) => {
-            metrics::counter!("event_parse_error").increment(1);
+            metrics::counter!(EVENT_PARSE_ERROR).increment(1);
             warn!("Failed to parse event: {:?}", e);
             return None;
         }
@@ -137,9 +212,13 @@ fn message_to_event(msg: BorrowedMessage) -> Option<Event> {
     Some(event)
 }
 
-// TODO: this is where caching would go, if we had any. Could probably use a bloom filter or something,
-// rather than storing the entire update in memory, if we wanted to store some HUGE number of updates and
-// be /really/ good about not hitting the DB when we don't need to. Right now this is just a no-op.
-fn filter_cached(update: Update) -> Option<Update> {
-    Some(update)
+pub fn retain_from<T>(buffer: &mut Vec<T>, from: usize, predicate: impl Fn(&T) -> bool) {
+    let mut i = from;
+    while i < buffer.len() {
+        if !predicate(&buffer[i]) {
+            buffer.swap_remove(i);
+        } else {
+            i += 1;
+        }
+    }
 }
diff --git a/rust/property-defs-rs/src/metrics_consts.rs b/rust/property-defs-rs/src/metrics_consts.rs
index 5cb4ba0091f61..4571d0be11814 100644
--- a/rust/property-defs-rs/src/metrics_consts.rs
+++ b/rust/property-defs-rs/src/metrics_consts.rs
@@ -1,6 +1,14 @@
 pub const UPDATES_ISSUED: &str = "prop_defs_issued_updates";
-pub const BATCH_SKIPPED: &str = "prop_defs_batch_skipped";
 pub const EVENTS_RECEIVED: &str = "prop_defs_events_received";
 pub const EVENTS_SKIPPED: &str = "prop_defs_events_skipped";
 pub const FORCED_SMALL_BATCH: &str = "prop_defs_forced_small_batch";
-pub const SMALL_BATCH_SLEEP: &str = "prop_defs_small_batch_sleep";
+pub const UPDATES_SEEN: &str = "prop_defs_seen_updates";
+pub const WORKER_BLOCKED: &str = "prop_defs_worker_blocked";
+pub const UPDATES_PER_EVENT: &str = "prop_defs_updates_per_event";
+pub const UPDATES_FILTERED_BY_CACHE: &str = "prop_defs_filtered_by_cache";
+pub const TRANSACTION_LIMIT_SATURATION: &str = "prop_defs_transaction_limit_saturation";
+pub const EMPTY_EVENTS: &str = "prop_defs_empty_events";
+pub const EVENT_PARSE_ERROR: &str = "prop_defs_event_parse_error";
+pub const BATCH_ACQUIRE_TIME: &str = "prop_defs_batch_acquire_time_ms";
+pub const PERMIT_WAIT_TIME: &str = "prop_defs_permit_wait_time_ms";
+pub const UPDATE_ISSUE_TIME: &str = "prop_defs_update_issue_time_ms";
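
For reference, a minimal, self-contained sketch of the de-duplication step the coordinator loop performs after `recv_many`: only the newly received tail of the batch is filtered against the LRU cache, and the survivors are then inserted into it. This is not part of the diff; the `String` key type and the capacity of 4 are illustrative stand-ins for `Update` and `cache_capacity`.

```rust
use std::num::NonZeroUsize;

use lru::LruCache;

// Mirrors the retain_from helper added in main.rs: keep only elements at or
// after `from` that satisfy `predicate`. swap_remove keeps this O(n) but does
// not preserve order, which is fine for a batch of independent updates.
pub fn retain_from<T>(buffer: &mut Vec<T>, from: usize, predicate: impl Fn(&T) -> bool) {
    let mut i = from;
    while i < buffer.len() {
        if !predicate(&buffer[i]) {
            buffer.swap_remove(i);
        } else {
            i += 1;
        }
    }
}

fn main() {
    // Cache of updates we have already issued (capacity is illustrative).
    let mut cache: LruCache<String, ()> = LruCache::new(NonZeroUsize::new(4).unwrap());
    cache.put("a".to_string(), ());

    // "x" was received on an earlier loop iteration; "a" and "b" just arrived.
    let mut batch = vec!["x".to_string(), "a".to_string(), "b".to_string()];
    let before_recv = 1;

    // Filter only the newly seen tail against the cache, then mark the
    // survivors as seen, as the coordinator loop does after recv_many.
    retain_from(&mut batch, before_recv, |u| !cache.contains(u));
    for u in &batch[before_recv..] {
        cache.put(u.clone(), ());
    }

    // "a" was dropped because it was already cached; "x" was left untouched.
    assert_eq!(batch, vec!["x".to_string(), "b".to_string()]);
    assert!(cache.contains(&"b".to_string()));
}
```

Filtering only the tail matters because a full-pass filter would discard elements that were inserted into the cache earlier in the same batch-building loop, emptying the batch before it is ever issued.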