Skip to content

Commit

Permalink
introduce ReplicationLoggerWal
Browse files Browse the repository at this point in the history
  • Loading branch information
MarinPostma committed Nov 9, 2023
1 parent 7ac6378 commit 7c7a3bf
Show file tree
Hide file tree
Showing 9 changed files with 869 additions and 672 deletions.
1,147 changes: 567 additions & 580 deletions bottomless/src/lib.rs

Large diffs are not rendered by default.

117 changes: 48 additions & 69 deletions libsql-server/src/connection/libsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@ use std::sync::Arc;
use metrics::histogram;
use parking_lot::{Mutex, RwLock};
use rusqlite::{DatabaseName, ErrorCode, OpenFlags, StatementStatus};
use sqld_libsql_bindings::wal_hook::{TransparentMethods, WalMethodsHook};
use sqld_libsql_bindings::wal::CreateWal;
use tokio::sync::{watch, Notify};
use tokio::time::{Duration, Instant};

use crate::auth::{Authenticated, Authorized, Permission};
use crate::error::Error;
use crate::libsql_bindings::wal_hook::WalHook;
use crate::metrics::{READ_QUERY_COUNT, WRITE_QUERY_COUNT};
use crate::query::Query;
use crate::query_analysis::{State, StmtKind};
Expand All @@ -25,48 +24,40 @@ use super::config::DatabaseConfigStore;
use super::program::{Cond, DescribeCol, DescribeParam, DescribeResponse, DescribeResult};
use super::{MakeConnection, Program, Step, TXN_TIMEOUT};

pub struct MakeLibSqlConn<W: WalHook + 'static> {
pub struct MakeLibSqlConn<T> {
db_path: PathBuf,
hook: &'static WalMethodsHook<W>,
ctx_builder: Box<dyn Fn() -> W::Context + Sync + Send + 'static>,
create_wal: T,
stats: Arc<Stats>,
config_store: Arc<DatabaseConfigStore>,
extensions: Arc<[PathBuf]>,
max_response_size: u64,
max_total_response_size: u64,
auto_checkpoint: u32,
current_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
state: Arc<TxnState<W>>,
state: Arc<TxnState<T>>,
/// In wal mode, closing the last database takes time, and causes other databases creation to
/// return sqlite busy. To mitigate that, we hold on to one connection
_db: Option<LibSqlConnection<W>>,
_db: Option<LibSqlConnection<T>>,
}

