From cb095b2f9883f05c68a7dd27d06a7715025d51d6 Mon Sep 17 00:00:00 2001 From: Raymond Zhang Date: Tue, 28 Feb 2023 19:23:07 -0700 Subject: [PATCH] Fix reordering avro nullable types in MergeHiveSchemaWithAvro --- .../apache/iceberg/avro/AvroSchemaUtil.java | 9 +++++++++ .../core/schema/MergeHiveSchemaWithAvro.java | 17 ++++++++++------ .../core/TestMergeHiveSchemaWithAvro.java | 20 +++++++++++++++++++ 3 files changed, 40 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java index edc658341..91c3fb69f 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java @@ -138,6 +138,15 @@ public static boolean isOptionSchema(Schema schema) { return false; } + public static int getNullIndex(Schema schema) { + Preconditions.checkArgument(isOptionSchema(schema)); + if (schema.getTypes().get(0).getType() == Schema.Type.NULL) { + return 0; + } else { + return 1; + } + } + /** * This method decides whether a schema represents a single type union, i.e., a union that contains only one option * diff --git a/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/schema/MergeHiveSchemaWithAvro.java b/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/schema/MergeHiveSchemaWithAvro.java index 61d51c80a..ead5f3e53 100644 --- a/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/schema/MergeHiveSchemaWithAvro.java +++ b/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/schema/MergeHiveSchemaWithAvro.java @@ -26,6 +26,7 @@ import org.apache.avro.JsonProperties; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; +import org.apache.avro.Schema.Type; import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; @@ -97,12 
+98,12 @@ public Schema.Field field(String name, TypeInfo field, Schema.Field partner, Sch * If the schema is not an option schema or if there is no default value, schema is returned as-is */ private Schema reorderOptionIfRequired(Schema schema, Object defaultValue) { - if (AvroSchemaUtil.isOptionSchema(schema) && defaultValue != null) { - boolean isNullFirstOption = schema.getTypes().get(0).getType() == Schema.Type.NULL; - if (isNullFirstOption && defaultValue.equals(JsonProperties.NULL_VALUE)) { - return schema; + if (AvroSchemaUtil.isOptionSchema(schema)) { + int nullIndex = AvroSchemaUtil.getNullIndex(schema); + if (defaultValue != null && !defaultValue.equals(JsonProperties.NULL_VALUE)) { + return Schema.createUnion(schema.getTypes().get(1 - nullIndex), Schema.create(Type.NULL)); } else { - return Schema.createUnion(schema.getTypes().get(1), schema.getTypes().get(0)); + return Schema.createUnion(Schema.create(Type.NULL), schema.getTypes().get(1 - nullIndex)); } } else { return schema; @@ -143,10 +144,14 @@ public Schema union(UnionTypeInfo union, Schema partner, List<Schema> results) { @Override public Schema primitive(PrimitiveTypeInfo primitive, Schema partner) { boolean shouldResultBeOptional = partner == null || AvroSchemaUtil.isOptionSchema(partner); + int nullIndex = 0; + if (partner != null && AvroSchemaUtil.isOptionSchema(partner)) { + nullIndex = AvroSchemaUtil.getNullIndex(partner); + } Schema hivePrimitive = hivePrimitiveToAvro(primitive); // if there was no matching Avro primitive, use the Hive primitive Schema result = partner == null ? hivePrimitive : checkCompatibilityAndPromote(hivePrimitive, partner); - return shouldResultBeOptional ? AvroSchemaUtil.toOption(result) : result; + return shouldResultBeOptional ? 
AvroSchemaUtil.toOption(result, nullIndex == 1) : result; } private Schema checkCompatibilityAndPromote(Schema schema, Schema partner) { diff --git a/hivelink-core/src/test/java/org/apache/iceberg/hivelink/core/TestMergeHiveSchemaWithAvro.java b/hivelink-core/src/test/java/org/apache/iceberg/hivelink/core/TestMergeHiveSchemaWithAvro.java index d2dceddf2..24d845660 100644 --- a/hivelink-core/src/test/java/org/apache/iceberg/hivelink/core/TestMergeHiveSchemaWithAvro.java +++ b/hivelink-core/src/test/java/org/apache/iceberg/hivelink/core/TestMergeHiveSchemaWithAvro.java @@ -26,6 +26,7 @@ import java.util.stream.Collectors; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; import org.apache.avro.util.internal.JacksonUtils; import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; @@ -355,6 +356,25 @@ public void shouldRecoverLogicalType() { AvroSchemaUtil.toIceberg(merged); } + @Test + public void shouldNotReorderListElementType() { + String hive = "struct<fa:array<int>>"; + Schema avro = + SchemaBuilder.record("r1") + .fields() + .name("fa") + .type() + .nullable() + .array() + .items() + .nullable() + .intType() + .arrayDefault(Arrays.asList(1, 2, 3)) + .endRecord(); + + assertSchema(avro, merge(hive, avro)); + } + // TODO: tests to retain schema props // TODO: tests for explicit type compatibility check between hive and avro primitives, once we implement it // TODO: tests for error case => default value in Avro does not match with type from hive