From 6b4e79b30cfd91001fcb126b49516b02377617e0 Mon Sep 17 00:00:00 2001
From: nymius <155548262+nymius@users.noreply.github.com>
Date: Sat, 24 Aug 2024 09:53:49 -0300
Subject: [PATCH 1/4] feat(store): add versioned serialization/deserialization

Forward compatibility means the encoded V is of a newer version than what our
software has, but our software needs to support deserializing it.

To have forward compatibility, we need a length prefix for every encoded V.
Why? Typically, the encoding length can be determined by the type definition
of V. However, a newer version of V will have new variants, and the encoding
lengths of those variants cannot be determined (because our version of the
type does not define them).

When we fail to deserialize into a variable of type V, we need to check the
"variant index" encoding first. Why? If the variant index does not exist in
our type V (which is an enum), then we are sure the data is of a newer type.
Since the data is of a newer type, we can apply the forward-compatibility
logic. On the flip side, if the data is of a variant index that exists in V,
then we are sure that the data is corrupted.

What is a "variant index"? bincode encodes enums with the variant index
first. The variant index is encoded with VarintEncoding by default.

ENCODING FORMAT

Each changeset should be encoded as follows:

[L][V]

- [L] is a bincode-encoded u64. Use serialized_size on V to figure out the
  value of L.
- [V] is a bincode-encoded enum type that represents a versioned changeset.
  Because it is an enum, it is guaranteed that the encoded data begins with
  a varint-encoded variant index.

Encoding:
1. Transform the changeset into a versioned changeset first.
2. Determine the serialized_size of the versioned changeset. The
   serialized_size of V is L.
3. Encode L: u64 and write to file.
4. Encode V and write to file.

Decoding:
1. Decode L: u64 from file.
2. Read L bytes from file and attempt to decode as V.
   a. On success: Great! Extract C from V and return it.
   b. On failure: Decode V as u64 (to extract the enum-variant varint
      index). If the variant index is too high, trim the variant index from
      the data, and attempt to decode into C (changeset) directly.

How do we determine if a variant index is too high? Encode V::default()
(which should always be the latest-version variant), and then decode the
result as u64 to extract the variant index.
---
 crates/file_store/src/entry_iter.rs | 67 ++++++++++++++++++++++------
 crates/file_store/src/store.rs      | 69 ++++++++++++++++-------------
 2 files changed, 93 insertions(+), 43 deletions(-)

diff --git a/crates/file_store/src/entry_iter.rs b/crates/file_store/src/entry_iter.rs
index 6be3fd034..5bdca547d 100644
--- a/crates/file_store/src/entry_iter.rs
+++ b/crates/file_store/src/entry_iter.rs
@@ -13,16 +13,16 @@ use crate::bincode_options;
 /// iterator will yield a `Result::Err(_)` instead and then `None` for the next call to `next`.
 ///
 /// [`next`]: Self::next
-pub struct EntryIter<'t, T> {
+pub struct EntryIter<'t, T1, T2 = T1> {
     /// Buffered reader around the file
     db_file: BufReader<&'t mut File>,
     finished: bool,
     /// The file position for the first read of `db_file`.
     start_pos: Option<u64>,
-    types: PhantomData<T>,
+    types: PhantomData<(T1, T2)>,
 }

-impl<'t, T> EntryIter<'t, T> {
+impl<'t, T1, T2> EntryIter<'t, T1, T2> {
     pub fn new(start_pos: u64, db_file: &'t mut File) -> Self {
         Self {
             db_file: BufReader::new(db_file),
@@ -33,11 +33,12 @@ impl<'t, T> EntryIter<'t, T> {
     }
 }

-impl<'t, T> Iterator for EntryIter<'t, T>
+impl<'t, T1, T2> Iterator for EntryIter<'t, T1, T2>
 where
-    T: serde::de::DeserializeOwned,
+    T1: serde::de::DeserializeOwned + Default + serde::Serialize,
+    T2: serde::de::DeserializeOwned + Into<T1>,
 {
-    type Item = Result<T, IterError>;
+    type Item = Result<T1, IterError>;

     fn next(&mut self) -> Option<Self::Item> {
         if self.finished {
@@ -48,10 +49,51 @@ where
             self.db_file.seek(io::SeekFrom::Start(start))?;
         }

-        let pos_before_read = self.db_file.stream_position()?;
-        match bincode_options().deserialize_from(&mut self.db_file) {
-            Ok(changeset) => Ok(Some(changeset)),
-            Err(e) => {
+        let mut pos_before_read = self.db_file.stream_position()?;
+        bincode_options()
+            .deserialize_from::<_, u64>(&mut self.db_file)
+            .and_then(|size| {
+                pos_before_read = self.db_file.stream_position()?;
+                bincode_options()
+                    .with_limit(size)
+                    .deserialize_from::<_, T1>(&mut self.db_file)
+            })
+            .or_else(|e| {
+                let serialized_max_variant = bincode_options().serialize(&T1::default())?;
+                // allow trailing bytes because we serialized the variant, not a u64, and
+                // we want to extract the prefixed varint index
+                let max_variant_index = bincode_options()
+                    .allow_trailing_bytes()
+                    .deserialize::<u64>(&serialized_max_variant)?;
+
+                match &*e {
+                    bincode::ErrorKind::Custom(e) if e.contains("expected variant index") => {
+                        self.db_file.seek(io::SeekFrom::Start(pos_before_read))?;
+                        pos_before_read = self.db_file.stream_position()?;
+                        let actual_index =
+                            bincode_options().deserialize_from::<_, u64>(&mut self.db_file)?;
+                        if actual_index > max_variant_index {
+                            pos_before_read = self.db_file.stream_position()?;
+                            return bincode_options()
+                                .deserialize_from::<_, T2>(&mut self.db_file)
+                                .map(Into::into);
+                        }
+                    }
+                    bincode::ErrorKind::InvalidTagEncoding(index) => {
+                        if *index as u64 > max_variant_index {
+                            pos_before_read = self.db_file.stream_position()?;
+                            return bincode_options()
+                                .deserialize_from::<_, T2>(&mut self.db_file)
+                                .map(Into::into);
+                        }
+                    }
+                    _ => (),
+                };
+
+                Err(e)
+            })
+            .map(|changeset| Some(changeset))
+            .or_else(|e| {
                 self.finished = true;
                 let pos_after_read = self.db_file.stream_position()?;
                 // allow unexpected EOF if 0 bytes were read
@@ -64,14 +106,13 @@ where
                 }
                 self.db_file.seek(io::SeekFrom::Start(pos_before_read))?;
                 Err(IterError::Bincode(*e))
-            }
-        }
+            })
         })()
         .transpose()
     }
 }

-impl<'t, T> Drop for EntryIter<'t, T> {
+impl<'t, T1, T2> Drop for EntryIter<'t, T1, T2> {
     fn drop(&mut self) {
         // This syncs the underlying file's offset with the buffer's position. This way, we
         // maintain the correct position to start the next read/write.
diff --git a/crates/file_store/src/store.rs b/crates/file_store/src/store.rs
index 62c3d91b6..ef210068e 100644
--- a/crates/file_store/src/store.rs
+++ b/crates/file_store/src/store.rs
@@ -11,22 +11,16 @@ use std::{

 /// Persists an append-only list of changesets (`C`) to a single file.
 #[derive(Debug)]
-pub struct Store<C>
-where
-    C: Sync + Send,
-{
+pub struct Store<C, V = C> {
     magic_len: usize,
     db_file: File,
-    marker: PhantomData<C>,
+    marker: PhantomData<(C, V)>,
 }

-impl<C> Store<C>
+impl<C, V> Store<C, V>
 where
-    C: Merge
-        + serde::Serialize
-        + serde::de::DeserializeOwned
-        + core::marker::Send
-        + core::marker::Sync,
+    C: Into<V> + Merge + Clone + Default + serde::de::DeserializeOwned,
+    V: Into<C> + Default + serde::Serialize + serde::de::DeserializeOwned,
 {
     /// Create a new [`Store`] file in write-only mode; error if the file exists.
     ///
@@ -109,6 +103,19 @@ where
         }
     }

+    /// Iterates over the stored **versioned** changesets from first to last, changing the seek
+    /// position at each iteration.
+    ///
+    /// The iterator may fail to read an entry and therefore return an error. However, the first time
+    /// it returns an error will be the last. After doing so, the iterator will always yield `None`.
+    ///
+    /// **WARNING**: This method changes the write position in the underlying file. You should
+    /// always iterate over all entries until `None` is returned if you want your next write to go
+    /// at the end; otherwise, you will write over existing entries.
+    pub fn iter_versioned_changesets(&mut self) -> EntryIter<V, C> {
+        EntryIter::new(self.magic_len as u64, &mut self.db_file)
+    }
+
     /// Iterates over the stored changeset from first to last, changing the seek position at each
     /// iteration.
     ///
@@ -118,8 +125,8 @@
     /// **WARNING**: This method changes the write position in the underlying file. You should
     /// always iterate over all entries until `None` is returned if you want your next write to go
     /// at the end; otherwise, you will write over existing entries.
-    pub fn iter_changesets(&mut self) -> EntryIter<C> {
-        EntryIter::new(self.magic_len as u64, &mut self.db_file)
+    pub fn iter_changesets(&mut self) -> impl Iterator<Item = Result<C, IterError>> + '_ {
+        self.iter_versioned_changesets().map(|r| r.map(Into::into))
     }

     /// Loads all the changesets that have been stored as one giant changeset.
@@ -135,23 +142,22 @@
     /// **WARNING**: This method changes the write position of the underlying file. The next
     /// changeset will be written over the erroring entry (or the end of the file if none existed).
     pub fn aggregate_changesets(&mut self) -> Result<Option<C>, AggregateChangesetsError<C>> {
-        let mut changeset = Option::<C>::None;
-        for next_changeset in self.iter_changesets() {
-            let next_changeset = match next_changeset {
-                Ok(next_changeset) => next_changeset,
-                Err(iter_error) => {
-                    return Err(AggregateChangesetsError {
-                        changeset,
-                        iter_error,
-                    })
+        self.iter_changesets().try_fold(
+            Option::<C>::None,
+            |mut aggregated_changeset: Option<C>, next_changeset| match next_changeset {
+                Ok(next_changeset) => {
+                    match &mut aggregated_changeset {
+                        Some(aggregated_changeset) => aggregated_changeset.merge(next_changeset),
+                        aggregated_changeset => *aggregated_changeset = Some(next_changeset),
+                    }
+                    Ok(aggregated_changeset)
                 }
-            };
-            match &mut changeset {
-                Some(changeset) => changeset.merge(next_changeset),
-                changeset => *changeset = Some(next_changeset),
-            }
-        }
-        Ok(changeset)
+                Err(iter_error) => Err(AggregateChangesetsError {
+                    changeset: aggregated_changeset,
+                    iter_error,
+                }),
+            },
+        )
     }

     /// Append a new changeset to the file and truncate the file to the end of the appended
@@ -165,8 +171,11 @@
             return Ok(());
         }

+        let versioned_changeset: V = changeset.clone().into();
         bincode_options()
-            .serialize_into(&mut self.db_file, changeset)
+            .serialized_size(&versioned_changeset)
+            .and_then(|size| bincode_options().serialize_into::<_, u64>(&mut self.db_file, &size))
+            .and_then(|_| bincode_options().serialize_into(&mut self.db_file, &versioned_changeset))
             .map_err(|e| match *e {
                 bincode::ErrorKind::Io(error) => error,
                 unexpected_err => panic!("unexpected bincode error: {}", unexpected_err),

From f8841dd597456296bfcaca6bde55a51036a6d7d9 Mon Sep 17 00:00:00 2001
From: nymius <155548262+nymius@users.noreply.github.com>
Date: Sat, 24 Aug 2024 09:58:41 -0300
Subject: [PATCH 2/4] test(store): add size prefix to test serialization

---
 crates/file_store/src/entry_iter.rs |  2 ++
 crates/file_store/src/store.rs      | 16 ++++++++++++++++
 2 files changed, 18 insertions(+)

diff --git a/crates/file_store/src/entry_iter.rs b/crates/file_store/src/entry_iter.rs
index 5bdca547d..9980cde8e 100644
--- a/crates/file_store/src/entry_iter.rs
+++ b/crates/file_store/src/entry_iter.rs
@@ -72,6 +72,8 @@ where
                         pos_before_read = self.db_file.stream_position()?;
                         let actual_index =
                             bincode_options().deserialize_from::<_, u64>(&mut self.db_file)?;
+                        // This will always happen, as the `expected variant index` error is
+                        // only raised when actual_index > max_variant_index
                         if actual_index > max_variant_index {
                             pos_before_read = self.db_file.stream_position()?;
                             return bincode_options()
diff --git a/crates/file_store/src/store.rs b/crates/file_store/src/store.rs
index ef210068e..ec3db4308 100644
--- a/crates/file_store/src/store.rs
+++ b/crates/file_store/src/store.rs
@@ -325,6 +325,15 @@ mod test {

         let expected_bytes = {
             let mut buf = TEST_MAGIC_BYTES.to_vec();
+            DefaultOptions::new()
+                .with_varint_encoding()
+                .serialized_size(&changeset)
+                .and_then(|size| {
+                    DefaultOptions::new()
+                        .with_varint_encoding()
+                        .serialize_into(&mut buf, &size)
+                })
+                .expect("should prefix changeset size");
             DefaultOptions::new()
                 .with_varint_encoding()
                 .serialize_into(&mut buf, &changeset)
@@ -345,6 +354,10 @@ mod test {
             TestChangeSet::from(["4".into(), "5".into(), "6".into()]),
         ];
         let last_changeset = TestChangeSet::from(["7".into(), "8".into(), "9".into()]);
+        let last_changeset_bytes_size = bincode_options()
+            .serialized_size(&last_changeset)
+            .and_then(|size| bincode_options().serialize::<u64>(&size))
+            .unwrap();
         let last_changeset_bytes =
             bincode_options().serialize(&last_changeset).unwrap();
         for short_write_len in 1..last_changeset_bytes.len() - 1 {
@@ -357,6 +370,8 @@
             for changeset in &changesets {
                 db.append_changeset(changeset).unwrap();
             }
+            // write last changeset full size
+            db.db_file.write_all(&last_changeset_bytes_size).unwrap();
             // this is the incomplete write
             db.db_file
                 .write_all(&last_changeset_bytes[..short_write_len])
@@ -378,6 +393,7 @@
                 }),
                 "should recover all changesets that are written in full",
             );
+            // write last changeset in full
             db.db_file.write_all(&last_changeset_bytes).unwrap();
         }

From 2361b03d6471a41e0cea1876ef95d2cf8b232d75 Mon Sep 17 00:00:00 2001
From: nymius <155548262+nymius@users.noreply.github.com>
Date: Sat, 24 Aug 2024 10:00:11 -0300
Subject: [PATCH 3/4] test(store): add forward compatibility test

Serialize a newer versioned changeset and deserialize it into an
alternative, non-versioned changeset.
---
 crates/file_store/src/store.rs | 167 +++++++++++++++++++++++++++++++++
 1 file changed, 167 insertions(+)

diff --git a/crates/file_store/src/store.rs b/crates/file_store/src/store.rs
index ec3db4308..20f7289cc 100644
--- a/crates/file_store/src/store.rs
+++ b/crates/file_store/src/store.rs
@@ -463,4 +463,171 @@ mod test {
             assert_eq!(aggregation, exp_aggregation);
         }
     }
+
+    #[test]
+    fn deserialize_newer_changeset_version_with_old_code() {
+        #[derive(PartialEq, Debug, Default, Clone, serde::Deserialize, serde::Serialize)]
+        struct TestChangeSetV1 {
+            field_a: BTreeSet<String>,
+        }
+        impl Merge for TestChangeSetV1 {
+            fn merge(&mut self, other: Self) {
+                self.field_a.extend(other.field_a);
+            }
+
+            fn is_empty(&self) -> bool {
+                self.field_a.is_empty()
+            }
+        }
+
+        #[derive(Debug, Default, Clone, serde::Deserialize, serde::Serialize)]
+        struct TestChangeSetV2 {
+            field_a: BTreeSet<String>,
+            field_b: Option<BTreeSet<String>>,
+        }
+
+        impl Merge for TestChangeSetV2 {
+            fn merge(&mut self, other: Self) {
+                self.field_a.extend(other.field_a);
+                if let Some(ref mut field_b) = self.field_b {
+                    if let Some(other_field_b) = other.field_b {
+                        field_b.extend(other_field_b)
+                    }
+                }
+            }
+
+            fn is_empty(&self) -> bool {
+                if self.field_b.is_none() {
+                    false
+                } else {
+                    self.field_a.is_empty()
+                }
+            }
+        }
+
+        let temp_dir = tempfile::tempdir().unwrap();
+        let file_path = temp_dir.path().join("20.dat");
+        println!("Test file: {:?}", file_path);
+
+        let new_code_aggregation = {
+            #[derive(serde::Serialize, serde::Deserialize)]
+            enum VersionedTestChangeSet {
+                V1(TestChangeSetV1),
+                V2(TestChangeSetV2),
+            }
+
+            impl Default for VersionedTestChangeSet {
+                fn default() -> Self {
+                    VersionedTestChangeSet::V2(TestChangeSetV2::default())
+                }
+            }
+
+            impl From<VersionedTestChangeSet> for TestChangeSetV1 {
+                fn from(versioned_change_set: VersionedTestChangeSet) -> Self {
+                    match versioned_change_set {
+                        VersionedTestChangeSet::V1(changeset) => changeset,
+                        _ => panic!(),
+                    }
+                }
+            }
+
+            impl From<TestChangeSetV1> for VersionedTestChangeSet {
+                fn from(test_change_set: TestChangeSetV1) -> Self {
+                    VersionedTestChangeSet::V1(test_change_set)
+                }
+            }
+
+            impl From<VersionedTestChangeSet> for TestChangeSetV2 {
+                fn from(_: VersionedTestChangeSet) -> Self {
+                    Self::default()
+                }
+            }
+
+            impl From<TestChangeSetV2> for VersionedTestChangeSet {
+                fn from(_: TestChangeSetV2) -> Self {
+                    VersionedTestChangeSet::default()
+                }
+            }
+
+            let changesets = (0..2)
+                .map(|n| TestChangeSetV2 {
+                    field_a: BTreeSet::from([format!("{}", n)]),
+                    field_b: None,
+                })
+                .collect::<Vec<TestChangeSetV2>>();
+
+            // First, we create the file with all the versioned changesets!
+            let mut db = Store::<TestChangeSetV2, VersionedTestChangeSet>::create_new(
+                &TEST_MAGIC_BYTES,
+                &file_path,
+            )
+            .unwrap();
+            for changeset in &changesets {
+                db.append_changeset(changeset).unwrap();
+            }
+
+            // Get aggregated changeset with new code
+            db.iter_changesets()
+                .map(|r| r.expect("must read valid changeset"))
+                .fold(TestChangeSetV2::default(), |mut acc, v| {
+                    Merge::merge(&mut acc, v);
+                    acc
+                })
+        };
+        let old_code_aggregation = {
+            #[derive(serde::Serialize, serde::Deserialize)]
+            enum VersionedTestChangeSet {
+                V1(TestChangeSetV1),
+            }
+
+            impl Default for VersionedTestChangeSet {
+                fn default() -> Self {
+                    VersionedTestChangeSet::V1(TestChangeSetV1::default())
+                }
+            }
+
+            impl From<VersionedTestChangeSet> for TestChangeSetV1 {
+                fn from(versioned_change_set: VersionedTestChangeSet) -> Self {
+                    match versioned_change_set {
+                        VersionedTestChangeSet::V1(changeset) => changeset,
+                    }
+                }
+            }
+
+            impl From<TestChangeSetV1> for VersionedTestChangeSet {
+                fn from(test_change_set: TestChangeSetV1) -> Self {
+                    VersionedTestChangeSet::V1(test_change_set)
+                }
+            }
+
+            impl From<VersionedTestChangeSet> for TestChangeSetV2 {
+                fn from(_: VersionedTestChangeSet) -> Self {
+                    Self::default()
+                }
+            }
+
+            impl From<TestChangeSetV2> for VersionedTestChangeSet {
+                fn from(_: TestChangeSetV2) -> Self {
+                    VersionedTestChangeSet::default()
+                }
+            }
+
+            // We re-open the file and read all the versioned changesets using the "old"
+            // VersionedTestChangeSet
+            let mut db = Store::<TestChangeSetV2, VersionedTestChangeSet>::open(
+                &TEST_MAGIC_BYTES,
+                &file_path,
+            )
+            .unwrap();
+            // Merge all versioned changesets in the aggregated correct changeset
+            db.iter_changesets()
+                .map(|r| r.expect("must read valid changeset"))
+                .fold(TestChangeSetV2::default(), |mut acc, v| {
+                    Merge::merge(&mut acc, v);
+                    acc
+                })
+        };
+
+        assert_eq!(new_code_aggregation.field_a, old_code_aggregation.field_a);
+    }
 }

From 3731a1c90a74d2a8dfc546223db6c00d3cc55dd3 Mon Sep 17 00:00:00 2001
From: nymius <155548262+nymius@users.noreply.github.com>
Date: Sat, 14 Sep 2024 20:58:49 -0300
Subject: [PATCH 4/4] feat(store): add parse logic for variants with extra
 fields

The V generic is intended for cases where the user keeps their changesets
versioned, with a strict policy of only adding fields, never modifying or
removing older ones.

In the case the data has been encoded with a newer version but is decoded
with an older one, the code should attempt to parse only the fields it
already knows of, ignoring the rest. This commit implements that logic, but
it also opens the door to parsing fields of different types as the wrong
type.
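To make the intended behavior concrete, here is a minimal, self-contained
sketch of the same idea (the NewChangeSet/OldChangeSet types and the opts()
helper are hypothetical stand-ins for illustration, not the crate's API;
the patch itself seeks past the leftover bytes rather than buffering the
whole entry):

    use bincode::Options;
    use std::collections::BTreeSet;
    use std::io::{Cursor, Read};

    // Same options the crate's bincode_options() helper builds.
    fn opts() -> impl bincode::Options {
        bincode::DefaultOptions::new().with_varint_encoding()
    }

    // Hypothetical "newer" changeset: field_b was appended after field_a.
    #[derive(serde::Serialize)]
    struct NewChangeSet {
        field_a: BTreeSet<String>,
        field_b: BTreeSet<String>,
    }

    // Hypothetical "older" changeset: it only knows about field_a.
    #[derive(serde::Deserialize)]
    struct OldChangeSet {
        field_a: BTreeSet<String>,
    }

    fn main() {
        // Writer (new code): length-prefix every encoded changeset.
        let new = NewChangeSet {
            field_a: BTreeSet::from(["a".to_string()]),
            field_b: BTreeSet::from(["b".to_string()]),
        };
        let mut file = Vec::new(); // stand-in for the db file
        let size = opts().serialized_size(&new).unwrap();
        opts().serialize_into(&mut file, &size).unwrap();
        opts().serialize_into(&mut file, &new).unwrap();

        // Reader (old code): read the length prefix, buffer exactly that
        // many bytes, and decode only the fields it knows, tolerating the
        // trailing bytes that encode the unknown field_b.
        let mut reader = Cursor::new(file);
        let len: u64 = opts().deserialize_from(&mut reader).unwrap();
        let mut entry = vec![0u8; len as usize];
        reader.read_exact(&mut entry).unwrap();
        let old: OldChangeSet =
            opts().allow_trailing_bytes().deserialize(&entry).unwrap();

        // The known field round-trips, and because the entry was length
        // prefixed, the reader now sits at the start of the next entry.
        assert_eq!(old.field_a, BTreeSet::from(["a".to_string()]));
    }

This only works because fields are appended, never reordered: bincode
encodes struct fields back to back in declaration order, so the old layout
is always a prefix of the new one.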
---
 crates/file_store/src/entry_iter.rs | 30 +++++++++++++++++++++++++---
 crates/file_store/src/store.rs      | 38 +++++++----------------------
 2 files changed, 37 insertions(+), 31 deletions(-)

diff --git a/crates/file_store/src/entry_iter.rs b/crates/file_store/src/entry_iter.rs
index 9980cde8e..c9b36a35f 100644
--- a/crates/file_store/src/entry_iter.rs
+++ b/crates/file_store/src/entry_iter.rs
@@ -49,10 +49,13 @@ where
             self.db_file.seek(io::SeekFrom::Start(start))?;
         }

+        let mut changeset_size: u64 = 0;
         let mut pos_before_read = self.db_file.stream_position()?;
         bincode_options()
             .deserialize_from::<_, u64>(&mut self.db_file)
             .and_then(|size| {
+                // Save changeset size to use in case a partial read is needed
+                changeset_size = size;
                 pos_before_read = self.db_file.stream_position()?;
                 bincode_options()
                     .with_limit(size)
                     .deserialize_from::<_, T1>(&mut self.db_file)
@@ -76,17 +79,40 @@ where
                         // only raised when actual_index > max_variant_index
                         if actual_index > max_variant_index {
                             pos_before_read = self.db_file.stream_position()?;
-                            return bincode_options()
+                            let fallback_decoded_value = bincode_options()
                                 .deserialize_from::<_, T2>(&mut self.db_file)
                                 .map(Into::into);
+
+                            if let Ok(v) = &fallback_decoded_value {
+                                let actual_read_size = bincode_options().serialized_size(v)?;
+                                let remaining_bytes_to_read =
+                                    (changeset_size - actual_read_size) as i64;
+                                // Ignore remaining bytes
+                                self.db_file
+                                    .seek(io::SeekFrom::Current(remaining_bytes_to_read))?;
+                            }
+
+                            return fallback_decoded_value;
                         }
                     }
                     bincode::ErrorKind::InvalidTagEncoding(index) => {
                         if *index as u64 > max_variant_index {
                             pos_before_read = self.db_file.stream_position()?;
-                            return bincode_options()
+
+                            let fallback_decoded_value = bincode_options()
                                 .deserialize_from::<_, T2>(&mut self.db_file)
                                 .map(Into::into);
+
+                            if let Ok(v) = &fallback_decoded_value {
+                                let actual_read_size = bincode_options().serialized_size(v)?;
+                                let remaining_bytes_to_read =
+                                    (changeset_size - actual_read_size) as i64;
+                                // Ignore remaining bytes
+                                self.db_file
+                                    .seek(io::SeekFrom::Current(remaining_bytes_to_read))?;
+                            }
+
+                            return fallback_decoded_value;
                         }
                     }
                     _ => (),
diff --git a/crates/file_store/src/store.rs b/crates/file_store/src/store.rs
index 20f7289cc..f4042c96d 100644
--- a/crates/file_store/src/store.rs
+++ b/crates/file_store/src/store.rs
@@ -465,7 +465,7 @@ mod test {
     }

     #[test]
-    fn deserialize_newer_changeset_version_with_old_code() {
+    fn decode_multiple_vw_enum_variant_index_not_in_vr_with_cr_when_cw_adds_any_kind_of_field() {
         #[derive(PartialEq, Debug, Default, Clone, serde::Deserialize, serde::Serialize)]
         struct TestChangeSetV1 {
             field_a: BTreeSet<String>,
         }
@@ -483,31 +483,22 @@ mod test {
         #[derive(Debug, Default, Clone, serde::Deserialize, serde::Serialize)]
         struct TestChangeSetV2 {
             field_a: BTreeSet<String>,
-            field_b: Option<BTreeSet<String>>,
+            field_b: BTreeSet<String>,
         }

         impl Merge for TestChangeSetV2 {
             fn merge(&mut self, other: Self) {
                 self.field_a.extend(other.field_a);
-                if let Some(ref mut field_b) = self.field_b {
-                    if let Some(other_field_b) = other.field_b {
-                        field_b.extend(other_field_b)
-                    }
-                }
+                self.field_b.extend(other.field_b);
             }

             fn is_empty(&self) -> bool {
-                if self.field_b.is_none() {
-                    false
-                } else {
-                    self.field_a.is_empty()
-                }
+                self.field_a.is_empty() && self.field_b.is_empty()
             }
         }

         let temp_dir = tempfile::tempdir().unwrap();
         let file_path = temp_dir.path().join("20.dat");
-        println!("Test file: {:?}", file_path);

         let new_code_aggregation = {
             #[derive(serde::Serialize, serde::Deserialize)]
@@ -549,10 +540,10 @@ mod test {
                 }
             }

-            let changesets = (0..2)
+            let changesets = (0..4)
                 .map(|n| TestChangeSetV2 {
                     field_a: BTreeSet::from([format!("{}", n)]),
-                    field_b: None,
+                    field_b: BTreeSet::from([format!("{}", n)]),
                 })
                 .collect::<Vec<TestChangeSetV2>>();
@@ -574,6 +565,7 @@ mod test {
                     acc
                 })
         };
+
         let old_code_aggregation = {
             #[derive(serde::Serialize, serde::Deserialize)]
             enum VersionedTestChangeSet {
                 V1(TestChangeSetV1),
             }
@@ -600,21 +592,9 @@ mod test {
                 }
             }

-            impl From<VersionedTestChangeSet> for TestChangeSetV2 {
-                fn from(_: VersionedTestChangeSet) -> Self {
-                    Self::default()
-                }
-            }
-
-            impl From<TestChangeSetV2> for VersionedTestChangeSet {
-                fn from(_: TestChangeSetV2) -> Self {
-                    VersionedTestChangeSet::default()
-                }
-            }
-
             // We re-open the file and read all the versioned changesets using the "old"
             // VersionedTestChangeSet
-            let mut db = Store::<TestChangeSetV2, VersionedTestChangeSet>::open(
+            let mut db = Store::<TestChangeSetV1, VersionedTestChangeSet>::open(
                 &TEST_MAGIC_BYTES,
                 &file_path,
             )
             .unwrap();
             // Merge all versioned changesets in the aggregated correct changeset
             db.iter_changesets()
                 .map(|r| r.expect("must read valid changeset"))
-                .fold(TestChangeSetV2::default(), |mut acc, v| {
+                .fold(TestChangeSetV1::default(), |mut acc, v| {
                     Merge::merge(&mut acc, v);
                     acc
                 })
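
A closing note on the decoding scheme described in PATCH 1/4's commit
message: the max_variant_index probe works because bincode encodes an enum
as a varint variant index followed by the variant's payload. A minimal
sketch of just that probe (the Versioned enum and the opts() helper are
hypothetical stand-ins, not the crate's API):

    use bincode::Options;

    fn opts() -> impl bincode::Options {
        bincode::DefaultOptions::new().with_varint_encoding()
    }

    // Hypothetical versioned wrapper; V2 is the latest version, so it is
    // the default.
    #[derive(serde::Serialize)]
    enum Versioned {
        #[allow(dead_code)]
        V1(u32),
        V2(u64),
    }

    impl Default for Versioned {
        fn default() -> Self {
            Versioned::V2(0)
        }
    }

    fn main() {
        // Encoding the default (latest) variant and decoding the front of
        // the result as a u64 recovers the highest variant index this
        // binary knows about.
        let serialized_max_variant = opts().serialize(&Versioned::default()).unwrap();
        let max_variant_index: u64 = opts()
            .allow_trailing_bytes() // ignore the payload after the index
            .deserialize(&serialized_max_variant)
            .unwrap();
        assert_eq!(max_variant_index, 1); // V1 = 0, V2 = 1

        // Data whose leading index exceeds that value must have been
        // written by a newer version of the enum, so the fallback applies;
        // a failed decode with an in-range index means corruption instead.
        let hypothetical_v3_prefix = opts().serialize(&2u64).unwrap();
        let seen_index: u64 = opts().deserialize(&hypothetical_v3_prefix).unwrap();
        assert!(seen_index > max_variant_index);
    }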