Skip to content

Commit

Permalink
libsql: Initial offline write support
Browse files Browse the repository at this point in the history
This patch add initial support for offline writes. To open a local
database that can sync to remote server, use the
Builder::new_synced_database() API.
  • Loading branch information
penberg committed Nov 4, 2024
1 parent 8645504 commit 2a1fa88
Show file tree
Hide file tree
Showing 9 changed files with 716 additions and 23 deletions.
376 changes: 362 additions & 14 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions libsql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ fallible-iterator = { version = "0.3", optional = true }

libsql_replication = { version = "0.6", path = "../libsql-replication", optional = true }
async-stream = { version = "0.3.5", optional = true }
reqwest = { version = "0.12.9", features = [ "rustls-tls" ] }

[dev-dependencies]
criterion = { version = "0.5", features = ["html_reports", "async", "async_futures", "async_tokio"] }
Expand Down
85 changes: 85 additions & 0 deletions libsql/examples/offline_writes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Example of using a offline writes with libSQL.

use libsql::{params, Builder};

#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();

// The local database path where the data will be stored.
let db_path = std::env::var("LIBSQL_DB_PATH")
.map_err(|_| {
eprintln!(
"Please set the LIBSQL_DB_PATH environment variable to set to local database path."
)
})
.unwrap();

// The remote sync URL to use.
let sync_url = std::env::var("LIBSQL_SYNC_URL")
.map_err(|_| {
eprintln!(
"Please set the LIBSQL_SYNC_URL environment variable to set to remote sync URL."
)
})
.unwrap();

let namespace = std::env::var("LIBSQL_NAMESPACE").ok();

// The authentication token to use.
let auth_token = std::env::var("LIBSQL_AUTH_TOKEN").unwrap_or("".to_string());

let db_builder = if let Some(ns) = namespace {
Builder::new_synced_database(db_path, sync_url, auth_token).namespace(&ns)
} else {
Builder::new_synced_database(db_path, sync_url, auth_token)
};

let db = match db_builder.build().await {
Ok(db) => db,
Err(error) => {
eprintln!("Error connecting to remote sync server: {}", error);
return;
}
};

let conn = db.connect().unwrap();

conn.execute(
r#"
CREATE TABLE IF NOT EXISTS guest_book_entries (
text TEXT
)"#,
(),
)
.await
.unwrap();

let mut input = String::new();
println!("Please write your entry to the guestbook:");
match std::io::stdin().read_line(&mut input) {
Ok(_) => {
println!("You entered: {}", input);
let params = params![input.as_str()];
conn.execute("INSERT INTO guest_book_entries (text) VALUES (?)", params)
.await
.unwrap();
}
Err(error) => {
eprintln!("Error reading input: {}", error);
}
}
let mut results = conn
.query("SELECT * FROM guest_book_entries", ())
.await
.unwrap();
println!("Guest book entries:");
while let Some(row) = results.next().await.unwrap() {
let text: String = row.get(0).unwrap();
println!(" {}", text);
}

