Merge pull request #890 from freenet/186394182-minor-store-fixes
186394182 - Store fixes
iduartgomez authored Nov 8, 2023
2 parents 35e937e + 92ec60b commit 6c633c4
Showing 14 changed files with 766 additions and 330 deletions.
17 changes: 12 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion apps/freenet-email-app/web/src/inbox.rs
@@ -557,7 +557,7 @@ mod tests {
minimum_tier: Tier::Hour1,
private_key,
},
key: ContractKey::from((&params.try_into()?, ContractCode::from([].as_slice()))),
key: ContractKey::from_params_and_code(&params.try_into()?, ContractCode::from([].as_slice())),
})
}
}
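
The only change here is the call site: the tuple-based `From` conversion is replaced with the explicit `ContractKey::from_params_and_code` constructor. A minimal sketch of that pattern, using hypothetical stand-in types rather than the real freenet-stdlib ones:

```rust
// Stand-in types for illustration only; the real Parameters, ContractCode and
// ContractKey live in freenet-stdlib and hash the actual contract data.
struct Parameters(Vec<u8>);
struct ContractCode(Vec<u8>);
#[derive(Debug)]
struct ContractKey(u64);

impl ContractKey {
    // A named constructor makes the call site self-describing, unlike
    // a `From` impl on an anonymous (params, code) tuple.
    fn from_params_and_code(params: &Parameters, code: ContractCode) -> Self {
        // Toy "hash": combine the two input lengths.
        ContractKey((params.0.len() + code.0.len()) as u64)
    }
}

fn main() {
    let params = Parameters(vec![1, 2, 3]);
    let key = ContractKey::from_params_and_code(&params, ContractCode(Vec::new()));
    println!("{key:?}");
}
```
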
3 changes: 2 additions & 1 deletion crates/core/Cargo.toml
@@ -16,7 +16,6 @@ path = "src/bin/freenet.rs"
anyhow = "1"
asynchronous-codec = "0.6"
async-trait = "0.1"
arrayvec = { workspace = true }
axum = { version = "0.6", default-features = false, features = ["ws", "tower-log", "matched-path", "headers", "query", "http1"] }
bincode = "1"
blake3 = { workspace = true }
@@ -33,6 +32,7 @@ dashmap = "^5.5"
delegate = "0.10"
directories = "5"
either = { workspace = true , features = ["serde"] }
fs4 = "0.7"
futures = "0.3.21"
libp2p = { version = "0.52.3", features = [
"autonat",
@@ -90,6 +90,7 @@ pico-args = "0.5"
statrs = "0.16.0"
freenet-stdlib = { workspace = true, features = ["testing", "net"] }
chrono = { workspace = true, features = ["arbitrary"] }
tempfile = "3.8"

[features]
default = ["trace", "websocket", "sqlite"]
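
Dependency changes: `arrayvec` is dropped (its call sites below move to `Vec`), `fs4` becomes a regular dependency, and `tempfile` joins the dev-dependencies. A hedged sketch of how the two new crates are typically used together, assuming fs4 0.7 exposes the fs2-style `FileExt` locking trait; this is an illustration of the crates, not code from this PR:

```rust
use std::fs::OpenOptions;

use fs4::FileExt;      // advisory file locking (fs2-compatible API, assumed for 0.7)
use tempfile::tempdir; // throwaway directory, deleted when dropped

fn main() -> std::io::Result<()> {
    let dir = tempdir()?; // e.g. a scratch store directory in a test
    let index_path = dir.path().join("KEY_DATA");

    let index = OpenOptions::new()
        .create(true)
        .append(true)
        .read(true)
        .open(&index_path)?;

    // Take an OS-level exclusive lock instead of a hand-rolled lock file.
    index.lock_exclusive()?;
    // ... read or append index records here ...
    index.unlock()?;

    Ok(())
}
```
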
4 changes: 2 additions & 2 deletions crates/core/src/contract/executor.rs
@@ -1125,7 +1125,7 @@ impl Executor<Runtime> {
if let Some(notifiers) = self.update_notifications.get_mut(key) {
let summaries = self.subscriber_summaries.get_mut(key).unwrap();
// in general there should be less than 32 failures
let mut failures = arrayvec::ArrayVec::<_, 32>::new();
let mut failures = Vec::with_capacity(32);
for (peer_key, notifier) in notifiers.iter() {
let peer_summary = summaries.get_mut(peer_key).unwrap();
let update = match peer_summary {
@@ -1153,7 +1153,7 @@
}
.into()))
{
let _ = failures.try_push(*peer_key);
failures.push(*peer_key);
tracing::error!(cli_id = %peer_key, "{err}");
} else {
tracing::debug!(cli_id = %peer_key, contract = %key, "notified of update");
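
Both hunks swap a stack-allocated `arrayvec::ArrayVec<_, 32>` for a `Vec`. In the old code the `Result` of `try_push` was discarded, so once 32 failures were recorded any further ones were silently dropped; `Vec::with_capacity(32)` keeps the "usually fewer than 32" sizing hint while never losing entries. A small self-contained sketch of the difference (the peer values are made up):

```rust
fn main() {
    const EXPECTED_MAX: usize = 32;

    // Hypothetical stand-ins for peers whose update notification failed.
    let failed_peers: Vec<u32> = (0..40).collect();

    // Pre-size for the common case, but grow past 32 instead of dropping
    // entries the way an ignored ArrayVec::try_push would.
    let mut failures = Vec::with_capacity(EXPECTED_MAX);
    for peer in &failed_peers {
        failures.push(*peer);
    }

    assert_eq!(failures.len(), failed_peers.len());
    println!("recorded {} failures", failures.len());
}
```
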
5 changes: 2 additions & 3 deletions crates/core/src/ring.rs
@@ -27,7 +27,6 @@ use std::{
};

use anyhow::bail;
use arrayvec::ArrayVec;
use dashmap::{mapref::one::Ref as DmRef, DashMap, DashSet};
use either::Either;
use freenet_stdlib::prelude::{ContractInstanceId, ContractKey};
@@ -695,7 +694,7 @@ impl Ring {
live_tx = self
.acquire_new(
ideal_location,
&missing.values().collect::<ArrayVec<_, { 5120 / 80 }>>(),
&missing.values().collect::<Vec<_>>(),
&notifier,
self.max_connections - open_connections,
)
@@ -739,7 +738,7 @@ impl Ring {
live_tx = self
.acquire_new(
ideal_location,
&missing.values().collect::<ArrayVec<_, { 5120 / 80 }>>(),
&missing.values().collect::<Vec<_>>(),
&notifier,
should_swap.len(),
)
112 changes: 45 additions & 67 deletions crates/core/src/runtime/contract_store.rs
@@ -1,72 +1,48 @@
use std::{fs::File, io::Write, iter::FromIterator, path::PathBuf, sync::Arc};
use std::{
fs::{File, OpenOptions},
io::{BufWriter, Write},
path::PathBuf,
sync::Arc,
};

use dashmap::DashMap;
use freenet_stdlib::prelude::*;
use serde::{Deserialize, Serialize};
use stretto::Cache;

use super::{
error::RuntimeInnerError,
store::{StoreEntriesContainer, StoreFsManagement},
RuntimeResult,
};

#[derive(Serialize, Deserialize, Default)]
struct KeyToCodeMap(Vec<(ContractKey, CodeHash)>);

impl StoreEntriesContainer for KeyToCodeMap {
type MemContainer = Arc<DashMap<ContractKey, CodeHash>>;
type Key = ContractKey;
type Value = CodeHash;

fn update(self, container: &mut Self::MemContainer) {
for (k, v) in self.0 {
container.insert(k, v);
}
}

fn replace(container: &Self::MemContainer) -> Self {
KeyToCodeMap::from(&**container)
}

fn insert(container: &mut Self::MemContainer, key: Self::Key, value: Self::Value) {
container.insert(key, value);
}
}

impl From<&DashMap<ContractKey, CodeHash>> for KeyToCodeMap {
fn from(vals: &DashMap<ContractKey, CodeHash>) -> Self {
let mut map = vec![];
for r in vals.iter() {
map.push((r.key().clone(), *r.value()));
}
Self(map)
}
}
use super::{error::RuntimeInnerError, store::StoreFsManagement, RuntimeResult};

/// Handle contract blob storage on the file system.
pub struct ContractStore {
contracts_dir: PathBuf,
contract_cache: Cache<CodeHash, Arc<ContractCode<'static>>>,
key_to_code_part: Arc<DashMap<ContractKey, CodeHash>>,
key_to_code_part: Arc<DashMap<ContractInstanceId, (u64, CodeHash)>>,
index_file: BufWriter<File>,
}
// TODO: add functionality to delete old contracts which have not been used for a while
// to keep the total space used under a configured threshold

static LOCK_FILE_PATH: once_cell::sync::OnceCell<PathBuf> = once_cell::sync::OnceCell::new();
static KEY_FILE_PATH: once_cell::sync::OnceCell<PathBuf> = once_cell::sync::OnceCell::new();

impl StoreFsManagement<KeyToCodeMap> for ContractStore {}
impl StoreFsManagement for ContractStore {
type MemContainer = Arc<DashMap<ContractInstanceId, (u64, CodeHash)>>;
type Key = ContractInstanceId;
type Value = CodeHash;

fn insert_in_container(
container: &mut Self::MemContainer,
(key, offset): (Self::Key, u64),
value: Self::Value,
) {
container.insert(key, (offset, value));
}
}

impl ContractStore {
/// # Arguments
/// - max_size: max size in bytes of the contracts being cached
pub fn new(contracts_dir: PathBuf, max_size: i64) -> RuntimeResult<Self> {
const ERR: &str = "failed to build mem cache";
let key_to_code_part;
let _ = LOCK_FILE_PATH.try_insert(contracts_dir.join("__LOCK"));
// if the lock file exists is from a previous execution so is safe to delete it
let _ = std::fs::remove_file(LOCK_FILE_PATH.get().unwrap().as_path());
let mut key_to_code_part = Arc::new(DashMap::new());
let key_file = match KEY_FILE_PATH
.try_insert(contracts_dir.join("KEY_DATA"))
.map_err(|(e, _)| e)
@@ -79,24 +55,25 @@ impl ContractStore {
tracing::error!("error creating contract dir: {err}");
err
})?;
key_to_code_part = Arc::new(DashMap::new());
File::create(contracts_dir.join("KEY_DATA"))?;
} else {
let map = Self::load_from_file(
KEY_FILE_PATH.get().unwrap().as_path(),
LOCK_FILE_PATH.get().unwrap().as_path(),
Self::load_from_file(
KEY_FILE_PATH.get().expect("infallible").as_path(),
&mut key_to_code_part,
)?;
key_to_code_part = Arc::new(DashMap::from_iter(map.0));
}
Self::watch_changes(
key_to_code_part.clone(),
KEY_FILE_PATH.get().unwrap().as_path(),
LOCK_FILE_PATH.get().unwrap().as_path(),
KEY_FILE_PATH.get().expect("infallible").as_path(),
)?;

let index_file =
std::io::BufWriter::new(OpenOptions::new().append(true).read(true).open(key_file)?);
Ok(Self {
contract_cache: Cache::new(100, max_size).expect(ERR),
contracts_dir,
key_to_code_part,
index_file,
})
}

@@ -121,8 +98,8 @@ impl ContractStore {
return result;
}

self.key_to_code_part.get(key).and_then(|key| {
let code_hash = key.value();
self.key_to_code_part.get(key.id()).and_then(|key| {
let code_hash = key.value().1;
let path = code_hash.encode();
let key_path = self.contracts_dir.join(path).with_extension("wasm");
let ContractContainer::Wasm(ContractWasmAPIVersion::V1(WrappedContract {
@@ -140,7 +117,7 @@
};
// add back the contract part to the mem store
let size = data.data().len() as i64;
self.contract_cache.insert(*code_hash, data.clone(), size);
self.contract_cache.insert(code_hash, data.clone(), size);
Some(ContractContainer::Wasm(ContractWasmAPIVersion::V1(
WrappedContract::new(data, params),
)))
@@ -162,15 +139,6 @@ impl ContractStore {
if self.contract_cache.get(code_hash).is_some() {
return Ok(());
}

Self::update(
&mut self.key_to_code_part,
key.clone(),
*code_hash,
KEY_FILE_PATH.get().unwrap(),
LOCK_FILE_PATH.get().unwrap().as_path(),
)?;

let key_path = code_hash.encode();
let key_path = self.contracts_dir.join(key_path).with_extension("wasm");
if let Ok((code, _ver)) = ContractCode::load_versioned_from_path(&key_path) {
@@ -185,10 +153,17 @@
self.contract_cache
.insert(*code_hash, Arc::new(ContractCode::from(data)), size);

// save on disc
let version = APIVersion::from(contract);
let output: Vec<u8> = code.to_bytes_versioned(version)?;
let mut file = File::create(key_path)?;
file.write_all(output.as_slice())?;
Self::insert(
&mut self.index_file,
&mut self.key_to_code_part,
*key.id(),
*code_hash,
)?;

Ok(())
}
@@ -213,6 +188,9 @@ impl ContractStore {
RuntimeInnerError::UnwrapContract
})?,
};
if let Some((_, (offset, _))) = self.key_to_code_part.remove(key.id()) {
Self::remove(KEY_FILE_PATH.get().expect("infallible"), offset)?;
}
let key_path = self
.contracts_dir
.join(contract_hash.encode())
@@ -222,7 +200,7 @@
}

pub fn code_hash_from_key(&self, key: &ContractKey) -> Option<CodeHash> {
self.key_to_code_part.get(key).map(|r| *r.value())
self.key_to_code_part.get(key.id()).map(|r| r.value().1)
}
}

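The store rewrite above is the bulk of the PR: the in-memory map is now keyed by `ContractInstanceId` and stores, next to each `CodeHash`, the offset of that entry's record in the `KEY_DATA` index file; inserts append through the `BufWriter` held in `index_file`, and removal hands the remembered offset to `Self::remove`. A simplified, self-contained sketch of that append-plus-offset pattern — the record layout and the tombstoning step are assumptions for illustration, not the store's actual on-disk format:

```rust
use std::collections::HashMap;
use std::fs::OpenOptions;
use std::io::{Seek, SeekFrom, Write};

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("key_data_sketch");

    let mut index_file = OpenOptions::new()
        .create(true)
        .append(true)
        .read(true)
        .open(&path)?;

    // key -> (offset of its record in the index file, value)
    let mut mem: HashMap<u64, (u64, [u8; 32])> = HashMap::new();

    // Insert: append one fixed-size record and remember where it landed.
    let (key, value) = (42u64, [7u8; 32]);
    let offset = index_file.seek(SeekFrom::End(0))?;
    index_file.write_all(&key.to_le_bytes())?;
    index_file.write_all(&value)?;
    index_file.flush()?;
    mem.insert(key, (offset, value));

    // Remove: look up the offset and tombstone that record in place.
    if let Some((offset, _)) = mem.remove(&key) {
        let mut f = OpenOptions::new().write(true).open(&path)?;
        f.seek(SeekFrom::Start(offset))?;
        f.write_all(&u64::MAX.to_le_bytes())?; // overwrite the key as a tombstone
    }

    std::fs::remove_file(&path)
}
```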
