From 563e7d92b9a3e76e1bde201c556347a2b42c983e Mon Sep 17 00:00:00 2001 From: Yiqiang Ding Date: Fri, 13 Oct 2023 14:20:49 -0700 Subject: [PATCH] Support to field filtering of complex union type in ORC (#150) * Support to field filtering of complex union type in Orc * using a constant variable instead of literal * fix typos * address comments --------- Co-authored-by: Yiqiang Ding --- .../org/apache/iceberg/orc/ORCSchemaUtil.java | 40 ++++++++-- .../iceberg/orc/OrcSchemaWithTypeVisitor.java | 71 ++++++++++++++++- .../iceberg/spark/data/SparkOrcReader.java | 2 +- .../spark/data/SparkOrcValueReaders.java | 52 +++++++++++-- .../vectorized/VectorizedSparkOrcReaders.java | 32 ++++++-- .../spark/data/TestSparkOrcUnions.java | 76 +++++++++++++++++++ 6 files changed, 246 insertions(+), 27 deletions(-) diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java index db14c1f1c..ccafc8853 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java +++ b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java @@ -81,6 +81,8 @@ public TypeDescription type() { public static final String ICEBERG_LONG_TYPE_ATTRIBUTE = "iceberg.long-type"; static final String ICEBERG_FIELD_LENGTH = "iceberg.length"; + public static final String ICEBERG_UNION_TAG_FIELD_NAME = "tag"; + public static final int ICEBERG_UNION_TYPE_FIELD_NAME_PREFIX_LENGTH = 5; private static final ImmutableMultimap TYPE_MAPPING = ImmutableMultimap.builder() .put(Type.TypeID.BOOLEAN, TypeDescription.Category.BOOLEAN) @@ -381,9 +383,9 @@ private static TypeDescription buildOrcProjectForStructType(Integer fieldId, Typ private static TypeDescription getOrcSchemaForUnionType(Type type, boolean isRequired, Map mapping, OrcField orcField) { - TypeDescription orcType; + if (orcField.type.getChildren().size() == 1) { // single type union - orcType = TypeDescription.createUnion(); + TypeDescription orcType = TypeDescription.createUnion(); 
TypeDescription childOrcStructType = TypeDescription.createStruct(); for (Types.NestedField nestedField : type.asStructType().fields()) { @@ -399,13 +401,35 @@ private static TypeDescription getOrcSchemaForUnionType(Type type, boolean isReq } orcType.addUnionChild(childOrcStructType); + return orcType; } else { // complex union - orcType = TypeDescription.createUnion(); - List nestedFields = type.asStructType().fields(); - for (Types.NestedField nestedField : nestedFields.subList(1, nestedFields.size())) { - TypeDescription childType = buildOrcProjection(nestedField.fieldId(), nestedField.type(), - isRequired && nestedField.isRequired(), mapping); - orcType.addUnionChild(childType); + return getOrcSchemaForComplexUnionType(type, isRequired, mapping, orcField); + } + } + + private static TypeDescription getOrcSchemaForComplexUnionType(Type type, boolean isRequired, + Map mapping, + OrcField orcField) { + TypeDescription orcType = TypeDescription.createUnion(); + List nestedFieldsFromIcebergSchema = type.asStructType().fields(); + for (int i = 0; i < orcField.type.getChildren().size(); ++i) { + TypeDescription childOrcType = orcField.type.getChildren().get(i); + boolean typeProjectedInIcebergSchema = false; + for (Types.NestedField nestedField : nestedFieldsFromIcebergSchema) { + // the name of the field in the struct of Iceberg schema is in the pattern of "fieldx" + // where x is an integer index starting from 0 + if (!nestedField.name().equals(ICEBERG_UNION_TAG_FIELD_NAME) && + Integer.parseInt(nestedField.name().substring(ICEBERG_UNION_TYPE_FIELD_NAME_PREFIX_LENGTH)) == i) { + // child type is projected in Iceberg schema + TypeDescription childType = buildOrcProjection(nestedField.fieldId(), nestedField.type(), + isRequired && nestedField.isRequired(), mapping); + orcType.addUnionChild(childType); + typeProjectedInIcebergSchema = true; + break; + } + } + if (!typeProjectedInIcebergSchema) { + orcType.addUnionChild(childOrcType); } } return orcType; diff --git 
a/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java b/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java index 8ce309e6c..85d7f90ec 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java +++ b/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java @@ -20,13 +20,15 @@ package org.apache.iceberg.orc; import java.util.List; +import java.util.Optional; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.orc.TypeDescription; - public abstract class OrcSchemaWithTypeVisitor { + private static final String PSEUDO_ICEBERG_FIELD_ID = "-1"; + public static T visit( org.apache.iceberg.Schema iSchema, TypeDescription schema, OrcSchemaWithTypeVisitor visitor) { return visit(iSchema.asStruct(), schema, visitor); @@ -78,14 +80,75 @@ protected T visitUnion(Type type, TypeDescription union, OrcSchemaWithTypeVisito if (types.size() == 1) { // single type union options.add(visit(type, types.get(0), visitor)); } else { // complex union - for (int i = 0; i < types.size(); i += 1) { - options.add(visit(type.asStructType().fields().get(i + 1).type(), types.get(i), visitor)); - } + visitComplexUnion(type, union, visitor, options); } return visitor.union(type, union, options); } + /* + A complex union with multiple types of Orc schema is converted into a struct with multiple fields of Iceberg schema. + Also an extra tag field is added into the struct of Iceberg schema during the conversion. + Given an example of complex union in both Orc and Iceberg: + Orc schema: {"name":"unionCol","type":["int","string"]} + Iceberg schema: struct<0: tag: required int, 1: field0: optional int, 2: field1: optional string> + The fields in the struct of Iceberg schema are expected to be stored in the same order + as the corresponding types in the union of Orc schema. 
+ Except for the tag field, the fields in the struct of Iceberg schema match the types in the union of Orc schema + in the general case. In case of field projection, the struct of Iceberg schema contains only + the fields to be projected, which correspond to a subset of the types in the union of Orc schema. + Therefore, this function visits the complex union with both cases in mind. + Note that null values and default values for complex unions are not considered in the case of ORC. + */ + private void visitComplexUnion(Type type, TypeDescription union, OrcSchemaWithTypeVisitor visitor, + List options) { + int typeIndex = 0; + int fieldIndexInStruct = 0; + while (typeIndex < union.getChildren().size()) { + TypeDescription schema = union.getChildren().get(typeIndex); + boolean relatedFieldInStructFound = false; + Types.StructType struct = type.asStructType(); + if (fieldIndexInStruct < struct.fields().size() && + ORCSchemaUtil.ICEBERG_UNION_TAG_FIELD_NAME + .equals(struct.fields().get(fieldIndexInStruct).name())) { + fieldIndexInStruct++; + } + + if (fieldIndexInStruct < struct.fields().size()) { + String structFieldName = type.asStructType().fields().get(fieldIndexInStruct).name(); + int indexFromStructFieldName = Integer.parseInt(structFieldName + .substring(ORCSchemaUtil.ICEBERG_UNION_TYPE_FIELD_NAME_PREFIX_LENGTH)); + if (typeIndex == indexFromStructFieldName) { + relatedFieldInStructFound = true; + T option = visit(type.asStructType().fields().get(fieldIndexInStruct).type(), schema, visitor); + options.add(option); + fieldIndexInStruct++; + } + } + if (!relatedFieldInStructFound) { + visitNotProjectedTypeInComplexUnion(schema, visitor, options, typeIndex); + } + typeIndex++; + } + } + + // If a field is not projected, no corresponding field in the struct of Iceberg schema can be found + // for the current type of the union in Orc schema, yet a reader for that type still needs to be created and + // used so that the Orc file is read
successfully. In this case, a pseudo Iceberg type is converted from + // the Orc schema and is used to create the option for the reader of the current type which still can + // read the corresponding content in Orc file successfully. + private static void visitNotProjectedTypeInComplexUnion(TypeDescription schema, + OrcSchemaWithTypeVisitor visitor, + List options, + int typeIndex) { + OrcToIcebergVisitor schemaConverter = new OrcToIcebergVisitor(); + schemaConverter.beforeField("field" + typeIndex, schema); + schema.setAttribute(org.apache.iceberg.orc.ORCSchemaUtil.ICEBERG_ID_ATTRIBUTE, PSEUDO_ICEBERG_FIELD_ID); + Optional icebergSchema = OrcToIcebergVisitor.visit(schema, schemaConverter); + schemaConverter.afterField("field" + typeIndex, schema); + options.add(visit(icebergSchema.get().type(), schema, visitor)); + } + public T record(Types.StructType iStruct, TypeDescription record, List names, List fields) { return null; } diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java index 6a428777a..9c2f19399 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java @@ -77,7 +77,7 @@ public OrcValueReader record( @Override public OrcValueReader union(Type expected, TypeDescription union, List> options) { - return SparkOrcValueReaders.union(options); + return SparkOrcValueReaders.union(options, expected); } @Override diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java index eaede8262..1d097a95e 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java @@ -20,12 +20,15 @@ package org.apache.iceberg.spark.data; import java.math.BigDecimal; +import 
java.util.Arrays; import java.util.List; import java.util.Map; +import org.apache.iceberg.orc.ORCSchemaUtil; import org.apache.iceberg.orc.OrcValueReader; import org.apache.iceberg.orc.OrcValueReaders; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; import org.apache.orc.storage.ql.exec.vector.ColumnVector; @@ -71,8 +74,8 @@ static OrcValueReader struct( return new StructReader(readers, struct, idToConstant); } - static OrcValueReader union(List> readers) { - return new UnionReader(readers); + static OrcValueReader union(List> readers, Type expected) { + return new UnionReader(readers, expected); } static OrcValueReader array(OrcValueReader elementReader) { @@ -166,12 +169,40 @@ protected void set(InternalRow struct, int pos, Object value) { static class UnionReader implements OrcValueReader { private final OrcValueReader[] readers; + private final Type expectedIcebergSchema; + private int[] projectedFieldIdsToIdxInReturnedRow; + private boolean isTagFieldProjected; + private int numOfFieldsInReturnedRow; - private UnionReader(List> readers) { + private UnionReader(List> readers, Type expected) { this.readers = new OrcValueReader[readers.size()]; for (int i = 0; i < this.readers.length; i += 1) { this.readers[i] = readers.get(i); } + this.expectedIcebergSchema = expected; + + if (this.readers.length > 1) { + // Creating an integer array to track the mapping between the index of fields to be projected + // and the index of the value for the field stored in the returned row, + // if the value for a field equals to Integer.MIN_VALUE, it means the value of this field should not be stored + // in the returned row + this.projectedFieldIdsToIdxInReturnedRow = new int[readers.size()]; + Arrays.fill(this.projectedFieldIdsToIdxInReturnedRow, 
Integer.MIN_VALUE); + this.numOfFieldsInReturnedRow = 0; + this.isTagFieldProjected = false; + + for (Types.NestedField expectedStructField : expectedIcebergSchema.asStructType().fields()) { + String fieldName = expectedStructField.name(); + if (fieldName.equals(ORCSchemaUtil.ICEBERG_UNION_TAG_FIELD_NAME)) { + this.isTagFieldProjected = true; + this.numOfFieldsInReturnedRow++; + continue; + } + int projectedFieldIndex = Integer.parseInt(fieldName + .substring(ORCSchemaUtil.ICEBERG_UNION_TYPE_FIELD_NAME_PREFIX_LENGTH)); + this.projectedFieldIdsToIdxInReturnedRow[projectedFieldIndex] = this.numOfFieldsInReturnedRow++; + } + } } @Override @@ -183,12 +214,17 @@ public Object nonNullRead(ColumnVector vector, int row) { if (readers.length == 1) { return value; } else { - InternalRow struct = new GenericInternalRow(readers.length + 1); - for (int i = 0; i < readers.length; i += 1) { - struct.setNullAt(i + 1); + InternalRow struct = new GenericInternalRow(numOfFieldsInReturnedRow); + for (int i = 0; i < struct.numFields(); i += 1) { + struct.setNullAt(i); + } + if (this.isTagFieldProjected) { + struct.update(0, fieldIndex); + } + + if (this.projectedFieldIdsToIdxInReturnedRow[fieldIndex] != Integer.MIN_VALUE) { + struct.update(this.projectedFieldIdsToIdxInReturnedRow[fieldIndex], value); } - struct.update(0, fieldIndex); - struct.update(fieldIndex + 1, value); return struct; } diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java index 26cc731cb..9e4526f80 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java @@ -25,6 +25,7 @@ import java.util.stream.IntStream; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; +import org.apache.iceberg.orc.ORCSchemaUtil;
import org.apache.iceberg.orc.OrcBatchReader; import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor; import org.apache.iceberg.orc.OrcValueReader; @@ -434,10 +435,19 @@ public ColumnVector getChild(int ordinal) { private static class UnionConverter implements Converter { private final Type type; private final List optionConverters; + private boolean isTagFieldProjected; private UnionConverter(Type type, List optionConverters) { this.type = type; this.optionConverters = optionConverters; + if (optionConverters.size() > 1) { + for (Types.NestedField field : type.asStructType().fields()) { + if (field.name().equals(ORCSchemaUtil.ICEBERG_UNION_TAG_FIELD_NAME)) { + this.isTagFieldProjected = true; + break; + } + } + } } @Override @@ -449,13 +459,23 @@ public ColumnVector convert(org.apache.orc.storage.ql.exec.vector.ColumnVector v List fields = type.asStructType().fields(); List fieldVectors = Lists.newArrayListWithExpectedSize(fields.size()); - LongColumnVector longColumnVector = new LongColumnVector(); - longColumnVector.vector = Arrays.stream(unionColumnVector.tags).asLongStream().toArray(); + // Adding ColumnVector for tag field into fieldVectors when the tag field is projected in Iceberg schema + if (isTagFieldProjected) { + LongColumnVector longColumnVector = new LongColumnVector(); + longColumnVector.vector = Arrays.stream(unionColumnVector.tags).asLongStream().toArray(); - fieldVectors.add(new PrimitiveOrcColumnVector(Types.IntegerType.get(), batchSize, longColumnVector, - OrcValueReaders.ints(), batchOffsetInFile)); - for (int i = 0; i < fields.size() - 1; i += 1) { - fieldVectors.add(optionConverters.get(i).convert(unionColumnVector.fields[i], batchSize, batchOffsetInFile)); + fieldVectors.add(new PrimitiveOrcColumnVector(Types.IntegerType.get(), batchSize, longColumnVector, + OrcValueReaders.ints(), batchOffsetInFile)); + } + + // Adding ColumnVector for each field projected in Iceberg schema into fieldVectors + for (int i = 0; i < fields.size(); ++i) { + 
Types.NestedField field = fields.get(i); + if (!field.name().equals(ORCSchemaUtil.ICEBERG_UNION_TAG_FIELD_NAME)) { + int typeIdx = Integer.parseInt(field.name().substring(ORCSchemaUtil.ICEBERG_UNION_TYPE_FIELD_NAME_PREFIX_LENGTH)); + fieldVectors.add(optionConverters.get(typeIdx) + .convert(unionColumnVector.fields[typeIdx], batchSize, batchOffsetInFile)); + } } return new BaseOrcColumnVector(type.asStructType(), batchSize, vector) { diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcUnions.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcUnions.java index 93e95750e..1cbc4f67f 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcUnions.java +++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcUnions.java @@ -143,6 +143,82 @@ public void testComplexUnion() throws IOException { } } + @Test + public void testComplexUnionWithColumnProjection() throws IOException { + TypeDescription orcSchema = + TypeDescription.fromString("struct<unionCol:uniontype<int,string>>"); + + Schema expectedSchema = new Schema( + Types.NestedField.optional(0, "unionCol", Types.StructType.of( + Types.NestedField.optional(1, "field0", Types.IntegerType.get()))) + ); + + final InternalRow expectedFirstRow = new GenericInternalRow(1); + final InternalRow field1 = new GenericInternalRow(1); + field1.update(0, 0); + expectedFirstRow.update(0, field1); + + final InternalRow expectedSecondRow = new GenericInternalRow(1); + final InternalRow field2 = new GenericInternalRow(1); + field2.update(0, null); + expectedSecondRow.update(0, field2); + + Configuration conf = new Configuration(); + + File orcFile = temp.newFile(); + Path orcFilePath = new Path(orcFile.getPath()); + + Writer writer = OrcFile.createWriter(orcFilePath, + OrcFile.writerOptions(conf) + .setSchema(orcSchema).overwrite(true)); + + VectorizedRowBatch batch = orcSchema.createRowBatch(); + LongColumnVector longColumnVector = new LongColumnVector(NUM_OF_ROWS); + BytesColumnVector bytesColumnVector = new BytesColumnVector(NUM_OF_ROWS); +
UnionColumnVector complexUnion = new UnionColumnVector(NUM_OF_ROWS, longColumnVector, bytesColumnVector); + + complexUnion.init(); + + for (int i = 0; i < NUM_OF_ROWS; i += 1) { + complexUnion.tags[i] = i % 2; + longColumnVector.vector[i] = i; + String stringValue = "foo-" + i; + bytesColumnVector.setVal(i, stringValue.getBytes(StandardCharsets.UTF_8)); + } + + batch.size = NUM_OF_ROWS; + batch.cols[0] = complexUnion; + + writer.addRowBatch(batch); + batch.reset(); + writer.close(); + + // Test non-vectorized reader + List actualRows = Lists.newArrayList(); + try (CloseableIterable reader = ORC.read(Files.localInput(orcFile)) + .project(expectedSchema) + .createReaderFunc(readOrcSchema -> new SparkOrcReader(expectedSchema, readOrcSchema)) + .build()) { + reader.forEach(actualRows::add); + + Assert.assertEquals(NUM_OF_ROWS, actualRows.size()); + assertEquals(expectedSchema, expectedFirstRow, actualRows.get(0)); + assertEquals(expectedSchema, expectedSecondRow, actualRows.get(1)); + } + + // Test vectorized reader + try (CloseableIterable reader = ORC.read(Files.localInput(orcFile)) + .project(expectedSchema) + .createBatchedReaderFunc(readOrcSchema -> + VectorizedSparkOrcReaders.buildReader(expectedSchema, readOrcSchema, ImmutableMap.of())) + .build()) { + final Iterator actualRowsIt = batchesToRows(reader.iterator()); + + assertEquals(expectedSchema, expectedFirstRow, actualRowsIt.next()); + assertEquals(expectedSchema, expectedSecondRow, actualRowsIt.next()); + } + } + @Test public void testDeeplyNestedUnion() throws IOException { TypeDescription orcSchema =