Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

186394182 - minor store fixes #890

Merged
merged 2 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion apps/freenet-email-app/web/src/inbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ mod tests {
minimum_tier: Tier::Hour1,
private_key,
},
key: ContractKey::from((&params.try_into()?, ContractCode::from([].as_slice()))),
key: ContractKey::from_params_and_code(&params.try_into()?, ContractCode::from([].as_slice())),
})
}
}
Expand Down
3 changes: 2 additions & 1 deletion crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ path = "src/bin/freenet.rs"
anyhow = "1"
asynchronous-codec = "0.6"
async-trait = "0.1"
arrayvec = { workspace = true }
axum = { version = "0.6", default-features = false, features = ["ws", "tower-log", "matched-path", "headers", "query", "http1"] }
bincode = "1"
blake3 = { workspace = true }
Expand All @@ -33,6 +32,7 @@ dashmap = "^5.5"
delegate = "0.10"
directories = "5"
either = { workspace = true , features = ["serde"] }
fs4 = "0.7"
futures = "0.3.21"
libp2p = { version = "0.52.3", features = [
"autonat",
Expand Down Expand Up @@ -90,6 +90,7 @@ pico-args = "0.5"
statrs = "0.16.0"
freenet-stdlib = { workspace = true, features = ["testing", "net"] }
chrono = { workspace = true, features = ["arbitrary"] }
tempfile = "3.8"

[features]
default = ["trace", "websocket", "sqlite"]
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/contract/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1125,7 +1125,7 @@ impl Executor<Runtime> {
if let Some(notifiers) = self.update_notifications.get_mut(key) {
let summaries = self.subscriber_summaries.get_mut(key).unwrap();
// in general there should be less than 32 failures
let mut failures = arrayvec::ArrayVec::<_, 32>::new();
let mut failures = Vec::with_capacity(32);
for (peer_key, notifier) in notifiers.iter() {
let peer_summary = summaries.get_mut(peer_key).unwrap();
let update = match peer_summary {
Expand Down Expand Up @@ -1153,7 +1153,7 @@ impl Executor<Runtime> {
}
.into()))
{
let _ = failures.try_push(*peer_key);
failures.push(*peer_key);
tracing::error!(cli_id = %peer_key, "{err}");
} else {
tracing::debug!(cli_id = %peer_key, contract = %key, "notified of update");
Expand Down
5 changes: 2 additions & 3 deletions crates/core/src/ring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use std::{
};

use anyhow::bail;
use arrayvec::ArrayVec;
use dashmap::{mapref::one::Ref as DmRef, DashMap, DashSet};
use either::Either;
use freenet_stdlib::prelude::{ContractInstanceId, ContractKey};
Expand Down Expand Up @@ -695,7 +694,7 @@ impl Ring {
live_tx = self
.acquire_new(
ideal_location,
&missing.values().collect::<ArrayVec<_, { 5120 / 80 }>>(),
&missing.values().collect::<Vec<_>>(),
&notifier,
self.max_connections - open_connections,
)
Expand Down Expand Up @@ -739,7 +738,7 @@ impl Ring {
live_tx = self
.acquire_new(
ideal_location,
&missing.values().collect::<ArrayVec<_, { 5120 / 80 }>>(),
&missing.values().collect::<Vec<_>>(),
&notifier,
should_swap.len(),
)
Expand Down
112 changes: 45 additions & 67 deletions crates/core/src/runtime/contract_store.rs
Original file line number Diff line number Diff line change
@@ -1,72 +1,48 @@
use std::{fs::File, io::Write, iter::FromIterator, path::PathBuf, sync::Arc};
use std::{
fs::{File, OpenOptions},
io::{BufWriter, Write},
path::PathBuf,
sync::Arc,
};

use dashmap::DashMap;
use freenet_stdlib::prelude::*;
use serde::{Deserialize, Serialize};
use stretto::Cache;

use super::{
error::RuntimeInnerError,
store::{StoreEntriesContainer, StoreFsManagement},
RuntimeResult,
};

#[derive(Serialize, Deserialize, Default)]
struct KeyToCodeMap(Vec<(ContractKey, CodeHash)>);

impl StoreEntriesContainer for KeyToCodeMap {
type MemContainer = Arc<DashMap<ContractKey, CodeHash>>;
type Key = ContractKey;
type Value = CodeHash;

fn update(self, container: &mut Self::MemContainer) {
for (k, v) in self.0 {
container.insert(k, v);
}
}

fn replace(container: &Self::MemContainer) -> Self {
KeyToCodeMap::from(&**container)
}

fn insert(container: &mut Self::MemContainer, key: Self::Key, value: Self::Value) {
container.insert(key, value);
}
}

impl From<&DashMap<ContractKey, CodeHash>> for KeyToCodeMap {
fn from(vals: &DashMap<ContractKey, CodeHash>) -> Self {
let mut map = vec![];
for r in vals.iter() {
map.push((r.key().clone(), *r.value()));
}
Self(map)
}
}
use super::{error::RuntimeInnerError, store::StoreFsManagement, RuntimeResult};

/// Handle contract blob storage on the file system.
pub struct ContractStore {
contracts_dir: PathBuf,
contract_cache: Cache<CodeHash, Arc<ContractCode<'static>>>,
key_to_code_part: Arc<DashMap<ContractKey, CodeHash>>,
key_to_code_part: Arc<DashMap<ContractInstanceId, (u64, CodeHash)>>,
index_file: BufWriter<File>,
}
// TODO: add functionality to delete old contracts which have not been used for a while
// to keep the total space used under a configured threshold

static LOCK_FILE_PATH: once_cell::sync::OnceCell<PathBuf> = once_cell::sync::OnceCell::new();
static KEY_FILE_PATH: once_cell::sync::OnceCell<PathBuf> = once_cell::sync::OnceCell::new();

impl StoreFsManagement<KeyToCodeMap> for ContractStore {}
impl StoreFsManagement for ContractStore {
type MemContainer = Arc<DashMap<ContractInstanceId, (u64, CodeHash)>>;
type Key = ContractInstanceId;
type Value = CodeHash;

fn insert_in_container(
container: &mut Self::MemContainer,
(key, offset): (Self::Key, u64),
value: Self::Value,
) {
container.insert(key, (offset, value));
}
}

impl ContractStore {
/// # Arguments
/// - max_size: max size in bytes of the contracts being cached
pub fn new(contracts_dir: PathBuf, max_size: i64) -> RuntimeResult<Self> {
const ERR: &str = "failed to build mem cache";
let key_to_code_part;
let _ = LOCK_FILE_PATH.try_insert(contracts_dir.join("__LOCK"));
// if the lock file exists is from a previous execution so is safe to delete it
let _ = std::fs::remove_file(LOCK_FILE_PATH.get().unwrap().as_path());
let mut key_to_code_part = Arc::new(DashMap::new());
let key_file = match KEY_FILE_PATH
.try_insert(contracts_dir.join("KEY_DATA"))
.map_err(|(e, _)| e)
Expand All @@ -79,24 +55,25 @@ impl ContractStore {
tracing::error!("error creating contract dir: {err}");
err
})?;
key_to_code_part = Arc::new(DashMap::new());
File::create(contracts_dir.join("KEY_DATA"))?;
} else {
let map = Self::load_from_file(
KEY_FILE_PATH.get().unwrap().as_path(),
LOCK_FILE_PATH.get().unwrap().as_path(),
Self::load_from_file(
KEY_FILE_PATH.get().expect("infallible").as_path(),
&mut key_to_code_part,
)?;
key_to_code_part = Arc::new(DashMap::from_iter(map.0));
}
Self::watch_changes(
key_to_code_part.clone(),
KEY_FILE_PATH.get().unwrap().as_path(),
LOCK_FILE_PATH.get().unwrap().as_path(),
KEY_FILE_PATH.get().expect("infallible").as_path(),
)?;

let index_file =
std::io::BufWriter::new(OpenOptions::new().append(true).read(true).open(key_file)?);
Ok(Self {
contract_cache: Cache::new(100, max_size).expect(ERR),
contracts_dir,
key_to_code_part,
index_file,
})
}

Expand All @@ -121,8 +98,8 @@ impl ContractStore {
return result;
}

self.key_to_code_part.get(key).and_then(|key| {
let code_hash = key.value();
self.key_to_code_part.get(key.id()).and_then(|key| {
let code_hash = key.value().1;
let path = code_hash.encode();
let key_path = self.contracts_dir.join(path).with_extension("wasm");
let ContractContainer::Wasm(ContractWasmAPIVersion::V1(WrappedContract {
Expand All @@ -140,7 +117,7 @@ impl ContractStore {
};
// add back the contract part to the mem store
let size = data.data().len() as i64;
self.contract_cache.insert(*code_hash, data.clone(), size);
self.contract_cache.insert(code_hash, data.clone(), size);
Some(ContractContainer::Wasm(ContractWasmAPIVersion::V1(
WrappedContract::new(data, params),
)))
Expand All @@ -162,15 +139,6 @@ impl ContractStore {
if self.contract_cache.get(code_hash).is_some() {
return Ok(());
}

Self::update(
&mut self.key_to_code_part,
key.clone(),
*code_hash,
KEY_FILE_PATH.get().unwrap(),
LOCK_FILE_PATH.get().unwrap().as_path(),
)?;

let key_path = code_hash.encode();
let key_path = self.contracts_dir.join(key_path).with_extension("wasm");
if let Ok((code, _ver)) = ContractCode::load_versioned_from_path(&key_path) {
Expand All @@ -185,10 +153,17 @@ impl ContractStore {
self.contract_cache
.insert(*code_hash, Arc::new(ContractCode::from(data)), size);

// save on disc
let version = APIVersion::from(contract);
let output: Vec<u8> = code.to_bytes_versioned(version)?;
let mut file = File::create(key_path)?;
file.write_all(output.as_slice())?;
Self::insert(
&mut self.index_file,
&mut self.key_to_code_part,
*key.id(),
*code_hash,
)?;

Ok(())
}
Expand All @@ -213,6 +188,9 @@ impl ContractStore {
RuntimeInnerError::UnwrapContract
})?,
};
if let Some((_, (offset, _))) = self.key_to_code_part.remove(key.id()) {
Self::remove(KEY_FILE_PATH.get().expect("infallible"), offset)?;
}
let key_path = self
.contracts_dir
.join(contract_hash.encode())
Expand All @@ -222,7 +200,7 @@ impl ContractStore {
}

pub fn code_hash_from_key(&self, key: &ContractKey) -> Option<CodeHash> {
self.key_to_code_part.get(key).map(|r| *r.value())
self.key_to_code_part.get(key.id()).map(|r| r.value().1)
}
}

Expand Down
Loading
Loading