Skip to content

Commit

Permalink
Fix reordering avro nullable types in MergeHiveSchemaWithAvro
Browse files Browse the repository at this point in the history
  • Loading branch information
rzhang10 committed Mar 1, 2023
1 parent b9c734e commit cb095b2
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,15 @@ public static boolean isOptionSchema(Schema schema) {
return false;
}

public static int getNullIndex(Schema schema) {
Preconditions.checkArgument(isOptionSchema(schema));
if (schema.getTypes().get(0).getType() == Schema.Type.NULL) {
return 0;
} else {
return 1;
}
}

/**
* This method decides whether a schema represents a single type union, i.e., a union that contains only one option
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.avro.JsonProperties;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
Expand Down Expand Up @@ -97,12 +98,12 @@ public Schema.Field field(String name, TypeInfo field, Schema.Field partner, Sch
* If the schema is not an option schema or if there is no default value, schema is returned as-is
*/
private Schema reorderOptionIfRequired(Schema schema, Object defaultValue) {
if (AvroSchemaUtil.isOptionSchema(schema) && defaultValue != null) {
boolean isNullFirstOption = schema.getTypes().get(0).getType() == Schema.Type.NULL;
if (isNullFirstOption && defaultValue.equals(JsonProperties.NULL_VALUE)) {
return schema;
if (AvroSchemaUtil.isOptionSchema(schema)) {
int nullIndex = AvroSchemaUtil.getNullIndex(schema);
if (defaultValue != null && !defaultValue.equals(JsonProperties.NULL_VALUE)) {
return Schema.createUnion(schema.getTypes().get(1 - nullIndex), Schema.create(Type.NULL));
} else {
return Schema.createUnion(schema.getTypes().get(1), schema.getTypes().get(0));
return Schema.createUnion(Schema.create(Type.NULL), schema.getTypes().get(1 - nullIndex));
}
} else {
return schema;
Expand Down Expand Up @@ -143,10 +144,14 @@ public Schema union(UnionTypeInfo union, Schema partner, List<Schema> results) {
@Override
public Schema primitive(PrimitiveTypeInfo primitive, Schema partner) {
boolean shouldResultBeOptional = partner == null || AvroSchemaUtil.isOptionSchema(partner);
int nullIndex = 0;
if (partner != null && AvroSchemaUtil.isOptionSchema(partner)) {
nullIndex = AvroSchemaUtil.getNullIndex(partner);
}
Schema hivePrimitive = hivePrimitiveToAvro(primitive);
// if there was no matching Avro primitive, use the Hive primitive
Schema result = partner == null ? hivePrimitive : checkCompatibilityAndPromote(hivePrimitive, partner);
return shouldResultBeOptional ? AvroSchemaUtil.toOption(result) : result;
return shouldResultBeOptional ? AvroSchemaUtil.toOption(result, nullIndex == 1) : result;
}

private Schema checkCompatibilityAndPromote(Schema schema, Schema partner) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.stream.Collectors;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.util.internal.JacksonUtils;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
Expand Down Expand Up @@ -355,6 +356,25 @@ public void shouldRecoverLogicalType() {
AvroSchemaUtil.toIceberg(merged);
}

@Test
public void shouldNotReorderListElementType() {
String hive = "struct<fa:array<int>>";
Schema avro =
SchemaBuilder.record("r1")
.fields()
.name("fa")
.type()
.nullable()
.array()
.items()
.nullable()
.intType()
.arrayDefault(Arrays.asList(1, 2, 3))
.endRecord();

assertSchema(avro, merge(hive, avro));
}

// TODO: tests to retain schema props
// TODO: tests for explicit type compatibility check between hive and avro primitives, once we implement it
// TODO: tests for error case => default value in Avro does not match with type from hive
Expand Down

0 comments on commit cb095b2

Please sign in to comment.