diff --git a/python/delta-kernel-rust-sharing-wrapper/Cargo.toml b/python/delta-kernel-rust-sharing-wrapper/Cargo.toml index 5eb7948e2..ae48fd0fb 100644 --- a/python/delta-kernel-rust-sharing-wrapper/Cargo.toml +++ b/python/delta-kernel-rust-sharing-wrapper/Cargo.toml @@ -10,12 +10,12 @@ name = "delta_kernel_rust_sharing_wrapper" crate-type = ["cdylib"] [dependencies] -arrow = { version = "^52.0", features = ["pyarrow"] } -delta_kernel = {version = "^0.2", features = ["cloud", "default", "default-engine"]} +arrow = { version = "53.3.0", features = ["pyarrow"] } +delta_kernel = {version = "0.5", features = ["cloud", "default", "default-engine"]} openssl = { version = "0.10", features = ["vendored"] } url = "2" [dependencies.pyo3] -version = "0.21.0" +version = "0.22.4" # "abi3-py38" tells pyo3 (and maturin) to build using the stable ABI with minimum Python version 3.8 features = ["abi3-py38"] diff --git a/python/delta-kernel-rust-sharing-wrapper/src/lib.rs b/python/delta-kernel-rust-sharing-wrapper/src/lib.rs index 7a8c3b11f..94a8dd23e 100644 --- a/python/delta-kernel-rust-sharing-wrapper/src/lib.rs +++ b/python/delta-kernel-rust-sharing-wrapper/src/lib.rs @@ -92,11 +92,11 @@ impl Scan { delta_kernel::Error::Generic(format!("Could not get result schema: {e}")) })?); let results = self.0.execute(engine_interface.0.as_ref())?; - let record_batch_iter = RecordBatchIterator::new( - results.into_iter().map(|res| { - let data = res - .raw_data - .map_err(|e| ArrowError::from_external_error(Box::new(e)))?; + let record_batches: Vec<_> = results + .map(|res| { + let scan_res = res.and_then(|res| Ok((res.full_mask(), res.raw_data?))); + let (mask, data) = + scan_res.map_err(|e| ArrowError::from_external_error(Box::new(e)))?; let record_batch: RecordBatch = data .into_any() .downcast::() @@ -104,20 +104,15 @@ impl Scan { ArrowError::CastError("Couldn't cast to ArrowEngineData".to_string()) })? .into(); - if let Some(mut mask) = res.mask { - let extra_rows = record_batch.num_rows() - mask.len(); - if extra_rows > 0 { - // we need to extend the mask here in case it's too short - mask.extend(std::iter::repeat(true).take(extra_rows)); - } + if let Some(mask) = mask { let filtered_batch = filter_record_batch(&record_batch, &mask.into())?; Ok(filtered_batch) } else { Ok(record_batch) } - }), - result_schema, - ); + }) + .collect(); + let record_batch_iter = RecordBatchIterator::new(record_batches, result_schema); Ok(PyArrowType(Box::new(record_batch_iter))) } }