Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce backwards/forwards compatibility for bdk_file_store. #1527

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 82 additions & 13 deletions crates/file_store/src/entry_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@ use crate::bincode_options;
/// iterator will yield a `Result::Err(_)` instead and then `None` for the next call to `next`.
///
/// [`next`]: Self::next
pub struct EntryIter<'t, T> {
pub struct EntryIter<'t, T1, T2 = T1> {
/// Buffered reader around the file
db_file: BufReader<&'t mut File>,
finished: bool,
/// The file position for the first read of `db_file`.
start_pos: Option<u64>,
types: PhantomData<T>,
types: PhantomData<(T1, T2)>,
}

impl<'t, T> EntryIter<'t, T> {
impl<'t, T1, T2> EntryIter<'t, T1, T2> {
pub fn new(start_pos: u64, db_file: &'t mut File) -> Self {
Self {
db_file: BufReader::new(db_file),
Expand All @@ -33,11 +33,12 @@ impl<'t, T> EntryIter<'t, T> {
}
}

impl<'t, T> Iterator for EntryIter<'t, T>
impl<'t, T1, T2> Iterator for EntryIter<'t, T1, T2>
where
T: serde::de::DeserializeOwned,
T1: serde::de::DeserializeOwned + Default + serde::Serialize,
T2: serde::de::DeserializeOwned + Into<T1>,
{
type Item = Result<T, IterError>;
type Item = Result<T1, IterError>;

fn next(&mut self) -> Option<Self::Item> {
if self.finished {
Expand All @@ -48,10 +49,79 @@ where
self.db_file.seek(io::SeekFrom::Start(start))?;
}

let pos_before_read = self.db_file.stream_position()?;
match bincode_options().deserialize_from(&mut self.db_file) {
Ok(changeset) => Ok(Some(changeset)),
Err(e) => {
let mut changeset_size: u64 = 0;
let mut pos_before_read = self.db_file.stream_position()?;
bincode_options()
.deserialize_from::<_, u64>(&mut self.db_file)
.and_then(|size| {
// Save changeset size to use in case a partial read is needed
changeset_size = size;
pos_before_read = self.db_file.stream_position()?;
bincode_options()
.with_limit(size)
.deserialize_from::<_, T1>(&mut self.db_file)
})
.or_else(|e| {
let serialized_max_variant = bincode_options().serialize(&T1::default())?;
// allow trailing bytes because we are serializing the variant, not an u64, and
// we want to extract the varint index prefixed
let max_variant_index = bincode_options()
.allow_trailing_bytes()
.deserialize::<u64>(&serialized_max_variant)?;

match &*e {
bincode::ErrorKind::Custom(e) if e.contains("expected variant index") => {
self.db_file.seek(io::SeekFrom::Start(pos_before_read))?;
pos_before_read = self.db_file.stream_position()?;
let actual_index =
bincode_options().deserialize_from::<_, u64>(&mut self.db_file)?;
// This will always happen as the `expected variant index` only will be
// risen when actual_index > max_variant_index
if actual_index > max_variant_index {
pos_before_read = self.db_file.stream_position()?;
let fallback_decoded_value = bincode_options()
.deserialize_from::<_, T2>(&mut self.db_file)
.map(Into::into);

if let Ok(v) = &fallback_decoded_value {
let actual_read_size = bincode_options().serialized_size(v)?;
let remaining_bytes_to_read =
(changeset_size - actual_read_size) as i64;
// Ignore remaining bytes
self.db_file
.seek(io::SeekFrom::Current(remaining_bytes_to_read))?;
}

return fallback_decoded_value;
}
}
bincode::ErrorKind::InvalidTagEncoding(index) => {
if *index as u64 > max_variant_index {
pos_before_read = self.db_file.stream_position()?;

let fallback_decoded_value = bincode_options()
.deserialize_from::<_, T2>(&mut self.db_file)
.map(Into::into);

if let Ok(v) = &fallback_decoded_value {
let actual_read_size = bincode_options().serialized_size(v)?;
let remaining_bytes_to_read =
(changeset_size - actual_read_size) as i64;
// Ignore remaining bytes
self.db_file
.seek(io::SeekFrom::Current(remaining_bytes_to_read))?;
}

return fallback_decoded_value;
}
}
_ => (),
};

Err(e)
})
.map(|changeset| Some(changeset))
.or_else(|e| {
self.finished = true;
let pos_after_read = self.db_file.stream_position()?;
// allow unexpected EOF if 0 bytes were read
Expand All @@ -64,14 +134,13 @@ where
}
self.db_file.seek(io::SeekFrom::Start(pos_before_read))?;
Err(IterError::Bincode(*e))
}
}
})
})()
.transpose()
}
}

impl<'t, T> Drop for EntryIter<'t, T> {
impl<'t, T1, T2> Drop for EntryIter<'t, T1, T2> {
fn drop(&mut self) {
// This syncs the underlying file's offset with the buffer's position. This way, we
// maintain the correct position to start the next read/write.
Expand Down
Loading
Loading