Skip to content

Commit

Permalink
benchmarks for create_client
Browse files Browse the repository at this point in the history
  • Loading branch information
insipx committed Dec 13, 2024
1 parent 7c7dbdb commit c3caaf7
Show file tree
Hide file tree
Showing 19 changed files with 300 additions and 116 deletions.
10 changes: 6 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ wasm-bindgen-futures = "0.4"
wasm-bindgen-test = "0.3.49"
web-sys = "0.3"
zeroize = "1.8"
criterion = { version = "0.5", features = [
"html_reports",
"async_tokio",
]}
once_cell = "1.2"

# Internal Crate Dependencies
xmtp_api_grpc = { path = "xmtp_api_grpc" }
Expand All @@ -112,6 +117,9 @@ xmtp_common = { path = "common" }
# and we don't rely on it for debugging that much.
debug = 0

[profile.bench]
debug = true

# Setting opt-level to 3 for proc macros/build scripts
# speeds up buildtime
[profile.dev.build-override]
Expand Down
14 changes: 14 additions & 0 deletions bindings_ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ xmtp_user_preferences = { path = "../xmtp_user_preferences" }
xmtp_v2 = { path = "../xmtp_v2" }
xmtp_common.workspace = true

# Bench
criterion = { workspace = true, optional = true }
fdlimit = { version = "0.3", optional = true}


[target.'cfg(target_os = "android")'.dependencies]
paranoid-android = "0.2"

Expand All @@ -47,3 +52,12 @@ uuid = { workspace = true, features = ["v4", "fast-rng"] }
xmtp_api_grpc = { path = "../xmtp_api_grpc", features = ["test-utils"] }
xmtp_mls = { path = "../xmtp_mls", features = ["test-utils"] }
xmtp_proto = { path = "../xmtp_proto", features = ["test-utils"] }

[features]
bench = ["xmtp_mls/bench", "xmtp_common/bench", "dep:criterion", "dep:fdlimit"]

[[bench]]
harness = false
name = "create_client"
required-features = ["bench"]

148 changes: 148 additions & 0 deletions bindings_ffi/benches/create_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
//! NOTE:
// `MAX_DB_POOL_SIZE` in `configuration.rs` must be set to `10`
// in order for these benchmarks to succesfully run & generate a report.
// (file descriptor issue)

use crate::tracing::Instrument;
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use tokio::runtime::{Builder, Runtime};
use xmtp_common::{bench::BENCH_ROOT_SPAN, tmp_path};
use xmtp_id::InboxOwner;
use xmtp_mls::utils::test::HISTORY_SYNC_URL;
use xmtpv3::generate_inbox_id;

#[macro_use]
extern crate tracing;

fn setup() -> Runtime {
Builder::new_multi_thread()
.enable_time()
.enable_io()
.thread_name("xmtp-bencher")
.build()
.unwrap()
}

fn network_url() -> (String, bool) {
let dev = std::env::var("DEV_GRPC");
let is_dev_network = matches!(dev, Ok(d) if d == "true" || d == "1");

if is_dev_network {
(xmtp_api_grpc::DEV_ADDRESS.to_string(), true)
} else {
(xmtp_api_grpc::LOCALHOST_ADDRESS.to_string(), false)
}
}

fn create_ffi_client(c: &mut Criterion) {
xmtp_common::bench::logger();

let runtime = setup();

let _ = fdlimit::raise_fd_limit();
let mut benchmark_group = c.benchmark_group("create_client");

// benchmark_group.sample_size(10);
benchmark_group.sampling_mode(criterion::SamplingMode::Flat);
benchmark_group.bench_function("create_ffi_client", |b| {
let span = trace_span!(BENCH_ROOT_SPAN);
b.to_async(&runtime).iter_batched(
|| {
let wallet = xmtp_cryptography::utils::generate_local_wallet();
let nonce = 1;
let inbox_id = generate_inbox_id(wallet.get_address(), nonce).unwrap();
let path = tmp_path();
let (network, is_secure) = network_url();
(
inbox_id,
wallet.get_address(),
nonce,
path,
network,
is_secure,
span.clone(),
)
},
|(inbox_id, address, nonce, path, network, is_secure, span)| async move {
xmtpv3::mls::create_client(
network,
is_secure,
Some(path),
Some(vec![0u8; 32]),
&inbox_id,
address,
nonce,
None,
Some(HISTORY_SYNC_URL.to_string()),
)
.instrument(span)
.await
.unwrap();
},
BatchSize::SmallInput,
)
});

benchmark_group.finish();
}

