diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestPartitionTTLManagement.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestPartitionTTLManagement.java index cda76154ca6a..f4e9d206f06b 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestPartitionTTLManagement.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestPartitionTTLManagement.java @@ -182,7 +182,7 @@ private void writeRecordsForPartition(SparkRDDWriteClient client, HoodieTestData private List readRecords(String[] partitions) { return HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(storageConf, Arrays.stream(partitions).map(p -> Paths.get(basePath, p).toString()).collect(Collectors.toList()), - basePath, new JobConf(storageConf.unwrap()), true, false); + basePath, new JobConf(storageConf.unwrap()), true, true); } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java index b0876d061037..ae81a310190c 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java @@ -213,7 +213,7 @@ public void testUpsertPartitioner(boolean populateMetaFields) throws Exception { .map(baseFile -> new Path(baseFile.getPath()).getParent().toString()) .collect(Collectors.toList()); List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(storageConf(), inputPaths, - basePath(), new JobConf(storageConf().unwrap()), true, false); + basePath(), new JobConf(storageConf().unwrap()), true, populateMetaFields); // Wrote 20 records in 2 batches assertEquals(40, recordsRead.size(), "Must contain 40 records"); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java index e2ba56f94a35..ef28980d9cf9 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java @@ -22,6 +22,7 @@ import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.config.HoodieReaderConfig; import org.apache.hudi.common.config.HoodieStorageConfig; import org.apache.hudi.common.model.DefaultHoodieRecordPayload; import org.apache.hudi.common.model.HoodieKey; @@ -146,43 +147,50 @@ public void testWriteDuringCompaction(String payloadClass) throws IOException { @ParameterizedTest @MethodSource("writeLogTest") public void testWriteLogDuringCompaction(boolean enableMetadataTable, boolean enableTimelineServer) throws IOException { - Properties props = getPropertiesForKeyGen(true); - HoodieWriteConfig config = HoodieWriteConfig.newBuilder() - .forTable("test-trip-table") - .withPath(basePath()) - .withSchema(TRIP_EXAMPLE_SCHEMA) - .withParallelism(2, 2) - .withAutoCommit(true) - .withEmbeddedTimelineServerEnabled(enableTimelineServer) - .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build()) - 
.withCompactionConfig(HoodieCompactionConfig.newBuilder() - .withMaxNumDeltaCommitsBeforeCompaction(1).build()) - .withLayoutConfig(HoodieLayoutConfig.newBuilder() - .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name()) - .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build()) - .withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build()) - .build(); - props.putAll(config.getProps()); - - metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props); - client = getHoodieWriteClient(config); - - final List records = dataGen.generateInserts("001", 100); - JavaRDD writeRecords = jsc().parallelize(records, 2); + try { + //disable for this test because it seems like we process mor in a different order? + jsc().hadoopConfiguration().set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false"); + Properties props = getPropertiesForKeyGen(true); + HoodieWriteConfig config = HoodieWriteConfig.newBuilder() + .forTable("test-trip-table") + .withPath(basePath()) + .withSchema(TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2) + .withAutoCommit(true) + .withEmbeddedTimelineServerEnabled(enableTimelineServer) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build()) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withMaxNumDeltaCommitsBeforeCompaction(1).build()) + .withLayoutConfig(HoodieLayoutConfig.newBuilder() + .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name()) + .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build()) + .withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build()) + .build(); + props.putAll(config.getProps()); + + metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props); + client = getHoodieWriteClient(config); + + final List records = dataGen.generateInserts("001", 100); + JavaRDD writeRecords = jsc().parallelize(records, 2); + + // initialize 100 records + client.upsert(writeRecords, client.startCommit()); + // update 100 records + client.upsert(writeRecords, client.startCommit()); + // schedule compaction + client.scheduleCompaction(Option.empty()); + // delete 50 records + List toBeDeleted = records.stream().map(HoodieRecord::getKey).limit(50).collect(Collectors.toList()); + JavaRDD deleteRecords = jsc().parallelize(toBeDeleted, 2); + client.delete(deleteRecords, client.startCommit()); + // insert the same 100 records again + client.upsert(writeRecords, client.startCommit()); + Assertions.assertEquals(100, readTableTotalRecordsNum()); + } finally { + jsc().hadoopConfiguration().set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"); + } - // initialize 100 records - client.upsert(writeRecords, client.startCommit()); - // update 100 records - client.upsert(writeRecords, client.startCommit()); - // schedule compaction - client.scheduleCompaction(Option.empty()); - // delete 50 records - List toBeDeleted = records.stream().map(HoodieRecord::getKey).limit(50).collect(Collectors.toList()); - JavaRDD deleteRecords = jsc().parallelize(toBeDeleted, 2); - client.delete(deleteRecords, client.startCommit()); - // insert the same 100 records again - client.upsert(writeRecords, client.startCommit()); - Assertions.assertEquals(100, readTableTotalRecordsNum()); } private long readTableTotalRecordsNum() { diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java index 218e0eb4b03f..b79562f8b436 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordMerger; import org.apache.hudi.common.table.read.HoodieFileGroupReaderSchemaHandler; +import org.apache.hudi.common.util.ConfigUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.storage.HoodieStorage; @@ -38,6 +39,8 @@ import java.util.Map; import java.util.function.UnaryOperator; +import static org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD; + /** * An abstract reader context class for {@code HoodieFileGroupReader} to use, containing APIs for * engine-specific implementation on reading data files, getting field values from a record, @@ -197,7 +200,10 @@ public abstract ClosableIterator getFileRecordIterator( * @param schema The Avro schema of the record. * @return The record key in String. */ - public abstract String getRecordKey(T record, Schema schema); + public String getRecordKey(T record, Schema schema) { + Object val = getValue(record, schema, RECORD_KEY_METADATA_FIELD); + return val.toString(); + } /** * Gets the ordering value in particular type. @@ -208,10 +214,23 @@ public abstract ClosableIterator getFileRecordIterator( * @param props Properties. * @return The ordering value. */ - public abstract Comparable getOrderingValue(Option recordOption, - Map metadataMap, - Schema schema, - TypedProperties props); + public Comparable getOrderingValue(Option recordOption, + Map metadataMap, + Schema schema, + TypedProperties props) { + if (metadataMap.containsKey(INTERNAL_META_ORDERING_FIELD)) { + return (Comparable) metadataMap.get(INTERNAL_META_ORDERING_FIELD); + } + + if (!recordOption.isPresent()) { + return 0; + } + + String orderingFieldName = ConfigUtils.getOrderingField(props); + Object value = getValue(recordOption.get(), schema, orderingFieldName); + return value != null ? (Comparable) value : 0; + + } /** * Constructs a new {@link HoodieRecord} based on the record of engine-specific type and metadata for merging. diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java index 76e640d927a5..442419d07139 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java @@ -476,6 +476,6 @@ public Object get(String key) { } public enum HoodieRecordType { - AVRO, SPARK + AVRO, SPARK, HIVE } } diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java index 9b087482c72c..e8e92f6b4204 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java @@ -61,6 +61,8 @@ import java.util.regex.Matcher; import java.util.stream.Collectors; +import static org.apache.hudi.common.fs.FSUtils.LOG_FILE_PATTERN; + /** * Utility functions related to accessing the file storage on Hadoop. 
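As an aside to the HoodieReaderContext change above: the new default getOrderingValue() resolves the value in a fixed order — an entry stashed in the metadata map wins, otherwise the ordering (precombine) field is read from the record, otherwise 0. The standalone sketch below mirrors that order with plain Java types; it is not the Hudi API, and the config key and INTERNAL_META_ORDERING_FIELD constant used here are assumptions for illustration.

import java.util.Map;
import java.util.Optional;
import java.util.Properties;

// Standalone illustration of the resolution order in the new default getOrderingValue().
public class OrderingValueResolutionSketch {

  // Stand-in for HoodieReaderContext.INTERNAL_META_ORDERING_FIELD (assumed name).
  static final String INTERNAL_META_ORDERING_FIELD = "_hoodie_internal_ordering_field";

  static Comparable<?> resolveOrderingValue(Optional<Map<String, Object>> recordOpt,
                                            Map<String, Object> metadataMap,
                                            Properties props) {
    // 1. A value already extracted into the metadata map takes precedence.
    if (metadataMap.containsKey(INTERNAL_META_ORDERING_FIELD)) {
      return (Comparable<?>) metadataMap.get(INTERNAL_META_ORDERING_FIELD);
    }
    // 2. No record at all (e.g. a delete) falls back to 0.
    if (!recordOpt.isPresent()) {
      return 0;
    }
    // 3. Otherwise read the ordering field named in the write config (key assumed here).
    String orderingField = props.getProperty("hoodie.datasource.write.precombine.field");
    Object value = orderingField == null ? null : recordOpt.get().get(orderingField);
    return value != null ? (Comparable<?>) value : 0;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("hoodie.datasource.write.precombine.field", "ts");
    Map<String, Object> row = Map.of("ts", 42L);
    System.out.println(resolveOrderingValue(Optional.of(row), Map.of(), props)); // 42
    System.out.println(resolveOrderingValue(Optional.empty(), Map.of(), props)); // 0
  }
}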
*/ @@ -377,13 +379,24 @@ public static String getRelativePartitionPath(Path basePath, Path fullPartitionP * the file name. */ public static String getFileIdFromLogPath(Path path) { - Matcher matcher = FSUtils.LOG_FILE_PATTERN.matcher(path.getName()); + Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName()); if (!matcher.find()) { throw new InvalidHoodiePathException(path.toString(), "LogFile"); } return matcher.group(1); } + /** + * Get the second part of the file name in the log file. That will be the delta commit time. + */ + public static String getDeltaCommitTimeFromLogPath(Path path) { + Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName()); + if (!matcher.find()) { + throw new InvalidHoodiePathException(path.toString(), "LogFile"); + } + return matcher.group(2); + } + /** * Check if the file is a base file of a log file. Then get the fileId appropriately. */ diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java new file mode 100644 index 000000000000..904d4882cc95 --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.hadoop; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.model.HoodieEmptyRecord; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordMerger; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.common.util.collection.CloseableMappingIterator; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils; +import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils; +import org.apache.hudi.hadoop.utils.ObjectInspectorCache; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.HoodieStorageUtils; +import org.apache.hudi.storage.StorageConfiguration; +import org.apache.hudi.storage.StoragePath; + +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.function.UnaryOperator; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID; +import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getPartitionFieldNames; + +/** + * {@link HoodieReaderContext} for Hive-specific {@link HoodieFileGroupReaderBasedRecordReader}. 
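Before the class body: one job of this reader context is to publish the data schema and the requested columns to Hive through standard serde/projection properties on the JobConf (see setSchemas() further down in this class). The sketch below shows that wiring against a toy Avro schema; the property keys are assumed to match Hive's serdeConstants and ColumnProjectionUtils constants, and the schema is illustrative only.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.hadoop.mapred.JobConf;

import java.util.stream.Collectors;

// Minimal sketch of wiring Hive column projection for a required subset of a data schema.
public class HiveProjectionConfSketch {
  public static void main(String[] args) {
    Schema dataSchema = SchemaBuilder.record("data_rec").fields()
        .requiredString("_hoodie_record_key")
        .requiredString("name")
        .requiredLong("ts")
        .endRecord();
    Schema requiredSchema = SchemaBuilder.record("required_rec").fields()
        .requiredString("name")
        .endRecord();

    JobConf jobConf = new JobConf();
    // "columns" / "columns.types" correspond to serdeConstants.LIST_COLUMNS / LIST_COLUMN_TYPES.
    jobConf.set("columns", dataSchema.getFields().stream()
        .map(Schema.Field::name).collect(Collectors.joining(",")));
    jobConf.set("columns.types", "string,string,bigint");
    // "hive.io.file.readcolumn.names" / ".ids" correspond to ColumnProjectionUtils constants.
    jobConf.set("hive.io.file.readcolumn.names", "name");
    jobConf.set("hive.io.file.readcolumn.ids",
        requiredSchema.getFields().stream()
            .map(f -> String.valueOf(dataSchema.getField(f.name()).pos()))
            .collect(Collectors.joining(",")));
    System.out.println(jobConf.get("hive.io.file.readcolumn.ids")); // 1
  }
}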
+ */ +public class HiveHoodieReaderContext extends HoodieReaderContext { + protected final HoodieFileGroupReaderBasedRecordReader.HiveReaderCreator readerCreator; + protected final InputSplit split; + protected final JobConf jobConf; + protected final Reporter reporter; + protected final Schema writerSchema; + protected Map hosts; + protected final Map columnTypeMap; + private final ObjectInspectorCache objectInspectorCache; + private RecordReader firstRecordReader = null; + + private final List partitionCols; + private final Set partitionColSet; + + private final String recordKeyField; + + protected HiveHoodieReaderContext(HoodieFileGroupReaderBasedRecordReader.HiveReaderCreator readerCreator, + InputSplit split, + JobConf jobConf, + Reporter reporter, + Schema writerSchema, + Map hosts, + HoodieTableMetaClient metaClient) { + this.readerCreator = readerCreator; + this.split = split; + this.jobConf = jobConf; + this.reporter = reporter; + this.writerSchema = writerSchema; + this.hosts = hosts; + this.partitionCols = getPartitionFieldNames(jobConf).stream().filter(n -> writerSchema.getField(n) != null).collect(Collectors.toList()); + this.partitionColSet = new HashSet<>(this.partitionCols); + String tableName = metaClient.getTableConfig().getTableName(); + recordKeyField = getRecordKeyField(metaClient); + this.objectInspectorCache = HoodieArrayWritableAvroUtils.getCacheForTable(tableName, writerSchema, jobConf); + this.columnTypeMap = objectInspectorCache.getColumnTypeMap(); + } + + /** + * If populate meta fields is false, then getRecordKeyFields() + * should return exactly 1 recordkey field. + */ + private static String getRecordKeyField(HoodieTableMetaClient metaClient) { + if (metaClient.getTableConfig().populateMetaFields()) { + return HoodieRecord.RECORD_KEY_METADATA_FIELD; + } + + Option recordKeyFieldsOpt = metaClient.getTableConfig().getRecordKeyFields(); + ValidationUtils.checkArgument(recordKeyFieldsOpt.isPresent(), "No record key field set in table config, but populateMetaFields is disabled"); + ValidationUtils.checkArgument(recordKeyFieldsOpt.get().length == 1, "More than 1 record key set in table config, but populateMetaFields is disabled"); + return recordKeyFieldsOpt.get()[0]; + } + + private void setSchemas(JobConf jobConf, Schema dataSchema, Schema requiredSchema) { + List dataColumnNameList = dataSchema.getFields().stream().map(f -> f.name().toLowerCase(Locale.ROOT)).collect(Collectors.toList()); + List dataColumnTypeList = dataColumnNameList.stream().map(fieldName -> { + TypeInfo type = columnTypeMap.get(fieldName); + if (type == null) { + throw new IllegalArgumentException("Field: " + fieldName + ", does not have a defined type"); + } + return type; + }).collect(Collectors.toList()); + jobConf.set(serdeConstants.LIST_COLUMNS, String.join(",", dataColumnNameList)); + jobConf.set(serdeConstants.LIST_COLUMN_TYPES, dataColumnTypeList.stream().map(TypeInfo::getQualifiedName).collect(Collectors.joining(","))); + // don't replace `f -> f.name()` with lambda reference + String readColNames = requiredSchema.getFields().stream().map(f -> f.name()).collect(Collectors.joining(",")); + jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, readColNames); + jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, requiredSchema.getFields() + .stream().map(f -> String.valueOf(dataSchema.getField(f.name()).pos())).collect(Collectors.joining(","))); + } + + @Override + public HoodieStorage getStorage(String path, StorageConfiguration conf) { + return 
HoodieStorageUtils.getStorage(path, conf); + } + + @Override + public ClosableIterator getFileRecordIterator(StoragePath filePath, long start, long length, Schema dataSchema, Schema requiredSchema, HoodieStorage storage) throws IOException { + JobConf jobConfCopy = new JobConf(jobConf); + //move the partition cols to the end, because in some cases it has issues if we don't do that + Schema modifiedDataSchema = HoodieAvroUtils.generateProjectionSchema(dataSchema, Stream.concat(dataSchema.getFields().stream() + .map(f -> f.name().toLowerCase(Locale.ROOT)).filter(n -> !partitionColSet.contains(n)), + partitionCols.stream().filter(c -> dataSchema.getField(c) != null)).collect(Collectors.toList())); + setSchemas(jobConfCopy, modifiedDataSchema, requiredSchema); + InputSplit inputSplit = new FileSplit(new Path(filePath.toString()), start, length, hosts.get(filePath.toString())); + RecordReader recordReader = readerCreator.getRecordReader(inputSplit, jobConfCopy, reporter); + if (firstRecordReader == null) { + firstRecordReader = recordReader; + } + ClosableIterator recordIterator = new RecordReaderValueIterator<>(recordReader); + if (modifiedDataSchema.equals(requiredSchema)) { + return recordIterator; + } + // record reader puts the required columns in the positions of the data schema and nulls the rest of the columns + return new CloseableMappingIterator<>(recordIterator, projectRecord(modifiedDataSchema, requiredSchema)); + } + + @Override + public ArrayWritable convertAvroRecord(IndexedRecord avroRecord) { + return (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(avroRecord, avroRecord.getSchema(), true); + } + + @Override + public HoodieRecordMerger getRecordMerger(String mergerStrategy) { + if (mergerStrategy.equals(DEFAULT_MERGER_STRATEGY_UUID)) { + return new HoodieHiveRecordMerger(); + } + throw new HoodieException(String.format("The merger strategy UUID is not supported, Default: %s, Passed: %s", mergerStrategy, DEFAULT_MERGER_STRATEGY_UUID)); + } + + @Override + public String getRecordKey(ArrayWritable record, Schema schema) { + return getValue(record, schema, recordKeyField).toString(); + } + + @Override + public Object getValue(ArrayWritable record, Schema schema, String fieldName) { + return StringUtils.isNullOrEmpty(fieldName) ? 
null : objectInspectorCache.getValue(record, schema, fieldName); + } + + @Override + public HoodieRecord constructHoodieRecord(Option recordOption, Map metadataMap) { + if (!recordOption.isPresent()) { + return new HoodieEmptyRecord<>(new HoodieKey((String) metadataMap.get(INTERNAL_META_RECORD_KEY), (String) metadataMap.get(INTERNAL_META_PARTITION_PATH)), HoodieRecord.HoodieRecordType.HIVE); + } + Schema schema = (Schema) metadataMap.get(INTERNAL_META_SCHEMA); + ArrayWritable writable = recordOption.get(); + return new HoodieHiveRecord(new HoodieKey((String) metadataMap.get(INTERNAL_META_RECORD_KEY), (String) metadataMap.get(INTERNAL_META_PARTITION_PATH)), writable, schema, objectInspectorCache); + } + + @Override + public ArrayWritable seal(ArrayWritable record) { + return new ArrayWritable(Writable.class, Arrays.copyOf(record.get(), record.get().length)); + } + + @Override + public ClosableIterator mergeBootstrapReaders(ClosableIterator skeletonFileIterator, + Schema skeletonRequiredSchema, + ClosableIterator dataFileIterator, + Schema dataRequiredSchema) { + int skeletonLen = skeletonRequiredSchema.getFields().size(); + int dataLen = dataRequiredSchema.getFields().size(); + return new ClosableIterator() { + + private final ArrayWritable returnWritable = new ArrayWritable(Writable.class); + + @Override + public boolean hasNext() { + if (dataFileIterator.hasNext() != skeletonFileIterator.hasNext()) { + throw new IllegalStateException("bootstrap data file iterator and skeleton file iterator are out of sync"); + } + return dataFileIterator.hasNext(); + } + + @Override + public ArrayWritable next() { + Writable[] skeletonWritable = skeletonFileIterator.next().get(); + Writable[] dataWritable = dataFileIterator.next().get(); + Writable[] mergedWritable = new Writable[skeletonLen + dataLen]; + System.arraycopy(skeletonWritable, 0, mergedWritable, 0, skeletonLen); + System.arraycopy(dataWritable, 0, mergedWritable, skeletonLen, dataLen); + returnWritable.set(mergedWritable); + return returnWritable; + } + + @Override + public void close() { + skeletonFileIterator.close(); + dataFileIterator.close(); + } + }; + } + + @Override + public UnaryOperator projectRecord(Schema from, Schema to, Map renamedColumns) { + if (!renamedColumns.isEmpty()) { + throw new IllegalStateException("Schema evolution is not supported in the filegroup reader for Hive currently"); + } + return HoodieArrayWritableAvroUtils.projectRecord(from, to); + } + + public UnaryOperator reverseProjectRecord(Schema from, Schema to) { + return HoodieArrayWritableAvroUtils.reverseProject(from, to); + } + + public long getPos() throws IOException { + if (firstRecordReader != null) { + return firstRecordReader.getPos(); + } + throw new IllegalStateException("getPos() should not be called before a record reader has been initialized"); + } + + public float getProgress() throws IOException { + if (firstRecordReader != null) { + return firstRecordReader.getProgress(); + } + throw new IllegalStateException("getProgress() should not be called before a record reader has been initialized"); + } + +} diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java new file mode 100644 index 000000000000..efbf68c8e0f2 --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.hadoop; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.model.BaseFile; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.read.HoodieFileGroupReader; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.FileIOUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.ExternalSpillableMap; +import org.apache.hudi.hadoop.realtime.RealtimeSplit; +import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils; +import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils; + +import org.apache.avro.Schema; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.function.UnaryOperator; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED; +import static org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE; +import static org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE; +import static org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH; +import static org.apache.hudi.common.fs.FSUtils.getCommitTime; +import static org.apache.hudi.common.fs.FSUtils.getFileId; +import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePathInfo; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getDeltaCommitTimeFromLogPath; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getFileIdFromLogPath; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getFs; +import static 
org.apache.hudi.hadoop.fs.HadoopFSUtils.getRelativePartitionPath; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getStorageConf; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.isLogFile; +import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getPartitionFieldNames; +import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getTableBasePath; + +/** + * {@link HoodieFileGroupReader} based implementation of Hive's {@link RecordReader} for {@link ArrayWritable}. + */ +public class HoodieFileGroupReaderBasedRecordReader implements RecordReader { + + private static final Logger LOG = LoggerFactory.getLogger(HoodieFileGroupReaderBasedRecordReader.class); + + public interface HiveReaderCreator { + org.apache.hadoop.mapred.RecordReader getRecordReader( + final org.apache.hadoop.mapred.InputSplit split, + final org.apache.hadoop.mapred.JobConf job, + final org.apache.hadoop.mapred.Reporter reporter + ) throws IOException; + } + + private final HiveHoodieReaderContext readerContext; + private final HoodieFileGroupReader fileGroupReader; + private final ArrayWritable arrayWritable; + private final NullWritable nullWritable = NullWritable.get(); + private final InputSplit inputSplit; + private final JobConf jobConfCopy; + private final UnaryOperator reverseProjection; + + public HoodieFileGroupReaderBasedRecordReader(HiveReaderCreator readerCreator, + final InputSplit split, + final JobConf jobConf, + final Reporter reporter) throws IOException { + this.jobConfCopy = new JobConf(jobConf); + HoodieRealtimeInputFormatUtils.cleanProjectionColumnIds(jobConfCopy); + Set partitionColumns = new HashSet<>(getPartitionFieldNames(jobConfCopy)); + this.inputSplit = split; + + FileSplit fileSplit = (FileSplit) split; + String tableBasePath = getTableBasePath(split, jobConfCopy); + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() + .setConf(getStorageConf(jobConfCopy)) + .setBasePath(tableBasePath) + .build(); + String latestCommitTime = getLatestCommitTime(split, metaClient); + Schema tableSchema = getLatestTableSchema(metaClient, jobConfCopy, latestCommitTime); + Schema requestedSchema = createRequestedSchema(tableSchema, jobConfCopy); + Map hosts = new HashMap<>(); + this.readerContext = new HiveHoodieReaderContext(readerCreator, split, jobConfCopy, reporter, tableSchema, hosts, metaClient); + this.arrayWritable = new ArrayWritable(Writable.class, new Writable[requestedSchema.getFields().size()]); + // get some config values + long maxMemoryForMerge = jobConf.getLong(MAX_MEMORY_FOR_MERGE.key(), MAX_MEMORY_FOR_MERGE.defaultValue()); + String spillableMapPath = jobConf.get(SPILLABLE_MAP_BASE_PATH.key(), FileIOUtils.getDefaultSpillableMapBasePath()); + ExternalSpillableMap.DiskMapType spillMapType = ExternalSpillableMap.DiskMapType.valueOf(jobConf.get(SPILLABLE_DISK_MAP_TYPE.key(), + SPILLABLE_DISK_MAP_TYPE.defaultValue().name()).toUpperCase(Locale.ROOT)); + boolean bitmaskCompressEnabled = jobConf.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), + DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()); + LOG.debug("Creating HoodieFileGroupReaderRecordReader with tableBasePath={}, latestCommitTime={}, fileSplit={}", tableBasePath, latestCommitTime, fileSplit.getPath()); + this.fileGroupReader = new HoodieFileGroupReader<>(readerContext, metaClient.getStorage(), tableBasePath, + latestCommitTime, getFileSliceFromSplit(fileSplit, hosts, getFs(tableBasePath, jobConfCopy), tableBasePath), + tableSchema, requestedSchema, Option.empty(), metaClient, 
metaClient.getTableConfig().getProps(), metaClient.getTableConfig(), fileSplit.getStart(), + fileSplit.getLength(), false, maxMemoryForMerge, spillableMapPath, spillMapType, bitmaskCompressEnabled); + this.fileGroupReader.initRecordIterators(); + // it expects the partition columns to be at the end + Schema outputSchema = HoodieAvroUtils.generateProjectionSchema(tableSchema, + Stream.concat(tableSchema.getFields().stream().map(f -> f.name().toLowerCase(Locale.ROOT)).filter(n -> !partitionColumns.contains(n)), + partitionColumns.stream()).collect(Collectors.toList())); + this.reverseProjection = readerContext.reverseProjectRecord(requestedSchema, outputSchema); + } + + @Override + public boolean next(NullWritable key, ArrayWritable value) throws IOException { + if (!fileGroupReader.hasNext()) { + return false; + } + value.set(fileGroupReader.next().get()); + reverseProjection.apply(value); + return true; + } + + @Override + public NullWritable createKey() { + return nullWritable; + } + + @Override + public ArrayWritable createValue() { + return arrayWritable; + } + + @Override + public long getPos() throws IOException { + return readerContext.getPos(); + } + + @Override + public void close() throws IOException { + fileGroupReader.close(); + } + + @Override + public float getProgress() throws IOException { + return readerContext.getProgress(); + } + + public RealtimeSplit getSplit() { + return (RealtimeSplit) inputSplit; + } + + public JobConf getJobConf() { + return jobConfCopy; + } + + private static Schema getLatestTableSchema(HoodieTableMetaClient metaClient, JobConf jobConf, String latestCommitTime) { + TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient); + try { + Schema schema = tableSchemaResolver.getTableAvroSchema(latestCommitTime); + // Add partitioning fields to writer schema for resulting row to contain null values for these fields + return HoodieRealtimeRecordReaderUtils.addPartitionFields(schema, getPartitionFieldNames(jobConf)); + } catch (Exception e) { + throw new RuntimeException("Unable to get table schema", e); + } + } + + private static String getLatestCommitTime(InputSplit split, HoodieTableMetaClient metaClient) { + if (split instanceof RealtimeSplit) { + return ((RealtimeSplit) split).getMaxCommitTime(); + } + Option lastInstant = metaClient.getCommitsTimeline().lastInstant(); + if (lastInstant.isPresent()) { + return lastInstant.get().getTimestamp(); + } else { + return EMPTY_STRING; + } + } + + /** + * Convert FileSplit to FileSlice, but save the locations in 'hosts' because that data is otherwise lost. 
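For context on the next()/createKey()/createValue() methods above: a mapred record reader is driven by the engine with a create-once, recycle-per-row loop, which is why this reader reuses a single ArrayWritable internally. The sketch below is a generic consumer of such a reader (reader construction is elided; this is illustrative, not a Hudi API).

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.RecordReader;

import java.io.IOException;

// Illustration of how a RecordReader<NullWritable, ArrayWritable> is consumed:
// the value object is created once and refilled on every next() call.
public class RecordReaderDriverSketch {
  static long countRecords(RecordReader<NullWritable, ArrayWritable> reader) throws IOException {
    NullWritable key = reader.createKey();
    ArrayWritable value = reader.createValue();
    long count = 0;
    try {
      while (reader.next(key, value)) {
        count++; // value now holds the projected columns for the current row
      }
    } finally {
      reader.close();
    }
    return count;
  }
}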
+ */ + private static FileSlice getFileSliceFromSplit(FileSplit split, Map hosts, FileSystem fs, String tableBasePath) throws IOException { + BaseFile bootstrapBaseFile = createBootstrapBaseFile(split, hosts, fs); + if (split instanceof RealtimeSplit) { + // MOR + RealtimeSplit realtimeSplit = (RealtimeSplit) split; + boolean isLogFile = isLogFile(realtimeSplit.getPath()); + String fileID; + String commitTime; + if (isLogFile) { + fileID = getFileIdFromLogPath(realtimeSplit.getPath()); + commitTime = getDeltaCommitTimeFromLogPath(realtimeSplit.getPath()); + } else { + fileID = getFileId(realtimeSplit.getPath().getName()); + commitTime = getCommitTime(realtimeSplit.getPath().toString()); + } + HoodieFileGroupId fileGroupId = new HoodieFileGroupId(getRelativePartitionPath(new Path(realtimeSplit.getBasePath()), realtimeSplit.getPath()), fileID); + if (isLogFile) { + return new FileSlice(fileGroupId, commitTime, null, realtimeSplit.getDeltaLogFiles()); + } + hosts.put(realtimeSplit.getPath().toString(), realtimeSplit.getLocations()); + HoodieBaseFile hoodieBaseFile = new HoodieBaseFile(convertToStoragePathInfo(fs.getFileStatus(realtimeSplit.getPath())), bootstrapBaseFile); + return new FileSlice(fileGroupId, commitTime, hoodieBaseFile, realtimeSplit.getDeltaLogFiles()); + } + // COW + HoodieFileGroupId fileGroupId = new HoodieFileGroupId(getFileId(split.getPath().getName()), getRelativePartitionPath(new Path(tableBasePath), split.getPath())); + hosts.put(split.getPath().toString(), split.getLocations()); + return new FileSlice( + fileGroupId, + getCommitTime(split.getPath().toString()), + new HoodieBaseFile(convertToStoragePathInfo(fs.getFileStatus(split.getPath())), bootstrapBaseFile), + Collections.emptyList()); + } + + private static BaseFile createBootstrapBaseFile(FileSplit split, Map hosts, FileSystem fs) throws IOException { + if (split instanceof BootstrapBaseFileSplit) { + BootstrapBaseFileSplit bootstrapBaseFileSplit = (BootstrapBaseFileSplit) split; + FileSplit bootstrapFileSplit = bootstrapBaseFileSplit.getBootstrapFileSplit(); + hosts.put(bootstrapFileSplit.getPath().toString(), bootstrapFileSplit.getLocations()); + return new BaseFile(convertToStoragePathInfo(fs.getFileStatus(bootstrapFileSplit.getPath()))); + } + return null; + } + + private static Schema createRequestedSchema(Schema tableSchema, JobConf jobConf) { + String readCols = jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR); + if (StringUtils.isNullOrEmpty(readCols)) { + Schema emptySchema = Schema.createRecord(tableSchema.getName(), tableSchema.getDoc(), + tableSchema.getNamespace(), tableSchema.isError()); + emptySchema.setFields(Collections.emptyList()); + return emptySchema; + } + // hive will handle the partition cols + String partitionColString = jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS); + Set partitionColumns; + if (partitionColString == null) { + partitionColumns = Collections.emptySet(); + } else { + partitionColumns = Arrays.stream(partitionColString.split(",")).collect(Collectors.toSet()); + } + // if they are actually written to the file, then it is ok to read them from the file + tableSchema.getFields().forEach(f -> partitionColumns.remove(f.name().toLowerCase(Locale.ROOT))); + return HoodieAvroUtils.generateProjectionSchema(tableSchema, + Arrays.stream(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR).split(",")).filter(c -> !partitionColumns.contains(c)).collect(Collectors.toList())); + } +} diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java new file mode 100644 index 000000000000..a2fb08fd6146 --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.hadoop; + +import org.apache.hudi.common.model.HoodieAvroIndexedRecord; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieOperation; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.MetadataValues; +import org.apache.hudi.common.util.ConfigUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils; +import org.apache.hudi.hadoop.utils.ObjectInspectorCache; +import org.apache.hudi.keygen.BaseKeyGenerator; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import org.apache.avro.Schema; +import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import java.util.Properties; + +/** + * {@link HoodieRecord} implementation for Hive records of {@link ArrayWritable}. 
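One detail worth calling out before the class body: ArrayWritable rows coming out of a record reader are recycled buffers, so a record that must outlive the current iteration has to detach itself by copying the backing Writable array (this is what seal() above and copy() below do). A minimal standalone illustration of that defensive copy, using only Hadoop Writable types:

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

import java.util.Arrays;

// Copying the backing Writable[] detaches a row from the reader's recycled buffer.
public class ArrayWritableCopySketch {
  public static void main(String[] args) {
    ArrayWritable recycled = new ArrayWritable(Writable.class,
        new Writable[] {new Text("key1"), new IntWritable(10)});

    // Detach: later mutations of the recycled buffer are not visible to the copy.
    ArrayWritable detached = new ArrayWritable(Writable.class,
        Arrays.copyOf(recycled.get(), recycled.get().length));

    recycled.set(new Writable[] {new Text("key2"), new IntWritable(20)});
    System.out.println(detached.get()[0]); // still "key1"
  }
}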
+ */ +public class HoodieHiveRecord extends HoodieRecord { + + private boolean copy; + private final boolean isDeleted; + + public boolean isDeleted() { + return isDeleted; + } + + private final ArrayWritableObjectInspector objectInspector; + + private final ObjectInspectorCache objectInspectorCache; + + protected Schema schema; + + public HoodieHiveRecord(HoodieKey key, ArrayWritable data, Schema schema, ObjectInspectorCache objectInspectorCache) { + super(key, data); + this.objectInspector = objectInspectorCache.getObjectInspector(schema); + this.objectInspectorCache = objectInspectorCache; + this.schema = schema; + this.copy = false; + isDeleted = data == null; + } + + private HoodieHiveRecord(HoodieKey key, ArrayWritable data, Schema schema, HoodieOperation operation, boolean isCopy, + ArrayWritableObjectInspector objectInspector, ObjectInspectorCache objectInspectorCache) { + super(key, data, operation, Option.empty()); + this.schema = schema; + this.copy = isCopy; + isDeleted = data == null; + this.objectInspector = objectInspector; + this.objectInspectorCache = objectInspectorCache; + } + + @Override + public HoodieRecord newInstance() { + return new HoodieHiveRecord(this.key, this.data, this.schema, this.operation, this.copy, this.objectInspector, this.objectInspectorCache); + } + + @Override + public HoodieRecord newInstance(HoodieKey key, HoodieOperation op) { + throw new UnsupportedOperationException("ObjectInspector is needed for HoodieHiveRecord"); + } + + @Override + public HoodieRecord newInstance(HoodieKey key) { + throw new UnsupportedOperationException("ObjectInspector is needed for HoodieHiveRecord"); + } + + @Override + public Comparable getOrderingValue(Schema recordSchema, Properties props) { + String orderingField = ConfigUtils.getOrderingField(props); + if (orderingField == null) { + return 0; + //throw new IllegalArgumentException("Ordering Field is not set. Precombine must be set. 
(If you are using a custom record merger it might be something else)"); + } + return (Comparable) getValue(ConfigUtils.getOrderingField(props)); + } + + @Override + public HoodieRecordType getRecordType() { + return HoodieRecordType.HIVE; + } + + @Override + public String getRecordKey(Schema recordSchema, Option keyGeneratorOpt) { + throw new UnsupportedOperationException("Not supported for HoodieHiveRecord"); + } + + @Override + public String getRecordKey(Schema recordSchema, String keyFieldName) { + throw new UnsupportedOperationException("Not supported for HoodieHiveRecord"); + } + + @Override + protected void writeRecordPayload(ArrayWritable payload, Kryo kryo, Output output) { + throw new UnsupportedOperationException("Not supported for HoodieHiveRecord"); + } + + @Override + protected ArrayWritable readRecordPayload(Kryo kryo, Input input) { + throw new UnsupportedOperationException("Not supported for HoodieHiveRecord"); + } + + @Override + public Object[] getColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled) { + Object[] objects = new Object[columns.length]; + for (int i = 0; i < objects.length; i++) { + objects[i] = getValue(columns[i]); + } + return objects; + } + + @Override + public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) { + throw new UnsupportedOperationException("Not supported for HoodieHiveRecord"); + } + + @Override + public HoodieRecord prependMetaFields(Schema recordSchema, Schema targetSchema, MetadataValues metadataValues, Properties props) { + throw new UnsupportedOperationException("Not supported for HoodieHiveRecord"); + } + + @Override + public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map renameCols) { + throw new UnsupportedOperationException("Not supported for HoodieHiveRecord"); + } + + @Override + public boolean isDelete(Schema recordSchema, Properties props) throws IOException { + if (null == data) { + return true; + } + if (recordSchema.getField(HoodieRecord.HOODIE_IS_DELETED_FIELD) == null) { + return false; + } + Object deleteMarker = getValue(HoodieRecord.HOODIE_IS_DELETED_FIELD); + return deleteMarker instanceof BooleanWritable && ((BooleanWritable) deleteMarker).get(); + } + + @Override + public boolean shouldIgnore(Schema recordSchema, Properties props) throws IOException { + return false; + } + + @Override + public HoodieRecord copy() { + if (!copy) { + this.data = new ArrayWritable(Writable.class, Arrays.copyOf(this.data.get(), this.data.get().length)); + this.copy = true; + } + return this; + } + + @Override + public Option> getMetadata() { + // TODO HUDI-5282 support metaData + return Option.empty(); + } + + @Override + public HoodieRecord wrapIntoHoodieRecordPayloadWithParams(Schema recordSchema, Properties props, Option> simpleKeyGenFieldsOpt, Boolean withOperation, + Option partitionNameOp, Boolean populateMetaFieldsOp, Option schemaWithoutMetaFields) throws IOException { + throw new UnsupportedOperationException("Not supported for HoodieHiveRecord"); + } + + @Override + public HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Schema recordSchema, Properties props, Option keyGen) { + throw new UnsupportedOperationException("Not supported for HoodieHiveRecord"); + } + + @Override + public HoodieRecord truncateRecordKey(Schema recordSchema, Properties props, String keyFieldName) throws IOException { + data.get()[recordSchema.getIndexNamed(keyFieldName)] = new Text(); + return this; + } + + @Override + public Option 
toIndexedRecord(Schema recordSchema, Properties props) throws IOException { + throw new UnsupportedOperationException("Not supported for HoodieHiveRecord"); + } + + private Object getValue(String name) { + return HoodieArrayWritableAvroUtils.getWritableValue(data, objectInspector, name); + } + + protected Schema getSchema() { + return schema; + } +} diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecordMerger.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecordMerger.java new file mode 100644 index 000000000000..17a4738569e5 --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecordMerger.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.hadoop; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordMerger; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.collection.Pair; + +import org.apache.avro.Schema; + +import java.io.IOException; + +public class HoodieHiveRecordMerger implements HoodieRecordMerger { + @Override + public Option> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException { + ValidationUtils.checkArgument(older.getRecordType() == HoodieRecord.HoodieRecordType.HIVE); + ValidationUtils.checkArgument(newer.getRecordType() == HoodieRecord.HoodieRecordType.HIVE); + if (newer instanceof HoodieHiveRecord) { + HoodieHiveRecord newHiveRecord = (HoodieHiveRecord) newer; + if (newHiveRecord.isDeleted()) { + return Option.empty(); + } + } else if (newer.getData() == null) { + return Option.empty(); + } + + if (older instanceof HoodieHiveRecord) { + HoodieHiveRecord oldHiveRecord = (HoodieHiveRecord) older; + if (oldHiveRecord.isDeleted()) { + return Option.of(Pair.of(newer, newSchema)); + } + } else if (older.getData() == null) { + return Option.empty(); + } + if (older.getOrderingValue(oldSchema, props).compareTo(newer.getOrderingValue(newSchema, props)) > 0) { + return Option.of(Pair.of(older, oldSchema)); + } else { + return Option.of(Pair.of(newer, newSchema)); + } + } + + @Override + public HoodieRecord.HoodieRecordType getRecordType() { + return HoodieRecord.HoodieRecordType.HIVE; + } + + @Override + public String getMergingStrategy() { + return HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID; + } +} diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java index 9e6565299040..18b9e221978d 100644 --- 
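The merge rule in HoodieHiveRecordMerger above boils down to: a delete on either side short-circuits, otherwise the record with the strictly larger ordering value wins, and ties go to the newer record. A standalone sketch of just that comparison (plain Java, not the Hudi merger API):

// Keep the older record only when its ordering value is strictly greater,
// otherwise prefer the newer record (equal ordering values favor the newer write).
public class OrderingMergeSketch {
  static <T extends Comparable<T>> String pick(T olderOrderingValue, T newerOrderingValue) {
    return olderOrderingValue.compareTo(newerOrderingValue) > 0 ? "older" : "newer";
  }

  public static void main(String[] args) {
    System.out.println(pick(5L, 3L)); // older
    System.out.println(pick(3L, 5L)); // newer
    System.out.println(pick(4L, 4L)); // newer (tie goes to the newer record)
  }
}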
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java @@ -20,10 +20,15 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.hadoop.utils.HoodieHiveUtils; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.hadoop.avro.HoodieTimestampAwareParquetInputFormat; +import org.apache.hudi.hadoop.utils.HoodieHiveUtils; +import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.hadoop.HoodieHadoopStorage; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper; import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; @@ -35,9 +40,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; -import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils; import org.apache.parquet.hadoop.ParquetInputFormat; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +51,11 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.apache.hudi.common.util.TablePathUtils.getTablePath; +import static org.apache.hudi.common.util.TablePathUtils.isHoodieTablePath; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath; +import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.shouldUseFilegroupReader; + /** * HoodieInputFormat which understands the Hoodie File Structure and filters files based on the Hoodie Mode. 
If paths * that does not correspond to a hoodie table then they are passed in as is (as what FileInputFormat.listStatus() @@ -91,9 +99,42 @@ private void initAvroInputFormat() { } } + private static boolean checkIfHudiTable(final InputSplit split, final JobConf job) { + try { + Path inputPath = ((FileSplit) split).getPath(); + FileSystem fs = inputPath.getFileSystem(job); + HoodieStorage storage = new HoodieHadoopStorage(fs); + return getTablePath(storage, convertToStoragePath(inputPath)) + .map(path -> isHoodieTablePath(storage, path)).orElse(false); + } catch (IOException e) { + return false; + } + } + @Override public RecordReader getRecordReader(final InputSplit split, final JobConf job, final Reporter reporter) throws IOException { + HoodieRealtimeInputFormatUtils.addProjectionField(job, job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "").split("/")); + if (shouldUseFilegroupReader(job)) { + try { + if (!(split instanceof FileSplit) || !checkIfHudiTable(split, job)) { + return super.getRecordReader(split, job, reporter); + } + if (supportAvroRead && HoodieColumnProjectionUtils.supportTimestamp(job)) { + return new HoodieFileGroupReaderBasedRecordReader((s, j, r) -> { + try { + return new ParquetRecordReaderWrapper(new HoodieTimestampAwareParquetInputFormat(), s, j, r); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }, split, job, reporter); + } else { + return new HoodieFileGroupReaderBasedRecordReader(super::getRecordReader, split, job, reporter); + } + } catch (final IOException e) { + throw new RuntimeException("Cannot create a RecordReaderWrapper", e); + } + } // TODO enable automatic predicate pushdown after fixing issues // FileSplit fileSplit = (FileSplit) split; // HoodieTableMetadata metadata = getTableMetadata(fileSplit.getPath().getParent()); @@ -117,7 +158,6 @@ public RecordReader getRecordReader(final InputSpli LOG.debug("EMPLOYING DEFAULT RECORD READER - " + split); } - HoodieRealtimeInputFormatUtils.addProjectionField(job, job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "").split("/")); return getRecordReaderInternal(split, job, reporter); } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RecordReaderValueIterator.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RecordReaderValueIterator.java index 7ffa3bf555c0..c08c358c0c87 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RecordReaderValueIterator.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RecordReaderValueIterator.java @@ -18,6 +18,7 @@ package org.apache.hudi.hadoop; +import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.exception.HoodieException; import org.apache.hadoop.mapred.RecordReader; @@ -25,7 +26,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Iterator; import java.util.NoSuchElementException; /** @@ -34,7 +34,7 @@ * @param Key Type * @param Value Type */ -public class RecordReaderValueIterator implements Iterator { +public class RecordReaderValueIterator implements ClosableIterator { private static final Logger LOG = LoggerFactory.getLogger(RecordReaderValueIterator.class); @@ -79,7 +79,12 @@ public V next() { return retVal; } - public void close() throws IOException { - this.reader.close(); + @Override + public void close() { + try { + this.reader.close(); + } catch (IOException e) { + throw new RuntimeException("Could not close reader", e); + } } } diff --git 
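The reader-creator lambdas passed to HoodieFileGroupReaderBasedRecordReader above are just adapters over an existing mapred input format; any source of RecordReader instances can be plugged in through the HiveReaderCreator interface introduced earlier in this PR. A hedged sketch, assuming Hive's MapredParquetInputFormat is on the classpath and is used purely as an example reader source:

import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
import org.apache.hudi.hadoop.HoodieFileGroupReaderBasedRecordReader.HiveReaderCreator;

// A method reference satisfies the single-method HiveReaderCreator interface, so the same
// wiring used for the parquet readers above works for any compatible input format.
public class ReaderCreatorAdapterSketch {
  static HiveReaderCreator parquetReaderCreator() {
    MapredParquetInputFormat parquetInputFormat = new MapredParquetInputFormat();
    return parquetInputFormat::getRecordReader;
  }
}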
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java index 1edf29d45d57..b89e69f4be8c 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java @@ -19,8 +19,10 @@ package org.apache.hudi.hadoop.realtime; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.hadoop.HoodieFileGroupReaderBasedRecordReader; import org.apache.hudi.hadoop.hive.HoodieCombineRealtimeFileSplit; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.IOContextMap; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.NullWritable; @@ -35,6 +37,8 @@ import java.util.LinkedList; import java.util.List; +import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.shouldUseFilegroupReader; + /** * Allows to read multiple realtime file splits grouped together by CombineInputFormat. */ @@ -42,19 +46,29 @@ public class HoodieCombineRealtimeRecordReader implements RecordReader recordReaders = new LinkedList<>(); + private List recordReaders = new LinkedList<>(); // Points to the currently iterating record reader - HoodieRealtimeRecordReader currentRecordReader; + private RecordReader currentRecordReader; + + private final boolean useFileGroupReader; public HoodieCombineRealtimeRecordReader(JobConf jobConf, CombineFileSplit split, List readers) { + useFileGroupReader = shouldUseFilegroupReader(jobConf); try { ValidationUtils.checkArgument(((HoodieCombineRealtimeFileSplit) split).getRealtimeFileSplits().size() == readers .size(), "Num Splits does not match number of unique RecordReaders!"); for (InputSplit rtSplit : ((HoodieCombineRealtimeFileSplit) split).getRealtimeFileSplits()) { - LOG.info("Creating new RealtimeRecordReader for split"); - recordReaders.add( - new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit) rtSplit, jobConf, readers.remove(0))); + if (useFileGroupReader) { + LOG.info("Creating new HoodieFileGroupReaderRecordReader for split"); + RecordReader reader = readers.remove(0); + ValidationUtils.checkArgument(reader instanceof HoodieFileGroupReaderBasedRecordReader, reader.toString() + "not instance of HoodieFileGroupReaderRecordReader "); + recordReaders.add(reader); + } else { + LOG.info("Creating new RealtimeRecordReader for split"); + recordReaders.add( + new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit) rtSplit, jobConf, readers.remove(0))); + } } currentRecordReader = recordReaders.remove(0); } catch (Exception e) { @@ -69,9 +83,20 @@ public boolean next(NullWritable key, ArrayWritable value) throws IOException { } else if (recordReaders.size() > 0) { this.currentRecordReader.close(); this.currentRecordReader = recordReaders.remove(0); - AbstractRealtimeRecordReader reader = (AbstractRealtimeRecordReader)currentRecordReader.getReader(); + RecordReader reader; + JobConf jobConf; + Path path; + if (useFileGroupReader) { + reader = currentRecordReader; + jobConf = ((HoodieFileGroupReaderBasedRecordReader) reader).getJobConf(); + path = ((HoodieFileGroupReaderBasedRecordReader) reader).getSplit().getPath(); + } else { + reader = ((HoodieRealtimeRecordReader)currentRecordReader).getReader(); + jobConf = ((AbstractRealtimeRecordReader) reader).getJobConf(); + path = ((AbstractRealtimeRecordReader) reader).getSplit().getPath(); + } // 
when switch reader, ioctx should be updated - IOContextMap.get(reader.getJobConf()).setInputPath(reader.getSplit().getPath()); + IOContextMap.get(jobConf).setInputPath(path); return next(key, value); } else { return false; @@ -80,12 +105,20 @@ public boolean next(NullWritable key, ArrayWritable value) throws IOException { @Override public NullWritable createKey() { - return this.currentRecordReader.createKey(); + if (useFileGroupReader) { + return ((HoodieFileGroupReaderBasedRecordReader) this.currentRecordReader).createKey(); + } else { + return ((HoodieRealtimeRecordReader) this.currentRecordReader).createKey(); + } } @Override public ArrayWritable createValue() { - return this.currentRecordReader.createValue(); + if (useFileGroupReader) { + return ((HoodieFileGroupReaderBasedRecordReader) this.currentRecordReader).createValue(); + } else { + return ((HoodieRealtimeRecordReader) this.currentRecordReader).createValue(); + } } @Override diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java index 7e74171c3f98..8d56e77dda92 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java @@ -26,7 +26,6 @@ import org.apache.hudi.hadoop.HoodieParquetInputFormat; import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat; import org.apache.hudi.hadoop.UseRecordReaderFromInputFormat; -import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils; @@ -45,6 +44,10 @@ import java.util.Arrays; import java.util.List; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getStorageConf; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.isLogFile; +import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.shouldUseFilegroupReader; + /** * Input Format, that provides a real-time view of data in a Hoodie table. */ @@ -69,16 +72,20 @@ public RecordReader getRecordReader(final InputSpli ValidationUtils.checkArgument(split instanceof RealtimeSplit, "HoodieRealtimeRecordReader can only work on RealtimeSplit and not with " + split); RealtimeSplit realtimeSplit = (RealtimeSplit) split; + + if (shouldUseFilegroupReader(jobConf)) { + return super.getRecordReader(realtimeSplit, jobConf, reporter); + } + // add preCombineKey - HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() - .setConf(HadoopFSUtils.getStorageConfWithCopy(jobConf)).setBasePath(realtimeSplit.getBasePath()).build(); + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(getStorageConf(jobConf)).setBasePath(realtimeSplit.getBasePath()).build(); HoodieTableConfig tableConfig = metaClient.getTableConfig(); addProjectionToJobConf(realtimeSplit, jobConf, tableConfig); LOG.info("Creating record reader with readCols :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR) + ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)); // for log only split, set the parquet reader as empty. 
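For context on the HoodieCombineRealtimeRecordReader changes above: the reader keeps a queue of per-split readers (legacy realtime readers or file-group-based readers) and falls through to the next one when the current reader is exhausted, updating the IOContext as it switches. A stripped-down sketch of that chaining control flow, using illustrative interfaces rather than the Hadoop and Hudi types:

```java
import java.io.IOException;
import java.util.Deque;

// Illustrative only: iterate a queue of delegate readers, closing each one and
// switching to the next when it is exhausted. This mirrors the control flow of
// HoodieCombineRealtimeRecordReader.next() without the Hadoop/Hudi types.
interface SimpleReader extends AutoCloseable {
  boolean next(StringBuilder valueOut) throws IOException;

  @Override
  void close() throws IOException;
}

class ChainedReader implements SimpleReader {
  private final Deque<SimpleReader> pending;
  private SimpleReader current;

  ChainedReader(Deque<SimpleReader> readers) {
    this.pending = readers;
    this.current = readers.poll();
  }

  @Override
  public boolean next(StringBuilder valueOut) throws IOException {
    while (current != null) {
      if (current.next(valueOut)) {
        return true;            // current delegate still has records
      }
      current.close();          // exhausted: release it and move on
      current = pending.poll(); // null once nothing is left
    }
    return false;
  }

  @Override
  public void close() throws IOException {
    if (current != null) {
      current.close();
    }
    for (SimpleReader reader : pending) {
      reader.close();
    }
  }
}
```

Delegating through a small common interface like this is one alternative to the instanceof casts in createKey()/createValue(); the diff keeps the casts so the two existing reader types stay untouched.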
- if (HadoopFSUtils.isLogFile(realtimeSplit.getPath())) { + if (isLogFile(realtimeSplit.getPath())) { return new HoodieRealtimeRecordReader(realtimeSplit, jobConf, new HoodieEmptyRecordReader(realtimeSplit, jobConf)); } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableAvroUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableAvroUtils.java new file mode 100644 index 000000000000..a2da796c6f77 --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableAvroUtils.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.hadoop.utils; + +import org.apache.hudi.common.util.collection.Pair; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.avro.Schema; +import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; + +import java.util.List; +import java.util.function.UnaryOperator; + +public class HoodieArrayWritableAvroUtils { + + private static final Cache + OBJECT_INSPECTOR_TABLE_CACHE = Caffeine.newBuilder().maximumSize(1000).build(); + + public static ObjectInspectorCache getCacheForTable(String table, Schema tableSchema, JobConf jobConf) { + ObjectInspectorCache cache = OBJECT_INSPECTOR_TABLE_CACHE.getIfPresent(table); + if (cache == null) { + cache = new ObjectInspectorCache(tableSchema, jobConf); + } + return cache; + } + + private static final Cache, int[]> + PROJECTION_CACHE = Caffeine.newBuilder().maximumSize(1000).build(); + + public static int[] getProjection(Schema from, Schema to) { + return PROJECTION_CACHE.get(Pair.of(from, to), schemas -> { + List toFields = to.getFields(); + int[] newProjection = new int[toFields.size()]; + for (int i = 0; i < newProjection.length; i++) { + newProjection[i] = from.getField(toFields.get(i).name()).pos(); + } + return newProjection; + }); + } + + /** + * Projection will keep the size from the "from" schema because it gets recycled + * and if the size changes the reader will fail + */ + public static UnaryOperator projectRecord(Schema from, Schema to) { + int[] projection = getProjection(from, to); + return arrayWritable -> { + Writable[] values = new Writable[arrayWritable.get().length]; + for (int i = 0; i < projection.length; i++) { + values[i] = arrayWritable.get()[projection[i]]; + } + arrayWritable.set(values); + return arrayWritable; + }; + } + + public static int[] getReverseProjection(Schema from, Schema to) { + return PROJECTION_CACHE.get(Pair.of(from, to), schemas -> { + List fromFields = from.getFields(); + int[] 
newProjection = new int[fromFields.size()]; + for (int i = 0; i < newProjection.length; i++) { + newProjection[i] = to.getField(fromFields.get(i).name()).pos(); + } + return newProjection; + }); + } + + /** + * After the reading and merging etc is done, we need to put the records + * into the positions of the original schema + */ + public static UnaryOperator reverseProject(Schema from, Schema to) { + int[] projection = getReverseProjection(from, to); + return arrayWritable -> { + Writable[] values = new Writable[to.getFields().size()]; + for (int i = 0; i < projection.length; i++) { + values[projection[i]] = arrayWritable.get()[i]; + } + arrayWritable.set(values); + return arrayWritable; + }; + } + + public static Object getWritableValue(ArrayWritable arrayWritable, ArrayWritableObjectInspector objectInspector, String name) { + return objectInspector.getStructFieldData(arrayWritable, objectInspector.getStructFieldRef(name)); + } +} + + diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java index fe88855d4581..64dc1f63af8d 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java @@ -18,7 +18,9 @@ package org.apache.hudi.hadoop.utils; +import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.config.HoodieReaderConfig; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -33,6 +35,7 @@ import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.TablePathUtils; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.TableNotFoundException; import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile; @@ -44,6 +47,8 @@ import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat; import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit; import org.apache.hudi.hadoop.realtime.HoodieRealtimePath; +import org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader; +import org.apache.hudi.hadoop.realtime.RealtimeSplit; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.HoodieStorageUtils; import org.apache.hudi.storage.StoragePath; @@ -52,8 +57,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcSerde; @@ -61,6 +68,8 @@ import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.slf4j.Logger; @@ -540,4 +549,31 @@ public static HoodieRealtimeFileSplit 
createRealtimeFileSplit(HoodieRealtimePath throw new HoodieIOException(String.format("Failed to create instance of %s", HoodieRealtimeFileSplit.class.getName()), e); } } + + public static List getPartitionFieldNames(JobConf jobConf) { + String partitionFields = jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, ""); + return partitionFields.isEmpty() ? new ArrayList<>() : Arrays.stream(partitionFields.split("/")).collect(Collectors.toList()); + } + + public static String getTableBasePath(InputSplit split, JobConf jobConf) throws IOException { + if (split instanceof RealtimeSplit) { + RealtimeSplit realtimeSplit = (RealtimeSplit) split; + return realtimeSplit.getBasePath(); + } else { + Path inputPath = ((FileSplit) split).getPath(); + FileSystem fs = inputPath.getFileSystem(jobConf); + HoodieStorage storage = new HoodieHadoopStorage(fs); + Option tablePath = TablePathUtils.getTablePath(storage, convertToStoragePath(inputPath)); + return tablePath.get().toString(); + } + } + + /** + * `schema.on.read` and skip merge not implemented + */ + public static boolean shouldUseFilegroupReader(final JobConf jobConf) { + return jobConf.getBoolean(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), HoodieReaderConfig.FILE_GROUP_READER_ENABLED.defaultValue()) + && !jobConf.getBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.defaultValue()) + && !jobConf.getBoolean(HoodieRealtimeRecordReader.REALTIME_SKIP_MERGE_PROP, false); + } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java new file mode 100644 index 000000000000..ddcc28851dfd --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.hadoop.utils; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.avro.Schema; +import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.mapred.JobConf; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * To read value from an ArrayWritable, an ObjectInspector is needed. + * Object inspectors are cached here or created using the column type map. + */ +public class ObjectInspectorCache { + private final Map columnTypeMap = new HashMap<>(); + private final Cache + objectInspectorCache = Caffeine.newBuilder().maximumSize(1000).build(); + + public Map getColumnTypeMap() { + return columnTypeMap; + } + + public ObjectInspectorCache(Schema tableSchema, JobConf jobConf) { + //From AbstractRealtimeRecordReader#prepareHiveAvroSerializer + // hive will append virtual columns at the end of column list. we should remove those columns. + // eg: current table is col1, col2, col3; jobConf.get(serdeConstants.LIST_COLUMNS): col1, col2, col3 ,BLOCK__OFFSET__INSIDE__FILE ... + Set writerSchemaColNames = tableSchema.getFields().stream().map(f -> f.name().toLowerCase(Locale.ROOT)).collect(Collectors.toSet()); + List columnNameList = Arrays.stream(jobConf.get(serdeConstants.LIST_COLUMNS).split(",")).collect(Collectors.toList()); + List columnTypeList = TypeInfoUtils.getTypeInfosFromTypeString(jobConf.get(serdeConstants.LIST_COLUMN_TYPES)); + + int columnNameListLen = columnNameList.size() - 1; + for (int i = columnNameListLen; i >= 0; i--) { + String lastColName = columnNameList.get(columnNameList.size() - 1); + // virtual columns will only append at the end of column list. it will be ok to break the loop. 
+ if (writerSchemaColNames.contains(lastColName)) { + break; + } + columnNameList.remove(columnNameList.size() - 1); + columnTypeList.remove(columnTypeList.size() - 1); + } + + //Use columnNameList.size() instead of columnTypeList because the type list is longer for some reason + IntStream.range(0, columnNameList.size()).boxed().forEach(i -> columnTypeMap.put(columnNameList.get(i), + TypeInfoUtils.getTypeInfosFromTypeString(columnTypeList.get(i).getQualifiedName()).get(0))); + + StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList); + ArrayWritableObjectInspector objectInspector = new ArrayWritableObjectInspector(rowTypeInfo); + objectInspectorCache.put(tableSchema, objectInspector); + } + + public ArrayWritableObjectInspector getObjectInspector(Schema schema) { + return objectInspectorCache.get(schema, s -> { + List columnNameList = s.getFields().stream().map(Schema.Field::name).collect(Collectors.toList()); + List columnTypeList = columnNameList.stream().map(columnTypeMap::get).collect(Collectors.toList()); + StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList); + return new ArrayWritableObjectInspector(rowTypeInfo); + }); + + } + + public Object getValue(ArrayWritable record, Schema schema, String fieldName) { + try { + ArrayWritableObjectInspector objectInspector = getObjectInspector(schema); + return objectInspector.getStructFieldData(record, objectInspector.getStructFieldRef(fieldName)); + } catch (Exception e) { + throw e; + } + + } +} diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java index 08cd33c2d56e..7d7a2eec626c 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java @@ -20,6 +20,7 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.common.config.HoodieReaderConfig; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFileFormat; @@ -368,7 +369,7 @@ public void testIncrementalSimple() throws IOException { files = inputFormat.listStatus(jobConf); assertEquals(10, files.length, "When hoodie.incremental.use.database is true and hoodie.database.name is not null or empty" - + " and the incremental database name is not set, then the incremental query will not take effect"); + + " and the incremental database name is not set, then the incremental query will not take effect"); } @Test @@ -403,7 +404,7 @@ public void testIncrementalWithDatabaseName() throws IOException { metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultStorageConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE, baseFileFormat, HoodieTestUtils.HOODIE_DATABASE); assertEquals(HoodieTestUtils.HOODIE_DATABASE, metaClient.getTableConfig().getDatabaseName(), - String.format("The hoodie.database.name should be %s ", HoodieTestUtils.HOODIE_DATABASE)); + String.format("The hoodie.database.name should be %s ", HoodieTestUtils.HOODIE_DATABASE)); files = inputFormat.listStatus(jobConf); assertEquals(0, files.length, @@ -414,7 +415,7 @@ public void testIncrementalWithDatabaseName() throws IOException { files = inputFormat.listStatus(jobConf); assertEquals(10, files.length, 
"When hoodie.incremental.use.database is false and the incremental database name is set, " - + "then the incremental query will not take effect"); + + "then the incremental query will not take effect"); // The configuration with and without database name exists together InputFormatTestUtil.setupIncremental(jobConf, "1", 1, true); @@ -422,13 +423,13 @@ public void testIncrementalWithDatabaseName() throws IOException { files = inputFormat.listStatus(jobConf); assertEquals(0, files.length, "When hoodie.incremental.use.database is true, " - + "We should exclude commit 100 because the returning incremental pull with start commit time is 100"); + + "We should exclude commit 100 because the returning incremental pull with start commit time is 100"); InputFormatTestUtil.setupIncremental(jobConf, "1", 1, false); files = inputFormat.listStatus(jobConf); assertEquals(10, files.length, "When hoodie.incremental.use.database is false, " - + "We should include commit 100 because the returning incremental pull with start commit time is 1"); + + "We should include commit 100 because the returning incremental pull with start commit time is 1"); } @Test @@ -679,13 +680,13 @@ public void testSnapshotPreCommitValidate() throws IOException { try { // Verify that Validate mode throws error with invalid commit time - InputFormatTestUtil.setupSnapshotIncludePendingCommits(jobConf, "300"); + InputFormatTestUtil.setupSnapshotIncludePendingCommits(jobConf, "300"); inputFormat.listStatus(jobConf); fail("Expected list status to fail when validate is called with unknown timestamp"); } catch (HoodieIOException e) { // expected because validate is called with invalid instantTime } - + //Creating a new jobCOnf Object because old one has hoodie.%.consume.commit set jobConf = new JobConf(); inputFormat.setConf(jobConf); @@ -751,7 +752,7 @@ public void testInputFormatLoadForEmptyPartitionedTable() throws IOException { } private void ensureRecordsInCommit(String msg, String commit, int expectedNumberOfRecordsInCommit, - int totalExpected) throws IOException { + int totalExpected) throws IOException { int actualCount = 0; int totalCount = 0; InputSplit[] splits = inputFormat.getSplits(jobConf, 1); @@ -777,59 +778,64 @@ private void ensureRecordsInCommit(String msg, String commit, int expectedNumber @Test public void testHoodieParquetInputFormatReadTimeType() throws IOException { - long testTimestampLong = System.currentTimeMillis(); - int testDate = 19116;// 2022-05-04 - - Schema schema = SchemaTestUtil.getSchemaFromResource(getClass(), "/test_timetype.avsc"); - String commit = "20160628071126"; - HoodieTestUtils.init(HoodieTestUtils.getDefaultStorageConf(), basePath.toString(), - HoodieTableType.COPY_ON_WRITE, HoodieFileFormat.PARQUET); - java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", "06", "28")); - String fileId = FSUtils.makeBaseFileName(commit, "1-0-1", "fileid1", - HoodieFileFormat.PARQUET.getFileExtension()); - try (AvroParquetWriter parquetWriter = new AvroParquetWriter( - new Path(partitionPath.resolve(fileId).toString()), schema)) { - GenericData.Record record = new GenericData.Record(schema); - record.put("test_timestamp", testTimestampLong * 1000); - record.put("test_long", testTimestampLong * 1000); - record.put("test_date", testDate); - record.put("_hoodie_commit_time", commit); - record.put("_hoodie_commit_seqno", commit + 1); - parquetWriter.write(record); - } - - jobConf.set(IOConstants.COLUMNS, "test_timestamp,test_long,test_date,_hoodie_commit_time,_hoodie_commit_seqno"); - 
jobConf.set(IOConstants.COLUMNS_TYPES, "timestamp,bigint,date,string,string"); - jobConf.set(READ_COLUMN_NAMES_CONF_STR, "test_timestamp,test_long,test_date,_hoodie_commit_time,_hoodie_commit_seqno"); - InputFormatTestUtil.setupPartition(basePath, partitionPath); - InputFormatTestUtil.commit(basePath, commit); - FileInputFormat.setInputPaths(jobConf, partitionPath.toFile().getPath()); + try { + long testTimestampLong = System.currentTimeMillis(); + int testDate = 19116;// 2022-05-04 + + Schema schema = SchemaTestUtil.getSchemaFromResource(getClass(), "/test_timetype.avsc"); + String commit = "20160628071126"; + HoodieTestUtils.init(HoodieTestUtils.getDefaultStorageConf(), basePath.toString(), + HoodieTableType.COPY_ON_WRITE, HoodieFileFormat.PARQUET); + java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", "06", "28")); + String fileId = FSUtils.makeBaseFileName(commit, "1-0-1", "fileid1", + HoodieFileFormat.PARQUET.getFileExtension()); + try (AvroParquetWriter parquetWriter = new AvroParquetWriter( + new Path(partitionPath.resolve(fileId).toString()), schema)) { + GenericData.Record record = new GenericData.Record(schema); + record.put("test_timestamp", testTimestampLong * 1000); + record.put("test_long", testTimestampLong * 1000); + record.put("test_date", testDate); + record.put("_hoodie_commit_time", commit); + record.put("_hoodie_commit_seqno", commit + 1); + parquetWriter.write(record); + } - InputSplit[] splits = inputFormat.getSplits(jobConf, 1); - for (InputSplit split : splits) { - RecordReader recordReader = inputFormat - .getRecordReader(split, jobConf, null); - NullWritable key = recordReader.createKey(); - ArrayWritable writable = recordReader.createValue(); - while (recordReader.next(key, writable)) { - // test timestamp - if (HiveVersionInfo.getShortVersion().startsWith("3")) { - LocalDateTime localDateTime = LocalDateTime.ofInstant( - Instant.ofEpochMilli(testTimestampLong), ZoneOffset.UTC); - assertEquals(Timestamp.valueOf(localDateTime).toString(), String.valueOf(writable.get()[0])); - } else { - Date date = new Date(); - date.setTime(testTimestampLong); - Timestamp actualTime = ((TimestampWritable) writable.get()[0]).getTimestamp(); - SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); - assertEquals(dateFormat.format(date), dateFormat.format(actualTime)); + //this is not a hoodie table!! 
+ jobConf.set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false"); + jobConf.set(IOConstants.COLUMNS, "test_timestamp,test_long,test_date,_hoodie_commit_time,_hoodie_commit_seqno"); + jobConf.set(IOConstants.COLUMNS_TYPES, "timestamp,bigint,date,string,string"); + jobConf.set(READ_COLUMN_NAMES_CONF_STR, "test_timestamp,test_long,test_date,_hoodie_commit_time,_hoodie_commit_seqno"); + InputFormatTestUtil.setupPartition(basePath, partitionPath); + InputFormatTestUtil.commit(basePath, commit); + FileInputFormat.setInputPaths(jobConf, partitionPath.toFile().getPath()); + + InputSplit[] splits = inputFormat.getSplits(jobConf, 1); + for (InputSplit split : splits) { + RecordReader recordReader = inputFormat + .getRecordReader(split, jobConf, null); + NullWritable key = recordReader.createKey(); + ArrayWritable writable = recordReader.createValue(); + while (recordReader.next(key, writable)) { + // test timestamp + if (HiveVersionInfo.getShortVersion().startsWith("3")) { + LocalDateTime localDateTime = LocalDateTime.ofInstant( + Instant.ofEpochMilli(testTimestampLong), ZoneOffset.UTC); + assertEquals(Timestamp.valueOf(localDateTime).toString(), String.valueOf(writable.get()[0])); + } else { + Date date = new Date(); + date.setTime(testTimestampLong); + Timestamp actualTime = ((TimestampWritable) writable.get()[0]).getTimestamp(); + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + assertEquals(dateFormat.format(date), dateFormat.format(actualTime)); + } + // test long + assertEquals(testTimestampLong * 1000, ((LongWritable) writable.get()[1]).get()); + // test date + assertEquals(LocalDate.ofEpochDay(testDate).toString(), String.valueOf(writable.get()[2])); } - // test long - assertEquals(testTimestampLong * 1000, ((LongWritable) writable.get()[1]).get()); - // test date - assertEquals(LocalDate.ofEpochDay(testDate).toString(), String.valueOf(writable.get()[2])); } - recordReader.close(); + } finally { + jobConf.set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"); } } } diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java index 3371b5efb27b..ab907390f884 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.FileInputFormat; @@ -243,7 +244,18 @@ public void multiLevelPartitionReadersRealtimeCombineHoodieInputFormat() throws HoodieCombineHiveInputFormat combineHiveInputFormat = new HoodieCombineHiveInputFormat(); String tripsHiveColumnTypes = "double,string,string,string,double,double,double,double,double"; - InputFormatTestUtil.setPropsForInputFormat(jobConf, schema, tripsHiveColumnTypes); + List fields = schema.getFields(); + String names = fields.stream().map(f -> f.name().toString()).collect(Collectors.joining(",")); + String positions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(",")); + + String hiveColumnNames = 
fields.stream().map(Schema.Field::name).collect(Collectors.joining(",")); + hiveColumnNames = hiveColumnNames + ",year,month,day"; + String modifiedHiveColumnTypes = HoodieAvroUtils.addMetadataColumnTypes(tripsHiveColumnTypes); + modifiedHiveColumnTypes = modifiedHiveColumnTypes + ",string,string,string"; + jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames); + jobConf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, modifiedHiveColumnTypes); + jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names); + jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions); // unset META_TABLE_PARTITION_COLUMNS to trigger HUDI-1718 jobConf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, ""); InputSplit[] splits = combineHiveInputFormat.getSplits(jobConf, 1); diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java index f982a9710621..463ad5a2ebc1 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java @@ -20,6 +20,7 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.config.HoodieMemoryConfig; +import org.apache.hudi.common.config.HoodieReaderConfig; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -89,6 +90,7 @@ public void setUp() { baseJobConf.set(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(), String.valueOf(1024 * 1024)); baseJobConf.set(serdeConstants.LIST_COLUMNS, COLUMNS); baseJobConf.set(serdeConstants.LIST_COLUMN_TYPES, COLUMN_TYPES); + baseJobConf.set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false"); storage = new HoodieHadoopStorage(HadoopFSUtils.getFs(new StoragePath(basePath.toUri()), baseJobConf)); } diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java index 3ee83a09a3b6..b992987c6909 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java @@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.config.HoodieMemoryConfig; +import org.apache.hudi.common.config.HoodieReaderConfig; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieLogFile; @@ -120,6 +121,7 @@ public void setUp() { storageConf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); baseJobConf = new JobConf(storageConf.unwrap()); baseJobConf.set(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(), String.valueOf(1024 * 1024)); + baseJobConf.set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false"); fs = HadoopFSUtils.getFs(basePath.toUri().toString(), baseJobConf); storage = new HoodieHadoopStorage(fs); } diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableAvroUtils.java 
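For context on the TestHoodieCombineHiveInputFormat change above: the test now populates the metastore column list and the read projection on the JobConf by hand rather than going through InputFormatTestUtil.setPropsForInputFormat. A hedged sketch of that minimal Hive read configuration, with made-up column names and positions; in a real Hive query these properties are set by the planner:

```java
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.mapred.JobConf;

// Illustrative only: the handful of Hive properties the tests above set by hand.
public class HiveReadConfSketch {
  public static JobConf configure(JobConf jobConf) {
    // Full table schema as the metastore would describe it (columns and their types).
    jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, "_row_key,timestamp,rider,driver");
    jobConf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, "string,bigint,string,string");
    // Projection actually requested by the query: column names plus their positions.
    jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "_row_key,rider");
    jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0,2");
    return jobConf;
  }
}
```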
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableAvroUtils.java new file mode 100644 index 000000000000..d7b4a93009b8 --- /dev/null +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableAvroUtils.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.hadoop.utils; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.testutils.HoodieTestUtils; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.mapred.JobConf; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.function.UnaryOperator; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestHoodieArrayWritableAvroUtils { + + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); + Schema tableSchema = HoodieTestDataGenerator.AVRO_SCHEMA; + ObjectInspectorCache objectInspectorCache; + + @BeforeEach + public void setup() { + List fields = tableSchema.getFields(); + Configuration conf = HoodieTestUtils.getDefaultStorageConf().unwrap(); + JobConf jobConf = new JobConf(conf); + jobConf.set(serdeConstants.LIST_COLUMNS, fields.stream().map(Schema.Field::name).collect(Collectors.joining(","))); + jobConf.set(serdeConstants.LIST_COLUMN_TYPES, HoodieTestDataGenerator.TRIP_HIVE_COLUMN_TYPES); + objectInspectorCache = new ObjectInspectorCache(HoodieTestDataGenerator.AVRO_SCHEMA, jobConf); + } + + @Test + public void testProjection() { + Schema from = tableSchema; + Schema to = HoodieAvroUtils.generateProjectionSchema(from, Arrays.asList("trip_type", "current_ts", "weight")); + UnaryOperator projection = HoodieArrayWritableAvroUtils.projectRecord(from, to); + UnaryOperator reverseProjection = HoodieArrayWritableAvroUtils.reverseProject(to, from); + + //We reuse the ArrayWritable, so we need to get the values before projecting + ArrayWritable record = convertArrayWritable(dataGen.generateGenericRecord()); + Object tripType = objectInspectorCache.getValue(record, from, "trip_type"); + Object currentTs = objectInspectorCache.getValue(record, from, "current_ts"); + Object weight = objectInspectorCache.getValue(record, from, "weight"); + + //Make sure the projected fields can be read + ArrayWritable projectedRecord = projection.apply(record); + assertEquals(tripType, objectInspectorCache.getValue(projectedRecord, to, "trip_type")); + 
assertEquals(currentTs, objectInspectorCache.getValue(projectedRecord, to, "current_ts")); + assertEquals(weight, objectInspectorCache.getValue(projectedRecord, to, "weight")); + + //Reverse projection, the fields are in the original spots, but only the fields we set can be read. + //Therefore, we can only check the 3 fields that were in the projection + ArrayWritable reverseProjected = reverseProjection.apply(projectedRecord); + assertEquals(tripType, objectInspectorCache.getValue(reverseProjected, from, "trip_type")); + assertEquals(currentTs, objectInspectorCache.getValue(reverseProjected, from, "current_ts")); + assertEquals(weight, objectInspectorCache.getValue(reverseProjected, from, "weight")); + } + + private static ArrayWritable convertArrayWritable(GenericRecord record) { + return (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(record, record.getSchema(), false); + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java index b6795bc2a2ae..99fcdcbf8a33 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java @@ -253,6 +253,7 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec long timestamp = Instant.now().toEpochMilli(); Schema schema = generateNewDataSetAndReturnSchema(timestamp, totalRecords, partitions, bootstrapBasePath); HoodieWriteConfig config = getConfigBuilder(schema.toString()) + .withPreCombineField("timestamp") .withAutoCommit(true) .withSchema(schema.toString()) .withKeyGenerator(keyGeneratorClass) diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java index a5a45cabf81d..806f77544231 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java @@ -19,6 +19,7 @@ package org.apache.hudi.functional; import org.apache.hudi.HoodieSparkUtils; +import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.hadoop.HoodieParquetInputFormat; import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat; @@ -106,6 +107,7 @@ public void testHiveReadSchemaEvolutionTable(String tableType) throws Exception spark.sql(String.format("alter table %s rename column col2 to col2_new", tableName)); JobConf jobConf = new JobConf(); + jobConf.set(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), "true"); jobConf.set(ColumnProjectionUtils.READ_ALL_COLUMNS, "false"); jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "col1,col2_new"); jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "6,7"); diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java index 723d2389d22e..2f2d2ba0efaf 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java +++ 
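The TestHoodieArrayWritableAvroUtils test above exercises HoodieArrayWritableAvroUtils.projectRecord and reverseProject, which move values between schemas by field position rather than by name. A plain-Java walk-through of that index mapping, assuming hypothetical field names and using Object[] rows in place of ArrayWritable:

```java
import java.util.Arrays;
import java.util.List;

// Illustrative walk-through of the forward/reverse positional mapping.
public class ProjectionSketch {
  public static void main(String[] args) {
    List<String> from = Arrays.asList("_row_key", "rider", "driver", "fare"); // full schema order
    List<String> to = Arrays.asList("fare", "rider");                         // projected schema order

    // Forward projection: for each target field, where does it sit in the source row?
    int[] projection = to.stream().mapToInt(from::indexOf).toArray();         // [3, 1]

    Object[] row = {"key1", "rider-A", "driver-B", 27.7};
    Object[] projected = new Object[row.length];                              // length kept, like the utility
    for (int i = 0; i < projection.length; i++) {
      projected[i] = row[projection[i]];
    }
    System.out.println(Arrays.toString(projected));                           // [27.7, rider-A, null, null]

    // Reverse projection: put the projected values back into their original positions.
    Object[] restored = new Object[from.size()];
    for (int i = 0; i < projection.length; i++) {
      restored[projection[i]] = projected[i];
    }
    System.out.println(Arrays.toString(restored));                            // [null, rider-A, null, 27.7]
  }
}
```

Note how the forward projection keeps the original array length, which matches the utility's javadoc about the ArrayWritable being recycled by the reader.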
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java @@ -219,7 +219,7 @@ public void testLoadMetadata(boolean isCommitFilePresent, boolean rowWriterEnabl */ @ParameterizedTest @MethodSource("configParamsForSorting") - public void testClusteringColumnSort(String sortColumn, boolean rowWriterEnable) throws IOException { + public void testClusteringColumnSort(String sortColumn, boolean rowWriterEnable) throws Exception { Map options = new HashMap<>(); // Record key is handled specially if (sortColumn.equals("_row_key")) { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieStreamerUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieStreamerUtils.java index e6c388b3e3b1..e00d72900941 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieStreamerUtils.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieStreamerUtils.java @@ -32,12 +32,14 @@ import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import java.util.Collections; import java.util.List; +import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.doNothing; @@ -56,8 +58,15 @@ public static void setupOnce() throws Exception { initTestServices(); } + private static Stream validRecordTypes() { + Stream.Builder b = Stream.builder(); + b.add(Arguments.of(HoodieRecordType.SPARK)); + b.add(Arguments.of(HoodieRecordType.AVRO)); + return b.build(); + } + @ParameterizedTest - @EnumSource(HoodieRecordType.class) + @MethodSource("validRecordTypes") public void testCreateHoodieRecordsWithError(HoodieRecordType recordType) { Schema schema = new Schema.Parser().parse(SCHEMA_STRING); JavaRDD recordRdd = jsc.parallelize(Collections.singletonList(1)).map(i -> {
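On the final hunk: switching from @EnumSource to a @MethodSource provider lets the test enumerate only the HoodieRecordType values it actually supports instead of every enum constant. A generic sketch of that pattern, with an illustrative enum and test rather than the Hudi types:

```java
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertTrue;

// Generic sketch of the @EnumSource -> @MethodSource switch: the provider emits
// only the values the test supports. Enum and test names here are made up.
public class MethodSourceSketch {

  enum ReaderMode { LEGACY, FILE_GROUP, EXPERIMENTAL }

  // Only the supported modes are emitted; EXPERIMENTAL is deliberately left out.
  private static Stream<Arguments> supportedModes() {
    return Stream.of(
        Arguments.of(ReaderMode.LEGACY),
        Arguments.of(ReaderMode.FILE_GROUP));
  }

  @ParameterizedTest
  @MethodSource("supportedModes")
  void readsWithSupportedMode(ReaderMode mode) {
    assertTrue(mode == ReaderMode.LEGACY || mode == ReaderMode.FILE_GROUP);
  }
}
```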