Skip to content

Commit

Permalink
upgrade delta-kernel-rs
Browse files Browse the repository at this point in the history
  • Loading branch information
PatrickJin-db committed Nov 27, 2024
1 parent 4def0f6 commit 4d4ff68
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 17 deletions.
6 changes: 3 additions & 3 deletions python/delta-kernel-rust-sharing-wrapper/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ name = "delta_kernel_rust_sharing_wrapper"
crate-type = ["cdylib"]

[dependencies]
- arrow = { version = "^52.0", features = ["pyarrow"] }
- delta_kernel = {version = "^0.2", features = ["cloud", "default", "default-engine"]}
+ arrow = { version = "53.3.0", features = ["pyarrow"] }
+ delta_kernel = {version = "0.5", features = ["cloud", "default", "default-engine"]}
openssl = { version = "0.10", features = ["vendored"] }
url = "2"

[dependencies.pyo3]
- version = "0.21.0"
+ version = "0.22.4"
# "abi3-py38" tells pyo3 (and maturin) to build using the stable ABI with minimum Python version 3.8
features = ["abi3-py38"]
23 changes: 9 additions & 14 deletions python/delta-kernel-rust-sharing-wrapper/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,32 +92,27 @@ impl Scan {
delta_kernel::Error::Generic(format!("Could not get result schema: {e}"))
})?);
let results = self.0.execute(engine_interface.0.as_ref())?;
- let record_batch_iter = RecordBatchIterator::new(
- results.into_iter().map(|res| {
- let data = res
- .raw_data
- .map_err(|e| ArrowError::from_external_error(Box::new(e)))?;
+ let record_batches: Vec<_> = results
+ .map(|res| {
+ let scan_res = res.and_then(|res| Ok((res.full_mask(), res.raw_data?)));
+ let (mask, data) =
+ scan_res.map_err(|e| ArrowError::from_external_error(Box::new(e)))?;
let record_batch: RecordBatch = data
.into_any()
.downcast::<ArrowEngineData>()
.map_err(|_| {
ArrowError::CastError("Couldn't cast to ArrowEngineData".to_string())
})?
.into();
- if let Some(mut mask) = res.mask {
- let extra_rows = record_batch.num_rows() - mask.len();
- if extra_rows > 0 {
- // we need to extend the mask here in case it's too short
- mask.extend(std::iter::repeat(true).take(extra_rows));
- }
+ if let Some(mask) = mask {
let filtered_batch = filter_record_batch(&record_batch, &mask.into())?;
Ok(filtered_batch)
} else {
Ok(record_batch)
}
- }),
- result_schema,
- );
+ })
+ .collect();
+ let record_batch_iter = RecordBatchIterator::new(record_batches, result_schema);
Ok(PyArrowType(Box::new(record_batch_iter)))
}
}
Expand Down

0 comments on commit 4d4ff68

Please sign in to comment.