Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-6787] DO NOT MERGE. Test hive3 in ghactions #11398

Closed
wants to merge 42 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
aa6e1c2
squash commits
Dec 28, 2023
3674390
refactor a bit and add some comments
Dec 28, 2023
edad863
make build properly
Dec 28, 2023
b764026
fix some of the failing tests
Dec 28, 2023
6647869
revert to old impl when schema evolution enabled
Dec 28, 2023
b43949b
disable fg reader for stupid test
Dec 28, 2023
2bc441f
fix some failing tests
Dec 28, 2023
1bcbab8
assigned the ports backwards
Dec 28, 2023
6e5fadd
verbose output bundle validation
Dec 28, 2023
dc4ac6f
add volume to docker compose
Dec 28, 2023
8669cce
need to lowercase the field names omg
Dec 28, 2023
fdeea22
don't remove partition if it's listed in the schema
Dec 28, 2023
60fe6fa
put partition cols at end of output
Dec 28, 2023
ad3f3d3
invert filter
Dec 28, 2023
6a77480
support no base file, only log. support read from non-hudi table
Dec 29, 2023
51f91c7
disable for skip merge as well
Dec 29, 2023
1da1e76
fix non hoodie path read
Dec 29, 2023
2b1f172
revert setting precombine
Jan 2, 2024
7b45027
fix no meta cols table
Jan 2, 2024
16cff40
check if no requested fields
Jan 2, 2024
e115fd5
create empty schema properly
Jan 2, 2024
211bcb9
check if metadata folder exists
Jan 2, 2024
73bedb4
handle mor with no meta fields
Jan 2, 2024
3f412a1
disable reader for a test because mor seems to work different
Jan 2, 2024
51a47e4
delete partition column from the jobconf if it is written in the file
Jan 4, 2024
a8020bd
modify data schema due to partition column madness
Jan 4, 2024
25078f3
remove unused import
Jan 4, 2024
c225f52
add some comments
Jan 4, 2024
069a5ad
don't add partition fields when the data schema doesn't have them
Jan 5, 2024
185217d
address review feedback
Jan 19, 2024
6d58742
accidently put remove in for loop for combine reader
Jan 19, 2024
325a8b7
get building again
Jan 29, 2024
a012522
address some review comments
Jan 29, 2024
50bd39f
add reviewer suggested change
Feb 2, 2024
50a308d
add missing params fg reader
Feb 5, 2024
5a33e85
address some comments
Feb 20, 2024
18fbd92
Fix post rebase
codope May 31, 2024
9487e12
fix hudi table check
codope May 31, 2024
2201cb0
Address self feedback
codope May 31, 2024
da8e132
add mor testing for hive
Jun 5, 2024
723b5a2
add another query
Jun 5, 2024
ea23061
fix syntax on second query
Jun 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,30 +69,15 @@ public HoodieRecordMerger getRecordMerger(String mergerStrategy) {

@Override
public Object getValue(InternalRow row, Schema schema, String fieldName) {
return getFieldValueFromInternalRow(row, schema, fieldName);
}

@Override
public String getRecordKey(InternalRow row, Schema schema) {
return getFieldValueFromInternalRow(row, schema, RECORD_KEY_METADATA_FIELD).toString();
}

@Override
public Comparable getOrderingValue(Option<InternalRow> rowOption,
Map<String, Object> metadataMap,
Schema schema,
TypedProperties props) {
if (metadataMap.containsKey(INTERNAL_META_ORDERING_FIELD)) {
return (Comparable) metadataMap.get(INTERNAL_META_ORDERING_FIELD);
}

if (!rowOption.isPresent()) {
return 0;
StructType structType = getCachedSchema(schema);
scala.Option<HoodieUnsafeRowUtils.NestedFieldPath> cachedNestedFieldPath =
HoodieInternalRowUtils.getCachedPosList(structType, fieldName);
if (cachedNestedFieldPath.isDefined()) {
HoodieUnsafeRowUtils.NestedFieldPath nestedFieldPath = cachedNestedFieldPath.get();
return HoodieUnsafeRowUtils.getNestedInternalRowValue(row, nestedFieldPath);
} else {
return null;
}

String orderingFieldName = ConfigUtils.getOrderingField(props);
Object value = getFieldValueFromInternalRow(rowOption.get(), schema, orderingFieldName);
return value != null ? (Comparable) value : 0;
}

@Override
Expand All @@ -117,25 +102,13 @@ public InternalRow seal(InternalRow internalRow) {

@Override
public long extractRecordPosition(InternalRow record, Schema recordSchema, String fieldName, long providedPositionIfNeeded) {
Object position = getFieldValueFromInternalRow(record, recordSchema, fieldName);
Object position = getValue(record, recordSchema, fieldName);
if (position != null) {
return (long) position;
}
return providedPositionIfNeeded;
}

private Object getFieldValueFromInternalRow(InternalRow row, Schema recordSchema, String fieldName) {
StructType structType = getCachedSchema(recordSchema);
scala.Option<HoodieUnsafeRowUtils.NestedFieldPath> cachedNestedFieldPath =
HoodieInternalRowUtils.getCachedPosList(structType, fieldName);
if (cachedNestedFieldPath.isDefined()) {
HoodieUnsafeRowUtils.NestedFieldPath nestedFieldPath = cachedNestedFieldPath.get();
return HoodieUnsafeRowUtils.getNestedInternalRowValue(row, nestedFieldPath);
} else {
return null;
}
}

@Override
public UnaryOperator<InternalRow> projectRecord(Schema from, Schema to) {
UnsafeProjection projection = HoodieInternalRowUtils.generateUnsafeProjectionAlias(getCachedSchema(from), getCachedSchema(to));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ class SparkFileFormatInternalRowReaderContext(readerMaps: mutable.Map[Long, Part
}

override def mergeBootstrapReaders(skeletonFileIterator: ClosableIterator[InternalRow],
dataFileIterator: ClosableIterator[InternalRow]): ClosableIterator[InternalRow] = {
skeletonRequiredSchema: Schema,
dataFileIterator: ClosableIterator[InternalRow],
dataRequiredSchema: Schema): ClosableIterator[InternalRow] = {
doBootstrapMerge(skeletonFileIterator.asInstanceOf[ClosableIterator[Any]],
dataFileIterator.asInstanceOf[ClosableIterator[Any]])
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public void testUpsertPartitioner(boolean populateMetaFields) throws Exception {
.map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
.collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(storageConf(), inputPaths,
basePath(), new JobConf(storageConf().unwrap()), true, false);
basePath(), new JobConf(storageConf().unwrap()), true, populateMetaFields);
// Wrote 20 records in 2 batches
assertEquals(40, recordsRead.size(), "Must contain 40 records");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieKey;
Expand Down Expand Up @@ -146,43 +147,50 @@ public void testWriteDuringCompaction(String payloadClass) throws IOException {
@ParameterizedTest
@MethodSource("writeLogTest")
public void testWriteLogDuringCompaction(boolean enableMetadataTable, boolean enableTimelineServer) throws IOException {
Properties props = getPropertiesForKeyGen(true);
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
.forTable("test-trip-table")
.withPath(basePath())
.withSchema(TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2)
.withAutoCommit(true)
.withEmbeddedTimelineServerEnabled(enableTimelineServer)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withLayoutConfig(HoodieLayoutConfig.newBuilder()
.withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
.withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
.build();
props.putAll(config.getProps());

metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
client = getHoodieWriteClient(config);

final List<HoodieRecord> records = dataGen.generateInserts("001", 100);
JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 2);
try {
//disable for this test because it seems like we process mor in a different order?
jsc().hadoopConfiguration().set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false");
Properties props = getPropertiesForKeyGen(true);
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
.forTable("test-trip-table")
.withPath(basePath())
.withSchema(TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2)
.withAutoCommit(true)
.withEmbeddedTimelineServerEnabled(enableTimelineServer)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withLayoutConfig(HoodieLayoutConfig.newBuilder()
.withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
.withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
.build();
props.putAll(config.getProps());

metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
client = getHoodieWriteClient(config);

final List<HoodieRecord> records = dataGen.generateInserts("001", 100);
JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 2);

// initialize 100 records
client.upsert(writeRecords, client.startCommit());
// update 100 records
client.upsert(writeRecords, client.startCommit());
// schedule compaction
client.scheduleCompaction(Option.empty());
// delete 50 records
List<HoodieKey> toBeDeleted = records.stream().map(HoodieRecord::getKey).limit(50).collect(Collectors.toList());
JavaRDD<HoodieKey> deleteRecords = jsc().parallelize(toBeDeleted, 2);
client.delete(deleteRecords, client.startCommit());
// insert the same 100 records again
client.upsert(writeRecords, client.startCommit());
Assertions.assertEquals(100, readTableTotalRecordsNum());
} finally {
jsc().hadoopConfiguration().set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true");
}

// initialize 100 records
client.upsert(writeRecords, client.startCommit());
// update 100 records
client.upsert(writeRecords, client.startCommit());
// schedule compaction
client.scheduleCompaction(Option.empty());
// delete 50 records
List<HoodieKey> toBeDeleted = records.stream().map(HoodieRecord::getKey).limit(50).collect(Collectors.toList());
JavaRDD<HoodieKey> deleteRecords = jsc().parallelize(toBeDeleted, 2);
client.delete(deleteRecords, client.startCommit());
// insert the same 100 records again
client.upsert(writeRecords, client.startCommit());
Assertions.assertEquals(100, readTableTotalRecordsNum());
}

private long readTableTotalRecordsNum() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.storage.HoodieStorage;
Expand All @@ -36,6 +37,8 @@
import java.util.Map;
import java.util.function.UnaryOperator;

import static org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;

/**
* An abstract reader context class for {@code HoodieFileGroupReader} to use, containing APIs for
* engine-specific implementation on reading data files, getting field values from a record,
Expand Down Expand Up @@ -113,7 +116,10 @@ public abstract ClosableIterator<T> getFileRecordIterator(
* @param schema The Avro schema of the record.
* @return The record key in String.
*/
public abstract String getRecordKey(T record, Schema schema);
public String getRecordKey(T record, Schema schema) {
Object val = getValue(record, schema, RECORD_KEY_METADATA_FIELD);
return val.toString();
}

/**
* Gets the ordering value in particular type.
Expand All @@ -124,10 +130,23 @@ public abstract ClosableIterator<T> getFileRecordIterator(
* @param props Properties.
* @return The ordering value.
*/
public abstract Comparable getOrderingValue(Option<T> recordOption,
Map<String, Object> metadataMap,
Schema schema,
TypedProperties props);
public Comparable getOrderingValue(Option<T> recordOption,
Map<String, Object> metadataMap,
Schema schema,
TypedProperties props) {
if (metadataMap.containsKey(INTERNAL_META_ORDERING_FIELD)) {
return (Comparable) metadataMap.get(INTERNAL_META_ORDERING_FIELD);
}

if (!recordOption.isPresent()) {
return 0;
}

String orderingFieldName = ConfigUtils.getOrderingField(props);
Object value = getValue(recordOption.get(), schema, orderingFieldName);
return value != null ? (Comparable) value : 0;

}

/**
* Constructs a new {@link HoodieRecord} based on the record of engine-specific type and metadata for merging.
Expand Down Expand Up @@ -197,17 +216,20 @@ public Map<String, Object> updateSchemaAndResetOrderingValInMetadata(Map<String,
* skeleton file iterator, followed by all columns in the data file iterator
*
* @param skeletonFileIterator iterator over bootstrap skeleton files that contain hudi metadata columns
* @param dataFileIterator iterator over data files that were bootstrapped into the hudi table
* @param dataFileIterator iterator over data files that were bootstrapped into the hudi table
* @return iterator that concatenates the skeletonFileIterator and dataFileIterator
*/
public abstract ClosableIterator<T> mergeBootstrapReaders(ClosableIterator<T> skeletonFileIterator, ClosableIterator<T> dataFileIterator);
public abstract ClosableIterator<T> mergeBootstrapReaders(ClosableIterator<T> skeletonFileIterator,
Schema skeletonRequiredSchema,
ClosableIterator<T> dataFileIterator,
Schema dataRequiredSchema);

/**
* Creates a function that will reorder records of schema "from" to schema of "to"
* all fields in "to" must be in "from", but not all fields in "from" must be in "to"
*
* @param from the schema of records to be passed into UnaryOperator
* @param to the schema of records produced by UnaryOperator
* @param to the schema of records produced by UnaryOperator
* @return a function that takes in a record and returns the record with reordered columns
*/
public abstract UnaryOperator<T> projectRecord(Schema from, Schema to);
Expand All @@ -218,6 +240,10 @@ public Map<String, Object> updateSchemaAndResetOrderingValInMetadata(Map<String,
* @return the record position in the base file.
*/
public long extractRecordPosition(T record, Schema schema, String fieldName, long providedPositionIfNeeded) {
Object position = getValue(record, schema, fieldName);
if (position != null) {
return (long) position;
}
return providedPositionIfNeeded;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,6 @@ public Object get(String key) {
}

public enum HoodieRecordType {
AVRO, SPARK
AVRO, SPARK, HIVE
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ private ClosableIterator<T> makeBaseFileIterator() throws IOException {
}

private Schema generateRequiredSchema() {
return maybeReorderForBootstrap(generateRequiredSchemaHelper());
}

private Schema generateRequiredSchemaHelper() {
//might need to change this if other queries than mor have mandatory fields
if (logFiles.isEmpty()) {
return requestedSchema;
Expand All @@ -192,10 +196,10 @@ private Schema generateRequiredSchema() {
}

if (addedFields.isEmpty()) {
return maybeReorderForBootstrap(requestedSchema);
return requestedSchema;
}

return maybeReorderForBootstrap(appendFieldsToSchema(requestedSchema, addedFields));
return appendFieldsToSchema(requestedSchema, addedFields);
}

private Schema maybeReorderForBootstrap(Schema input) {
Expand Down Expand Up @@ -229,35 +233,36 @@ private Schema createSchemaFromFields(List<Schema.Field> fields) {

private ClosableIterator<T> makeBootstrapBaseFileIterator(HoodieBaseFile baseFile) throws IOException {
BaseFile dataFile = baseFile.getBootstrapBaseFile().get();
Pair<List<Schema.Field>, List<Schema.Field>> requiredFields =
getDataAndMetaCols(requiredSchema);
Pair<List<Schema.Field>, List<Schema.Field>> allFields = getDataAndMetaCols(dataSchema);

Option<ClosableIterator<T>> dataFileIterator =
requiredFields.getRight().isEmpty() ? Option.empty() :
Option.of(readerContext.getFileRecordIterator(
dataFile.getStoragePath(), 0,
dataFile.getFileLen(),
createSchemaFromFields(allFields.getRight()),
createSchemaFromFields(requiredFields.getRight()), storage));

Option<ClosableIterator<T>> skeletonFileIterator =
requiredFields.getLeft().isEmpty() ? Option.empty() :
Option.of(readerContext.getFileRecordIterator(
baseFile.getStoragePath(), 0,
baseFile.getFileLen(),
createSchemaFromFields(allFields.getLeft()),
createSchemaFromFields(requiredFields.getLeft()), storage));
Pair<List<Schema.Field>,List<Schema.Field>> requiredFields = getDataAndMetaCols(requiredSchema);
Pair<List<Schema.Field>,List<Schema.Field>> allFields = getDataAndMetaCols(dataSchema);
Option<Pair<ClosableIterator<T>,Schema>> dataFileIterator =
makeBootstrapBaseFileIteratorHelper(requiredFields.getRight(), allFields.getRight(), dataFile);
Option<Pair<ClosableIterator<T>,Schema>> skeletonFileIterator =
makeBootstrapBaseFileIteratorHelper(requiredFields.getLeft(), allFields.getLeft(), baseFile);
if (!dataFileIterator.isPresent() && !skeletonFileIterator.isPresent()) {
throw new IllegalStateException("should not be here if only partition cols are required");
} else if (!dataFileIterator.isPresent()) {
return skeletonFileIterator.get();
return skeletonFileIterator.get().getLeft();
} else if (!skeletonFileIterator.isPresent()) {
return dataFileIterator.get();
return dataFileIterator.get().getLeft();
} else {
return readerContext.mergeBootstrapReaders(skeletonFileIterator.get(),
dataFileIterator.get());
return readerContext.mergeBootstrapReaders(skeletonFileIterator.get().getLeft(), skeletonFileIterator.get().getRight(),
dataFileIterator.get().getLeft(), dataFileIterator.get().getRight());
}
}

private Option<Pair<ClosableIterator<T>,Schema>> makeBootstrapBaseFileIteratorHelper(List<Schema.Field> requiredFields,
List<Schema.Field> allFields,
BaseFile file) throws IOException {
if (requiredFields.isEmpty()) {
return Option.empty();
}
Schema requiredSchema = createSchemaFromFields(requiredFields);
return Option.of(Pair.of(readerContext.getFileRecordIterator(
file.getStoragePath(), 0,
file.getFileLen(),
createSchemaFromFields(allFields),
createSchemaFromFields(requiredFields), storage), requiredSchema));
}

/**
Expand All @@ -284,7 +289,6 @@ public T next() {
}

private void scanLogFiles() {
String path = readerState.tablePath;
HoodieMergedLogRecordReader logRecordReader = HoodieMergedLogRecordReader.newBuilder()
.withHoodieReaderContext(readerContext)
.withStorage(storage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,8 @@ public IndexedRecord seal(IndexedRecord record) {
}

@Override
public ClosableIterator<IndexedRecord> mergeBootstrapReaders(
ClosableIterator<IndexedRecord> skeletonFileIterator,
ClosableIterator<IndexedRecord> dataFileIterator) {
public ClosableIterator<IndexedRecord> mergeBootstrapReaders(ClosableIterator<IndexedRecord> skeletonFileIterator, Schema skeletonRequiredSchema,
ClosableIterator<IndexedRecord> dataFileIterator, Schema dataRequiredSchema) {
return null;
}

Expand Down
Loading
Loading