[HUDI-8340] Fixing functional index record generation using spark distributed computation #12127
Conversation
Force-pushed from bc6dbd7 to 1426d09.
  try {
-   return (GenericRecord) payload.getInsertValue(schema).get();
+   return (GenericRecord) (r.getData() instanceof GenericRecord ? r.getData()
+       : ((HoodieRecordPayload) r.getData()).getInsertValue(schema, new Properties()).get());
Passed empty properties here. How do we usually pass properties for this function?
Not sure I get your question. We pass it in from the driver only (hoodieWriteConfig.getProps()).
Fixed it
Can you test this once in a real cluster, just to ensure we don't run into a NotSerializableException by any chance?
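To make the concern concrete, here is a minimal sketch (not the PR's exact code; class and method names are illustrative) of resolving a GenericRecord from a HoodieRecord while forwarding the writer's properties, e.g. hoodieWriteConfig.getProps() captured on the driver, instead of an empty Properties object:

import java.io.IOException;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;

public final class RecordConversionSketch {

  // Resolve an Avro GenericRecord from a HoodieRecord, forwarding the writer's
  // properties instead of constructing an empty Properties object on the executor.
  static GenericRecord toGenericRecord(HoodieRecord<?> record, Schema schema, Properties props) {
    Object data = record.getData();
    if (data instanceof GenericRecord) {
      // Spark-native records may already carry Avro data.
      return (GenericRecord) data;
    }
    try {
      // Properties are expected to be captured on the driver (hoodieWriteConfig.getProps())
      // and must be serializable so they can reach the executors.
      return (GenericRecord) ((HoodieRecordPayload) data).getInsertValue(schema, props).get();
    } catch (IOException e) {
      throw new RuntimeException("Failed to materialize record payload", e);
    }
  }
}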
  }
})
.map(converterToRow::apply)
// .map(row -> RowFactory.create(path, row))
Can we remove L223 (the commented-out line)?
Addressed
Resolved review threads on ...t/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java (two threads) and ...park-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java (outdated).
Tested functional index in Spark shell
@hudi-bot run azure
Did you try this out?
@Ignore
I remember some Hive sync related test was failing, and hence the entire class is disabled. Unless you know you fixed the Hive sync test, can you revert the unintended changes?
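For context, a small illustrative JUnit 4 snippet (class and test names are hypothetical) showing why a class-level @Ignore disables every test in the class:

import org.junit.Ignore;
import org.junit.Test;

// A class-level @Ignore skips every test in the class, which is why the whole
// test class stays disabled while the Hive sync related failure is unresolved.
@Ignore("Disabled until the Hive sync related test failure is fixed")
public class SomeHiveSyncDependentTest {

  @Test
  public void testSomething() {
    // Skipped while the class-level @Ignore is present.
  }
}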
Pushed an update to address my own comments. @codope @lokeshj1703: do we know whether, for the bloom filter based index, we only need to maintain stats for the base file or for log files as well? Because one of the innermost methods, HoodieMetadataPayload.createBloomFilterMetadataRecord, accepts only a base file. But in general, we have made col stats and functional index stats (with the col stats index type) maintained for every file (base file and log file).
Yes, tried it out in Spark shell.
Force-pushed from a567a5c to c623adc.
@@ -433,7 +433,7 @@ private boolean initializeFromFilesystem(String initializationTime, List<Metadat
   }
   ValidationUtils.checkState(functionalIndexPartitionsToInit.size() == 1, "Only one functional index at a time is supported for now");
   partitionName = functionalIndexPartitionsToInit.iterator().next();
-  fileGroupCountAndRecordsPair = initializeFunctionalIndexPartition(partitionName);
+  fileGroupCountAndRecordsPair = initializeFunctionalIndexPartition(partitionName, commitTimeForPartition);
Be consistent on using commit or instant: commitTimeForPartition and instantTime.
@@ -537,27 +537,41 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializeBloomFiltersPartition(
   return Pair.of(fileGroupCount, records);
 }

-protected abstract HoodieData<HoodieRecord> getFunctionalIndexRecords(List<Pair<String, FileSlice>> partitionFileSlicePairs,
+protected abstract HoodieData<HoodieRecord> getFunctionalIndexRecords(List<Pair<String, Pair<String, Long>>> partitionFilePathPairs,
Could we rename partitionFilePathPairs to convey the size, or use StoragePathInfo instead of Pair<String, Long>, to maintain readability?
Change Logs
Fixing functional index record generation using Spark distributed computation.
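A minimal sketch of the distributed pattern this change targets, assuming a JavaSparkContext on the driver; the helper generateRecordsForFile and the String record type are placeholders for the per-file functional index record generation:

import java.util.Collections;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public final class DistributedIndexRecordGenerationSketch {

  // Distribute (partition, filePath) pairs across executors and generate the
  // index records there, instead of doing the work on the driver.
  static JavaRDD<String> buildIndexRecords(JavaSparkContext jsc,
                                           List<Tuple2<String, String>> partitionFilePairs,
                                           int parallelism) {
    return jsc.parallelize(partitionFilePairs, parallelism)
        .flatMap(pair -> generateRecordsForFile(pair._1(), pair._2()));
  }

  // Hypothetical per-file generator: the real implementation would read the file,
  // evaluate the index function on the source column, and emit metadata records.
  static Iterator<String> generateRecordsForFile(String partition, String filePath) {
    return Collections.singletonList(partition + ":" + filePath).iterator();
  }
}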
Impact
NA
Risk level (write none, low, medium or high below)
low
Documentation Update
NA
Contributor's checklist