
Commit

Add skipping index read benchmark
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Mar 20, 2024
1 parent 7867bee commit 679569a
Showing 1 changed file with 78 additions and 23 deletions.
@@ -19,7 +19,7 @@ import org.apache.spark.sql.Column
import org.apache.spark.sql.SaveMode.Overwrite
import org.apache.spark.sql.catalyst.dsl.expressions.DslExpression
import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
import org.apache.spark.sql.functions.rand
import org.apache.spark.sql.functions.{expr, rand}

/**
* Flint skipping index benchmark that focuses on skipping data structure read and write performance
@@ -45,38 +45,35 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark {

runBenchmark("Skipping Index Write") {
runWriteBenchmark(64)
runWriteBenchmark(512)
runWriteBenchmark(65536)
// runWriteBenchmark(512)
// runWriteBenchmark(65536)
}

/*
runBenchmark("Skipping Index Read") {
runReadBenchmark(16)
runReadBenchmark(64)
runReadBenchmark(512)
// runReadBenchmark(64)
// runReadBenchmark(512)
}
*/
}

private def runWriteBenchmark(cardinality: Int): Unit = {
benchmark(s"Skipping Index Write $N Rows with Cardinality $cardinality")
.addCase("Partition Write") { _ =>
// Partitioned column cardinality must be 1 (all values are the same in a single file)
buildSkippingIndex(strategy(PARTITION), 1)
writeSkippingIndex(strategy(PARTITION), 1)
}
.addCase("MinMax Write") { _ =>
buildSkippingIndex(strategy(MIN_MAX), cardinality)
writeSkippingIndex(strategy(MIN_MAX), cardinality)
}
.addCase("ValueSet Write (Default Size 100") { _ =>
buildSkippingIndex(strategy(VALUE_SET), cardinality)
.addCase("ValueSet Write (Default Size 100)") { _ =>
writeSkippingIndex(strategy(VALUE_SET), cardinality)
}
.addCase("ValueSet Write (Unlimited Size)") { _ =>
buildSkippingIndex(
writeSkippingIndex(
strategy(VALUE_SET, Map(VALUE_SET_MAX_SIZE_KEY -> Integer.MAX_VALUE.toString)),
cardinality)
}
.addCase("BloomFilter Write (1M NDV)") { _ =>
buildSkippingIndex(
writeSkippingIndex(
strategy(
BLOOM_FILTER,
Map(
@@ -85,31 +82,79 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark {
cardinality)
}
.addCase("BloomFilter Write (Optimal NDV)") { _ =>
buildSkippingIndex(
writeSkippingIndex(
strategy(
BLOOM_FILTER,
Map(
BLOOM_FILTER_ADAPTIVE_KEY -> "false",
CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> cardinality.toString)),
cardinality)
}
.addCase("Adaptive BloomFilter Write (Default, 10 Candidates)") { _ =>
buildSkippingIndex(strategy(BLOOM_FILTER), cardinality)
.addCase("Adaptive BloomFilter Write (Default 10 Candidates)") { _ =>
writeSkippingIndex(strategy(BLOOM_FILTER), cardinality)
}
.addCase("Adaptive BloomFilter Write (5 Candidates)") { _ =>
buildSkippingIndex(
writeSkippingIndex(
strategy(BLOOM_FILTER, params = Map(ADAPTIVE_NUMBER_CANDIDATE_KEY -> "5")),
cardinality)
}
.addCase("Adaptive BloomFilter Write (15 Candidates)") { _ =>
buildSkippingIndex(
writeSkippingIndex(
strategy(BLOOM_FILTER, params = Map(ADAPTIVE_NUMBER_CANDIDATE_KEY -> "15")),
cardinality)
}
.run()
}

private def runReadBenchmark(cardinality: Int): Unit = {}
private def runReadBenchmark(cardinality: Int): Unit = {
benchmark(s"Skipping Index Read $N Rows with Cardinality $cardinality")
.addCase("Partition Read") { _ =>
readSkippingIndex(strategy(PARTITION), cardinality)
}
.addCase("MinMax Read") { _ =>
readSkippingIndex(strategy(MIN_MAX), cardinality)
}
.addCase("ValueSet Read (Default Size 100)") { _ =>
readSkippingIndex(strategy(VALUE_SET), cardinality)
}
.addCase("ValueSet Read (Unlimited Size)") { _ =>
readSkippingIndex(
strategy(VALUE_SET, Map(VALUE_SET_MAX_SIZE_KEY -> Integer.MAX_VALUE.toString)),
cardinality)
}
.addCase("BloomFilter Read (1M NDV)") { _ =>
readSkippingIndex(
strategy(
BLOOM_FILTER,
Map(
BLOOM_FILTER_ADAPTIVE_KEY -> "false",
CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> N.toString)),
cardinality)
}
.addCase("BloomFilter Read (Optimal NDV)") { _ =>
readSkippingIndex(
strategy(
BLOOM_FILTER,
Map(
BLOOM_FILTER_ADAPTIVE_KEY -> "false",
CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> cardinality.toString)),
cardinality)
}
.addCase("Adaptive BloomFilter Read (Default, 10 Candidates)") { _ =>
readSkippingIndex(strategy(BLOOM_FILTER), cardinality)
}
.addCase("Adaptive BloomFilter Read (5 Candidates)") { _ =>
readSkippingIndex(
strategy(BLOOM_FILTER, params = Map(ADAPTIVE_NUMBER_CANDIDATE_KEY -> "5")),
cardinality)
}
.addCase("Adaptive BloomFilter Read (15 Candidates)") { _ =>
readSkippingIndex(
strategy(BLOOM_FILTER, params = Map(ADAPTIVE_NUMBER_CANDIDATE_KEY -> "15")),
cardinality)
}
.run()
}

private def benchmark(name: String): BenchmarkBuilder = {
new BenchmarkBuilder(new Benchmark(name, N, output = output))
@@ -147,8 +192,18 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark {
}
}

private def buildSkippingIndex(indexCol: FlintSparkSkippingStrategy, cardinality: Int): Unit = {
val indexName = getTestIndexName(indexCol, cardinality)
private def readSkippingIndex(indexCol: FlintSparkSkippingStrategy, cardinality: Int): Unit = {
val schema =
indexCol.outputSchema().map { case (key, value) => s"$key $value" }.mkString(", ")
spark.read
.format(FLINT_DATASOURCE)
.options(openSearchOptions)
.schema(schema)
.load(getTestIndexName(indexCol, cardinality))
.where(indexCol.rewritePredicate(expr(s"$testColName = 1").expr).map(new Column(_)).get)
}

private def writeSkippingIndex(indexCol: FlintSparkSkippingStrategy, cardinality: Int): Unit = {
val namedAggCols = getNamedAggColumn(indexCol)

// Generate N random numbers with the given cardinality and build a single skipping index
@@ -161,7 +216,7 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark {
.format(FLINT_DATASOURCE)
.options(openSearchOptions)
.mode(Overwrite)
.save(indexName)
.save(getTestIndexName(indexCol, cardinality))
}

private def getTestIndexName(indexCol: FlintSparkSkippingStrategy, cardinality: Int): String = {
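
For readers new to skipping indexes, the following self-contained Spark sketch illustrates the idea the MinMax cases above exercise. It is not Flint code, and the names (value, file_id, value_min, value_max) are illustrative assumptions. Partitions stand in for source files: the write side aggregates one min/max row per partition, and the read side rewrites an equality predicate into a range check over that structure, mirroring what writeSkippingIndex and readSkippingIndex measure against OpenSearch.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{max, min, rand, spark_partition_id}

// Conceptual sketch only (not Flint's API): build a MinMax-style skipping
// structure over randomly generated data, then use it to decide which "files"
// (partitions here) could contain rows matching value = 1.
object MinMaxSkippingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("skipping-sketch").getOrCreate()
    import spark.implicits._

    val n = 1000000L
    val cardinality = 512
    // N random values with the requested cardinality, as in the benchmark's data generator.
    val data = spark.range(n).withColumn("value", (rand() * cardinality).cast("int"))

    // Write side: one skipping-index row per partition, holding min/max of the indexed column.
    val skippingIndex = data
      .groupBy(spark_partition_id().as("file_id"))
      .agg(min($"value").as("value_min"), max($"value").as("value_max"))

    // Read side: the predicate value = 1 is rewritten into a range check, so only
    // partitions whose [min, max] interval covers 1 need to be scanned.
    val candidateFiles = skippingIndex.where($"value_min" <= 1 && $"value_max" >= 1)
    candidateFiles.show()
    spark.stop()
  }
}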
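
The benchmark(...) helper in the diff wraps Spark's micro-benchmark utility behind a small builder. Below is a minimal sketch of that underlying addCase/run pattern, assuming org.apache.spark.benchmark.Benchmark (shipped with Spark's test sources) is on the classpath; the case names and timed bodies are placeholders, not the Flint cases.

import org.apache.spark.benchmark.Benchmark

object BenchmarkPatternSketch {
  def main(args: Array[String]): Unit = {
    val valuesPerIteration = 1000000L
    // Each case is warmed up and timed independently; results (average time, rate,
    // relative speed) are printed when no output stream is supplied.
    val benchmark = new Benchmark("Example benchmark", valuesPerIteration)
    benchmark.addCase("sum via while loop") { _ =>
      var i = 0L
      var sum = 0L
      while (i < valuesPerIteration) { sum += i; i += 1 }
    }
    benchmark.addCase("sum via range") { _ =>
      (0L until valuesPerIteration).sum
    }
    benchmark.run()
  }
}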
