fix(err): assorted perf/concurrency things (#26235)
oliverb123 authored Nov 18, 2024
1 parent ead289e commit afeb552
Showing 7 changed files with 141 additions and 34 deletions.
8 changes: 7 additions & 1 deletion rust/cymbal/src/app_context.rs
@@ -16,6 +16,7 @@ use crate::{
frames::resolver::Resolver,
symbol_store::{
caching::{Caching, SymbolSetCache},
concurrency,
saving::Saving,
sourcemap::SourcemapProvider,
Catalog, S3Client,
@@ -80,13 +81,18 @@ impl AppContext {
config.ss_prefix.clone(),
);
let caching_smp = Caching::new(saving_smp, ss_cache);
// We want to fetch each sourcemap from the outside world
// exactly once, and if it isn't in the cache, load/parse
// it from s3 exactly once too. Limiting the per symbol set
// reference concurrency to 1 ensures this.
let limited_smp = concurrency::AtMostOne::new(caching_smp);

info!(
"AppContext initialized, subscribed to topic {}",
config.consumer.kafka_consumer_topic
);

let catalog = Catalog::new(caching_smp);
let catalog = Catalog::new(limited_smp);
let resolver = Resolver::new(config);

Ok(Self {
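Note on the app_context change: the new AtMostOne layer is deliberately the outermost provider wrapper, so its per-ref lock covers the whole cache-check, fetch and parse path. A rough sketch of the layering as it stands after this commit (comment form only; constructor arguments are elided):

    // Layer order, innermost first - each line wraps the one before it:
    //
    //   SourcemapProvider   fetches and parses sourcemaps from the outside world
    //   -> Saving           persists fetched sets (and records fetch failures)
    //   -> Caching          in-memory cache, backed by the shared SymbolSetCache
    //   -> AtMostOne        new in this commit: one in-flight lookup per symbol set ref
    //   -> Catalog          entry point used by the frame resolver
    //
    // Because AtMostOne sits outside Caching, a second concurrent lookup for the
    // same ref waits for the first, then hits the cache instead of refetching.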
57 changes: 30 additions & 27 deletions rust/cymbal/src/lib.rs
@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::sync::Arc;

use app_context::AppContext;
use common_types::ClickHouseEvent;
@@ -20,7 +20,7 @@ pub mod symbol_store;
pub mod types;

pub async fn handle_event(
context: &AppContext,
context: Arc<AppContext>,
mut event: ClickHouseEvent,
) -> Result<Option<ClickHouseEvent>, UnhandledError> {
let mut props = match get_props(&event) {
@@ -45,7 +45,7 @@
// If we get an unhandled error during exception processing, we return an error, which should
// cause the caller to drop the offset without storing it - unhandled exceptions indicate
// a dependency is down, or some bug, and we want to take lag in those situations.
results.push(process_exception(context, event.team_id, exception).await?);
results.push(process_exception(context.clone(), event.team_id, exception).await?);
}

let fingerprint = generate_fingerprint(&results);
@@ -79,7 +79,7 @@ fn get_props(event: &ClickHouseEvent) -> Result<RawErrProps, EventError> {
}

async fn process_exception(
context: &AppContext,
context: Arc<AppContext>,
team_id: i32,
mut e: Exception,
) -> Result<Exception, UnhandledError> {
@@ -96,40 +96,43 @@
return Ok(e);
}

let mut results = Vec::with_capacity(frames.len());

// Cluster the frames by symbol set
// TODO - we really want to cluster across exceptions (and even across events),
// rather than just within a single exception
let mut groups = HashMap::new();
for (i, frame) in frames.into_iter().enumerate() {
let group = groups
.entry(frame.symbol_set_ref())
.or_insert_with(Vec::new);
group.push((i, frame));
}

for (_, frames) in groups.into_iter() {
for (i, frame) in frames {
let resolved_frame = context
let mut handles = Vec::with_capacity(frames.len());
let mut resolved_frames = Vec::with_capacity(frames.len());

for frame in frames.into_iter() {
let context = context.clone();
// Spawn a concurrent task for resolving every frame - we're careful elsewhere to
// ensure this kind of concurrency is fine, although this "throw it at the wall"
// data flow structure is pretty questionable. Once we switch to handling more than
// 1 event at a time, we should re-group frames into associated groups and then
// process those groups in-order (but the individual frames in them can still be
// thrown at the wall), with some cross-group concurrency.
handles.push(tokio::spawn(async move {
context
.resolver
.resolve(&frame, team_id, &context.pool, &context.catalog)
.await?;
results.push((i, resolved_frame));
}
.await
}));
}

results.sort_unstable_by_key(|(i, _)| *i);
// Collect the results
for handle in handles {
// JoinHandles wrap the returned type in a Result, because if the task panics,
// tokio catches it and returns an error. If any of our tasks panicked, we want
// to propagate that panic, so we unwrap the outer Result here.
let res = handle.await.unwrap()?;
resolved_frames.push(res)
}

e.stack = Some(Stacktrace::Resolved {
frames: results.into_iter().map(|(_, frame)| frame).collect(),
frames: resolved_frames,
});

Ok(e)
}

// This is stupidly expensive, since it round-trips the event through JSON, lol. We should change ClickhouseEvent to only do serde at the
// edges
// This is expensive, since it round-trips the event through JSON.
// We could maybe change ClickhouseEvent to only do serde at the edges
pub fn add_error_to_event(
event: &mut ClickHouseEvent,
e: impl ToString,
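The lib.rs change above replaces the in-order, one-at-a-time frame resolution with one task per frame. A minimal, self-contained sketch of that spawn/join shape (Context, resolve_one and the other names here are invented for the example, not the crate's API):

    use std::sync::Arc;

    // Stand-in context shared by all tasks via Arc, mirroring Arc<AppContext>.
    struct Context {
        prefix: String,
    }

    async fn resolve_one(frame: String, ctx: &Context) -> Result<String, String> {
        // Stand-in for the real resolver call.
        Ok(format!("{}{}", ctx.prefix, frame))
    }

    async fn resolve_all(frames: Vec<String>, ctx: Arc<Context>) -> Result<Vec<String>, String> {
        let mut handles = Vec::with_capacity(frames.len());
        for frame in frames {
            let ctx = ctx.clone();
            // One task per frame; cloning the Arc lets every task share the context.
            handles.push(tokio::spawn(async move { resolve_one(frame, &ctx).await }));
        }

        let mut resolved = Vec::with_capacity(handles.len());
        for handle in handles {
            // JoinHandle wraps the task's output in a Result: unwrap() re-raises a
            // panic from inside the task, while `?` propagates the task's own error.
            resolved.push(handle.await.unwrap()?);
        }
        // Joining in spawn order preserves frame order, which is why the old
        // (index, frame) bookkeeping and the final sort could be dropped.
        Ok(resolved)
    }

    #[tokio::main]
    async fn main() {
        let ctx = Arc::new(Context { prefix: "resolved:".to_string() });
        let frames = vec!["frame_a".to_string(), "frame_b".to_string()];
        println!("{:?}", resolve_all(frames, ctx).await);
    }

As the diff comment notes, this is a throw-it-at-the-wall fan-out with no per-symbol-set grouping; the serialization that makes it safe lives in the new AtMostOne layer further down.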
2 changes: 1 addition & 1 deletion rust/cymbal/src/main.rs
@@ -78,7 +78,7 @@ async fn main() {
};
metrics::counter!(EVENT_RECEIVED).increment(1);

let _processed_event = match handle_event(&context, event).await {
let _processed_event = match handle_event(context.clone(), event).await {
Ok(r) => {
offset.store().unwrap();
r
24 changes: 19 additions & 5 deletions rust/cymbal/src/symbol_store/caching.rs
@@ -12,6 +12,8 @@ use crate::{

use super::{saving::Saveable, Fetcher, Parser, Provider};

// This is a type-specific symbol provider layer, designed to
// wrap some inner provider and provide a type-safe caching layer
pub struct Caching<P> {
inner: P,
cache: Arc<Mutex<SymbolSetCache>>,
@@ -42,16 +44,30 @@
return Ok(set);
}
metrics::counter!(STORE_CACHE_MISSES).increment(1);
drop(cache);

// Do the fetch, not holding the lock across it to allow
// concurrent fetches to occur (de-duping fetches is
// up to the caller of `lookup`, since relying on the
// cache to do it means assuming the caching layer is
// the outer layer, which is not something the interface
// guarantees)
let found = self.inner.fetch(team_id, r).await?;
let bytes = found.byte_count();
let parsed = self.inner.parse(found).await?;

let mut cache = self.cache.lock().await; // Re-acquire the cache-wide lock to insert, dropping the ref_lock

let parsed = Arc::new(parsed);
cache.insert(cache_key, parsed.clone(), bytes);
Ok(parsed)
}
}

// This is a cache shared across multiple symbol set providers, through the `Caching` above,
// such that two totally different "layers" can share an underlying "pool" of cache space. This
// is injected into the `Caching` layer at construct time, to allow this sharing across multiple
// provider layer "stacks" within the catalog.
pub struct SymbolSetCache {
// We expect this cache to consist of few, but large, items.
// TODO - handle cases where two CachedSymbolSets have identical keys but different types
@@ -113,16 +129,14 @@ impl SymbolSetCache {
// remove them in a separate pass.
let mut to_remove = vec![];
while self.held_bytes > self.max_bytes && !vals.is_empty() {
// We can unwrap here because we know we're not empty from the line above
// We can unwrap here because we know we're not empty from the line above (and
// really, even the !empty check could be skipped - if held_bytes is non-zero, we
// must have at least one element in vals)
let (to_remove_key, to_remove_val) = vals.pop().unwrap();
self.held_bytes -= to_remove_val.bytes;
to_remove.push(to_remove_key.clone());
}

for key in to_remove {
self.cached.remove(&key);
}

metrics::gauge!(STORE_CACHED_BYTES).set(self.held_bytes as f64);
}
}
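The caching.rs comments above describe a specific lock discipline: check the cache while holding the lock, drop it for the slow fetch/parse, then re-acquire it only to insert. A standalone, library-style sketch of that discipline (invented names, with String standing in for a parsed symbol set; not the crate's Caching type):

    use std::{collections::HashMap, sync::Arc};
    use tokio::sync::Mutex;

    struct Cache {
        inner: Mutex<HashMap<String, Arc<String>>>,
    }

    impl Cache {
        async fn get_or_fetch(&self, key: &str) -> Arc<String> {
            let cache = self.inner.lock().await;
            if let Some(hit) = cache.get(key) {
                return hit.clone();
            }
            // Drop the lock so lookups for *other* keys can fetch concurrently.
            // De-duplicating concurrent fetches for the *same* key is the caller's
            // job (here, the AtMostOne layer), exactly as the comment says.
            drop(cache);

            // Stand-in for the slow path: inner.fetch(...) followed by inner.parse(...).
            let fetched = Arc::new(format!("symbol set for {key}"));

            // Re-acquire only for the insert, so the lock is never held across IO.
            let mut cache = self.inner.lock().await;
            cache.insert(key.to_string(), fetched.clone());
            fetched
        }
    }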
79 changes: 79 additions & 0 deletions rust/cymbal/src/symbol_store/concurrency.rs
@@ -0,0 +1,79 @@
use std::{
collections::HashMap,
sync::{Arc, Weak},
};

use axum::async_trait;
use tokio::sync::{Mutex, OwnedMutexGuard};

use crate::error::Error;

use super::Provider;

// Limits the number of concurrent lookups
// for a given symbol set to 1. Note this places
// no concurrency limit /across/ different symbol
// sets, and places no limit on the number of users
// using the returned symbol set concurrently. Designed
// to wrap the caching/saving layers, allowing us to
// ensure we only fetch any given symbol set from the
// outside world exactly once
pub struct AtMostOne<P> {
pub inner: P,
limiters: Mutex<HashMap<String, Weak<Mutex<()>>>>,
}

impl<P> AtMostOne<P> {
pub fn new(inner: P) -> Self {
Self {
inner,
limiters: Default::default(),
}
}

// This needs to be async even though all it does is take a lock because
// the returned owned guard can be (and is) held across an await point, so
// if this was a sync mutex it'd block the executor. It so happens that the
// std library Mutex doesn't provide lock_owned anyway, so we'd have to pull
// in a new dependency if we wanted to write a sync version of this, but
// that's secondary to it actually needing to be async
pub async fn acquire(&self, key: impl ToString) -> OwnedMutexGuard<()> {
let key = key.to_string();
let mut state = self.limiters.lock().await;
let limiter = state.entry(key).or_default();

if let Some(lock) = limiter.upgrade() {
// If there's already a mutex in our shared state for this particular
// source ref, drop the global lock, and wait for the underlying source
// ref to be freed up
drop(state);
lock.lock_owned().await
} else {
// If there's no mutex in our shared state for this particular source ref,
// create one, acquire it, put a Weak to it in the shared state, and return
// the owning mutex guard (and therefore the underlying Arc to the new mutex)
let new = Arc::new(Mutex::new(()));
*limiter = Arc::downgrade(&new);
let acquired = new.lock_owned().await;
drop(state);
acquired
}
}
}

#[async_trait]
impl<P> Provider for AtMostOne<P>
where
P: Provider,
P::Ref: ToString + Send,
{
type Ref = P::Ref;
type Set = P::Set;

async fn lookup(&self, team_id: i32, r: Self::Ref) -> Result<Arc<Self::Set>, Error> {
let lock = self.acquire(r.to_string()).await;
let result = self.inner.lookup(team_id, r).await;
drop(lock);
result
}
}
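To make the "exactly once" guarantee concrete, here is a self-contained behavioural sketch of the same keyed-lock idea (all names invented; the real AtMostOne wraps an inner Provider rather than owning a cache). Ten concurrent lookups for one ref serialize on a per-ref mutex, so the expensive fetch runs once, and the Weak in the map lets each per-ref mutex be reclaimed once nothing holds or waits on it:

    use std::{
        collections::HashMap,
        sync::{
            atomic::{AtomicUsize, Ordering},
            Arc, Weak,
        },
    };
    use tokio::sync::Mutex;

    #[derive(Default)]
    struct Store {
        locks: Mutex<HashMap<String, Weak<Mutex<()>>>>,
        cache: Mutex<HashMap<String, Arc<String>>>,
        fetches: AtomicUsize,
    }

    impl Store {
        async fn lookup(&self, key: &str) -> Arc<String> {
            // Get (or lazily create) this key's mutex. The map only holds a Weak,
            // so the mutex is freed once no lookup holds or waits on it.
            let key_lock = {
                let mut locks = self.locks.lock().await;
                let slot = locks.entry(key.to_string()).or_default();
                match slot.upgrade() {
                    Some(existing) => existing,
                    None => {
                        let fresh = Arc::new(Mutex::new(()));
                        *slot = Arc::downgrade(&fresh);
                        fresh
                    }
                }
            };
            let _guard = key_lock.lock().await; // serializes lookups for this key only

            if let Some(hit) = self.cache.lock().await.get(key) {
                return hit.clone();
            }
            self.fetches.fetch_add(1, Ordering::SeqCst); // the "outside world" call
            let fetched = Arc::new(format!("symbols for {key}"));
            self.cache.lock().await.insert(key.to_string(), fetched.clone());
            fetched
        }
    }

    #[tokio::main]
    async fn main() {
        let store = Arc::new(Store::default());
        let handles: Vec<_> = (0..10)
            .map(|_| {
                let store = store.clone();
                tokio::spawn(async move { store.lookup("chunk-abc.js.map").await })
            })
            .collect();
        for handle in handles {
            handle.await.unwrap();
        }
        // Ten concurrent lookups for the same ref, exactly one fetch.
        assert_eq!(store.fetches.load(Ordering::SeqCst), 1);
    }

Lookups for different refs never touch each other's mutex here, which matches the "no concurrency limit across different symbol sets" note in the module comment.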
1 change: 1 addition & 0 deletions rust/cymbal/src/symbol_store/mod.rs
@@ -8,6 +8,7 @@ use reqwest::Url;
use crate::error::Error;

pub mod caching;
pub mod concurrency;
pub mod saving;
pub mod sourcemap;

4 changes: 4 additions & 0 deletions rust/cymbal/src/symbol_store/saving.rs
@@ -147,6 +147,10 @@
error!("Found a record with no data and no error: {:?}", record);
panic!("Found a record with no data and no error");
}
// TODO - this can fail due to changes in how we serialise, or changes in
// the error type - and we should handle that by deleting the symbol record
// and re-fetching, I think (we don't need to clean up s3 since it's a failure
// case, there is no saved data).
let error = serde_json::from_str(&record.failure_reason.unwrap())
.map_err(UnhandledError::from)?;
return Err(Error::ResolutionError(error));
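A hedged sketch of the shape that TODO might take (purely illustrative: delete_stale_record and the fall-through to a fresh fetch are hypothetical, nothing like them exists in this diff):

    // Hypothetical: if the persisted failure_reason no longer deserialises (the
    // serialisation format or the error type changed), treat the record as stale
    // instead of failing the lookup.
    let reason = record.failure_reason.as_deref().unwrap_or_default();
    match serde_json::from_str(reason) {
        Ok(error) => return Err(Error::ResolutionError(error)),
        Err(_) => {
            // Hypothetical helper: delete the stale record; no S3 cleanup is needed
            // because failure records never have saved data.
            self.delete_stale_record(&record).await?;
            // ...then fall through to the normal fetch/parse path...
        }
    }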
