Add more comment and Javadoc
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Mar 20, 2024
1 parent 55ab746 commit 61c4dc4
Showing 2 changed files with 36 additions and 27 deletions.
org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterAgg.scala
@@ -7,16 +7,12 @@ package org.opensearch.flint.spark.skipping.bloomfilter

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

-import scala.collection.JavaConverters.mapAsJavaMapConverter
-
import org.opensearch.flint.core.field.bloomfilter.{BloomFilter, BloomFilterFactory}
import org.opensearch.flint.core.field.bloomfilter.adaptive.AdaptiveBloomFilter

-import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.aggregate.{ImperativeAggregate, TypedImperativeAggregate}
-import org.apache.spark.sql.functions.{col, xxhash64}
import org.apache.spark.sql.types.{BinaryType, DataType}

/**
@@ -111,12 +107,3 @@ case class BloomFilterAgg(
  override def withNewInputAggBufferOffset(newOffset: Int): ImperativeAggregate =
    copy(inputAggBufferOffset = newOffset)
}
-
-object BloomFilterAgg {
-
-  def bloom_filter_agg(colName: String, params: Map[String, String]): Column = {
-    new Column(
-      new BloomFilterAgg(xxhash64(col(colName)).expr, BloomFilterFactory.of(params.asJava))
-        .toAggregateExpression())
-  }
-}
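
With the bloom_filter_agg helper removed, a caller builds the aggregate Column inline. Below is a minimal sketch that mirrors the deleted code one-to-one; the imports are the same ones this diff removes, and the column name and parameter key are hypothetical placeholders:

import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.opensearch.flint.core.field.bloomfilter.BloomFilterFactory

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, xxhash64}

// Hash the input column with xxhash64, then aggregate the hashed values
// into a bloom filter. "adaptive" is an illustrative parameter key, not a
// confirmed constant from the Flint codebase.
val params = Map("adaptive" -> "true")
val aggCol = new Column(
  new BloomFilterAgg(xxhash64(col("test_col")).expr, BloomFilterFactory.of(params.asJava))
    .toAggregateExpression())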
org/apache/spark/sql/benchmark/FlintSkippingIndexBenchmark.scala
@@ -25,11 +25,24 @@ import org.apache.spark.sql.functions.{expr, rand}
* Flint skipping index benchmark that focuses on skipping data structure read and write performance
* with OpenSearch, based on the Spark benchmark framework.
*
+ * Test cases include reading and writing the following skipping data structures:
+ * {{{
+ *   1. Partition
+ *   2. MinMax
+ *   3. ValueSet (default size 100)
+ *   4. ValueSet (unlimited size)
+ *   5. BloomFilter (classic, 1M NDV)
+ *   6. BloomFilter (classic, optimal NDV = cardinality)
+ *   7. BloomFilter (adaptive, default 10 candidates)
+ *   8. BloomFilter (adaptive, 5 candidates)
+ *   9. BloomFilter (adaptive, 15 candidates)
+ * }}}
+ *
* To run this benchmark:
* {{{
*   > SPARK_GENERATE_BENCHMARK_FILES=1 sbt clean "set every Test / test := {}" "integtest/test:runMain org.apache.spark.sql.benchmark.FlintSkippingIndexBenchmark"
- *   Results will be written to "benchmarks/FlintSkippingIndexBenchmark-<JDK>-results.txt".
* }}}
+ * Results will be written to "benchmarks/FlintSkippingIndexBenchmark-<JDK>-results.txt".
*/
object FlintSkippingIndexBenchmark extends FlintSparkBenchmark {

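The adaptive BloomFilter cases listed in the Javadoc above vary only the candidate count. Conceptually, an adaptive filter maintains several candidate filters sized for different NDVs and keeps the best fit once the data has been seen. The sketch below illustrates that idea using Spark's org.apache.spark.util.sketch.BloomFilter; it is not the Flint AdaptiveBloomFilter implementation:

import org.apache.spark.util.sketch.BloomFilter

// Conceptual sketch: insert each item into every candidate filter, then
// keep the smallest candidate whose expected NDV covers the number of
// inserts seen (a rough proxy for the true cardinality).
class AdaptiveSketch(candidateNdvs: Seq[Long], fpp: Double = 0.03) {
  private val sortedNdvs = candidateNdvs.sorted
  private val candidates = sortedNdvs.map(ndv => BloomFilter.create(ndv, fpp))
  private var inserted = 0L

  def put(item: Long): Unit = {
    candidates.foreach(_.putLong(item))
    inserted += 1
  }

  def best: BloomFilter =
    candidates
      .zip(sortedNdvs)
      .collectFirst { case (bf, ndv) if ndv >= inserted => bf }
      .getOrElse(candidates.last)
}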
@@ -45,18 +58,18 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark {

runBenchmark("Skipping Index Write") {
runWriteBenchmark(64)
// runWriteBenchmark(512)
// runWriteBenchmark(65536)
runWriteBenchmark(512)
runWriteBenchmark(65536)
}
runBenchmark("Skipping Index Read") {
runReadBenchmark(16)
// runReadBenchmark(64)
// runReadBenchmark(512)
runReadBenchmark(512)
runReadBenchmark(65536)
}
}

private def runWriteBenchmark(cardinality: Int): Unit = {
benchmark(s"Skipping Index Write $N Rows with Cardinality $cardinality")
benchmark(s"Skipping Index Write $N Rows with Cardinality $cardinality", N)
.addCase("Partition Write") { _ =>
// Partitioned column cardinality must be 1 (all values are the same in a single file0
writeSkippingIndex(strategy(PARTITION), 1)
@@ -107,7 +120,7 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark {
  }

  private def runReadBenchmark(cardinality: Int): Unit = {
-   benchmark(s"Skipping Index Read $N Rows with Cardinality $cardinality")
+   benchmark(s"Skipping Index Read $N Rows with Cardinality $cardinality", 1)
      .addCase("Partition Read") { _ =>
        readSkippingIndex(strategy(PARTITION), cardinality)
      }
@@ -140,7 +153,7 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark {
            CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> cardinality.toString)),
          cardinality)
      }
-     .addCase("Adaptive BloomFilter Read (Default, 10 Candidates)") { _ =>
+     .addCase("Adaptive BloomFilter Read (Default 10 Candidates)") { _ =>
        readSkippingIndex(strategy(BLOOM_FILTER), cardinality)
      }
      .addCase("Adaptive BloomFilter Read (5 Candidates)") { _ =>
@@ -156,10 +169,11 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark {
      .run()
  }

- private def benchmark(name: String): BenchmarkBuilder = {
-   new BenchmarkBuilder(new Benchmark(name, N, output = output))
+ private def benchmark(name: String, numRows: Int): BenchmarkBuilder = {
+   new BenchmarkBuilder(new Benchmark(name, numRows, output = output))
  }

+ /** Benchmark builder in fluent API style */
  private class BenchmarkBuilder(benchmark: Benchmark) {

    def addCase(name: String)(f: Int => Unit): BenchmarkBuilder = {
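
The new numRows parameter explains the `, N` and `, 1` arguments above: write cases report throughput over the N generated rows, while read cases pass 1 so their numbers read as time per query. The hunk cuts off inside BenchmarkBuilder; assuming it simply forwards to Spark's internal org.apache.spark.benchmark.Benchmark (which is private[spark], hence the benchmark living under an org.apache.spark package) and returns itself for chaining, the full class plausibly looks like this sketch:

import org.apache.spark.benchmark.Benchmark

// Fluent wrapper sketch: addCase registers a case and returns this builder
// so calls chain; run() executes all registered cases.
private class BenchmarkBuilder(benchmark: Benchmark) {

  def addCase(name: String)(f: Int => Unit): BenchmarkBuilder = {
    benchmark.addCase(name)(f)
    this
  }

  def run(): Unit = benchmark.run()
}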
@@ -195,19 +209,26 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark {
  private def readSkippingIndex(indexCol: FlintSparkSkippingStrategy, cardinality: Int): Unit = {
    val schema =
      indexCol.outputSchema().map { case (key, value) => s"$key $value" }.mkString(", ")
+   /*
+    * Rewrite the query on test column and run it against skipping index created in previous
+    * write benchmark.
+    */
+   val indexQuery =
+     indexCol.rewritePredicate(expr(s"$testColName = 1").expr).map(new Column(_)).get
    spark.read
      .format(FLINT_DATASOURCE)
      .options(openSearchOptions)
      .schema(schema)
      .load(getTestIndexName(indexCol, cardinality))
-     .where(indexCol.rewritePredicate(expr(s"$testColName = 1").expr).map(new Column(_)).get)
+     .where(indexQuery)
  }
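
To make the rewrite step concrete: for a min-max style strategy, a row-level predicate on the test column becomes a file-level range check over the index columns. The rewritten expression and column names below are illustrative assumptions, not taken from the Flint source:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.expr

// Row-level predicate on the data column.
val original: Column = expr("test_col = 1")
// Hypothetical rewritten predicate on the skipping index columns: keep a
// file if its [min, max] range could contain the value 1.
val rewritten: Column = expr("MinMax_test_col_0 <= 1 AND MinMax_test_col_1 >= 1")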

  private def writeSkippingIndex(indexCol: FlintSparkSkippingStrategy, cardinality: Int): Unit = {
+   /*
+    * Generate N random numbers with the given cardinality and build single skipping index
+    * data structure without group by.
+    */
    val namedAggCols = getNamedAggColumn(indexCol)
-
-   // Generate N random numbers of cardinality and build single skipping index
-   // data structure without grouping by
    spark
      .range(N)
      .withColumn(testColName, (rand() * cardinality + 1).cast(testColType))
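
The (rand() * cardinality + 1) expression above is the whole data-generation trick: scaling uniform [0, 1) values yields roughly `cardinality` distinct integers in [1, cardinality]. A standalone sanity check, assuming a local SparkSession:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{countDistinct, rand}

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// One million rows scaled into [1, 65): expect close to 64 distinct values.
spark
  .range(1000000)
  .withColumn("test_col", (rand() * 64 + 1).cast("int"))
  .agg(countDistinct("test_col"))
  .show()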
@@ -220,6 +241,7 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark {
  }

  private def getTestIndexName(indexCol: FlintSparkSkippingStrategy, cardinality: Int): String = {
+   // Generate unique name as skipping index name in OpenSearch
    val params = indexCol.parameters.map { case (name, value) => s"${name}_$value" }.mkString("_")
    s"${indexCol.kind}_${params}_$cardinality"
  }
