diff --git a/bottomless/src/lib.rs b/bottomless/src/lib.rs index 833b1ec809..5a1be0d838 100644 --- a/bottomless/src/lib.rs +++ b/bottomless/src/lib.rs @@ -2,8 +2,7 @@ #![allow(clippy::not_unsafe_ptr_arg_deref)] #![allow(improper_ctypes)] -mod ffi; - +// mod ffi; mod backup; mod read; pub mod replicator; @@ -11,587 +10,575 @@ mod transaction_cache; pub mod uuid_utils; mod wal; -use crate::ffi::{ - bottomless_methods, libsql_wal_methods, sqlite3, sqlite3_file, sqlite3_vfs, PgHdr, Wal, -}; -use std::ffi::{c_char, c_void}; -use tokio::time::Instant; - -// Just heuristics, but should work for ~100% of cases -fn is_regular(vfs: *const sqlite3_vfs) -> bool { - let vfs = unsafe { std::ffi::CStr::from_ptr((*vfs).zName) } - .to_str() - .unwrap_or("[error]"); - tracing::trace!("VFS: {}", vfs); - vfs.starts_with("unix") || vfs.starts_with("win32") -} - -macro_rules! block_on { - ($runtime:expr, $e:expr) => { - $runtime.block_on(async { $e.await }) - }; -} - -fn is_local() -> bool { - std::env::var("LIBSQL_BOTTOMLESS_LOCAL").map_or(false, |local| { - local.eq_ignore_ascii_case("true") - || local.eq_ignore_ascii_case("t") - || local.eq_ignore_ascii_case("yes") - || local.eq_ignore_ascii_case("y") - || local == "1" - }) -} - -pub extern "C" fn xOpen( - vfs: *mut sqlite3_vfs, - db_file: *mut sqlite3_file, - wal_name: *const c_char, - no_shm_mode: i32, - max_size: i64, - methods: *mut libsql_wal_methods, - wal: *mut *mut Wal, -) -> i32 { - tracing::debug!("Opening WAL {}", unsafe { - std::ffi::CStr::from_ptr(wal_name).to_str().unwrap() - }); - - let orig_methods = unsafe { &*(*(methods as *mut bottomless_methods)).underlying_methods }; - let rc = unsafe { - (orig_methods.xOpen.unwrap())(vfs, db_file, wal_name, no_shm_mode, max_size, methods, wal) - }; - if rc != ffi::SQLITE_OK { - return rc; - } - - if !is_regular(vfs) { - tracing::error!("Bottomless WAL is currently only supported for regular VFS"); - return ffi::SQLITE_CANTOPEN; - } - - if is_local() { - tracing::info!("Running in local-mode only, without any replication"); - return ffi::SQLITE_OK; - } - - let runtime = match tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - { - Ok(runtime) => runtime, - Err(e) => { - tracing::error!("Failed to initialize async runtime: {}", e); - return ffi::SQLITE_CANTOPEN; - } - }; - - let path = unsafe { - match std::ffi::CStr::from_ptr(wal_name).to_str() { - Ok(path) if path.len() >= 4 => &path[..path.len() - 4], - Ok(path) => path, - Err(e) => { - tracing::error!("Failed to parse the main database path: {}", e); - return ffi::SQLITE_CANTOPEN; - } - } - }; - - let replicator = block_on!(runtime, replicator::Replicator::new(path)); - let mut replicator = match replicator { - Ok(repl) => repl, - Err(e) => { - tracing::error!("Failed to initialize replicator: {}", e); - return ffi::SQLITE_CANTOPEN; - } - }; - - let rc = block_on!(runtime, try_restore(&mut replicator)); - if rc != ffi::SQLITE_OK { - return rc; - } - - let context = replicator::Context { - replicator, - runtime, - }; - let context_ptr = Box::into_raw(Box::new(context)) as *mut c_void; - unsafe { (*(*wal)).pMethodsData = context_ptr }; - - ffi::SQLITE_OK -} - -fn get_orig_methods(wal: *mut Wal) -> &'static libsql_wal_methods { - let wal = unsafe { &*wal }; - let methods = unsafe { &*(wal.pMethods as *const bottomless_methods) }; - unsafe { &*methods.underlying_methods } -} - -fn get_replicator_context(wal: *mut Wal) -> &'static mut replicator::Context { - unsafe { &mut *((*wal).pMethodsData as *mut replicator::Context) } -} - -pub extern "C" fn xClose( - wal: *mut Wal, - db: *mut sqlite3, - sync_flags: i32, - n_buf: i32, - z_buf: *mut u8, -) -> i32 { - tracing::debug!("Closing wal"); - let orig_methods = get_orig_methods(wal); - let methods_data = unsafe { (*wal).pMethodsData as *mut replicator::Context }; - let rc = unsafe { (orig_methods.xClose.unwrap())(wal, db, sync_flags, n_buf, z_buf) }; - if rc != ffi::SQLITE_OK { - return rc; - } - if !is_local() && !methods_data.is_null() { - let _box = unsafe { Box::from_raw(methods_data) }; - } - rc -} - -pub extern "C" fn xLimit(wal: *mut Wal, limit: i64) { - let orig_methods = get_orig_methods(wal); - unsafe { (orig_methods.xLimit.unwrap())(wal, limit) } -} - -pub extern "C" fn xBeginReadTransaction(wal: *mut Wal, changed: *mut i32) -> i32 { - let orig_methods = get_orig_methods(wal); - unsafe { (orig_methods.xBeginReadTransaction.unwrap())(wal, changed) } -} - -pub extern "C" fn xEndReadTransaction(wal: *mut Wal) { - let orig_methods = get_orig_methods(wal); - unsafe { (orig_methods.xEndReadTransaction.unwrap())(wal) } -} - -pub extern "C" fn xFindFrame(wal: *mut Wal, pgno: u32, frame: *mut u32) -> i32 { - let orig_methods = get_orig_methods(wal); - unsafe { (orig_methods.xFindFrame.unwrap())(wal, pgno, frame) } -} - -pub extern "C" fn xReadFrame(wal: *mut Wal, frame: u32, n_out: i32, p_out: *mut u8) -> i32 { - let orig_methods = get_orig_methods(wal); - unsafe { (orig_methods.xReadFrame.unwrap())(wal, frame, n_out, p_out) } -} - -pub extern "C" fn xDbsize(wal: *mut Wal) -> u32 { - let orig_methods = get_orig_methods(wal); - unsafe { (orig_methods.xDbsize.unwrap())(wal) } -} - -pub extern "C" fn xBeginWriteTransaction(wal: *mut Wal) -> i32 { - let orig_methods = get_orig_methods(wal); - unsafe { (orig_methods.xBeginWriteTransaction.unwrap())(wal) } -} - -pub extern "C" fn xEndWriteTransaction(wal: *mut Wal) -> i32 { - let orig_methods = get_orig_methods(wal); - unsafe { (orig_methods.xEndWriteTransaction.unwrap())(wal) } -} - -pub extern "C" fn xUndo( - wal: *mut Wal, - func: Option i32>, - ctx: *mut c_void, -) -> i32 { - let orig_methods = get_orig_methods(wal); - let rc = unsafe { (orig_methods.xUndo.unwrap())(wal, func, ctx) }; - if is_local() || rc != ffi::SQLITE_OK { - return rc; - } - - let last_valid_frame = unsafe { (*wal).hdr.mxFrame }; - let ctx = get_replicator_context(wal); - tracing::trace!( - "Undo: rolling back from frame {} to {}", - ctx.replicator.peek_last_valid_frame(), - last_valid_frame - ); - ctx.replicator.rollback_to_frame(last_valid_frame); - - ffi::SQLITE_OK -} - -pub extern "C" fn xSavepoint(wal: *mut Wal, wal_data: *mut u32) { - let orig_methods = get_orig_methods(wal); - unsafe { (orig_methods.xSavepoint.unwrap())(wal, wal_data) } -} - -pub extern "C" fn xSavepointUndo(wal: *mut Wal, wal_data: *mut u32) -> i32 { - let orig_methods = get_orig_methods(wal); - let rc = unsafe { (orig_methods.xSavepointUndo.unwrap())(wal, wal_data) }; - if is_local() || rc != ffi::SQLITE_OK { - return rc; - } - - let last_valid_frame = unsafe { *wal_data }; - let ctx = get_replicator_context(wal); - tracing::trace!( - "Savepoint: rolling back from frame {} to {}", - ctx.replicator.peek_last_valid_frame(), - last_valid_frame - ); - ctx.replicator.rollback_to_frame(last_valid_frame); - - ffi::SQLITE_OK -} - -pub extern "C" fn xFrames( - wal: *mut Wal, - page_size: i32, - page_headers: *mut PgHdr, - size_after: u32, - is_commit: i32, - sync_flags: i32, -) -> i32 { - if !is_local() { - let ctx = get_replicator_context(wal); - let last_valid_frame = unsafe { (*wal).hdr.mxFrame }; - ctx.replicator.register_last_valid_frame(last_valid_frame); - // In theory it's enough to set the page size only once, but in practice - // it's a very cheap operation anyway, and the page is not always known - // upfront and can change dynamically. - // FIXME: changing the page size in the middle of operation is *not* - // supported by bottomless storage. - if let Err(e) = ctx.replicator.set_page_size(page_size as usize) { - tracing::error!("{}", e); - return ffi::SQLITE_IOERR_WRITE; - } - let frame_count = ffi::PageHdrIter::new(page_headers, page_size as usize).count(); - if size_after != 0 { - // only submit frames from committed transactions - ctx.replicator.submit_frames(frame_count as u32); - } - } - - let orig_methods = get_orig_methods(wal); - let rc = unsafe { - (orig_methods.xFrames.unwrap())( - wal, - page_size, - page_headers, - size_after, - is_commit, - sync_flags, - ) - }; - if is_local() || rc != ffi::SQLITE_OK { - return rc; - } - - ffi::SQLITE_OK -} - -extern "C" fn always_wait(_busy_param: *mut c_void) -> i32 { - std::thread::sleep(std::time::Duration::from_millis(10)); - 1 -} - -#[tracing::instrument(skip(wal, db, busy_handler, busy_arg))] -pub extern "C" fn xCheckpoint( - wal: *mut Wal, - db: *mut sqlite3, - emode: i32, - busy_handler: Option i32>, - busy_arg: *mut c_void, - sync_flags: i32, - n_buf: i32, - z_buf: *mut u8, - frames_in_wal: *mut i32, - backfilled_frames: *mut i32, -) -> i32 { - tracing::trace!("Checkpoint"); - let start = Instant::now(); - - /* In order to avoid partial checkpoints, passive checkpoint - ** mode is not allowed. Only TRUNCATE checkpoints are accepted, - ** because these are guaranteed to block writes, copy all WAL pages - ** back into the main database file and reset the frame number. - ** In order to avoid autocheckpoint on close (that's too often), - ** checkpoint attempts weaker than TRUNCATE are ignored. - */ - if emode < ffi::SQLITE_CHECKPOINT_TRUNCATE { - tracing::trace!("Ignoring a checkpoint request weaker than TRUNCATE"); - return ffi::SQLITE_OK; - } - - let ctx = get_replicator_context(wal); - let last_known_frame = ctx.replicator.last_known_frame(); - ctx.replicator.request_flush(); - if last_known_frame == 0 { - tracing::debug!("No committed changes in this generation, not snapshotting"); - ctx.replicator.skip_snapshot_for_current_generation(); - return ffi::SQLITE_OK; - } - if let Err(e) = block_on!( - ctx.runtime, - ctx.replicator.wait_until_committed(last_known_frame) - ) { - tracing::error!( - "Failed to finalize frame {} replication: {}", - last_known_frame, - e - ); - return ffi::SQLITE_IOERR_WRITE; - } - if let Err(e) = block_on!(ctx.runtime, ctx.replicator.wait_until_snapshotted()) { - tracing::error!("Failed to finalize snapshot replication: {}", e); - return ffi::SQLITE_IOERR_WRITE; - } - - /* If there's no busy handler, let's provide a default one, - ** since we auto-upgrade the passive checkpoint - */ - let busy_handler = Some(busy_handler.unwrap_or_else(|| { - tracing::trace!("Falling back to the default busy handler - always wait"); - always_wait - })); - - let orig_methods = get_orig_methods(wal); - let rc = unsafe { - (orig_methods.xCheckpoint.unwrap())( - wal, - db, - emode, - busy_handler, - busy_arg, - sync_flags, - n_buf, - z_buf, - frames_in_wal, - backfilled_frames, - ) - }; - - if is_local() || rc != ffi::SQLITE_OK { - return rc; - } - - let _prev = ctx.replicator.new_generation(); - tracing::debug!("Snapshotting after checkpoint"); - match block_on!(ctx.runtime, ctx.replicator.snapshot_main_db_file()) { - Ok(_handle) => { - tracing::trace!("got snapshot handle"); - } - Err(e) => { - tracing::error!( - "Failed to snapshot the main db file during checkpoint: {}", - e - ); - return ffi::SQLITE_IOERR_WRITE; - } - } - tracing::debug!("Checkpoint completed in {:?}", Instant::now() - start); - - ffi::SQLITE_OK -} - -pub extern "C" fn xCallback(wal: *mut Wal) -> i32 { - let orig_methods = get_orig_methods(wal); - unsafe { (orig_methods.xCallback.unwrap())(wal) } -} - -pub extern "C" fn xExclusiveMode(wal: *mut Wal, op: i32) -> i32 { - let orig_methods = get_orig_methods(wal); - unsafe { (orig_methods.xExclusiveMode.unwrap())(wal, op) } -} - -pub extern "C" fn xHeapMemory(wal: *mut Wal) -> i32 { - let orig_methods = get_orig_methods(wal); - unsafe { (orig_methods.xHeapMemory.unwrap())(wal) } -} - -pub extern "C" fn xFile(wal: *mut Wal) -> *mut sqlite3_file { - let orig_methods = get_orig_methods(wal); - unsafe { (orig_methods.xFile.unwrap())(wal) } -} - -pub extern "C" fn xDb(wal: *mut Wal, db: *mut sqlite3) { - let orig_methods = get_orig_methods(wal); - unsafe { (orig_methods.xDb.unwrap())(wal, db) } -} - -pub extern "C" fn xPathnameLen(orig_len: i32) -> i32 { - orig_len + 4 -} - -pub extern "C" fn xGetPathname(buf: *mut c_char, orig: *const c_char, orig_len: i32) { - unsafe { std::ptr::copy(orig, buf, orig_len as usize) } - unsafe { - std::ptr::copy( - "-wal".as_ptr() as *const _, - buf.offset(orig_len as isize), - 4, - ) - } -} - -async fn try_restore(replicator: &mut replicator::Replicator) -> i32 { - match replicator.restore(None, None).await { - Ok((replicator::RestoreAction::SnapshotMainDbFile, _)) => { - replicator.new_generation(); - match replicator.snapshot_main_db_file().await { - Ok(Some(h)) => { - if let Err(e) = h.await { - tracing::error!("Failed to join snapshot main db file task: {}", e); - return ffi::SQLITE_CANTOPEN; - } - } - Ok(None) => {} - Err(e) => { - tracing::error!("Failed to snapshot the main db file: {}", e); - return ffi::SQLITE_CANTOPEN; - } - } - // Restoration process only leaves the local WAL file if it was - // detected to be newer than its remote counterpart. - if let Err(e) = replicator.maybe_replicate_wal().await { - tracing::error!("Failed to replicate local WAL: {}", e); - return ffi::SQLITE_CANTOPEN; - } - } - Ok((replicator::RestoreAction::ReuseGeneration(gen), _)) => { - replicator.set_generation(gen); - } - Err(e) => { - tracing::error!("Failed to restore the database: {}", e); - return ffi::SQLITE_CANTOPEN; - } - } - - ffi::SQLITE_OK -} - -pub extern "C" fn xPreMainDbOpen(_methods: *mut libsql_wal_methods, path: *const c_char) -> i32 { - if is_local() { - tracing::info!("Running in local-mode only, without any replication"); - return ffi::SQLITE_OK; - } - - if path.is_null() { - return ffi::SQLITE_OK; - } - let path = unsafe { - match std::ffi::CStr::from_ptr(path).to_str() { - Ok(path) => path, - Err(e) => { - tracing::error!("Failed to parse the main database path: {}", e); - return ffi::SQLITE_CANTOPEN; - } - } - }; - tracing::debug!("Main database file {} will be open soon", path); - - let runtime = match tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - { - Ok(runtime) => runtime, - Err(e) => { - tracing::error!("Failed to initialize async runtime: {}", e); - return ffi::SQLITE_CANTOPEN; - } - }; - - let options = match replicator::Options::from_env() { - Ok(options) => options, - Err(e) => { - tracing::error!("Failed to parse replicator options: {}", e); - return ffi::SQLITE_CANTOPEN; - } - }; - let replicator = block_on!(runtime, replicator::Replicator::with_options(path, options)); - let mut replicator = match replicator { - Ok(repl) => repl, - Err(e) => { - tracing::error!("Failed to initialize replicator: {}", e); - return ffi::SQLITE_CANTOPEN; - } - }; - block_on!(runtime, try_restore(&mut replicator)) -} - -#[no_mangle] -pub extern "C" fn bottomless_init() { - tracing::debug!("bottomless module initialized"); -} - -#[no_mangle] -pub extern "C" fn bottomless_tracing_init() { - tracing_subscriber::fmt::init(); -} - -#[tracing::instrument] -#[no_mangle] -pub extern "C" fn bottomless_methods( - underlying_methods: *const libsql_wal_methods, -) -> *const libsql_wal_methods { - let vwal_name: *const c_char = "bottomless\0".as_ptr() as *const _; - - Box::into_raw(Box::new(bottomless_methods { - methods: libsql_wal_methods { - iVersion: 1, - xOpen: Some(xOpen), - xClose: Some(xClose), - xLimit: Some(xLimit), - xBeginReadTransaction: Some(xBeginReadTransaction), - xEndReadTransaction: Some(xEndReadTransaction), - xFindFrame: Some(xFindFrame), - xReadFrame: Some(xReadFrame), - xDbsize: Some(xDbsize), - xBeginWriteTransaction: Some(xBeginWriteTransaction), - xEndWriteTransaction: Some(xEndWriteTransaction), - xUndo: Some(xUndo), - xSavepoint: Some(xSavepoint), - xSavepointUndo: Some(xSavepointUndo), - xFrames: Some(xFrames), - xCheckpoint: Some(xCheckpoint), - xCallback: Some(xCallback), - xExclusiveMode: Some(xExclusiveMode), - xHeapMemory: Some(xHeapMemory), - xSnapshotGet: None, - xSnapshotOpen: None, - xSnapshotRecover: None, - xSnapshotCheck: None, - xSnapshotUnlock: None, - xFramesize: None, - xFile: Some(xFile), - xWriteLock: None, - xDb: Some(xDb), - xPathnameLen: Some(xPathnameLen), - xGetWalPathname: Some(xGetPathname), - xPreMainDbOpen: Some(xPreMainDbOpen), - zName: vwal_name, - bUsesShm: 0, - pNext: std::ptr::null_mut(), - }, - underlying_methods, - })) as *const libsql_wal_methods -} - +// // Just heuristics, but should work for ~100% of cases +// fn is_regular(vfs: *const sqlite3_vfs) -> bool { +// let vfs = unsafe { std::ffi::CStr::from_ptr((*vfs).zName) } +// .to_str() +// .unwrap_or("[error]"); +// tracing::trace!("VFS: {}", vfs); +// vfs.starts_with("unix") || vfs.starts_with("win32") +// } +// +// macro_rules! block_on { +// ($runtime:expr, $e:expr) => { +// $runtime.block_on(async { $e.await }) +// }; +// } +// +// fn is_local() -> bool { +// std::env::var("LIBSQL_BOTTOMLESS_LOCAL").map_or(false, |local| { +// local.eq_ignore_ascii_case("true") +// || local.eq_ignore_ascii_case("t") +// || local.eq_ignore_ascii_case("yes") +// || local.eq_ignore_ascii_case("y") +// || local == "1" +// }) +// } + +// pub extern "C" fn xOpen( +// vfs: *mut sqlite3_vfs, +// db_file: *mut sqlite3_file, +// wal_name: *const c_char, +// no_shm_mode: i32, +// max_size: i64, +// methods: *mut libsql_wal_methods, +// wal: *mut *mut Wal, +// ) -> i32 { +// tracing::debug!("Opening WAL {}", unsafe { +// std::ffi::CStr::from_ptr(wal_name).to_str().unwrap() +// }); +// +// let orig_methods = unsafe { &*(*(methods as *mut bottomless_methods)).underlying_methods }; +// let rc = unsafe { +// (orig_methods.xOpen.unwrap())(vfs, db_file, wal_name, no_shm_mode, max_size, methods, wal) +// }; +// if rc != ffi::SQLITE_OK { +// return rc; +// } +// +// if !is_regular(vfs) { +// tracing::error!("Bottomless WAL is currently only supported for regular VFS"); +// return ffi::SQLITE_CANTOPEN; +// } +// +// if is_local() { +// tracing::info!("Running in local-mode only, without any replication"); +// return ffi::SQLITE_OK; +// } +// +// let runtime = match tokio::runtime::Builder::new_current_thread() +// .enable_all() +// .build() +// { +// Ok(runtime) => runtime, +// Err(e) => { +// tracing::error!("Failed to initialize async runtime: {}", e); +// return ffi::SQLITE_CANTOPEN; +// } +// }; +// +// let path = unsafe { +// match std::ffi::CStr::from_ptr(wal_name).to_str() { +// Ok(path) if path.len() >= 4 => &path[..path.len() - 4], +// Ok(path) => path, +// Err(e) => { +// tracing::error!("Failed to parse the main database path: {}", e); +// return ffi::SQLITE_CANTOPEN; +// } +// } +// }; +// +// let replicator = block_on!(runtime, replicator::Replicator::new(path)); +// let mut replicator = match replicator { +// Ok(repl) => repl, +// Err(e) => { +// tracing::error!("Failed to initialize replicator: {}", e); +// return ffi::SQLITE_CANTOPEN; +// } +// }; +// +// let rc = block_on!(runtime, try_restore(&mut replicator)); +// if rc != ffi::SQLITE_OK { +// return rc; +// } +// +// let context = replicator::Context { +// replicator, +// runtime, +// }; +// let context_ptr = Box::into_raw(Box::new(context)) as *mut c_void; +// unsafe { (*(*wal)).pMethodsData = context_ptr }; +// +// ffi::SQLITE_OK +// } +// +// fn get_orig_methods(wal: *mut Wal) -> &'static libsql_wal_methods { +// let wal = unsafe { &*wal }; +// let methods = unsafe { &*(wal.pMethods as *const bottomless_methods) }; +// unsafe { &*methods.underlying_methods } +// } +// +// fn get_replicator_context(wal: *mut Wal) -> &'static mut replicator::Context { +// unsafe { &mut *((*wal).pMethodsData as *mut replicator::Context) } +// } +// +// pub extern "C" fn xClose( +// wal: *mut Wal, +// db: *mut sqlite3, +// sync_flags: i32, +// n_buf: i32, +// z_buf: *mut u8, +// ) -> i32 { +// tracing::debug!("Closing wal"); +// let orig_methods = get_orig_methods(wal); +// let methods_data = unsafe { (*wal).pMethodsData as *mut replicator::Context }; +// let rc = unsafe { (orig_methods.xClose.unwrap())(wal, db, sync_flags, n_buf, z_buf) }; +// if rc != ffi::SQLITE_OK { +// return rc; +// } +// if !is_local() && !methods_data.is_null() { +// let _box = unsafe { Box::from_raw(methods_data) }; +// } +// rc +// } +// +// pub extern "C" fn xLimit(wal: *mut Wal, limit: i64) { +// let orig_methods = get_orig_methods(wal); +// unsafe { (orig_methods.xLimit.unwrap())(wal, limit) } +// } +// +// pub extern "C" fn xBeginReadTransaction(wal: *mut Wal, changed: *mut i32) -> i32 { +// let orig_methods = get_orig_methods(wal); +// unsafe { (orig_methods.xBeginReadTransaction.unwrap())(wal, changed) } +// } +// +// pub extern "C" fn xEndReadTransaction(wal: *mut Wal) { +// let orig_methods = get_orig_methods(wal); +// unsafe { (orig_methods.xEndReadTransaction.unwrap())(wal) } +// } +// +// pub extern "C" fn xFindFrame(wal: *mut Wal, pgno: u32, frame: *mut u32) -> i32 { +// let orig_methods = get_orig_methods(wal); +// unsafe { (orig_methods.xFindFrame.unwrap())(wal, pgno, frame) } +// } +// +// pub extern "C" fn xReadFrame(wal: *mut Wal, frame: u32, n_out: i32, p_out: *mut u8) -> i32 { +// let orig_methods = get_orig_methods(wal); +// unsafe { (orig_methods.xReadFrame.unwrap())(wal, frame, n_out, p_out) } +// } +// +// pub extern "C" fn xDbsize(wal: *mut Wal) -> u32 { +// let orig_methods = get_orig_methods(wal); +// unsafe { (orig_methods.xDbsize.unwrap())(wal) } +// } +// +// pub extern "C" fn xBeginWriteTransaction(wal: *mut Wal) -> i32 { +// let orig_methods = get_orig_methods(wal); +// unsafe { (orig_methods.xBeginWriteTransaction.unwrap())(wal) } +// } +// +// pub extern "C" fn xEndWriteTransaction(wal: *mut Wal) -> i32 { +// let orig_methods = get_orig_methods(wal); +// unsafe { (orig_methods.xEndWriteTransaction.unwrap())(wal) } +// } +// +// pub extern "C" fn xUndo( +// wal: *mut Wal, +// func: Option i32>, +// ctx: *mut c_void, +// ) -> i32 { +// let orig_methods = get_orig_methods(wal); +// let rc = unsafe { (orig_methods.xUndo.unwrap())(wal, func, ctx) }; +// if is_local() || rc != ffi::SQLITE_OK { +// return rc; +// } +// +// let last_valid_frame = unsafe { (*wal).hdr.mxFrame }; +// let ctx = get_replicator_context(wal); +// tracing::trace!( +// "Undo: rolling back from frame {} to {}", +// ctx.replicator.peek_last_valid_frame(), +// last_valid_frame +// ); +// ctx.replicator.rollback_to_frame(last_valid_frame); +// +// ffi::SQLITE_OK +// } +// +// pub extern "C" fn xSavepoint(wal: *mut Wal, wal_data: *mut u32) { +// let orig_methods = get_orig_methods(wal); +// unsafe { (orig_methods.xSavepoint.unwrap())(wal, wal_data) } +// } +// +// pub extern "C" fn xSavepointUndo(wal: *mut Wal, wal_data: *mut u32) -> i32 { +// let orig_methods = get_orig_methods(wal); +// let rc = unsafe { (orig_methods.xSavepointUndo.unwrap())(wal, wal_data) }; +// if is_local() || rc != ffi::SQLITE_OK { +// return rc; +// } +// +// let last_valid_frame = unsafe { *wal_data }; +// let ctx = get_replicator_context(wal); +// tracing::trace!( +// "Savepoint: rolling back from frame {} to {}", +// ctx.replicator.peek_last_valid_frame(), +// last_valid_frame +// ); +// ctx.replicator.rollback_to_frame(last_valid_frame); +// +// ffi::SQLITE_OK +// } +// +// pub extern "C" fn xFrames( +// wal: *mut Wal, +// page_size: i32, +// page_headers: *mut PgHdr, +// size_after: u32, +// is_commit: i32, +// sync_flags: i32, +// ) -> i32 { +// if !is_local() { +// let ctx = get_replicator_context(wal); +// let last_valid_frame = unsafe { (*wal).hdr.mxFrame }; +// ctx.replicator.register_last_valid_frame(last_valid_frame); +// // In theory it's enough to set the page size only once, but in practice +// // it's a very cheap operation anyway, and the page is not always known +// // upfront and can change dynamically. +// // FIXME: changing the page size in the middle of operation is *not* +// // supported by bottomless storage. +// if let Err(e) = ctx.replicator.set_page_size(page_size as usize) { +// tracing::error!("{}", e); +// return ffi::SQLITE_IOERR_WRITE; +// } +// let frame_count = ffi::PageHdrIter::new(page_headers, page_size as usize).count(); +// if size_after != 0 { +// // only submit frames from committed transactions +// ctx.replicator.submit_frames(frame_count as u32); +// } +// } +// +// let orig_methods = get_orig_methods(wal); +// let rc = unsafe { +// (orig_methods.xFrames.unwrap())( +// wal, +// page_size, +// page_headers, +// size_after, +// is_commit, +// sync_flags, +// ) +// }; +// if is_local() || rc != ffi::SQLITE_OK { +// return rc; +// } +// +// ffi::SQLITE_OK +// } +// +// extern "C" fn always_wait(_busy_param: *mut c_void) -> i32 { +// std::thread::sleep(std::time::Duration::from_millis(10)); +// 1 +// } +// +// #[tracing::instrument(skip(wal, db, busy_handler, busy_arg))] +// pub extern "C" fn xCheckpoint( +// wal: *mut Wal, +// db: *mut sqlite3, +// emode: i32, +// busy_handler: Option i32>, +// busy_arg: *mut c_void, +// sync_flags: i32, +// n_buf: i32, +// z_buf: *mut u8, +// frames_in_wal: *mut i32, +// backfilled_frames: *mut i32, +// ) -> i32 { +// tracing::trace!("Checkpoint"); +// let start = Instant::now(); +// +// /* In order to avoid partial checkpoints, passive checkpoint +// ** mode is not allowed. Only TRUNCATE checkpoints are accepted, +// ** because these are guaranteed to block writes, copy all WAL pages +// ** back into the main database file and reset the frame number. +// ** In order to avoid autocheckpoint on close (that's too often), +// ** checkpoint attempts weaker than TRUNCATE are ignored. +// */ +// if emode < ffi::SQLITE_CHECKPOINT_TRUNCATE { +// tracing::trace!("Ignoring a checkpoint request weaker than TRUNCATE"); +// return ffi::SQLITE_OK; +// } +// +// let ctx = get_replicator_context(wal); +// let last_known_frame = ctx.replicator.last_known_frame(); +// ctx.replicator.request_flush(); +// if last_known_frame == 0 { +// tracing::debug!("No committed changes in this generation, not snapshotting"); +// ctx.replicator.skip_snapshot_for_current_generation(); +// return ffi::SQLITE_OK; +// } +// if let Err(e) = block_on!( +// ctx.runtime, +// ctx.replicator.wait_until_committed(last_known_frame) +// ) { +// tracing::error!( +// "Failed to finalize frame {} replication: {}", +// last_known_frame, +// e +// ); +// return ffi::SQLITE_IOERR_WRITE; +// } +// if let Err(e) = block_on!(ctx.runtime, ctx.replicator.wait_until_snapshotted()) { +// tracing::error!("Failed to finalize snapshot replication: {}", e); +// return ffi::SQLITE_IOERR_WRITE; +// } +// +// /* If there's no busy handler, let's provide a default one, +// ** since we auto-upgrade the passive checkpoint +// */ +// let busy_handler = Some(busy_handler.unwrap_or_else(|| { +// tracing::trace!("Falling back to the default busy handler - always wait"); +// always_wait +// })); +// +// let orig_methods = get_orig_methods(wal); +// let rc = unsafe { +// (orig_methods.xCheckpoint.unwrap())( +// wal, +// db, +// emode, +// busy_handler, +// busy_arg, +// sync_flags, +// n_buf, +// z_buf, +// frames_in_wal, +// backfilled_frames, +// ) +// }; +// +// if is_local() || rc != ffi::SQLITE_OK { +// return rc; +// } +// +// let _prev = ctx.replicator.new_generation(); +// tracing::debug!("Snapshotting after checkpoint"); +// match block_on!(ctx.runtime, ctx.replicator.snapshot_main_db_file()) { +// Ok(_handle) => { +// tracing::trace!("got snapshot handle"); +// } +// Err(e) => { +// tracing::error!( +// "Failed to snapshot the main db file during checkpoint: {}", +// e +// ); +// return ffi::SQLITE_IOERR_WRITE; +// } +// } +// tracing::debug!("Checkpoint completed in {:?}", Instant::now() - start); +// +// ffi::SQLITE_OK +// } +// +// pub extern "C" fn xCallback(wal: *mut Wal) -> i32 { +// let orig_methods = get_orig_methods(wal); +// unsafe { (orig_methods.xCallback.unwrap())(wal) } +// } +// +// pub extern "C" fn xExclusiveMode(wal: *mut Wal, op: i32) -> i32 { +// let orig_methods = get_orig_methods(wal); +// unsafe { (orig_methods.xExclusiveMode.unwrap())(wal, op) } +// } +// +// pub extern "C" fn xHeapMemory(wal: *mut Wal) -> i32 { +// let orig_methods = get_orig_methods(wal); +// unsafe { (orig_methods.xHeapMemory.unwrap())(wal) } +// } +// +// pub extern "C" fn xFile(wal: *mut Wal) -> *mut sqlite3_file { +// let orig_methods = get_orig_methods(wal); +// unsafe { (orig_methods.xFile.unwrap())(wal) } +// } +// +// pub extern "C" fn xDb(wal: *mut Wal, db: *mut sqlite3) { +// let orig_methods = get_orig_methods(wal); +// unsafe { (orig_methods.xDb.unwrap())(wal, db) } +// } +// +// pub extern "C" fn xPathnameLen(orig_len: i32) -> i32 { +// orig_len + 4 +// } +// +// pub extern "C" fn xGetPathname(buf: *mut c_char, orig: *const c_char, orig_len: i32) { +// unsafe { std::ptr::copy(orig, buf, orig_len as usize) } +// unsafe { +// std::ptr::copy( +// "-wal".as_ptr() as *const _, +// buf.offset(orig_len as isize), +// 4, +// ) +// } +// } + +// async fn try_restore(replicator: &mut replicator::Replicator) -> i32 { +// match replicator.restore(None, None).await { +// Ok((replicator::RestoreAction::SnapshotMainDbFile, _)) => { +// replicator.new_generation(); +// match replicator.snapshot_main_db_file().await { +// Ok(Some(h)) => { +// if let Err(e) = h.await { +// tracing::error!("Failed to join snapshot main db file task: {}", e); +// return ffi::SQLITE_CANTOPEN; +// } +// } +// Ok(None) => {} +// Err(e) => { +// tracing::error!("Failed to snapshot the main db file: {}", e); +// return ffi::SQLITE_CANTOPEN; +// } +// } +// // Restoration process only leaves the local WAL file if it was +// // detected to be newer than its remote counterpart. +// if let Err(e) = replicator.maybe_replicate_wal().await { +// tracing::error!("Failed to replicate local WAL: {}", e); +// return ffi::SQLITE_CANTOPEN; +// } +// } +// Ok((replicator::RestoreAction::ReuseGeneration(gen), _)) => { +// replicator.set_generation(gen); +// } +// Err(e) => { +// tracing::error!("Failed to restore the database: {}", e); +// return ffi::SQLITE_CANTOPEN; +// } +// } +// +// ffi::SQLITE_OK +// } + +// pub extern "C" fn xPreMainDbOpen(_methods: *mut libsql_wal_methods, path: *const c_char) -> i32 { +// if is_local() { +// tracing::info!("Running in local-mode only, without any replication"); +// return ffi::SQLITE_OK; +// } +// +// if path.is_null() { +// return ffi::SQLITE_OK; +// } +// let path = unsafe { +// match std::ffi::CStr::from_ptr(path).to_str() { +// Ok(path) => path, +// Err(e) => { +// tracing::error!("Failed to parse the main database path: {}", e); +// return ffi::SQLITE_CANTOPEN; +// } +// } +// }; +// tracing::debug!("Main database file {} will be open soon", path); +// +// let runtime = match tokio::runtime::Builder::new_current_thread() +// .enable_all() +// .build() +// { +// Ok(runtime) => runtime, +// Err(e) => { +// tracing::error!("Failed to initialize async runtime: {}", e); +// return ffi::SQLITE_CANTOPEN; +// } +// }; +// +// let options = match replicator::Options::from_env() { +// Ok(options) => options, +// Err(e) => { +// tracing::error!("Failed to parse replicator options: {}", e); +// return ffi::SQLITE_CANTOPEN; +// } +// }; +// let replicator = block_on!(runtime, replicator::Replicator::with_options(path, options)); +// let mut replicator = match replicator { +// Ok(repl) => repl, +// Err(e) => { +// tracing::error!("Failed to initialize replicator: {}", e); +// return ffi::SQLITE_CANTOPEN; +// } +// }; +// block_on!(runtime, try_restore(&mut replicator)) +// } +// +// #[no_mangle] +// pub extern "C" fn bottomless_init() { +// tracing::debug!("bottomless module initialized"); +// } +// +// #[no_mangle] +// pub extern "C" fn bottomless_tracing_init() { +// tracing_subscriber::fmt::init(); +// } + +// #[tracing::instrument] +// #[no_mangle] +// pub extern "C" fn bottomless_methods( +// underlying_methods: *const libsql_wal_methods, +// ) -> *const libsql_wal_methods { +// let vwal_name: *const c_char = "bottomless\0".as_ptr() as *const _; +// +// Box::into_raw(Box::new(bottomless_methods { +// methods: libsql_wal_methods { +// iVersion: 1, +// xOpen: Some(xOpen), +// xClose: Some(xClose), +// xLimit: Some(xLimit), +// xBeginReadTransaction: Some(xBeginReadTransaction), +// xEndReadTransaction: Some(xEndReadTransaction), +// xFindFrame: Some(xFindFrame), +// xReadFrame: Some(xReadFrame), +// xDbsize: Some(xDbsize), +// xBeginWriteTransaction: Some(xBeginWriteTransaction), +// xEndWriteTransaction: Some(xEndWriteTransaction), +// xUndo: Some(xUndo), +// xSavepoint: Some(xSavepoint), +// xSavepointUndo: Some(xSavepointUndo), +// xFrames: Some(xFrames), +// xCheckpoint: Some(xCheckpoint), +// xCallback: Some(xCallback), +// xExclusiveMode: Some(xExclusiveMode), +// xHeapMemory: Some(xHeapMemory), +// xSnapshotGet: None, +// xSnapshotOpen: None, +// xSnapshotRecover: None, +// xSnapshotCheck: None, +// xSnapshotUnlock: None, +// xFramesize: None, +// xFile: Some(xFile), +// xWriteLock: None, +// xDb: Some(xDb), +// xPathnameLen: Some(xPathnameLen), +// xGetWalPathname: Some(xGetPathname), +// xPreMainDbOpen: Some(xPreMainDbOpen), +// zName: vwal_name, +// bUsesShm: 0, +// pNext: std::ptr::null_mut(), +// }, +// underlying_methods, +// })) as *const libsql_wal_methods +// } #[cfg(feature = "libsql_linked_statically")] pub mod static_init { - use crate::libsql_wal_methods; - - extern "C" { - fn libsql_wal_methods_find(name: *const std::ffi::c_char) -> *const libsql_wal_methods; - fn libsql_wal_methods_register(methods: *const libsql_wal_methods) -> i32; - } pub fn register_bottomless_methods() { - static INIT: std::sync::Once = std::sync::Once::new(); - INIT.call_once(|| { - crate::bottomless_init(); - let orig_methods = unsafe { libsql_wal_methods_find(std::ptr::null()) }; - if orig_methods.is_null() { - panic!("failed to locate default WAL methods") - } - let methods = crate::bottomless_methods(orig_methods); - let rc = unsafe { libsql_wal_methods_register(methods) }; - if rc != crate::ffi::SQLITE_OK { - let _box = unsafe { Box::from_raw(methods as *mut libsql_wal_methods) }; - tracing::warn!("Failed to instantiate bottomless WAL methods"); - } - }) + todo!() + // static INIT: std::sync::Once = std::sync::Once::new(); + // INIT.call_once(|| { + // crate::bottomless_init(); + // let orig_methods = unsafe { libsql_wal_methods_find(std::ptr::null()) }; + // if orig_methods.is_null() { + // panic!("failed to locate default WAL methods") + // } + // let methods = crate::bottomless_methods(orig_methods); + // let rc = unsafe { libsql_wal_methods_register(methods) }; + // if rc != crate::ffi::SQLITE_OK { + // let _box = unsafe { Box::from_raw(methods as *mut libsql_wal_methods) }; + // tracing::warn!("Failed to instantiate bottomless WAL methods"); + // } + // }) } } diff --git a/libsql-server/src/connection/libsql.rs b/libsql-server/src/connection/libsql.rs index 606d199b2b..c1abe1e3c7 100644 --- a/libsql-server/src/connection/libsql.rs +++ b/libsql-server/src/connection/libsql.rs @@ -6,13 +6,12 @@ use std::sync::Arc; use metrics::histogram; use parking_lot::{Mutex, RwLock}; use rusqlite::{DatabaseName, ErrorCode, OpenFlags, StatementStatus}; -use sqld_libsql_bindings::wal_hook::{TransparentMethods, WalMethodsHook}; +use sqld_libsql_bindings::wal::CreateWal; use tokio::sync::{watch, Notify}; use tokio::time::{Duration, Instant}; use crate::auth::{Authenticated, Authorized, Permission}; use crate::error::Error; -use crate::libsql_bindings::wal_hook::WalHook; use crate::metrics::{READ_QUERY_COUNT, WRITE_QUERY_COUNT}; use crate::query::Query; use crate::query_analysis::{State, StmtKind}; @@ -25,10 +24,9 @@ use super::config::DatabaseConfigStore; use super::program::{Cond, DescribeCol, DescribeParam, DescribeResponse, DescribeResult}; use super::{MakeConnection, Program, Step, TXN_TIMEOUT}; -pub struct MakeLibSqlConn { +pub struct MakeLibSqlConn { db_path: PathBuf, - hook: &'static WalMethodsHook, - ctx_builder: Box W::Context + Sync + Send + 'static>, + create_wal: T, stats: Arc, config_store: Arc, extensions: Arc<[PathBuf]>, @@ -36,22 +34,20 @@ pub struct MakeLibSqlConn { max_total_response_size: u64, auto_checkpoint: u32, current_frame_no_receiver: watch::Receiver>, - state: Arc>, + state: Arc>, /// In wal mode, closing the last database takes time, and causes other databases creation to /// return sqlite busy. To mitigate that, we hold on to one connection - _db: Option>, + _db: Option>, } -impl MakeLibSqlConn +impl MakeLibSqlConn where - W: WalHook + 'static + Sync + Send, - W::Context: Send + 'static, + T: CreateWal + Clone, { #[allow(clippy::too_many_arguments)] - pub async fn new( + pub async fn new( db_path: PathBuf, - hook: &'static WalMethodsHook, - ctx_builder: F, + create_wal: T, stats: Arc, config_store: Arc, extensions: Arc<[PathBuf]>, @@ -59,14 +55,9 @@ where max_total_response_size: u64, auto_checkpoint: u32, current_frame_no_receiver: watch::Receiver>, - ) -> Result - where - F: Fn() -> W::Context + Sync + Send + 'static, - { + ) -> Result { let mut this = Self { db_path, - hook, - ctx_builder: Box::new(ctx_builder), stats, config_store, extensions, @@ -76,6 +67,7 @@ where current_frame_no_receiver, _db: None, state: Default::default(), + create_wal, }; let db = this.try_create_db().await?; @@ -85,7 +77,7 @@ where } /// Tries to create a database, retrying if the database is busy. - async fn try_create_db(&self) -> Result> { + async fn try_create_db(&self) -> Result> { // try 100 times to acquire initial db connection. let mut retries = 0; loop { @@ -113,7 +105,7 @@ where } } - async fn make_connection(&self) -> Result> { + async fn make_connection(&self) -> Result> { LibSqlConnection::new( self.db_path.clone(), self.extensions.clone(), @@ -134,12 +126,11 @@ where } #[async_trait::async_trait] -impl MakeConnection for MakeLibSqlConn +impl MakeConnection for MakeLibSqlConn where - W: WalHook + 'static + Sync + Send, - W::Context: Send + 'static, + T: CreateWal + Clone, { - type Connection = LibSqlConnection; + type Connection = LibSqlConnection; async fn create(&self) -> Result { self.make_connection().await @@ -147,11 +138,11 @@ where } #[derive(Clone)] -pub struct LibSqlConnection { - inner: Arc>>, +pub struct LibSqlConnection { + inner: Arc>>, } -impl std::fmt::Debug for LibSqlConnection { +impl std::fmt::Debug for LibSqlConnection { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self.inner.try_lock() { Some(conn) => { @@ -162,15 +153,14 @@ impl std::fmt::Debug for LibSqlConnection { } } -pub fn open_conn( +pub fn open_conn( path: &Path, - wal_methods: &'static WalMethodsHook, - hook_ctx: W::Context, + create_wal: T, flags: Option, auto_checkpoint: u32, -) -> Result, rusqlite::Error> +) -> Result, rusqlite::Error> where - W: WalHook, + T: CreateWal, { let flags = flags.unwrap_or( OpenFlags::SQLITE_OPEN_READ_WRITE @@ -178,32 +168,29 @@ where | OpenFlags::SQLITE_OPEN_URI | OpenFlags::SQLITE_OPEN_NO_MUTEX, ); - sqld_libsql_bindings::Connection::open(path, flags, wal_methods, hook_ctx, auto_checkpoint) + sqld_libsql_bindings::Connection::open(path, flags, create_wal, auto_checkpoint) } -impl LibSqlConnection +impl LibSqlConnection where - W: WalHook, - W::Context: Send, + T: CreateWal, { pub async fn new( path: impl AsRef + Send + 'static, extensions: Arc<[PathBuf]>, - wal_hook: &'static WalMethodsHook, - hook_ctx: W::Context, + create_wal: T, stats: Arc, config_store: Arc, builder_config: QueryBuilderConfig, current_frame_no_receiver: watch::Receiver>, - state: Arc>, + state: Arc>, ) -> crate::Result { let max_db_size = config_store.get().max_db_pages; let conn = tokio::task::spawn_blocking(move || -> crate::Result<_> { let conn = Connection::new( path.as_ref(), extensions, - wal_hook, - hook_ctx, + create_wal, stats, config_store, builder_config, @@ -223,19 +210,19 @@ where } } -struct Connection { - conn: sqld_libsql_bindings::Connection, +struct Connection { + conn: sqld_libsql_bindings::Connection, stats: Arc, config_store: Arc, builder_config: QueryBuilderConfig, current_frame_no_receiver: watch::Receiver>, // must be dropped after the connection because the connection refers to it - state: Arc>, + state: Arc>, // current txn slot if any - slot: Option>>, + slot: Option>>, } -impl std::fmt::Debug for Connection { +impl std::fmt::Debug for Connection { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Connection") .field("slot", &self.slot) @@ -244,7 +231,7 @@ impl std::fmt::Debug for Connection { } /// A slot for holding the state of a transaction lock permit -struct TxnSlot { +struct TxnSlot { /// Pointer to the connection holding the lock. Used to rollback the transaction when the lock /// is stolen. conn: Arc>>, @@ -254,7 +241,7 @@ struct TxnSlot { is_stolen: AtomicBool, } -impl TxnSlot { +impl TxnSlot { #[inline] fn expires_at(&self) -> Instant { self.created_at + TXN_TIMEOUT @@ -270,7 +257,7 @@ impl TxnSlot { } } -impl std::fmt::Debug for TxnSlot { +impl std::fmt::Debug for TxnSlot { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let stolen = self.is_stolen.load(Ordering::Relaxed); let time_left = self.expires_at().duration_since(Instant::now()); @@ -284,14 +271,14 @@ impl std::fmt::Debug for TxnSlot { /// The transaction state shared among all connections to the same database #[derive(Debug)] -pub struct TxnState { +pub struct TxnState { /// Slot for the connection currently holding the transaction lock slot: RwLock>>>, /// Notifier for when the lock gets dropped notify: Notify, } -impl Default for TxnState { +impl Default for TxnState { fn default() -> Self { Self { slot: Default::default(), @@ -316,8 +303,8 @@ impl Default for TxnState { /// - If the handler waits until the txn timeout and isn't notified of the termination of the txn, it will attempt to steal the lock. /// This is done by calling rollback on the slot's txn, and marking the slot as stolen. /// - When a connection notices that it's slot has been stolen, it returns a timedout error to the next request. -unsafe extern "C" fn busy_handler(state: *mut c_void, _retries: c_int) -> c_int { - let state = &*(state as *mut TxnState); +unsafe extern "C" fn busy_handler(state: *mut c_void, _retries: c_int) -> c_int { + let state = &*(state as *mut TxnState); let lock = state.slot.read(); // we take a reference to the slot we will attempt to steal. this is to make sure that we // actually steal the correct lock. @@ -362,30 +349,23 @@ unsafe extern "C" fn busy_handler(state: *mut c_void, _retries: c_in }) } -impl Connection { +impl Connection { fn new( path: &Path, extensions: Arc<[PathBuf]>, - wal_methods: &'static WalMethodsHook, - hook_ctx: W::Context, + create_wal: T, stats: Arc, config_store: Arc, builder_config: QueryBuilderConfig, current_frame_no_receiver: watch::Receiver>, - state: Arc>, + state: Arc>, ) -> Result { - let conn = open_conn( - path, - wal_methods, - hook_ctx, - None, - builder_config.auto_checkpoint, - )?; + let conn = open_conn(path, create_wal, None, builder_config.auto_checkpoint)?; // register the lock-stealing busy handler unsafe { let ptr = Arc::as_ptr(&state) as *mut _; - rusqlite::ffi::sqlite3_busy_handler(conn.handle(), Some(busy_handler::), ptr); + rusqlite::ffi::sqlite3_busy_handler(conn.handle(), Some(busy_handler::), ptr); } let this = Self { @@ -751,10 +731,9 @@ fn check_describe_auth(auth: Authenticated) -> Result<()> { } #[async_trait::async_trait] -impl super::Connection for LibSqlConnection +impl super::Connection for LibSqlConnection where - W: WalHook + 'static, - W::Context: Send, + T: CreateWal, { async fn execute_program( &self, diff --git a/libsql-server/src/connection/write_proxy.rs b/libsql-server/src/connection/write_proxy.rs index d513bc632b..1769e9fb18 100644 --- a/libsql-server/src/connection/write_proxy.rs +++ b/libsql-server/src/connection/write_proxy.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use parking_lot::Mutex as PMutex; use rusqlite::types::ValueRef; -use sqld_libsql_bindings::wal_hook::{TransparentMethods, TRANSPARENT_METHODS}; +use sqld_libsql_bindings::wal::Sqlite3CreateWal; use tokio::sync::{watch, Mutex}; use tonic::metadata::BinaryMetadataValue; use tonic::transport::Channel; @@ -40,7 +40,7 @@ pub struct MakeWriteProxyConn { max_response_size: u64, max_total_response_size: u64, namespace: NamespaceName, - make_read_only_conn: MakeLibSqlConn, + make_read_only_conn: MakeLibSqlConn, } impl MakeWriteProxyConn { @@ -60,8 +60,7 @@ impl MakeWriteProxyConn { let client = ProxyClient::with_origin(channel, uri); let make_read_only_conn = MakeLibSqlConn::new( db_path.clone(), - &TRANSPARENT_METHODS, - || (), + Sqlite3CreateWal::new(), stats.clone(), config_store.clone(), extensions.clone(), @@ -108,7 +107,7 @@ impl MakeConnection for MakeWriteProxyConn { #[derive(Debug)] pub struct WriteProxyConnection { /// Lazily initialized read connection - read_conn: LibSqlConnection, + read_conn: LibSqlConnection, write_proxy: ProxyClient, state: Mutex, client_id: Uuid, @@ -176,7 +175,7 @@ impl WriteProxyConnection { applied_frame_no_receiver: watch::Receiver>, builder_config: QueryBuilderConfig, namespace: NamespaceName, - read_conn: LibSqlConnection, + read_conn: LibSqlConnection, ) -> Result { Ok(Self { read_conn, diff --git a/libsql-server/src/namespace/mod.rs b/libsql-server/src/namespace/mod.rs index 6649328072..f02271e638 100644 --- a/libsql-server/src/namespace/mod.rs +++ b/libsql-server/src/namespace/mod.rs @@ -15,7 +15,6 @@ use hyper::Uri; use metrics::histogram; use parking_lot::Mutex; use rusqlite::ErrorCode; -use sqld_libsql_bindings::wal_hook::TRANSPARENT_METHODS; use tokio::io::AsyncBufReadExt; use tokio::sync::watch; use tokio::task::JoinSet; @@ -31,7 +30,7 @@ use crate::connection::write_proxy::MakeWriteProxyConn; use crate::connection::MakeConnection; use crate::database::{Database, PrimaryDatabase, ReplicaDatabase}; use crate::error::{Error, LoadDumpError}; -use crate::replication::primary::logger::{ReplicationLoggerHookCtx, REPLICATION_METHODS}; +use crate::replication::primary::logger::ReplicationLoggerHookCtx; use crate::replication::replica::Replicator; use crate::replication::{FrameNo, NamespacedSnapshotCallback, ReplicationLogger}; use crate::stats::Stats; @@ -759,8 +758,7 @@ impl Namespace { let connection_maker: Arc<_> = MakeLibSqlConn::new( db_path.clone(), - &REPLICATION_METHODS, - ctx_builder.clone(), + todo!(), stats.clone(), db_config_store.clone(), config.extensions.clone(), diff --git a/libsql-server/src/replication/primary/logger.rs b/libsql-server/src/replication/primary/logger.rs index b191e164b3..5eae1e20db 100644 --- a/libsql-server/src/replication/primary/logger.rs +++ b/libsql-server/src/replication/primary/logger.rs @@ -11,7 +11,7 @@ use bytemuck::{bytes_of, pod_read_unaligned, Pod, Zeroable}; use bytes::{Bytes, BytesMut}; use parking_lot::RwLock; use rusqlite::ffi::SQLITE_BUSY; -use sqld_libsql_bindings::init_static_wal_method; +use sqld_libsql_bindings::wal::Sqlite3Wal; use tokio::sync::watch; use tokio::time::{Duration, Instant}; use uuid::Uuid; @@ -22,14 +22,11 @@ use crate::libsql_bindings::ffi::{ types::{XWalCheckpointFn, XWalFrameFn, XWalSavePointUndoFn, XWalUndoFn}, PageHdrIter, PgHdr, Wal, SQLITE_CHECKPOINT_TRUNCATE, SQLITE_IOERR, SQLITE_OK, }; -use crate::libsql_bindings::wal_hook::WalHook; use crate::replication::frame::{Frame, FrameHeader}; use crate::replication::snapshot::{find_snapshot_file, LogCompactor, SnapshotFile}; use crate::replication::{FrameNo, SnapshotCallback, CRC_64_GO_ISO, WAL_MAGIC}; use crate::LIBSQL_PAGE_SIZE; -init_static_wal_method!(REPLICATION_METHODS, ReplicationLoggerHook); - #[derive(PartialEq, Eq)] struct Version([u16; 4]); @@ -52,6 +49,7 @@ pub struct ReplicationLoggerHookCtx { bottomless_replicator: Option>>, } + /// This implementation of WalHook intercepts calls to `on_frame`, and writes them to a /// shadow wal. Writing to the shadow wal is done in three steps: /// i. append the new pages at the offset pointed by header.start_frame_no + header.frame_count @@ -454,7 +452,7 @@ impl LogFile { Ok(()) } - fn rollback(&mut self) { + pub(crate) fn rollback(&mut self) { self.uncommitted_frame_count = 0; self.uncommitted_checksum = self.commited_checksum; } @@ -579,7 +577,7 @@ impl LogFile { compact } - fn maybe_compact( + pub(crate) fn maybe_compact( &mut self, compactor: LogCompactor, size_after: u32, @@ -888,7 +886,7 @@ impl ReplicationLogger { /// Write pages to the log, without updating the file header. /// Returns the new frame count and checksum to commit - fn write_pages(&self, pages: &[WalPage]) -> anyhow::Result<()> { + pub(crate) fn write_pages(&self, pages: &[WalPage]) -> anyhow::Result<()> { let mut log_file = self.log_file.write(); for page in pages.iter() { log_file.push_page(page)?; @@ -915,7 +913,7 @@ impl ReplicationLogger { } /// commit the current transaction and returns the new top frame number - fn commit(&self) -> anyhow::Result> { + pub(crate) fn commit(&self) -> anyhow::Result> { let mut log_file = self.log_file.write(); log_file.commit()?; Ok(log_file.header().last_frame_no()) @@ -951,6 +949,14 @@ impl ReplicationLogger { log_file.do_compaction(self.compactor.clone(), size_after, &self.db_path)?; Ok(true) } + + pub(crate) fn compactor(&self) -> &LogCompactor { + &self.compactor + } + + pub(crate) fn db_path(&self) -> &Path { + &self.db_path + } } // FIXME: calling rusqlite::Connection's checkpoint here is a bug, diff --git a/libsql-server/src/replication/primary/mod.rs b/libsql-server/src/replication/primary/mod.rs index a7d0eb849c..d2afdcd344 100644 --- a/libsql-server/src/replication/primary/mod.rs +++ b/libsql-server/src/replication/primary/mod.rs @@ -1,2 +1,3 @@ pub mod frame_stream; pub mod logger; +pub mod replication_logger_wal; diff --git a/libsql-server/src/replication/primary/replication_logger_wal.rs b/libsql-server/src/replication/primary/replication_logger_wal.rs new file mode 100644 index 0000000000..78a575fc9d --- /dev/null +++ b/libsql-server/src/replication/primary/replication_logger_wal.rs @@ -0,0 +1,230 @@ +use std::sync::Arc; + +use bytes::Bytes; +use rusqlite::ffi::SQLITE_IOERR; +use sqld_libsql_bindings::{wal::{Sqlite3Wal, Sqlite3CreateWal, CreateWal}, ffi::PageHdrIter}; + +use crate::replication::ReplicationLogger; + +use super::logger::WalPage; + +pub struct CreateReplicationLoggerWal { + sqlite_create_wal: Sqlite3CreateWal, + logger: Arc, +} + +impl CreateWal for CreateReplicationLoggerWal { + type Wal = ReplicationLoggerWal; + + fn use_shared_memory(&self) -> bool { + self.sqlite_create_wal.use_shared_memory() + } + + fn open( + &self, + vfs: &mut sqld_libsql_bindings::wal::Vfs, + file: *mut rusqlite::ffi::sqlite3_file, + no_shm_mode: std::ffi::c_int, + max_log_size: i64, + db_path: &std::ffi::CStr, + ) -> Result { + let inner = self.sqlite_create_wal.open(vfs, file, no_shm_mode, max_log_size, db_path)?; + Ok(Self::Wal { + inner, + buffer: Default::default(), + logger: self.logger.clone(), + }) + } + + fn close( + &self, + wal: &mut Self::Wal, + db: *mut rusqlite::ffi::sqlite3, + sync_flags: std::ffi::c_int, + scratch: &mut [u8], + ) -> Result<()> { + self.sqlite_create_wal.close(&mut wal.inner, db, sync_flags, scratch) + } + + fn destroy_log(&self, vfs: &mut sqld_libsql_bindings::wal::Vfs, db_path: &std::ffi::CStr) -> Result<()> { + self.sqlite_create_wal.destroy_log(vfs, db_path) + } + + fn log_exists(&self, vfs: &mut sqld_libsql_bindings::wal::Vfs, db_path: &std::ffi::CStr) -> Result { + self.sqlite_create_wal.log_exists(vfs, db_path) + } + + fn destroy(self) + where + Self: Sized { + self.sqlite_create_wal.destroy() + } +} + +struct ReplicationLoggerWal { + inner: Sqlite3Wal, + buffer: Vec, + logger: Arc, +} + +impl ReplicationLoggerWal { + fn write_frame(&mut self, page_no: u32, data: &[u8]) { + let entry = WalPage { + page_no, + size_after: 0, + data: Bytes::copy_from_slice(data), + }; + self.buffer.push(entry); + } + + /// write buffered pages to the logger, without committing. + fn flush(&mut self, size_after: u32) -> anyhow::Result<()> { + if !self.buffer.is_empty() { + self.buffer.last_mut().unwrap().size_after = size_after; + self.logger.write_pages(&self.buffer)?; + self.buffer.clear(); + } + + Ok(()) + } + + fn commit(&self) -> anyhow::Result<()> { + let new_frame_no = self.logger.commit()?; + tracing::trace!("new frame committed {new_frame_no:?}"); + self.logger.new_frame_notifier.send_replace(new_frame_no); + Ok(()) + } + + fn rollback(&mut self) { + self.logger.log_file.write().rollback(); + self.buffer.clear(); + } + + pub fn logger(&self) -> &ReplicationLogger { + self.logger.as_ref() + } +} + +impl sqld_libsql_bindings::wal::Wal for ReplicationLoggerWal { + fn limit(&mut self, size: i64) { + self.inner.limit(size) + } + + fn begin_read_txn(&mut self) -> Result { + self.inner.begin_read_txn() + } + + fn end_read_txn(&mut self) { + self.inner.end_read_txn() + } + + fn find_frame(&mut self, page_no: u32) -> Result { + self.inner.find_frame(page_no) + } + + fn read_frame(&mut self, frame_no: u32, buffer: &mut [u8]) -> Result<()> { + self.inner.read_frame(frame_no, buffer) + } + + fn db_size(&self) -> u32 { + self.inner.db_size() + } + + fn begin_write_txn(&mut self) -> Result<()> { + self.inner.begin_write_txn() + } + + fn end_write_txn(&mut self) -> Result<()> { + self.inner.end_write_txn() + } + + fn undo( + &mut self, + cb: Option i32>, + cb_ctx: *mut c_void, + ) -> Result<()> { + self.rollback(); + self.inner.undo(cb, cb_ctx) + } + + fn savepoint(&mut self, rollback_data: &mut [u32]) { + self.inner.savepoint(rollback_data) + } + + fn savepoin_undo(&mut self, rollback_data: &mut [u32]) -> Result<()> { + self.inner.savepoin_undo(rollback_data) + } + + fn insert_frames( + &mut self, + page_size: c_int, + page_headers: *mut PgHdr, + size_after: u32, + is_commit: bool, + sync_flags: c_int, + ) -> Result<()> { + assert_eq!(page_size, 4096); + let mut frame_count = 0; + for (page_no, data) in PageHdrIter::new(page_headers, page_size as _) { + self.write_frame(page_no, data); + frame_count += 1; + } + if let Err(e) = self.flush(size_after) { + tracing::error!("error writing to replication log: {e}"); + // returning IO_ERR ensure that xUndo will be called by sqlite. + return Err(rusqlite::ffi::Error::new(SQLITE_IOERR)); + } + + self.inner.insert_frames(page_size, page_headers, size_after, is_commit, sync_flags)?; + + if is_commit { + if let Err(e) = self.commit() { + // If we reach this point, it means that we have committed a transaction to sqlite wal, + // but failed to commit it to the shadow WAL, which leaves us in an inconsistent state. + tracing::error!( + "fatal error: log failed to commit: inconsistent replication log: {e}" + ); + std::process::abort(); + } + + if let Err(e) = self.logger.log_file.write().maybe_compact( + self.logger.compactor().clone(), + size_after, + self.logger.db_path(), + ) { + tracing::error!("fatal error: {e}, exiting"); + std::process::abort() + } + } + + Ok(()) + } + + fn checkpoint( + &mut self, + db: *mut sqlite3, + mode: sqld_libsql_bindings::wal::CheckpointMode, + busy_handler: Option<&mut B>, + sync_flags: u32, + // temporary scratch buffer + buf: &mut [u8], + ) -> Result<(u32, u32)> { + self.inner.checkpoint(db, mode, busy_handler, sync_flags, buf) + } + + fn exclusive_mode(&mut self, op: c_int) -> Result<()> { + self.inner.exclusive_mode(op) + } + + fn uses_heap_memory(&self) -> bool { + self.inner.uses_heap_memory() + } + + fn set_db(&mut self, db: *mut sqlite3) { + self.inner.set_db(db) + } + + fn callback(&self) -> i32 { + self.inner.callback() + } +} diff --git a/libsql-server/src/replication/replica/hook.rs b/libsql-server/src/replication/replica/hook.rs index f2f661b5f7..ea2bb9e0de 100644 --- a/libsql-server/src/replication/replica/hook.rs +++ b/libsql-server/src/replication/replica/hook.rs @@ -2,9 +2,8 @@ use std::ffi::{c_int, CStr}; use std::marker::PhantomData; use rusqlite::ffi::{PgHdr, SQLITE_ERROR}; +use sqld_libsql_bindings::ffi::types::XWalFrameFn; use sqld_libsql_bindings::ffi::Wal; -use sqld_libsql_bindings::init_static_wal_method; -use sqld_libsql_bindings::{ffi::types::XWalFrameFn, wal_hook::WalHook}; use crate::replication::frame::{Frame, FrameBorrowed}; use crate::replication::FrameNo; @@ -64,8 +63,6 @@ impl Frames { } } -init_static_wal_method!(INJECTOR_METHODS, InjectorHook); - /// The injector hook hijacks a call to xframes, and replace the content of the call with it's own /// frames. /// The Caller must first call `set_frames`, passing the frames to be injected, then trigger a call diff --git a/libsql-server/src/replication/replica/injector.rs b/libsql-server/src/replication/replica/injector.rs index 28bdd3330f..e0239003f7 100644 --- a/libsql-server/src/replication/replica/injector.rs +++ b/libsql-server/src/replication/replica/injector.rs @@ -5,7 +5,7 @@ use rusqlite::OpenFlags; use crate::replication::replica::hook::{SQLITE_CONTINUE_REPLICATION, SQLITE_EXIT_REPLICATION}; -use super::hook::{InjectorHook, InjectorHookCtx, INJECTOR_METHODS}; +use super::hook::{InjectorHook, InjectorHookCtx}; pub struct FrameInjector { conn: sqld_libsql_bindings::Connection,