Commit 2de66be

Handle lost deletes in between

linliu-code committed Oct 17, 2024
1 parent 392acec · commit 2de66be

Showing 20 changed files with 89 additions and 41 deletions.
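In brief: per-record metadata is widened from Map<String, String> to Map<String, Object> across write statuses, write handles, and record types; HoodieRecord gains addMetadata/getMetaDataInfo accessors; and the merged log-record scanner stamps an internal marker (INTERNAL_META_ORDERING_FIELD) on a record that replaced a delete, so that merged reads keep the log record instead of re-merging it with the base file and losing the delete.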
File: FailOnFirstErrorWriteStatus.java
@@ -39,7 +39,7 @@ public FailOnFirstErrorWriteStatus(Boolean trackSuccessRecords, Double failureFr
   }

   @Override
-  public void markFailure(HoodieRecord record, Throwable t, Option<Map<String, String>> optionalRecordMetadata) {
+  public void markFailure(HoodieRecord record, Throwable t, Option<Map<String, Object>> optionalRecordMetadata) {
     LOG.error(String.format("Error writing record %s with data %s and optionalRecordMetadata %s", record, record.getData(),
         optionalRecordMetadata.orElse(Collections.emptyMap())), t);
     throw new HoodieException("Error writing record " + record, t);
File: WriteStatus.java
@@ -95,7 +95,7 @@ public WriteStatus() {
    * @param record deflated {@code HoodieRecord} containing information that uniquely identifies it.
    * @param optionalRecordMetadata optional metadata related to data contained in {@link HoodieRecord} before deflation.
    */
-  public void markSuccess(HoodieRecord record, Option<Map<String, String>> optionalRecordMetadata) {
+  public void markSuccess(HoodieRecord record, Option<Map<String, Object>> optionalRecordMetadata) {
     if (trackSuccessRecords) {
       writtenRecordDelegates.add(HoodieRecordDelegate.fromHoodieRecord(record));
     }
@@ -108,18 +108,18 @@ public void markSuccess(HoodieRecord record, Option<Map<String, String>> optiona
    * @see WriteStatus#markSuccess(HoodieRecord, Option)
    */
   @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  public void markSuccess(HoodieRecordDelegate recordDelegate, Option<Map<String, String>> optionalRecordMetadata) {
+  public void markSuccess(HoodieRecordDelegate recordDelegate, Option<Map<String, Object>> optionalRecordMetadata) {
     if (trackSuccessRecords) {
       writtenRecordDelegates.add(Objects.requireNonNull(recordDelegate));
     }
     updateStatsForSuccess(optionalRecordMetadata);
   }

-  private void updateStatsForSuccess(Option<Map<String, String>> optionalRecordMetadata) {
+  private void updateStatsForSuccess(Option<Map<String, Object>> optionalRecordMetadata) {
     totalRecords++;

     // get the min and max event time for calculating latency and freshness
-    String eventTimeVal = optionalRecordMetadata.orElse(Collections.emptyMap())
+    String eventTimeVal = (String) optionalRecordMetadata.orElse(Collections.emptyMap())
         .getOrDefault(METADATA_EVENT_TIME_KEY, null);
     if (isNullOrEmpty(eventTimeVal)) {
       return;
@@ -151,7 +151,7 @@ private void updateStatsForSuccess(Option<Map<String, String>> optionalRecordMet
    * @param record deflated {@code HoodieRecord} containing information that uniquely identifies it.
    * @param optionalRecordMetadata optional metadata related to data contained in {@link HoodieRecord} before deflation.
    */
-  public void markFailure(HoodieRecord record, Throwable t, Option<Map<String, String>> optionalRecordMetadata) {
+  public void markFailure(HoodieRecord record, Throwable t, Option<Map<String, Object>> optionalRecordMetadata) {
     if (failedRecords.isEmpty() || (random.nextDouble() <= failureFraction)) {
       // Guaranteed to have at-least one error
       failedRecords.add(Pair.of(HoodieRecordDelegate.fromHoodieRecord(record), t));
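Note that metadata values are now typed as Object, while the event-time entry is still transported as a String and cast back on read. A minimal caller-side sketch (assuming a WriteStatus status and a HoodieRecord record are in scope):

    Map<String, Object> metadata = new HashMap<>();
    metadata.put(METADATA_EVENT_TIME_KEY, "1506582000"); // event time stays String-encoded
    status.markSuccess(record, Option.of(metadata));
    // updateStatsForSuccess reads it back with a cast:
    //   (String) metadata.getOrDefault(METADATA_EVENT_TIME_KEY, null)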
File: (name not shown)
@@ -238,7 +238,7 @@ protected boolean isUpdateRecord(HoodieRecord<T> hoodieRecord) {
   }

   private Option<HoodieRecord> prepareRecord(HoodieRecord<T> hoodieRecord) {
-    Option<Map<String, String>> recordMetadata = hoodieRecord.getMetadata();
+    Option<Map<String, Object>> recordMetadata = hoodieRecord.getMetadata();
     Schema schema = useWriterSchema ? writeSchemaWithMetaFields : writeSchema;
     try {
       // Pass the isUpdateRecord to the props for HoodieRecordPayload to judge
@@ -489,7 +489,7 @@ public boolean canWrite(HoodieRecord record) {

   @Override
   protected void doWrite(HoodieRecord record, Schema schema, TypedProperties props) {
-    Option<Map<String, String>> recordMetadata = record.getMetadata();
+    Option<Map<String, Object>> recordMetadata = record.getMetadata();
     try {
       init(record);
       flushToDiskIfRequired(record, false);
File: (name not shown)
@@ -133,7 +133,7 @@ public boolean canWrite(HoodieRecord record) {
    */
   @Override
   protected void doWrite(HoodieRecord record, Schema schema, TypedProperties props) {
-    Option<Map<String, String>> recordMetadata = record.getMetadata();
+    Option<Map<String, Object>> recordMetadata = record.getMetadata();
     try {
       if (!HoodieOperation.isDelete(record.getOperation()) && !record.isDelete(schema, config.getProps())) {
         if (record.shouldIgnore(schema, config.getProps())) {
File: (name not shown)
@@ -46,6 +46,7 @@
 import java.util.Set;

 import static java.util.stream.Collectors.toList;
+import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_ORDERING_FIELD;
 import static org.apache.hudi.common.util.StringUtils.nonEmpty;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;

@@ -166,6 +167,11 @@ private List<HoodieRecord<T>> doMergedRead(Option<HoodieFileReader> baseFileRead
       String key = record.getRecordKey();
       if (deltaRecordMap.containsKey(key)) {
         deltaRecordKeys.remove(key);
+        // An ordering value stored in the metadata means a delete occurred in between,
+        // so take the log record as-is instead of merging it with the base record.
+        if (deltaRecordMap.get(key).getMetaDataInfo(INTERNAL_META_ORDERING_FIELD).isPresent()) {
+          mergedRecords.add(deltaRecordMap.get(key));
+          continue;
+        }
         Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger
             .merge(record, readerSchema, deltaRecordMap.get(key), readerSchema, config.getPayloadConfig().getProps());
         if (!mergeResult.isPresent()) {
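Why the merge is skipped here, sketched with illustrative ordering values (event-time ordering assumed):

    // base file : (k1, v1, orderingVal = 5)
    // log block : DELETE k1         (orderingVal = 6)
    // log block : UPSERT (k1, v2)   (orderingVal = 7)
    //
    // The log-side scan merges the upsert over the delete, so only (k1, v2) survives
    // in deltaRecordMap. If that survivor were then merged with the base record, state
    // from v1 that the intermediate delete wiped out could be reintroduced (for example
    // by a partial-update payload recombining fields from v1). The marker stamped by
    // the scanner says a delete sits underneath this record, so the read takes it as-is.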
File: MetadataMergeWriteStatus.java
@@ -34,28 +34,28 @@
  */
 public class MetadataMergeWriteStatus extends WriteStatus {

-  private Map<String, String> mergedMetadataMap = new HashMap<>();
+  private Map<String, Object> mergedMetadataMap = new HashMap<>();

   public MetadataMergeWriteStatus(Boolean trackSuccessRecords, Double failureFraction) {
     super(trackSuccessRecords, failureFraction);
   }

-  public static Map<String, String> mergeMetadataForWriteStatuses(List<WriteStatus> writeStatuses) {
-    Map<String, String> allWriteStatusMergedMetadataMap = new HashMap<>();
+  public static Map<String, Object> mergeMetadataForWriteStatuses(List<WriteStatus> writeStatuses) {
+    Map<String, Object> allWriteStatusMergedMetadataMap = new HashMap<>();
     for (WriteStatus writeStatus : writeStatuses) {
       MetadataMergeWriteStatus.mergeMetadataMaps(((MetadataMergeWriteStatus) writeStatus).getMergedMetadataMap(),
           allWriteStatusMergedMetadataMap);
     }
     return allWriteStatusMergedMetadataMap;
   }

-  private static void mergeMetadataMaps(Map<String, String> mergeFromMap, Map<String, String> mergeToMap) {
-    for (Entry<String, String> entry : mergeFromMap.entrySet()) {
+  private static void mergeMetadataMaps(Map<String, Object> mergeFromMap, Map<String, Object> mergeToMap) {
+    for (Entry<String, Object> entry : mergeFromMap.entrySet()) {
       String key = entry.getKey();
       if (!mergeToMap.containsKey(key)) {
         mergeToMap.put(key, "0");
       }
-      mergeToMap.put(key, addStrsAsInt(entry.getValue(), mergeToMap.get(key)));
+      mergeToMap.put(key, addStrsAsInt((String) entry.getValue(), (String) mergeToMap.get(key)));
     }
   }

@@ -64,22 +64,22 @@ private static String addStrsAsInt(String a, String b) {
   }

   @Override
-  public void markSuccess(HoodieRecord record, Option<Map<String, String>> recordMetadata) {
+  public void markSuccess(HoodieRecord record, Option<Map<String, Object>> recordMetadata) {
     super.markSuccess(record, recordMetadata);
     if (recordMetadata.isPresent()) {
       mergeMetadataMaps(recordMetadata.get(), mergedMetadataMap);
     }
   }

   @Override
-  public void markFailure(HoodieRecord record, Throwable t, Option<Map<String, String>> recordMetadata) {
+  public void markFailure(HoodieRecord record, Throwable t, Option<Map<String, Object>> recordMetadata) {
     super.markFailure(record, t, recordMetadata);
     if (recordMetadata.isPresent()) {
       mergeMetadataMaps(recordMetadata.get(), mergedMetadataMap);
     }
   }

-  private Map<String, String> getMergedMetadataMap() {
+  private Map<String, Object> getMergedMetadataMap() {
     return mergedMetadataMap;
   }
 }
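For orientation, the accumulation these test helpers perform, as a small sketch (the key mirrors the tests below; values remain String-typed even inside the Object-valued map, which is why the casts above are safe):

    Map<String, Object> from = new HashMap<>();
    from.put("InputRecordCount_1506582000", "2");
    Map<String, Object> to = new HashMap<>();
    // mergeMetadataMaps(from, to) leaves to mapping the key to "2";
    // merging another status carrying "2" yields "4": addStrsAsInt parses both
    // Strings as ints, adds them, and re-encodes the sum as a String.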
File: (name not shown)
@@ -298,7 +298,7 @@ public void testMetadataAggregateFromWriteStatus() throws Exception {
     actionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), records.iterator())
         .forEachRemaining(x -> writeStatuses.addAll((List<WriteStatus>) x));

-    Map<String, String> allWriteStatusMergedMetadataMap =
+    Map<String, Object> allWriteStatusMergedMetadataMap =
         MetadataMergeWriteStatus.mergeMetadataForWriteStatuses(writeStatuses);
     assertTrue(allWriteStatusMergedMetadataMap.containsKey("InputRecordCount_1506582000"));
     // For metadata key InputRecordCount_1506582000, value is 2 for each record. So sum of this
File: (name not shown)
@@ -292,7 +292,7 @@ public HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Schema recordSchema, P
   }

   @Override
-  public Option<Map<String, String>> getMetadata() {
+  public Option<Map<String, Object>> getMetadata() {
     // TODO HUDI-5282 support metaData
     return Option.empty();
   }
File: (name not shown)
@@ -71,7 +71,7 @@ public void testSuccessWithEventTime() {
     WriteStatus status = new WriteStatus(false, 1.0);
     status.setStat(new HoodieWriteStat());
     for (int i = 0; i < 1000; i++) {
-      Map<String, String> metadata = new HashMap<>();
+      Map<String, Object> metadata = new HashMap<>();
       metadata.put(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY, "");
       status.markSuccess(mock(HoodieRecord.class), Option.of(metadata));
     }
@@ -84,7 +84,7 @@ public void testSuccessWithEventTime() {
     status = new WriteStatus(false, 1.0);
     status.setStat(new HoodieWriteStat());
     for (int i = 0; i < 1000; i++) {
-      Map<String, String> metadata = new HashMap<>();
+      Map<String, Object> metadata = new HashMap<>();
       metadata.put(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY, null);
       status.markSuccess(mock(HoodieRecord.class), Option.of(metadata));
     }
@@ -99,7 +99,7 @@ public void testSuccessWithEventTime() {
     long minSeconds = 0L;
     long maxSeconds = 0L;
     for (int i = 0; i < 1000; i++) {
-      Map<String, String> metadata = new HashMap<>();
+      Map<String, Object> metadata = new HashMap<>();
       long eventTime = System.currentTimeMillis() / 1000;
       if (i == 0) {
         minSeconds = eventTime;
@@ -120,7 +120,7 @@ public void testSuccessWithEventTime() {
     minSeconds = 0L;
     maxSeconds = 0L;
     for (int i = 0; i < 1000; i++) {
-      Map<String, String> metadata = new HashMap<>();
+      Map<String, Object> metadata = new HashMap<>();
       long eventTime = System.currentTimeMillis();
       if (i == 0) {
         minSeconds = eventTime;
@@ -139,7 +139,7 @@ public void testSuccessWithEventTime() {
     status = new WriteStatus(false, 1.0);
     status.setStat(new HoodieWriteStat());
     for (int i = 0; i < 1000; i++) {
-      Map<String, String> metadata = new HashMap<>();
+      Map<String, Object> metadata = new HashMap<>();
       metadata.put(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY, String.valueOf(i));
       status.markSuccess(mock(HoodieRecord.class), Option.of(metadata));
     }
File: (name not shown)
@@ -124,7 +124,7 @@ public void testMetadataAggregateFromWriteStatus() throws Exception {

     List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
     assertNoWriteErrors(statuses);
-    Map<String, String> allWriteStatusMergedMetadataMap =
+    Map<String, Object> allWriteStatusMergedMetadataMap =
         MetadataMergeWriteStatus.mergeMetadataForWriteStatuses(statuses);
     assertTrue(allWriteStatusMergedMetadataMap.containsKey("InputRecordCount_1506582000"));
     // For metadata key InputRecordCount_1506582000, value is 2 for each record. So sum of this
File: (name not shown)
@@ -346,7 +346,7 @@ public void testMetadataAggregateFromWriteStatus() throws Exception {
       return actionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), records.iterator());
     }).flatMap(Transformations::flattenAsIterator).collect();

-    Map<String, String> allWriteStatusMergedMetadataMap =
+    Map<String, Object> allWriteStatusMergedMetadataMap =
         MetadataMergeWriteStatus.mergeMetadataForWriteStatuses(writeStatuses);
     assertTrue(allWriteStatusMergedMetadataMap.containsKey("InputRecordCount_1506582000"));
     // For metadata key InputRecordCount_1506582000, value is 2 for each record. So sum of this
File: HoodieReaderContext.java
@@ -337,7 +337,7 @@ public final UnaryOperator<T> projectRecord(Schema from, Schema to) {

   public abstract Comparable castValue(Comparable value, Schema.Type newType);

-  public Comparable maxValue(Schema.Type schemaType) {
+  public static Comparable maxValue(Schema.Type schemaType) {
     switch (schemaType) {
       case INT:
         return Integer.MAX_VALUE;
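Making maxValue static lets call sites that have no reader-context instance (such as the merged log-record scanner further down) fetch a type's top ordering value directly:

    // the LONG ordering value that wins any comparison, i.e. Long.MAX_VALUE
    Comparable top = HoodieReaderContext.maxValue(Schema.Type.LONG);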
File: HoodieAvroIndexedRecord.java
@@ -65,7 +65,7 @@ public HoodieAvroIndexedRecord(
       HoodieKey key,
       IndexedRecord data,
       HoodieOperation operation,
-      Option<Map<String, String>> metaData) {
+      Option<Map<String, Object>> metaData) {
     super(key, data, operation, metaData);
   }

@@ -189,7 +189,7 @@ public HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Schema recordSchema,
   }

   @Override
-  public Option<Map<String, String>> getMetadata() {
+  public Option<Map<String, Object>> getMetadata() {
     return Option.empty();
   }
File: (name not shown)
@@ -202,7 +202,7 @@ public HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Schema recordSchema,
     throw new UnsupportedOperationException();
   }

-  public Option<Map<String, String>> getMetadata() {
+  public Option<Map<String, Object>> getMetadata() {
     return getData().getMetadata();
   }
File: (name not shown)
@@ -147,7 +147,7 @@ public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema recordSchema, Prop
   }

   @Override
-  public Option<Map<String, String>> getMetadata() {
+  public Option<Map<String, Object>> getMetadata() {
     return Option.empty();
   }
File: HoodieRecord.java
@@ -35,6 +35,7 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -153,13 +154,13 @@ public String getFieldName() {
   /**
    * The metaData of the record.
    */
-  protected Option<Map<String, String>> metaData;
+  protected Option<Map<String, Object>> metaData;

   public HoodieRecord(HoodieKey key, T data) {
     this(key, data, null, Option.empty());
   }

-  public HoodieRecord(HoodieKey key, T data, HoodieOperation operation, Option<Map<String, String>> metaData) {
+  public HoodieRecord(HoodieKey key, T data, HoodieOperation operation, Option<Map<String, Object>> metaData) {
     this.key = key;
     this.data = data;
     this.currentLocation = null;
@@ -425,7 +426,25 @@ public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties p
    */
   public abstract HoodieRecord<T> copy();

-  public abstract Option<Map<String, String>> getMetadata();
+  public abstract Option<Map<String, Object>> getMetadata();
+
+  private void initMetadata() {
+    this.metaData = Option.of(new HashMap<>());
+  }
+
+  public void addMetadata(String key, Object value) {
+    if (metaData.isEmpty()) {
+      initMetadata();
+    }
+    metaData.get().put(key, value);
+  }
+
+  public Option<Object> getMetaDataInfo(String key) {
+    if (metaData.isPresent()) {
+      return Option.ofNullable(metaData.get().getOrDefault(key, null));
+    }
+    return Option.empty();
+  }

   public static String generateSequenceId(String instantTime, int partitionId, long recordIndex) {
     return instantTime + "_" + partitionId + "_" + recordIndex;
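A minimal usage sketch of the new accessors (the concrete subclass, key, and payload are illustrative; any HoodieRecord subclass behaves the same, since getMetaDataInfo reads the protected metaData field directly rather than going through the abstract getMetadata()):

    HoodieRecord record = new HoodieAvroIndexedRecord(
        new HoodieKey("k1", "2024/10/17"), avroPayload, null, Option.empty());
    record.addMetadata(INTERNAL_META_ORDERING_FIELD, Long.MAX_VALUE); // lazily creates the map
    record.getMetaDataInfo(INTERNAL_META_ORDERING_FIELD);             // Option.of(Long.MAX_VALUE)
    record.getMetaDataInfo("missing-key");                            // Option.empty()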
File: AbstractHoodieLogRecordReader.java
@@ -106,7 +106,7 @@ public abstract class AbstractHoodieLogRecordReader {
   protected final String preCombineField;
   // Stateless component for merging records
   protected final HoodieRecordMerger recordMerger;
-  private final TypedProperties payloadProps;
+  protected final TypedProperties payloadProps;
   // Log File Paths
   protected final List<String> logFilePaths;
   // Reverse reader - Not implemented yet (NA -> Why do we need ?)
File: (name not shown)
@@ -18,6 +18,7 @@

 package org.apache.hudi.common.table.log;

+import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
@@ -35,13 +36,15 @@
 import javax.annotation.concurrent.NotThreadSafe;

 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;

 import static java.util.Objects.requireNonNull;
 import static org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
 import static org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
+import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_ORDERING_FIELD;
 import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
 import static org.apache.hudi.common.table.cdc.HoodieCDCUtils.CDC_LOGFILE_SUFFIX;
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
@@ -99,7 +102,13 @@ protected <T> void processNextRecord(HoodieRecord<T> newRecord) throws IOExcepti
       // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
       //       payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
       //       it since these records will be put into records(Map).
-      records.put(key, latestHoodieRecord.copy());
+      HoodieRecord finalRecord = latestHoodieRecord.copy();
+
+      // Handle lost deletes for MOR tables: the previous record for this key was a
+      // delete, so mark the surviving record before it replaces the delete.
+      if (prevRecord.isDelete(readerSchema, this.getPayloadProps())) {
+        finalRecord.addMetadata(INTERNAL_META_ORDERING_FIELD, HoodieReaderContext.maxValue(orderingFieldType));
+      }
+      records.put(key, finalRecord);
     }
   } else {
     // Put the record as is
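Stamping maxValue(orderingFieldType) means the marked record would also win any later ordering-value comparison, though the consumer shown earlier (doMergedRead) only checks for the marker's presence before short-circuiting the merge against the base file.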
File: (name not shown)
@@ -184,7 +184,7 @@ public HoodieRecord<ArrayWritable> copy() {
   }

   @Override
-  public Option<Map<String, String>> getMetadata() {
+  public Option<Map<String, Object>> getMetadata() {
     // TODO HUDI-5282 support metaData
     return Option.empty();
   }
(Remaining changed files were not loaded in this view.)
