diff --git a/docker/compose/docker-compose_hadoop310_hive312_spark321_mac_aarch64.yml b/docker/compose/docker-compose_hadoop310_hive312_spark321_mac_aarch64.yml new file mode 100644 index 000000000000..f19835a623a8 --- /dev/null +++ b/docker/compose/docker-compose_hadoop310_hive312_spark321_mac_aarch64.yml @@ -0,0 +1,319 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +version: "3.3" + +services: + + namenode: + image: apachehudi/hudi-hadoop_3.1.0-namenode:latest + hostname: namenode + container_name: namenode + environment: + - CLUSTER_NAME=hudi_hadoop310_hive312_spark321 + ports: + - "9870:9870" + - "8020:8020" + env_file: + - ./hadoop.env + healthcheck: + test: ["CMD", "curl", "-f", "http://namenode:9870"] + interval: 30s + timeout: 10s + retries: 3 + + datanode1: + image: apachehudi/hudi-hadoop_3.1.0-datanode:latest + container_name: datanode1 + hostname: datanode1 + environment: + - CLUSTER_NAME=hudi_hadoop310_hive312_spark321 + env_file: + - ./hadoop.env + ports: + - "50075:50075" + - "50010:50010" + links: + - "namenode" + - "historyserver" + healthcheck: + test: ["CMD", "curl", "-f", "http://datanode1:50075"] + interval: 30s + timeout: 10s + retries: 3 + depends_on: + - namenode + + historyserver: + image: apachehudi/hudi-hadoop_3.1.0-history:latest + hostname: historyserver + container_name: historyserver + environment: + - CLUSTER_NAME=hudi_hadoop310_hive312_spark321 + depends_on: + - "namenode" + links: + - "namenode" + ports: + - "58188:8188" + healthcheck: + test: ["CMD", "curl", "-f", "http://historyserver:8188"] + interval: 30s + timeout: 10s + retries: 3 + env_file: + - ./hadoop.env + volumes: + - historyserver:/hadoop/yarn/timeline + + hive-metastore-postgresql: + image: bde2020/hive-metastore-postgresql:3.1.0 + volumes: + - hive-metastore-postgresql:/var/lib/postgresql + hostname: hive-metastore-postgresql + container_name: hive-metastore-postgresql + + hivemetastore: + image: apachehudi/hudi-hadoop_3.1.0-hive_3.1.2:latest + hostname: hivemetastore + container_name: hivemetastore + links: + - "hive-metastore-postgresql" + - "namenode" + env_file: + - ./hadoop.env + command: /opt/hive/bin/hive --service metastore + environment: + SERVICE_PRECONDITION: "namenode:9870 hive-metastore-postgresql:5432" + ports: + - "9083:9083" + healthcheck: + test: ["CMD", "nc", "-z", "hivemetastore", "9083"] + interval: 30s + timeout: 10s + retries: 3 + depends_on: + - "hive-metastore-postgresql" + - "namenode" + + hiveserver: + image: apachehudi/hudi-hadoop_3.1.0-hive_3.1.2:latest + hostname: hiveserver + container_name: hiveserver + env_file: + - ./hadoop.env + environment: + SERVICE_PRECONDITION: "hivemetastore:9083" + JAVA_TOOL_OPTIONS: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005" + ports: + - "10000:10000" + # JVM debugging port + - 
"64757:5005" + depends_on: + - "hivemetastore" + links: + - "hivemetastore" + - "hive-metastore-postgresql" + - "namenode" + volumes: + - ${HUDI_WS}:/var/hoodie/ws + - /Users/jon/Desktop/hiveWorkload:/var/hiveWorkload + + sparkmaster: + image: apachehudi/hudi-hadoop_3.1.0-hive_3.1.2-sparkmaster_3.2.1:latest + hostname: sparkmaster + container_name: sparkmaster + env_file: + - ./hadoop.env + ports: + - "8080:8080" + - "7077:7077" + environment: + - INIT_DAEMON_STEP=setup_spark + links: + - "hivemetastore" + - "hiveserver" + - "hive-metastore-postgresql" + - "namenode" + + spark-worker-1: + image: apachehudi/hudi-hadoop_3.1.0-hive_3.1.2-sparkworker_3.2.1:latest + hostname: spark-worker-1 + container_name: spark-worker-1 + env_file: + - ./hadoop.env + depends_on: + - sparkmaster + ports: + - "8081:8081" + environment: + - "SPARK_MASTER=spark://sparkmaster:7077" + links: + - "hivemetastore" + - "hiveserver" + - "hive-metastore-postgresql" + - "namenode" + + zookeeper: + image: 'arm64v8/zookeeper:3.4.12' + platform: linux/arm64 + hostname: zookeeper + container_name: zookeeper + ports: + - "2181:2181" + environment: + - ALLOW_ANONYMOUS_LOGIN=yes + + kafka: + image: 'wurstmeister/kafka:2.12-2.0.1' + platform: linux/arm64 + hostname: kafkabroker + container_name: kafkabroker + ports: + - "9092:9092" + environment: + - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 + - ALLOW_PLAINTEXT_LISTENER=yes + - KAFKA_ADVERTISED_HOST_NAME=kafkabroker + +# presto-coordinator-1: +# container_name: presto-coordinator-1 +# hostname: presto-coordinator-1 +# image: apachehudi/hudi-hadoop_3.1.0-prestobase_0.271:latest +# ports: +# - '8090:8090' +# environment: +# - PRESTO_JVM_MAX_HEAP=512M +# - PRESTO_QUERY_MAX_MEMORY=1GB +# - PRESTO_QUERY_MAX_MEMORY_PER_NODE=256MB +# - PRESTO_QUERY_MAX_TOTAL_MEMORY_PER_NODE=384MB +# - PRESTO_MEMORY_HEAP_HEADROOM_PER_NODE=100MB +# - TERM=xterm +# links: +# - "hivemetastore" +# volumes: +# - ${HUDI_WS}:/var/hoodie/ws +# command: coordinator +# +# presto-worker-1: +# container_name: presto-worker-1 +# hostname: presto-worker-1 +# image: apachehudi/hudi-hadoop_3.1.0-prestobase_0.271:latest +# depends_on: [ "presto-coordinator-1" ] +# environment: +# - PRESTO_JVM_MAX_HEAP=512M +# - PRESTO_QUERY_MAX_MEMORY=1GB +# - PRESTO_QUERY_MAX_MEMORY_PER_NODE=256MB +# - PRESTO_QUERY_MAX_TOTAL_MEMORY_PER_NODE=384MB +# - PRESTO_MEMORY_HEAP_HEADROOM_PER_NODE=100MB +# - TERM=xterm +# links: +# - "hivemetastore" +# - "hiveserver" +# - "hive-metastore-postgresql" +# - "namenode" +# volumes: +# - ${HUDI_WS}:/var/hoodie/ws +# command: worker +# +# trino-coordinator-1: +# container_name: trino-coordinator-1 +# hostname: trino-coordinator-1 +# image: apachehudi/hudi-hadoop_3.1.0-trinocoordinator_368:latest +# ports: +# - '8091:8091' +# links: +# - "hivemetastore" +# volumes: +# - ${HUDI_WS}:/var/hoodie/ws +# command: http://trino-coordinator-1:8091 trino-coordinator-1 +# +# trino-worker-1: +# container_name: trino-worker-1 +# hostname: trino-worker-1 +# image: apachehudi/hudi-hadoop_3.1.0-trinoworker_368:latest +# depends_on: [ "trino-coordinator-1" ] +# ports: +# - '8092:8092' +# links: +# - "hivemetastore" +# - "hiveserver" +# - "hive-metastore-postgresql" +# - "namenode" +# volumes: +# - ${HUDI_WS}:/var/hoodie/ws +# command: http://trino-coordinator-1:8091 trino-worker-1 +# +# graphite: +# container_name: graphite +# hostname: graphite +# image: graphiteapp/graphite-statsd +# ports: +# - 80:80 +# - 2003-2004:2003-2004 +# - 8126:8126 + + adhoc-1: + image: 
apachehudi/hudi-hadoop_3.1.0-hive_3.1.2-sparkadhoc_3.2.1:latest + hostname: adhoc-1 + container_name: adhoc-1 + env_file: + - ./hadoop.env + depends_on: + - sparkmaster + ports: + - '4040:4040' + environment: + - "SPARK_MASTER=spark://sparkmaster:7077" + links: + - "hivemetastore" + - "hiveserver" + - "hive-metastore-postgresql" + - "namenode" + #- "presto-coordinator-1" + #- "trino-coordinator-1" + volumes: + - ${HUDI_WS}:/var/hoodie/ws + - /Users/jon/Desktop/hiveWorkload:/var/hiveWorkload + + adhoc-2: + image: apachehudi/hudi-hadoop_3.1.0-hive_3.1.2-sparkadhoc_3.2.1:latest + hostname: adhoc-2 + container_name: adhoc-2 + env_file: + - ./hadoop.env + depends_on: + - sparkmaster + environment: + - "SPARK_MASTER=spark://sparkmaster:7077" + links: + - "hivemetastore" + - "hiveserver" + - "hive-metastore-postgresql" + - "namenode" + #- "presto-coordinator-1" + #- "trino-coordinator-1" + volumes: + - ${HUDI_WS}:/var/hoodie/ws + - /Users/jon/Desktop/hiveWorkload:/var/hiveWorkload + +volumes: + namenode: + historyserver: + hive-metastore-postgresql: + +networks: + default: + name: hudi-network \ No newline at end of file diff --git a/docker/setup_demo.sh b/docker/setup_demo.sh index e847f913a5ac..94ba3d42c262 100755 --- a/docker/setup_demo.sh +++ b/docker/setup_demo.sh @@ -23,6 +23,11 @@ COMPOSE_FILE_NAME="docker-compose_hadoop284_hive233_spark244.yml" if [ "$HUDI_DEMO_ENV" = "--mac-aarch64" ]; then COMPOSE_FILE_NAME="docker-compose_hadoop284_hive233_spark244_mac_aarch64.yml" fi + +if [ "$HUDI_DEMO_ENV" = "--hive3" ]; then + COMPOSE_FILE_NAME="docker-compose_hadoop310_hive312_spark321_mac_aarch64.yml" +fi + # restart cluster HUDI_WS=${WS_ROOT} docker-compose -f ${SCRIPT_PATH}/compose/${COMPOSE_FILE_NAME} down if [ "$HUDI_DEMO_ENV" != "dev" ]; then diff --git a/docker/stop_demo.sh b/docker/stop_demo.sh index 32a0e70c3791..550397c082df 100755 --- a/docker/stop_demo.sh +++ b/docker/stop_demo.sh @@ -24,6 +24,11 @@ COMPOSE_FILE_NAME="docker-compose_hadoop284_hive233_spark244.yml" if [ "$HUDI_DEMO_ENV" = "--mac-aarch64" ]; then COMPOSE_FILE_NAME="docker-compose_hadoop284_hive233_spark244_mac_aarch64.yml" fi + +if [ "$HUDI_DEMO_ENV" = "--hive3" ]; then + COMPOSE_FILE_NAME="docker-compose_hadoop310_hive312_spark321_mac_aarch64.yml" +fi + # shut down cluster HUDI_WS=${WS_ROOT} docker-compose -f ${SCRIPT_PATH}/compose/${COMPOSE_FILE_NAME} down diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java index d0b27e507fe5..44cdfa5d52f7 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java @@ -69,30 +69,15 @@ public HoodieRecordMerger getRecordMerger(String mergerStrategy) { @Override public Object getValue(InternalRow row, Schema schema, String fieldName) { - return getFieldValueFromInternalRow(row, schema, fieldName); - } - - @Override - public String getRecordKey(InternalRow row, Schema schema) { - return getFieldValueFromInternalRow(row, schema, RECORD_KEY_METADATA_FIELD).toString(); - } - - @Override - public Comparable getOrderingValue(Option rowOption, - Map metadataMap, - Schema schema, - TypedProperties props) { - if (metadataMap.containsKey(INTERNAL_META_ORDERING_FIELD)) { - return (Comparable) metadataMap.get(INTERNAL_META_ORDERING_FIELD); - } - - if (!rowOption.isPresent()) { - 
return 0; + StructType structType = getCachedSchema(schema); + scala.Option cachedNestedFieldPath = + HoodieInternalRowUtils.getCachedPosList(structType, fieldName); + if (cachedNestedFieldPath.isDefined()) { + HoodieUnsafeRowUtils.NestedFieldPath nestedFieldPath = cachedNestedFieldPath.get(); + return HoodieUnsafeRowUtils.getNestedInternalRowValue(row, nestedFieldPath); + } else { + return null; } - - String orderingFieldName = ConfigUtils.getOrderingField(props); - Object value = getFieldValueFromInternalRow(rowOption.get(), schema, orderingFieldName); - return value != null ? (Comparable) value : 0; } @Override @@ -117,25 +102,13 @@ public InternalRow seal(InternalRow internalRow) { @Override public long extractRecordPosition(InternalRow record, Schema recordSchema, String fieldName, long providedPositionIfNeeded) { - Object position = getFieldValueFromInternalRow(record, recordSchema, fieldName); + Object position = getValue(record, recordSchema, fieldName); if (position != null) { return (long) position; } return providedPositionIfNeeded; } - private Object getFieldValueFromInternalRow(InternalRow row, Schema recordSchema, String fieldName) { - StructType structType = getCachedSchema(recordSchema); - scala.Option cachedNestedFieldPath = - HoodieInternalRowUtils.getCachedPosList(structType, fieldName); - if (cachedNestedFieldPath.isDefined()) { - HoodieUnsafeRowUtils.NestedFieldPath nestedFieldPath = cachedNestedFieldPath.get(); - return HoodieUnsafeRowUtils.getNestedInternalRowValue(row, nestedFieldPath); - } else { - return null; - } - } - @Override public UnaryOperator projectRecord(Schema from, Schema to) { UnsafeProjection projection = HoodieInternalRowUtils.generateUnsafeProjectionAlias(getCachedSchema(from), getCachedSchema(to)); diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala index 963035caf217..e59081f1c6dc 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala @@ -107,7 +107,9 @@ class SparkFileFormatInternalRowReaderContext(readerMaps: mutable.Map[Long, Part } override def mergeBootstrapReaders(skeletonFileIterator: ClosableIterator[InternalRow], - dataFileIterator: ClosableIterator[InternalRow]): ClosableIterator[InternalRow] = { + skeletonRequiredSchema: Schema, + dataFileIterator: ClosableIterator[InternalRow], + dataRequiredSchema: Schema): ClosableIterator[InternalRow] = { doBootstrapMerge(skeletonFileIterator.asInstanceOf[ClosableIterator[Any]], dataFileIterator.asInstanceOf[ClosableIterator[Any]]) } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java index b2fab0ae4927..e84693e7b50d 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java @@ -212,7 +212,7 @@ public void testUpsertPartitioner(boolean populateMetaFields) throws Exception { .map(baseFile -> new Path(baseFile.getPath()).getParent().toString()) .collect(Collectors.toList()); List recordsRead = 
HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), inputPaths, - basePath(), new JobConf(hadoopConf()), true, false); + basePath(), new JobConf(hadoopConf()), true, populateMetaFields); // Wrote 20 records in 2 batches assertEquals(40, recordsRead.size(), "Must contain 40 records"); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java index d1d92a19742a..fd44be24f4c5 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java @@ -22,6 +22,7 @@ import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.config.HoodieReaderConfig; import org.apache.hudi.common.config.HoodieStorageConfig; import org.apache.hudi.common.model.DefaultHoodieRecordPayload; import org.apache.hudi.common.model.HoodieKey; @@ -146,43 +147,50 @@ public void testWriteDuringCompaction(String payloadClass) throws IOException { @ParameterizedTest @MethodSource("writeLogTest") public void testWriteLogDuringCompaction(boolean enableMetadataTable, boolean enableTimelineServer) throws IOException { - Properties props = getPropertiesForKeyGen(true); - HoodieWriteConfig config = HoodieWriteConfig.newBuilder() - .forTable("test-trip-table") - .withPath(basePath()) - .withSchema(TRIP_EXAMPLE_SCHEMA) - .withParallelism(2, 2) - .withAutoCommit(true) - .withEmbeddedTimelineServerEnabled(enableTimelineServer) - .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build()) - .withCompactionConfig(HoodieCompactionConfig.newBuilder() - .withMaxNumDeltaCommitsBeforeCompaction(1).build()) - .withLayoutConfig(HoodieLayoutConfig.newBuilder() - .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name()) - .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build()) - .withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build()) - .build(); - props.putAll(config.getProps()); - - metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props); - client = getHoodieWriteClient(config); - - final List records = dataGen.generateInserts("001", 100); - JavaRDD writeRecords = jsc().parallelize(records, 2); + try { + //disable for this test because it seems like we process mor in a different order? 
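+      // the new file-group reader is disabled only for this test; it is re-enabled in the finally block below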
+ jsc().hadoopConfiguration().set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false"); + Properties props = getPropertiesForKeyGen(true); + HoodieWriteConfig config = HoodieWriteConfig.newBuilder() + .forTable("test-trip-table") + .withPath(basePath()) + .withSchema(TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2) + .withAutoCommit(true) + .withEmbeddedTimelineServerEnabled(enableTimelineServer) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build()) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withMaxNumDeltaCommitsBeforeCompaction(1).build()) + .withLayoutConfig(HoodieLayoutConfig.newBuilder() + .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name()) + .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build()) + .withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build()) + .build(); + props.putAll(config.getProps()); + + metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props); + client = getHoodieWriteClient(config); + + final List records = dataGen.generateInserts("001", 100); + JavaRDD writeRecords = jsc().parallelize(records, 2); + + // initialize 100 records + client.upsert(writeRecords, client.startCommit()); + // update 100 records + client.upsert(writeRecords, client.startCommit()); + // schedule compaction + client.scheduleCompaction(Option.empty()); + // delete 50 records + List toBeDeleted = records.stream().map(HoodieRecord::getKey).limit(50).collect(Collectors.toList()); + JavaRDD deleteRecords = jsc().parallelize(toBeDeleted, 2); + client.delete(deleteRecords, client.startCommit()); + // insert the same 100 records again + client.upsert(writeRecords, client.startCommit()); + Assertions.assertEquals(100, readTableTotalRecordsNum()); + } finally { + jsc().hadoopConfiguration().set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"); + } - // initialize 100 records - client.upsert(writeRecords, client.startCommit()); - // update 100 records - client.upsert(writeRecords, client.startCommit()); - // schedule compaction - client.scheduleCompaction(Option.empty()); - // delete 50 records - List toBeDeleted = records.stream().map(HoodieRecord::getKey).limit(50).collect(Collectors.toList()); - JavaRDD deleteRecords = jsc().parallelize(toBeDeleted, 2); - client.delete(deleteRecords, client.startCommit()); - // insert the same 100 records again - client.upsert(writeRecords, client.startCommit()); - Assertions.assertEquals(100, readTableTotalRecordsNum()); } private long readTableTotalRecordsNum() { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java index 86a875c9df33..f230afed30f8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordMerger; +import org.apache.hudi.common.util.ConfigUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.ClosableIterator; @@ -36,6 +37,8 @@ import java.util.Map; import java.util.function.UnaryOperator; +import static org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD; + /** * An abstract reader 
context class for {@code HoodieFileGroupReader} to use, containing APIs for * engine-specific implementation on reading data files, getting field values from a record, @@ -112,7 +115,10 @@ public abstract ClosableIterator getFileRecordIterator( * @param schema The Avro schema of the record. * @return The record key in String. */ - public abstract String getRecordKey(T record, Schema schema); + public String getRecordKey(T record, Schema schema) { + Object val = getValue(record, schema, RECORD_KEY_METADATA_FIELD); + return val.toString(); + } /** * Gets the ordering value in particular type. @@ -123,10 +129,23 @@ public abstract ClosableIterator getFileRecordIterator( * @param props Properties. * @return The ordering value. */ - public abstract Comparable getOrderingValue(Option recordOption, + public Comparable getOrderingValue(Option recordOption, Map metadataMap, Schema schema, - TypedProperties props); + TypedProperties props) { + if (metadataMap.containsKey(INTERNAL_META_ORDERING_FIELD)) { + return (Comparable) metadataMap.get(INTERNAL_META_ORDERING_FIELD); + } + + if (!recordOption.isPresent()) { + return 0; + } + + String orderingFieldName = ConfigUtils.getOrderingField(props); + Object value = getValue(recordOption.get(), schema, orderingFieldName); + return value != null ? (Comparable) value : 0; + + } /** * Constructs a new {@link HoodieRecord} based on the record of engine-specific type and metadata for merging. @@ -199,7 +218,10 @@ public Map updateSchemaAndResetOrderingValInMetadata(Map mergeBootstrapReaders(ClosableIterator skeletonFileIterator, ClosableIterator dataFileIterator); + public abstract ClosableIterator mergeBootstrapReaders(ClosableIterator skeletonFileIterator, + Schema skeletonRequiredSchema, + ClosableIterator dataFileIterator, + Schema dataRequiredSchema); /** * Creates a function that will reorder records of schema "from" to schema of "to" @@ -217,6 +239,10 @@ public Map updateSchemaAndResetOrderingValInMetadata(Map makeBaseFileIterator() throws IOException { } private Schema generateRequiredSchema() { + return maybeReorderForBootstrap(generateRequiredSchemaHelper()); + } + + private Schema generateRequiredSchemaHelper() { //might need to change this if other queries than mor have mandatory fields if (logFiles.isEmpty()) { return requestedSchema; @@ -193,10 +197,10 @@ private Schema generateRequiredSchema() { } if (addedFields.isEmpty()) { - return maybeReorderForBootstrap(requestedSchema); + return requestedSchema; } - return maybeReorderForBootstrap(appendFieldsToSchema(requestedSchema, addedFields)); + return appendFieldsToSchema(requestedSchema, addedFields); } private Schema maybeReorderForBootstrap(Schema input) { @@ -232,23 +236,31 @@ private ClosableIterator makeBootstrapBaseFileIterator(HoodieBaseFile baseFil BaseFile dataFile = baseFile.getBootstrapBaseFile().get(); Pair,List> requiredFields = getDataAndMetaCols(requiredSchema); Pair,List> allFields = getDataAndMetaCols(dataSchema); - - Option> dataFileIterator = requiredFields.getRight().isEmpty() ? Option.empty() : - Option.of(readerContext.getFileRecordIterator(dataFile.getHadoopPath(), 0, dataFile.getFileLen(), - createSchemaFromFields(allFields.getRight()), createSchemaFromFields(requiredFields.getRight()), hadoopConf)); - - Option> skeletonFileIterator = requiredFields.getLeft().isEmpty() ? 
Option.empty() : - Option.of(readerContext.getFileRecordIterator(baseFile.getHadoopPath(), 0, baseFile.getFileLen(), - createSchemaFromFields(allFields.getLeft()), createSchemaFromFields(requiredFields.getLeft()), hadoopConf)); + Option,Schema>> dataFileIterator = + makeBootstrapBaseFileIteratorHelper(requiredFields.getRight(), allFields.getRight(), dataFile); + Option,Schema>> skeletonFileIterator = + makeBootstrapBaseFileIteratorHelper(requiredFields.getLeft(), allFields.getLeft(), baseFile); if (!dataFileIterator.isPresent() && !skeletonFileIterator.isPresent()) { throw new IllegalStateException("should not be here if only partition cols are required"); } else if (!dataFileIterator.isPresent()) { - return skeletonFileIterator.get(); + return skeletonFileIterator.get().getLeft(); } else if (!skeletonFileIterator.isPresent()) { - return dataFileIterator.get(); + return dataFileIterator.get().getLeft(); } else { - return readerContext.mergeBootstrapReaders(skeletonFileIterator.get(), dataFileIterator.get()); + return readerContext.mergeBootstrapReaders(skeletonFileIterator.get().getLeft(), skeletonFileIterator.get().getRight(), + dataFileIterator.get().getLeft(), dataFileIterator.get().getRight()); + } + } + + private Option,Schema>> makeBootstrapBaseFileIteratorHelper(List requiredFields, + List allFields, + BaseFile file) throws IOException { + if (requiredFields.isEmpty()) { + return Option.empty(); } + Schema requiredSchema = createSchemaFromFields(requiredFields); + return Option.of(Pair.of(readerContext.getFileRecordIterator(file.getHadoopPath(), 0, file.getFileLen(), + createSchemaFromFields(allFields), requiredSchema, hadoopConf), requiredSchema)); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/TablePathUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/TablePathUtils.java index cb19926ceebb..04a68cc90c6b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/TablePathUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/TablePathUtils.java @@ -21,6 +21,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.exception.HoodieException; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -48,10 +49,18 @@ private static boolean hasTableMetadataFolder(FileSystem fs, Path path) { } } + public static boolean isHoodieTablePath(Path path, Configuration conf) throws IOException { + return isHoodieTablePath(path.getFileSystem(conf), path); + } + public static boolean isHoodieTablePath(FileSystem fs, Path path) { return hasTableMetadataFolder(fs, path); } + public static Option getTablePath(Path path, Configuration conf) throws HoodieException, IOException { + return getTablePath(path.getFileSystem(conf), path); + } + public static Option getTablePath(FileSystem fs, Path path) throws HoodieException, IOException { LOG.info("Getting table path from path : " + path); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java index 3aad5e9a0aa3..92344ba39ab2 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java @@ -164,9 +164,8 @@ public IndexedRecord seal(IndexedRecord record) { } @Override 
- public ClosableIterator mergeBootstrapReaders( - ClosableIterator skeletonFileIterator, - ClosableIterator dataFileIterator) { + public ClosableIterator mergeBootstrapReaders(ClosableIterator skeletonFileIterator, Schema skeletonRequiredSchema, + ClosableIterator dataFileIterator, Schema dataRequiredSchema) { return null; } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java new file mode 100644 index 000000000000..03f77d848118 --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.hadoop; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.model.HoodieEmptyRecord; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordMerger; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.common.util.collection.CloseableMappingIterator; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; +import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils; +import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils; +import org.apache.hudi.hadoop.utils.ObjectInspectorCache; + +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.function.UnaryOperator; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static 
org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID; + +public class HiveHoodieReaderContext extends HoodieReaderContext { + protected final HoodieFileGroupReaderRecordReader.HiveReaderCreator readerCreator; + protected final InputSplit split; + protected final JobConf jobConf; + protected final Reporter reporter; + protected final Schema writerSchema; + protected Map hosts; + protected final Map columnTypeMap; + private final ObjectInspectorCache objectInspectorCache; + private RecordReader firstRecordReader = null; + + private final List partitionCols; + private final Set partitionColSet; + + private final String recordKeyField; + protected HiveHoodieReaderContext(HoodieFileGroupReaderRecordReader.HiveReaderCreator readerCreator, + InputSplit split, + JobConf jobConf, + Reporter reporter, + Schema writerSchema, + Map hosts, + HoodieTableMetaClient metaClient) { + this.readerCreator = readerCreator; + this.split = split; + this.jobConf = jobConf; + this.reporter = reporter; + this.writerSchema = writerSchema; + this.hosts = hosts; + this.partitionCols = HoodieFileGroupReaderRecordReader.getPartitionFieldNames(jobConf).stream() + .filter(n -> writerSchema.getField(n) != null).collect(Collectors.toList()); + this.partitionColSet = new HashSet<>(this.partitionCols); + String tableName = metaClient.getTableConfig().getTableName(); + recordKeyField = metaClient.getTableConfig().populateMetaFields() + ? HoodieRecord.RECORD_KEY_METADATA_FIELD + : assertSingleKey(metaClient.getTableConfig().getRecordKeyFields()); + this.objectInspectorCache = HoodieArrayWritableAvroUtils.getCacheForTable(tableName, writerSchema, jobConf); + this.columnTypeMap = objectInspectorCache.getColumnTypeMap(); + } + + /** + * If populate meta fields is false, then getRecordKeyFields() + * should return exactly 1 recordkey field. 
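+   * When populateMetaFields is enabled, the record key is read from RECORD_KEY_METADATA_FIELD instead.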
+ */ + private static String assertSingleKey(Option recordKeyFieldsOpt) { + ValidationUtils.checkArgument(recordKeyFieldsOpt.isPresent(), "No record key field set in table config, but populateMetaFields is disabled"); + ValidationUtils.checkArgument(recordKeyFieldsOpt.get().length == 1, "More than 1 record key set in table config, but populateMetaFields is disabled"); + return recordKeyFieldsOpt.get()[0]; + } + + @Override + public FileSystem getFs(String path, Configuration conf) { + return HadoopFSUtils.getFs(path, conf); + } + + @Override + public ClosableIterator getFileRecordIterator(Path filePath, long start, long length, Schema dataSchema, Schema requiredSchema, Configuration conf) throws IOException { + JobConf jobConfCopy = new JobConf(jobConf); + //move the partition cols to the end, because in some cases it has issues if we don't do that + Schema modifiedDataSchema = HoodieAvroUtils.generateProjectionSchema(dataSchema, Stream.concat(dataSchema.getFields().stream() + .map(f -> f.name().toLowerCase(Locale.ROOT)).filter(n -> !partitionColSet.contains(n)), + partitionCols.stream().filter(c -> dataSchema.getField(c) != null)).collect(Collectors.toList())); + setSchemas(jobConfCopy, modifiedDataSchema, requiredSchema); + InputSplit inputSplit = new FileSplit(filePath, start, length, hosts.get(filePath.toString())); + RecordReader recordReader = readerCreator.getRecordReader(inputSplit, jobConfCopy, reporter); + if (firstRecordReader == null) { + firstRecordReader = recordReader; + } + ClosableIterator recordIterator = new RecordReaderValueIterator<>(recordReader); + if (modifiedDataSchema.equals(requiredSchema)) { + return recordIterator; + } + //The record reader puts the required columns in the positions of the data schema and nulls the rest of the columns + return new CloseableMappingIterator<>(recordIterator, projectRecord(modifiedDataSchema, requiredSchema)); + } + + private void setSchemas(JobConf jobConf, Schema dataSchema, Schema requiredSchema) { + List dataColumnNameList = dataSchema.getFields().stream().map(f -> f.name().toLowerCase(Locale.ROOT)).collect(Collectors.toList()); + List dataColumnTypeList = dataColumnNameList.stream().map(fieldName -> { + TypeInfo type = columnTypeMap.get(fieldName); + if (type == null) { + throw new IllegalArgumentException("Field: " + fieldName + ", does not have a defined type"); + } + return type; + }).collect(Collectors.toList()); + jobConf.set(serdeConstants.LIST_COLUMNS, String.join(",", dataColumnNameList)); + jobConf.set(serdeConstants.LIST_COLUMN_TYPES, dataColumnTypeList.stream().map(TypeInfo::getQualifiedName).collect(Collectors.joining(","))); + //don't replace `f -> f.name()` with lambda reference + String readColNames = requiredSchema.getFields().stream().map(f -> f.name()).collect(Collectors.joining(",")); + jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, readColNames); + jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, requiredSchema.getFields() + .stream().map(f -> String.valueOf(dataSchema.getField(f.name()).pos())).collect(Collectors.joining(","))); + } + + @Override + public ArrayWritable convertAvroRecord(IndexedRecord avroRecord) { + return (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(avroRecord, avroRecord.getSchema(), true); + } + + @Override + public HoodieRecordMerger getRecordMerger(String mergerStrategy) { + switch (mergerStrategy) { + case DEFAULT_MERGER_STRATEGY_UUID: + return new HoodieHiveRecordMerger(); + default: + throw new HoodieException("The merger strategy 
UUID is not supported: " + mergerStrategy); + } + } + + @Override + public String getRecordKey(ArrayWritable record, Schema schema) { + return getValue(record, schema, recordKeyField).toString(); + } + + @Override + public Object getValue(ArrayWritable record, Schema schema, String fieldName) { + return StringUtils.isNullOrEmpty(fieldName) ? null : objectInspectorCache.getValue(record, schema, fieldName); + } + + @Override + public HoodieRecord constructHoodieRecord(Option recordOption, Map metadataMap) { + if (!recordOption.isPresent()) { + return new HoodieEmptyRecord<>(new HoodieKey((String) metadataMap.get(INTERNAL_META_RECORD_KEY), (String) metadataMap.get(INTERNAL_META_PARTITION_PATH)), HoodieRecord.HoodieRecordType.HIVE); + } + Schema schema = (Schema) metadataMap.get(INTERNAL_META_SCHEMA); + ArrayWritable writable = recordOption.get(); + return new HoodieHiveRecord(new HoodieKey((String) metadataMap.get(INTERNAL_META_RECORD_KEY), (String) metadataMap.get(INTERNAL_META_PARTITION_PATH)), writable, schema, objectInspectorCache); + } + + @Override + public ArrayWritable seal(ArrayWritable record) { + return new ArrayWritable(Writable.class, Arrays.copyOf(record.get(), record.get().length)); + } + + @Override + public ClosableIterator mergeBootstrapReaders(ClosableIterator skeletonFileIterator, + Schema skeletonRequiredSchema, + ClosableIterator dataFileIterator, + Schema dataRequiredSchema) { + int skeletonLen = skeletonRequiredSchema.getFields().size(); + int dataLen = dataRequiredSchema.getFields().size(); + return new ClosableIterator() { + + private final ArrayWritable returnWritable = new ArrayWritable(Writable.class); + @Override + public boolean hasNext() { + if (dataFileIterator.hasNext() != skeletonFileIterator.hasNext()) { + throw new IllegalStateException("bootstrap data file iterator and skeleton file iterator are out of sync"); + } + return dataFileIterator.hasNext(); + } + + @Override + public ArrayWritable next() { + Writable[] skeletonWritable = skeletonFileIterator.next().get(); + Writable[] dataWritable = dataFileIterator.next().get(); + Writable[] mergedWritable = new Writable[skeletonLen + dataLen]; + System.arraycopy(skeletonWritable, 0, mergedWritable, 0, skeletonLen); + System.arraycopy(dataWritable, 0, mergedWritable, skeletonLen, dataLen); + returnWritable.set(mergedWritable); + return returnWritable; + } + + @Override + public void close() { + skeletonFileIterator.close(); + dataFileIterator.close(); + } + }; + } + + @Override + public UnaryOperator projectRecord(Schema from, Schema to) { + return HoodieArrayWritableAvroUtils.projectRecord(from, to); + } + + public UnaryOperator reverseProjectRecord(Schema from, Schema to) { + return HoodieArrayWritableAvroUtils.reverseProject(from, to); + } + + public long getPos() throws IOException { + if (firstRecordReader != null) { + return firstRecordReader.getPos(); + } + throw new IllegalStateException("getPos() should not be called before a record reader has been initialized"); + } + + public float getProgress() throws IOException { + if (firstRecordReader != null) { + return firstRecordReader.getProgress(); + } + throw new IllegalStateException("getProgress() should not be called before a record reader has been initialized"); + } + +} diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderRecordReader.java new file mode 100644 index 000000000000..d8a45ed6b058 --- /dev/null +++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderRecordReader.java @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.hadoop; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.config.HoodieCommonConfig; +import org.apache.hudi.common.config.HoodieReaderConfig; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.BaseFile; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.read.HoodieFileGroupReader; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.FileIOUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.TablePathUtils; +import org.apache.hudi.common.util.collection.ExternalSpillableMap; +import org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader; +import org.apache.hudi.hadoop.realtime.RealtimeSplit; +import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils; +import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils; + +import org.apache.avro.Schema; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.function.UnaryOperator; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED; +import static org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE; +import static org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE; +import static org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH; + +public class HoodieFileGroupReaderRecordReader implements RecordReader 
{ + + public interface HiveReaderCreator { + org.apache.hadoop.mapred.RecordReader getRecordReader( + final org.apache.hadoop.mapred.InputSplit split, + final org.apache.hadoop.mapred.JobConf job, + final org.apache.hadoop.mapred.Reporter reporter + ) throws IOException; + } + + private final HiveHoodieReaderContext readerContext; + private final HoodieFileGroupReader fileGroupReader; + private final ArrayWritable arrayWritable; + private final NullWritable nullWritable = NullWritable.get(); + private final InputSplit inputSplit; + private final JobConf jobConfCopy; + private final UnaryOperator reverseProjection; + + public HoodieFileGroupReaderRecordReader(HiveReaderCreator readerCreator, + final InputSplit split, + final JobConf jobConf, + final Reporter reporter) throws IOException { + this.jobConfCopy = new JobConf(jobConf); + HoodieRealtimeInputFormatUtils.cleanProjectionColumnIds(jobConfCopy); + Set partitionColumns = new HashSet<>(getPartitionFieldNames(jobConfCopy)); + this.inputSplit = split; + + FileSplit fileSplit = (FileSplit) split; + String tableBasePath = getTableBasePath(split, jobConfCopy); + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() + .setConf(jobConfCopy) + .setBasePath(tableBasePath) + .build(); + String latestCommitTime = getLatestCommitTime(split, metaClient); + Schema tableSchema = getLatestTableSchema(metaClient, jobConfCopy, latestCommitTime); + Schema requestedSchema = createRequestedSchema(tableSchema, jobConfCopy); + Map hosts = new HashMap<>(); + this.readerContext = new HiveHoodieReaderContext(readerCreator, split, jobConfCopy, reporter, tableSchema, hosts, metaClient); + this.arrayWritable = new ArrayWritable(Writable.class, new Writable[requestedSchema.getFields().size()]); + //get some config vals + long maxMemoryForMerge = jobConf.getLong(MAX_MEMORY_FOR_MERGE.key(), MAX_MEMORY_FOR_MERGE.defaultValue()); + String spillableMapPath = jobConf.get(SPILLABLE_MAP_BASE_PATH.key(), FileIOUtils.getDefaultSpillableMapBasePath()); + ExternalSpillableMap.DiskMapType spillMapType = ExternalSpillableMap.DiskMapType.valueOf(jobConf.get(SPILLABLE_DISK_MAP_TYPE.key(), + SPILLABLE_DISK_MAP_TYPE.defaultValue().name()).toUpperCase(Locale.ROOT)); + boolean bitmaskCompressEnabled = jobConf.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), + DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()); + + this.fileGroupReader = new HoodieFileGroupReader<>(readerContext, jobConfCopy, tableBasePath, + latestCommitTime, getFileSliceFromSplit(fileSplit, hosts, readerContext.getFs(tableBasePath, jobConfCopy), tableBasePath), + tableSchema, requestedSchema, metaClient.getTableConfig().getProps(), metaClient.getTableConfig(), fileSplit.getStart(), + fileSplit.getLength(), false, maxMemoryForMerge, spillableMapPath,spillMapType, bitmaskCompressEnabled); + this.fileGroupReader.initRecordIterators(); + //it expects the partition columns to be at the end + Schema outputSchema = HoodieAvroUtils.generateProjectionSchema(tableSchema, + Stream.concat(tableSchema.getFields().stream().map(f -> f.name().toLowerCase(Locale.ROOT)).filter(n -> !partitionColumns.contains(n)), + partitionColumns.stream()).collect(Collectors.toList())); + this.reverseProjection = readerContext.reverseProjectRecord(requestedSchema, outputSchema); + } + + @Override + public boolean next(NullWritable key, ArrayWritable value) throws IOException { + if (!fileGroupReader.hasNext()) { + return false; + } + value.set(fileGroupReader.next().get()); + reverseProjection.apply(value); + return true; + } + + 
@Override + public NullWritable createKey() { + return nullWritable; + } + + @Override + public ArrayWritable createValue() { + return arrayWritable; + } + + @Override + public long getPos() throws IOException { + return readerContext.getPos(); + } + + @Override + public void close() throws IOException { + fileGroupReader.close(); + } + + @Override + public float getProgress() throws IOException { + return readerContext.getProgress(); + } + + public RealtimeSplit getSplit() { + return (RealtimeSplit) inputSplit; + } + + public JobConf getJobConf() { + return jobConfCopy; + } + + public static List getPartitionFieldNames(JobConf jobConf) { + String partitionFields = jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, ""); + return partitionFields.length() > 0 ? Arrays.stream(partitionFields.split("/")).collect(Collectors.toList()) + : new ArrayList<>(); + } + + private static Schema getLatestTableSchema(HoodieTableMetaClient metaClient, JobConf jobConf, String latestCommitTime) { + TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient); + try { + Schema schema = tableSchemaResolver.getTableAvroSchema(latestCommitTime); + // Add partitioning fields to writer schema for resulting row to contain null values for these fields + return HoodieRealtimeRecordReaderUtils.addPartitionFields(schema, getPartitionFieldNames(jobConf)); + } catch (Exception e) { + throw new RuntimeException("Unable to get table schema", e); + } + } + + public static String getTableBasePath(InputSplit split, JobConf jobConf) throws IOException { + if (split instanceof RealtimeSplit) { + RealtimeSplit realtimeSplit = (RealtimeSplit) split; + return realtimeSplit.getBasePath(); + } else { + Path inputPath = ((FileSplit)split).getPath(); + FileSystem fs = inputPath.getFileSystem(jobConf); + Option tablePath = TablePathUtils.getTablePath(fs, inputPath); + return tablePath.get().toString(); + } + } + + private static String getLatestCommitTime(InputSplit split, HoodieTableMetaClient metaClient) { + if (split instanceof RealtimeSplit) { + return ((RealtimeSplit) split).getMaxCommitTime(); + } + Option lastInstant = metaClient.getCommitsTimeline().lastInstant(); + if (lastInstant.isPresent()) { + return lastInstant.get().getTimestamp(); + } else { + return ""; + } + } + + /** + * Convert FileSplit to FileSlice, but save the locations in 'hosts' because that data is otherwise lost. 
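+   * Handles realtime (MOR) splits, plain COW splits, and bootstrap base file splits.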
+ */ + private static FileSlice getFileSliceFromSplit(FileSplit split, Map hosts, FileSystem fs, String tableBasePath) throws IOException { + BaseFile bootstrapBaseFile = createBootstrapBaseFile(split, hosts, fs); + if (split instanceof RealtimeSplit) { + //mor + RealtimeSplit realtimeSplit = (RealtimeSplit) split; + boolean isLogFile = FSUtils.isLogFile(realtimeSplit.getPath()); + String fileID; + String commitTime; + if (isLogFile) { + fileID = FSUtils.getFileIdFromLogPath(realtimeSplit.getPath()); + commitTime = FSUtils.getDeltaCommitTimeFromLogPath(realtimeSplit.getPath()); + } else { + fileID = FSUtils.getFileId(realtimeSplit.getPath().getName()); + commitTime = FSUtils.getCommitTime(realtimeSplit.getPath().toString()); + } + HoodieFileGroupId fileGroupId = new HoodieFileGroupId(FSUtils.getPartitionPath(realtimeSplit.getBasePath(), + realtimeSplit.getPath().getParent().toString()).toString(), fileID); + if (isLogFile) { + return new FileSlice(fileGroupId, commitTime, null, realtimeSplit.getDeltaLogFiles()); + } + hosts.put(realtimeSplit.getPath().toString(), realtimeSplit.getLocations()); + HoodieBaseFile hoodieBaseFile = new HoodieBaseFile(fs.getFileStatus(realtimeSplit.getPath()), bootstrapBaseFile); + return new FileSlice(fileGroupId, commitTime, hoodieBaseFile, realtimeSplit.getDeltaLogFiles()); + } + //cow + HoodieFileGroupId fileGroupId = new HoodieFileGroupId(FSUtils.getFileId(split.getPath().getName()), + FSUtils.getPartitionPath(tableBasePath, split.getPath().getParent().toString()).toString()); + hosts.put(split.getPath().toString(), split.getLocations()); + return new FileSlice(fileGroupId, FSUtils.getCommitTime(split.getPath().toString()), new HoodieBaseFile(fs.getFileStatus(split.getPath()), bootstrapBaseFile), Collections.emptyList()); + } + + private static BaseFile createBootstrapBaseFile(FileSplit split, Map hosts, FileSystem fs) throws IOException { + if (split instanceof BootstrapBaseFileSplit) { + BootstrapBaseFileSplit bootstrapBaseFileSplit = (BootstrapBaseFileSplit) split; + FileSplit bootstrapFileSplit = bootstrapBaseFileSplit.getBootstrapFileSplit(); + hosts.put(bootstrapFileSplit.getPath().toString(), bootstrapFileSplit.getLocations()); + return new BaseFile(fs.getFileStatus(bootstrapFileSplit.getPath())); + } + return null; + } + + private static Schema createRequestedSchema(Schema tableSchema, JobConf jobConf) { + String readCols = jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR); + if (StringUtils.isNullOrEmpty(readCols)) { + Schema emptySchema = Schema.createRecord(tableSchema.getName(), tableSchema.getDoc(), + tableSchema.getNamespace(), tableSchema.isError()); + emptySchema.setFields(Collections.emptyList()); + return emptySchema; + } + //hive will handle the partition cols + String partitionColString = jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS); + Set partitionColumns; + if (partitionColString == null) { + partitionColumns = Collections.emptySet(); + } else { + partitionColumns = Arrays.stream(partitionColString.split(",")).collect(Collectors.toSet()); + } + //if they are actually written to the file, then it is ok to read them from the file + tableSchema.getFields().forEach(f -> partitionColumns.remove(f.name().toLowerCase(Locale.ROOT))); + return HoodieAvroUtils.generateProjectionSchema(tableSchema, + Arrays.stream(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR).split(",")).filter(c -> !partitionColumns.contains(c)).collect(Collectors.toList())); + } + + /** + * `schema.on.read` and skip merge not 
implemented + */ + public static boolean useFilegroupReader(final JobConf jobConf) { + return jobConf.getBoolean(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), HoodieReaderConfig.FILE_GROUP_READER_ENABLED.defaultValue()) + && !jobConf.getBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.defaultValue()) + && !jobConf.getBoolean(HoodieRealtimeRecordReader.REALTIME_SKIP_MERGE_PROP, false); + } +} diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java new file mode 100644 index 000000000000..7efcd5fea75a --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.hadoop; + +import org.apache.hudi.common.model.HoodieAvroIndexedRecord; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieOperation; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.MetadataValues; +import org.apache.hudi.common.util.ConfigUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils; +import org.apache.hudi.hadoop.utils.ObjectInspectorCache; +import org.apache.hudi.keygen.BaseKeyGenerator; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import org.apache.avro.Schema; +import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import java.util.Properties; + +public class HoodieHiveRecord extends HoodieRecord { + + private boolean copy; + private boolean isDeleted; + + public boolean isDeleted() { + return isDeleted; + } + + private final ArrayWritableObjectInspector objectInspector; + + private final ObjectInspectorCache objectInspectorCache; + + protected Schema schema; + public HoodieHiveRecord(HoodieKey key, ArrayWritable data, Schema schema, ObjectInspectorCache objectInspectorCache) { + super(key, data); + this.objectInspector = objectInspectorCache.getObjectInspector(schema); + this.objectInspectorCache = objectInspectorCache; + this.schema = schema; + this.copy = false; + isDeleted = data == null; + } + + private HoodieHiveRecord(HoodieKey key, ArrayWritable data, Schema schema, HoodieOperation operation, boolean isCopy, + ArrayWritableObjectInspector objectInspector, ObjectInspectorCache objectInspectorCache) { + super(key, data, operation, Option.empty()); + this.schema = schema; + this.copy = isCopy; + isDeleted = data == null; + this.objectInspector = objectInspector; + this.objectInspectorCache = objectInspectorCache; + } + + @Override + public HoodieRecord newInstance() { + return new HoodieHiveRecord(this.key, this.data, this.schema, this.operation, this.copy, this.objectInspector, this.objectInspectorCache); + } + + @Override + public HoodieRecord newInstance(HoodieKey key, HoodieOperation op) { + throw new UnsupportedOperationException("ObjectInspector is needed for HoodieHiveRecord"); + } + + @Override + public HoodieRecord newInstance(HoodieKey key) { + throw new UnsupportedOperationException("ObjectInspector is needed for HoodieHiveRecord"); + } + + @Override + public Comparable getOrderingValue(Schema recordSchema, Properties props) { + String orderingField = ConfigUtils.getOrderingField(props); + if (orderingField == null) { + return 0; + //throw new IllegalArgumentException("Ordering Field is not set. Precombine must be set. 
(If you are using a custom record merger it might be something else)"); + } + return (Comparable) getValue(ConfigUtils.getOrderingField(props)); + } + + @Override + public HoodieRecordType getRecordType() { + return HoodieRecordType.HIVE; + } + + @Override + public String getRecordKey(Schema recordSchema, Option keyGeneratorOpt) { + throw new UnsupportedOperationException("Not supported for HoodieHiveRecord"); + } + + @Override + public String getRecordKey(Schema recordSchema, String keyFieldName) { + throw new UnsupportedOperationException("Not supported for HoodieHiveRecord"); + } + + @Override + protected void writeRecordPayload(ArrayWritable payload, Kryo kryo, Output output) { + throw new UnsupportedOperationException("Not supported for HoodieHiveRecord"); + } + + @Override + protected ArrayWritable readRecordPayload(Kryo kryo, Input input) { + throw new UnsupportedOperationException("Not supported for HoodieHiveRecord"); + } + + @Override + public Object[] getColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled) { + Object[] objects = new Object[columns.length]; + for (int i = 0; i < objects.length; i++) { + objects[i] = getValue(columns[i]); + } + return objects; + } + + @Override + public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) { + throw new UnsupportedOperationException("Not supported for HoodieHiveRecord"); + } + + @Override + public HoodieRecord prependMetaFields(Schema recordSchema, Schema targetSchema, MetadataValues metadataValues, Properties props) { + throw new UnsupportedOperationException("Not supported for HoodieHiveRecord"); + } + + @Override + public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map renameCols) { + throw new UnsupportedOperationException("Not supported for HoodieHiveRecord"); + } + + @Override + public boolean isDelete(Schema recordSchema, Properties props) throws IOException { + if (null == data) { + return true; + } + if (recordSchema.getField(HoodieRecord.HOODIE_IS_DELETED_FIELD) == null) { + return false; + } + Object deleteMarker = getValue(HoodieRecord.HOODIE_IS_DELETED_FIELD); + return deleteMarker instanceof BooleanWritable && ((BooleanWritable) deleteMarker).get(); + } + + @Override + public boolean shouldIgnore(Schema recordSchema, Properties props) throws IOException { + return false; + } + + @Override + public HoodieRecord copy() { + if (!copy) { + this.data = new ArrayWritable(Writable.class, Arrays.copyOf(this.data.get(), this.data.get().length)); + this.copy = true; + } + return this; + } + + @Override + public Option> getMetadata() { + // TODO HUDI-5282 support metaData + return Option.empty(); + } + + @Override + public HoodieRecord wrapIntoHoodieRecordPayloadWithParams(Schema recordSchema, Properties props, Option> simpleKeyGenFieldsOpt, Boolean withOperation, + Option partitionNameOp, Boolean populateMetaFieldsOp, Option schemaWithoutMetaFields) throws IOException { + throw new UnsupportedOperationException("Not supported for HoodieHiveRecord"); + } + + @Override + public HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Schema recordSchema, Properties props, Option keyGen) { + throw new UnsupportedOperationException("Not supported for HoodieHiveRecord"); + } + + @Override + public HoodieRecord truncateRecordKey(Schema recordSchema, Properties props, String keyFieldName) throws IOException { + data.get()[recordSchema.getIndexNamed(keyFieldName)] = new Text(); + return this; + } + + @Override + public Option 
toIndexedRecord(Schema recordSchema, Properties props) throws IOException { + throw new UnsupportedOperationException("Not supported for HoodieHiveRecord"); + } + + private Object getValue(String name) { + return HoodieArrayWritableAvroUtils.getWritableValue(data, objectInspector, name); + } + + protected Schema getSchema() { + return schema; + } +} diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecordMerger.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecordMerger.java new file mode 100644 index 000000000000..17a4738569e5 --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecordMerger.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.hadoop; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordMerger; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.collection.Pair; + +import org.apache.avro.Schema; + +import java.io.IOException; + +public class HoodieHiveRecordMerger implements HoodieRecordMerger { + @Override + public Option> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException { + ValidationUtils.checkArgument(older.getRecordType() == HoodieRecord.HoodieRecordType.HIVE); + ValidationUtils.checkArgument(newer.getRecordType() == HoodieRecord.HoodieRecordType.HIVE); + if (newer instanceof HoodieHiveRecord) { + HoodieHiveRecord newHiveRecord = (HoodieHiveRecord) newer; + if (newHiveRecord.isDeleted()) { + return Option.empty(); + } + } else if (newer.getData() == null) { + return Option.empty(); + } + + if (older instanceof HoodieHiveRecord) { + HoodieHiveRecord oldHiveRecord = (HoodieHiveRecord) older; + if (oldHiveRecord.isDeleted()) { + return Option.of(Pair.of(newer, newSchema)); + } + } else if (older.getData() == null) { + return Option.empty(); + } + if (older.getOrderingValue(oldSchema, props).compareTo(newer.getOrderingValue(newSchema, props)) > 0) { + return Option.of(Pair.of(older, oldSchema)); + } else { + return Option.of(Pair.of(newer, newSchema)); + } + } + + @Override + public HoodieRecord.HoodieRecordType getRecordType() { + return HoodieRecord.HoodieRecordType.HIVE; + } + + @Override + public String getMergingStrategy() { + return HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID; + } +} diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java index 9e6565299040..b950bdfefb57 100644 --- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java @@ -19,11 +19,16 @@ package org.apache.hudi.hadoop; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.TablePathUtils; import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.hadoop.utils.HoodieHiveUtils; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.hadoop.avro.HoodieTimestampAwareParquetInputFormat; +import org.apache.hudi.hadoop.utils.HoodieHiveUtils; +import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper; import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; @@ -35,9 +40,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; -import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils; import org.apache.parquet.hadoop.ParquetInputFormat; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,9 +94,42 @@ private void initAvroInputFormat() { } } + private static boolean checkIfHudiTable(final InputSplit split, final JobConf job) { + try { + Option tablePathOpt = TablePathUtils.getTablePath(((FileSplit) split).getPath(), job); + if (!tablePathOpt.isPresent()) { + return false; + } + return tablePathOpt.get().getFileSystem(job).exists(new Path(tablePathOpt.get(), HoodieTableMetaClient.METAFOLDER_NAME)); + } catch (IOException e) { + return false; + } + } + @Override public RecordReader getRecordReader(final InputSplit split, final JobConf job, final Reporter reporter) throws IOException { + HoodieRealtimeInputFormatUtils.addProjectionField(job, job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "").split("/")); + if (HoodieFileGroupReaderRecordReader.useFilegroupReader(job)) { + try { + if (!(split instanceof FileSplit) || !checkIfHudiTable(split, job)) { + return super.getRecordReader(split, job, reporter); + } + if (supportAvroRead && HoodieColumnProjectionUtils.supportTimestamp(job)) { + return new HoodieFileGroupReaderRecordReader((s, j, r) -> { + try { + return new ParquetRecordReaderWrapper(new HoodieTimestampAwareParquetInputFormat(), s, j, r); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }, split, job, reporter); + } else { + return new HoodieFileGroupReaderRecordReader(super::getRecordReader, split, job, reporter); + } + } catch (final IOException e) { + throw new RuntimeException("Cannot create a RecordReaderWrapper", e); + } + } // TODO enable automatic predicate pushdown after fixing issues // FileSplit fileSplit = (FileSplit) split; // HoodieTableMetadata metadata = getTableMetadata(fileSplit.getPath().getParent()); @@ -117,7 +153,6 @@ public RecordReader getRecordReader(final InputSpli LOG.debug("EMPLOYING DEFAULT RECORD READER - " + split); } - HoodieRealtimeInputFormatUtils.addProjectionField(job, job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "").split("/")); return getRecordReaderInternal(split, job, reporter); } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RecordReaderValueIterator.java 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RecordReaderValueIterator.java index 7ffa3bf555c0..c08c358c0c87 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RecordReaderValueIterator.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RecordReaderValueIterator.java @@ -18,6 +18,7 @@ package org.apache.hudi.hadoop; +import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.exception.HoodieException; import org.apache.hadoop.mapred.RecordReader; @@ -25,7 +26,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Iterator; import java.util.NoSuchElementException; /** @@ -34,7 +34,7 @@ * @param Key Type * @param Value Type */ -public class RecordReaderValueIterator implements Iterator { +public class RecordReaderValueIterator implements ClosableIterator { private static final Logger LOG = LoggerFactory.getLogger(RecordReaderValueIterator.class); @@ -79,7 +79,12 @@ public V next() { return retVal; } - public void close() throws IOException { - this.reader.close(); + @Override + public void close() { + try { + this.reader.close(); + } catch (IOException e) { + throw new RuntimeException("Could not close reader", e); + } } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java index 1edf29d45d57..034d7c6b69c8 100--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java @@ -19,8 +19,10 @@ package org.apache.hudi.hadoop.realtime; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.hadoop.HoodieFileGroupReaderRecordReader; import org.apache.hudi.hadoop.hive.HoodieCombineRealtimeFileSplit; + +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.IOContextMap; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.NullWritable; @@ -42,19 +44,29 @@ public class HoodieCombineRealtimeRecordReader implements RecordReader recordReaders = new LinkedList<>(); + private List recordReaders = new LinkedList<>(); // Points to the currently iterating record reader - HoodieRealtimeRecordReader currentRecordReader; + private RecordReader currentRecordReader; + + private final boolean useFileGroupReader; public HoodieCombineRealtimeRecordReader(JobConf jobConf, CombineFileSplit split, List readers) { + useFileGroupReader = HoodieFileGroupReaderRecordReader.useFilegroupReader(jobConf); try { ValidationUtils.checkArgument(((HoodieCombineRealtimeFileSplit) split).getRealtimeFileSplits().size() == readers .size(), "Num Splits does not match number of unique RecordReaders!"); for (InputSplit rtSplit : ((HoodieCombineRealtimeFileSplit) split).getRealtimeFileSplits()) { - LOG.info("Creating new RealtimeRecordReader for split"); - recordReaders.add( - new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit) rtSplit, jobConf, readers.remove(0))); + if (useFileGroupReader) { + LOG.info("Creating new HoodieFileGroupReaderRecordReader for split"); + RecordReader reader = readers.remove(0); + ValidationUtils.checkArgument(reader instanceof HoodieFileGroupReaderRecordReader, reader.toString() + " is not an instance of HoodieFileGroupReaderRecordReader"); + recordReaders.add(reader); + } else { + LOG.info("Creating new RealtimeRecordReader for split"); + recordReaders.add( +
new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit) rtSplit, jobConf, readers.remove(0))); + } } currentRecordReader = recordReaders.remove(0); } catch (Exception e) { @@ -69,9 +81,20 @@ public boolean next(NullWritable key, ArrayWritable value) throws IOException { } else if (recordReaders.size() > 0) { this.currentRecordReader.close(); this.currentRecordReader = recordReaders.remove(0); - AbstractRealtimeRecordReader reader = (AbstractRealtimeRecordReader)currentRecordReader.getReader(); + RecordReader reader; + JobConf jobConf; + Path path; + if (useFileGroupReader) { + reader = currentRecordReader; + jobConf = ((HoodieFileGroupReaderRecordReader) reader).getJobConf(); + path = ((HoodieFileGroupReaderRecordReader) reader).getSplit().getPath(); + } else { + reader = ((HoodieRealtimeRecordReader)currentRecordReader).getReader(); + jobConf = ((AbstractRealtimeRecordReader) reader).getJobConf(); + path = ((AbstractRealtimeRecordReader) reader).getSplit().getPath(); + } // when switch reader, ioctx should be updated - IOContextMap.get(reader.getJobConf()).setInputPath(reader.getSplit().getPath()); + IOContextMap.get(jobConf).setInputPath(path); return next(key, value); } else { return false; @@ -80,12 +103,20 @@ public boolean next(NullWritable key, ArrayWritable value) throws IOException { @Override public NullWritable createKey() { - return this.currentRecordReader.createKey(); + if (useFileGroupReader) { + return ((HoodieFileGroupReaderRecordReader) this.currentRecordReader).createKey(); + } else { + return ((HoodieRealtimeRecordReader) this.currentRecordReader).createKey(); + } } @Override public ArrayWritable createValue() { - return this.currentRecordReader.createValue(); + if (useFileGroupReader) { + return ((HoodieFileGroupReaderRecordReader) this.currentRecordReader).createValue(); + } else { + return ((HoodieRealtimeRecordReader) this.currentRecordReader).createValue(); + } } @Override diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java index c3d2c0d63b57..3974a4c7e3bd 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.hadoop.HoodieFileGroupReaderRecordReader; import org.apache.hudi.hadoop.HoodieParquetInputFormat; import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat; import org.apache.hudi.hadoop.UseRecordReaderFromInputFormat; @@ -69,6 +70,11 @@ public RecordReader getRecordReader(final InputSpli ValidationUtils.checkArgument(split instanceof RealtimeSplit, "HoodieRealtimeRecordReader can only work on RealtimeSplit and not with " + split); RealtimeSplit realtimeSplit = (RealtimeSplit) split; + + if (HoodieFileGroupReaderRecordReader.useFilegroupReader(jobConf)) { + return super.getRecordReader(realtimeSplit, jobConf, reporter); + } + // add preCombineKey HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jobConf).setBasePath(realtimeSplit.getBasePath()).build(); HoodieTableConfig tableConfig = metaClient.getTableConfig(); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableAvroUtils.java 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableAvroUtils.java new file mode 100644 index 000000000000..a2da796c6f77 --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableAvroUtils.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.hadoop.utils; + +import org.apache.hudi.common.util.collection.Pair; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.avro.Schema; +import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; + +import java.util.List; +import java.util.function.UnaryOperator; + +public class HoodieArrayWritableAvroUtils { + + private static final Cache + OBJECT_INSPECTOR_TABLE_CACHE = Caffeine.newBuilder().maximumSize(1000).build(); + + public static ObjectInspectorCache getCacheForTable(String table, Schema tableSchema, JobConf jobConf) { + ObjectInspectorCache cache = OBJECT_INSPECTOR_TABLE_CACHE.getIfPresent(table); + if (cache == null) { + cache = new ObjectInspectorCache(tableSchema, jobConf); + } + return cache; + } + + private static final Cache, int[]> + PROJECTION_CACHE = Caffeine.newBuilder().maximumSize(1000).build(); + + public static int[] getProjection(Schema from, Schema to) { + return PROJECTION_CACHE.get(Pair.of(from, to), schemas -> { + List toFields = to.getFields(); + int[] newProjection = new int[toFields.size()]; + for (int i = 0; i < newProjection.length; i++) { + newProjection[i] = from.getField(toFields.get(i).name()).pos(); + } + return newProjection; + }); + } + + /** + * Projection will keep the size from the "from" schema because it gets recycled + * and if the size changes the reader will fail + */ + public static UnaryOperator projectRecord(Schema from, Schema to) { + int[] projection = getProjection(from, to); + return arrayWritable -> { + Writable[] values = new Writable[arrayWritable.get().length]; + for (int i = 0; i < projection.length; i++) { + values[i] = arrayWritable.get()[projection[i]]; + } + arrayWritable.set(values); + return arrayWritable; + }; + } + + public static int[] getReverseProjection(Schema from, Schema to) { + return PROJECTION_CACHE.get(Pair.of(from, to), schemas -> { + List fromFields = from.getFields(); + int[] newProjection = new int[fromFields.size()]; + for (int i = 0; i < newProjection.length; i++) { + newProjection[i] = to.getField(fromFields.get(i).name()).pos(); + } + return newProjection; + }); + } + + /** + * After the reading and merging etc is done, we need to put the records + * into the positions of the original 
schema + */ + public static UnaryOperator reverseProject(Schema from, Schema to) { + int[] projection = getReverseProjection(from, to); + return arrayWritable -> { + Writable[] values = new Writable[to.getFields().size()]; + for (int i = 0; i < projection.length; i++) { + values[projection[i]] = arrayWritable.get()[i]; + } + arrayWritable.set(values); + return arrayWritable; + }; + } + + public static Object getWritableValue(ArrayWritable arrayWritable, ArrayWritableObjectInspector objectInspector, String name) { + return objectInspector.getStructFieldData(arrayWritable, objectInspector.getStructFieldRef(name)); + } +} + + diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java new file mode 100644 index 000000000000..ddcc28851dfd --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.hadoop.utils; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.avro.Schema; +import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.mapred.JobConf; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * To read value from an ArrayWritable, an ObjectInspector is needed. + * Object inspectors are cached here or created using the column type map. + */ +public class ObjectInspectorCache { + private final Map columnTypeMap = new HashMap<>(); + private final Cache + objectInspectorCache = Caffeine.newBuilder().maximumSize(1000).build(); + + public Map getColumnTypeMap() { + return columnTypeMap; + } + + public ObjectInspectorCache(Schema tableSchema, JobConf jobConf) { + //From AbstractRealtimeRecordReader#prepareHiveAvroSerializer + // hive will append virtual columns at the end of column list. we should remove those columns. + // eg: current table is col1, col2, col3; jobConf.get(serdeConstants.LIST_COLUMNS): col1, col2, col3 ,BLOCK__OFFSET__INSIDE__FILE ... 
+ Set writerSchemaColNames = tableSchema.getFields().stream().map(f -> f.name().toLowerCase(Locale.ROOT)).collect(Collectors.toSet()); + List columnNameList = Arrays.stream(jobConf.get(serdeConstants.LIST_COLUMNS).split(",")).collect(Collectors.toList()); + List columnTypeList = TypeInfoUtils.getTypeInfosFromTypeString(jobConf.get(serdeConstants.LIST_COLUMN_TYPES)); + + int columnNameListLen = columnNameList.size() - 1; + for (int i = columnNameListLen; i >= 0; i--) { + String lastColName = columnNameList.get(columnNameList.size() - 1); + // virtual columns will only append at the end of column list. it will be ok to break the loop. + if (writerSchemaColNames.contains(lastColName)) { + break; + } + columnNameList.remove(columnNameList.size() - 1); + columnTypeList.remove(columnTypeList.size() - 1); + } + + //Use columnNameList.size() instead of columnTypeList because the type list is longer for some reason + IntStream.range(0, columnNameList.size()).boxed().forEach(i -> columnTypeMap.put(columnNameList.get(i), + TypeInfoUtils.getTypeInfosFromTypeString(columnTypeList.get(i).getQualifiedName()).get(0))); + + StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList); + ArrayWritableObjectInspector objectInspector = new ArrayWritableObjectInspector(rowTypeInfo); + objectInspectorCache.put(tableSchema, objectInspector); + } + + public ArrayWritableObjectInspector getObjectInspector(Schema schema) { + return objectInspectorCache.get(schema, s -> { + List columnNameList = s.getFields().stream().map(Schema.Field::name).collect(Collectors.toList()); + List columnTypeList = columnNameList.stream().map(columnTypeMap::get).collect(Collectors.toList()); + StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList); + return new ArrayWritableObjectInspector(rowTypeInfo); + }); + + } + + public Object getValue(ArrayWritable record, Schema schema, String fieldName) { + try { + ArrayWritableObjectInspector objectInspector = getObjectInspector(schema); + return objectInspector.getStructFieldData(record, objectInspector.getStructFieldRef(fieldName)); + } catch (Exception e) { + throw e; + } + + } +} diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java index 5bf0a255eb4e..148830468a08 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java @@ -20,6 +20,7 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.common.config.HoodieReaderConfig; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFileFormat; @@ -30,10 +31,10 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.testutils.InProcessTimeGenerator; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.testutils.FileCreateUtils; import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.common.testutils.InProcessTimeGenerator; import org.apache.hudi.common.testutils.SchemaTestUtil; import 
org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.Option; @@ -777,59 +778,65 @@ private void ensureRecordsInCommit(String msg, String commit, int expectedNumber @Test public void testHoodieParquetInputFormatReadTimeType() throws IOException { - long testTimestampLong = System.currentTimeMillis(); - int testDate = 19116;// 2022-05-04 - - Schema schema = SchemaTestUtil.getSchemaFromResource(getClass(), "/test_timetype.avsc"); - String commit = "20160628071126"; - HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), - HoodieTableType.COPY_ON_WRITE, HoodieFileFormat.PARQUET); - java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", "06", "28")); - String fileId = FSUtils.makeBaseFileName(commit, "1-0-1", "fileid1", - HoodieFileFormat.PARQUET.getFileExtension()); - try (AvroParquetWriter parquetWriter = new AvroParquetWriter( - new Path(partitionPath.resolve(fileId).toString()), schema)) { - GenericData.Record record = new GenericData.Record(schema); - record.put("test_timestamp", testTimestampLong * 1000); - record.put("test_long", testTimestampLong * 1000); - record.put("test_date", testDate); - record.put("_hoodie_commit_time", commit); - record.put("_hoodie_commit_seqno", commit + 1); - parquetWriter.write(record); - } - - jobConf.set(IOConstants.COLUMNS, "test_timestamp,test_long,test_date,_hoodie_commit_time,_hoodie_commit_seqno"); - jobConf.set(IOConstants.COLUMNS_TYPES, "timestamp,bigint,date,string,string"); - jobConf.set(READ_COLUMN_NAMES_CONF_STR, "test_timestamp,test_long,test_date,_hoodie_commit_time,_hoodie_commit_seqno"); - InputFormatTestUtil.setupPartition(basePath, partitionPath); - InputFormatTestUtil.commit(basePath, commit); - FileInputFormat.setInputPaths(jobConf, partitionPath.toFile().getPath()); + try { + long testTimestampLong = System.currentTimeMillis(); + int testDate = 19116;// 2022-05-04 + + Schema schema = SchemaTestUtil.getSchemaFromResource(getClass(), "/test_timetype.avsc"); + String commit = "20160628071126"; + HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), + HoodieTableType.COPY_ON_WRITE, HoodieFileFormat.PARQUET); + java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", "06", "28")); + String fileId = FSUtils.makeBaseFileName(commit, "1-0-1", "fileid1", + HoodieFileFormat.PARQUET.getFileExtension()); + try (AvroParquetWriter parquetWriter = new AvroParquetWriter( + new Path(partitionPath.resolve(fileId).toString()), schema)) { + GenericData.Record record = new GenericData.Record(schema); + record.put("test_timestamp", testTimestampLong * 1000); + record.put("test_long", testTimestampLong * 1000); + record.put("test_date", testDate); + record.put("_hoodie_commit_time", commit); + record.put("_hoodie_commit_seqno", commit + 1); + parquetWriter.write(record); + } - InputSplit[] splits = inputFormat.getSplits(jobConf, 1); - for (InputSplit split : splits) { - RecordReader recordReader = inputFormat - .getRecordReader(split, jobConf, null); - NullWritable key = recordReader.createKey(); - ArrayWritable writable = recordReader.createValue(); - while (recordReader.next(key, writable)) { - // test timestamp - if (HiveVersionInfo.getShortVersion().startsWith("3")) { - LocalDateTime localDateTime = LocalDateTime.ofInstant( - Instant.ofEpochMilli(testTimestampLong), ZoneOffset.UTC); - assertEquals(Timestamp.valueOf(localDateTime).toString(), String.valueOf(writable.get()[0])); - } else { - Date date = new Date(); - date.setTime(testTimestampLong); - 
Timestamp actualTime = ((TimestampWritable) writable.get()[0]).getTimestamp(); - SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); - assertEquals(dateFormat.format(date), dateFormat.format(actualTime)); + //this is not a hoodie table!! + jobConf.set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false"); + jobConf.set(IOConstants.COLUMNS, "test_timestamp,test_long,test_date,_hoodie_commit_time,_hoodie_commit_seqno"); + jobConf.set(IOConstants.COLUMNS_TYPES, "timestamp,bigint,date,string,string"); + jobConf.set(READ_COLUMN_NAMES_CONF_STR, "test_timestamp,test_long,test_date,_hoodie_commit_time,_hoodie_commit_seqno"); + InputFormatTestUtil.setupPartition(basePath, partitionPath); + InputFormatTestUtil.commit(basePath, commit); + FileInputFormat.setInputPaths(jobConf, partitionPath.toFile().getPath()); + + InputSplit[] splits = inputFormat.getSplits(jobConf, 1); + for (InputSplit split : splits) { + RecordReader recordReader = inputFormat + .getRecordReader(split, jobConf, null); + NullWritable key = recordReader.createKey(); + ArrayWritable writable = recordReader.createValue(); + while (recordReader.next(key, writable)) { + // test timestamp + if (HiveVersionInfo.getShortVersion().startsWith("3")) { + LocalDateTime localDateTime = LocalDateTime.ofInstant( + Instant.ofEpochMilli(testTimestampLong), ZoneOffset.UTC); + assertEquals(Timestamp.valueOf(localDateTime).toString(), String.valueOf(writable.get()[0])); + } else { + Date date = new Date(); + date.setTime(testTimestampLong); + Timestamp actualTime = ((TimestampWritable) writable.get()[0]).getTimestamp(); + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + assertEquals(dateFormat.format(date), dateFormat.format(actualTime)); + } + // test long + assertEquals(testTimestampLong * 1000, ((LongWritable) writable.get()[1]).get()); + // test date + assertEquals(LocalDate.ofEpochDay(testDate).toString(), String.valueOf(writable.get()[2])); } - // test long - assertEquals(testTimestampLong * 1000, ((LongWritable) writable.get()[1]).get()); - // test date - assertEquals(LocalDate.ofEpochDay(testDate).toString(), String.valueOf(writable.get()[2])); + recordReader.close(); } - recordReader.close(); + } finally { + jobConf.unset(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key()); } } } diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java index 22e5389a9300..39663231fa5d 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.FileInputFormat; @@ -237,7 +238,18 @@ public void multiLevelPartitionReadersRealtimeCombineHoodieInputFormat() throws HoodieCombineHiveInputFormat combineHiveInputFormat = new HoodieCombineHiveInputFormat(); String tripsHiveColumnTypes = "double,string,string,string,double,double,double,double,double"; - InputFormatTestUtil.setPropsForInputFormat(jobConf, schema, tripsHiveColumnTypes); + List fields = schema.getFields(); + 
String names = fields.stream().map(f -> f.name().toString()).collect(Collectors.joining(",")); + String positions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(",")); + + String hiveColumnNames = fields.stream().map(Schema.Field::name).collect(Collectors.joining(",")); + hiveColumnNames = hiveColumnNames + ",year,month,day"; + String modifiedHiveColumnTypes = HoodieAvroUtils.addMetadataColumnTypes(tripsHiveColumnTypes); + modifiedHiveColumnTypes = modifiedHiveColumnTypes + ",string,string,string"; + jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames); + jobConf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, modifiedHiveColumnTypes); + jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names); + jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions); // unset META_TABLE_PARTITION_COLUMNS to trigger HUDI-1718 jobConf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, ""); InputSplit[] splits = combineHiveInputFormat.getSplits(jobConf, 1); diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java index 2ebc8d1b824a..c5dd1d0dc187 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java @@ -20,6 +20,7 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.config.HoodieMemoryConfig; +import org.apache.hudi.common.config.HoodieReaderConfig; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -87,6 +88,7 @@ public void setUp() { baseJobConf.set(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(), String.valueOf(1024 * 1024)); baseJobConf.set(serdeConstants.LIST_COLUMNS, COLUMNS); baseJobConf.set(serdeConstants.LIST_COLUMN_TYPES, COLUMN_TYPES); + baseJobConf.set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false"); fs = getFs(basePath.toUri().toString(), baseJobConf); } diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java index af2c5dd2cca8..a2dfed75c0c8 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java @@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.config.HoodieMemoryConfig; +import org.apache.hudi.common.config.HoodieReaderConfig; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -34,10 +35,10 @@ import org.apache.hudi.common.table.log.HoodieLogFormat.Writer; import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.testutils.InProcessTimeGenerator; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.testutils.FileCreateUtils; import org.apache.hudi.common.testutils.HoodieTestUtils; 
+import org.apache.hudi.common.testutils.InProcessTimeGenerator; import org.apache.hudi.common.testutils.SchemaTestUtil; import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.Option; @@ -117,6 +118,7 @@ public void setUp() { hadoopConf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); baseJobConf = new JobConf(hadoopConf); baseJobConf.set(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(), String.valueOf(1024 * 1024)); + baseJobConf.set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false"); fs = HadoopFSUtils.getFs(basePath.toUri().toString(), baseJobConf); } diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableAvroUtils.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableAvroUtils.java new file mode 100644 index 000000000000..12676c3ba18d --- /dev/null +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableAvroUtils.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.hadoop.utils; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.testutils.HoodieTestUtils; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.mapred.JobConf; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.function.UnaryOperator; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestHoodieArrayWritableAvroUtils { + + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); + Schema tableSchema = HoodieTestDataGenerator.AVRO_SCHEMA; + ObjectInspectorCache objectInspectorCache; + + @BeforeEach + public void setup() { + List fields = tableSchema.getFields(); + Configuration conf = HoodieTestUtils.getDefaultHadoopConf(); + JobConf jobConf = new JobConf(conf); + jobConf.set(serdeConstants.LIST_COLUMNS, fields.stream().map(Schema.Field::name).collect(Collectors.joining(","))); + jobConf.set(serdeConstants.LIST_COLUMN_TYPES, HoodieTestDataGenerator.TRIP_HIVE_COLUMN_TYPES); + objectInspectorCache = new ObjectInspectorCache(HoodieTestDataGenerator.AVRO_SCHEMA, jobConf); + } + + @Test + public void testProjection() { + Schema from = tableSchema; + Schema to = HoodieAvroUtils.generateProjectionSchema(from, Arrays.asList("trip_type", "current_ts", "weight")); + UnaryOperator projection = HoodieArrayWritableAvroUtils.projectRecord(from, to); + UnaryOperator reverseProjection = HoodieArrayWritableAvroUtils.reverseProject(to, from); + + //We reuse the ArrayWritable, so we need to get the values before projecting + ArrayWritable record = convertArrayWritable(dataGen.generateGenericRecord()); + Object tripType = objectInspectorCache.getValue(record, from, "trip_type"); + Object currentTs = objectInspectorCache.getValue(record, from, "current_ts"); + Object weight = objectInspectorCache.getValue(record, from, "weight"); + + //Make sure the projected fields can be read + ArrayWritable projectedRecord = projection.apply(record); + assertEquals(tripType, objectInspectorCache.getValue(projectedRecord, to, "trip_type")); + assertEquals(currentTs, objectInspectorCache.getValue(projectedRecord, to, "current_ts")); + assertEquals(weight, objectInspectorCache.getValue(projectedRecord, to, "weight")); + + //Reverse projection, the fields are in the original spots, but only the fields we set can be read. 
+ //Therefore, we can only check the 3 fields that were in the projection + ArrayWritable reverseProjected = reverseProjection.apply(projectedRecord); + assertEquals(tripType, objectInspectorCache.getValue(reverseProjected, from, "trip_type")); + assertEquals(currentTs, objectInspectorCache.getValue(reverseProjected, from, "current_ts")); + assertEquals(weight, objectInspectorCache.getValue(reverseProjected, from, "weight")); + } + + private static ArrayWritable convertArrayWritable(GenericRecord record) { + return (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(record, record.getSchema(), false); + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java index 1368f0e1fe31..88f20b7692b0 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java @@ -248,6 +248,7 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec long timestamp = Instant.now().toEpochMilli(); Schema schema = generateNewDataSetAndReturnSchema(timestamp, totalRecords, partitions, bootstrapBasePath); HoodieWriteConfig config = getConfigBuilder(schema.toString()) + .withPreCombineField("timestamp") .withAutoCommit(true) .withSchema(schema.toString()) .withKeyGenerator(keyGeneratorClass) diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java index a5a45cabf81d..806f77544231 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java @@ -19,6 +19,7 @@ package org.apache.hudi.functional; import org.apache.hudi.HoodieSparkUtils; +import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.hadoop.HoodieParquetInputFormat; import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat; @@ -106,6 +107,7 @@ public void testHiveReadSchemaEvolutionTable(String tableType) throws Exception spark.sql(String.format("alter table %s rename column col2 to col2_new", tableName)); JobConf jobConf = new JobConf(); + jobConf.set(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), "true"); jobConf.set(ColumnProjectionUtils.READ_ALL_COLUMNS, "false"); jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "col1,col2_new"); jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "6,7"); diff --git a/hudi-sync/hudi-hive-sync/run_sync_tool.sh b/hudi-sync/hudi-hive-sync/run_sync_tool.sh index d5d921eb176d..6b359be61522 100755 --- a/hudi-sync/hudi-hive-sync/run_sync_tool.sh +++ b/hudi-sync/hudi-hive-sync/run_sync_tool.sh @@ -47,7 +47,9 @@ if [ -z "${HIVE_JDBC}" ]; then HIVE_JDBC=`ls ${HIVE_HOME}/lib/hive-jdbc-*.jar | grep -v handler | tr '\n' ':'` fi HIVE_JACKSON=`ls ${HIVE_HOME}/lib/jackson-*.jar | tr '\n' ':'` -HIVE_JARS=$HIVE_METASTORE:$HIVE_SERVICE:$HIVE_EXEC:$HIVE_JDBC:$HIVE_JACKSON +HIVE_CALCITE=`ls ${HIVE_HOME}/lib/calcite-core-*.jar | tr '\n' ':'` +HIVE_LIBFB=`ls ${HIVE_HOME}/lib/libfb*.jar | tr '\n' ':'` +HIVE_JARS=$HIVE_METASTORE:$HIVE_SERVICE:$HIVE_EXEC:$HIVE_JDBC:$HIVE_JACKSON:$HIVE_CALCITE:$HIVE_LIBFB 
HADOOP_HIVE_JARS=${HIVE_JARS}:${HADOOP_HOME}/share/hadoop/common/*:${HADOOP_HOME}/share/hadoop/mapreduce/*:${HADOOP_HOME}/share/hadoop/hdfs/*:${HADOOP_HOME}/share/hadoop/common/lib/*:${HADOOP_HOME}/share/hadoop/hdfs/lib/*
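Editor's note (illustrative, not part of the patch above): the new Hive read path is gated by HoodieFileGroupReaderRecordReader.useFilegroupReader(JobConf), which checks the three settings that appear in this diff. A minimal sketch of toggling that gate from a JobConf follows; the class name FileGroupReaderToggleExample is hypothetical, and only the config keys and the static helper shown in the diff are assumed to exist.

import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.hadoop.HoodieFileGroupReaderRecordReader;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader;

// Hypothetical example class, not part of this patch.
public class FileGroupReaderToggleExample {
  public static void main(String[] args) {
    JobConf jobConf = new JobConf();

    // Explicitly opt in to the file-group-reader path.
    jobConf.setBoolean(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), true);

    // The new path is only taken when schema evolution on read and realtime
    // skip-merge are both off, so leave them disabled here.
    jobConf.setBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), false);
    jobConf.setBoolean(HoodieRealtimeRecordReader.REALTIME_SKIP_MERGE_PROP, false);

    // Prints true with the settings above; the updated tests in this patch set the
    // enable flag to "false" to keep exercising the legacy record readers.
    System.out.println(HoodieFileGroupReaderRecordReader.useFilegroupReader(jobConf));
  }
}

Flipping any of these three settings falls back to the pre-existing HoodieRealtimeRecordReader / ParquetRecordReaderWrapper path, which is how the input formats in this diff preserve the old behavior per job.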