From 9948988515cde2df0d2ad33c6e1f80bbf41ddb57 Mon Sep 17 00:00:00 2001 From: Scott Prutton Date: Wed, 18 Dec 2024 15:50:34 -0500 Subject: [PATCH] fix: retrieve from memory when waiting for snapshots --- lib/si-layer-cache/src/db/rebase_batch.rs | 2 +- .../src/db/workspace_snapshot.rs | 2 +- lib/si-layer-cache/src/hybrid_cache.rs | 137 +++++++++++++----- lib/si-layer-cache/src/layer_cache.rs | 16 +- .../tests/integration_test/db/cas.rs | 17 ++- .../tests/integration_test/db/func_run.rs | 13 +- .../tests/integration_test/db/func_run_log.rs | 13 +- .../integration_test/db/workspace_snapshot.rs | 18 ++- .../tests/integration_test/layer_cache.rs | 2 +- 9 files changed, 158 insertions(+), 62 deletions(-) diff --git a/lib/si-layer-cache/src/db/rebase_batch.rs b/lib/si-layer-cache/src/db/rebase_batch.rs index 34333d15dc..2960c600d5 100644 --- a/lib/si-layer-cache/src/db/rebase_batch.rs +++ b/lib/si-layer-cache/src/db/rebase_batch.rs @@ -102,7 +102,7 @@ where let mut tried = 0; let read_wait = Instant::now(); while tried < MAX_TRIES { - if let Some(v) = self.cache.cache().get(&key).await { + if let Some(v) = self.cache.cache().get_from_memory(key.clone()).await { span.record("si.layer_cache.memory_cache.hit", true); span.record( "si.layer_cache.memory_cache.read_wait_ms", diff --git a/lib/si-layer-cache/src/db/workspace_snapshot.rs b/lib/si-layer-cache/src/db/workspace_snapshot.rs index 56743c6995..dc6c4f1764 100644 --- a/lib/si-layer-cache/src/db/workspace_snapshot.rs +++ b/lib/si-layer-cache/src/db/workspace_snapshot.rs @@ -102,7 +102,7 @@ where let mut tried = 0; let read_wait = Instant::now(); while tried < MAX_TRIES { - if let Some(v) = self.cache.cache().get(&key).await { + if let Some(v) = self.cache.cache().get_from_memory(key.clone()).await { span.record("si.layer_cache.memory_cache.hit", true); span.record( "si.layer_cache.memory_cache.read_wait_ms", diff --git a/lib/si-layer-cache/src/hybrid_cache.rs b/lib/si-layer-cache/src/hybrid_cache.rs index 06a895a855..ebfe3d5b7b 100644 --- a/lib/si-layer-cache/src/hybrid_cache.rs +++ b/lib/si-layer-cache/src/hybrid_cache.rs @@ -1,16 +1,22 @@ use foyer::opentelemetry_0_26::OpenTelemetryMetricsRegistry; -use foyer::{Cache as MemCache, CacheBuilder}; +use foyer::{ + DirectFsDeviceOptions, Engine, FifoPicker, HybridCache, HybridCacheBuilder, LargeEngineOptions, + RateLimitPicker, RecoverMode, TokioRuntimeOptions, +}; +use std::cmp::max; use std::path::{Path, PathBuf}; use std::sync::{Arc, LazyLock}; use telemetry::opentelemetry::global; use telemetry::tracing::{error, info}; +use tokio::fs; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use crate::db::serialize; use crate::error::LayerDbResult; +use crate::LayerDbError; -const _FOYER_DISK_CACHE_MINUMUM: u64 = 1024 * 1024 * 1024; // 1gb +const FOYER_DISK_CACHE_MINUMUM: u64 = 1024 * 1024 * 1024; // 1gb const DEFAULT_MEMORY_RESERVED_PERCENT: u8 = 40; const DEFAULT_MEMORY_USABLE_MAX_PERCENT: u8 = 100; const DEFAULT_DISK_RESERVED_PERCENT: u8 = 5; @@ -41,7 +47,7 @@ pub struct Cache where V: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, { - cache: MemCache, MaybeDeserialized>, + cache: HybridCache, MaybeDeserialized>, } impl Cache @@ -64,8 +70,28 @@ where computed_memory_cache_capacity_bytes.try_into()? }; + fs::create_dir_all(config.disk_path.as_path()).await?; + // Compute total disk which is in use for `disk_path` + let total_disk_bytes = fs4::total_space(config.disk_path.as_path())?; + + let disk_cache_capacity_bytes = { + // Subtract reserved disk percentage to determine total usable cache disk + let total_usable_disk_bytes = (total_disk_bytes as f64 + * (1.0 - (config.disk_reserved_percent as f64 / 100.0))) + .floor() as u64; + // Compute final usable disk as a percentage of the maximum usable disk + let computed_disk_cache_capacity_bytes = (total_usable_disk_bytes as f64 + * (config.disk_usable_max_percent as f64 / 100.0)) + .floor() as u64; + + // Ensure that the computed value is at least as big as the Foyer minimum + max(computed_disk_cache_capacity_bytes, FOYER_DISK_CACHE_MINUMUM).try_into()? + }; + info!( cache.name = &config.name, + cache.disk.total_bytes = total_disk_bytes, + cache.disk.size_bytes = disk_cache_capacity_bytes, cache.disk.reserved_percent = config.disk_reserved_percent, cache.disk.usable_max_percent = config.disk_usable_max_percent, cache.disk.rate_limit = config.disk_admission_rate_limit, @@ -78,47 +104,77 @@ where let cache_name: &'static str = config.name.leak(); - let cache: MemCache, MaybeDeserialized> = - CacheBuilder::new(memory_cache_capacity_bytes) - .with_name(cache_name) - .with_metrics_registry(OpenTelemetryMetricsRegistry::new(global::meter(cache_name))) - .with_weighter( - |_key: &Arc, value: &MaybeDeserialized| match value { - MaybeDeserialized::RawBytes(bytes) => bytes.len(), - MaybeDeserialized::DeserializedValue { size_hint, .. } => *size_hint, - }, - ) - .build(); + let cache: HybridCache, MaybeDeserialized> = HybridCacheBuilder::new() + .with_name(cache_name) + .with_metrics_registry(OpenTelemetryMetricsRegistry::new(global::meter(cache_name))) + .memory(memory_cache_capacity_bytes) + .with_weighter( + |_key: &Arc, value: &MaybeDeserialized| match value { + MaybeDeserialized::RawBytes(bytes) => bytes.len(), + MaybeDeserialized::DeserializedValue { size_hint, .. } => *size_hint, + }, + ) + .storage(Engine::Large) + .with_runtime_options(foyer::RuntimeOptions::Unified(TokioRuntimeOptions { + max_blocking_threads: 0, + worker_threads: 0, + })) + .with_admission_picker(Arc::new(RateLimitPicker::new( + config.disk_admission_rate_limit, + ))) + .with_device_options( + DirectFsDeviceOptions::new(config.disk_path) + .with_capacity(disk_cache_capacity_bytes), + ) + .with_large_object_disk_cache_options( + LargeEngineOptions::new() + .with_buffer_pool_size(config.disk_buffer_size) + .with_eviction_pickers(vec![Box::::default()]) + .with_flushers(config.disk_buffer_flushers) + .with_recover_concurrency(config.disk_recover_concurrency) + .with_indexer_shards(config.disk_indexer_shards) + .with_reclaimers(config.disk_reclaimers), + ) + .with_recover_mode(RecoverMode::Quiet) + .build() + .await + .map_err(|e| LayerDbError::Foyer(e.into()))?; Ok(Self { cache }) } - pub async fn get(&self, key: &str) -> Option { - match self.cache.get(key) { - Some(entry) => match entry.value() { - MaybeDeserialized::DeserializedValue { value, .. } => Some(value.clone()), - MaybeDeserialized::RawBytes(bytes) => { - // If we fail to deserialize the raw bytes for some reason, pretend that we never - // had the key in the first place, and also remove it from the cache. - match serialize::from_bytes_async::(bytes).await { - Ok(deserialized) => { - self.insert(key.into(), deserialized.clone(), bytes.len()); - Some(deserialized) - } - Err(e) => { - error!( - "Failed to deserialize stored bytes from memory cache for key ({:?}): {}", - key, - e - ); - self.remove(key); - None - } + pub async fn get(&self, key: Arc) -> Option { + if let Ok(Some(entry)) = self.cache.obtain(key.clone()).await { + return self.maybe_deserialize(key, entry.value().clone()).await; + } + None + } + + pub async fn get_from_memory(&self, key: Arc) -> Option { + if let Some(entry) = self.cache.memory().get(&key) { + return self.maybe_deserialize(key, entry.value().clone()).await; + } + None + } + + async fn maybe_deserialize(&self, key: Arc, entry: MaybeDeserialized) -> Option { + match entry { + MaybeDeserialized::DeserializedValue { value, .. } => Some(value.clone()), + MaybeDeserialized::RawBytes(bytes) => { + // If we fail to deserialize the raw bytes for some reason, pretend that we never + // had the key in the first place, and also remove it from the cache. + match serialize::from_bytes_async::(&bytes).await { + Ok(deserialized) => { + self.insert(key, deserialized.clone(), bytes.len()); + Some(deserialized) + } + Err(e) => { + error!("Failed to deserialize stored bytes from memory cache for key ({:?}): {}", key, e); + self.remove(&key); + None } } - }, - - _ => None, + } } } @@ -143,7 +199,10 @@ where } pub async fn close(&self) -> LayerDbResult<()> { - self.cache.clear(); + self.cache + .close() + .await + .map_err(|e| LayerDbError::Foyer(e.into()))?; Ok(()) } } diff --git a/lib/si-layer-cache/src/layer_cache.rs b/lib/si-layer-cache/src/layer_cache.rs index 83ad7182ec..361e1e3f29 100644 --- a/lib/si-layer-cache/src/layer_cache.rs +++ b/lib/si-layer-cache/src/layer_cache.rs @@ -65,7 +65,7 @@ where } pub async fn get(&self, key: Arc) -> LayerDbResult> { - Ok(match self.cache.get(&key).await { + Ok(match self.cache.get(key.clone()).await { Some(memory_value) => Some(memory_value), None => match self.pg.get(&key).await? { @@ -82,6 +82,18 @@ where }) } + #[instrument( + name = "layer_cache.get_from_memory", + level = "debug", + skip_all, + fields( + si.layer_cache.key = key.as_ref(), + ), + )] + pub async fn get_from_memory(&self, key: Arc) -> LayerDbResult> { + Ok(self.cache().get_from_memory(key).await) + } + #[instrument( name = "layer_cache.get_bytes_from_durable_storage", level = "debug", @@ -107,7 +119,7 @@ where for key in keys { let key_str: Arc = key.to_string().into(); - if let Some(found) = match self.cache.get(&key_str).await { + if let Some(found) = match self.cache.get(key_str.clone()).await { Some(value) => Some(value), None => { not_found.push(key_str.clone()); diff --git a/lib/si-layer-cache/tests/integration_test/db/cas.rs b/lib/si-layer-cache/tests/integration_test/db/cas.rs index 715b62ada6..51762b6594 100644 --- a/lib/si-layer-cache/tests/integration_test/db/cas.rs +++ b/lib/si-layer-cache/tests/integration_test/db/cas.rs @@ -43,7 +43,7 @@ async fn write_to_db() { let cas_pk_str: Arc = cas_pk.to_string().into(); // Are we in memory? - let in_memory = ldb.cas().cache.cache().get(&cas_pk_str).await; + let in_memory = ldb.cas().cache.cache().get(cas_pk_str.clone()).await; assert_eq!(Some(cas_value.clone()), in_memory); // Are we in pg? @@ -145,7 +145,7 @@ async fn cold_read_from_db() { // Delete from cache ldb.cas().cache.cache().remove(&cas_pk_str); - let not_in_cache = ldb.cas().cache.cache().get(&cas_pk_str).await; + let not_in_cache = ldb.cas().cache.cache().get(cas_pk_str.clone()).await; assert_eq!(not_in_cache, None); // Read the data from the cache @@ -159,7 +159,7 @@ async fn cold_read_from_db() { assert_eq!(&cas_value, &data); // Are we in cache after the read? - let in_cache = ldb.cas().cache.cache().get(&cas_pk_str).await; + let in_cache = ldb.cas().cache.cache().get(cas_pk_str.clone()).await; assert_eq!(Some(cas_value.clone()), in_cache); // Are we in pg? @@ -233,7 +233,7 @@ async fn writes_are_gossiped() { let mut memory_check_count = 0; while memory_check_count <= max_check_count { - let in_memory = ldb_axl.cas().cache.cache().get(&cas_pk_str).await; + let in_memory = ldb_axl.cas().cache.cache().get(cas_pk_str.clone()).await; match in_memory { Some(value) => { assert_eq!(cas_value.clone(), value); @@ -311,7 +311,7 @@ async fn stress_test() { let cas_value = Arc::new(CasValue::String(big_string.to_string())); let (postcard_value, _) = serialize::to_vec(&cas_value).expect("cannot deserialize big ass string"); - let cas_pk_string = ContentHash::new(&postcard_value).to_string(); + let cas_pk_string: Arc = ContentHash::new(&postcard_value).to_string().into(); let ldb_slash_task = ldb_slash.clone(); let _write_big_string = big_string.clone(); let write_cas_value = cas_value.clone(); @@ -333,7 +333,12 @@ async fn stress_test() { let max_check_count = 10_000; let mut memory_check_count = 0; while memory_check_count < max_check_count { - let in_memory = ldb_axl_task.cas().cache.cache().get(&cas_pk_string).await; + let in_memory = ldb_axl_task + .cas() + .cache + .cache() + .get(cas_pk_string.clone()) + .await; match in_memory { Some(value) => { let cas_value: Arc = diff --git a/lib/si-layer-cache/tests/integration_test/db/func_run.rs b/lib/si-layer-cache/tests/integration_test/db/func_run.rs index d9680176ff..162ee12c3a 100644 --- a/lib/si-layer-cache/tests/integration_test/db/func_run.rs +++ b/lib/si-layer-cache/tests/integration_test/db/func_run.rs @@ -46,7 +46,7 @@ async fn write_to_db() { .expect("failed to write to layerdb"); // Are we in memory? - let in_memory = ldb.func_run().cache.cache().get(&key_str).await; + let in_memory = ldb.func_run().cache.cache().get(key_str.clone()).await; assert_eq!(value.id(), in_memory.expect("func run not in memory").id()); // Are we in pg? @@ -104,7 +104,7 @@ async fn update() { .expect("failed to write to layerdb"); // Are we in memory? - let in_memory = ldb.func_run().cache.cache().get(&key_str).await; + let in_memory = ldb.func_run().cache.cache().get(key_str.clone()).await; assert_eq!(value.id(), in_memory.expect("func run not in memory").id()); // Are we in pg? @@ -132,7 +132,7 @@ async fn update() { .expect("failed to write to layerdb"); // Are we in memory? - let in_memory = ldb.func_run().cache.cache().get(&key_str).await; + let in_memory = ldb.func_run().cache.cache().get(key_str.clone()).await; assert_eq!( update_func_run.state(), in_memory.expect("func run not in memory").state(), @@ -155,7 +155,12 @@ async fn update() { let max_check_count = 10; let mut memory_check_count = 0; while memory_check_count <= max_check_count { - let in_memory = ldb_remote.func_run().cache.cache().get(&key_str).await; + let in_memory = ldb_remote + .func_run() + .cache + .cache() + .get(key_str.clone()) + .await; match in_memory { Some(value) => { assert_eq!(update_func_run.state(), value.state()); diff --git a/lib/si-layer-cache/tests/integration_test/db/func_run_log.rs b/lib/si-layer-cache/tests/integration_test/db/func_run_log.rs index 3049415707..7871508625 100644 --- a/lib/si-layer-cache/tests/integration_test/db/func_run_log.rs +++ b/lib/si-layer-cache/tests/integration_test/db/func_run_log.rs @@ -41,7 +41,7 @@ async fn write_to_db() { .expect("failed to write to layerdb"); // Are we in memory? - let in_memory = ldb.func_run_log().cache.cache().get(&key_str).await; + let in_memory = ldb.func_run_log().cache.cache().get(key_str.clone()).await; assert_eq!( value.id(), in_memory.expect("func run log not in memory").id() @@ -101,7 +101,7 @@ async fn update() { .expect("failed to write to layerdb"); // Are we in memory? - let in_memory = ldb.func_run_log().cache.cache().get(&key_str).await; + let in_memory = ldb.func_run_log().cache.cache().get(key_str.clone()).await; assert_eq!( value.id(), in_memory.expect("func run log not in memory").id() @@ -140,7 +140,7 @@ async fn update() { .expect("failed to write to layerdb"); // Are we in memory? - let in_memory = ldb.func_run_log().cache.cache().get(&key_str).await; + let in_memory = ldb.func_run_log().cache.cache().get(key_str.clone()).await; assert_eq!( update_func_run_log.logs(), in_memory.expect("func run log not in memory").logs(), @@ -163,7 +163,12 @@ async fn update() { let max_check_count = 10; let mut memory_check_count = 0; while memory_check_count <= max_check_count { - let in_memory = ldb_remote.func_run_log().cache.cache().get(&key_str).await; + let in_memory = ldb_remote + .func_run_log() + .cache + .cache() + .get(key_str.clone()) + .await; match in_memory { Some(value) => { assert_eq!(update_func_run_log.logs(), value.logs()); diff --git a/lib/si-layer-cache/tests/integration_test/db/workspace_snapshot.rs b/lib/si-layer-cache/tests/integration_test/db/workspace_snapshot.rs index 70e5ff64ad..1d9a9c8be2 100644 --- a/lib/si-layer-cache/tests/integration_test/db/workspace_snapshot.rs +++ b/lib/si-layer-cache/tests/integration_test/db/workspace_snapshot.rs @@ -45,7 +45,12 @@ async fn write_to_db() { let key_str: Arc = key.to_string().into(); // Are we in memory? - let in_memory = ldb.workspace_snapshot().cache.cache().get(&key_str).await; + let in_memory = ldb + .workspace_snapshot() + .cache + .cache() + .get(key_str.clone()) + .await; assert_eq!(Some(value.clone()), in_memory); // Are we in pg? @@ -109,7 +114,12 @@ async fn evict_from_db() { } // Are we in memory? - let in_memory = ldb.workspace_snapshot().cache.cache().get(&key_str).await; + let in_memory = ldb + .workspace_snapshot() + .cache + .cache() + .get(key_str.clone()) + .await; assert_ne!(Some(value.clone()), in_memory); assert!( @@ -188,7 +198,7 @@ async fn evictions_are_gossiped() { .workspace_snapshot() .cache .cache() - .get(&pk_str) + .get(pk_str.clone()) .await; match in_memory { Some(value) => { @@ -241,7 +251,7 @@ async fn evictions_are_gossiped() { .workspace_snapshot() .cache .cache() - .get(&pk_str) + .get(pk_str.clone()) .await; match in_memory { Some(_value) => { diff --git a/lib/si-layer-cache/tests/integration_test/layer_cache.rs b/lib/si-layer-cache/tests/integration_test/layer_cache.rs index f74e6f8db1..5f5106fa27 100644 --- a/lib/si-layer-cache/tests/integration_test/layer_cache.rs +++ b/lib/si-layer-cache/tests/integration_test/layer_cache.rs @@ -37,7 +37,7 @@ async fn empty_insert_and_get() { // Confirm the insert went into the memory cache let memory_result = layer_cache .cache() - .get(&skid_row) + .get(skid_row.clone()) .await .expect("cannot find value in memory cache"); assert_eq!("slave to the grind", &memory_result[..]);