diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java index a0d1b086e035..78720f9ac0eb 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java @@ -286,10 +286,14 @@ private Object serializePrimitive(PrimitiveObjectInspector fieldOI, Object struc throw new HoodieException("Unexpected Avro schema for Binary TypeInfo: " + schema.getType()); } case DECIMAL: - HiveDecimal dec = (HiveDecimal)fieldOI.getPrimitiveJavaObject(structFieldData); - LogicalTypes.Decimal decimal = (LogicalTypes.Decimal)schema.getLogicalType(); + HiveDecimal dec = (HiveDecimal) fieldOI.getPrimitiveJavaObject(structFieldData); + LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) schema.getLogicalType(); BigDecimal bd = new BigDecimal(dec.toString()).setScale(decimal.getScale()); - return HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd, schema, decimal); + if (schema.getType() == Schema.Type.BYTES) { + return HoodieAvroUtils.DECIMAL_CONVERSION.toBytes(bd, schema, decimal); + } else { + return HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd, schema, decimal); + } case CHAR: HiveChar ch = (HiveChar)fieldOI.getPrimitiveJavaObject(structFieldData); return new Utf8(ch.getStrippedValue()); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java index 275ab36b82a0..c1e34392a3ab 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java @@ -168,6 +168,9 @@ public static Writable avroToArrayWritable(Object value, Schema schema, boolean case STRING: return new Text(value.toString()); case BYTES: + if (schema.getLogicalType() != null && schema.getLogicalType().getName().equals("decimal")) { + return toHiveDecimalWritable(((ByteBuffer) value).array(), schema); + } return new BytesWritable(((ByteBuffer) value).array()); case INT: if (schema.getLogicalType() != null && schema.getLogicalType().getName().equals("date")) { @@ -248,11 +251,7 @@ public static Writable avroToArrayWritable(Object value, Schema schema, boolean } case FIXED: if (schema.getLogicalType() != null && schema.getLogicalType().getName().equals("decimal")) { - LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) LogicalTypes.fromSchema(schema); - HiveDecimalWritable writable = new HiveDecimalWritable(((GenericFixed) value).bytes(), - decimal.getScale()); - return HiveDecimalUtils.enforcePrecisionScale(writable, - new DecimalTypeInfo(decimal.getPrecision(), decimal.getScale())); + return toHiveDecimalWritable(((GenericFixed) value).bytes(), schema); } return new BytesWritable(((GenericFixed) value).bytes()); default: @@ -319,4 +318,11 @@ private static Schema appendNullSchemaFields(Schema schema, List newFiel } return appendFieldsToSchema(schema, newFields); } + + private static HiveDecimalWritable toHiveDecimalWritable(byte[] bytes, Schema schema) { + LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) LogicalTypes.fromSchema(schema); + HiveDecimalWritable writable = new HiveDecimalWritable(bytes, decimal.getScale()); + return HiveDecimalUtils.enforcePrecisionScale(writable, + new DecimalTypeInfo(decimal.getPrecision(), decimal.getScale())); + } }