From f88e6992bb48d879d08ca625dfa8408f0db6871d Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 21 Aug 2024 12:39:50 +0400 Subject: [PATCH 01/15] fix wal restore process for situation where S3 has more than 1 page of data (1000+) --- bottomless/src/replicator.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs index cd37a70165..cd5234159e 100644 --- a/bottomless/src/replicator.rs +++ b/bottomless/src/replicator.rs @@ -1467,6 +1467,7 @@ impl Replicator { }; let mut next_marker = None; let mut applied_wal_frame = false; + let mut last_received_frame_no = 0; 'restore_wal: loop { let mut list_request = self.list_objects().prefix(&prefix); if let Some(marker) = next_marker { @@ -1481,7 +1482,6 @@ impl Replicator { break; } - let mut last_received_frame_no = 0; for obj in objs { let key = obj .key() From f41bc58dbd1a61a570a967e27faf50a1e766e0fd Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 21 Aug 2024 12:40:21 +0400 Subject: [PATCH 02/15] validate that WAL was transferred completely to the DB after restore - now we are relying on the fact that last SQLite connection will perform checkpoint; this is fragile because if DB + WAL are malformed somehow - SQLite will exit silently - one way to resolve this issue is to trigger wal_checkpoint(TRUNCATE) manually, but this potentially can interfere with bottomless somehow - so, a more robust way to resolve this issue was implemented: we just check that WAL was transferred and -wal + -shm files were deleted. 
If no - we abort restore process --- bottomless/src/replicator.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs index cd5234159e..2ba143e19c 100644 --- a/bottomless/src/replicator.rs +++ b/bottomless/src/replicator.rs @@ -1577,6 +1577,21 @@ impl Replicator { break; } } + // drop of injector will cause drop&close of last DB connection which will perform final + // WAL checkpoint of the DB + drop(injector); + + let db_path_str = db_path.to_str().ok_or(anyhow!("failed to convert db path to string"))?; + let db_wal_file_path = format!("{}-wal", &db_path_str); + let db_wal_index_path = format!("{}-shm", &db_path_str); + let has_wal_file = tokio::fs::try_exists(&db_wal_file_path).await?; + let has_wal_index = tokio::fs::try_exists(&db_wal_index_path).await?; + if has_wal_file || has_wal_index { + let _ = tokio::fs::remove_file(db_wal_file_path).await.inspect_err(|e| tracing::error!("unable to remove wal file: {}", e)); + let _ = tokio::fs::remove_file(db_wal_index_path).await.inspect_err(|e| tracing::error!("unable to remove wal index file: {}", e)); + // restore process was not finished successfully as WAL wasn't transferred completely + return Err(anyhow!("WAL wasn't transferred completely")); + } Ok(applied_wal_frame) } From d49154b7a5d3c2f0a14a39f72aeb6aae71b3be62 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 21 Aug 2024 12:43:05 +0400 Subject: [PATCH 03/15] add logs in bottomless wrapper --- bottomless/src/bottomless_wal.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bottomless/src/bottomless_wal.rs b/bottomless/src/bottomless_wal.rs index 98fd59ce16..9cbee2732a 100644 --- a/bottomless/src/bottomless_wal.rs +++ b/bottomless/src/bottomless_wal.rs @@ -74,6 +74,7 @@ impl WrapWal for BottomlessWalWrapper { let num_frames = wrapped.insert_frames(page_size, page_headers, size_after, is_commit, sync_flags)?; + let new_valid_valid_frame_index = 
wrapped.frames_in_wal(); let mut guard = self.replicator.blocking_lock(); match &mut *guard { @@ -83,11 +84,11 @@ impl WrapWal for BottomlessWalWrapper { std::process::abort() } replicator.register_last_valid_frame(last_valid_frame); - let new_valid_valid_frame_index = wrapped.frames_in_wal(); replicator.submit_frames(new_valid_valid_frame_index - last_valid_frame); } None => return Err(Error::new(SQLITE_IOERR_WRITE)), } + tracing::debug!("inserted {num_frames} with size_after={size_after}, is_commit={is_commit} ({last_valid_frame} -> {new_valid_valid_frame_index})"); Ok(num_frames) } From c399ea5a4e958af514b5b1055f2c354eec8bcf6c Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 21 Aug 2024 14:42:51 +0400 Subject: [PATCH 04/15] do not restore backup if we detected issues with it --- bottomless/src/replicator.rs | 54 ++++++++++++++++++++++++++---------- 1 file changed, 40 insertions(+), 14 deletions(-) diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs index 2ba143e19c..0a47ab1874 100644 --- a/bottomless/src/replicator.rs +++ b/bottomless/src/replicator.rs @@ -1222,7 +1222,7 @@ impl Replicator { let elapsed = Instant::now() - start_ts; tracing::info!("Finished database restoration in {:?}", elapsed); tokio::fs::rename(&restore_path, &self.db_path).await?; - let _ = self.remove_wal_files().await; // best effort, WAL files may not exists + let _ = self.remove_wal_files(&self.db_name).await; // best effort, WAL files may not exists Ok(result) } Err(e) => { @@ -1465,6 +1465,16 @@ impl Replicator { unsafe { v.set_len(page_size) }; v }; + let db_path_str = db_path + .to_str() + .ok_or(anyhow!("failed to convert db path to string"))?; + let cleanup = || async move { + let _ = self + .remove_wal_files(&db_path_str) + .await + .inspect_err(|e| tracing::error!("unable to remove wal files: {}", e)); + }; + let mut next_marker = None; let mut applied_wal_frame = false; let mut last_received_frame_no = 0; @@ -1493,6 +1503,7 @@ impl Replicator { 
Some(result) => result, None => { if !key.ends_with(".gz") + && !key.ends_with(".raw") && !key.ends_with(".zstd") && !key.ends_with(".db") && !key.ends_with(".meta") @@ -1505,15 +1516,27 @@ impl Replicator { } }; if first_frame_no != last_received_frame_no + 1 { - tracing::warn!("Missing series of consecutive frames. Last applied frame: {}, next found: {}. Stopping the restoration process", - last_received_frame_no, first_frame_no); - break; + tracing::error!( + "Missing series of consecutive frames. Last applied frame: {}, next found: {}. Stopping the restoration process: db_name={}, generation={}", + last_received_frame_no, + first_frame_no, + &self.db_name, + &generation + ); + cleanup().await; + return Err(anyhow!("WAL frame series is inconsistent")); } if let Some(frame) = last_consistent_frame { if last_frame_no > frame { - tracing::warn!("Remote log contains frame {} larger than last consistent frame ({}), stopping the restoration process", - last_frame_no, frame); - break; + tracing::error!( + "Remote log contains frame {} larger than last consistent frame ({}), stopping the restoration process: db_name={}, generation={}", + last_frame_no, + frame, + &self.db_name, + &generation + ); + cleanup().await; + return Err(anyhow!("WAL frame is larget than last consistent frame")); } } if let Some(threshold) = utc_time.as_ref() { @@ -1581,24 +1604,27 @@ impl Replicator { // WAL checkpoint of the DB drop(injector); - let db_path_str = db_path.to_str().ok_or(anyhow!("failed to convert db path to string"))?; let db_wal_file_path = format!("{}-wal", &db_path_str); let db_wal_index_path = format!("{}-shm", &db_path_str); let has_wal_file = tokio::fs::try_exists(&db_wal_file_path).await?; let has_wal_index = tokio::fs::try_exists(&db_wal_index_path).await?; if has_wal_file || has_wal_index { - let _ = tokio::fs::remove_file(db_wal_file_path).await.inspect_err(|e| tracing::error!("unable to remove wal file: {}", e)); - let _ = 
tokio::fs::remove_file(db_wal_index_path).await.inspect_err(|e| tracing::error!("unable to remove wal index file: {}", e)); // restore process was not finished successfully as WAL wasn't transferred completely + tracing::error!( + "WAL wasn't transferred completely during restoration: db_name={}, generation={}", + &self.db_name, + &generation + ); + cleanup().await; return Err(anyhow!("WAL wasn't transferred completely")); } Ok(applied_wal_frame) } - async fn remove_wal_files(&self) -> Result<()> { - tracing::debug!("Overwriting any existing WAL file: {}-wal", &self.db_path); - tokio::fs::remove_file(&format!("{}-wal", &self.db_path)).await?; - tokio::fs::remove_file(&format!("{}-shm", &self.db_path)).await?; + async fn remove_wal_files(&self, db_path: &str) -> Result<()> { + tracing::debug!("Remove any existing WAL file: {}-wal", &db_path); + tokio::fs::remove_file(&format!("{}-wal", &db_path)).await?; + tokio::fs::remove_file(&format!("{}-shm", &db_path)).await?; Ok(()) } From 3445e9ab1932d3880c32e698ac882ef5fc197831 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 21 Aug 2024 14:46:19 +0400 Subject: [PATCH 05/15] add simple test to check that DB will not be restored from broken bottomless backup --- libsql-server/src/test/bottomless.rs | 133 +++++++++++++++++++++++++-- 1 file changed, 124 insertions(+), 9 deletions(-) diff --git a/libsql-server/src/test/bottomless.rs b/libsql-server/src/test/bottomless.rs index 5f3015e11e..e63ba0bd07 100644 --- a/libsql-server/src/test/bottomless.rs +++ b/libsql-server/src/test/bottomless.rs @@ -10,6 +10,7 @@ use s3s::service::S3ServiceBuilder; use std::net::{SocketAddr, ToSocketAddrs}; use std::path::PathBuf; use std::sync::Once; +use tokio::task::JoinError; use tokio::time::sleep; use tokio::time::Duration; use url::Url; @@ -61,7 +62,7 @@ async fn start_s3_server() { } /// returns a future that once polled will shutdown the server and wait for cleanup -fn start_db(step: u32, server: Server) -> impl Future { +fn 
start_db(step: u32, server: Server) -> impl Future> { let notify = server.shutdown.clone(); let handle = tokio::spawn(async move { if let Err(e) = server.start().await { @@ -71,7 +72,7 @@ fn start_db(step: u32, server: Server) -> impl Future { async move { notify.notify_waiters(); - handle.await.unwrap(); + handle.await } } @@ -176,7 +177,7 @@ async fn backup_restore() { sleep(Duration::from_secs(2)).await; - db_job.await; + db_job.await.unwrap(); drop(cleaner); } @@ -195,7 +196,7 @@ async fn backup_restore() { assert_updates(&connection_addr, ROWS, OPS, "A").await; - db_job.await; + db_job.await.unwrap(); drop(cleaner); } @@ -213,7 +214,7 @@ async fn backup_restore() { // wait for WAL to backup sleep(Duration::from_secs(2)).await; - db_job.await; + db_job.await.unwrap(); drop(cleaner); } @@ -228,7 +229,7 @@ async fn backup_restore() { assert_updates(&connection_addr, ROWS, OPS, "B").await; - db_job.await; + db_job.await.unwrap(); drop(cleaner); } @@ -247,7 +248,121 @@ async fn backup_restore() { assert_updates(&connection_addr, ROWS, OPS, "B").await; - db_job.await; + db_job.await.unwrap(); + drop(cleaner); + } +} + +async fn list_bucket(bucket: &str) -> Vec { + let client = s3_client().await.expect("failed to create s3 client"); + let objects = client + .list_objects() + .bucket(bucket) + .prefix("") + .send() + .await + .expect("failed to list objects"); + objects + .contents() + .unwrap() + .iter() + .map(|x| String::from(x.key().unwrap())) + .collect() +} + +#[tokio::test] +async fn do_not_restore_malformed_db() { + let _ = tracing_subscriber::fmt::try_init(); + + start_s3_server().await; + + const DB_ID: &str = "malformedbackup"; + const BUCKET: &str = "malformedbackup"; + const PATH: &str = "malformedbackup.sqld"; + const PORT: u16 = 15003; + + let _ = S3BucketCleaner::new(BUCKET).await; + assert_bucket_occupancy(BUCKET, true).await; + + let listener_addr = format!("0.0.0.0:{}", PORT) + .to_socket_addrs() + .unwrap() + .next() + .unwrap(); + let conn = 
Url::parse(&format!("http://localhost:{}", PORT)).unwrap(); + let options = bottomless::replicator::Options { + db_id: Some(DB_ID.to_string()), + create_bucket_if_not_exists: true, + verify_crc: true, + use_compression: bottomless::replicator::CompressionKind::None, + bucket_name: BUCKET.to_string(), + max_batch_interval: Duration::from_millis(10), + ..bottomless::replicator::Options::from_env().unwrap() + }; + let make_server = || async { configure_server(&options, listener_addr, PATH).await }; + { + tracing::info!( + "---STEP 1: create db, write rows, remove random S3 files to corrupt backup---" + ); + let cleaner = DbFileCleaner::new(PATH); + let db_job = start_db(1, make_server().await); + + sleep(Duration::from_secs(2)).await; + + let _ = sql( + &conn, + ["CREATE TABLE IF NOT EXISTS t(id INT PRIMARY KEY, name TEXT, payload BLOB);"], + ) + .await + .unwrap(); + for i in 0..128 { + sql( + &conn, + [format!("INSERT INTO t VALUES({i}, '{i}', zeroblob(4096))")], + ) + .await + .expect("SQL query failed"); + } + + tracing::info!("Ready to remove files from S3"); + let client = s3_client().await.expect("failed to create s3 client"); + let mut i = 0; + for key in list_bucket(BUCKET).await { + // delete full snapshot and random wal frame ranges + let should_delete = key.ends_with(".changecounter") + || key.ends_with(".dep") + || key.ends_with("db.raw") + || i % 10 == 0; + + if should_delete { + client + .delete_object() + .bucket(BUCKET) + .key(key) + .send() + .await + .expect("failed to delete object"); + } + i += 1; + } + + db_job.await.unwrap(); + drop(cleaner); + } + + { + sleep(Duration::from_secs(2)).await; + + tracing::info!("---STEP 2: recreate database, check that it is unable to start ---"); + let cleaner = DbFileCleaner::new(PATH); + let db_job = start_db(2, make_server().await); + sleep(Duration::from_secs(2)).await; + + assert!(sql(&conn, ["SELECT COUNT(*) FROM t"]).await.is_err()); + assert!(db_job + .await + .inspect_err(|e| tracing::error!("db process 
failed: {}", e)) + .is_err()); drop(cleaner); } } @@ -340,7 +455,7 @@ async fn rollback_restore() { "rollback value should not be updated" ); - db_job.await; + db_job.await.unwrap(); drop(cleaner); } @@ -369,7 +484,7 @@ async fn rollback_restore() { ] ); - db_job.await; + db_job.await.unwrap(); drop(cleaner); } } From 77bf7658e1f741777335ff0a6cd200f37d79e293 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 21 Aug 2024 14:58:20 +0400 Subject: [PATCH 06/15] fix test --- libsql-server/src/test/bottomless.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/libsql-server/src/test/bottomless.rs b/libsql-server/src/test/bottomless.rs index e63ba0bd07..edc926258f 100644 --- a/libsql-server/src/test/bottomless.rs +++ b/libsql-server/src/test/bottomless.rs @@ -264,7 +264,6 @@ async fn list_bucket(bucket: &str) -> Vec { .expect("failed to list objects"); objects .contents() - .unwrap() .iter() .map(|x| String::from(x.key().unwrap())) .collect() From 04c21dfaab2be496bc92765c8bfc8259388a0d7a Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 21 Aug 2024 22:26:28 +0400 Subject: [PATCH 07/15] make bottomless more tolerant to errors --- bottomless/src/lib.rs | 1 + bottomless/src/replicator.rs | 85 +++++++++++++++++++++++------------- 2 files changed, 55 insertions(+), 31 deletions(-) diff --git a/bottomless/src/lib.rs b/bottomless/src/lib.rs index 1a67e92f84..bda942630f 100644 --- a/bottomless/src/lib.rs +++ b/bottomless/src/lib.rs @@ -2,6 +2,7 @@ #![allow(clippy::not_unsafe_ptr_arg_deref)] #![allow(improper_ctypes)] +mod utils; mod backup; pub mod bottomless_wal; mod completion_progress; diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs index 0a47ab1874..06c587d22d 100644 --- a/bottomless/src/replicator.rs +++ b/bottomless/src/replicator.rs @@ -1,6 +1,7 @@ use crate::backup::WalCopier; use crate::completion_progress::{CompletionProgress, SavepointTracker}; use crate::read::BatchReader; +use crate::utils; use 
crate::uuid_utils::decode_unix_timestamp; use crate::wal::WalFileReader; use anyhow::{anyhow, bail}; @@ -377,7 +378,7 @@ impl Replicator { .send() .await { - tracing::error!("Failed to send {} to S3: {}", fpath, e); + utils::caution!("Failed to send {} to S3: {} (this will lead to gaps in frame ranges)", fpath, e); } else { tokio::fs::remove_file(&fpath).await.unwrap(); let elapsed = Instant::now() - start; @@ -699,8 +700,8 @@ impl Replicator { ))); tokio::spawn(async move { if let Err(e) = request.send().await { - tracing::error!( - "Failed to store dependency between generations {} -> {}: {}", + utils::caution!( + "Failed to store dependency between generations {} -> {}: {} (this will lead to broken dependency chain)", prev, curr, e @@ -1450,6 +1451,7 @@ impl Replicator { utc_time: Option, db_path: &Path, ) -> Result { + tracing::debug!("restore wal from generation {generation}"); let encryption_config = self.encryption_config.clone(); let mut injector = libsql_replication::injector::SqliteInjector::new( db_path.to_path_buf(), @@ -1477,7 +1479,8 @@ impl Replicator { let mut next_marker = None; let mut applied_wal_frame = false; - let mut last_received_frame_no = 0; + let mut last_injected_frame_no = 0; + let mut last_seen_frame_no = 0; 'restore_wal: loop { let mut list_request = self.list_objects().prefix(&prefix); if let Some(marker) = next_marker { @@ -1496,47 +1499,62 @@ impl Replicator { let key = obj .key() .ok_or_else(|| anyhow::anyhow!("Failed to get key for an object"))?; - tracing::debug!("Loading {}", key); - let (first_frame_no, last_frame_no, timestamp, compression_kind) = match Self::parse_frame_range(key) { Some(result) => result, None => { - if !key.ends_with(".gz") - && !key.ends_with(".raw") - && !key.ends_with(".zstd") - && !key.ends_with(".db") - && !key.ends_with(".meta") + // if this looks like a frame range from WAL - we must be able to parse + // range from it + if !key.ends_with(".meta") && !key.ends_with(".dep") && 
!key.ends_with(".changecounter") + && !key.ends_with("db.gz") + && !key.ends_with("db.zstd") + && !key.ends_with("db.raw") { - tracing::warn!("Failed to parse frame/page from key {}", key); + utils::caution!( + "Failed to parse frame/page: db_name={}, generation={}, key={}", + &self.db_name, + &generation, + key + ); } continue; } }; - if first_frame_no != last_received_frame_no + 1 { - tracing::error!( - "Missing series of consecutive frames. Last applied frame: {}, next found: {}. Stopping the restoration process: db_name={}, generation={}", - last_received_frame_no, + tracing::debug!( + "loading object: key={}, first_frame_no={}, last_frame_no={}, timestamp={}, compression={}", + key, + first_frame_no, + last_frame_no, + timestamp, + compression_kind, + ); + if first_frame_no != last_injected_frame_no + 1 { + utils::caution!( + "Missing series of consecutive frames. Last applied frame: {}, next found: {}. Stopping the restoration process: db_name={}, generation={} (this can lead to inconsistent restore)", + last_injected_frame_no, + first_frame_no, + &self.db_name, + &generation + ); + break; + } else if first_frame_no != last_seen_frame_no + 1 { + utils::caution!( + "detected series of non-consecutive frames: last_seen_frame_no={}, first_frame_no={}: db_name={}, generation={} (this can lead to inconsistent restore)", + last_seen_frame_no, first_frame_no, &self.db_name, &generation ); - cleanup().await; - return Err(anyhow!("WAL frame series is inconsistent")); } + last_seen_frame_no = last_frame_no; + if let Some(frame) = last_consistent_frame { if last_frame_no > frame { - tracing::error!( - "Remote log contains frame {} larger than last consistent frame ({}), stopping the restoration process: db_name={}, generation={}", - last_frame_no, - frame, - &self.db_name, - &generation - ); - cleanup().await; - return Err(anyhow!("WAL frame is larget than last consistent frame")); + tracing::warn!("Remote log contains frame {} larger than last consistent frame ({}), 
stopping the restoration process", + last_frame_no, frame); + break; } } if let Some(threshold) = utc_time.as_ref() { @@ -1548,7 +1566,7 @@ impl Replicator { } } _ => { - tracing::trace!("Couldn't parse requested frame batch {} timestamp. Stopping recovery.", key); + utils::caution!("Couldn't parse requested frame batch {} timestamp. Stopping recovery.", key); break 'restore_wal; } } @@ -1562,7 +1580,7 @@ impl Replicator { ); while let Some(frame) = reader.next_frame_header().await? { - last_received_frame_no = reader.next_frame_no(); + last_injected_frame_no = reader.next_frame_no(); reader.next_page(&mut page_buf).await?; if self.verify_crc { checksum = frame.verify(checksum, &page_buf)?; @@ -1571,13 +1589,18 @@ impl Replicator { let checksum = (crc1 as u64) << 32 | crc2 as u64; let frame_to_inject = libsql_replication::frame::Frame::from_parts( &libsql_replication::frame::FrameHeader { - frame_no: (last_received_frame_no as u64).into(), + frame_no: (last_injected_frame_no as u64).into(), checksum: checksum.into(), page_no: frame.pgno().into(), size_after: frame.size_after().into(), }, page_buf.as_slice(), ); + tracing::debug!( + "ready to inject frame: pgno={}, sizeafter={}", + frame.pgno(), + frame.size_after() + ); let frame = RpcFrame { data: frame_to_inject.bytes(), timestamp: None, @@ -1713,7 +1736,7 @@ impl Replicator { .send() .await { - tracing::error!("Failed to send {} to S3: {}", key, e); + utils::caution!("Failed to send {} to S3: {} (this will lead to gaps in the frame ranges)", key, e); } else { tokio::fs::remove_file(&fpath).await.unwrap(); tracing::trace!("Uploaded to S3: {}", key); From 42689dfd9525574eccf8e2775822b9465a104563 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 21 Aug 2024 22:29:42 +0400 Subject: [PATCH 08/15] change test --- libsql-server/src/test/bottomless.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/libsql-server/src/test/bottomless.rs b/libsql-server/src/test/bottomless.rs index 
edc926258f..c9808c5f06 100644 --- a/libsql-server/src/test/bottomless.rs +++ b/libsql-server/src/test/bottomless.rs @@ -270,7 +270,7 @@ async fn list_bucket(bucket: &str) -> Vec { } #[tokio::test] -async fn do_not_restore_malformed_db() { +async fn restore_from_partial_db() { let _ = tracing_subscriber::fmt::try_init(); start_s3_server().await; @@ -357,11 +357,14 @@ async fn do_not_restore_malformed_db() { let db_job = start_db(2, make_server().await); sleep(Duration::from_secs(2)).await; - assert!(sql(&conn, ["SELECT COUNT(*) FROM t"]).await.is_err()); - assert!(db_job - .await - .inspect_err(|e| tracing::error!("db process failed: {}", e)) - .is_err()); + let result = sql(&conn, ["SELECT COUNT(*) as cnt FROM t"]).await.unwrap(); + let count = result.first().unwrap().clone().into_result_set().unwrap().rows[0].cells["cnt"].clone(); + if let Value::Integer(x) = count { + assert!(0 < x && x < 128); + } else { + assert!(false); + } + db_job.await.unwrap(); drop(cleaner); } } From fbfa80859d952fff1061504b93c5fb03c91a9ccb Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 21 Aug 2024 22:51:42 +0400 Subject: [PATCH 09/15] cargo fmt --- libsql-server/src/test/bottomless.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/libsql-server/src/test/bottomless.rs b/libsql-server/src/test/bottomless.rs index c9808c5f06..6aa1dfcb72 100644 --- a/libsql-server/src/test/bottomless.rs +++ b/libsql-server/src/test/bottomless.rs @@ -358,7 +358,15 @@ async fn restore_from_partial_db() { sleep(Duration::from_secs(2)).await; let result = sql(&conn, ["SELECT COUNT(*) as cnt FROM t"]).await.unwrap(); - let count = result.first().unwrap().clone().into_result_set().unwrap().rows[0].cells["cnt"].clone(); + let count = result + .first() + .unwrap() + .clone() + .into_result_set() + .unwrap() + .rows[0] + .cells["cnt"] + .clone(); if let Value::Integer(x) = count { assert!(0 < x && x < 128); } else { From b63857d7630600705b9ba8ed27c611d792605d51 Mon Sep 17 
00:00:00 2001 From: Nikita Sivukhin Date: Wed, 21 Aug 2024 22:56:10 +0400 Subject: [PATCH 10/15] minor cosmetic changes --- bottomless/src/replicator.rs | 44 ++++++++++++++++++++++++------------ 1 file changed, 30 insertions(+), 14 deletions(-) diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs index 06c587d22d..11f86cbd00 100644 --- a/bottomless/src/replicator.rs +++ b/bottomless/src/replicator.rs @@ -378,7 +378,11 @@ impl Replicator { .send() .await { - utils::caution!("Failed to send {} to S3: {} (this will lead to gaps in frame ranges)", fpath, e); + utils::caution!( + "Failed to send {} to S3: {} (this will lead to gaps in frame ranges)", + fpath, + e, + ); } else { tokio::fs::remove_file(&fpath).await.unwrap(); let elapsed = Instant::now() - start; @@ -704,7 +708,7 @@ impl Replicator { "Failed to store dependency between generations {} -> {}: {} (this will lead to broken dependency chain)", prev, curr, - e + e, ); } else { tracing::trace!( @@ -1467,15 +1471,6 @@ impl Replicator { unsafe { v.set_len(page_size) }; v }; - let db_path_str = db_path - .to_str() - .ok_or(anyhow!("failed to convert db path to string"))?; - let cleanup = || async move { - let _ = self - .remove_wal_files(&db_path_str) - .await - .inspect_err(|e| tracing::error!("unable to remove wal files: {}", e)); - }; let mut next_marker = None; let mut applied_wal_frame = false; @@ -1540,6 +1535,12 @@ impl Replicator { ); break; } else if first_frame_no != last_seen_frame_no + 1 { + // there can be the case that bottomless has several overlapping frame ranges + // but one of them is empty - so we will ignore it (as it has no frames) + // for example: + // 9 bytes (empty zstd frame) .../000000000001-000000000001-1724236016.zstd + // 878 bytes .../000000000001-000000000001-1724236022.zstd + // but it's good to detect such cases utils::caution!( "detected series of non-consecutive frames: last_seen_frame_no={}, first_frame_no={}: db_name={}, generation={} (this can lead to 
inconsistent restore)", last_seen_frame_no, @@ -1566,7 +1567,12 @@ impl Replicator { } } _ => { - utils::caution!("Couldn't parse requested frame batch {} timestamp. Stopping recovery.", key); + utils::caution!( + "Couldn't parse requested frame batch {} timestamp. Stopping recovery: db_name={}, generation={}", + &self.db_name, + &generation, + key + ); break 'restore_wal; } } @@ -1627,6 +1633,9 @@ impl Replicator { // WAL checkpoint of the DB drop(injector); + let db_path_str = db_path + .to_str() + .ok_or(anyhow!("failed to convert db path to string"))?; let db_wal_file_path = format!("{}-wal", &db_path_str); let db_wal_index_path = format!("{}-shm", &db_path_str); let has_wal_file = tokio::fs::try_exists(&db_wal_file_path).await?; @@ -1638,7 +1647,10 @@ impl Replicator { &self.db_name, &generation ); - cleanup().await; + let _ = self + .remove_wal_files(&db_path_str) + .await + .inspect_err(|e| tracing::error!("unable to remove wal files: {}", e)); return Err(anyhow!("WAL wasn't transferred completely")); } Ok(applied_wal_frame) @@ -1736,7 +1748,11 @@ impl Replicator { .send() .await { - utils::caution!("Failed to send {} to S3: {} (this will lead to gaps in the frame ranges)", key, e); + utils::caution!( + "Failed to send {} to S3: {} (this will lead to gaps in the frame ranges)", + key, + e, + ); } else { tokio::fs::remove_file(&fpath).await.unwrap(); tracing::trace!("Uploaded to S3: {}", key); From 58bd1963725799851f49aec8bc755d4bc09dc199 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 21 Aug 2024 22:59:25 +0400 Subject: [PATCH 11/15] add utils.rs --- bottomless/src/utils.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 bottomless/src/utils.rs diff --git a/bottomless/src/utils.rs b/bottomless/src/utils.rs new file mode 100644 index 0000000000..963ada90b0 --- /dev/null +++ b/bottomless/src/utils.rs @@ -0,0 +1,10 @@ +macro_rules! 
caution { + ($msg:expr) => { + tracing::error!(concat!("BOTTOMLESS CAUTION: ", $msg)); + }; + ($msg:expr, $($arg:tt)*) => { + tracing::error!(concat!("BOTTOMLESS CAUTION: ", $msg), $($arg)*); + }; +} + +pub (crate) use caution; From ca675a7c96f39f106636ac31af690470d8f7d57a Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Thu, 22 Aug 2024 00:17:02 +0400 Subject: [PATCH 12/15] implement optional integrity check and add one more test --- bottomless/src/lib.rs | 2 +- bottomless/src/replicator.rs | 22 +++ bottomless/src/utils.rs | 2 +- .../src/injector/sqlite_injector/mod.rs | 22 +++ libsql-server/src/namespace/meta_store.rs | 1 + libsql-server/src/test/bottomless.rs | 154 +++++++++++++++++- 6 files changed, 197 insertions(+), 6 deletions(-) diff --git a/bottomless/src/lib.rs b/bottomless/src/lib.rs index bda942630f..ecb5576b43 100644 --- a/bottomless/src/lib.rs +++ b/bottomless/src/lib.rs @@ -2,13 +2,13 @@ #![allow(clippy::not_unsafe_ptr_arg_deref)] #![allow(improper_ctypes)] -mod utils; mod backup; pub mod bottomless_wal; mod completion_progress; pub mod read; pub mod replicator; pub mod transaction_cache; +mod utils; pub mod uuid_utils; mod wal; diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs index 11f86cbd00..a219296658 100644 --- a/bottomless/src/replicator.rs +++ b/bottomless/src/replicator.rs @@ -62,6 +62,7 @@ pub struct Replicator { pub page_size: usize, generation: Arc>, verify_crc: bool, + validate_integrity: bool, pub bucket: String, pub db_path: String, pub db_name: String, @@ -94,6 +95,8 @@ pub struct Options { /// If `true` when restoring, frames checksums will be verified prior their pages being flushed /// into the main database file. pub verify_crc: bool, + /// If `true` when restoring, db integrity will be verified before finishing the process + pub validate_integrity: bool, /// Kind of compression algorithm used on the WAL frames to be sent to S3. 
pub use_compression: CompressionKind, pub encryption_config: Option, @@ -217,6 +220,17 @@ impl Options { other ), }; + let validate_integrity = match env_var_or("LIBSQL_BOTTOMLESS_VALIDATE_INTEGRITY", true) + .to_lowercase() + .as_ref() + { + "yes" | "true" | "1" | "y" | "t" => true, + "no" | "false" | "0" | "n" | "f" => false, + other => bail!( + "Invalid LIBSQL_BOTTOMLESS_VALIDATE_INTEGRITY environment variable: {}", + other + ), + }; let skip_snapshot = match env_var_or("LIBSQL_BOTTOMLESS_SKIP_SNAPSHOT", false) .to_lowercase() .as_ref() @@ -241,6 +255,7 @@ impl Options { db_id, create_bucket_if_not_exists: true, verify_crc, + validate_integrity, use_compression, encryption_config, max_batch_interval, @@ -410,6 +425,7 @@ impl Replicator { flush_trigger: Some(flush_trigger), last_committed_frame_no, verify_crc: options.verify_crc, + validate_integrity: options.validate_integrity, db_path, db_name, snapshot_waiter, @@ -1629,6 +1645,12 @@ impl Replicator { break; } } + if self.validate_integrity { + if let Err(e) = injector.validate_integrity() { + utils::caution!("found integrity issues: {}, db_name={}, generation={}", e, &self.db_name, &generation); + return Err(anyhow!("DB is not correct")); + } + } // drop of injector will cause drop&close of last DB connection which will perform final // WAL checkpoint of the DB drop(injector); diff --git a/bottomless/src/utils.rs b/bottomless/src/utils.rs index 963ada90b0..4622015f41 100644 --- a/bottomless/src/utils.rs +++ b/bottomless/src/utils.rs @@ -7,4 +7,4 @@ macro_rules! 
caution { }; } -pub (crate) use caution; +pub(crate) use caution; diff --git a/libsql-replication/src/injector/sqlite_injector/mod.rs b/libsql-replication/src/injector/sqlite_injector/mod.rs index f6ce2aa89f..bf63be19d0 100644 --- a/libsql-replication/src/injector/sqlite_injector/mod.rs +++ b/libsql-replication/src/injector/sqlite_injector/mod.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use std::{collections::VecDeque, path::PathBuf}; use parking_lot::Mutex; +use rusqlite::ffi::SQLITE_ERROR; use rusqlite::OpenFlags; use tokio::task::spawn_blocking; @@ -65,6 +66,10 @@ impl SqliteInjector { inner: Arc::new(Mutex::new(inner)), }) } + + pub fn validate_integrity(&mut self) -> Result<()> { + self.inner.lock().validate_integrity() + } } pub(in super::super) struct SqliteInjectorInner { @@ -124,6 +129,23 @@ impl SqliteInjectorInner { }) } + pub fn validate_integrity(&mut self) -> Result<()> { + self.connection + .lock() + .pragma_query(None, "integrity_check", |row| { + let row: String = row.get(0)?; + if row != "ok" { + Err(rusqlite::Error::SqliteFailure( + rusqlite::ffi::Error::new(SQLITE_ERROR), + Some(format!("found integrity issues: {row}")), + )) + } else { + Ok(()) + } + })?; + Ok(()) + } + /// Inject a frame into the log. If this was a commit frame, returns Ok(Some(FrameNo)). 
pub fn inject_frame(&mut self, frame: Frame) -> Result, Error> { let frame_close_txn = frame.header().size_after.get() != 0; diff --git a/libsql-server/src/namespace/meta_store.rs b/libsql-server/src/namespace/meta_store.rs index 1fb75db63c..a78deef252 100644 --- a/libsql-server/src/namespace/meta_store.rs +++ b/libsql-server/src/namespace/meta_store.rs @@ -114,6 +114,7 @@ pub async fn metastore_connection_maker( let options = bottomless::replicator::Options { create_bucket_if_not_exists: true, verify_crc: true, + validate_integrity: false, use_compression: CompressionKind::None, encryption_config: None, aws_endpoint: Some(config.bucket_endpoint), diff --git a/libsql-server/src/test/bottomless.rs b/libsql-server/src/test/bottomless.rs index 6aa1dfcb72..99ce76f502 100644 --- a/libsql-server/src/test/bottomless.rs +++ b/libsql-server/src/test/bottomless.rs @@ -275,9 +275,9 @@ async fn restore_from_partial_db() { start_s3_server().await; - const DB_ID: &str = "malformedbackup"; - const BUCKET: &str = "malformedbackup"; - const PATH: &str = "malformedbackup.sqld"; + const DB_ID: &str = "partialbackup"; + const BUCKET: &str = "partialbackup"; + const PATH: &str = "partialbackup.sqld"; const PORT: u16 = 15003; let _ = S3BucketCleaner::new(BUCKET).await; @@ -301,7 +301,7 @@ async fn restore_from_partial_db() { let make_server = || async { configure_server(&options, listener_addr, PATH).await }; { tracing::info!( - "---STEP 1: create db, write rows, remove random S3 files to corrupt backup---" + "---STEP 1: create db, write rows, remove random S3 files to create effect of partial backup---" ); let cleaner = DbFileCleaner::new(PATH); let db_job = start_db(1, make_server().await); @@ -377,6 +377,152 @@ async fn restore_from_partial_db() { } } +#[tokio::test] +async fn do_not_restore_from_corrupted_db() { + let _ = tracing_subscriber::fmt::try_init(); + + start_s3_server().await; + + const DB_ID: &str = "corruptedbackup"; + const BUCKET: &str = "corruptedbackup"; + const 
PATH: &str = "corruptedbackup.sqld"; + const PORT: u16 = 15004; + + let _ = S3BucketCleaner::new(BUCKET).await; + assert_bucket_occupancy(BUCKET, true).await; + + let listener_addr = format!("0.0.0.0:{}", PORT) + .to_socket_addrs() + .unwrap() + .next() + .unwrap(); + let conn = Url::parse(&format!("http://localhost:{}", PORT)).unwrap(); + let options = bottomless::replicator::Options { + db_id: Some(DB_ID.to_string()), + create_bucket_if_not_exists: true, + verify_crc: false, + validate_integrity: true, + use_compression: bottomless::replicator::CompressionKind::None, + bucket_name: BUCKET.to_string(), + max_batch_interval: Duration::from_millis(10), + ..bottomless::replicator::Options::from_env().unwrap() + }; + let make_server = || async { configure_server(&options, listener_addr, PATH).await }; + { + tracing::info!("---STEP 1: create db, write rows, corrupt random S3 files ---"); + let cleaner = DbFileCleaner::new(PATH); + let db_job = start_db(1, make_server().await); + + sleep(Duration::from_secs(2)).await; + + let _ = sql( + &conn, + ["CREATE TABLE IF NOT EXISTS t(id INT PRIMARY KEY, name TEXT, payload BLOB);"], + ) + .await + .unwrap(); + for i in 0..128 { + sql( + &conn, + [format!("INSERT INTO t VALUES({i}, '{i}', zeroblob(128))")], + ) + .await + .expect("SQL query failed"); + } + + tracing::info!("Ready to remove files from S3"); + let client = s3_client().await.expect("failed to create s3 client"); + let mut i = 0; + for key in list_bucket(BUCKET).await { + // delete full snapshot files + let should_delete = + key.ends_with(".changecounter") || key.ends_with(".dep") || key.ends_with("db.raw"); + if should_delete { + client + .delete_object() + .bucket(BUCKET) + .key(&key) + .send() + .await + .expect("failed to delete object"); + } else if key.ends_with(".raw") { + tracing::info!("corrupt frame range: {key}"); + // corrupt random wal frame range + let response = client + .get_object() + .bucket(BUCKET) + .key(&key) + .send() + .await + .expect("failed 
to read object"); + let mut bytes: Vec = response + .body + .collect() + .await + .expect("failed to read body") + .into_bytes() + .into(); + let single_frame_size = 24 + 4096; + assert!(bytes.len() % single_frame_size == 0); + for frame in 0..bytes.len() / single_frame_size { + let page_number = u32::from_be_bytes( + bytes[frame * single_frame_size..frame * single_frame_size + 4] + .try_into() + .unwrap(), + ); + if page_number <= 1 { + continue; + } + tracing::info!("corrupting page {page_number}"); + for b in frame * single_frame_size + 24..(frame + 1) * single_frame_size { + bytes[b] = 255; + } + } + client + .put_object() + .bucket(BUCKET) + .key(&key) + .body(bytes.into()) + .send() + .await + .expect("failed to put body"); + } + i += 1; + } + + db_job.await.unwrap(); + drop(cleaner); + } + + { + sleep(Duration::from_secs(2)).await; + + tracing::info!("---STEP 2: recreate database, check that it is unable to start ---"); + let cleaner = DbFileCleaner::new(PATH); + let db_job = start_db(2, make_server().await); + sleep(Duration::from_secs(2)).await; + + let result = sql(&conn, ["SELECT COUNT(*) as cnt FROM t"]).await.unwrap(); + let count = result + .first() + .unwrap() + .clone() + .into_result_set() + .unwrap() + .rows[0] + .cells["cnt"] + .clone(); + if let Value::Integer(x) = count { + dbg!(x); + assert!(0 < x && x < 128); + } else { + assert!(false); + } + db_job.await.unwrap(); + drop(cleaner); + } +} + #[tokio::test] async fn rollback_restore() { let _ = tracing_subscriber::fmt::try_init(); From f67abef43f6cdf16542b0872222adeb926114f2f Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Thu, 22 Aug 2024 00:20:31 +0400 Subject: [PATCH 13/15] fix test --- libsql-server/src/test/bottomless.rs | 20 ++------------------ 1 file changed, 2 insertions(+), 18 deletions(-) diff --git a/libsql-server/src/test/bottomless.rs b/libsql-server/src/test/bottomless.rs index 99ce76f502..8dc8fbd276 100644 --- a/libsql-server/src/test/bottomless.rs +++ 
b/libsql-server/src/test/bottomless.rs @@ -432,7 +432,6 @@ async fn do_not_restore_from_corrupted_db() { tracing::info!("Ready to remove files from S3"); let client = s3_client().await.expect("failed to create s3 client"); - let mut i = 0; for key in list_bucket(BUCKET).await { // delete full snapshot files let should_delete = @@ -502,23 +501,8 @@ async fn do_not_restore_from_corrupted_db() { let db_job = start_db(2, make_server().await); sleep(Duration::from_secs(2)).await; - let result = sql(&conn, ["SELECT COUNT(*) as cnt FROM t"]).await.unwrap(); - let count = result - .first() - .unwrap() - .clone() - .into_result_set() - .unwrap() - .rows[0] - .cells["cnt"] - .clone(); - if let Value::Integer(x) = count { - dbg!(x); - assert!(0 < x && x < 128); - } else { - assert!(false); - } - db_job.await.unwrap(); + assert!(sql(&conn, ["SELECT COUNT(*) as cnt FROM t"]).await.is_err()); + assert!(db_job.await.is_err()); drop(cleaner); } } From 2feba76f81fc0d1f6f002d21f5624b5850974919 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Thu, 22 Aug 2024 00:21:53 +0400 Subject: [PATCH 14/15] cargo fmt --- bottomless/src/replicator.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs index a219296658..61323c9fd4 100644 --- a/bottomless/src/replicator.rs +++ b/bottomless/src/replicator.rs @@ -1647,7 +1647,12 @@ impl Replicator { } if self.validate_integrity { if let Err(e) = injector.validate_integrity() { - utils::caution!("found integrity issues: {}, db_name={}, generation={}", e, &self.db_name, &generation); + utils::caution!( + "found integrity issues: {}, db_name={}, generation={}", + e, + &self.db_name, + &generation + ); return Err(anyhow!("DB is not correct")); } } From 56eb17185a72b1dbd59db39aa31d9cbc9f97b8d2 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Thu, 22 Aug 2024 00:46:38 +0400 Subject: [PATCH 15/15] fix test --- libsql-server/src/test/bottomless.rs | 1 - 1 file changed, 1 
deletion(-) diff --git a/libsql-server/src/test/bottomless.rs b/libsql-server/src/test/bottomless.rs index 8dc8fbd276..d9eb9faa12 100644 --- a/libsql-server/src/test/bottomless.rs +++ b/libsql-server/src/test/bottomless.rs @@ -486,7 +486,6 @@ async fn do_not_restore_from_corrupted_db() { .await .expect("failed to put body"); } - i += 1; } db_job.await.unwrap();