From d581a6965e802c7d22f302799dc294ce1abb9ec6 Mon Sep 17 00:00:00 2001 From: "Limian (Raymond) Zhang" Date: Fri, 15 Mar 2024 11:14:38 -0700 Subject: [PATCH] =?UTF-8?q?[LI]=20Make=20MergeHiveSchemaWithAvro=20not=20r?= =?UTF-8?q?eordering=20avro=20optional=20union=20=E2=80=A6=20(#153)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [LI] Make MergeHiveSchemaWithAvro not reordering avro optional union field when it's it's merging a hive primitive with an avro optional union --- .../apache/iceberg/avro/AvroSchemaUtil.java | 12 ++++++++ .../core/schema/MergeHiveSchemaWithAvro.java | 10 +++++-- .../core/TestMergeHiveSchemaWithAvro.java | 30 +++++++++++++++++++ 3 files changed, 50 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java index edc658341..004ba8fee 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java @@ -192,6 +192,18 @@ public static boolean nullExistInUnion(Schema schema) { return false; } + public static int getNullIndexInUnion(Schema schema) { + Preconditions.checkArgument(schema.getType() == UNION, + "Expected union schema but was passed: %s", schema); + for (int i = 0; i < schema.getTypes().size(); i++) { + if (schema.getTypes().get(i).getType() == Schema.Type.NULL) { + return i; + } + } + // which means null is not present in the union + return -1; + } + public static Schema toOption(Schema schema) { return toOption(schema, false); } diff --git a/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/schema/MergeHiveSchemaWithAvro.java b/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/schema/MergeHiveSchemaWithAvro.java index 61d51c80a..ad338fa65 100644 --- a/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/schema/MergeHiveSchemaWithAvro.java +++ b/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/schema/MergeHiveSchemaWithAvro.java @@ -101,6 +101,8 @@ private Schema reorderOptionIfRequired(Schema schema, Object defaultValue) { boolean isNullFirstOption = schema.getTypes().get(0).getType() == Schema.Type.NULL; if (isNullFirstOption && defaultValue.equals(JsonProperties.NULL_VALUE)) { return schema; + } else if (!isNullFirstOption && !defaultValue.equals(JsonProperties.NULL_VALUE)) { + return schema; } else { return Schema.createUnion(schema.getTypes().get(1), schema.getTypes().get(0)); } @@ -143,10 +145,14 @@ public Schema union(UnionTypeInfo union, Schema partner, List results) { @Override public Schema primitive(PrimitiveTypeInfo primitive, Schema partner) { boolean shouldResultBeOptional = partner == null || AvroSchemaUtil.isOptionSchema(partner); + boolean nullShouldBeSecondElementInOptionalUnionSchema = partner != null && + shouldResultBeOptional && AvroSchemaUtil.getNullIndexInUnion(partner) == 1; Schema hivePrimitive = hivePrimitiveToAvro(primitive); // if there was no matching Avro primitive, use the Hive primitive - Schema result = partner == null ? hivePrimitive : checkCompatibilityAndPromote(hivePrimitive, partner); - return shouldResultBeOptional ? AvroSchemaUtil.toOption(result) : result; + Schema result = + partner == null ? hivePrimitive : checkCompatibilityAndPromote(hivePrimitive, partner); + return shouldResultBeOptional ? AvroSchemaUtil.toOption(result, + nullShouldBeSecondElementInOptionalUnionSchema) : result; } private Schema checkCompatibilityAndPromote(Schema schema, Schema partner) { diff --git a/hivelink-core/src/test/java/org/apache/iceberg/hivelink/core/TestMergeHiveSchemaWithAvro.java b/hivelink-core/src/test/java/org/apache/iceberg/hivelink/core/TestMergeHiveSchemaWithAvro.java index d2dceddf2..915be73eb 100644 --- a/hivelink-core/src/test/java/org/apache/iceberg/hivelink/core/TestMergeHiveSchemaWithAvro.java +++ b/hivelink-core/src/test/java/org/apache/iceberg/hivelink/core/TestMergeHiveSchemaWithAvro.java @@ -26,6 +26,10 @@ import java.util.stream.Collectors; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; +import org.apache.avro.Schema.Type; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecordBuilder; import org.apache.avro.util.internal.JacksonUtils; import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; @@ -277,6 +281,32 @@ public void shouldReorderOptionalSchemaToMatchDefaultValue() { assertSchema(avro, merge(hive, avro)); } + // this test record level default value is valid with regard to inner field's optional union schema order + @Test + public void shouldReorderOptionalSchemaToMatchDefaultValue2() { + String hive = "struct>"; + + Schema inner = SchemaBuilder.record("INNER").fields() + .name("f1").type(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.STRING))) + .noDefault() + .name("f2").type(Schema.createUnion(Schema.create(Type.STRING), Schema.create(Type.NULL))) + .noDefault() + .endRecord(); + + GenericData.Record recdef = new GenericRecordBuilder(inner).set("f1", null).set("f2", "foo") + .build(); + + Schema avro2 = SchemaBuilder.record("OUTER").fields() + .name("inner").type().record("INNER").fields() + .name("f1").type(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.STRING))) + .noDefault() + .name("f2").type(Schema.createUnion(Schema.create(Type.STRING), Schema.create(Type.NULL))) + .noDefault() + .endRecord().recordDefault(recdef).endRecord(); + + assertSchema(avro2, merge(hive, avro2)); + } + @Rule public ExpectedException thrown = ExpectedException.none();