[HUDI-7610] Resolve issues for delete records #12122
base: master
Conversation
minor comments.
@@ -166,6 +167,12 @@ private List<HoodieRecord<T>> doMergedRead(Option<HoodieFileReader> baseFileRead
String key = record.getRecordKey();
if (deltaRecordMap.containsKey(key)) {
  deltaRecordKeys.remove(key);
  // When an internal operation exists, it means there is at least one delete in between.
  // Therefore, there is no need to merge with the base record.
Do you think we can check whether the value is equal to the "Delete" operation?
The record from the log record reader may not be a delete record, so its operation may not be the Delete operation.
HoodieRecord finalRecord = latestHoodieRecord.copy();

// Preserve the delete information.
if (prevRecord.isDelete(readerSchema, this.getPayloadProps())
I see that in BaseHMergedLogRecordScanner (around line 98), we account for both payload props and tableConfig.getPreCombineField().
Why are we not accounting for tableConfig here? Are we sure payload props alone are good enough?
Should we try to add some utility methods and use them across the board, e.g., to account for both payloadProps and tableConfig?
Because we could be using a 1.x Hudi binary to read a 0.10.x table as well, payload props may not be present.
By the way, can we check whether payload props are set when using this as a reader?
On the reader side, the only information we have is the table config, so I would expect us to read the table config and set these payload props from it.
If not, payload props might be empty even while reading a 1.x table.
Checked the code logic. For fg, the payload properties are derived from table properties. For non-fg, the payload props only contain one field, the precombine field. I have updated the code to add table properties initially.
&& existingOrderingVal.compareTo(deleteOrderingVal) > 0;
Comparable deleteOrderingVal = readerContext.getOrderingValue(
    Option.empty(), Collections.emptyMap(), readerSchema, orderingFieldName, orderingFieldType, orderingFieldDefault);
deleteOrderingVal = deleteRecord.getOrderingValue() == null ? deleteOrderingVal : deleteRecord.getOrderingValue();
For now, if we are going to take the stand that a delete just overrides any previous value, why can't we simply set the default ordering value (readerContext.castValue(0, orderingFieldType)) as the ordering value for delete records? That would make the code simpler and easier to read.
A default ordering value like 0 would be lower than newly inserted records with the same key, and then the delete records would be lost. Therefore, we have to use an extra flag to remember whether a delete without a valid ordering value exists.
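The loss described above can be sketched as a toy comparison; this is illustrative only (the method name deleteWins is hypothetical, not Hudi's merge code):

```java
public class DeleteOrderingSketch {
    // Event-time merging: the side with the higher ordering value wins
    // (ties go to the delete here, purely for illustration).
    static boolean deleteWins(int deleteOrderingVal, int insertOrderingVal) {
        return deleteOrderingVal >= insertOrderingVal;
    }

    public static void main(String[] args) {
        // A delete without an ordering value, defaulted to 0, loses to any
        // later insert with a positive ordering value, so the key reappears.
        System.out.println(deleteWins(0, 5)); // prints false: the delete is lost
    }
}
```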
boolean chooseExisting = !deleteOrderingVal.equals(0)
    && ReflectionUtils.isSameClass(existingOrderingVal, deleteOrderingVal)
    && existingOrderingVal.compareTo(deleteOrderingVal) > 0;
Comparable deleteOrderingVal = readerContext.getOrderingValue(
Can we define a boolean named "orderingValueMissing" so the code readability is better?
Yeah, we can do that.
I use the default value directly, and created a function to indicate whether we found a processing-time delete.
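A minimal sketch of such a helper predicate (the class, constant, and parameter names are assumptions for illustration, not the actual patch):

```java
public class ProcessingTimeDeleteFlag {
    // Assumed default used when a delete carries no ordering value.
    static final int DEFAULT_ORDERING_VALUE = 0;

    // A delete whose ordering value is still the default carries no
    // event-time information, so merging must fall back to processing time.
    static boolean isProcessingTimeDelete(boolean isDelete, int orderingValue) {
        return isDelete && orderingValue == DEFAULT_ORDERING_VALUE;
    }
}
```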
Some(projection.apply(data))

// Delete records are in-between; no merge is needed.
if (newRecord.getMetaDataInfo(HoodieReaderContext.INTERNAL_META_OPERATION).isPresent) {
Let's add an example and explain why we need this processing.
Will do.
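One way to illustrate why the merge with the base record must be skipped when a delete sits in between (a toy model, not the actual reader; here a null log entry stands in for a delete record):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DeleteInBetweenSketch {
    // Apply log records (oldest first) on top of the base record. A null
    // entry is a delete; any record after a delete must NOT be merged with
    // the base record, otherwise deleted base fields would resurface.
    static Map<String, String> applyLog(Map<String, String> base,
                                        List<Map<String, String>> log) {
        Map<String, String> current = base;
        for (Map<String, String> r : log) {
            if (r == null) {
                current = null;                  // delete wipes all prior state
            } else if (current == null) {
                current = r;                     // fresh insert after a delete
            } else {
                Map<String, String> merged = new HashMap<>(current);
                merged.putAll(r);                // normal partial-update merge
                current = merged;
            }
        }
        return current;
    }

    // Demo scenario: base {rider=A, driver=X}; log = [DELETE, {rider=B}].
    static Map<String, String> demo() {
        Map<String, String> base = new HashMap<>();
        base.put("rider", "A");
        base.put("driver", "X");
        List<Map<String, String>> log = new ArrayList<>();
        log.add(null);                           // the in-between delete
        Map<String, String> update = new HashMap<>();
        update.put("rider", "B");
        log.add(update);
        return applyLog(base, log);
    }
}
```

The demo ends with only {rider=B}: the base record's driver field does not leak back in, which is exactly what skipping the merge guarantees.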
val actualMinusExpected = finalDf.except(expectedDf)

expectedMinusActual.show(false)
actualMinusExpected.show(false)
For MOR, can we trigger one final compaction and ensure the expected values stay intact?
yes, will do.
Done.
@@ -141,6 +141,7 @@ public void setShouldMergeUseRecordPosition(boolean shouldMergeUseRecordPosition
public static final String INTERNAL_META_OPERATION = "_3";
public static final String INTERNAL_META_INSTANT_TIME = "_4";
public static final String INTERNAL_META_SCHEMA = "_5";
public static final String PROCESSING_TIME_BASED_DELETE_FOUND = "_6";
Let's call this DELETE_FOUND_WITHOUT_ORDERING_VALUE so it's clearer.
mostly minor comments
 * 2. The current record's metadata contains the flag: PROCESSING_TIME_BASED_DELETE_FOUND.
 */
private <T> boolean hasProcessingTimeBasedDelete(HoodieRecord<T> record) throws IOException {
  return (record.isDelete(readerSchema, getPayloadProps())
Can we flip the comparison? Let's first check record.getMetadata().isPresent() && record.getMetaDataInfo(PROCESSING_TIME_BASED_DELETE_FOUND).isPresent(),
and then check for the ordering value from the record.
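A sketch of the flipped check (the boolean parameters stand in for the real metadata and record inspections, so the signature is an assumption, not the patch's code):

```java
public class FlippedDeleteCheck {
    // Check the cheap metadata flag first; only fall back to inspecting the
    // record (a potentially costlier isDelete + ordering-value lookup) when
    // the flag is absent.
    static boolean hasProcessingTimeBasedDelete(boolean flagPresent,
                                                boolean isDelete,
                                                boolean orderingValueMissing) {
        if (flagPresent) {
            return true; // PROCESSING_TIME_BASED_DELETE_FOUND was set upstream
        }
        return isDelete && orderingValueMissing;
    }
}
```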
    return null;
  }
} else {
  throw new UnsupportedOperationException(
Does this mean we can only support ordering values of certain data types? I am not asking to fix it in this patch; just trying to gauge what limitation we have around this.
// Here existing record represents newer record with the same key, which can be a delete or non-delete record.
// Therefore, we should use event time based merging if possible. So, the newer record is returned if
// 1. the delete is processing time based, or
// 2. the delete is event time based, and has higher value.
Minor: 2. the delete is event time based, and the existing record has the higher value.
mode(SaveMode.Append).
save(basePath)

val fourUpdateData = Seq((-9, "4", "rider-DDDD", "driver-DDDD", 20.00, 1))
Let's do a round of validation before compaction, and once more after compaction.
And let's validate that, in the case of a MOR table, compaction has not kicked in when we do the first validation.
Post compaction, let's validate that one compaction commit is seen.
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "delete").
Can you set HoodieCompactionConfig.INLINE_COMPACT.key() = false for these ingests, so that we know compaction does not kick in?
We added automatic compaction for Spark data source writes (after 5 commits).
So, let's ensure we do not trigger compaction unless we explicitly want to.
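The two test setups could be sketched as plain properties; the string keys mirror Hudi's hoodie.compact.inline and hoodie.compact.inline.max.delta.commits configs, while the helper method names are illustrative:

```java
import java.util.Properties;

public class CompactionTestConfig {
    // Ingest phase: keep inline compaction off so the first validation sees
    // an uncompacted MOR table.
    static Properties noInlineCompaction() {
        Properties p = new Properties();
        p.setProperty("hoodie.compact.inline", "false");
        return p;
    }

    // Final write: force compaction to be scheduled on the very next commit.
    static Properties forceInlineCompaction() {
        Properties p = new Properties();
        p.setProperty("hoodie.compact.inline", "true");
        p.setProperty("hoodie.compact.inline.max.delta.commits", "1");
        return p;
    }
}
```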
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(TABLE_TYPE.key(), tableType).
option(HoodieCompactionConfig.INLINE_COMPACT.key(),
You may need to set hoodie.compact.inline.max.delta.commits = 1 here, so we know for sure that compaction kicked in for the MOR table.
Just setting HoodieCompactionConfig.INLINE_COMPACT.key() = true may not make compaction kick in.
Change Logs
The main problem we face in the delete logic is that some DeleteRecords do not have a valid orderingVal field. This is not a problem for processing-time-based merging, but it breaks event-time-based merging. The fundamental solution is to preserve the orderingValue field for DeleteRecord, which may not be possible or easy in reality; we don't attempt that in this PR. Here we focus on making the delete logic reasonable and consistent across fg and non-fg readers and across Spark/Avro record types. This problem mainly affects MOR tables.
For a given record key RK, suppose we have a series of operations on it, like insert, update, delete, update, delete, update, etc. That is, we have a series of records, i.e., br1, lfr1, lfr2, lfr3, lfr4, etc.
(1) If all records have the orderingVal field, we can successfully merge based on event time, which is the happy path.
(2) If lfr3 is a delete record without an ordering value, we don't have enough information to merge it with other records based on event time.
A reasonable assumption here is: all records before this delete record (i.e., with a smaller commit time) can be considered processing time based, while records newer than the delete record can keep merging based on event time. In this way, we combine processing time and event time in a logical way, which is universal across Spark/Avro, COW/MOR, with FG or without FG.
To implement this, we create a metadata entry "PROCESSING_TIME_BASED_DELETE_FOUND" to indicate that a processing-time-based delete has been found; any further merging should be skipped.
(1) For the non-fg reader, we store the flag in the HoodieRecord.metadata field. During further merging, this flag is kept and used to skip merging with the base file record.
(2) For the fg reader, we store the flag in the metadata field of the record buffer. All further merging should be skipped.
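The combined merging rule described above can be sketched as a toy model (assumptions: records are scanned newest-first, and a null ordering value on a delete marks it as processing time based; none of these types exist in Hudi):

```java
import java.util.List;

public class CombinedMergeSketch {
    // Toy record: orderingVal == null on a delete means its ordering value is missing.
    record Rec(String id, Integer orderingVal, boolean isDelete) {}

    // Scan from the newest record to the oldest. Once a delete without an
    // ordering value is seen, everything older (including the base record)
    // is discarded; records newer than it still merge by event time.
    static Rec resolve(List<Rec> newestFirst) {
        Rec winner = null;
        for (Rec r : newestFirst) {
            if (r.isDelete() && r.orderingVal() == null) {
                break; // processing-time-based delete: skip all older records
            }
            if (winner == null
                || (r.orderingVal() != null && winner.orderingVal() != null
                    && r.orderingVal() > winner.orderingVal())) {
                winner = r; // event-time merge: higher ordering value wins
            }
        }
        return winner; // null means the key ends up deleted
    }
}
```

In the br1/lfr1..lfr4 example, if lfr3 is the ordering-value-less delete, resolve keeps only lfr4 (event-time winner among the records newer than the delete) and drops br1, lfr1, and lfr2.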
Impact
Makes the delete logic consistent across different record types and across fg and non-fg readers.
Risk level (write none, low, medium, or high below)
Medium.
Documentation Update
Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".
ticket number here and follow the instructions to make changes to the website.
Contributor's checklist