Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-5533] Support spark columns comments #8683

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ object AvroConversionUtils {
/**
 * Converts an Avro [[Schema]] into the corresponding Spark SQL [[StructType]].
 *
 * @param avroSchema the Avro record schema to convert
 * @return the equivalent Catalyst [[StructType]]
 */
def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {
  val schemaConverters = sparkAdapter.getAvroSchemaConverters
  schemaConverters.toSqlType(avroSchema) match {
    // toSqlType yields (dataType, nullable, doc); only the data type is needed here
    case (dataType, _, _) => dataType.asInstanceOf[StructType]
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.spark.sql.types.DataType
*/
trait HoodieAvroSchemaConverters {

/**
 * Converts an Avro schema into its Catalyst representation.
 *
 * @param avroSchema the Avro schema to convert
 * @return a tuple of (converted data type, nullability, optional Avro doc string)
 */
def toSqlType(avroSchema: Schema): (DataType, Boolean, Option[String])

def toAvroType(catalystType: DataType, nullable: Boolean, recordName: String, nameSpace: String = ""): Schema

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ import org.apache.spark.sql.types.DataType
*/
object HoodieSparkAvroSchemaConverters extends HoodieAvroSchemaConverters {

/**
 * Converts an Avro schema to its Catalyst representation by delegating to
 * Spark's [[SchemaConverters]], propagating the Avro doc string so it can be
 * surfaced as a column comment.
 *
 * @param avroSchema the Avro schema to convert
 * @return (converted data type, nullability, optional Avro doc string)
 */
override def toSqlType(avroSchema: Schema): (DataType, Boolean, Option[String]) =
  SchemaConverters.toSqlType(avroSchema) match {
    case SchemaType(dataType, nullable, doc) => (dataType, nullable, doc)
  }

override def toAvroType(catalystType: DataType, nullable: Boolean, recordName: String, nameSpace: String): Schema =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ private[sql] object SchemaConverters {
*
* @since 2.4.0
*/
// `doc` carries the Avro schema's doc string, if any, so callers can surface it as a column comment
case class SchemaType(dataType: DataType, nullable: Boolean, doc: Option[String])

/**
* Converts an Avro schema to a corresponding Spark SQL schema.
Expand All @@ -58,33 +58,37 @@ private[sql] object SchemaConverters {

private val unionFieldMemberPrefix = "member"

/**
 * Wraps an Avro doc string into an [[Option]], mapping `null` to `None`.
 *
 * Note: `Option.apply` already returns `None` for a null argument, so no
 * explicit null check is needed.
 *
 * @param doc the Avro `doc` attribute, possibly null
 * @return `Some(doc)` when present, `None` otherwise
 */
private def extractDoc(doc: String): Option[String] = Option(doc)
private def toSqlTypeHelper(avroSchema: Schema, existingRecordNames: Set[String]): SchemaType = {
avroSchema.getType match {
case INT => avroSchema.getLogicalType match {
case _: Date => SchemaType(DateType, nullable = false)
case _ => SchemaType(IntegerType, nullable = false)
(avroSchema.getType, extractDoc(avroSchema.getDoc)) match {
case (INT, doc) => avroSchema.getLogicalType match {
case _: Date => SchemaType(DateType, nullable = false, doc)
case _ => SchemaType(IntegerType, nullable = false, doc)
}
case STRING => SchemaType(StringType, nullable = false)
case BOOLEAN => SchemaType(BooleanType, nullable = false)
case BYTES | FIXED => avroSchema.getLogicalType match {
case (STRING, doc) => SchemaType(StringType, nullable = false, doc)
case (BOOLEAN, doc) => SchemaType(BooleanType, nullable = false, doc)
case (BYTES | FIXED, doc) => avroSchema.getLogicalType match {
// For FIXED type, if the precision requires more bytes than fixed size, the logical
// type will be null, which is handled by Avro library.
case d: Decimal => SchemaType(DecimalType(d.getPrecision, d.getScale), nullable = false)
case _ => SchemaType(BinaryType, nullable = false)
case d: Decimal => SchemaType(DecimalType(d.getPrecision, d.getScale), nullable = false, doc)
case _ => SchemaType(BinaryType, nullable = false, doc)
}

case DOUBLE => SchemaType(DoubleType, nullable = false)
case FLOAT => SchemaType(FloatType, nullable = false)
case LONG => avroSchema.getLogicalType match {
case _: TimestampMillis | _: TimestampMicros => SchemaType(TimestampType, nullable = false)
case _ => SchemaType(LongType, nullable = false)
case (DOUBLE, doc) => SchemaType(DoubleType, nullable = false, doc)
case (FLOAT, doc) => SchemaType(FloatType, nullable = false, doc)
case (LONG, doc) => avroSchema.getLogicalType match {
case _: TimestampMillis | _: TimestampMicros => SchemaType(TimestampType, nullable = false, doc)
case _ => SchemaType(LongType, nullable = false, doc)
}

case ENUM => SchemaType(StringType, nullable = false)
case (ENUM, doc) => SchemaType(StringType, nullable = false, doc)

case NULL => SchemaType(NullType, nullable = true)
case (NULL, doc) => SchemaType(NullType, nullable = true, doc)

case RECORD =>
case (RECORD, doc) =>
if (existingRecordNames.contains(avroSchema.getFullName)) {
throw new IncompatibleSchemaException(
s"""
Expand All @@ -95,24 +99,25 @@ private[sql] object SchemaConverters {
val newRecordNames = existingRecordNames + avroSchema.getFullName
val fields = avroSchema.getFields.asScala.map { f =>
val schemaType = toSqlTypeHelper(f.schema(), newRecordNames)
StructField(f.name, schemaType.dataType, schemaType.nullable)
val metadata = if(f.doc != null) new MetadataBuilder().putString("comment", f.doc).build() else Metadata.empty
StructField(f.name, schemaType.dataType, schemaType.nullable, metadata)
}

SchemaType(StructType(fields.toSeq), nullable = false)
SchemaType(StructType(fields.toSeq), nullable = false, doc)

case ARRAY =>
case (ARRAY, doc) =>
val schemaType = toSqlTypeHelper(avroSchema.getElementType, existingRecordNames)
SchemaType(
ArrayType(schemaType.dataType, containsNull = schemaType.nullable),
nullable = false)
nullable = false, doc)

case MAP =>
case (MAP, doc) =>
val schemaType = toSqlTypeHelper(avroSchema.getValueType, existingRecordNames)
SchemaType(
MapType(StringType, schemaType.dataType, valueContainsNull = schemaType.nullable),
nullable = false)
nullable = false, doc)

case UNION =>
case (UNION, doc) =>
if (avroSchema.getTypes.asScala.exists(_.getType == NULL)) {
// In case of a union with null, eliminate it and make a recursive call
val remainingUnionTypes = avroSchema.getTypes.asScala.filterNot(_.getType == NULL)
Expand All @@ -126,20 +131,21 @@ private[sql] object SchemaConverters {
case Seq(t1) =>
toSqlTypeHelper(avroSchema.getTypes.get(0), existingRecordNames)
case Seq(t1, t2) if Set(t1, t2) == Set(INT, LONG) =>
SchemaType(LongType, nullable = false)
SchemaType(LongType, nullable = false, doc)
case Seq(t1, t2) if Set(t1, t2) == Set(FLOAT, DOUBLE) =>
SchemaType(DoubleType, nullable = false)
SchemaType(DoubleType, nullable = false, doc)
case _ =>
// Convert complex unions to struct types where field names are member0, member1, etc.
// This is consistent with the behavior when converting between Avro and Parquet.
val fields = avroSchema.getTypes.asScala.zipWithIndex.map {
case (s, i) =>
val schemaType = toSqlTypeHelper(s, existingRecordNames)
// All fields are nullable because only one of them is set at a time
StructField(s"$unionFieldMemberPrefix$i", schemaType.dataType, nullable = true)
val metadata = if(schemaType.doc.isDefined) new MetadataBuilder().putString("comment", schemaType.doc.get).build() else Metadata.empty
StructField(s"$unionFieldMemberPrefix$i", schemaType.dataType, nullable = true, metadata)
}

SchemaType(StructType(fields.toSeq), nullable = false)
SchemaType(StructType(fields.toSeq), nullable = false, doc)
}

case other => throw new IncompatibleSchemaException(s"Unsupported type $other")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class TestAvroSerDe extends SparkAdapterSupport {
}

val avroSchema = HoodieMetadataColumnStats.SCHEMA$
val SchemaType(catalystSchema, _) = SchemaConverters.toSqlType(avroSchema)
val SchemaType(catalystSchema, _, _) = SchemaConverters.toSqlType(avroSchema)

val deserializer = sparkAdapter.createAvroDeserializer(avroSchema, catalystSchema)
val serializer = sparkAdapter.createAvroSerializer(catalystSchema, avroSchema, nullable = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class TestSchemaConverters {
def testAvroUnionConversion(): Unit = {
val originalAvroSchema = HoodieMetadataColumnStats.SCHEMA$

val SchemaType(convertedStructType, _) = SchemaConverters.toSqlType(originalAvroSchema)
val SchemaType(convertedStructType, _, _) = SchemaConverters.toSqlType(originalAvroSchema)
val convertedAvroSchema = SchemaConverters.toAvroType(convertedStructType)

// NOTE: Here we're validating that converting Avro -> Catalyst and Catalyst -> Avro are inverse
Expand Down