From 74f6bdca2bce67e1758851bf8a3bc883204cf934 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Tue, 5 Mar 2024 16:32:31 -0800 Subject: [PATCH 1/9] Add benchmark base class and skipping index suite Signed-off-by: Chen Dai --- .../FlintSkippingIndexBenchmark.scala | 131 ++++++++++++++++++ .../sql/benchmark/FlintSparkBenchmark.scala | 42 ++++++ 2 files changed, 173 insertions(+) create mode 100644 integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala create mode 100644 integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSparkBenchmark.scala diff --git a/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala b/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala new file mode 100644 index 000000000..919835113 --- /dev/null +++ b/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala @@ -0,0 +1,131 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.spark.sql.benchmark + +import scala.util.Random + +import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName + +import org.apache.spark.benchmark.Benchmark + +object FlintSkippingIndexBenchmark extends FlintSparkBenchmark { + + private val noIndexTableNamePrefix = "spark_catalog.default.no_index_test" + private val valueSetTableNamePrefix = "spark_catalog.default.value_set_test" + private val bloomFilterTableNamePrefix = "spark_catalog.default.bloom_filter_test" + + override def afterAll(): Unit = { + val prefixes = + Array(noIndexTableNamePrefix, valueSetTableNamePrefix, bloomFilterTableNamePrefix) + for (prefix <- prefixes) { + val tables = spark.sql(s"SHOW TABLES IN spark_catalog.default LIKE '$prefix%'").collect() + for (table <- tables) { + spark.sql(s"DROP TABLE $table") + } + } + super.afterAll() + } + + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { + beforeAll() + + runBenchmark("Skipping Index Write") { + runWriteBenchmark(1000, 16) + runWriteBenchmark(1000, 64) + runWriteBenchmark(1000, 512) + } + + runBenchmark("Skipping Index Read") { + runReadBenchmark(1000, 16) + runReadBenchmark(1000, 64) + runReadBenchmark(1000, 512) + } + } + + private def runWriteBenchmark(N: Int, cardinality: Int): Unit = { + val numFiles = 10 + val benchmark = new Benchmark( + s"Write $N rows with cardinality $cardinality in $numFiles files", + numFiles * N, + output = output) + + // ValueSet write + val valueSetTableName = s"${valueSetTableNamePrefix}_${N}_$cardinality" + createTestTable(valueSetTableName, numFiles, N, cardinality) + flint + .skippingIndex() + .onTable(valueSetTableName) + .addValueSet("col") + .create() + benchmark.addCase("ValueSet Write") { _ => + flint.refreshIndex(getSkippingIndexName(valueSetTableName)) + } + + // BloomFilter write + val bloomFilterTableName = s"${bloomFilterTableNamePrefix}_${N}_$cardinality" + createTestTable(bloomFilterTableName, numFiles, N, cardinality) + flint + .skippingIndex() + .onTable(bloomFilterTableName) + .addValueSet("col") + .create() + benchmark.addCase("BloomFilter Write") { _ => + flint.refreshIndex(getSkippingIndexName(bloomFilterTableName)) + } + benchmark.run() + } + + private def runReadBenchmark(N: Int, cardinality: Int): Unit = { + val numFiles = 10 + val benchmark = new Benchmark( + s"Read $N rows with cardinality $cardinality in $numFiles files", + numFiles * N, + output = output) + + val uniqueValue = cardinality + 10 + val 
noIndexTableName = s"${noIndexTableNamePrefix}_${N}_$cardinality" + createTestTable(noIndexTableName, numFiles, N, cardinality) + benchmark.addCase("No Index Read") { _ => + spark.sql(s"SELECT * FROM $noIndexTableName WHERE col = $uniqueValue") + } + + val valueSetTableName = s"${valueSetTableNamePrefix}_${N}_$cardinality" + // spark.sql(s"INSERT INTO $valueSetTableName VALUES ($uniqueValue)") + benchmark.addCase("ValueSet Read") { _ => + spark.sql(s"SELECT * FROM $valueSetTableName WHERE col = $uniqueValue") + } + + val bloomFilterTableName = s"${bloomFilterTableNamePrefix}_${N}_$cardinality" + // spark.sql(s"INSERT INTO $valueSetTableName VALUES ($uniqueValue)") + benchmark.addCase("BloomFilter Read") { _ => + spark.sql(s"SELECT * FROM $bloomFilterTableName WHERE col = $uniqueValue") + } + benchmark.run() + } + + private def createTestTable( + tableName: String, + numFiles: Int, + numRows: Int, + cardinality: Int): Unit = { + spark.sql(s"CREATE TABLE $tableName (col INT) USING JSON") + + val uniques = Seq.range(1, cardinality + 1) + val values = + Seq + .fill(numRows)(uniques(Random.nextInt(cardinality))) + .map(v => s"($v)") + .mkString(", ") + + for (_ <- 1 to numFiles) { + spark.sql(s""" + | INSERT INTO $tableName + | SELECT /*+ COALESCE(1) */ * + | FROM VALUES $values + |""".stripMargin) + } + } +} diff --git a/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSparkBenchmark.scala b/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSparkBenchmark.scala new file mode 100644 index 000000000..5a891a8ca --- /dev/null +++ b/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSparkBenchmark.scala @@ -0,0 +1,42 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.spark.sql.benchmark + +import org.apache.http.HttpHost +import org.opensearch.client.{RestClient, RestHighLevelClient} +import org.opensearch.flint.spark.FlintSpark +import org.opensearch.testcontainers.OpenSearchContainer + +import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark +import org.apache.spark.sql.flint.config.FlintSparkConf._ + +trait FlintSparkBenchmark extends SqlBasedBenchmark { + + protected lazy val flint: FlintSpark = new FlintSpark(spark) + + protected lazy val container = new OpenSearchContainer() + + protected lazy val openSearchPort: Int = container.port() + + protected lazy val openSearchHost: String = container.getHost + + protected lazy val openSearchClient = new RestHighLevelClient( + RestClient.builder(new HttpHost(openSearchHost, openSearchPort, "http"))) + + def beforeAll(): Unit = { + container.start() + + spark.conf.set(HOST_ENDPOINT.key, openSearchHost) + spark.conf.set(HOST_PORT.key, openSearchPort) + spark.conf.set(REFRESH_POLICY.key, "true") + spark.conf.set(CHECKPOINT_MANDATORY.key, "false") + } + + override def afterAll(): Unit = { + container.close() + super.afterAll() + } +} From 4dd5f6a316b0f6f368c9417352acca625226ca9d Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 6 Mar 2024 08:47:26 -0800 Subject: [PATCH 2/9] Test with variant cardinality Signed-off-by: Chen Dai --- .../FlintSkippingIndexBenchmark.scala | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala b/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala index 919835113..ff3dcc0a9 100644 --- 
a/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala +++ b/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala @@ -13,6 +13,9 @@ import org.apache.spark.benchmark.Benchmark object FlintSkippingIndexBenchmark extends FlintSparkBenchmark { + private val N = 1000 + private val numFiles = 100 + private val noIndexTableNamePrefix = "spark_catalog.default.no_index_test" private val valueSetTableNamePrefix = "spark_catalog.default.value_set_test" private val bloomFilterTableNamePrefix = "spark_catalog.default.bloom_filter_test" @@ -33,20 +36,19 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark { beforeAll() runBenchmark("Skipping Index Write") { - runWriteBenchmark(1000, 16) - runWriteBenchmark(1000, 64) - runWriteBenchmark(1000, 512) + runWriteBenchmark(16) + runWriteBenchmark(64) + runWriteBenchmark(512) } runBenchmark("Skipping Index Read") { - runReadBenchmark(1000, 16) - runReadBenchmark(1000, 64) - runReadBenchmark(1000, 512) + runReadBenchmark(16) + runReadBenchmark(64) + runReadBenchmark(512) } } - private def runWriteBenchmark(N: Int, cardinality: Int): Unit = { - val numFiles = 10 + private def runWriteBenchmark(cardinality: Int): Unit = { val benchmark = new Benchmark( s"Write $N rows with cardinality $cardinality in $numFiles files", numFiles * N, @@ -78,8 +80,7 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark { benchmark.run() } - private def runReadBenchmark(N: Int, cardinality: Int): Unit = { - val numFiles = 10 + private def runReadBenchmark(cardinality: Int): Unit = { val benchmark = new Benchmark( s"Read $N rows with cardinality $cardinality in $numFiles files", numFiles * N, From 8c71e3e20ffb6093c5b1ada7364bef1405bde998 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 20 Mar 2024 13:51:18 -0700 Subject: [PATCH 3/9] Add micro benchmark Signed-off-by: Chen Dai --- .../skipping/bloomfilter/BloomFilterAgg.scala | 13 ++ .../FlintSkippingIndexMicroBenchmark.scala | 206 ++++++++++++++++++ .../sql/benchmark/FlintSparkBenchmark.scala | 12 +- 3 files changed, 226 insertions(+), 5 deletions(-) create mode 100644 integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexMicroBenchmark.scala diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterAgg.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterAgg.scala index 4fb79d67d..6dcd2f15f 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterAgg.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterAgg.scala @@ -7,12 +7,16 @@ package org.opensearch.flint.spark.skipping.bloomfilter import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import scala.collection.JavaConverters.mapAsJavaMapConverter + import org.opensearch.flint.core.field.bloomfilter.{BloomFilter, BloomFilterFactory} import org.opensearch.flint.core.field.bloomfilter.adaptive.AdaptiveBloomFilter +import org.apache.spark.sql.Column import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.expressions.aggregate.{ImperativeAggregate, TypedImperativeAggregate} +import org.apache.spark.sql.functions.{col, xxhash64} import org.apache.spark.sql.types.{BinaryType, DataType} /** @@ -107,3 +111,12 @@ case class BloomFilterAgg( override def 
withNewInputAggBufferOffset(newOffset: Int): ImperativeAggregate = copy(inputAggBufferOffset = newOffset) } + +object BloomFilterAgg { + + def bloom_filter_agg(colName: String, params: Map[String, String]): Column = { + new Column( + new BloomFilterAgg(xxhash64(col(colName)).expr, BloomFilterFactory.of(params.asJava)) + .toAggregateExpression()) + } +} diff --git a/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexMicroBenchmark.scala b/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexMicroBenchmark.scala new file mode 100644 index 000000000..6e7182bfb --- /dev/null +++ b/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexMicroBenchmark.scala @@ -0,0 +1,206 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.spark.sql.benchmark + +import org.opensearch.flint.core.field.bloomfilter.BloomFilterFactory.{ADAPTIVE_NUMBER_CANDIDATE_KEY, BLOOM_FILTER_ADAPTIVE_KEY, CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY} +import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy +import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilterSkippingStrategy +import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy +import org.opensearch.flint.spark.skipping.partition.PartitionSkippingStrategy +import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy +import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.VALUE_SET_MAX_SIZE_KEY + +import org.apache.spark.benchmark.Benchmark +import org.apache.spark.sql.Column +import org.apache.spark.sql.SaveMode.Overwrite +import org.apache.spark.sql.catalyst.dsl.expressions.DslExpression +import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE +import org.apache.spark.sql.functions.rand + +/** + * Flint skipping index microbenchmark that focus on skipping data structure read and write + * performance with OpenSearch based on Spark benchmark framework. + * + * To run this benchmark: + * {{{ + * > SPARK_GENERATE_BENCHMARK_FILES=1 sbt clean "set every Test / test := {}" "integtest/test:runMain org.apache.spark.sql.benchmark.FlintSkippingIndexMicroBenchmark" + * Results will be written to "benchmarks/FlintSkippingIndexMicroBenchmark--results.txt". 
+ * }}} + */ +object FlintSkippingIndexMicroBenchmark extends FlintSparkBenchmark { + + private val N = 1000000 + private val numFiles = 100 + + private val testColName = "value" + private val testColType = "Int" + + private val noIndexTableNamePrefix = "spark_catalog.default.no_index_test" + private val valueSetTableNamePrefix = "spark_catalog.default.value_set_test" + private val bloomFilterTableNamePrefix = "spark_catalog.default.bloom_filter_test" + + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { + beforeAll() + + // output = Some(System.err) + + runBenchmark("Skipping Index Write") { + runWriteBenchmark(64) + runWriteBenchmark(512) + runWriteBenchmark(65536) + } + + /* + runBenchmark("Skipping Index Read") { + runReadBenchmark(16) + runReadBenchmark(64) + runReadBenchmark(512) + } + */ + } + + private def runWriteBenchmark(cardinality: Int): Unit = { + benchmark(s"Skipping Index Write $N Rows with Cardinality $cardinality") + .addCase("Partition Write") { _ => + buildSkippingIndex( + PartitionSkippingStrategy(columnName = testColName, columnType = testColType), + 1 // partition column value should be the same in a single file + ) + } + .addCase("MinMax Write") { _ => + buildSkippingIndex( + MinMaxSkippingStrategy(columnName = testColName, columnType = testColType), + cardinality) + } + .addCase("ValueSet Write") { _ => + buildSkippingIndex( + ValueSetSkippingStrategy(columnName = testColName, columnType = testColType), + cardinality) + } + .addCase("ValueSet Write (Unlimited Size)") { _ => + buildSkippingIndex( + ValueSetSkippingStrategy( + columnName = testColName, + columnType = testColType, + params = Map(VALUE_SET_MAX_SIZE_KEY -> Integer.MAX_VALUE.toString)), + cardinality) + } + .addCase("BloomFilter Write") { _ => + buildSkippingIndex( + BloomFilterSkippingStrategy( + columnName = testColName, + columnType = testColType, + params = Map( + BLOOM_FILTER_ADAPTIVE_KEY -> "false", + CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> N.toString)), + cardinality) + } + .addCase("BloomFilter Write (Optimal NDV)") { _ => + buildSkippingIndex( + BloomFilterSkippingStrategy( + columnName = testColName, + columnType = testColType, + params = Map( + BLOOM_FILTER_ADAPTIVE_KEY -> "false", + CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> cardinality.toString)), + cardinality) + } + .addCase("Adaptive BloomFilter Write") { _ => + buildSkippingIndex( + BloomFilterSkippingStrategy(columnName = testColName, columnType = testColType), + cardinality) + } + .addCase("Adaptive BloomFilter Write (5 Candidates)") { _ => + buildSkippingIndex( + BloomFilterSkippingStrategy( + columnName = testColName, + columnType = testColType, + params = Map(ADAPTIVE_NUMBER_CANDIDATE_KEY -> "5")), + cardinality) + } + .addCase("Adaptive BloomFilter Write (15 Candidates)") { _ => + buildSkippingIndex( + BloomFilterSkippingStrategy( + columnName = testColName, + columnType = testColType, + params = Map(ADAPTIVE_NUMBER_CANDIDATE_KEY -> "15")), + cardinality) + } + .run() + } + + private def runReadBenchmark(cardinality: Int): Unit = { + val benchmark = new Benchmark( + s"Read $N rows with cardinality $cardinality in $numFiles files", + numFiles * N, + output = output) + + val uniqueValue = cardinality + 10 + val noIndexTableName = s"${noIndexTableNamePrefix}_${N}_$cardinality" + benchmark.addCase("No Index Read") { _ => + // spark.sql(s"SELECT * FROM $noIndexTableName WHERE col = $uniqueValue") + } + + val valueSetTableName = s"${valueSetTableNamePrefix}_${N}_$cardinality" + benchmark.addCase("ValueSet Read") { _ => + // 
spark.sql(s"SELECT * FROM $valueSetTableName WHERE col = $uniqueValue") + } + + val bloomFilterTableName = s"${bloomFilterTableNamePrefix}_${N}_$cardinality" + benchmark.addCase("BloomFilter Read") { _ => + // spark.sql(s"SELECT * FROM $bloomFilterTableName WHERE col = $uniqueValue") + } + benchmark.run() + } + + private def benchmark(name: String): BenchmarkBuilder = { + new BenchmarkBuilder(new Benchmark(name, N, output = output)) + } + + private class BenchmarkBuilder(benchmark: Benchmark) { + + def addCase(name: String)(f: Int => Unit): BenchmarkBuilder = { + benchmark.addCase(name)(f) + this + } + + def run(): Unit = { + benchmark.run() + } + } + + private def buildSkippingIndex(indexCol: FlintSparkSkippingStrategy, cardinality: Int): Unit = { + val indexName = getTestIndexName(indexCol, cardinality) + val namedAggCols = getNamedAggColumn(indexCol) + + // Generate N random numbers of cardinality and build single skipping index + // data structure without grouping by + spark + .range(N) + .withColumn(testColName, (rand() * cardinality + 1).cast(testColType)) + .agg(namedAggCols.head, namedAggCols.tail: _*) + .write + .format(FLINT_DATASOURCE) + .options(openSearchOptions) + .mode(Overwrite) + .save(indexName) + } + + private def getTestIndexName(indexCol: FlintSparkSkippingStrategy, cardinality: Int): String = { + val params = indexCol.parameters.map { case (name, value) => s"${name}_$value" }.mkString("_") + s"${indexCol.kind}_${params}_$cardinality" + } + + private def getNamedAggColumn(indexCol: FlintSparkSkippingStrategy): Seq[Column] = { + val outputNames = indexCol.outputSchema().keys + val aggFuncs = indexCol.getAggregators + + // Wrap aggregate function with output column name + (outputNames, aggFuncs).zipped.map { case (name, aggFunc) => + new Column(aggFunc.as(name)) + }.toSeq + } +} diff --git a/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSparkBenchmark.scala b/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSparkBenchmark.scala index 5a891a8ca..e52706628 100644 --- a/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSparkBenchmark.scala +++ b/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSparkBenchmark.scala @@ -26,13 +26,15 @@ trait FlintSparkBenchmark extends SqlBasedBenchmark { protected lazy val openSearchClient = new RestHighLevelClient( RestClient.builder(new HttpHost(openSearchHost, openSearchPort, "http"))) + protected lazy val openSearchOptions: Map[String, String] = + Map( + s"${HOST_ENDPOINT.optionKey}" -> openSearchHost, + s"${HOST_PORT.optionKey}" -> s"$openSearchPort", + s"${REFRESH_POLICY.optionKey}" -> "wait_for", + s"${IGNORE_DOC_ID_COLUMN.optionKey}" -> "false") + def beforeAll(): Unit = { container.start() - - spark.conf.set(HOST_ENDPOINT.key, openSearchHost) - spark.conf.set(HOST_PORT.key, openSearchPort) - spark.conf.set(REFRESH_POLICY.key, "true") - spark.conf.set(CHECKPOINT_MANDATORY.key, "false") } override def afterAll(): Unit = { From 891e345c2a02c34c51873c99370abeef80d28c33 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 20 Mar 2024 14:06:54 -0700 Subject: [PATCH 4/9] Remove old benchmark test Signed-off-by: Chen Dai --- .../FlintSkippingIndexBenchmark.scala | 235 +++++++++++------- .../FlintSkippingIndexMicroBenchmark.scala | 206 --------------- .../sql/benchmark/FlintSparkBenchmark.scala | 7 +- 3 files changed, 147 insertions(+), 301 deletions(-) delete mode 100644 integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexMicroBenchmark.scala diff --git 
a/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala b/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala index ff3dcc0a9..0abc1916d 100644 --- a/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala +++ b/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala @@ -5,128 +5,175 @@ package org.apache.spark.sql.benchmark -import scala.util.Random - -import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName +import org.opensearch.flint.core.field.bloomfilter.BloomFilterFactory.{ADAPTIVE_NUMBER_CANDIDATE_KEY, BLOOM_FILTER_ADAPTIVE_KEY, CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY} +import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy +import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilterSkippingStrategy +import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy +import org.opensearch.flint.spark.skipping.partition.PartitionSkippingStrategy +import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy +import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.VALUE_SET_MAX_SIZE_KEY import org.apache.spark.benchmark.Benchmark - +import org.apache.spark.sql.Column +import org.apache.spark.sql.SaveMode.Overwrite +import org.apache.spark.sql.catalyst.dsl.expressions.DslExpression +import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE +import org.apache.spark.sql.functions.rand + +/** + * Flint skipping index benchmark that focus on skipping data structure read and write performance + * with OpenSearch based on Spark benchmark framework. + * + * To run this benchmark: + * {{{ + * > SPARK_GENERATE_BENCHMARK_FILES=1 sbt clean "set every Test / test := {}" "integtest/test:runMain org.apache.spark.sql.benchmark.FlintSkippingIndexBenchmark" + * Results will be written to "benchmarks/FlintSkippingIndexBenchmark--results.txt". 
+ * }}} + */ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark { - private val N = 1000 - private val numFiles = 100 - - private val noIndexTableNamePrefix = "spark_catalog.default.no_index_test" - private val valueSetTableNamePrefix = "spark_catalog.default.value_set_test" - private val bloomFilterTableNamePrefix = "spark_catalog.default.bloom_filter_test" + /** How many rows generated in test data */ + private val N = 1000000 - override def afterAll(): Unit = { - val prefixes = - Array(noIndexTableNamePrefix, valueSetTableNamePrefix, bloomFilterTableNamePrefix) - for (prefix <- prefixes) { - val tables = spark.sql(s"SHOW TABLES IN spark_catalog.default LIKE '$prefix%'").collect() - for (table <- tables) { - spark.sql(s"DROP TABLE $table") - } - } - super.afterAll() - } + /** Test column name and type */ + private val testColName = "value" + private val testColType = "Int" - override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { - beforeAll() + override def runBenchmarkSuite(args: Array[String]): Unit = { + super.runBenchmarkSuite(args) runBenchmark("Skipping Index Write") { - runWriteBenchmark(16) runWriteBenchmark(64) runWriteBenchmark(512) + runWriteBenchmark(65536) } + /* runBenchmark("Skipping Index Read") { runReadBenchmark(16) runReadBenchmark(64) runReadBenchmark(512) } + */ } private def runWriteBenchmark(cardinality: Int): Unit = { - val benchmark = new Benchmark( - s"Write $N rows with cardinality $cardinality in $numFiles files", - numFiles * N, - output = output) - - // ValueSet write - val valueSetTableName = s"${valueSetTableNamePrefix}_${N}_$cardinality" - createTestTable(valueSetTableName, numFiles, N, cardinality) - flint - .skippingIndex() - .onTable(valueSetTableName) - .addValueSet("col") - .create() - benchmark.addCase("ValueSet Write") { _ => - flint.refreshIndex(getSkippingIndexName(valueSetTableName)) - } + benchmark(s"Skipping Index Write $N Rows with Cardinality $cardinality") + .addCase("Partition Write") { _ => + buildSkippingIndex( + PartitionSkippingStrategy(columnName = testColName, columnType = testColType), + 1 // partition column value should be the same in a single file + ) + } + .addCase("MinMax Write") { _ => + buildSkippingIndex( + MinMaxSkippingStrategy(columnName = testColName, columnType = testColType), + cardinality) + } + .addCase("ValueSet Write") { _ => + buildSkippingIndex( + ValueSetSkippingStrategy(columnName = testColName, columnType = testColType), + cardinality) + } + .addCase("ValueSet Write (Unlimited Size)") { _ => + buildSkippingIndex( + ValueSetSkippingStrategy( + columnName = testColName, + columnType = testColType, + params = Map(VALUE_SET_MAX_SIZE_KEY -> Integer.MAX_VALUE.toString)), + cardinality) + } + .addCase("BloomFilter Write") { _ => + buildSkippingIndex( + BloomFilterSkippingStrategy( + columnName = testColName, + columnType = testColType, + params = Map( + BLOOM_FILTER_ADAPTIVE_KEY -> "false", + CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> N.toString)), + cardinality) + } + .addCase("BloomFilter Write (Optimal NDV)") { _ => + buildSkippingIndex( + BloomFilterSkippingStrategy( + columnName = testColName, + columnType = testColType, + params = Map( + BLOOM_FILTER_ADAPTIVE_KEY -> "false", + CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> cardinality.toString)), + cardinality) + } + .addCase("Adaptive BloomFilter Write") { _ => + buildSkippingIndex( + BloomFilterSkippingStrategy(columnName = testColName, columnType = testColType), + cardinality) + } + .addCase("Adaptive BloomFilter Write (5 Candidates)") { _ => + 
buildSkippingIndex( + BloomFilterSkippingStrategy( + columnName = testColName, + columnType = testColType, + params = Map(ADAPTIVE_NUMBER_CANDIDATE_KEY -> "5")), + cardinality) + } + .addCase("Adaptive BloomFilter Write (15 Candidates)") { _ => + buildSkippingIndex( + BloomFilterSkippingStrategy( + columnName = testColName, + columnType = testColType, + params = Map(ADAPTIVE_NUMBER_CANDIDATE_KEY -> "15")), + cardinality) + } + .run() + } - // BloomFilter write - val bloomFilterTableName = s"${bloomFilterTableNamePrefix}_${N}_$cardinality" - createTestTable(bloomFilterTableName, numFiles, N, cardinality) - flint - .skippingIndex() - .onTable(bloomFilterTableName) - .addValueSet("col") - .create() - benchmark.addCase("BloomFilter Write") { _ => - flint.refreshIndex(getSkippingIndexName(bloomFilterTableName)) - } - benchmark.run() + private def runReadBenchmark(cardinality: Int): Unit = {} + + private def benchmark(name: String): BenchmarkBuilder = { + new BenchmarkBuilder(new Benchmark(name, N, output = output)) } - private def runReadBenchmark(cardinality: Int): Unit = { - val benchmark = new Benchmark( - s"Read $N rows with cardinality $cardinality in $numFiles files", - numFiles * N, - output = output) - - val uniqueValue = cardinality + 10 - val noIndexTableName = s"${noIndexTableNamePrefix}_${N}_$cardinality" - createTestTable(noIndexTableName, numFiles, N, cardinality) - benchmark.addCase("No Index Read") { _ => - spark.sql(s"SELECT * FROM $noIndexTableName WHERE col = $uniqueValue") - } + private class BenchmarkBuilder(benchmark: Benchmark) { - val valueSetTableName = s"${valueSetTableNamePrefix}_${N}_$cardinality" - // spark.sql(s"INSERT INTO $valueSetTableName VALUES ($uniqueValue)") - benchmark.addCase("ValueSet Read") { _ => - spark.sql(s"SELECT * FROM $valueSetTableName WHERE col = $uniqueValue") + def addCase(name: String)(f: Int => Unit): BenchmarkBuilder = { + benchmark.addCase(name)(f) + this } - val bloomFilterTableName = s"${bloomFilterTableNamePrefix}_${N}_$cardinality" - // spark.sql(s"INSERT INTO $valueSetTableName VALUES ($uniqueValue)") - benchmark.addCase("BloomFilter Read") { _ => - spark.sql(s"SELECT * FROM $bloomFilterTableName WHERE col = $uniqueValue") + def run(): Unit = { + benchmark.run() } - benchmark.run() } - private def createTestTable( - tableName: String, - numFiles: Int, - numRows: Int, - cardinality: Int): Unit = { - spark.sql(s"CREATE TABLE $tableName (col INT) USING JSON") - - val uniques = Seq.range(1, cardinality + 1) - val values = - Seq - .fill(numRows)(uniques(Random.nextInt(cardinality))) - .map(v => s"($v)") - .mkString(", ") - - for (_ <- 1 to numFiles) { - spark.sql(s""" - | INSERT INTO $tableName - | SELECT /*+ COALESCE(1) */ * - | FROM VALUES $values - |""".stripMargin) - } + private def buildSkippingIndex(indexCol: FlintSparkSkippingStrategy, cardinality: Int): Unit = { + val indexName = getTestIndexName(indexCol, cardinality) + val namedAggCols = getNamedAggColumn(indexCol) + + // Generate N random numbers of cardinality and build single skipping index + // data structure without grouping by + spark + .range(N) + .withColumn(testColName, (rand() * cardinality + 1).cast(testColType)) + .agg(namedAggCols.head, namedAggCols.tail: _*) + .write + .format(FLINT_DATASOURCE) + .options(openSearchOptions) + .mode(Overwrite) + .save(indexName) + } + + private def getTestIndexName(indexCol: FlintSparkSkippingStrategy, cardinality: Int): String = { + val params = indexCol.parameters.map { case (name, value) => s"${name}_$value" }.mkString("_") + 
s"${indexCol.kind}_${params}_$cardinality" + } + + private def getNamedAggColumn(indexCol: FlintSparkSkippingStrategy): Seq[Column] = { + val outputNames = indexCol.outputSchema().keys + val aggFuncs = indexCol.getAggregators + + // Wrap aggregate function with output column name + (outputNames, aggFuncs).zipped.map { case (name, aggFunc) => + new Column(aggFunc.as(name)) + }.toSeq } } diff --git a/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexMicroBenchmark.scala b/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexMicroBenchmark.scala deleted file mode 100644 index 6e7182bfb..000000000 --- a/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexMicroBenchmark.scala +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.apache.spark.sql.benchmark - -import org.opensearch.flint.core.field.bloomfilter.BloomFilterFactory.{ADAPTIVE_NUMBER_CANDIDATE_KEY, BLOOM_FILTER_ADAPTIVE_KEY, CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY} -import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy -import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilterSkippingStrategy -import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy -import org.opensearch.flint.spark.skipping.partition.PartitionSkippingStrategy -import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy -import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.VALUE_SET_MAX_SIZE_KEY - -import org.apache.spark.benchmark.Benchmark -import org.apache.spark.sql.Column -import org.apache.spark.sql.SaveMode.Overwrite -import org.apache.spark.sql.catalyst.dsl.expressions.DslExpression -import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE -import org.apache.spark.sql.functions.rand - -/** - * Flint skipping index microbenchmark that focus on skipping data structure read and write - * performance with OpenSearch based on Spark benchmark framework. - * - * To run this benchmark: - * {{{ - * > SPARK_GENERATE_BENCHMARK_FILES=1 sbt clean "set every Test / test := {}" "integtest/test:runMain org.apache.spark.sql.benchmark.FlintSkippingIndexMicroBenchmark" - * Results will be written to "benchmarks/FlintSkippingIndexMicroBenchmark--results.txt". 
- * }}} - */ -object FlintSkippingIndexMicroBenchmark extends FlintSparkBenchmark { - - private val N = 1000000 - private val numFiles = 100 - - private val testColName = "value" - private val testColType = "Int" - - private val noIndexTableNamePrefix = "spark_catalog.default.no_index_test" - private val valueSetTableNamePrefix = "spark_catalog.default.value_set_test" - private val bloomFilterTableNamePrefix = "spark_catalog.default.bloom_filter_test" - - override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { - beforeAll() - - // output = Some(System.err) - - runBenchmark("Skipping Index Write") { - runWriteBenchmark(64) - runWriteBenchmark(512) - runWriteBenchmark(65536) - } - - /* - runBenchmark("Skipping Index Read") { - runReadBenchmark(16) - runReadBenchmark(64) - runReadBenchmark(512) - } - */ - } - - private def runWriteBenchmark(cardinality: Int): Unit = { - benchmark(s"Skipping Index Write $N Rows with Cardinality $cardinality") - .addCase("Partition Write") { _ => - buildSkippingIndex( - PartitionSkippingStrategy(columnName = testColName, columnType = testColType), - 1 // partition column value should be the same in a single file - ) - } - .addCase("MinMax Write") { _ => - buildSkippingIndex( - MinMaxSkippingStrategy(columnName = testColName, columnType = testColType), - cardinality) - } - .addCase("ValueSet Write") { _ => - buildSkippingIndex( - ValueSetSkippingStrategy(columnName = testColName, columnType = testColType), - cardinality) - } - .addCase("ValueSet Write (Unlimited Size)") { _ => - buildSkippingIndex( - ValueSetSkippingStrategy( - columnName = testColName, - columnType = testColType, - params = Map(VALUE_SET_MAX_SIZE_KEY -> Integer.MAX_VALUE.toString)), - cardinality) - } - .addCase("BloomFilter Write") { _ => - buildSkippingIndex( - BloomFilterSkippingStrategy( - columnName = testColName, - columnType = testColType, - params = Map( - BLOOM_FILTER_ADAPTIVE_KEY -> "false", - CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> N.toString)), - cardinality) - } - .addCase("BloomFilter Write (Optimal NDV)") { _ => - buildSkippingIndex( - BloomFilterSkippingStrategy( - columnName = testColName, - columnType = testColType, - params = Map( - BLOOM_FILTER_ADAPTIVE_KEY -> "false", - CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> cardinality.toString)), - cardinality) - } - .addCase("Adaptive BloomFilter Write") { _ => - buildSkippingIndex( - BloomFilterSkippingStrategy(columnName = testColName, columnType = testColType), - cardinality) - } - .addCase("Adaptive BloomFilter Write (5 Candidates)") { _ => - buildSkippingIndex( - BloomFilterSkippingStrategy( - columnName = testColName, - columnType = testColType, - params = Map(ADAPTIVE_NUMBER_CANDIDATE_KEY -> "5")), - cardinality) - } - .addCase("Adaptive BloomFilter Write (15 Candidates)") { _ => - buildSkippingIndex( - BloomFilterSkippingStrategy( - columnName = testColName, - columnType = testColType, - params = Map(ADAPTIVE_NUMBER_CANDIDATE_KEY -> "15")), - cardinality) - } - .run() - } - - private def runReadBenchmark(cardinality: Int): Unit = { - val benchmark = new Benchmark( - s"Read $N rows with cardinality $cardinality in $numFiles files", - numFiles * N, - output = output) - - val uniqueValue = cardinality + 10 - val noIndexTableName = s"${noIndexTableNamePrefix}_${N}_$cardinality" - benchmark.addCase("No Index Read") { _ => - // spark.sql(s"SELECT * FROM $noIndexTableName WHERE col = $uniqueValue") - } - - val valueSetTableName = s"${valueSetTableNamePrefix}_${N}_$cardinality" - benchmark.addCase("ValueSet Read") { _ => - // 
spark.sql(s"SELECT * FROM $valueSetTableName WHERE col = $uniqueValue") - } - - val bloomFilterTableName = s"${bloomFilterTableNamePrefix}_${N}_$cardinality" - benchmark.addCase("BloomFilter Read") { _ => - // spark.sql(s"SELECT * FROM $bloomFilterTableName WHERE col = $uniqueValue") - } - benchmark.run() - } - - private def benchmark(name: String): BenchmarkBuilder = { - new BenchmarkBuilder(new Benchmark(name, N, output = output)) - } - - private class BenchmarkBuilder(benchmark: Benchmark) { - - def addCase(name: String)(f: Int => Unit): BenchmarkBuilder = { - benchmark.addCase(name)(f) - this - } - - def run(): Unit = { - benchmark.run() - } - } - - private def buildSkippingIndex(indexCol: FlintSparkSkippingStrategy, cardinality: Int): Unit = { - val indexName = getTestIndexName(indexCol, cardinality) - val namedAggCols = getNamedAggColumn(indexCol) - - // Generate N random numbers of cardinality and build single skipping index - // data structure without grouping by - spark - .range(N) - .withColumn(testColName, (rand() * cardinality + 1).cast(testColType)) - .agg(namedAggCols.head, namedAggCols.tail: _*) - .write - .format(FLINT_DATASOURCE) - .options(openSearchOptions) - .mode(Overwrite) - .save(indexName) - } - - private def getTestIndexName(indexCol: FlintSparkSkippingStrategy, cardinality: Int): String = { - val params = indexCol.parameters.map { case (name, value) => s"${name}_$value" }.mkString("_") - s"${indexCol.kind}_${params}_$cardinality" - } - - private def getNamedAggColumn(indexCol: FlintSparkSkippingStrategy): Seq[Column] = { - val outputNames = indexCol.outputSchema().keys - val aggFuncs = indexCol.getAggregators - - // Wrap aggregate function with output column name - (outputNames, aggFuncs).zipped.map { case (name, aggFunc) => - new Column(aggFunc.as(name)) - }.toSeq - } -} diff --git a/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSparkBenchmark.scala b/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSparkBenchmark.scala index e52706628..ae1602805 100644 --- a/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSparkBenchmark.scala +++ b/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSparkBenchmark.scala @@ -13,6 +13,11 @@ import org.opensearch.testcontainers.OpenSearchContainer import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark import org.apache.spark.sql.flint.config.FlintSparkConf._ +/** + * Flint benchmark base class for benchmarking with Spark and OpenSearch. + * [[org.opensearch.flint.OpenSearchSuite]] doesn't work here because Spark's + * [[SqlBasedBenchmark]] is not a Scala test suite. 
+ */ trait FlintSparkBenchmark extends SqlBasedBenchmark { protected lazy val flint: FlintSpark = new FlintSpark(spark) @@ -33,7 +38,7 @@ trait FlintSparkBenchmark extends SqlBasedBenchmark { s"${REFRESH_POLICY.optionKey}" -> "wait_for", s"${IGNORE_DOC_ID_COLUMN.optionKey}" -> "false") - def beforeAll(): Unit = { + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { container.start() } From 819dd28ca510b9d423e161d05cc4b3b7ced4f97c Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 20 Mar 2024 14:18:48 -0700 Subject: [PATCH 5/9] Refactor test case Signed-off-by: Chen Dai --- .../FlintSkippingIndexBenchmark.scala | 74 ++++++++++--------- 1 file changed, 38 insertions(+), 36 deletions(-) diff --git a/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala b/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala index 0abc1916d..8ddb8aa51 100644 --- a/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala +++ b/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala @@ -7,6 +7,7 @@ package org.apache.spark.sql.benchmark import org.opensearch.flint.core.field.bloomfilter.BloomFilterFactory.{ADAPTIVE_NUMBER_CANDIDATE_KEY, BLOOM_FILTER_ADAPTIVE_KEY, CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY} import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy +import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind._ import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilterSkippingStrategy import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy import org.opensearch.flint.spark.skipping.partition.PartitionSkippingStrategy @@ -60,68 +61,49 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark { private def runWriteBenchmark(cardinality: Int): Unit = { benchmark(s"Skipping Index Write $N Rows with Cardinality $cardinality") .addCase("Partition Write") { _ => - buildSkippingIndex( - PartitionSkippingStrategy(columnName = testColName, columnType = testColType), - 1 // partition column value should be the same in a single file - ) + // Partitioned column cardinality must be 1 (all values are the same in a single file0 + buildSkippingIndex(strategy(PARTITION), 1) } .addCase("MinMax Write") { _ => - buildSkippingIndex( - MinMaxSkippingStrategy(columnName = testColName, columnType = testColType), - cardinality) + buildSkippingIndex(strategy(MIN_MAX), cardinality) } - .addCase("ValueSet Write") { _ => - buildSkippingIndex( - ValueSetSkippingStrategy(columnName = testColName, columnType = testColType), - cardinality) + .addCase("ValueSet Write (Default Size 100") { _ => + buildSkippingIndex(strategy(VALUE_SET), cardinality) } .addCase("ValueSet Write (Unlimited Size)") { _ => buildSkippingIndex( - ValueSetSkippingStrategy( - columnName = testColName, - columnType = testColType, - params = Map(VALUE_SET_MAX_SIZE_KEY -> Integer.MAX_VALUE.toString)), + strategy(VALUE_SET, Map(VALUE_SET_MAX_SIZE_KEY -> Integer.MAX_VALUE.toString)), cardinality) } - .addCase("BloomFilter Write") { _ => + .addCase("BloomFilter Write (1M NDV)") { _ => buildSkippingIndex( - BloomFilterSkippingStrategy( - columnName = testColName, - columnType = testColType, - params = Map( + strategy( + BLOOM_FILTER, + Map( BLOOM_FILTER_ADAPTIVE_KEY -> "false", CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> N.toString)), cardinality) } .addCase("BloomFilter Write (Optimal NDV)") { _ => buildSkippingIndex( - BloomFilterSkippingStrategy( - 
columnName = testColName, - columnType = testColType, - params = Map( + strategy( + BLOOM_FILTER, + Map( BLOOM_FILTER_ADAPTIVE_KEY -> "false", CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> cardinality.toString)), cardinality) } - .addCase("Adaptive BloomFilter Write") { _ => - buildSkippingIndex( - BloomFilterSkippingStrategy(columnName = testColName, columnType = testColType), - cardinality) + .addCase("Adaptive BloomFilter Write (Default, 10 Candidates)") { _ => + buildSkippingIndex(strategy(BLOOM_FILTER), cardinality) } .addCase("Adaptive BloomFilter Write (5 Candidates)") { _ => buildSkippingIndex( - BloomFilterSkippingStrategy( - columnName = testColName, - columnType = testColType, - params = Map(ADAPTIVE_NUMBER_CANDIDATE_KEY -> "5")), + strategy(BLOOM_FILTER, params = Map(ADAPTIVE_NUMBER_CANDIDATE_KEY -> "5")), cardinality) } .addCase("Adaptive BloomFilter Write (15 Candidates)") { _ => buildSkippingIndex( - BloomFilterSkippingStrategy( - columnName = testColName, - columnType = testColType, - params = Map(ADAPTIVE_NUMBER_CANDIDATE_KEY -> "15")), + strategy(BLOOM_FILTER, params = Map(ADAPTIVE_NUMBER_CANDIDATE_KEY -> "15")), cardinality) } .run() @@ -145,6 +127,26 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark { } } + private def strategy( + kind: SkippingKind, + params: Map[String, String] = Map.empty): FlintSparkSkippingStrategy = { + kind match { + case PARTITION => + PartitionSkippingStrategy(columnName = testColName, columnType = testColType) + case MIN_MAX => MinMaxSkippingStrategy(columnName = testColName, columnType = testColType) + case VALUE_SET => + ValueSetSkippingStrategy( + columnName = testColName, + columnType = testColType, + params = params) + case BLOOM_FILTER => + BloomFilterSkippingStrategy( + columnName = testColName, + columnType = testColType, + params = params) + } + } + private def buildSkippingIndex(indexCol: FlintSparkSkippingStrategy, cardinality: Int): Unit = { val indexName = getTestIndexName(indexCol, cardinality) val namedAggCols = getNamedAggColumn(indexCol) From 55ab74631d8f75c1114af821d209f080b9b131cf Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 20 Mar 2024 14:52:46 -0700 Subject: [PATCH 6/9] Add skipping index read benchmark Signed-off-by: Chen Dai --- .../FlintSkippingIndexBenchmark.scala | 101 ++++++++++++++---- 1 file changed, 78 insertions(+), 23 deletions(-) diff --git a/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala b/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala index 8ddb8aa51..fd6371f44 100644 --- a/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala +++ b/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala @@ -19,7 +19,7 @@ import org.apache.spark.sql.Column import org.apache.spark.sql.SaveMode.Overwrite import org.apache.spark.sql.catalyst.dsl.expressions.DslExpression import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE -import org.apache.spark.sql.functions.rand +import org.apache.spark.sql.functions.{expr, rand} /** * Flint skipping index benchmark that focus on skipping data structure read and write performance @@ -45,38 +45,35 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark { runBenchmark("Skipping Index Write") { runWriteBenchmark(64) - runWriteBenchmark(512) - runWriteBenchmark(65536) + // runWriteBenchmark(512) + // runWriteBenchmark(65536) } - - /* runBenchmark("Skipping Index Read") { runReadBenchmark(16) - 
runReadBenchmark(64) - runReadBenchmark(512) + // runReadBenchmark(64) + // runReadBenchmark(512) } - */ } private def runWriteBenchmark(cardinality: Int): Unit = { benchmark(s"Skipping Index Write $N Rows with Cardinality $cardinality") .addCase("Partition Write") { _ => // Partitioned column cardinality must be 1 (all values are the same in a single file0 - buildSkippingIndex(strategy(PARTITION), 1) + writeSkippingIndex(strategy(PARTITION), 1) } .addCase("MinMax Write") { _ => - buildSkippingIndex(strategy(MIN_MAX), cardinality) + writeSkippingIndex(strategy(MIN_MAX), cardinality) } - .addCase("ValueSet Write (Default Size 100") { _ => - buildSkippingIndex(strategy(VALUE_SET), cardinality) + .addCase("ValueSet Write (Default Size 100)") { _ => + writeSkippingIndex(strategy(VALUE_SET), cardinality) } .addCase("ValueSet Write (Unlimited Size)") { _ => - buildSkippingIndex( + writeSkippingIndex( strategy(VALUE_SET, Map(VALUE_SET_MAX_SIZE_KEY -> Integer.MAX_VALUE.toString)), cardinality) } .addCase("BloomFilter Write (1M NDV)") { _ => - buildSkippingIndex( + writeSkippingIndex( strategy( BLOOM_FILTER, Map( @@ -85,7 +82,7 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark { cardinality) } .addCase("BloomFilter Write (Optimal NDV)") { _ => - buildSkippingIndex( + writeSkippingIndex( strategy( BLOOM_FILTER, Map( @@ -93,23 +90,71 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark { CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> cardinality.toString)), cardinality) } - .addCase("Adaptive BloomFilter Write (Default, 10 Candidates)") { _ => - buildSkippingIndex(strategy(BLOOM_FILTER), cardinality) + .addCase("Adaptive BloomFilter Write (Default 10 Candidates)") { _ => + writeSkippingIndex(strategy(BLOOM_FILTER), cardinality) } .addCase("Adaptive BloomFilter Write (5 Candidates)") { _ => - buildSkippingIndex( + writeSkippingIndex( strategy(BLOOM_FILTER, params = Map(ADAPTIVE_NUMBER_CANDIDATE_KEY -> "5")), cardinality) } .addCase("Adaptive BloomFilter Write (15 Candidates)") { _ => - buildSkippingIndex( + writeSkippingIndex( strategy(BLOOM_FILTER, params = Map(ADAPTIVE_NUMBER_CANDIDATE_KEY -> "15")), cardinality) } .run() } - private def runReadBenchmark(cardinality: Int): Unit = {} + private def runReadBenchmark(cardinality: Int): Unit = { + benchmark(s"Skipping Index Read $N Rows with Cardinality $cardinality") + .addCase("Partition Read") { _ => + readSkippingIndex(strategy(PARTITION), cardinality) + } + .addCase("MinMax Read") { _ => + readSkippingIndex(strategy(MIN_MAX), cardinality) + } + .addCase("ValueSet Read (Default Size 100)") { _ => + readSkippingIndex(strategy(VALUE_SET), cardinality) + } + .addCase("ValueSet Read (Unlimited Size)") { _ => + readSkippingIndex( + strategy(VALUE_SET, Map(VALUE_SET_MAX_SIZE_KEY -> Integer.MAX_VALUE.toString)), + cardinality) + } + .addCase("BloomFilter Read (1M NDV)") { _ => + readSkippingIndex( + strategy( + BLOOM_FILTER, + Map( + BLOOM_FILTER_ADAPTIVE_KEY -> "false", + CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> N.toString)), + cardinality) + } + .addCase("BloomFilter Read (Optimal NDV)") { _ => + readSkippingIndex( + strategy( + BLOOM_FILTER, + Map( + BLOOM_FILTER_ADAPTIVE_KEY -> "false", + CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> cardinality.toString)), + cardinality) + } + .addCase("Adaptive BloomFilter Read (Default, 10 Candidates)") { _ => + readSkippingIndex(strategy(BLOOM_FILTER), cardinality) + } + .addCase("Adaptive BloomFilter Read (5 Candidates)") { _ => + readSkippingIndex( + strategy(BLOOM_FILTER, params = 
Map(ADAPTIVE_NUMBER_CANDIDATE_KEY -> "5")), + cardinality) + } + .addCase("Adaptive BloomFilter Read (15 Candidates)") { _ => + readSkippingIndex( + strategy(BLOOM_FILTER, params = Map(ADAPTIVE_NUMBER_CANDIDATE_KEY -> "15")), + cardinality) + } + .run() + } private def benchmark(name: String): BenchmarkBuilder = { new BenchmarkBuilder(new Benchmark(name, N, output = output)) @@ -147,8 +192,18 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark { } } - private def buildSkippingIndex(indexCol: FlintSparkSkippingStrategy, cardinality: Int): Unit = { - val indexName = getTestIndexName(indexCol, cardinality) + private def readSkippingIndex(indexCol: FlintSparkSkippingStrategy, cardinality: Int): Unit = { + val schema = + indexCol.outputSchema().map { case (key, value) => s"$key $value" }.mkString(", ") + spark.read + .format(FLINT_DATASOURCE) + .options(openSearchOptions) + .schema(schema) + .load(getTestIndexName(indexCol, cardinality)) + .where(indexCol.rewritePredicate(expr(s"$testColName = 1").expr).map(new Column(_)).get) + } + + private def writeSkippingIndex(indexCol: FlintSparkSkippingStrategy, cardinality: Int): Unit = { val namedAggCols = getNamedAggColumn(indexCol) // Generate N random numbers of cardinality and build single skipping index @@ -161,7 +216,7 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark { .format(FLINT_DATASOURCE) .options(openSearchOptions) .mode(Overwrite) - .save(indexName) + .save(getTestIndexName(indexCol, cardinality)) } private def getTestIndexName(indexCol: FlintSparkSkippingStrategy, cardinality: Int): String = { From 61c4dc4c3f254b56755d154970433cc02f18b443 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 20 Mar 2024 15:40:52 -0700 Subject: [PATCH 7/9] Add more comment and Javadoc Signed-off-by: Chen Dai --- .../skipping/bloomfilter/BloomFilterAgg.scala | 13 ----- .../FlintSkippingIndexBenchmark.scala | 50 +++++++++++++------ 2 files changed, 36 insertions(+), 27 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterAgg.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterAgg.scala index 6dcd2f15f..4fb79d67d 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterAgg.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterAgg.scala @@ -7,16 +7,12 @@ package org.opensearch.flint.spark.skipping.bloomfilter import java.io.{ByteArrayInputStream, ByteArrayOutputStream} -import scala.collection.JavaConverters.mapAsJavaMapConverter - import org.opensearch.flint.core.field.bloomfilter.{BloomFilter, BloomFilterFactory} import org.opensearch.flint.core.field.bloomfilter.adaptive.AdaptiveBloomFilter -import org.apache.spark.sql.Column import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.expressions.aggregate.{ImperativeAggregate, TypedImperativeAggregate} -import org.apache.spark.sql.functions.{col, xxhash64} import org.apache.spark.sql.types.{BinaryType, DataType} /** @@ -111,12 +107,3 @@ case class BloomFilterAgg( override def withNewInputAggBufferOffset(newOffset: Int): ImperativeAggregate = copy(inputAggBufferOffset = newOffset) } - -object BloomFilterAgg { - - def bloom_filter_agg(colName: String, params: Map[String, String]): Column = { - new Column( - new BloomFilterAgg(xxhash64(col(colName)).expr, 
BloomFilterFactory.of(params.asJava)) - .toAggregateExpression()) - } -} diff --git a/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala b/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala index fd6371f44..84fb1c439 100644 --- a/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala +++ b/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala @@ -25,11 +25,24 @@ import org.apache.spark.sql.functions.{expr, rand} * Flint skipping index benchmark that focus on skipping data structure read and write performance * with OpenSearch based on Spark benchmark framework. * + * Test cases include reading and writing the following skipping data structures: + * {{{ + * 1. Partition + * 2. MinMax + * 3. ValueSet (default size 100) + * 4. ValueSet (unlimited size) + * 5. BloomFilter (classic, 1M NDV) + * 6. BloomFilter (classic, optimal NDV = cardinality) + * 7. BloomFilter (adaptive, default 10 candidates) + * 8. BloomFilter (adaptive, 5 candidates) + * 9. BloomFilter (adaptive, 15 candidates) + * }}} + * * To run this benchmark: * {{{ * > SPARK_GENERATE_BENCHMARK_FILES=1 sbt clean "set every Test / test := {}" "integtest/test:runMain org.apache.spark.sql.benchmark.FlintSkippingIndexBenchmark" - * Results will be written to "benchmarks/FlintSkippingIndexBenchmark--results.txt". * }}} + * Results will be written to "benchmarks/FlintSkippingIndexBenchmark--results.txt". */ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark { @@ -45,18 +58,18 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark { runBenchmark("Skipping Index Write") { runWriteBenchmark(64) - // runWriteBenchmark(512) - // runWriteBenchmark(65536) + runWriteBenchmark(512) + runWriteBenchmark(65536) } runBenchmark("Skipping Index Read") { runReadBenchmark(16) - // runReadBenchmark(64) - // runReadBenchmark(512) + runReadBenchmark(512) + runReadBenchmark(65536) } } private def runWriteBenchmark(cardinality: Int): Unit = { - benchmark(s"Skipping Index Write $N Rows with Cardinality $cardinality") + benchmark(s"Skipping Index Write $N Rows with Cardinality $cardinality", N) .addCase("Partition Write") { _ => // Partitioned column cardinality must be 1 (all values are the same in a single file0 writeSkippingIndex(strategy(PARTITION), 1) @@ -107,7 +120,7 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark { } private def runReadBenchmark(cardinality: Int): Unit = { - benchmark(s"Skipping Index Read $N Rows with Cardinality $cardinality") + benchmark(s"Skipping Index Read $N Rows with Cardinality $cardinality", 1) .addCase("Partition Read") { _ => readSkippingIndex(strategy(PARTITION), cardinality) } @@ -140,7 +153,7 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark { CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> cardinality.toString)), cardinality) } - .addCase("Adaptive BloomFilter Read (Default, 10 Candidates)") { _ => + .addCase("Adaptive BloomFilter Read (Default 10 Candidates)") { _ => readSkippingIndex(strategy(BLOOM_FILTER), cardinality) } .addCase("Adaptive BloomFilter Read (5 Candidates)") { _ => @@ -156,10 +169,11 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark { .run() } - private def benchmark(name: String): BenchmarkBuilder = { - new BenchmarkBuilder(new Benchmark(name, N, output = output)) + private def benchmark(name: String, numRows: Int): BenchmarkBuilder = { + new BenchmarkBuilder(new Benchmark(name, numRows, 
output = output)) } + /** Benchmark builder in fluent API style */ private class BenchmarkBuilder(benchmark: Benchmark) { def addCase(name: String)(f: Int => Unit): BenchmarkBuilder = { @@ -195,19 +209,26 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark { private def readSkippingIndex(indexCol: FlintSparkSkippingStrategy, cardinality: Int): Unit = { val schema = indexCol.outputSchema().map { case (key, value) => s"$key $value" }.mkString(", ") + /* + * Rewrite the query on test column and run it against skipping index created in previous + * write benchmark. + */ + val indexQuery = + indexCol.rewritePredicate(expr(s"$testColName = 1").expr).map(new Column(_)).get spark.read .format(FLINT_DATASOURCE) .options(openSearchOptions) .schema(schema) .load(getTestIndexName(indexCol, cardinality)) - .where(indexCol.rewritePredicate(expr(s"$testColName = 1").expr).map(new Column(_)).get) + .where(indexQuery) } private def writeSkippingIndex(indexCol: FlintSparkSkippingStrategy, cardinality: Int): Unit = { + /* + * Generate N random numbers with the given cardinality and build single skipping index + * data structure without group by. + */ val namedAggCols = getNamedAggColumn(indexCol) - - // Generate N random numbers of cardinality and build single skipping index - // data structure without grouping by spark .range(N) .withColumn(testColName, (rand() * cardinality + 1).cast(testColType)) @@ -220,6 +241,7 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark { } private def getTestIndexName(indexCol: FlintSparkSkippingStrategy, cardinality: Int): String = { + // Generate unique name as skipping index name in OpenSearch val params = indexCol.parameters.map { case (name, value) => s"${name}_$value" }.mkString("_") s"${indexCol.kind}_${params}_$cardinality" } From 282332440a1534d865d6b31e61cdec86818b4685 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 21 Mar 2024 12:34:07 -0700 Subject: [PATCH 8/9] Refactor test code and add test results Signed-off-by: Chen Dai --- docs/benchmark-skipping-index.txt | 92 +++++++++++ .../FlintSkippingIndexBenchmark.scala | 150 +++++++++++++----- 2 files changed, 198 insertions(+), 44 deletions(-) create mode 100644 docs/benchmark-skipping-index.txt diff --git a/docs/benchmark-skipping-index.txt b/docs/benchmark-skipping-index.txt new file mode 100644 index 000000000..312858c72 --- /dev/null +++ b/docs/benchmark-skipping-index.txt @@ -0,0 +1,92 @@ +================================================================================================ +Skipping Index Write +================================================================================================ + +OpenJDK 64-Bit Server VM 11.0.20+0 on Mac OS X 14.3.1 +Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz +Skipping Index Write 1000000 Rows with Cardinality 64: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------------------- +Partition Write 7928 7928 0 0.1 7928.2 1.0X +MinMax Write 1446 1446 0 0.7 1446.0 5.5X +ValueSet Write (Default Size 100) 1698 1698 0 0.6 1698.4 4.7X +ValueSet Write (Unlimited Size) 1456 1456 0 0.7 1455.6 5.4X +BloomFilter Write (1M NDV) 1105 1105 0 0.9 1105.0 7.2X +BloomFilter Write (Optimal NDV) 1063 1063 0 0.9 1063.0 7.5X +Adaptive BloomFilter Write (Default 10 Candidates) 1056 1056 0 0.9 1056.5 7.5X +Adaptive BloomFilter Write (5 Candidates) 1058 1058 0 0.9 1058.3 7.5X +Adaptive BloomFilter Write (15 Candidates) 2084 2084 0 0.5 
2084.4 3.8X + +OpenJDK 64-Bit Server VM 11.0.20+0 on Mac OS X 14.3.1 +Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz +Skipping Index Write 1000000 Rows with Cardinality 2048: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +--------------------------------------------------------------------------------------------------------------------------------------- +Partition Write 1300 1300 0 0.8 1300.2 1.0X +MinMax Write 1261 1261 0 0.8 1261.3 1.0X +ValueSet Write (Default Size 100) 1379 1379 0 0.7 1378.6 0.9X +ValueSet Write (Unlimited Size) 1369 1369 0 0.7 1368.9 0.9X +BloomFilter Write (1M NDV) 1054 1054 0 0.9 1054.4 1.2X +BloomFilter Write (Optimal NDV) 1056 1056 0 0.9 1055.9 1.2X +Adaptive BloomFilter Write (Default 10 Candidates) 1051 1051 0 1.0 1051.1 1.2X +Adaptive BloomFilter Write (5 Candidates) 1063 1063 0 0.9 1062.7 1.2X +Adaptive BloomFilter Write (15 Candidates) 2051 2051 0 0.5 2051.0 0.6X + +OpenJDK 64-Bit Server VM 11.0.20+0 on Mac OS X 14.3.1 +Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz +Skipping Index Write 1000000 Rows with Cardinality 65536: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +---------------------------------------------------------------------------------------------------------------------------------------- +Partition Write 1233 1233 0 0.8 1233.0 1.0X +MinMax Write 1245 1245 0 0.8 1244.9 1.0X +ValueSet Write (Default Size 100) 1520 1520 0 0.7 1519.7 0.8X +ValueSet Write (Unlimited Size) 1596 1596 0 0.6 1595.8 0.8X +BloomFilter Write (1M NDV) 1065 1065 0 0.9 1065.3 1.2X +BloomFilter Write (Optimal NDV) 1057 1057 0 0.9 1057.0 1.2X +Adaptive BloomFilter Write (Default 10 Candidates) 1062 1062 0 0.9 1061.8 1.2X +Adaptive BloomFilter Write (5 Candidates) 1052 1052 0 1.0 1051.9 1.2X +Adaptive BloomFilter Write (15 Candidates) 2047 2047 0 0.5 2047.1 0.6X + + +================================================================================================ +Skipping Index Read +================================================================================================ + +OpenJDK 64-Bit Server VM 11.0.20+0 on Mac OS X 14.3.1 +Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz +Skipping Index Read 1000000 Rows with Cardinality 64: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------------------ +Partition Read 59 79 20 0.0 58833150.0 1.0X +MinMax Read 56 69 12 0.0 55926430.0 1.1X +ValueSet Read (Default Size 100) 44 57 8 0.0 44342915.0 1.3X +ValueSet Read (Unlimited Size) 45 56 8 0.0 44744854.0 1.3X +BloomFilter Read (1M NDV) 2494 2544 68 0.0 2493617832.0 0.0X +BloomFilter Read (Optimal NDV) 2388 2615 450 0.0 2387523635.0 0.0X +Adaptive BloomFilter Read (Default 10 Candidates) 2349 2389 47 0.0 2349330716.0 0.0X +Adaptive BloomFilter Read (5 Candidates) 2366 2394 28 0.0 2366240134.0 0.0X +Adaptive BloomFilter Read (15 Candidates) 2347 2407 59 0.0 2347162371.0 0.0X + +OpenJDK 64-Bit Server VM 11.0.20+0 on Mac OS X 14.3.1 +Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz +Skipping Index Read 1000000 Rows with Cardinality 2048: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +-------------------------------------------------------------------------------------------------------------------------------------- +Partition Read 34 41 6 0.0 34280024.0 1.0X +MinMax Read 39 50 9 0.0 39144570.0 0.9X +ValueSet Read (Default Size 100) 41 54 13 0.0 40504625.0 0.8X +ValueSet Read (Unlimited Size) 37 46 8 0.0 
36629686.0 0.9X +BloomFilter Read (1M NDV) 2396 2471 52 0.0 2396434455.0 0.0X +BloomFilter Read (Optimal NDV) 2406 2442 32 0.0 2406111582.0 0.0X +Adaptive BloomFilter Read (Default 10 Candidates) 2419 2446 26 0.0 2419392722.0 0.0X +Adaptive BloomFilter Read (5 Candidates) 2421 2458 33 0.0 2420675769.0 0.0X +Adaptive BloomFilter Read (15 Candidates) 2415 2424 9 0.0 2415005443.0 0.0X + +OpenJDK 64-Bit Server VM 11.0.20+0 on Mac OS X 14.3.1 +Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz +Skipping Index Read 1000000 Rows with Cardinality 65536: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +--------------------------------------------------------------------------------------------------------------------------------------- +Partition Read 29 36 6 0.0 29499019.0 1.0X +MinMax Read 33 43 9 0.0 33272156.0 0.9X +ValueSet Read (Default Size 100) 32 39 8 0.0 31548122.0 0.9X +ValueSet Read (Unlimited Size) 46 55 8 0.0 45973861.0 0.6X +BloomFilter Read (1M NDV) 2521 2554 26 0.0 2520994171.0 0.0X +BloomFilter Read (Optimal NDV) 2359 2424 46 0.0 2358583966.0 0.0X +Adaptive BloomFilter Read (Default 10 Candidates) 2358 2413 45 0.0 2358475850.0 0.0X +Adaptive BloomFilter Read (5 Candidates) 2483 2682 263 0.0 2482781961.0 0.0X +Adaptive BloomFilter Read (15 Candidates) 2401 2442 34 0.0 2400778171.0 0.0X \ No newline at end of file diff --git a/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala b/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala index 84fb1c439..2660f9065 100644 --- a/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala +++ b/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala @@ -5,6 +5,14 @@ package org.apache.spark.sql.benchmark +import java.util.Locale + +import scala.concurrent.duration.DurationInt + +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest +import org.opensearch.client.RequestOptions +import org.opensearch.client.indices.CreateIndexRequest +import org.opensearch.common.xcontent.XContentType import org.opensearch.flint.core.field.bloomfilter.BloomFilterFactory.{ADAPTIVE_NUMBER_CANDIDATE_KEY, BLOOM_FILTER_ADAPTIVE_KEY, CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY} import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind._ @@ -38,6 +46,13 @@ import org.apache.spark.sql.functions.{expr, rand} * 9. BloomFilter (adaptive, 15 candidates) * }}} * + * Test parameters: + * {{{ + * 1. N = 1M rows of test data generated + * 2. M = 1 OpenSearch doc written as skipping index + * 3. 
cardinality = {64, 2048, 65536} distinct values in N rows + * }}} + * * To run this benchmark: * {{{ * > SPARK_GENERATE_BENCHMARK_FILES=1 sbt clean "set every Test / test := {}" "integtest/test:runMain org.apache.spark.sql.benchmark.FlintSkippingIndexBenchmark" @@ -49,43 +64,57 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark { /** How many rows generated in test data */ private val N = 1000000 + /** How many OpenSearch docs created */ + private val M = 1 + + /** Cardinalities of test data */ + private val CARDINALITIES = Seq(64, 2048, 65536) + + /** How many runs for each test case */ + private val WRITE_TEST_NUM_ITERATIONS = 1 + private val READ_TEST_NUM_ITERATIONS = 5 + + /** Test index name prefix */ + private val TEST_INDEX_PREFIX = "flint_benchmark" + /** Test column name and type */ private val testColName = "value" private val testColType = "Int" - override def runBenchmarkSuite(args: Array[String]): Unit = { - super.runBenchmarkSuite(args) + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { + super.runBenchmarkSuite(mainArgs) runBenchmark("Skipping Index Write") { - runWriteBenchmark(64) - runWriteBenchmark(512) - runWriteBenchmark(65536) + CARDINALITIES.foreach(runWriteBenchmark) } runBenchmark("Skipping Index Read") { - runReadBenchmark(16) - runReadBenchmark(512) - runReadBenchmark(65536) + CARDINALITIES.foreach(runReadBenchmark) } } private def runWriteBenchmark(cardinality: Int): Unit = { - benchmark(s"Skipping Index Write $N Rows with Cardinality $cardinality", N) - .addCase("Partition Write") { _ => - // Partitioned column cardinality must be 1 (all values are the same in a single file0 - writeSkippingIndex(strategy(PARTITION), 1) + new Benchmark( + name = s"Skipping Index Write $N Rows with Cardinality $cardinality", + valuesPerIteration = N, + minNumIters = WRITE_TEST_NUM_ITERATIONS, + warmupTime = (-1).seconds, // ensure no warm up, otherwise test doc duplicate + minTime = (-1).seconds, // ensure run only once, otherwise test doc duplicate + output = output) + .test("Partition Write") { _ => + writeSkippingIndex(strategy(PARTITION), cardinality) } - .addCase("MinMax Write") { _ => + .test("MinMax Write") { _ => writeSkippingIndex(strategy(MIN_MAX), cardinality) } - .addCase("ValueSet Write (Default Size 100)") { _ => + .test("ValueSet Write (Default Size 100)") { _ => writeSkippingIndex(strategy(VALUE_SET), cardinality) } - .addCase("ValueSet Write (Unlimited Size)") { _ => + .test("ValueSet Write (Unlimited Size)") { _ => writeSkippingIndex( strategy(VALUE_SET, Map(VALUE_SET_MAX_SIZE_KEY -> Integer.MAX_VALUE.toString)), cardinality) } - .addCase("BloomFilter Write (1M NDV)") { _ => + .test("BloomFilter Write (1M NDV)") { _ => writeSkippingIndex( strategy( BLOOM_FILTER, @@ -94,7 +123,7 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark { CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> N.toString)), cardinality) } - .addCase("BloomFilter Write (Optimal NDV)") { _ => + .test("BloomFilter Write (Optimal NDV)") { _ => writeSkippingIndex( strategy( BLOOM_FILTER, @@ -103,15 +132,15 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark { CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> cardinality.toString)), cardinality) } - .addCase("Adaptive BloomFilter Write (Default 10 Candidates)") { _ => + .test("Adaptive BloomFilter Write (Default 10 Candidates)") { _ => writeSkippingIndex(strategy(BLOOM_FILTER), cardinality) } - .addCase("Adaptive BloomFilter Write (5 Candidates)") { _ => + .test("Adaptive BloomFilter Write (5 Candidates)") { _ => 
writeSkippingIndex( strategy(BLOOM_FILTER, params = Map(ADAPTIVE_NUMBER_CANDIDATE_KEY -> "5")), cardinality) } - .addCase("Adaptive BloomFilter Write (15 Candidates)") { _ => + .test("Adaptive BloomFilter Write (15 Candidates)") { _ => writeSkippingIndex( strategy(BLOOM_FILTER, params = Map(ADAPTIVE_NUMBER_CANDIDATE_KEY -> "15")), cardinality) @@ -120,22 +149,26 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark { } private def runReadBenchmark(cardinality: Int): Unit = { - benchmark(s"Skipping Index Read $N Rows with Cardinality $cardinality", 1) - .addCase("Partition Read") { _ => + new Benchmark( + name = s"Skipping Index Read $N Rows with Cardinality $cardinality", + valuesPerIteration = M, + minNumIters = READ_TEST_NUM_ITERATIONS, + output = output) + .test("Partition Read") { _ => readSkippingIndex(strategy(PARTITION), cardinality) } - .addCase("MinMax Read") { _ => + .test("MinMax Read") { _ => readSkippingIndex(strategy(MIN_MAX), cardinality) } - .addCase("ValueSet Read (Default Size 100)") { _ => + .test("ValueSet Read (Default Size 100)") { _ => readSkippingIndex(strategy(VALUE_SET), cardinality) } - .addCase("ValueSet Read (Unlimited Size)") { _ => + .test("ValueSet Read (Unlimited Size)") { _ => readSkippingIndex( strategy(VALUE_SET, Map(VALUE_SET_MAX_SIZE_KEY -> Integer.MAX_VALUE.toString)), cardinality) } - .addCase("BloomFilter Read (1M NDV)") { _ => + .test("BloomFilter Read (1M NDV)") { _ => readSkippingIndex( strategy( BLOOM_FILTER, @@ -144,7 +177,7 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark { CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> N.toString)), cardinality) } - .addCase("BloomFilter Read (Optimal NDV)") { _ => + .test("BloomFilter Read (Optimal NDV)") { _ => readSkippingIndex( strategy( BLOOM_FILTER, @@ -153,15 +186,15 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark { CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> cardinality.toString)), cardinality) } - .addCase("Adaptive BloomFilter Read (Default 10 Candidates)") { _ => + .test("Adaptive BloomFilter Read (Default 10 Candidates)") { _ => readSkippingIndex(strategy(BLOOM_FILTER), cardinality) } - .addCase("Adaptive BloomFilter Read (5 Candidates)") { _ => + .test("Adaptive BloomFilter Read (5 Candidates)") { _ => readSkippingIndex( strategy(BLOOM_FILTER, params = Map(ADAPTIVE_NUMBER_CANDIDATE_KEY -> "5")), cardinality) } - .addCase("Adaptive BloomFilter Read (15 Candidates)") { _ => + .test("Adaptive BloomFilter Read (15 Candidates)") { _ => readSkippingIndex( strategy(BLOOM_FILTER, params = Map(ADAPTIVE_NUMBER_CANDIDATE_KEY -> "15")), cardinality) @@ -169,20 +202,12 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark { .run() } - private def benchmark(name: String, numRows: Int): BenchmarkBuilder = { - new BenchmarkBuilder(new Benchmark(name, numRows, output = output)) - } - /** Benchmark builder in fluent API style */ - private class BenchmarkBuilder(benchmark: Benchmark) { + private implicit class BenchmarkBuilder(val benchmark: Benchmark) { - def addCase(name: String)(f: Int => Unit): BenchmarkBuilder = { + def test(name: String)(f: Int => Unit): Benchmark = { benchmark.addCase(name)(f) - this - } - - def run(): Unit = { - benchmark.run() + benchmark } } @@ -221,9 +246,33 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark { .schema(schema) .load(getTestIndexName(indexCol, cardinality)) .where(indexQuery) + .noop() // Trigger data frame execution and discard output } private def writeSkippingIndex(indexCol: FlintSparkSkippingStrategy, 
cardinality: Int): Unit = { + val testIndexName = getTestIndexName(indexCol, cardinality) + + /* + * FIXME: must pre-create index because data type lost in bulk JSON and binary code is auto mapped to text by OS + */ + if (indexCol.kind == BLOOM_FILTER) { + val mappings = + s"""{ + | "properties": { + | "$testColName": { + | "type": "binary", + | "doc_values": true + | } + | } + |}""".stripMargin + val testOSIndexName = testIndexName.toLowerCase(Locale.ROOT) + + openSearchClient.indices.create( + new CreateIndexRequest(testOSIndexName) + .mapping(mappings, XContentType.JSON), + RequestOptions.DEFAULT) + } + /* * Generate N random numbers with the given cardinality and build single skipping index * data structure without group by. @@ -233,17 +282,24 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark { .range(N) .withColumn(testColName, (rand() * cardinality + 1).cast(testColType)) .agg(namedAggCols.head, namedAggCols.tail: _*) + .coalesce(M) .write .format(FLINT_DATASOURCE) .options(openSearchOptions) .mode(Overwrite) - .save(getTestIndexName(indexCol, cardinality)) + .save(testIndexName) } private def getTestIndexName(indexCol: FlintSparkSkippingStrategy, cardinality: Int): String = { // Generate unique name as skipping index name in OpenSearch - val params = indexCol.parameters.map { case (name, value) => s"${name}_$value" }.mkString("_") - s"${indexCol.kind}_${params}_$cardinality" + val params = + indexCol.parameters.toSeq + .sortBy(_._1) + .map { case (name, value) => s"${name}_$value" } + .mkString("_") + val paramsOrDefault = if (params.isEmpty) "default" else params + + s"${TEST_INDEX_PREFIX}_${indexCol.kind}_cardinality_${cardinality}_$paramsOrDefault" } private def getNamedAggColumn(indexCol: FlintSparkSkippingStrategy): Seq[Column] = { @@ -255,4 +311,10 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark { new Column(aggFunc.as(name)) }.toSeq } + + private def deleteAllTestIndices(): Unit = { + openSearchClient + .indices() + .delete(new DeleteIndexRequest(s"${TEST_INDEX_PREFIX}_*"), RequestOptions.DEFAULT) + } } From f50770c535dc764c3e305d762825e72fa1b3523f Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 21 Mar 2024 14:01:31 -0700 Subject: [PATCH 9/9] Update test result with warm up Signed-off-by: Chen Dai --- docs/benchmark-skipping-index.txt | 108 +++++++++--------- .../FlintSkippingIndexBenchmark.scala | 13 +++ 2 files changed, 67 insertions(+), 54 deletions(-) diff --git a/docs/benchmark-skipping-index.txt b/docs/benchmark-skipping-index.txt index 312858c72..4475be36f 100644 --- a/docs/benchmark-skipping-index.txt +++ b/docs/benchmark-skipping-index.txt @@ -6,43 +6,43 @@ OpenJDK 64-Bit Server VM 11.0.20+0 on Mac OS X 14.3.1 Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz Skipping Index Write 1000000 Rows with Cardinality 64: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------------------- -Partition Write 7928 7928 0 0.1 7928.2 1.0X -MinMax Write 1446 1446 0 0.7 1446.0 5.5X -ValueSet Write (Default Size 100) 1698 1698 0 0.6 1698.4 4.7X -ValueSet Write (Unlimited Size) 1456 1456 0 0.7 1455.6 5.4X -BloomFilter Write (1M NDV) 1105 1105 0 0.9 1105.0 7.2X -BloomFilter Write (Optimal NDV) 1063 1063 0 0.9 1063.0 7.5X -Adaptive BloomFilter Write (Default 10 Candidates) 1056 1056 0 0.9 1056.5 7.5X -Adaptive BloomFilter Write (5 Candidates) 1058 1058 0 0.9 1058.3 7.5X -Adaptive BloomFilter Write (15 Candidates) 2084 2084 0 0.5 
2084.4 3.8X +Partition Write 1334 1334 0 0.7 1333.8 1.0X +MinMax Write 1318 1318 0 0.8 1318.3 1.0X +ValueSet Write (Default Size 100) 1400 1400 0 0.7 1399.8 1.0X +ValueSet Write (Unlimited Size) 1331 1331 0 0.8 1330.8 1.0X +BloomFilter Write (1M NDV) 1062 1062 0 0.9 1062.4 1.3X +BloomFilter Write (Optimal NDV) 1048 1048 0 1.0 1047.6 1.3X +Adaptive BloomFilter Write (Default 10 Candidates) 1045 1045 0 1.0 1045.2 1.3X +Adaptive BloomFilter Write (5 Candidates) 1053 1053 0 0.9 1053.4 1.3X +Adaptive BloomFilter Write (15 Candidates) 2102 2102 0 0.5 2102.1 0.6X OpenJDK 64-Bit Server VM 11.0.20+0 on Mac OS X 14.3.1 Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz Skipping Index Write 1000000 Rows with Cardinality 2048: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------------------- -Partition Write 1300 1300 0 0.8 1300.2 1.0X -MinMax Write 1261 1261 0 0.8 1261.3 1.0X -ValueSet Write (Default Size 100) 1379 1379 0 0.7 1378.6 0.9X -ValueSet Write (Unlimited Size) 1369 1369 0 0.7 1368.9 0.9X -BloomFilter Write (1M NDV) 1054 1054 0 0.9 1054.4 1.2X -BloomFilter Write (Optimal NDV) 1056 1056 0 0.9 1055.9 1.2X -Adaptive BloomFilter Write (Default 10 Candidates) 1051 1051 0 1.0 1051.1 1.2X -Adaptive BloomFilter Write (5 Candidates) 1063 1063 0 0.9 1062.7 1.2X -Adaptive BloomFilter Write (15 Candidates) 2051 2051 0 0.5 2051.0 0.6X +Partition Write 1288 1288 0 0.8 1287.7 1.0X +MinMax Write 1295 1295 0 0.8 1295.0 1.0X +ValueSet Write (Default Size 100) 1357 1357 0 0.7 1357.4 0.9X +ValueSet Write (Unlimited Size) 1360 1360 0 0.7 1360.5 0.9X +BloomFilter Write (1M NDV) 1070 1070 0 0.9 1070.2 1.2X +BloomFilter Write (Optimal NDV) 1048 1048 0 1.0 1047.6 1.2X +Adaptive BloomFilter Write (Default 10 Candidates) 1045 1045 0 1.0 1044.9 1.2X +Adaptive BloomFilter Write (5 Candidates) 1061 1061 0 0.9 1061.4 1.2X +Adaptive BloomFilter Write (15 Candidates) 2050 2050 0 0.5 2050.2 0.6X OpenJDK 64-Bit Server VM 11.0.20+0 on Mac OS X 14.3.1 Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz Skipping Index Write 1000000 Rows with Cardinality 65536: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ---------------------------------------------------------------------------------------------------------------------------------------- -Partition Write 1233 1233 0 0.8 1233.0 1.0X -MinMax Write 1245 1245 0 0.8 1244.9 1.0X -ValueSet Write (Default Size 100) 1520 1520 0 0.7 1519.7 0.8X -ValueSet Write (Unlimited Size) 1596 1596 0 0.6 1595.8 0.8X -BloomFilter Write (1M NDV) 1065 1065 0 0.9 1065.3 1.2X -BloomFilter Write (Optimal NDV) 1057 1057 0 0.9 1057.0 1.2X -Adaptive BloomFilter Write (Default 10 Candidates) 1062 1062 0 0.9 1061.8 1.2X -Adaptive BloomFilter Write (5 Candidates) 1052 1052 0 1.0 1051.9 1.2X -Adaptive BloomFilter Write (15 Candidates) 2047 2047 0 0.5 2047.1 0.6X +Partition Write 1304 1304 0 0.8 1304.1 1.0X +MinMax Write 1287 1287 0 0.8 1286.8 1.0X +ValueSet Write (Default Size 100) 1474 1474 0 0.7 1473.7 0.9X +ValueSet Write (Unlimited Size) 1713 1713 0 0.6 1713.5 0.8X +BloomFilter Write (1M NDV) 1077 1077 0 0.9 1076.7 1.2X +BloomFilter Write (Optimal NDV) 1049 1049 0 1.0 1048.8 1.2X +Adaptive BloomFilter Write (Default 10 Candidates) 1055 1055 0 0.9 1054.7 1.2X +Adaptive BloomFilter Write (5 Candidates) 1047 1047 0 1.0 1047.4 1.2X +Adaptive BloomFilter Write (15 Candidates) 2054 2054 0 0.5 2054.3 0.6X 
================================================================================================ @@ -53,40 +53,40 @@ OpenJDK 64-Bit Server VM 11.0.20+0 on Mac OS X 14.3.1 Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz Skipping Index Read 1000000 Rows with Cardinality 64: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------------------ -Partition Read 59 79 20 0.0 58833150.0 1.0X -MinMax Read 56 69 12 0.0 55926430.0 1.1X -ValueSet Read (Default Size 100) 44 57 8 0.0 44342915.0 1.3X -ValueSet Read (Unlimited Size) 45 56 8 0.0 44744854.0 1.3X -BloomFilter Read (1M NDV) 2494 2544 68 0.0 2493617832.0 0.0X -BloomFilter Read (Optimal NDV) 2388 2615 450 0.0 2387523635.0 0.0X -Adaptive BloomFilter Read (Default 10 Candidates) 2349 2389 47 0.0 2349330716.0 0.0X -Adaptive BloomFilter Read (5 Candidates) 2366 2394 28 0.0 2366240134.0 0.0X -Adaptive BloomFilter Read (15 Candidates) 2347 2407 59 0.0 2347162371.0 0.0X +Partition Read 54 65 9 0.0 54473389.0 1.0X +MinMax Read 57 65 8 0.0 56855820.0 1.0X +ValueSet Read (Default Size 100) 50 61 11 0.0 49529808.0 1.1X +ValueSet Read (Unlimited Size) 43 54 8 0.0 43301469.0 1.3X +BloomFilter Read (1M NDV) 2648 2733 60 0.0 2647662965.0 0.0X +BloomFilter Read (Optimal NDV) 2450 2484 24 0.0 2450135369.0 0.0X +Adaptive BloomFilter Read (Default 10 Candidates) 2441 2458 18 0.0 2441226280.0 0.0X +Adaptive BloomFilter Read (5 Candidates) 2451 2476 26 0.0 2450510244.0 0.0X +Adaptive BloomFilter Read (15 Candidates) 2397 2461 44 0.0 2397133383.0 0.0X OpenJDK 64-Bit Server VM 11.0.20+0 on Mac OS X 14.3.1 Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz Skipping Index Read 1000000 Rows with Cardinality 2048: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative -------------------------------------------------------------------------------------------------------------------------------------- -Partition Read 34 41 6 0.0 34280024.0 1.0X -MinMax Read 39 50 9 0.0 39144570.0 0.9X -ValueSet Read (Default Size 100) 41 54 13 0.0 40504625.0 0.8X -ValueSet Read (Unlimited Size) 37 46 8 0.0 36629686.0 0.9X -BloomFilter Read (1M NDV) 2396 2471 52 0.0 2396434455.0 0.0X -BloomFilter Read (Optimal NDV) 2406 2442 32 0.0 2406111582.0 0.0X -Adaptive BloomFilter Read (Default 10 Candidates) 2419 2446 26 0.0 2419392722.0 0.0X -Adaptive BloomFilter Read (5 Candidates) 2421 2458 33 0.0 2420675769.0 0.0X -Adaptive BloomFilter Read (15 Candidates) 2415 2424 9 0.0 2415005443.0 0.0X +Partition Read 31 35 5 0.0 31101827.0 1.0X +MinMax Read 33 40 6 0.0 33385163.0 0.9X +ValueSet Read (Default Size 100) 30 37 6 0.0 30479810.0 1.0X +ValueSet Read (Unlimited Size) 31 37 6 0.0 31004587.0 1.0X +BloomFilter Read (1M NDV) 2477 2537 51 0.0 2477281890.0 0.0X +BloomFilter Read (Optimal NDV) 2408 2461 45 0.0 2408002056.0 0.0X +Adaptive BloomFilter Read (Default 10 Candidates) 2367 2413 43 0.0 2366950203.0 0.0X +Adaptive BloomFilter Read (5 Candidates) 2399 2429 26 0.0 2399147197.0 0.0X +Adaptive BloomFilter Read (15 Candidates) 2382 2421 34 0.0 2381512783.0 0.0X OpenJDK 64-Bit Server VM 11.0.20+0 on Mac OS X 14.3.1 Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz Skipping Index Read 1000000 Rows with Cardinality 65536: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------------------- -Partition Read 29 36 6 0.0 29499019.0 1.0X -MinMax Read 33 43 9 
0.0 33272156.0 0.9X -ValueSet Read (Default Size 100) 32 39 8 0.0 31548122.0 0.9X -ValueSet Read (Unlimited Size) 46 55 8 0.0 45973861.0 0.6X -BloomFilter Read (1M NDV) 2521 2554 26 0.0 2520994171.0 0.0X -BloomFilter Read (Optimal NDV) 2359 2424 46 0.0 2358583966.0 0.0X -Adaptive BloomFilter Read (Default 10 Candidates) 2358 2413 45 0.0 2358475850.0 0.0X -Adaptive BloomFilter Read (5 Candidates) 2483 2682 263 0.0 2482781961.0 0.0X -Adaptive BloomFilter Read (15 Candidates) 2401 2442 34 0.0 2400778171.0 0.0X \ No newline at end of file +Partition Read 26 30 5 0.0 25781731.0 1.0X +MinMax Read 30 34 7 0.0 29514335.0 0.9X +ValueSet Read (Default Size 100) 27 34 6 0.0 27338628.0 0.9X +ValueSet Read (Unlimited Size) 39 45 6 0.0 39315292.0 0.7X +BloomFilter Read (1M NDV) 2374 2433 55 0.0 2373982609.0 0.0X +BloomFilter Read (Optimal NDV) 2354 2415 60 0.0 2354204521.0 0.0X +Adaptive BloomFilter Read (Default 10 Candidates) 2322 2407 51 0.0 2321669934.0 0.0X +Adaptive BloomFilter Read (5 Candidates) 2413 2465 44 0.0 2413487418.0 0.0X +Adaptive BloomFilter Read (15 Candidates) 2351 2401 36 0.0 2351322414.0 0.0X diff --git a/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala b/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala index 2660f9065..3e42304b2 100644 --- a/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala +++ b/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala @@ -15,6 +15,7 @@ import org.opensearch.client.indices.CreateIndexRequest import org.opensearch.common.xcontent.XContentType import org.opensearch.flint.core.field.bloomfilter.BloomFilterFactory.{ADAPTIVE_NUMBER_CANDIDATE_KEY, BLOOM_FILTER_ADAPTIVE_KEY, CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY} import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy +import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind._ import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilterSkippingStrategy import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy @@ -83,6 +84,7 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark { override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { super.runBenchmarkSuite(mainArgs) + warmup() runBenchmark("Skipping Index Write") { CARDINALITIES.foreach(runWriteBenchmark) @@ -92,6 +94,17 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark { } } + private def warmup(): Unit = { + try { + SkippingKind.values.foreach(kind => { + writeSkippingIndex(strategy(kind), 1) + readSkippingIndex(strategy(kind), 1) + }) + } finally { + deleteAllTestIndices() + } + } + private def runWriteBenchmark(cardinality: Int): Unit = { new Benchmark( name = s"Skipping Index Write $N Rows with Cardinality $cardinality",
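/*
 * Editor's note (not part of the patch above): a minimal, self-contained sketch of the
 * warm-up pattern this commit introduces. Without it, the first measured case absorbs
 * one-time costs (JIT compilation, Spark codegen, the initial OpenSearch connection),
 * which is consistent with "Partition Write" dropping from ~7.9s in the earlier results
 * to ~1.3s once a warm-up pass runs before timing. The names `WarmupSketch`, `withWarmup`,
 * `runOnce`, and `cleanup` are illustrative only and do not exist in this repository.
 */
object WarmupSketch {

  def withWarmup(runOnce: () => Unit, cleanup: () => Unit)(measured: => Unit): Unit = {
    try {
      // Exercise the same write/read code path once so one-time setup cost is excluded.
      runOnce()
    } finally {
      // Drop whatever the warm-up created so it cannot pollute the measured results.
      cleanup()
    }
    // Only now run the timed benchmark cases.
    measured
  }
}

// Hypothetical usage mirroring warmup() in FlintSkippingIndexBenchmark:
//   withWarmup(
//     runOnce = () => { writeSkippingIndex(strategy(MIN_MAX), 1); readSkippingIndex(strategy(MIN_MAX), 1) },
//     cleanup = () => deleteAllTestIndices()) {
//     runBenchmark("Skipping Index Write") { CARDINALITIES.foreach(runWriteBenchmark) }
//   }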