From fb3ab3e70651df2276de0233cde6d7f81120ef3a Mon Sep 17 00:00:00 2001 From: jaredzhou Date: Wed, 26 Jan 2022 22:15:19 +0800 Subject: [PATCH 1/4] add r2d2 connection pool --- Cargo.toml | 2 + src/lib.rs | 2 + src/r2d2.rs | 231 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 235 insertions(+) create mode 100644 src/r2d2.rs diff --git a/Cargo.toml b/Cargo.toml index b60cca31..678e1a26 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,6 +48,7 @@ cast = { version = "0.3", features = ["std"] } arrow = { version = "6.5.0", default-features = false, features = ["prettyprint"] } rust_decimal = "1.14" strum = { version = "0.23", features = ["derive"] } +r2d2 = "0.8.9" [dev-dependencies] doc-comment = "0.3" @@ -57,6 +58,7 @@ regex = "1.3" uuid = { version = "0.8", features = ["v4"] } unicase = "2.6.0" rand = "0.8.3" +tempdir = "0.3.7" # criterion = "0.3" # [[bench]] diff --git a/src/lib.rs b/src/lib.rs index 281b537f..432e1ad4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -82,6 +82,7 @@ pub use crate::row::{AndThenRows, Map, MappedRows, Row, RowIndex, Rows}; pub use crate::statement::Statement; pub use crate::transaction::{DropBehavior, Savepoint, Transaction, TransactionBehavior}; pub use crate::types::ToSql; +pub use crate::r2d2::DuckdbConnectionManager; #[macro_use] mod error; @@ -97,6 +98,7 @@ mod raw_statement; mod row; mod statement; mod transaction; +mod r2d2; pub mod types; diff --git a/src/r2d2.rs b/src/r2d2.rs new file mode 100644 index 00000000..97c0175f --- /dev/null +++ b/src/r2d2.rs @@ -0,0 +1,231 @@ +#![deny(warnings)] +//! # Duckdb-rs support for the `r2d2` connection pool. +//! +//! +//! Integrated with: [r2d2](https://crates.io/crates/r2d2) +//! +//! +//! ## Example +//! +//! ```rust,no_run +//! extern crate r2d2; +//! extern crate duckdb; +//! +//! +//! use std::thread; +//! use duckdb::{DuckdbConnectionManager, params}; +//! +//! +//! fn main() { +//! let manager = DuckdbConnectionManager::file("file.db").unwrap(); +//! let pool = r2d2::Pool::new(manager).unwrap(); +//! pool.get() +//! .unwrap() +//! .execute("CREATE TABLE IF NOT EXISTS foo (bar INTEGER)", params![]) +//! .unwrap(); +//! +//! (0..10) +//! .map(|i| { +//! let pool = pool.clone(); +//! thread::spawn(move || { +//! let conn = pool.get().unwrap(); +//! conn.execute("INSERT INTO foo (bar) VALUES (?)", &[&i]) +//! .unwrap(); +//! }) +//! }) +//! .collect::>() +//! .into_iter() +//! .map(thread::JoinHandle::join) +//! .collect::>() +//! .unwrap() +//! } +//! ``` +use std::{path::Path, sync::{Mutex, Arc}}; +use crate::{Result, Connection, Error, Config}; + +/// An `r2d2::ManageConnection` for `duckdb::Connection`s. +pub struct DuckdbConnectionManager { + connection: Arc>, +} + +impl DuckdbConnectionManager { + + /// Creates a new `DuckdbConnectionManager` from file. + pub fn file>(path: P) -> Result { + Ok(Self { + connection: Arc::new(Mutex::new(Connection::open(path)?)), + }) + } + /// Creates a new `DuckdbConnectionManager` from file with flags. + pub fn file_with_flags>(path: P, config: Config) -> Result { + Ok(Self { + connection: Arc::new(Mutex::new(Connection::open_with_flags(path, config)?)), + }) + } + + /// Creates a new `DuckdbConnectionManager` from memory. + pub fn memory() -> Result { + Ok(Self { + connection: Arc::new(Mutex::new(Connection::open_in_memory()?)), + }) + } + + /// Creates a new `DuckdbConnectionManager` from memory with flags. + pub fn memory_with_flags(config: Config) -> Result { + Ok(Self { + connection: Arc::new(Mutex::new(Connection::open_in_memory_with_flags(config)?)), + }) + } +} + +impl r2d2::ManageConnection for DuckdbConnectionManager { + type Connection = Connection; + type Error = Error; + + fn connect(&self) -> Result { + let conn = self.connection.lock().unwrap(); + Ok(conn.clone()) + } + + fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> { + conn.execute_batch("").map_err(Into::into) + } + + fn has_broken(&self, _: &mut Self::Connection) -> bool { + false + } +} + + +#[cfg(test)] +mod test { + extern crate r2d2; + use super::*; + use crate::Result; + use crate::types::Value; + use std::{sync::mpsc, thread}; + + use tempdir::TempDir; + + + #[test] + fn test_basic() -> Result<()>{ + let manager = DuckdbConnectionManager::file("file.db")?; + let pool = r2d2::Pool::builder().max_size(2).build(manager).unwrap(); + + let (s1, r1) = mpsc::channel(); + let (s2, r2) = mpsc::channel(); + + let pool1 = pool.clone(); + let t1 = thread::spawn(move || { + let conn = pool1.get().unwrap(); + s1.send(()).unwrap(); + r2.recv().unwrap(); + drop(conn); + }); + + let pool2 = pool.clone(); + let t2 = thread::spawn(move || { + let conn = pool2.get().unwrap(); + s2.send(()).unwrap(); + r1.recv().unwrap(); + drop(conn); + }); + + t1.join().unwrap(); + t2.join().unwrap(); + + pool.get().unwrap(); + Ok(()) + } + + #[test] + fn test_file() -> Result<()>{ + let manager = DuckdbConnectionManager::file("file.db")?; + let pool = r2d2::Pool::builder().max_size(2).build(manager).unwrap(); + + let (s1, r1) = mpsc::channel(); + let (s2, r2) = mpsc::channel(); + + let pool1 = pool.clone(); + let t1 = thread::spawn(move || { + let conn = pool1.get().unwrap(); + let conn1: &Connection = &*conn; + s1.send(()).unwrap(); + r2.recv().unwrap(); + drop(conn1); + }); + + let pool2 = pool.clone(); + let t2 = thread::spawn(move || { + let conn = pool2.get().unwrap(); + s2.send(()).unwrap(); + r1.recv().unwrap(); + drop(conn); + }); + + t1.join().unwrap(); + t2.join().unwrap(); + + pool.get().unwrap(); + Ok(()) + } + + #[test] + fn test_is_valid() -> Result<()>{ + let manager = DuckdbConnectionManager::file("file.db")?; + let pool = r2d2::Pool::builder() + .max_size(1) + .test_on_check_out(true) + .build(manager) + .unwrap(); + + pool.get().unwrap(); + Ok(()) + } + + #[test] + fn test_error_handling() -> Result<()> { + //! We specify a directory as a database. This is bound to fail. + let dir = TempDir::new("r2d2-duckdb").expect("Could not create temporary directory"); + let dirpath = dir.path().to_str().unwrap(); + assert!(DuckdbConnectionManager::file(dirpath).is_err()); + Ok(()) + } + + #[test] + fn test_with_flags() -> Result<()> { + let config = Config::default() + .access_mode(crate::AccessMode::ReadWrite)? + .default_null_order(crate::DefaultNullOrder::NullsLast)? + .default_order(crate::DefaultOrder::Desc)? + .enable_external_access(true)? + .enable_object_cache(false)? + .max_memory("2GB")? + .threads(4)?; + let manager = DuckdbConnectionManager::file_with_flags("file.db", config)?; + let pool = r2d2::Pool::builder().max_size(2).build(manager).unwrap(); + let conn = pool.get().unwrap(); + conn.execute_batch("CREATE TABLE foo(x Text)")?; + + let mut stmt = conn.prepare("INSERT INTO foo(x) VALUES (?)")?; + stmt.execute(&[&"a"])?; + stmt.execute(&[&"b"])?; + stmt.execute(&[&"c"])?; + stmt.execute([Value::Null])?; + + let val: Result>> = conn + .prepare("SELECT x FROM foo ORDER BY x")? + .query_and_then([], |row| row.get(0))? + .collect(); + let val = val?; + let mut iter = val.iter(); + assert_eq!(iter.next().unwrap().as_ref().unwrap(), "c"); + assert_eq!(iter.next().unwrap().as_ref().unwrap(), "b"); + assert_eq!(iter.next().unwrap().as_ref().unwrap(), "a"); + assert!(iter.next().unwrap().is_none()); + assert_eq!(iter.next(), None); + + Ok(()) + } +} \ No newline at end of file From 78f29ee375466651aa8069ae6a0795b9bba3633c Mon Sep 17 00:00:00 2001 From: jaredzhou Date: Wed, 26 Jan 2022 22:29:51 +0800 Subject: [PATCH 2/4] cargo fmt --- src/lib.rs | 4 ++-- src/r2d2.rs | 68 ++++++++++++++++++++++++++--------------------------- 2 files changed, 36 insertions(+), 36 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 432e1ad4..1deaf5ab 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -78,11 +78,11 @@ pub use crate::config::{AccessMode, Config, DefaultNullOrder, DefaultOrder}; pub use crate::error::Error; pub use crate::ffi::ErrorCode; pub use crate::params::{params_from_iter, Params, ParamsFromIter}; +pub use crate::r2d2::DuckdbConnectionManager; pub use crate::row::{AndThenRows, Map, MappedRows, Row, RowIndex, Rows}; pub use crate::statement::Statement; pub use crate::transaction::{DropBehavior, Savepoint, Transaction, TransactionBehavior}; pub use crate::types::ToSql; -pub use crate::r2d2::DuckdbConnectionManager; #[macro_use] mod error; @@ -94,11 +94,11 @@ mod config; mod inner_connection; mod params; mod pragma; +mod r2d2; mod raw_statement; mod row; mod statement; mod transaction; -mod r2d2; pub mod types; diff --git a/src/r2d2.rs b/src/r2d2.rs index 97c0175f..ac754e0b 100644 --- a/src/r2d2.rs +++ b/src/r2d2.rs @@ -10,11 +10,11 @@ //! ```rust,no_run //! extern crate r2d2; //! extern crate duckdb; -//! +//! //! //! use std::thread; //! use duckdb::{DuckdbConnectionManager, params}; -//! +//! //! //! fn main() { //! let manager = DuckdbConnectionManager::file("file.db").unwrap(); @@ -40,8 +40,11 @@ //! .unwrap() //! } //! ``` -use std::{path::Path, sync::{Mutex, Arc}}; -use crate::{Result, Connection, Error, Config}; +use crate::{Config, Connection, Error, Result}; +use std::{ + path::Path, + sync::{Arc, Mutex}, +}; /// An `r2d2::ManageConnection` for `duckdb::Connection`s. pub struct DuckdbConnectionManager { @@ -49,7 +52,6 @@ pub struct DuckdbConnectionManager { } impl DuckdbConnectionManager { - /// Creates a new `DuckdbConnectionManager` from file. pub fn file>(path: P) -> Result { Ok(Self { @@ -96,26 +98,24 @@ impl r2d2::ManageConnection for DuckdbConnectionManager { } } - #[cfg(test)] mod test { extern crate r2d2; use super::*; - use crate::Result; use crate::types::Value; + use crate::Result; use std::{sync::mpsc, thread}; use tempdir::TempDir; - #[test] - fn test_basic() -> Result<()>{ + fn test_basic() -> Result<()> { let manager = DuckdbConnectionManager::file("file.db")?; let pool = r2d2::Pool::builder().max_size(2).build(manager).unwrap(); - + let (s1, r1) = mpsc::channel(); let (s2, r2) = mpsc::channel(); - + let pool1 = pool.clone(); let t1 = thread::spawn(move || { let conn = pool1.get().unwrap(); @@ -123,7 +123,7 @@ mod test { r2.recv().unwrap(); drop(conn); }); - + let pool2 = pool.clone(); let t2 = thread::spawn(move || { let conn = pool2.get().unwrap(); @@ -131,22 +131,22 @@ mod test { r1.recv().unwrap(); drop(conn); }); - + t1.join().unwrap(); t2.join().unwrap(); - + pool.get().unwrap(); Ok(()) } - + #[test] - fn test_file() -> Result<()>{ + fn test_file() -> Result<()> { let manager = DuckdbConnectionManager::file("file.db")?; let pool = r2d2::Pool::builder().max_size(2).build(manager).unwrap(); - + let (s1, r1) = mpsc::channel(); let (s2, r2) = mpsc::channel(); - + let pool1 = pool.clone(); let t1 = thread::spawn(move || { let conn = pool1.get().unwrap(); @@ -155,7 +155,7 @@ mod test { r2.recv().unwrap(); drop(conn1); }); - + let pool2 = pool.clone(); let t2 = thread::spawn(move || { let conn = pool2.get().unwrap(); @@ -163,27 +163,27 @@ mod test { r1.recv().unwrap(); drop(conn); }); - + t1.join().unwrap(); t2.join().unwrap(); - + pool.get().unwrap(); Ok(()) } - + #[test] - fn test_is_valid() -> Result<()>{ + fn test_is_valid() -> Result<()> { let manager = DuckdbConnectionManager::file("file.db")?; let pool = r2d2::Pool::builder() .max_size(1) .test_on_check_out(true) .build(manager) .unwrap(); - + pool.get().unwrap(); Ok(()) } - + #[test] fn test_error_handling() -> Result<()> { //! We specify a directory as a database. This is bound to fail. @@ -192,17 +192,17 @@ mod test { assert!(DuckdbConnectionManager::file(dirpath).is_err()); Ok(()) } - + #[test] fn test_with_flags() -> Result<()> { let config = Config::default() - .access_mode(crate::AccessMode::ReadWrite)? - .default_null_order(crate::DefaultNullOrder::NullsLast)? - .default_order(crate::DefaultOrder::Desc)? - .enable_external_access(true)? - .enable_object_cache(false)? - .max_memory("2GB")? - .threads(4)?; + .access_mode(crate::AccessMode::ReadWrite)? + .default_null_order(crate::DefaultNullOrder::NullsLast)? + .default_order(crate::DefaultOrder::Desc)? + .enable_external_access(true)? + .enable_object_cache(false)? + .max_memory("2GB")? + .threads(4)?; let manager = DuckdbConnectionManager::file_with_flags("file.db", config)?; let pool = r2d2::Pool::builder().max_size(2).build(manager).unwrap(); let conn = pool.get().unwrap(); @@ -228,4 +228,4 @@ mod test { Ok(()) } -} \ No newline at end of file +} From 04d9bb24742b60a8ca63a9273b549eb5e20df579 Mon Sep 17 00:00:00 2001 From: jaredzhou Date: Thu, 27 Jan 2022 10:00:38 +0800 Subject: [PATCH 3/4] remove clippy error --- src/r2d2.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/r2d2.rs b/src/r2d2.rs index ac754e0b..0f747246 100644 --- a/src/r2d2.rs +++ b/src/r2d2.rs @@ -150,10 +150,9 @@ mod test { let pool1 = pool.clone(); let t1 = thread::spawn(move || { let conn = pool1.get().unwrap(); - let conn1: &Connection = &*conn; s1.send(()).unwrap(); r2.recv().unwrap(); - drop(conn1); + drop(conn); }); let pool2 = pool.clone(); From 6687923163acac2ccd1e80b9b9b867d9b78ceaec Mon Sep 17 00:00:00 2001 From: jaredzhou Date: Thu, 27 Jan 2022 10:25:22 +0800 Subject: [PATCH 4/4] make r2d2 as a feature --- Cargo.toml | 3 ++- src/lib.rs | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 678e1a26..a1e7e0b6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ modern-full = [ "chrono", "serde_json", "url", + "r2d2", ] [dependencies] @@ -48,7 +49,7 @@ cast = { version = "0.3", features = ["std"] } arrow = { version = "6.5.0", default-features = false, features = ["prettyprint"] } rust_decimal = "1.14" strum = { version = "0.23", features = ["derive"] } -r2d2 = "0.8.9" +r2d2 = { version = "0.8.9", optional = true } [dev-dependencies] doc-comment = "0.3" diff --git a/src/lib.rs b/src/lib.rs index 1deaf5ab..c389a715 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -78,6 +78,7 @@ pub use crate::config::{AccessMode, Config, DefaultNullOrder, DefaultOrder}; pub use crate::error::Error; pub use crate::ffi::ErrorCode; pub use crate::params::{params_from_iter, Params, ParamsFromIter}; +#[cfg(feature = "r2d2")] pub use crate::r2d2::DuckdbConnectionManager; pub use crate::row::{AndThenRows, Map, MappedRows, Row, RowIndex, Rows}; pub use crate::statement::Statement; @@ -94,6 +95,7 @@ mod config; mod inner_connection; mod params; mod pragma; +#[cfg(feature = "r2d2")] mod r2d2; mod raw_statement; mod row;