Skip to content

Commit

Permalink
WIP log based debugging, catch situation where current snapshot is being deleted
Browse files Browse the repository at this point in the history
  • Loading branch information
issackelly committed Sep 15, 2023
1 parent dc3d23c commit 3b13c37
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 13 deletions.
27 changes: 24 additions & 3 deletions src/fs_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,28 +157,40 @@ impl FsStore {
// Load all the data we have into a doc
match Chunks::load(&self.root, id) {
Ok(Some(chunks)) => {
println!("hmm...");
let doc = chunks
.to_doc()
.map_err(|e| Error(ErrorKind::LoadDocToCompact(e)))?;

// Write the snapshot
let output_chunk_name = SavedChunkName::new_snapshot(doc.get_heads());
let chunk = doc.save();
write_chunk(&self.root, &paths, &chunk, output_chunk_name)?;
println!("Going to write: {:#?}", output_chunk_name);
write_chunk(&self.root, &paths, &chunk, output_chunk_name.clone())?;

// Remove all the old data
for incremental in chunks.incrementals.keys() {
let path = paths.chunk_path(&self.root, incremental);
println!("Removing {:?}", path);
std::fs::remove_file(&path)
.map_err(|e| Error(ErrorKind::DeleteChunk(path, e)))?;
}
let just_wrote = paths.chunk_path(&self.root, &output_chunk_name);
for snapshot in chunks.snapshots.keys() {
let path = paths.chunk_path(&self.root, snapshot);
println!("Removing Snap {:?}", path);

if path == just_wrote {
tracing::error!("Somehow trying to delete the same path we just wrote to. Not today satan");
continue;
}

std::fs::remove_file(&path)
.map_err(|e| Error(ErrorKind::DeleteChunk(path, e)))?;
}
}
Ok(None) => {
println!("No existing files,and compaction requested first");
let output_chunk_name = SavedChunkName {
hash: uuid::Uuid::new_v4().as_bytes().to_vec(),
chunk_type: ChunkType::Snapshot,
Expand All @@ -187,6 +199,7 @@ impl FsStore {
write_chunk(&self.root, &paths, full_doc, output_chunk_name)?;
}
Err(e) => {
println!("Error loading chunks for {:?} {}", self.root, id);
tracing::error!(e=%e, "Error loading chunks");
}
}
Expand Down Expand Up @@ -219,6 +232,10 @@ fn write_chunk(
// Move the temporary file into a snapshot in the document data directory
// with a name based on the hash of the heads of the document
let output_path = paths.chunk_path(root, &name);

tracing::warn!("Renaming: {:?}", temp_save);
tracing::warn!("To: {:?}", output_path);

std::fs::rename(&temp_save_path, &output_path)
.map_err(|e| Error(ErrorKind::RenameTempFile(temp_save_path, output_path, e)))?;

Expand Down Expand Up @@ -355,7 +372,7 @@ impl Chunks {
fn load(root: &Path, doc_id: &DocumentId) -> Result<Option<Self>, Error> {
let doc_id_hash = DocIdPaths::from(doc_id);
let level2_path = doc_id_hash.level2_path(root);
tracing::debug!(
tracing::warn!(
root=%root.display(),
doc_id=?doc_id,
doc_path=%level2_path.display(),
Expand Down Expand Up @@ -439,7 +456,11 @@ impl Chunks {
for chunk in self.incrementals.values() {
bytes.extend(chunk);
}
automerge::Automerge::load(&bytes)

automerge::Automerge::load_with_options(
&bytes,
automerge::LoadOptions::new().on_partial_load(automerge::OnPartialLoad::Ignore),
)
}
}

Expand Down
27 changes: 17 additions & 10 deletions src/repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ pub(crate) struct DocumentInfo {
change_observers: Vec<RepoFutureResolver<Result<(), RepoError>>>,
/// Counter of local saves since last compact,
used to make decisions about full or incremental saves.
saves_since_last_compact: usize,
patches_since_last_compact: usize,
///
allowable_changes_until_compaction: usize,
/// Last heads obtained from the automerge doc.
Expand All @@ -580,7 +580,7 @@ impl DocumentInfo {
handle_count,
sync_states: Default::default(),
change_observers: Default::default(),
saves_since_last_compact: 0,
patches_since_last_compact: 0,
allowable_changes_until_compaction: 10,
last_heads,
}
Expand All @@ -600,7 +600,7 @@ impl DocumentInfo {
| DocState::Error
| DocState::LoadPending { .. }
| DocState::Bootstrap { .. } => {
assert_eq!(self.saves_since_last_compact, 0);
assert_eq!(self.patches_since_last_compact, 0);
DocState::PendingRemoval(vec![])
}
DocState::Sync(ref mut storage_fut) => DocState::PendingRemoval(mem::take(storage_fut)),
Expand Down Expand Up @@ -704,14 +704,18 @@ impl DocumentInfo {
let count = {
let doc = self.document.read();
let changes = doc.automerge.get_changes(&self.last_heads);
println!("last: {:?}, current: {:?}", self.last_heads, doc.automerge.get_heads());
println!(
"last: {:?}, current: {:?}",
self.last_heads,
doc.automerge.get_heads()
);
//self.last_heads = doc.automerge.get_heads();
changes.len()
};
let has_patches = count > 0;
println!("Has patches: {:?}", has_patches);
self.saves_since_last_compact = self
.saves_since_last_compact
self.patches_since_last_compact = self
.patches_since_last_compact
.checked_add(count)
.unwrap_or(0);
has_patches
Expand All @@ -735,14 +739,14 @@ impl DocumentInfo {
return;
}
let should_compact =
self.saves_since_last_compact > self.allowable_changes_until_compaction;
self.patches_since_last_compact > self.allowable_changes_until_compaction;
let (storage_fut, new_heads) = if should_compact {
println!("We decided to Compact the document");
let (to_save, new_heads) = {
let doc = self.document.read();
(doc.automerge.save(), doc.automerge.get_heads())
};
self.saves_since_last_compact = 0;
self.patches_since_last_compact = 0;
println!("Since compact is zero");
(storage.compact(document_id.clone(), to_save), new_heads)
} else {
Expand All @@ -754,8 +758,11 @@ impl DocumentInfo {
doc.automerge.get_heads(),
)
};
self.saves_since_last_compact.checked_add(1).unwrap_or(0);
println!("Saves since last compact {}", self.saves_since_last_compact);
self.patches_since_last_compact.checked_add(1).unwrap_or(0);
println!(
"Saves since last compact {}",
self.patches_since_last_compact
);
(storage.append(document_id.clone(), to_save), new_heads)
};
match self.state {
Expand Down

0 comments on commit 3b13c37

Please sign in to comment.