Skip to content

Commit

Permalink
feat(store): add parse logic for variants with extra fields
Browse files Browse the repository at this point in the history
The V generic is intended for cases when the user keeps their changesets
versioned and with a strict policy of only adding fields, not modifying
older ones nor removing them.

In the case the data has been encoded with a newer version, but is
decoded with an older one, the code should attemp to parse only the
fields it already knows of, ignoring the rest.

This code implements that logic but also opens the door for parsing
fields of different types as the wrong type.
  • Loading branch information
nymius committed Sep 17, 2024
1 parent 2361b03 commit 3731a1c
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 31 deletions.
30 changes: 28 additions & 2 deletions crates/file_store/src/entry_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,13 @@ where
self.db_file.seek(io::SeekFrom::Start(start))?;
}

let mut changeset_size: u64 = 0;
let mut pos_before_read = self.db_file.stream_position()?;
bincode_options()
.deserialize_from::<_, u64>(&mut self.db_file)
.and_then(|size| {
// Save changeset size to use in case a partial read is needed
changeset_size = size;
pos_before_read = self.db_file.stream_position()?;
bincode_options()
.with_limit(size)
Expand All @@ -76,17 +79,40 @@ where
// risen when actual_index > max_variant_index
if actual_index > max_variant_index {
pos_before_read = self.db_file.stream_position()?;
return bincode_options()
let fallback_decoded_value = bincode_options()
.deserialize_from::<_, T2>(&mut self.db_file)
.map(Into::into);

if let Ok(v) = &fallback_decoded_value {
let actual_read_size = bincode_options().serialized_size(v)?;
let remaining_bytes_to_read =
(changeset_size - actual_read_size) as i64;
// Ignore remaining bytes
self.db_file
.seek(io::SeekFrom::Current(remaining_bytes_to_read))?;
}

return fallback_decoded_value;
}
}
bincode::ErrorKind::InvalidTagEncoding(index) => {
if *index as u64 > max_variant_index {
pos_before_read = self.db_file.stream_position()?;
return bincode_options()

let fallback_decoded_value = bincode_options()
.deserialize_from::<_, T2>(&mut self.db_file)
.map(Into::into);

if let Ok(v) = &fallback_decoded_value {
let actual_read_size = bincode_options().serialized_size(v)?;
let remaining_bytes_to_read =
(changeset_size - actual_read_size) as i64;
// Ignore remaining bytes
self.db_file
.seek(io::SeekFrom::Current(remaining_bytes_to_read))?;
}

return fallback_decoded_value;
}
}
_ => (),
Expand Down
38 changes: 9 additions & 29 deletions crates/file_store/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ mod test {
}

#[test]
fn deserialize_newer_changeset_version_with_old_code() {
fn decode_multiple_vw_enum_variant_index_not_in_vr_with_cr_when_cw_adds_any_kind_of_field() {
#[derive(PartialEq, Debug, Default, Clone, serde::Deserialize, serde::Serialize)]
struct TestChangeSetV1 {
field_a: BTreeSet<String>,
Expand All @@ -483,31 +483,22 @@ mod test {
#[derive(Debug, Default, Clone, serde::Deserialize, serde::Serialize)]
struct TestChangeSetV2 {
field_a: BTreeSet<String>,
field_b: Option<BTreeSet<String>>,
field_b: BTreeSet<String>,
}

impl Merge for TestChangeSetV2 {
fn merge(&mut self, other: Self) {
self.field_a.extend(other.field_a);
if let Some(ref mut field_b) = self.field_b {
if let Some(other_field_b) = other.field_b {
field_b.extend(other_field_b)
}
}
self.field_b.extend(other.field_b);
}

fn is_empty(&self) -> bool {
if self.field_b.is_none() {
false
} else {
self.field_a.is_empty()
}
self.field_a.is_empty() && self.field_b.is_empty()
}
}

let temp_dir = tempfile::tempdir().unwrap();
let file_path = temp_dir.path().join("20.dat");
println!("Test file: {:?}", file_path);

let new_code_aggregation = {
#[derive(serde::Serialize, serde::Deserialize)]
Expand Down Expand Up @@ -549,10 +540,10 @@ mod test {
}
}

let changesets = (0..2)
let changesets = (0..4)
.map(|n| TestChangeSetV2 {
field_a: BTreeSet::from([format!("{}", n)]),
field_b: None,
field_b: BTreeSet::from([format!("{}", n)]),
})
.collect::<Vec<_>>();

Expand All @@ -574,6 +565,7 @@ mod test {
acc
})
};

let old_code_aggregation = {
#[derive(serde::Serialize, serde::Deserialize)]
enum VersionedTestChangeSet {
Expand All @@ -600,29 +592,17 @@ mod test {
}
}

impl From<VersionedTestChangeSet> for TestChangeSetV2 {
fn from(_: VersionedTestChangeSet) -> Self {
Self::default()
}
}

impl From<TestChangeSetV2> for VersionedTestChangeSet {
fn from(_: TestChangeSetV2) -> Self {
VersionedTestChangeSet::default()
}
}

// We re-open the file and read all the versioned changesets using the "old"
// VersionedTestChangeSet
let mut db = Store::<TestChangeSetV2, VersionedTestChangeSet>::open(
let mut db = Store::<TestChangeSetV1, VersionedTestChangeSet>::open(
&TEST_MAGIC_BYTES,
&file_path,
)
.unwrap();
// Merge all versioned changesets in the aggregated correct changeset
db.iter_changesets()
.map(|r| r.expect("must read valid changeset"))
.fold(TestChangeSetV2::default(), |mut acc, v| {
.fold(TestChangeSetV1::default(), |mut acc, v| {
Merge::merge(&mut acc, v);
acc
})
Expand Down

0 comments on commit 3731a1c

Please sign in to comment.