Upgrade delta-kernel-rs to 0.5 #607

@PatrickJin-db PatrickJin-db commented Nov 27, 2024

Existing Python tests should be sufficient for correctness. Is any performance testing necessary?

let data = res
.raw_data
.map_err(|e| ArrowError::from_external_error(Box::new(e)))?;
results.collect::<Vec<_>>().into_iter().map(|res| {
Collaborator Author

I tried to avoid doing this but ran into lifetime issues without the collect().into_iter(). I don't think this should be a performance regression, though, since execute returned a Vec in the previous delta-kernel-rs version, so it was already copying.

Collaborator

what's the error with just results.map(...)?

So the way scan::execute currently works is that it takes a reference to the engine and returns an iterator with the same lifetime as that reference. They share a lifetime because the iterator needs the engine reference for the whole duration of iteration to do its processing.

One solution is to map, then collect at the very end:

        let result_iter = self.0.execute(engine_interface.0.as_ref())?;
        let record_batches: Vec<_> = result_iter
            .map(|res| {
                let scan_res = res.and_then(|res| Ok((res.full_mask(), res.raw_data?)));
                let (mask, data) =
                    scan_res.map_err(|e| ArrowError::from_external_error(Box::new(e)))?;
                let record_batch: RecordBatch = data
                    .into_any()
                    .downcast::<ArrowEngineData>()
                    .map_err(|_| {
                        ArrowError::CastError("Couldn't cast to ArrowEngineData".to_string())
                    })?
                    .into();
                if let Some(mask) = mask {
                    let filtered_batch = filter_record_batch(&record_batch, &mask.into())?;
                    Ok(filtered_batch)
                } else {
                    Ok(record_batch)
                }
            })
            .collect();
        let record_batch_iter = RecordBatchIterator::new(record_batches, result_schema);
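
To make the lifetime relationship concrete, here is a minimal, self-contained sketch of the same pattern. `Engine`, `execute`, and `to_owned_batches` are hypothetical stand-ins invented for illustration; they are not the delta-kernel-rs API, and the integer "rows" only play the role of scan results.

```rust
// Hypothetical stand-ins: `Engine`, `execute`, and `to_owned_batches` are
// illustrative names, not the delta-kernel-rs API.
struct Engine {
    scale: i64,
}

/// Like the scan described above, the returned iterator borrows `engine`
/// for as long as it is being consumed.
fn execute<'a>(
    engine: &'a Engine,
    rows: Vec<i64>,
) -> impl Iterator<Item = Result<i64, String>> + 'a {
    rows.into_iter().map(move |r| {
        if r < 0 {
            Err(format!("negative row: {r}"))
        } else {
            Ok(r * engine.scale)
        }
    })
}

/// Map lazily, then collect once at the end. The resulting `Vec` owns its
/// data, so the boxed iterator built from it satisfies the default `'static`
/// bound and is no longer tied to the `engine` borrow.
fn to_owned_batches(
    engine: &Engine,
    rows: Vec<i64>,
) -> Box<dyn Iterator<Item = Result<String, String>> + Send> {
    let batches: Vec<Result<String, String>> = execute(engine, rows)
        .map(|res| res.map(|v| format!("batch({v})")))
        .collect();
    Box::new(batches.into_iter())
}
```

Either ordering (collect then map, or map then collect) materializes every result before returning. The difference is only whether the per-item work runs while the engine borrow is still alive.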

I'd encourage you to submit an issue with delta-kernel-rs on the lifetime issue. I think it'll be important to address eventually :)

Collaborator Author

Thanks for the explanation and suggestion. The exact error was

error: lifetime may not live long enough
   --> src/lib.rs:115:9
    |
87  |         &self,
    |         - let's call the lifetime of this reference `'1`
...
115 |         Ok(PyArrowType(Box::new(record_batch_iter)))
    |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ returning this value requires that `'1` must outlive `'static`
    |
help: to declare that the trait object captures data from argument `self`, you can add an explicit `'_` lifetime bound
    |
89  |     ) -> DeltaPyResult<PyArrowType<Box<dyn RecordBatchReader + Send + '_>>> {
    |                                                                     ++++

error: lifetime may not live long enough
   --> src/lib.rs:115:9
    |
88  |         engine_interface: &PythonInterface,
    |                           - let's call the lifetime of this reference `'2`
...
115 |         Ok(PyArrowType(Box::new(record_batch_iter)))
    |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ returning this value requires that `'2` must outlive `'static`
    |
help: to declare that the trait object captures data from argument `engine_interface`, you can add an explicit `'_` lifetime bound
    |
89  |     ) -> DeltaPyResult<PyArrowType<Box<dyn RecordBatchReader + Send + '_>>> {
    |                                                                     ++++

error: could not compile `delta-kernel-rust-sharing-wrapper` (lib) due to 2 previous errors
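
For reference, the same class of error can be reproduced in a few lines. This is a sketch with hypothetical names (`Scan`, `borrowed`, `owned`), not the wrapper's actual code. A `Box<dyn Trait>` in return position defaults to a `'static` bound, so it cannot hold a borrow of `self` unless `'_` is added. Adding `'_` is what the compiler suggests, but it is presumably not an option here, on the assumption that a value handed back to Python cannot carry Rust borrows.

```rust
// Hypothetical names throughout; this only mirrors the shape of the error above.
struct Scan {
    rows: Vec<i64>,
}

impl Scan {
    // Does not compile if written without `'_`: `Box<dyn Iterator + Send>`
    // means `Box<dyn Iterator + Send + 'static>`, but the iterator borrows `self`.
    //
    // fn borrowed(&self) -> Box<dyn Iterator<Item = i64> + Send> {
    //     Box::new(self.rows.iter().copied())
    // }

    /// The compiler's suggestion: add `'_` so the trait object may borrow `self`.
    fn borrowed(&self) -> Box<dyn Iterator<Item = i64> + Send + '_> {
        Box::new(self.rows.iter().copied())
    }

    /// The approach discussed in this thread: materialize first, so nothing is
    /// borrowed and the default `'static` bound is satisfied.
    fn owned(&self) -> Box<dyn Iterator<Item = i64> + Send> {
        let rows: Vec<i64> = self.rows.iter().copied().collect();
        Box::new(rows.into_iter())
    }
}
```

The iterator returned by `owned` stays valid even after the `Scan` it came from is dropped, which is the property a `'static` return type needs.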

Collaborator

@zachschuermann zachschuermann left a comment

Hey @PatrickJin-db, thanks for looping me in! Happy to help. Just wanted to flag that we've just released version 0.5, so let's change to that and also do a minimal amount of debugging on that iterator bit.

@@ -10,12 +10,12 @@ name = "delta_kernel_rust_sharing_wrapper"
crate-type = ["cdylib"]

[dependencies]
-arrow = { version = "^52.0", features = ["pyarrow"] }
-delta_kernel = {version = "^0.2", features = ["cloud", "default", "default-engine"]}
+arrow = { version = ">=53, <54", features = ["pyarrow"] }
Collaborator

i think as a consumer it makes more sense picking a specific version

Suggested change
-arrow = { version = ">=53, <54", features = ["pyarrow"] }
+arrow = { version = "53.3.0", features = ["pyarrow"] }

-arrow = { version = "^52.0", features = ["pyarrow"] }
-delta_kernel = {version = "^0.2", features = ["cloud", "default", "default-engine"]}
+arrow = { version = ">=53, <54", features = ["pyarrow"] }
+delta_kernel = {version = "^0.4", features = ["cloud", "default", "default-engine"]}
Collaborator

also FYI the ^ is implied

Suggested change
-delta_kernel = {version = "^0.4", features = ["cloud", "default", "default-engine"]}
+delta_kernel = {version = "0.5", features = ["cloud", "default", "default-engine"]}
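
As a side note on the implied caret: in Cargo, a bare requirement such as "0.5" is shorthand for "^0.5", which allows any version at or above 0.5.0 and below 0.6.0. By the same rule "53.3.0" still permits later 53.x releases, and an exact pin would be written "=53.3.0". The sketch below spells out that rule for full `major.minor.patch` requirements only. `caret_range` and `caret_matches` are hypothetical helpers written for illustration; they are not part of Cargo or the `semver` crate, and they ignore pre-release tags and partial versions.

```rust
/// Half-open range `[lower, upper)` that Cargo's caret rule assigns to a full
/// `major.minor.patch` requirement. Hypothetical helper for illustration only.
fn caret_range(major: u64, minor: u64, patch: u64) -> ((u64, u64, u64), (u64, u64, u64)) {
    let lower = (major, minor, patch);
    // The leftmost non-zero component is the one that may not change.
    let upper = if major > 0 {
        (major + 1, 0, 0)
    } else if minor > 0 {
        (0, minor + 1, 0)
    } else {
        (0, 0, patch + 1)
    };
    (lower, upper)
}

/// True if `version` satisfies the caret requirement `req`.
fn caret_matches(req: (u64, u64, u64), version: (u64, u64, u64)) -> bool {
    let (lower, upper) = caret_range(req.0, req.1, req.2);
    // Tuples compare lexicographically, which matches version ordering here
    // (pre-release tags are ignored in this sketch).
    lower <= version && version < upper
}
```

Under this rule, a requirement of 0.4.0 does not accept 0.5.0, which is why moving to delta-kernel-rs 0.5 needs an explicit edit to the manifest rather than a lockfile update.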

Collaborator

@zachschuermann zachschuermann left a comment

LGTM with a follow-up to optimize that collect then into_iter()!

Collaborator

@zachschuermann zachschuermann left a comment

Re-stamping, since the last one didn't seem to show as stamped?

LGTM with a follow-up to optimize that collect then into_iter()!

@PatrickJin-db PatrickJin-db changed the title from "Upgrade delta-kernel-rs to 0.4" to "Upgrade delta-kernel-rs to 0.5" Nov 27, 2024
@PatrickJin-db PatrickJin-db merged commit f0fc8cc into delta-io:main Nov 27, 2024
5 checks passed