Skip to content

Commit

Permalink
Merge pull request #607 from Horusiath/libsql-execute-batch
Browse files Browse the repository at this point in the history
Libsql client: execute batch
  • Loading branch information
LucioFranco authored Nov 13, 2023
2 parents c3372a8 + e74b4bb commit a30080f
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 22 deletions.
59 changes: 59 additions & 0 deletions libsql-server/tests/hrana/batch.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use insta::assert_json_snapshot;
use libsql::{params, Database};
use sqld::hrana_proto::{Batch, BatchStep, Stmt};

use crate::common::http::Client;
use crate::common::net::TurmoilConnector;

#[test]
fn sample_request() {
Expand Down Expand Up @@ -34,3 +36,60 @@ fn sample_request() {

sim.run().unwrap();
}

#[test]
fn execute_individual_statements() {
let mut sim = turmoil::Builder::new().build();
sim.host("primary", super::make_standalone_server);
sim.client("client", async {
let db = Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector)?;
let conn = db.connect()?;

conn.execute("create table t(x text)", ()).await?;
conn.execute("insert into t(x) values(?)", params!["hello"])
.await?;
let mut rows = conn
.query("select * from t where x = ?", params!["hello"])
.await?;

assert_eq!(rows.column_count(), 1);
assert_eq!(rows.column_name(0), Some("x"));
assert_eq!(rows.next()?.unwrap().get::<String>(0)?, "hello");
assert!(rows.next()?.is_none());

Ok(())
});

sim.run().unwrap();
}

#[test]
fn execute_batch() {
let mut sim = turmoil::Builder::new().build();
sim.host("primary", super::make_standalone_server);
sim.client("client", async {
let db = Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector)?;
let conn = db.connect()?;

conn.execute_batch(
r#"
begin;
create table t(x text);
insert into t(x) values('hello; world');
end;"#,
)
.await?;
let mut rows = conn
.query("select * from t where x = ?", params!["hello; world"])
.await?;

assert_eq!(rows.column_count(), 1);
assert_eq!(rows.column_name(0), Some("x"));
assert_eq!(rows.next()?.unwrap().get::<String>(0)?, "hello; world");
assert!(rows.next()?.is_none());

Ok(())
});