fn cached_create_ffi_client(c: &mut Criterion) {
xmtp_common::bench::logger();

let runtime = setup();

let _ = fdlimit::raise_fd_limit();
let mut benchmark_group = c.benchmark_group("create_client_from_cached");
let wallet = xmtp_cryptography::utils::generate_local_wallet();
let nonce = 1;
let inbox_id = generate_inbox_id(wallet.get_address(), nonce).unwrap();
let address = wallet.get_address();
let path = tmp_path();

// benchmark_group.sample_size(10);
benchmark_group.sampling_mode(criterion::SamplingMode::Flat);
benchmark_group.bench_function("cached_create_ffi_client", |b| {
let span = trace_span!(BENCH_ROOT_SPAN);
b.to_async(&runtime).iter_batched(
|| {
let (network, is_secure) = network_url();
(
inbox_id.clone(),
address.clone(),
nonce,
path.clone(),
HISTORY_SYNC_URL.to_string(),
network,
is_secure,
span.clone(),
)
},
|(inbox_id, address, nonce, path, history_sync, network, is_secure, span)| async move {
xmtpv3::mls::create_client(
network,
is_secure,
Some(path),
Some(vec![0u8; 32]),
&inbox_id,
address,
nonce,
None,
Some(history_sync),
)
.instrument(span)
.await
.unwrap();
},
BatchSize::SmallInput,
)
});

benchmark_group.finish();
}

criterion_group!(
name = create_client;
config = Criterion::default().sample_size(10);
targets = create_ffi_client, cached_create_ffi_client
);
criterion_main!(create_client);
2 changes: 1 addition & 1 deletion bindings_ffi/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,6 @@ static LOGGER_INIT: Once = Once::new();
pub fn init_logger() {
LOGGER_INIT.call_once(|| {
let native_layer = native_layer();
tracing_subscriber::registry().with(native_layer).init()
let _ = tracing_subscriber::registry().with(native_layer).try_init();
});
}
3 changes: 3 additions & 0 deletions common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ xmtp_cryptography.workspace = true

parking_lot = { workspace = true, optional = true }
tracing-subscriber = { workspace = true, features = ["fmt", "env-filter", "ansi", "json"], optional = true }
once_cell = { workspace = true, optional = true }
tracing-flame = { version = "0.2", optional = true }

