From 0c0f44fe57c95a9db34567078a92db025869299c Mon Sep 17 00:00:00 2001 From: Raymond Zhang Date: Mon, 11 Mar 2024 14:56:24 -0700 Subject: [PATCH 1/3] [LI] Make MergeHiveSchemaWithAvro not reorder avro optional union field when it's merging a hive primitive with an avro optional union --- .../apache/iceberg/avro/AvroSchemaUtil.java | 12 ++++++++ .../core/schema/MergeHiveSchemaWithAvro.java | 4 ++- .../core/TestMergeHiveSchemaWithAvro.java | 30 +++++++++++++++++++ 3 files changed, 45 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java index edc658341..004ba8fee 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java @@ -192,6 +192,18 @@ public static boolean nullExistInUnion(Schema schema) { return false; } + public static int getNullIndexInUnion(Schema schema) { + Preconditions.checkArgument(schema.getType() == UNION, + "Expected union schema but was passed: %s", schema); + for (int i = 0; i < schema.getTypes().size(); i++) { + if (schema.getTypes().get(i).getType() == Schema.Type.NULL) { + return i; + } + } + // which means null is not present in the union + return -1; + } + public static Schema toOption(Schema schema) { return toOption(schema, false); } diff --git a/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/schema/MergeHiveSchemaWithAvro.java b/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/schema/MergeHiveSchemaWithAvro.java index 61d51c80a..40cd5d799 100644 --- a/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/schema/MergeHiveSchemaWithAvro.java +++ b/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/schema/MergeHiveSchemaWithAvro.java @@ -143,10 +143,12 @@ public Schema union(UnionTypeInfo union, Schema partner, List results) { @Override public Schema primitive(PrimitiveTypeInfo primitive, 
Schema partner) { boolean shouldResultBeOptional = partner == null || AvroSchemaUtil.isOptionSchema(partner); + boolean nullShouldBeSecondElementInOptionalUnionSchema = + shouldResultBeOptional && AvroSchemaUtil.getNullIndexInUnion(partner) == 1; Schema hivePrimitive = hivePrimitiveToAvro(primitive); // if there was no matching Avro primitive, use the Hive primitive Schema result = partner == null ? hivePrimitive : checkCompatibilityAndPromote(hivePrimitive, partner); - return shouldResultBeOptional ? AvroSchemaUtil.toOption(result) : result; + return shouldResultBeOptional ? AvroSchemaUtil.toOption(result, nullShouldBeSecondElementInOptionalUnionSchema) : result; } private Schema checkCompatibilityAndPromote(Schema schema, Schema partner) { diff --git a/hivelink-core/src/test/java/org/apache/iceberg/hivelink/core/TestMergeHiveSchemaWithAvro.java b/hivelink-core/src/test/java/org/apache/iceberg/hivelink/core/TestMergeHiveSchemaWithAvro.java index d2dceddf2..915be73eb 100644 --- a/hivelink-core/src/test/java/org/apache/iceberg/hivelink/core/TestMergeHiveSchemaWithAvro.java +++ b/hivelink-core/src/test/java/org/apache/iceberg/hivelink/core/TestMergeHiveSchemaWithAvro.java @@ -26,6 +26,10 @@ import java.util.stream.Collectors; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; +import org.apache.avro.Schema.Type; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecordBuilder; import org.apache.avro.util.internal.JacksonUtils; import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; @@ -277,6 +281,32 @@ public void shouldReorderOptionalSchemaToMatchDefaultValue() { assertSchema(avro, merge(hive, avro)); } + // this tests that a record-level default value stays valid with regard to the inner fields' optional union schema order + @Test + public void shouldReorderOptionalSchemaToMatchDefaultValue2() { + String hive = "struct>"; 
+ Schema inner = SchemaBuilder.record("INNER").fields() + .name("f1").type(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.STRING))) + .noDefault() + .name("f2").type(Schema.createUnion(Schema.create(Type.STRING), Schema.create(Type.NULL))) + .noDefault() + .endRecord(); + + GenericData.Record recdef = new GenericRecordBuilder(inner).set("f1", null).set("f2", "foo") + .build(); + + Schema avro2 = SchemaBuilder.record("OUTER").fields() + .name("inner").type().record("INNER").fields() + .name("f1").type(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.STRING))) + .noDefault() + .name("f2").type(Schema.createUnion(Schema.create(Type.STRING), Schema.create(Type.NULL))) + .noDefault() + .endRecord().recordDefault(recdef).endRecord(); + + assertSchema(avro2, merge(hive, avro2)); + } + @Rule public ExpectedException thrown = ExpectedException.none(); From aa412e41cf12af4172231e910b454e696e45312f Mon Sep 17 00:00:00 2001 From: Raymond Zhang Date: Mon, 11 Mar 2024 15:11:33 -0700 Subject: [PATCH 2/3] fix style --- .../hivelink/core/schema/MergeHiveSchemaWithAvro.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/schema/MergeHiveSchemaWithAvro.java b/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/schema/MergeHiveSchemaWithAvro.java index 40cd5d799..406513d3a 100644 --- a/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/schema/MergeHiveSchemaWithAvro.java +++ b/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/schema/MergeHiveSchemaWithAvro.java @@ -147,8 +147,10 @@ public Schema primitive(PrimitiveTypeInfo primitive, Schema partner) { shouldResultBeOptional && AvroSchemaUtil.getNullIndexInUnion(partner) == 1; Schema hivePrimitive = hivePrimitiveToAvro(primitive); // if there was no matching Avro primitive, use the Hive primitive - Schema result = partner == null ? 
hivePrimitive : checkCompatibilityAndPromote(hivePrimitive, partner); - return shouldResultBeOptional ? AvroSchemaUtil.toOption(result, nullShouldBeSecondElementInOptionalUnionSchema) : result; + Schema result = + partner == null ? hivePrimitive : checkCompatibilityAndPromote(hivePrimitive, partner); + return shouldResultBeOptional ? AvroSchemaUtil.toOption(result, + nullShouldBeSecondElementInOptionalUnionSchema) : result; } private Schema checkCompatibilityAndPromote(Schema schema, Schema partner) { From 580d2eb7ac4c72aeb50ad05e74a12166808b6b58 Mon Sep 17 00:00:00 2001 From: Raymond Zhang Date: Mon, 11 Mar 2024 15:55:12 -0700 Subject: [PATCH 3/3] minor fix --- .../iceberg/hivelink/core/schema/MergeHiveSchemaWithAvro.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/schema/MergeHiveSchemaWithAvro.java b/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/schema/MergeHiveSchemaWithAvro.java index 406513d3a..ad338fa65 100644 --- a/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/schema/MergeHiveSchemaWithAvro.java +++ b/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/schema/MergeHiveSchemaWithAvro.java @@ -101,6 +101,8 @@ private Schema reorderOptionIfRequired(Schema schema, Object defaultValue) { boolean isNullFirstOption = schema.getTypes().get(0).getType() == Schema.Type.NULL; if (isNullFirstOption && defaultValue.equals(JsonProperties.NULL_VALUE)) { return schema; + } else if (!isNullFirstOption && !defaultValue.equals(JsonProperties.NULL_VALUE)) { + return schema; } else { return Schema.createUnion(schema.getTypes().get(1), schema.getTypes().get(0)); } @@ -143,7 +145,7 @@ public Schema union(UnionTypeInfo union, Schema partner, List results) { @Override public Schema primitive(PrimitiveTypeInfo primitive, Schema partner) { boolean shouldResultBeOptional = partner == null || AvroSchemaUtil.isOptionSchema(partner); - boolean 
nullShouldBeSecondElementInOptionalUnionSchema = + boolean nullShouldBeSecondElementInOptionalUnionSchema = partner != null && shouldResultBeOptional && AvroSchemaUtil.getNullIndexInUnion(partner) == 1; Schema hivePrimitive = hivePrimitiveToAvro(primitive); // if there was no matching Avro primitive, use the Hive primitive