From 0bd2dc8d58be1f1081fd57b7fa7c7aba73f7abd9 Mon Sep 17 00:00:00 2001
From: Wang Fenjin
Date: Thu, 20 Jul 2023 13:58:48 +0800
Subject: [PATCH] add append_record_batch (#194)

* add append_record_batch

* reset default features

* add to_duckdb_type_id

* refactor code

---------

Co-authored-by: simon
---
 Cargo.toml                           |  3 +-
 README.md                            |  2 +-
 src/appender/arrow.rs                | 82 ++++++++++++++++++++++++++++
 src/{appender.rs => appender/mod.rs} |  3 +
 src/error.rs                         |  9 +++
 src/vtab/arrow.rs                    |  2 +
 src/vtab/data_chunk.rs               |  5 ++
 src/vtab/mod.rs                      |  2 +-
 8 files changed, 105 insertions(+), 3 deletions(-)
 create mode 100644 src/appender/arrow.rs
 rename src/{appender.rs => appender/mod.rs} (99%)

diff --git a/Cargo.toml b/Cargo.toml
index b6198caf..18287978 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -30,7 +30,8 @@ vtab = []
 vtab-loadable = ["vtab", "duckdb-loadable-macros"]
 vtab-excel = ["vtab", "calamine"]
 vtab-arrow = ["vtab", "num"]
-vtab-full = ["vtab-excel", "vtab-arrow"]
+appender-arrow = ["vtab-arrow"]
+vtab-full = ["vtab-excel", "vtab-arrow", "appender-arrow"]
 extensions-full = ["httpfs", "json", "parquet", "vtab-full"]
 buildtime_bindgen = ["libduckdb-sys/buildtime_bindgen"]
 modern-full = ["chrono", "serde_json", "url", "r2d2", "uuid", "polars"]
diff --git a/README.md b/README.md
index f0d81457..57eb022a 100644
--- a/README.md
+++ b/README.md
@@ -129,7 +129,7 @@ See to [Contributing.md](CONTRIBUTING.md)
 
 ### Checklist
 
-- Run `cargo fmt` to ensure your Rust code is correctly formatted.
+- Run `cargo +nightly fmt` to ensure your Rust code is correctly formatted.
 - Run `cargo clippy --fix --allow-dirty --all-targets --workspace --all-features -- -D warnings` to fix all clippy issues.
 - Ensure `cargo test --all-targets --workspace --features "modern-full extensions-full"` reports no failures.
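
The new `appender-arrow` cargo feature above gates the `Appender::append_record_batch` API introduced in `src/appender/arrow.rs` below. The following is a rough usage sketch rather than part of the change itself: the dependency line, table name, and schema are illustrative assumptions, and it presumes the `arrow` crate is available at a version matching the one `duckdb` links against.

```rust
// In the consuming crate's Cargo.toml (assumed):
// duckdb = { version = "...", features = ["appender-arrow"] }
use std::sync::Arc;

use arrow::{
    array::{Int32Array, StringArray},
    datatypes::{DataType, Field, Schema},
    record_batch::RecordBatch,
};
use duckdb::{Connection, Result};

fn main() -> Result<()> {
    let conn = Connection::open_in_memory()?;
    conn.execute_batch("CREATE TABLE people (id INTEGER, name VARCHAR)")?;

    // Build an Arrow RecordBatch whose columns line up with the table schema.
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, true),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int32Array::from(vec![1, 2])),
            Arc::new(StringArray::from(vec![Some("alice"), Some("bob")])),
        ],
    )
    .expect("arrays match the declared schema");

    // Append the whole batch in one call instead of appending row by row.
    let mut appender = conn.appender("people")?;
    appender.append_record_batch(batch)?;
    Ok(())
}
```

Internally the batch is converted once into a DuckDB data chunk and handed to `duckdb_append_data_chunk`, as the new `src/appender/arrow.rs` below shows.
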
diff --git a/src/appender/arrow.rs b/src/appender/arrow.rs
new file mode 100644
index 00000000..8aa96a2a
--- /dev/null
+++ b/src/appender/arrow.rs
@@ -0,0 +1,82 @@
+use super::{ffi, Appender, Result};
+use crate::{
+    error::result_from_duckdb_appender,
+    vtab::{record_batch_to_duckdb_data_chunk, to_duckdb_logical_type, DataChunk, LogicalType},
+    Error,
+};
+use arrow::record_batch::RecordBatch;
+use ffi::duckdb_append_data_chunk;
+
+impl Appender<'_> {
+    /// Append an Arrow [`RecordBatch`] to this appender's table.
+    ///
+    /// ## Example
+    ///
+    /// ```rust,no_run
+    /// # use duckdb::{Connection, Result, params};
+    /// use arrow::record_batch::RecordBatch;
+    /// fn insert_record_batch(conn: &Connection, record_batch: RecordBatch) -> Result<()> {
+    ///     let mut app = conn.appender("foo")?;
+    ///     app.append_record_batch(record_batch)?;
+    ///     Ok(())
+    /// }
+    /// ```
+    ///
+    /// # Failure
+    ///
+    /// Will return `Err` if the column count or column types of the record batch do not match the table schema.
+    #[inline]
+    pub fn append_record_batch(&mut self, record_batch: RecordBatch) -> Result<()> {
+        let schema = record_batch.schema();
+        let mut logical_type: Vec<LogicalType> = vec![];
+        for field in schema.fields() {
+            let logical_t = to_duckdb_logical_type(field.data_type())
+                .map_err(|_op| Error::ArrowTypeToDuckdbType(field.to_string(), field.data_type().clone()))?;
+            logical_type.push(logical_t);
+        }
+
+        let mut data_chunk = DataChunk::new(&logical_type);
+        record_batch_to_duckdb_data_chunk(&record_batch, &mut data_chunk).map_err(|_op| Error::AppendError)?;
+
+        let rc = unsafe { duckdb_append_data_chunk(self.app, data_chunk.get_ptr()) };
+        result_from_duckdb_appender(rc, self.app)
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use crate::{Connection, Result};
+    use arrow::{
+        array::{Int8Array, StringArray},
+        datatypes::{DataType, Field, Schema},
+        record_batch::RecordBatch,
+    };
+    use std::sync::Arc;
+
+    #[test]
+    fn test_append_record_batch() -> Result<()> {
+        let db = Connection::open_in_memory()?;
+        db.execute_batch("CREATE TABLE foo(id TINYINT NOT NULL, area TINYINT NOT NULL, name VARCHAR)")?;
+        {
+            let id_array = Int8Array::from(vec![1, 2, 3, 4, 5]);
+            let area_array = Int8Array::from(vec![11, 22, 33, 44, 55]);
+            let name_array = StringArray::from(vec![Some("11"), None, None, Some("44"), None]);
+            let schema = Schema::new(vec![
+                Field::new("id", DataType::Int8, true),
+                Field::new("area", DataType::Int8, true),
+                Field::new("name", DataType::Utf8, true),
+            ]);
+            let record_batch = RecordBatch::try_new(
+                Arc::new(schema),
+                vec![Arc::new(id_array), Arc::new(area_array), Arc::new(name_array)],
+            )
+            .unwrap();
+            let mut app = db.appender("foo")?;
+            app.append_record_batch(record_batch)?;
+        }
+        let mut stmt = db.prepare("SELECT id, area, name FROM foo")?;
+        let rbs: Vec<RecordBatch> = stmt.query_arrow([])?.collect();
+        assert_eq!(rbs.iter().map(|op| op.num_rows()).sum::<usize>(), 5);
+        Ok(())
+    }
+}
diff --git a/src/appender.rs b/src/appender/mod.rs
similarity index 99%
rename from src/appender.rs
rename to src/appender/mod.rs
index c704675e..195c2d8b 100644
--- a/src/appender.rs
+++ b/src/appender/mod.rs
@@ -13,6 +13,9 @@ pub struct Appender<'conn> {
     app: ffi::duckdb_appender,
 }
 
+#[cfg(feature = "appender-arrow")]
+mod arrow;
+
 impl Appender<'_> {
     /// Append multiple rows from Iterator
     ///
diff --git a/src/error.rs b/src/error.rs
index 5f38d16c..2e6901c9 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -1,3 +1,5 @@
+use arrow::datatypes::DataType;
+
 use super::Result;
 use crate::{
     ffi,
@@ -58,6 +60,9 @@ pub enum Error {
     /// Rust type.
     InvalidColumnType(usize, String, Type),
 
+    /// Error when an Arrow datatype cannot be converted to a DuckDB type
+    ArrowTypeToDuckdbType(String, DataType),
+
     /// Error when a query that was expected to insert one row did not insert
     /// any or insert many.
     StatementChangedRows(usize),
@@ -170,6 +175,9 @@ impl fmt::Display for Error {
             Error::InvalidColumnType(i, ref name, ref t) => {
                 write!(f, "Invalid column type {t} at index: {i}, name: {name}")
            }
+            Error::ArrowTypeToDuckdbType(ref name, ref t) => {
+                write!(f, "Invalid column type {t}, name: {name}")
+            }
             Error::InvalidParameterCount(i1, n1) => {
                 write!(f, "Wrong number of parameters passed to query. Got {i1}, needed {n1}")
             }
@@ -201,6 +209,7 @@ impl error::Error for Error {
             | Error::StatementChangedRows(_)
             | Error::InvalidQuery
             | Error::AppendError
+            | Error::ArrowTypeToDuckdbType(..)
             | Error::MultipleStatement => None,
             Error::FromSqlConversionFailure(_, _, ref err) | Error::ToSqlConversionFailure(ref err) => Some(&**err),
         }
diff --git a/src/vtab/arrow.rs b/src/vtab/arrow.rs
index 91ede585..277a85d2 100644
--- a/src/vtab/arrow.rs
+++ b/src/vtab/arrow.rs
@@ -119,6 +119,7 @@ impl VTab for ArrowVTab {
     }
 }
 
+/// Convert an Arrow DataType to a DuckDB type id
 pub fn to_duckdb_type_id(data_type: &DataType) -> Result<LogicalTypeId, Box<dyn std::error::Error>> {
     use LogicalTypeId::*;
 
@@ -160,6 +161,7 @@ pub fn to_duckdb_type_id(data_type: &DataType) -> Result<LogicalTypeId, Box<dyn std::error::Error>> {
     Ok(type_id)
 }
 
+/// Convert an Arrow DataType to a DuckDB logical type
 pub fn to_duckdb_logical_type(data_type: &DataType) -> Result<LogicalType, Box<dyn std::error::Error>> {
     if data_type.is_primitive()
         || matches!(
diff --git a/src/vtab/data_chunk.rs b/src/vtab/data_chunk.rs
index 239ca4eb..6e472773 100644
--- a/src/vtab/data_chunk.rs
+++ b/src/vtab/data_chunk.rs
@@ -59,6 +59,11 @@ impl DataChunk {
     pub fn num_columns(&self) -> usize {
         unsafe { duckdb_data_chunk_get_column_count(self.ptr) as usize }
     }
+
+    /// Get the raw pointer to the duckdb_data_chunk held by this [DataChunk].
+    pub fn get_ptr(&self) -> duckdb_data_chunk {
+        self.ptr
+    }
 }
 
 impl From<duckdb_data_chunk> for DataChunk {
diff --git a/src/vtab/mod.rs b/src/vtab/mod.rs
index f4d3fb84..0c605b3a 100644
--- a/src/vtab/mod.rs
+++ b/src/vtab/mod.rs
@@ -14,7 +14,7 @@ mod arrow;
 #[cfg(feature = "vtab-arrow")]
 pub use self::arrow::{
     arrow_arraydata_to_query_params, arrow_ffi_to_query_params, arrow_recordbatch_to_query_params,
-    record_batch_to_duckdb_data_chunk,
+    record_batch_to_duckdb_data_chunk, to_duckdb_logical_type, to_duckdb_type_id,
 };
 #[cfg(feature = "vtab-excel")]
 mod excel;
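
With `to_duckdb_logical_type` and `to_duckdb_type_id` re-exported from `duckdb::vtab`, code built with the `vtab-arrow` feature can reuse the same Arrow-to-DuckDB conversion that `append_record_batch` performs. The sketch below mirrors that function's body; it assumes `DataChunk`, `LogicalType`, and `DataChunk::new` are reachable from outside the crate (the appender code uses exactly these calls internally), and the helper name `batch_to_chunk` is illustrative.

```rust
use arrow::record_batch::RecordBatch;
use duckdb::vtab::{
    record_batch_to_duckdb_data_chunk, to_duckdb_logical_type, DataChunk, LogicalType,
};

/// Convert an Arrow RecordBatch into a freshly allocated DuckDB DataChunk.
fn batch_to_chunk(batch: &RecordBatch) -> Result<DataChunk, Box<dyn std::error::Error>> {
    // Map every Arrow field to the corresponding DuckDB logical type.
    let types: Vec<LogicalType> = batch
        .schema()
        .fields()
        .iter()
        .map(|f| to_duckdb_logical_type(f.data_type()))
        .collect::<Result<_, _>>()?;

    // Allocate a chunk with those column types and copy the batch into it.
    let mut chunk = DataChunk::new(&types);
    record_batch_to_duckdb_data_chunk(batch, &mut chunk)?;
    Ok(chunk)
}
```

The resulting chunk exposes its raw handle through the new `DataChunk::get_ptr`, which is what the appender ultimately passes to `duckdb_append_data_chunk`.
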