sim.run().unwrap();
}
8 changes: 6 additions & 2 deletions libsql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,13 @@ core = [
"dep:bitflags",
"dep:futures"
]
parser = [
"dep:sqlite3-parser",
"dep:fallible-iterator"
]
replication = [
"core",
"parser",
"serde",
"dep:tower",
"dep:hyper",
Expand All @@ -70,10 +75,8 @@ replication = [
"dep:bincode",
"dep:bytemuck",
"dep:bytes",
"dep:fallible-iterator",
"dep:uuid",
"dep:tokio-stream",
"dep:sqlite3-parser",
"dep:parking_lot",
"dep:tokio",
"dep:tonic",
Expand All @@ -85,6 +88,7 @@ replication = [
"dep:libsql-replication"
]
hrana = [
"parser",
"serde",
"dep:base64",
"dep:serde_json",
Expand Down
14 changes: 12 additions & 2 deletions libsql/src/hrana/hyper.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::hrana::connection::HttpConnection;
use crate::hrana::pipeline::ServerMsg;
use crate::hrana::proto::Stmt;
use crate::hrana::{HranaError, HttpSend, Result};
use crate::params::Params;
use crate::util::ConnectorService;
Expand Down Expand Up @@ -94,8 +95,17 @@ impl crate::connection::Conn for HttpConnection<HttpSender> {
Ok(rows as u64)
}

async fn execute_batch(&self, _sql: &str) -> crate::Result<()> {
todo!()
async fn execute_batch(&self, sql: &str) -> crate::Result<()> {
let mut statements = Vec::new();
let stmts = crate::parser::Statement::parse(sql);
for s in stmts {
let s = s?;
statements.push(Stmt::new(s.stmt, false));
}
self.raw_batch(statements)
.await
.map_err(|e| crate::Error::Hrana(e.into()))?;
Ok(())
}

async fn prepare(&self, sql: &str) -> crate::Result<Statement> {
Expand Down
18 changes: 16 additions & 2 deletions libsql/src/hrana/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,22 @@ impl RowsInner for Rows {
.map(|s| s.as_str())
}

fn column_type(&self, _idx: i32) -> crate::Result<ValueType> {
todo!("implement")
fn column_type(&self, idx: i32) -> crate::Result<ValueType> {
let row = match self.rows.get(0) {
None => return Err(crate::Error::QueryReturnedNoRows),
Some(row) => row,
};
let cell = match row.get(idx as usize) {
None => return Err(crate::Error::ColumnNotFound(idx)),
Some(cell) => cell,
};
Ok(match cell {
proto::Value::Null => ValueType::Null,
proto::Value::Integer { .. } => ValueType::Integer,
proto::Value::Float { .. } => ValueType::Real,
proto::Value::Text { .. } => ValueType::Text,
proto::Value::Blob { .. } => ValueType::Blob,
})
}
}

Expand Down
3 changes: 3 additions & 0 deletions libsql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ pub use params::params_from_iter;

mod connection;
mod database;
cfg_parser! {
mod parser;
}
mod rows;
mod statement;
mod transaction;
Expand Down
10 changes: 10 additions & 0 deletions libsql/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ macro_rules! cfg_replication {
}
}

macro_rules! cfg_parser {
($($item:item)*) => {
$(
#[cfg(feature = "parser")]
#[cfg_attr(docsrs, doc(cfg(feature = "parser")))]
$item
)*
}
}

macro_rules! cfg_hrana {
($($item:item)*) => {
$(
Expand Down
2 changes: 2 additions & 0 deletions libsql/src/replication/parser.rs → libsql/src/parser.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![allow(dead_code)]

use crate::{Error, Result};
use fallible_iterator::FallibleIterator;
use sqlite3_parser::ast::{Cmd, PragmaBody, QualifiedName, Stmt, TransactionType};
Expand Down
25 changes: 13 additions & 12 deletions libsql/src/replication/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
use std::str::FromStr;
use std::sync::Arc;

use parking_lot::Mutex;
use libsql_replication::rpc::proxy::{
describe_result, State as RemoteState, query_result::RowResult, DescribeResult,
ExecuteResults, ResultRows
describe_result, query_result::RowResult, DescribeResult, ExecuteResults, ResultRows,
State as RemoteState,
};
use parking_lot::Mutex;

use crate::parser;
use crate::parser::StmtKind;
use crate::rows::{RowInner, RowsInner};
use crate::statement::Stmt;
use crate::transaction::Tx;
Expand All @@ -21,8 +23,6 @@ use crate::{Column, Row, Rows, Value};
use crate::connection::Conn;
use crate::local::impls::LibsqlConnection;

use super::parser::{self, StmtKind};

#[derive(Clone)]
pub struct RemoteConnection {
pub(self) local: LibsqlConnection,
Expand Down Expand Up @@ -175,7 +175,9 @@ impl RemoteConnection {
params: Params,
) -> Result<ExecuteResults> {
let Some(ref writer) = self.writer else {
return Err(Error::Misuse("Cannot delegate write in local replica mode.".into()));
return Err(Error::Misuse(
"Cannot delegate write in local replica mode.".into(),
));
};
let res = writer
.execute_program(stmts, params)
Expand All @@ -194,7 +196,9 @@ impl RemoteConnection {

pub(self) async fn describe(&self, stmt: impl Into<String>) -> Result<DescribeResult> {
let Some(ref writer) = self.writer else {
return Err(Error::Misuse("Cannot describe in local replica mode.".into()));
return Err(Error::Misuse(
"Cannot describe in local replica mode.".into(),
));
};
let res = writer
.describe(stmt)
Expand Down Expand Up @@ -566,10 +570,7 @@ impl Stmt for RemoteStatement {
}
}

pub(crate) struct RemoteRows(
pub(crate) ResultRows,
pub(crate) usize,
);
pub(crate) struct RemoteRows(pub(crate) ResultRows, pub(crate) usize);

impl RowsInner for RemoteRows {
fn next(&mut self) -> Result<Option<Row>> {
Expand Down Expand Up @@ -685,7 +686,7 @@ impl Tx for RemoteTx {

#[cfg(test)]
mod tests {
use crate::replication::parser::Statement;
use crate::parser::Statement;

use super::{should_execute_local, State};

Expand Down
10 changes: 6 additions & 4 deletions libsql/src/replication/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
use libsql_replication::rpc::proxy::{DescribeRequest, DescribeResult, ExecuteResults, Positional, Program, ProgramReq, Query, Step, query::Params};
use libsql_replication::frame::Frame;
use libsql_replication::rpc::proxy::{
query::Params, DescribeRequest, DescribeResult, ExecuteResults, Positional, Program,
ProgramReq, Query, Step,
};
use libsql_replication::snapshot::SnapshotFile;

use parser::Statement;
use crate::parser::Statement;

pub use connection::RemoteConnection;

pub(crate) mod client;
mod connection;
mod parser;
pub(crate) mod remote_client;
pub(crate) mod local_client;
pub(crate) mod remote_client;

pub enum Frames {
/// A set of frames, in increasing frame_no.
Expand Down
14 changes: 14 additions & 0 deletions libsql/src/wasm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,20 @@ where
Ok(rows as u64)
}

pub async fn execute_batch(&self, sql: &str) -> crate::Result<()> {
let mut statements = Vec::new();
let stmts = crate::parser::Statement::parse(sql);
for s in stmts {
let s = s?;
statements.push(crate::hrana::proto::Stmt::new(s.stmt, false));
}
self.conn
.raw_batch(statements)
.await
.map_err(|e| crate::Error::Hrana(e.into()))?;
Ok(())
}

pub async fn query(&self, sql: &str, params: impl IntoParams) -> crate::Result<Rows> {
tracing::trace!("querying `{}`", sql);
let mut stmt = crate::hrana::Statement::new(self.conn.clone(), sql.to_string(), true);
Expand Down

0 comments on commit a30080f

Please sign in to comment.