Add scan planning api request and response models, parsers #11369
base: main
Conversation
- Cleanup plan table scan response
- more response cleanup
Thanks @amogh-jahagirdar for helping clean up the PR, it is greatly appreciated!
core/src/main/java/org/apache/iceberg/rest/responses/FetchPlanningResultResponse.java
import com.google.errorprone.annotations.FormatMethod;

/** Exception raised when an entity is not found. */
public class EntityNotFoundException extends RESTException implements CleanableFailure {
Do we need this? I remember @rdblue mentioned a common exception for when a resource can't be found, but it doesn't seem like it's really being used anywhere
I am not sure why we added this, I think we can remove it.
I'd say let's remove this for now since at the moment it's not being used, I think it's something we can add once there's clarity on how it would get used.
Ack
core/src/main/java/org/apache/iceberg/rest/requests/PlanTableScanRequest.java
core/src/main/java/org/apache/iceberg/rest/requests/PlanTableScanRequestParser.java
core/src/main/java/org/apache/iceberg/rest/requests/PlanTableScanRequestParser.java
- Fix partitioned table planning
- Fix condition for failing when specs by id is missing
I think we're missing tests for the parsing of requests.
Edit: NVM, we do have tests for those. The names of the tests were a bit different from what I expected. Discussed with @rahil-c, who'll make the names more consistent.
public static final Map<Integer, PartitionSpec> PARTITION_SPECS_BY_ID = Map.of(0, SPEC);

public static final DataFile FILE_A =
Following up on #11180 (comment), I'm good if we want to make these public. It would've been nice to keep them package-private, and the original concern was that we were somewhat breaking the existing pattern, but as @rahil-c mentioned, SPEC and SCHEMA are already public.
@@ -511,7 +511,6 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) {
          endpoints);

-     trackFileIO(ops);
Can we undo this change? It's unnecessary.
Will fix this
@nastra @amogh-jahagirdar Have addressed the comments from the main PR (https://github.com/apache/iceberg/pull/11180/files) regarding the models on this PR. If something looks off or was not addressed, please let me know. I will also do another pass.
// ignore the ordinal position (ContentFile#pos) of the file in a manifest,
// as it isn't used and BaseFile constructor doesn't support it.
Don't think we need this comment, we can remove it
@amogh-jahagirdar I actually am not the original author of that comment. This was the PR that added it: b8db3f0. Do you still want me to remove it?
I think it is okay since it is pre-existing. It's also good context since the position isn't in the spec and is used to help streaming readers keep incremental state.
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.rest.RESTRequest;

@SuppressWarnings("checkstyle:VisibilityModifier")
Do we need this?
Let me see if I can get rid of this; this was for not having getters and setters for the plan-task, I believe.
I'm not sure if checkstyle has a bug or if I'm doing something incorrect. It seems that checkstyle is complaining that the private var planTask does not have an accessor method. However, I do have the following in the class:

public String planTask() {
  return planTask;
}

And it seems to follow the pattern in https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/rest/requests/UpdateTableRequest.java. If I remove the @SuppressWarnings("checkstyle:VisibilityModifier"), the build continues to fail, unfortunately, so I will keep it for now.
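For context, a minimal, self-contained sketch of the accessor pattern the thread describes: a private field with a same-named, no-prefix accessor, mirroring the style in UpdateTableRequest. The class name PlanTaskHolder is hypothetical, and whether checkstyle's VisibilityModifier check accepts this depends on the project's checkstyle configuration (an assumption here, not something this sketch can demonstrate).

```java
// Hypothetical stand-in class showing the field + no-prefix accessor pattern.
public class PlanTaskHolder {
    private final String planTask;

    public PlanTaskHolder(String planTask) {
        this.planTask = planTask;
    }

    // Iceberg-style accessor: no "get" prefix, same name as the field
    public String planTask() {
        return planTask;
    }

    public static void main(String[] args) {
        System.out.println(new PlanTaskHolder("task-1").planTask());
    }
}
```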
if (request.snapshotId() != null) {
  gen.writeNumberField(SNAPSHOT_ID, request.snapshotId());
}

if (request.startSnapshotId() != null) {
  gen.writeNumberField(START_SNAPSHOT_ID, request.startSnapshotId());
}

if (request.endSnapshotId() != null) {
  gen.writeNumberField(END_SNAPSHOT_ID, request.endSnapshotId());
}
On second look, the way this is written is a bit confusing: on the surface it seems like it could serialize snapshotId, startSnapshotId, and endSnapshotId together, even though a request is either a point-in-time scan or an incremental scan. To be clear, I know that validate checks this and the request can't be in a state where all three are set; I'm more so talking about how a reader would interpret this code.

Concretely, I'd recommend replacing this with:
private void serializeSnapshotIdForScan(JsonGenerator gen, PlanTableScanRequest request) {
  if (request.snapshotId() != null) {
    gen.writeNumberField(SNAPSHOT_ID, request.snapshotId());
  } else {
    gen.writeNumberField(START_SNAPSHOT_ID, request.startSnapshotId());
    gen.writeNumberField(END_SNAPSHOT_ID, request.endSnapshotId());
  }
}
I think you'd then also be able to get rid of the cyclomatic complexity override above.
Actually, when looking at this again, are we sure this is what we want? Reading the spec again (https://github.com/apache/iceberg/blob/main/open-api/rest-catalog-open-api.yaml#L609):

If the request does not include either incremental or point-in-time config properties, scan planning should produce a point-in-time scan of the latest snapshot in the table's main branch

It seems possible that the client does not need to pass either a snapshotId or start and end snapshot ids, and that the server can just plan with the latest snapshot id if nothing was provided. I would assume the following code would then be an issue at serialize time,
private void serializeSnapshotIdForScan(JsonGenerator gen, PlanTableScanRequest request) {
  if (request.snapshotId() != null) {
    gen.writeNumberField(SNAPSHOT_ID, request.snapshotId());
  } else {
    gen.writeNumberField(START_SNAPSHOT_ID, request.startSnapshotId());
    gen.writeNumberField(END_SNAPSHOT_ID, request.endSnapshotId());
  }
}
as it may hit an error if the start and end ids were also null. I think the current version works because we have the null check around each of the ids. Let me know what you think, though.
Talked offline with @amogh-jahagirdar; will try amending this to use an else-if and check whether START_SNAPSHOT_ID is set.
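The agreed control flow can be sketched as follows. This is a minimal, self-contained stand-in (it collects fields into a map instead of using Jackson's JsonGenerator, and the nested ScanRequest class is hypothetical): write snapshot-id for a point-in-time scan; else-if start-snapshot-id is set, write the incremental range; and write nothing when neither is set, so the server plans from the latest snapshot per the spec.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SnapshotFieldSketch {
    // Hypothetical stand-in for PlanTableScanRequest's snapshot fields.
    static final class ScanRequest {
        final Long snapshotId;
        final Long startSnapshotId;
        final Long endSnapshotId;

        ScanRequest(Long snapshotId, Long startSnapshotId, Long endSnapshotId) {
            this.snapshotId = snapshotId;
            this.startSnapshotId = startSnapshotId;
            this.endSnapshotId = endSnapshotId;
        }
    }

    static Map<String, Long> snapshotFields(ScanRequest request) {
        Map<String, Long> fields = new LinkedHashMap<>();
        if (request.snapshotId != null) {
            // point-in-time scan
            fields.put("snapshot-id", request.snapshotId);
        } else if (request.startSnapshotId != null) {
            // incremental scan: both bounds are expected to be present
            fields.put("start-snapshot-id", request.startSnapshotId);
            fields.put("end-snapshot-id", request.endSnapshotId);
        }
        // neither set: emit nothing, server plans the latest snapshot
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(snapshotFields(new ScanRequest(42L, null, null)));
        System.out.println(snapshotFields(new ScanRequest(null, 1L, 9L)));
        System.out.println(snapshotFields(new ScanRequest(null, null, null)));
    }
}
```

The else-if keeps the three branches mutually exclusive for the reader, while the empty-request case still serializes cleanly.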
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.JsonUtil;

public class RESTFileScanTaskParser {
On second look, shouldn't this be in the REST package?
Thanks for the reminder. I think in my PR to your branch I forgot to address this part. It's worth seeing how we can refactor this so that we can actually move RESTFileScanTaskParser into the REST package; it kind of stands out by not following existing convention.
    + "}";

FetchScanTasksResponse fromResponse = FetchScanTasksResponseParser.fromJson(json);
// Need to make a new response with partitionSpec set
I'd say remove the comment
will remove it
// make an unbound json where you expect to not have partitions for the data file,
// delete files as service does not send parition spec
Generally, try to avoid the second person in comments: "create a response where the file scan tasks are unbound". It's also not entirely clear or accurate to say "not have partitions for the data file"; it's more that there are no partitions in the deserialized version until it's bound.
Will fix this
FetchScanTasksResponse fromResponse = FetchScanTasksResponseParser.fromJson(json);
// Need to make a new response with partitionSpec set
FetchScanTasksResponse copyResponse =
The name copyResponse doesn't really indicate much imo; I'd say responseWithSpecs.
will fix
FetchScanTasksResponse.builder()
    .withFileScanTasks(List.of(fileScanTask))
    .withDeleteFiles(List.of(FILE_A_DELETES))
    // assume you have set this already
Same as below, we generally don't use second person in comments, let's remove/reword instances of those
will remove
    + "\"residual-filter\":{\"type\":\"eq\",\"term\":\"id\",\"value\":1}}]"
    + "}";

FetchScanTasksResponse fromResponse = FetchScanTasksResponseParser.fromJson(json);
deserializedResponse or just response should suffice for the purpose of this test? fromResponse doesn't really indicate too much, imo.
I used the name fromResponse since the parser invokes fromJson. Maybe we can treat this as a nit for now?
assertThat(FetchScanTasksResponseParser.toJson(copyResponse, false))
    .isEqualTo(expectedFromJson);
Actually on second look, I feel like this test reads a bit confusing. Shouldn't this be asserting that the deserialized JSON from line 147 is equal to the unbound JSON on line 152? What are we testing by building a copy with the specs? We already test that we serialize correctly, above so it seems duplicative to test that again.
Actually, I think these are asserting two different things. The first set of asserts shows that serialization of tasks works correctly when specsById is set, and the expectation is that partition data is present.
FetchScanTasksResponse response =
FetchScanTasksResponse.builder()
.withFileScanTasks(List.of(fileScanTask))
.withDeleteFiles(List.of(FILE_A_DELETES))
.withSpecsById(PARTITION_SPECS_BY_ID)
.build();
String expectedToJson =
"{"
+ "\"delete-files\":[{\"spec-id\":0,\"content\":\"POSITION_DELETES\","
+ "\"file-path\":\"/path/to/data-a-deletes.parquet\",\"file-format\":\"PARQUET\","
+ "\"partition\":{\"1000\":0},\"file-size-in-bytes\":10,\"record-count\":1}],"
+ "\"file-scan-tasks\":["
+ "{\"data-file\":{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-a.parquet\","
+ "\"file-format\":\"PARQUET\",\"partition\":{\"1000\":0},"
+ "\"file-size-in-bytes\":10,\"record-count\":1,\"sort-order-id\":0},"
+ "\"delete-file-references\":[0],"
+ "\"residual-filter\":{\"type\":\"eq\",\"term\":\"id\",\"value\":1}}]"
+ "}";
String json = FetchScanTasksResponseParser.toJson(response, false);
assertThat(json).isEqualTo(expectedToJson);
The second set of asserts shows that deserialization, which produces unbound tasks, works correctly, and that partition data is empty in this case.
// create a response where the file scan tasks are unbound
String expectedFromJson =
"{"
+ "\"delete-files\":[{\"spec-id\":0,\"content\":\"POSITION_DELETES\","
+ "\"file-path\":\"/path/to/data-a-deletes.parquet\",\"file-format\":\"PARQUET\","
+ "\"partition\":{},\"file-size-in-bytes\":10,\"record-count\":1}],"
+ "\"file-scan-tasks\":["
+ "{\"data-file\":{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-a.parquet\","
+ "\"file-format\":\"PARQUET\",\"partition\":{},"
+ "\"file-size-in-bytes\":10,\"record-count\":1,\"sort-order-id\":0},"
+ "\"delete-file-references\":[0],"
+ "\"residual-filter\":{\"type\":\"eq\",\"term\":\"id\",\"value\":1}}]"
+ "}";
FetchScanTasksResponse deserializedResponse = FetchScanTasksResponseParser.fromJson(json);
FetchScanTasksResponse responseWithSpecs =
FetchScanTasksResponse.builder()
.withPlanTasks(deserializedResponse.planTasks())
.withDeleteFiles(deserializedResponse.deleteFiles())
.withFileScanTasks(deserializedResponse.fileScanTasks())
.withSpecsById(PARTITION_SPECS_BY_ID)
.build();
assertThat(FetchScanTasksResponseParser.toJson(responseWithSpecs, false))
.isEqualTo(expectedFromJson);
}

@Test
public void roundTripSerdeWithFileScanTasks() {
I know the roundTrip prefix was probably copied from existing tests, but this protocol doesn't really round-trip like other JSON messages due to the bound/unbound specs involved. Maybe just call this serdeWithFileScanTasks.
Yeah, good point; will remove the roundTrip naming from all the response parser tests.
}

@Test
public void roundTripSerdeWithInvalidPlanStatusSubmittedWithDeleteFilesNoFileScanTasksPresent() {
I don't think any of the failure case tests should have the prefix "round trip", nothing is really roundtripping here
Will fix this
}

@Test
public void roundTripSerdeWithEmptyRequestAndDefaultsPresent() {
Same as below: let's double-check that we're not unnecessarily prefixing the failure case tests with roundTrip, since nothing is really round-tripping in case of failures.
will fix this
@amogh-jahagirdar Was wondering if there are any other issues, or if you think we can land this?
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
Reviving this PR.
    ContentFile<?> contentFile, PartitionSpec spec, JsonGenerator generator) throws IOException {
  Preconditions.checkArgument(contentFile != null, "Invalid content file: null");
  Preconditions.checkArgument(spec != null, "Invalid partition spec: null");
  Preconditions.checkArgument(generator != null, "Invalid JSON generator: null");
We generally try to avoid exposing JsonGenerator or other Jackson classes in public methods. Can this be package-private? See SnapshotParser for an example of how other parsers handle visibility.
@@ -27,7 +27,7 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.JsonUtil;

-class ContentFileParser {
+public class ContentFileParser {
Why is it necessary to make this constructor public?
Currently the ContentFileParser is located within package org.apache.iceberg, as opposed to something like org.apache.iceberg.rest.responses. The reason this parser is located in that package is that it can access package-private classes such as GenericDataFile, etc. However, we have placed the TableScanResponseParser, which is used by the majority of endpoints, in org.apache.iceberg.rest.responses, and it invokes the ContentFileParser. When I tried removing the public from ContentFileParser, I naturally got issues.
  generator.writeEndObject();
}

public static ContentFile<?> unboundContentFileFromJson(JsonNode jsonNode) {
Can this be package-private?
@@ -48,6 +48,97 @@ class ContentFileParser {

private ContentFileParser() {}

public static void unboundContentFileToJson(
Rather than using an unbound content file, what about limiting the "unbound" part to just the partition? Also, don't we use StructLike for partitions, which requests the correct Java type when accessing fields? If that's the case then we may be able to take a different approach for partition tuples.
  generator.writeNumberField(SORT_ORDER_ID, contentFile.sortOrderId());
}

generator.writeEndObject();
It looks like this method basically duplicates toJson below. What is the difference? Can we share the code?
long fileSizeInBytes = JsonUtil.getLong(FILE_SIZE, jsonNode);
Metrics metrics = metricsFromJson(jsonNode);
ByteBuffer keyMetadata = JsonUtil.getByteBufferOrNull(KEY_METADATA, jsonNode);
This doesn't match the write side, which uses SingleValueParser. Can you also use SingleValueParser here? Or use JsonUtil above?
}

String schemaString = SchemaParser.toJson(spec.schema());
String specString = PartitionSpecParser.toJson(spec);
I don't think it is a good idea to convert the schema and spec to JSON in every task.
That is a good point; this would be on the hot path, so we may need to pass the schemaString and specString into the task from the outside rather than serializing them on every bind.
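The hoisting suggested above can be sketched as follows. This is a self-contained stand-in, not Iceberg code: toJsonOnce stands in for the SchemaParser.toJson/PartitionSpecParser.toJson calls, task binding is reduced to string concatenation, and a counter makes it observable that the expensive conversion runs once per spec rather than once per task.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class HoistSerializationSketch {
    static final AtomicInteger CONVERSIONS = new AtomicInteger();

    // stands in for the expensive SchemaParser/PartitionSpecParser call
    static String toJsonOnce(String spec) {
        CONVERSIONS.incrementAndGet();
        return "{\"spec\":\"" + spec + "\"}";
    }

    // the spec JSON is computed once outside the per-task loop and shared
    static List<String> bindTasks(List<String> taskPaths, String spec) {
        String specString = toJsonOnce(spec);
        return taskPaths.stream()
            .map(path -> path + "|" + specString)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        bindTasks(List.of("a.parquet", "b.parquet", "c.parquet"), "spec-0");
        System.out.println("conversions=" + CONVERSIONS.get());
    }
}
```

The same shape applies to the real parser: compute schemaString and specString per spec id and pass them into each task's bind, instead of re-serializing inside it.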
String schemaString = SchemaParser.toJson(spec.schema());
String specString = PartitionSpecParser.toJson(spec);
ResidualEvaluator boundResidual = ResidualEvaluator.of(spec, filter, caseSensitive);
What if filter is null?
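One way to address the null-filter question is a default before building the evaluator, analogous to falling back to Expressions.alwaysTrue() when the request omits a filter. This is a hedged sketch with hypothetical stand-in types (the nested Expression interface is not Iceberg's), showing only the guard, not the real ResidualEvaluator wiring.

```java
public class FilterDefaultSketch {
    // hypothetical stand-in for org.apache.iceberg.expressions.Expression
    interface Expression {
        String describe();
    }

    // stand-in for Expressions.alwaysTrue()
    static final Expression ALWAYS_TRUE = () -> "true";

    // default a missing filter before constructing the residual evaluator
    static Expression filterOrTrue(Expression filter) {
        return filter != null ? filter : ALWAYS_TRUE;
    }

    public static void main(String[] args) {
        System.out.println(filterOrTrue(null).describe());
        System.out.println(filterOrTrue(() -> "id = 1").describe());
    }
}
```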
Opening a new PR which just focuses on the scan planning models and parsers, based off the original PR here: https://github.com/apache/iceberg/pull/11180/files#diff-eeec6df5a574c599e4bab34e926583f9e382ec55bf0519d8fbea382bdd25c6c5
cc @amogh-jahagirdar @rdblue @danielcweeks @jackye1995 @nastra @singhpk234