Skip to content

Commit

Permalink
first step - split fetching and parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
oliverb123 committed Oct 30, 2024
1 parent 13f48d7 commit abb52a1
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 80 deletions.
3 changes: 1 addition & 2 deletions rust/cymbal/src/langs/js.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@ impl RawJSFrame {
where
C: SymbolCatalog<Url, SourceMap>,
{
let store = catalog.get();
let url = self.source_url()?;

let sourcemap = store.fetch(team_id, url).await?;
let sourcemap = catalog.lookup(team_id, url).await?;
let Some(token) = sourcemap.lookup_token(self.line, self.column) else {
return Err(
JsResolveErr::TokenNotFound(self.fn_name.clone(), self.line, self.column).into(),
Expand Down
62 changes: 5 additions & 57 deletions rust/cymbal/src/symbol_store/caching.rs
Original file line number Diff line number Diff line change
@@ -1,78 +1,26 @@
use std::{any::Any, collections::HashMap, sync::Arc, time::Instant};

use axum::async_trait;
use crate::metric_consts::{STORE_CACHED_BYTES, STORE_CACHE_EVICTIONS};
use sourcemap::SourceMap;
use tokio::sync::Mutex;

use crate::{
error::Error,
metric_consts::{
STORE_CACHED_BYTES, STORE_CACHE_EVICTIONS, STORE_CACHE_HITS, STORE_CACHE_MISSES,
STORE_CACHE_SIZE,
},
};

use super::SymbolProvider;

pub struct CachingProvider<P> {
cache: Arc<Mutex<CacheInner>>,
provider: P,
}

impl<P> CachingProvider<P> {
pub fn new(max_bytes: usize, provider: P, shared: Arc<Mutex<CacheInner>>) -> Self {
metrics::gauge!(STORE_CACHE_SIZE).set(max_bytes as f64);
Self {
cache: shared,
provider,
}
}
}

#[async_trait]
impl<P> SymbolProvider for CachingProvider<P>
where
P: SymbolProvider,
P::Ref: ToString + Send,
P::Set: Cacheable,
{
type Ref = P::Ref;
type Set = P::Set;

async fn fetch(&self, team_id: i32, r: Self::Ref) -> Result<Arc<Self::Set>, Error> {
let key = r.to_string();
let mut cache = self.cache.lock().await;
if let Some(res) = cache.get::<Self::Set>(&key) {
metrics::counter!(STORE_CACHE_HITS).increment(1);
return Ok(res);
}
let res = self.provider.fetch(team_id, r).await?;
cache.insert(key, res.clone(), res.bytes());
metrics::counter!(STORE_CACHE_MISSES).increment(1);
Ok(res)
}
}

pub struct CacheInner {
pub struct SymbolSetCache {
// We expect this cache to consist of few, but large, items.
// TODO - handle cases where two CachedSymbolSets have identical keys but different types
cached: HashMap<String, CachedSymbolSet>,
held_bytes: usize,
max_bytes: usize,
}

impl CacheInner {
impl SymbolSetCache {
pub fn new(max_bytes: usize) -> Self {
Self {
cached: HashMap::new(),
held_bytes: 0,
max_bytes,
}
}
}

impl CacheInner {
fn insert<T>(&mut self, key: String, value: Arc<T>, bytes: usize)
pub fn insert<T>(&mut self, key: String, value: Arc<T>, bytes: usize)
where
T: Any + Send + Sync,
{
Expand All @@ -89,7 +37,7 @@ impl CacheInner {
self.evict();
}

fn get<T>(&mut self, key: &str) -> Option<Arc<T>>
pub fn get<T>(&mut self, key: &str) -> Option<Arc<T>>
where
T: Any + Send + Sync,
{
Expand Down
43 changes: 30 additions & 13 deletions rust/cymbal/src/symbol_store/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::sync::Arc;

use ::sourcemap::SourceMap;
use axum::async_trait;
use caching::{CacheInner, CachingProvider};
use caching::SymbolSetCache;

use ::sourcemap::SourceMap;
use reqwest::Url;
use sourcemap::SourcemapProvider;
use tokio::sync::Mutex;
Expand All @@ -12,34 +13,50 @@ use crate::error::Error;
pub mod caching;
pub mod sourcemap;

#[async_trait]
pub trait SymbolCatalog<Ref, Set>: Send + Sync + 'static {
fn get(&self) -> &dyn SymbolProvider<Ref = Ref, Set = Set>;
// TODO - this doesn't actually need to return an Arc, but it does for now, because I'd
// need to re-write the cache to let it return &'s instead, and the Arc overhead is not
// going to be per critical right now
async fn lookup(&self, team_id: i32, r: Ref) -> Result<Arc<Set>, Error>;
}

#[async_trait]
pub trait SymbolProvider: Send + Sync + 'static {
pub trait Fetcher: Send + Sync + 'static {
type Ref;
async fn fetch(&self, team_id: i32, r: Self::Ref) -> Result<Vec<u8>, Error>;
}

pub trait Parser: Send + Sync + 'static {
type Set;
// Symbol stores return an Arc, to allow them to cache (and evict) without any consent from callers
async fn fetch(&self, team_id: i32, r: Self::Ref) -> Result<Arc<Self::Set>, Error>;
fn parse(&self, data: Vec<u8>) -> Result<Self::Set, Error>;
}

pub struct Catalog {
pub sourcemap: CachingProvider<SourcemapProvider>,
pub cache: Arc<Mutex<CacheInner>>,
pub cache: Mutex<SymbolSetCache>,
pub sourcemap: SourcemapProvider,
}

impl Catalog {
pub fn new(max_bytes: usize, sourcemap: SourcemapProvider) -> Self {
let cache = Arc::new(Mutex::new(CacheInner::new(max_bytes)));
let sourcemap = CachingProvider::new(max_bytes, sourcemap, cache.clone());

let cache = Mutex::new(SymbolSetCache::new(max_bytes));
Self { sourcemap, cache }
}
}

#[async_trait]
impl SymbolCatalog<Url, SourceMap> for Catalog {
fn get(&self) -> &dyn SymbolProvider<Ref = Url, Set = SourceMap> {
&self.sourcemap
async fn lookup(&self, team_id: i32, r: Url) -> Result<Arc<SourceMap>, Error> {
let mut cache = self.cache.lock().await;
let cache_key = format!("{}:{}", team_id, r);
if let Some(set) = cache.get(&cache_key) {
return Ok(set);
}
let fetched = self.sourcemap.fetch(team_id, r).await?;
let bytes = fetched.len();
let parsed = self.sourcemap.parse(fetched)?;
let parsed = Arc::new(parsed);
cache.insert(cache_key, parsed.clone(), bytes);
Ok(parsed)
}
}
18 changes: 10 additions & 8 deletions rust/cymbal/src/symbol_store/sourcemap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
},
};

use super::SymbolProvider;
use super::{Fetcher, Parser};

pub struct SourcemapProvider {
pub client: reqwest::Client,
Expand All @@ -38,16 +38,18 @@ impl SourcemapProvider {
}

#[async_trait]
impl SymbolProvider for SourcemapProvider {
impl Fetcher for SourcemapProvider {
type Ref = Url;
type Set = SourceMap;
async fn fetch(&self, _: i32, r: Url) -> Result<Arc<SourceMap>, Error> {
async fn fetch(&self, _: i32, r: Url) -> Result<Vec<u8>, Error> {
let sourcemap_url = find_sourcemap_url(&self.client, r).await?;
let data = fetch_source_map(&self.client, sourcemap_url).await?;
Ok(fetch_source_map(&self.client, sourcemap_url).await?)
}
}

Ok(Arc::new(
SourceMap::from_reader(data.as_slice()).map_err(JsResolveErr::from)?,
))
impl Parser for SourcemapProvider {
type Set = SourceMap;
fn parse(&self, data: Vec<u8>) -> Result<Self::Set, Error> {
Ok(SourceMap::from_reader(data.as_slice()).map_err(JsResolveErr::from)?)
}
}

Expand Down

0 comments on commit abb52a1

Please sign in to comment.