Skip to content

Commit

Permalink
piecrust: added tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
miloszm committed Oct 9, 2024
1 parent 6860464 commit 0921918
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 32 deletions.
1 change: 1 addition & 0 deletions piecrust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ rand = "0.8"
hex = "0.4"
dusk-merkle = { version = "0.5", features = ["rkyv-impl"] }
const-decoder = "0.3"
tracing = "=0.1.40"

[dev-dependencies]
once_cell = "1.18"
Expand Down
116 changes: 84 additions & 32 deletions piecrust/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,9 @@ fn commit_from_dir<P: AsRef<Path>>(
fn index_from_path<P: AsRef<Path>>(path: P) -> io::Result<NewContractIndex> {
let path = path.as_ref();

tracing::trace!("reading index file started");
let index_bytes = fs::read(path)?;
tracing::trace!("reading index file finished");

let parent_name = path
.parent()
Expand Down Expand Up @@ -400,12 +402,15 @@ fn index_from_path<P: AsRef<Path>>(path: P) -> io::Result<NewContractIndex> {
return Ok(new_contract_index);
}

tracing::trace!("deserializing index file started");
let index = rkyv::from_bytes(&index_bytes).map_err(|err| {
tracing::trace!("deserializing index file failed {}", err);
io::Error::new(
io::ErrorKind::InvalidData,
format!("Invalid index file \"{path:?}\": {err}"),
)
})?;
tracing::trace!("deserializing index file finished");

Ok(index)
}
Expand Down Expand Up @@ -467,25 +472,39 @@ fn sync_loop<P: AsRef<Path>>(

for call in calls {
match call {
// Writes a session to disk and adds it to the map of existing commits.
// Writes a session to disk and adds it to the map of existing
// commits.
Call::Commit {
contracts,
base,
replier,
} => {
let io_result = write_commit(root_dir, &mut commits, base, contracts);
tracing::trace!("writing commit started");
let io_result =
write_commit(root_dir, &mut commits, base, contracts);
match &io_result {
Ok(hash) => tracing::trace!(
"writing commit finished: {:?}",
hex::encode(hash.as_bytes())
),
Err(e) => tracing::trace!("writing commit failed {:?}", e),
}
let _ = replier.send(io_result);
}
// Copy all commits and send them back to the caller.
Call::GetCommits {
replier
} => {
Call::GetCommits { replier } => {
tracing::trace!("get commits started");
let _ = replier.send(commits.keys().copied().collect());
tracing::trace!("get commits finished");
}
// Delete a commit from disk. If the commit is currently in use - as
// in it is held by at least one session using `Call::SessionHold` -
// queue it for deletion once no session is holding it.
Call::CommitDelete { commit: root, replier } => {
Call::CommitDelete {
commit: root,
replier,
} => {
tracing::trace!("delete commit started");
if sessions.contains_key(&root) {
match delete_bag.entry(root) {
Vacant(entry) => {
Expand All @@ -501,10 +520,15 @@ fn sync_loop<P: AsRef<Path>>(

let io_result = delete_commit_dir(root_dir, root);
commits.remove(&root);
tracing::trace!("delete commit finished");
let _ = replier.send(io_result);
}
// Finalize commit
Call::CommitFinalize { commit: root, replier } => {
Call::CommitFinalize {
commit: root,
replier,
} => {
tracing::trace!("finalizing commit started");
if sessions.contains_key(&root) {
match delete_bag.entry(root) {
Vacant(entry) => {
Expand All @@ -519,20 +543,35 @@ fn sync_loop<P: AsRef<Path>>(
}

if let Some(commit) = commits.get(&root).cloned() {
tracing::trace!(
"finalizing commit proper started {}",
hex::encode(root.as_bytes())
);
let io_result = finalize_commit(root, root_dir, &commit);
match &io_result {
Ok(_) => tracing::trace!(
"finalizing commit proper finished: {:?}",
hex::encode(root.as_bytes())
),
Err(e) => tracing::trace!(
"finalizing commit proper failed {:?}",
e
),
}
commits.remove(&root);
tracing::trace!("finalizing commit finished");
let _ = replier.send(io_result);
} else {
tracing::trace!("finalizing commit finished");
let _ = replier.send(Ok(()));
}
}
// Increment the hold count of a commit to prevent it from deletion
// on a `Call::CommitDelete`.
Call::CommitHold {
base,
replier,
} => {
Call::CommitHold { base, replier } => {
tracing::trace!("hold commit open session started");
let base_commit = commits.get(&base).cloned();
tracing::trace!("hold commit getting commit finished");

if base_commit.is_some() {
match sessions.entry(base) {
Expand All @@ -544,36 +583,41 @@ fn sync_loop<P: AsRef<Path>>(
}
}
}
tracing::trace!("hold commit open session finished");

let _ = replier.send(base_commit);
}
// Signal that a session with a base commit has dropped and
// decrements the hold count, once incremented using
// `Call::SessionHold`. If this is the last session that held that
// commit, and there are queued deletions, execute them.
Call::SessionDrop(base) => match sessions.entry(base) {
Vacant(_) => unreachable!("If a session is dropped there must be a session hold entry"),
Occupied(mut entry) => {
*entry.get_mut() -= 1;

if *entry.get() == 0 {
entry.remove();

// Try all deletions first
match delete_bag.entry(base) {
Vacant(_) => {}
Occupied(entry) => {
for replier in entry.remove() {
let io_result =
delete_commit_dir(root_dir, base);
commits.remove(&base);
let _ = replier.send(io_result);
Call::SessionDrop(base) => {
tracing::trace!("session drop started");
match sessions.entry(base) {
Vacant(_) => unreachable!("If a session is dropped there must be a session hold entry"),
Occupied(mut entry) => {
*entry.get_mut() -= 1;

if *entry.get() == 0 {
entry.remove();

// Try all deletions first
match delete_bag.entry(base) {
Vacant(_) => {}
Occupied(entry) => {
for replier in entry.remove() {
let io_result =
delete_commit_dir(root_dir, base);
commits.remove(&base);
let _ = replier.send(io_result);
}
}
}
}
}
}
},
};
tracing::trace!("session drop finished");
}
}
}
}
Expand All @@ -598,8 +642,10 @@ fn write_commit<P: AsRef<Path>>(
}
}

tracing::trace!("calculating root started");
let root = *index.root();
let root_hex = hex::encode(root);
tracing::trace!("calculating root finished");

// Don't write the commit if it already exists on disk. This may happen if
// the same transactions on the same base commit for example.
Expand All @@ -624,8 +670,10 @@ fn write_commit_inner<P: AsRef<Path>, S: AsRef<str>>(
maybe_base: Option<Commit>,
) -> io::Result<Commit> {
let root_dir = root_dir.as_ref();
let mut base_info = BaseInfo::default();
base_info.maybe_base = maybe_base.map(|base| *base.index.root());
let mut base_info = BaseInfo {
maybe_base: maybe_base.map(|base| *base.index.root()),
..Default::default()
};

struct Directories {
main_dir: PathBuf,
Expand Down Expand Up @@ -696,13 +744,17 @@ fn write_commit_inner<P: AsRef<Path>, S: AsRef<str>>(

let index_main_path =
index_path_main(directories.main_dir.clone(), commit_id.as_ref())?;
tracing::trace!("serializing index started");
let index_bytes = rkyv::to_bytes::<_, 128>(&index).map_err(|err| {
io::Error::new(
io::ErrorKind::InvalidData,
format!("Failed serializing index file: {err}"),
)
})?;
tracing::trace!("serializing index finished");
tracing::trace!("writing index file started");
fs::write(index_main_path.clone(), index_bytes)?;
tracing::trace!("writing index file finished");

let base_main_path =
base_path_main(directories.main_dir, commit_id.as_ref())?;
Expand Down

0 comments on commit 0921918

Please sign in to comment.