[target.'cfg(target_arch = "wasm32")'.dependencies]
getrandom = { workspace = true, features = ["js"] }
Expand All @@ -36,3 +38,4 @@ tokio = { workspace = true, features = ["time", "macros", "rt-multi-thread", "sy

[features]
test-utils = ["dep:parking_lot", "dep:tracing-subscriber", "dep:tracing-wasm", "dep:console_error_panic_hook"]
bench = ["test-utils", "dep:tracing-subscriber", "dep:once_cell", "dep:tracing-flame"]
74 changes: 74 additions & 0 deletions common/src/bench.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use once_cell::sync::OnceCell;
use std::sync::Once;
use tracing::{Metadata, Subscriber};
use tracing_flame::{FlameLayer, FlushGuard};
use tracing_subscriber::{
layer::{Context, Filter, Layer, SubscriberExt},
registry::LookupSpan,
util::SubscriberInitExt,
EnvFilter,
};
static INIT: Once = Once::new();

static LOGGER: OnceCell<FlushGuard<std::io::BufWriter<std::fs::File>>> = OnceCell::new();

pub const BENCH_ROOT_SPAN: &str = "xmtp-trace-bench";

/// initializes logging for benchmarks
/// - FMT logging is enabled by passing the normal `RUST_LOG` environment variable options.
/// - Generate a flamegraph from tracing data by passing `XMTP_FLAMEGRAPH=trace`
pub fn logger() {
INIT.call_once(|| {
let (flame_layer, guard) = FlameLayer::with_file("./tracing.folded").unwrap();
let flame_layer = flame_layer
.with_threads_collapsed(true)
.with_module_path(true);
// .with_empty_samples(false);

tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer().with_filter(EnvFilter::from_default_env()))
.with(
flame_layer
// .with_filter(BenchFilter)
.with_filter(EnvFilter::from_env("XMTP_FLAMEGRAPH")),
)
.init();

LOGGER.set(guard).unwrap();
})
}

/// criterion `batch_iter` surrounds the closure in a `Runtime.block_on` despite being a sync
/// function, even in the async 'to_async` setup. Therefore we do this (only _slightly_) hacky
/// workaround to allow us to async setup some groups.
pub fn bench_async_setup<F, T, Fut>(fun: F) -> T
where
F: Fn() -> Fut,
Fut: futures::future::Future<Output = T>,
{
use tokio::runtime::Handle;
tokio::task::block_in_place(move || Handle::current().block_on(async move { fun().await }))
}

/// Filters for only spans where the root span name is "bench"
pub struct BenchFilter;

impl<S> Filter<S> for BenchFilter
where
S: Subscriber + for<'lookup> LookupSpan<'lookup> + std::fmt::Debug,
for<'lookup> <S as LookupSpan<'lookup>>::Data: std::fmt::Debug,
{
fn enabled(&self, meta: &Metadata<'_>, cx: &Context<'_, S>) -> bool {
if meta.name() == BENCH_ROOT_SPAN {
return true;
}
if let Some(id) = cx.current_span().id() {
if let Some(s) = cx.span_scope(id) {
if let Some(s) = s.from_root().take(1).collect::<Vec<_>>().first() {
return s.name() == BENCH_ROOT_SPAN;
}
}
}
false
}
}
3 changes: 3 additions & 0 deletions common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ mod test;
#[cfg(feature = "test-utils")]
pub use test::*;

#[cfg(feature = "bench")]
pub mod bench;

pub mod retry;
pub use retry::*;

Expand Down
5 changes: 5 additions & 0 deletions xmtp_api_grpc/src/grpc_api_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use futures::{SinkExt, Stream, StreamExt, TryStreamExt};
use tokio::sync::oneshot;
use tonic::transport::ClientTlsConfig;
use tonic::{metadata::MetadataValue, transport::Channel, Request, Streaming};
use tracing::Instrument;

use xmtp_proto::api_client::{ClientWithMetadata, XmtpMlsStreams};
use xmtp_proto::xmtp::mls::api::v1::{GroupMessage, WelcomeMessage};
Expand All @@ -28,7 +29,9 @@ use xmtp_proto::{
Error, ErrorKind,
};

#[tracing::instrument(level = "trace", skip_all)]
pub async fn create_tls_channel(address: String) -> Result<Channel, Error> {
let span = tracing::trace_span!("grpc_connect", address);
let channel = Channel::from_shared(address)
.map_err(|e| Error::new(ErrorKind::SetupCreateChannelError).with(e))?
// Purpose: This setting controls the size of the initial connection-level flow control window for HTTP/2, which is the underlying protocol for gRPC.
Expand Down Expand Up @@ -58,6 +61,7 @@ pub async fn create_tls_channel(address: String) -> Result<Channel, Error> {
.tls_config(ClientTlsConfig::new().with_enabled_roots())
.map_err(|e| Error::new(ErrorKind::SetupTLSConfigError).with(e))?
.connect()
.instrument(span)
.await
.map_err(|e| Error::new(ErrorKind::SetupConnectionError).with(e))?;

Expand All @@ -74,6 +78,7 @@ pub struct Client {
}

impl Client {
#[tracing::instrument(level = "trace", skip_all)]
pub async fn create(host: impl ToString, is_secure: bool) -> Result<Self, Error> {
let host = host.to_string();
let app_version = MetadataValue::try_from(&String::from("0.0.0"))
Expand Down
Loading

0 comments on commit c3caaf7

Please sign in to comment.