Commit 810d40a

Catch situation where current snapshot is being deleted
issackelly authored and teohhanhui committed Nov 11, 2023
1 parent 184b811 commit 810d40a
Showing 2 changed files with 41 additions and 19 deletions.
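
The bug being caught: snapshot chunk names are derived from a hash of the document heads, so compacting a document whose heads have not changed since the previous snapshot produces exactly the same file name as the existing snapshot. The cleanup pass that removes old snapshots would then delete the file that was just written. The sketch below reconstructs that failure mode and the guard; the names and layout are illustrative, not the crate's real types.

    use std::collections::HashSet;
    use std::path::{Path, PathBuf};

    // Illustrative stand-in for SavedChunkName: snapshot files are named
    // after a hash of the document heads.
    fn snapshot_path(root: &Path, heads_hash: &str) -> PathBuf {
        root.join(format!("{heads_hash}.snapshot"))
    }

    fn compact(root: &Path, current_heads: &str, old_snapshots: &HashSet<String>) {
        // Write the new snapshot first...
        let just_wrote = snapshot_path(root, current_heads);
        // (chunk bytes would be written to `just_wrote` here)

        // ...then delete the old snapshots. If the heads are unchanged,
        // one "old" name is identical to `just_wrote`, and removing it
        // would destroy the snapshot we just produced.
        for old in old_snapshots {
            let path = snapshot_path(root, old);
            if path == just_wrote {
                continue; // the guard this commit adds
            }
            let _ = std::fs::remove_file(&path); // errors ignored in this sketch
        }
    }
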
src/fs_store.rs: 44 changes (33 additions & 11 deletions)
@@ -129,7 +129,7 @@ impl FsStore {
                 .map_err(|e| Error(ErrorKind::ErrReadingLevel1Path(entry.path(), e)))?
                 .is_file()
             {
-                tracing::warn!(
+                tracing::trace!(
                     non_dir_path=%entry.path().display(),
                     "unexpected non-directory at level1 of database"
                 );
@@ -146,14 +146,14 @@ impl FsStore {
                 .metadata()
                 .map_err(|e| Error(ErrorKind::ErrReadingLevel2Path(entry.path(), e)))?;
             if !metadata.is_dir() {
-                tracing::warn!(
+                tracing::trace!(
                     non_dir_path=%entry.path().display(),
                     "unexpected non-directory at level2 of database"
                 );
                 continue;
             }
             let Some(doc_paths) = DocIdPaths::parse(entry.path()) else {
-                tracing::warn!(
+                tracing::trace!(
                     non_doc_path=%entry.path().display(),
                     "unexpected non-document path at level2 of database"
                 );
@@ -193,16 +193,33 @@ impl FsStore {
         // Write the snapshot
         let output_chunk_name = SavedChunkName::new_snapshot(doc.get_heads());
         let chunk = doc.save();
-        write_chunk(&self.root, &paths, &chunk, output_chunk_name, &self.tmpdir)?;
+        write_chunk(
+            &self.root,
+            &paths,
+            &chunk,
+            output_chunk_name.clone(),
+            &self.tmpdir,
+        )?;
 
         // Remove all the old data
         for incremental in chunks.incrementals.keys() {
             let path = paths.chunk_path(&self.root, incremental);
             std::fs::remove_file(&path)
                 .map_err(|e| Error(ErrorKind::DeleteChunk(path, e)))?;
         }
+        let just_wrote = paths.chunk_path(&self.root, &output_chunk_name);
         for snapshot in chunks.snapshots.keys() {
             let path = paths.chunk_path(&self.root, snapshot);
+
+            if path == just_wrote {
+                tracing::error!(
+                    ?path,
+                    "Somehow trying to delete the same path we just wrote to. Not today \
+                     Satan"
+                );
+                continue;
+            }
+
             std::fs::remove_file(&path)
                 .map_err(|e| Error(ErrorKind::DeleteChunk(path, e)))?;
         }
@@ -231,7 +248,7 @@ impl FsStore {
                 )?;
             }
             Err(e) => {
-                tracing::error!(e=%e, "Error loading chunks");
+                tracing::error!(doc_id=%id, %e, "error loading chunks");
             }
         }
         Ok(())
@@ -261,6 +278,7 @@ fn write_chunk(
     // Move the temporary file into a snapshot in the document data directory
     // with a name based on the hash of the heads of the document
     let output_path = paths.chunk_path(root, &name);
+    tracing::trace!(?temp_save_path, ?output_path, "renaming chunk file");
     std::fs::rename(&temp_save_path, &output_path)
         .map_err(|e| Error(ErrorKind::RenameTempFile(temp_save_path, output_path, e)))?;
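
As context for the new trace line: write_chunk writes the chunk to a temporary file and then renames it into place, so readers never observe a partially written chunk, and the trace records both paths involved in the rename. A rough sketch of that pattern, with illustrative names and simplified error handling:

    use std::io::Write;
    use std::path::Path;

    fn write_atomically(tmpdir: &Path, output_path: &Path, bytes: &[u8]) -> std::io::Result<()> {
        // Write the full chunk to a temporary file first...
        let temp_save_path = tmpdir.join("chunk.partial");
        let mut file = std::fs::File::create(&temp_save_path)?;
        file.write_all(bytes)?;
        file.sync_all()?; // flush to disk before the data becomes visible

        // ...then move it into place; on POSIX filesystems rename is atomic.
        std::fs::rename(&temp_save_path, output_path)
    }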

@@ -327,13 +345,13 @@ impl DocIdPaths {
     }
 }
 
-#[derive(Debug, Hash, PartialEq, Eq)]
+#[derive(Debug, Hash, PartialEq, Eq, Clone)]
 enum ChunkType {
     Snapshot,
     Incremental,
 }
 
-#[derive(Debug, Hash, PartialEq, Eq)]
+#[derive(Debug, Hash, PartialEq, Eq, Clone)]
 struct SavedChunkName {
     hash: Vec<u8>,
     chunk_type: ChunkType,
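
Clone appears on these derives because compact() now needs the chunk name twice: one copy moves into write_chunk, and the original is reused to compute just_wrote. A tiny sketch of that ownership constraint, with the struct reduced to one field:

    #[derive(Debug, Hash, PartialEq, Eq, Clone)]
    struct SavedChunkName {
        hash: Vec<u8>,
    }

    fn write_chunk(name: SavedChunkName) {
        // takes the name by value, consuming it
        let _ = name;
    }

    fn main() {
        let output_chunk_name = SavedChunkName { hash: vec![0xab] };
        write_chunk(output_chunk_name.clone()); // a copy is moved into the writer
        let just_wrote = output_chunk_name; // original still usable thanks to Clone
        let _ = just_wrote;
    }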
@@ -432,12 +450,12 @@ impl Chunks {
                 .map_err(|e| Error(ErrorKind::ErrReadingChunkFileMetadata(path.clone(), e)))?
                 .is_file()
             {
-                tracing::warn!(bad_file=%path.display(), "unexpected non-file in level2 path");
+                tracing::trace!(bad_file=%path.display(), "unexpected non-file in level2 path");
                 continue;
             }
             let Some(chunk_name) = entry.file_name().to_str().and_then(SavedChunkName::parse)
             else {
-                tracing::warn!(bad_file=%path.display(), "unexpected non-chunk file in level2 path");
+                tracing::trace!(bad_file=%path.display(), "unexpected non-chunk file in level2 path");
                 continue;
             };
             tracing::debug!(chunk_path=%path.display(), "reading chunk file");
@@ -447,7 +465,7 @@ impl Chunks {
                 match e.kind() {
                     std::io::ErrorKind::NotFound => {
                         // Could be a concurrent process compacting, not an error
-                        tracing::warn!(
+                        tracing::trace!(
                             missing_chunk_path=%path.display(),
                             "chunk file disappeared while reading chunks",
                         );
@@ -480,7 +498,11 @@ impl Chunks {
         for chunk in self.incrementals.values() {
             bytes.extend(chunk);
         }
-        automerge::Automerge::load(&bytes)
+
+        automerge::Automerge::load_with_options(
+            &bytes,
+            automerge::LoadOptions::new().on_partial_load(automerge::OnPartialLoad::Ignore),
+        )
     }
 }
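
The loading change complements the guard: the bytes passed to load are a snapshot followed by concatenated incremental chunks, and a concurrent compaction can leave a truncated or disappearing chunk behind. OnPartialLoad::Ignore asks automerge to keep whatever parses cleanly rather than failing on a bad tail. A self-contained sketch of the same call, assuming an automerge crate version that provides LoadOptions and OnPartialLoad:

    fn load_doc(
        snapshot: &[u8],
        incrementals: &[Vec<u8>],
    ) -> Result<automerge::Automerge, automerge::AutomergeError> {
        let mut bytes = snapshot.to_vec();
        for chunk in incrementals {
            bytes.extend_from_slice(chunk);
        }
        automerge::Automerge::load_with_options(
            &bytes,
            automerge::LoadOptions::new().on_partial_load(automerge::OnPartialLoad::Ignore),
        )
    }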

src/repo.rs: 16 changes (8 additions & 8 deletions)
@@ -557,7 +557,7 @@ pub(crate) struct DocumentInfo {
     change_observers: Vec<RepoFutureResolver<Result<(), RepoError>>>,
     /// Counter of local saves since last compact,
     /// used to make decisions about full or incemental saves.
-    saves_since_last_compact: usize,
+    patches_since_last_compact: usize,
     ///
     allowable_changes_until_compaction: usize,
     /// Last heads obtained from the automerge doc.
@@ -580,7 +580,7 @@ impl DocumentInfo {
             handle_count,
             sync_states: Default::default(),
             change_observers: Default::default(),
-            saves_since_last_compact: 0,
+            patches_since_last_compact: 0,
             allowable_changes_until_compaction: 10,
             last_heads,
         }
@@ -600,7 +600,7 @@ impl DocumentInfo {
             | DocState::Error
             | DocState::LoadPending { .. }
             | DocState::Bootstrap { .. } => {
-                assert_eq!(self.saves_since_last_compact, 0);
+                assert_eq!(self.patches_since_last_compact, 0);
                 DocState::PendingRemoval(vec![])
             }
             DocState::Sync(ref mut storage_fut) => DocState::PendingRemoval(mem::take(storage_fut)),
@@ -713,8 +713,8 @@ impl DocumentInfo {
             changes.len()
         };
         let has_patches = count > 0;
-        self.saves_since_last_compact = self
-            .saves_since_last_compact
+        self.patches_since_last_compact = self
+            .patches_since_last_compact
             .checked_add(count)
             .unwrap_or(0);
         has_patches
@@ -736,13 +736,13 @@ impl DocumentInfo {
             return;
         }
         let should_compact =
-            self.saves_since_last_compact > self.allowable_changes_until_compaction;
+            self.patches_since_last_compact > self.allowable_changes_until_compaction;
         let (storage_fut, new_heads) = if should_compact {
             let (to_save, new_heads) = {
                 let doc = self.document.read();
                 (doc.automerge.save(), doc.automerge.get_heads())
             };
-            self.saves_since_last_compact = 0;
+            self.patches_since_last_compact = 0;
             (storage.compact(document_id.clone(), to_save), new_heads)
         } else {
             let (to_save, new_heads) = {
@@ -752,7 +752,7 @@ impl DocumentInfo {
                     doc.automerge.get_heads(),
                 )
             };
-            self.saves_since_last_compact.checked_add(1).unwrap_or(0);
+            self.patches_since_last_compact.checked_add(1).unwrap_or(0);
             (storage.append(document_id.clone(), to_save), new_heads)
         };
         match self.state {
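
The repo.rs changes are a rename, saves_since_last_compact to patches_since_last_compact, making explicit that the counter tracks observed patches rather than save calls, and it is this counter that decides between a full compaction and an incremental append. A minimal sketch of that policy, with illustrative names:

    struct SaveState {
        patches_since_last_compact: usize,
        allowable_changes_until_compaction: usize,
    }

    enum SaveKind {
        Compact, // full save: doc.save(), resets the counter
        Append,  // incremental save since the last heads
    }

    impl SaveState {
        fn record_patches(&mut self, count: usize) {
            // Wrap to 0 on overflow, mirroring checked_add(..).unwrap_or(0).
            self.patches_since_last_compact =
                self.patches_since_last_compact.checked_add(count).unwrap_or(0);
        }

        fn next_save(&mut self) -> SaveKind {
            if self.patches_since_last_compact > self.allowable_changes_until_compaction {
                self.patches_since_last_compact = 0;
                SaveKind::Compact
            } else {
                SaveKind::Append
            }
        }
    }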
