Skip to content

Commit

Permalink
add snapshot to libsql-replication
Browse files Browse the repository at this point in the history
  • Loading branch information
MarinPostma committed Oct 22, 2023
1 parent 0cddb99 commit 69ef4b8
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 1 deletion.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions libsql-replication/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ tokio-stream = "0.1.14"
async-trait = "0.1.74"
uuid = { version = "1.5.0", features = ["v4"] }
tokio-util = "0.7.9"
async-stream = "0.3.5"

[dev-dependencies]
arbitrary = { version = "1.3.0", features = ["derive_arbitrary"] }
Expand Down
1 change: 1 addition & 0 deletions libsql-replication/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod injector;
pub mod meta;
pub mod replicator;
pub mod rpc;
pub mod snapshot;

mod error;

Expand Down
86 changes: 86 additions & 0 deletions libsql-replication/src/snapshot.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use std::io::SeekFrom;

Check failure on line 1 in libsql-replication/src/snapshot.rs

View workflow job for this annotation

GitHub Actions / Run Checks

Diff in /home/runner/work/libsql/libsql/libsql-replication/src/snapshot.rs
use std::mem::MaybeUninit;
use std::mem::size_of;
use std::path::Path;

use tokio::fs::File;
use bytemuck::{pod_read_unaligned, Pod, Zeroable};
use tokio::io::AsyncReadExt;
use tokio::io::AsyncSeekExt;
use tokio_stream::Stream;
use tokio_stream::StreamExt;

use crate::frame::Frame;
use crate::frame::FrameBorrowed;
use crate::frame::FrameNo;

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
#[error("Invalid snapshot file")]
InvalidSnapshot,
}

#[derive(Debug, Copy, Clone, Zeroable, Pod, PartialEq, Eq)]
#[repr(C)]
pub struct SnapshotFileHeader {
/// id of the database
pub log_id: u128,
/// first frame in the snapshot
pub start_frame_no: u64,
/// end frame in the snapshot
pub end_frame_no: u64,
/// number of frames in the snapshot
pub frame_count: u64,
/// safe of the database after applying the snapshot
pub size_after: u32,
pub _pad: u32,
}

pub struct SnapshotFile {
file: File,
header: SnapshotFileHeader,
}

impl SnapshotFile {
pub async fn open(path: &Path) -> Result<Self, Error> {
let mut file = File::open(path).await?;
let mut header_buf = [0; size_of::<SnapshotFileHeader>()];
file.read_exact(&mut header_buf).await?;
let header: SnapshotFileHeader = pod_read_unaligned(&header_buf);

Ok(Self { file, header })
}

pub fn into_stream(mut self) -> impl Stream<Item = Result<Frame, Error>> {
async_stream::try_stream! {
let mut previous_frame_no = None;
self.file.seek(SeekFrom::Start(size_of::<FrameBorrowed>() as _)).await?;
for _ in 0..self.header.frame_count {
let mut frame: MaybeUninit<FrameBorrowed> = MaybeUninit::uninit();
let buf = unsafe { std::slice::from_raw_parts_mut(frame.as_mut_ptr() as *mut u8, size_of::<FrameBorrowed>()) };
self.file.read_exact(buf).await?;
let frame = unsafe { frame.assume_init() };

if previous_frame_no.is_none() {
previous_frame_no = Some(frame.header().frame_no);
} else if previous_frame_no.unwrap() <= frame.header().frame_no {
// frames in snapshot must be in reverse ordering
Err(Error::InvalidSnapshot)?;
} else {
previous_frame_no = Some(frame.header().frame_no);
}

yield Frame::from(frame)
}
}
}

pub fn into_stream_from(self, from: FrameNo) -> impl Stream<Item = Result<Frame, Error>> {
self.into_stream().take_while(move |f| match f {
Ok(f) => f.header().frame_no >= from,
Err(_) => true,
})
}
}

0 comments on commit 69ef4b8

Please sign in to comment.