From 55ab087ea9995b091cfad7d1524c5deb423fc324 Mon Sep 17 00:00:00 2001 From: Ignacio Duart Date: Sat, 4 Nov 2023 17:20:13 +0100 Subject: [PATCH 1/2] Change store to be append only --- Cargo.lock | 2 +- apps/freenet-email-app/web/src/inbox.rs | 2 +- crates/core/Cargo.toml | 1 + crates/core/src/contract/executor.rs | 4 +- crates/core/src/ring.rs | 1 - crates/core/src/runtime/contract_store.rs | 112 +++---- crates/core/src/runtime/delegate_store.rs | 109 +++---- crates/core/src/runtime/secrets_store.rs | 113 ++++--- crates/core/src/runtime/store.rs | 343 ++++++++++++++++++---- crates/core/src/runtime/tests/contract.rs | 54 +--- crates/core/src/runtime/tests/mod.rs | 11 +- crates/core/src/runtime/tests/time.rs | 12 +- crates/fdev/src/local_node/state.rs | 11 +- stdlib | 2 +- 14 files changed, 453 insertions(+), 324 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e18e8bef3..e0fc9e51f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1624,6 +1624,7 @@ dependencies = [ "statrs", "stretto", "tar", + "tempfile", "thiserror", "tokio", "tower-http", @@ -1650,7 +1651,6 @@ name = "freenet-stdlib" version = "0.0.8" dependencies = [ "arbitrary", - "arrayvec", "bincode", "blake3", "bs58", diff --git a/apps/freenet-email-app/web/src/inbox.rs b/apps/freenet-email-app/web/src/inbox.rs index 604c25e46..6b75a3b2e 100644 --- a/apps/freenet-email-app/web/src/inbox.rs +++ b/apps/freenet-email-app/web/src/inbox.rs @@ -557,7 +557,7 @@ mod tests { minimum_tier: Tier::Hour1, private_key, }, - key: ContractKey::from((¶ms.try_into()?, ContractCode::from([].as_slice()))), + key: ContractKey::from_params_and_code(¶ms.try_into()?, ContractCode::from([].as_slice())), }) } } diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 1cdbec941..9c4968d9b 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -90,6 +90,7 @@ pico-args = "0.5" statrs = "0.16.0" freenet-stdlib = { workspace = true, features = ["testing", "net"] } chrono = { workspace = true, features = ["arbitrary"] } +tempfile = "3.8" [features] default = ["trace", "websocket", "sqlite"] diff --git a/crates/core/src/contract/executor.rs b/crates/core/src/contract/executor.rs index d11fb3a9e..5fbc00bc9 100644 --- a/crates/core/src/contract/executor.rs +++ b/crates/core/src/contract/executor.rs @@ -1125,7 +1125,7 @@ impl Executor { if let Some(notifiers) = self.update_notifications.get_mut(key) { let summaries = self.subscriber_summaries.get_mut(key).unwrap(); // in general there should be less than 32 failures - let mut failures = arrayvec::ArrayVec::<_, 32>::new(); + let mut failures = Vec::with_capacity(32); for (peer_key, notifier) in notifiers.iter() { let peer_summary = summaries.get_mut(peer_key).unwrap(); let update = match peer_summary { @@ -1153,7 +1153,7 @@ impl Executor { } .into())) { - let _ = failures.try_push(*peer_key); + failures.push(*peer_key); tracing::error!(cli_id = %peer_key, "{err}"); } else { tracing::debug!(cli_id = %peer_key, contract = %key, "notified of update"); diff --git a/crates/core/src/ring.rs b/crates/core/src/ring.rs index cee1cbd9c..b6ecf95ee 100644 --- a/crates/core/src/ring.rs +++ b/crates/core/src/ring.rs @@ -27,7 +27,6 @@ use std::{ }; use anyhow::bail; -use arrayvec::ArrayVec; use dashmap::{mapref::one::Ref as DmRef, DashMap, DashSet}; use either::Either; use freenet_stdlib::prelude::{ContractInstanceId, ContractKey}; diff --git a/crates/core/src/runtime/contract_store.rs b/crates/core/src/runtime/contract_store.rs index 4204ef549..9537a708f 100644 --- a/crates/core/src/runtime/contract_store.rs +++ b/crates/core/src/runtime/contract_store.rs @@ -1,72 +1,48 @@ -use std::{fs::File, io::Write, iter::FromIterator, path::PathBuf, sync::Arc}; +use std::{ + fs::{File, OpenOptions}, + io::{BufWriter, Write}, + path::PathBuf, + sync::Arc, +}; use dashmap::DashMap; use freenet_stdlib::prelude::*; -use serde::{Deserialize, Serialize}; use stretto::Cache; -use super::{ - error::RuntimeInnerError, - store::{StoreEntriesContainer, StoreFsManagement}, - RuntimeResult, -}; - -#[derive(Serialize, Deserialize, Default)] -struct KeyToCodeMap(Vec<(ContractKey, CodeHash)>); - -impl StoreEntriesContainer for KeyToCodeMap { - type MemContainer = Arc>; - type Key = ContractKey; - type Value = CodeHash; - - fn update(self, container: &mut Self::MemContainer) { - for (k, v) in self.0 { - container.insert(k, v); - } - } - - fn replace(container: &Self::MemContainer) -> Self { - KeyToCodeMap::from(&**container) - } - - fn insert(container: &mut Self::MemContainer, key: Self::Key, value: Self::Value) { - container.insert(key, value); - } -} - -impl From<&DashMap> for KeyToCodeMap { - fn from(vals: &DashMap) -> Self { - let mut map = vec![]; - for r in vals.iter() { - map.push((r.key().clone(), *r.value())); - } - Self(map) - } -} +use super::{error::RuntimeInnerError, store::StoreFsManagement, RuntimeResult}; /// Handle contract blob storage on the file system. pub struct ContractStore { contracts_dir: PathBuf, contract_cache: Cache>>, - key_to_code_part: Arc>, + key_to_code_part: Arc>, + index_file: BufWriter, } // TODO: add functionality to delete old contracts which have not been used for a while // to keep the total space used under a configured threshold -static LOCK_FILE_PATH: once_cell::sync::OnceCell = once_cell::sync::OnceCell::new(); static KEY_FILE_PATH: once_cell::sync::OnceCell = once_cell::sync::OnceCell::new(); -impl StoreFsManagement for ContractStore {} +impl StoreFsManagement for ContractStore { + type MemContainer = Arc>; + type Key = ContractInstanceId; + type Value = CodeHash; + + fn insert_in_container( + container: &mut Self::MemContainer, + (key, offset): (Self::Key, u64), + value: Self::Value, + ) { + container.insert(key, (offset, value)); + } +} impl ContractStore { /// # Arguments /// - max_size: max size in bytes of the contracts being cached pub fn new(contracts_dir: PathBuf, max_size: i64) -> RuntimeResult { const ERR: &str = "failed to build mem cache"; - let key_to_code_part; - let _ = LOCK_FILE_PATH.try_insert(contracts_dir.join("__LOCK")); - // if the lock file exists is from a previous execution so is safe to delete it - let _ = std::fs::remove_file(LOCK_FILE_PATH.get().unwrap().as_path()); + let mut key_to_code_part = Arc::new(DashMap::new()); let key_file = match KEY_FILE_PATH .try_insert(contracts_dir.join("KEY_DATA")) .map_err(|(e, _)| e) @@ -79,24 +55,25 @@ impl ContractStore { tracing::error!("error creating contract dir: {err}"); err })?; - key_to_code_part = Arc::new(DashMap::new()); File::create(contracts_dir.join("KEY_DATA"))?; } else { - let map = Self::load_from_file( - KEY_FILE_PATH.get().unwrap().as_path(), - LOCK_FILE_PATH.get().unwrap().as_path(), + Self::load_from_file( + KEY_FILE_PATH.get().expect("infallible").as_path(), + &mut key_to_code_part, )?; - key_to_code_part = Arc::new(DashMap::from_iter(map.0)); } Self::watch_changes( key_to_code_part.clone(), - KEY_FILE_PATH.get().unwrap().as_path(), - LOCK_FILE_PATH.get().unwrap().as_path(), + KEY_FILE_PATH.get().expect("infallible").as_path(), )?; + + let index_file = + std::io::BufWriter::new(OpenOptions::new().append(true).read(true).open(key_file)?); Ok(Self { contract_cache: Cache::new(100, max_size).expect(ERR), contracts_dir, key_to_code_part, + index_file, }) } @@ -121,8 +98,8 @@ impl ContractStore { return result; } - self.key_to_code_part.get(key).and_then(|key| { - let code_hash = key.value(); + self.key_to_code_part.get(&key.id()).and_then(|key| { + let code_hash = key.value().1; let path = code_hash.encode(); let key_path = self.contracts_dir.join(path).with_extension("wasm"); let ContractContainer::Wasm(ContractWasmAPIVersion::V1(WrappedContract { @@ -140,7 +117,7 @@ impl ContractStore { }; // add back the contract part to the mem store let size = data.data().len() as i64; - self.contract_cache.insert(*code_hash, data.clone(), size); + self.contract_cache.insert(code_hash, data.clone(), size); Some(ContractContainer::Wasm(ContractWasmAPIVersion::V1( WrappedContract::new(data, params), ))) @@ -162,15 +139,6 @@ impl ContractStore { if self.contract_cache.get(code_hash).is_some() { return Ok(()); } - - Self::update( - &mut self.key_to_code_part, - key.clone(), - *code_hash, - KEY_FILE_PATH.get().unwrap(), - LOCK_FILE_PATH.get().unwrap().as_path(), - )?; - let key_path = code_hash.encode(); let key_path = self.contracts_dir.join(key_path).with_extension("wasm"); if let Ok((code, _ver)) = ContractCode::load_versioned_from_path(&key_path) { @@ -185,10 +153,17 @@ impl ContractStore { self.contract_cache .insert(*code_hash, Arc::new(ContractCode::from(data)), size); + // save on disc let version = APIVersion::from(contract); let output: Vec = code.to_bytes_versioned(version)?; let mut file = File::create(key_path)?; file.write_all(output.as_slice())?; + Self::insert( + &mut self.index_file, + &mut self.key_to_code_part, + key.id(), + *code_hash, + )?; Ok(()) } @@ -213,6 +188,9 @@ impl ContractStore { RuntimeInnerError::UnwrapContract })?, }; + if let Some((_, (offset, _))) = self.key_to_code_part.remove(&key.id()) { + Self::remove(KEY_FILE_PATH.get().expect("infallible"), offset)?; + } let key_path = self .contracts_dir .join(contract_hash.encode()) @@ -222,7 +200,7 @@ impl ContractStore { } pub fn code_hash_from_key(&self, key: &ContractKey) -> Option { - self.key_to_code_part.get(key).map(|r| *r.value()) + self.key_to_code_part.get(&key.id()).map(|r| r.value().1) } } diff --git a/crates/core/src/runtime/delegate_store.rs b/crates/core/src/runtime/delegate_store.rs index 961fa3cb3..7adb2caff 100644 --- a/crates/core/src/runtime/delegate_store.rs +++ b/crates/core/src/runtime/delegate_store.rs @@ -3,66 +3,43 @@ use freenet_stdlib::prelude::{ APIVersion, CodeHash, Delegate, DelegateCode, DelegateContainer, DelegateKey, DelegateWasmAPIVersion, Parameters, }; -use serde::{Deserialize, Serialize}; +use std::fs::OpenOptions; +use std::io::BufWriter; use std::{fs::File, io::Write, path::PathBuf, sync::Arc}; use stretto::Cache; -use super::store::{StoreEntriesContainer, StoreFsManagement}; +use super::store::StoreFsManagement; use super::RuntimeResult; -const DEFAULT_MAX_SIZE: i64 = 10 * 1024 * 1024 * 20; - -#[derive(Serialize, Deserialize, Default)] -struct KeyToCodeMap(Vec<(DelegateKey, CodeHash)>); - -impl StoreEntriesContainer for KeyToCodeMap { - type MemContainer = Arc>; - type Key = DelegateKey; - type Value = CodeHash; - - fn update(self, container: &mut Self::MemContainer) { - for (k, v) in self.0 { - container.insert(k, v); - } - } - - fn replace(container: &Self::MemContainer) -> Self { - KeyToCodeMap::from(&**container) - } - - fn insert(container: &mut Self::MemContainer, key: Self::Key, value: Self::Value) { - container.insert(key, value); - } -} - -impl From<&DashMap> for KeyToCodeMap { - fn from(vals: &DashMap) -> Self { - let mut map = vec![]; - for r in vals.iter() { - map.push((r.key().clone(), *r.value())); - } - Self(map) - } -} - pub struct DelegateStore { delegates_dir: PathBuf, delegate_cache: Cache>, - key_to_code_part: Arc>, + key_to_code_part: Arc>, + index_file: BufWriter, } -static LOCK_FILE_PATH: once_cell::sync::OnceCell = once_cell::sync::OnceCell::new(); static KEY_FILE_PATH: once_cell::sync::OnceCell = once_cell::sync::OnceCell::new(); -impl StoreFsManagement for DelegateStore {} +impl StoreFsManagement for DelegateStore { + type MemContainer = Arc>; + type Key = DelegateKey; + type Value = CodeHash; + + fn insert_in_container( + container: &mut Self::MemContainer, + (key, offset): (Self::Key, u64), + value: Self::Value, + ) { + container.insert(key, (offset, value)); + } +} impl DelegateStore { /// # Arguments /// - max_size: max size in bytes of the delegates being cached pub fn new(delegates_dir: PathBuf, max_size: i64) -> RuntimeResult { const ERR: &str = "failed to build mem cache"; - let key_to_delegate_part; - let _ = LOCK_FILE_PATH.try_insert(delegates_dir.join("__LOCK")); + let mut key_to_code_part = Arc::new(DashMap::new()); let key_file = match KEY_FILE_PATH .try_insert(delegates_dir.join("KEY_DATA")) .map_err(|(e, _)| e) @@ -75,24 +52,25 @@ impl DelegateStore { tracing::error!("error creating delegate dir: {err}"); err })?; - key_to_delegate_part = Arc::new(DashMap::new()); File::create(delegates_dir.join("KEY_DATA"))?; } else { - let map = Self::load_from_file( + Self::load_from_file( KEY_FILE_PATH.get().unwrap().as_path(), - LOCK_FILE_PATH.get().unwrap().as_path(), + &mut key_to_code_part, )?; - key_to_delegate_part = Arc::new(DashMap::from_iter(map.0)); } Self::watch_changes( - key_to_delegate_part.clone(), + key_to_code_part.clone(), KEY_FILE_PATH.get().unwrap().as_path(), - LOCK_FILE_PATH.get().unwrap().as_path(), )?; + + let index_file = + std::io::BufWriter::new(OpenOptions::new().append(true).read(true).open(key_file)?); Ok(Self { delegate_cache: Cache::new(100, max_size).expect(ERR), delegates_dir, - key_to_code_part: key_to_delegate_part, + key_to_code_part, + index_file, }) } @@ -108,7 +86,7 @@ impl DelegateStore { self.key_to_code_part.get(key).and_then(|code_part| { let delegate_code_path = self .delegates_dir - .join(code_part.value().encode()) + .join(code_part.value().1.encode()) .with_extension("wasm"); tracing::debug!("loading delegate `{key}` from {delegate_code_path:?}"); let DelegateContainer::Wasm(DelegateWasmAPIVersion::V1(Delegate { @@ -138,13 +116,6 @@ impl DelegateStore { } let key = delegate.key(); - Self::update( - &mut self.key_to_code_part, - key.clone(), - *code_hash, - KEY_FILE_PATH.get().unwrap(), - LOCK_FILE_PATH.get().unwrap().as_path(), - )?; let key_path = code_hash.encode(); let delegate_path = self.delegates_dir.join(key_path).with_extension("wasm"); @@ -160,17 +131,27 @@ impl DelegateStore { self.delegate_cache .insert(*code_hash, delegate.code().clone().into_owned(), code_size); + // save on disc let version = APIVersion::from(delegate.clone()); let output: Vec = delegate.code().to_bytes_versioned(version)?; let mut file = File::create(delegate_path)?; file.write_all(output.as_slice())?; + Self::insert( + &mut self.index_file, + &mut self.key_to_code_part, + key.clone(), + *code_hash, + )?; Ok(()) } pub fn remove_delegate(&mut self, key: &DelegateKey) -> RuntimeResult<()> { self.delegate_cache.remove(key.code_hash()); - let cmp_path = self.delegates_dir.join(key.encode()).with_extension("wasm"); + let cmp_path: PathBuf = self.delegates_dir.join(key.encode()).with_extension("wasm"); + if let Some((_, (offset, _))) = self.key_to_code_part.remove(key) { + Self::remove(KEY_FILE_PATH.get().expect("infallible"), offset)?; + } match std::fs::remove_file(cmp_path) { Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()), Err(err) => Err(err.into()), @@ -184,17 +165,7 @@ impl DelegateStore { } pub fn code_hash_from_key(&self, key: &DelegateKey) -> Option { - self.key_to_code_part.get(key).map(|r| *r.value()) - } -} - -impl Default for DelegateStore { - fn default() -> Self { - Self { - delegates_dir: Default::default(), - delegate_cache: Cache::new(100, DEFAULT_MAX_SIZE).unwrap(), - key_to_code_part: Arc::new(DashMap::new()), - } + self.key_to_code_part.get(key).map(|r| r.value().1) } } diff --git a/crates/core/src/runtime/secrets_store.rs b/crates/core/src/runtime/secrets_store.rs index 092a7057d..4c3965288 100644 --- a/crates/core/src/runtime/secrets_store.rs +++ b/crates/core/src/runtime/secrets_store.rs @@ -1,8 +1,7 @@ use std::{ - collections::HashMap, + collections::{HashMap, HashSet}, fs::{self, File}, io::Write, - iter::FromIterator, path::PathBuf, sync::Arc, }; @@ -12,44 +11,11 @@ use chacha20poly1305::{aead::Aead, Error as EncryptionError, KeyInit, XChaCha20P use dashmap::DashMap; use freenet_stdlib::{client_api::DelegateRequest, prelude::*}; use once_cell::sync::Lazy; -use serde::{Deserialize, Serialize}; -use super::{ - store::{StoreEntriesContainer, StoreFsManagement}, - RuntimeResult, -}; +use super::{store::StoreFsManagement, RuntimeResult}; type SecretKey = [u8; 32]; -#[derive(Serialize, Deserialize, Default)] -struct KeyToEncryptionMap(Vec<(DelegateKey, Vec)>); - -impl StoreEntriesContainer for KeyToEncryptionMap { - type MemContainer = Arc>>; - type Key = DelegateKey; - type Value = Vec; - - fn update(self, container: &mut Self::MemContainer) { - for (k, v) in self.0 { - container.insert(k, v); - } - } - - fn replace(container: &Self::MemContainer) -> Self { - KeyToEncryptionMap::from(&**container) - } - - fn insert(container: &mut Self::MemContainer, key: Self::Key, value: Self::Value) { - if let Some(element) = container.get(&key.clone()) { - let mut secrets = element.value().clone(); - secrets.extend(value); - container.insert(key, secrets); - } else { - container.insert(key, value); - } - } -} - #[derive(Debug, thiserror::Error)] pub enum SecretStoreError { #[error("encryption error: {0}")] @@ -62,16 +28,6 @@ pub enum SecretStoreError { MissingSecret(SecretsId), } -impl From<&DashMap>> for KeyToEncryptionMap { - fn from(vals: &DashMap>) -> Self { - let mut map = vec![]; - for r in vals.iter() { - map.push((r.key().clone(), r.value().clone())); - } - Self(map) - } -} - #[derive(Clone)] struct Encryption { cipher: XChaCha20Poly1305, @@ -82,13 +38,56 @@ struct Encryption { pub struct SecretsStore { base_path: PathBuf, ciphers: HashMap, - key_to_secret_part: Arc>>, + key_to_secret_part: Arc>>, } -static LOCK_FILE_PATH: once_cell::sync::OnceCell = once_cell::sync::OnceCell::new(); static KEY_FILE_PATH: once_cell::sync::OnceCell = once_cell::sync::OnceCell::new(); -impl StoreFsManagement for SecretsStore {} +pub(super) struct ConcatenatedSecretKeys(Vec); + +impl AsRef<[u8]> for ConcatenatedSecretKeys { + fn as_ref(&self) -> &[u8] { + &self.0 + } +} + +impl<'x> TryFrom<&'x [u8]> for ConcatenatedSecretKeys { + type Error = std::io::Error; + + fn try_from(value: &'x [u8]) -> Result { + Ok(Self(value.to_vec())) + } +} + +impl StoreFsManagement for SecretsStore { + type MemContainer = Arc>>; + type Key = DelegateKey; + type Value = ConcatenatedSecretKeys; + + fn insert_in_container( + container: &mut Self::MemContainer, + (key, _offset): (Self::Key, u64), + value: Self::Value, + ) { + let split_secrets = value + .0 + .chunks(32) + .map(|chunk| { + let mut fixed = [0u8; 32]; + fixed.copy_from_slice(chunk); + fixed + }) + .collect::>(); + match container.entry(key) { + dashmap::mapref::entry::Entry::Occupied(mut delegate) => { + delegate.get_mut().extend(split_secrets); + } + dashmap::mapref::entry::Entry::Vacant(delegate) => { + delegate.insert(split_secrets); + } + } + } +} static DEFAULT_CIPHER: Lazy = Lazy::new(|| { let arr = GenericArray::from_slice(&DelegateRequest::DEFAULT_CIPHER); @@ -105,8 +104,7 @@ static DEFAULT_ENCRYPTION: Lazy = Lazy::new(|| Encryption { impl SecretsStore { pub fn new(secrets_dir: PathBuf) -> RuntimeResult { - let key_to_secret_part; - let _ = LOCK_FILE_PATH.try_insert(secrets_dir.join("__LOCK")); + let mut key_to_secret_part = Arc::new(DashMap::new()); let key_file = match KEY_FILE_PATH .try_insert(secrets_dir.join("KEY_DATA")) .map_err(|(e, _)| e) @@ -119,19 +117,16 @@ impl SecretsStore { tracing::error!("error creating delegate dir: {err}"); err })?; - key_to_secret_part = Arc::new(DashMap::new()); File::create(secrets_dir.join("KEY_DATA"))?; } else { - let map = Self::load_from_file( + Self::load_from_file( KEY_FILE_PATH.get().unwrap().as_path(), - LOCK_FILE_PATH.get().unwrap().as_path(), + &mut key_to_secret_part, )?; - key_to_secret_part = Arc::new(DashMap::from_iter(map.0)); } Self::watch_changes( key_to_secret_part.clone(), KEY_FILE_PATH.get().unwrap().as_path(), - LOCK_FILE_PATH.get().unwrap().as_path(), )?; Ok(Self { base_path: secrets_dir, @@ -176,8 +171,12 @@ impl SecretsStore { } })?; + // FIXME: update on disc self.key_to_secret_part - .insert(delegate.clone(), vec![secret_key]); + .entry(delegate.clone()) + .or_default() + .value_mut() + .insert(secret_key); fs::create_dir_all(&delegate_path)?; tracing::debug!("storing secret `{key}` at {secret_file_path:?}"); diff --git a/crates/core/src/runtime/store.rs b/crates/core/src/runtime/store.rs index 62b04ae91..5257dbf79 100644 --- a/crates/core/src/runtime/store.rs +++ b/crates/core/src/runtime/store.rs @@ -1,48 +1,86 @@ +use arrayvec::ArrayVec; +use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; +use freenet_stdlib::prelude::{CodeHash, ContractInstanceId, DelegateKey}; use notify::Watcher; -use serde::de::DeserializeOwned; -use serde::Serialize; -use std::io::Write; +use std::io::{BufReader, BufWriter, Seek, Write}; use std::path::Path; -use std::{ - fs::{self, File}, - io::Read, - thread, - time::Duration, -}; - -use super::{error::RuntimeInnerError, RuntimeResult}; +use std::{fs::File, io::Read}; + +use super::RuntimeResult; use crate::DynError; -pub(crate) trait StoreEntriesContainer: Serialize + DeserializeOwned + Default { - type MemContainer: Send + Sync + 'static; - type Key; - type Value; +const INTERNAL_KEY: usize = 32; +const TOMBSTONE_MARKER: usize = 1; + +pub(super) enum StoreKey { + ContractKey([u8; INTERNAL_KEY]), + DelegateKey { + key: [u8; INTERNAL_KEY], + code_hash: [u8; INTERNAL_KEY], + }, +} + +impl From for StoreKey { + fn from(value: DelegateKey) -> Self { + Self::DelegateKey { + key: *value, + code_hash: **value.code_hash(), + } + } +} + +impl From for DelegateKey { + fn from(value: StoreKey) -> Self { + let StoreKey::DelegateKey { key, code_hash } = value else { + unreachable!() + }; + DelegateKey::new(key, CodeHash::new(code_hash)) + } +} + +impl From for StoreKey { + fn from(value: ContractInstanceId) -> Self { + Self::ContractKey(*value) + } +} - fn update(self, container: &mut Self::MemContainer); - fn replace(container: &Self::MemContainer) -> Self; - fn insert(container: &mut Self::MemContainer, key: Self::Key, value: Self::Value); +impl From for ContractInstanceId { + fn from(value: StoreKey) -> Self { + let StoreKey::ContractKey(value) = value else { + unreachable!() + }; + ContractInstanceId::new(value) + } +} + +#[repr(u8)] +enum KeyType { + Contract = 0, + Delegate = 1, } -pub(crate) trait StoreFsManagement -where - C: StoreEntriesContainer, -{ +pub(super) trait StoreFsManagement { + type MemContainer: Send + Sync + 'static; + type Key: Clone + From; + type Value: AsRef<[u8]> + for<'x> TryFrom<&'x [u8], Error = std::io::Error>; + + fn insert_in_container( + container: &mut Self::MemContainer, + key_and_offset: (Self::Key, u64), + value: Self::Value, + ); + fn watch_changes( - mut container: C::MemContainer, + mut container: Self::MemContainer, key_file_path: &Path, - lock_file_path: &Path, ) -> Result<(), DynError> { let key_path = key_file_path.to_path_buf(); - let lock_path = lock_file_path.to_path_buf(); let mut watcher = notify::recommended_watcher( move |res: Result| match res { Ok(ev) => { if let notify::EventKind::Modify(notify::event::ModifyKind::Data(_)) = ev.kind { - match Self::load_from_file(key_path.as_path(), lock_path.as_path()) { - Err(err) => tracing::error!("{err}"), - Ok(map) => { - map.update(&mut container); - } + if let Err(err) = Self::load_from_file(key_path.as_path(), &mut container) { + tracing::error!("{err}") } } } @@ -53,51 +91,226 @@ where Ok(()) } - fn update( - mem_containter: &mut C::MemContainer, - key: C::Key, - value: C::Value, + fn insert( + file: &mut BufWriter, + mem_container: &mut Self::MemContainer, + key: Self::Key, + value: Self::Value, + ) -> RuntimeResult<()> + where + StoreKey: From, + { + // The full key is the tombstone marker byte + kind + [internal key content] + size of value + let internal_key: StoreKey = key.clone().into(); + file.write_u8(false as u8)?; + match internal_key { + StoreKey::ContractKey(key) => { + file.write_u8(KeyType::Contract as u8)?; + file.write_all(&key)?; + } + StoreKey::DelegateKey { key, code_hash } => { + file.write_u8(KeyType::Delegate as u8)?; + file.write_all(&key)?; + file.write_all(&code_hash)?; + } + } + file.write_u32::(value.as_ref().len() as u32)?; + file.write_all(value.as_ref())?; + file.flush()?; + Self::insert_in_container(mem_container, (key, file.stream_position()?), value); + Ok(()) + } + + fn remove(key_file_path: &Path, key_offset: u64) -> RuntimeResult<()> { + let mut file = std::fs::OpenOptions::new() + .write(true) + .read(true) + .open(key_file_path)?; + file.seek(std::io::SeekFrom::Start(key_offset))?; + // Mark tombstone byte as true + file.write_u8(true as u8)?; + Ok(()) + } + + fn load_from_file( key_file_path: &Path, - lock_file_path: &Path, + container: &mut Self::MemContainer, ) -> RuntimeResult<()> { - Self::acquire_ls_lock(lock_file_path)?; - C::insert(mem_containter, key, value); - let container = C::replace(mem_containter); - let serialized = bincode::serialize(&container).map_err(|e| RuntimeInnerError::Any(e))?; - // FIXME: make this more reliable, append to the file instead of truncating it - let mut f = File::create(key_file_path)?; - f.write_all(&serialized)?; - Self::release_ls_lock(lock_file_path)?; + let mut file = BufReader::new(File::open(key_file_path)?); + let mut first_key_part = + [0u8; TOMBSTONE_MARKER + std::mem::size_of::() + INTERNAL_KEY]; + let mut key_cursor = 0; + while file.read_exact(&mut first_key_part).is_ok() { + let deleted = first_key_part[0] != 0; + let key_type = match first_key_part[1] { + 0 => KeyType::Contract, + 1 => KeyType::Delegate, + _ => unreachable!(), + }; + if !deleted { + let store_key = match key_type { + KeyType::Contract => { + let mut contract_key = [0; INTERNAL_KEY]; + contract_key.copy_from_slice(&first_key_part[2..2 + INTERNAL_KEY]); + StoreKey::ContractKey(contract_key) + } + KeyType::Delegate => { + let mut delegate_key = [0; INTERNAL_KEY]; + let mut code_hash = [0; INTERNAL_KEY]; + delegate_key.copy_from_slice(&first_key_part[2..2 + INTERNAL_KEY]); + file.read_exact(&mut code_hash)?; + StoreKey::DelegateKey { + key: delegate_key, + code_hash, + } + } + }; + let value_len = file.read_u32::()?; + let value = { + if value_len == 32 { + let buf = &mut ArrayVec::::from([0; 32]); + file.read_exact(&mut *buf)?; + Self::Value::try_from(&*buf)? + } else { + let mut buf = vec![0; value_len as usize]; + file.read_exact(&mut buf)?; + Self::Value::try_from(&buf)? + } + }; + Self::insert_in_container(container, (store_key.into(), key_cursor), value); + } else { + let skip = match key_type { + KeyType::Contract => file.read_u32::()?, + KeyType::Delegate => { + // skip the code hash part + file.seek_relative(32)?; + file.read_u32::()? + } + }; + file.seek_relative(skip as i64)?; + } + key_cursor = file.stream_position()?; + } Ok(()) } +} - fn load_from_file(key_file_path: &Path, lock_file_path: &Path) -> RuntimeResult { - let mut buf = vec![]; - Self::acquire_ls_lock(lock_file_path)?; - let mut f = File::open(key_file_path)?; - f.read_to_end(&mut buf)?; - Self::release_ls_lock(lock_file_path)?; - let value = if buf.is_empty() { - C::default() - } else { - bincode::deserialize(&buf).map_err(|e| RuntimeInnerError::Any(e))? - }; - Ok(value) +#[cfg(test)] +mod tests { + use std::{fs::OpenOptions, sync::Arc}; + + use super::*; + use dashmap::DashMap; + use tempfile::TempDir; + + struct TestStore1; + + impl StoreFsManagement for TestStore1 { + type MemContainer = Arc>; + type Key = ContractInstanceId; + type Value = CodeHash; + + fn insert_in_container( + container: &mut Self::MemContainer, + (key, offset): (Self::Key, u64), + value: Self::Value, + ) { + container.insert(key, (offset, value)); + } } - fn acquire_ls_lock(lock_file_path: &Path) -> RuntimeResult<()> { - while lock_file_path.exists() { - thread::sleep(Duration::from_micros(5)); + struct TestStore2; + + impl StoreFsManagement for TestStore2 { + type MemContainer = Arc>; + type Key = DelegateKey; + type Value = CodeHash; + + fn insert_in_container( + container: &mut Self::MemContainer, + (key, offset): (Self::Key, u64), + value: Self::Value, + ) { + container.insert(key, (offset, value)); } - File::create(lock_file_path)?; - Ok(()) } - fn release_ls_lock(lock_file_path: &Path) -> RuntimeResult<()> { - match fs::remove_file(lock_file_path) { - Ok(_) => Ok(()), - Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()), - Err(other) => Err(other.into()), + #[test] + fn test_store() { + let temp_dir = TempDir::new().expect("Failed to create a temporary directory"); + let contract_keys_file_path = temp_dir.path().join("contract_keys"); + let delegate_keys_file_path = temp_dir.path().join("delegate_keys"); + + let key_1 = ContractInstanceId::new([1; 32]); + let expected_value_1 = CodeHash::new([2; 32]); + let key_2 = DelegateKey::new([3; 32], CodeHash::new([4; 32])); + let expected_value_2 = CodeHash::new([5; 32]); + + // Test the update function + { + let mut file_1 = BufWriter::new( + OpenOptions::new() + .create(true) + .read(true) + .append(true) + .open(&contract_keys_file_path) + .expect("Failed to open key file"), + ); + let mut file_2 = BufWriter::new( + OpenOptions::new() + .create(true) + .read(true) + .append(true) + .open(&delegate_keys_file_path) + .expect("Failed to open key file"), + ); + let mut container_1 = ::MemContainer::default(); + let mut container_2 = ::MemContainer::default(); + + TestStore1::insert(&mut file_1, &mut container_1, key_1, expected_value_1) + .expect("Failed to update"); + + TestStore2::insert( + &mut file_2, + &mut container_2, + key_2.clone(), + expected_value_2, + ) + .expect("Failed to update"); + } + + // Test the load_from_file function + { + let mut new_container_1 = ::MemContainer::default(); + TestStore1::load_from_file(&contract_keys_file_path, &mut new_container_1) + .expect("Failed to load from file"); + + let mut new_container_2 = ::MemContainer::default(); + TestStore2::load_from_file(&delegate_keys_file_path, &mut new_container_2) + .expect("Failed to load from file"); + + // Check if the container has the updated key-value pair + let loaded_value_1 = new_container_1.get(&key_1).expect("Key not found"); + let loaded_value_2 = new_container_2.get(&key_2).expect("Key not found"); + + assert_eq!(expected_value_1, loaded_value_1.value().1); + assert_eq!(expected_value_2, loaded_value_2.value().1); + } + + // Test the remove function for TestStore1 + { + let key_file_path = &contract_keys_file_path; + let key_offset = 0; + + TestStore1::remove(key_file_path, key_offset).expect("Failed to remove key"); + + // Reload the container from the key file and check if the key is removed + let mut new_container_1 = ::MemContainer::default(); + TestStore1::load_from_file(key_file_path, &mut new_container_1) + .expect("Failed to load from file"); + + let loaded_value_1 = new_container_1.get(&key_1); + assert!(loaded_value_1.is_none(), "Key still exists"); } } } diff --git a/crates/core/src/runtime/tests/contract.rs b/crates/core/src/runtime/tests/contract.rs index 963c21c18..8e4b30a00 100644 --- a/crates/core/src/runtime/tests/contract.rs +++ b/crates/core/src/runtime/tests/contract.rs @@ -1,22 +1,14 @@ use freenet_stdlib::prelude::*; use super::super::contract::*; -use super::super::{ - secrets_store::SecretsStore, tests::setup_test_contract, DelegateStore, Runtime, -}; +use super::super::{secrets_store::SecretsStore, tests::setup_test_contract, Runtime}; const TEST_CONTRACT_1: &str = "test_contract_1"; #[test] fn validate_state() -> Result<(), Box> { - let (store, key) = setup_test_contract(TEST_CONTRACT_1)?; - let mut runtime = Runtime::build( - store, - DelegateStore::default(), - SecretsStore::default(), - false, - ) - .unwrap(); + let (contracts, delegates, key) = setup_test_contract(TEST_CONTRACT_1)?; + let mut runtime = Runtime::build(contracts, delegates, SecretsStore::default(), false).unwrap(); let is_valid = runtime.validate_state( &key, @@ -39,14 +31,8 @@ fn validate_state() -> Result<(), Box> { #[test] fn validate_delta() -> Result<(), Box> { - let (store, key) = setup_test_contract(TEST_CONTRACT_1)?; - let mut runtime = Runtime::build( - store, - DelegateStore::default(), - SecretsStore::default(), - false, - ) - .unwrap(); + let (contracts, delegates, key) = setup_test_contract(TEST_CONTRACT_1)?; + let mut runtime = Runtime::build(contracts, delegates, SecretsStore::default(), false).unwrap(); let is_valid = runtime.validate_delta( &key, @@ -67,14 +53,8 @@ fn validate_delta() -> Result<(), Box> { #[test] fn update_state() -> Result<(), Box> { - let (store, key) = setup_test_contract(TEST_CONTRACT_1)?; - let mut runtime = Runtime::build( - store, - DelegateStore::default(), - SecretsStore::default(), - false, - ) - .unwrap(); + let (contracts, delegates, key) = setup_test_contract(TEST_CONTRACT_1)?; + let mut runtime = Runtime::build(contracts, delegates, SecretsStore::default(), false).unwrap(); let new_state = runtime .update_state( @@ -91,14 +71,8 @@ fn update_state() -> Result<(), Box> { #[test] fn summarize_state() -> Result<(), Box> { - let (store, key) = setup_test_contract(TEST_CONTRACT_1)?; - let mut runtime = Runtime::build( - store, - DelegateStore::default(), - SecretsStore::default(), - false, - ) - .unwrap(); + let (contracts, delegates, key) = setup_test_contract(TEST_CONTRACT_1)?; + let mut runtime = Runtime::build(contracts, delegates, SecretsStore::default(), false).unwrap(); let summary = runtime.summarize_state( &key, @@ -111,14 +85,8 @@ fn summarize_state() -> Result<(), Box> { #[test] fn get_state_delta() -> Result<(), Box> { - let (store, key) = setup_test_contract(TEST_CONTRACT_1)?; - let mut runtime = Runtime::build( - store, - DelegateStore::default(), - SecretsStore::default(), - false, - ) - .unwrap(); + let (contracts, delegates, key) = setup_test_contract(TEST_CONTRACT_1)?; + let mut runtime = Runtime::build(contracts, delegates, SecretsStore::default(), false).unwrap(); let delta = runtime.get_state_delta( &key, diff --git a/crates/core/src/runtime/tests/mod.rs b/crates/core/src/runtime/tests/mod.rs index 32d0bf913..08caec36f 100644 --- a/crates/core/src/runtime/tests/mod.rs +++ b/crates/core/src/runtime/tests/mod.rs @@ -8,7 +8,7 @@ use freenet_stdlib::prelude::{ ContractCode, ContractContainer, ContractKey, ContractWasmAPIVersion, WrappedContract, }; -use super::ContractStore; +use super::{ContractStore, DelegateStore}; mod contract; mod time; @@ -61,15 +61,16 @@ pub(crate) fn get_test_module(name: &str) -> Result, Box Result<(ContractStore, ContractKey), Box> { +) -> Result<(ContractStore, DelegateStore, ContractKey), Box> { // let _ = tracing_subscriber::fmt().with_env_filter("info").try_init(); - let mut store = ContractStore::new(test_dir("contract"), 10_000)?; + let mut contract_store = ContractStore::new(test_dir("contract"), 10_000)?; + let delegate_store = DelegateStore::new(test_dir("delegate"), 10_000)?; let contract_bytes = WrappedContract::new( Arc::new(ContractCode::from(get_test_module(name)?)), vec![].into(), ); let contract = ContractContainer::Wasm(ContractWasmAPIVersion::V1(contract_bytes)); let key = contract.key(); - store.store_contract(contract)?; - Ok((store, key)) + contract_store.store_contract(contract)?; + Ok((contract_store, delegate_store, key)) } diff --git a/crates/core/src/runtime/tests/time.rs b/crates/core/src/runtime/tests/time.rs index 9e7e3f653..8304f90b5 100644 --- a/crates/core/src/runtime/tests/time.rs +++ b/crates/core/src/runtime/tests/time.rs @@ -2,18 +2,12 @@ use wasmer::TypedFunction; -use super::super::{DelegateStore, Runtime, SecretsStore}; +use super::super::{Runtime, SecretsStore}; #[test] fn now() -> Result<(), Box> { - let (store, key) = super::setup_test_contract("test_contract_2")?; - let mut runtime = Runtime::build( - store, - DelegateStore::default(), - SecretsStore::default(), - false, - ) - .unwrap(); + let (contracts, delegates, key) = super::setup_test_contract("test_contract_2")?; + let mut runtime = Runtime::build(contracts, delegates, SecretsStore::default(), false).unwrap(); let module = runtime.prepare_contract_call(&key, &vec![].into(), 1_000)?; let f: TypedFunction<(), ()> = module diff --git a/crates/fdev/src/local_node/state.rs b/crates/fdev/src/local_node/state.rs index 9a25ff836..d59888206 100644 --- a/crates/fdev/src/local_node/state.rs +++ b/crates/fdev/src/local_node/state.rs @@ -19,14 +19,19 @@ pub(super) struct AppState { impl AppState { const MAX_MEM_CACHE: u32 = 10_000_000; + const DEFAULT_MAX_DELEGATE_SIZE: i64 = 10 * 1024 * 1024; pub async fn new(config: &LocalNodeCliConfig) -> Result { - let contract_dir = Config::conf().contracts_dir(); - let contract_store = ContractStore::new(contract_dir, config.max_contract_size)?; + let contract_store = + ContractStore::new(Config::conf().contracts_dir(), config.max_contract_size)?; + let delegate_store = DelegateStore::new( + Config::conf().delegates_dir(), + Self::DEFAULT_MAX_DELEGATE_SIZE, + )?; let state_store = StateStore::new(Storage::new().await?, Self::MAX_MEM_CACHE).unwrap(); let rt = freenet::dev_tool::Runtime::build( contract_store, - DelegateStore::default(), + delegate_store, SecretsStore::default(), false, ) diff --git a/stdlib b/stdlib index 10167caf6..844880eb5 160000 --- a/stdlib +++ b/stdlib @@ -1 +1 @@ -Subproject commit 10167caf635fe5876b0e755c700f0e17b6e177a0 +Subproject commit 844880eb5d5b1b38b60d152925ea24eb75d57053 From 92ec60bbefee790189bce95b5f08ebfc8812b677 Mon Sep 17 00:00:00 2001 From: Ignacio Duart Date: Sun, 5 Nov 2023 00:34:36 +0100 Subject: [PATCH 2/2] Add compaction and fix issues + concurrency tests --- Cargo.lock | 15 +- crates/core/Cargo.toml | 2 +- crates/core/src/ring.rs | 4 +- crates/core/src/runtime/contract_store.rs | 8 +- crates/core/src/runtime/store.rs | 446 ++++++++++++++++++---- stdlib | 2 +- 6 files changed, 392 insertions(+), 85 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e0fc9e51f..3893d4257 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -199,9 +199,6 @@ name = "arrayvec" version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" -dependencies = [ - "serde", -] [[package]] name = "asn1-rs" @@ -1582,7 +1579,6 @@ version = "0.0.6" dependencies = [ "anyhow", "arbitrary", - "arrayvec", "async-trait", "asynchronous-codec", "axum", @@ -1603,6 +1599,7 @@ dependencies = [ "directories", "either", "freenet-stdlib", + "fs4", "futures", "itertools", "libp2p", @@ -1678,6 +1675,16 @@ dependencies = [ "web-sys", ] +[[package]] +name = "fs4" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29f9df8a11882c4e3335eb2d18a0137c505d9ca927470b0cac9c6f0ae07d28f7" +dependencies = [ + "rustix 0.38.21", + "windows-sys 0.48.0", +] + [[package]] name = "fsevent-sys" version = "4.1.0" diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 9c4968d9b..7af9ef236 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -16,7 +16,6 @@ path = "src/bin/freenet.rs" anyhow = "1" asynchronous-codec = "0.6" async-trait = "0.1" -arrayvec = { workspace = true } axum = { version = "0.6", default-features = false, features = ["ws", "tower-log", "matched-path", "headers", "query", "http1"] } bincode = "1" blake3 = { workspace = true } @@ -33,6 +32,7 @@ dashmap = "^5.5" delegate = "0.10" directories = "5" either = { workspace = true , features = ["serde"] } +fs4 = "0.7" futures = "0.3.21" libp2p = { version = "0.52.3", features = [ "autonat", diff --git a/crates/core/src/ring.rs b/crates/core/src/ring.rs index b6ecf95ee..e9f214ad9 100644 --- a/crates/core/src/ring.rs +++ b/crates/core/src/ring.rs @@ -694,7 +694,7 @@ impl Ring { live_tx = self .acquire_new( ideal_location, - &missing.values().collect::>(), + &missing.values().collect::>(), ¬ifier, self.max_connections - open_connections, ) @@ -738,7 +738,7 @@ impl Ring { live_tx = self .acquire_new( ideal_location, - &missing.values().collect::>(), + &missing.values().collect::>(), ¬ifier, should_swap.len(), ) diff --git a/crates/core/src/runtime/contract_store.rs b/crates/core/src/runtime/contract_store.rs index 9537a708f..96d3214cb 100644 --- a/crates/core/src/runtime/contract_store.rs +++ b/crates/core/src/runtime/contract_store.rs @@ -98,7 +98,7 @@ impl ContractStore { return result; } - self.key_to_code_part.get(&key.id()).and_then(|key| { + self.key_to_code_part.get(key.id()).and_then(|key| { let code_hash = key.value().1; let path = code_hash.encode(); let key_path = self.contracts_dir.join(path).with_extension("wasm"); @@ -161,7 +161,7 @@ impl ContractStore { Self::insert( &mut self.index_file, &mut self.key_to_code_part, - key.id(), + *key.id(), *code_hash, )?; @@ -188,7 +188,7 @@ impl ContractStore { RuntimeInnerError::UnwrapContract })?, }; - if let Some((_, (offset, _))) = self.key_to_code_part.remove(&key.id()) { + if let Some((_, (offset, _))) = self.key_to_code_part.remove(key.id()) { Self::remove(KEY_FILE_PATH.get().expect("infallible"), offset)?; } let key_path = self @@ -200,7 +200,7 @@ impl ContractStore { } pub fn code_hash_from_key(&self, key: &ContractKey) -> Option { - self.key_to_code_part.get(&key.id()).map(|r| r.value().1) + self.key_to_code_part.get(key.id()).map(|r| r.value().1) } } diff --git a/crates/core/src/runtime/store.rs b/crates/core/src/runtime/store.rs index 5257dbf79..78e864d3d 100644 --- a/crates/core/src/runtime/store.rs +++ b/crates/core/src/runtime/store.rs @@ -1,17 +1,19 @@ -use arrayvec::ArrayVec; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; +use either::Either; use freenet_stdlib::prelude::{CodeHash, ContractInstanceId, DelegateKey}; use notify::Watcher; +use std::fs::{self, OpenOptions}; use std::io::{BufReader, BufWriter, Seek, Write}; use std::path::Path; +use std::time::Duration; use std::{fs::File, io::Read}; -use super::RuntimeResult; use crate::DynError; const INTERNAL_KEY: usize = 32; const TOMBSTONE_MARKER: usize = 1; +#[derive(Debug)] pub(super) enum StoreKey { ContractKey([u8; INTERNAL_KEY]), DelegateKey { @@ -75,11 +77,14 @@ pub(super) trait StoreFsManagement { key_file_path: &Path, ) -> Result<(), DynError> { let key_path = key_file_path.to_path_buf(); + let key_path_cp = key_path.clone(); let mut watcher = notify::recommended_watcher( move |res: Result| match res { Ok(ev) => { if let notify::EventKind::Modify(notify::event::ModifyKind::Data(_)) = ev.kind { - if let Err(err) = Self::load_from_file(key_path.as_path(), &mut container) { + if let Err(err) = + Self::load_from_file(key_path_cp.as_path(), &mut container) + { tracing::error!("{err}") } } @@ -87,6 +92,12 @@ pub(super) trait StoreFsManagement { Err(err) => tracing::error!("{err}"), }, )?; + std::thread::spawn(move || loop { + std::thread::sleep(Duration::from_secs(5 * 60)); + if let Err(err) = compact_index_file(&key_path) { + tracing::warn!("Failed index file ({key_path:?}) compaction: {err}"); + } + }); watcher.watch(key_file_path, notify::RecursiveMode::NonRecursive)?; Ok(()) } @@ -96,32 +107,18 @@ pub(super) trait StoreFsManagement { mem_container: &mut Self::MemContainer, key: Self::Key, value: Self::Value, - ) -> RuntimeResult<()> + ) -> std::io::Result<()> where StoreKey: From, { // The full key is the tombstone marker byte + kind + [internal key content] + size of value let internal_key: StoreKey = key.clone().into(); - file.write_u8(false as u8)?; - match internal_key { - StoreKey::ContractKey(key) => { - file.write_u8(KeyType::Contract as u8)?; - file.write_all(&key)?; - } - StoreKey::DelegateKey { key, code_hash } => { - file.write_u8(KeyType::Delegate as u8)?; - file.write_all(&key)?; - file.write_all(&code_hash)?; - } - } - file.write_u32::(value.as_ref().len() as u32)?; - file.write_all(value.as_ref())?; - file.flush()?; - Self::insert_in_container(mem_container, (key, file.stream_position()?), value); + let offset = insert_record(file, internal_key, value.as_ref())?; + Self::insert_in_container(mem_container, (key, offset), value); Ok(()) } - fn remove(key_file_path: &Path, key_offset: u64) -> RuntimeResult<()> { + fn remove(key_file_path: &Path, key_offset: u64) -> std::io::Result<()> { let mut file = std::fs::OpenOptions::new() .write(true) .read(true) @@ -135,59 +132,17 @@ pub(super) trait StoreFsManagement { fn load_from_file( key_file_path: &Path, container: &mut Self::MemContainer, - ) -> RuntimeResult<()> { + ) -> std::io::Result<()> { let mut file = BufReader::new(File::open(key_file_path)?); - let mut first_key_part = - [0u8; TOMBSTONE_MARKER + std::mem::size_of::() + INTERNAL_KEY]; let mut key_cursor = 0; - while file.read_exact(&mut first_key_part).is_ok() { - let deleted = first_key_part[0] != 0; - let key_type = match first_key_part[1] { - 0 => KeyType::Contract, - 1 => KeyType::Delegate, - _ => unreachable!(), - }; - if !deleted { - let store_key = match key_type { - KeyType::Contract => { - let mut contract_key = [0; INTERNAL_KEY]; - contract_key.copy_from_slice(&first_key_part[2..2 + INTERNAL_KEY]); - StoreKey::ContractKey(contract_key) - } - KeyType::Delegate => { - let mut delegate_key = [0; INTERNAL_KEY]; - let mut code_hash = [0; INTERNAL_KEY]; - delegate_key.copy_from_slice(&first_key_part[2..2 + INTERNAL_KEY]); - file.read_exact(&mut code_hash)?; - StoreKey::DelegateKey { - key: delegate_key, - code_hash, - } - } - }; - let value_len = file.read_u32::()?; - let value = { - if value_len == 32 { - let buf = &mut ArrayVec::::from([0; 32]); - file.read_exact(&mut *buf)?; - Self::Value::try_from(&*buf)? - } else { - let mut buf = vec![0; value_len as usize]; - file.read_exact(&mut buf)?; - Self::Value::try_from(&buf)? - } - }; - Self::insert_in_container(container, (store_key.into(), key_cursor), value); - } else { - let skip = match key_type { - KeyType::Contract => file.read_u32::()?, - KeyType::Delegate => { - // skip the code hash part - file.seek_relative(32)?; - file.read_u32::()? - } - }; - file.seek_relative(skip as i64)?; + while let Ok(rec) = process_record(&mut file) { + if let Some((store_key, value)) = rec { + let store_key = store_key.into(); + let value = match value { + Either::Left(v) => Self::Value::try_from(&v), + Either::Right(v) => Self::Value::try_from(&v), + }?; + Self::insert_in_container(container, (store_key, key_cursor), value); } key_cursor = file.stream_position()?; } @@ -195,9 +150,193 @@ pub(super) trait StoreFsManagement { } } +/// Inserts a new record and returns the offset +fn insert_record(file: &mut BufWriter, key: StoreKey, value: &[u8]) -> std::io::Result { + // The full key is the tombstone marker byte + kind + [internal key content] + size of value + file.write_u8(false as u8)?; + let mut traversed = 1; + match key { + StoreKey::ContractKey(key) => { + file.write_u8(KeyType::Contract as u8)?; + file.write_all(&key)?; + } + StoreKey::DelegateKey { key, code_hash } => { + file.write_u8(KeyType::Delegate as u8)?; + file.write_all(&key)?; + file.write_all(&code_hash)?; + traversed += 32; // additional code_hash bytes + } + } + traversed += 1 + 32; // key + type marker + file.write_u32::(value.as_ref().len() as u32)?; + traversed += std::mem::size_of::(); + file.write_all(value)?; + traversed += value.len(); + file.flush()?; + let current_offset = file.stream_position()?; + Ok(current_offset - traversed as u64) +} + +#[allow(clippy::type_complexity)] +fn process_record( + reader: &mut BufReader, +) -> std::io::Result>)>> +where + T: Read + Seek, +{ + let mut key_part = [0u8; TOMBSTONE_MARKER + std::mem::size_of::()]; + reader.read_exact(&mut key_part)?; + + let deleted = key_part[0] != 0; + let key_type = match key_part[1] { + 0 => KeyType::Contract, + 1 => KeyType::Delegate, + _ => unreachable!(), + }; + + if !deleted { + let store_key = match key_type { + KeyType::Contract => { + let mut contract_key = [0; INTERNAL_KEY]; + reader.read_exact(&mut contract_key)?; + StoreKey::ContractKey(contract_key) + } + KeyType::Delegate => { + let mut delegate_key = [0; INTERNAL_KEY]; + let mut code_hash = [0; INTERNAL_KEY]; + reader.read_exact(&mut delegate_key)?; + reader.read_exact(&mut code_hash)?; + StoreKey::DelegateKey { + key: delegate_key, + code_hash, + } + } + }; + + // Write the value part + let value_len = reader.read_u32::()?; + let value = if value_len == 32 { + let mut value = [0u8; 32]; + reader.read_exact(&mut value)?; + Either::Left(value) + } else { + let mut value = vec![0u8; value_len as usize]; + reader.read_exact(&mut value)?; + Either::Right(value) + }; + Ok(Some((store_key, value))) + } else { + // Skip the record if deleted + let value_len = match key_type { + KeyType::Contract => { + reader.seek_relative(32)?; // skip the actual key + reader.read_u32::()? // get the value len + } + KeyType::Delegate => { + reader.seek_relative(32)?; // skip the delegate key + reader.seek_relative(32)?; // skip the code hash + reader.read_u32::()? // get the value len + } + }; + reader.seek_relative(value_len as i64)?; + Ok(None) + } +} + +fn compact_index_file(key_file_path: &Path) -> std::io::Result<()> { + use fs4::FileExt; + + let original_file = OpenOptions::new() + .truncate(false) + .read(true) + .open(key_file_path)?; + + // Lock the original file exclusively + if original_file.try_lock_exclusive().is_err() { + return Ok(()); + } + + // Create a new temporary file to write compacted data + let temp_file_path = key_file_path.with_extension("tmp"); + let temp_file = OpenOptions::new() + .create(true) + .write(true) + .open(&temp_file_path) + .map_err(|e| { + let _ = original_file.unlock(); + e + })?; + + // Lock the temporary file exclusively + temp_file.try_lock_exclusive()?; + + // Read the original file and compact data into the temp file + let mut original_reader = BufReader::new(original_file); + let mut temp_writer = BufWriter::new(temp_file); + + let mut any_deleted = false; // Track if any deleted records were found + + loop { + match process_record(&mut original_reader) { + Ok(Some((store_key, value))) => { + let value = match &value { + Either::Left(v) => v.as_slice(), + Either::Right(v) => v.as_slice(), + }; + if let Err(err) = insert_record(&mut temp_writer, store_key, value) { + // Handle the error gracefully + let _ = original_reader.into_inner().unlock(); + let _ = temp_writer.into_inner().map(|f| f.unlock()); + let _ = fs::remove_file(&temp_file_path); + return Err(err); + } + } + Ok(None) => { + // Skip record + any_deleted = true; // A deleted record was found + } + Err(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => { + // Done + break; + } + Err(other) => { + // Handle other errors gracefully + let _ = original_reader.into_inner().unlock(); + let _ = temp_writer.into_inner().map(|f| f.unlock()); + let _ = fs::remove_file(&temp_file_path); + return Err(other); + } + } + } + + // Check if any deleted records were found; if not, skip compaction + if !any_deleted { + let _ = original_reader.into_inner().unlock(); + let _ = temp_writer.into_inner().map(|f| f.unlock()); + let _ = fs::remove_file(&temp_file_path); + return Ok(()); + } + + // Clean up and finalize the compaction process + let original_file = original_reader.into_inner(); + original_file.unlock()?; + temp_writer.flush()?; + let _ = temp_writer.into_inner().map(|f| f.unlock()); + std::mem::drop(original_file); + + // Replace the original file with the temporary file + fs::rename(&temp_file_path, key_file_path)?; + let _ = fs::remove_file(&temp_file_path); + + Ok(()) +} + #[cfg(test)] mod tests { - use std::{fs::OpenOptions, sync::Arc}; + use std::{ + fs::OpenOptions, + sync::{Arc, Barrier}, + }; use super::*; use dashmap::DashMap; @@ -313,4 +452,165 @@ mod tests { assert!(loaded_value_1.is_none(), "Key still exists"); } } + + #[test] + fn test_concurrent_updates() { + const NUM_THREADS: usize = 4; + + let temp_dir = TempDir::new().expect("Failed to create a temporary directory"); + let contract_keys_file_path = temp_dir.path().join("contract_keys"); + std::fs::File::create(&contract_keys_file_path).expect("Failed to create file"); + + let container = ::MemContainer::default(); + let barrier = Arc::new(Barrier::new(NUM_THREADS)); + + let mut handles = vec![]; + for i in [0, 10, 20, 30] { + let shared_data = container.clone(); + let barrier = barrier.clone(); + let mut file = BufWriter::new( + OpenOptions::new() + .read(true) + .append(true) + .open(&temp_dir.path().join("contract_keys")) + .expect("Failed to open key file"), + ); + let key_file_path = contract_keys_file_path.clone(); + let handle = std::thread::spawn(move || { + // Wait for all threads to reach this point + barrier.wait(); + create_test_data(&mut file, &key_file_path, shared_data, i) + }); + handles.push(handle); + } + + // Wait for all threads to finish + for handle in handles { + handle.join().expect("Thread panicked"); + } + + // Assert the correctness of append-only updates in the shared data + // Check if the shared data contains the expected content after updates + let container = Arc::try_unwrap(container).unwrap(); + assert_eq!(container.len(), NUM_THREADS * 7); + let mut new_container = ::MemContainer::default(); + TestStore1::load_from_file(&contract_keys_file_path, &mut new_container) + .expect("Failed to load from file"); + assert_eq!(new_container.len(), container.len()); + for i in [0, 10, 20, 30] { + for j in [0, 1, 2, 4, 5, 7, 8] { + assert!( + new_container.contains_key(&ContractInstanceId::new([i + j; 32])), + "does not have non-deleted key: {}", + i + j + ); + } + for j in [3, 6, 9] { + assert!( + !new_container.contains_key(&ContractInstanceId::new([i + j; 32])), + "has deleted key: {}", + i + j + ); + } + } + } + + #[test] + fn test_concurrent_compaction() { + let temp_dir = TempDir::new().expect("Failed to create temp dir"); + let key_file_path = temp_dir.path().join("data.dat"); + std::fs::File::create(&key_file_path).expect("Failed to create file"); + + let num_threads = 4; + let barrier = std::sync::Arc::new(std::sync::Barrier::new(num_threads)); + + let container = ::MemContainer::default(); + + // Start multiple threads to run the compaction function concurrently + let handles: Vec<_> = [0, 10, 20, 30] + .into_iter() + .map(|i| { + let key_file_path = key_file_path.clone(); + let barrier = barrier.clone(); + let shared_data = container.clone(); + let mut file = BufWriter::new( + OpenOptions::new() + .read(true) + .append(true) + .open(&key_file_path) + .expect("Failed to open key file"), + ); + std::thread::spawn(move || { + barrier.wait(); + // concurrently creates/removes some data and compacts + if [10, 30].contains(&i) { + create_test_data(&mut file, &key_file_path, shared_data, i); + } else if let Err(err) = super::compact_index_file(&key_file_path) { + eprintln!("Thread encountered an error during compaction: {err}"); + return Err(err); + } + barrier.wait(); + // compact a last time so we know what data to compare against + super::compact_index_file(&key_file_path).map_err(|err| { + eprintln!("Thread encountered an error during compaction: {err}"); + err + }) + }) + }) + .collect(); + + // Wait for all threads to finish + for handle in handles { + handle + .join() + .expect("Thread panicked") + .expect("Compaction not completed"); + } + + let mut file = BufReader::new(File::open(key_file_path).expect("Couldn't open file")); + + let mut deleted = 0; + let mut keys = vec![]; + while let Ok(rec) = process_record(&mut file) { + match rec { + Some((key, _)) => { + if let StoreKey::ContractKey(key) = key { + keys.push(key); + } + } + None => { + deleted += 1; + } + } + } + assert_eq!(keys.len(), 14); + for i in [10, 30] { + for j in [0, 1, 2, 4, 5, 7, 8] { + assert!( + keys.contains(&[i + j; 32]), + "does not have non-deleted key: {}", + i + j + ); + } + } + assert_eq!(deleted, 0); // should be clean after compaction + } + + fn create_test_data( + file: &mut BufWriter, + test_path: &Path, + mut shared_data: ::MemContainer, + thread: u8, + ) { + for j in 0..10 { + let key = ContractInstanceId::new([thread + j as u8; 32]); + let value = CodeHash::new([thread + j as u8; 32]); + TestStore1::insert(file, &mut shared_data, key, value).expect("Failed to update"); + } + for j in [3, 6, 9] { + let key = ContractInstanceId::new([thread + j as u8; 32]); + let key_offset = shared_data.remove(&key).unwrap().1 .0; + TestStore1::remove(test_path, key_offset).expect("Failed to remove key"); + } + } } diff --git a/stdlib b/stdlib index 844880eb5..08d0017fc 160000 --- a/stdlib +++ b/stdlib @@ -1 +1 @@ -Subproject commit 844880eb5d5b1b38b60d152925ea24eb75d57053 +Subproject commit 08d0017fc84cf36eb9016ea6467cfb51c005328b