From 679569af1e7f264cf52eb56c3ee0aacc024b381b Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Wed, 20 Mar 2024 14:52:46 -0700
Subject: [PATCH] Add skipping index read benchmark

Signed-off-by: Chen Dai
---
 .../FlintSkippingIndexBenchmark.scala         | 101 ++++++++++++++----
 1 file changed, 78 insertions(+), 23 deletions(-)

diff --git a/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala b/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala
index 8ddb8aa51..fd6371f44 100644
--- a/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala
+++ b/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala
@@ -19,7 +19,7 @@ import org.apache.spark.sql.Column
 import org.apache.spark.sql.SaveMode.Overwrite
 import org.apache.spark.sql.catalyst.dsl.expressions.DslExpression
 import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
-import org.apache.spark.sql.functions.rand
+import org.apache.spark.sql.functions.{expr, rand}
 
 /**
  * Flint skipping index benchmark that focuses on skipping data structure read and write performance
@@ -45,38 +45,35 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark {
     runBenchmark("Skipping Index Write") {
       runWriteBenchmark(64)
-      runWriteBenchmark(512)
-      runWriteBenchmark(65536)
+      // runWriteBenchmark(512)
+      // runWriteBenchmark(65536)
     }
-
-    /*
     runBenchmark("Skipping Index Read") {
       runReadBenchmark(16)
-      runReadBenchmark(64)
-      runReadBenchmark(512)
+      // runReadBenchmark(64)
+      // runReadBenchmark(512)
     }
-     */
   }
 
   private def runWriteBenchmark(cardinality: Int): Unit = {
     benchmark(s"Skipping Index Write $N Rows with Cardinality $cardinality")
       .addCase("Partition Write") { _ =>
         // Partitioned column cardinality must be 1 (all values are the same in a single file)
-        buildSkippingIndex(strategy(PARTITION), 1)
+        writeSkippingIndex(strategy(PARTITION), 1)
       }
       .addCase("MinMax Write") { _ =>
-        buildSkippingIndex(strategy(MIN_MAX), cardinality)
+        writeSkippingIndex(strategy(MIN_MAX), cardinality)
       }
-      .addCase("ValueSet Write (Default Size 100") { _ =>
-        buildSkippingIndex(strategy(VALUE_SET), cardinality)
+      .addCase("ValueSet Write (Default Size 100)") { _ =>
+        writeSkippingIndex(strategy(VALUE_SET), cardinality)
       }
       .addCase("ValueSet Write (Unlimited Size)") { _ =>
-        buildSkippingIndex(
+        writeSkippingIndex(
           strategy(VALUE_SET, Map(VALUE_SET_MAX_SIZE_KEY -> Integer.MAX_VALUE.toString)),
           cardinality)
       }
       .addCase("BloomFilter Write (1M NDV)") { _ =>
-        buildSkippingIndex(
+        writeSkippingIndex(
           strategy(
             BLOOM_FILTER,
             Map(
@@ -85,7 +82,7 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark {
           cardinality)
       }
       .addCase("BloomFilter Write (Optimal NDV)") { _ =>
-        buildSkippingIndex(
+        writeSkippingIndex(
           strategy(
             BLOOM_FILTER,
             Map(
@@ -93,23 +90,71 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark {
             CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> cardinality.toString)),
           cardinality)
       }
-      .addCase("Adaptive BloomFilter Write (Default, 10 Candidates)") { _ =>
-        buildSkippingIndex(strategy(BLOOM_FILTER), cardinality)
+      .addCase("Adaptive BloomFilter Write (Default 10 Candidates)") { _ =>
+        writeSkippingIndex(strategy(BLOOM_FILTER), cardinality)
       }
       .addCase("Adaptive BloomFilter Write (5 Candidates)") { _ =>
-        buildSkippingIndex(
+        writeSkippingIndex(
           strategy(BLOOM_FILTER, params = Map(ADAPTIVE_NUMBER_CANDIDATE_KEY -> "5")),
           cardinality)
       }
       .addCase("Adaptive BloomFilter Write (15 Candidates)") { _ =>
-        buildSkippingIndex(
+        writeSkippingIndex(
           strategy(BLOOM_FILTER, params = Map(ADAPTIVE_NUMBER_CANDIDATE_KEY -> "15")),
           cardinality)
       }
       .run()
   }
 
-  private def runReadBenchmark(cardinality: Int): Unit = {}
+  private def runReadBenchmark(cardinality: Int): Unit = {
+    benchmark(s"Skipping Index Read $N Rows with Cardinality $cardinality")
+      .addCase("Partition Read") { _ =>
+        readSkippingIndex(strategy(PARTITION), cardinality)
+      }
+      .addCase("MinMax Read") { _ =>
+        readSkippingIndex(strategy(MIN_MAX), cardinality)
+      }
+      .addCase("ValueSet Read (Default Size 100)") { _ =>
+        readSkippingIndex(strategy(VALUE_SET), cardinality)
+      }
+      .addCase("ValueSet Read (Unlimited Size)") { _ =>
+        readSkippingIndex(
+          strategy(VALUE_SET, Map(VALUE_SET_MAX_SIZE_KEY -> Integer.MAX_VALUE.toString)),
+          cardinality)
+      }
+      .addCase("BloomFilter Read (1M NDV)") { _ =>
+        readSkippingIndex(
+          strategy(
+            BLOOM_FILTER,
+            Map(
+              BLOOM_FILTER_ADAPTIVE_KEY -> "false",
+              CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> N.toString)),
+          cardinality)
+      }
+      .addCase("BloomFilter Read (Optimal NDV)") { _ =>
+        readSkippingIndex(
+          strategy(
+            BLOOM_FILTER,
+            Map(
+              BLOOM_FILTER_ADAPTIVE_KEY -> "false",
+              CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> cardinality.toString)),
+          cardinality)
+      }
+      .addCase("Adaptive BloomFilter Read (Default 10 Candidates)") { _ =>
+        readSkippingIndex(strategy(BLOOM_FILTER), cardinality)
+      }
+      .addCase("Adaptive BloomFilter Read (5 Candidates)") { _ =>
+        readSkippingIndex(
+          strategy(BLOOM_FILTER, params = Map(ADAPTIVE_NUMBER_CANDIDATE_KEY -> "5")),
+          cardinality)
+      }
+      .addCase("Adaptive BloomFilter Read (15 Candidates)") { _ =>
+        readSkippingIndex(
+          strategy(BLOOM_FILTER, params = Map(ADAPTIVE_NUMBER_CANDIDATE_KEY -> "15")),
+          cardinality)
+      }
+      .run()
+  }
 
   private def benchmark(name: String): BenchmarkBuilder = {
     new BenchmarkBuilder(new Benchmark(name, N, output = output))
@@ -147,8 +192,18 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark {
     }
   }
 
-  private def buildSkippingIndex(indexCol: FlintSparkSkippingStrategy, cardinality: Int): Unit = {
-    val indexName = getTestIndexName(indexCol, cardinality)
+  private def readSkippingIndex(indexCol: FlintSparkSkippingStrategy, cardinality: Int): Unit = {
+    val schema =
+      indexCol.outputSchema().map { case (key, value) => s"$key $value" }.mkString(", ")
+    spark.read
+      .format(FLINT_DATASOURCE)
+      .options(openSearchOptions)
+      .schema(schema)
+      .load(getTestIndexName(indexCol, cardinality))
+      .where(indexCol.rewritePredicate(expr(s"$testColName = 1").expr).map(new Column(_)).get)
+  }
+
+  private def writeSkippingIndex(indexCol: FlintSparkSkippingStrategy, cardinality: Int): Unit = {
     val namedAggCols = getNamedAggColumn(indexCol)
 
     // Generate N random numbers of cardinality and build single skipping index
@@ -161,7 +216,7 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark {
       .format(FLINT_DATASOURCE)
      .options(openSearchOptions)
       .mode(Overwrite)
-      .save(indexName)
+      .save(getTestIndexName(indexCol, cardinality))
   }
 
   private def getTestIndexName(indexCol: FlintSparkSkippingStrategy, cardinality: Int): String = {
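--
Usage note (not part of the patch): readSkippingIndex stops at the lazy where(...),
so each read case times building the rewritten index query against the Flint
DataSource rather than executing it. The sketch below shows the same
outputSchema()/rewritePredicate pattern run end-to-end with an action added.
It is a minimal sketch, not the benchmark's code: "flint_test_index" and
"test_col" are hypothetical stand-ins for the values the benchmark derives via
getTestIndexName and testColName, and a reachable OpenSearch cluster is assumed.

// Minimal sketch assuming the FlintSparkSkippingStrategy contract used above:
// outputSchema() returns column-name -> type pairs, and rewritePredicate()
// returns an Option[Expression] over the index columns.
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy

def countIndexMatches(
    spark: SparkSession,
    strategy: FlintSparkSkippingStrategy,
    openSearchOptions: Map[String, String]): Long = {
  // Index schema string, e.g. "MinMax_test_col_0 int, MinMax_test_col_1 int"
  val schema = strategy.outputSchema().map { case (k, v) => s"$k $v" }.mkString(", ")

  // Rewrite the source-table predicate `test_col = 1` into a predicate on the
  // index columns (MinMax, for instance, becomes a min/max range check)
  val indexPredicate: Column =
    strategy.rewritePredicate(expr("test_col = 1").expr).map(new Column(_)).get

  spark.read
    .format(FLINT_DATASOURCE)
    .options(openSearchOptions)
    .schema(schema)
    .load("flint_test_index")
    .where(indexPredicate)
    .count() // the action forces the scan; where() alone only builds the plan
}

Keeping the benchmark body action-free is a defensible design choice when the
goal is to compare predicate-rewrite overhead across strategies; the count()
variant above is how one would measure the actual index scan instead.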