From 92ec60bbefee790189bce95b5f08ebfc8812b677 Mon Sep 17 00:00:00 2001 From: Ignacio Duart Date: Sun, 5 Nov 2023 00:34:36 +0100 Subject: [PATCH] Add compaction and fix issues + concurrency tests --- Cargo.lock | 15 +- crates/core/Cargo.toml | 2 +- crates/core/src/ring.rs | 4 +- crates/core/src/runtime/contract_store.rs | 8 +- crates/core/src/runtime/store.rs | 446 ++++++++++++++++++---- stdlib | 2 +- 6 files changed, 392 insertions(+), 85 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e0fc9e51f..3893d4257 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -199,9 +199,6 @@ name = "arrayvec" version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" -dependencies = [ - "serde", -] [[package]] name = "asn1-rs" @@ -1582,7 +1579,6 @@ version = "0.0.6" dependencies = [ "anyhow", "arbitrary", - "arrayvec", "async-trait", "asynchronous-codec", "axum", @@ -1603,6 +1599,7 @@ dependencies = [ "directories", "either", "freenet-stdlib", + "fs4", "futures", "itertools", "libp2p", @@ -1678,6 +1675,16 @@ dependencies = [ "web-sys", ] +[[package]] +name = "fs4" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29f9df8a11882c4e3335eb2d18a0137c505d9ca927470b0cac9c6f0ae07d28f7" +dependencies = [ + "rustix 0.38.21", + "windows-sys 0.48.0", +] + [[package]] name = "fsevent-sys" version = "4.1.0" diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 9c4968d9b..7af9ef236 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -16,7 +16,6 @@ path = "src/bin/freenet.rs" anyhow = "1" asynchronous-codec = "0.6" async-trait = "0.1" -arrayvec = { workspace = true } axum = { version = "0.6", default-features = false, features = ["ws", "tower-log", "matched-path", "headers", "query", "http1"] } bincode = "1" blake3 = { workspace = true } @@ -33,6 +32,7 @@ dashmap = "^5.5" delegate = "0.10" directories = "5" either = { workspace = true , features = ["serde"] } +fs4 = "0.7" futures = "0.3.21" libp2p = { version = "0.52.3", features = [ "autonat", diff --git a/crates/core/src/ring.rs b/crates/core/src/ring.rs index b6ecf95ee..e9f214ad9 100644 --- a/crates/core/src/ring.rs +++ b/crates/core/src/ring.rs @@ -694,7 +694,7 @@ impl Ring { live_tx = self .acquire_new( ideal_location, - &missing.values().collect::>(), + &missing.values().collect::>(), ¬ifier, self.max_connections - open_connections, ) @@ -738,7 +738,7 @@ impl Ring { live_tx = self .acquire_new( ideal_location, - &missing.values().collect::>(), + &missing.values().collect::>(), ¬ifier, should_swap.len(), ) diff --git a/crates/core/src/runtime/contract_store.rs b/crates/core/src/runtime/contract_store.rs index 9537a708f..96d3214cb 100644 --- a/crates/core/src/runtime/contract_store.rs +++ b/crates/core/src/runtime/contract_store.rs @@ -98,7 +98,7 @@ impl ContractStore { return result; } - self.key_to_code_part.get(&key.id()).and_then(|key| { + self.key_to_code_part.get(key.id()).and_then(|key| { let code_hash = key.value().1; let path = code_hash.encode(); let key_path = self.contracts_dir.join(path).with_extension("wasm"); @@ -161,7 +161,7 @@ impl ContractStore { Self::insert( &mut self.index_file, &mut self.key_to_code_part, - key.id(), + *key.id(), *code_hash, )?; @@ -188,7 +188,7 @@ impl ContractStore { RuntimeInnerError::UnwrapContract })?, }; - if let Some((_, (offset, _))) = self.key_to_code_part.remove(&key.id()) { + if let Some((_, (offset, _))) = self.key_to_code_part.remove(key.id()) { Self::remove(KEY_FILE_PATH.get().expect("infallible"), offset)?; } let key_path = self @@ -200,7 +200,7 @@ impl ContractStore { } pub fn code_hash_from_key(&self, key: &ContractKey) -> Option { - self.key_to_code_part.get(&key.id()).map(|r| r.value().1) + self.key_to_code_part.get(key.id()).map(|r| r.value().1) } } diff --git a/crates/core/src/runtime/store.rs b/crates/core/src/runtime/store.rs index 5257dbf79..78e864d3d 100644 --- a/crates/core/src/runtime/store.rs +++ b/crates/core/src/runtime/store.rs @@ -1,17 +1,19 @@ -use arrayvec::ArrayVec; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; +use either::Either; use freenet_stdlib::prelude::{CodeHash, ContractInstanceId, DelegateKey}; use notify::Watcher; +use std::fs::{self, OpenOptions}; use std::io::{BufReader, BufWriter, Seek, Write}; use std::path::Path; +use std::time::Duration; use std::{fs::File, io::Read}; -use super::RuntimeResult; use crate::DynError; const INTERNAL_KEY: usize = 32; const TOMBSTONE_MARKER: usize = 1; +#[derive(Debug)] pub(super) enum StoreKey { ContractKey([u8; INTERNAL_KEY]), DelegateKey { @@ -75,11 +77,14 @@ pub(super) trait StoreFsManagement { key_file_path: &Path, ) -> Result<(), DynError> { let key_path = key_file_path.to_path_buf(); + let key_path_cp = key_path.clone(); let mut watcher = notify::recommended_watcher( move |res: Result| match res { Ok(ev) => { if let notify::EventKind::Modify(notify::event::ModifyKind::Data(_)) = ev.kind { - if let Err(err) = Self::load_from_file(key_path.as_path(), &mut container) { + if let Err(err) = + Self::load_from_file(key_path_cp.as_path(), &mut container) + { tracing::error!("{err}") } } @@ -87,6 +92,12 @@ pub(super) trait StoreFsManagement { Err(err) => tracing::error!("{err}"), }, )?; + std::thread::spawn(move || loop { + std::thread::sleep(Duration::from_secs(5 * 60)); + if let Err(err) = compact_index_file(&key_path) { + tracing::warn!("Failed index file ({key_path:?}) compaction: {err}"); + } + }); watcher.watch(key_file_path, notify::RecursiveMode::NonRecursive)?; Ok(()) } @@ -96,32 +107,18 @@ pub(super) trait StoreFsManagement { mem_container: &mut Self::MemContainer, key: Self::Key, value: Self::Value, - ) -> RuntimeResult<()> + ) -> std::io::Result<()> where StoreKey: From, { // The full key is the tombstone marker byte + kind + [internal key content] + size of value let internal_key: StoreKey = key.clone().into(); - file.write_u8(false as u8)?; - match internal_key { - StoreKey::ContractKey(key) => { - file.write_u8(KeyType::Contract as u8)?; - file.write_all(&key)?; - } - StoreKey::DelegateKey { key, code_hash } => { - file.write_u8(KeyType::Delegate as u8)?; - file.write_all(&key)?; - file.write_all(&code_hash)?; - } - } - file.write_u32::(value.as_ref().len() as u32)?; - file.write_all(value.as_ref())?; - file.flush()?; - Self::insert_in_container(mem_container, (key, file.stream_position()?), value); + let offset = insert_record(file, internal_key, value.as_ref())?; + Self::insert_in_container(mem_container, (key, offset), value); Ok(()) } - fn remove(key_file_path: &Path, key_offset: u64) -> RuntimeResult<()> { + fn remove(key_file_path: &Path, key_offset: u64) -> std::io::Result<()> { let mut file = std::fs::OpenOptions::new() .write(true) .read(true) @@ -135,59 +132,17 @@ pub(super) trait StoreFsManagement { fn load_from_file( key_file_path: &Path, container: &mut Self::MemContainer, - ) -> RuntimeResult<()> { + ) -> std::io::Result<()> { let mut file = BufReader::new(File::open(key_file_path)?); - let mut first_key_part = - [0u8; TOMBSTONE_MARKER + std::mem::size_of::() + INTERNAL_KEY]; let mut key_cursor = 0; - while file.read_exact(&mut first_key_part).is_ok() { - let deleted = first_key_part[0] != 0; - let key_type = match first_key_part[1] { - 0 => KeyType::Contract, - 1 => KeyType::Delegate, - _ => unreachable!(), - }; - if !deleted { - let store_key = match key_type { - KeyType::Contract => { - let mut contract_key = [0; INTERNAL_KEY]; - contract_key.copy_from_slice(&first_key_part[2..2 + INTERNAL_KEY]); - StoreKey::ContractKey(contract_key) - } - KeyType::Delegate => { - let mut delegate_key = [0; INTERNAL_KEY]; - let mut code_hash = [0; INTERNAL_KEY]; - delegate_key.copy_from_slice(&first_key_part[2..2 + INTERNAL_KEY]); - file.read_exact(&mut code_hash)?; - StoreKey::DelegateKey { - key: delegate_key, - code_hash, - } - } - }; - let value_len = file.read_u32::()?; - let value = { - if value_len == 32 { - let buf = &mut ArrayVec::::from([0; 32]); - file.read_exact(&mut *buf)?; - Self::Value::try_from(&*buf)? - } else { - let mut buf = vec![0; value_len as usize]; - file.read_exact(&mut buf)?; - Self::Value::try_from(&buf)? - } - }; - Self::insert_in_container(container, (store_key.into(), key_cursor), value); - } else { - let skip = match key_type { - KeyType::Contract => file.read_u32::()?, - KeyType::Delegate => { - // skip the code hash part - file.seek_relative(32)?; - file.read_u32::()? - } - }; - file.seek_relative(skip as i64)?; + while let Ok(rec) = process_record(&mut file) { + if let Some((store_key, value)) = rec { + let store_key = store_key.into(); + let value = match value { + Either::Left(v) => Self::Value::try_from(&v), + Either::Right(v) => Self::Value::try_from(&v), + }?; + Self::insert_in_container(container, (store_key, key_cursor), value); } key_cursor = file.stream_position()?; } @@ -195,9 +150,193 @@ pub(super) trait StoreFsManagement { } } +/// Inserts a new record and returns the offset +fn insert_record(file: &mut BufWriter, key: StoreKey, value: &[u8]) -> std::io::Result { + // The full key is the tombstone marker byte + kind + [internal key content] + size of value + file.write_u8(false as u8)?; + let mut traversed = 1; + match key { + StoreKey::ContractKey(key) => { + file.write_u8(KeyType::Contract as u8)?; + file.write_all(&key)?; + } + StoreKey::DelegateKey { key, code_hash } => { + file.write_u8(KeyType::Delegate as u8)?; + file.write_all(&key)?; + file.write_all(&code_hash)?; + traversed += 32; // additional code_hash bytes + } + } + traversed += 1 + 32; // key + type marker + file.write_u32::(value.as_ref().len() as u32)?; + traversed += std::mem::size_of::(); + file.write_all(value)?; + traversed += value.len(); + file.flush()?; + let current_offset = file.stream_position()?; + Ok(current_offset - traversed as u64) +} + +#[allow(clippy::type_complexity)] +fn process_record( + reader: &mut BufReader, +) -> std::io::Result>)>> +where + T: Read + Seek, +{ + let mut key_part = [0u8; TOMBSTONE_MARKER + std::mem::size_of::()]; + reader.read_exact(&mut key_part)?; + + let deleted = key_part[0] != 0; + let key_type = match key_part[1] { + 0 => KeyType::Contract, + 1 => KeyType::Delegate, + _ => unreachable!(), + }; + + if !deleted { + let store_key = match key_type { + KeyType::Contract => { + let mut contract_key = [0; INTERNAL_KEY]; + reader.read_exact(&mut contract_key)?; + StoreKey::ContractKey(contract_key) + } + KeyType::Delegate => { + let mut delegate_key = [0; INTERNAL_KEY]; + let mut code_hash = [0; INTERNAL_KEY]; + reader.read_exact(&mut delegate_key)?; + reader.read_exact(&mut code_hash)?; + StoreKey::DelegateKey { + key: delegate_key, + code_hash, + } + } + }; + + // Write the value part + let value_len = reader.read_u32::()?; + let value = if value_len == 32 { + let mut value = [0u8; 32]; + reader.read_exact(&mut value)?; + Either::Left(value) + } else { + let mut value = vec![0u8; value_len as usize]; + reader.read_exact(&mut value)?; + Either::Right(value) + }; + Ok(Some((store_key, value))) + } else { + // Skip the record if deleted + let value_len = match key_type { + KeyType::Contract => { + reader.seek_relative(32)?; // skip the actual key + reader.read_u32::()? // get the value len + } + KeyType::Delegate => { + reader.seek_relative(32)?; // skip the delegate key + reader.seek_relative(32)?; // skip the code hash + reader.read_u32::()? // get the value len + } + }; + reader.seek_relative(value_len as i64)?; + Ok(None) + } +} + +fn compact_index_file(key_file_path: &Path) -> std::io::Result<()> { + use fs4::FileExt; + + let original_file = OpenOptions::new() + .truncate(false) + .read(true) + .open(key_file_path)?; + + // Lock the original file exclusively + if original_file.try_lock_exclusive().is_err() { + return Ok(()); + } + + // Create a new temporary file to write compacted data + let temp_file_path = key_file_path.with_extension("tmp"); + let temp_file = OpenOptions::new() + .create(true) + .write(true) + .open(&temp_file_path) + .map_err(|e| { + let _ = original_file.unlock(); + e + })?; + + // Lock the temporary file exclusively + temp_file.try_lock_exclusive()?; + + // Read the original file and compact data into the temp file + let mut original_reader = BufReader::new(original_file); + let mut temp_writer = BufWriter::new(temp_file); + + let mut any_deleted = false; // Track if any deleted records were found + + loop { + match process_record(&mut original_reader) { + Ok(Some((store_key, value))) => { + let value = match &value { + Either::Left(v) => v.as_slice(), + Either::Right(v) => v.as_slice(), + }; + if let Err(err) = insert_record(&mut temp_writer, store_key, value) { + // Handle the error gracefully + let _ = original_reader.into_inner().unlock(); + let _ = temp_writer.into_inner().map(|f| f.unlock()); + let _ = fs::remove_file(&temp_file_path); + return Err(err); + } + } + Ok(None) => { + // Skip record + any_deleted = true; // A deleted record was found + } + Err(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => { + // Done + break; + } + Err(other) => { + // Handle other errors gracefully + let _ = original_reader.into_inner().unlock(); + let _ = temp_writer.into_inner().map(|f| f.unlock()); + let _ = fs::remove_file(&temp_file_path); + return Err(other); + } + } + } + + // Check if any deleted records were found; if not, skip compaction + if !any_deleted { + let _ = original_reader.into_inner().unlock(); + let _ = temp_writer.into_inner().map(|f| f.unlock()); + let _ = fs::remove_file(&temp_file_path); + return Ok(()); + } + + // Clean up and finalize the compaction process + let original_file = original_reader.into_inner(); + original_file.unlock()?; + temp_writer.flush()?; + let _ = temp_writer.into_inner().map(|f| f.unlock()); + std::mem::drop(original_file); + + // Replace the original file with the temporary file + fs::rename(&temp_file_path, key_file_path)?; + let _ = fs::remove_file(&temp_file_path); + + Ok(()) +} + #[cfg(test)] mod tests { - use std::{fs::OpenOptions, sync::Arc}; + use std::{ + fs::OpenOptions, + sync::{Arc, Barrier}, + }; use super::*; use dashmap::DashMap; @@ -313,4 +452,165 @@ mod tests { assert!(loaded_value_1.is_none(), "Key still exists"); } } + + #[test] + fn test_concurrent_updates() { + const NUM_THREADS: usize = 4; + + let temp_dir = TempDir::new().expect("Failed to create a temporary directory"); + let contract_keys_file_path = temp_dir.path().join("contract_keys"); + std::fs::File::create(&contract_keys_file_path).expect("Failed to create file"); + + let container = ::MemContainer::default(); + let barrier = Arc::new(Barrier::new(NUM_THREADS)); + + let mut handles = vec![]; + for i in [0, 10, 20, 30] { + let shared_data = container.clone(); + let barrier = barrier.clone(); + let mut file = BufWriter::new( + OpenOptions::new() + .read(true) + .append(true) + .open(&temp_dir.path().join("contract_keys")) + .expect("Failed to open key file"), + ); + let key_file_path = contract_keys_file_path.clone(); + let handle = std::thread::spawn(move || { + // Wait for all threads to reach this point + barrier.wait(); + create_test_data(&mut file, &key_file_path, shared_data, i) + }); + handles.push(handle); + } + + // Wait for all threads to finish + for handle in handles { + handle.join().expect("Thread panicked"); + } + + // Assert the correctness of append-only updates in the shared data + // Check if the shared data contains the expected content after updates + let container = Arc::try_unwrap(container).unwrap(); + assert_eq!(container.len(), NUM_THREADS * 7); + let mut new_container = ::MemContainer::default(); + TestStore1::load_from_file(&contract_keys_file_path, &mut new_container) + .expect("Failed to load from file"); + assert_eq!(new_container.len(), container.len()); + for i in [0, 10, 20, 30] { + for j in [0, 1, 2, 4, 5, 7, 8] { + assert!( + new_container.contains_key(&ContractInstanceId::new([i + j; 32])), + "does not have non-deleted key: {}", + i + j + ); + } + for j in [3, 6, 9] { + assert!( + !new_container.contains_key(&ContractInstanceId::new([i + j; 32])), + "has deleted key: {}", + i + j + ); + } + } + } + + #[test] + fn test_concurrent_compaction() { + let temp_dir = TempDir::new().expect("Failed to create temp dir"); + let key_file_path = temp_dir.path().join("data.dat"); + std::fs::File::create(&key_file_path).expect("Failed to create file"); + + let num_threads = 4; + let barrier = std::sync::Arc::new(std::sync::Barrier::new(num_threads)); + + let container = ::MemContainer::default(); + + // Start multiple threads to run the compaction function concurrently + let handles: Vec<_> = [0, 10, 20, 30] + .into_iter() + .map(|i| { + let key_file_path = key_file_path.clone(); + let barrier = barrier.clone(); + let shared_data = container.clone(); + let mut file = BufWriter::new( + OpenOptions::new() + .read(true) + .append(true) + .open(&key_file_path) + .expect("Failed to open key file"), + ); + std::thread::spawn(move || { + barrier.wait(); + // concurrently creates/removes some data and compacts + if [10, 30].contains(&i) { + create_test_data(&mut file, &key_file_path, shared_data, i); + } else if let Err(err) = super::compact_index_file(&key_file_path) { + eprintln!("Thread encountered an error during compaction: {err}"); + return Err(err); + } + barrier.wait(); + // compact a last time so we know what data to compare against + super::compact_index_file(&key_file_path).map_err(|err| { + eprintln!("Thread encountered an error during compaction: {err}"); + err + }) + }) + }) + .collect(); + + // Wait for all threads to finish + for handle in handles { + handle + .join() + .expect("Thread panicked") + .expect("Compaction not completed"); + } + + let mut file = BufReader::new(File::open(key_file_path).expect("Couldn't open file")); + + let mut deleted = 0; + let mut keys = vec![]; + while let Ok(rec) = process_record(&mut file) { + match rec { + Some((key, _)) => { + if let StoreKey::ContractKey(key) = key { + keys.push(key); + } + } + None => { + deleted += 1; + } + } + } + assert_eq!(keys.len(), 14); + for i in [10, 30] { + for j in [0, 1, 2, 4, 5, 7, 8] { + assert!( + keys.contains(&[i + j; 32]), + "does not have non-deleted key: {}", + i + j + ); + } + } + assert_eq!(deleted, 0); // should be clean after compaction + } + + fn create_test_data( + file: &mut BufWriter, + test_path: &Path, + mut shared_data: ::MemContainer, + thread: u8, + ) { + for j in 0..10 { + let key = ContractInstanceId::new([thread + j as u8; 32]); + let value = CodeHash::new([thread + j as u8; 32]); + TestStore1::insert(file, &mut shared_data, key, value).expect("Failed to update"); + } + for j in [3, 6, 9] { + let key = ContractInstanceId::new([thread + j as u8; 32]); + let key_offset = shared_data.remove(&key).unwrap().1 .0; + TestStore1::remove(test_path, key_offset).expect("Failed to remove key"); + } + } } diff --git a/stdlib b/stdlib index 844880eb5..08d0017fc 160000 --- a/stdlib +++ b/stdlib @@ -1 +1 @@ -Subproject commit 844880eb5d5b1b38b60d152925ea24eb75d57053 +Subproject commit 08d0017fc84cf36eb9016ea6467cfb51c005328b