From b2d2250aa10d85d510c6303edaf975481251d50b Mon Sep 17 00:00:00 2001 From: ad hoc Date: Thu, 9 Nov 2023 17:20:52 +0100 Subject: [PATCH] add test for savepoint with replication --- Cargo.lock | 3 + .../src/replication/primary/logger.rs | 89 +++++++++++++++++++ libsql-sys-tmp/src/lib.rs | 12 ++- 3 files changed, 103 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index a832baa344..2519d17f4e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -817,6 +817,7 @@ dependencies = [ "aws-sdk-s3", "bytes", "chrono", + "futures-core", "rand", "sqld-libsql-bindings", "tokio", @@ -831,10 +832,12 @@ name = "bottomless-cli" version = "0.1.14" dependencies = [ "anyhow", + "async-compression 0.4.4", "aws-config", "aws-sdk-s3", "aws-smithy-types", "bottomless", + "bytes", "chrono", "clap 4.4.7", "rusqlite", diff --git a/libsql-server/src/replication/primary/logger.rs b/libsql-server/src/replication/primary/logger.rs index 4e4c0f7f73..772ef80ba5 100644 --- a/libsql-server/src/replication/primary/logger.rs +++ b/libsql-server/src/replication/primary/logger.rs @@ -1034,7 +1034,12 @@ pub fn checkpoint_db(data_path: &Path) -> anyhow::Result<()> { #[cfg(test)] mod test { + use std::collections::HashSet; + + use sqld_libsql_bindings::wal_hook::TRANSPARENT_METHODS; + use super::*; + use crate::connection::libsql::open_conn; use crate::DEFAULT_AUTO_CHECKPOINT; #[tokio::test] @@ -1155,4 +1160,88 @@ mod test { log_file.commit().unwrap(); assert_eq!(log_file.frames_iter().unwrap().count(), 6); } + + #[tokio::test] + async fn savepoint_and_rollback() { + let tmp = tempfile::tempdir().unwrap(); + let logger = Arc::new( + ReplicationLogger::open( + tmp.path(), + 100000000, + None, + false, + 100000, + Box::new(|_| Ok(())), + ) + .unwrap(), + ); + let mut conn = open_conn( + tmp.path(), + &REPLICATION_METHODS, + ReplicationLoggerHookCtx::new(logger.clone(), None), + None, + 10000, + ) + .unwrap(); + conn.execute("BEGIN", ()).unwrap(); + + conn.execute("CREATE TABLE test (x)", ()).unwrap(); + let mut savepoint = conn.savepoint().unwrap(); + // try to write a few pages + for i in 0..10000 { + savepoint + .execute(&format!("INSERT INTO test values (\"foobar{i}\")"), ()) + .unwrap(); + // force a flush + savepoint.cache_flush().unwrap(); + } + + // rollback savepoint and write a singular value + savepoint.rollback().unwrap(); + drop(savepoint); + + conn.execute("INSERT INTO test VALUES (42)", ()).unwrap(); + conn.execute("COMMIT", ()).unwrap(); + + // now we restore from the log and make sure the two db are consistent. + let tmp2 = tempfile::tempdir().unwrap(); + let f = File::open(tmp.path().join("wallog")).unwrap(); + let logfile = LogFile::new(f, 1000000000, None).unwrap(); + let mut seen = HashSet::new(); + let mut new_db_file = File::create(tmp2.path().join("data")).unwrap(); + for frame in logfile.rev_frames_iter_mut().unwrap() { + let frame = frame.unwrap(); + let page_no = frame.header().page_no; + if !seen.contains(&page_no) { + seen.insert(page_no); + new_db_file + .write_all_at(frame.page(), (page_no as u64 - 1) * LIBSQL_PAGE_SIZE) + .unwrap(); + } + } + + new_db_file.flush().unwrap(); + + let conn2 = open_conn(tmp2.path(), &TRANSPARENT_METHODS, (), None, 10000).unwrap(); + + conn2 + .query_row("SELECT count(*) FROM test", (), |row| { + assert_eq!(row.get_ref(0).unwrap().as_i64().unwrap(), 1); + Ok(()) + }) + .unwrap(); + + conn2 + .pragma_query(None, "page_count", |r| { + assert_eq!(r.get_ref(0).unwrap().as_i64().unwrap(), 2); + Ok(()) + }) + .unwrap(); + + conn.query_row("SELECT count(*) FROM test", (), |row| { + assert_eq!(row.get_ref(0).unwrap().as_i64().unwrap(), 1); + Ok(()) + }) + .unwrap(); + } } diff --git a/libsql-sys-tmp/src/lib.rs b/libsql-sys-tmp/src/lib.rs index f33e96f548..d1157207fa 100644 --- a/libsql-sys-tmp/src/lib.rs +++ b/libsql-sys-tmp/src/lib.rs @@ -3,7 +3,11 @@ pub mod ffi; pub mod wal_hook; -use std::{ffi::CString, ops::Deref, time::Duration}; +use std::{ + ffi::CString, + ops::{Deref, DerefMut}, + time::Duration, +}; pub use crate::wal_hook::WalMethodsHook; pub use once_cell::sync::Lazy; @@ -41,6 +45,12 @@ impl Deref for Connection { } } +impl DerefMut for Connection { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.conn + } +} + impl Connection { /// returns a dummy, in-memory connection. For testing purposes only pub fn test() -> Self {