Skip to content

Commit

Permalink
wip: add compaction
Browse files Browse the repository at this point in the history
  • Loading branch information
iduartgomez committed Nov 7, 2023
1 parent 55ab087 commit a433c41
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 8 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ dashmap = "^5.5"
delegate = "0.10"
directories = "5"
either = { workspace = true , features = ["serde"] }
fs4 = "0.7"
futures = "0.3.21"
libp2p = { version = "0.52.3", features = [
"autonat",
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/ring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,7 @@ impl Ring {
live_tx = self
.acquire_new(
ideal_location,
&missing.values().collect::<ArrayVec<_, { 5120 / 80 }>>(),
&missing.values().collect::<Vec<_>>(),
&notifier,
self.max_connections - open_connections,
)
Expand Down Expand Up @@ -738,7 +738,7 @@ impl Ring {
live_tx = self
.acquire_new(
ideal_location,
&missing.values().collect::<ArrayVec<_, { 5120 / 80 }>>(),
&missing.values().collect::<Vec<_>>(),
&notifier,
should_swap.len(),
)
Expand Down
8 changes: 4 additions & 4 deletions crates/core/src/runtime/contract_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl ContractStore {
return result;
}

self.key_to_code_part.get(&key.id()).and_then(|key| {
self.key_to_code_part.get(key.id()).and_then(|key| {
let code_hash = key.value().1;
let path = code_hash.encode();
let key_path = self.contracts_dir.join(path).with_extension("wasm");
Expand Down Expand Up @@ -161,7 +161,7 @@ impl ContractStore {
Self::insert(
&mut self.index_file,
&mut self.key_to_code_part,
key.id(),
*key.id(),
*code_hash,
)?;

Expand All @@ -188,7 +188,7 @@ impl ContractStore {
RuntimeInnerError::UnwrapContract
})?,
};
if let Some((_, (offset, _))) = self.key_to_code_part.remove(&key.id()) {
if let Some((_, (offset, _))) = self.key_to_code_part.remove(key.id()) {
Self::remove(KEY_FILE_PATH.get().expect("infallible"), offset)?;
}
let key_path = self
Expand All @@ -200,7 +200,7 @@ impl ContractStore {
}

pub fn code_hash_from_key(&self, key: &ContractKey) -> Option<CodeHash> {
self.key_to_code_part.get(&key.id()).map(|r| r.value().1)
self.key_to_code_part.get(key.id()).map(|r| r.value().1)
}
}

Expand Down
102 changes: 101 additions & 1 deletion crates/core/src/runtime/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use arrayvec::ArrayVec;
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use freenet_stdlib::prelude::{CodeHash, ContractInstanceId, DelegateKey};
use notify::Watcher;
use std::fs::{self, OpenOptions};
use std::io::{BufReader, BufWriter, Seek, Write};
use std::path::Path;
use std::{fs::File, io::Read};
Expand Down Expand Up @@ -195,9 +196,52 @@ pub(super) trait StoreFsManagement {
}
}

pub fn compact_and_backup_file(
key_file_path: &Path,
compact_function: fn(&mut BufWriter<File>) -> std::io::Result<()>,
) -> std::io::Result<()> {
use fs4::FileExt;
// Create a backup copy of the original file
let backup_file_path = key_file_path.with_extension("bak");
fs::copy(key_file_path, &backup_file_path)?;

// Lock the original file exclusively
let original_file = OpenOptions::new()
.truncate(false)
.write(true)
.open(key_file_path)?;
if original_file.try_lock_exclusive().is_err() {
return Ok(());
}

// Perform the compaction process
let mut file_writer = BufWriter::new(original_file);
if let Err(error) = compact_function(&mut file_writer) {
// Restore the original file from the backup copy
fs::copy(&backup_file_path, key_file_path)?;

// Clean up the backup file
fs::remove_file(&backup_file_path)?;

return Err(error);
}

// Release the lock when compaction is complete
let original_file = file_writer.into_inner()?;
original_file.unlock()?;

// Clean up the backup file if compaction is successful
fs::remove_file(&backup_file_path)?;

Ok(())
}

#[cfg(test)]
mod tests {
use std::{fs::OpenOptions, sync::Arc};
use std::{
fs::OpenOptions,
sync::{Arc, Barrier},
};

use super::*;
use dashmap::DashMap;
Expand Down Expand Up @@ -313,4 +357,60 @@ mod tests {
assert!(loaded_value_1.is_none(), "Key still exists");
}
}

#[test]
fn test_concurrent_updates() {
const NUM_THREADS: usize = 4;

let temp_dir = TempDir::new().expect("Failed to create a temporary directory");
let contract_keys_file_path = temp_dir.path().join("contract_keys");
std::fs::File::create(&contract_keys_file_path).expect("Failed to create file");

let container = <TestStore1 as StoreFsManagement>::MemContainer::default();
let barrier = Arc::new(Barrier::new(NUM_THREADS));

let mut handles = vec![];
for i in [0, 10, 20, 30] {
let mut shared_data = container.clone();
let barrier = barrier.clone();
let mut file = BufWriter::new(
OpenOptions::new()
.read(true)
.append(true)
.open(&temp_dir.path().join("contract_keys"))
.expect("Failed to open key file"),
);
let p = contract_keys_file_path.clone();
let handle = std::thread::spawn(move || {
// Wait for all threads to reach this point
barrier.wait();
for j in 0..10 {
let key = ContractInstanceId::new([i + j as u8; 32]);
let value = CodeHash::new([i + j as u8; 32]);
TestStore1::insert(&mut file, &mut shared_data, key, value)
.expect("Failed to update");
}
for j in [3, 6, 9] {
let key = ContractInstanceId::new([i + j as u8; 32]);
let key_offset = shared_data.remove(&key).unwrap().1 .0;
TestStore1::remove(&p, key_offset).expect("Failed to remove key");
}
});
handles.push(handle);
}

// Wait for all threads to finish
for handle in handles {
handle.join().expect("Thread panicked");
}

// Assert the correctness of append-only updates in the shared data
// Check if the shared data contains the expected content after updates
let container = Arc::try_unwrap(container).unwrap();
assert_eq!(container.len(), NUM_THREADS * 7);
let mut new_container = <TestStore1 as StoreFsManagement>::MemContainer::default();
TestStore1::load_from_file(&contract_keys_file_path, &mut new_container)
.expect("Failed to load from file");
assert_eq!(new_container.len(), container.len());
}
}
2 changes: 1 addition & 1 deletion stdlib

0 comments on commit a433c41

Please sign in to comment.