From 5d61aa28f50b653fe7b45100cb28b2f4317b9af8 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Fri, 25 Aug 2023 11:03:50 +0300 Subject: [PATCH 1/8] libsql/core: Implement hrana Statement::reset() There's no state to reset so it's a no-op. --- crates/core/src/v2/hrana.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/core/src/v2/hrana.rs b/crates/core/src/v2/hrana.rs index 9f5b5e5035..235f862449 100644 --- a/crates/core/src/v2/hrana.rs +++ b/crates/core/src/v2/hrana.rs @@ -324,7 +324,6 @@ impl super::statement::Stmt for Statement { } fn reset(&self) { - todo!() } fn parameter_count(&self) -> usize { From c32fd73e6b73dd15be85c470a41367d6f74ab351 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Fri, 25 Aug 2023 11:18:04 +0300 Subject: [PATCH 2/8] libsql/core: Implement hrana Connection::last_insert_rowid() --- crates/core/src/v2/hrana.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/crates/core/src/v2/hrana.rs b/crates/core/src/v2/hrana.rs index 9f5b5e5035..5583a83c72 100644 --- a/crates/core/src/v2/hrana.rs +++ b/crates/core/src/v2/hrana.rs @@ -14,6 +14,7 @@ use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder}; // use crate::client::Config; use crate::{Column, Params, Result}; use std::collections::HashMap; +use std::sync::atomic::{AtomicI64, Ordering}; use std::sync::{Arc, RwLock}; use super::rows::{RowInner, RowsInner}; @@ -35,6 +36,7 @@ pub struct Client { cookies: Arc>>, url_for_queries: String, auth: String, + last_insert_rowid: Arc, } #[derive(Clone, Debug)] @@ -114,6 +116,7 @@ impl Client { cookies: Arc::new(RwLock::new(HashMap::new())), url_for_queries, auth: format!("Bearer {token}"), + last_insert_rowid: Arc::new(AtomicI64::new(0)), } } @@ -290,7 +293,7 @@ impl Conn for Client { } fn last_insert_rowid(&self) -> i64 { - todo!() + self.last_insert_rowid.load(Ordering::SeqCst) } } @@ -306,6 +309,9 @@ impl super::statement::Stmt for Statement { bind_params(params.clone(), &mut stmt); let v = self.client.execute_inner(stmt, 0).await?; + if let Some(last_insert_rowid) = v.last_insert_rowid { + self.client.last_insert_rowid.store(last_insert_rowid, Ordering::SeqCst); + } Ok(v.affected_row_count as usize) } From e5118d8804a3fef1e5939bdd0c91f04b484820de Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Fri, 25 Aug 2023 11:20:12 +0300 Subject: [PATCH 3/8] libsql/core: Implement hrana Rows::column_name() --- crates/core/src/v2/hrana.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/crates/core/src/v2/hrana.rs b/crates/core/src/v2/hrana.rs index 9f5b5e5035..5e5c944b1c 100644 --- a/crates/core/src/v2/hrana.rs +++ b/crates/core/src/v2/hrana.rs @@ -365,7 +365,11 @@ impl RowsInner for Rows { } fn column_name(&self, idx: i32) -> Option<&str> { - todo!(); + self.cols + .get(idx as usize) + .map(|c| c.name.as_ref()) + .flatten() + .map(|s| s.as_str()) } } From 166b6208b33f867a3f876d9549599c5163c48a4b Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Fri, 25 Aug 2023 11:32:25 +0300 Subject: [PATCH 4/8] libsql/core: Implement Database::sync() in V2 --- crates/core/src/errors.rs | 2 ++ crates/core/src/v2/mod.rs | 26 ++++++++++++++++---------- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/crates/core/src/errors.rs b/crates/core/src/errors.rs index 94381cbbc9..f603caf232 100644 --- a/crates/core/src/errors.rs +++ b/crates/core/src/errors.rs @@ -24,6 +24,8 @@ pub enum Error { ToSqlConversionFailure(crate::BoxError), #[error("Hrana: `{0}`")] Hrana(#[from] crate::v2::HranaError), + #[error("Sync is not supported")] + SyncNotSupported, } pub(crate) fn error_from_handle(raw: *mut libsql_sys::ffi::sqlite3) -> String { diff --git a/crates/core/src/v2/mod.rs b/crates/core/src/v2/mod.rs index 1e79cdb945..9b77d0bacc 100644 --- a/crates/core/src/v2/mod.rs +++ b/crates/core/src/v2/mod.rs @@ -20,7 +20,7 @@ pub use transaction::Transaction; enum DbType { Memory, File { path: String }, - Sync { path: String, url: String, token: String }, + Sync { db: crate::Database }, Remote { url: String }, } @@ -43,13 +43,11 @@ impl Database { }) } - pub fn open_with_sync(db_path: impl Into, url: impl Into, token: impl Into) -> Result { + pub async fn open_with_sync(db_path: impl Into, url: impl Into, token: impl Into) -> Result { + let opts = crate::Opts::with_http_sync(url, token); + let db = crate::Database::open_with_opts(db_path, opts).await?; Ok(Database { - db_type: DbType::Sync { - path: db_path.into(), - url: url.into(), - token: token.into(), - }, + db_type: DbType::Sync { db }, }) } @@ -79,9 +77,7 @@ impl Database { Ok(Connection { conn }) } - DbType::Sync { path, url, token } => { - let opts = crate::Opts::with_http_sync(url, token); - let db = crate::Database::open_with_opts(path, opts).await?; + DbType::Sync { db } => { let conn = db.connect()?; let conn = Arc::new(LibsqlConnection { conn }); @@ -96,6 +92,16 @@ impl Database { } } } + + pub async fn sync(&self) -> Result<()> { + match &self.db_type { + DbType::Sync { db } => { + db.sync().await?; + Ok(()) + } + _ => Err(crate::Error::SyncNotSupported), + } + } } #[async_trait::async_trait] From 9f031fcd411b28dba96d4bf7fdfd0a29a678d3ab Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Fri, 25 Aug 2023 13:19:22 +0300 Subject: [PATCH 5/8] libsql/core: Add support for `libsql://` scheme in Hrana --- crates/core/src/v2/hrana.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/core/src/v2/hrana.rs b/crates/core/src/v2/hrana.rs index 9f5b5e5035..c5a8915937 100644 --- a/crates/core/src/v2/hrana.rs +++ b/crates/core/src/v2/hrana.rs @@ -102,6 +102,8 @@ impl Client { let token = token.into(); let url = url.into(); + // The `libsql://` protocol is an alias for `https://`. + let url = url.replace("libsql://", "https://"); // Auto-update the URL to start with https:// if no protocol was specified let base_url = if !url.contains("://") { format!("https://{}", &url) From 66cb23d00e78f218c21217f9cae70948c3cb7be1 Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Thu, 24 Aug 2023 15:04:23 -0400 Subject: [PATCH 6/8] libsql/replication: Improve tracing for client --- crates/Cargo.toml | 1 + crates/replication/Cargo.toml | 1 + crates/replication/src/client.rs | 30 +++++++++++++++++++++--------- crates/replication/src/lib.rs | 6 +++--- 4 files changed, 26 insertions(+), 12 deletions(-) diff --git a/crates/Cargo.toml b/crates/Cargo.toml index 6085357ca5..ad1f3a32f0 100644 --- a/crates/Cargo.toml +++ b/crates/Cargo.toml @@ -18,3 +18,4 @@ panic = "abort" tonic = { git = "https://github.com/hyperium/tonic" } tonic-web = { git = "https://github.com/hyperium/tonic", branch = "lucio/grpc-web-client" } tonic-build = { git = "https://github.com/hyperium/tonic" } +tower-http = { git = "https://github.com/tower-rs/tower-http", branch = "lucio/grpc-defaults" } diff --git a/crates/replication/Cargo.toml b/crates/replication/Cargo.toml index d886b1cc40..2c093d2358 100644 --- a/crates/replication/Cargo.toml +++ b/crates/replication/Cargo.toml @@ -28,6 +28,7 @@ serde_json = "1.0.103" serde = { version = "1.0.173", features = ["serde_derive"] } tonic = { version = "0.9", features = ["tls", "tls-roots", "tls-webpki-roots"] } tonic-web = "0.9" +tower-http = { version = "0.4", features = ["trace", "util"] } prost = "0.11" hyper = "0.14" hyper-rustls = { version = "0.24", features = ["webpki-roots"] } diff --git a/crates/replication/src/client.rs b/crates/replication/src/client.rs index b1a269c0e4..60dc28414a 100644 --- a/crates/replication/src/client.rs +++ b/crates/replication/src/client.rs @@ -3,19 +3,27 @@ use std::{ task::{Context, Poll}, }; -use hyper::{client::HttpConnector, Client}; -use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder}; +use hyper::Client; +use hyper_rustls::HttpsConnectorBuilder; use tonic::body::BoxBody; use tonic_web::{GrpcWebCall, GrpcWebClientService}; -use tower::Service; +use tower::{util::BoxCloneService, Service, ServiceBuilder}; +use tower_http::{classify, trace, ServiceBuilderExt}; + +type ResponseBody = trace::ResponseBody< + GrpcWebCall, + classify::GrpcEosErrorsAsFailures, + trace::DefaultOnBodyChunk, + trace::DefaultOnEos, + trace::DefaultOnFailure, +>; #[derive(Debug, Clone)] -pub struct H2cChannel { - client: GrpcWebClientService, GrpcWebCall>>, +pub struct GrpcChannel { + client: BoxCloneService, http::Response, hyper::Error>, } -impl H2cChannel { - #[allow(unused)] +impl GrpcChannel { pub fn new() -> Self { let https = HttpsConnectorBuilder::new() .with_webpki_roots() @@ -26,12 +34,16 @@ impl H2cChannel { let client = Client::builder().build(https); let client = GrpcWebClientService::new(client); + let svc = ServiceBuilder::new().trace_for_grpc().service(client); + + let client = BoxCloneService::new(svc); + Self { client } } } -impl Service> for H2cChannel { - type Response = http::Response>; +impl Service> for GrpcChannel { + type Response = http::Response; type Error = hyper::Error; type Future = Pin> + Send>>; diff --git a/crates/replication/src/lib.rs b/crates/replication/src/lib.rs index a12fa9b564..7df5b0920e 100644 --- a/crates/replication/src/lib.rs +++ b/crates/replication/src/lib.rs @@ -31,10 +31,10 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use tokio::sync::mpsc::Sender; -use crate::client::H2cChannel; +use crate::client::GrpcChannel; use crate::pb::HelloRequest; -type RpcClient = pb::ReplicationLogClient>; +type RpcClient = pb::ReplicationLogClient>; pub struct Replicator { pub frames_sender: Sender, @@ -155,7 +155,7 @@ impl Replicator { //.tls_config(tonic::transport::ClientTlsConfig::new())? //.connect_lazy(); - let channel = H2cChannel::new(); + let channel = GrpcChannel::new(); let mut client = pb::ReplicationLogClient::with_origin( InterceptedService::new(channel, AuthInterceptor(auth_token)), From 63bc63d839b5b588c026e16598ace0e8d4ff2a32 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Fri, 25 Aug 2023 13:23:22 +0300 Subject: [PATCH 7/8] libsql/core: Add auth token parameter to open_remote() in V2 --- crates/core/src/v2/mod.rs | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/crates/core/src/v2/mod.rs b/crates/core/src/v2/mod.rs index 9b77d0bacc..8605266c35 100644 --- a/crates/core/src/v2/mod.rs +++ b/crates/core/src/v2/mod.rs @@ -21,7 +21,7 @@ enum DbType { Memory, File { path: String }, Sync { db: crate::Database }, - Remote { url: String }, + Remote { url: String, auth_token: String }, } pub struct Database { @@ -43,7 +43,11 @@ impl Database { }) } - pub async fn open_with_sync(db_path: impl Into, url: impl Into, token: impl Into) -> Result { + pub async fn open_with_sync( + db_path: impl Into, + url: impl Into, + token: impl Into, + ) -> Result { let opts = crate::Opts::with_http_sync(url, token); let db = crate::Database::open_with_opts(db_path, opts).await?; Ok(Database { @@ -51,9 +55,12 @@ impl Database { }) } - pub fn open_remote(url: impl Into) -> Result { + pub fn open_remote(url: impl Into, auth_token: impl Into) -> Result { Ok(Database { - db_type: DbType::Remote { url: url.into() }, + db_type: DbType::Remote { + url: url.into(), + auth_token: auth_token.into(), + }, }) } @@ -85,8 +92,8 @@ impl Database { Ok(Connection { conn }) } - DbType::Remote { url } => { - let conn = Arc::new(hrana::Client::new(url, "")); + DbType::Remote { url, auth_token } => { + let conn = Arc::new(hrana::Client::new(url, auth_token)); Ok(Connection { conn }) } From 8a191bde86604bd76d0ec2dbbf045731996dcc4d Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Fri, 25 Aug 2023 11:06:13 -0400 Subject: [PATCH 8/8] libsql/hrana: Add auth and error handling --- crates/core/examples/example_v2.rs | 7 ++++++- crates/core/src/v2/hrana.rs | 12 ++++++++++-- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/crates/core/examples/example_v2.rs b/crates/core/examples/example_v2.rs index f190261075..62aaa1a865 100644 --- a/crates/core/examples/example_v2.rs +++ b/crates/core/examples/example_v2.rs @@ -3,7 +3,12 @@ use libsql::v2::Database; #[tokio::main] async fn main() { let db = if let Ok(url) = std::env::var("LIBSQL_HRANA_URL") { - Database::open_remote(url).unwrap() + let token = std::env::var("TURSO_AUTH_TOKEN").unwrap_or_else(|_| { + println!("TURSO_AUTH_TOKEN not set, using empty token..."); + "".to_string() + }); + + Database::open_remote(url, token).unwrap() } else { Database::open_in_memory().unwrap() }; diff --git a/crates/core/src/v2/hrana.rs b/crates/core/src/v2/hrana.rs index 3f14fb1521..517700365f 100644 --- a/crates/core/src/v2/hrana.rs +++ b/crates/core/src/v2/hrana.rs @@ -1,6 +1,7 @@ mod pipeline; mod proto; +use hyper::header::AUTHORIZATION; use pipeline::{ ClientMsg, Response, ServerMsg, StreamBatchReq, StreamExecuteReq, StreamRequest, StreamResponse, StreamResponseError, StreamResponseOk, @@ -56,15 +57,20 @@ impl InnerClient { Self { inner } } - async fn send(&self, url: String, _auth: String, body: String) -> Result { + async fn send(&self, url: String, auth: String, body: String) -> Result { let req = hyper::Request::post(url) + .header(AUTHORIZATION, auth) .body(hyper::Body::from(body)) .unwrap(); let res = self.inner.request(req).await.map_err(HranaError::from)?; if res.status() != StatusCode::OK { - // TODO(lucio): Error branch! + let body = hyper::body::to_bytes(res.into_body()) + .await + .map_err(HranaError::from)?; + let body = String::from_utf8(body.into()).unwrap(); + return Err(HranaError::Api(body).into()); } let body = hyper::body::to_bytes(res.into_body()) @@ -91,6 +97,8 @@ pub enum HranaError { Json(#[from] serde_json::Error), #[error("http error: `{0}`")] Http(#[from] hyper::Error), + #[error("api error: `{0}`")] + Api(String), } impl Client {