Skip to content

Commit

Permalink
benchmarks for create_client (#1392)
Browse files Browse the repository at this point in the history
Benchmarks against dev_grpc:

![Screenshot 2024-12-13 at 11 38 00 AM](https://github.com/user-attachments/assets/af86c211-6c85-44db-bb07-fd7dcc474518)
![Screenshot 2024-12-13 at 11 37 52 AM](https://github.com/user-attachments/assets/eebd1957-90fa-47ce-a787-6f9597891c4f)


Flamegraph:
![tracing-flamegraph](https://github.com/user-attachments/assets/fd3a9c7c-df6e-4e3d-8c82-4e7c05b7bb03)



According to flamegraph, we're spending the most time in [`tonic::Endpoint::connect`](https://docs.rs/tonic/latest/tonic/transport/channel/struct.Endpoint.html#method.connect)

We could use [`connect_lazy`](https://docs.rs/tonic/latest/tonic/transport/channel/struct.Endpoint.html#method.connect_lazy) but that just offloads it to the next api call, which in this case is related to initializing the identity (get_inbox_ids).

In another PR I'd like to either:
- Separate gRPC channel connection from client creation. Allow SDKs to initialize the backend connection somewhere separate from client creation
- use `connect_lazy` but would require a refactor of client creation to somehow defer identity verification to the background. Seems like maybe a messy/big refactor path
  • Loading branch information
insipx authored Dec 16, 2024
1 parent fb3fdcd commit 97eca90
Show file tree
Hide file tree
Showing 23 changed files with 387 additions and 191 deletions.
1 change: 1 addition & 0 deletions .cargo/nextest.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
[profile.default]
retries = 3
default-filter = "not test(test_stream_all_messages_does_not_lose_messages)"
4 changes: 2 additions & 2 deletions .github/workflows/test-http-api.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,6 @@ jobs:
- name: Install nextest
uses: taiki-e/install-action@nextest
- name: build tests
run: cargo nextest run --no-run --tests --workspace --exclude xmtp_api_grpc --exclude xmtpv3 --exclude bindings_node --exclude bindings_wasm --features http-api
run: cargo nextest run --config-file ".cargo/nextest.toml" --no-run --tests --workspace --exclude xmtp_api_grpc --exclude xmtpv3 --exclude bindings_node --exclude bindings_wasm --features http-api
- name: cargo test
run: cargo nextest run --workspace --exclude xmtp_api_grpc --exclude xmtpv3 --exclude bindings_node --exclude bindings_wasm --features http-api --test-threads 2
run: cargo nextest run --config-file ".cargo/nextest.toml" --workspace --exclude xmtp_api_grpc --exclude xmtpv3 --exclude bindings_node --exclude bindings_wasm --features http-api --test-threads 2
10 changes: 6 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ wasm-bindgen-futures = "0.4"
wasm-bindgen-test = "0.3.49"
web-sys = "0.3"
zeroize = "1.8"
criterion = { version = "0.5", features = [
"html_reports",
"async_tokio",
]}
once_cell = "1.2"

# Internal Crate Dependencies
xmtp_api_grpc = { path = "xmtp_api_grpc" }
Expand All @@ -112,6 +117,9 @@ xmtp_proto = { path = "xmtp_proto" }
# and we don't rely on it for debugging that much.
debug = 0

[profile.bench]
debug = true

# Setting opt-level to 3 for proc macros/build scripts
# speeds up buildtime
[profile.dev.build-override]
Expand Down
14 changes: 14 additions & 0 deletions bindings_ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ xmtp_user_preferences = { path = "../xmtp_user_preferences" }
xmtp_v2 = { path = "../xmtp_v2" }
xmtp_common.workspace = true

# Bench
criterion = { workspace = true, optional = true }
fdlimit = { version = "0.3", optional = true}


[target.'cfg(target_os = "android")'.dependencies]
paranoid-android = "0.2"

Expand All @@ -47,3 +52,12 @@ uuid = { workspace = true, features = ["v4", "fast-rng"] }
xmtp_api_grpc = { path = "../xmtp_api_grpc", features = ["test-utils"] }
xmtp_mls = { path = "../xmtp_mls", features = ["test-utils"] }
xmtp_proto = { path = "../xmtp_proto", features = ["test-utils"] }

[features]
bench = ["xmtp_mls/bench", "xmtp_common/bench", "dep:criterion", "dep:fdlimit"]

[[bench]]
harness = false
name = "create_client"
required-features = ["bench"]

148 changes: 148 additions & 0 deletions bindings_ffi/benches/create_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
//! NOTE:
// `MAX_DB_POOL_SIZE` in `configuration.rs` must be set to `10`
// in order for these benchmarks to succesfully run & generate a report.
// (file descriptor issue)

use crate::tracing::Instrument;
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use tokio::runtime::{Builder, Runtime};
use xmtp_common::{bench::BENCH_ROOT_SPAN, tmp_path};
use xmtp_id::InboxOwner;
use xmtp_mls::utils::test::HISTORY_SYNC_URL;
use xmtpv3::generate_inbox_id;

#[macro_use]
extern crate tracing;

fn setup() -> Runtime {
Builder::new_multi_thread()
.enable_time()
.enable_io()
.thread_name("xmtp-bencher")
.build()
.unwrap()
}

fn network_url() -> (String, bool) {
let dev = std::env::var("DEV_GRPC");
let is_dev_network = matches!(dev, Ok(d) if d == "true" || d == "1");

if is_dev_network {
(xmtp_api_grpc::DEV_ADDRESS.to_string(), true)
} else {
(xmtp_api_grpc::LOCALHOST_ADDRESS.to_string(), false)
}
}

fn create_ffi_client(c: &mut Criterion) {
xmtp_common::bench::logger();

let runtime = setup();

let _ = fdlimit::raise_fd_limit();
let mut benchmark_group = c.benchmark_group("create_client");

// benchmark_group.sample_size(10);
benchmark_group.sampling_mode(criterion::SamplingMode::Flat);
benchmark_group.bench_function("create_ffi_client", |b| {
let span = trace_span!(BENCH_ROOT_SPAN);
b.to_async(&runtime).iter_batched(
|| {
let wallet = xmtp_cryptography::utils::generate_local_wallet();
let nonce = 1;
let inbox_id = generate_inbox_id(wallet.get_address(), nonce).unwrap();
let path = tmp_path();
let (network, is_secure) = network_url();
(
inbox_id,
wallet.get_address(),
nonce,
path,
network,
is_secure,
span.clone(),
)
},
|(inbox_id, address, nonce, path, network, is_secure, span)| async move {
xmtpv3::mls::create_client(
network,
is_secure,
Some(path),
Some(vec![0u8; 32]),
&inbox_id,
address,
nonce,
None,
Some(HISTORY_SYNC_URL.to_string()),
)
.instrument(span)
.await
.unwrap();
},
BatchSize::SmallInput,
)
});

benchmark_group.finish();
}

fn cached_create_ffi_client(c: &mut Criterion) {
xmtp_common::bench::logger();

let runtime = setup();

let _ = fdlimit::raise_fd_limit();
let mut benchmark_group = c.benchmark_group("create_client_from_cached");
let wallet = xmtp_cryptography::utils::generate_local_wallet();
let nonce = 1;
let inbox_id = generate_inbox_id(wallet.get_address(), nonce).unwrap();
let address = wallet.get_address();
let path = tmp_path();

// benchmark_group.sample_size(10);
benchmark_group.sampling_mode(criterion::SamplingMode::Flat);
benchmark_group.bench_function("cached_create_ffi_client", |b| {
let span = trace_span!(BENCH_ROOT_SPAN);
b.to_async(&runtime).iter_batched(
|| {
let (network, is_secure) = network_url();
(
inbox_id.clone(),
address.clone(),
nonce,
path.clone(),
HISTORY_SYNC_URL.to_string(),
network,
is_secure,
span.clone(),
)
},
|(inbox_id, address, nonce, path, history_sync, network, is_secure, span)| async move {
xmtpv3::mls::create_client(
network,
is_secure,
Some(path),
Some(vec![0u8; 32]),
&inbox_id,
address,
nonce,
None,
Some(history_sync),
)
.instrument(span)
.await
.unwrap();
},
BatchSize::SmallInput,
)
});

benchmark_group.finish();
}

criterion_group!(
name = create_client;
config = Criterion::default().sample_size(10);
targets = create_ffi_client, cached_create_ffi_client
);
criterion_main!(create_client);
2 changes: 1 addition & 1 deletion bindings_ffi/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,6 @@ static LOGGER_INIT: Once = Once::new();
pub fn init_logger() {
LOGGER_INIT.call_once(|| {
let native_layer = native_layer();
tracing_subscriber::registry().with(native_layer).init()
let _ = tracing_subscriber::registry().with(native_layer).try_init();
});
}
3 changes: 3 additions & 0 deletions common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ xmtp_cryptography.workspace = true

parking_lot = { workspace = true, optional = true }
tracing-subscriber = { workspace = true, features = ["fmt", "env-filter", "ansi", "json"], optional = true }
once_cell = { workspace = true, optional = true }
tracing-flame = { version = "0.2", optional = true }

[target.'cfg(target_arch = "wasm32")'.dependencies]
getrandom = { workspace = true, features = ["js"] }
Expand All @@ -36,3 +38,4 @@ tokio = { workspace = true, features = ["time", "macros", "rt-multi-thread", "sy

[features]
test-utils = ["dep:parking_lot", "dep:tracing-subscriber", "dep:tracing-wasm", "dep:console_error_panic_hook"]
bench = ["test-utils", "dep:tracing-subscriber", "dep:once_cell", "dep:tracing-flame"]
74 changes: 74 additions & 0 deletions common/src/bench.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use once_cell::sync::OnceCell;
use std::sync::Once;
use tracing::{Metadata, Subscriber};
use tracing_flame::{FlameLayer, FlushGuard};
use tracing_subscriber::{
layer::{Context, Filter, Layer, SubscriberExt},
registry::LookupSpan,
util::SubscriberInitExt,
EnvFilter,
};
static INIT: Once = Once::new();

static LOGGER: OnceCell<FlushGuard<std::io::BufWriter<std::fs::File>>> = OnceCell::new();

pub const BENCH_ROOT_SPAN: &str = "xmtp-trace-bench";

/// initializes logging for benchmarks
/// - FMT logging is enabled by passing the normal `RUST_LOG` environment variable options.
/// - Generate a flamegraph from tracing data by passing `XMTP_FLAMEGRAPH=trace`
pub fn logger() {
INIT.call_once(|| {
let (flame_layer, guard) = FlameLayer::with_file("./tracing.folded").unwrap();
let flame_layer = flame_layer
.with_threads_collapsed(true)
.with_module_path(true);
// .with_empty_samples(false);

tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer().with_filter(EnvFilter::from_default_env()))
.with(
flame_layer
// .with_filter(BenchFilter)
.with_filter(EnvFilter::from_env("XMTP_FLAMEGRAPH")),
)
.init();

LOGGER.set(guard).unwrap();
})
}

/// criterion `batch_iter` surrounds the closure in a `Runtime.block_on` despite being a sync
/// function, even in the async 'to_async` setup. Therefore we do this (only _slightly_) hacky
/// workaround to allow us to async setup some groups.
pub fn bench_async_setup<F, T, Fut>(fun: F) -> T
where
F: Fn() -> Fut,
Fut: futures::future::Future<Output = T>,
{
use tokio::runtime::Handle;
tokio::task::block_in_place(move || Handle::current().block_on(async move { fun().await }))
}

/// Filters for only spans where the root span name is "bench"
pub struct BenchFilter;

impl<S> Filter<S> for BenchFilter
where
S: Subscriber + for<'lookup> LookupSpan<'lookup> + std::fmt::Debug,
for<'lookup> <S as LookupSpan<'lookup>>::Data: std::fmt::Debug,
{
fn enabled(&self, meta: &Metadata<'_>, cx: &Context<'_, S>) -> bool {
if meta.name() == BENCH_ROOT_SPAN {
return true;
}
if let Some(id) = cx.current_span().id() {
if let Some(s) = cx.span_scope(id) {
if let Some(s) = s.from_root().take(1).collect::<Vec<_>>().first() {
return s.name() == BENCH_ROOT_SPAN;
}
}
}
false
}
}
3 changes: 3 additions & 0 deletions common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ mod test;
#[cfg(feature = "test-utils")]
pub use test::*;

#[cfg(feature = "bench")]
pub mod bench;

pub mod retry;
pub use retry::*;

Expand Down
Loading

0 comments on commit 97eca90

Please sign in to comment.