impl<W: WalHook + 'static> MakeLibSqlConn<W>
impl<T> MakeLibSqlConn<T>
where
W: WalHook + 'static + Sync + Send,
W::Context: Send + 'static,
T: CreateWal + Clone,
{
#[allow(clippy::too_many_arguments)]
pub async fn new<F>(
pub async fn new(
db_path: PathBuf,
hook: &'static WalMethodsHook<W>,
ctx_builder: F,
create_wal: T,
stats: Arc<Stats>,
config_store: Arc<DatabaseConfigStore>,
extensions: Arc<[PathBuf]>,
max_response_size: u64,
max_total_response_size: u64,
auto_checkpoint: u32,
current_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
) -> Result<Self>
where
F: Fn() -> W::Context + Sync + Send + 'static,
{
) -> Result<Self> {
let mut this = Self {
db_path,
hook,
ctx_builder: Box::new(ctx_builder),
stats,
config_store,
extensions,
Expand All @@ -76,6 +67,7 @@ where
current_frame_no_receiver,
_db: None,
state: Default::default(),
create_wal,
};

let db = this.try_create_db().await?;
Expand All @@ -85,7 +77,7 @@ where
}

/// Tries to create a database, retrying if the database is busy.
async fn try_create_db(&self) -> Result<LibSqlConnection<W>> {
async fn try_create_db(&self) -> Result<LibSqlConnection<T>> {
// try 100 times to acquire initial db connection.
let mut retries = 0;
loop {
Expand Down Expand Up @@ -113,7 +105,7 @@ where
}
}

async fn make_connection(&self) -> Result<LibSqlConnection<W>> {
async fn make_connection(&self) -> Result<LibSqlConnection<T>> {
LibSqlConnection::new(
self.db_path.clone(),
self.extensions.clone(),
Expand All @@ -134,24 +126,23 @@ where
}

#[async_trait::async_trait]
impl<W> MakeConnection for MakeLibSqlConn<W>
impl<T> MakeConnection for MakeLibSqlConn<T>
where
W: WalHook + 'static + Sync + Send,
W::Context: Send + 'static,
T: CreateWal + Clone,
{
type Connection = LibSqlConnection<W>;
type Connection = LibSqlConnection<T>;

async fn create(&self) -> Result<Self::Connection, Error> {
self.make_connection().await
}
}

#[derive(Clone)]
pub struct LibSqlConnection<W: WalHook> {
inner: Arc<Mutex<Connection<W>>>,
pub struct LibSqlConnection<T> {
inner: Arc<Mutex<Connection<T>>>,
}

impl<W: WalHook> std::fmt::Debug for LibSqlConnection<W> {
impl<T> std::fmt::Debug for LibSqlConnection<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self.inner.try_lock() {
Some(conn) => {
Expand All @@ -162,48 +153,44 @@ impl<W: WalHook> std::fmt::Debug for LibSqlConnection<W> {
}
}

pub fn open_conn<W>(
pub fn open_conn<T>(
path: &Path,
wal_methods: &'static WalMethodsHook<W>,
hook_ctx: W::Context,
create_wal: T,
flags: Option<OpenFlags>,
auto_checkpoint: u32,
) -> Result<sqld_libsql_bindings::Connection<W>, rusqlite::Error>
) -> Result<sqld_libsql_bindings::Connection<T>, rusqlite::Error>
where
W: WalHook,
T: CreateWal,
{
let flags = flags.unwrap_or(
OpenFlags::SQLITE_OPEN_READ_WRITE
| OpenFlags::SQLITE_OPEN_CREATE
| OpenFlags::SQLITE_OPEN_URI
| OpenFlags::SQLITE_OPEN_NO_MUTEX,
);
sqld_libsql_bindings::Connection::open(path, flags, wal_methods, hook_ctx, auto_checkpoint)
sqld_libsql_bindings::Connection::open(path, flags, create_wal, auto_checkpoint)
}

impl<W> LibSqlConnection<W>
impl<T> LibSqlConnection<T>
where
W: WalHook,
W::Context: Send,
T: CreateWal,
{
pub async fn new(
path: impl AsRef<Path> + Send + 'static,
extensions: Arc<[PathBuf]>,
wal_hook: &'static WalMethodsHook<W>,
hook_ctx: W::Context,
create_wal: T,
stats: Arc<Stats>,
config_store: Arc<DatabaseConfigStore>,
builder_config: QueryBuilderConfig,
current_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
state: Arc<TxnState<W>>,
state: Arc<TxnState<T>>,
) -> crate::Result<Self> {
let max_db_size = config_store.get().max_db_pages;
let conn = tokio::task::spawn_blocking(move || -> crate::Result<_> {
let conn = Connection::new(
path.as_ref(),
extensions,
wal_hook,
hook_ctx,
create_wal,
stats,
config_store,
builder_config,
Expand All @@ -223,19 +210,19 @@ where
}
}

struct Connection<W: WalHook = TransparentMethods> {
conn: sqld_libsql_bindings::Connection<W>,
struct Connection<T> {
conn: sqld_libsql_bindings::Connection<T>,
stats: Arc<Stats>,
config_store: Arc<DatabaseConfigStore>,
builder_config: QueryBuilderConfig,
current_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
// must be dropped after the connection because the connection refers to it
state: Arc<TxnState<W>>,
state: Arc<TxnState<T>>,
// current txn slot if any
slot: Option<Arc<TxnSlot<W>>>,
slot: Option<Arc<TxnSlot<T>>>,
}

impl<W: WalHook> std::fmt::Debug for Connection<W> {
impl<T> std::fmt::Debug for Connection<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Connection")
.field("slot", &self.slot)
Expand All @@ -244,7 +231,7 @@ impl<W: WalHook> std::fmt::Debug for Connection<W> {
}

/// A slot for holding the state of a transaction lock permit
struct TxnSlot<T: WalHook> {
struct TxnSlot<T> {
/// Pointer to the connection holding the lock. Used to rollback the transaction when the lock
/// is stolen.
conn: Arc<Mutex<Connection<T>>>,
Expand All @@ -254,7 +241,7 @@ struct TxnSlot<T: WalHook> {
is_stolen: AtomicBool,
}

impl<T: WalHook> TxnSlot<T> {
impl<T> TxnSlot<T> {
#[inline]
fn expires_at(&self) -> Instant {
self.created_at + TXN_TIMEOUT
Expand All @@ -270,7 +257,7 @@ impl<T: WalHook> TxnSlot<T> {
}
}

impl<T: WalHook> std::fmt::Debug for TxnSlot<T> {
impl<T> std::fmt::Debug for TxnSlot<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let stolen = self.is_stolen.load(Ordering::Relaxed);
let time_left = self.expires_at().duration_since(Instant::now());
Expand All @@ -284,14 +271,14 @@ impl<T: WalHook> std::fmt::Debug for TxnSlot<T> {

/// The transaction state shared among all connections to the same database
#[derive(Debug)]
pub struct TxnState<T: WalHook> {
pub struct TxnState<T> {
/// Slot for the connection currently holding the transaction lock
slot: RwLock<Option<Arc<TxnSlot<T>>>>,
/// Notifier for when the lock gets dropped
notify: Notify,
}

impl<W: WalHook> Default for TxnState<W> {
impl<T> Default for TxnState<T> {
fn default() -> Self {
Self {
slot: Default::default(),
Expand All @@ -316,8 +303,8 @@ impl<W: WalHook> Default for TxnState<W> {
/// - If the handler waits until the txn timeout and isn't notified of the termination of the txn, it will attempt to steal the lock.
/// This is done by calling rollback on the slot's txn, and marking the slot as stolen.
/// - When a connection notices that it's slot has been stolen, it returns a timedout error to the next request.
unsafe extern "C" fn busy_handler<W: WalHook>(state: *mut c_void, _retries: c_int) -> c_int {
let state = &*(state as *mut TxnState<W>);
unsafe extern "C" fn busy_handler<T>(state: *mut c_void, _retries: c_int) -> c_int {
let state = &*(state as *mut TxnState<T>);
let lock = state.slot.read();
// we take a reference to the slot we will attempt to steal. this is to make sure that we
// actually steal the correct lock.
Expand Down Expand Up @@ -362,30 +349,23 @@ unsafe extern "C" fn busy_handler<W: WalHook>(state: *mut c_void, _retries: c_in
})
}

impl<W: WalHook> Connection<W> {
impl<T> Connection<T> {
fn new(
path: &Path,
extensions: Arc<[PathBuf]>,
wal_methods: &'static WalMethodsHook<W>,
hook_ctx: W::Context,
create_wal: T,
stats: Arc<Stats>,
config_store: Arc<DatabaseConfigStore>,
builder_config: QueryBuilderConfig,
current_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
state: Arc<TxnState<W>>,
state: Arc<TxnState<T>>,
) -> Result<Self> {
let conn = open_conn(
path,
wal_methods,
hook_ctx,
None,
builder_config.auto_checkpoint,
)?;
let conn = open_conn(path, create_wal, None, builder_config.auto_checkpoint)?;

// register the lock-stealing busy handler
unsafe {
let ptr = Arc::as_ptr(&state) as *mut _;
rusqlite::ffi::sqlite3_busy_handler(conn.handle(), Some(busy_handler::<W>), ptr);
rusqlite::ffi::sqlite3_busy_handler(conn.handle(), Some(busy_handler::<T>), ptr);
}

let this = Self {
Expand Down Expand Up @@ -751,10 +731,9 @@ fn check_describe_auth(auth: Authenticated) -> Result<()> {
}

#[async_trait::async_trait]
impl<W> super::Connection for LibSqlConnection<W>
impl<T> super::Connection for LibSqlConnection<T>
where
W: WalHook + 'static,
W::Context: Send,
T: CreateWal,
{
async fn execute_program<B: QueryResultBuilder>(
&self,
Expand Down
11 changes: 5 additions & 6 deletions libsql-server/src/connection/write_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;

use parking_lot::Mutex as PMutex;
use rusqlite::types::ValueRef;
use sqld_libsql_bindings::wal_hook::{TransparentMethods, TRANSPARENT_METHODS};
use sqld_libsql_bindings::wal::Sqlite3CreateWal;
use tokio::sync::{watch, Mutex};
use tonic::metadata::BinaryMetadataValue;
use tonic::transport::Channel;
Expand Down Expand Up @@ -40,7 +40,7 @@ pub struct MakeWriteProxyConn {
max_response_size: u64,
max_total_response_size: u64,
namespace: NamespaceName,
make_read_only_conn: MakeLibSqlConn<TransparentMethods>,
make_read_only_conn: MakeLibSqlConn<Sqlite3CreateWal>,
}

impl MakeWriteProxyConn {
Expand All @@ -60,8 +60,7 @@ impl MakeWriteProxyConn {
let client = ProxyClient::with_origin(channel, uri);
let make_read_only_conn = MakeLibSqlConn::new(
db_path.clone(),
&TRANSPARENT_METHODS,
|| (),
Sqlite3CreateWal::new(),
stats.clone(),
config_store.clone(),
extensions.clone(),
Expand Down Expand Up @@ -108,7 +107,7 @@ impl MakeConnection for MakeWriteProxyConn {
#[derive(Debug)]
pub struct WriteProxyConnection {
/// Lazily initialized read connection
read_conn: LibSqlConnection<TransparentMethods>,
read_conn: LibSqlConnection<Sqlite3CreateWal>,
write_proxy: ProxyClient<Channel>,
state: Mutex<State>,
client_id: Uuid,
Expand Down Expand Up @@ -176,7 +175,7 @@ impl WriteProxyConnection {
applied_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
builder_config: QueryBuilderConfig,
namespace: NamespaceName,
read_conn: LibSqlConnection<TransparentMethods>,
read_conn: LibSqlConnection<Sqlite3CreateWal>,
) -> Result<Self> {
Ok(Self {
read_conn,
Expand Down
6 changes: 2 additions & 4 deletions libsql-server/src/namespace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use hyper::Uri;
use metrics::histogram;
use parking_lot::Mutex;
use rusqlite::ErrorCode;
use sqld_libsql_bindings::wal_hook::TRANSPARENT_METHODS;
use tokio::io::AsyncBufReadExt;
use tokio::sync::watch;
use tokio::task::JoinSet;
Expand All @@ -31,7 +30,7 @@ use crate::connection::write_proxy::MakeWriteProxyConn;
use crate::connection::MakeConnection;
use crate::database::{Database, PrimaryDatabase, ReplicaDatabase};
use crate::error::{Error, LoadDumpError};
use crate::replication::primary::logger::{ReplicationLoggerHookCtx, REPLICATION_METHODS};
use crate::replication::primary::logger::ReplicationLoggerHookCtx;
use crate::replication::replica::Replicator;
use crate::replication::{FrameNo, NamespacedSnapshotCallback, ReplicationLogger};
use crate::stats::Stats;
Expand Down Expand Up @@ -759,8 +758,7 @@ impl Namespace<PrimaryDatabase> {

let connection_maker: Arc<_> = MakeLibSqlConn::new(
db_path.clone(),
&REPLICATION_METHODS,
ctx_builder.clone(),
todo!(),
stats.clone(),
db_config_store.clone(),
config.extensions.clone(),
Expand Down
Loading

0 comments on commit 7c7a3bf

Please sign in to comment.