[HUDI-6787] Implement the HoodieFileGroupReader API for Hive (#10422)
jonvex authored Jun 9, 2024
1 parent 9f90647 commit 0abc00d
Showing 26 changed files with 1,470 additions and 128 deletions.
@@ -182,7 +182,7 @@ private void writeRecordsForPartition(SparkRDDWriteClient client, HoodieTestData
private List<GenericRecord> readRecords(String[] partitions) {
return HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(storageConf,
Arrays.stream(partitions).map(p -> Paths.get(basePath, p).toString()).collect(Collectors.toList()),
basePath, new JobConf(storageConf.unwrap()), true, false);
basePath, new JobConf(storageConf.unwrap()), true, true);
}

}
(next changed file)
@@ -213,7 +213,7 @@ public void testUpsertPartitioner(boolean populateMetaFields) throws Exception {
.map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
.collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(storageConf(), inputPaths,
basePath(), new JobConf(storageConf().unwrap()), true, false);
basePath(), new JobConf(storageConf().unwrap()), true, populateMetaFields);
// Wrote 20 records in 2 batches
assertEquals(40, recordsRead.size(), "Must contain 40 records");
}
(next changed file)
@@ -22,6 +22,7 @@
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieKey;
@@ -146,43 +147,50 @@ public void testWriteDuringCompaction(String payloadClass) throws IOException {
@ParameterizedTest
@MethodSource("writeLogTest")
public void testWriteLogDuringCompaction(boolean enableMetadataTable, boolean enableTimelineServer) throws IOException {
Properties props = getPropertiesForKeyGen(true);
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
.forTable("test-trip-table")
.withPath(basePath())
.withSchema(TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2)
.withAutoCommit(true)
.withEmbeddedTimelineServerEnabled(enableTimelineServer)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withLayoutConfig(HoodieLayoutConfig.newBuilder()
.withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
.withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
.build();
props.putAll(config.getProps());

metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
client = getHoodieWriteClient(config);

final List<HoodieRecord> records = dataGen.generateInserts("001", 100);
JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 2);
try {
//disable for this test because it seems like we process mor in a different order?
jsc().hadoopConfiguration().set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false");
Properties props = getPropertiesForKeyGen(true);
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
.forTable("test-trip-table")
.withPath(basePath())
.withSchema(TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2)
.withAutoCommit(true)
.withEmbeddedTimelineServerEnabled(enableTimelineServer)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withLayoutConfig(HoodieLayoutConfig.newBuilder()
.withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
.withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
.build();
props.putAll(config.getProps());

metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
client = getHoodieWriteClient(config);

final List<HoodieRecord> records = dataGen.generateInserts("001", 100);
JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 2);

// initialize 100 records
client.upsert(writeRecords, client.startCommit());
// update 100 records
client.upsert(writeRecords, client.startCommit());
// schedule compaction
client.scheduleCompaction(Option.empty());
// delete 50 records
List<HoodieKey> toBeDeleted = records.stream().map(HoodieRecord::getKey).limit(50).collect(Collectors.toList());
JavaRDD<HoodieKey> deleteRecords = jsc().parallelize(toBeDeleted, 2);
client.delete(deleteRecords, client.startCommit());
// insert the same 100 records again
client.upsert(writeRecords, client.startCommit());
Assertions.assertEquals(100, readTableTotalRecordsNum());
} finally {
jsc().hadoopConfiguration().set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true");
}

// initialize 100 records
client.upsert(writeRecords, client.startCommit());
// update 100 records
client.upsert(writeRecords, client.startCommit());
// schedule compaction
client.scheduleCompaction(Option.empty());
// delete 50 records
List<HoodieKey> toBeDeleted = records.stream().map(HoodieRecord::getKey).limit(50).collect(Collectors.toList());
JavaRDD<HoodieKey> deleteRecords = jsc().parallelize(toBeDeleted, 2);
client.delete(deleteRecords, client.startCommit());
// insert the same 100 records again
client.upsert(writeRecords, client.startCommit());
Assertions.assertEquals(100, readTableTotalRecordsNum());
}
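As a side note, the guard introduced above follows a simple pattern: pin the Hadoop configuration to the legacy merge-on-read read path for this one test, then restore the new file group reader in a finally block. A minimal sketch of that pattern, where runTestBody() is a hypothetical stand-in and jsc() is assumed from the surrounding Spark test base:

    // hadoopConf is an org.apache.hadoop.conf.Configuration
    Configuration hadoopConf = jsc().hadoopConfiguration();
    try {
      // Fall back to the legacy read path for this test only.
      hadoopConf.set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false");
      runTestBody(); // hypothetical helper standing in for the assertions above
    } finally {
      // Restore the default so later tests exercise the new HoodieFileGroupReader path.
      hadoopConf.set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true");
    }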

private long readTableTotalRecordsNum() {
(next changed file)
@@ -23,6 +23,7 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.read.HoodieFileGroupReaderSchemaHandler;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.storage.HoodieStorage;
@@ -38,6 +39,8 @@
import java.util.Map;
import java.util.function.UnaryOperator;

import static org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;

/**
* An abstract reader context class for {@code HoodieFileGroupReader} to use, containing APIs for
* engine-specific implementation on reading data files, getting field values from a record,
@@ -197,7 +200,10 @@ public abstract ClosableIterator<T> getFileRecordIterator(
* @param schema The Avro schema of the record.
* @return The record key in String.
*/
public abstract String getRecordKey(T record, Schema schema);
public String getRecordKey(T record, Schema schema) {
Object val = getValue(record, schema, RECORD_KEY_METADATA_FIELD);
return val.toString();
}
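The new default pulls the key straight from the _hoodie_record_key meta column (HoodieRecord.RECORD_KEY_METADATA_FIELD) via the engine-specific getValue. A self-contained sketch of the same extraction against a plain Avro GenericRecord; the class name and field values are illustrative, only the meta column name comes from Hudi:

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;

    public class RecordKeySketch {
      // Mirrors the default above: read the record-key meta column and stringify it.
      static String recordKeyOf(GenericRecord record) {
        return record.get("_hoodie_record_key").toString();
      }

      public static void main(String[] args) {
        Schema schema = SchemaBuilder.record("row").fields()
            .requiredString("_hoodie_record_key")
            .requiredLong("ts")
            .endRecord();
        GenericRecord row = new GenericData.Record(schema);
        row.put("_hoodie_record_key", "key-001");
        row.put("ts", 42L);
        System.out.println(recordKeyOf(row)); // prints key-001
      }
    }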

/**
* Gets the ordering value in particular type.
@@ -208,10 +214,23 @@ public abstract ClosableIterator<T> getFileRecordIterator(
* @param props Properties.
* @return The ordering value.
*/
public abstract Comparable getOrderingValue(Option<T> recordOption,
Map<String, Object> metadataMap,
Schema schema,
TypedProperties props);
public Comparable getOrderingValue(Option<T> recordOption,
Map<String, Object> metadataMap,
Schema schema,
TypedProperties props) {
if (metadataMap.containsKey(INTERNAL_META_ORDERING_FIELD)) {
return (Comparable) metadataMap.get(INTERNAL_META_ORDERING_FIELD);
}

if (!recordOption.isPresent()) {
return 0;
}

String orderingFieldName = ConfigUtils.getOrderingField(props);
Object value = getValue(recordOption.get(), schema, orderingFieldName);
return value != null ? (Comparable) value : 0;

}
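The default above resolves the ordering value in three steps: a value already cached in the metadata map wins; with no record present it degrades to 0; otherwise the ordering field resolved from the properties via ConfigUtils.getOrderingField is read from the record, with a null value again degrading to 0. A small engine-agnostic mirror of that chain, using plain Java maps purely for illustration:

    import java.util.Map;
    import java.util.Optional;

    final class OrderingValueSketch {
      // Stand-in for the internal ordering-field metadata key used above.
      static final String ORDERING_META_KEY = "internal.ordering.value";

      static Comparable<?> orderingValueOf(Optional<Map<String, Object>> record,
                                           Map<String, Object> metadataMap,
                                           String orderingFieldName) {
        if (metadataMap.containsKey(ORDERING_META_KEY)) {      // 1. cached value wins
          return (Comparable<?>) metadataMap.get(ORDERING_META_KEY);
        }
        if (!record.isPresent()) {                             // 2. no record -> neutral 0
          return 0;
        }
        Object value = record.get().get(orderingFieldName);    // 3. configured ordering field
        return value != null ? (Comparable<?>) value : 0;      //    null -> 0
      }
    }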

/**
* Constructs a new {@link HoodieRecord} based on the record of engine-specific type and metadata for merging.
(next changed file)
@@ -476,6 +476,6 @@ public Object get(String key) {
}

public enum HoodieRecordType {
AVRO, SPARK
AVRO, SPARK, HIVE
}
}
(next changed file)
@@ -61,6 +61,8 @@
import java.util.regex.Matcher;
import java.util.stream.Collectors;

import static org.apache.hudi.common.fs.FSUtils.LOG_FILE_PATTERN;

/**
* Utility functions related to accessing the file storage on Hadoop.
*/
@@ -377,13 +379,24 @@ public static String getRelativePartitionPath(Path basePath, Path fullPartitionP
* the file name.
*/
public static String getFileIdFromLogPath(Path path) {
Matcher matcher = FSUtils.LOG_FILE_PATTERN.matcher(path.getName());
Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
if (!matcher.find()) {
throw new InvalidHoodiePathException(path.toString(), "LogFile");
}
return matcher.group(1);
}

/**
* Get the second part of the file name in the log file. That will be the delta commit time.
*/
public static String getDeltaCommitTimeFromLogPath(Path path) {
Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
if (!matcher.find()) {
throw new InvalidHoodiePathException(path.toString(), "LogFile");
}
return matcher.group(2);
}
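A quick worked example of the two helpers, assuming Hudi's usual log file naming of .<fileId>_<deltaCommitTime>.log.<version>_<writeToken>; the path is made up, and the calls are left unqualified because the enclosing utility class is not named in this hunk:

    // logPath is an org.apache.hadoop.fs.Path
    Path logPath = new Path("/warehouse/tbl/2024/06/.fg-0001_20240609103000000.log.1_1-0-1");
    String fileId = getFileIdFromLogPath(logPath);                   // "fg-0001"           (capture group 1)
    String deltaCommitTime = getDeltaCommitTimeFromLogPath(logPath); // "20240609103000000" (capture group 2)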

/**
* Check if the file is a base file of a log file. Then get the fileId appropriately.
*/
(The remaining 20 changed files are not expanded in this view.)