[HUDI-7436] Fix the conditions for determining whether the records need to be rewritten #10727

Open · wants to merge 1 commit into base: master
@@ -202,7 +202,9 @@ private Option<Function<HoodieRecord, HoodieRecord>> composeSchemaEvolutionTransf
     Schema newWriterSchema = AvroInternalSchemaConverter.convert(mergedSchema, writerSchema.getFullName());
     Schema writeSchemaFromFile = AvroInternalSchemaConverter.convert(writeInternalSchema, newWriterSchema.getFullName());
     boolean needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size()
-        || SchemaCompatibility.checkReaderWriterCompatibility(newWriterSchema, writeSchemaFromFile).getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
+        && SchemaCompatibility.checkReaderWriterCompatibility(newWriterSchema, writeSchemaFromFile).getType()
+            == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;

Contributor:
So when the column sizes are equal, there is no need to rewrite, no matter whether the schema is compatible?

Contributor @xiarixiaoyao (Mar 14, 2024):
@danny0405 @ThinkerLei
I thought about this again.
sameCols.size() == colNamesFromWriteSchema.size() only happens in the following scenario: the table has new columns, while the old columns have not been changed (no renames or type changes).
e.g.:

write schema: a string, b int, c long
read schema: a string, b int, c long, d int

In this case, sameCols.size() == colNamesFromWriteSchema.size(), and writeSchema is equivalent to a pruned read schema.

However, some versions of Avro, such as Avro 1.8.x, may report errors when using pruned schemas to read Avro files (Avro 1.10.x has no such problem).

Therefore, even if sameCols.size() == colNamesFromWriteSchema.size(), we still need to check the compatibility of the read and write schemas. If they are compatible, we can directly use this writeSchema to read the Avro data.

Maybe we can use the following logic to avoid unnecessary rewrites:

boolean needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size()
    || SchemaCompatibility.checkReaderWriterCompatibility(newWriterSchema, writeSchemaFromFile).getType()
        != org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
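
For illustration, here is a minimal standalone sketch of the proposed condition, using only the Avro library; the inline schema and the integer stand-ins for sameCols.size() and colNamesFromWriteSchema.size() are made up for this example and are not the actual Hudi code:

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;

public class RewriteConditionSketch {
  public static void main(String[] args) {
    // Writer schema of the file on disk: a string, b int, c long.
    Schema writeSchemaFromFile = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"rec\",\"fields\":["
            + "{\"name\":\"a\",\"type\":\"string\"},"
            + "{\"name\":\"b\",\"type\":\"int\"},"
            + "{\"name\":\"c\",\"type\":\"long\"}]}");

    // In the "only new columns were added" scenario, the query schema pruned back to
    // the writer's columns is identical to the writer schema.
    Schema newWriterSchema = writeSchemaFromFile;

    boolean compatible = SchemaCompatibility
        .checkReaderWriterCompatibility(newWriterSchema, writeSchemaFromFile)
        .getType() == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;

    // Stand-ins for sameCols.size() and colNamesFromWriteSchema.size().
    int sameColsSize = 3;
    int colNamesFromWriteSchemaSize = 3;

    // Rewrite only if the column sets differ OR the schemas are not compatible.
    boolean needToReWriteRecord = sameColsSize != colNamesFromWriteSchemaSize || !compatible;

    System.out.println("needToReWriteRecord = " + needToReWriteRecord); // prints: false
  }
}
```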

Contributor:
@danny0405
This actually raises an additional question.
Right now, when we read a MOR table, we pass the full schema when reading the Avro logs. Even if we only query one column, if the table has 100 Avro logs, using the full schema to read the data and build the BitCaskDiskMap consumes a lot of memory, and the performance is poor.
Now that our version of Avro has been upgraded to 1.10.x, we can actually pass pruned schemas directly when reading the logs. That way, reading the logs and building the BitCaskDiskMap is much faster and uses much less memory.
Forgive me that I cannot paste test screenshots due to company information security restrictions.

Presto reading Hudi logs:

Passing the full schema, we see the following log:
Total size in bytes of MemoryBasedMap in ExternalSpillableMap => 712,956,000
final query time: 35672 ms

Passing the pruned schema:
Total size in bytes of MemoryBasedMap in ExternalSpillableMap => 45,500,000
final query time: 13373 ms
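
For context, schema projection with a pruned reader schema in plain Avro looks roughly like the sketch below; this is the generic Avro API rather than the Hudi log-reader code, and the file path and column choice are hypothetical:

```java
import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class PrunedReadSketch {
  public static void main(String[] args) throws IOException {
    // Full writer schema: a string, b int, c long (matches the example above).
    Schema writerSchema = SchemaBuilder.record("rec").fields()
        .requiredString("a")
        .requiredInt("b")
        .requiredLong("c")
        .endRecord();

    // Pruned reader schema: only the single column the query actually needs.
    Schema prunedReaderSchema = SchemaBuilder.record("rec").fields()
        .requiredString("a")
        .endRecord();

    // Passing both schemas lets Avro project each record down to the pruned shape
    // while decoding, instead of materializing all columns.
    GenericDatumReader<GenericRecord> datumReader =
        new GenericDatumReader<>(writerSchema, prunedReaderSchema);

    File avroFile = new File("/tmp/sample.avro"); // hypothetical input file
    try (DataFileReader<GenericRecord> fileReader = new DataFileReader<>(avroFile, datumReader)) {
      for (GenericRecord record : fileReader) {
        // Only field "a" is populated in the projected record.
        System.out.println(record.get("a"));
      }
    }
  }
}
```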

Contributor:
Good point for optimization. We introduced some changes like the dynamic read schema based on the write schema in release 1.x for the HoodieFileGroupReader, but I'm not sure whether it is applied automatically for all the read paths, cc @yihua for confirming this.

And anyway, I think we should have such an optimization in the 0.x branch and master for the legacy HoodieMergedLogRecordReader, which will still be beneficial to engines like Flink and Hive.

@xiarixiaoyao do you have interest in contributing this?

Contributor (Author):
(quoting @xiarixiaoyao's comment and suggested logic above)

This was the logic of my initial fix. Do I still need to make changes based on this PR? cc @danny0405 @xiarixiaoyao

Contributor:
Yeah, let's change it back; we'd better add some test cases.
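
A sketch of the kind of test cases that could back this change, assuming JUnit 5 and plain Avro; the class name and the helper mirroring the condition are hypothetical, not existing Hudi tests:

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.SchemaCompatibility;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class TestNeedToReWriteRecord {

  // Mirrors the proposed condition: rewrite when column sets differ or schemas are incompatible.
  private static boolean needToRewrite(Schema readerSchema, Schema writerSchema,
                                       int sameColsSize, int colsFromWriteSchemaSize) {
    boolean compatible =
        SchemaCompatibility.checkReaderWriterCompatibility(readerSchema, writerSchema).getType()
            == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
    return sameColsSize != colsFromWriteSchemaSize || !compatible;
  }

  @Test
  public void addedColumnOnlyDoesNotRewrite() {
    // Writer: a, b; reader pruned back to the same columns -> no rewrite needed.
    Schema writer = SchemaBuilder.record("rec").fields()
        .requiredString("a").requiredInt("b").endRecord();
    assertFalse(needToRewrite(writer, writer, 2, 2));
  }

  @Test
  public void typeChangeForcesRewrite() {
    // Writer has b:int but reader expects b:string -> incompatible, so a rewrite is needed.
    Schema writer = SchemaBuilder.record("rec").fields()
        .requiredString("a").requiredInt("b").endRecord();
    Schema reader = SchemaBuilder.record("rec").fields()
        .requiredString("a").requiredString("b").endRecord();
    assertTrue(needToRewrite(reader, writer, 2, 2));
  }
}
```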

Contributor:

(quoting the comment above: "@xiarixiaoyao do you have interest in contributing this?")

will try

Contributor:
@xiarixiaoyao This info is valuable. Basically, using a pruned schema to read Avro records is supported on Avro 1.10 and above, not on lower versions. I see that Spark 3.2 and above and all Flink versions use Avro 1.10 and above. So for these integrations and others that rely on Avro 1.10 and above, we should use a pruned schema to read log records to improve performance. I'll check the new file group reader.
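
One possible way to derive such a pruned reader schema from the full table schema, given the requested column names, is sketched below; pruneSchema is a hypothetical helper written for illustration, not an existing Hudi or Avro utility:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;

public final class SchemaPruning {

  /** Builds a record schema containing only the requested fields, in the full schema's order. */
  public static Schema pruneSchema(Schema fullSchema, Set<String> requestedColumns) {
    List<Field> prunedFields = new ArrayList<>();
    for (Field field : fullSchema.getFields()) {
      if (requestedColumns.contains(field.name())) {
        // Fields must be copied; Avro does not allow reusing a Field object in a second schema.
        prunedFields.add(new Field(field.name(), field.schema(), field.doc(), field.defaultVal()));
      }
    }
    return Schema.createRecord(fullSchema.getName(), fullSchema.getDoc(),
        fullSchema.getNamespace(), fullSchema.isError(), prunedFields);
  }
}
```

For example, pruneSchema(tableSchema, Set.of("a")) would yield a record schema with only column a, which could then be passed as the reader schema when scanning the log blocks.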

     if (needToReWriteRecord) {
       Map<String, String> renameCols = InternalSchemaUtils.collectRenameCols(writeInternalSchema, querySchema);
       return Option.of(record -> {