diff --git a/bottomless-cli/Cargo.toml b/bottomless-cli/Cargo.toml
index 41a850d8f7..593818951b 100644
--- a/bottomless-cli/Cargo.toml
+++ b/bottomless-cli/Cargo.toml
@@ -11,10 +11,12 @@ description = "Command-line interface for bottomless replication for libSQL"
 
 [dependencies]
 anyhow = "1.0.66"
+async-compression = { version = "0.4.4", features = ["tokio", "gzip", "zstd"] }
 aws-config = "0.55"
 aws-sdk-s3 = "0.28"
 aws-smithy-types = "0.55"
 bottomless = { version = "0", path = "../bottomless" }
+bytes = "1"
 chrono = "0.4.23"
 clap = { version = "4.0.29", features = ["derive"] }
 tokio = { version = "1.23.0", features = ["macros", "rt", "rt-multi-thread"] }
diff --git a/bottomless-cli/src/main.rs b/bottomless-cli/src/main.rs
index e8e0703617..28a7ed8896 100644
--- a/bottomless-cli/src/main.rs
+++ b/bottomless-cli/src/main.rs
@@ -71,6 +71,8 @@ enum Commands {
             long_help = "UTC timestamp which is an upper bound for the transactions to be restored."
         )]
         utc_time: Option<NaiveDateTime>,
+        #[clap(long, short, conflicts_with_all = ["generation", "utc_time"], long_help = "Restore from a local directory")]
+        from_dir: Option<PathBuf>,
     },
     #[clap(about = "Verify integrity of the database")]
     Verify {
@@ -107,6 +109,51 @@ async fn run() -> Result<()> {
     tracing_subscriber::fmt::init();
     let mut options = Cli::parse();
 
+    if let Commands::Restore {
+        generation: _,
+        utc_time: _,
+        from_dir: Some(from_dir),
+    } = options.command
+    {
+        let database = match &options.database {
+            Some(database) => database,
+            None => {
+                println!("Please pass the database name with -d option");
+                return Ok(());
+            }
+        };
+        println!("trying to restore from {}", from_dir.display());
+        let mut db_file = tokio::fs::File::create(database).await?;
+        let (page_size, checksum) = match Replicator::get_local_metadata(&from_dir).await {
+            Ok(Some((page_size, checksum))) => (page_size, checksum),
+            Ok(None) => {
+                println!("No local metadata found, continuing anyway");
+                (4096, 0)
+            }
+            Err(e) => {
+                println!("Failed to get local metadata: {e}, continuing anyway");
+                (4096, 0)
+            }
+        };
+        println!("Local metadata: page_size={page_size}, checksum={checksum:x}");
+        Replicator::restore_from_local_snapshot(&from_dir, &mut db_file).await?;
+        println!("Restored local snapshot to {}", database);
+        let applied_frames = Replicator::apply_wal_from_local_generation(
+            &from_dir,
+            &mut db_file,
+            page_size,
+            checksum,
+        )
+        .await?;
+        println!("Applied {applied_frames} frames from local generation");
+        if let Err(e) = verify_db(&PathBuf::from(database)) {
+            println!("Verification failed: {e}");
+            std::process::exit(1)
+        }
+        println!("Verification: ok");
+        return Ok(());
+    }
+
     if let Some(ep) = options.endpoint.as_deref() {
         std::env::set_var("LIBSQL_BOTTOMLESS_ENDPOINT", ep)
     } else {
@@ -166,6 +213,7 @@ async fn run() -> Result<()> {
         Commands::Restore {
             generation,
             utc_time,
+            ..
         } => {
             tokio::fs::create_dir_all(&database_dir).await?;
             client.restore(generation, utc_time).await?;
diff --git a/bottomless-cli/src/replicator_extras.rs b/bottomless-cli/src/replicator_extras.rs
index 002ae00350..85df761be0 100644
--- a/bottomless-cli/src/replicator_extras.rs
+++ b/bottomless-cli/src/replicator_extras.rs
@@ -264,4 +264,170 @@ impl Replicator {
         self.print_snapshot_summary(&generation).await?;
         Ok(())
     }
+
+    pub async fn restore_from_local_snapshot(
+        from_dir: impl AsRef<std::path::Path>,
+        db: &mut tokio::fs::File,
+    ) -> Result<bool> {
+        let from_dir = from_dir.as_ref();
+        use bottomless::replicator::CompressionKind;
+        use tokio::io::AsyncWriteExt;
+
+        let algos_to_try = &[
+            CompressionKind::Gzip,
+            CompressionKind::Zstd,
+            CompressionKind::None,
+        ];
+
+        for algo in algos_to_try {
+            let main_db_path = match algo {
+                CompressionKind::None => from_dir.join("db.db"),
+                CompressionKind::Gzip => from_dir.join("db.gz"),
+                CompressionKind::Zstd => from_dir.join("db.zstd"),
+            };
+            if let Ok(mut db_file) = tokio::fs::File::open(&main_db_path).await {
+                let db_size = match algo {
+                    CompressionKind::None => tokio::io::copy(&mut db_file, db).await?,
+                    CompressionKind::Gzip => {
+                        let mut decompress_reader =
+                            async_compression::tokio::bufread::GzipDecoder::new(
+                                tokio::io::BufReader::new(db_file),
+                            );
+                        tokio::io::copy(&mut decompress_reader, db).await?
+                    }
+                    CompressionKind::Zstd => {
+                        let mut decompress_reader =
+                            async_compression::tokio::bufread::ZstdDecoder::new(
+                                tokio::io::BufReader::new(db_file),
+                            );
+                        tokio::io::copy(&mut decompress_reader, db).await?
+                    }
+                };
+                db.flush().await?;
+
+                tracing::info!("Restored the main database file ({} bytes)", db_size);
+                return Ok(true);
+            }
+        }
+        Ok(false)
+    }
+
+    pub async fn apply_wal_from_local_generation(
+        from_dir: impl AsRef<std::path::Path>,
+        db: &mut tokio::fs::File,
+        page_size: u32,
+        checksum: u64,
+    ) -> Result<u32> {
+        use bottomless::transaction_cache::TransactionPageCache;
+        use tokio::io::AsyncWriteExt;
+
+        const SWAP_AFTER: u32 = 65536;
+        const TMP_RESTORE_DIR: &str = ".bottomless.restore.tmp";
+
+        let from_dir = from_dir.as_ref();
+        let mut page_buf = {
+            let mut v = Vec::with_capacity(page_size as usize);
+            v.spare_capacity_mut();
+            unsafe { v.set_len(page_size as usize) };
+            v
+        };
+
+        let objs = {
+            let mut objs = Vec::new();
+            let mut dir = tokio::fs::read_dir(from_dir).await.unwrap();
+            while let Some(entry) = dir.next_entry().await.unwrap() {
+                let path = entry.path();
+                if let Some(file_name) = path.file_name() {
+                    if let Some(file_name) = file_name.to_str() {
+                        if file_name.ends_with(".gz")
+                            || file_name.ends_with(".zstd")
+                            || file_name.ends_with(".raw")
+                        {
+                            objs.push(path);
+                        }
+                    }
+                }
+            }
+            objs.sort();
+            objs.into_iter()
+        };
+
+        let mut last_received_frame_no = 0;
+        let mut pending_pages =
+            TransactionPageCache::new(SWAP_AFTER, page_size, TMP_RESTORE_DIR.into());
+
+        let mut checksum: Option<u64> = Some(checksum);
+        for obj in objs {
+            let key = obj.file_name().unwrap().to_str().unwrap();
+            tracing::debug!("Loading {}", key);
+
+            let (first_frame_no, _last_frame_no, _timestamp, compression_kind) =
+                match bottomless::replicator::Replicator::parse_frame_range(&format!("/{key}")) {
+                    Some(result) => result,
+                    None => {
+                        if key != "db.gz" && key != "db.zstd" && key != "db.db" {
+                            tracing::warn!("Failed to parse frame/page from key {}", key);
+                        }
+                        continue;
+                    }
+                };
+            if first_frame_no != last_received_frame_no + 1 {
+                tracing::warn!("Missing series of consecutive frames. Last applied frame: {}, next found: {}. Stopping the restoration process",
+                    last_received_frame_no, first_frame_no);
+                break;
+            }
+            // read frame from the file - from_dir and `obj` dir entry compose the path to it
+            let frame = tokio::fs::File::open(&obj).await?;
+
+            let mut frameno = first_frame_no;
+            let mut reader = bottomless::read::BatchReader::new(
+                frameno,
+                frame,
+                page_size as usize,
+                compression_kind,
+            );
+
+            while let Some(frame) = reader.next_frame_header().await? {
+                let pgno = frame.pgno();
+                reader.next_page(&mut page_buf).await?;
+                if let Some(ck) = checksum {
+                    checksum = match frame.verify(ck, &page_buf) {
+                        Ok(checksum) => Some(checksum),
+                        Err(e) => {
+                            println!("ERROR: failed to verify checksum of page {pgno}: {e}, continuing anyway. Checksum will no longer be validated");
+                            tracing::error!("Failed to verify checksum of page {pgno}: {e}, continuing anyway. Checksum will no longer be validated");
+                            None
+                        }
+                    };
+                }
+                pending_pages.insert(pgno, &page_buf).await?;
+                if frame.is_committed() {
+                    let pending_pages = std::mem::replace(
+                        &mut pending_pages,
+                        TransactionPageCache::new(SWAP_AFTER, page_size, TMP_RESTORE_DIR.into()),
+                    );
+                    pending_pages.flush(db).await?;
+                }
+                frameno += 1;
+                last_received_frame_no += 1;
+            }
+            db.flush().await?;
+        }
+        Ok(last_received_frame_no)
+    }
+
+    pub async fn get_local_metadata(
+        from_dir: impl AsRef<std::path::Path>,
+    ) -> Result<Option<(u32, u64)>> {
+        use bytes::Buf;
+
+        if let Ok(data) = tokio::fs::read(from_dir.as_ref().join(".meta")).await {
+            let mut data = bytes::Bytes::from(data);
+            let page_size = data.get_u32();
+            let crc = data.get_u64();
+            Ok(Some((page_size, crc)))
+        } else {
+            Ok(None)
+        }
+    }
 }
diff --git a/bottomless/Cargo.toml b/bottomless/Cargo.toml
index 8fea6fb8ca..82f55bebef 100644
--- a/bottomless/Cargo.toml
+++ b/bottomless/Cargo.toml
@@ -23,6 +23,7 @@ arc-swap = "1.6"
 chrono = "0.4.23"
 uuid = "1.4.1"
 rand = "0.8.5"
+futures-core = "0.3.29"
 
 [features]
 libsql_linked_statically = []
diff --git a/bottomless/src/lib.rs b/bottomless/src/lib.rs
index 833b1ec809..d2ed880987 100644
--- a/bottomless/src/lib.rs
+++ b/bottomless/src/lib.rs
@@ -5,11 +5,11 @@ mod ffi;
 
 mod backup;
-mod read;
+pub mod read;
 pub mod replicator;
-mod transaction_cache;
+pub mod transaction_cache;
 pub mod uuid_utils;
-mod wal;
+pub mod wal;
 
 use crate::ffi::{
     bottomless_methods, libsql_wal_methods, sqlite3, sqlite3_file, sqlite3_vfs, PgHdr, Wal,
 };
diff --git a/bottomless/src/read.rs b/bottomless/src/read.rs
index 5f53532cac..7cdfbc12fe 100644
--- a/bottomless/src/read.rs
+++ b/bottomless/src/read.rs
@@ -2,15 +2,13 @@ use crate::replicator::CompressionKind;
 use crate::wal::WalFrameHeader;
 use anyhow::Result;
 use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder};
-use aws_sdk_s3::primitives::ByteStream;
 use std::io::ErrorKind;
 use std::pin::Pin;
 use tokio::io::{AsyncRead, AsyncReadExt, BufReader};
-use tokio_util::io::StreamReader;
 
 type AsyncByteReader = dyn AsyncRead + Send + Sync;
 
-pub(crate) struct BatchReader {
+pub struct BatchReader {
     reader: Pin<Box<AsyncByteReader>>,
     next_frame_no: u32,
 }
@@ -18,12 +16,11 @@ impl BatchReader {
     pub fn new(
         init_frame_no: u32,
-        content: ByteStream,
+        content_stream: impl AsyncRead + Send + Sync + 'static,
         page_size: usize,
         use_compression: CompressionKind,
     ) -> Self {
-        let reader =
-            BufReader::with_capacity(page_size + WalFrameHeader::SIZE, StreamReader::new(content));
+        let reader = BufReader::with_capacity(page_size + WalFrameHeader::SIZE, content_stream);
         BatchReader {
             next_frame_no: init_frame_no,
             reader: match use_compression {
                 CompressionKind::None => Box::pin(reader),
@@ -41,7 +38,7 @@ impl BatchReader {
     }
 
     /// Reads next frame header without frame body (WAL page).
-    pub(crate) async fn next_frame_header(&mut self) -> Result<Option<WalFrameHeader>> {
+    pub async fn next_frame_header(&mut self) -> Result<Option<WalFrameHeader>> {
         let mut buf = [0u8; WalFrameHeader::SIZE];
         let res = self.reader.read_exact(&mut buf).await;
         match res {
@@ -53,7 +50,7 @@ impl BatchReader {
 
     /// Reads the next frame stored in a current batch.
     /// Returns a frame number or `None` if no frame was remaining in the buffer.
-    pub(crate) async fn next_page(&mut self, page_buf: &mut [u8]) -> Result<()> {
+    pub async fn next_page(&mut self, page_buf: &mut [u8]) -> Result<()> {
         self.reader.read_exact(page_buf).await?;
         self.next_frame_no += 1;
         Ok(())
diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs
index 9b3613e0ff..0da2dae4be 100644
--- a/bottomless/src/replicator.rs
+++ b/bottomless/src/replicator.rs
@@ -381,6 +381,10 @@ impl Replicator {
         self.last_sent_frame_no.load(Ordering::Acquire)
     }
 
+    pub fn compression_kind(&self) -> CompressionKind {
+        self.use_compression
+    }
+
     pub async fn wait_until_snapshotted(&mut self) -> Result<bool> {
         if let Ok(generation) = self.generation() {
             if !self.main_db_exists_and_not_empty().await {
@@ -963,7 +967,7 @@ impl Replicator {
 
     // Parses the frame and page number from given key.
     // Format: <db-name>-<generation>/<first-frame-no>-<last-frame-no>-<timestamp>.<compression-kind>
-    fn parse_frame_range(key: &str) -> Option<(u32, u32, u64, CompressionKind)> {
+    pub fn parse_frame_range(key: &str) -> Option<(u32, u32, u64, CompressionKind)> {
         let frame_delim = key.rfind('/')?;
         let frame_suffix = &key[(frame_delim + 1)..];
         let timestamp_delim = frame_suffix.rfind('-')?;
@@ -1329,8 +1333,12 @@ impl Replicator {
         }
         let frame = self.get_object(key.into()).send().await?;
         let mut frameno = first_frame_no;
-        let mut reader =
-            BatchReader::new(frameno, frame.body, self.page_size, compression_kind);
+        let mut reader = BatchReader::new(
+            frameno,
+            tokio_util::io::StreamReader::new(frame.body),
+            self.page_size,
+            compression_kind,
+        );
 
         while let Some(frame) = reader.next_frame_header().await? {
             let pgno = frame.pgno();
diff --git a/bottomless/src/transaction_cache.rs b/bottomless/src/transaction_cache.rs
index 78fdc83982..b9e5ffd7cd 100644
--- a/bottomless/src/transaction_cache.rs
+++ b/bottomless/src/transaction_cache.rs
@@ -7,7 +7,7 @@ use tokio::fs::{File, OpenOptions};
 use tokio::io::{AsyncSeekExt, AsyncWriteExt};
 
 #[derive(Debug)]
-pub(crate) struct TransactionPageCache {
+pub struct TransactionPageCache {
     /// Threshold (in pages) after which, the cache will start flushing pages on disk.
     swap_after_pages: u32,
     page_size: u32,
diff --git a/bottomless/src/wal.rs b/bottomless/src/wal.rs
index 25ab5db396..9b2e008d01 100644
--- a/bottomless/src/wal.rs
+++ b/bottomless/src/wal.rs
@@ -6,7 +6,7 @@ use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWrite};
 
 #[repr(transparent)]
 #[derive(Debug, Clone, Eq, PartialEq)]
-pub(crate) struct WalFrameHeader([u8; WalFrameHeader::SIZE]);
+pub struct WalFrameHeader([u8; WalFrameHeader::SIZE]);
 
 impl WalFrameHeader {
     pub const SIZE: usize = 24;
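
With `BatchReader::new` now generic over any `AsyncRead` source instead of being tied to the S3 `ByteStream` type, the same frame-decoding path serves both remote restores (which wrap the S3 body in `tokio_util::io::StreamReader`, as in replicator.rs above) and plain local files. Below is a minimal sketch of driving the newly public API against a single local frame-batch file; `dump_batch`, its arguments, and the hard-coded `CompressionKind::Gzip` are illustrative assumptions, not part of this change. Real callers derive the first frame number and compression from the file name via `Replicator::parse_frame_range`, and the page size from `.meta`, as the CLI code above does.

    use bottomless::read::BatchReader;
    use bottomless::replicator::CompressionKind;

    // Sketch: walk one gzip-compressed batch file and print each frame's
    // page number and commit flag. Assumes an anyhow-based Result like the
    // surrounding crates use.
    async fn dump_batch(frames_path: &std::path::Path, page_size: usize) -> anyhow::Result<()> {
        let file = tokio::fs::File::open(frames_path).await?;
        // `1` is a placeholder first-frame number; BatchReader only uses it
        // to track `next_frame_no` while reading.
        let mut reader = BatchReader::new(1, file, page_size, CompressionKind::Gzip);
        let mut page_buf = vec![0u8; page_size];
        while let Some(frame) = reader.next_frame_header().await? {
            reader.next_page(&mut page_buf).await?;
            println!("pgno={} committed={}", frame.pgno(), frame.is_committed());
        }
        Ok(())
    }

End to end, the new CLI path should be reachable with an invocation along the lines of `bottomless-cli -d restored.db restore --from-dir <generation-dir>` (flag names follow from the clap derive above; `--from-dir` conflicts with `--generation` and `--utc-time`).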