Skip to content

Commit

Permalink
Merge branch 'main' into fix-hrana-row-order
Browse files Browse the repository at this point in the history
  • Loading branch information
LucioFranco authored Aug 25, 2023
2 parents e957d4a + 9493502 commit 8cbd684
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 33 deletions.
1 change: 1 addition & 0 deletions crates/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ panic = "abort"
tonic = { git = "https://github.com/hyperium/tonic" }
tonic-web = { git = "https://github.com/hyperium/tonic", branch = "lucio/grpc-web-client" }
tonic-build = { git = "https://github.com/hyperium/tonic" }
tower-http = { git = "https://github.com/tower-rs/tower-http", branch = "lucio/grpc-defaults" }
7 changes: 6 additions & 1 deletion crates/core/examples/example_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@ use libsql::v2::Database;
#[tokio::main]
async fn main() {
let db = if let Ok(url) = std::env::var("LIBSQL_HRANA_URL") {
Database::open_remote(url).unwrap()
let token = std::env::var("TURSO_AUTH_TOKEN").unwrap_or_else(|_| {
println!("TURSO_AUTH_TOKEN not set, using empty token...");
"".to_string()
});

Database::open_remote(url, token).unwrap()
} else {
Database::open_in_memory().unwrap()
};
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ pub enum Error {
ToSqlConversionFailure(crate::BoxError),
#[error("Hrana: `{0}`")]
Hrana(#[from] crate::v2::HranaError),
#[error("Sync is not supported")]
SyncNotSupported,
}

pub(crate) fn error_from_handle(raw: *mut libsql_sys::ffi::sqlite3) -> String {
Expand Down
29 changes: 24 additions & 5 deletions crates/core/src/v2/hrana.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod pipeline;
mod proto;

use hyper::header::AUTHORIZATION;
use pipeline::{
ClientMsg, Response, ServerMsg, StreamBatchReq, StreamExecuteReq, StreamRequest,
StreamResponse, StreamResponseError, StreamResponseOk,
Expand All @@ -14,6 +15,7 @@ use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
// use crate::client::Config;
use crate::{Column, Params, Result};
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::{Arc, RwLock};

use super::rows::{RowInner, RowsInner};
Expand All @@ -35,6 +37,7 @@ pub struct Client {
cookies: Arc<RwLock<HashMap<u64, Cookie>>>,
url_for_queries: String,
auth: String,
last_insert_rowid: Arc<AtomicI64>,
}

#[derive(Clone, Debug)]
Expand All @@ -54,15 +57,20 @@ impl InnerClient {
Self { inner }
}

async fn send(&self, url: String, _auth: String, body: String) -> Result<ServerMsg> {
async fn send(&self, url: String, auth: String, body: String) -> Result<ServerMsg> {
let req = hyper::Request::post(url)
.header(AUTHORIZATION, auth)
.body(hyper::Body::from(body))
.unwrap();

let res = self.inner.request(req).await.map_err(HranaError::from)?;

if res.status() != StatusCode::OK {
// TODO(lucio): Error branch!
let body = hyper::body::to_bytes(res.into_body())
.await
.map_err(HranaError::from)?;
let body = String::from_utf8(body.into()).unwrap();
return Err(HranaError::Api(body).into());
}

let body = hyper::body::to_bytes(res.into_body())
Expand All @@ -89,6 +97,8 @@ pub enum HranaError {
Json(#[from] serde_json::Error),
#[error("http error: `{0}`")]
Http(#[from] hyper::Error),
#[error("api error: `{0}`")]
Api(String),
}

impl Client {
Expand All @@ -102,6 +112,8 @@ impl Client {

let token = token.into();
let url = url.into();
// The `libsql://` protocol is an alias for `https://`.
let url = url.replace("libsql://", "https://");
// Auto-update the URL to start with https:// if no protocol was specified
let base_url = if !url.contains("://") {
format!("https://{}", &url)
Expand All @@ -114,6 +126,7 @@ impl Client {
cookies: Arc::new(RwLock::new(HashMap::new())),
url_for_queries,
auth: format!("Bearer {token}"),
last_insert_rowid: Arc::new(AtomicI64::new(0)),
}
}

Expand Down Expand Up @@ -290,7 +303,7 @@ impl Conn for Client {
}

fn last_insert_rowid(&self) -> i64 {
todo!()
self.last_insert_rowid.load(Ordering::SeqCst)
}
}

Expand All @@ -306,6 +319,9 @@ impl super::statement::Stmt for Statement {
bind_params(params.clone(), &mut stmt);

let v = self.client.execute_inner(stmt, 0).await?;
if let Some(last_insert_rowid) = v.last_insert_rowid {
self.client.last_insert_rowid.store(last_insert_rowid, Ordering::SeqCst);
}
Ok(v.affected_row_count as usize)
}

Expand All @@ -324,7 +340,6 @@ impl super::statement::Stmt for Statement {
}

fn reset(&self) {
todo!()
}

fn parameter_count(&self) -> usize {
Expand Down Expand Up @@ -365,7 +380,11 @@ impl RowsInner for Rows {
}

fn column_name(&self, idx: i32) -> Option<&str> {
todo!();
self.cols
.get(idx as usize)
.map(|c| c.name.as_ref())
.flatten()
.map(|s| s.as_str())
}
}

Expand Down
43 changes: 28 additions & 15 deletions crates/core/src/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ pub use transaction::Transaction;
enum DbType {
Memory,
File { path: String },
Sync { path: String, url: String, token: String },
Remote { url: String },
Sync { db: crate::Database },
Remote { url: String, auth_token: String },
}

pub struct Database {
Expand All @@ -43,19 +43,24 @@ impl Database {
})
}

pub fn open_with_sync(db_path: impl Into<String>, url: impl Into<String>, token: impl Into<String>) -> Result<Database> {
pub async fn open_with_sync(
db_path: impl Into<String>,
url: impl Into<String>,
token: impl Into<String>,
) -> Result<Database> {
let opts = crate::Opts::with_http_sync(url, token);
let db = crate::Database::open_with_opts(db_path, opts).await?;
Ok(Database {
db_type: DbType::Sync {
path: db_path.into(),
url: url.into(),
token: token.into(),
},
db_type: DbType::Sync { db },
})
}

pub fn open_remote(url: impl Into<String>) -> Result<Self> {
pub fn open_remote(url: impl Into<String>, auth_token: impl Into<String>) -> Result<Self> {
Ok(Database {
db_type: DbType::Remote { url: url.into() },
db_type: DbType::Remote {
url: url.into(),
auth_token: auth_token.into(),
},
})
}

Expand All @@ -79,23 +84,31 @@ impl Database {
Ok(Connection { conn })
}

DbType::Sync { path, url, token } => {
let opts = crate::Opts::with_http_sync(url, token);
let db = crate::Database::open_with_opts(path, opts).await?;
DbType::Sync { db } => {
let conn = db.connect()?;

let conn = Arc::new(LibsqlConnection { conn });

Ok(Connection { conn })
}

DbType::Remote { url } => {
let conn = Arc::new(hrana::Client::new(url, ""));
DbType::Remote { url, auth_token } => {
let conn = Arc::new(hrana::Client::new(url, auth_token));

Ok(Connection { conn })
}
}
}

pub async fn sync(&self) -> Result<()> {
match &self.db_type {
DbType::Sync { db } => {
db.sync().await?;
Ok(())
}
_ => Err(crate::Error::SyncNotSupported),
}
}
}

#[async_trait::async_trait]
Expand Down
1 change: 1 addition & 0 deletions crates/replication/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ serde_json = "1.0.103"
serde = { version = "1.0.173", features = ["serde_derive"] }
tonic = { version = "0.9", features = ["tls", "tls-roots", "tls-webpki-roots"] }
tonic-web = "0.9"
tower-http = { version = "0.4", features = ["trace", "util"] }
prost = "0.11"
hyper = "0.14"
hyper-rustls = { version = "0.24", features = ["webpki-roots"] }
Expand Down
30 changes: 21 additions & 9 deletions crates/replication/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,27 @@ use std::{
task::{Context, Poll},
};

use hyper::{client::HttpConnector, Client};
use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
use hyper::Client;
use hyper_rustls::HttpsConnectorBuilder;
use tonic::body::BoxBody;
use tonic_web::{GrpcWebCall, GrpcWebClientService};
use tower::Service;
use tower::{util::BoxCloneService, Service, ServiceBuilder};
use tower_http::{classify, trace, ServiceBuilderExt};

type ResponseBody = trace::ResponseBody<
GrpcWebCall<hyper::Body>,
classify::GrpcEosErrorsAsFailures,
trace::DefaultOnBodyChunk,
trace::DefaultOnEos,
trace::DefaultOnFailure,
>;

#[derive(Debug, Clone)]
pub struct H2cChannel {
client: GrpcWebClientService<Client<HttpsConnector<HttpConnector>, GrpcWebCall<BoxBody>>>,
pub struct GrpcChannel {
client: BoxCloneService<http::Request<BoxBody>, http::Response<ResponseBody>, hyper::Error>,
}

impl H2cChannel {
#[allow(unused)]
impl GrpcChannel {
pub fn new() -> Self {
let https = HttpsConnectorBuilder::new()
.with_webpki_roots()
Expand All @@ -26,12 +34,16 @@ impl H2cChannel {
let client = Client::builder().build(https);
let client = GrpcWebClientService::new(client);

let svc = ServiceBuilder::new().trace_for_grpc().service(client);

let client = BoxCloneService::new(svc);

Self { client }
}
}

impl Service<http::Request<BoxBody>> for H2cChannel {
type Response = http::Response<GrpcWebCall<hyper::Body>>;
impl Service<http::Request<BoxBody>> for GrpcChannel {
type Response = http::Response<ResponseBody>;
type Error = hyper::Error;
type Future =
Pin<Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>>;
Expand Down
6 changes: 3 additions & 3 deletions crates/replication/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::mpsc::Sender;

use crate::client::H2cChannel;
use crate::client::GrpcChannel;
use crate::pb::HelloRequest;

type RpcClient = pb::ReplicationLogClient<InterceptedService<H2cChannel, AuthInterceptor>>;
type RpcClient = pb::ReplicationLogClient<InterceptedService<GrpcChannel, AuthInterceptor>>;

pub struct Replicator {
pub frames_sender: Sender<Frames>,
Expand Down Expand Up @@ -155,7 +155,7 @@ impl Replicator {
//.tls_config(tonic::transport::ClientTlsConfig::new())?
//.connect_lazy();

let channel = H2cChannel::new();
let channel = GrpcChannel::new();

let mut client = pb::ReplicationLogClient::with_origin(
InterceptedService::new(channel, AuthInterceptor(auth_token)),
Expand Down

0 comments on commit 8cbd684

Please sign in to comment.