[HUDI-6787] Integrate the new file group reader with Hive query engine #10422

Merged: 43 commits, Jun 9, 2024
Changes from 41 commits

Commits
aa6e1c2
squash commits
Dec 28, 2023
3674390
refactor a bit and add some comments
Dec 28, 2023
edad863
make build properly
Dec 28, 2023
b764026
fix some of the failing tests
Dec 28, 2023
6647869
revert to old impl when schema evolution enabled
Dec 28, 2023
b43949b
disable fg reader for stupid test
Dec 28, 2023
2bc441f
fix some failing tests
Dec 28, 2023
1bcbab8
assigned the ports backwards
Dec 28, 2023
6e5fadd
verbose output bundle validation
Dec 28, 2023
dc4ac6f
add volume to docker compose
Dec 28, 2023
8669cce
need to lowercase the field names omg
Dec 28, 2023
fdeea22
don't remove partition if it's listed in the schema
Dec 28, 2023
60fe6fa
put partition cols at end of output
Dec 28, 2023
ad3f3d3
invert filter
Dec 28, 2023
6a77480
support no base file, only log. support read from non-hudi table
Dec 29, 2023
51f91c7
disable for skip merge as well
Dec 29, 2023
1da1e76
fix non hoodie path read
Dec 29, 2023
2b1f172
revert setting precombine
Jan 2, 2024
7b45027
fix no meta cols table
Jan 2, 2024
16cff40
check if no requested fields
Jan 2, 2024
e115fd5
create empty schema properly
Jan 2, 2024
211bcb9
check if metadata folder exists
Jan 2, 2024
73bedb4
handle mor with no meta fields
Jan 2, 2024
3f412a1
disable reader for a test because mor seems to work different
Jan 2, 2024
51a47e4
delete partition column from the jobconf if it is written in the file
Jan 4, 2024
a8020bd
modify data schema due to partition column madness
Jan 4, 2024
25078f3
remove unused import
Jan 4, 2024
c225f52
add some comments
Jan 4, 2024
069a5ad
don't add partition fields when the data schema doesn't have them
Jan 5, 2024
185217d
address review feedback
Jan 19, 2024
6d58742
accidentally put remove in for loop for combine reader
Jan 19, 2024
325a8b7
get building again
Jan 29, 2024
a012522
address some review comments
Jan 29, 2024
50bd39f
add reviewer suggested change
Feb 2, 2024
50a308d
add missing params fg reader
Feb 5, 2024
5a33e85
address some comments
Feb 20, 2024
18fbd92
Fix post rebase
codope May 31, 2024
9487e12
fix hudi table check
codope May 31, 2024
2201cb0
Address self feedback
codope May 31, 2024
b29ff63
Merge branch 'master' into use_fg_reader_hive
Jun 7, 2024
33249cc
fix failing test
Jun 7, 2024
89a4a8c
Merge branch 'master' into use_fg_reader_hive
Jun 8, 2024
e95bcb8
fix failing test
Jun 8, 2024
@@ -182,7 +182,7 @@ private void writeRecordsForPartition(SparkRDDWriteClient client, HoodieTestData
private List<GenericRecord> readRecords(String[] partitions) {
return HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(storageConf,
Arrays.stream(partitions).map(p -> Paths.get(basePath, p).toString()).collect(Collectors.toList()),
basePath, new JobConf(storageConf.unwrap()), true, false);
basePath, new JobConf(storageConf.unwrap()), true, true);
}

}
@@ -213,7 +213,7 @@ public void testUpsertPartitioner(boolean populateMetaFields) throws Exception {
.map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
.collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(storageConf(), inputPaths,
basePath(), new JobConf(storageConf().unwrap()), true, false);
basePath(), new JobConf(storageConf().unwrap()), true, populateMetaFields);
// Wrote 20 records in 2 batches
assertEquals(40, recordsRead.size(), "Must contain 40 records");
}
@@ -22,6 +22,7 @@
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieKey;
@@ -146,43 +147,50 @@ public void testWriteDuringCompaction(String payloadClass) throws IOException {
@ParameterizedTest
@MethodSource("writeLogTest")
public void testWriteLogDuringCompaction(boolean enableMetadataTable, boolean enableTimelineServer) throws IOException {
Properties props = getPropertiesForKeyGen(true);
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
.forTable("test-trip-table")
.withPath(basePath())
.withSchema(TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2)
.withAutoCommit(true)
.withEmbeddedTimelineServerEnabled(enableTimelineServer)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withLayoutConfig(HoodieLayoutConfig.newBuilder()
.withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
.withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
.build();
props.putAll(config.getProps());

metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
client = getHoodieWriteClient(config);

final List<HoodieRecord> records = dataGen.generateInserts("001", 100);
JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 2);
try {
//disable for this test because it seems like we process mor in a different order?
Review thread on this line:
Member: sorry, what do you mean by this?
Contributor (author): https://issues.apache.org/jira/browse/HUDI-7610 Delete behavior is inconsistent and, imo, undefined. One of the advantages of unifying all the readers with the FGReader is that we can remove the inconsistency between engines.

jsc().hadoopConfiguration().set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false");
Properties props = getPropertiesForKeyGen(true);
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
.forTable("test-trip-table")
.withPath(basePath())
.withSchema(TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2)
.withAutoCommit(true)
.withEmbeddedTimelineServerEnabled(enableTimelineServer)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withLayoutConfig(HoodieLayoutConfig.newBuilder()
.withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
.withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
.build();
props.putAll(config.getProps());

metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
client = getHoodieWriteClient(config);

final List<HoodieRecord> records = dataGen.generateInserts("001", 100);
JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 2);

// initialize 100 records
client.upsert(writeRecords, client.startCommit());
// update 100 records
client.upsert(writeRecords, client.startCommit());
// schedule compaction
client.scheduleCompaction(Option.empty());
// delete 50 records
List<HoodieKey> toBeDeleted = records.stream().map(HoodieRecord::getKey).limit(50).collect(Collectors.toList());
JavaRDD<HoodieKey> deleteRecords = jsc().parallelize(toBeDeleted, 2);
client.delete(deleteRecords, client.startCommit());
// insert the same 100 records again
client.upsert(writeRecords, client.startCommit());
Assertions.assertEquals(100, readTableTotalRecordsNum());
} finally {
jsc().hadoopConfiguration().set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true");
}

// initialize 100 records
client.upsert(writeRecords, client.startCommit());
// update 100 records
client.upsert(writeRecords, client.startCommit());
// schedule compaction
client.scheduleCompaction(Option.empty());
// delete 50 records
List<HoodieKey> toBeDeleted = records.stream().map(HoodieRecord::getKey).limit(50).collect(Collectors.toList());
JavaRDD<HoodieKey> deleteRecords = jsc().parallelize(toBeDeleted, 2);
client.delete(deleteRecords, client.startCommit());
// insert the same 100 records again
client.upsert(writeRecords, client.startCommit());
Assertions.assertEquals(100, readTableTotalRecordsNum());
}

private long readTableTotalRecordsNum() {
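For context on the try/finally block above: a minimal standalone sketch of the toggle-and-restore idiom the test uses, assuming only HoodieReaderConfig.FILE_GROUP_READER_ENABLED (from the diff) and Hadoop's Configuration.set; the helper class and method names are illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.config.HoodieReaderConfig;

public class FileGroupReaderToggleSketch {
  // Run a block of test logic with the new file group reader disabled,
  // then restore the flag so later tests see the default behavior again.
  public static void runWithFileGroupReaderDisabled(Configuration hadoopConf, Runnable testBody) {
    hadoopConf.set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false");
    try {
      testBody.run();
    } finally {
      // Mirrors the finally block in the test above.
      hadoopConf.set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true");
    }
  }
}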
@@ -23,6 +23,7 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.read.HoodieFileGroupReaderSchemaHandler;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.storage.HoodieStorage;
@@ -38,6 +39,8 @@
import java.util.Map;
import java.util.function.UnaryOperator;

import static org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;

/**
* An abstract reader context class for {@code HoodieFileGroupReader} to use, containing APIs for
* engine-specific implementation on reading data files, getting field values from a record,
@@ -187,7 +190,10 @@ public abstract ClosableIterator<T> getFileRecordIterator(
* @param schema The Avro schema of the record.
* @return The record key in String.
*/
public abstract String getRecordKey(T record, Schema schema);
public String getRecordKey(T record, Schema schema) {
Object val = getValue(record, schema, RECORD_KEY_METADATA_FIELD);
return val.toString();
}

/**
* Gets the ordering value in particular type.
@@ -198,10 +204,23 @@ public abstract ClosableIterator<T> getFileRecordIterator(
* @param props Properties.
* @return The ordering value.
*/
public abstract Comparable getOrderingValue(Option<T> recordOption,
Map<String, Object> metadataMap,
Schema schema,
TypedProperties props);
public Comparable getOrderingValue(Option<T> recordOption,
Map<String, Object> metadataMap,
Schema schema,
TypedProperties props) {
if (metadataMap.containsKey(INTERNAL_META_ORDERING_FIELD)) {
return (Comparable) metadataMap.get(INTERNAL_META_ORDERING_FIELD);
}

if (!recordOption.isPresent()) {
return 0;
}

String orderingFieldName = ConfigUtils.getOrderingField(props);
Object value = getValue(recordOption.get(), schema, orderingFieldName);
return value != null ? (Comparable) value : 0;

}

/**
* Constructs a new {@link HoodieRecord} based on the record of engine-specific type and metadata for merging.
@@ -301,6 +320,10 @@ public final UnaryOperator<T> projectRecord(Schema from, Schema to) {
* @return the record position in the base file.
*/
public long extractRecordPosition(T record, Schema schema, String fieldName, long providedPositionIfNeeded) {
Object position = getValue(record, schema, fieldName);
if (position != null) {
return (long) position;
}
return providedPositionIfNeeded;
}

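To make the new base-class defaults easier to follow, here is a standalone sketch of the same ordering-value fallback chain written against plain Java types instead of the engine-generic record type T; the class name, the map-based record representation, and the INTERNAL_META_ORDERING_FIELD value are illustrative, not the Hudi API.

import java.util.Map;
import java.util.Optional;

public class OrderingValueSketch {
  // Illustrative stand-in for the internal metadata key used by the reader context.
  static final String INTERNAL_META_ORDERING_FIELD = "_hoodie_internal_ordering_field";

  // Mirrors the fallback chain of the new default getOrderingValue():
  // 1) a value already stashed in the metadata map wins,
  // 2) with no record present, fall back to 0,
  // 3) otherwise read the configured ordering field, defaulting to 0 when it is null.
  static Comparable<?> orderingValue(Optional<Map<String, Object>> recordOpt,
                                     Map<String, Object> metadataMap,
                                     String orderingFieldName) {
    if (metadataMap.containsKey(INTERNAL_META_ORDERING_FIELD)) {
      return (Comparable<?>) metadataMap.get(INTERNAL_META_ORDERING_FIELD);
    }
    if (!recordOpt.isPresent()) {
      return 0;
    }
    Object value = recordOpt.get().get(orderingFieldName);
    return value != null ? (Comparable<?>) value : 0;
  }
}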
@@ -476,6 +476,6 @@ public Object get(String key) {
}

public enum HoodieRecordType {
AVRO, SPARK
AVRO, SPARK, HIVE
}
}
@@ -61,6 +61,8 @@
import java.util.regex.Matcher;
import java.util.stream.Collectors;

import static org.apache.hudi.common.fs.FSUtils.LOG_FILE_PATTERN;

/**
* Utility functions related to accessing the file storage on Hadoop.
*/
@@ -377,13 +379,24 @@ public static String getRelativePartitionPath(Path basePath, Path fullPartitionP
* the file name.
*/
public static String getFileIdFromLogPath(Path path) {
Matcher matcher = FSUtils.LOG_FILE_PATTERN.matcher(path.getName());
Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
if (!matcher.find()) {
throw new InvalidHoodiePathException(path.toString(), "LogFile");
}
return matcher.group(1);
}

/**
* Get the second part of the file name in the log file. That will be the delta commit time.
*/
public static String getDeltaCommitTimeFromLogPath(Path path) {
Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
if (!matcher.find()) {
throw new InvalidHoodiePathException(path.toString(), "LogFile");
}
return matcher.group(2);
}

/**
* Check if the file is a base file of a log file. Then get the fileId appropriately.
*/
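As a quick illustration of what getDeltaCommitTimeFromLogPath extracts: a self-contained sketch using a simplified pattern and a made-up log file name; the real pattern is FSUtils.LOG_FILE_PATTERN, and the actual file-name format may differ in detail.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogNameParseSketch {
  // Simplified, illustrative pattern: ".{fileId}_{deltaCommitTime}.log.{version}_{writeToken}"
  private static final Pattern SIMPLE_LOG_PATTERN =
      Pattern.compile("\\.(.+)_(.+)\\.log\\.(\\d+)_(.+)");

  public static void main(String[] args) {
    String name = ".abc123-0_20240101123456789.log.1_0-1-0"; // hypothetical log file name
    Matcher matcher = SIMPLE_LOG_PATTERN.matcher(name);
    if (!matcher.find()) {
      throw new IllegalArgumentException("Not a log file name: " + name);
    }
    System.out.println("fileId          = " + matcher.group(1)); // first component
    System.out.println("deltaCommitTime = " + matcher.group(2)); // second component, as returned by getDeltaCommitTimeFromLogPath
  }
}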