From 57ef370e708c8563b4dc721bc517c168c520b87f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fredrik=20S=C3=B6derstr=C3=B6m?= Date: Wed, 4 Jan 2023 21:08:54 +0100 Subject: [PATCH 1/3] initial disk cache --- Cargo.toml | 15 +- cached_proc_macro/src/lib.rs | 1 + src/lib.rs | 3 + src/stores/disk.rs | 712 +++++++++++++++++++++++++++++++++++ src/stores/mod.rs | 6 + 5 files changed, 736 insertions(+), 1 deletion(-) create mode 100644 src/stores/disk.rs diff --git a/Cargo.toml b/Cargo.toml index 3496cba..bf9b915 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ repository = "https://github.com/jaemk/cached" documentation = "https://docs.rs/cached" readme = "README.md" categories = ["caching", "data-structures"] -keywords = ["caching", "cache", "memoize", "lru", "redis"] +keywords = ["caching", "cache", "memoize", "lru", "redis", "disk"] license = "MIT" edition = "2018" @@ -26,6 +26,7 @@ redis_connection_manager = ["redis_store", "redis/connection-manager"] redis_async_std = ["redis_store", "async", "redis/aio", "redis/async-std-comp", "redis/tls", "redis/async-std-tls-comp"] redis_tokio = ["redis_store", "async", "redis/aio", "redis/tokio-comp", "redis/tls", "redis/tokio-native-tls-comp"] redis_ahash = ["redis_store", "redis/ahash"] +disk_store = ["sled", "serde", "rmp-serde", "directories"] wasm = ["instant/wasm-bindgen"] [dependencies.cached_proc_macro] @@ -68,6 +69,18 @@ version = "0.24" features = ["r2d2"] optional = true +[dependencies.sled] +version = "0.34" +optional = true + +[dependencies.rmp-serde] +version ="1.1" +optional = true + +[dependencies.directories] +version ="4.0" +optional = true + [dependencies.r2d2] version = "0.8" optional = true diff --git a/cached_proc_macro/src/lib.rs b/cached_proc_macro/src/lib.rs index 1b40fde..02164f7 100644 --- a/cached_proc_macro/src/lib.rs +++ b/cached_proc_macro/src/lib.rs @@ -68,6 +68,7 @@ pub fn once(args: TokenStream, input: TokenStream) -> TokenStream { /// the error type returned by your function. 
/// - `name`: (optional, string) specify the name for the generated cache, defaults to the function name uppercase. /// - `redis`: (optional, bool) default to a `RedisCache` or `AsyncRedisCache` +/// - `disk`: (optional, bool) default to a `DiskCache` or `AsyncDiskCache` /// - `time`: (optional, u64) specify a cache TTL in seconds, implies the cache type is a `TimedCached` or `TimedSizedCache`. /// - `time_refresh`: (optional, bool) specify whether to refresh the TTL on cache hits. /// - `type`: (optional, string type) explicitly specify the cache store type to use. diff --git a/src/lib.rs b/src/lib.rs index e12752b..b56d81a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,6 +32,7 @@ of un-cached arguments, specify `#[cached(sync_writes = true)]` / `#[once(sync_w - `redis_connection_manager`: Enable the optional `connection-manager` feature of `redis`. Any async redis caches created will use a connection manager instead of a `MultiplexedConnection` - `redis_ahash`: Enable the optional `ahash` feature of `redis` +- `disk_store`: Include disk cache store - `wasm`: Enable WASM support. 
Note that this feature is incompatible with `tokio`'s multi-thread runtime (`async_tokio_rt_multi_thread`) and all Redis features (`redis_store`, `redis_async_std`, `redis_tokio`, `redis_ahash`) @@ -189,6 +190,8 @@ pub use stores::{ #[cfg(feature = "redis_store")] #[cfg_attr(docsrs, doc(cfg(feature = "redis_store")))] pub use stores::{RedisCache, RedisCacheError}; +#[cfg(feature = "disk_store")] +pub use stores::{DiskCache, DiskCacheError}; #[cfg(feature = "async")] #[cfg_attr(docsrs, doc(cfg(feature = "async")))] use {async_trait::async_trait, futures::Future}; diff --git a/src/stores/disk.rs b/src/stores/disk.rs new file mode 100644 index 0000000..1144c0c --- /dev/null +++ b/src/stores/disk.rs @@ -0,0 +1,712 @@ +use crate::IOCached; +use directories::BaseDirs; +use instant::Duration; +use serde::de::DeserializeOwned; +use serde::Serialize; +use sled::Db; +use std::{fmt::Display, path::PathBuf, time::SystemTime}; +use std::marker::PhantomData; + +pub struct DiskCacheBuilder { + seconds: u64, + refresh: bool, + namespace: String, + prefix: String, + disk_path: Option, + _phantom_k: PhantomData, + _phantom_v: PhantomData, +} + +const ENV_KEY: &str = "CACHED_DISK_PATH"; +const DEFAULT_NAMESPACE: &str = "cached-disk-store:"; +const LAST_CLEANUP_KEY: &str = "last-cleanup"; + +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum DiskCacheBuildError { + #[error("Storage connection error")] + ConnectionError(#[from] sled::Error), + #[error("Connection string not specified or invalid in env var {env_key:?}: {error:?}")] + MissingDiskPath { + env_key: String, + error: std::env::VarError, + }, +} + +impl DiskCacheBuilder +where + K: Display, + V: Serialize + DeserializeOwned, +{ + /// Initialize a `DiskCacheBuilder` + pub fn new>(prefix: S, seconds: u64) -> DiskCacheBuilder { + Self { + seconds, + refresh: false, + namespace: DEFAULT_NAMESPACE.to_string(), + prefix: prefix.as_ref().to_string(), + disk_path: None, + _phantom_k: Default::default(), + _phantom_v: 
Default::default(), + } + } + + /// Specify the cache TTL/lifespan in seconds + pub fn set_lifespan(mut self, seconds: u64) -> Self { + self.seconds = seconds; + self + } + + /// Specify whether cache hits refresh the TTL + pub fn set_refresh(mut self, refresh: bool) -> Self { + self.refresh = refresh; + self + } + + /// Set the namespace for cache keys. Defaults to `cached-disk-store:`. + /// Used to generate keys formatted as: `{namespace}{prefix}{key}` + /// Note that no delimiters are implicitly added so you may pass + /// an empty string if you want there to be no namespace on keys. + pub fn set_namespace>(mut self, namespace: S) -> Self { + self.namespace = namespace.as_ref().to_string(); + self + } + + /// Set the prefix for cache keys. + /// Used to generate keys formatted as: `{namespace}{prefix}{key}` + /// Note that no delimiters are implicitly added so you may pass + /// an empty string if you want there to be no prefix on keys. + pub fn set_prefix>(mut self, prefix: S) -> Self { + self.prefix = prefix.as_ref().to_string(); + self + } + + /// Set the disk path for where the data will be stored + pub fn set_disk_path(mut self, cs: PathBuf) -> Self { + self.disk_path = Some(cs); + self + } + + /// Return the disk path, or load from the env var: CACHED_DISK_PATH, or fall back to OS cache directory + pub fn disk_path(&self) -> Result { + match self.disk_path { + Some(ref s) => Ok(s.to_path_buf()), + None => match std::env::var(ENV_KEY) { + Ok(path) => Ok(PathBuf::from(path)), + Err(error) => { + let disk_path = BaseDirs::new().map(|base_dirs| + base_dirs + .cache_dir() + .join(env!("CARGO_PKG_NAME")) + .join("cached") + ); + + match disk_path { + Some(path) => Ok(path), + None => Err(DiskCacheBuildError::MissingDiskPath { + env_key: ENV_KEY.to_string(), + error, + }) + } + } + } + } + } + + pub fn build(self) -> Result, DiskCacheBuildError> { + let disk_path = self.disk_path()?; + let connection = sled::open(disk_path.clone())?; + + Ok(DiskCache { + 
connection, + seconds: self.seconds, + refresh: self.refresh, + disk_path, + namespace: self.namespace, + prefix: self.prefix, + _phantom_k: self._phantom_k, + _phantom_v: self._phantom_v, + }) + } +} + +/// Cache store backed by disk +pub struct DiskCache { + pub(super) seconds: u64, + pub(super) refresh: bool, + pub(super) namespace: String, + pub(super) prefix: String, + connection: sled::Db, + disk_path: PathBuf, + _phantom_k: PhantomData, + _phantom_v: PhantomData, +} + +impl DiskCache +where + K: Display, + V: Serialize + DeserializeOwned, +{ + #[allow(clippy::new_ret_no_self)] + /// Initialize a `DiskCacheBuilder` + pub fn new>(prefix: S, seconds: u64) -> DiskCacheBuilder { + DiskCacheBuilder::new(prefix, seconds) + } + + fn generate_key(&self, key: &K) -> String { + format!("{}{}{}", self.namespace, self.prefix, key) + } + + /// Return the disk path used + pub fn disk_path(&self) -> PathBuf { + self.disk_path.clone() + } + + fn last_cleanup(connection: &Db) -> SystemTime { + match connection.get(LAST_CLEANUP_KEY) { + Ok(Some(l)) => match rmp_serde::from_slice::(&l) { + Ok(l) => l, + _ => SystemTime::UNIX_EPOCH + }, + _ => SystemTime::UNIX_EPOCH + } + } + + fn cleanup_expired_entries(connection: &Db) { + let now = SystemTime::now(); + let last_cleanup = DiskCache::::last_cleanup(connection); + + if last_cleanup + Duration::from_secs(10) < now { + return; + } + + for (key, value) in connection.iter().flatten() { + if let Ok(cached) = rmp_serde::from_slice::>(&value) { + if let Some(expires) = cached.expires { + if now > expires { + let _ = connection.remove(key); + } + } + } + } + + let _ = connection.insert(LAST_CLEANUP_KEY, rmp_serde::to_vec(&SystemTime::now()).unwrap()); + } +} + +#[derive(Error, Debug)] +pub enum DiskCacheError { + #[error("Storage error")] + StorageError(#[from] sled::Error), + #[error("Error deserializing cached value")] + CacheDeserializtionError(#[from] rmp_serde::decode::Error), + #[error("Error serializing cached value")] + 
CacheSerializtionError(#[from] rmp_serde::encode::Error), +} + +#[derive(serde::Serialize, serde::Deserialize)] +struct CachedDiskValue { + pub(crate) value: V, + pub(crate) expires: Option, + pub(crate) version: Option, +} + +impl CachedDiskValue { + fn new(value: V, expires: Option) -> Self { + Self { + value, + expires, + version: Some(1), + } + } +} + +impl IOCached for DiskCache +where + K: Display, + V: Serialize + DeserializeOwned, +{ + type Error = DiskCacheError; + + fn cache_get(&self, key: &K) -> Result, DiskCacheError> { + Self::cleanup_expired_entries(&self.connection); + + let key = self.generate_key(key); + + if let Some(data) = self.connection.get(key)? { + let cached = rmp_serde::from_slice::>(&data)?; + + if let Some(expires) = cached.expires { + if SystemTime::now() < expires { + Ok(Some(cached.value)) + } else { + Ok(None) + } + } else { + Ok(Some(cached.value)) + } + } else { + Ok(None) + } + } + + fn cache_set(&self, key: K, value: V) -> Result, DiskCacheError> { + // TODO: wrap value when serializing to also add expiration data, then add separate thread to clean up the cache? + let key = self.generate_key(&key); + let value = rmp_serde::to_vec(&CachedDiskValue::new( + value, + Some(SystemTime::now() + Duration::from_secs(self.seconds)), + ))?; + + if let Some(data) = self.connection.insert(key, value)? { + let cached = rmp_serde::from_slice::>(&data)?; + + if let Some(expires) = cached.expires { + if SystemTime::now() < expires { + Ok(Some(cached.value)) + } else { + Ok(None) + } + } else { + Ok(Some(cached.value)) + } + } else { + Ok(None) + } + } + + fn cache_remove(&self, key: &K) -> Result, DiskCacheError> { + let key = self.generate_key(key); + + if let Some(data) = self.connection.remove(key)? 
{ + let cached = rmp_serde::from_slice::>(&data)?; + + if let Some(expires) = cached.expires { + if SystemTime::now() < expires { + Ok(Some(cached.value)) + } else { + Ok(None) + } + } else { + Ok(Some(cached.value)) + } + } else { + Ok(None) + } + } + + fn cache_lifespan(&self) -> Option { + Some(self.seconds) + } + + fn cache_set_lifespan(&mut self, seconds: u64) -> Option { + let old = self.seconds; + self.seconds = seconds; + Some(old) + } + + fn cache_set_refresh(&mut self, refresh: bool) -> bool { + let old = self.refresh; + self.refresh = refresh; + old + } +} + +/* +#[cfg(all( + feature = "async", + any(feature = "disk_async_std", feature = "disk_tokio") +))] +mod async_disk { + use super::*; + use {crate::IOCachedAsync, async_trait::async_trait}; + + pub struct AsyncDiskCacheBuilder { + seconds: u64, + refresh: bool, + namespace: String, + prefix: String, + connection_string: Option, + _phantom_k: PhantomData, + _phantom_v: PhantomData, + } + + impl AsyncDiskCacheBuilder + where + K: Display, + V: Serialize + DeserializeOwned, + { + /// Initialize a `DiskCacheBuilder` + pub fn new>(prefix: S, seconds: u64) -> AsyncDiskCacheBuilder { + Self { + seconds, + refresh: false, + namespace: DEFAULT_NAMESPACE.to_string(), + prefix: prefix.as_ref().to_string(), + connection_string: None, + _phantom_k: Default::default(), + _phantom_v: Default::default(), + } + } + + /// Specify the cache TTL/lifespan in seconds + pub fn set_lifespan(mut self, seconds: u64) -> Self { + self.seconds = seconds; + self + } + + /// Specify whether cache hits refresh the TTL + pub fn set_refresh(mut self, refresh: bool) -> Self { + self.refresh = refresh; + self + } + + /// Set the namespace for cache keys. Defaults to `cached-disk-store:`. + /// Used to generate keys formatted as: `{namespace}{prefix}{key}` + /// Note that no delimiters are implicitly added so you may pass + /// an empty string if you want there to be no namespace on keys. 
+ pub fn set_namespace>(mut self, namespace: S) -> Self { + self.namespace = namespace.as_ref().to_string(); + self + } + + /// Set the prefix for cache keys + /// Used to generate keys formatted as: `{namespace}{prefix}{key}` + /// Note that no delimiters are implicitly added so you may pass + /// an empty string if you want there to be no prefix on keys. + pub fn set_prefix>(mut self, prefix: S) -> Self { + self.prefix = prefix.as_ref().to_string(); + self + } + + /// Set the connection string for disk + pub fn set_connection_string(mut self, cs: &str) -> Self { + self.connection_string = Some(cs.to_string()); + self + } + + /// Return the current connection string or load from the env var: CACHED_DISK_PATH + pub fn connection_string(&self) -> Result { + match self.connection_string { + Some(ref s) => Ok(s.to_string()), + None => std::env::var(ENV_KEY).map_err(|e| { + DiskCacheBuildError::MissingDiskPath { + env_key: ENV_KEY.to_string(), + error: e, + } + }), + } + } + + async fn create_multiplexed_connection( + &self, + ) -> Result { + let s = self.connection_string()?; + let client = disk::Client::open(s)?; + let conn = client.get_multiplexed_async_connection().await?; + Ok(conn) + } + + pub async fn build(self) -> Result, DiskCacheBuildError> { + Ok(AsyncDiskCache { + seconds: self.seconds, + refresh: self.refresh, + connection_string: self.connection_string()?, + multiplexed_connection: self.create_multiplexed_connection().await?, + namespace: self.namespace, + prefix: self.prefix, + _phantom_k: self._phantom_k, + _phantom_v: self._phantom_v, + }) + } + } + + /// Cache store backed by disk + /// + /// Values have a ttl applied and enforced by disk. + /// Uses a `disk::aio::MultiplexedConnection` under the hood. 
+ pub struct AsyncDiskCache { + pub(super) seconds: u64, + pub(super) refresh: bool, + pub(super) namespace: String, + pub(super) prefix: String, + connection_string: String, + multiplexed_connection: disk::aio::MultiplexedConnection, + _phantom_k: PhantomData, + _phantom_v: PhantomData, + } + + impl AsyncDiskCache + where + K: Display + Send + Sync, + V: Serialize + DeserializeOwned + Send + Sync, + { + #[allow(clippy::new_ret_no_self)] + /// Initialize an `AsyncDiskCacheBuilder` + pub fn new>(prefix: S, seconds: u64) -> AsyncDiskCacheBuilder { + AsyncDiskCacheBuilder::new(prefix, seconds) + } + + fn generate_key(&self, key: &K) -> String { + format!("{}{}{}", self.namespace, self.prefix, key) + } + + /// Return the disk connection string used + pub fn connection_string(&self) -> String { + self.connection_string.clone() + } + } + + #[async_trait] + impl<'de, K, V> IOCachedAsync for AsyncDiskCache + where + K: Display + Send + Sync, + V: Serialize + DeserializeOwned + Send + Sync, + { + type Error = DiskCacheError; + + /// Get a cached value + async fn cache_get(&self, key: &K) -> Result, Self::Error> { + let mut conn = self.multiplexed_connection.clone(); + let mut pipe = disk::pipe(); + let key = self.generate_key(key); + + pipe.get(key.clone()); + if self.refresh { + pipe.expire(key, self.seconds as usize).ignore(); + } + let res: (Option,) = pipe.query_async(&mut conn).await?; + match res.0 { + None => Ok(None), + Some(s) => { + let v: CachedDiskValue = serde_json::from_str(&s).map_err(|e| { + DiskCacheError::CacheDeserializationError { + cached_value: s, + error: e, + } + })?; + Ok(Some(v.value)) + } + } + } + + /// Set a cached value + async fn cache_set(&self, key: K, val: V) -> Result, Self::Error> { + let mut conn = self.multiplexed_connection.clone(); + let mut pipe = disk::pipe(); + let key = self.generate_key(&key); + + let val = CachedDiskValue::new(val); + pipe.get(key.clone()); + pipe.set_ex::( + key, + serde_json::to_string(&val) + .map_err(|e| 
DiskCacheError::CacheSerializationError { error: e })?, + self.seconds as usize, + ) + .ignore(); + + let res: (Option,) = pipe.query_async(&mut conn).await?; + match res.0 { + None => Ok(None), + Some(s) => { + let v: CachedDiskValue = serde_json::from_str(&s).map_err(|e| { + DiskCacheError::CacheDeserializationError { + cached_value: s, + error: e, + } + })?; + Ok(Some(v.value)) + } + } + } + + /// Remove a cached value + async fn cache_remove(&self, key: &K) -> Result, Self::Error> { + let mut conn = self.multiplexed_connection.clone(); + let mut pipe = disk::pipe(); + let key = self.generate_key(key); + + pipe.get(key.clone()); + pipe.del::(key).ignore(); + let res: (Option,) = pipe.query_async(&mut conn).await?; + match res.0 { + None => Ok(None), + Some(s) => { + let v: CachedDiskValue = serde_json::from_str(&s).map_err(|e| { + DiskCacheError::CacheDeserializationError { + cached_value: s, + error: e, + } + })?; + Ok(Some(v.value)) + } + } + } + + /// Set the flag to control whether cache hits refresh the ttl of cached values, returns the old flag value + fn cache_set_refresh(&mut self, refresh: bool) -> bool { + let old = self.refresh; + self.refresh = refresh; + old + } + + /// Return the lifespan of cached values (time to eviction) + fn cache_lifespan(&self) -> Option { + Some(self.seconds) + } + + /// Set the lifespan of cached values, returns the old value + fn cache_set_lifespan(&mut self, seconds: u64) -> Option { + let old = self.seconds; + self.seconds = seconds; + Some(old) + } + } + + #[cfg(test)] + mod tests { + use super::*; + use std::thread::sleep; + use std::time::Duration; + + fn now_millis() -> u128 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis() + } + + #[tokio::test] + async fn test_async_disk_cache() { + let mut c: AsyncDiskCache = + AsyncDiskCache::new(format!("{}:async-disk-cache-test", now_millis()), 2) + .build() + .await + .unwrap(); + + 
assert!(c.cache_get(&1).await.unwrap().is_none()); + + assert!(c.cache_set(1, 100).await.unwrap().is_none()); + assert!(c.cache_get(&1).await.unwrap().is_some()); + + sleep(Duration::new(2, 500000)); + assert!(c.cache_get(&1).await.unwrap().is_none()); + + let old = c.cache_set_lifespan(1).unwrap(); + assert_eq!(2, old); + assert!(c.cache_set(1, 100).await.unwrap().is_none()); + assert!(c.cache_get(&1).await.unwrap().is_some()); + + sleep(Duration::new(1, 600000)); + assert!(c.cache_get(&1).await.unwrap().is_none()); + + c.cache_set_lifespan(10).unwrap(); + assert!(c.cache_set(1, 100).await.unwrap().is_none()); + assert!(c.cache_set(2, 100).await.unwrap().is_none()); + assert_eq!(c.cache_get(&1).await.unwrap().unwrap(), 100); + assert_eq!(c.cache_get(&1).await.unwrap().unwrap(), 100); + } + } +} + +#[cfg(all( + feature = "async", + any(feature = "disk_async_std", feature = "disk_tokio") +))] +pub use async_disk::{AsyncDiskCache, AsyncDiskCacheBuilder}; +*/ + +#[cfg(test)] +/// Cache store tests +mod tests { + use std::thread::sleep; + use std::time::Duration; + + use super::*; + + fn now_millis() -> u128 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis() + } + + #[test] + fn disk_cache_set_get_remove() { + let cache: DiskCache = + DiskCache::new(format!("{}:disk-cache-test-sgr", now_millis()), 3600) + .set_disk_path(std::env::temp_dir().join("cachedtest-sgr")) + .build() + .unwrap(); + + let cached = cache.cache_get(&6).unwrap(); + assert!(cached.is_none()); + + let cached = cache.cache_set(6, 4444).unwrap(); + assert_eq!(cached, None); + + let cached = cache.cache_set(6, 5555).unwrap(); + assert_eq!(cached, Some(4444)); + + let cached = cache.cache_get(&6).unwrap(); + assert_eq!(cached, Some(5555)); + + let cached = cache.cache_remove(&6).unwrap(); + assert_eq!(cached, Some(5555)); + + let cached = cache.cache_get(&6).unwrap(); + assert!(cached.is_none()); + + drop(cache); + } + + #[test] + fn disk_cache() { + let 
mut c: DiskCache = + DiskCache::new(format!("{}:disk-cache-test", now_millis()), 2) + .set_namespace("in-tests:") + .build() + .unwrap(); + + assert!(c.cache_get(&1).unwrap().is_none()); + + assert!(c.cache_set(1, 100).unwrap().is_none()); + assert!(c.cache_get(&1).unwrap().is_some()); + + sleep(Duration::new(2, 500000)); + assert!(c.cache_get(&1).unwrap().is_none()); + + let old = c.cache_set_lifespan(1).unwrap(); + assert_eq!(2, old); + assert!(c.cache_set(1, 100).unwrap().is_none()); + assert!(c.cache_get(&1).unwrap().is_some()); + + sleep(Duration::new(1, 600000)); + assert!(c.cache_get(&1).unwrap().is_none()); + + c.cache_set_lifespan(10).unwrap(); + assert!(c.cache_set(1, 100).unwrap().is_none()); + assert!(c.cache_set(2, 100).unwrap().is_none()); + assert_eq!(c.cache_get(&1).unwrap().unwrap(), 100); + assert_eq!(c.cache_get(&1).unwrap().unwrap(), 100); + } + + #[test] + fn remove() { + let cache: DiskCache = + DiskCache::new(format!("{}:disk-cache-test-remove", now_millis()), 3600) + .set_disk_path(std::env::temp_dir().join("cachedtest-remove")) + .build() + .unwrap(); + + assert!(cache.cache_set(1, 100).unwrap().is_none()); + assert!(cache.cache_set(2, 200).unwrap().is_none()); + assert!(cache.cache_set(3, 300).unwrap().is_none()); + + assert_eq!(100, cache.cache_remove(&1).unwrap().unwrap()); + + drop(cache); + } +} diff --git a/src/stores/mod.rs b/src/stores/mod.rs index 7753a3f..1fbbd26 100644 --- a/src/stores/mod.rs +++ b/src/stores/mod.rs @@ -11,6 +11,8 @@ use {super::CachedAsync, async_trait::async_trait, futures::Future}; mod expiring_value_cache; #[cfg(feature = "redis_store")] mod redis; +#[cfg(feature = "disk_store")] +mod disk; mod sized; mod timed; mod timed_sized; @@ -21,6 +23,10 @@ mod unbound; pub use crate::stores::redis::{ RedisCache, RedisCacheBuildError, RedisCacheBuilder, RedisCacheError, }; +#[cfg(feature = "disk_store")] +pub use crate::stores::disk::{ + DiskCache, DiskCacheBuildError, DiskCacheBuilder, DiskCacheError, +}; pub use 
expiring_value_cache::{CanExpire, ExpiringValueCache}; pub use sized::SizedCache; pub use timed::TimedCache; From 1820f644f31e93cefdd902156fc2951145bcddcb Mon Sep 17 00:00:00 2001 From: James Kominick Date: Tue, 20 Feb 2024 22:54:41 -0500 Subject: [PATCH 2/3] finish DiskCache store --- README.md | 1 + src/lib.rs | 4 +- src/stores/disk.rs | 535 ++++++++------------------------------------- src/stores/mod.rs | 10 +- 4 files changed, 98 insertions(+), 452 deletions(-) diff --git a/README.md b/README.md index c02165e..7470c9d 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,7 @@ of un-cached arguments, specify `#[cached(sync_writes = true)]` / `#[once(sync_w - `redis_connection_manager`: Enable the optional `connection-manager` feature of `redis`. Any async redis caches created will use a connection manager instead of a `MultiplexedConnection` - `redis_ahash`: Enable the optional `ahash` feature of `redis` +- `disk_store`: Include disk cache store - `wasm`: Enable WASM support. Note that this feature is incompatible with `tokio`'s multi-thread runtime (`async_tokio_rt_multi_thread`) and all Redis features (`redis_store`, `redis_async_std`, `redis_tokio`, `redis_ahash`) diff --git a/src/lib.rs b/src/lib.rs index b56d81a..4947ccf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -187,11 +187,11 @@ pub use stores::AsyncRedisCache; pub use stores::{ CanExpire, ExpiringValueCache, SizedCache, TimedCache, TimedSizedCache, UnboundCache, }; +#[cfg(feature = "disk_store")] +pub use stores::{DiskCache, DiskCacheError}; #[cfg(feature = "redis_store")] #[cfg_attr(docsrs, doc(cfg(feature = "redis_store")))] pub use stores::{RedisCache, RedisCacheError}; -#[cfg(feature = "disk_store")] -pub use stores::{DiskCache, DiskCacheError}; #[cfg(feature = "async")] #[cfg_attr(docsrs, doc(cfg(feature = "async")))] use {async_trait::async_trait, futures::Future}; diff --git a/src/stores/disk.rs b/src/stores/disk.rs index 1144c0c..0e2a39f 100644 --- a/src/stores/disk.rs +++ b/src/stores/disk.rs @@ 
-4,23 +4,18 @@ use instant::Duration; use serde::de::DeserializeOwned; use serde::Serialize; use sled::Db; -use std::{fmt::Display, path::PathBuf, time::SystemTime}; use std::marker::PhantomData; +use std::path::Path; +use std::{fmt::Display, path::PathBuf, time::SystemTime}; pub struct DiskCacheBuilder { - seconds: u64, + seconds: Option, refresh: bool, - namespace: String, - prefix: String, - disk_path: Option, - _phantom_k: PhantomData, - _phantom_v: PhantomData, + disk_dir: Option, + cache_name: String, + _phantom: PhantomData<(K, V)>, } -const ENV_KEY: &str = "CACHED_DISK_PATH"; -const DEFAULT_NAMESPACE: &str = "cached-disk-store:"; -const LAST_CLEANUP_KEY: &str = "last-cleanup"; - use thiserror::Error; #[derive(Error, Debug)] @@ -34,27 +29,28 @@ pub enum DiskCacheBuildError { }, } +static DISK_FILE_PREFIX: &str = "cached_disk_cache"; +const DISK_FILE_VERSION: u64 = 1; + impl DiskCacheBuilder where K: Display, V: Serialize + DeserializeOwned, { /// Initialize a `DiskCacheBuilder` - pub fn new>(prefix: S, seconds: u64) -> DiskCacheBuilder { + pub fn new(cache_name: &str) -> DiskCacheBuilder { Self { - seconds, + seconds: None, refresh: false, - namespace: DEFAULT_NAMESPACE.to_string(), - prefix: prefix.as_ref().to_string(), - disk_path: None, - _phantom_k: Default::default(), - _phantom_v: Default::default(), + disk_dir: None, + cache_name: cache_name.to_string(), + _phantom: Default::default(), } } /// Specify the cache TTL/lifespan in seconds pub fn set_lifespan(mut self, seconds: u64) -> Self { - self.seconds = seconds; + self.seconds = Some(seconds); self } @@ -64,83 +60,56 @@ where self } - /// Set the namespace for cache keys. Defaults to `cached-disk-store:`. - /// Used to generate keys formatted as: `{namespace}{prefix}{key}` - /// Note that no delimiters are implicitly added so you may pass - /// an empty string if you want there to be no namespace on keys. 
- pub fn set_namespace>(mut self, namespace: S) -> Self { - self.namespace = namespace.as_ref().to_string(); - self - } - - /// Set the prefix for cache keys. - /// Used to generate keys formatted as: `{namespace}{prefix}{key}` - /// Note that no delimiters are implicitly added so you may pass - /// an empty string if you want there to be no prefix on keys. - pub fn set_prefix>(mut self, prefix: S) -> Self { - self.prefix = prefix.as_ref().to_string(); - self - } - /// Set the disk path for where the data will be stored - pub fn set_disk_path(mut self, cs: PathBuf) -> Self { - self.disk_path = Some(cs); + pub fn set_disk_directory>(mut self, dir: P) -> Self { + self.disk_dir = Some(dir.as_ref().into()); self } - /// Return the disk path, or load from the env var: CACHED_DISK_PATH, or fall back to OS cache directory - pub fn disk_path(&self) -> Result { - match self.disk_path { - Some(ref s) => Ok(s.to_path_buf()), - None => match std::env::var(ENV_KEY) { - Ok(path) => Ok(PathBuf::from(path)), - Err(error) => { - let disk_path = BaseDirs::new().map(|base_dirs| - base_dirs - .cache_dir() - .join(env!("CARGO_PKG_NAME")) - .join("cached") - ); - - match disk_path { - Some(path) => Ok(path), - None => Err(DiskCacheBuildError::MissingDiskPath { - env_key: ENV_KEY.to_string(), - error, - }) - } - } - } - } + fn default_disk_dir() -> PathBuf { + BaseDirs::new() + .map(|base_dirs| { + let exe_name = std::env::current_exe() + .ok() + .and_then(|path| { + path.file_name() + .and_then(|os_str| os_str.to_str().map(|s| format!("{}_", s))) + }) + .unwrap_or_default(); + let dir_prefix = format!("{}{}", exe_name, DISK_FILE_PREFIX); + base_dirs.cache_dir().join(dir_prefix) + }) + .unwrap_or_else(|| { + std::env::current_dir().expect("disk cache unable to determine current directory") + }) } pub fn build(self) -> Result, DiskCacheBuildError> { - let disk_path = self.disk_path()?; + let disk_dir = self.disk_dir.unwrap_or_else(|| Self::default_disk_dir()); + let disk_path = 
disk_dir.join(format!("{}_v{}", self.cache_name, DISK_FILE_VERSION)); let connection = sled::open(disk_path.clone())?; Ok(DiskCache { - connection, seconds: self.seconds, refresh: self.refresh, + version: DISK_FILE_VERSION, disk_path, - namespace: self.namespace, - prefix: self.prefix, - _phantom_k: self._phantom_k, - _phantom_v: self._phantom_v, + connection, + _phantom: self._phantom, }) } } /// Cache store backed by disk pub struct DiskCache { - pub(super) seconds: u64, + pub(super) seconds: Option, pub(super) refresh: bool, - pub(super) namespace: String, - pub(super) prefix: String, - connection: sled::Db, + #[allow(unused)] + version: u64, + #[allow(unused)] disk_path: PathBuf, - _phantom_k: PhantomData, - _phantom_v: PhantomData, + connection: sled::Db, + _phantom: PhantomData<(K, V)>, } impl DiskCache @@ -150,48 +119,26 @@ where { #[allow(clippy::new_ret_no_self)] /// Initialize a `DiskCacheBuilder` - pub fn new>(prefix: S, seconds: u64) -> DiskCacheBuilder { - DiskCacheBuilder::new(prefix, seconds) - } - - fn generate_key(&self, key: &K) -> String { - format!("{}{}{}", self.namespace, self.prefix, key) - } - - /// Return the disk path used - pub fn disk_path(&self) -> PathBuf { - self.disk_path.clone() - } - - fn last_cleanup(connection: &Db) -> SystemTime { - match connection.get(LAST_CLEANUP_KEY) { - Ok(Some(l)) => match rmp_serde::from_slice::(&l) { - Ok(l) => l, - _ => SystemTime::UNIX_EPOCH - }, - _ => SystemTime::UNIX_EPOCH - } + pub fn new(cache_name: &str) -> DiskCacheBuilder { + DiskCacheBuilder::new(cache_name) } - fn cleanup_expired_entries(connection: &Db) { + pub fn remove_expired_entries(&self, connection: &Db) { let now = SystemTime::now(); - let last_cleanup = DiskCache::::last_cleanup(connection); - - if last_cleanup + Duration::from_secs(10) < now { - return; - } for (key, value) in connection.iter().flatten() { if let Ok(cached) = rmp_serde::from_slice::>(&value) { - if let Some(expires) = cached.expires { - if now > expires { + if let 
Some(lifetime_seconds) = self.seconds { + if now + .duration_since(cached.created_at) + .unwrap_or(Duration::from_secs(0)) + < Duration::from_secs(lifetime_seconds) + { let _ = connection.remove(key); } } } } - - let _ = connection.insert(LAST_CLEANUP_KEY, rmp_serde::to_vec(&SystemTime::now()).unwrap()); } } @@ -208,16 +155,16 @@ pub enum DiskCacheError { #[derive(serde::Serialize, serde::Deserialize)] struct CachedDiskValue { pub(crate) value: V, - pub(crate) expires: Option, - pub(crate) version: Option, + pub(crate) created_at: SystemTime, + pub(crate) version: u64, } impl CachedDiskValue { - fn new(value: V, expires: Option) -> Self { + fn new(value: V) -> Self { Self { value, - expires, - version: Some(1), + created_at: SystemTime::now(), + version: 1, } } } @@ -230,17 +177,19 @@ where type Error = DiskCacheError; fn cache_get(&self, key: &K) -> Result, DiskCacheError> { - Self::cleanup_expired_entries(&self.connection); - - let key = self.generate_key(key); - - if let Some(data) = self.connection.get(key)? { + let key_s = key.to_string(); + if let Some(data) = self.connection.get(key_s)? { let cached = rmp_serde::from_slice::>(&data)?; - if let Some(expires) = cached.expires { - if SystemTime::now() < expires { + if let Some(lifetime_seconds) = self.seconds { + if SystemTime::now() + .duration_since(cached.created_at) + .unwrap_or(Duration::from_secs(0)) + < Duration::from_secs(lifetime_seconds) + { Ok(Some(cached.value)) } else { + self.cache_remove(key)?; Ok(None) } } else { @@ -252,18 +201,18 @@ where } fn cache_set(&self, key: K, value: V) -> Result, DiskCacheError> { - // TODO: wrap value when serializing to also add expiration data, then add separate thread to clean up the cache? 
- let key = self.generate_key(&key); - let value = rmp_serde::to_vec(&CachedDiskValue::new( - value, - Some(SystemTime::now() + Duration::from_secs(self.seconds)), - ))?; + let key = key.to_string(); + let value = rmp_serde::to_vec(&CachedDiskValue::new(value))?; if let Some(data) = self.connection.insert(key, value)? { let cached = rmp_serde::from_slice::>(&data)?; - if let Some(expires) = cached.expires { - if SystemTime::now() < expires { + if let Some(lifetime_seconds) = self.seconds { + if SystemTime::now() + .duration_since(cached.created_at) + .unwrap_or(Duration::from_secs(0)) + < Duration::from_secs(lifetime_seconds) + { Ok(Some(cached.value)) } else { Ok(None) @@ -277,13 +226,16 @@ where } fn cache_remove(&self, key: &K) -> Result, DiskCacheError> { - let key = self.generate_key(key); - + let key = key.to_string(); if let Some(data) = self.connection.remove(key)? { let cached = rmp_serde::from_slice::>(&data)?; - if let Some(expires) = cached.expires { - if SystemTime::now() < expires { + if let Some(lifetime_seconds) = self.seconds { + if SystemTime::now() + .duration_since(cached.created_at) + .unwrap_or(Duration::from_secs(0)) + < Duration::from_secs(lifetime_seconds) + { Ok(Some(cached.value)) } else { Ok(None) @@ -297,13 +249,13 @@ where } fn cache_lifespan(&self) -> Option { - Some(self.seconds) + self.seconds } fn cache_set_lifespan(&mut self, seconds: u64) -> Option { let old = self.seconds; - self.seconds = seconds; - Some(old) + self.seconds = Some(seconds); + old } fn cache_set_refresh(&mut self, refresh: bool) -> bool { @@ -313,311 +265,6 @@ where } } -/* -#[cfg(all( - feature = "async", - any(feature = "disk_async_std", feature = "disk_tokio") -))] -mod async_disk { - use super::*; - use {crate::IOCachedAsync, async_trait::async_trait}; - - pub struct AsyncDiskCacheBuilder { - seconds: u64, - refresh: bool, - namespace: String, - prefix: String, - connection_string: Option, - _phantom_k: PhantomData, - _phantom_v: PhantomData, - } - - impl 
AsyncDiskCacheBuilder - where - K: Display, - V: Serialize + DeserializeOwned, - { - /// Initialize a `DiskCacheBuilder` - pub fn new>(prefix: S, seconds: u64) -> AsyncDiskCacheBuilder { - Self { - seconds, - refresh: false, - namespace: DEFAULT_NAMESPACE.to_string(), - prefix: prefix.as_ref().to_string(), - connection_string: None, - _phantom_k: Default::default(), - _phantom_v: Default::default(), - } - } - - /// Specify the cache TTL/lifespan in seconds - pub fn set_lifespan(mut self, seconds: u64) -> Self { - self.seconds = seconds; - self - } - - /// Specify whether cache hits refresh the TTL - pub fn set_refresh(mut self, refresh: bool) -> Self { - self.refresh = refresh; - self - } - - /// Set the namespace for cache keys. Defaults to `cached-disk-store:`. - /// Used to generate keys formatted as: `{namespace}{prefix}{key}` - /// Note that no delimiters are implicitly added so you may pass - /// an empty string if you want there to be no namespace on keys. - pub fn set_namespace>(mut self, namespace: S) -> Self { - self.namespace = namespace.as_ref().to_string(); - self - } - - /// Set the prefix for cache keys - /// Used to generate keys formatted as: `{namespace}{prefix}{key}` - /// Note that no delimiters are implicitly added so you may pass - /// an empty string if you want there to be no prefix on keys. 
- pub fn set_prefix>(mut self, prefix: S) -> Self { - self.prefix = prefix.as_ref().to_string(); - self - } - - /// Set the connection string for disk - pub fn set_connection_string(mut self, cs: &str) -> Self { - self.connection_string = Some(cs.to_string()); - self - } - - /// Return the current connection string or load from the env var: CACHED_DISK_PATH - pub fn connection_string(&self) -> Result { - match self.connection_string { - Some(ref s) => Ok(s.to_string()), - None => std::env::var(ENV_KEY).map_err(|e| { - DiskCacheBuildError::MissingDiskPath { - env_key: ENV_KEY.to_string(), - error: e, - } - }), - } - } - - async fn create_multiplexed_connection( - &self, - ) -> Result { - let s = self.connection_string()?; - let client = disk::Client::open(s)?; - let conn = client.get_multiplexed_async_connection().await?; - Ok(conn) - } - - pub async fn build(self) -> Result, DiskCacheBuildError> { - Ok(AsyncDiskCache { - seconds: self.seconds, - refresh: self.refresh, - connection_string: self.connection_string()?, - multiplexed_connection: self.create_multiplexed_connection().await?, - namespace: self.namespace, - prefix: self.prefix, - _phantom_k: self._phantom_k, - _phantom_v: self._phantom_v, - }) - } - } - - /// Cache store backed by disk - /// - /// Values have a ttl applied and enforced by disk. - /// Uses a `disk::aio::MultiplexedConnection` under the hood. 
- pub struct AsyncDiskCache { - pub(super) seconds: u64, - pub(super) refresh: bool, - pub(super) namespace: String, - pub(super) prefix: String, - connection_string: String, - multiplexed_connection: disk::aio::MultiplexedConnection, - _phantom_k: PhantomData, - _phantom_v: PhantomData, - } - - impl AsyncDiskCache - where - K: Display + Send + Sync, - V: Serialize + DeserializeOwned + Send + Sync, - { - #[allow(clippy::new_ret_no_self)] - /// Initialize an `AsyncDiskCacheBuilder` - pub fn new>(prefix: S, seconds: u64) -> AsyncDiskCacheBuilder { - AsyncDiskCacheBuilder::new(prefix, seconds) - } - - fn generate_key(&self, key: &K) -> String { - format!("{}{}{}", self.namespace, self.prefix, key) - } - - /// Return the disk connection string used - pub fn connection_string(&self) -> String { - self.connection_string.clone() - } - } - - #[async_trait] - impl<'de, K, V> IOCachedAsync for AsyncDiskCache - where - K: Display + Send + Sync, - V: Serialize + DeserializeOwned + Send + Sync, - { - type Error = DiskCacheError; - - /// Get a cached value - async fn cache_get(&self, key: &K) -> Result, Self::Error> { - let mut conn = self.multiplexed_connection.clone(); - let mut pipe = disk::pipe(); - let key = self.generate_key(key); - - pipe.get(key.clone()); - if self.refresh { - pipe.expire(key, self.seconds as usize).ignore(); - } - let res: (Option,) = pipe.query_async(&mut conn).await?; - match res.0 { - None => Ok(None), - Some(s) => { - let v: CachedDiskValue = serde_json::from_str(&s).map_err(|e| { - DiskCacheError::CacheDeserializationError { - cached_value: s, - error: e, - } - })?; - Ok(Some(v.value)) - } - } - } - - /// Set a cached value - async fn cache_set(&self, key: K, val: V) -> Result, Self::Error> { - let mut conn = self.multiplexed_connection.clone(); - let mut pipe = disk::pipe(); - let key = self.generate_key(&key); - - let val = CachedDiskValue::new(val); - pipe.get(key.clone()); - pipe.set_ex::( - key, - serde_json::to_string(&val) - .map_err(|e| 
DiskCacheError::CacheSerializationError { error: e })?, - self.seconds as usize, - ) - .ignore(); - - let res: (Option,) = pipe.query_async(&mut conn).await?; - match res.0 { - None => Ok(None), - Some(s) => { - let v: CachedDiskValue = serde_json::from_str(&s).map_err(|e| { - DiskCacheError::CacheDeserializationError { - cached_value: s, - error: e, - } - })?; - Ok(Some(v.value)) - } - } - } - - /// Remove a cached value - async fn cache_remove(&self, key: &K) -> Result, Self::Error> { - let mut conn = self.multiplexed_connection.clone(); - let mut pipe = disk::pipe(); - let key = self.generate_key(key); - - pipe.get(key.clone()); - pipe.del::(key).ignore(); - let res: (Option,) = pipe.query_async(&mut conn).await?; - match res.0 { - None => Ok(None), - Some(s) => { - let v: CachedDiskValue = serde_json::from_str(&s).map_err(|e| { - DiskCacheError::CacheDeserializationError { - cached_value: s, - error: e, - } - })?; - Ok(Some(v.value)) - } - } - } - - /// Set the flag to control whether cache hits refresh the ttl of cached values, returns the old flag value - fn cache_set_refresh(&mut self, refresh: bool) -> bool { - let old = self.refresh; - self.refresh = refresh; - old - } - - /// Return the lifespan of cached values (time to eviction) - fn cache_lifespan(&self) -> Option { - Some(self.seconds) - } - - /// Set the lifespan of cached values, returns the old value - fn cache_set_lifespan(&mut self, seconds: u64) -> Option { - let old = self.seconds; - self.seconds = seconds; - Some(old) - } - } - - #[cfg(test)] - mod tests { - use super::*; - use std::thread::sleep; - use std::time::Duration; - - fn now_millis() -> u128 { - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_millis() - } - - #[tokio::test] - async fn test_async_disk_cache() { - let mut c: AsyncDiskCache = - AsyncDiskCache::new(format!("{}:async-disk-cache-test", now_millis()), 2) - .build() - .await - .unwrap(); - - 
assert!(c.cache_get(&1).await.unwrap().is_none()); - - assert!(c.cache_set(1, 100).await.unwrap().is_none()); - assert!(c.cache_get(&1).await.unwrap().is_some()); - - sleep(Duration::new(2, 500000)); - assert!(c.cache_get(&1).await.unwrap().is_none()); - - let old = c.cache_set_lifespan(1).unwrap(); - assert_eq!(2, old); - assert!(c.cache_set(1, 100).await.unwrap().is_none()); - assert!(c.cache_get(&1).await.unwrap().is_some()); - - sleep(Duration::new(1, 600000)); - assert!(c.cache_get(&1).await.unwrap().is_none()); - - c.cache_set_lifespan(10).unwrap(); - assert!(c.cache_set(1, 100).await.unwrap().is_none()); - assert!(c.cache_set(2, 100).await.unwrap().is_none()); - assert_eq!(c.cache_get(&1).await.unwrap().unwrap(), 100); - assert_eq!(c.cache_get(&1).await.unwrap().unwrap(), 100); - } - } -} - -#[cfg(all( - feature = "async", - any(feature = "disk_async_std", feature = "disk_tokio") -))] -pub use async_disk::{AsyncDiskCache, AsyncDiskCacheBuilder}; -*/ - #[cfg(test)] /// Cache store tests mod tests { @@ -636,8 +283,8 @@ mod tests { #[test] fn disk_cache_set_get_remove() { let cache: DiskCache = - DiskCache::new(format!("{}:disk-cache-test-sgr", now_millis()), 3600) - .set_disk_path(std::env::temp_dir().join("cachedtest-sgr")) + DiskCache::new(&format!("{}:disk-cache-test-sgr", now_millis())) + .set_disk_directory(std::env::temp_dir().join("cachedtest-sgr")) .build() .unwrap(); @@ -665,8 +312,8 @@ mod tests { #[test] fn disk_cache() { let mut c: DiskCache = - DiskCache::new(format!("{}:disk-cache-test", now_millis()), 2) - .set_namespace("in-tests:") + DiskCache::new(&format!("{}:disk-cache-test", now_millis())) + .set_lifespan(2) .build() .unwrap(); @@ -696,8 +343,8 @@ mod tests { #[test] fn remove() { let cache: DiskCache = - DiskCache::new(format!("{}:disk-cache-test-remove", now_millis()), 3600) - .set_disk_path(std::env::temp_dir().join("cachedtest-remove")) + DiskCache::new(&format!("{}:disk-cache-test-remove", now_millis())) + 
.set_disk_directory(std::env::temp_dir().join("cachedtest-remove")) .build() .unwrap(); diff --git a/src/stores/mod.rs b/src/stores/mod.rs index 1fbbd26..3aef8c4 100644 --- a/src/stores/mod.rs +++ b/src/stores/mod.rs @@ -8,25 +8,23 @@ use std::hash::Hash; #[cfg(feature = "async")] use {super::CachedAsync, async_trait::async_trait, futures::Future}; +#[cfg(feature = "disk_store")] +mod disk; mod expiring_value_cache; #[cfg(feature = "redis_store")] mod redis; -#[cfg(feature = "disk_store")] -mod disk; mod sized; mod timed; mod timed_sized; mod unbound; +#[cfg(feature = "disk_store")] +pub use crate::stores::disk::{DiskCache, DiskCacheBuildError, DiskCacheBuilder, DiskCacheError}; #[cfg(feature = "redis_store")] #[cfg_attr(docsrs, doc(cfg(feature = "redis_store")))] pub use crate::stores::redis::{ RedisCache, RedisCacheBuildError, RedisCacheBuilder, RedisCacheError, }; -#[cfg(feature = "disk_store")] -pub use crate::stores::disk::{ - DiskCache, DiskCacheBuildError, DiskCacheBuilder, DiskCacheError, -}; pub use expiring_value_cache::{CanExpire, ExpiringValueCache}; pub use sized::SizedCache; pub use timed::TimedCache; From afb670c91e0fd9051b28edde47d856593f23f138 Mon Sep 17 00:00:00 2001 From: James Kominick Date: Fri, 23 Feb 2024 23:20:59 -0500 Subject: [PATCH 3/3] add #[io_cached] disk support, examples, and tests --- README.md | 29 ++++++++ cached_proc_macro/src/io_cached.rs | 102 +++++++++++++++++++++++++--- cached_proc_macro/src/lib.rs | 2 +- examples/disk.rs | 46 +++++++++++++ src/lib.rs | 30 +++++++++ src/stores/disk.rs | 77 +++++++++++++++------ tests/cached.rs | 105 +++++++++++++++++++++++++++++ 7 files changed, 360 insertions(+), 31 deletions(-) create mode 100644 examples/disk.rs diff --git a/README.md b/README.md index 7470c9d..3081539 100644 --- a/README.md +++ b/README.md @@ -138,6 +138,35 @@ async fn async_cached_sleep_secs(secs: u64) -> Result { } ``` +---- + +```rust,no_run,ignore +use cached::proc_macro::io_cached; +use cached::DiskCache; +use 
thiserror::Error; + +#[derive(Error, Debug, PartialEq, Clone)] +enum ExampleError { + #[error("error with disk cache `{0}`")] + DiskError(String), +} + +/// Cache the results of a function on disk. +/// Cache files will be stored under the system cache dir +/// unless otherwise specified with `disk_dir` or the `create` argument. +/// A `map_error` closure must be specified to convert any +/// disk cache errors into the same type of error returned +/// by your function. All `io_cached` functions must return `Result`s. +#[io_cached( + map_error = r##"|e| ExampleError::DiskError(format!("{:?}", e))"##, + disk = true +)] +fn cached_sleep_secs(secs: u64) -> Result { + std::thread::sleep(std::time::Duration::from_secs(secs)); + Ok(secs.to_string()) +} +``` + Functions defined via macros will have their results cached using the function's arguments as a key, a `convert` expression specified on a procedural macros, diff --git a/cached_proc_macro/src/io_cached.rs b/cached_proc_macro/src/io_cached.rs index 9c45bed..4c38843 100644 --- a/cached_proc_macro/src/io_cached.rs +++ b/cached_proc_macro/src/io_cached.rs @@ -12,6 +12,10 @@ use syn::{ struct IOMacroArgs { map_error: String, #[darling(default)] + disk: bool, + #[darling(default)] + disk_dir: Option, + #[darling(default)] redis: bool, #[darling(default)] cache_prefix_block: Option, @@ -149,6 +153,7 @@ pub fn io_cached(args: TokenStream, input: TokenStream) -> TokenStream { Some(ref name) => Ident::new(name, fn_ident.span()), None => Ident::new(&fn_ident.to_string().to_uppercase(), fn_ident.span()), }; + let cache_name = cache_ident.to_string(); let (cache_key_ty, key_convert_block) = make_cache_key_type( &args.key, @@ -161,13 +166,15 @@ pub fn io_cached(args: TokenStream, input: TokenStream) -> TokenStream { // make the cache type and create statement let (cache_ty, cache_create) = match ( &args.redis, + &args.disk, &args.time, &args.time_refresh, &args.cache_prefix_block, &args.cache_type, &args.cache_create, ) { - 
(true, time, time_refresh, cache_prefix, cache_type, cache_create) => { + // redis + (true, false, time, time_refresh, cache_prefix, cache_type, cache_create) => { let cache_ty = match cache_type { Some(cache_type) => { let cache_type = @@ -234,7 +241,63 @@ }; (cache_ty, cache_create) } - (_, time, time_refresh, cache_prefix, cache_type, cache_create) => { + // disk + (false, true, time, time_refresh, _, cache_type, cache_create) => { + let cache_ty = match cache_type { + Some(cache_type) => { + let cache_type = + parse_str::(cache_type).expect("unable to parse cache type"); + quote! { #cache_type } + } + None => { + // https://github.com/spacejam/sled?tab=readme-ov-file#interaction-with-async + quote! { cached::DiskCache<#cache_key_ty, #cache_value_ty> } + } + }; + let cache_create = match cache_create { + Some(cache_create) => { + if time.is_some() || time_refresh.is_some() { + panic!( + "cannot specify `time` or `time_refresh` when passing `create` block" + ); + } else { + let cache_create = parse_str::(cache_create.as_ref()) + .expect("unable to parse cache create block"); + quote! { #cache_create } + } + } + None => { + let create = quote! { + cached::DiskCache::new(#cache_name) + }; + let create = match time { + None => create, + Some(time) => { + quote! { + (#create).set_lifespan(#time) + } + } + }; + let create = match time_refresh { + None => create, + Some(time_refresh) => { + quote! { + (#create).set_refresh(#time_refresh) + } + } + }; + let create = match args.disk_dir { + None => create, + Some(disk_dir) => { + quote! { (#create).set_disk_directory(#disk_dir) } + } + }; + quote!
{ (#create).build().expect("error constructing DiskCache in #[io_cached] macro") } + } + }; + (cache_ty, cache_create) + } + (_, _, time, time_refresh, cache_prefix, cache_type, cache_create) => { let cache_ty = match cache_type { Some(cache_type) => { let cache_type = @@ -270,7 +333,7 @@ pub fn io_cached(args: TokenStream, input: TokenStream) -> TokenStream { let (set_cache_block, return_cache_block) = { let (set_cache_block, return_cache_block) = if args.with_cached_flag { ( - if asyncness.is_some() { + if asyncness.is_some() && !args.disk { quote! { if let Ok(result) = &result { cache.cache_set(key, result.value.clone()).await.map_err(#map_error)?; @@ -287,7 +350,7 @@ pub fn io_cached(args: TokenStream, input: TokenStream) -> TokenStream { ) } else { ( - if asyncness.is_some() { + if asyncness.is_some() && !args.disk { quote! { if let Ok(result) = &result { cache.cache_set(key, result.clone()).await.map_err(#map_error)?; @@ -342,6 +405,29 @@ pub fn io_cached(args: TokenStream, input: TokenStream) -> TokenStream { ); fill_in_attributes(&mut attributes, cache_fn_doc_extra); + let async_trait = if asyncness.is_some() && !args.disk { + quote! { + use cached::IOCachedAsync; + } + } else { + quote! { + use cached::IOCached; + } + }; + + let async_cache_get_return = if asyncness.is_some() && !args.disk { + quote! { + if let Some(result) = cache.cache_get(&key).await.map_err(#map_error)? { + #return_cache_block + } + } + } else { + quote! { + if let Some(result) = cache.cache_get(&key).map_err(#map_error)? { + #return_cache_block + } + } + }; // put it all together let expanded = if asyncness.is_some() { quote! 
{ @@ -352,14 +438,12 @@ pub fn io_cached(args: TokenStream, input: TokenStream) -> TokenStream { #(#attributes)* #visibility #signature_no_muts { let init = || async { #cache_create }; - use cached::IOCachedAsync; + #async_trait let key = #key_convert_block; { // check if the result is cached let cache = &#cache_ident.get_or_init(init).await; - if let Some(result) = cache.cache_get(&key).await.map_err(#map_error)? { - #return_cache_block - } + #async_cache_get_return } #do_set_return_block } @@ -367,7 +451,7 @@ pub fn io_cached(args: TokenStream, input: TokenStream) -> TokenStream { #[doc = #prime_fn_indent_doc] #[allow(dead_code)] #visibility #prime_sig { - use cached::IOCachedAsync; + #async_trait let init = || async { #cache_create }; let key = #key_convert_block; #do_set_return_block diff --git a/cached_proc_macro/src/lib.rs b/cached_proc_macro/src/lib.rs index 02164f7..6a4a998 100644 --- a/cached_proc_macro/src/lib.rs +++ b/cached_proc_macro/src/lib.rs @@ -68,7 +68,7 @@ pub fn once(args: TokenStream, input: TokenStream) -> TokenStream { /// the error type returned by your function. /// - `name`: (optional, string) specify the name for the generated cache, defaults to the function name uppercase. /// - `redis`: (optional, bool) default to a `RedisCache` or `AsyncRedisCache` -/// - `disk`: (optional, bool) default to a `DiskCache` or `AsyncDiskCache` +/// - `disk`: (optional, bool) use a `DiskCache`, this must be set to true even if `type` and `create` are specified. /// - `time`: (optional, u64) specify a cache TTL in seconds, implies the cache type is a `TimedCached` or `TimedSizedCache`. /// - `time_refresh`: (optional, bool) specify whether to refresh the TTL on cache hits. /// - `type`: (optional, string type) explicitly specify the cache store type to use. 
diff --git a/examples/disk.rs b/examples/disk.rs new file mode 100644 index 0000000..83bbe5b --- /dev/null +++ b/examples/disk.rs @@ -0,0 +1,46 @@ +/* +run with required features: + cargo run --example disk --features "disk_store" + */ + +use cached::proc_macro::io_cached; +use std::io; +use std::io::Write; +use std::time::Duration; +use thiserror::Error; + +#[derive(Error, Debug, PartialEq, Clone)] +enum ExampleError { + #[error("error with disk cache `{0}`")] + DiskError(String), +} + +// When the macro constructs your DiskCache instance, the default +// cache files will be stored under $system_cache_dir/cached_disk_cache/ +#[io_cached( + disk = true, + time = 30, + map_error = r##"|e| ExampleError::DiskError(format!("{:?}", e))"## +)] +fn cached_sleep_secs(secs: u64) -> Result<(), ExampleError> { + std::thread::sleep(Duration::from_secs(secs)); + Ok(()) +} + +fn main() { + print!("1. first sync call with a 2 seconds sleep..."); + io::stdout().flush().unwrap(); + cached_sleep_secs(2).unwrap(); + println!("done"); + print!("second sync call with a 2 seconds sleep (it should be fast)..."); + io::stdout().flush().unwrap(); + cached_sleep_secs(2).unwrap(); + println!("done"); + + use cached::IOCached; + CACHED_SLEEP_SECS.cache_remove(&2).unwrap(); + print!("third sync call with a 2 seconds sleep (slow, after cache-remove)..."); + io::stdout().flush().unwrap(); + cached_sleep_secs(2).unwrap(); + println!("done"); +} diff --git a/src/lib.rs b/src/lib.rs index 4947ccf..fa49324 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -140,6 +140,35 @@ async fn async_cached_sleep_secs(secs: u64) -> Result { } ``` + +---- + +```rust,no_run,ignore +use cached::proc_macro::io_cached; +use cached::DiskCache; +use thiserror::Error; + +#[derive(Error, Debug, PartialEq, Clone)] +enum ExampleError { + #[error("error with disk cache `{0}`")] + DiskError(String), +} + +/// Cache the results of a function on disk.
+/// Cache files will be stored under the system cache dir +/// unless otherwise specified with `disk_dir` or the `create` argument. +/// A `map_error` closure must be specified to convert any +/// disk cache errors into the same type of error returned +/// by your function. All `io_cached` functions must return `Result`s. +#[io_cached( + map_error = r##"|e| ExampleError::DiskError(format!("{:?}", e))"##, + disk = true +)] +fn cached_sleep_secs(secs: u64) -> Result { + std::thread::sleep(std::time::Duration::from_secs(secs)); + Ok(secs.to_string()) +} +``` + Functions defined via macros will have their results cached using the function's arguments as a key, a `convert` expression specified on a procedural macros, @@ -188,6 +217,7 @@ pub use stores::{ CanExpire, ExpiringValueCache, SizedCache, TimedCache, TimedSizedCache, UnboundCache, }; #[cfg(feature = "disk_store")] +#[cfg_attr(docsrs, doc(cfg(feature = "disk_store")))] pub use stores::{DiskCache, DiskCacheError}; #[cfg(feature = "redis_store")] #[cfg_attr(docsrs, doc(cfg(feature = "redis_store")))] diff --git a/src/stores/disk.rs b/src/stores/disk.rs index 0e2a39f..42eecbf 100644 --- a/src/stores/disk.rs +++ b/src/stores/disk.rs @@ -38,12 +38,12 @@ where V: Serialize + DeserializeOwned, { /// Initialize a `DiskCacheBuilder` - pub fn new(cache_name: &str) -> DiskCacheBuilder { + pub fn new>(cache_name: S) -> DiskCacheBuilder { Self { seconds: None, refresh: false, disk_dir: None, - cache_name: cache_name.to_string(), + cache_name: cache_name.as_ref().to_string(), _phantom: Default::default(), } } @@ -108,7 +108,7 @@ pub struct DiskCache { version: u64, #[allow(unused)] disk_path: PathBuf, - connection: sled::Db, + connection: Db, _phantom: PhantomData<(K, V)>, } @@ -167,6 +167,10 @@ impl CachedDiskValue { version: 1, } } + + fn refresh_created_at(&mut self) { + self.created_at = SystemTime::now(); + } } impl IOCached for DiskCache @@ -177,24 +181,39 @@ where type Error = DiskCacheError; fn cache_get(&self, key: 
&K) -> Result, DiskCacheError> { - let key_s = key.to_string(); - if let Some(data) = self.connection.get(key_s)? { - let cached = rmp_serde::from_slice::>(&data)?; - - if let Some(lifetime_seconds) = self.seconds { - if SystemTime::now() - .duration_since(cached.created_at) - .unwrap_or(Duration::from_secs(0)) - < Duration::from_secs(lifetime_seconds) - { - Ok(Some(cached.value)) - } else { - self.cache_remove(key)?; - Ok(None) + let key = key.to_string(); + let seconds = self.seconds; + let refresh = self.refresh; + let update = |old: Option<&[u8]>| -> Option> { + if old.is_none() { + return None; + } + let old = old.unwrap(); + if seconds.is_none() { + return Some(old.to_vec()); + } + let seconds = seconds.unwrap(); + let mut cached = rmp_serde::from_slice::>(old) + .expect("error deserializing cached disk value"); + if SystemTime::now() + .duration_since(cached.created_at) + .unwrap_or(Duration::from_secs(0)) + < Duration::from_secs(seconds) + { + if refresh { + cached.refresh_created_at(); } + let cache_val = + rmp_serde::to_vec(&cached).expect("error serializing cached disk value"); + Some(cache_val) } else { - Ok(Some(cached.value)) + None } + }; + + if let Some(data) = self.connection.update_and_fetch(&key, update)? 
{ + let cached = rmp_serde::from_slice::>(&data)?; + Ok(Some(cached.value)) } else { Ok(None) } @@ -281,7 +300,7 @@ mod tests { } #[test] - fn disk_cache_set_get_remove() { + fn disk_set_get_remove() { let cache: DiskCache = DiskCache::new(&format!("{}:disk-cache-test-sgr", now_millis())) .set_disk_directory(std::env::temp_dir().join("cachedtest-sgr")) @@ -310,7 +329,7 @@ mod tests { } #[test] - fn disk_cache() { + fn disk_expire() { let mut c: DiskCache = DiskCache::new(&format!("{}:disk-cache-test", now_millis())) .set_lifespan(2) @@ -341,7 +360,7 @@ mod tests { } #[test] - fn remove() { + fn disk_remove() { let cache: DiskCache = DiskCache::new(&format!("{}:disk-cache-test-remove", now_millis())) .set_disk_directory(std::env::temp_dir().join("cachedtest-remove")) @@ -356,4 +375,20 @@ mod tests { drop(cache); } + + #[test] + fn disk_default_cache_dir() { + let cache: DiskCache = + DiskCache::new(&format!("{}:disk-cache-test-default-dir", now_millis())) + .build() + .unwrap(); + + assert!(cache.cache_set(1, 100).unwrap().is_none()); + assert!(cache.cache_set(2, 200).unwrap().is_none()); + assert!(cache.cache_set(3, 300).unwrap().is_none()); + + assert_eq!(100, cache.cache_remove(&1).unwrap().unwrap()); + + drop(cache); + } } diff --git a/tests/cached.rs b/tests/cached.rs index 2df201c..97fd7af 100644 --- a/tests/cached.rs +++ b/tests/cached.rs @@ -1208,6 +1208,111 @@ fn test_mutable_args_once() { assert_eq!((2, 2), mutable_args_once(5, 6)); } +#[cfg(feature = "disk_store")] +mod disk_tests { + use super::*; + use cached::proc_macro::io_cached; + use cached::DiskCache; + use thiserror::Error; + + #[derive(Error, Debug, PartialEq, Clone)] + enum TestError { + #[error("error with disk cache `{0}`")] + DiskError(String), + #[error("count `{0}`")] + Count(u32), + } + + #[io_cached( + disk = true, + time = 1, + map_error = r##"|e| TestError::DiskError(format!("{:?}", e))"## + )] + fn cached_disk(n: u32) -> Result { + if n < 5 { + Ok(n) + } else { + 
Err(TestError::Count(n)) + } + } + + #[test] + fn test_cached_disk() { + assert_eq!(cached_disk(1), Ok(1)); + assert_eq!(cached_disk(1), Ok(1)); + assert_eq!(cached_disk(5), Err(TestError::Count(5))); + assert_eq!(cached_disk(6), Err(TestError::Count(6))); + } + + #[io_cached( + disk = true, + time = 1, + with_cached_flag = true, + map_error = r##"|e| TestError::DiskError(format!("{:?}", e))"## + )] + fn cached_disk_cached_flag(n: u32) -> Result, TestError> { + if n < 5 { + Ok(cached::Return::new(n)) + } else { + Err(TestError::Count(n)) + } + } + + #[test] + fn test_cached_disk_cached_flag() { + assert!(!cached_disk_cached_flag(1).unwrap().was_cached); + assert!(cached_disk_cached_flag(1).unwrap().was_cached); + assert!(cached_disk_cached_flag(5).is_err()); + assert!(cached_disk_cached_flag(6).is_err()); + } + + #[io_cached( + map_error = r##"|e| TestError::DiskError(format!("{:?}", e))"##, + type = "cached::DiskCache", + create = r##" { DiskCache::new("cached_disk_cache_create").set_lifespan(1).set_refresh(true).build().expect("error building disk cache") } "## + )] + fn cached_disk_cache_create(n: u32) -> Result { + if n < 5 { + Ok(n) + } else { + Err(TestError::Count(n)) + } + } + + #[test] + fn test_cached_disk_cache_create() { + assert_eq!(cached_disk_cache_create(1), Ok(1)); + assert_eq!(cached_disk_cache_create(1), Ok(1)); + assert_eq!(cached_disk_cache_create(5), Err(TestError::Count(5))); + assert_eq!(cached_disk_cache_create(6), Err(TestError::Count(6))); + } + + #[cfg(feature = "async")] + mod async_test { + use super::*; + + #[io_cached( + disk = true, + map_error = r##"|e| TestError::DiskError(format!("{:?}", e))"## + )] + async fn async_cached_disk(n: u32) -> Result { + if n < 5 { + Ok(n) + } else { + Err(TestError::Count(n)) + } + } + + #[tokio::test] + async fn test_async_cached_disk() { + assert_eq!(async_cached_disk(1).await, Ok(1)); + assert_eq!(async_cached_disk(1).await, Ok(1)); + assert_eq!(async_cached_disk(5).await, 
Err(TestError::Count(5))); + assert_eq!(async_cached_disk(6).await, Err(TestError::Count(6))); + } + } +} + #[cfg(feature = "redis_store")] mod redis_tests { use super::*;