-
Notifications
You must be signed in to change notification settings - Fork 27
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[LI] Fix SPARK ORC projection read for deeply nested union schema and… #154
[LI] Fix SPARK ORC projection read for deeply nested union schema and… #154
Conversation
… out-of-order projection fields
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR! Minor comments non blocking
@Override | ||
public Optional<NestedField> union(TypeDescription union, List<Optional<NestedField>> options) { | ||
union.setAttribute(org.apache.iceberg.orc.ORCSchemaUtil.ICEBERG_ID_ATTRIBUTE, PSEUDO_ICEBERG_FIELD_ID); | ||
List<Optional<NestedField>> optionsCopy = new ArrayList<>(options); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
more efficient to create an immutable copy of the list directly
List<Optional<NestedField>> optionsCopy = new ArrayList<>(options); | |
List<Optional<NestedField>> optionsCopy = List.copyOf(options); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you saying the guava api? guava doesn't have this api i just check, it only has Lists.newArrayList()
. which is the same as new arraylist
i think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and, in fact, the next line, i need to insert an element at the start of the List, so it cannot be immutable.
this.readers = new OrcValueReader[readers.size()]; | ||
for (int i = 0; i < this.readers.length; i += 1) { | ||
this.readers[i] = readers.get(i); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: simplify
this.readers = new OrcValueReader[readers.size()]; | |
for (int i = 0; i < this.readers.length; i += 1) { | |
this.readers[i] = readers.get(i); | |
} | |
this.readers = readers.toArray(new OrcValueReader[0]); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i can change it to
this.readers = readers.toArray(new OrcValueReader[readers.size()]);
@@ -525,6 +531,87 @@ public void testDeepNestedSingleTypeUnion() throws IOException { | |||
} | |||
} | |||
|
|||
@Test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just for my learning:
this new test case seems to cover Issue 2 in the PR, do we also need additional unit test for testing single lvl union with partial projection for Issue 1?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there's previous unit test already testing the single lv union.
Optional<Types.NestedField> icebergSchema = OrcToIcebergVisitor.visit(schema, schemaConverter); | ||
schemaConverter.afterField("field" + typeIndex, schema); | ||
options.add(visit(icebergSchema.get().type(), schema, visitor)); | ||
OrcToIcebergVisitor schemaConverter = new OrcToIcebergVisitorWithPseudoId(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can OrcToIcebergVisitorWithPseudoId be merged with OrcToIcebergVisitor? or do we create a new class for a reason?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The OrcToIcebergVisitor
is an existing native iceberg visitor. What we are doing here is linkedin specific (at least for now), to transform the orc type back to iceberg type which the orc type can contain unions. All these behavior and logic are not standard in native iceberg and we should use a separate linkedin specific subclass for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
… out-of-order projection fields
This patch fixes multiple issues currently in the Spark ORC union type projection read code #150
Issue 1:
Given an ORC schema of
struct<unionCol:uniontype<int,string>>
, the converted iceberg schema is to bestruct<0: tag: required int, 1: field0: optional int, 2: field1: optional string>
(field id not important here).So, for a fully-supported projection feature, user should be arbitrarily permutate and partially project a subset of the [tag, field0, field1].
e.g. users can project:
The current code exhibits multiple bugs when projecting a subset fields (e.g. projecting field0 works but field1 fails), projecting out-of-order fields (e.g. [field1, field0]).
Issue 2:
Given a deeply nested ORC schema with at least 2 levels of
uniontype
, e.g.struct<c1:uniontype<int,struct<c2:string,c3:uniontype<int,string>>>>
the current code completely fails with NPE.We should expect the user to do full/partial/out-of-order projections on 2/arbitrary levels of unions without any issue.
Solution
This PR holistically fixes the above 2 issues by changing the
OrcSchemaWithTypeVisitor
andSparkOrcValueReaders
, to make the first one always return a fully projected reader options to the reader code, so that the reader can determine from the expected schema, which specific reader's value to return (by juding thetag
value), theOrcSchemaWithTypeVisitor
implements its magic via a new visitorOrcToIcebergVisitorWithPseudoId
, which can convert a orc schema with uniontype to iceberg, without preexistingiceberg-id
metadata in the orc schema.