fix: retrieve from memory when waiting for snapshots #5161

Merged · 1 commit · Dec 18, 2024
2 changes: 1 addition & 1 deletion lib/si-layer-cache/src/db/rebase_batch.rs
@@ -102,7 +102,7 @@ where
         let mut tried = 0;
         let read_wait = Instant::now();
         while tried < MAX_TRIES {
-            if let Some(v) = self.cache.cache().get(&key).await {
+            if let Some(v) = self.cache.cache().get_from_memory(key.clone()).await {
                 span.record("si.layer_cache.memory_cache.hit", true);
                 span.record(
                     "si.layer_cache.memory_cache.read_wait_ms",
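This one-line change (repeated for workspace snapshots below) is the heart of the fix: the wait loop that polls for a rebase batch now probes only foyer's in-memory tier on each retry, instead of issuing a full hybrid lookup that also walks the disk tier. A minimal, self-contained sketch of the pattern; `MAX_TRIES` is the name from the surrounding code (its value is assumed), and `MemoryTier` and the backoff interval are stand-ins for illustration:

```rust
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::sync::RwLock;

const MAX_TRIES: u32 = 2000; // bound assumed; the diff only shows the name

// Minimal stand-in for the in-memory tier so the sketch compiles on its own.
struct MemoryTier<V>(RwLock<HashMap<Arc<str>, V>>);

impl<V: Clone> MemoryTier<V> {
    async fn get_from_memory(&self, key: Arc<str>) -> Option<V> {
        self.0.read().await.get(&key).cloned()
    }
}

// The polling pattern the PR switches to: while waiting for a value that
// another task will insert into memory, each miss is a cheap in-process
// lookup. The old `get` also queried the disk cache on every miss.
async fn wait_for_snapshot<V: Clone>(tier: &MemoryTier<V>, key: Arc<str>) -> Option<V> {
    let mut tried = 0;
    while tried < MAX_TRIES {
        if let Some(v) = tier.get_from_memory(key.clone()).await {
            return Some(v);
        }
        tokio::time::sleep(Duration::from_millis(1)).await; // backoff assumed
        tried += 1;
    }
    None
}

#[tokio::main]
async fn main() {
    let tier = MemoryTier(RwLock::new(HashMap::new()));
    tier.0.write().await.insert("snapshot:1".into(), 42u32);
    assert_eq!(wait_for_snapshot(&tier, "snapshot:1".into()).await, Some(42));
}
```

Since the value being waited on presumably lands in the memory tier first, polling the disk tier inside this loop only added latency and I/O.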
2 changes: 1 addition & 1 deletion lib/si-layer-cache/src/db/workspace_snapshot.rs
@@ -102,7 +102,7 @@ where
         let mut tried = 0;
         let read_wait = Instant::now();
         while tried < MAX_TRIES {
-            if let Some(v) = self.cache.cache().get(&key).await {
+            if let Some(v) = self.cache.cache().get_from_memory(key.clone()).await {
                 span.record("si.layer_cache.memory_cache.hit", true);
                 span.record(
                     "si.layer_cache.memory_cache.read_wait_ms",
137 changes: 98 additions & 39 deletions lib/si-layer-cache/src/hybrid_cache.rs
@@ -1,16 +1,22 @@
 use foyer::opentelemetry_0_26::OpenTelemetryMetricsRegistry;
-use foyer::{Cache as MemCache, CacheBuilder};
+use foyer::{
+    DirectFsDeviceOptions, Engine, FifoPicker, HybridCache, HybridCacheBuilder, LargeEngineOptions,
+    RateLimitPicker, RecoverMode, TokioRuntimeOptions,
+};
+use std::cmp::max;
 use std::path::{Path, PathBuf};
 use std::sync::{Arc, LazyLock};
 use telemetry::opentelemetry::global;
 use telemetry::tracing::{error, info};
+use tokio::fs;

 use serde::{de::DeserializeOwned, Deserialize, Serialize};

 use crate::db::serialize;
 use crate::error::LayerDbResult;
 use crate::LayerDbError;

-const _FOYER_DISK_CACHE_MINUMUM: u64 = 1024 * 1024 * 1024; // 1gb
+const FOYER_DISK_CACHE_MINUMUM: u64 = 1024 * 1024 * 1024; // 1gb
 const DEFAULT_MEMORY_RESERVED_PERCENT: u8 = 40;
 const DEFAULT_MEMORY_USABLE_MAX_PERCENT: u8 = 100;
 const DEFAULT_DISK_RESERVED_PERCENT: u8 = 5;
@@ -41,7 +47,7 @@ pub struct Cache<V>
 where
     V: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
 {
-    cache: MemCache<Arc<str>, MaybeDeserialized<V>>,
+    cache: HybridCache<Arc<str>, MaybeDeserialized<V>>,
 }

 impl<V> Cache<V>
@@ -64,8 +70,28 @@ where
             computed_memory_cache_capacity_bytes.try_into()?
         };

+        fs::create_dir_all(config.disk_path.as_path()).await?;
+        // Compute total disk which is in use for `disk_path`
+        let total_disk_bytes = fs4::total_space(config.disk_path.as_path())?;
+
+        let disk_cache_capacity_bytes = {
+            // Subtract reserved disk percentage to determine total usable cache disk
+            let total_usable_disk_bytes = (total_disk_bytes as f64
+                * (1.0 - (config.disk_reserved_percent as f64 / 100.0)))
+                .floor() as u64;
+            // Compute final usable disk as a percentage of the maximum usable disk
+            let computed_disk_cache_capacity_bytes = (total_usable_disk_bytes as f64
+                * (config.disk_usable_max_percent as f64 / 100.0))
+                .floor() as u64;
+
+            // Ensure that the computed value is at least as big as the Foyer minimum
+            max(computed_disk_cache_capacity_bytes, FOYER_DISK_CACHE_MINUMUM).try_into()?
+        };
+
         info!(
             cache.name = &config.name,
+            cache.disk.total_bytes = total_disk_bytes,
+            cache.disk.size_bytes = disk_cache_capacity_bytes,
+            cache.disk.reserved_percent = config.disk_reserved_percent,
+            cache.disk.usable_max_percent = config.disk_usable_max_percent,
+            cache.disk.rate_limit = config.disk_admission_rate_limit,
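As a worked example of the disk-sizing arithmetic above (volume sizes assumed for illustration): with the defaults of 5% reserved and 100% of the remainder usable, a 100 GiB volume yields a 95 GiB cache, and anything that computes to less than 1 GiB is clamped up to `FOYER_DISK_CACHE_MINUMUM`. A standalone sketch:

```rust
const FOYER_DISK_CACHE_MINUMUM: u64 = 1024 * 1024 * 1024; // 1gb, as in the diff

// Mirrors the computation in `Cache::new`: reserve a slice of the volume,
// take a percentage of what remains, then clamp to foyer's minimum.
fn disk_cache_capacity(total_disk_bytes: u64, reserved_percent: u8, usable_max_percent: u8) -> u64 {
    let total_usable =
        (total_disk_bytes as f64 * (1.0 - reserved_percent as f64 / 100.0)).floor() as u64;
    let computed = (total_usable as f64 * (usable_max_percent as f64 / 100.0)).floor() as u64;
    computed.max(FOYER_DISK_CACHE_MINUMUM)
}

fn main() {
    let gib: u64 = 1024 * 1024 * 1024;
    // 100 GiB volume, defaults (5% reserved, 100% usable) -> 95 GiB of cache
    println!("{}", disk_cache_capacity(100 * gib, 5, 100)); // 102005473280
    // A 512 MiB volume clamps up to the 1 GiB minimum
    println!("{}", disk_cache_capacity(512 * 1024 * 1024, 5, 100)); // 1073741824
}
```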
@@ -78,47 +104,77 @@ where

         let cache_name: &'static str = config.name.leak();

-        let cache: MemCache<Arc<str>, MaybeDeserialized<V>> =
-            CacheBuilder::new(memory_cache_capacity_bytes)
-                .with_name(cache_name)
-                .with_metrics_registry(OpenTelemetryMetricsRegistry::new(global::meter(cache_name)))
-                .with_weighter(
-                    |_key: &Arc<str>, value: &MaybeDeserialized<V>| match value {
-                        MaybeDeserialized::RawBytes(bytes) => bytes.len(),
-                        MaybeDeserialized::DeserializedValue { size_hint, .. } => *size_hint,
-                    },
-                )
-                .build();
+        let cache: HybridCache<Arc<str>, MaybeDeserialized<V>> = HybridCacheBuilder::new()
+            .with_name(cache_name)
+            .with_metrics_registry(OpenTelemetryMetricsRegistry::new(global::meter(cache_name)))
+            .memory(memory_cache_capacity_bytes)
+            .with_weighter(
+                |_key: &Arc<str>, value: &MaybeDeserialized<V>| match value {
+                    MaybeDeserialized::RawBytes(bytes) => bytes.len(),
+                    MaybeDeserialized::DeserializedValue { size_hint, .. } => *size_hint,
+                },
+            )
+            .storage(Engine::Large)
+            .with_runtime_options(foyer::RuntimeOptions::Unified(TokioRuntimeOptions {
+                max_blocking_threads: 0,
+                worker_threads: 0,
+            }))
+            .with_admission_picker(Arc::new(RateLimitPicker::new(
+                config.disk_admission_rate_limit,
+            )))
+            .with_device_options(
+                DirectFsDeviceOptions::new(config.disk_path)
+                    .with_capacity(disk_cache_capacity_bytes),
+            )
+            .with_large_object_disk_cache_options(
+                LargeEngineOptions::new()
+                    .with_buffer_pool_size(config.disk_buffer_size)
+                    .with_eviction_pickers(vec![Box::<FifoPicker>::default()])
+                    .with_flushers(config.disk_buffer_flushers)
+                    .with_recover_concurrency(config.disk_recover_concurrency)
+                    .with_indexer_shards(config.disk_indexer_shards)
+                    .with_reclaimers(config.disk_reclaimers),
+            )
+            .with_recover_mode(RecoverMode::Quiet)
+            .build()
+            .await
+            .map_err(|e| LayerDbError::Foyer(e.into()))?;

         Ok(Self { cache })
     }

-    pub async fn get(&self, key: &str) -> Option<V> {
-        match self.cache.get(key) {
-            Some(entry) => match entry.value() {
-                MaybeDeserialized::DeserializedValue { value, .. } => Some(value.clone()),
-                MaybeDeserialized::RawBytes(bytes) => {
-                    // If we fail to deserialize the raw bytes for some reason, pretend that we never
-                    // had the key in the first place, and also remove it from the cache.
-                    match serialize::from_bytes_async::<V>(bytes).await {
-                        Ok(deserialized) => {
-                            self.insert(key.into(), deserialized.clone(), bytes.len());
-                            Some(deserialized)
-                        }
-                        Err(e) => {
-                            error!(
-                                "Failed to deserialize stored bytes from memory cache for key ({:?}): {}",
-                                key,
-                                e
-                            );
-                            self.remove(key);
-                            None
-                        }
-                    }
-                }
-            },
-            _ => None,
-        }
-    }
+    pub async fn get(&self, key: Arc<str>) -> Option<V> {
+        if let Ok(Some(entry)) = self.cache.obtain(key.clone()).await {
+            return self.maybe_deserialize(key, entry.value().clone()).await;
+        }
+        None
+    }
+
+    pub async fn get_from_memory(&self, key: Arc<str>) -> Option<V> {
+        if let Some(entry) = self.cache.memory().get(&key) {
+            return self.maybe_deserialize(key, entry.value().clone()).await;
+        }
+        None
+    }
+
+    async fn maybe_deserialize(&self, key: Arc<str>, entry: MaybeDeserialized<V>) -> Option<V> {
+        match entry {
+            MaybeDeserialized::DeserializedValue { value, .. } => Some(value.clone()),
+            MaybeDeserialized::RawBytes(bytes) => {
+                // If we fail to deserialize the raw bytes for some reason, pretend that we never
+                // had the key in the first place, and also remove it from the cache.
+                match serialize::from_bytes_async::<V>(&bytes).await {
+                    Ok(deserialized) => {
+                        self.insert(key, deserialized.clone(), bytes.len());
+                        Some(deserialized)
+                    }
+                    Err(e) => {
+                        error!(
+                            "Failed to deserialize stored bytes from memory cache for key ({:?}): {}",
+                            key, e
+                        );
+                        self.remove(&key);
+                        None
+                    }
+                }
+            }
+        }
+    }

@@ -143,7 +199,10 @@ where
     }

     pub async fn close(&self) -> LayerDbResult<()> {
-        self.cache.clear();
+        self.cache
+            .close()
+            .await
+            .map_err(|e| LayerDbError::Foyer(e.into()))?;
         Ok(())
     }
 }
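Two behavioral notes on the new API, both visible in the diff: `get` now goes through foyer's `obtain`, which consults the memory tier and then the disk tier; `get_from_memory` only touches the in-memory tier and never blocks on disk I/O; and `close` now shuts the hybrid cache down cleanly instead of just clearing the in-memory entries, which pairs with the builder's `RecoverMode::Quiet` so the disk tier can be recovered on restart. A lifecycle sketch, not standalone code: it assumes this crate's `Cache<V>`, a `CacheConfig` carrying the fields used above, a `Cache::new(config)` constructor of that shape, and the `insert(key, value, size_hint)` signature seen in `maybe_deserialize`:

```rust
// Hypothetical caller-side lifecycle against the new Cache<V> API.
async fn lifecycle(config: CacheConfig) -> LayerDbResult<()> {
    let cache: Cache<String> = Cache::new(config).await?; // constructor shape assumed

    cache.insert("key".into(), "value".to_string(), 5);

    // Full hybrid read: memory tier, then disk tier.
    let _hit = cache.get("key".into()).await;

    // Memory-only read: cheap enough to call in a tight retry loop.
    let _memory_hit = cache.get_from_memory("key".into()).await;

    // Flush and shut down the disk tier so entries survive a restart.
    cache.close().await?;
    Ok(())
}
```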
16 changes: 14 additions & 2 deletions lib/si-layer-cache/src/layer_cache.rs
@@ -65,7 +65,7 @@ where
     }

     pub async fn get(&self, key: Arc<str>) -> LayerDbResult<Option<V>> {
-        Ok(match self.cache.get(&key).await {
+        Ok(match self.cache.get(key.clone()).await {
             Some(memory_value) => Some(memory_value),

             None => match self.pg.get(&key).await? {
@@ -82,6 +82,18 @@ where
         })
     }

+    #[instrument(
+        name = "layer_cache.get_from_memory",
+        level = "debug",
+        skip_all,
+        fields(
+            si.layer_cache.key = key.as_ref(),
+        ),
+    )]
+    pub async fn get_from_memory(&self, key: Arc<str>) -> LayerDbResult<Option<V>> {
+        Ok(self.cache().get_from_memory(key).await)
+    }
+
     #[instrument(
         name = "layer_cache.get_bytes_from_durable_storage",
         level = "debug",
@@ -107,7 +119,7 @@

         for key in keys {
             let key_str: Arc<str> = key.to_string().into();
-            if let Some(found) = match self.cache.get(&key_str).await {
+            if let Some(found) = match self.cache.get(key_str.clone()).await {
                 Some(value) => Some(value),
                 None => {
                     not_found.push(key_str.clone());
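A side effect of the new foyer API is visible throughout the rest of this diff: keys are now passed as owned `Arc<str>` values, so every call site changes from `get(&key)` to `get(key.clone())`. That clone is a reference-count bump, not a string copy, which is why it is fine even inside the 10,000-iteration polling loops in the tests below. A standalone illustration:

```rust
use std::sync::Arc;

fn main() {
    let key: Arc<str> = "workspace_snapshot:123".into(); // one heap allocation

    // Cloning an Arc copies a pointer and bumps the refcount; both handles
    // point at the same underlying allocation.
    let cloned = key.clone();
    assert!(Arc::ptr_eq(&key, &cloned));
}
```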
17 changes: 11 additions & 6 deletions lib/si-layer-cache/tests/integration_test/db/cas.rs
@@ -43,7 +43,7 @@ async fn write_to_db()
     let cas_pk_str: Arc<str> = cas_pk.to_string().into();

     // Are we in memory?
-    let in_memory = ldb.cas().cache.cache().get(&cas_pk_str).await;
+    let in_memory = ldb.cas().cache.cache().get(cas_pk_str.clone()).await;
     assert_eq!(Some(cas_value.clone()), in_memory);

     // Are we in pg?
// Are we in pg?
Expand Down Expand Up @@ -145,7 +145,7 @@ async fn cold_read_from_db() {

     // Delete from cache
     ldb.cas().cache.cache().remove(&cas_pk_str);
-    let not_in_cache = ldb.cas().cache.cache().get(&cas_pk_str).await;
+    let not_in_cache = ldb.cas().cache.cache().get(cas_pk_str.clone()).await;
     assert_eq!(not_in_cache, None);

// Read the data from the cache
@@ -159,7 +159,7 @@ async fn cold_read_from_db()
     assert_eq!(&cas_value, &data);

     // Are we in cache after the read?
-    let in_cache = ldb.cas().cache.cache().get(&cas_pk_str).await;
+    let in_cache = ldb.cas().cache.cache().get(cas_pk_str.clone()).await;
     assert_eq!(Some(cas_value.clone()), in_cache);

// Are we in pg?
@@ -233,7 +233,7 @@ async fn writes_are_gossiped()

     let mut memory_check_count = 0;
     while memory_check_count <= max_check_count {
-        let in_memory = ldb_axl.cas().cache.cache().get(&cas_pk_str).await;
+        let in_memory = ldb_axl.cas().cache.cache().get(cas_pk_str.clone()).await;
         match in_memory {
             Some(value) => {
                 assert_eq!(cas_value.clone(), value);
@@ -311,7 +311,7 @@ async fn stress_test()
     let cas_value = Arc::new(CasValue::String(big_string.to_string()));
     let (postcard_value, _) =
         serialize::to_vec(&cas_value).expect("cannot deserialize big ass string");
-    let cas_pk_string = ContentHash::new(&postcard_value).to_string();
+    let cas_pk_string: Arc<str> = ContentHash::new(&postcard_value).to_string().into();
     let ldb_slash_task = ldb_slash.clone();
     let _write_big_string = big_string.clone();
     let write_cas_value = cas_value.clone();
@@ -333,7 +333,12 @@ async fn stress_test()
     let max_check_count = 10_000;
     let mut memory_check_count = 0;
     while memory_check_count < max_check_count {
-        let in_memory = ldb_axl_task.cas().cache.cache().get(&cas_pk_string).await;
+        let in_memory = ldb_axl_task
+            .cas()
+            .cache
+            .cache()
+            .get(cas_pk_string.clone())
+            .await;
         match in_memory {
             Some(value) => {
                 let cas_value: Arc<CasValue> =
13 changes: 9 additions & 4 deletions lib/si-layer-cache/tests/integration_test/db/func_run.rs
@@ -46,7 +46,7 @@ async fn write_to_db()
         .expect("failed to write to layerdb");

     // Are we in memory?
-    let in_memory = ldb.func_run().cache.cache().get(&key_str).await;
+    let in_memory = ldb.func_run().cache.cache().get(key_str.clone()).await;
     assert_eq!(value.id(), in_memory.expect("func run not in memory").id());

     // Are we in pg?
@@ -104,7 +104,7 @@ async fn update()
         .expect("failed to write to layerdb");

     // Are we in memory?
-    let in_memory = ldb.func_run().cache.cache().get(&key_str).await;
+    let in_memory = ldb.func_run().cache.cache().get(key_str.clone()).await;
     assert_eq!(value.id(), in_memory.expect("func run not in memory").id());

     // Are we in pg?
@@ -132,7 +132,7 @@ async fn update()
         .expect("failed to write to layerdb");

     // Are we in memory?
-    let in_memory = ldb.func_run().cache.cache().get(&key_str).await;
+    let in_memory = ldb.func_run().cache.cache().get(key_str.clone()).await;
     assert_eq!(
         update_func_run.state(),
         in_memory.expect("func run not in memory").state(),
@@ -155,7 +155,12 @@
     let max_check_count = 10;
     let mut memory_check_count = 0;
     while memory_check_count <= max_check_count {
-        let in_memory = ldb_remote.func_run().cache.cache().get(&key_str).await;
+        let in_memory = ldb_remote
+            .func_run()
+            .cache
+            .cache()
+            .get(key_str.clone())
+            .await;
         match in_memory {
             Some(value) => {
                 assert_eq!(update_func_run.state(), value.state());
13 changes: 9 additions & 4 deletions lib/si-layer-cache/tests/integration_test/db/func_run_log.rs
@@ -41,7 +41,7 @@ async fn write_to_db()
         .expect("failed to write to layerdb");

     // Are we in memory?
-    let in_memory = ldb.func_run_log().cache.cache().get(&key_str).await;
+    let in_memory = ldb.func_run_log().cache.cache().get(key_str.clone()).await;
     assert_eq!(
         value.id(),
         in_memory.expect("func run log not in memory").id()
@@ -101,7 +101,7 @@ async fn update()
         .expect("failed to write to layerdb");

     // Are we in memory?
-    let in_memory = ldb.func_run_log().cache.cache().get(&key_str).await;
+    let in_memory = ldb.func_run_log().cache.cache().get(key_str.clone()).await;
     assert_eq!(
         value.id(),
         in_memory.expect("func run log not in memory").id()
@@ -140,7 +140,7 @@ async fn update()
         .expect("failed to write to layerdb");

     // Are we in memory?
-    let in_memory = ldb.func_run_log().cache.cache().get(&key_str).await;
+    let in_memory = ldb.func_run_log().cache.cache().get(key_str.clone()).await;
     assert_eq!(
         update_func_run_log.logs(),
         in_memory.expect("func run log not in memory").logs(),
@@ -163,7 +163,12 @@
     let max_check_count = 10;
     let mut memory_check_count = 0;
     while memory_check_count <= max_check_count {
-        let in_memory = ldb_remote.func_run_log().cache.cache().get(&key_str).await;
+        let in_memory = ldb_remote
+            .func_run_log()
+            .cache
+            .cache()
+            .get(key_str.clone())
+            .await;
         match in_memory {
             Some(value) => {
                 assert_eq!(update_func_run_log.logs(), value.logs());