Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bottomless: less bugs more robustness #1685

Draft
wants to merge 15 commits into
base: main
Choose a base branch
from
3 changes: 2 additions & 1 deletion bottomless/src/bottomless_wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ impl<T: Wal> WrapWal<T> for BottomlessWalWrapper {

let num_frames =
wrapped.insert_frames(page_size, page_headers, size_after, is_commit, sync_flags)?;
let new_valid_valid_frame_index = wrapped.frames_in_wal();

let mut guard = self.replicator.blocking_lock();
match &mut *guard {
Expand All @@ -83,11 +84,11 @@ impl<T: Wal> WrapWal<T> for BottomlessWalWrapper {
std::process::abort()
}
replicator.register_last_valid_frame(last_valid_frame);
let new_valid_valid_frame_index = wrapped.frames_in_wal();
replicator.submit_frames(new_valid_valid_frame_index - last_valid_frame);
}
None => return Err(Error::new(SQLITE_IOERR_WRITE)),
}
tracing::debug!("inserted {num_frames} with size_after={size_after}, is_commit={is_commit} ({last_valid_frame} -> {new_valid_valid_frame_index})");

Ok(num_frames)
}
Expand Down
1 change: 1 addition & 0 deletions bottomless/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod completion_progress;
pub mod read;
pub mod replicator;
pub mod transaction_cache;
mod utils;
pub mod uuid_utils;
mod wal;

