Skip to content

Commit

Permalink
Add a method (and benchmark) to Context to speed up active span deter…
Browse files Browse the repository at this point in the history
…mination by 7x (open-telemetry#1140)
  • Loading branch information
shaun-cox authored Jul 6, 2023
1 parent 89bf91c commit 0bf18ce
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 31 deletions.
37 changes: 25 additions & 12 deletions opentelemetry-api/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use std::sync::Arc;

thread_local! {
static CURRENT_CONTEXT: RefCell<Context> = RefCell::new(Context::default());
static DEFAULT_CONTEXT: Context = Context::default();
}

/// An execution-scoped collection of values.
Expand Down Expand Up @@ -107,7 +106,19 @@ impl Context {
/// do_work()
/// ```
pub fn current() -> Self {
get_current(|cx| cx.clone())
Context::map_current(|cx| cx.clone())
}

/// Applys a function to the current context returning its value.
///
/// This can be used to build higher performing algebraic expressions for
/// optionally creating a new context without the overhead of cloning the
/// current one and dropping it.
///
/// Note: This function will panic if you attempt to attach another context
/// while the current one is still borrowed.
pub fn map_current<T>(f: impl FnOnce(&Context) -> T) -> T {
CURRENT_CONTEXT.with(|cx| f(&cx.borrow()))
}

/// Returns a clone of the current thread's context with the given value.
Expand Down Expand Up @@ -318,16 +329,6 @@ impl Drop for ContextGuard {
}
}

/// Executes a closure with a reference to this thread's current context.
///
/// Note: This function will panic if you attempt to attach another context
/// while the context is still borrowed.
fn get_current<F: FnMut(&Context) -> T, T>(mut f: F) -> T {
CURRENT_CONTEXT
.try_with(|cx| f(&cx.borrow()))
.unwrap_or_else(|_| DEFAULT_CONTEXT.with(|cx| f(cx)))
}

/// With TypeIds as keys, there's no need to hash them. They are already hashes
/// themselves, coming from the compiler. The IdHasher holds the u64 of
/// the TypeId, and then returns it, instead of doing any bit fiddling.
Expand Down Expand Up @@ -373,11 +374,23 @@ mod tests {
let current = Context::current();
assert_eq!(current.get(), Some(&ValueA("a")));
assert_eq!(current.get(), Some(&ValueB(42)));

assert!(Context::map_current(|cx| {
assert_eq!(cx.get(), Some(&ValueA("a")));
assert_eq!(cx.get(), Some(&ValueB(42)));
true
}));
}

// Resets to only value `a` when inner guard is dropped
let current = Context::current();
assert_eq!(current.get(), Some(&ValueA("a")));
assert_eq!(current.get::<ValueB>(), None);

assert!(Context::map_current(|cx| {
assert_eq!(cx.get(), Some(&ValueA("a")));
assert_eq!(cx.get::<ValueB>(), None);
true
}));
}
}
4 changes: 2 additions & 2 deletions opentelemetry-api/src/propagation/text_map_propagator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub trait TextMapPropagator: Debug {
/// [`Context`]: crate::Context
/// [`Injector`]: crate::propagation::Injector
fn inject(&self, injector: &mut dyn Injector) {
self.inject_context(&Context::current(), injector)
Context::map_current(|cx| self.inject_context(cx, injector))
}

/// Properly encodes the values of the [`Context`] and injects them into the
Expand All @@ -35,7 +35,7 @@ pub trait TextMapPropagator: Debug {
/// [`Context`]: crate::Context
/// [`Injector`]: crate::propagation::Extractor
fn extract(&self, extractor: &dyn Extractor) -> Context {
self.extract_with_context(&Context::current(), extractor)
Context::map_current(|cx| self.extract_with_context(cx, extractor))
}

/// Retrieves encoded data using the provided [`Extractor`]. If no data for this
Expand Down
6 changes: 3 additions & 3 deletions opentelemetry-api/src/trace/tracer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ pub trait Tracer {
where
T: Into<Cow<'static, str>>,
{
self.build_with_context(SpanBuilder::from_name(name), &Context::current())
Context::map_current(|cx| self.start_with_context(name, cx))
}

/// Starts a new [`Span`] with a given context.
Expand Down Expand Up @@ -169,7 +169,7 @@ pub trait Tracer {

/// Start a [`Span`] from a [`SpanBuilder`].
fn build(&self, builder: SpanBuilder) -> Self::Span {
self.build_with_context(builder, &Context::current())
Context::map_current(|cx| self.build_with_context(builder, cx))
}

/// Start a span from a [`SpanBuilder`] with a parent context.
Expand Down Expand Up @@ -382,7 +382,7 @@ impl SpanBuilder {

/// Builds a span with the given tracer from this configuration.
pub fn start<T: Tracer>(self, tracer: &T) -> T::Span {
tracer.build_with_context(self, &Context::current())
Context::map_current(|cx| tracer.build_with_context(self, cx))
}

/// Builds a span with the given tracer from this configuration and parent.
Expand Down
8 changes: 6 additions & 2 deletions opentelemetry-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ rustdoc-args = ["--cfg", "docsrs"]

[dev-dependencies]
indexmap = "1.8"
criterion = { version = "0.4.0", features = ["html_reports"] }
pprof = { version = "0.11.1", features = ["flamegraph", "criterion"] }
criterion = { version = "0.5", features = ["html_reports"] }
pprof = { version = "0.12", features = ["flamegraph", "criterion"] }

[features]
default = ["trace"]
Expand All @@ -51,6 +51,10 @@ rt-tokio = ["tokio", "tokio-stream"]
rt-tokio-current-thread = ["tokio", "tokio-stream"]
rt-async-std = ["async-std"]

[[bench]]
name = "context"
harness = false

[[bench]]
name = "key_value_map"
harness = false
Expand Down
94 changes: 94 additions & 0 deletions opentelemetry-sdk/benches/context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use std::fmt::Display;

use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};
use futures_util::future::BoxFuture;
use opentelemetry_api::{
trace::{TraceContextExt, Tracer, TracerProvider},
Context,
};
use opentelemetry_sdk::{
export::trace::{ExportResult, SpanData, SpanExporter},
trace as sdktrace,
};
use pprof::criterion::{Output, PProfProfiler};

fn criterion_benchmark(c: &mut Criterion) {
benchmark_group(c, BenchmarkParameter::NoActiveSpan);
benchmark_group(c, BenchmarkParameter::WithActiveSpan);
}

fn benchmark_group(c: &mut Criterion, p: BenchmarkParameter) {
let _guard = match p {
BenchmarkParameter::NoActiveSpan => None,
BenchmarkParameter::WithActiveSpan => {
let (provider, tracer) = tracer();
let guard = Context::current_with_span(tracer.start("span")).attach();
Some((guard, provider))
}
};

let mut group = c.benchmark_group("context");

group.bench_function(BenchmarkId::new("baseline current()", p), |b| {
b.iter(|| {
black_box(Context::current());
})
});

group.bench_function(BenchmarkId::new("current().has_active_span()", p), |b| {
b.iter(|| {
black_box(Context::current().has_active_span());
})
});

group.bench_function(
BenchmarkId::new("map_current(|cx| cx.has_active_span())", p),
|b| {
b.iter(|| {
black_box(Context::map_current(|cx| cx.has_active_span()));
})
},
);

group.finish();
}

#[derive(Copy, Clone)]
enum BenchmarkParameter {
NoActiveSpan,
WithActiveSpan,
}

impl Display for BenchmarkParameter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match *self {
BenchmarkParameter::NoActiveSpan => write!(f, "no-active-span"),
BenchmarkParameter::WithActiveSpan => write!(f, "with-active-span"),
}
}
}

fn tracer() -> (sdktrace::TracerProvider, sdktrace::Tracer) {
let provider = sdktrace::TracerProvider::builder()
.with_config(sdktrace::config().with_sampler(sdktrace::Sampler::AlwaysOn))
.with_simple_exporter(NoopExporter)
.build();
let tracer = provider.tracer(module_path!());
(provider, tracer)
}

#[derive(Debug)]
struct NoopExporter;

impl SpanExporter for NoopExporter {
fn export(&mut self, _spans: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
Box::pin(futures_util::future::ready(Ok(())))
}
}

criterion_group! {
name = benches;
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
targets = criterion_benchmark
}
criterion_main!(benches);
19 changes: 11 additions & 8 deletions opentelemetry-sdk/src/logs/log_emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
};
use opentelemetry_api::{
global::{self},
logs::{LogRecord, LogResult},
logs::{LogRecord, LogResult, TraceContext},
trace::TraceContextExt,
Context, InstrumentationLibrary,
};
Expand Down Expand Up @@ -206,16 +206,19 @@ impl opentelemetry_api::logs::Logger for Logger {
Some(provider) => provider,
None => return,
};

let trace_context = if self.include_trace_context {
Context::map_current(|cx| {
cx.has_active_span()
.then(|| TraceContext::from(cx.span().span_context()))
})
} else {
None
};
let config = provider.config();
for processor in provider.log_processors() {
let mut record = record.clone();
if self.include_trace_context {
let ctx = Context::current();
if ctx.has_active_span() {
let span = ctx.span();
record.trace_context = Some(span.span_context().into());
}
if let Some(ref trace_context) = trace_context {
record.trace_context = Some(trace_context.clone())
}
let data = LogData {
record,
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/propagation/trace_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ mod tests {
let mut injector: HashMap<String, String> = HashMap::new();
injector.set(TRACESTATE_HEADER, state.to_string());

propagator.inject_context(&Context::current(), &mut injector);
Context::map_current(|cx| propagator.inject_context(cx, &mut injector));

assert_eq!(Extractor::get(&injector, TRACESTATE_HEADER), Some(state))
}
Expand Down
7 changes: 4 additions & 3 deletions opentelemetry-sdk/src/trace/tracer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,15 +368,16 @@ mod tests {
let span = tracer.span_builder("must_not_be_sampled").start(&tracer);
assert!(!span.span_context().is_sampled());

let _attached = Context::current()
.with_remote_span_context(SpanContext::new(
let context = Context::map_current(|cx| {
cx.with_remote_span_context(SpanContext::new(
TraceId::from_u128(1),
SpanId::from_u64(1),
TraceFlags::default(),
true,
Default::default(),
))
.attach();
});
let _attached = context.attach();
let span = tracer.span_builder("must_not_be_sampled").start(&tracer);

assert!(!span.span_context().is_sampled());
Expand Down

0 comments on commit 0bf18ce

Please sign in to comment.