diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterAgg.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterAgg.scala
index 6dcd2f15f..4fb79d67d 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterAgg.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterAgg.scala
@@ -7,16 +7,12 @@ package org.opensearch.flint.spark.skipping.bloomfilter
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
 
-import scala.collection.JavaConverters.mapAsJavaMapConverter
-
 import org.opensearch.flint.core.field.bloomfilter.{BloomFilter, BloomFilterFactory}
 import org.opensearch.flint.core.field.bloomfilter.adaptive.AdaptiveBloomFilter
 
-import org.apache.spark.sql.Column
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.expressions.aggregate.{ImperativeAggregate, TypedImperativeAggregate}
-import org.apache.spark.sql.functions.{col, xxhash64}
 import org.apache.spark.sql.types.{BinaryType, DataType}
 
 /**
@@ -111,12 +107,3 @@ case class BloomFilterAgg(
 
   override def withNewInputAggBufferOffset(newOffset: Int): ImperativeAggregate =
     copy(inputAggBufferOffset = newOffset)
 }
-
-object BloomFilterAgg {
-
-  def bloom_filter_agg(colName: String, params: Map[String, String]): Column = {
-    new Column(
-      new BloomFilterAgg(xxhash64(col(colName)).expr, BloomFilterFactory.of(params.asJava))
-        .toAggregateExpression())
-  }
-}
diff --git a/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala b/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala
index fd6371f44..84fb1c439 100644
--- a/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala
+++ b/integ-test/src/test/scala/org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala
@@ -25,11 +25,24 @@ import org.apache.spark.sql.functions.{expr, rand}
  * Flint skipping index benchmark that focus on skipping data structure read and write performance
  * with OpenSearch based on Spark benchmark framework.
  *
+ * Test cases include reading and writing the following skipping data structures:
+ * {{{
+ *   1. Partition
+ *   2. MinMax
+ *   3. ValueSet (default size 100)
+ *   4. ValueSet (unlimited size)
+ *   5. BloomFilter (classic, 1M NDV)
+ *   6. BloomFilter (classic, optimal NDV = cardinality)
+ *   7. BloomFilter (adaptive, default 10 candidates)
+ *   8. BloomFilter (adaptive, 5 candidates)
+ *   9. BloomFilter (adaptive, 15 candidates)
+ * }}}
+ *
  * To run this benchmark:
  * {{{
  *   > SPARK_GENERATE_BENCHMARK_FILES=1 sbt clean "set every Test / test := {}" "integtest/test:runMain org.apache.spark.sql.benchmark.FlintSkippingIndexBenchmark"
- *   Results will be written to "benchmarks/FlintSkippingIndexBenchmark--results.txt".
  * }}}
+ * Results will be written to "benchmarks/FlintSkippingIndexBenchmark--results.txt".
  */
 object FlintSkippingIndexBenchmark extends FlintSparkBenchmark {
 
@@ -45,18 +58,18 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark {
     runBenchmark("Skipping Index Write") {
       runWriteBenchmark(64)
-      // runWriteBenchmark(512)
-      // runWriteBenchmark(65536)
+      runWriteBenchmark(512)
+      runWriteBenchmark(65536)
     }
     runBenchmark("Skipping Index Read") {
       runReadBenchmark(16)
-      // runReadBenchmark(64)
-      // runReadBenchmark(512)
+      runReadBenchmark(512)
+      runReadBenchmark(65536)
     }
   }
 
   private def runWriteBenchmark(cardinality: Int): Unit = {
-    benchmark(s"Skipping Index Write $N Rows with Cardinality $cardinality")
+    benchmark(s"Skipping Index Write $N Rows with Cardinality $cardinality", N)
       .addCase("Partition Write") { _ =>
         // Partitioned column cardinality must be 1 (all values are the same in a single file)
         writeSkippingIndex(strategy(PARTITION), 1)
       }
@@ -107,7 +120,7 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark {
   }
 
   private def runReadBenchmark(cardinality: Int): Unit = {
-    benchmark(s"Skipping Index Read $N Rows with Cardinality $cardinality")
+    benchmark(s"Skipping Index Read $N Rows with Cardinality $cardinality", 1)
      .addCase("Partition Read") { _ =>
        readSkippingIndex(strategy(PARTITION), cardinality)
      }
@@ -140,7 +153,7 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark {
              CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> cardinality.toString)),
          cardinality)
      }
-      .addCase("Adaptive BloomFilter Read (Default, 10 Candidates)") { _ =>
+      .addCase("Adaptive BloomFilter Read (Default 10 Candidates)") { _ =>
        readSkippingIndex(strategy(BLOOM_FILTER), cardinality)
      }
      .addCase("Adaptive BloomFilter Read (5 Candidates)") { _ =>
@@ -156,10 +169,11 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark {
      .run()
  }
 
-  private def benchmark(name: String): BenchmarkBuilder = {
-    new BenchmarkBuilder(new Benchmark(name, N, output = output))
+  private def benchmark(name: String, numRows: Int): BenchmarkBuilder = {
+    new BenchmarkBuilder(new Benchmark(name, numRows, output = output))
   }
 
+  /** Benchmark builder in fluent API style */
   private class BenchmarkBuilder(benchmark: Benchmark) {
 
     def addCase(name: String)(f: Int => Unit): BenchmarkBuilder = {
@@ -195,19 +209,26 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark {
   private def readSkippingIndex(indexCol: FlintSparkSkippingStrategy, cardinality: Int): Unit = {
     val schema = indexCol.outputSchema().map { case (key, value) => s"$key $value" }.mkString(", ")
+    /*
+     * Rewrite the query on test column and run it against skipping index created in previous
+     * write benchmark.
+     */
+    val indexQuery =
+      indexCol.rewritePredicate(expr(s"$testColName = 1").expr).map(new Column(_)).get
     spark.read
       .format(FLINT_DATASOURCE)
       .options(openSearchOptions)
       .schema(schema)
       .load(getTestIndexName(indexCol, cardinality))
-      .where(indexCol.rewritePredicate(expr(s"$testColName = 1").expr).map(new Column(_)).get)
+      .where(indexQuery)
   }
 
   private def writeSkippingIndex(indexCol: FlintSparkSkippingStrategy, cardinality: Int): Unit = {
+    /*
+     * Generate N random numbers with the given cardinality and build single skipping index
+     * data structure without group by.
+     */
     val namedAggCols = getNamedAggColumn(indexCol)
-
-    // Generate N random numbers of cardinality and build single skipping index
-    // data structure without grouping by
     spark
       .range(N)
       .withColumn(testColName, (rand() * cardinality + 1).cast(testColType))
@@ -220,6 +241,7 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark {
   }
 
   private def getTestIndexName(indexCol: FlintSparkSkippingStrategy, cardinality: Int): String = {
+    // Generate unique name as skipping index name in OpenSearch
     val params = indexCol.parameters.map { case (name, value) => s"${name}_$value" }.mkString("_")
     s"${indexCol.kind}_${params}_$cardinality"
   }
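
Note: the first file in this patch removes the BloomFilterAgg companion object, so there is no longer a DataFrame-level bloom_filter_agg helper. For reference, a caller that still wants a Column-based entry point could rebuild it inline, as in the minimal sketch below. The wrapper object and method name (BloomFilterAggUsage, bloomFilterAggColumn) are illustrative only; the constructor call itself mirrors the removed code verbatim.

package org.opensearch.flint.spark.skipping.bloomfilter

import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.opensearch.flint.core.field.bloomfilter.BloomFilterFactory

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, xxhash64}

// Illustrative caller-side helper (hypothetical, not part of this change): builds the
// BloomFilterAgg aggregate over the xxhash64 of the target column, exactly as the removed
// bloom_filter_agg companion method did.
object BloomFilterAggUsage {

  def bloomFilterAggColumn(colName: String, params: Map[String, String]): Column =
    new Column(
      new BloomFilterAgg(xxhash64(col(colName)).expr, BloomFilterFactory.of(params.asJava))
        .toAggregateExpression())

  // Example usage (assumed DataFrame `df` and parameter map `params`):
  //   df.agg(bloomFilterAggColumn("client_ip", params))
}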