
[HUDI-7436] Fix the conditions for determining whether the records need to be rewritten #10727

Open
wants to merge 1 commit into base: master
Conversation

@ThinkerLei (Contributor) commented on Feb 22, 2024

Change Logs

Fix the conditions for determining whether the records need to be rewritten

Impact

low

Risk level (write none, low, medium or high below)

low

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change

  • The config description must be updated if new configs are added or the default value of an existing config is changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the
    ticket number here and follow the instruction to make
    changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@ThinkerLei (Contributor Author)

@xiarixiaoyao PTAL when you have free time, thanks a lot~

@ThinkerLei changed the title from "HUDI-7436:Fix the conditions for determining whether the records need to be rewritten" to "[HUDI-7436] Fix the conditions for determining whether the records need to be rewritten" on Feb 22, 2024
  boolean needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size()
-     || SchemaCompatibility.checkReaderWriterCompatibility(newWriterSchema, writeSchemaFromFile).getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
+     || !(SchemaCompatibility.checkReaderWriterCompatibility(newWriterSchema, writeSchemaFromFile).getType()
+         == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE);
Contributor

Yes, we need to avoid unnecessary rewrite overhead. Thanks for your fix.

@xiarixiaoyao (Contributor)

@ThinkerLei pls fix checkstyle thanks

@ThinkerLei (Contributor Author)

@ThinkerLei pls fix checkstyle thanks

Thanks for your comment, it has been modified

@ThinkerLei (Contributor Author)

@xiarixiaoyao @danny0405 The failed CI has nothing to do with my modifications; could anyone help me trigger the CI again?

The github-actions bot added the size:XS (PR with lines of changes in <= 10) label on Feb 26, 2024
@@ -202,7 +202,9 @@ private Option<Function<HoodieRecord, HoodieRecord>> composeSchemaEvolutionTrans
   Schema newWriterSchema = AvroInternalSchemaConverter.convert(mergedSchema, writerSchema.getFullName());
   Schema writeSchemaFromFile = AvroInternalSchemaConverter.convert(writeInternalSchema, newWriterSchema.getFullName());
   boolean needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size()
-      || SchemaCompatibility.checkReaderWriterCompatibility(newWriterSchema, writeSchemaFromFile).getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
+      || !(SchemaCompatibility.checkReaderWriterCompatibility(newWriterSchema, writeSchemaFromFile).getType()
+          == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE);
Contributor

I'm confused here. If the schema is not compatible, the merging should fail here, correct?

Contributor

Can incompatible Avro schemas be blended together?

Contributor Author

@danny0405 @yihua Thanks for your comment. What you said makes sense to me. Is it reasonable to use needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size() && SchemaCompatibility.checkReaderWriterCompatibility(newWriterSchema, writeSchemaFromFile).getType() == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE; ?

Contributor

This is the original logic, right?

Contributor Author

This is the original logic, right?

The original logic is boolean needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size() || SchemaCompatibility.checkReaderWriterCompatibility(newWriterSchema, writeSchemaFromFile).getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;

Contributor

So there is no need to rewrite if the schema is compatible and the field counts are equal?

Contributor Author

I don't have much experience with this; @xiarixiaoyao, do you have any suggestions?

Contributor Author

Alternatively, we could find another way to determine whether the newWriterSchema is consistent with the writeSchemaFromFile. If they are consistent, then there would be no need for rewriting.

Contributor

The original logic has certain performance issues.
If the read and write schemas are compatible, I think we do not need to rewrite the entire record, since we can correctly read the old Parquet file with the new schema.

@danny0405 @ThinkerLei
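As an aside for readers following the thread, here is a minimal sketch of the point above in plain Avro (not code from this PR; the class name CompatibleReadDemo, the schemas, and the default value are illustrative assumptions): a record encoded with the old writer schema can be decoded directly against a compatible new schema, with the added field falling back to its default, so no per-record rewrite is needed.

import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

// Sketch only: the record name, fields a/b/c, and the default value are assumptions for illustration.
public class CompatibleReadDemo {
  public static void main(String[] args) throws Exception {
    Schema oldSchema = SchemaBuilder.record("rec").fields()
        .requiredString("a").requiredInt("b").endRecord();
    Schema newSchema = SchemaBuilder.record("rec").fields()
        .requiredString("a").requiredInt("b")
        .name("c").type().longType().longDefault(0L)   // newly added column with a default
        .endRecord();

    // A record written with the old schema.
    GenericRecord oldRecord = new GenericData.Record(oldSchema);
    oldRecord.put("a", "k1");
    oldRecord.put("b", 42);

    // Encode it with the old (writer) schema.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(oldSchema).write(oldRecord, encoder);
    encoder.flush();

    // Decode it directly against the new (reader) schema; field "c" falls back to its default.
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    GenericRecord evolved =
        new GenericDatumReader<GenericRecord>(oldSchema, newSchema).read(null, decoder);
    System.out.println(evolved);   // {"a": "k1", "b": 42, "c": 0} -- no rewrite of the record
  }
}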

@hudi-bot

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@danny0405 (Contributor)

Nice ping for @xiarixiaoyao ~

-     || SchemaCompatibility.checkReaderWriterCompatibility(newWriterSchema, writeSchemaFromFile).getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
+     && SchemaCompatibility.checkReaderWriterCompatibility(newWriterSchema, writeSchemaFromFile).getType()
+         == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;

Contributor

So when the column counts are equal, there is no need to rewrite regardless of whether the schema is compatible?

@xiarixiaoyao (Contributor) commented on Mar 14, 2024

@danny0405 @ThinkerLei
I thought about it again.
sameCols.size() == colNamesFromWriteSchema.size() only happens in the following scenario:
the table has new columns, while the old columns have not been changed (no renames or type changes).
e.g.:

write schema: a string, b int, c long
read schema: a string, b int, c long, d int

In this case
sameCols.size() == colNamesFromWriteSchema.size(),
and the write schema is equivalent to a pruned read schema.

However, some versions of Avro, such as Avro 1.8.x, may report errors when using pruned schemas to read Avro files (Avro 1.10.x has no such problem).

Therefore, even if sameCols.size() == colNamesFromWriteSchema.size(), we still need to check the compatibility of the read and write schemas. If they are compatible, we can directly use this write schema to read the Avro data.

Maybe we can use the following logic to avoid an unnecessary rewrite:

boolean needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size()
    || !(SchemaCompatibility.checkReaderWriterCompatibility(newWriterSchema, writeSchemaFromFile).getType()
        == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE);
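A minimal, self-contained sketch of the scenario above in plain Avro (the class name SchemaPairCheckDemo, the record/field names, and the default on "d" are illustrative assumptions): the read schema only adds a defaulted column, so the compatibility check reports COMPATIBLE, which is the case where the rewrite can be skipped.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.SchemaCompatibility;

// Sketch only: record/field names and the default on "d" are assumptions for illustration.
public class SchemaPairCheckDemo {
  public static void main(String[] args) {
    Schema writeSchema = SchemaBuilder.record("rec").fields()
        .requiredString("a").requiredInt("b").requiredLong("c")
        .endRecord();
    Schema readSchema = SchemaBuilder.record("rec").fields()
        .requiredString("a").requiredInt("b").requiredLong("c")
        .name("d").type().intType().intDefault(0)   // new column with a default value
        .endRecord();

    SchemaCompatibility.SchemaPairCompatibility result =
        SchemaCompatibility.checkReaderWriterCompatibility(readSchema, writeSchema);
    System.out.println(result.getType());   // COMPATIBLE

    // The compatibility half of the suggested condition: rewrite only when the pair is NOT compatible.
    boolean rewriteBecauseIncompatible =
        result.getType() != SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
    System.out.println(rewriteBecauseIncompatible);   // false -> the rewrite can be skipped
  }
}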

Contributor

@danny0405
This actually raises an additional question.
Currently, when we read a MOR table, we pass the full schema when reading the Avro logs. Even if we only query one column, if the table has 100 Avro logs, using the full schema to read the data and build the record map (BitCaskDiskMap) consumes a lot of memory, and the performance is not good.
Now that our current version of Avro has been upgraded to 1.10.x, we can pass pruned schemas directly when reading the logs. This way, both the speed of reading the logs and building the record map and the memory consumption are much better.
Forgive me that I cannot paste test screenshots due to company information-security reasons.

Presto reading Hudi logs:

With the full schema, we see the following log:
Total size in bytes of MemoryBasedMap in ExternalSpillableMap => 712,956,000
final query time: 35672 ms

With the pruned schema:
Total size in bytes of MemoryBasedMap in ExternalSpillableMap => 45,500,000
final query time: 13373 ms
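A minimal sketch of such a projected read in plain Avro 1.10+ (the class name PrunedReadDemo, the file path, and the schemas are illustrative assumptions, and it reads a plain Avro container file rather than a Hudi log block): the pruned reader schema is passed to the datum reader so only the queried column is decoded per record.

import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

// Sketch only: the path /tmp/sample.avro and both schemas are assumptions for illustration.
public class PrunedReadDemo {
  public static void main(String[] args) throws Exception {
    Schema fullSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"rec\",\"fields\":["
            + "{\"name\":\"a\",\"type\":\"string\"},"
            + "{\"name\":\"b\",\"type\":\"int\"},"
            + "{\"name\":\"c\",\"type\":\"long\"}]}");
    Schema prunedSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"rec\",\"fields\":["
            + "{\"name\":\"a\",\"type\":\"string\"}]}");

    // Writer schema = the full schema the data was written with; reader schema = the pruned projection.
    GenericDatumReader<GenericRecord> datumReader =
        new GenericDatumReader<>(fullSchema, prunedSchema);
    try (DataFileReader<GenericRecord> fileReader =
             new DataFileReader<>(new File("/tmp/sample.avro"), datumReader)) {
      for (GenericRecord record : fileReader) {
        System.out.println(record.get("a"));   // only column "a" is materialized per record
      }
    }
  }
}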

Contributor

Good point for optimization. We introduced some changes like the dynamic read schema based on the write schema in release 1.x for the HoodieFileGroupReader, but I'm not sure whether it is applied automatically for all the read paths; cc @yihua for confirming this.

And anyway, I think we should have such an optimization in the 0.x branch and master for the legacy HoodieMergedLogRecordReader, which will still be beneficial to engines like Flink and Hive.

@xiarixiaoyao do you have interest in contributing this?

Contributor Author

@danny0405 @ThinkerLei I thought about it again. ... Maybe we can use the following logic to avoid an unnecessary rewrite:

boolean needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size()
    || !(SchemaCompatibility.checkReaderWriterCompatibility(newWriterSchema, writeSchemaFromFile).getType()
        == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE);

This was the logic of my initial fix. Do I still need to make changes on top of this PR? cc @danny0405 @xiarixiaoyao

Contributor

Yeah, let's change it back; we'd better have some test cases.

Contributor

Good point for optimization ... @xiarixiaoyao do you have interest in contributing this?

will try

Contributor

@xiarixiaoyao This info is valuable. Basically using pruned schema to read Avro records is supported on Avro 1.10 and above, not on lower versions. I see that Spark 3.2 and above and all Flink versions use Avro 1.10 and above. So for these integrations and others that rely on Avro 1.10 and above, we should use pruned schema to read log records to improve performance. I'll check the new file group reader.

Labels: size:XS (PR with lines of changes in <= 10)
Projects: None yet
5 participants