Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: disable the foyer disk cache #5154

Merged
merged 1 commit into from
Dec 18, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 17 additions & 66 deletions lib/si-layer-cache/src/hybrid_cache.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,16 @@
use foyer::opentelemetry_0_26::OpenTelemetryMetricsRegistry;
use foyer::{
DirectFsDeviceOptions, Engine, FifoPicker, HybridCache, HybridCacheBuilder, LargeEngineOptions,
RateLimitPicker, RecoverMode,
};
use std::cmp::max;
use foyer::{Cache as MemCache, CacheBuilder};
use std::path::{Path, PathBuf};
use std::sync::{Arc, LazyLock};
use telemetry::opentelemetry::global;
use telemetry::tracing::{error, info};
use tokio::fs;

use serde::{de::DeserializeOwned, Deserialize, Serialize};

use crate::db::serialize;
use crate::error::LayerDbResult;
use crate::LayerDbError;

const FOYER_DISK_CACHE_MINUMUM: u64 = 1024 * 1024 * 1024; // 1gb
const _FOYER_DISK_CACHE_MINUMUM: u64 = 1024 * 1024 * 1024; // 1gb
const DEFAULT_MEMORY_RESERVED_PERCENT: u8 = 40;
const DEFAULT_MEMORY_USABLE_MAX_PERCENT: u8 = 100;
const DEFAULT_DISK_RESERVED_PERCENT: u8 = 5;
Expand Down Expand Up @@ -47,7 +41,7 @@ pub struct Cache<V>
where
V: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
{
cache: HybridCache<Arc<str>, MaybeDeserialized<V>>,
cache: MemCache<Arc<str>, MaybeDeserialized<V>>,
}

impl<V> Cache<V>
Expand All @@ -70,28 +64,8 @@ where
computed_memory_cache_capacity_bytes.try_into()?
};

fs::create_dir_all(config.disk_path.as_path()).await?;
// Compute total disk which is in use for `disk_path`
let total_disk_bytes = fs4::total_space(config.disk_path.as_path())?;

let disk_cache_capacity_bytes = {
// Subtract reserved disk percentage to determine total usable cache disk
let total_usable_disk_bytes = (total_disk_bytes as f64
* (1.0 - (config.disk_reserved_percent as f64 / 100.0)))
.floor() as u64;
// Compute final usable disk as a percentage of the maximum usable disk
let computed_disk_cache_capacity_bytes = (total_usable_disk_bytes as f64
* (config.disk_usable_max_percent as f64 / 100.0))
.floor() as u64;

// Ensure that the computed value is at least as big as the Foyer minimum
max(computed_disk_cache_capacity_bytes, FOYER_DISK_CACHE_MINUMUM).try_into()?
};

info!(
cache.name = &config.name,
cache.disk.total_bytes = total_disk_bytes,
cache.disk.size_bytes = disk_cache_capacity_bytes,
cache.disk.reserved_percent = config.disk_reserved_percent,
cache.disk.usable_max_percent = config.disk_usable_max_percent,
cache.disk.rate_limit = config.disk_admission_rate_limit,
Expand All @@ -104,44 +78,24 @@ where

let cache_name: &'static str = config.name.leak();

let cache: HybridCache<Arc<str>, MaybeDeserialized<V>> = HybridCacheBuilder::new()
.with_name(cache_name)
.with_metrics_registry(OpenTelemetryMetricsRegistry::new(global::meter(cache_name)))
.memory(memory_cache_capacity_bytes)
.with_weighter(
|_key: &Arc<str>, value: &MaybeDeserialized<V>| match value {
MaybeDeserialized::RawBytes(bytes) => bytes.len(),
MaybeDeserialized::DeserializedValue { size_hint, .. } => *size_hint,
},
)
.storage(Engine::Large)
.with_admission_picker(Arc::new(RateLimitPicker::new(
config.disk_admission_rate_limit,
)))
.with_device_options(
DirectFsDeviceOptions::new(config.disk_path)
.with_capacity(disk_cache_capacity_bytes),
)
.with_large_object_disk_cache_options(
LargeEngineOptions::new()
.with_buffer_pool_size(config.disk_buffer_size)
.with_eviction_pickers(vec![Box::<FifoPicker>::default()])
.with_flushers(config.disk_buffer_flushers)
.with_recover_concurrency(config.disk_recover_concurrency)
.with_indexer_shards(config.disk_indexer_shards)
.with_reclaimers(config.disk_reclaimers),
)
.with_recover_mode(RecoverMode::Quiet)
.build()
.await
.map_err(|e| LayerDbError::Foyer(e.into()))?;
let cache: MemCache<Arc<str>, MaybeDeserialized<V>> =
CacheBuilder::new(memory_cache_capacity_bytes)
.with_name(cache_name)
.with_metrics_registry(OpenTelemetryMetricsRegistry::new(global::meter(cache_name)))
.with_weighter(
|_key: &Arc<str>, value: &MaybeDeserialized<V>| match value {
MaybeDeserialized::RawBytes(bytes) => bytes.len(),
MaybeDeserialized::DeserializedValue { size_hint, .. } => *size_hint,
},
)
.build();

Ok(Self { cache })
}

pub async fn get(&self, key: &str) -> Option<V> {
match self.cache.obtain(key.into()).await {
Ok(Some(entry)) => match entry.value() {
match self.cache.get(key) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If memory serves, this doesn't really change the semantics; it's just an API change because we're now dealing with a MemCache rather than a HybridCache?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct: `obtain` de-duplicates concurrent calls to the disk, so with no disk cache there is no `obtain`.

Some(entry) => match entry.value() {
MaybeDeserialized::DeserializedValue { value, .. } => Some(value.clone()),
MaybeDeserialized::RawBytes(bytes) => {
// If we fail to deserialize the raw bytes for some reason, pretend that we never
Expand Down Expand Up @@ -189,10 +143,7 @@ where
}

pub async fn close(&self) -> LayerDbResult<()> {
self.cache
.close()
.await
.map_err(|e| LayerDbError::Foyer(e.into()))?;
self.cache.clear();
Ok(())
}
}
Expand Down
Loading