From 55ab087ea9995b091cfad7d1524c5deb423fc324 Mon Sep 17 00:00:00 2001 From: Ignacio Duart Date: Sat, 4 Nov 2023 17:20:13 +0100 Subject: [PATCH] Change store to be append only --- Cargo.lock | 2 +- apps/freenet-email-app/web/src/inbox.rs | 2 +- crates/core/Cargo.toml | 1 + crates/core/src/contract/executor.rs | 4 +- crates/core/src/ring.rs | 1 - crates/core/src/runtime/contract_store.rs | 112 +++---- crates/core/src/runtime/delegate_store.rs | 109 +++---- crates/core/src/runtime/secrets_store.rs | 113 ++++--- crates/core/src/runtime/store.rs | 343 ++++++++++++++++++---- crates/core/src/runtime/tests/contract.rs | 54 +--- crates/core/src/runtime/tests/mod.rs | 11 +- crates/core/src/runtime/tests/time.rs | 12 +- crates/fdev/src/local_node/state.rs | 11 +- stdlib | 2 +- 14 files changed, 453 insertions(+), 324 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e18e8bef3..e0fc9e51f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1624,6 +1624,7 @@ dependencies = [ "statrs", "stretto", "tar", + "tempfile", "thiserror", "tokio", "tower-http", @@ -1650,7 +1651,6 @@ name = "freenet-stdlib" version = "0.0.8" dependencies = [ "arbitrary", - "arrayvec", "bincode", "blake3", "bs58", diff --git a/apps/freenet-email-app/web/src/inbox.rs b/apps/freenet-email-app/web/src/inbox.rs index 604c25e46..6b75a3b2e 100644 --- a/apps/freenet-email-app/web/src/inbox.rs +++ b/apps/freenet-email-app/web/src/inbox.rs @@ -557,7 +557,7 @@ mod tests { minimum_tier: Tier::Hour1, private_key, }, - key: ContractKey::from((¶ms.try_into()?, ContractCode::from([].as_slice()))), + key: ContractKey::from_params_and_code(¶ms.try_into()?, ContractCode::from([].as_slice())), }) } } diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 1cdbec941..9c4968d9b 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -90,6 +90,7 @@ pico-args = "0.5" statrs = "0.16.0" freenet-stdlib = { workspace = true, features = ["testing", "net"] } chrono = { workspace = true, features = ["arbitrary"] } +tempfile = "3.8" [features] default = ["trace", "websocket", "sqlite"] diff --git a/crates/core/src/contract/executor.rs b/crates/core/src/contract/executor.rs index d11fb3a9e..5fbc00bc9 100644 --- a/crates/core/src/contract/executor.rs +++ b/crates/core/src/contract/executor.rs @@ -1125,7 +1125,7 @@ impl Executor { if let Some(notifiers) = self.update_notifications.get_mut(key) { let summaries = self.subscriber_summaries.get_mut(key).unwrap(); // in general there should be less than 32 failures - let mut failures = arrayvec::ArrayVec::<_, 32>::new(); + let mut failures = Vec::with_capacity(32); for (peer_key, notifier) in notifiers.iter() { let peer_summary = summaries.get_mut(peer_key).unwrap(); let update = match peer_summary { @@ -1153,7 +1153,7 @@ impl Executor { } .into())) { - let _ = failures.try_push(*peer_key); + failures.push(*peer_key); tracing::error!(cli_id = %peer_key, "{err}"); } else { tracing::debug!(cli_id = %peer_key, contract = %key, "notified of update"); diff --git a/crates/core/src/ring.rs b/crates/core/src/ring.rs index cee1cbd9c..b6ecf95ee 100644 --- a/crates/core/src/ring.rs +++ b/crates/core/src/ring.rs @@ -27,7 +27,6 @@ use std::{ }; use anyhow::bail; -use arrayvec::ArrayVec; use dashmap::{mapref::one::Ref as DmRef, DashMap, DashSet}; use either::Either; use freenet_stdlib::prelude::{ContractInstanceId, ContractKey}; diff --git a/crates/core/src/runtime/contract_store.rs b/crates/core/src/runtime/contract_store.rs index 4204ef549..9537a708f 100644 --- a/crates/core/src/runtime/contract_store.rs +++ b/crates/core/src/runtime/contract_store.rs @@ -1,72 +1,48 @@ -use std::{fs::File, io::Write, iter::FromIterator, path::PathBuf, sync::Arc}; +use std::{ + fs::{File, OpenOptions}, + io::{BufWriter, Write}, + path::PathBuf, + sync::Arc, +}; use dashmap::DashMap; use freenet_stdlib::prelude::*; -use serde::{Deserialize, Serialize}; use stretto::Cache; -use super::{ - error::RuntimeInnerError, - store::{StoreEntriesContainer, StoreFsManagement}, - RuntimeResult, -}; - -#[derive(Serialize, Deserialize, Default)] -struct KeyToCodeMap(Vec<(ContractKey, CodeHash)>); - -impl StoreEntriesContainer for KeyToCodeMap { - type MemContainer = Arc>; - type Key = ContractKey; - type Value = CodeHash; - - fn update(self, container: &mut Self::MemContainer) { - for (k, v) in self.0 { - container.insert(k, v); - } - } - - fn replace(container: &Self::MemContainer) -> Self { - KeyToCodeMap::from(&**container) - } - - fn insert(container: &mut Self::MemContainer, key: Self::Key, value: Self::Value) { - container.insert(key, value); - } -} - -impl From<&DashMap> for KeyToCodeMap { - fn from(vals: &DashMap) -> Self { - let mut map = vec![]; - for r in vals.iter() { - map.push((r.key().clone(), *r.value())); - } - Self(map) - } -} +use super::{error::RuntimeInnerError, store::StoreFsManagement, RuntimeResult}; /// Handle contract blob storage on the file system. pub struct ContractStore { contracts_dir: PathBuf, contract_cache: Cache>>, - key_to_code_part: Arc>, + key_to_code_part: Arc>, + index_file: BufWriter, } // TODO: add functionality to delete old contracts which have not been used for a while // to keep the total space used under a configured threshold -static LOCK_FILE_PATH: once_cell::sync::OnceCell = once_cell::sync::OnceCell::new(); static KEY_FILE_PATH: once_cell::sync::OnceCell = once_cell::sync::OnceCell::new(); -impl StoreFsManagement for ContractStore {} +impl StoreFsManagement for ContractStore { + type MemContainer = Arc>; + type Key = ContractInstanceId; + type Value = CodeHash; + + fn insert_in_container( + container: &mut Self::MemContainer, + (key, offset): (Self::Key, u64), + value: Self::Value, + ) { + container.insert(key, (offset, value)); + } +} impl ContractStore { /// # Arguments /// - max_size: max size in bytes of the contracts being cached pub fn new(contracts_dir: PathBuf, max_size: i64) -> RuntimeResult { const ERR: &str = "failed to build mem cache"; - let key_to_code_part; - let _ = LOCK_FILE_PATH.try_insert(contracts_dir.join("__LOCK")); - // if the lock file exists is from a previous execution so is safe to delete it - let _ = std::fs::remove_file(LOCK_FILE_PATH.get().unwrap().as_path()); + let mut key_to_code_part = Arc::new(DashMap::new()); let key_file = match KEY_FILE_PATH .try_insert(contracts_dir.join("KEY_DATA")) .map_err(|(e, _)| e) @@ -79,24 +55,25 @@ impl ContractStore { tracing::error!("error creating contract dir: {err}"); err })?; - key_to_code_part = Arc::new(DashMap::new()); File::create(contracts_dir.join("KEY_DATA"))?; } else { - let map = Self::load_from_file( - KEY_FILE_PATH.get().unwrap().as_path(), - LOCK_FILE_PATH.get().unwrap().as_path(), + Self::load_from_file( + KEY_FILE_PATH.get().expect("infallible").as_path(), + &mut key_to_code_part, )?; - key_to_code_part = Arc::new(DashMap::from_iter(map.0)); } Self::watch_changes( key_to_code_part.clone(), - KEY_FILE_PATH.get().unwrap().as_path(), - LOCK_FILE_PATH.get().unwrap().as_path(), + KEY_FILE_PATH.get().expect("infallible").as_path(), )?; + + let index_file = + std::io::BufWriter::new(OpenOptions::new().append(true).read(true).open(key_file)?); Ok(Self { contract_cache: Cache::new(100, max_size).expect(ERR), contracts_dir, key_to_code_part, + index_file, }) } @@ -121,8 +98,8 @@ impl ContractStore { return result; } - self.key_to_code_part.get(key).and_then(|key| { - let code_hash = key.value(); + self.key_to_code_part.get(&key.id()).and_then(|key| { + let code_hash = key.value().1; let path = code_hash.encode(); let key_path = self.contracts_dir.join(path).with_extension("wasm"); let ContractContainer::Wasm(ContractWasmAPIVersion::V1(WrappedContract { @@ -140,7 +117,7 @@ impl ContractStore { }; // add back the contract part to the mem store let size = data.data().len() as i64; - self.contract_cache.insert(*code_hash, data.clone(), size); + self.contract_cache.insert(code_hash, data.clone(), size); Some(ContractContainer::Wasm(ContractWasmAPIVersion::V1( WrappedContract::new(data, params), ))) @@ -162,15 +139,6 @@ impl ContractStore { if self.contract_cache.get(code_hash).is_some() { return Ok(()); } - - Self::update( - &mut self.key_to_code_part, - key.clone(), - *code_hash, - KEY_FILE_PATH.get().unwrap(), - LOCK_FILE_PATH.get().unwrap().as_path(), - )?; - let key_path = code_hash.encode(); let key_path = self.contracts_dir.join(key_path).with_extension("wasm"); if let Ok((code, _ver)) = ContractCode::load_versioned_from_path(&key_path) { @@ -185,10 +153,17 @@ impl ContractStore { self.contract_cache .insert(*code_hash, Arc::new(ContractCode::from(data)), size); + // save on disc let version = APIVersion::from(contract); let output: Vec = code.to_bytes_versioned(version)?; let mut file = File::create(key_path)?; file.write_all(output.as_slice())?; + Self::insert( + &mut self.index_file, + &mut self.key_to_code_part, + key.id(), + *code_hash, + )?; Ok(()) } @@ -213,6 +188,9 @@ impl ContractStore { RuntimeInnerError::UnwrapContract })?, }; + if let Some((_, (offset, _))) = self.key_to_code_part.remove(&key.id()) { + Self::remove(KEY_FILE_PATH.get().expect("infallible"), offset)?; + } let key_path = self .contracts_dir .join(contract_hash.encode()) @@ -222,7 +200,7 @@ impl ContractStore { } pub fn code_hash_from_key(&self, key: &ContractKey) -> Option { - self.key_to_code_part.get(key).map(|r| *r.value()) + self.key_to_code_part.get(&key.id()).map(|r| r.value().1) } } diff --git a/crates/core/src/runtime/delegate_store.rs b/crates/core/src/runtime/delegate_store.rs index 961fa3cb3..7adb2caff 100644 --- a/crates/core/src/runtime/delegate_store.rs +++ b/crates/core/src/runtime/delegate_store.rs @@ -3,66 +3,43 @@ use freenet_stdlib::prelude::{ APIVersion, CodeHash, Delegate, DelegateCode, DelegateContainer, DelegateKey, DelegateWasmAPIVersion, Parameters, }; -use serde::{Deserialize, Serialize}; +use std::fs::OpenOptions; +use std::io::BufWriter; use std::{fs::File, io::Write, path::PathBuf, sync::Arc}; use stretto::Cache; -use super::store::{StoreEntriesContainer, StoreFsManagement}; +use super::store::StoreFsManagement; use super::RuntimeResult; -const DEFAULT_MAX_SIZE: i64 = 10 * 1024 * 1024 * 20; - -#[derive(Serialize, Deserialize, Default)] -struct KeyToCodeMap(Vec<(DelegateKey, CodeHash)>); - -impl StoreEntriesContainer for KeyToCodeMap { - type MemContainer = Arc>; - type Key = DelegateKey; - type Value = CodeHash; - - fn update(self, container: &mut Self::MemContainer) { - for (k, v) in self.0 { - container.insert(k, v); - } - } - - fn replace(container: &Self::MemContainer) -> Self { - KeyToCodeMap::from(&**container) - } - - fn insert(container: &mut Self::MemContainer, key: Self::Key, value: Self::Value) { - container.insert(key, value); - } -} - -impl From<&DashMap> for KeyToCodeMap { - fn from(vals: &DashMap) -> Self { - let mut map = vec![]; - for r in vals.iter() { - map.push((r.key().clone(), *r.value())); - } - Self(map) - } -} - pub struct DelegateStore { delegates_dir: PathBuf, delegate_cache: Cache>, - key_to_code_part: Arc>, + key_to_code_part: Arc>, + index_file: BufWriter, } -static LOCK_FILE_PATH: once_cell::sync::OnceCell = once_cell::sync::OnceCell::new(); static KEY_FILE_PATH: once_cell::sync::OnceCell = once_cell::sync::OnceCell::new(); -impl StoreFsManagement for DelegateStore {} +impl StoreFsManagement for DelegateStore { + type MemContainer = Arc>; + type Key = DelegateKey; + type Value = CodeHash; + + fn insert_in_container( + container: &mut Self::MemContainer, + (key, offset): (Self::Key, u64), + value: Self::Value, + ) { + container.insert(key, (offset, value)); + } +} impl DelegateStore { /// # Arguments /// - max_size: max size in bytes of the delegates being cached pub fn new(delegates_dir: PathBuf, max_size: i64) -> RuntimeResult { const ERR: &str = "failed to build mem cache"; - let key_to_delegate_part; - let _ = LOCK_FILE_PATH.try_insert(delegates_dir.join("__LOCK")); + let mut key_to_code_part = Arc::new(DashMap::new()); let key_file = match KEY_FILE_PATH .try_insert(delegates_dir.join("KEY_DATA")) .map_err(|(e, _)| e) @@ -75,24 +52,25 @@ impl DelegateStore { tracing::error!("error creating delegate dir: {err}"); err })?; - key_to_delegate_part = Arc::new(DashMap::new()); File::create(delegates_dir.join("KEY_DATA"))?; } else { - let map = Self::load_from_file( + Self::load_from_file( KEY_FILE_PATH.get().unwrap().as_path(), - LOCK_FILE_PATH.get().unwrap().as_path(), + &mut key_to_code_part, )?; - key_to_delegate_part = Arc::new(DashMap::from_iter(map.0)); } Self::watch_changes( - key_to_delegate_part.clone(), + key_to_code_part.clone(), KEY_FILE_PATH.get().unwrap().as_path(), - LOCK_FILE_PATH.get().unwrap().as_path(), )?; + + let index_file = + std::io::BufWriter::new(OpenOptions::new().append(true).read(true).open(key_file)?); Ok(Self { delegate_cache: Cache::new(100, max_size).expect(ERR), delegates_dir, - key_to_code_part: key_to_delegate_part, + key_to_code_part, + index_file, }) } @@ -108,7 +86,7 @@ impl DelegateStore { self.key_to_code_part.get(key).and_then(|code_part| { let delegate_code_path = self .delegates_dir - .join(code_part.value().encode()) + .join(code_part.value().1.encode()) .with_extension("wasm"); tracing::debug!("loading delegate `{key}` from {delegate_code_path:?}"); let DelegateContainer::Wasm(DelegateWasmAPIVersion::V1(Delegate { @@ -138,13 +116,6 @@ impl DelegateStore { } let key = delegate.key(); - Self::update( - &mut self.key_to_code_part, - key.clone(), - *code_hash, - KEY_FILE_PATH.get().unwrap(), - LOCK_FILE_PATH.get().unwrap().as_path(), - )?; let key_path = code_hash.encode(); let delegate_path = self.delegates_dir.join(key_path).with_extension("wasm"); @@ -160,17 +131,27 @@ impl DelegateStore { self.delegate_cache .insert(*code_hash, delegate.code().clone().into_owned(), code_size); + // save on disc let version = APIVersion::from(delegate.clone()); let output: Vec = delegate.code().to_bytes_versioned(version)?; let mut file = File::create(delegate_path)?; file.write_all(output.as_slice())?; + Self::insert( + &mut self.index_file, + &mut self.key_to_code_part, + key.clone(), + *code_hash, + )?; Ok(()) } pub fn remove_delegate(&mut self, key: &DelegateKey) -> RuntimeResult<()> { self.delegate_cache.remove(key.code_hash()); - let cmp_path = self.delegates_dir.join(key.encode()).with_extension("wasm"); + let cmp_path: PathBuf = self.delegates_dir.join(key.encode()).with_extension("wasm"); + if let Some((_, (offset, _))) = self.key_to_code_part.remove(key) { + Self::remove(KEY_FILE_PATH.get().expect("infallible"), offset)?; + } match std::fs::remove_file(cmp_path) { Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()), Err(err) => Err(err.into()), @@ -184,17 +165,7 @@ impl DelegateStore { } pub fn code_hash_from_key(&self, key: &DelegateKey) -> Option { - self.key_to_code_part.get(key).map(|r| *r.value()) - } -} - -impl Default for DelegateStore { - fn default() -> Self { - Self { - delegates_dir: Default::default(), - delegate_cache: Cache::new(100, DEFAULT_MAX_SIZE).unwrap(), - key_to_code_part: Arc::new(DashMap::new()), - } + self.key_to_code_part.get(key).map(|r| r.value().1) } } diff --git a/crates/core/src/runtime/secrets_store.rs b/crates/core/src/runtime/secrets_store.rs index 092a7057d..4c3965288 100644 --- a/crates/core/src/runtime/secrets_store.rs +++ b/crates/core/src/runtime/secrets_store.rs @@ -1,8 +1,7 @@ use std::{ - collections::HashMap, + collections::{HashMap, HashSet}, fs::{self, File}, io::Write, - iter::FromIterator, path::PathBuf, sync::Arc, }; @@ -12,44 +11,11 @@ use chacha20poly1305::{aead::Aead, Error as EncryptionError, KeyInit, XChaCha20P use dashmap::DashMap; use freenet_stdlib::{client_api::DelegateRequest, prelude::*}; use once_cell::sync::Lazy; -use serde::{Deserialize, Serialize}; -use super::{ - store::{StoreEntriesContainer, StoreFsManagement}, - RuntimeResult, -}; +use super::{store::StoreFsManagement, RuntimeResult}; type SecretKey = [u8; 32]; -#[derive(Serialize, Deserialize, Default)] -struct KeyToEncryptionMap(Vec<(DelegateKey, Vec)>); - -impl StoreEntriesContainer for KeyToEncryptionMap { - type MemContainer = Arc>>; - type Key = DelegateKey; - type Value = Vec; - - fn update(self, container: &mut Self::MemContainer) { - for (k, v) in self.0 { - container.insert(k, v); - } - } - - fn replace(container: &Self::MemContainer) -> Self { - KeyToEncryptionMap::from(&**container) - } - - fn insert(container: &mut Self::MemContainer, key: Self::Key, value: Self::Value) { - if let Some(element) = container.get(&key.clone()) { - let mut secrets = element.value().clone(); - secrets.extend(value); - container.insert(key, secrets); - } else { - container.insert(key, value); - } - } -} - #[derive(Debug, thiserror::Error)] pub enum SecretStoreError { #[error("encryption error: {0}")] @@ -62,16 +28,6 @@ pub enum SecretStoreError { MissingSecret(SecretsId), } -impl From<&DashMap>> for KeyToEncryptionMap { - fn from(vals: &DashMap>) -> Self { - let mut map = vec![]; - for r in vals.iter() { - map.push((r.key().clone(), r.value().clone())); - } - Self(map) - } -} - #[derive(Clone)] struct Encryption { cipher: XChaCha20Poly1305, @@ -82,13 +38,56 @@ struct Encryption { pub struct SecretsStore { base_path: PathBuf, ciphers: HashMap, - key_to_secret_part: Arc>>, + key_to_secret_part: Arc>>, } -static LOCK_FILE_PATH: once_cell::sync::OnceCell = once_cell::sync::OnceCell::new(); static KEY_FILE_PATH: once_cell::sync::OnceCell = once_cell::sync::OnceCell::new(); -impl StoreFsManagement for SecretsStore {} +pub(super) struct ConcatenatedSecretKeys(Vec); + +impl AsRef<[u8]> for ConcatenatedSecretKeys { + fn as_ref(&self) -> &[u8] { + &self.0 + } +} + +impl<'x> TryFrom<&'x [u8]> for ConcatenatedSecretKeys { + type Error = std::io::Error; + + fn try_from(value: &'x [u8]) -> Result { + Ok(Self(value.to_vec())) + } +} + +impl StoreFsManagement for SecretsStore { + type MemContainer = Arc>>; + type Key = DelegateKey; + type Value = ConcatenatedSecretKeys; + + fn insert_in_container( + container: &mut Self::MemContainer, + (key, _offset): (Self::Key, u64), + value: Self::Value, + ) { + let split_secrets = value + .0 + .chunks(32) + .map(|chunk| { + let mut fixed = [0u8; 32]; + fixed.copy_from_slice(chunk); + fixed + }) + .collect::>(); + match container.entry(key) { + dashmap::mapref::entry::Entry::Occupied(mut delegate) => { + delegate.get_mut().extend(split_secrets); + } + dashmap::mapref::entry::Entry::Vacant(delegate) => { + delegate.insert(split_secrets); + } + } + } +} static DEFAULT_CIPHER: Lazy = Lazy::new(|| { let arr = GenericArray::from_slice(&DelegateRequest::DEFAULT_CIPHER); @@ -105,8 +104,7 @@ static DEFAULT_ENCRYPTION: Lazy = Lazy::new(|| Encryption { impl SecretsStore { pub fn new(secrets_dir: PathBuf) -> RuntimeResult { - let key_to_secret_part; - let _ = LOCK_FILE_PATH.try_insert(secrets_dir.join("__LOCK")); + let mut key_to_secret_part = Arc::new(DashMap::new()); let key_file = match KEY_FILE_PATH .try_insert(secrets_dir.join("KEY_DATA")) .map_err(|(e, _)| e) @@ -119,19 +117,16 @@ impl SecretsStore { tracing::error!("error creating delegate dir: {err}"); err })?; - key_to_secret_part = Arc::new(DashMap::new()); File::create(secrets_dir.join("KEY_DATA"))?; } else { - let map = Self::load_from_file( + Self::load_from_file( KEY_FILE_PATH.get().unwrap().as_path(), - LOCK_FILE_PATH.get().unwrap().as_path(), + &mut key_to_secret_part, )?; - key_to_secret_part = Arc::new(DashMap::from_iter(map.0)); } Self::watch_changes( key_to_secret_part.clone(), KEY_FILE_PATH.get().unwrap().as_path(), - LOCK_FILE_PATH.get().unwrap().as_path(), )?; Ok(Self { base_path: secrets_dir, @@ -176,8 +171,12 @@ impl SecretsStore { } })?; + // FIXME: update on disc self.key_to_secret_part - .insert(delegate.clone(), vec![secret_key]); + .entry(delegate.clone()) + .or_default() + .value_mut() + .insert(secret_key); fs::create_dir_all(&delegate_path)?; tracing::debug!("storing secret `{key}` at {secret_file_path:?}"); diff --git a/crates/core/src/runtime/store.rs b/crates/core/src/runtime/store.rs index 62b04ae91..5257dbf79 100644 --- a/crates/core/src/runtime/store.rs +++ b/crates/core/src/runtime/store.rs @@ -1,48 +1,86 @@ +use arrayvec::ArrayVec; +use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; +use freenet_stdlib::prelude::{CodeHash, ContractInstanceId, DelegateKey}; use notify::Watcher; -use serde::de::DeserializeOwned; -use serde::Serialize; -use std::io::Write; +use std::io::{BufReader, BufWriter, Seek, Write}; use std::path::Path; -use std::{ - fs::{self, File}, - io::Read, - thread, - time::Duration, -}; - -use super::{error::RuntimeInnerError, RuntimeResult}; +use std::{fs::File, io::Read}; + +use super::RuntimeResult; use crate::DynError; -pub(crate) trait StoreEntriesContainer: Serialize + DeserializeOwned + Default { - type MemContainer: Send + Sync + 'static; - type Key; - type Value; +const INTERNAL_KEY: usize = 32; +const TOMBSTONE_MARKER: usize = 1; + +pub(super) enum StoreKey { + ContractKey([u8; INTERNAL_KEY]), + DelegateKey { + key: [u8; INTERNAL_KEY], + code_hash: [u8; INTERNAL_KEY], + }, +} + +impl From for StoreKey { + fn from(value: DelegateKey) -> Self { + Self::DelegateKey { + key: *value, + code_hash: **value.code_hash(), + } + } +} + +impl From for DelegateKey { + fn from(value: StoreKey) -> Self { + let StoreKey::DelegateKey { key, code_hash } = value else { + unreachable!() + }; + DelegateKey::new(key, CodeHash::new(code_hash)) + } +} + +impl From for StoreKey { + fn from(value: ContractInstanceId) -> Self { + Self::ContractKey(*value) + } +} - fn update(self, container: &mut Self::MemContainer); - fn replace(container: &Self::MemContainer) -> Self; - fn insert(container: &mut Self::MemContainer, key: Self::Key, value: Self::Value); +impl From for ContractInstanceId { + fn from(value: StoreKey) -> Self { + let StoreKey::ContractKey(value) = value else { + unreachable!() + }; + ContractInstanceId::new(value) + } +} + +#[repr(u8)] +enum KeyType { + Contract = 0, + Delegate = 1, } -pub(crate) trait StoreFsManagement -where - C: StoreEntriesContainer, -{ +pub(super) trait StoreFsManagement { + type MemContainer: Send + Sync + 'static; + type Key: Clone + From; + type Value: AsRef<[u8]> + for<'x> TryFrom<&'x [u8], Error = std::io::Error>; + + fn insert_in_container( + container: &mut Self::MemContainer, + key_and_offset: (Self::Key, u64), + value: Self::Value, + ); + fn watch_changes( - mut container: C::MemContainer, + mut container: Self::MemContainer, key_file_path: &Path, - lock_file_path: &Path, ) -> Result<(), DynError> { let key_path = key_file_path.to_path_buf(); - let lock_path = lock_file_path.to_path_buf(); let mut watcher = notify::recommended_watcher( move |res: Result| match res { Ok(ev) => { if let notify::EventKind::Modify(notify::event::ModifyKind::Data(_)) = ev.kind { - match Self::load_from_file(key_path.as_path(), lock_path.as_path()) { - Err(err) => tracing::error!("{err}"), - Ok(map) => { - map.update(&mut container); - } + if let Err(err) = Self::load_from_file(key_path.as_path(), &mut container) { + tracing::error!("{err}") } } } @@ -53,51 +91,226 @@ where Ok(()) } - fn update( - mem_containter: &mut C::MemContainer, - key: C::Key, - value: C::Value, + fn insert( + file: &mut BufWriter, + mem_container: &mut Self::MemContainer, + key: Self::Key, + value: Self::Value, + ) -> RuntimeResult<()> + where + StoreKey: From, + { + // The full key is the tombstone marker byte + kind + [internal key content] + size of value + let internal_key: StoreKey = key.clone().into(); + file.write_u8(false as u8)?; + match internal_key { + StoreKey::ContractKey(key) => { + file.write_u8(KeyType::Contract as u8)?; + file.write_all(&key)?; + } + StoreKey::DelegateKey { key, code_hash } => { + file.write_u8(KeyType::Delegate as u8)?; + file.write_all(&key)?; + file.write_all(&code_hash)?; + } + } + file.write_u32::(value.as_ref().len() as u32)?; + file.write_all(value.as_ref())?; + file.flush()?; + Self::insert_in_container(mem_container, (key, file.stream_position()?), value); + Ok(()) + } + + fn remove(key_file_path: &Path, key_offset: u64) -> RuntimeResult<()> { + let mut file = std::fs::OpenOptions::new() + .write(true) + .read(true) + .open(key_file_path)?; + file.seek(std::io::SeekFrom::Start(key_offset))?; + // Mark tombstone byte as true + file.write_u8(true as u8)?; + Ok(()) + } + + fn load_from_file( key_file_path: &Path, - lock_file_path: &Path, + container: &mut Self::MemContainer, ) -> RuntimeResult<()> { - Self::acquire_ls_lock(lock_file_path)?; - C::insert(mem_containter, key, value); - let container = C::replace(mem_containter); - let serialized = bincode::serialize(&container).map_err(|e| RuntimeInnerError::Any(e))?; - // FIXME: make this more reliable, append to the file instead of truncating it - let mut f = File::create(key_file_path)?; - f.write_all(&serialized)?; - Self::release_ls_lock(lock_file_path)?; + let mut file = BufReader::new(File::open(key_file_path)?); + let mut first_key_part = + [0u8; TOMBSTONE_MARKER + std::mem::size_of::() + INTERNAL_KEY]; + let mut key_cursor = 0; + while file.read_exact(&mut first_key_part).is_ok() { + let deleted = first_key_part[0] != 0; + let key_type = match first_key_part[1] { + 0 => KeyType::Contract, + 1 => KeyType::Delegate, + _ => unreachable!(), + }; + if !deleted { + let store_key = match key_type { + KeyType::Contract => { + let mut contract_key = [0; INTERNAL_KEY]; + contract_key.copy_from_slice(&first_key_part[2..2 + INTERNAL_KEY]); + StoreKey::ContractKey(contract_key) + } + KeyType::Delegate => { + let mut delegate_key = [0; INTERNAL_KEY]; + let mut code_hash = [0; INTERNAL_KEY]; + delegate_key.copy_from_slice(&first_key_part[2..2 + INTERNAL_KEY]); + file.read_exact(&mut code_hash)?; + StoreKey::DelegateKey { + key: delegate_key, + code_hash, + } + } + }; + let value_len = file.read_u32::()?; + let value = { + if value_len == 32 { + let buf = &mut ArrayVec::::from([0; 32]); + file.read_exact(&mut *buf)?; + Self::Value::try_from(&*buf)? + } else { + let mut buf = vec![0; value_len as usize]; + file.read_exact(&mut buf)?; + Self::Value::try_from(&buf)? + } + }; + Self::insert_in_container(container, (store_key.into(), key_cursor), value); + } else { + let skip = match key_type { + KeyType::Contract => file.read_u32::()?, + KeyType::Delegate => { + // skip the code hash part + file.seek_relative(32)?; + file.read_u32::()? + } + }; + file.seek_relative(skip as i64)?; + } + key_cursor = file.stream_position()?; + } Ok(()) } +} - fn load_from_file(key_file_path: &Path, lock_file_path: &Path) -> RuntimeResult { - let mut buf = vec![]; - Self::acquire_ls_lock(lock_file_path)?; - let mut f = File::open(key_file_path)?; - f.read_to_end(&mut buf)?; - Self::release_ls_lock(lock_file_path)?; - let value = if buf.is_empty() { - C::default() - } else { - bincode::deserialize(&buf).map_err(|e| RuntimeInnerError::Any(e))? - }; - Ok(value) +#[cfg(test)] +mod tests { + use std::{fs::OpenOptions, sync::Arc}; + + use super::*; + use dashmap::DashMap; + use tempfile::TempDir; + + struct TestStore1; + + impl StoreFsManagement for TestStore1 { + type MemContainer = Arc>; + type Key = ContractInstanceId; + type Value = CodeHash; + + fn insert_in_container( + container: &mut Self::MemContainer, + (key, offset): (Self::Key, u64), + value: Self::Value, + ) { + container.insert(key, (offset, value)); + } } - fn acquire_ls_lock(lock_file_path: &Path) -> RuntimeResult<()> { - while lock_file_path.exists() { - thread::sleep(Duration::from_micros(5)); + struct TestStore2; + + impl StoreFsManagement for TestStore2 { + type MemContainer = Arc>; + type Key = DelegateKey; + type Value = CodeHash; + + fn insert_in_container( + container: &mut Self::MemContainer, + (key, offset): (Self::Key, u64), + value: Self::Value, + ) { + container.insert(key, (offset, value)); } - File::create(lock_file_path)?; - Ok(()) } - fn release_ls_lock(lock_file_path: &Path) -> RuntimeResult<()> { - match fs::remove_file(lock_file_path) { - Ok(_) => Ok(()), - Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()), - Err(other) => Err(other.into()), + #[test] + fn test_store() { + let temp_dir = TempDir::new().expect("Failed to create a temporary directory"); + let contract_keys_file_path = temp_dir.path().join("contract_keys"); + let delegate_keys_file_path = temp_dir.path().join("delegate_keys"); + + let key_1 = ContractInstanceId::new([1; 32]); + let expected_value_1 = CodeHash::new([2; 32]); + let key_2 = DelegateKey::new([3; 32], CodeHash::new([4; 32])); + let expected_value_2 = CodeHash::new([5; 32]); + + // Test the update function + { + let mut file_1 = BufWriter::new( + OpenOptions::new() + .create(true) + .read(true) + .append(true) + .open(&contract_keys_file_path) + .expect("Failed to open key file"), + ); + let mut file_2 = BufWriter::new( + OpenOptions::new() + .create(true) + .read(true) + .append(true) + .open(&delegate_keys_file_path) + .expect("Failed to open key file"), + ); + let mut container_1 = ::MemContainer::default(); + let mut container_2 = ::MemContainer::default(); + + TestStore1::insert(&mut file_1, &mut container_1, key_1, expected_value_1) + .expect("Failed to update"); + + TestStore2::insert( + &mut file_2, + &mut container_2, + key_2.clone(), + expected_value_2, + ) + .expect("Failed to update"); + } + + // Test the load_from_file function + { + let mut new_container_1 = ::MemContainer::default(); + TestStore1::load_from_file(&contract_keys_file_path, &mut new_container_1) + .expect("Failed to load from file"); + + let mut new_container_2 = ::MemContainer::default(); + TestStore2::load_from_file(&delegate_keys_file_path, &mut new_container_2) + .expect("Failed to load from file"); + + // Check if the container has the updated key-value pair + let loaded_value_1 = new_container_1.get(&key_1).expect("Key not found"); + let loaded_value_2 = new_container_2.get(&key_2).expect("Key not found"); + + assert_eq!(expected_value_1, loaded_value_1.value().1); + assert_eq!(expected_value_2, loaded_value_2.value().1); + } + + // Test the remove function for TestStore1 + { + let key_file_path = &contract_keys_file_path; + let key_offset = 0; + + TestStore1::remove(key_file_path, key_offset).expect("Failed to remove key"); + + // Reload the container from the key file and check if the key is removed + let mut new_container_1 = ::MemContainer::default(); + TestStore1::load_from_file(key_file_path, &mut new_container_1) + .expect("Failed to load from file"); + + let loaded_value_1 = new_container_1.get(&key_1); + assert!(loaded_value_1.is_none(), "Key still exists"); } } } diff --git a/crates/core/src/runtime/tests/contract.rs b/crates/core/src/runtime/tests/contract.rs index 963c21c18..8e4b30a00 100644 --- a/crates/core/src/runtime/tests/contract.rs +++ b/crates/core/src/runtime/tests/contract.rs @@ -1,22 +1,14 @@ use freenet_stdlib::prelude::*; use super::super::contract::*; -use super::super::{ - secrets_store::SecretsStore, tests::setup_test_contract, DelegateStore, Runtime, -}; +use super::super::{secrets_store::SecretsStore, tests::setup_test_contract, Runtime}; const TEST_CONTRACT_1: &str = "test_contract_1"; #[test] fn validate_state() -> Result<(), Box> { - let (store, key) = setup_test_contract(TEST_CONTRACT_1)?; - let mut runtime = Runtime::build( - store, - DelegateStore::default(), - SecretsStore::default(), - false, - ) - .unwrap(); + let (contracts, delegates, key) = setup_test_contract(TEST_CONTRACT_1)?; + let mut runtime = Runtime::build(contracts, delegates, SecretsStore::default(), false).unwrap(); let is_valid = runtime.validate_state( &key, @@ -39,14 +31,8 @@ fn validate_state() -> Result<(), Box> { #[test] fn validate_delta() -> Result<(), Box> { - let (store, key) = setup_test_contract(TEST_CONTRACT_1)?; - let mut runtime = Runtime::build( - store, - DelegateStore::default(), - SecretsStore::default(), - false, - ) - .unwrap(); + let (contracts, delegates, key) = setup_test_contract(TEST_CONTRACT_1)?; + let mut runtime = Runtime::build(contracts, delegates, SecretsStore::default(), false).unwrap(); let is_valid = runtime.validate_delta( &key, @@ -67,14 +53,8 @@ fn validate_delta() -> Result<(), Box> { #[test] fn update_state() -> Result<(), Box> { - let (store, key) = setup_test_contract(TEST_CONTRACT_1)?; - let mut runtime = Runtime::build( - store, - DelegateStore::default(), - SecretsStore::default(), - false, - ) - .unwrap(); + let (contracts, delegates, key) = setup_test_contract(TEST_CONTRACT_1)?; + let mut runtime = Runtime::build(contracts, delegates, SecretsStore::default(), false).unwrap(); let new_state = runtime .update_state( @@ -91,14 +71,8 @@ fn update_state() -> Result<(), Box> { #[test] fn summarize_state() -> Result<(), Box> { - let (store, key) = setup_test_contract(TEST_CONTRACT_1)?; - let mut runtime = Runtime::build( - store, - DelegateStore::default(), - SecretsStore::default(), - false, - ) - .unwrap(); + let (contracts, delegates, key) = setup_test_contract(TEST_CONTRACT_1)?; + let mut runtime = Runtime::build(contracts, delegates, SecretsStore::default(), false).unwrap(); let summary = runtime.summarize_state( &key, @@ -111,14 +85,8 @@ fn summarize_state() -> Result<(), Box> { #[test] fn get_state_delta() -> Result<(), Box> { - let (store, key) = setup_test_contract(TEST_CONTRACT_1)?; - let mut runtime = Runtime::build( - store, - DelegateStore::default(), - SecretsStore::default(), - false, - ) - .unwrap(); + let (contracts, delegates, key) = setup_test_contract(TEST_CONTRACT_1)?; + let mut runtime = Runtime::build(contracts, delegates, SecretsStore::default(), false).unwrap(); let delta = runtime.get_state_delta( &key, diff --git a/crates/core/src/runtime/tests/mod.rs b/crates/core/src/runtime/tests/mod.rs index 32d0bf913..08caec36f 100644 --- a/crates/core/src/runtime/tests/mod.rs +++ b/crates/core/src/runtime/tests/mod.rs @@ -8,7 +8,7 @@ use freenet_stdlib::prelude::{ ContractCode, ContractContainer, ContractKey, ContractWasmAPIVersion, WrappedContract, }; -use super::ContractStore; +use super::{ContractStore, DelegateStore}; mod contract; mod time; @@ -61,15 +61,16 @@ pub(crate) fn get_test_module(name: &str) -> Result, Box Result<(ContractStore, ContractKey), Box> { +) -> Result<(ContractStore, DelegateStore, ContractKey), Box> { // let _ = tracing_subscriber::fmt().with_env_filter("info").try_init(); - let mut store = ContractStore::new(test_dir("contract"), 10_000)?; + let mut contract_store = ContractStore::new(test_dir("contract"), 10_000)?; + let delegate_store = DelegateStore::new(test_dir("delegate"), 10_000)?; let contract_bytes = WrappedContract::new( Arc::new(ContractCode::from(get_test_module(name)?)), vec![].into(), ); let contract = ContractContainer::Wasm(ContractWasmAPIVersion::V1(contract_bytes)); let key = contract.key(); - store.store_contract(contract)?; - Ok((store, key)) + contract_store.store_contract(contract)?; + Ok((contract_store, delegate_store, key)) } diff --git a/crates/core/src/runtime/tests/time.rs b/crates/core/src/runtime/tests/time.rs index 9e7e3f653..8304f90b5 100644 --- a/crates/core/src/runtime/tests/time.rs +++ b/crates/core/src/runtime/tests/time.rs @@ -2,18 +2,12 @@ use wasmer::TypedFunction; -use super::super::{DelegateStore, Runtime, SecretsStore}; +use super::super::{Runtime, SecretsStore}; #[test] fn now() -> Result<(), Box> { - let (store, key) = super::setup_test_contract("test_contract_2")?; - let mut runtime = Runtime::build( - store, - DelegateStore::default(), - SecretsStore::default(), - false, - ) - .unwrap(); + let (contracts, delegates, key) = super::setup_test_contract("test_contract_2")?; + let mut runtime = Runtime::build(contracts, delegates, SecretsStore::default(), false).unwrap(); let module = runtime.prepare_contract_call(&key, &vec![].into(), 1_000)?; let f: TypedFunction<(), ()> = module diff --git a/crates/fdev/src/local_node/state.rs b/crates/fdev/src/local_node/state.rs index 9a25ff836..d59888206 100644 --- a/crates/fdev/src/local_node/state.rs +++ b/crates/fdev/src/local_node/state.rs @@ -19,14 +19,19 @@ pub(super) struct AppState { impl AppState { const MAX_MEM_CACHE: u32 = 10_000_000; + const DEFAULT_MAX_DELEGATE_SIZE: i64 = 10 * 1024 * 1024; pub async fn new(config: &LocalNodeCliConfig) -> Result { - let contract_dir = Config::conf().contracts_dir(); - let contract_store = ContractStore::new(contract_dir, config.max_contract_size)?; + let contract_store = + ContractStore::new(Config::conf().contracts_dir(), config.max_contract_size)?; + let delegate_store = DelegateStore::new( + Config::conf().delegates_dir(), + Self::DEFAULT_MAX_DELEGATE_SIZE, + )?; let state_store = StateStore::new(Storage::new().await?, Self::MAX_MEM_CACHE).unwrap(); let rt = freenet::dev_tool::Runtime::build( contract_store, - DelegateStore::default(), + delegate_store, SecretsStore::default(), false, ) diff --git a/stdlib b/stdlib index 10167caf6..844880eb5 160000 --- a/stdlib +++ b/stdlib @@ -1 +1 @@ -Subproject commit 10167caf635fe5876b0e755c700f0e17b6e177a0 +Subproject commit 844880eb5d5b1b38b60d152925ea24eb75d57053