Draft: Items I am changing to test under load and get an RC into production #50

Open · wants to merge 17 commits into base: update_automerge

125 changes: 65 additions & 60 deletions src/fs_store.rs
@@ -51,7 +51,7 @@ use error::ErrorKind;
 /// 2. Load the data into an automerge document
 /// 3. `automerge::Automerge::save` the document to a temporary file
 /// 4. Rename the temporary file to a file in the data directory named
-///    `SHA356(automerge::Automerge::get_heads)`.snapshot`
+///    `SHA256(automerge::Automerge::get_heads)`.snapshot`
 /// 5. Delete all the files we loaded in step 1.
 ///
 /// The fact that we name the file after the heads of the document means that
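The head-based naming in steps 3 and 4 gives compaction an idempotent file name: the same document state yields the same snapshot path. A minimal sketch of deriving such a name (assuming the `sha2` and `hex` crates; `snapshot_file_name` is a hypothetical helper, not part of this diff):

```rust
use automerge::ChangeHash;
use sha2::{Digest, Sha256};

// Hypothetical helper: derive a snapshot file name from the document's
// heads, per steps 3-4 above (SHA256 over the heads, hex-encoded).
fn snapshot_file_name(heads: &[ChangeHash]) -> String {
    let mut hasher = Sha256::new();
    for head in heads {
        hasher.update(head.0); // ChangeHash wraps the raw 32 hash bytes
    }
    format!("{}.snapshot", hex::encode(hasher.finalize()))
}
```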
@@ -86,7 +86,7 @@ impl FsStore {
     pub fn get(&self, id: &DocumentId) -> Result<Option<Vec<u8>>, Error> {
         let chunks = Chunks::load(&self.root, id)?;
         let Some(chunks) = chunks else {
-            return Ok(None)
+            return Ok(None);
         };
         let mut result = Vec::new();
         result.extend(chunks.snapshots.into_values().flatten());
@@ -106,7 +106,7 @@ impl FsStore {
                 .map_err(|e| Error(ErrorKind::ErrReadingLevel1Path(entry.path(), e)))?
                 .is_file()
             {
-                tracing::warn!(
+                tracing::trace!(
                     non_dir_path=%entry.path().display(),
                     "unexpected non-directory at level1 of database"
                 );
@@ -122,15 +122,15 @@ impl FsStore {
                 let metadata = entry
                     .metadata()
                     .map_err(|e| Error(ErrorKind::ErrReadingLevel2Path(entry.path(), e)))?;
-                if metadata.is_dir() {
-                    tracing::warn!(
+                if !metadata.is_dir() {
+                    tracing::trace!(
                         non_file_path=%entry.path().display(),
-                        "unexpected directory at level2 of database"
+                        "unexpected non-directory at level2 of database"
                     );
                     continue;
                 }
-                let Some(doc_paths) = DocIdPaths::parse(&level1, entry.path()) else {
-                    tracing::warn!(
+                let Some(doc_paths) = DocIdPaths::parse(entry.path()) else {
+                    tracing::trace!(
                         non_doc_path=%entry.path().display(),
                         "unexpected non-document path at level2 of database"
                     );
@@ -144,44 +144,52 @@ impl FsStore {

     pub fn append(&self, id: &DocumentId, changes: &[u8]) -> Result<(), Error> {
         let paths = DocIdPaths::from(id);
-        std::fs::create_dir_all(paths.level2_path(&self.root)).map_err(|e| {
-            Error(ErrorKind::CreateLevel2Path(
-                paths.level2_path(&self.root),
-                e,
-            ))
-        })?;

         let chunk_name = SavedChunkName::new_incremental(changes);
         write_chunk(&self.root, &paths, changes, chunk_name)?;

         Ok(())
     }

-    pub fn compact(&self, id: &DocumentId, _full_doc: &[u8]) -> Result<(), Error> {
+    pub fn compact(
+        &self,
+        id: &DocumentId,
+        full_doc: &[u8],
+        new_heads: Vec<ChangeHash>,
+    ) -> Result<(), Error> {
         let paths = DocIdPaths::from(id);

-        // Load all the data we have into a doc
-        let Some(chunks) = Chunks::load(&self.root, id)? else {
-            tracing::warn!(doc_id=%id, "attempted to compact non-existent document");
-            return Ok(())
-        };
-        let mut doc = chunks
-            .to_doc()
-            .map_err(|e| Error(ErrorKind::LoadDocToCompact(e)))?;
-
-        // Write the snapshot
-        let output_chunk_name = SavedChunkName::new_snapshot(doc.get_heads());
-        let chunk = doc.save();
-        write_chunk(&self.root, &paths, &chunk, output_chunk_name)?;
-
-        // Remove all the old data
-        for incremental in chunks.incrementals.keys() {
-            let path = paths.chunk_path(&self.root, incremental);
-            std::fs::remove_file(&path).map_err(|e| Error(ErrorKind::DeleteChunk(path, e)))?;
-        }
-        for snapshot in chunks.snapshots.keys() {
-            let path = paths.chunk_path(&self.root, snapshot);
-            std::fs::remove_file(&path).map_err(|e| Error(ErrorKind::DeleteChunk(path, e)))?;
+        match Chunks::load(&self.root, id) {
+            Ok(Some(chunks)) => {
+                // Write the snapshot
+                let output_chunk_name = SavedChunkName::new_snapshot(new_heads);
+                write_chunk(&self.root, &paths, full_doc, output_chunk_name.clone())?;
+
+                // Remove all the old data
+                for incremental in chunks.incrementals.keys() {
+                    let path = paths.chunk_path(&self.root, incremental);
+                    std::fs::remove_file(&path)
+                        .map_err(|e| Error(ErrorKind::DeleteChunk(path, e)))?;
+                }
+                for snapshot in chunks.snapshots.keys() {
+                    let path = paths.chunk_path(&self.root, snapshot);
+
+                    std::fs::remove_file(&path)
+                        .map_err(|e| Error(ErrorKind::DeleteChunk(path, e)))?;
+                }
+            }
+            Ok(None) => {
+                let output_chunk_name = SavedChunkName {
+                    hash: uuid::Uuid::new_v4().as_bytes().to_vec(),
+                    chunk_type: ChunkType::Snapshot,
+                };
+                // Write the snapshot
+                write_chunk(&self.root, &paths, full_doc, output_chunk_name)?;
+            }
+            Err(e) => {
+                tracing::error!(e=%e, "Error loading chunks");
+            }
         }
         Ok(())
     }
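With this change, `compact` no longer rebuilds the document from its on-disk chunks; the caller supplies the serialized document (`full_doc`) and the heads it corresponds to (`new_heads`). A call site might look like this (a sketch only; `FsStore`, `DocumentId`, and `Error` are the crate types above, and the bindings are hypothetical):

```rust
use automerge::Automerge;

// Hypothetical caller of the new compact() signature: the caller, not the
// store, produces the compacted bytes and the heads they correspond to.
fn compact_document(store: &FsStore, id: &DocumentId, doc: &mut Automerge) -> Result<(), Error> {
    let full_doc = doc.save();
    let new_heads = doc.get_heads();
    store.compact(id, &full_doc, new_heads)
}
```

Note that in the new code a failed `Chunks::load` is logged at error level and swallowed, so `compact` still returns `Ok(())` on that path.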
@@ -195,7 +203,7 @@ fn write_chunk(
 ) -> Result<(), Error> {
     // Write to a temp file and then rename to avoid partial writes
     let mut temp_save =
-        tempfile::NamedTempFile::new().map_err(|e| Error(ErrorKind::CreateTempFile(e)))?;
+        tempfile::NamedTempFile::new_in(root).map_err(|e| Error(ErrorKind::CreateTempFile(e)))?;
     let temp_save_path = temp_save.path().to_owned();
     temp_save
         .as_file_mut()
@@ -206,9 +214,16 @@ fn write_chunk(
         .sync_all()
         .map_err(|e| Error(ErrorKind::WriteTempFile(temp_save_path.clone(), e)))?;

+    std::fs::create_dir_all(paths.level2_path(root))
+        .map_err(|e| Error(ErrorKind::CreateLevel2Path(paths.level2_path(root), e)))?;
+
     // Move the temporary file into a snapshot in the document data directory
     // with a name based on the hash of the heads of the document
     let output_path = paths.chunk_path(root, &name);
+
+    tracing::trace!("Renaming: {:?}", temp_save);
+    tracing::trace!("To: {:?}", output_path);
+
     std::fs::rename(&temp_save_path, &output_path)
         .map_err(|e| Error(ErrorKind::RenameTempFile(temp_save_path, output_path, e)))?;

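Two behavioural points in this hunk: the temp file is now created under `root` instead of the system temp directory, which keeps the final `std::fs::rename` on one filesystem (a rename across mount points fails), and the level-2 directory is created here rather than in `append`. The same atomic-write pattern in isolation (a minimal sketch, assuming the `tempfile` crate):

```rust
use std::io::Write;
use std::path::Path;

// Sketch of the write-then-rename pattern used above. Creating the temp
// file in `dir` keeps the rename on one filesystem, so it stays atomic.
fn atomic_write(dir: &Path, final_name: &str, bytes: &[u8]) -> std::io::Result<()> {
    let mut tmp = tempfile::NamedTempFile::new_in(dir)?;
    tmp.as_file_mut().write_all(bytes)?;
    tmp.as_file().sync_all()?;
    tmp.persist(dir.join(final_name))?; // rename(2) under the hood
    Ok(())
}
```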
@@ -236,16 +251,18 @@ impl<'a> From<&'a DocumentId> for DocIdPaths {
 }

 impl DocIdPaths {
-    fn parse<P1: AsRef<Path>, P2: AsRef<Path>>(level1: P1, level2: P2) -> Option<Self> {
-        let level1 = level1.as_ref().to_str()?;
+    fn parse(level2: PathBuf) -> Option<Self> {
+        let level1 = level2.parent()?.file_name()?.to_str()?;
+        let level2 = level2.file_name()?.to_str()?;

         let prefix = hex::decode(level1).ok()?;
         let prefix = <[u8; 2]>::try_from(prefix).ok()?;

-        let level2 = level2.as_ref().to_str()?;
         let doc_id_bytes = hex::decode(level2).ok()?;
         let doc_id_str = String::from_utf8(doc_id_bytes).ok()?;
         let doc_id = DocumentId::from(doc_id_str.as_str());
         let result = Self::from(&doc_id);
+
         if result.prefix != prefix {
             None
         } else {
@@ -274,13 +291,13 @@ impl DocIdPaths {
     }
 }

-#[derive(Debug, Hash, PartialEq, Eq)]
+#[derive(Debug, Hash, PartialEq, Eq, Clone)]
 enum ChunkType {
     Snapshot,
     Incremental,
 }

-#[derive(Debug, Hash, PartialEq, Eq)]
+#[derive(Debug, Hash, PartialEq, Eq, Clone)]
 struct SavedChunkName {
     hash: Vec<u8>,
     chunk_type: ChunkType,
@@ -343,7 +360,7 @@ impl Chunks {
     fn load(root: &Path, doc_id: &DocumentId) -> Result<Option<Self>, Error> {
         let doc_id_hash = DocIdPaths::from(doc_id);
         let level2_path = doc_id_hash.level2_path(root);
-        tracing::debug!(
+        tracing::trace!(
             root=%root.display(),
             doc_id=?doc_id,
             doc_path=%level2_path.display(),
@@ -379,11 +396,12 @@ impl Chunks {
                .map_err(|e| Error(ErrorKind::ErrReadingChunkFileMetadata(path.clone(), e)))?
                .is_file()
            {
-               tracing::warn!(bad_file=%path.display(), "unexpected non-file in level2 path");
+               tracing::trace!(bad_file=%path.display(), "unexpected non-file in level2 path");
                continue;
            }
-           let Some(chunk_name) = entry.file_name().to_str().and_then(SavedChunkName::parse) else {
-               tracing::warn!(bad_file=%path.display(), "unexpected non-chunk file in level2 path");
+           let Some(chunk_name) = entry.file_name().to_str().and_then(SavedChunkName::parse)
+           else {
+               tracing::trace!(bad_file=%path.display(), "unexpected non-chunk file in level2 path");
                continue;
            };
            tracing::debug!(chunk_path=%path.display(), "reading chunk file");
@@ -393,7 +411,7 @@ impl Chunks {
                match e.kind() {
                    std::io::ErrorKind::NotFound => {
                        // Could be a concurrent process compacting, not an error
-                       tracing::warn!(
+                       tracing::trace!(
                            missing_chunk_path=%path.display(),
                            "chunk file disappeared while reading chunks",
                        );
@@ -417,17 +435,6 @@ impl Chunks {
             incrementals,
         }))
     }
-
-    fn to_doc(&self) -> Result<automerge::Automerge, automerge::AutomergeError> {
-        let mut bytes = Vec::new();
-        for chunk in self.snapshots.values() {
-            bytes.extend(chunk);
-        }
-        for chunk in self.incrementals.values() {
-            bytes.extend(chunk);
-        }
-        automerge::Automerge::load(&bytes)
-    }
 }

 mod error {
@@ -471,8 +478,6 @@ mod error {
        ErrReadingChunkFile(PathBuf, std::io::Error),
        #[error("error creating level 2 path {0}: {1}")]
        CreateLevel2Path(PathBuf, std::io::Error),
-       #[error("error loading doc to compact: {0}")]
-       LoadDocToCompact(automerge::AutomergeError),
        #[error("error creating temp file: {0}")]
        CreateTempFile(std::io::Error),
        #[error("error writing temp file {0}: {1}")]

2 changes: 2 additions & 0 deletions src/interfaces.rs
@@ -1,3 +1,4 @@
+use automerge::ChangeHash;
 use futures::future::BoxFuture;
 use serde::{Deserialize, Serialize};
 use std::fmt::{Display, Formatter};
@@ -113,5 +114,6 @@ pub trait Storage: Send {
         &self,
         _id: DocumentId,
         _full_doc: Vec<u8>,
+        _new_heads: Vec<ChangeHash>,
     ) -> BoxFuture<'static, Result<(), StorageError>>;
 }
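Downstream implementors of `Storage` must now accept the extra parameter even if they ignore it. A no-op method body might look like the following (a hypothetical stub written as a free function; assumes `futures::FutureExt` for `.boxed()`):

```rust
use automerge::ChangeHash;
use futures::{future::BoxFuture, FutureExt};

// Hypothetical no-op compact for a Storage implementor: accept the
// compacted bytes and their heads, and resolve immediately.
fn noop_compact(
    _id: DocumentId,
    _full_doc: Vec<u8>,
    _new_heads: Vec<ChangeHash>,
) -> BoxFuture<'static, Result<(), StorageError>> {
    futures::future::ready(Ok(())).boxed()
}
```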
2 changes: 1 addition & 1 deletion src/network_connect.rs
@@ -35,7 +35,7 @@ impl RepoHandle {
                Ok(repo_msg)
            }
            Ok(m) => {
-               tracing::warn!(?m, repo_id=?repo_id, "Received non-repo message");
+               tracing::trace!(?m, repo_id=?repo_id, "Received non-repo message");
                Err(NetworkError::Error)
            }
            Err(e) => {