Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use jemalloc everywhere we're rusty #24796

Merged
merged 2 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ members = [
"common/health",
"common/metrics",
"common/dns",
"common/alloc",
"feature-flags",
"hook-api",
"hook-common",
Expand Down
5 changes: 1 addition & 4 deletions rust/capture/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@ edition = "2021"
workspace = true

[dependencies]

[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = "0.6"

anyhow = { workspace = true }
async-trait = { workspace = true }
axum = { workspace = true }
Expand All @@ -21,6 +17,7 @@ envconfig = { workspace = true }
flate2 = { workspace = true }
governor = { workspace = true }
health = { path = "../common/health" }
common-alloc = { path = "../common/alloc" }
metrics = { workspace = true }
metrics-exporter-prometheus = { workspace = true }
opentelemetry = { workspace = true }
Expand Down
7 changes: 1 addition & 6 deletions rust/capture/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,7 @@ use tracing_subscriber::{EnvFilter, Layer};
use capture::config::Config;
use capture::server::serve;

#[cfg(not(target_env = "msvc"))]
use tikv_jemallocator::Jemalloc;

#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;
common_alloc::used!();

async fn shutdown() {
let mut term = signal::unix::signal(signal::unix::SignalKind::terminate())
Expand Down
11 changes: 11 additions & 0 deletions rust/common/alloc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
name = "common-alloc"
version = "0.1.0"
edition = "2021"

[lints]
workspace = true

[dependencies]
[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = "0.6"
12 changes: 12 additions & 0 deletions rust/common/alloc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# What is this?

We use jemalloc everywhere we can, for any binary that we expect to run in a long-lived process. The reason for this is that our workloads are:
- multi-threaded
- extremely prone to memory fragmentation (due to our heavy use of `serde_json`, or json generally)

jemalloc helps reduce memory fragmentation hugely, to the point of solving production OOMs that would have made use of capture-rs for replay a non-starter with the default system allocator.

At time of writing (2024-09-04), rust workspaces don't have good support for specifying dependencies on a per-target basis, so this crate does the work of pulling in jemalloc only when compiling for supported targets, and then exposes a simple macro to use jemalloc as the global allocator. Anyone writing a binary crate should put this macro at the top of their `main.rs`. Libraries should not make use of this crate.

## Future work
Functions could be added to this crate to, in situations where jemalloc is in use, report a set of metrics about the allocator, as well as other functionality (health/liveness, a way to specify hooks to execute when memory usage exceeds a certain threshold, etc). Right now, it's prety barebones.
12 changes: 12 additions & 0 deletions rust/common/alloc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#[cfg(target_env = "msvc")]
pub use std::alloc::System as DefaultAllocator;
#[cfg(not(target_env = "msvc"))]
pub use tikv_jemallocator::Jemalloc as DefaultAllocator;

#[macro_export]
macro_rules! used {
() => {
#[global_allocator]
static GLOBAL: $crate::DefaultAllocator = $crate::DefaultAllocator;
};
}
1 change: 1 addition & 0 deletions rust/cyclotron-fetch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ cyclotron-core = { path = "../cyclotron-core" }
common-metrics = { path = "../common/metrics" }
common-dns = { path = "../common/dns" }
common-kafka = { path = "../common/kafka" }
common-alloc = { path = "../common/alloc" }
health = { path = "../common/health" }
reqwest = { workspace = true }
serde = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions rust/cyclotron-fetch/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use health::HealthRegistry;
use std::{future::ready, sync::Arc};
use tracing::{error, info};

common_alloc::used!();

async fn listen(app: Router, bind: String) -> Result<(), std::io::Error> {
let listener = tokio::net::TcpListener::bind(bind).await?;

Expand Down
1 change: 1 addition & 0 deletions rust/cyclotron-janitor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ cyclotron-core = { path = "../cyclotron-core" }
common-metrics = { path = "../common/metrics" }
common-kafka = { path = "../common/kafka" }
health = { path = "../common/health" }
common-alloc = { path = "../common/alloc" }
time = { workspace = true }
rdkafka = { workspace = true }

Expand Down
3 changes: 1 addition & 2 deletions rust/cyclotron-janitor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ use health::{HealthHandle, HealthRegistry};
use std::{future::ready, time::Duration};
use tracing::{error, info};

/// Most of this stuff is stolen pretty shamelessly from the rustyhook janitor. It'll diverge more
/// once we introduce the management command stuff, but for now it's a good starting point.
common_alloc::used!();

async fn cleanup_loop(janitor: Janitor, livenes: HealthHandle, interval_secs: u64) -> Result<()> {
let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
Expand Down
6 changes: 2 additions & 4 deletions rust/cyclotron-janitor/tests/janitor.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
use chrono::{DateTime, Duration, Timelike, Utc};
use chrono::{Duration, Timelike, Utc};
use common_kafka::kafka_messages::app_metrics2::{
AppMetric2, Kind as AppMetric2Kind, Source as AppMetric2Source,
};
use cyclotron_core::{JobInit, JobState, QueueManager, Worker};
use cyclotron_janitor::{config::JanitorSettings, janitor::Janitor};
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::types::{RDKafkaApiKey, RDKafkaRespErr};
use rdkafka::{ClientConfig, Message};
use sqlx::PgPool;
use std::str::FromStr;
use uuid::Uuid;

use common_kafka::{test::create_mock_kafka, APP_METRICS2_TOPIC};
Expand Down Expand Up @@ -58,7 +56,7 @@ async fn janitor_test(db: PgPool) {
queue_name: queue_name.clone(),
priority: 0,
scheduled: now,
function_id: Some(uuid.clone()),
function_id: Some(uuid),
vm_state: None,
parameters: None,
blob: None,
Expand Down
1 change: 1 addition & 0 deletions rust/feature-flags/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ regex = "1.10.4"
maxminddb = "0.17"
sqlx = { workspace = true }
uuid = { workspace = true }
common-alloc = { path = "../common/alloc" }

[lints]
workspace = true
Expand Down
2 changes: 2 additions & 0 deletions rust/feature-flags/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use tracing_subscriber::{EnvFilter, Layer};
use feature_flags::config::Config;
use feature_flags::server::serve;

common_alloc::used!();

async fn shutdown() {
let mut term = signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to register SIGTERM handler");
Expand Down
1 change: 1 addition & 0 deletions rust/hook-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ tracing = { workspace = true }
tracing-subscriber = { workspace = true }
url = { workspace = true }
common-metrics = { path = "../common/metrics" }
common-alloc = { path = "../common/alloc" }
2 changes: 2 additions & 0 deletions rust/hook-api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use hook_common::pgqueue::PgQueue;
mod config;
mod handlers;

common_alloc::used!();

async fn listen(app: Router, bind: String) -> Result<()> {
let listener = tokio::net::TcpListener::bind(bind).await?;

Expand Down
1 change: 1 addition & 0 deletions rust/hook-janitor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ tracing = { workspace = true }
tracing-subscriber = { workspace = true }
common-metrics = { path = "../common/metrics" }
common-kafka = { path = "../common/kafka" }
common-alloc = { path = "../common/alloc" }
2 changes: 2 additions & 0 deletions rust/hook-janitor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ mod config;
mod handlers;
mod webhooks;

common_alloc::used!();

async fn listen(app: Router, bind: String) -> Result<()> {
let listener = tokio::net::TcpListener::bind(bind).await?;

Expand Down
1 change: 1 addition & 0 deletions rust/hook-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ url = { version = "2.2" }
common-metrics = { path = "../common/metrics" }
common-dns = { path = "../common/dns" }
common-kafka = { path = "../common/kafka" }
common-alloc = { path = "../common/alloc" }

[dev-dependencies]
httpmock = { workspace = true }
2 changes: 2 additions & 0 deletions rust/hook-worker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use hook_worker::config::Config;
use hook_worker::error::WorkerError;
use hook_worker::worker::WebhookWorker;

common_alloc::used!();

#[tokio::main]
async fn main() -> Result<(), WorkerError> {
tracing_subscriber::fmt::init();
Expand Down
1 change: 1 addition & 0 deletions rust/property-defs-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ metrics = { workspace = true }
chrono = { workspace = true }
quick_cache = { workspace = true }
common-metrics = { path = "../common/metrics" }
common-alloc = { path = "../common/alloc" }
ahash = { workspace = true }
uuid = { workspace = true }

Expand Down
2 changes: 2 additions & 0 deletions rust/property-defs-rs/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ use tokio::{
use tracing::{info, warn};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};

common_alloc::used!();

fn setup_tracing() {
let log_layer: tracing_subscriber::filter::Filtered<
tracing_subscriber::fmt::Layer<tracing_subscriber::Registry>,
Expand Down
Loading