From 1a1a1981cb65c00a86cbfb88274d471869f4746d Mon Sep 17 00:00:00 2001 From: Milosz Muszynski Date: Wed, 16 Oct 2024 23:59:56 +0200 Subject: [PATCH] piecrust: introduced commit store, reduced commit cloning --- piecrust/src/store.rs | 192 +++++++++++++++++++++++----------- piecrust/src/store/session.rs | 5 + piecrust/src/vm.rs | 10 +- 3 files changed, 142 insertions(+), 65 deletions(-) diff --git a/piecrust/src/store.rs b/piecrust/src/store.rs index 9d4fc7fb..2973c71f 100644 --- a/piecrust/src/store.rs +++ b/piecrust/src/store.rs @@ -15,11 +15,12 @@ mod tree; use std::cell::Ref; use std::collections::btree_map::Entry::*; +use std::collections::btree_map::Keys; use std::collections::{BTreeMap, BTreeSet}; use std::fmt::{Debug, Formatter}; use std::fs::create_dir_all; use std::path::{Path, PathBuf}; -use std::sync::mpsc; +use std::sync::{mpsc, Arc, Mutex}; use std::{fs, io, thread}; use dusk_wasmtime::Engine; @@ -49,11 +50,12 @@ const MAIN_DIR: &str = "main"; /// A store for all contract commits. pub struct ContractStore { - sync_loop: thread::JoinHandle<()>, + sync_loop: Option>, engine: Engine, - call: mpsc::Sender, + call: Option>, root_dir: PathBuf, + pub commit_store: Arc>, } impl Debug for ContractStore { @@ -66,6 +68,38 @@ impl Debug for ContractStore { } } +pub struct CommitStore { + commits: BTreeMap, +} + +impl CommitStore { + pub fn new() -> Self { + Self { + commits: BTreeMap::new(), + } + } + + pub fn insert_commit(&mut self, hash: Hash, commit: Commit) { + self.commits.insert(hash, commit); + } + + pub fn get_commit(&self, hash: &Hash) -> Option<&Commit> { + self.commits.get(hash) + } + + pub fn contains_key(&self, hash: &Hash) -> bool { + self.commits.contains_key(hash) + } + + pub fn keys(&self) -> Keys<'_, Hash, Commit> { + self.commits.keys() + } + + pub fn remove_commit(&mut self, hash: &Hash) { + self.commits.remove(hash); + } +} + impl ContractStore { /// Loads a new contract store from the given `dir`ectory. /// @@ -81,30 +115,41 @@ impl ContractStore { fs::create_dir_all(root_dir)?; + Ok(Self { + sync_loop: None, + engine, + call: None, + root_dir: root_dir.into(), + commit_store: Arc::new(Mutex::new(CommitStore::new())), + }) + } + + pub fn finish_new(&mut self) -> io::Result<()> { + let loop_root_dir = self.root_dir.to_path_buf(); let (call, calls) = mpsc::channel(); - let commits = read_all_commits(&engine, root_dir)?; + let commit_store = self.commit_store.clone(); + + read_all_commits(&self.engine, self.root_dir.clone(), commit_store)?; - let loop_root_dir = root_dir.to_path_buf(); + let commit_store = self.commit_store.clone(); // The thread is given a name to allow for easily identifying it while // debugging. let sync_loop = thread::Builder::new() .name(String::from("PiecrustSync")) - .spawn(|| sync_loop(loop_root_dir, commits, calls))?; + .spawn(|| sync_loop(loop_root_dir, commit_store, calls))?; - Ok(Self { - sync_loop, - engine, - call, - root_dir: root_dir.into(), - }) + self.sync_loop = Some(sync_loop); + self.call = Some(call); + Ok(()) } /// Create a new [`ContractSession`] with the given `base` commit. /// /// Errors if the given base commit does not exist in the store. pub fn session(&self, base: Hash) -> io::Result { - let base_commit = self + tracing::trace!("session creation started"); + let base_commit_hash = self .call_with_replier(|replier| Call::CommitHold { base, replier }) .ok_or_else(|| { io::Error::new( @@ -113,7 +158,9 @@ impl ContractStore { ) })?; - Ok(self.session_with_base(Some(base_commit))) + let r = Ok(self.session_with_base(Some(base_commit_hash))); + tracing::trace!("session creation finished"); + r } /// Create a new [`ContractSession`] that has no base commit. @@ -154,7 +201,10 @@ impl ContractStore { /// Return the handle to the thread running the store's synchronization /// loop. pub fn sync_loop(&self) -> &thread::Thread { - self.sync_loop.thread() + self.sync_loop + .as_ref() + .expect("sync thread should exist") + .thread() } /// Return the path to the VM directory. @@ -168,21 +218,28 @@ impl ContractStore { { let (replier, receiver) = mpsc::sync_channel(1); - self.call.send(closure(replier)).expect( - "The receiver should never be dropped while there are senders", - ); + self.call + .as_ref() + .expect("call should exist") + .send(closure(replier)) + .expect( + "The receiver should never be dropped while there are senders", + ); receiver .recv() .expect("The sender should never be dropped without responding") } - fn session_with_base(&self, base: Option) -> ContractSession { + fn session_with_base(&self, base: Option) -> ContractSession { + let base_commit = base.and_then(|hash| { + self.commit_store.lock().unwrap().get_commit(&hash).cloned() // todo: clone here + }); ContractSession::new( &self.root_dir, self.engine.clone(), - base, - self.call.clone(), + base_commit, + self.call.as_ref().expect("call should exist").clone(), ) } } @@ -190,9 +247,9 @@ impl ContractStore { fn read_all_commits>( engine: &Engine, root_dir: P, -) -> io::Result> { + commit_store: Arc>, +) -> io::Result<()> { let root_dir = root_dir.as_ref(); - let mut commits = BTreeMap::new(); let root_dir = root_dir.join(MAIN_DIR); fs::create_dir_all(root_dir.clone())?; @@ -209,11 +266,11 @@ fn read_all_commits>( } let commit = read_commit(engine, entry.path())?; let root = *commit.root(); - commits.insert(root, commit); + commit_store.lock().unwrap().insert_commit(root, commit); } } - Ok(commits) + Ok(()) } fn read_commit>( @@ -529,20 +586,19 @@ pub(crate) enum Call { }, CommitHold { base: Hash, - replier: mpsc::SyncSender>, + replier: mpsc::SyncSender>, }, SessionDrop(Hash), } fn sync_loop>( root_dir: P, - commits: BTreeMap, + commit_store: Arc>, calls: mpsc::Receiver, ) { let root_dir = root_dir.as_ref(); let mut sessions = BTreeMap::new(); - let mut commits = commits; let mut delete_bag = BTreeMap::new(); @@ -556,8 +612,12 @@ fn sync_loop>( replier, } => { tracing::trace!("writing commit started"); - let io_result = - write_commit(root_dir, &mut commits, base, contracts); + let io_result = write_commit( + root_dir, + &mut commit_store.lock().unwrap(), + base, + contracts, + ); match &io_result { Ok(hash) => tracing::trace!( "writing commit finished: {:?}", @@ -570,7 +630,9 @@ fn sync_loop>( // Copy all commits and send them back to the caller. Call::GetCommits { replier } => { tracing::trace!("get commits started"); - let _ = replier.send(commits.keys().copied().collect()); + let _ = replier.send( + commit_store.lock().unwrap().keys().copied().collect(), + ); tracing::trace!("get commits finished"); } // Delete a commit from disk. If the commit is currently in use - as @@ -595,7 +657,7 @@ fn sync_loop>( } let io_result = delete_commit_dir(root_dir, root); - commits.remove(&root); + commit_store.lock().unwrap().remove_commit(&root); tracing::trace!("delete commit finished"); let _ = replier.send(io_result); } @@ -618,12 +680,13 @@ fn sync_loop>( continue; } - if let Some(commit) = commits.get(&root).cloned() { + let mut commit_store = commit_store.lock().unwrap(); + if let Some(commit) = commit_store.get_commit(&root) { tracing::trace!( "finalizing commit proper started {}", hex::encode(root.as_bytes()) ); - let io_result = finalize_commit(root, root_dir, &commit); + let io_result = finalize_commit(root, root_dir, commit); match &io_result { Ok(_) => tracing::trace!( "finalizing commit proper finished: {:?}", @@ -634,7 +697,7 @@ fn sync_loop>( e ), } - commits.remove(&root); + commit_store.remove_commit(&root); tracing::trace!("finalizing commit finished"); let _ = replier.send(io_result); } else { @@ -646,10 +709,10 @@ fn sync_loop>( // on a `Call::CommitDelete`. Call::CommitHold { base, replier } => { tracing::trace!("hold commit open session started"); - let base_commit = commits.get(&base).cloned(); - tracing::trace!("hold commit getting commit finished"); + let mut maybe_base = None; + if commit_store.lock().unwrap().contains_key(&base) { + maybe_base = Some(base); - if base_commit.is_some() { match sessions.entry(base) { Vacant(entry) => { entry.insert(1); @@ -661,7 +724,7 @@ fn sync_loop>( } tracing::trace!("hold commit open session finished"); - let _ = replier.send(base_commit); + let _ = replier.send(maybe_base); } // Signal that a session with a base commit has dropped and // decrements the hold count, once incremented using @@ -684,7 +747,7 @@ fn sync_loop>( for replier in entry.remove() { let io_result = delete_commit_dir(root_dir, base); - commits.remove(&base); + commit_store.lock().unwrap().remove_commit(&base); let _ = replier.send(io_result); } } @@ -700,25 +763,33 @@ fn sync_loop>( fn write_commit>( root_dir: P, - commits: &mut BTreeMap, + commit_store: &mut CommitStore, base: Option, commit_contracts: BTreeMap, ) -> io::Result { let root_dir = root_dir.as_ref(); - let index = base - .as_ref() - .map_or(NewContractIndex::new(), |base| base.index.clone()); - let contracts_merkle = - base.as_ref().map_or(ContractsMerkle::default(), |base| { - base.contracts_merkle.clone() - }); - let mut commit = Commit { - index, - contracts_merkle, - maybe_hash: base.as_ref().and_then(|base| base.maybe_hash), + let base_info = BaseInfo { + maybe_base: base.as_ref().map(|base| *base.root()), + ..Default::default() }; + // base is already a copy, no point cloning it again + + // let index = base + // .as_ref() + // .map_or(NewContractIndex::new(), |base| base.index.clone()); + // let contracts_merkle = + // base.as_ref().map_or(ContractsMerkle::default(), |base| { + // base.contracts_merkle.clone() + // }); + // let mut commit = Commit { + // index, + // contracts_merkle, + // maybe_hash: base.as_ref().and_then(|base| base.maybe_hash), + // }; + + let mut commit = base.unwrap_or(Commit::new()); for (contract_id, contract_data) in &commit_contracts { if contract_data.is_new { commit.remove_and_insert(*contract_id, &contract_data.memory); @@ -734,16 +805,15 @@ fn write_commit>( // Don't write the commit if it already exists on disk. This may happen if // the same transactions on the same base commit for example. - if commits.contains_key(&root) { + if commit_store.contains_key(&root) { return Ok(root); } - write_commit_inner(root_dir, &commit, commit_contracts, root_hex, base).map( - |_| { - commits.insert(root, commit); + write_commit_inner(root_dir, &commit, commit_contracts, root_hex, base_info) + .map(|_| { + commit_store.insert_commit(root, commit); root - }, - ) + }) } /// Writes a commit to disk. @@ -752,13 +822,9 @@ fn write_commit_inner, S: AsRef>( commit: &Commit, commit_contracts: BTreeMap, commit_id: S, - maybe_base: Option, + mut base_info: BaseInfo, ) -> io::Result<()> { let root_dir = root_dir.as_ref(); - let mut base_info = BaseInfo { - maybe_base: maybe_base.map(|base| *base.root()), - ..Default::default() - }; struct Directories { main_dir: PathBuf, diff --git a/piecrust/src/store/session.rs b/piecrust/src/store/session.rs index bab933da..b235f1fb 100644 --- a/piecrust/src/store/session.rs +++ b/piecrust/src/store/session.rs @@ -87,11 +87,13 @@ impl ContractSession { /// /// [`contract`]: ContractSession::contract pub fn root(&self) -> Hash { + tracing::trace!("root called commit cloning"); let mut commit = self.base.clone().unwrap_or(Commit::new()); for (contract, entry) in &self.contracts { commit.insert(*contract, &entry.memory); } let root = commit.root(); + tracing::trace!("root call finished"); *root } @@ -102,6 +104,7 @@ impl ContractSession { &self, contract: ContractId, ) -> Option> { + tracing::trace!("memory_pages called commit cloning"); let mut commit = self.base.clone().unwrap_or(Commit::new()); for (contract, entry) in &self.contracts { commit.insert(*contract, &entry.memory); @@ -134,6 +137,7 @@ impl ContractSession { /// /// [`contract`]: ContractSession::contract pub fn commit(&mut self) -> io::Result { + tracing::trace!("commit started"); let (replier, receiver) = mpsc::sync_channel(1); let mut contracts = BTreeMap::new(); @@ -153,6 +157,7 @@ impl ContractSession { replier, }) .expect("The receiver should never drop before sending"); + tracing::trace!("commit sent"); receiver .recv() diff --git a/piecrust/src/vm.rs b/piecrust/src/vm.rs index f4d36da9..05d77342 100644 --- a/piecrust/src/vm.rs +++ b/piecrust/src/vm.rs @@ -144,7 +144,10 @@ impl VM { "Configuration should be valid since its set at compile time", ); - let store = ContractStore::new(engine.clone(), root_dir) + let mut store = ContractStore::new(engine.clone(), root_dir) + .map_err(|err| PersistenceError(Arc::new(err)))?; + store + .finish_new() .map_err(|err| PersistenceError(Arc::new(err)))?; Ok(Self { @@ -171,7 +174,10 @@ impl VM { "Configuration should be valid since its set at compile time", ); - let store = ContractStore::new(engine.clone(), tmp) + let mut store = ContractStore::new(engine.clone(), tmp) + .map_err(|err| PersistenceError(Arc::new(err)))?; + store + .finish_new() .map_err(|err| PersistenceError(Arc::new(err)))?; Ok(Self {