
[HUDI-5533] Support spark columns comments #8683

Open · wants to merge 12 commits into master

Conversation

@parisni (Contributor) commented May 10, 2023

Change Logs

Fixes #7531, i.e. show comments within Spark schemas.

Impact

Describe any public API or user-facing feature change or any performance impact.

Risk level (write none, low, medium or high below)

None

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change

  • The config description must be updated if new configs are added or the default value of a config is changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the ticket number here and follow the instructions to make changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@parisni (Contributor, Author) commented May 14, 2023

Does Option.ofNullable work correctly here?

Scala Option does not have ofNullable (Java Optional does). BTW Option(value) is equivalent to what you suggest, and I have corrected it.
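For context, a minimal standalone illustration (not from the PR) of that equivalence:

// Scala's Option.apply behaves like Java's Optional.ofNullable:
val doc: String = null
assert(Option(doc).isEmpty)                      // null becomes None
assert(Option("a comment") == Some("a comment")) // non-null is wrapped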

case INT => avroSchema.getLogicalType match {
case _: Date => SchemaType(DateType, nullable = false)
case _ => SchemaType(IntegerType, nullable = false)
(avroSchema.getType, Option(avroSchema.getDoc)) match {
Contributor:

The conversion tool is copied from Spark: https://github.com/apache/spark/blob/dd4db21cb69a9a9c3715360673a76e6f150303d4/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala#L58. I just noticed that Spark also does not support keeping comments from Avro fields while doing the conversion.

Contributor (Author):

Spark likely has the same limitation when retrieving a schema from Avro. But Spark doesn't usually infer a Spark schema from Avro; Hudi does, and that's the reason for this patch.
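A minimal sketch of the idea (not the PR's exact code; the helper name toStructField is hypothetical): carry the Avro field doc into the Spark StructField comment during conversion.

import org.apache.avro.Schema
import org.apache.spark.sql.types.{DataType, StructField}

// Attach the Avro field doc (if any) as the Spark column comment.
def toStructField(field: Schema.Field, dataType: DataType): StructField = {
  val base = StructField(field.name, dataType, nullable = true)
  Option(field.doc) match {
    case Some(doc) => base.withComment(doc) // the doc survives the conversion
    case None      => base
  }
}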

Contributor:

Can you write a test case for it, especially for creating a table?

Contributor (Author):

sure

Contributor (Author):

@danny0405 added a test

Contributor:

I kind of feel there is no need to change each match arm for every data type. Can we write another method similar to toSqlTypeHelper, which invokes toSqlTypeHelper first and then fixes the comment separately?

Contributor (Author):

Kind of feel there is no need to change each match for every data types

Any column, including nested columns, may have a comment, so I don't see why we shouldn't look through all the Avro content for docs.

which invokes toSqlTypeHelper firstly then fix the comment separately.

This would walk through the Avro schema twice, and also lead to a complex merge of results. Am I missing something?

@danny0405 danny0405 self-assigned this May 15, 2023
@danny0405 danny0405 added the schema-and-data-types and priority:minor labels May 15, 2023
@parisni (Contributor, Author) commented Jun 12, 2023

While spark.read.format("hudi").load("path") and spark.table("database.table") return the comments within the schema, spark.sql("desc database.table") does not. So I have to investigate and add a fix.
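For illustration, a hedged repro of that asymmetry (table and path names are hypothetical):

// Path-based read: the schema carries the comments after this PR.
spark.read.format("hudi").load("/path/to/table").schema
  .foreach(f => println(s"${f.name}: ${f.getComment().getOrElse("<none>")}"))

// Catalog-based DESC goes through the session catalog, where the
// comment column can still come back empty.
spark.sql("desc database.table").show(truncate = false)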

@parisni parisni requested a review from danny0405 June 12, 2023 17:54
…umns

# Conflicts:
#	hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@parisni (Contributor, Author) commented Jun 29, 2023

I have investigated a bit, and here is my current understanding.

Reading a Hudi table with Spark takes one of two paths:

  1. If spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog (which is what Hudi recommends in the documentation), then Hudi relies on the HiveSessionCatalog to get the schema. With a Hive metastore implementation, Spark wants the schema case-sensitive, so it does not take it from the Hive schema (which is case-insensitive) and instead falls back to the spark.sql.sources.schema table property; see the sketch after this list. With a Glue metastore, likely the same happens. BTW, our hive_sync service currently does not propagate the comments into spark.sql.sources.schema, and that's why in this case spark.sql("desc table") or spark.table("table").schema won't return the comments. This behavior can currently be avoided by setting hoodie.datasource.hive_sync.sync_as_datasource=false, which forces Spark to read the information from Hive (by leaving the Spark properties empty in the HMS), but in a case-insensitive way. I'm not sure what the consequences of relying on Hive alone are.
  2. If spark.sql.catalog.spark_catalog is not set, or when reading the Hudi table by path with spark.read.format("hudi").load("path"), Spark uses the code path updated in this PR, i.e. it gets the schema information from the Hudi Avro schema. The exception is spark.sql("desc table"), because Spark falls back to the HiveSessionCatalog in that case.
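For reference, an illustrative way to inspect the datasource properties described in path 1 (table name is hypothetical):

import org.apache.spark.sql.functions.col

// Show what hive_sync stored in the metastore for Spark to pick up.
spark.sql("SHOW TBLPROPERTIES my_db.my_hudi_table")
  .filter(col("key").startsWith("spark.sql.sources"))
  .show(truncate = false)
// With sync_as_datasource=true, keys such as spark.sql.sources.schema
// carry the case-sensitive schema JSON that Spark prefers over the
// case-insensitive Hive column list.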

So right now, using this PR and setting both hoodie.datasource.hive_sync.sync_comment=true and hoodie.datasource.hive_sync.sync_as_datasource=false, one gets the comments in every case (by identifier or by path). However, not writing the Spark datasource information into the HMS might have bad side effects (if not, why make so much effort to maintain two schemas within the HMS?).

To fix this we could:

  1. make hive_sync populate the comments into the Spark properties
  2. make HoodieCatalog stop using the HiveSessionCatalog to get the schema, and instead use the Hudi Avro schema, skipping the HMS for this.

I would go for 1 because it keeps the current logic intact, and it also covers the spark.sql("desc tablename") case when spark.sql.catalog.spark_catalog is not set.

Thoughts, @danny0405 @bhasudha @yihua ?

@parisni parisni closed this Jul 10, 2023
@parisni parisni reopened this Jul 10, 2023
@parisni (Contributor, Author) commented Jul 10, 2023

@danny0405 The PR is ready.

The way to activate comments for all engines is as follows:

hoodie.datasource.hive_sync.sync_comment=true
hoodie.datasource.hive_sync.sync_as_datasource=false
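For example, a hedged write snippet showing where these options plug in (table name and path are hypothetical):

// df is an existing DataFrame to be written as a Hudi table.
df.write.format("hudi")
  .option("hoodie.table.name", "my_table")
  .option("hoodie.datasource.hive_sync.enable", "true")
  .option("hoodie.datasource.hive_sync.sync_comment", "true")
  .option("hoodie.datasource.hive_sync.sync_as_datasource", "false")
  .mode("append")
  .save("/path/to/my_table")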

To recap:

  • hoodie.datasource.hive_sync.sync_comment: adds the comments in the metastore (Hive, Glue, ...)
  • the current PR fixes missing comments when reading the Hudi table from a path (not the metastore) with Spark
  • hoodie.datasource.hive_sync.sync_as_datasource: allows Spark to get comments from the metastore

BTW, I will provide some details about comments in the doc IYW.

The reason the Spark datasource properties should be disabled is that our current table property builder relies on Parquet types, which know nothing about comments. It would be painful to modify, and the Spark table properties are useless here and lead to errors.

@parisni (Contributor, Author) commented Jul 23, 2023

@danny0405 I just learned here why the Spark datasource properties should be kept within the HMS:

With the above, all of Spark SQL queries will use Hudi DataSource and hence end up using the custom FileIndex for doing the listing.

So the datasource code in hive sync needs to be updated to be aware of comments.

@prashantwason can you confirm this part is still applicable, and that as a result keeping hoodie.datasource.hive_sync.sync_as_datasource enabled is highly encouraged?

@parisni (Contributor, Author) commented Jul 29, 2023

@danny0405 implemented hive sync datasource support, so to clarify what this PR brings:

  • Spark reading Hudi from a path: recursive support for comments
  • Spark reading Hudi from a Hive metastore with hoodie.datasource.hive_sync.sync_as_datasource=true (default): support for the first level of comments

#4960 provided:

  • Spark reading Hudi from a Hive metastore with hoodie.datasource.hive_sync.sync_as_datasource=false: support for the first level of comments
  • query engine (Athena, Trino, ...) access to comments from the Hive metastore

#8740 provided:

  • Spark reading Hudi from a Glue metastore with hoodie.datasource.hive_sync.sync_as_datasource=false: support for the first level of comments
  • query engine (Athena, Trino, ...) access to comments from the Glue metastore

Not covered yet:

  • Flink support for comments
  • Deltastreamer (not sure)

@hudi-bot

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@danny0405 (Contributor):

Thanks @parisni, I will find some time for the review ~

@@ -212,8 +213,9 @@ private void syncSchema(String tableName, boolean tableExists, boolean useRealTi
Map<String, String> tableProperties = ConfigUtils.toMap(config.getString(ADB_SYNC_TABLE_PROPERTIES));
Map<String, String> serdeProperties = ConfigUtils.toMap(config.getString(ADB_SYNC_SERDE_PROPERTIES));
if (config.getBoolean(ADB_SYNC_SYNC_AS_SPARK_DATA_SOURCE_TABLE)) {
List<FieldSchema> fromStorage = syncClient.getStorageFieldSchemas();
Map<String, String> sparkTableProperties = SparkDataSourceTableUtils.getSparkTableProperties(config.getSplitStrings(META_SYNC_PARTITION_FIELDS),
Contributor:

fromStorage -> fieldSchema

@@ -65,7 +65,7 @@ public void runSQL(String s) {
try {
stmt = connection.createStatement();
LOG.info("Executing SQL " + s);
stmt.execute(s);
stmt.execute(escapeAntiSlash(s));
Contributor:

Is the escape related to this change?

Contributor (Author):

Yes. Both the JDBC and HMS executors generate SQL, and a backslash must be double-escaped, otherwise it is lost.

The backslash is used to escape double quotes in the comment DDL.
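A hedged sketch of the behavior being described (the helper body is my assumption of what escapeAntiSlash does, not the PR's exact code):

// Double every backslash so it survives one round of unescaping on the
// JDBC/HMS SQL path.
def escapeAntiSlash(s: String): String = s.replace("\\", "\\\\")

// A column comment containing escaped double quotes:
val ddl = "ALTER TABLE t CHANGE c c STRING COMMENT 'say \\\"hi\\\"'"
println(escapeAntiSlash(ddl))
// Each \" is emitted as \\" so that the executor sees \" again.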


/**
 * SQL statement should be escaped in order to consider anti-slash
 *
Contributor:

Every sentence should end with a period. Every new paragraph should start with <p>; remove the param comments if they are empty.

private static String getComment(String name, List<FieldSchema> fromStorage) {
return fromStorage.stream()
.filter(f -> name.equals(f.getName()))
.filter(f -> f.getComment().isPresent())
Contributor:

So we only match fields by name at the first level.

Contributor (Author):

Yes, that's the current limitation of comment support: it's a flattened list of fields.
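A hedged sketch of what that flat list implies (FieldSchema here is a simplified stand-in for Hudi's class; field names are hypothetical): a nested field's comment has no addressable entry, so only top-level names can match.

// Simplified stand-in for the sync client's flat field list.
case class FieldSchema(name: String, comment: Option[String])

val fromStorage = List(
  FieldSchema("id", Some("primary key")),
  FieldSchema("address", None) // comments inside the nested struct are unreachable
)

// Name lookup only ever sees top-level fields.
def getComment(name: String, fields: List[FieldSchema]): Option[String] =
  fields.find(_.name == name).flatMap(_.comment)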


private static String escapeQuote(String s) {
return s.replaceAll("\"", Matcher.quoteReplacement("\\\""));
}
Contributor:

Not sure why we need an escape.

Contributor (Author):

See the UT. I added a double quote in the column comment, and for some hive sync DDL executors this escaping is needed.

@@ -133,7 +154,7 @@ private static String convertPrimitiveType(PrimitiveType field) {

private static String convertGroupField(GroupType field) {
if (field.getOriginalType() == null) {
return convertToSparkSchemaJson(field);
return convertToSparkSchemaJson(field, Arrays.asList());
}
Contributor:

Does this mean composite data types are not supported?

Contributor (Author):

Yes. Currently in hive sync we don't have composite data types; List<FieldSchema> is flat.

@parisni parisni requested a review from danny0405 August 14, 2023 19:37
messageType);
messageType,
// flink does not support comment yet
Arrays.asList());
Contributor:

Collections.emptyList() ?

@github-actions bot added the size:M label (PR with lines of changes in (100, 300]) Feb 26, 2024
Labels: priority:minor, schema-and-data-types, size:M
Project status: 🏗 Under discussion

Successfully merging this pull request may close these issues:
  • [SUPPORT] table comments not fully supported