Skip to content

Commit

Permalink
Merge pull request #603 from tursodatabase/replication-savepoint
Browse files Browse the repository at this point in the history
add test for savepoint with replication
  • Loading branch information
MarinPostma authored Nov 9, 2023
2 parents 9d7bf27 + b2d2250 commit 3790007
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 1 deletion.
89 changes: 89 additions & 0 deletions libsql-server/src/replication/primary/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1034,7 +1034,12 @@ pub fn checkpoint_db(data_path: &Path) -> anyhow::Result<()> {

#[cfg(test)]
mod test {
use std::collections::HashSet;

use sqld_libsql_bindings::wal_hook::TRANSPARENT_METHODS;

use super::*;
use crate::connection::libsql::open_conn;
use crate::DEFAULT_AUTO_CHECKPOINT;

#[tokio::test]
Expand Down Expand Up @@ -1155,4 +1160,88 @@ mod test {
log_file.commit().unwrap();
assert_eq!(log_file.frames_iter().unwrap().count(), 6);
}

#[tokio::test]
async fn savepoint_and_rollback() {
let tmp = tempfile::tempdir().unwrap();
let logger = Arc::new(
ReplicationLogger::open(
tmp.path(),
100000000,
None,
false,
100000,
Box::new(|_| Ok(())),
)
.unwrap(),
);
let mut conn = open_conn(
tmp.path(),
&REPLICATION_METHODS,
ReplicationLoggerHookCtx::new(logger.clone(), None),
None,
10000,
)
.unwrap();
conn.execute("BEGIN", ()).unwrap();

conn.execute("CREATE TABLE test (x)", ()).unwrap();
let mut savepoint = conn.savepoint().unwrap();
// try to write a few pages
for i in 0..10000 {
savepoint
.execute(&format!("INSERT INTO test values (\"foobar{i}\")"), ())
.unwrap();
// force a flush
savepoint.cache_flush().unwrap();
}

// rollback savepoint and write a singular value
savepoint.rollback().unwrap();
drop(savepoint);

conn.execute("INSERT INTO test VALUES (42)", ()).unwrap();
conn.execute("COMMIT", ()).unwrap();

// now we restore from the log and make sure the two db are consistent.
let tmp2 = tempfile::tempdir().unwrap();
let f = File::open(tmp.path().join("wallog")).unwrap();
let logfile = LogFile::new(f, 1000000000, None).unwrap();
let mut seen = HashSet::new();
let mut new_db_file = File::create(tmp2.path().join("data")).unwrap();
for frame in logfile.rev_frames_iter_mut().unwrap() {
let frame = frame.unwrap();
let page_no = frame.header().page_no;
if !seen.contains(&page_no) {
seen.insert(page_no);
new_db_file
.write_all_at(frame.page(), (page_no as u64 - 1) * LIBSQL_PAGE_SIZE)
.unwrap();
}
}

new_db_file.flush().unwrap();

let conn2 = open_conn(tmp2.path(), &TRANSPARENT_METHODS, (), None, 10000).unwrap();

conn2
.query_row("SELECT count(*) FROM test", (), |row| {
assert_eq!(row.get_ref(0).unwrap().as_i64().unwrap(), 1);
Ok(())
})
.unwrap();

conn2
.pragma_query(None, "page_count", |r| {
assert_eq!(r.get_ref(0).unwrap().as_i64().unwrap(), 2);
Ok(())
})
.unwrap();

conn.query_row("SELECT count(*) FROM test", (), |row| {
assert_eq!(row.get_ref(0).unwrap().as_i64().unwrap(), 1);
Ok(())
})
.unwrap();
}
}
12 changes: 11 additions & 1 deletion libsql-sys-tmp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
pub mod ffi;
pub mod wal_hook;

use std::{ffi::CString, ops::Deref, time::Duration};
use std::{
ffi::CString,
ops::{Deref, DerefMut},
time::Duration,
};

pub use crate::wal_hook::WalMethodsHook;
pub use once_cell::sync::Lazy;
Expand Down Expand Up @@ -41,6 +45,12 @@ impl<W: WalHook> Deref for Connection<W> {
}
}

impl<W: WalHook> DerefMut for Connection<W> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.conn
}
}

impl Connection<TransparentMethods> {
/// returns a dummy, in-memory connection. For testing purposes only
pub fn test() -> Self {
Expand Down

0 comments on commit 3790007

Please sign in to comment.