diff --git a/crates/duckdb/src/arrow_batch.rs b/crates/duckdb/src/arrow_batch.rs index 15892f90..a2d4cba6 100644 --- a/crates/duckdb/src/arrow_batch.rs +++ b/crates/duckdb/src/arrow_batch.rs @@ -3,7 +3,7 @@ use super::{ Statement, }; -/// An handle for the resulting RecordBatch of a query. +/// A handle for the resulting RecordBatch of a query. #[must_use = "Arrow is lazy and will do nothing unless consumed"] pub struct Arrow<'stmt> { pub(crate) stmt: Option<&'stmt Statement<'stmt>>, @@ -29,3 +29,34 @@ impl<'stmt> Iterator for Arrow<'stmt> { Some(RecordBatch::from(&self.stmt?.step()?)) } } + +/// A handle for the resulting RecordBatch of a query in streaming +#[must_use = "Arrow stream is lazy and will not fetch data unless consumed"] +pub struct ArrowStream<'stmt> { + pub(crate) stmt: Option<&'stmt Statement<'stmt>>, + pub(crate) schema: SchemaRef, +} + +impl<'stmt> ArrowStream<'stmt> { + #[inline] + pub(crate) fn new(stmt: &'stmt Statement<'stmt>, schema: SchemaRef) -> ArrowStream<'stmt> { + ArrowStream { + stmt: Some(stmt), + schema, + } + } + + /// return arrow schema + #[inline] + pub fn get_schema(&self) -> SchemaRef { + self.schema.clone() + } +} + +impl<'stmt> Iterator for ArrowStream<'stmt> { + type Item = RecordBatch; + + fn next(&mut self) -> Option<Self::Item> { + Some(RecordBatch::from(&self.stmt?.stream_step(self.get_schema())?)) + } +} diff --git a/crates/duckdb/src/lib.rs b/crates/duckdb/src/lib.rs index 6bc59316..d8caf81d 100644 --- a/crates/duckdb/src/lib.rs +++ b/crates/duckdb/src/lib.rs @@ -73,7 +73,7 @@ pub use crate::r2d2::DuckdbConnectionManager; pub use crate::{ appender::Appender, appender_params::{appender_params_from_iter, AppenderParams, AppenderParamsFromIter}, - arrow_batch::Arrow, + arrow_batch::{Arrow, ArrowStream}, cache::CachedStatement, column::Column, config::{AccessMode, Config, DefaultNullOrder, DefaultOrder}, diff --git a/crates/duckdb/src/raw_statement.rs b/crates/duckdb/src/raw_statement.rs index a99f8022..280fa158 100644 ---
a/crates/duckdb/src/raw_statement.rs +++ b/crates/duckdb/src/raw_statement.rs @@ -1,4 +1,4 @@ -use std::{ffi::CStr, ptr, rc::Rc, sync::Arc}; +use std::{ffi::CStr, ops::Deref, ptr, rc::Rc, sync::Arc}; use arrow::{ array::StructArray, @@ -9,7 +9,7 @@ use arrow::{ use super::{ffi, Result}; #[cfg(feature = "polars")] use crate::arrow2; -use crate::error::result_from_duckdb_arrow; +use crate::{error::result_from_duckdb_arrow, Error}; // Private newtype for raw sqlite3_stmts that finalize themselves when dropped. // TODO: destroy statement and result @@ -17,6 +17,7 @@ use crate::error::result_from_duckdb_arrow; pub struct RawStatement { ptr: ffi::duckdb_prepared_statement, result: Option<ffi::duckdb_arrow>, + duckdb_result: Option<ffi::duckdb_result>, schema: Option<SchemaRef>, // Cached SQL (trimmed) that we use as the key when we're in the statement // cache. This is None for statements which didn't come from the statement @@ -38,6 +39,7 @@ impl RawStatement { ptr: stmt, result: None, schema: None, + duckdb_result: None, statement_cache_key: None, } } @@ -110,6 +112,39 @@ impl RawStatement { } } + #[inline] + pub fn streaming_step(&self, schema: SchemaRef) -> Option<StructArray> { + if let Some(result) = self.duckdb_result { + unsafe { + let mut out = ffi::duckdb_stream_fetch_chunk(result); + + if out.is_null() { + return None; + } + + let mut arrays = FFI_ArrowArray::empty(); + ffi::duckdb_result_arrow_array( + result, + out, + &mut std::ptr::addr_of_mut!(arrays) as *mut _ as *mut ffi::duckdb_arrow_array, + ); + + ffi::duckdb_destroy_data_chunk(&mut out); + + if arrays.is_empty() { + return None; + } + + let schema = FFI_ArrowSchema::try_from(schema.deref()).ok()?; + let array_data = from_ffi(arrays, &schema).expect("ok"); + let struct_array = StructArray::from(array_data); + return Some(struct_array); + } + } + + None + } + #[cfg(feature = "polars")] #[inline] pub fn step2(&self) -> Option<arrow2::array::StructArray> { @@ -242,6 +277,22 @@ impl RawStatement { } } + pub fn execute_streaming(&mut self) -> Result<()> { self.reset_result(); unsafe { let
mut out: ffi::duckdb_result = std::mem::zeroed(); + + let rc = ffi::duckdb_execute_prepared_streaming(self.ptr, &mut out); + if rc != ffi::DuckDBSuccess { + return Err(Error::DuckDBFailure(ffi::Error::new(rc), None)); + } + + self.duckdb_result = Some(out); + + Ok(()) + } + } + #[inline] pub fn reset_result(&mut self) { self.schema = None; @@ -251,6 +302,12 @@ } self.result = None; } + if let Some(mut result) = self.duckdb_result { + unsafe { + ffi::duckdb_destroy_result(&mut result); + } + self.duckdb_result = None; + } } #[inline] diff --git a/crates/duckdb/src/statement.rs b/crates/duckdb/src/statement.rs index 1b0ceca8..69570d78 100644 --- a/crates/duckdb/src/statement.rs +++ b/crates/duckdb/src/statement.rs @@ -6,7 +6,7 @@ use super::{ffi, AndThenRows, Connection, Error, MappedRows, Params, RawStatemen #[cfg(feature = "polars")] use crate::{arrow2, polars_dataframe::Polars}; use crate::{ - arrow_batch::Arrow, + arrow_batch::{Arrow, ArrowStream}, error::result_from_duckdb_prepare, types::{TimeUnit, ToSql, ToSqlOutput}, }; @@ -109,6 +109,30 @@ impl Statement<'_> { Ok(Arrow::new(self)) } + /// Execute the prepared statement, returning a handle to the resulting + /// vector of arrow RecordBatch in streaming way + /// + /// ## Example + /// + /// ```rust,no_run + /// # use duckdb::{Result, Connection}; + /// # use arrow::record_batch::RecordBatch; + /// # use arrow::datatypes::SchemaRef; + /// fn get_arrow_data(conn: &Connection, schema: SchemaRef) -> Result<Vec<RecordBatch>> { + /// Ok(conn.prepare("SELECT * FROM test")?.stream_arrow([], schema)?.collect()) + /// } + /// ``` + /// + /// # Failure + /// + /// Will return `Err` if binding parameters fails. + #[inline] + pub fn stream_arrow<P: Params>(&mut self, params: P, schema: SchemaRef) -> Result<ArrowStream<'_>> { + params.__bind_in(self)?; + self.stmt.execute_streaming()?; + Ok(ArrowStream::new(self, schema)) + } + /// Execute the prepared statement, returning a handle to the resulting /// vector of polars DataFrame.
/// @@ -337,6 +361,12 @@ impl Statement<'_> { self.stmt.step() } + /// Get next batch records in arrow-rs in a streaming way + #[inline] + pub fn stream_step(&self, schema: SchemaRef) -> Option<StructArray> { + self.stmt.streaming_step(schema) + } + #[cfg(feature = "polars")] /// Get next batch records in arrow2 #[inline]