Commit

feat: propdefs perf + metrics (#24629)

oliverb123 authored Aug 28, 2024
1 parent f23b3af commit 04d32a6
Showing 7 changed files with 176 additions and 68 deletions.
11 changes: 11 additions & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion rust/Cargo.toml
@@ -87,4 +87,5 @@ tracing-opentelemetry = "0.23.0"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
url = { version = "2.5.0 " }
uuid = { version = "1.6.1", features = ["v7", "serde"] }
neon = "1"
neon = "1"
lru = "0.12.4"
2 changes: 2 additions & 0 deletions rust/property-defs-rs/Cargo.toml
@@ -20,6 +20,8 @@ axum = { workspace = true }
serve-metrics = { path = "../common/serve_metrics" }
metrics = { workspace = true }
chrono = { workspace = true }
lru = { workspace = true }
common-metrics = { path = "../common/metrics" }

[lints]
workspace = true
16 changes: 6 additions & 10 deletions rust/property-defs-rs/src/app_context.rs
@@ -1,35 +1,31 @@
use std::collections::HashSet;

use health::{HealthHandle, HealthRegistry};
use sqlx::{postgres::PgPoolOptions, PgPool};

use crate::{config::Config, metrics_consts::UPDATES_ISSUED, types::Update};

pub struct AppContext {
pub pool: PgPool,
//pub pool: PgPool,
pub liveness: HealthRegistry,
pub worker_liveness: HealthHandle,
}

impl AppContext {
pub async fn new(config: &Config) -> Result<Self, sqlx::Error> {
let options = PgPoolOptions::new().max_connections(config.max_pg_connections);

let pool = options.connect(&config.database_url).await?;
pub async fn new(_config: &Config) -> Result<Self, sqlx::Error> {
//let options = PgPoolOptions::new().max_connections(config.max_pg_connections);
//let pool = options.connect(&config.database_url).await?;

let liveness: HealthRegistry = HealthRegistry::new("liveness");
let worker_liveness = liveness
.register("worker".to_string(), time::Duration::seconds(60))
.await;

Ok(Self {
pool,
//pool,
liveness,
worker_liveness,
})
}

pub async fn issue(&self, updates: HashSet<Update>) -> Result<(), sqlx::Error> {
pub async fn issue(&self, updates: Vec<Update>) -> Result<(), sqlx::Error> {
metrics::counter!(UPDATES_ISSUED).increment(updates.len() as u64);
Ok(())
}
27 changes: 19 additions & 8 deletions rust/property-defs-rs/src/config.rs
@@ -12,19 +12,30 @@ pub struct Config {
#[envconfig(nested = true)]
pub kafka: KafkaConfig,

// Update sets are batched into at least min_batch_size (unless we haven't sent a batch in more than a few seconds)
#[envconfig(default = "10")]
pub max_concurrent_transactions: usize,

#[envconfig(default = "10000")]
pub max_batch_size: usize,

// If a worker receives a batch smaller than this, it will simply not commit the offset and
// sleep for a while, since DB ops/event scales inversely to batch size
// We issue writes (UPSERTS) to postgres in batches of this size.
// Total concurrent DB ops is max_concurrent_transactions * update_batch_size
#[envconfig(default = "1000")]
pub min_batch_size: usize,
pub update_batch_size: usize,

// We issue updates in batches of update_batch_size, or when we haven't
// received a new update in this many seconds
#[envconfig(default = "300")]
pub max_issue_period: u64,

// Propdefs spawns N workers to pull events from kafka,
// marshal, and convert to updates. The number of
// concurrent update batches sent to postgres is controlled
// by max_concurrent_transactions
#[envconfig(default = "10")]
pub worker_loop_count: usize,

#[envconfig(default = "100")]
pub next_event_wait_timeout_ms: u64,
// We maintain an internal cache, to avoid sending the same UPSERT multiple times. This is its size.
#[envconfig(default = "100000")]
pub cache_capacity: usize,

#[envconfig(from = "BIND_HOST", default = "::")]
pub host: String,
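Taken together these knobs bound write concurrency: at most max_concurrent_transactions batches of update_batch_size upserts can be in flight at once. A minimal sketch of loading an envconfig-derived struct like this and computing that bound (the standalone BatchTuning struct and main below are illustrative assumptions, not code from this commit):

use envconfig::Envconfig;

#[derive(Envconfig)]
pub struct BatchTuning {
    // Mirrors two of the fields above; the names reuse the diff's, the struct is hypothetical.
    #[envconfig(default = "10")]
    pub max_concurrent_transactions: usize,
    #[envconfig(default = "1000")]
    pub update_batch_size: usize,
}

fn main() {
    let cfg = BatchTuning::init_from_env().expect("failed to read env");
    // Upper bound on rows being upserted concurrently, as the comment in config.rs states.
    let max_in_flight = cfg.max_concurrent_transactions * cfg.update_batch_size;
    println!("at most {max_in_flight} updates in flight");
}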
173 changes: 126 additions & 47 deletions rust/property-defs-rs/src/main.rs
@@ -1,12 +1,17 @@
use std::{collections::HashSet, sync::Arc, time::Duration};
use std::{num::NonZeroUsize, sync::Arc, time::Duration};

use axum::{routing::get, Router};
use envconfig::Envconfig;
use futures::future::ready;
use lru::LruCache;
use property_defs_rs::{
app_context::AppContext,
config::Config,
metrics_consts::{BATCH_SKIPPED, EVENTS_RECEIVED, FORCED_SMALL_BATCH, SMALL_BATCH_SLEEP},
metrics_consts::{
BATCH_ACQUIRE_TIME, EMPTY_EVENTS, EVENTS_RECEIVED, EVENT_PARSE_ERROR, FORCED_SMALL_BATCH,
PERMIT_WAIT_TIME, TRANSACTION_LIMIT_SATURATION, UPDATES_FILTERED_BY_CACHE,
UPDATES_PER_EVENT, UPDATES_SEEN, UPDATE_ISSUE_TIME, WORKER_BLOCKED,
},
types::{Event, Update},
};
use rdkafka::{
@@ -15,7 +20,13 @@ use rdkafka::{
ClientConfig, Message,
};
use serve_metrics::{serve, setup_metrics_routes};
use tokio::{select, task::JoinHandle, time::sleep};
use tokio::{
sync::{
mpsc::{self, error::TrySendError},
Semaphore,
},
task::JoinHandle,
};
use tracing::{info, warn};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};

@@ -50,6 +61,43 @@ fn start_health_liveness_server(config: &Config, context: Arc<AppContext>) -> Jo
})
}

async fn spawn_producer_loop(consumer: Arc<StreamConsumer>, channel: mpsc::Sender<Update>) {
loop {
let message = consumer
.recv()
.await
.expect("TODO - workers panic on kafka recv fail");

let Some(event) = message_to_event(message) else {
continue;
};

let updates = event.into_updates();

metrics::counter!(EVENTS_RECEIVED).increment(1);
metrics::counter!(UPDATES_SEEN).increment(updates.len() as u64);
metrics::histogram!(UPDATES_PER_EVENT).record(updates.len() as f64);

for update in updates {
// We first try to non-blocking send, so we can get a metric on backpressure
match channel.try_send(update) {
Ok(_) => continue,
Err(TrySendError::Full(u)) => {
metrics::counter!(WORKER_BLOCKED).increment(1);
channel
.send(u)
.await
.expect("TODO - workers panic on send fail");
}
Err(TrySendError::Closed(_)) => {
warn!("Channel closed, shutting down worker");
return;
}
};
}
}
}
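The try_send-then-send pattern above is what turns channel backpressure into a metric: WORKER_BLOCKED is only incremented when the bounded channel is actually full, after which the worker falls back to an awaited send. A stripped-down sketch of the same pattern outside the kafka/metrics context (the capacity of 8 and the string payload are arbitrary placeholders):

use tokio::sync::mpsc::{self, error::TrySendError};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<String>(8);

    tokio::spawn(async move {
        for i in 0..32 {
            match tx.try_send(format!("update-{i}")) {
                Ok(_) => {}
                Err(TrySendError::Full(item)) => {
                    // Channel is full: this is where the real code bumps WORKER_BLOCKED,
                    // then blocks until the consumer frees up space.
                    if tx.send(item).await.is_err() {
                        return; // receiver dropped
                    }
                }
                Err(TrySendError::Closed(_)) => return,
            }
        }
    });

    while let Some(item) = rx.recv().await {
        println!("got {item}");
    }
}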

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
setup_tracing();
@@ -59,7 +107,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let kafka_config: ClientConfig = (&config.kafka).into();

let consumer: StreamConsumer = kafka_config.create()?;
let consumer: Arc<StreamConsumer> = Arc::new(kafka_config.create()?);

let context = Arc::new(AppContext::new(&config).await?);

@@ -69,50 +117,77 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

start_health_liveness_server(&config, context.clone());

let mut batch = Vec::with_capacity(config.max_batch_size);
let (tx, mut rx) = mpsc::channel(config.update_batch_size * 10);
let transaction_limit = Arc::new(Semaphore::new(config.max_concurrent_transactions));
let mut cache = LruCache::new(NonZeroUsize::new(config.cache_capacity).unwrap());

let mut sleep_count = 0;
loop {
context.worker_liveness.report_healthy().await;
for _ in 0..config.worker_loop_count {
tokio::spawn(spawn_producer_loop(consumer.clone(), tx.clone()));
}

while batch.len() < config.max_batch_size {
// Try to grab from the consumer, but use a select! to timeout if we'd block for more than some time
select! {
res = consumer.recv() => {
batch.push(res?); // Workers die on a kafka error
loop {
let mut batch = Vec::with_capacity(config.update_batch_size);

let batch_start = tokio::time::Instant::now();
let batch_time = common_metrics::timing_guard(BATCH_ACQUIRE_TIME, &[]);
while batch.len() < config.update_batch_size {
context.worker_liveness.report_healthy().await;

let before_recv = batch.len();
let remaining_capacity = config.update_batch_size - batch.len();
// We race these two, so we can escape this loop and do a small batch if we've been waiting too long
let recv = rx.recv_many(&mut batch, remaining_capacity);
let sleep = tokio::time::sleep(Duration::from_secs(1));

tokio::select! {
got = recv => {
if got == 0 {
warn!("Coordinator recv failed, dying");
return Ok(());
}
assert!(batch.len() == before_recv + got);

// It's important that we only filter /newly seen/ elements, because
// we immediately insert them into the cache, so a full-pass filter
// on cache membership would empty the batch.
retain_from(&mut batch, before_recv, |u| !cache.contains(u));
batch[before_recv..].iter().for_each(|u| {
cache.put(u.clone(), ());
});

let filtered = (before_recv + got) - batch.len();
metrics::counter!(UPDATES_FILTERED_BY_CACHE).increment(filtered as u64);
continue;
}
_ = sleep(Duration::from_millis(config.next_event_wait_timeout_ms)) => {
break;
_ = sleep => {
if batch_start.elapsed() > Duration::from_secs(config.max_issue_period) {
warn!("Forcing small batch due to time limit");
metrics::counter!(FORCED_SMALL_BATCH).increment(1);
break;
}
}
}
}
batch_time.fin();

// We only process batches over a certain threshold, unless we haven't received anything in a while, to reduce DB load
if batch.len() < config.min_batch_size {
sleep_count += 1;
info!("Batch size is less than min_batch_size, sleeping for 2 seconds");
metrics::counter!(BATCH_SKIPPED).increment(1);
sleep(Duration::from_millis(2000)).await;
if sleep_count > 10 {
warn!("Slept too many times, continuing with a small batch");
metrics::counter!(FORCED_SMALL_BATCH).increment(1);
} else {
metrics::counter!(SMALL_BATCH_SLEEP).increment(1);
continue;
}
}
sleep_count = 0;

metrics::counter!(EVENTS_RECEIVED).increment(batch.len() as u64);

let updates: HashSet<Update> = batch
.drain(..)
.filter_map(message_to_event)
.flat_map(Event::into_updates)
.filter_map(filter_cached)
.collect();
metrics::gauge!(TRANSACTION_LIMIT_SATURATION).set(
(config.max_concurrent_transactions - transaction_limit.available_permits()) as f64,
);

context.issue(updates).await?;
// We unconditionally wait to acquire a transaction permit - this is our backpressure mechanism. If we
// fail to acquire a permit for long enough, we will fail liveness checks (but that implies our ongoing
// transactions are halted, at which point DB health is a concern).
let permit_acquire_time = common_metrics::timing_guard(PERMIT_WAIT_TIME, &[]);
let permit = transaction_limit.clone().acquire_owned().await.unwrap();
permit_acquire_time.fin();

let context = context.clone();
tokio::spawn(async move {
let _permit = permit;
let issue_time = common_metrics::timing_guard(UPDATE_ISSUE_TIME, &[]);
context.issue(batch).await.unwrap();
issue_time.fin();
});
}
}
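The owned-permit handling above is the coordinator's backpressure mechanism: a permit is acquired before the write task is spawned and moved into it, so it is released only once the batch has been issued. A self-contained sketch of the same tokio Semaphore pattern (the limit of 3 and the sleep standing in for the DB write are placeholders):

use std::{sync::Arc, time::Duration};
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // At most 3 "transactions" run at once.
    let limit = Arc::new(Semaphore::new(3));

    let mut handles = Vec::new();
    for i in 0..10 {
        // Waiting here is the backpressure point; it stalls the spawning loop.
        let permit = limit.clone().acquire_owned().await.unwrap();
        handles.push(tokio::spawn(async move {
            let _permit = permit; // held for the task's whole lifetime
            tokio::time::sleep(Duration::from_millis(100)).await; // stand-in for the DB write
            println!("finished batch {i}");
        }));
    }
    for handle in handles {
        let _ = handle.await;
    }
}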

@@ -121,25 +196,29 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
fn message_to_event(msg: BorrowedMessage) -> Option<Event> {
let Some(payload) = msg.payload() else {
warn!("Received empty event");
metrics::counter!("empty_event").increment(1);
metrics::counter!(EMPTY_EVENTS).increment(1);
return None;
};

let event = serde_json::from_slice::<Event>(payload);
let event = match event {
Ok(e) => e,
Err(e) => {
metrics::counter!("event_parse_error").increment(1);
metrics::counter!(EVENT_PARSE_ERROR).increment(1);
warn!("Failed to parse event: {:?}", e);
return None;
}
};
Some(event)
}

// TODO: this is where caching would go, if we had any. Could probably use a bloom filter or something,
// rather than storing the entire update in memory, if we wanted to store some HUGE number of updates and
// be /really/ good about not hitting the DB when we don't need to. Right now this is just a no-op.
fn filter_cached(update: Update) -> Option<Update> {
Some(update)
pub fn retain_from<T>(buffer: &mut Vec<T>, from: usize, predicate: impl Fn(&T) -> bool) {
let mut i = from;
while i < buffer.len() {
if !predicate(&buffer[i]) {
buffer.swap_remove(i);
} else {
i += 1;
}
}
}
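retain_from exists so the cache filter only inspects the elements appended by the latest recv_many; swap_remove does not preserve order, which is acceptable here because a batch is an unordered collection of upserts. A small usage sketch pairing it with an LRU cache, using String keys in place of Update (the concrete values are made up for illustration):

use std::num::NonZeroUsize;
use lru::LruCache;

// Same helper as in the diff: drop elements failing `predicate`, starting at `from`,
// without preserving order.
fn retain_from<T>(buffer: &mut Vec<T>, from: usize, predicate: impl Fn(&T) -> bool) {
    let mut i = from;
    while i < buffer.len() {
        if !predicate(&buffer[i]) {
            buffer.swap_remove(i);
        } else {
            i += 1;
        }
    }
}

fn main() {
    let mut cache: LruCache<String, ()> = LruCache::new(NonZeroUsize::new(2).unwrap());
    cache.put("a".to_string(), ());

    // Pretend the first element was already in the batch and the rest just arrived.
    let mut batch = vec!["x".to_string(), "a".to_string(), "b".to_string()];
    let before_recv = 1;

    retain_from(&mut batch, before_recv, |u| !cache.contains(u));
    batch[before_recv..].iter().for_each(|u| {
        cache.put(u.clone(), ());
    });

    // "a" was filtered by the cache; "x" is untouched because it was already in the batch.
    assert_eq!(batch, vec!["x".to_string(), "b".to_string()]);
}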
12 changes: 10 additions & 2 deletions rust/property-defs-rs/src/metrics_consts.rs
@@ -1,6 +1,14 @@
pub const UPDATES_ISSUED: &str = "prop_defs_issued_updates";
pub const BATCH_SKIPPED: &str = "prop_defs_batch_skipped";
pub const EVENTS_RECEIVED: &str = "prop_defs_events_received";
pub const EVENTS_SKIPPED: &str = "prop_defs_events_skipped";
pub const FORCED_SMALL_BATCH: &str = "prop_defs_forced_small_batch";
pub const SMALL_BATCH_SLEEP: &str = "prop_defs_small_batch_sleep";
pub const UPDATES_SEEN: &str = "prop_defs_seen_updates";
pub const WORKER_BLOCKED: &str = "prop_defs_worker_blocked";
pub const UPDATES_PER_EVENT: &str = "prop_defs_updates_per_event";
pub const UPDATES_FILTERED_BY_CACHE: &str = "prop_defs_filtered_by_cache";
pub const TRANSACTION_LIMIT_SATURATION: &str = "prop_defs_transaction_limit_saturation";
pub const EMPTY_EVENTS: &str = "prop_defs_empty_events";
pub const EVENT_PARSE_ERROR: &str = "prop_defs_event_parse_error";
pub const BATCH_ACQUIRE_TIME: &str = "prop_defs_batch_acquire_time_ms";
pub const PERMIT_WAIT_TIME: &str = "prop_defs_permit_wait_time_ms";
pub const UPDATE_ISSUE_TIME: &str = "prop_defs_update_issue_time_ms";
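These constants are only metric names; they become meaningful once call sites record against them, as main.rs does above. A sketch of exercising a few of them with the metrics crate macros used in this diff (without a recorder installed the calls are no-ops; the record_batch helper is an illustrative assumption, not part of the commit):

// Names copied from metrics_consts.rs above.
pub const UPDATES_SEEN: &str = "prop_defs_seen_updates";
pub const UPDATES_PER_EVENT: &str = "prop_defs_updates_per_event";
pub const TRANSACTION_LIMIT_SATURATION: &str = "prop_defs_transaction_limit_saturation";

fn record_batch(updates_in_event: usize, permits_in_use: usize) {
    // Counter: monotonically increasing total of updates seen.
    metrics::counter!(UPDATES_SEEN).increment(updates_in_event as u64);
    // Histogram: per-event distribution of update counts.
    metrics::histogram!(UPDATES_PER_EVENT).record(updates_in_event as f64);
    // Gauge: point-in-time saturation of the transaction semaphore.
    metrics::gauge!(TRANSACTION_LIMIT_SATURATION).set(permits_in_use as f64);
}

fn main() {
    record_batch(3, 1);
}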
