
[HUDI-5533] Support spark columns comments #8683

Open - wants to merge 12 commits into base: master
@@ -155,7 +155,7 @@ object AvroConversionUtils {
try {
val schemaConverters = sparkAdapter.getAvroSchemaConverters
schemaConverters.toSqlType(avroSchema) match {
case (dataType, _) => dataType.asInstanceOf[StructType]
case (dataType, _, _) => dataType.asInstanceOf[StructType]
}
} catch {
case e: Exception => throw new HoodieSchemaException("Failed to convert avro schema to struct type: " + avroSchema, e)
@@ -26,7 +26,7 @@ import org.apache.spark.sql.types.DataType
*/
trait HoodieAvroSchemaConverters {

def toSqlType(avroSchema: Schema): (DataType, Boolean)
def toSqlType(avroSchema: Schema): (DataType, Boolean, Option[String])

def toAvroType(catalystType: DataType, nullable: Boolean, recordName: String, nameSpace: String = ""): Schema

2 changes: 1 addition & 1 deletion hudi-common/src/test/resources/simple-test-evolved.avsc
@@ -24,7 +24,7 @@
{"name": "field2", "type": ["null", "string"], "default": null},
{"name": "name", "type": ["null", "string"], "default": null},
{"name": "favorite_number", "type": ["null", "long"], "default": null},
{"name": "favorite_color", "type": ["null", "string"], "default": null},
{"name": "favorite_color", "type": ["null", "string"], "default": null, "doc": "a quoted\"comment"},
{"name": "favorite_movie", "type": ["null", "string"], "default": null}
]
}
2 changes: 1 addition & 1 deletion hudi-common/src/test/resources/simple-test.avsc
@@ -22,6 +22,6 @@
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": "int"},
{"name": "favorite_color", "type": "string"}
{"name": "favorite_color", "type": "string", "doc": "a quoted\"comment"}
]
}
@@ -184,7 +184,9 @@ public static Map<String, String> translateFlinkTableProperties2Spark(
partitionKeys,
sparkVersion,
4000,
messageType);
messageType,
// flink does not support comment yet
Arrays.asList());
Contributor:
Collections.emptyList() ?

properties.putAll(sparkTableProperties);
return properties.entrySet().stream()
.filter(e -> KEY_MAPPING.containsKey(e.getKey()) && !catalogTable.getOptions().containsKey(KEY_MAPPING.get(e.getKey())))
@@ -28,9 +28,9 @@ import org.apache.spark.sql.types.DataType
*/
object HoodieSparkAvroSchemaConverters extends HoodieAvroSchemaConverters {

override def toSqlType(avroSchema: Schema): (DataType, Boolean) =
override def toSqlType(avroSchema: Schema): (DataType, Boolean, Option[String]) =
SchemaConverters.toSqlType(avroSchema) match {
case SchemaType(dataType, nullable) => (dataType, nullable)
case SchemaType(dataType, nullable, doc) => (dataType, nullable, doc)
}

override def toAvroType(catalystType: DataType, nullable: Boolean, recordName: String, nameSpace: String): Schema =
@@ -45,7 +45,7 @@ private[sql] object SchemaConverters {
*
* @since 2.4.0
*/
case class SchemaType(dataType: DataType, nullable: Boolean)
case class SchemaType(dataType: DataType, nullable: Boolean, doc: Option[String])

/**
* Converts an Avro schema to a corresponding Spark SQL schema.
@@ -59,32 +59,32 @@ private[sql] object SchemaConverters {
private val unionFieldMemberPrefix = "member"

private def toSqlTypeHelper(avroSchema: Schema, existingRecordNames: Set[String]): SchemaType = {
avroSchema.getType match {
case INT => avroSchema.getLogicalType match {
case _: Date => SchemaType(DateType, nullable = false)
case _ => SchemaType(IntegerType, nullable = false)
(avroSchema.getType, Option(avroSchema.getDoc)) match {
Contributor:
The conversion tool is copied from Spark: https://github.com/apache/spark/blob/dd4db21cb69a9a9c3715360673a76e6f150303d4/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala#L58, just noticed that Spark also does not support keeping comments from Avro fields while doing the conversion.

Contributor Author:
Spark likely has the same limitation when retrieving a schema from Avro. But Spark doesn't usually infer a Spark schema from Avro; Hudi does, and that's the reason for this patch.

Contributor:
Can you write a test case for it, especially for creating a table?

Contributor Author:
sure

Contributor Author:
@danny0405 added a test

Contributor:
Kind of feel there is no need to change each match for every data type. Can we write another method, similar to toSqlTypeHelper, which invokes toSqlTypeHelper first and then fixes the comment separately?

Contributor Author:
> Kind of feel there is no need to change each match for every data type

Any column, including nested columns, may have a comment, so I don't see why we shouldn't look through all of the Avro content for docs.

> which invokes toSqlTypeHelper first and then fixes the comment separately

This would mean walking through the Avro schema twice, and would also require a complex merge of the results. Am I missing something?
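
For readers following the thread, here is a minimal, self-contained sketch (illustrative only, not part of the patch; the schema literal and object name are made up) of the mechanism being discussed: a field-level Avro doc is copied into the StructField's metadata under the "comment" key, which is what Spark later surfaces as the column comment.

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.types.{Metadata, MetadataBuilder, StringType, StructField, StructType}

import scala.collection.JavaConverters._

object AvroDocToCommentSketch {
  def main(args: Array[String]): Unit = {
    // A tiny record with one documented field, mirroring simple-test.avsc.
    val avro = new Schema.Parser().parse(
      """{"type": "record", "name": "r", "fields": [
        |  {"name": "favorite_color", "type": "string", "doc": "a quoted\"comment"}
        |]}""".stripMargin)

    // For each Avro field, copy its doc (if any) into the StructField metadata
    // under the "comment" key; Spark reads this key back as the column comment.
    val fields = avro.getFields.asScala.map { f =>
      val metadata =
        if (f.doc != null) new MetadataBuilder().putString("comment", f.doc).build()
        else Metadata.empty
      StructField(f.name, StringType, nullable = true, metadata)
    }

    println(StructType(fields.toSeq).prettyJson)
  }
}
```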

case (INT, doc) => avroSchema.getLogicalType match {
case _: Date => SchemaType(DateType, nullable = false, doc)
case _ => SchemaType(IntegerType, nullable = false, doc)
}
case STRING => SchemaType(StringType, nullable = false)
case BOOLEAN => SchemaType(BooleanType, nullable = false)
case BYTES | FIXED => avroSchema.getLogicalType match {
case (STRING, doc) => SchemaType(StringType, nullable = false, doc)
case (BOOLEAN, doc) => SchemaType(BooleanType, nullable = false, doc)
case (BYTES | FIXED, doc) => avroSchema.getLogicalType match {
// For FIXED type, if the precision requires more bytes than fixed size, the logical
// type will be null, which is handled by Avro library.
case d: Decimal => SchemaType(DecimalType(d.getPrecision, d.getScale), nullable = false)
case _ => SchemaType(BinaryType, nullable = false)
case d: Decimal => SchemaType(DecimalType(d.getPrecision, d.getScale), nullable = false, doc)
case _ => SchemaType(BinaryType, nullable = false, doc)
}

case DOUBLE => SchemaType(DoubleType, nullable = false)
case FLOAT => SchemaType(FloatType, nullable = false)
case LONG => avroSchema.getLogicalType match {
case _: TimestampMillis | _: TimestampMicros => SchemaType(TimestampType, nullable = false)
case _ => SchemaType(LongType, nullable = false)
case (DOUBLE, doc) => SchemaType(DoubleType, nullable = false, doc)
case (FLOAT, doc) => SchemaType(FloatType, nullable = false, doc)
case (LONG, doc) => avroSchema.getLogicalType match {
case _: TimestampMillis | _: TimestampMicros => SchemaType(TimestampType, nullable = false, doc)
case _ => SchemaType(LongType, nullable = false, doc)
}

case ENUM => SchemaType(StringType, nullable = false)
case (ENUM, doc) => SchemaType(StringType, nullable = false, doc)

case NULL => SchemaType(NullType, nullable = true)
case (NULL, doc) => SchemaType(NullType, nullable = true, doc)

case RECORD =>
case (RECORD, doc) =>
if (existingRecordNames.contains(avroSchema.getFullName)) {
throw new IncompatibleSchemaException(
s"""
@@ -95,24 +95,25 @@ private[sql] object SchemaConverters {
val newRecordNames = existingRecordNames + avroSchema.getFullName
val fields = avroSchema.getFields.asScala.map { f =>
val schemaType = toSqlTypeHelper(f.schema(), newRecordNames)
StructField(f.name, schemaType.dataType, schemaType.nullable)
val metadata = if(f.doc != null) new MetadataBuilder().putString("comment", f.doc).build() else Metadata.empty
StructField(f.name, schemaType.dataType, schemaType.nullable, metadata)
}

SchemaType(StructType(fields.toSeq), nullable = false)
SchemaType(StructType(fields.toSeq), nullable = false, doc)

case ARRAY =>
case (ARRAY, doc) =>
val schemaType = toSqlTypeHelper(avroSchema.getElementType, existingRecordNames)
SchemaType(
ArrayType(schemaType.dataType, containsNull = schemaType.nullable),
nullable = false)
nullable = false, doc)

case MAP =>
case (MAP, doc) =>
val schemaType = toSqlTypeHelper(avroSchema.getValueType, existingRecordNames)
SchemaType(
MapType(StringType, schemaType.dataType, valueContainsNull = schemaType.nullable),
nullable = false)
nullable = false, doc)

case UNION =>
case (UNION, doc) =>
if (avroSchema.getTypes.asScala.exists(_.getType == NULL)) {
// In case of a union with null, eliminate it and make a recursive call
val remainingUnionTypes = avroSchema.getTypes.asScala.filterNot(_.getType == NULL)
@@ -126,20 +127,21 @@ private[sql] object SchemaConverters {
case Seq(t1) =>
toSqlTypeHelper(avroSchema.getTypes.get(0), existingRecordNames)
case Seq(t1, t2) if Set(t1, t2) == Set(INT, LONG) =>
SchemaType(LongType, nullable = false)
SchemaType(LongType, nullable = false, doc)
case Seq(t1, t2) if Set(t1, t2) == Set(FLOAT, DOUBLE) =>
SchemaType(DoubleType, nullable = false)
SchemaType(DoubleType, nullable = false, doc)
case _ =>
// Convert complex unions to struct types where field names are member0, member1, etc.
// This is consistent with the behavior when converting between Avro and Parquet.
val fields = avroSchema.getTypes.asScala.zipWithIndex.map {
case (s, i) =>
val schemaType = toSqlTypeHelper(s, existingRecordNames)
// All fields are nullable because only one of them is set at a time
StructField(s"$unionFieldMemberPrefix$i", schemaType.dataType, nullable = true)
val metadata = if(schemaType.doc.isDefined) new MetadataBuilder().putString("comment", schemaType.doc.get).build() else Metadata.empty
StructField(s"$unionFieldMemberPrefix$i", schemaType.dataType, nullable = true, metadata)
}

SchemaType(StructType(fields.toSeq), nullable = false)
SchemaType(StructType(fields.toSeq), nullable = false, doc)
}

case other => throw new IncompatibleSchemaException(s"Unsupported type $other")
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.functional

import org.apache.spark.sql._
import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
import org.apache.spark.SparkContext
import org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.common.model.{HoodieTableType}
import org.apache.spark.sql.types.StructType
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.EnumSource


class TestColumnComments {
var spark : SparkSession = _
var sqlContext: SQLContext = _
var sc : SparkContext = _

def initSparkContext(): Unit = {
val sparkConf = getSparkConfForTest(getClass.getSimpleName)
spark = SparkSession.builder()
.withExtensions(new HoodieSparkSessionExtension)
.config(sparkConf)
.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")
sqlContext = spark.sqlContext
}

@BeforeEach
def setUp() {
initSparkContext()
}

@ParameterizedTest
@EnumSource(value = classOf[HoodieTableType], names = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
def testColumnCommentWithSparkDatasource(tableType: HoodieTableType): Unit = {
val basePath = java.nio.file.Files.createTempDirectory("hoodie_comments_path").toAbsolutePath.toString
val opts = Map(
HoodieWriteConfig.TBL_NAME.key -> "hoodie_comments",
DataSourceWriteOptions.TABLE_TYPE.key -> tableType.toString,
DataSourceWriteOptions.OPERATION.key -> "bulk_insert",
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition"
)
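// Build a one-row DataFrame, then re-apply a schema whose "_row_key" field carries a "comment" metadata entry.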
val inputDF = spark.sql("select '0' as _row_key, '1' as content, '2' as partition, '3' as ts")
val struct = new StructType()
.add("_row_key", "string", true, "dummy comment")
.add("content", "string", true)
.add("partition", "string", true)
.add("ts", "string", true)
spark.createDataFrame(inputDF.rdd, struct)
.write.format("hudi")
.options(opts)
.mode(SaveMode.Overwrite)
.save(basePath)
spark.read.format("hudi").load(basePath).registerTempTable("test_tbl")

// now confirm the comment is present at read time
assertEquals(1, spark.sql("desc extended test_tbl")
.filter("col_name = '_row_key' and comment = 'dummy comment'").count)
}
}
@@ -49,7 +49,7 @@ class TestAvroSerDe extends SparkAdapterSupport {
}

val avroSchema = HoodieMetadataColumnStats.SCHEMA$
val SchemaType(catalystSchema, _) = SchemaConverters.toSqlType(avroSchema)
val SchemaType(catalystSchema, _, _) = SchemaConverters.toSqlType(avroSchema)

val deserializer = sparkAdapter.createAvroDeserializer(avroSchema, catalystSchema)
val serializer = sparkAdapter.createAvroSerializer(catalystSchema, avroSchema, nullable = false)
@@ -28,7 +28,7 @@ class TestSchemaConverters {
def testAvroUnionConversion(): Unit = {
val originalAvroSchema = HoodieMetadataColumnStats.SCHEMA$

val SchemaType(convertedStructType, _) = SchemaConverters.toSqlType(originalAvroSchema)
val SchemaType(convertedStructType, _, _) = SchemaConverters.toSqlType(originalAvroSchema)
val convertedAvroSchema = SchemaConverters.toAvroType(convertedStructType)

// NOTE: Here we're validating that converting Avro -> Catalyst and Catalyst -> Avro are inverse
@@ -24,6 +24,7 @@
import org.apache.hudi.hive.SchemaDifference;
import org.apache.hudi.hive.util.HiveSchemaUtil;
import org.apache.hudi.sync.common.HoodieSyncTool;
import org.apache.hudi.sync.common.model.FieldSchema;
import org.apache.hudi.sync.common.model.PartitionEvent;
import org.apache.hudi.sync.common.model.PartitionEvent.PartitionEventType;
import org.apache.hudi.sync.common.util.ConfigUtils;
@@ -212,8 +213,9 @@ private void syncSchema(String tableName, boolean tableExists, boolean useRealTi
Map<String, String> tableProperties = ConfigUtils.toMap(config.getString(ADB_SYNC_TABLE_PROPERTIES));
Map<String, String> serdeProperties = ConfigUtils.toMap(config.getString(ADB_SYNC_SERDE_PROPERTIES));
if (config.getBoolean(ADB_SYNC_SYNC_AS_SPARK_DATA_SOURCE_TABLE)) {
List<FieldSchema> fromStorage = syncClient.getStorageFieldSchemas();
Map<String, String> sparkTableProperties = SparkDataSourceTableUtils.getSparkTableProperties(config.getSplitStrings(META_SYNC_PARTITION_FIELDS),
Contributor:
fromStorage -> fieldSchema

config.getString(META_SYNC_SPARK_VERSION), config.getInt(ADB_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD), schema);
config.getString(META_SYNC_SPARK_VERSION), config.getInt(ADB_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD), schema, fromStorage);
Map<String, String> sparkSerdeProperties = SparkDataSourceTableUtils.getSparkSerdeProperties(readAsOptimized, config.getString(META_SYNC_BASE_PATH));
tableProperties.putAll(sparkTableProperties);
serdeProperties.putAll(sparkSerdeProperties);
@@ -105,7 +105,9 @@ public class HiveSyncConfigHolder {
.key("hoodie.datasource.hive_sync.sync_as_datasource")
.defaultValue("true")
.markAdvanced()
.withDocumentation("");
.withDocumentation("Add information to set up the Spark datasource, including table properties and the Spark schema."
+ " This allows Spark to use its optimized reader."
+ " Column comments are also added, for the first level only.");
public static final ConfigProperty<Integer> HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD = ConfigProperty
.key("hoodie.datasource.hive_sync.schema_string_length_thresh")
.defaultValue(4000)
@@ -298,8 +298,9 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
private Map<String, String> getTableProperties(MessageType schema) {
Map<String, String> tableProperties = ConfigUtils.toMap(config.getString(HIVE_TABLE_PROPERTIES));
if (config.getBoolean(HIVE_SYNC_AS_DATA_SOURCE_TABLE)) {
List<FieldSchema> fromStorage = syncClient.getStorageFieldSchemas();
Map<String, String> sparkTableProperties = SparkDataSourceTableUtils.getSparkTableProperties(config.getSplitStrings(META_SYNC_PARTITION_FIELDS),
config.getStringOrDefault(META_SYNC_SPARK_VERSION), config.getIntOrDefault(HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD), schema);
config.getStringOrDefault(META_SYNC_SPARK_VERSION), config.getIntOrDefault(HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD), schema, fromStorage);
tableProperties.putAll(sparkTableProperties);
}
return tableProperties;
@@ -90,7 +90,7 @@ private List<CommandProcessorResponse> updateHiveSQLs(List<String> sqls) {
for (String sql : sqls) {
if (hiveDriver != null) {
HoodieTimer timer = HoodieTimer.start();
responses.add(hiveDriver.run(sql));
responses.add(hiveDriver.run(escapeAntiSlash(sql)));
LOG.info(String.format("Time taken to execute [%s]: %s ms", sql, timer.endTimer()));
}
}
@@ -65,7 +65,7 @@ public void runSQL(String s) {
try {
stmt = connection.createStatement();
LOG.info("Executing SQL " + s);
stmt.execute(s);
stmt.execute(escapeAntiSlash(s));
Contributor:
Is the escaping related to this change?

Contributor Author:
Yes. Both the JDBC and HMS paths generate SQL, and the anti-slash (backslash) has to be double-escaped, otherwise it is lost.

The anti-slash is used to escape double quotes in the comment DDL.
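
To make the double-escaping concrete, here is a small sketch (the DDL string is invented; the replaceAll call mirrors the escapeAntiSlash helper added further down):

```scala
import java.util.regex.Matcher

object EscapeAntiSlashSketch {
  def main(args: Array[String]): Unit = {
    // A generated DDL fragment whose column comment contains \" so that the
    // double quote stays literal inside the single-quoted comment.
    val sql = """ALTER TABLE t CHANGE c c STRING COMMENT 'a quoted\"comment'"""
    // Same replacement as escapeAntiSlash: every single backslash becomes three
    // (\foo -> \\\foo), so one level survives the extra un-escaping performed
    // on the JDBC / HMS execution path.
    val escaped = sql.replaceAll("\\\\", Matcher.quoteReplacement("\\\\\\"))
    println(escaped) // ... COMMENT 'a quoted\\\"comment'
  }
}
```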

} catch (SQLException e) {
throw new HoodieHiveSyncException("Failed in executing SQL " + s, e);
} finally {
@@ -37,6 +37,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;

import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE;
@@ -220,5 +221,16 @@ private List<String> constructChangePartitions(String tableName, List<String> pa
}
return changePartitions;
}

/**
* SQL statement should be escaped in order to preserve anti-slash (backslash) characters
*
Contributor:
Every sentence should end with a period. Every new paragraph should start with <p>; remove the param tags if there are no descriptions at all.

* For eg: \foo should be transformed into \\\foo
* @param sql
* @return
*/
protected String escapeAntiSlash(String sql) {
return sql.replaceAll("\\\\", Matcher.quoteReplacement("\\\\\\"));
}
}

@@ -601,7 +601,7 @@ private String getSparkTableProperties(boolean syncAsDataSourceTable, boolean us
+ "{\"name\":\"_hoodie_file_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},"
+ "{\"name\":\"name\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}},"
+ "{\"name\":\"favorite_number\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},"
+ "{\"name\":\"favorite_color\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}},"
+ "{\"name\":\"favorite_color\",\"type\":\"string\",\"nullable\":false,\"metadata\":{\"comment\":\"a quoted\\\"comment\"}},"
+ "{\"name\":\"datestr\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}\n"
+ "spark.sql.sources.schema.partCol.0\tdatestr\n";
} else {
@@ -611,7 +611,7 @@ private String getSparkTableProperties(boolean syncAsDataSourceTable, boolean us
+ "spark.sql.sources.schema.part.0\t{\"type\":\"struct\",\"fields\":[{\"name\":\"name\",\"type\":"
+ "\"string\",\"nullable\":false,\"metadata\":{}},{\"name\":\"favorite_number\",\"type\":\"integer\","
+ "\"nullable\":false,\"metadata\":{}},{\"name\":\"favorite_color\",\"type\":\"string\",\"nullable\":false,"
+ "\"metadata\":{}}]}\n"
+ "\"metadata\":{\"comment\":\"a quoted\\\"comment\"}}]}\n"
+ "{\"name\":\"datestr\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}\n"
+ "spark.sql.sources.schema.partCol.0\tdatestr\n";
}