Expand Down
155 changes: 131 additions & 24 deletions bottomless/src/replicator.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::backup::WalCopier;
use crate::completion_progress::{CompletionProgress, SavepointTracker};
use crate::read::BatchReader;
use crate::utils;
use crate::uuid_utils::decode_unix_timestamp;
use crate::wal::WalFileReader;
use anyhow::{anyhow, bail};
Expand Down Expand Up @@ -61,6 +62,7 @@ pub struct Replicator {
pub page_size: usize,
generation: Arc<ArcSwapOption<Uuid>>,
verify_crc: bool,
validate_integrity: bool,
pub bucket: String,
pub db_path: String,
pub db_name: String,
Expand Down Expand Up @@ -93,6 +95,8 @@ pub struct Options {
/// If `true` when restoring, frame checksums will be verified prior to their pages being flushed
/// into the main database file.
pub verify_crc: bool,
/// If `true` when restoring, db integrity will be verified before finishing the process
pub validate_integrity: bool,
/// Kind of compression algorithm used on the WAL frames to be sent to S3.
pub use_compression: CompressionKind,
pub encryption_config: Option<EncryptionConfig>,
Expand Down Expand Up @@ -216,6 +220,17 @@ impl Options {
other
),
};
let validate_integrity = match env_var_or("LIBSQL_BOTTOMLESS_VALIDATE_INTEGRITY", true)
.to_lowercase()
.as_ref()
{
"yes" | "true" | "1" | "y" | "t" => true,
"no" | "false" | "0" | "n" | "f" => false,
other => bail!(
"Invalid LIBSQL_BOTTOMLESS_VALIDATE_INTEGRITY environment variable: {}",
other
),
};
let skip_snapshot = match env_var_or("LIBSQL_BOTTOMLESS_SKIP_SNAPSHOT", false)
.to_lowercase()
.as_ref()
Expand All @@ -240,6 +255,7 @@ impl Options {
db_id,
create_bucket_if_not_exists: true,
verify_crc,
validate_integrity,
use_compression,
encryption_config,
max_batch_interval,
Expand Down Expand Up @@ -377,7 +393,11 @@ impl Replicator {
.send()
.await
{
tracing::error!("Failed to send {} to S3: {}", fpath, e);
utils::caution!(
"Failed to send {} to S3: {} (this will lead to gaps in frame ranges)",
fpath,
e,
);
} else {
tokio::fs::remove_file(&fpath).await.unwrap();
let elapsed = Instant::now() - start;
Expand Down Expand Up @@ -405,6 +425,7 @@ impl Replicator {
flush_trigger: Some(flush_trigger),
last_committed_frame_no,
verify_crc: options.verify_crc,
validate_integrity: options.validate_integrity,
db_path,
db_name,
snapshot_waiter,
Expand Down Expand Up @@ -699,11 +720,11 @@ impl Replicator {
)));
tokio::spawn(async move {
if let Err(e) = request.send().await {
tracing::error!(
"Failed to store dependency between generations {} -> {}: {}",
utils::caution!(
"Failed to store dependency between generations {} -> {}: {} (this will lead to broken dependency chain)",
prev,
curr,
e
e,
);
} else {
tracing::trace!(
Expand Down Expand Up @@ -1222,7 +1243,7 @@ impl Replicator {
let elapsed = Instant::now() - start_ts;
tracing::info!("Finished database restoration in {:?}", elapsed);
tokio::fs::rename(&restore_path, &self.db_path).await?;
let _ = self.remove_wal_files().await; // best effort, WAL files may not exists
// The restored file was just renamed to `self.db_path`, and `remove_wal_files` appends
// `-wal`/`-shm` to the path it is given, so it must receive the database path here.
let _ = self.remove_wal_files(&self.db_path).await; // best effort, WAL files may not exist
Ok(result)
}
Err(e) => {
Expand Down Expand Up @@ -1450,6 +1471,7 @@ impl Replicator {
utc_time: Option<NaiveDateTime>,
db_path: &Path,
) -> Result<bool> {
tracing::debug!("restore wal from generation {generation}");
let encryption_config = self.encryption_config.clone();
let mut injector = libsql_replication::injector::SqliteInjector::new(
db_path.to_path_buf(),
Expand All @@ -1465,8 +1487,11 @@ impl Replicator {
unsafe { v.set_len(page_size) };
v
};

let mut next_marker = None;
let mut applied_wal_frame = false;
let mut last_injected_frame_no = 0;
let mut last_seen_frame_no = 0;
'restore_wal: loop {
let mut list_request = self.list_objects().prefix(&prefix);
if let Some(marker) = next_marker {
Expand All @@ -1481,34 +1506,67 @@ impl Replicator {
break;
}

let mut last_received_frame_no = 0;
for obj in objs {
let key = obj
.key()
.ok_or_else(|| anyhow::anyhow!("Failed to get key for an object"))?;
tracing::debug!("Loading {}", key);

let (first_frame_no, last_frame_no, timestamp, compression_kind) =
match Self::parse_frame_range(key) {
Some(result) => result,
None => {
if !key.ends_with(".gz")
&& !key.ends_with(".zstd")
&& !key.ends_with(".db")
&& !key.ends_with(".meta")
// if this looks like a frame range from WAL - we must be able to parse
// range from it
if !key.ends_with(".meta")
&& !key.ends_with(".dep")
&& !key.ends_with(".changecounter")
&& !key.ends_with("db.gz")
&& !key.ends_with("db.zstd")
&& !key.ends_with("db.raw")
{
tracing::warn!("Failed to parse frame/page from key {}", key);
utils::caution!(
"Failed to parse frame/page: db_name={}, generation={}, key={}",
&self.db_name,
&generation,
key
);
}
continue;
}
};
if first_frame_no != last_received_frame_no + 1 {
tracing::warn!("Missing series of consecutive frames. Last applied frame: {}, next found: {}. Stopping the restoration process",
last_received_frame_no, first_frame_no);
tracing::debug!(
"loading object: key={}, first_frame_no={}, last_frame_no={}, timestamp={}, compression={}",
key,
first_frame_no,
last_frame_no,
timestamp,
compression_kind,
);
if first_frame_no != last_injected_frame_no + 1 {
utils::caution!(
"Missing series of consecutive frames. Last applied frame: {}, next found: {}. Stopping the restoration process: db_name={}, generation={} (this can lead to inconsistent restore)",
last_injected_frame_no,
first_frame_no,
&self.db_name,
&generation
);
break;
} else if first_frame_no != last_seen_frame_no + 1 {
// there can be the case that bottomless has several overlapping frame ranges
// but one of them is empty - so we will ignore it (as it has no frames)
// for example:
// 9 bytes (empty zstd frame) .../000000000001-000000000001-1724236016.zstd
// 878 bytes .../000000000001-000000000001-1724236022.zstd
// but it's good to detect such cases
utils::caution!(
"detected series of non-consecutive frames: last_seen_frame_no={}, first_frame_no={}: db_name={}, generation={} (this can lead to inconsistent restore)",
last_seen_frame_no,
first_frame_no,
&self.db_name,
&generation
);
}
last_seen_frame_no = last_frame_no;

if let Some(frame) = last_consistent_frame {
if last_frame_no > frame {
tracing::warn!("Remote log contains frame {} larger than last consistent frame ({}), stopping the restoration process",
Expand All @@ -1525,7 +1583,12 @@ impl Replicator {
}
}
_ => {
tracing::trace!("Couldn't parse requested frame batch {} timestamp. Stopping recovery.", key);
// Arguments are listed in placeholder order: batch key, then db_name, then generation.
utils::caution!(
    "Couldn't parse requested frame batch {} timestamp. Stopping recovery: db_name={}, generation={}",
    key,
    &self.db_name,
    &generation
);
break 'restore_wal;
}
}
Expand All @@ -1539,7 +1602,7 @@ impl Replicator {
);

while let Some(frame) = reader.next_frame_header().await? {
last_received_frame_no = reader.next_frame_no();
last_injected_frame_no = reader.next_frame_no();
reader.next_page(&mut page_buf).await?;
if self.verify_crc {
checksum = frame.verify(checksum, &page_buf)?;
Expand All @@ -1548,13 +1611,18 @@ impl Replicator {
let checksum = (crc1 as u64) << 32 | crc2 as u64;
let frame_to_inject = libsql_replication::frame::Frame::from_parts(
&libsql_replication::frame::FrameHeader {
frame_no: (last_received_frame_no as u64).into(),
frame_no: (last_injected_frame_no as u64).into(),
checksum: checksum.into(),
page_no: frame.pgno().into(),
size_after: frame.size_after().into(),
},
page_buf.as_slice(),
);
tracing::debug!(
"ready to inject frame: pgno={}, sizeafter={}",
frame.pgno(),
frame.size_after()
);
let frame = RpcFrame {
data: frame_to_inject.bytes(),
timestamp: None,
Expand All @@ -1577,13 +1645,48 @@ impl Replicator {
break;
}
}
if self.validate_integrity {
if let Err(e) = injector.validate_integrity() {
utils::caution!(
"found integrity issues: {}, db_name={}, generation={}",
e,
&self.db_name,
&generation
);
return Err(anyhow!("DB is not correct"));
}
}
// drop of injector will cause drop&close of last DB connection which will perform final
// WAL checkpoint of the DB
drop(injector);

let db_path_str = db_path
.to_str()
.ok_or(anyhow!("failed to convert db path to string"))?;
let db_wal_file_path = format!("{}-wal", &db_path_str);
let db_wal_index_path = format!("{}-shm", &db_path_str);
let has_wal_file = tokio::fs::try_exists(&db_wal_file_path).await?;
let has_wal_index = tokio::fs::try_exists(&db_wal_index_path).await?;
if has_wal_file || has_wal_index {
// restore process was not finished successfully as WAL wasn't transferred completely
tracing::error!(
"WAL wasn't transferred completely during restoration: db_name={}, generation={}",
&self.db_name,
&generation
);
let _ = self
.remove_wal_files(&db_path_str)
.await
.inspect_err(|e| tracing::error!("unable to remove wal files: {}", e));
return Err(anyhow!("WAL wasn't transferred completely"));
}
Ok(applied_wal_frame)
}

async fn remove_wal_files(&self) -> Result<()> {
tracing::debug!("Overwriting any existing WAL file: {}-wal", &self.db_path);
tokio::fs::remove_file(&format!("{}-wal", &self.db_path)).await?;
tokio::fs::remove_file(&format!("{}-shm", &self.db_path)).await?;
/// Removes the `-wal` and `-shm` companion files of the database at `db_path`.
///
/// Both removals are always attempted, so a missing `-wal` file no longer prevents the
/// `-shm` file from being deleted. A file that does not exist is treated as already
/// removed.
///
/// # Errors
///
/// Returns the first I/O error other than `NotFound` encountered while removing the files.
async fn remove_wal_files(&self, db_path: &str) -> Result<()> {
    tracing::debug!("Remove any existing WAL file: {}-wal", &db_path);
    let mut first_error = None;
    for suffix in ["-wal", "-shm"] {
        let path = format!("{db_path}{suffix}");
        if let Err(e) = tokio::fs::remove_file(&path).await {
            // A missing file is the desired end state, not a failure.
            if e.kind() != std::io::ErrorKind::NotFound && first_error.is_none() {
                first_error = Some(e);
            }
        }
    }
    match first_error {
        Some(e) => Err(e.into()),
        None => Ok(()),
    }
}

Expand Down Expand Up @@ -1672,7 +1775,11 @@ impl Replicator {
.send()
.await
{
tracing::error!("Failed to send {} to S3: {}", key, e);
utils::caution!(
"Failed to send {} to S3: {} (this will lead to gaps in the frame ranges)",
key,
e,
);
} else {
tokio::fs::remove_file(&fpath).await.unwrap();
tracing::trace!("Uploaded to S3: {}", key);
Expand Down
10 changes: 10 additions & 0 deletions bottomless/src/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/// Emits a `tracing::error!` event whose message is prefixed with `BOTTOMLESS CAUTION: `.
///
/// The first argument is the format string. It is spliced into `concat!`, so it is expected
/// to be a string literal. Any remaining tokens (format arguments, optional trailing comma)
/// are forwarded to `tracing::error!` unchanged.
macro_rules! caution {
    ($fmt:expr $(, $($rest:tt)*)?) => {
        tracing::error!(concat!("BOTTOMLESS CAUTION: ", $fmt) $(, $($rest)*)?);
    };
}

pub(crate) use caution;
22 changes: 22 additions & 0 deletions libsql-replication/src/injector/sqlite_injector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;
use std::{collections::VecDeque, path::PathBuf};

use parking_lot::Mutex;
use rusqlite::ffi::SQLITE_ERROR;
use rusqlite::OpenFlags;
use tokio::task::spawn_blocking;

Expand Down Expand Up @@ -65,6 +66,10 @@ impl SqliteInjector {
inner: Arc::new(Mutex::new(inner)),
})
}

/// Locks the shared inner injector and delegates to its `validate_integrity`.
pub fn validate_integrity(&mut self) -> Result<()> {
    let mut inner = self.inner.lock();
    inner.validate_integrity()
}
}

pub(in super::super) struct SqliteInjectorInner {
Expand Down Expand Up @@ -124,6 +129,23 @@ impl SqliteInjectorInner {
})
}

/// Runs `PRAGMA integrity_check` on the injector's connection.
///
/// Each row returned by the pragma is read as a string. The first row that is not exactly
/// `"ok"` is turned into a `SqliteFailure` carrying that row's text, which is propagated to
/// the caller.
pub fn validate_integrity(&mut self) -> Result<()> {
    self.connection
        .lock()
        .pragma_query(None, "integrity_check", |result_row| {
            let status: String = result_row.get(0)?;
            if status == "ok" {
                return Ok(());
            }
            Err(rusqlite::Error::SqliteFailure(
                rusqlite::ffi::Error::new(SQLITE_ERROR),
                Some(format!("found integrity issues: {status}")),
            ))
        })?;
    Ok(())
}

/// Inject a frame into the log. If this was a commit frame, returns Ok(Some(FrameNo)).
pub fn inject_frame(&mut self, frame: Frame) -> Result<Option<FrameNo>, Error> {
let frame_close_txn = frame.header().size_after.get() != 0;
Expand Down
1 change: 1 addition & 0 deletions libsql-server/src/namespace/meta_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ pub async fn metastore_connection_maker(
let options = bottomless::replicator::Options {
create_bucket_if_not_exists: true,
verify_crc: true,
validate_integrity: false,
use_compression: CompressionKind::None,
encryption_config: None,
aws_endpoint: Some(config.bucket_endpoint),
Expand Down
Loading
Loading