Skip to content

Commit

Permalink
bottomless-cli: add restoring from a directory
Browse files Browse the repository at this point in the history
Convenience function for manual restoration.
  • Loading branch information
psarna committed Nov 9, 2023
1 parent de02b00 commit 4a71b20
Show file tree
Hide file tree
Showing 9 changed files with 238 additions and 16 deletions.
2 changes: 2 additions & 0 deletions bottomless-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ description = "Command-line interface for bottomless replication for libSQL"

[dependencies]
anyhow = "1.0.66"
async-compression = { version = "0.4.4", features = ["tokio", "gzip", "zstd"] }
aws-config = "0.55"
aws-sdk-s3 = "0.28"
aws-smithy-types = "0.55"
bottomless = { version = "0", path = "../bottomless" }
bytes = "1"
chrono = "0.4.23"
clap = { version = "4.0.29", features = ["derive"] }
tokio = { version = "1.23.0", features = ["macros", "rt", "rt-multi-thread"] }
Expand Down
48 changes: 48 additions & 0 deletions bottomless-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ enum Commands {
long_help = "UTC timestamp which is an upper bound for the transactions to be restored."
)]
utc_time: Option<NaiveDateTime>,
#[clap(long, short, conflicts_with_all = ["generation", "utc_time"], long_help = "Restore from a local directory")]
from_dir: Option<PathBuf>,
},
#[clap(about = "Verify integrity of the database")]
Verify {
Expand Down Expand Up @@ -107,6 +109,51 @@ async fn run() -> Result<()> {
tracing_subscriber::fmt::init();
let mut options = Cli::parse();

if let Commands::Restore {
generation: _,
utc_time: _,
from_dir: Some(from_dir),
} = options.command
{
let database = match &options.database {
Some(database) => database,
None => {
println!("Please pass the database name with -d option");
return Ok(());
}
};
println!("trying to restore from {}", from_dir.display());
let mut db_file = tokio::fs::File::create(database).await?;
let (page_size, checksum) = match Replicator::get_local_metadata(&from_dir).await {
Ok(Some((page_size, checksum))) => (page_size, checksum),
Ok(None) => {
println!("No local metadata found, continuing anyway");
(4096, 0)
}
Err(e) => {
println!("Failed to get local metadata: {e}, continuing anyway");
(4096, 0)
}
};
println!("Local metadata: page_size={page_size}, checksum={checksum:x}");
Replicator::restore_from_local_snapshot(&from_dir, &mut db_file).await?;
println!("Restored local snapshot to {}", database);
let applied_frames = Replicator::apply_wal_from_local_generation(
&from_dir,
&mut db_file,
page_size,
checksum,
)
.await?;
println!("Applied {applied_frames} frames from local generation");
if let Err(e) = verify_db(&PathBuf::from(database)) {
println!("Verification failed: {e}");
std::process::exit(1)
}
println!("Verification: ok");
return Ok(());
}

if let Some(ep) = options.endpoint.as_deref() {
std::env::set_var("LIBSQL_BOTTOMLESS_ENDPOINT", ep)
} else {
Expand Down Expand Up @@ -166,6 +213,7 @@ async fn run() -> Result<()> {
Commands::Restore {
generation,
utc_time,
..
} => {
tokio::fs::create_dir_all(&database_dir).await?;
client.restore(generation, utc_time).await?;
Expand Down
166 changes: 166 additions & 0 deletions bottomless-cli/src/replicator_extras.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,4 +264,170 @@ impl Replicator {
self.print_snapshot_summary(&generation).await?;
Ok(())
}

pub async fn restore_from_local_snapshot(
from_dir: impl AsRef<std::path::Path>,
db: &mut tokio::fs::File,
) -> Result<bool> {
let from_dir = from_dir.as_ref();
use bottomless::replicator::CompressionKind;
use tokio::io::AsyncWriteExt;

let algos_to_try = &[
CompressionKind::Gzip,
CompressionKind::Zstd,
CompressionKind::None,
];

for algo in algos_to_try {
let main_db_path = match algo {
CompressionKind::None => from_dir.join("db.db"),
CompressionKind::Gzip => from_dir.join("db.gz"),
CompressionKind::Zstd => from_dir.join("db.zstd"),
};
if let Ok(mut db_file) = tokio::fs::File::open(&main_db_path).await {
let db_size = match algo {
CompressionKind::None => tokio::io::copy(&mut db_file, db).await?,
CompressionKind::Gzip => {
let mut decompress_reader =
async_compression::tokio::bufread::GzipDecoder::new(
tokio::io::BufReader::new(db_file),
);
tokio::io::copy(&mut decompress_reader, db).await?
}
CompressionKind::Zstd => {
let mut decompress_reader =
async_compression::tokio::bufread::ZstdDecoder::new(
tokio::io::BufReader::new(db_file),
);
tokio::io::copy(&mut decompress_reader, db).await?
}
};
db.flush().await?;

tracing::info!("Restored the main database file ({} bytes)", db_size);
return Ok(true);
}
}
Ok(false)
}

pub async fn apply_wal_from_local_generation(
from_dir: impl AsRef<std::path::Path>,
db: &mut tokio::fs::File,
page_size: u32,
checksum: u64,
) -> Result<u32> {
use bottomless::transaction_cache::TransactionPageCache;
use tokio::io::AsyncWriteExt;

const SWAP_AFTER: u32 = 65536;
const TMP_RESTORE_DIR: &str = ".bottomless.restore.tmp";

let from_dir = from_dir.as_ref();
let mut page_buf = {
let mut v = Vec::with_capacity(page_size as usize);
v.spare_capacity_mut();
unsafe { v.set_len(page_size as usize) };
v
};

let objs = {
let mut objs = Vec::new();
let mut dir = tokio::fs::read_dir(from_dir).await.unwrap();
while let Some(entry) = dir.next_entry().await.unwrap() {
let path = entry.path();
if let Some(file_name) = path.file_name() {
if let Some(file_name) = file_name.to_str() {
if file_name.ends_with(".gz")
|| file_name.ends_with(".zstd")
|| file_name.ends_with(".raw")
{
objs.push(path);
}
}
}
}
objs.sort();
objs.into_iter()
};

let mut last_received_frame_no = 0;
let mut pending_pages =
TransactionPageCache::new(SWAP_AFTER, page_size, TMP_RESTORE_DIR.into());

let mut checksum: Option<u64> = Some(checksum);
for obj in objs {
let key = obj.file_name().unwrap().to_str().unwrap();
tracing::debug!("Loading {}", key);

let (first_frame_no, _last_frame_no, _timestamp, compression_kind) =
match bottomless::replicator::Replicator::parse_frame_range(&format!("/{key}")) {
Some(result) => result,
None => {
if key != "db.gz" && key != "db.zstd" && key != "db.db" {
tracing::warn!("Failed to parse frame/page from key {}", key);
}
continue;
}
};
if first_frame_no != last_received_frame_no + 1 {
tracing::warn!("Missing series of consecutive frames. Last applied frame: {}, next found: {}. Stopping the restoration process",
last_received_frame_no, first_frame_no);
break;
}
// read frame from the file - from_dir and `obj` dir entry compose the path to it
let frame = tokio::fs::File::open(&obj).await?;

let mut frameno = first_frame_no;
let mut reader = bottomless::read::BatchReader::new(
frameno,
frame,
page_size as usize,
compression_kind,
);

while let Some(frame) = reader.next_frame_header().await? {
let pgno = frame.pgno();
reader.next_page(&mut page_buf).await?;
if let Some(ck) = checksum {
checksum = match frame.verify(ck, &page_buf) {
Ok(checksum) => Some(checksum),
Err(e) => {
println!("ERROR: failed to verify checksum of page {pgno}: {e}, continuing anyway. Checksum will no longer be validated");
tracing::error!("Failed to verify checksum of page {pgno}: {e}, continuing anyway. Checksum will no longer be validated");
None
}
};
}
pending_pages.insert(pgno, &page_buf).await?;
if frame.is_committed() {
let pending_pages = std::mem::replace(
&mut pending_pages,
TransactionPageCache::new(SWAP_AFTER, page_size, TMP_RESTORE_DIR.into()),
);
pending_pages.flush(db).await?;
}
frameno += 1;
last_received_frame_no += 1;
}
db.flush().await?;
}
Ok(last_received_frame_no)
}

pub async fn get_local_metadata(
from_dir: impl AsRef<std::path::Path>,
) -> Result<Option<(u32, u64)>> {
use bytes::Buf;

if let Ok(data) = tokio::fs::read(from_dir.as_ref().join(".meta")).await {
let mut data = bytes::Bytes::from(data);
let page_size = data.get_u32();
let crc = data.get_u64();
Ok(Some((page_size, crc)))
} else {
Ok(None)
}
}
}
1 change: 1 addition & 0 deletions bottomless/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ arc-swap = "1.6"
chrono = "0.4.23"
uuid = "1.4.1"
rand = "0.8.5"
futures-core = "0.3.29"

[features]
libsql_linked_statically = []
Expand Down
6 changes: 3 additions & 3 deletions bottomless/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
mod ffi;

mod backup;
mod read;
pub mod read;
pub mod replicator;
mod transaction_cache;
pub mod transaction_cache;
pub mod uuid_utils;
mod wal;
pub mod wal;

use crate::ffi::{
bottomless_methods, libsql_wal_methods, sqlite3, sqlite3_file, sqlite3_vfs, PgHdr, Wal,
Expand Down
13 changes: 5 additions & 8 deletions bottomless/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,25 @@ use crate::replicator::CompressionKind;
use crate::wal::WalFrameHeader;
use anyhow::Result;
use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder};
use aws_sdk_s3::primitives::ByteStream;
use std::io::ErrorKind;
use std::pin::Pin;
use tokio::io::{AsyncRead, AsyncReadExt, BufReader};
use tokio_util::io::StreamReader;

type AsyncByteReader = dyn AsyncRead + Send + Sync;

pub(crate) struct BatchReader {
pub struct BatchReader {
reader: Pin<Box<AsyncByteReader>>,
next_frame_no: u32,
}

impl BatchReader {
pub fn new(
init_frame_no: u32,
content: ByteStream,
content_stream: impl AsyncRead + Send + Sync + 'static,
page_size: usize,
use_compression: CompressionKind,
) -> Self {
let reader =
BufReader::with_capacity(page_size + WalFrameHeader::SIZE, StreamReader::new(content));
let reader = BufReader::with_capacity(page_size + WalFrameHeader::SIZE, content_stream);
BatchReader {
next_frame_no: init_frame_no,
reader: match use_compression {
Expand All @@ -41,7 +38,7 @@ impl BatchReader {
}

/// Reads next frame header without frame body (WAL page).
pub(crate) async fn next_frame_header(&mut self) -> Result<Option<WalFrameHeader>> {
pub async fn next_frame_header(&mut self) -> Result<Option<WalFrameHeader>> {
let mut buf = [0u8; WalFrameHeader::SIZE];
let res = self.reader.read_exact(&mut buf).await;
match res {
Expand All @@ -53,7 +50,7 @@ impl BatchReader {

/// Reads the next frame stored in a current batch.
/// Returns a frame number or `None` if no frame was remaining in the buffer.
pub(crate) async fn next_page(&mut self, page_buf: &mut [u8]) -> Result<()> {
pub async fn next_page(&mut self, page_buf: &mut [u8]) -> Result<()> {
self.reader.read_exact(page_buf).await?;
self.next_frame_no += 1;
Ok(())
Expand Down
14 changes: 11 additions & 3 deletions bottomless/src/replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,10 @@ impl Replicator {
self.last_sent_frame_no.load(Ordering::Acquire)
}

pub fn compression_kind(&self) -> CompressionKind {
self.use_compression
}

pub async fn wait_until_snapshotted(&mut self) -> Result<bool> {
if let Ok(generation) = self.generation() {
if !self.main_db_exists_and_not_empty().await {
Expand Down Expand Up @@ -963,7 +967,7 @@ impl Replicator {

// Parses the frame and page number from given key.
// Format: <db-name>-<generation>/<first-frame-no>-<last-frame-no>-<timestamp>.<compression-kind>
fn parse_frame_range(key: &str) -> Option<(u32, u32, u64, CompressionKind)> {
pub fn parse_frame_range(key: &str) -> Option<(u32, u32, u64, CompressionKind)> {
let frame_delim = key.rfind('/')?;
let frame_suffix = &key[(frame_delim + 1)..];
let timestamp_delim = frame_suffix.rfind('-')?;
Expand Down Expand Up @@ -1329,8 +1333,12 @@ impl Replicator {
}
let frame = self.get_object(key.into()).send().await?;
let mut frameno = first_frame_no;
let mut reader =
BatchReader::new(frameno, frame.body, self.page_size, compression_kind);
let mut reader = BatchReader::new(
frameno,
tokio_util::io::StreamReader::new(frame.body),
self.page_size,
compression_kind,
);

while let Some(frame) = reader.next_frame_header().await? {
let pgno = frame.pgno();
Expand Down
2 changes: 1 addition & 1 deletion bottomless/src/transaction_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncSeekExt, AsyncWriteExt};

#[derive(Debug)]
pub(crate) struct TransactionPageCache {
pub struct TransactionPageCache {
/// Threshold (in pages) after which, the cache will start flushing pages on disk.
swap_after_pages: u32,
page_size: u32,
Expand Down
2 changes: 1 addition & 1 deletion bottomless/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWrite};

#[repr(transparent)]
#[derive(Debug, Clone, Eq, PartialEq)]
pub(crate) struct WalFrameHeader([u8; WalFrameHeader::SIZE]);
pub struct WalFrameHeader([u8; WalFrameHeader::SIZE]);

impl WalFrameHeader {
pub const SIZE: usize = 24;
Expand Down

0 comments on commit 4a71b20

Please sign in to comment.