From 3959284d51204378c7ab02fd0d06cc0a4c5e85d7 Mon Sep 17 00:00:00 2001 From: Yiqiang Ding Date: Thu, 20 Jul 2023 18:55:17 -0700 Subject: [PATCH] Support to field filtering of complex union type in Avro (#149) * Support to field filtering of complex union type in Avro Co-authored-by: Yiqiang Ding --- .../java/org/apache/iceberg/avro/Avro.java | 8 +- .../avro/AvroSchemaWithTypeVisitor.java | 94 +++++++++++- .../AvroWithPartnerByStructureVisitor.java | 23 ++- .../avro/AvroWithTypeByStructureVisitor.java | 68 +++++++++ .../avro/NameMappingWithAvroSchema.java | 130 ++++++++++++++++ .../iceberg/avro/ProjectionDatumReader.java | 17 ++- .../org/apache/iceberg/avro/SchemaToType.java | 3 +- .../avro/TestNameMappingWithAvroSchema.java | 141 ++++++++++++++++++ .../iceberg/spark/data/SparkAvroReader.java | 2 +- .../iceberg/spark/data/SparkValueReaders.java | 72 ++++++--- .../spark/data/TestSparkAvroUnions.java | 111 ++++++++++++++ 11 files changed, 625 insertions(+), 44 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/avro/AvroWithTypeByStructureVisitor.java create mode 100644 core/src/main/java/org/apache/iceberg/avro/NameMappingWithAvroSchema.java create mode 100644 core/src/test/java/org/apache/iceberg/avro/TestNameMappingWithAvroSchema.java diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java index 5a34eedc8..32d893580 100644 --- a/core/src/main/java/org/apache/iceberg/avro/Avro.java +++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java @@ -385,12 +385,18 @@ public static class ReadBuilder { }; private Long start = null; private Long length = null; + private Schema fileSchema = null; private ReadBuilder(InputFile file) { Preconditions.checkNotNull(file, "Input file cannot be null"); this.file = file; } + public ReadBuilder setFileSchema(Schema fileSchema) { + this.fileSchema = fileSchema; + return this; + } + public ReadBuilder createReaderFunc(Function> readerFunction) { 
Preconditions.checkState(createReaderBiFunc == null, "Cannot set multiple createReaderFunc"); this.createReaderFunc = readerFunction; @@ -458,7 +464,7 @@ public AvroIterable build() { } return new AvroIterable<>(file, - new ProjectionDatumReader<>(readerFunc, schema, renames, nameMapping), + new ProjectionDatumReader<>(readerFunc, schema, renames, nameMapping, fileSchema), start, length, reuseContainers); } } diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java index 615748274..9a20fa35a 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java @@ -22,12 +22,15 @@ import java.util.Deque; import java.util.List; import org.apache.avro.Schema; +import org.apache.iceberg.mapping.MappedFields; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; public abstract class AvroSchemaWithTypeVisitor { + private static final String UNION_TAG_FIELD_NAME = "tag"; + public static T visit(org.apache.iceberg.Schema iSchema, Schema schema, AvroSchemaWithTypeVisitor visitor) { return visit(iSchema.asStruct(), schema, visitor); } @@ -97,17 +100,92 @@ private static T visitUnion(Type type, Schema union, AvroSchemaWithTypeVisit options.add(visit(type, branch, visitor)); } } else { // complex union case - int index = 1; - for (Schema branch : types) { - if (branch.getType() == Schema.Type.NULL) { - options.add(visit((Type) null, branch, visitor)); - } else { - options.add(visit(type.asStructType().fields().get(index).type(), branch, visitor)); - index += 1; + visitComplexUnion(type, union, visitor, options); + } + return visitor.union(type, union, options); + } + + /* + A complex union with multiple types of 
Avro schema is converted into a struct with multiple fields of Iceberg schema. + Also an extra tag field is added into the struct of Iceberg schema during the conversion. + Given an example of complex union in both Avro and Iceberg: + Avro schema: {"name":"unionCol","type":["int","string"]} + Iceberg schema: struct<0: tag: required int, 1: field0: optional int, 2: field1: optional string> + The fields in the struct of Iceberg schema are expected to be stored in the same order + as the corresponding types in the union of Avro schema. + Except for the tag field, the fields in the struct of Iceberg schema are the same as the types in the union of Avro schema + in the general case. In case of field projection, the fields in the struct of Iceberg schema only contain + the fields to be projected, which are a subset of the types in the union of Avro schema. + Therefore, this function visits the complex union with the consideration of both cases. + */ + private static void visitComplexUnion(Type type, Schema union, + AvroSchemaWithTypeVisitor visitor, List options) { + boolean nullTypeFound = false; + int typeIndex = 0; + int fieldIndexInStruct = 0; + while (typeIndex < union.getTypes().size()) { + Schema schema = union.getTypes().get(typeIndex); + // in some cases, a NULL type exists in the union of Avro schema besides the actual types, + // and it shifts the position of the actual types in the union + if (schema.getType() == Schema.Type.NULL) { + nullTypeFound = true; + options.add(visit((Type) null, schema, visitor)); + } else { + boolean relatedFieldInStructFound = false; + Types.StructType struct = type.asStructType(); + if (fieldIndexInStruct < struct.fields().size() && + UNION_TAG_FIELD_NAME.equals(struct.fields().get(fieldIndexInStruct).name())) { + fieldIndexInStruct++; + } + + if (fieldIndexInStruct < struct.fields().size()) { + // If a NULL type is found before current type, the type index is one larger than the actual type index which + // can be 
used to track the corresponding field in the struct of Iceberg schema. + int actualTypeIndex = nullTypeFound ? typeIndex - 1 : typeIndex; + String structFieldName = type.asStructType().fields().get(fieldIndexInStruct).name(); + int indexFromStructFieldName = Integer.valueOf(structFieldName.substring(5)); + if (actualTypeIndex == indexFromStructFieldName) { + relatedFieldInStructFound = true; + options.add(visit(type.asStructType().fields().get(fieldIndexInStruct).type(), schema, visitor)); + fieldIndexInStruct++; + } + } + + if (!relatedFieldInStructFound) { + visitNotProjectedTypeInComplexUnion(schema, visitor, options); } } + typeIndex++; } - return visitor.union(type, union, options); + } + + // If a field is not projected, a corresponding field in the struct of Iceberg schema cannot be found + // for the current type of the union in Avro schema. A reader for the current type still needs to be created + // so that the Avro file can be read successfully. In this case, a pseudo Iceberg type is converted from + // the Avro schema and is used to create the option for the reader of the current type, which can still + // read the corresponding content in the Avro file successfully. + private static void visitNotProjectedTypeInComplexUnion(Schema schema, + AvroSchemaWithTypeVisitor visitor, + List options) { + Type iType = AvroSchemaUtil.convert(schema); + if (schema.getType().equals(Schema.Type.RECORD)) { + // When the type of Avro schema is RECORD, the fields under it must have the property of "field-id". + // However, the "field-id" is not set in previous steps as the corresponding Iceberg type is not projected + // and no field id can be found for this field in Iceberg schema. + // Therefore, a name mapping is created based on the Avro schema and its corresponding Iceberg type. + // The field-id from the resulting name mapping is assigned as the property of "field-id" of each + // field under the Avro schema. 
+ NameMappingWithAvroSchema nameMappingWithAvroSchema = new NameMappingWithAvroSchema(); + MappedFields nameMapping = AvroWithPartnerByStructureVisitor.visit( + iType, schema, nameMappingWithAvroSchema); + for (Schema.Field field : schema.getFields()) { + if (!AvroSchemaUtil.hasFieldId(field)) { + int fieldId = nameMapping.id(field.name()); + field.addProp(AvroSchemaUtil.FIELD_ID_PROP, fieldId); + } + } + } + options.add(visit(iType, schema, visitor)); } private static T visitArray(Type type, Schema array, AvroSchemaWithTypeVisitor visitor) { diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerByStructureVisitor.java b/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerByStructureVisitor.java index 2b28f54aa..9ef783bc1 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerByStructureVisitor.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerByStructureVisitor.java @@ -21,6 +21,7 @@ import java.util.Deque; import java.util.List; +import java.util.stream.Collectors; import org.apache.avro.Schema; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -88,14 +89,22 @@ private static T visitRecord(P struct, Schema record, AvroWithPartnerBySt private static T visitUnion(P type, Schema union, AvroWithPartnerByStructureVisitor visitor) { List types = union.getTypes(); - Preconditions.checkArgument(AvroSchemaUtil.isOptionSchema(union), - "Cannot visit non-option union: %s", union); List options = Lists.newArrayListWithExpectedSize(types.size()); - for (Schema branch : types) { - if (branch.getType() == Schema.Type.NULL) { - options.add(visit(visitor.nullType(), branch, visitor)); - } else { - options.add(visit(type, branch, visitor)); + if (AvroSchemaUtil.isOptionSchema(union)) { + for (Schema branch : types) { + if (branch.getType() == Schema.Type.NULL) { + options.add(visit(visitor.nullType(), branch, visitor)); + } else 
{ + options.add(visit(type, branch, visitor)); + } + } + } else { + List nonNullTypes = + types.stream().filter(t -> t.getType() != Schema.Type.NULL).collect(Collectors.toList()); + for (int i = 0; i < nonNullTypes.size(); i++) { + // In the case of complex union, the corresponding "type" is a struct. Non-null type i in + // the union maps to struct field i + 1 because the first struct field is the "tag". + options.add(visit(visitor.fieldNameAndType(type, i + 1).second(), nonNullTypes.get(i), visitor)); } } return visitor.union(type, union, options); diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroWithTypeByStructureVisitor.java b/core/src/main/java/org/apache/iceberg/avro/AvroWithTypeByStructureVisitor.java new file mode 100644 index 000000000..15f6c9534 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/avro/AvroWithTypeByStructureVisitor.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.avro; + +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.Pair; + +/** + * This class extends {@link AvroWithPartnerByStructureVisitor} to override some functions + * related to some nested data types which help the generation of name mapping from Iceberg schema. + * + * @param Return T. + */ +public class AvroWithTypeByStructureVisitor extends AvroWithPartnerByStructureVisitor { + @Override + protected boolean isMapType(Type type) { + return type.isMapType(); + } + + @Override + protected boolean isStringType(Type type) { + return type.isPrimitiveType() && type.asPrimitiveType().typeId() == Type.TypeID.STRING; + } + + @Override + protected Type arrayElementType(Type arrayType) { + return arrayType.asListType().elementType(); + } + + @Override + protected Type mapKeyType(Type mapType) { + return mapType.asMapType().keyType(); + } + + @Override + protected Type mapValueType(Type mapType) { + return mapType.asMapType().valueType(); + } + + @Override + protected Pair fieldNameAndType(Type structType, int pos) { + Types.NestedField field = structType.asStructType().fields().get(pos); + return Pair.of(field.name(), field.type()); + } + + @Override + protected Type nullType() { + return null; + } +} diff --git a/core/src/main/java/org/apache/iceberg/avro/NameMappingWithAvroSchema.java b/core/src/main/java/org/apache/iceberg/avro/NameMappingWithAvroSchema.java new file mode 100644 index 000000000..801f57881 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/avro/NameMappingWithAvroSchema.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.avro; + +import java.util.List; +import org.apache.avro.Schema; +import org.apache.iceberg.mapping.MappedField; +import org.apache.iceberg.mapping.MappedFields; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; + +/** + * This class extends {@link AvroWithTypeByStructureVisitor} to generate the name mapping from + * Iceberg schema and the corresponding Avro schema. 
+ * + * param Return MappedFields + */ +public class NameMappingWithAvroSchema extends AvroWithTypeByStructureVisitor { + @Override + public MappedFields record( + Type struct, Schema record, List names, List fieldResults) { + List fields = Lists.newArrayListWithExpectedSize(fieldResults.size()); + + for (int i = 0; i < fieldResults.size(); i += 1) { + Types.NestedField field = struct.asStructType().fields().get(i); + MappedFields result = fieldResults.get(i); + fields.add(MappedField.of(field.fieldId(), field.name(), result)); + } + + return MappedFields.of(fields); + } + + @Override + public MappedFields union(Type type, Schema union, List optionResults) { + if (AvroSchemaUtil.isOptionSchema(union)) { + for (int i = 0; i < optionResults.size(); i += 1) { + if (union.getTypes().get(i).getType() != Schema.Type.NULL) { + return optionResults.get(i); + } + } + } else { // Complex union + Preconditions.checkArgument( + type instanceof Types.StructType, + "Cannot visit invalid Iceberg type: %s for Avro complex union type: %s", + type, + union); + Types.StructType struct = (Types.StructType) type; + List fields = Lists.newArrayListWithExpectedSize(optionResults.size()); + int index = 0; + // Avro spec for union types states that unions may not contain more than one schema with the + // same type, except for the named types record, fixed and enum. For example, unions + // containing two array types or two map types are not permitted, but two types with different + // names are permitted. + // Therefore, for non-named types, use the Avro type toString() as the field mapping key. For + // named types, use the record name of the Avro type as the field mapping key. + for (Schema option : union.getTypes()) { + if (option.getType() != Schema.Type.NULL) { + // Check if current option is a named type, i.e., a RECORD, ENUM, or FIXED type. If so, + // use the record name of the Avro type as the field name. Otherwise, use the Avro + // type toString(). 
+ if (option.getType() == Schema.Type.RECORD || + option.getType() == Schema.Type.ENUM || + option.getType() == Schema.Type.FIXED) { + fields.add( + MappedField.of( + struct.fields().get(index).fieldId(), + option.getName(), + optionResults.get(index))); + } else { + fields.add( + MappedField.of( + struct.fields().get(index).fieldId(), + option.toString(), + optionResults.get(index))); + } + + // Neither struct nor optionResults contains an entry for the NULL type, so index is + // incremented only when a non-NULL type is encountered. + // NOTE(review): struct field 0 appears to be the "tag" field, so get(index) may be off by one - confirm. + index++; + } + } + return MappedFields.of(fields); + } + return null; + } + + @Override + public MappedFields array(Type list, Schema array, MappedFields elementResult) { + return MappedFields.of(MappedField.of(list.asListType().elementId(), "element", elementResult)); + } + + @Override + public MappedFields map(Type sMap, Schema map, MappedFields keyResult, MappedFields valueResult) { + return MappedFields.of( + MappedField.of(sMap.asMapType().keyId(), "key", keyResult), + MappedField.of(sMap.asMapType().valueId(), "value", valueResult)); + } + + @Override + public MappedFields map(Type sMap, Schema map, MappedFields valueResult) { + return MappedFields.of( + MappedField.of(sMap.asMapType().keyId(), "key", null), + MappedField.of(sMap.asMapType().valueId(), "value", valueResult)); + } + + @Override + public MappedFields primitive(Type type, Schema primitive) { + return null; // no mapping because primitives have no nested fields + } +} diff --git a/core/src/main/java/org/apache/iceberg/avro/ProjectionDatumReader.java b/core/src/main/java/org/apache/iceberg/avro/ProjectionDatumReader.java index 1ee77cb9e..8a3ecfb57 100644 --- a/core/src/main/java/org/apache/iceberg/avro/ProjectionDatumReader.java +++ b/core/src/main/java/org/apache/iceberg/avro/ProjectionDatumReader.java @@ -50,6 +50,18 @@ public ProjectionDatumReader(Function> getReader, this.nameMapping = nameMapping; } + public ProjectionDatumReader(Function> getReader, 
+ org.apache.iceberg.Schema expectedSchema, + Map renames, + NameMapping nameMapping, + Schema fileSchema) { + this.getReader = getReader; + this.expectedSchema = expectedSchema; + this.renames = renames; + this.nameMapping = nameMapping; + this.fileSchema = fileSchema; + } + @Override public void setRowPositionSupplier(Supplier posSupplier) { if (wrapped instanceof SupportsRowPosition) { @@ -59,7 +71,10 @@ public void setRowPositionSupplier(Supplier posSupplier) { @Override public void setSchema(Schema newFileSchema) { - this.fileSchema = newFileSchema; + if (this.fileSchema == null) { + this.fileSchema = newFileSchema; + } + if (nameMapping == null && !AvroSchemaUtil.hasIds(fileSchema)) { nameMapping = MappingUtil.create(expectedSchema); } diff --git a/core/src/main/java/org/apache/iceberg/avro/SchemaToType.java b/core/src/main/java/org/apache/iceberg/avro/SchemaToType.java index 3bb559b9a..c777fd9f7 100644 --- a/core/src/main/java/org/apache/iceberg/avro/SchemaToType.java +++ b/core/src/main/java/org/apache/iceberg/avro/SchemaToType.java @@ -19,7 +19,6 @@ package org.apache.iceberg.avro; -import java.util.ArrayList; import java.util.List; import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; @@ -121,7 +120,7 @@ public Type union(Schema union, List options) { return options.get(0); } else { // Complex union - List newFields = new ArrayList<>(); + List newFields = Lists.newArrayListWithExpectedSize(options.size()); newFields.add(Types.NestedField.required(allocateId(), "tag", Types.IntegerType.get())); int tagIndex = 0; diff --git a/core/src/test/java/org/apache/iceberg/avro/TestNameMappingWithAvroSchema.java b/core/src/test/java/org/apache/iceberg/avro/TestNameMappingWithAvroSchema.java new file mode 100644 index 000000000..9d92347e1 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/avro/TestNameMappingWithAvroSchema.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.avro; + +import org.apache.avro.Schema; +import org.apache.iceberg.mapping.MappedField; +import org.apache.iceberg.mapping.MappedFields; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.Test; + +public class TestNameMappingWithAvroSchema { + @Test + public void testNameMappingWithAvroSchema() { + + // Create an example Avro schema with a nested record but not using the SchemaBuilder + Schema schema = + Schema.createRecord( + "test", + null, + null, + false, + Lists.newArrayList( + new Schema.Field("id", Schema.create(Schema.Type.INT)), + new Schema.Field("data", Schema.create(Schema.Type.STRING)), + new Schema.Field( + "location", + Schema.createRecord( + "location", + null, + null, + false, + Lists.newArrayList( + new Schema.Field("lat", Schema.create(Schema.Type.DOUBLE)), + new Schema.Field("long", Schema.create(Schema.Type.DOUBLE))))), + new Schema.Field("friends", Schema.createArray(Schema.create(Schema.Type.STRING))), + new Schema.Field( + "simpleUnion", + Schema.createUnion( + Lists.newArrayList( + Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)))), + new Schema.Field( + "complexUnion", + Schema.createUnion( + new Schema[] { + 
Schema.create(Schema.Type.NULL), + Schema.create(Schema.Type.STRING), + Schema.createRecord( + "innerRecord1", + null, + null, + false, + Lists.newArrayList( + new Schema.Field("lat", Schema.create(Schema.Type.DOUBLE)), + new Schema.Field("long", Schema.create(Schema.Type.DOUBLE)))), + Schema.createRecord( + "innerRecord2", + null, + null, + false, + Lists.newArrayList( + new Schema.Field("lat", Schema.create(Schema.Type.DOUBLE)), + new Schema.Field("long", Schema.create(Schema.Type.DOUBLE)))), + Schema.createRecord( + "innerRecord3", + null, + null, + false, + Lists.newArrayList( + new Schema.Field( + "innerUnion", + Schema.createUnion( + Lists.newArrayList( + Schema.create(Schema.Type.STRING), + Schema.create(Schema.Type.INT)))))), + Schema.createEnum( + "timezone", null, null, Lists.newArrayList("UTC", "PST", "EST")), + Schema.createFixed("bitmap", null, null, 1) + })))); + + NameMappingWithAvroSchema nameMappingWithAvroSchema = new NameMappingWithAvroSchema(); + + // Convert Avro schema to Iceberg schema + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(schema); + MappedFields expected = + MappedFields.of( + MappedField.of(0, "id"), + MappedField.of(1, "data"), + MappedField.of( + 2, + "location", + MappedFields.of(MappedField.of(6, "lat"), MappedField.of(7, "long"))), + MappedField.of(3, "friends", MappedFields.of(MappedField.of(8, "element"))), + MappedField.of(4, "simpleUnion"), + MappedField.of( + 5, + "complexUnion", + MappedFields.of( + MappedField.of(17, "\"string\""), + MappedField.of( + 18, + "innerRecord1", + MappedFields.of(MappedField.of(9, "lat"), MappedField.of(10, "long"))), + MappedField.of( + 19, + "innerRecord2", + MappedFields.of(MappedField.of(11, "lat"), MappedField.of(12, "long"))), + MappedField.of( + 20, + "innerRecord3", + MappedFields.of( + MappedField.of( + 16, + "innerUnion", + MappedFields.of( + MappedField.of(13, "\"string\""), + MappedField.of(14, "\"int\""))))), + MappedField.of(21, "timezone"), + 
MappedField.of(22, "bitmap")))); + MappedFields actual = AvroWithPartnerByStructureVisitor.visit( + icebergSchema.asStruct(), schema, nameMappingWithAvroSchema); + Assert.assertEquals(expected, actual); + } +} diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java index 75108053e..53eb1bcb4 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java @@ -83,7 +83,7 @@ public ValueReader union(Type expected, Schema union, List> op if (AvroSchemaUtil.isOptionSchema(union) || AvroSchemaUtil.isSingleTypeUnion(union)) { return ValueReaders.union(options); } else { - return SparkValueReaders.union(union, options); + return SparkValueReaders.union(union, options, expected); } } diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java index 1c2223847..0412eb0ad 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java @@ -25,6 +25,7 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Objects; @@ -34,6 +35,7 @@ import org.apache.iceberg.avro.ValueReader; import org.apache.iceberg.avro.ValueReaders; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.UUIDUtil; import org.apache.spark.sql.catalyst.InternalRow; @@ -86,8 +88,8 @@ static ValueReader struct(List> readers, Types.Struc return new StructReader(readers, struct, idToConstant); } - static ValueReader union(Schema schema, List> readers) { - return new 
UnionReader(schema, readers); + static ValueReader union(Schema schema, List> readers, Type expected) { + return new UnionReader(schema, readers, expected); } private static class StringReader implements ValueReader { @@ -302,54 +304,76 @@ protected void set(InternalRow struct, int pos, Object value) { } private static class UnionReader implements ValueReader { + private static final String UNION_TAG_FIELD_NAME = "tag"; private final Schema schema; private final ValueReader[] readers; + private final int[] projectedFieldIdsToIdxInReturnedRow; + private boolean isTagFieldProjected; + private int numOfFieldsInReturnedRow; + private int nullTypeIndex; - private UnionReader(Schema schema, List> readers) { + private UnionReader(Schema schema, List> readers, Type expected) { this.schema = schema; this.readers = new ValueReader[readers.size()]; for (int i = 0; i < this.readers.length; i += 1) { this.readers[i] = readers.get(i); } - } - @Override - public InternalRow read(Decoder decoder, Object reuse) throws IOException { - // first we need to filter out NULL alternative if it exists in the union schema - int nullIndex = -1; - List alts = schema.getTypes(); - for (int i = 0; i < alts.size(); i++) { - Schema alt = alts.get(i); + this.nullTypeIndex = -1; + for (int i = 0; i < this.schema.getTypes().size(); i++) { + Schema alt = this.schema.getTypes().get(i); if (Objects.equals(alt.getType(), Schema.Type.NULL)) { - nullIndex = i; + this.nullTypeIndex = i; break; } } + // Creating an integer array to track the mapping between the index of fields to be projected + // and the index of the value for the field stored in the returned row, + // if the value for a field equals to -1, it means the value of this field should not be stored + // in the returned row + int numberOfTypes = this.nullTypeIndex == -1 ? 
+ this.schema.getTypes().size() : this.schema.getTypes().size() - 1; + this.projectedFieldIdsToIdxInReturnedRow = new int[numberOfTypes]; + Arrays.fill(this.projectedFieldIdsToIdxInReturnedRow, -1); + this.numOfFieldsInReturnedRow = 0; + this.isTagFieldProjected = false; + for (Types.NestedField expectedStructField : expected.asStructType().fields()) { + String fieldName = expectedStructField.name(); + if (fieldName.equals(UNION_TAG_FIELD_NAME)) { + this.isTagFieldProjected = true; + this.numOfFieldsInReturnedRow++; + continue; + } + int projectedFieldIndex = Integer.valueOf(fieldName.substring(5)); + this.projectedFieldIdsToIdxInReturnedRow[projectedFieldIndex] = this.numOfFieldsInReturnedRow++; + } + } + + @Override + public InternalRow read(Decoder decoder, Object reuse) throws IOException { int index = decoder.readIndex(); - if (index == nullIndex) { + if (index == nullTypeIndex) { // if it is a null data, directly return null as the whole union result // we know for sure it is a null so the casting will always work. - return (InternalRow) readers[nullIndex].read(decoder, reuse); + return (InternalRow) readers[nullTypeIndex].read(decoder, reuse); } // otherwise, we need to return an InternalRow as a struct data - InternalRow struct = new GenericInternalRow(nullIndex >= 0 ? alts.size() : alts.size() + 1); + InternalRow struct = new GenericInternalRow(numOfFieldsInReturnedRow); for (int i = 0; i < struct.numFields(); i += 1) { struct.setNullAt(i); } + int fieldIndex = (nullTypeIndex < 0 || index < nullTypeIndex) ? 
index : index - 1; + if (isTagFieldProjected) { + struct.setInt(0, fieldIndex); + } + Object value = readers[index].read(decoder, reuse); - if (nullIndex < 0) { - struct.update(index + 1, value); - struct.setInt(0, index); - } else if (index < nullIndex) { - struct.update(index + 1, value); - struct.setInt(0, index); - } else { - struct.update(index, value); - struct.setInt(0, index - 1); + if (projectedFieldIdsToIdxInReturnedRow[fieldIndex] != -1) { + struct.update(projectedFieldIdsToIdxInReturnedRow[fieldIndex], value); } return struct; diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroUnions.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroUnions.java index 31c2b6b9c..26902de06 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroUnions.java +++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroUnions.java @@ -584,4 +584,115 @@ public void testDeeplyNestedUnionSchema3() throws IOException { // making sure the rows can be read successfully Assert.assertEquals(2, rows.size()); } + + @Test + public void writeAndValidateRequiredComplexUnionWithProjection() throws IOException { + org.apache.avro.Schema avroSchema = SchemaBuilder.record("root") + .fields() + .name("unionCol") + .type() + .unionOf() + .intType() + .and() + .stringType() + .and() + .record("r") + .fields() + .name("rDouble") + .type() + .doubleType() + .noDefault() + .endRecord() + .and() + .array() + .items() + .stringType() + .endUnion() + .noDefault() + .endRecord(); + + GenericData.Record unionRecord1 = new GenericData.Record(avroSchema); + unionRecord1.put("unionCol", "foo"); + GenericData.Record unionRecord2 = new GenericData.Record(avroSchema); + unionRecord2.put("unionCol", 1); + GenericData.Record unionRecord3 = new GenericData.Record(avroSchema); + org.apache.avro.Schema avroSchema1 = avroSchema.getField("unionCol").schema().getTypes().get(2); + GenericData.Record unionRecord31 = new 
GenericData.Record(avroSchema1); + unionRecord31.put("rDouble", 2.2); + unionRecord3.put("unionCol", unionRecord31); + GenericData.Record unionRecord4 = new GenericData.Record(avroSchema); + unionRecord4.put("unionCol", ImmutableList.of("bar")); + + File testFile = temp.newFile(); + try (DataFileWriter writer = new DataFileWriter<>(new GenericDatumWriter<>())) { + writer.create(avroSchema, testFile); + writer.append(unionRecord1); + writer.append(unionRecord2); + writer.append(unionRecord3); + writer.append(unionRecord4); + } + + Schema expectedSchema = AvroSchemaUtil.toIceberg(avroSchema).select("unionCol.field0"); + List rows; + try (AvroIterable reader = Avro.read(Files.localInput(testFile)) + .createReaderFunc(SparkAvroReader::new) + .project(expectedSchema) + .build()) { + rows = Lists.newArrayList(reader); + + Assert.assertEquals(1, rows.get(0).getStruct(0, 1).numFields()); + Assert.assertTrue(rows.get(0).getStruct(0, 1).isNullAt(0)); + Assert.assertEquals(1, rows.get(1).getStruct(0, 1).getInt(0)); + } + } + + @Test(expected = ArrayIndexOutOfBoundsException.class) + public void writeAndReadRequiredComplexUnionWithSchemaMismatch() throws IOException { + org.apache.avro.Schema avroWriteSchema = SchemaBuilder.record("root") + .fields() + .name("unionCol") + .type() + .unionOf() + .intType() + .and() + .stringType() + .endUnion() + .noDefault() + .endRecord(); + + GenericData.Record unionRecord1 = new GenericData.Record(avroWriteSchema); + unionRecord1.put("unionCol", "foo"); + GenericData.Record unionRecord2 = new GenericData.Record(avroWriteSchema); + unionRecord2.put("unionCol", 1); + + File testFile = temp.newFile(); + try (DataFileWriter writer = new DataFileWriter<>(new GenericDatumWriter<>())) { + writer.create(avroWriteSchema, testFile); + writer.append(unionRecord1); + writer.append(unionRecord2); + } + + org.apache.avro.Schema avroReadSchema = SchemaBuilder.record("root") + .fields() + .name("unionCol") + .type() + .unionOf() + .stringType() + .and() + 
.intType() + .endUnion() + .noDefault() + .endRecord(); + + Schema expectedSchema = AvroSchemaUtil.toIceberg(avroReadSchema); + + List rows; + try (AvroIterable reader = Avro.read(Files.localInput(testFile)) + .createReaderFunc(SparkAvroReader::new) + .project(expectedSchema) + .setFileSchema(avroReadSchema) + .build()) { + rows = Lists.newArrayList(reader); + } + } }