Skip to content

Commit

Permalink
Refactor test case
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Mar 20, 2024
1 parent 207ba49 commit 7867bee
Showing 1 changed file with 38 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package org.apache.spark.sql.benchmark

import org.opensearch.flint.core.field.bloomfilter.BloomFilterFactory.{ADAPTIVE_NUMBER_CANDIDATE_KEY, BLOOM_FILTER_ADAPTIVE_KEY, CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind._
import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilterSkippingStrategy
import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy
import org.opensearch.flint.spark.skipping.partition.PartitionSkippingStrategy
Expand Down Expand Up @@ -60,68 +61,49 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark {
private def runWriteBenchmark(cardinality: Int): Unit = {
benchmark(s"Skipping Index Write $N Rows with Cardinality $cardinality")
.addCase("Partition Write") { _ =>
buildSkippingIndex(
PartitionSkippingStrategy(columnName = testColName, columnType = testColType),
1 // partition column value should be the same in a single file
)
// Partitioned column cardinality must be 1 (all values are the same in a single file0
buildSkippingIndex(strategy(PARTITION), 1)
}
.addCase("MinMax Write") { _ =>
buildSkippingIndex(
MinMaxSkippingStrategy(columnName = testColName, columnType = testColType),
cardinality)
buildSkippingIndex(strategy(MIN_MAX), cardinality)
}
.addCase("ValueSet Write") { _ =>
buildSkippingIndex(
ValueSetSkippingStrategy(columnName = testColName, columnType = testColType),
cardinality)
.addCase("ValueSet Write (Default Size 100") { _ =>
buildSkippingIndex(strategy(VALUE_SET), cardinality)
}
.addCase("ValueSet Write (Unlimited Size)") { _ =>
buildSkippingIndex(
ValueSetSkippingStrategy(
columnName = testColName,
columnType = testColType,
params = Map(VALUE_SET_MAX_SIZE_KEY -> Integer.MAX_VALUE.toString)),
strategy(VALUE_SET, Map(VALUE_SET_MAX_SIZE_KEY -> Integer.MAX_VALUE.toString)),
cardinality)
}
.addCase("BloomFilter Write") { _ =>
.addCase("BloomFilter Write (1M NDV)") { _ =>
buildSkippingIndex(
BloomFilterSkippingStrategy(
columnName = testColName,
columnType = testColType,
params = Map(
strategy(
BLOOM_FILTER,
Map(
BLOOM_FILTER_ADAPTIVE_KEY -> "false",
CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> N.toString)),
cardinality)
}
.addCase("BloomFilter Write (Optimal NDV)") { _ =>
buildSkippingIndex(
BloomFilterSkippingStrategy(
columnName = testColName,
columnType = testColType,
params = Map(
strategy(
BLOOM_FILTER,
Map(
BLOOM_FILTER_ADAPTIVE_KEY -> "false",
CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> cardinality.toString)),
cardinality)
}
.addCase("Adaptive BloomFilter Write") { _ =>
buildSkippingIndex(
BloomFilterSkippingStrategy(columnName = testColName, columnType = testColType),
cardinality)
.addCase("Adaptive BloomFilter Write (Default, 10 Candidates)") { _ =>
buildSkippingIndex(strategy(BLOOM_FILTER), cardinality)
}
.addCase("Adaptive BloomFilter Write (5 Candidates)") { _ =>
buildSkippingIndex(
BloomFilterSkippingStrategy(
columnName = testColName,
columnType = testColType,
params = Map(ADAPTIVE_NUMBER_CANDIDATE_KEY -> "5")),
strategy(BLOOM_FILTER, params = Map(ADAPTIVE_NUMBER_CANDIDATE_KEY -> "5")),
cardinality)
}
.addCase("Adaptive BloomFilter Write (15 Candidates)") { _ =>
buildSkippingIndex(
BloomFilterSkippingStrategy(
columnName = testColName,
columnType = testColType,
params = Map(ADAPTIVE_NUMBER_CANDIDATE_KEY -> "15")),
strategy(BLOOM_FILTER, params = Map(ADAPTIVE_NUMBER_CANDIDATE_KEY -> "15")),
cardinality)
}
.run()
Expand All @@ -145,6 +127,26 @@ object FlintSkippingIndexBenchmark extends FlintSparkBenchmark {
}
}

private def strategy(
kind: SkippingKind,
params: Map[String, String] = Map.empty): FlintSparkSkippingStrategy = {
kind match {
case PARTITION =>
PartitionSkippingStrategy(columnName = testColName, columnType = testColType)
case MIN_MAX => MinMaxSkippingStrategy(columnName = testColName, columnType = testColType)
case VALUE_SET =>
ValueSetSkippingStrategy(
columnName = testColName,
columnType = testColType,
params = params)
case BLOOM_FILTER =>
BloomFilterSkippingStrategy(
columnName = testColName,
columnType = testColType,
params = params)
}
}

private def buildSkippingIndex(indexCol: FlintSparkSkippingStrategy, cardinality: Int): Unit = {
val indexName = getTestIndexName(indexCol, cardinality)
val namedAggCols = getNamedAggColumn(indexCol)
Expand Down

0 comments on commit 7867bee

Please sign in to comment.