diff --git a/Cargo.lock b/Cargo.lock index a2a170a16..3893d4257 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -199,9 +199,6 @@ name = "arrayvec" version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" -dependencies = [ - "serde", -] [[package]] name = "asn1-rs" @@ -1582,7 +1579,6 @@ version = "0.0.6" dependencies = [ "anyhow", "arbitrary", - "arrayvec", "async-trait", "asynchronous-codec", "axum", diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 2a36b407d..7af9ef236 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -16,7 +16,6 @@ path = "src/bin/freenet.rs" anyhow = "1" asynchronous-codec = "0.6" async-trait = "0.1" -arrayvec = { workspace = true } axum = { version = "0.6", default-features = false, features = ["ws", "tower-log", "matched-path", "headers", "query", "http1"] } bincode = "1" blake3 = { workspace = true } diff --git a/crates/core/src/runtime/store.rs b/crates/core/src/runtime/store.rs index c83bcd234..4c76c60e9 100644 --- a/crates/core/src/runtime/store.rs +++ b/crates/core/src/runtime/store.rs @@ -1,18 +1,19 @@ -use arrayvec::ArrayVec; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; +use either::Either; use freenet_stdlib::prelude::{CodeHash, ContractInstanceId, DelegateKey}; use notify::Watcher; use std::fs::{self, OpenOptions}; use std::io::{BufReader, BufWriter, Seek, Write}; use std::path::Path; +use std::time::Duration; use std::{fs::File, io::Read}; -use super::RuntimeResult; use crate::DynError; const INTERNAL_KEY: usize = 32; const TOMBSTONE_MARKER: usize = 1; +#[derive(Debug)] pub(super) enum StoreKey { ContractKey([u8; INTERNAL_KEY]), DelegateKey { @@ -76,11 +77,14 @@ pub(super) trait StoreFsManagement { key_file_path: &Path, ) -> Result<(), DynError> { let key_path = key_file_path.to_path_buf(); + let key_path_cp = key_path.clone(); let mut watcher = notify::recommended_watcher( move |res: Result| match res { Ok(ev) => { if let notify::EventKind::Modify(notify::event::ModifyKind::Data(_)) = ev.kind { - if let Err(err) = Self::load_from_file(key_path.as_path(), &mut container) { + if let Err(err) = + Self::load_from_file(key_path_cp.as_path(), &mut container) + { tracing::error!("{err}") } } @@ -88,6 +92,12 @@ pub(super) trait StoreFsManagement { Err(err) => tracing::error!("{err}"), }, )?; + std::thread::spawn(move || loop { + std::thread::sleep(Duration::from_secs(5 * 60)); + if let Err(err) = compact_index_file(&key_path) { + tracing::warn!("Failed index file ({key_path:?}) compaction: {err}"); + } + }); watcher.watch(key_file_path, notify::RecursiveMode::NonRecursive)?; Ok(()) } @@ -97,32 +107,23 @@ pub(super) trait StoreFsManagement { mem_container: &mut Self::MemContainer, key: Self::Key, value: Self::Value, - ) -> RuntimeResult<()> + ) -> std::io::Result<()> where StoreKey: From, { // The full key is the tombstone marker byte + kind + [internal key content] + size of value let internal_key: StoreKey = key.clone().into(); - file.write_u8(false as u8)?; - match internal_key { - StoreKey::ContractKey(key) => { - file.write_u8(KeyType::Contract as u8)?; - file.write_all(&key)?; - } - StoreKey::DelegateKey { key, code_hash } => { - file.write_u8(KeyType::Delegate as u8)?; - file.write_all(&key)?; - file.write_all(&code_hash)?; - } - } - file.write_u32::(value.as_ref().len() as u32)?; - file.write_all(value.as_ref())?; - file.flush()?; - Self::insert_in_container(mem_container, (key, file.stream_position()?), value); + let offset = insert_record(file, internal_key, value.as_ref())?; + println!( + "inserted: {:?} @ {offset}", + Into::::into(key.clone()) + ); + Self::insert_in_container(mem_container, (key, offset), value); Ok(()) } - fn remove(key_file_path: &Path, key_offset: u64) -> RuntimeResult<()> { + fn remove(key_file_path: &Path, key_offset: u64) -> std::io::Result<()> { + println!("removing key @ {key_offset}"); let mut file = std::fs::OpenOptions::new() .write(true) .read(true) @@ -136,59 +137,18 @@ pub(super) trait StoreFsManagement { fn load_from_file( key_file_path: &Path, container: &mut Self::MemContainer, - ) -> RuntimeResult<()> { + ) -> std::io::Result<()> { let mut file = BufReader::new(File::open(key_file_path)?); - let mut first_key_part = - [0u8; TOMBSTONE_MARKER + std::mem::size_of::() + INTERNAL_KEY]; let mut key_cursor = 0; - while file.read_exact(&mut first_key_part).is_ok() { - let deleted = first_key_part[0] != 0; - let key_type = match first_key_part[1] { - 0 => KeyType::Contract, - 1 => KeyType::Delegate, - _ => unreachable!(), - }; - if !deleted { - let store_key = match key_type { - KeyType::Contract => { - let mut contract_key = [0; INTERNAL_KEY]; - contract_key.copy_from_slice(&first_key_part[2..2 + INTERNAL_KEY]); - StoreKey::ContractKey(contract_key) - } - KeyType::Delegate => { - let mut delegate_key = [0; INTERNAL_KEY]; - let mut code_hash = [0; INTERNAL_KEY]; - delegate_key.copy_from_slice(&first_key_part[2..2 + INTERNAL_KEY]); - file.read_exact(&mut code_hash)?; - StoreKey::DelegateKey { - key: delegate_key, - code_hash, - } - } - }; - let value_len = file.read_u32::()?; - let value = { - if value_len == 32 { - let buf = &mut ArrayVec::::from([0; 32]); - file.read_exact(&mut *buf)?; - Self::Value::try_from(&*buf)? - } else { - let mut buf = vec![0; value_len as usize]; - file.read_exact(&mut buf)?; - Self::Value::try_from(&buf)? - } - }; - Self::insert_in_container(container, (store_key.into(), key_cursor), value); - } else { - let skip = match key_type { - KeyType::Contract => file.read_u32::()?, - KeyType::Delegate => { - // skip the code hash part - file.seek_relative(32)?; - file.read_u32::()? - } - }; - file.seek_relative(skip as i64)?; + while let Ok(rec) = process_record(&mut file) { + if let Some((store_key, value)) = rec { + println!("loading {store_key:?} @ {key_cursor}"); + let store_key = store_key.into(); + let value = match value { + Either::Left(v) => Self::Value::try_from(&v), + Either::Right(v) => Self::Value::try_from(&v), + }?; + Self::insert_in_container(container, (store_key, key_cursor), value); } key_cursor = file.stream_position()?; } @@ -196,42 +156,183 @@ pub(super) trait StoreFsManagement { } } -pub fn compact_and_backup_file( - key_file_path: &Path, - compact_function: fn(&mut BufWriter) -> std::io::Result<()>, -) -> std::io::Result<()> { +/// Inserts a new record and returns the offset +fn insert_record(file: &mut BufWriter, key: StoreKey, value: &[u8]) -> std::io::Result { + // The full key is the tombstone marker byte + kind + [internal key content] + size of value + file.write_u8(false as u8)?; + let mut traversed = 1; + match key { + StoreKey::ContractKey(key) => { + file.write_u8(KeyType::Contract as u8)?; + file.write_all(&key)?; + } + StoreKey::DelegateKey { key, code_hash } => { + file.write_u8(KeyType::Delegate as u8)?; + file.write_all(&key)?; + file.write_all(&code_hash)?; + traversed += 32; // additional code_hash bytes + } + } + traversed += 1 + 32; // key + type marker + file.write_u32::(value.as_ref().len() as u32)?; + traversed += std::mem::size_of::(); + file.write_all(value)?; + traversed += value.len(); + file.flush()?; + let current_offset = file.stream_position()?; + Ok(current_offset - traversed as u64) +} + +#[allow(clippy::type_complexity)] +fn process_record( + reader: &mut BufReader, +) -> std::io::Result>)>> +where + T: Read + Seek, +{ + let mut key_part = [0u8; TOMBSTONE_MARKER + std::mem::size_of::()]; + reader.read_exact(&mut key_part)?; + + let deleted = key_part[0] != 0; + let key_type = match key_part[1] { + 0 => KeyType::Contract, + 1 => KeyType::Delegate, + _ => unreachable!(), + }; + + if !deleted { + let store_key = match key_type { + KeyType::Contract => { + let mut contract_key = [0; INTERNAL_KEY]; + reader.read_exact(&mut contract_key)?; + StoreKey::ContractKey(contract_key) + } + KeyType::Delegate => { + let mut delegate_key = [0; INTERNAL_KEY]; + let mut code_hash = [0; INTERNAL_KEY]; + reader.read_exact(&mut delegate_key)?; + reader.read_exact(&mut code_hash)?; + StoreKey::DelegateKey { + key: delegate_key, + code_hash, + } + } + }; + + // Write the value part + let value_len = reader.read_u32::()?; + let value = if value_len == 32 { + let mut value = [0u8; 32]; + reader.read_exact(&mut value)?; + Either::Left(value) + } else { + let mut value = vec![0u8; value_len as usize]; + reader.read_exact(&mut value)?; + Either::Right(value) + }; + Ok(Some((store_key, value))) + } else { + // Skip the record if deleted + let value_len = match key_type { + KeyType::Contract => { + reader.seek_relative(32)?; // skip the actual key + reader.read_u32::()? // get the value len + } + KeyType::Delegate => { + reader.seek_relative(32)?; // skip the delegate key + reader.seek_relative(32)?; // skip the code hash + reader.read_u32::()? // get the value len + } + }; + reader.seek_relative(value_len as i64)?; + Ok(None) + } +} + +fn compact_index_file(key_file_path: &Path) -> std::io::Result<()> { use fs4::FileExt; - // Create a backup copy of the original file - let backup_file_path = key_file_path.with_extension("bak"); - fs::copy(key_file_path, &backup_file_path)?; - // Lock the original file exclusively let original_file = OpenOptions::new() .truncate(false) - .write(true) + .read(true) .open(key_file_path)?; + + // Lock the original file exclusively if original_file.try_lock_exclusive().is_err() { return Ok(()); } - // Perform the compaction process - let mut file_writer = BufWriter::new(original_file); - if let Err(error) = compact_function(&mut file_writer) { - // Restore the original file from the backup copy - fs::copy(&backup_file_path, key_file_path)?; - - // Clean up the backup file - fs::remove_file(&backup_file_path)?; + // Create a new temporary file to write compacted data + let temp_file_path = key_file_path.with_extension("tmp"); + let temp_file = OpenOptions::new() + .create(true) + .write(true) + .open(&temp_file_path) + .map_err(|e| { + let _ = original_file.unlock(); + e + })?; + + // Lock the temporary file exclusively + temp_file.try_lock_exclusive()?; + + // Read the original file and compact data into the temp file + let mut original_reader = BufReader::new(original_file); + let mut temp_writer = BufWriter::new(temp_file); + + let mut any_deleted = false; // Track if any deleted records were found + + loop { + match process_record(&mut original_reader) { + Ok(Some((store_key, value))) => { + let value = match &value { + Either::Left(v) => v.as_slice(), + Either::Right(v) => v.as_slice(), + }; + if let Err(err) = insert_record(&mut temp_writer, store_key, value) { + // Handle the error gracefully + let _ = original_reader.into_inner().unlock(); + let _ = temp_writer.into_inner().map(|f| f.unlock()); + let _ = fs::remove_file(&temp_file_path); + return Err(err); + } + } + Ok(None) => { + // Skip record + any_deleted = true; // A deleted record was found + } + Err(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => { + // Done + break; + } + Err(other) => { + // Handle other errors gracefully + let _ = original_reader.into_inner().unlock(); + let _ = temp_writer.into_inner().map(|f| f.unlock()); + let _ = fs::remove_file(&temp_file_path); + return Err(other); + } + } + } - return Err(error); + // Check if any deleted records were found; if not, skip compaction + if !any_deleted { + let _ = original_reader.into_inner().unlock(); + let _ = temp_writer.into_inner().map(|f| f.unlock()); + let _ = fs::remove_file(&temp_file_path); + return Ok(()); } - // Release the lock when compaction is complete - let original_file = file_writer.into_inner()?; + // Clean up and finalize the compaction process + let original_file = original_reader.into_inner(); original_file.unlock()?; + temp_writer.flush()?; + let _ = temp_writer.into_inner().map(|f| f.unlock()); + std::mem::drop(original_file); - // Clean up the backup file if compaction is successful - fs::remove_file(&backup_file_path)?; + // Replace the original file with the temporary file + fs::rename(&temp_file_path, key_file_path)?; + let _ = fs::remove_file(&temp_file_path); Ok(()) } @@ -371,7 +472,7 @@ mod tests { let mut handles = vec![]; for i in [0, 10, 20, 30] { - let mut shared_data = container.clone(); + let shared_data = container.clone(); let barrier = barrier.clone(); let mut file = BufWriter::new( OpenOptions::new() @@ -380,21 +481,11 @@ mod tests { .open(&temp_dir.path().join("contract_keys")) .expect("Failed to open key file"), ); - let p = contract_keys_file_path.clone(); + let key_file_path = contract_keys_file_path.clone(); let handle = std::thread::spawn(move || { // Wait for all threads to reach this point barrier.wait(); - for j in 0..10 { - let key = ContractInstanceId::new([i + j as u8; 32]); - let value = CodeHash::new([i + j as u8; 32]); - TestStore1::insert(&mut file, &mut shared_data, key, value) - .expect("Failed to update"); - } - for j in [3, 6, 9] { - let key = ContractInstanceId::new([i + j as u8; 32]); - let key_offset = shared_data.remove(&key).unwrap().1 .0; - TestStore1::remove(&p, key_offset).expect("Failed to remove key"); - } + create_test_data(&mut file, &key_file_path, shared_data, i) }); handles.push(handle); } @@ -412,5 +503,120 @@ mod tests { TestStore1::load_from_file(&contract_keys_file_path, &mut new_container) .expect("Failed to load from file"); assert_eq!(new_container.len(), container.len()); + for i in [0, 10, 20, 30] { + for j in [0, 1, 2, 4, 5, 7, 8] { + assert!( + new_container.contains_key(&ContractInstanceId::new([i + j; 32])), + "does not have non-deleted key: {}", + i + j + ); + } + for j in [3, 6, 9] { + assert!( + !new_container.contains_key(&ContractInstanceId::new([i + j; 32])), + "has deleted key: {}", + i + j + ); + } + } + } + + #[test] + fn test_concurrent_compaction() { + let temp_dir = TempDir::new().expect("Failed to create temp dir"); + let key_file_path = temp_dir.path().join("data.dat"); + std::fs::File::create(&key_file_path).expect("Failed to create file"); + + let num_threads = 4; + let barrier = std::sync::Arc::new(std::sync::Barrier::new(num_threads)); + + let container = ::MemContainer::default(); + + // Start multiple threads to run the compaction function concurrently + let handles: Vec<_> = [0, 10, 20, 30] + .into_iter() + .map(|i| { + let key_file_path = key_file_path.clone(); + let barrier = barrier.clone(); + let shared_data = container.clone(); + let mut file = BufWriter::new( + OpenOptions::new() + .read(true) + .append(true) + .open(&key_file_path) + .expect("Failed to open key file"), + ); + std::thread::spawn(move || { + barrier.wait(); + // concurrently creates/removes some data and compacts + if [10, 30].contains(&i) { + create_test_data(&mut file, &key_file_path, shared_data, i); + } else if let Err(err) = super::compact_index_file(&key_file_path) { + eprintln!("Thread encountered an error during compaction: {err}"); + return Err(err); + } + barrier.wait(); + // compact a last time so we know what data to compare against + super::compact_index_file(&key_file_path).map_err(|err| { + eprintln!("Thread encountered an error during compaction: {err}"); + err + }) + }) + }) + .collect(); + + // Wait for all threads to finish + for handle in handles { + handle + .join() + .expect("Thread panicked") + .expect("Compaction not completed"); + } + + let mut file = BufReader::new(File::open(key_file_path).expect("Couldn't open file")); + + let mut deleted = 0; + let mut keys = vec![]; + while let Ok(rec) = process_record(&mut file) { + match rec { + Some((key, _)) => { + if let StoreKey::ContractKey(key) = key { + keys.push(key); + } + } + None => { + deleted += 1; + } + } + } + assert_eq!(keys.len(), 14); + for i in [10, 30] { + for j in [0, 1, 2, 4, 5, 7, 8] { + assert!( + keys.contains(&[i + j; 32]), + "does not have non-deleted key: {}", + i + j + ); + } + } + assert_eq!(deleted, 0); // should be clean after compaction + } + + fn create_test_data( + file: &mut BufWriter, + test_path: &Path, + mut shared_data: ::MemContainer, + thread: u8, + ) { + for j in 0..10 { + let key = ContractInstanceId::new([thread + j as u8; 32]); + let value = CodeHash::new([thread + j as u8; 32]); + TestStore1::insert(file, &mut shared_data, key, value).expect("Failed to update"); + } + for j in [3, 6, 9] { + let key = ContractInstanceId::new([thread + j as u8; 32]); + let key_offset = shared_data.remove(&key).unwrap().1 .0; + TestStore1::remove(test_path, key_offset).expect("Failed to remove key"); + } } }