Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add table changes constructor #505

Merged

Conversation

OussamaSaoudi-db
Copy link
Collaborator

@OussamaSaoudi-db OussamaSaoudi-db commented Nov 18, 2024

What changes are proposed in this pull request?

This PR introduces the TableChanges struct which represents a Change Data Feed scan. TableChanges is constructed from a Table, and performs 2 protocol and metadata scans.

  1. The first is a P&M scan from start version, and ensures that CDF is enabled at the beginning version.
  2. The second P&M scan is for the end version. This one is used to extract the schema at the end version and ensure that the final version has CDF enabled.

I also add the logic for converting the end version's schema into the cdf schema.

Note that the behaviour to fail early in table changes constructor aligns with spark's behaviour. Only the CDF range is returned in spark's error. No specific commit version that causes the failure is provided.

How was this change tested?

  • Ensure that TableChanges::try_new checks the start and end version
  • Ensure that the schema at the start and end versions are the same
  • Ensure that the table_changes.schema() method returns the CDF schema.

@github-actions github-actions bot added the breaking-change Change that will require a version bump label Nov 18, 2024
@OussamaSaoudi-db OussamaSaoudi-db marked this pull request as ready for review November 19, 2024 00:11
Copy link

codecov bot commented Nov 19, 2024

Codecov Report

Attention: Patch coverage is 87.05036% with 18 lines in your changes missing coverage. Please review.

Project coverage is 80.31%. Comparing base (e450c05) to head (96a4b71).
Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
kernel/src/table_changes/mod.rs 86.06% 12 Missing and 5 partials ⚠️
ffi/src/lib.rs 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #505      +/-   ##
==========================================
+ Coverage   80.24%   80.31%   +0.06%     
==========================================
  Files          61       62       +1     
  Lines       13402    13541     +139     
  Branches    13402    13541     +139     
==========================================
+ Hits        10755    10876     +121     
- Misses       2093     2106      +13     
- Partials      554      559       +5     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.


🚨 Try these New Features:

