diff --git a/crates/file_store/src/entry_iter.rs b/crates/file_store/src/entry_iter.rs
index 6be3fd034..c9b36a35f 100644
--- a/crates/file_store/src/entry_iter.rs
+++ b/crates/file_store/src/entry_iter.rs
@@ -13,16 +13,16 @@ use crate::bincode_options;
 /// iterator will yield a `Result::Err(_)` instead and then `None` for the next call to `next`.
 ///
 /// [`next`]: Self::next
-pub struct EntryIter<'t, T> {
+pub struct EntryIter<'t, T1, T2 = T1> {
     /// Buffered reader around the file
     db_file: BufReader<&'t mut File>,
     finished: bool,
     /// The file position for the first read of `db_file`.
     start_pos: Option<u64>,
-    types: PhantomData<T>,
+    types: PhantomData<(T1, T2)>,
 }
 
-impl<'t, T> EntryIter<'t, T> {
+impl<'t, T1, T2> EntryIter<'t, T1, T2> {
     pub fn new(start_pos: u64, db_file: &'t mut File) -> Self {
         Self {
             db_file: BufReader::new(db_file),
@@ -33,11 +33,12 @@ impl<'t, T> EntryIter<'t, T> {
     }
 }
 
-impl<'t, T> Iterator for EntryIter<'t, T>
+impl<'t, T1, T2> Iterator for EntryIter<'t, T1, T2>
 where
-    T: serde::de::DeserializeOwned,
+    T1: serde::de::DeserializeOwned + Default + serde::Serialize,
+    T2: serde::de::DeserializeOwned + Into<T1>,
 {
-    type Item = Result<T, IterError>;
+    type Item = Result<T1, IterError>;
 
     fn next(&mut self) -> Option<Self::Item> {
         if self.finished {
@@ -48,10 +49,79 @@ where
             self.db_file.seek(io::SeekFrom::Start(start))?;
         }
 
-        let pos_before_read = self.db_file.stream_position()?;
-        match bincode_options().deserialize_from(&mut self.db_file) {
-            Ok(changeset) => Ok(Some(changeset)),
-            Err(e) => {
+        let mut changeset_size: u64 = 0;
+        let mut pos_before_read = self.db_file.stream_position()?;
+        bincode_options()
+            .deserialize_from::<_, u64>(&mut self.db_file)
+            .and_then(|size| {
+                // Save the changeset size to use in case a partial read is needed
+                changeset_size = size;
+                pos_before_read = self.db_file.stream_position()?;
+                bincode_options()
+                    .with_limit(size)
+                    .deserialize_from::<_, T1>(&mut self.db_file)
+            })
+            .or_else(|e| {
+                let serialized_max_variant = bincode_options().serialize(&T1::default())?;
+                // Allow trailing bytes because we are deserializing from a serialized
+                // variant, not a bare u64, and we only want to extract the prefixed
+                // varint variant index
+                let max_variant_index = bincode_options()
+                    .allow_trailing_bytes()
+                    .deserialize::<u64>(&serialized_max_variant)?;
+
+                match &*e {
+                    bincode::ErrorKind::Custom(e) if e.contains("expected variant index") => {
+                        self.db_file.seek(io::SeekFrom::Start(pos_before_read))?;
+                        pos_before_read = self.db_file.stream_position()?;
+                        let actual_index =
+                            bincode_options().deserialize_from::<_, u64>(&mut self.db_file)?;
+                        // This is always true, as the `expected variant index` error is
+                        // only raised when actual_index > max_variant_index
+                        if actual_index > max_variant_index {
+                            pos_before_read = self.db_file.stream_position()?;
+                            let fallback_decoded_value = bincode_options()
+                                .deserialize_from::<_, T2>(&mut self.db_file)
+                                .map(Into::into);
+
+                            if let Ok(v) = &fallback_decoded_value {
+                                let actual_read_size = bincode_options().serialized_size(v)?;
+                                let remaining_bytes_to_read =
+                                    (changeset_size - actual_read_size) as i64;
+                                // Ignore the remaining bytes
+                                self.db_file
+                                    .seek(io::SeekFrom::Current(remaining_bytes_to_read))?;
+                            }
+
+                            return fallback_decoded_value;
+                        }
+                    }
+                    bincode::ErrorKind::InvalidTagEncoding(index) => {
+                        if *index as u64 > max_variant_index {
+                            pos_before_read = self.db_file.stream_position()?;
+
+                            let fallback_decoded_value = bincode_options()
+                                .deserialize_from::<_, T2>(&mut self.db_file)
+                                .map(Into::into);
+
+                            if let Ok(v) = &fallback_decoded_value {
+                                let actual_read_size = bincode_options().serialized_size(v)?;
+                                let remaining_bytes_to_read =
+                                    (changeset_size - actual_read_size) as i64;
+                                // Ignore the remaining bytes
+                                self.db_file
+                                    .seek(io::SeekFrom::Current(remaining_bytes_to_read))?;
+                            }
+
+                            return fallback_decoded_value;
+                        }
+                    }
+                    _ => (),
+                };
+
+                Err(e)
+            })
+            .map(|changeset| Some(changeset))
+            .or_else(|e| {
                 self.finished = true;
                 let pos_after_read = self.db_file.stream_position()?;
                 // allow unexpected EOF if 0 bytes were read
@@ -64,14 +134,13 @@ where
                 }
                 self.db_file.seek(io::SeekFrom::Start(pos_before_read))?;
                 Err(IterError::Bincode(*e))
-            }
-        }
+            })
         })()
         .transpose()
     }
 }
 
-impl<'t, T> Drop for EntryIter<'t, T> {
+impl<'t, T1, T2> Drop for EntryIter<'t, T1, T2> {
     fn drop(&mut self) {
         // This syncs the underlying file's offset with the buffer's position. This way, we
         // maintain the correct position to start the next read/write.
diff --git a/crates/file_store/src/store.rs b/crates/file_store/src/store.rs
index 62c3d91b6..f4042c96d 100644
--- a/crates/file_store/src/store.rs
+++ b/crates/file_store/src/store.rs
@@ -11,22 +11,16 @@ use std::{
 
 /// Persists an append-only list of changesets (`C`) to a single file.
 #[derive(Debug)]
-pub struct Store<C>
-where
-    C: Sync + Send,
-{
+pub struct Store<C, V = C> {
     magic_len: usize,
     db_file: File,
-    marker: PhantomData<C>,
+    marker: PhantomData<(C, V)>,
 }
 
-impl<C> Store<C>
+impl<C, V> Store<C, V>
 where
-    C: Merge
-        + serde::Serialize
-        + serde::de::DeserializeOwned
-        + core::marker::Send
-        + core::marker::Sync,
+    C: Into<V> + Merge + Clone + Default + serde::de::DeserializeOwned,
+    V: Into<C> + Default + serde::Serialize + serde::de::DeserializeOwned,
 {
     /// Create a new [`Store`] file in write-only mode; error if the file exists.
     ///
@@ -109,6 +103,19 @@ where
         }
     }
 
+    /// Iterates over the stored **versioned** changesets from first to last, changing the seek
+    /// position at each iteration.
+    ///
+    /// The iterator may fail to read an entry and therefore return an error. However, the first time
+    /// it returns an error will be the last. After doing so, the iterator will always yield `None`.
+    ///
+    /// **WARNING**: This method changes the write position in the underlying file. You should
+    /// always iterate over all entries until `None` is returned if you want your next write to go
+    /// at the end; otherwise, you will write over existing entries.
+    pub fn iter_versioned_changesets(&mut self) -> EntryIter<V, C> {
+        EntryIter::new(self.magic_len as u64, &mut self.db_file)
+    }
+
     /// Iterates over the stored changeset from first to last, changing the seek position at each
     /// iteration.
     ///
@@ -118,8 +125,8 @@ where
     /// **WARNING**: This method changes the write position in the underlying file. You should
     /// always iterate over all entries until `None` is returned if you want your next write to go
     /// at the end; otherwise, you will write over existing entries.
-    pub fn iter_changesets(&mut self) -> EntryIter<C> {
-        EntryIter::new(self.magic_len as u64, &mut self.db_file)
+    pub fn iter_changesets(&mut self) -> impl Iterator<Item = Result<C, IterError>> + '_ {
+        self.iter_versioned_changesets().map(|r| r.map(Into::into))
     }
 
     /// Loads all the changesets that have been stored as one giant changeset.
@@ -135,23 +142,22 @@ where
     /// **WARNING**: This method changes the write position of the underlying file. The next
     /// changeset will be written over the erroring entry (or the end of the file if none existed).
     pub fn aggregate_changesets(&mut self) -> Result<Option<C>, AggregateChangesetsError<C>> {
-        let mut changeset = Option::<C>::None;
-        for next_changeset in self.iter_changesets() {
-            let next_changeset = match next_changeset {
-                Ok(next_changeset) => next_changeset,
-                Err(iter_error) => {
-                    return Err(AggregateChangesetsError {
-                        changeset,
-                        iter_error,
-                    })
+        self.iter_changesets().try_fold(
+            Option::<C>::None,
+            |mut aggregated_changeset: Option<C>, next_changeset| match next_changeset {
+                Ok(next_changeset) => {
+                    match &mut aggregated_changeset {
+                        Some(aggregated_changeset) => aggregated_changeset.merge(next_changeset),
+                        aggregated_changeset => *aggregated_changeset = Some(next_changeset),
+                    }
+                    Ok(aggregated_changeset)
                 }
-            };
-            match &mut changeset {
-                Some(changeset) => changeset.merge(next_changeset),
-                changeset => *changeset = Some(next_changeset),
-            }
-        }
-        Ok(changeset)
+                Err(iter_error) => Err(AggregateChangesetsError {
+                    changeset: aggregated_changeset,
+                    iter_error,
+                }),
+            },
+        )
     }
 
     /// Append a new changeset to the file and truncate the file to the end of the appended
@@ -165,8 +171,11 @@ where
             return Ok(());
         }
 
+        let versioned_changeset: V = changeset.clone().into();
         bincode_options()
-            .serialize_into(&mut self.db_file, changeset)
+            .serialized_size(&versioned_changeset)
+            .and_then(|size| bincode_options().serialize_into::<_, u64>(&mut self.db_file, &size))
+            .and_then(|_| bincode_options().serialize_into(&mut self.db_file, &versioned_changeset))
             .map_err(|e| match *e {
                 bincode::ErrorKind::Io(error) => error,
                 unexpected_err => panic!("unexpected bincode error: {}", unexpected_err),
@@ -316,6 +325,15 @@ mod test {
         let expected_bytes = {
             let mut buf = TEST_MAGIC_BYTES.to_vec();
+            DefaultOptions::new()
+                .with_varint_encoding()
+                .serialized_size(&changeset)
+                .and_then(|size| {
+                    DefaultOptions::new()
+                        .with_varint_encoding()
+                        .serialize_into(&mut buf, &size)
+                })
+                .expect("should prefix changeset size");
             DefaultOptions::new()
                 .with_varint_encoding()
                 .serialize_into(&mut buf, &changeset)
@@ -336,6 +354,10 @@ mod test {
             TestChangeSet::from(["4".into(), "5".into(), "6".into()]),
         ];
         let last_changeset = TestChangeSet::from(["7".into(), "8".into(), "9".into()]);
+        let last_changeset_bytes_size = bincode_options()
+            .serialized_size(&last_changeset)
+            .and_then(|size| bincode_options().serialize::<u64>(&size))
+            .unwrap();
         let last_changeset_bytes = bincode_options().serialize(&last_changeset).unwrap();
 
         for short_write_len in 1..last_changeset_bytes.len() - 1 {
@@ -348,6 +370,8 @@ mod test {
             for changeset in &changesets {
                 db.append_changeset(changeset).unwrap();
             }
+            // write the last changeset's full size prefix
+            db.db_file.write_all(&last_changeset_bytes_size).unwrap();
             // this is the incomplete write
             db.db_file
                 .write_all(&last_changeset_bytes[..short_write_len])
@@ -369,6 +393,7 @@ mod test {
                 }),
                 "should recover all changesets that are written in full",
             );
+            // write the last changeset in full
             db.db_file.write_all(&last_changeset_bytes).unwrap();
         }
 
@@ -438,4 +463,151 @@ mod test {
             assert_eq!(aggregation, exp_aggregation);
         }
     }
+
+    #[test]
+    fn decode_multiple_vw_enum_variant_index_not_in_vr_with_cr_when_cw_adds_any_kind_of_field() {
+        #[derive(PartialEq, Debug, Default, Clone, serde::Deserialize, serde::Serialize)]
+        struct TestChangeSetV1 {
+            field_a: BTreeSet<String>,
+        }
+
+        impl Merge for TestChangeSetV1 {
+            fn merge(&mut self, other: Self) {
+                self.field_a.extend(other.field_a);
+            }
+
+            fn is_empty(&self) -> bool {
+                self.field_a.is_empty()
+            }
+        }
+
+        #[derive(Debug, Default, Clone, serde::Deserialize, serde::Serialize)]
+        struct TestChangeSetV2 {
+            field_a: BTreeSet<String>,
+            field_b: BTreeSet<String>,
+        }
+
+        impl Merge for TestChangeSetV2 {
+            fn merge(&mut self, other: Self) {
+                self.field_a.extend(other.field_a);
+                self.field_b.extend(other.field_b);
+            }
+
+            fn is_empty(&self) -> bool {
+                self.field_a.is_empty() && self.field_b.is_empty()
+            }
+        }
+
+        let temp_dir = tempfile::tempdir().unwrap();
+        let file_path = temp_dir.path().join("20.dat");
+
+        let new_code_aggregation = {
+            #[derive(serde::Serialize, serde::Deserialize)]
+            enum VersionedTestChangeSet {
+                V1(TestChangeSetV1),
+                V2(TestChangeSetV2),
+            }
+
+            impl Default for VersionedTestChangeSet {
+                fn default() -> Self {
+                    VersionedTestChangeSet::V2(TestChangeSetV2::default())
+                }
+            }
+
+            impl From<VersionedTestChangeSet> for TestChangeSetV1 {
+                fn from(versioned_change_set: VersionedTestChangeSet) -> Self {
+                    match versioned_change_set {
+                        VersionedTestChangeSet::V1(changeset) => changeset,
+                        _ => panic!(),
+                    }
+                }
+            }
+
+            impl From<TestChangeSetV1> for VersionedTestChangeSet {
+                fn from(test_change_set: TestChangeSetV1) -> Self {
+                    VersionedTestChangeSet::V1(test_change_set)
+                }
+            }
+
+            impl From<VersionedTestChangeSet> for TestChangeSetV2 {
+                fn from(_: VersionedTestChangeSet) -> Self {
+                    Self::default()
+                }
+            }
+
+            impl From<TestChangeSetV2> for VersionedTestChangeSet {
+                fn from(_: TestChangeSetV2) -> Self {
+                    VersionedTestChangeSet::default()
+                }
+            }
+
+            let changesets = (0..4)
+                .map(|n| TestChangeSetV2 {
+                    field_a: BTreeSet::from([format!("{}", n)]),
+                    field_b: BTreeSet::from([format!("{}", n)]),
+                })
+                .collect::<Vec<_>>();
+
+            // First, we create the file with all the versioned changesets!
+            let mut db = Store::<TestChangeSetV2, VersionedTestChangeSet>::create_new(
+                &TEST_MAGIC_BYTES,
+                &file_path,
+            )
+            .unwrap();
+            for changeset in &changesets {
+                db.append_changeset(changeset).unwrap();
+            }
+
+            // Get the aggregated changeset with the new code
+            db.iter_changesets()
+                .map(|r| r.expect("must read valid changeset"))
+                .fold(TestChangeSetV2::default(), |mut acc, v| {
+                    Merge::merge(&mut acc, v);
+                    acc
+                })
+        };
+
+        let old_code_aggregation = {
+            #[derive(serde::Serialize, serde::Deserialize)]
+            enum VersionedTestChangeSet {
+                V1(TestChangeSetV1),
+            }
+
+            impl Default for VersionedTestChangeSet {
+                fn default() -> Self {
+                    VersionedTestChangeSet::V1(TestChangeSetV1::default())
+                }
+            }
+
+            impl From<VersionedTestChangeSet> for TestChangeSetV1 {
+                fn from(versioned_change_set: VersionedTestChangeSet) -> Self {
+                    match versioned_change_set {
+                        VersionedTestChangeSet::V1(changeset) => changeset,
+                    }
+                }
+            }
+
+            impl From<TestChangeSetV1> for VersionedTestChangeSet {
+                fn from(test_change_set: TestChangeSetV1) -> Self {
+                    VersionedTestChangeSet::V1(test_change_set)
+                }
+            }
+
+            // We re-open the file and read all the versioned changesets using the "old"
+            // VersionedTestChangeSet
+            let mut db = Store::<TestChangeSetV1, VersionedTestChangeSet>::open(
+                &TEST_MAGIC_BYTES,
+                &file_path,
+            )
+            .unwrap();
+            // Merge all the versioned changesets into the aggregated changeset
+            db.iter_changesets()
+                .map(|r| r.expect("must read valid changeset"))
+                .fold(TestChangeSetV1::default(), |mut acc, v| {
+                    Merge::merge(&mut acc, v);
+                    acc
+                })
+        };
+
+        assert_eq!(new_code_aggregation.field_a, old_code_aggregation.field_a);
+    }
 }
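
Note: the sketch below is illustrative and not part of the patch. It isolates the entry framing this change introduces: each changeset is written as a varint size prefix followed by the bincode-encoded versioned changeset, and the prefix is what lets `EntryIter` skip the trailing bytes of a newer, larger variant when it falls back to decoding an older one. It assumes bincode 1.x with serde's derive feature; `VersionedChangeSet` and `opts` are hypothetical stand-ins for a caller-defined versioned enum and the crate-private `bincode_options()` helper.

```rust
use bincode::Options;

// Hypothetical single-version enum standing in for a caller-defined type.
#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
enum VersionedChangeSet {
    V1(Vec<String>),
}

// Mirrors the crate's `bincode_options()` helper: varint int encoding.
fn opts() -> impl bincode::Options {
    bincode::DefaultOptions::new().with_varint_encoding()
}

fn main() -> Result<(), bincode::Error> {
    let changeset = VersionedChangeSet::V1(vec!["a".into(), "b".into()]);

    // Write side (what `append_changeset` now does): prefix the entry with its
    // serialized size, then write the versioned changeset itself.
    let mut buf = Vec::new();
    let size = opts().serialized_size(&changeset)?;
    opts().serialize_into(&mut buf, &size)?;
    opts().serialize_into(&mut buf, &changeset)?;

    // Read side (the happy path of `EntryIter::next`): read the size prefix,
    // then decode at most `size` bytes as the versioned type. On a fallback
    // decode, the same prefix bounds how many bytes must be seeked past.
    let mut cursor = std::io::Cursor::new(&buf);
    let size: u64 = opts().deserialize_from(&mut cursor)?;
    let decoded: VersionedChangeSet = opts().with_limit(size).deserialize_from(&mut cursor)?;
    assert_eq!(decoded, changeset);
    Ok(())
}
```

Because every entry is self-delimiting, an old reader that does not know the newest enum variants can still decode the variants it understands and skip the rest, which is exactly what the `decode_multiple_vw_enum_variant_index_not_in_vr_...` test above exercises.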