print!("Syncing database to remote...");
db.sync().await.unwrap();
println!(" done");
}
23 changes: 19 additions & 4 deletions libsql/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ enum DbType {
db: crate::local::Database,
encryption_config: Option<EncryptionConfig>,
},
#[cfg(feature = "replication")]
Offline { db: crate::local::Database },
#[cfg(feature = "remote")]
Remote {
url: String,
Expand All @@ -66,6 +68,8 @@ impl fmt::Debug for DbType {
Self::File { .. } => write!(f, "File"),
#[cfg(feature = "replication")]
Self::Sync { .. } => write!(f, "Sync"),
#[cfg(feature = "replication")]
Self::Offline { .. } => write!(f, "Offline"),
#[cfg(feature = "remote")]
Self::Remote { .. } => write!(f, "Remote"),
_ => write!(f, "no database type set"),
Expand Down Expand Up @@ -333,10 +337,10 @@ cfg_replication! {
/// Sync database from remote, and returns the committed frame_no after syncing, if
/// applicable.
pub async fn sync(&self) -> Result<crate::replication::Replicated> {
if let DbType::Sync { db, encryption_config: _ } = &self.db_type {
db.sync().await
} else {
Err(Error::SyncNotSupported(format!("{:?}", self.db_type)))
match &self.db_type {
DbType::Sync { db, encryption_config: _ } => db.sync().await,
DbType::Offline { db } => db.push().await,
_ => Err(Error::SyncNotSupported(format!("{:?}", self.db_type))),
}
}

Expand Down Expand Up @@ -595,6 +599,17 @@ impl Database {
Ok(Connection { conn })
}

#[cfg(feature = "replication")]
DbType::Offline { db } => {
use crate::local::impls::LibsqlConnection;

let conn = db.connect()?;

let conn = std::sync::Arc::new(LibsqlConnection { conn });

Ok(Connection { conn })
}

#[cfg(feature = "remote")]
DbType::Remote {
url,
Expand Down
120 changes: 120 additions & 0 deletions libsql/src/database/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use super::DbType;
/// it does no networking and does not connect to any remote database.
/// - `new_remote_replica`/`RemoteReplica` creates an embedded replica database that will be able
/// to sync from the remote url and delegate writes to the remote primary.
/// - `new_synced_database`/`SyncedDatabase` creates an embedded replica database that supports
/// offline writes.
/// - `new_local_replica`/`LocalReplica` creates an embedded replica similar to the remote version
/// except you must use `Database::sync_frames` to sync with the remote. This version also
/// includes the ability to delegate writes to a remote primary.
Expand Down Expand Up @@ -66,6 +68,30 @@ impl Builder<()> {
}
}

cfg_replication! {
/// Create a new offline embedded replica.
pub fn new_synced_database(
path: impl AsRef<std::path::Path>,
url: String,
auth_token: String,
) -> Builder<SyncedDatabase> {
Builder {
inner: SyncedDatabase {
path: path.as_ref().to_path_buf(),
flags: crate::OpenFlags::default(),
remote: Remote {
url,
auth_token,
connector: None,
version: None,
},
http_request_callback: None,
namespace: None
},
}
}
}

/// Create a new local replica.
pub fn new_local_replica(path: impl AsRef<std::path::Path>) -> Builder<LocalReplica> {
Builder {
Expand Down Expand Up @@ -172,6 +198,15 @@ cfg_replication! {
namespace: Option<String>,
}

/// Remote replica configuration type in [`Builder`].
pub struct SyncedDatabase {
path: std::path::PathBuf,
flags: crate::OpenFlags,
remote: Remote,
http_request_callback: Option<crate::util::HttpRequestCallback>,
namespace: Option<String>,
}

/// Local replica configuration type in [`Builder`].
pub struct LocalReplica {
path: std::path::PathBuf,
Expand Down Expand Up @@ -298,6 +333,91 @@ cfg_replication! {
}
}

impl Builder<SyncedDatabase> {
/// Provide a custom http connector that will be used to create http connections.
pub fn connector<C>(mut self, connector: C) -> Builder<SyncedDatabase>
where
C: tower::Service<http::Uri> + Send + Clone + Sync + 'static,
C::Response: crate::util::Socket,
C::Future: Send + 'static,
C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
self.inner.remote = self.inner.remote.connector(connector);
self
}

pub fn http_request_callback<F>(mut self, f: F) -> Builder<SyncedDatabase>
where
F: Fn(&mut http::Request<()>) + Send + Sync + 'static
{
self.inner.http_request_callback = Some(std::sync::Arc::new(f));
self

}

/// Set the namespace that will be communicated to remote replica in the http header.
pub fn namespace(mut self, namespace: impl Into<String>) -> Builder<SyncedDatabase>
{
self.inner.namespace = Some(namespace.into());
self
}

#[doc(hidden)]
pub fn version(mut self, version: String) -> Builder<SyncedDatabase> {
self.inner.remote = self.inner.remote.version(version);
self
}

/// Build a connection to a local database that can be synced to remote server.
pub async fn build(self) -> Result<Database> {
let SyncedDatabase {
path,
flags,
remote:
Remote {
url,
auth_token,
connector,
version,
},
http_request_callback,
namespace
} = self.inner;

let connector = if let Some(connector) = connector {
connector
} else {
let https = super::connector()?;
use tower::ServiceExt;

let svc = https
.map_err(|e| e.into())
.map_response(|s| Box::new(s) as Box<dyn crate::util::Socket>);

crate::util::ConnectorService::new(svc)
};

let path = path.to_str().ok_or(crate::Error::InvalidUTF8Path)?.to_owned();

let db = crate::local::Database::open_local_with_offline_writes(
connector,
path,
flags,
url,
auth_token,
version,
http_request_callback,
namespace,
)
.await?;

Ok(Database {
db_type: DbType::Offline { db },
max_write_replication_index: Default::default(),
})
}
}

impl Builder<LocalReplica> {
/// Set [`OpenFlags`] for this database.
pub fn flags(mut self, flags: crate::OpenFlags) -> Builder<LocalReplica> {
Expand Down
1 change: 1 addition & 0 deletions libsql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ cfg_parser! {

mod rows;
mod statement;
mod sync;
mod transaction;
mod value;

Expand Down
17 changes: 13 additions & 4 deletions libsql/src/local/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use super::{Database, Error, Result, Rows, RowsFuture, Statement, Transaction};

use crate::TransactionBehavior;

use libsql_sys::ffi;
use libsql_sys::{ffi, wal};
use std::{ffi::c_int, fmt, path::Path, sync::Arc};

/// A connection to a libSQL database.
Expand Down Expand Up @@ -57,13 +57,22 @@ impl Connection {
)));
}
}

Ok(Connection {
let conn = Connection {
raw,
drop_ref: Arc::new(()),
#[cfg(feature = "replication")]
writer: db.writer()?,
})
};
if let Some(_) = db.sync_ctx {
// We need to make sure database is in WAL mode with checkpointing
// disabled so that we can sync our changes back to a remote
// server.
conn.query("PRAGMA journal_mode = WAL", Params::None)?;
unsafe {
ffi::libsql_wal_disable_checkpoint(conn.raw);
}
}
Ok(conn)
}

/// Get a raw handle to the underlying libSQL connection
Expand Down
Loading

0 comments on commit 2a1fa88

Please sign in to comment.