pub fn table_changes(
&self,
engine: &dyn Engine,
start_version: Version,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not impl Into<Version>?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hadn't considered that. Tho this change seems to cause issues:

154 |         let table_changes_res = table.table_changes(engine.as_ref(), 3, 4);
    |                                       -------------                  ^ the trait `From<i32>` is not implemented for `u64`, which is required by `{integer}: Into<u64>

Integers are treated as i32 by default, and i32 can't be converted into u64. By using just start_version: Version, seems that the compiler treats it as a u64 from the get go.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, what situation would produce an Into<Version> that is not already Version, that we need the fancy arg passing?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, maybe my question should actually have been, "why not Option<Version> for end_version"? When do we need the into there? I just erred on the side of flexibility

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh using end_version: impl Into<Option<Version>> lets us pass in either Version or Option<Version> to the function. This is a trick I learned from @scovich
So the following are legal:
table.table_changes(engine.as_ref(), 3, 4);
table.table_changes(engine.as_ref(), Some(3), 4);
table.table_changes(engine.as_ref(), None, 4);

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh cool, that's nice. Sorry for the long aside :)

.get(ENABLE_CDF_FLAG)
.is_some_and(|val| val == "true")
};
if !is_cdf_enabled(&start_snapshot) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to check at every point along the way, not just start and end.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, we could leave that till later. I was hoping to do some checking at this stage since we can catch that error earlier. Should I just remove this check?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No I think this is okay, as long as we somehow check at each point. I think if it's cheap to error early we should

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No extra cost since we needed to get both snapshots anyway.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add comments here for clarity? (e.g. "we check start/end to fail early if needed but at each point yielding CDF batches we do schema compat check" or something like that)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added comments addressing enable cdf flag, schema, and protocol.

/// Create a [`TableChanges`] to get a change data feed for the table between `start_version`,
/// and `end_version`. If no `end_version` is supplied, the latest version will be used as the
/// `end_version`.
pub fn table_changes(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need to be able to specify:

  1. A schema? Does CDF always return the full table schema?
  2. A predicate? Can't you get a CDF with a predicate?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is usually done in the Scan Builder. The plan is to specify the schema and predicate when building a TableChangesScan.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and i suppose there are no optimizations to be done here with that information? If yes we likely would want to propagate that information here?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since this is the main 'entrypoint' API for users to interact with reading CDF can we add some docs on semantics and ideally examples too? important things to call out: (1) require that CDF is enabled for the whole range, (2) require the same schema for the whole range (for now!), (3) how do i scan this thing?

(i'm probably forgetting other bits of semantics so don't take the above as exhaustive)

@OussamaSaoudi-db
Copy link
Collaborator Author

@nicklan @zachschuermann I wanted your opinion on the TableChanges schema. I added the code to generate the CDF schema. I think it makes sense to do it here instead of in the scan builder. This is because the user would likely project the schema from the table, and use that as the scan schema.

let schema = table_changes.schema().project(...);
let scan_builder = scan_builder.with_schema(schema);

@OussamaSaoudi-db
Copy link
Collaborator Author

Also I wanted to flag this: I have the field end_snapshot, but we are really interested in column mapping mode, partition columns. We would additionally need the schema if we decide to create the cdf schema in the scan builder (as opposed to the current implementation).

I'm leaning towards just storing column mapping mode and partition columns directly in TableChanges. Do you foresee that we'll need to store the end version's metadata, protocol, or other fields?

@scovich
Copy link
Collaborator

scovich commented Nov 19, 2024

Also I wanted to flag this: I have the field end_snapshot, but we are really interested in column mapping mode, partition columns. We would additionally need the schema if we decide to create the cdf schema in the scan builder (as opposed to the current implementation).

I'm leaning towards just storing column mapping mode and partition columns directly in TableChanges. Do you foresee that we'll need to store the end version's metadata, protocol, or other fields?

AFAIK, streaming in delta-spark does a complicated dance to process metadata changes in a commit separately from the data changes of that commit, and an incompatible metadata change causes the stream to restart. We should probably use that code as inspiration so we don't reinvent the wheel? (but it's also somewhat messy code due to its organic development over years, so we should probably not copy blindly)

Copy link
Collaborator

@nicklan nicklan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool, mostly fine, just one small thing

pub fn table_changes(
&self,
engine: &dyn Engine,
start_version: Version,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, maybe my question should actually have been, "why not Option<Version> for end_version"? When do we need the into there? I just erred on the side of flexibility

.get(ENABLE_CDF_FLAG)
.is_some_and(|val| val == "true")
};
if !is_cdf_enabled(&start_snapshot) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No I think this is okay, as long as we somehow check at each point. I think if it's cheap to error early we should

pub log_segment: LogSegment,
table_root: Url,
end_snapshot: Snapshot,
start_version: Version,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just checking, we won't have to re-find the start snapshot when we actually go to return results right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we don't. Henceforth all we are interested in are the commits in the LogSegment, and the schema we got from the end_snapshot.

}
if start_snapshot.schema() != end_snapshot.schema() {
return Err(Error::generic(
"Failed to build TableChanges: Start and end version schemas are different.",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's put the schemas in the output to help with debugging

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added to the print logs. How's it look?l

@zachschuermann
Copy link
Collaborator

Also I wanted to flag this: I have the field end_snapshot, but we are really interested in column mapping mode, partition columns. We would additionally need the schema if we decide to create the cdf schema in the scan builder (as opposed to the current implementation).

I'm leaning towards just storing column mapping mode and partition columns directly in TableChanges. Do you foresee that we'll need to store the end version's metadata, protocol, or other fields?

let's do the minimal thing for now and only include the data we need immediately

Copy link
Collaborator

@zachschuermann zachschuermann left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(copying from slack thread so we don't lose it)

You mentioned

Note that the behaviour to fail early in table changes constructor aligns with spark's behaviour. Only the CDF range is returned in spark's error. No specific commit version that causes the failure is provided.

can we add that to PR description and/or docs?

generally PR looks good, awesome work! mostly nits and will come back for final review after those are addressed

/// Create a [`TableChanges`] to get a change data feed for the table between `start_version`,
/// and `end_version`. If no `end_version` is supplied, the latest version will be used as the
/// `end_version`.
pub fn table_changes(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and i suppose there are no optimizations to be done here with that information? If yes we likely would want to propagate that information here?

kernel/src/table_changes/mod.rs Outdated Show resolved Hide resolved
/// Create a [`TableChanges`] to get a change data feed for the table between `start_version`,
/// and `end_version`. If no `end_version` is supplied, the latest version will be used as the
/// `end_version`.
pub fn table_changes(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since this is the main 'entrypoint' API for users to interact with reading CDF can we add some docs on semantics and ideally examples too? important things to call out: (1) require that CDF is enabled for the whole range, (2) require the same schema for the whole range (for now!), (3) how do i scan this thing?

(i'm probably forgetting other bits of semantics so don't take the above as exhaustive)

ffi/src/lib.rs Outdated Show resolved Hide resolved
kernel/src/error.rs Outdated Show resolved Hide resolved
self.start_version
}
/// The end version of the `TableChanges`. If no end_version was specified in
/// [`TableChanges::try_new`], this returns the newest version as of the call to `try_new`.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// [`TableChanges::try_new`], this returns the newest version as of the call to `try_new`.
/// [`TableChanges::try_new`], this returns the newest version as of the call to [`try_new`].

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This causes issues with cargo doc, and I think it's a lot of visual clutter to repeat the full [`TableChanges::try_new`]

kernel/src/table_changes/mod.rs Show resolved Hide resolved
kernel/src/table_changes/mod.rs Outdated Show resolved Hide resolved
kernel/src/table_changes/mod.rs Show resolved Hide resolved
kernel/src/table_changes/mod.rs Outdated Show resolved Hide resolved
Copy link
Collaborator

@scovich scovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

kernel/src/error.rs Outdated Show resolved Hide resolved
Comment on lines +15 to +17
StructField::new("_change_type", DataType::STRING, false),
StructField::new("_commit_version", DataType::LONG, false),
StructField::new("_commit_timestamp", DataType::TIMESTAMP, false),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Side note: delta-spark does some hand-wringing about table schemas that already provide columns with these names. I think the solution was to block enabling CDF on such tables, and to block creating columns with those names on tables that are already CDF-enabled? (both are writer issues, not reader, but it's prob worth tracking)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tracked in #524. Thx!

Comment on lines +65 to +71
let start_snapshot =
Snapshot::try_new(table_root.as_url().clone(), engine, Some(start_version))?;
let end_snapshot = Snapshot::try_new(table_root.as_url().clone(), engine, end_version)?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aside: Relating to the optimization we've discussed a few times -- the end snapshot should be able to use the first snapshot as a starting point for listing (and P&M), if the two versions aren't too far apart?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I would like that to be the case eventually.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a note to #489

.metadata()
.configuration
.get(ENABLE_CDF_FLAG)
.is_some_and(|val| val == "true")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this will simplify once #453 merges?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep!

} else if !is_cdf_enabled(&end_snapshot) {
return Err(Error::table_changes_disabled(end_snapshot.version()));
}
if start_snapshot.schema() != end_snapshot.schema() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aside: Technically users can stuff whatever metadata they want into the schema fields; should we track an item to ignore unknown metadata entries when comparing schemas?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah we could do that. I think schema checking needs to be changed a lot anyway to support at least adding columns and changing nullability, so there's more work to do in that department.

Copy link
Collaborator Author

@OussamaSaoudi-db OussamaSaoudi-db Nov 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added link to issue to track schema compatibility checks. #523

.schema()
.fields()
.cloned()
.chain(CDF_FIELDS.clone()),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The CDF fields are generated columns right? (not read directly from parquet)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, correct. I was planning on making them a special case only in CDF code. If you feel that we can legitimately treat these as generated column, we could add a new column type ColumnType::GeneratedColumn.

ffi/src/lib.rs Outdated Show resolved Hide resolved
kernel/src/table_changes/mod.rs Outdated Show resolved Hide resolved
Comment on lines +65 to +71
let start_snapshot =
Snapshot::try_new(table_root.as_url().clone(), engine, Some(start_version))?;
let end_snapshot = Snapshot::try_new(table_root.as_url().clone(), engine, end_version)?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a note to #489

pub fn table_changes(
&self,
engine: &dyn Engine,
start_version: Version,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh cool, that's nice. Sorry for the long aside :)

Copy link
Collaborator

@zachschuermann zachschuermann left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM ship ship ship

@OussamaSaoudi-db OussamaSaoudi-db merged commit d146b80 into delta-io:main Nov 22, 2024
20 checks passed
@OussamaSaoudi-db OussamaSaoudi-db deleted the table_changes_constructor_2 branch November 22, 2024 02:37
OussamaSaoudi-db added a commit that referenced this pull request Nov 22, 2024
<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-incubator/delta-kernel-rs/blob/main/CONTRIBUTING.md
2. Run `cargo t --all-features --all-targets` to get started testing,
and run `cargo fmt`.
  3. Ensure you have added or run the appropriate tests for your PR.
4. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  5. Be sure to keep the PR description updated to reflect all changes.
-->

## What changes are proposed in this pull request?
<!--
Please clarify what changes you are proposing and why the changes are
needed.
The purpose of this section is to outline the changes, why they are
needed, and how this PR fixes the issue.
If the reason for the change is already explained clearly in an issue,
then it does not need to be restated here.
1. If you propose a new API or feature, clarify the use case for a new
API or feature.
  2. If you fix a bug, you can clarify why it is a bug.
-->
This removes files that were accidentally added in prior PRs that were
un-reviewed in #505 and #506.

<!--
Uncomment this section if there are any changes affecting public APIs:
### This PR affects the following public APIs

If there are breaking changes, please ensure the `breaking-changes`
label gets added by CI, and describe why the changes are needed.

Note that _new_ public APIs are not considered breaking.
-->


## How was this change tested?
<!--
Please make sure to add test cases that check the changes thoroughly
including negative and positive cases if possible.
If it was tested in a way different from regular unit tests, please
clarify how you tested, ideally via a reproducible test documented in
the PR description.
-->

Co-authored-by: Zach Schuermann <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
breaking-change Change that will require a version bump
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants