From 058e74d9ecd541219d68a6d74bf8cce47c1e845d Mon Sep 17 00:00:00 2001
From: Shardul Mahadik
Date: Fri, 5 Feb 2021 18:04:47 -0800
Subject: [PATCH] ORC: Grow list and map child vectors with a growth factor of 3 (#2218)

---
 .../iceberg/data/orc/GenericOrcWriters.java   | 13 ++-
 .../iceberg/flink/data/FlinkOrcWriters.java   | 13 ++-
 .../spark/data/SparkOrcValueWriters.java      | 13 ++-
 .../IcebergSourceNestedListDataBenchmark.java | 56 +++++++++++
 ...SourceNestedListORCDataWriteBenchmark.java | 99 +++++++++++++++++++
 ...ceNestedListParquetDataWriteBenchmark.java | 89 +++++++++++++++++
 6 files changed, 274 insertions(+), 9 deletions(-)
 create mode 100644 spark2/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceNestedListDataBenchmark.java
 create mode 100644 spark2/src/jmh/java/org/apache/iceberg/spark/source/orc/IcebergSourceNestedListORCDataWriteBenchmark.java
 create mode 100644 spark2/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedListParquetDataWriteBenchmark.java

diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
index 00e9121ac..afc92eb55 100644
--- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
+++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
@@ -430,7 +430,7 @@ public void nonNullWrite(int rowId, List value, ColumnVector output) {
       cv.offsets[rowId] = cv.childCount;
       cv.childCount = (int) (cv.childCount + cv.lengths[rowId]);
       // make sure the child is big enough
-      cv.child.ensureSize(cv.childCount, true);
+      growColumnVector(cv.child, cv.childCount);
       // Add each element
       for (int e = 0; e < cv.lengths[rowId]; ++e) {
         element.write((int) (e + cv.offsets[rowId]), value.get(e), cv.child);
@@ -466,8 +466,8 @@ public void nonNullWrite(int rowId, Map map, ColumnVector output) {
       cv.offsets[rowId] = cv.childCount;
       cv.childCount = (int) (cv.childCount + cv.lengths[rowId]);
       // make sure the child is big enough
-      cv.keys.ensureSize(cv.childCount, true);
-      cv.values.ensureSize(cv.childCount, true);
+      growColumnVector(cv.keys, cv.childCount);
+      growColumnVector(cv.values, cv.childCount);
       // Add each element
       for (int e = 0; e < cv.lengths[rowId]; ++e) {
         int pos = (int) (e + cv.offsets[rowId]);
@@ -476,4 +476,11 @@ public void nonNullWrite(int rowId, Map map, ColumnVector output) {
       }
     }
   }
+
+  private static void growColumnVector(ColumnVector cv, int requestedSize) {
+    if (cv.isNull.length < requestedSize) {
+      // Use growth factor of 3 to avoid frequent array allocations
+      cv.ensureSize(requestedSize * 3, true);
+    }
+  }
 }
diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java
index 800ad2071..285567170 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java
@@ -247,7 +247,7 @@ public void nonNullWrite(int rowId, ArrayData data, ColumnVector output) {
       cv.offsets[rowId] = cv.childCount;
       cv.childCount = (int) (cv.childCount + cv.lengths[rowId]);
       // make sure the child is big enough.
-      cv.child.ensureSize(cv.childCount, true);
+      growColumnVector(cv.child, cv.childCount);
 
       for (int e = 0; e < cv.lengths[rowId]; ++e) {
         Object value = elementGetter.getElementOrNull(data, e);
@@ -287,8 +287,8 @@ public void nonNullWrite(int rowId, MapData data, ColumnVector output) {
       cv.offsets[rowId] = cv.childCount;
       cv.childCount = (int) (cv.childCount + cv.lengths[rowId]);
       // make sure the child is big enough
-      cv.keys.ensureSize(cv.childCount, true);
-      cv.values.ensureSize(cv.childCount, true);
+      growColumnVector(cv.keys, cv.childCount);
+      growColumnVector(cv.values, cv.childCount);
       // Add each element
       for (int e = 0; e < cv.lengths[rowId]; ++e) {
         int pos = (int) (e + cv.offsets[rowId]);
@@ -330,4 +330,11 @@ public void nonNullWrite(int rowId, RowData data, ColumnVector output) {
       }
     }
   }
+
+  private static void growColumnVector(ColumnVector cv, int requestedSize) {
+    if (cv.isNull.length < requestedSize) {
+      // Use growth factor of 3 to avoid frequent array allocations
+      cv.ensureSize(requestedSize * 3, true);
+    }
+  }
 }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java
index 9d517983e..b6653b57f 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java
@@ -238,7 +238,7 @@ public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnV
       cv.offsets[rowId] = cv.childCount;
       cv.childCount = (int) (cv.childCount + cv.lengths[rowId]);
       // make sure the child is big enough
-      cv.child.ensureSize(cv.childCount, true);
+      growColumnVector(cv.child, cv.childCount);
       // Add each element
       for (int e = 0; e < cv.lengths[rowId]; ++e) {
         writer.write((int) (e + cv.offsets[rowId]), e, value, cv.child);
@@ -266,8 +266,8 @@ public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnV
       cv.offsets[rowId] = cv.childCount;
       cv.childCount = (int) (cv.childCount + cv.lengths[rowId]);
       // make sure the child is big enough
-      cv.keys.ensureSize(cv.childCount, true);
-      cv.values.ensureSize(cv.childCount, true);
+      growColumnVector(cv.keys, cv.childCount);
+      growColumnVector(cv.values, cv.childCount);
       // Add each element
       for (int e = 0; e < cv.lengths[rowId]; ++e) {
         int pos = (int) (e + cv.offsets[rowId]);
@@ -276,4 +276,11 @@ public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnV
       }
     }
   }
+
+  private static void growColumnVector(ColumnVector cv, int requestedSize) {
+    if (cv.isNull.length < requestedSize) {
+      // Use growth factor of 3 to avoid frequent array allocations
+      cv.ensureSize(requestedSize * 3, true);
+    }
+  }
 }
diff --git a/spark2/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceNestedListDataBenchmark.java b/spark2/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceNestedListDataBenchmark.java
new file mode 100644
index 000000000..369a1507b
--- /dev/null
+++ b/spark2/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceNestedListDataBenchmark.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public abstract class IcebergSourceNestedListDataBenchmark extends IcebergSourceBenchmark {
+
+  @Override
+  protected Configuration initHadoopConf() {
+    return new Configuration();
+  }
+
+  @Override
+  protected final Table initTable() {
+    Schema schema = new Schema(
+        required(0, "id", Types.LongType.get()),
+        optional(1, "outerlist", Types.ListType.ofOptional(2,
+            Types.StructType.of(required(3, "innerlist", Types.ListType.ofRequired(4, Types.StringType.get())))
+        ))
+    );
+    PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
+    HadoopTables tables = new HadoopTables(hadoopConf());
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(TableProperties.METADATA_COMPRESSION, "gzip");
+    return tables.create(schema, partitionSpec, properties, newTableLocation());
+  }
+}
diff --git a/spark2/src/jmh/java/org/apache/iceberg/spark/source/orc/IcebergSourceNestedListORCDataWriteBenchmark.java b/spark2/src/jmh/java/org/apache/iceberg/spark/source/orc/IcebergSourceNestedListORCDataWriteBenchmark.java
new file mode 100644
index 000000000..a1106cb88
--- /dev/null
+++ b/spark2/src/jmh/java/org/apache/iceberg/spark/source/orc/IcebergSourceNestedListORCDataWriteBenchmark.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source.orc;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.source.IcebergSourceNestedListDataBenchmark;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+
+import static org.apache.spark.sql.functions.array_repeat;
+import static org.apache.spark.sql.functions.expr;
+import static org.apache.spark.sql.functions.struct;
+
+/**
+ * A benchmark that evaluates the performance of writing nested ORC data using Iceberg
+ * and the built-in file source in Spark.
+ *
+ * To run this benchmark:
+ *
+ *   ./gradlew :iceberg-spark2:jmh
+ *       -PjmhIncludeRegex=IcebergSourceNestedListORCDataWriteBenchmark
+ *       -PjmhOutputPath=benchmark/iceberg-source-nested-list-orc-data-write-benchmark-result.txt
+ *
+ */
+public class IcebergSourceNestedListORCDataWriteBenchmark extends IcebergSourceNestedListDataBenchmark {
+
+  @Setup
+  public void setupBenchmark() {
+    setupSpark();
+  }
+
+  @TearDown
+  public void tearDownBenchmark() throws IOException {
+    tearDownSpark();
+    cleanupFiles();
+  }
+
+  @Param({"2000", "20000"})
+  private int numRows;
+
+  @Benchmark
+  @Threads(1)
+  public void writeIceberg() {
+    String tableLocation = table().location();
+    benchmarkData().write().format("iceberg").option("write-format", "orc")
+        .mode(SaveMode.Append).save(tableLocation);
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void writeIcebergDictionaryOff() {
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.put("orc.dictionary.key.threshold", "0");
+    withTableProperties(tableProperties, () -> {
+      String tableLocation = table().location();
+      benchmarkData().write().format("iceberg").option("write-format", "orc")
+          .mode(SaveMode.Append).save(tableLocation);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void writeFileSource() {
+    benchmarkData().write().mode(SaveMode.Append).orc(dataLocation());
+  }
+
+  private Dataset<Row> benchmarkData() {
+    return spark().range(numRows)
+        .withColumn("outerlist", array_repeat(struct(
+            expr("array_repeat(CAST(id AS string), 1000) AS innerlist")),
+            10))
+        .coalesce(1);
+  }
+}
diff --git a/spark2/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedListParquetDataWriteBenchmark.java b/spark2/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedListParquetDataWriteBenchmark.java
new file mode 100644
index 000000000..160b3707c
--- /dev/null
+++ b/spark2/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedListParquetDataWriteBenchmark.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source.parquet;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.source.IcebergSourceNestedListDataBenchmark;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.internal.SQLConf;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+
+import static org.apache.spark.sql.functions.array_repeat;
+import static org.apache.spark.sql.functions.expr;
+import static org.apache.spark.sql.functions.struct;
+
+/**
+ * A benchmark that evaluates the performance of writing nested Parquet data using Iceberg
+ * and the built-in file source in Spark.
+ *
+ * To run this benchmark:
+ *
+ *   ./gradlew :iceberg-spark2:jmh
+ *       -PjmhIncludeRegex=IcebergSourceNestedListParquetDataWriteBenchmark
+ *       -PjmhOutputPath=benchmark/iceberg-source-nested-list-parquet-data-write-benchmark-result.txt
+ *
+ */
+public class IcebergSourceNestedListParquetDataWriteBenchmark extends IcebergSourceNestedListDataBenchmark {
+
+  @Setup
+  public void setupBenchmark() {
+    setupSpark();
+  }
+
+  @TearDown
+  public void tearDownBenchmark() throws IOException {
+    tearDownSpark();
+    cleanupFiles();
+  }
+
+  @Param({"2000", "20000"})
+  private int numRows;
+
+  @Benchmark
+  @Threads(1)
+  public void writeIceberg() {
+    String tableLocation = table().location();
+    benchmarkData().write().format("iceberg").mode(SaveMode.Append).save(tableLocation);
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void writeFileSource() {
+    Map<String, String> conf = Maps.newHashMap();
+    conf.put(SQLConf.PARQUET_COMPRESSION().key(), "gzip");
+    withSQLConf(conf, () -> benchmarkData().write().mode(SaveMode.Append).parquet(dataLocation()));
+  }
+
+  private Dataset<Row> benchmarkData() {
+    return spark().range(numRows)
+        .withColumn("outerlist", array_repeat(struct(
+            expr("array_repeat(CAST(id AS string), 1000) AS innerlist")),
+            10))
+        .coalesce(1);
+  }
+}
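
Editor's note (not part of the patch): the snippet below is an illustrative sketch of why
growColumnVector over-allocates by a factor of 3. Hive's ColumnVector.ensureSize copies its
backing arrays whenever the requested size exceeds the current capacity, so growing the child
vector to exactly cv.childCount can trigger a copy on nearly every appended list, while asking
for three times the needed size amortizes those copies. The initial capacity, row count, and
list length below are assumptions chosen only to make the effect visible; the class is a toy
model and does not use the ORC/Hive vector classes themselves.

// Toy model of child-vector growth: counts how often a child array must be resized
// when growing to the exact requested size versus three times the requested size.
public class GrowthFactorSketch {

  public static void main(String[] args) {
    System.out.println("resizes with exact sizing: " + countGrows(1));
    System.out.println("resizes with 3x sizing:    " + countGrows(3));
  }

  private static int countGrows(int growthFactor) {
    int capacity = 1024;   // assumed starting capacity of the child vector
    int childCount = 0;    // total elements written so far, like ListColumnVector.childCount
    int grows = 0;
    for (int row = 0; row < 10_000; row++) {
      childCount += 10;    // each row appends a 10-element list
      if (capacity < childCount) {
        capacity = childCount * growthFactor;  // mirrors growColumnVector's requestedSize * 3
        grows++;
      }
    }
    return grows;
  }
}

With these assumed sizes, exact sizing resizes the child array thousands of times, while the
3x strategy resizes it only a handful of times; that reallocation churn is what the patch avoids.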