diff --git a/docs/benchmark-skipping-index.txt b/docs/benchmark-skipping-index.txt
new file mode 100644
index 000000000..4475be36f
--- /dev/null
+++ b/docs/benchmark-skipping-index.txt
@@ -0,0 +1,92 @@
+================================================================================================
+Skipping Index Write
+================================================================================================
+
+OpenJDK 64-Bit Server VM 11.0.20+0 on Mac OS X 14.3.1
+Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
+Skipping Index Write 1000000 Rows with Cardinality 64:    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------------------------------------------
+Partition Write                                                    1334           1334           0          0.7        1333.8       1.0X
+MinMax Write                                                       1318           1318           0          0.8        1318.3       1.0X
+ValueSet Write (Default Size 100)                                  1400           1400           0          0.7        1399.8       1.0X
+ValueSet Write (Unlimited Size)                                    1331           1331           0          0.8        1330.8       1.0X
+BloomFilter Write (1M NDV)                                         1062           1062           0          0.9        1062.4       1.3X
+BloomFilter Write (Optimal NDV)                                    1048           1048           0          1.0        1047.6       1.3X
+Adaptive BloomFilter Write (Default 10 Candidates)                 1045           1045           0          1.0        1045.2       1.3X
+Adaptive BloomFilter Write (5 Candidates)                          1053           1053           0          0.9        1053.4       1.3X
+Adaptive BloomFilter Write (15 Candidates)                         2102           2102           0          0.5        2102.1       0.6X
+
+OpenJDK 64-Bit Server VM 11.0.20+0 on Mac OS X 14.3.1
+Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
+Skipping Index Write 1000000 Rows with Cardinality 2048:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------------------------------------------
+Partition Write                                                    1288           1288           0          0.8        1287.7       1.0X
+MinMax Write                                                       1295           1295           0          0.8        1295.0       1.0X
+ValueSet Write (Default Size 100)                                  1357           1357           0          0.7        1357.4       0.9X
+ValueSet Write (Unlimited Size)                                    1360           1360           0          0.7        1360.5       0.9X
+BloomFilter Write (1M NDV)                                         1070           1070           0          0.9        1070.2       1.2X
+BloomFilter Write (Optimal NDV)                                    1048           1048           0          1.0        1047.6       1.2X
+Adaptive BloomFilter Write (Default 10 Candidates)                 1045           1045           0          1.0        1044.9       1.2X
+Adaptive BloomFilter Write (5 Candidates)                          1061           1061           0          0.9        1061.4       1.2X
+Adaptive BloomFilter Write (15 Candidates)                         2050           2050           0          0.5        2050.2       0.6X
+
+OpenJDK 64-Bit Server VM 11.0.20+0 on Mac OS X 14.3.1
+Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
+Skipping Index Write 1000000 Rows with Cardinality 65536: Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------------------------------------------
+Partition Write                                                    1304           1304           0          0.8        1304.1       1.0X
+MinMax Write                                                       1287           1287           0          0.8        1286.8       1.0X
+ValueSet Write (Default Size 100)                                  1474           1474           0          0.7        1473.7       0.9X
+ValueSet Write (Unlimited Size)                                    1713           1713           0          0.6        1713.5       0.8X
+BloomFilter Write (1M NDV)                                         1077           1077           0          0.9        1076.7       1.2X
+BloomFilter Write (Optimal NDV)                                    1049           1049           0          1.0        1048.8       1.2X
+Adaptive BloomFilter Write (Default 10 Candidates)                 1055           1055           0          0.9        1054.7       1.2X
+Adaptive BloomFilter Write (5 Candidates)                          1047           1047           0          1.0        1047.4       1.2X
+Adaptive BloomFilter Write (15 Candidates)                         2054           2054           0          0.5        2054.3       0.6X
+
+
+================================================================================================
+Skipping Index Read
+================================================================================================
+
+OpenJDK 64-Bit Server VM 11.0.20+0 on Mac OS X 14.3.1
+Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
+Skipping Index Read 1000000 Rows with Cardinality 64:     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------------------------------------------
+Partition Read                                                       54             65           9          0.0    54473389.0       1.0X
+MinMax Read                                                          57             65           8          0.0    56855820.0       1.0X
+ValueSet Read (Default Size 100)                                     50             61          11          0.0    49529808.0       1.1X
+ValueSet Read (Unlimited Size)                                       43             54           8          0.0    43301469.0       1.3X
+BloomFilter Read (1M NDV)                                          2648           2733          60          0.0  2647662965.0       0.0X
+BloomFilter Read (Optimal NDV)                                     2450           2484          24          0.0  2450135369.0       0.0X
+Adaptive BloomFilter Read (Default 10 Candidates)                  2441           2458          18          0.0  2441226280.0       0.0X
+Adaptive BloomFilter Read (5 Candidates)                           2451           2476          26          0.0  2450510244.0       0.0X
+Adaptive BloomFilter Read (15 Candidates)                          2397           2461          44          0.0  2397133383.0       0.0X
+
+OpenJDK 64-Bit Server VM 11.0.20+0 on Mac OS X 14.3.1
+Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
+Skipping Index Read 1000000 Rows with Cardinality 2048:   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------------------------------------------
+Partition Read                                                       31             35           5          0.0    31101827.0       1.0X
+MinMax Read                                                          33             40           6          0.0    33385163.0       0.9X
+ValueSet Read (Default Size 100)                                     30             37           6          0.0    30479810.0       1.0X
+ValueSet Read (Unlimited Size)                                       31             37           6          0.0    31004587.0       1.0X
+BloomFilter Read (1M NDV)                                          2477           2537          51          0.0  2477281890.0       0.0X
+BloomFilter Read (Optimal NDV)                                     2408           2461          45          0.0  2408002056.0       0.0X
+Adaptive BloomFilter Read (Default 10 Candidates)                  2367           2413          43          0.0  2366950203.0       0.0X
+Adaptive BloomFilter Read (5 Candidates)                           2399           2429          26          0.0  2399147197.0       0.0X
+Adaptive BloomFilter Read (15 Candidates)                          2382           2421          34          0.0  2381512783.0       0.0X
+
+OpenJDK 64-Bit Server VM 11.0.20+0 on Mac OS X 14.3.1
+Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
+Skipping Index Read 1000000 Rows with Cardinality 65536:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------------------------------------------
+Partition Read                                                       26             30           5          0.0    25781731.0       1.0X
+MinMax Read                                                          30             34           7          0.0    29514335.0       0.9X
+ValueSet Read (Default Size 100)                                     27             34           6          0.0    27338628.0       0.9X
+ValueSet Read (Unlimited Size)                                       39             45           6          0.0    39315292.0       0.7X
+BloomFilter Read (1M NDV)                                          2374           2433          55          0.0  2373982609.0       0.0X
+BloomFilter Read (Optimal NDV)                                     2354           2415          60          0.0  2354204521.0       0.0X
+Adaptive BloomFilter Read (Default 10 Candidates)                  2322           2407          51          0.0  2321669934.0       0.0X
+Adaptive BloomFilter Read (5 Candidates)                           2413           2465          44          0.0  2413487418.0       0.0X
+Adaptive BloomFilter Read (15 Candidates)                          2351           2401          36          0.0  2351322414.0       0.0X
diff --git a/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala b/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala
new file mode 100644
index 000000000..3e42304b2
--- /dev/null
+++ b/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala
@@ -0,0 +1,333 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.apache.spark.sql.benchmark
+
+import java.util.Locale
+
+import scala.concurrent.duration.DurationInt
+
+import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
+import org.opensearch.client.RequestOptions
+import org.opensearch.client.indices.CreateIndexRequest
+import org.opensearch.common.xcontent.XContentType
+import org.opensearch.flint.core.field.bloomfilter.BloomFilterFactory.{ADAPTIVE_NUMBER_CANDIDATE_KEY, BLOOM_FILTER_ADAPTIVE_KEY, CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY}
+import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy
+import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind
+import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind._
+import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilterSkippingStrategy
+import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy
+import org.opensearch.flint.spark.skipping.partition.PartitionSkippingStrategy
+import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy
+import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.VALUE_SET_MAX_SIZE_KEY
+
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.SaveMode.Overwrite
+import org.apache.spark.sql.catalyst.dsl.expressions.DslExpression
+import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
+import org.apache.spark.sql.functions.{expr, rand}
+
+/**
+ * Flint skipping index benchmark that focuses on the read and write performance of skipping
+ * data structures with OpenSearch, based on the Spark benchmark framework.
+ *
+ * Test cases include reading and writing the following skipping data structures:
+ * {{{
+ *   1. Partition
+ *   2. MinMax
+ *   3. ValueSet (default size 100)
+ *   4. ValueSet (unlimited size)
+ *   5. BloomFilter (classic, 1M NDV)
+ *   6. BloomFilter (classic, optimal NDV = cardinality)
+ *   7. BloomFilter (adaptive, default 10 candidates)
+ *   8. BloomFilter (adaptive, 5 candidates)
+ *   9. BloomFilter (adaptive, 15 candidates)
+ * }}}
+ *
+ * Test parameters:
+ * {{{
+ *   1. N = 1M rows of test data generated
+ *   2. M = 1 OpenSearch doc written as skipping index
+ *   3. cardinality = {64, 2048, 65536} distinct values in N rows
+ * }}}
+ *
+ * To run this benchmark:
+ * {{{
+ *   > SPARK_GENERATE_BENCHMARK_FILES=1 sbt clean "set every Test / test := {}" "integtest/test:runMain org.apache.spark.sql.benchmark.FlintSkippingIndexBenchmark"
+ * }}}
+ * Results will be written to "benchmarks/FlintSkippingIndexBenchmark--results.txt".
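+ *
+ * Each test case below pairs a skipping strategy built by the private `strategy()` helper
+ * with a write or read action. As an illustrative sketch of the calls used in this file,
+ * the adaptive BloomFilter case with 5 candidate filters amounts to:
+ * {{{
+ *   writeSkippingIndex(
+ *     strategy(BLOOM_FILTER, params = Map(ADAPTIVE_NUMBER_CANDIDATE_KEY -> "5")),
+ *     cardinality = 64)
+ * }}}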
+ */
+object FlintSkippingIndexBenchmark extends FlintSparkBenchmark {
+
+  /** How many rows are generated in the test data */
+  private val N = 1000000
+
+  /** How many OpenSearch docs are created */
+  private val M = 1
+
+  /** Cardinalities of the test data */
+  private val CARDINALITIES = Seq(64, 2048, 65536)
+
+  /** How many runs for each test case */
+  private val WRITE_TEST_NUM_ITERATIONS = 1
+  private val READ_TEST_NUM_ITERATIONS = 5
+
+  /** Test index name prefix */
+  private val TEST_INDEX_PREFIX = "flint_benchmark"
+
+  /** Test column name and type */
+  private val testColName = "value"
+  private val testColType = "Int"
+
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    super.runBenchmarkSuite(mainArgs)
+    warmup()
+
+    runBenchmark("Skipping Index Write") {
+      CARDINALITIES.foreach(runWriteBenchmark)
+    }
+    runBenchmark("Skipping Index Read") {
+      CARDINALITIES.foreach(runReadBenchmark)
+    }
+  }
+
+  private def warmup(): Unit = {
+    try {
+      SkippingKind.values.foreach(kind => {
+        writeSkippingIndex(strategy(kind), 1)
+        readSkippingIndex(strategy(kind), 1)
+      })
+    } finally {
+      deleteAllTestIndices()
+    }
+  }
+
+  private def runWriteBenchmark(cardinality: Int): Unit = {
+    new Benchmark(
+      name = s"Skipping Index Write $N Rows with Cardinality $cardinality",
+      valuesPerIteration = N,
+      minNumIters = WRITE_TEST_NUM_ITERATIONS,
+      warmupTime = (-1).seconds, // disable warm-up; otherwise test docs are duplicated
+      minTime = (-1).seconds, // ensure a single run; otherwise test docs are duplicated
+      output = output)
+      .test("Partition Write") { _ =>
+        writeSkippingIndex(strategy(PARTITION), cardinality)
+      }
+      .test("MinMax Write") { _ =>
+        writeSkippingIndex(strategy(MIN_MAX), cardinality)
+      }
+      .test("ValueSet Write (Default Size 100)") { _ =>
+        writeSkippingIndex(strategy(VALUE_SET), cardinality)
+      }
+      .test("ValueSet Write (Unlimited Size)") { _ =>
+        writeSkippingIndex(
+          strategy(VALUE_SET, Map(VALUE_SET_MAX_SIZE_KEY -> Integer.MAX_VALUE.toString)),
+          cardinality)
+      }
+      .test("BloomFilter Write (1M NDV)") { _ =>
+        writeSkippingIndex(
+          strategy(
+            BLOOM_FILTER,
+            Map(
+              BLOOM_FILTER_ADAPTIVE_KEY -> "false",
+              CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> N.toString)),
+          cardinality)
+      }
+      .test("BloomFilter Write (Optimal NDV)") { _ =>
+        writeSkippingIndex(
+          strategy(
+            BLOOM_FILTER,
+            Map(
+              BLOOM_FILTER_ADAPTIVE_KEY -> "false",
+              CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> cardinality.toString)),
+          cardinality)
+      }
+      .test("Adaptive BloomFilter Write (Default 10 Candidates)") { _ =>
+        writeSkippingIndex(strategy(BLOOM_FILTER), cardinality)
+      }
+      .test("Adaptive BloomFilter Write (5 Candidates)") { _ =>
+        writeSkippingIndex(
+          strategy(BLOOM_FILTER, params = Map(ADAPTIVE_NUMBER_CANDIDATE_KEY -> "5")),
+          cardinality)
+      }
+      .test("Adaptive BloomFilter Write (15 Candidates)") { _ =>
+        writeSkippingIndex(
+          strategy(BLOOM_FILTER, params = Map(ADAPTIVE_NUMBER_CANDIDATE_KEY -> "15")),
+          cardinality)
+      }
+      .run()
+  }
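+
+  /*
+   * Each read case below queries the index written by the corresponding write case above;
+   * both sides resolve the same OpenSearch index name via getTestIndexName(), e.g.
+   * "flint_benchmark_PARTITION_cardinality_64_default" when no extra parameters are set.
+   */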
+
+  private def runReadBenchmark(cardinality: Int): Unit = {
+    new Benchmark(
+      name = s"Skipping Index Read $N Rows with Cardinality $cardinality",
+      valuesPerIteration = M,
+      minNumIters = READ_TEST_NUM_ITERATIONS,
+      output = output)
+      .test("Partition Read") { _ =>
+        readSkippingIndex(strategy(PARTITION), cardinality)
+      }
+      .test("MinMax Read") { _ =>
+        readSkippingIndex(strategy(MIN_MAX), cardinality)
+      }
+      .test("ValueSet Read (Default Size 100)") { _ =>
+        readSkippingIndex(strategy(VALUE_SET), cardinality)
+      }
+      .test("ValueSet Read (Unlimited Size)") { _ =>
+        readSkippingIndex(
+          strategy(VALUE_SET, Map(VALUE_SET_MAX_SIZE_KEY -> Integer.MAX_VALUE.toString)),
+          cardinality)
+      }
+      .test("BloomFilter Read (1M NDV)") { _ =>
+        readSkippingIndex(
+          strategy(
+            BLOOM_FILTER,
+            Map(
+              BLOOM_FILTER_ADAPTIVE_KEY -> "false",
+              CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> N.toString)),
+          cardinality)
+      }
+      .test("BloomFilter Read (Optimal NDV)") { _ =>
+        readSkippingIndex(
+          strategy(
+            BLOOM_FILTER,
+            Map(
+              BLOOM_FILTER_ADAPTIVE_KEY -> "false",
+              CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> cardinality.toString)),
+          cardinality)
+      }
+      .test("Adaptive BloomFilter Read (Default 10 Candidates)") { _ =>
+        readSkippingIndex(strategy(BLOOM_FILTER), cardinality)
+      }
+      .test("Adaptive BloomFilter Read (5 Candidates)") { _ =>
+        readSkippingIndex(
+          strategy(BLOOM_FILTER, params = Map(ADAPTIVE_NUMBER_CANDIDATE_KEY -> "5")),
+          cardinality)
+      }
+      .test("Adaptive BloomFilter Read (15 Candidates)") { _ =>
+        readSkippingIndex(
+          strategy(BLOOM_FILTER, params = Map(ADAPTIVE_NUMBER_CANDIDATE_KEY -> "15")),
+          cardinality)
+      }
+      .run()
+  }
+
+  /** Benchmark builder in fluent API style */
+  private implicit class BenchmarkBuilder(val benchmark: Benchmark) {
+
+    def test(name: String)(f: Int => Unit): Benchmark = {
+      benchmark.addCase(name)(f)
+      benchmark
+    }
+  }
+
+  private def strategy(
+      kind: SkippingKind,
+      params: Map[String, String] = Map.empty): FlintSparkSkippingStrategy = {
+    kind match {
+      case PARTITION =>
+        PartitionSkippingStrategy(columnName = testColName, columnType = testColType)
+      case MIN_MAX => MinMaxSkippingStrategy(columnName = testColName, columnType = testColType)
+      case VALUE_SET =>
+        ValueSetSkippingStrategy(
+          columnName = testColName,
+          columnType = testColType,
+          params = params)
+      case BLOOM_FILTER =>
+        BloomFilterSkippingStrategy(
+          columnName = testColName,
+          columnType = testColType,
+          params = params)
+    }
+  }
+
+  private def readSkippingIndex(indexCol: FlintSparkSkippingStrategy, cardinality: Int): Unit = {
+    val schema =
+      indexCol.outputSchema().map { case (key, value) => s"$key $value" }.mkString(", ")
+    /*
+     * Rewrite the query on the test column and run it against the skipping index created in
+     * the previous write benchmark.
+     */
+    val indexQuery =
+      indexCol.rewritePredicate(expr(s"$testColName = 1").expr).map(new Column(_)).get
+    spark.read
+      .format(FLINT_DATASOURCE)
+      .options(openSearchOptions)
+      .schema(schema)
+      .load(getTestIndexName(indexCol, cardinality))
+      .where(indexQuery)
+      .noop() // Trigger data frame execution and discard the output
+  }
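+
+  /*
+   * Illustrative note on the rewrite in readSkippingIndex() above: the rewritten predicate is
+   * strategy-specific. Conceptually, a MinMax strategy turns `value = 1` into a range check on
+   * its min/max output columns (min <= 1 AND max >= 1), while a ValueSet strategy checks
+   * membership of 1 in the stored value set. The actual expression comes from each strategy's
+   * rewritePredicate() implementation.
+   */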
+
+  private def writeSkippingIndex(indexCol: FlintSparkSkippingStrategy, cardinality: Int): Unit = {
+    val testIndexName = getTestIndexName(indexCol, cardinality)
+
+    /*
+     * FIXME: must pre-create the index because the data type is lost in bulk JSON and the
+     * binary column would otherwise be auto-mapped to text by OpenSearch.
+     */
+    if (indexCol.kind == BLOOM_FILTER) {
+      val mappings =
+        s"""{
+           |  "properties": {
+           |    "$testColName": {
+           |      "type": "binary",
+           |      "doc_values": true
+           |    }
+           |  }
+           |}""".stripMargin
+      val testOSIndexName = testIndexName.toLowerCase(Locale.ROOT)
+
+      openSearchClient.indices.create(
+        new CreateIndexRequest(testOSIndexName)
+          .mapping(mappings, XContentType.JSON),
+        RequestOptions.DEFAULT)
+    }
+
+    /*
+     * Generate N random numbers with the given cardinality and build a single skipping index
+     * data structure without group-by.
+     */
+    val namedAggCols = getNamedAggColumn(indexCol)
+    spark
+      .range(N)
+      .withColumn(testColName, (rand() * cardinality + 1).cast(testColType))
+      .agg(namedAggCols.head, namedAggCols.tail: _*)
+      .coalesce(M)
+      .write
+      .format(FLINT_DATASOURCE)
+      .options(openSearchOptions)
+      .mode(Overwrite)
+      .save(testIndexName)
+  }
+
+  private def getTestIndexName(indexCol: FlintSparkSkippingStrategy, cardinality: Int): String = {
+    // Generate a unique name for the skipping index in OpenSearch
+    val params =
+      indexCol.parameters.toSeq
+        .sortBy(_._1)
+        .map { case (name, value) => s"${name}_$value" }
+        .mkString("_")
+    val paramsOrDefault = if (params.isEmpty) "default" else params
+
+    s"${TEST_INDEX_PREFIX}_${indexCol.kind}_cardinality_${cardinality}_$paramsOrDefault"
+  }
+
+  private def getNamedAggColumn(indexCol: FlintSparkSkippingStrategy): Seq[Column] = {
+    val outputNames = indexCol.outputSchema().keys
+    val aggFuncs = indexCol.getAggregators
+
+    // Wrap each aggregate function with its output column name
+    (outputNames, aggFuncs).zipped.map { case (name, aggFunc) =>
+      new Column(aggFunc.as(name))
+    }.toSeq
+  }
+
+  private def deleteAllTestIndices(): Unit = {
+    openSearchClient
+      .indices()
+      .delete(new DeleteIndexRequest(s"${TEST_INDEX_PREFIX}_*"), RequestOptions.DEFAULT)
+  }
+}
diff --git a/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSparkBenchmark.scala b/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSparkBenchmark.scala
new file mode 100644
index 000000000..ae1602805
--- /dev/null
+++ b/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSparkBenchmark.scala
@@ -0,0 +1,49 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.apache.spark.sql.benchmark
+
+import org.apache.http.HttpHost
+import org.opensearch.client.{RestClient, RestHighLevelClient}
+import org.opensearch.flint.spark.FlintSpark
+import org.opensearch.testcontainers.OpenSearchContainer
+
+import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
+import org.apache.spark.sql.flint.config.FlintSparkConf._
+
+/**
+ * Flint benchmark base class for benchmarking with Spark and OpenSearch.
+ * [[org.opensearch.flint.OpenSearchSuite]] doesn't work here because Spark's
+ * [[SqlBasedBenchmark]] is not a Scala test suite.
+ */
+trait FlintSparkBenchmark extends SqlBasedBenchmark {
+
+  protected lazy val flint: FlintSpark = new FlintSpark(spark)
+
+  protected lazy val container = new OpenSearchContainer()
+
+  protected lazy val openSearchPort: Int = container.port()
+
+  protected lazy val openSearchHost: String = container.getHost
+
+  protected lazy val openSearchClient = new RestHighLevelClient(
+    RestClient.builder(new HttpHost(openSearchHost, openSearchPort, "http")))
+
+  protected lazy val openSearchOptions: Map[String, String] =
+    Map(
+      s"${HOST_ENDPOINT.optionKey}" -> openSearchHost,
+      s"${HOST_PORT.optionKey}" -> s"$openSearchPort",
+      s"${REFRESH_POLICY.optionKey}" -> "wait_for",
+      s"${IGNORE_DOC_ID_COLUMN.optionKey}" -> "false")
+
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    container.start()
+  }
+
+  override def afterAll(): Unit = {
+    container.close()
+    super.afterAll()
+  }
+}
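+
+// Note: the REFRESH_POLICY of "wait_for" above blocks each write until a refresh makes the
+// docs searchable, so the read benchmarks always observe fully indexed data.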