Commit 45e893d: Backup code

Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Dec 27, 2023
1 parent 13536ac

Showing 7 changed files with 223 additions and 307 deletions.
@@ -53,7 +53,7 @@ object FlintSparkIndexFactory {
         ValueSetSkippingStrategy(
           columnName = columnName,
           columnType = columnType,
-          limit = colInfo.get("limit").asInstanceOf[Int])
+          properties = getSkippingProperties(colInfo))
       case MIN_MAX =>
         MinMaxSkippingStrategy(columnName = columnName, columnType = columnType)
       case other =>
@@ -93,4 +93,13 @@ object FlintSparkIndexFactory {
       Some(value.asInstanceOf[String])
     }
   }
+
+  private def getSkippingProperties(
+      colInfo: java.util.Map[String, AnyRef]): Map[String, String] = {
+    colInfo
+      .get("properties")
+      .asInstanceOf[java.util.Map[String, String]]
+      .asScala
+      .toMap
+  }
 }
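The new getSkippingProperties helper reads the nested "properties" map back out of the serialized column metadata, replacing the former flat "limit" field. A minimal, self-contained sketch of that round trip (the column values here are hypothetical, not taken from the commit):

import scala.collection.JavaConverters._

object SkippingPropertiesExample extends App {

  // Hypothetical column metadata in the shape the factory receives: the
  // old flat "limit" field now lives inside a nested "properties" map.
  val colInfo: java.util.Map[String, AnyRef] = Map[String, AnyRef](
    "kind" -> "VALUE_SET",
    "columnName" -> "status",
    "columnType" -> "int",
    "properties" -> Map("limit" -> "10").asJava).asJava

  // Same unwrapping as getSkippingProperties above.
  val props: Map[String, String] = colInfo
    .get("properties")
    .asInstanceOf[java.util.Map[String, String]]
    .asScala
    .toMap

  println(props) // Map(limit -> 10)
}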
@@ -50,7 +50,8 @@ case class FlintSparkSkippingIndex(
         Map[String, AnyRef](
           "kind" -> col.kind.toString,
           "columnName" -> col.columnName,
-          "columnType" -> col.columnType).asJava)
+          "columnType" -> col.columnType,
+          "properties" -> col.properties.asJava).asJava)
       .toArray

   val fieldTypes =
@@ -155,31 +156,20 @@ object FlintSparkSkippingIndex {
      *
      * @param colName
      *   indexed column name
+     * @param properties
+     *   value set skipping properties
      * @return
      *   index builder
      */
-    def addValueSet(colName: String): Builder = {
-      require(tableName.nonEmpty, "table name cannot be empty")
-
-      val col = findColumn(colName)
-      addIndexedColumn(ValueSetSkippingStrategy(columnName = col.name, columnType = col.dataType))
-      this
-    }
-
-    /**
-     * Add value set skipping indexed column.
-     *
-     * @param colName
-     *   indexed column name
-     * @return
-     *   index builder
-     */
-    def addValueSet(colName: String, limit: Int): Builder = {
+    def addValueSet(colName: String, properties: Map[String, String] = Map.empty): Builder = {
       require(tableName.nonEmpty, "table name cannot be empty")

       val col = findColumn(colName)
       addIndexedColumn(
-        ValueSetSkippingStrategy(columnName = col.name, columnType = col.dataType, limit = limit))
+        ValueSetSkippingStrategy(
+          columnName = col.name,
+          columnType = col.dataType,
+          properties = properties))
       this
     }
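With the two overloads collapsed into one, callers pass the size limit through the properties map instead of a dedicated Int parameter. A sketch of the builder call after this change, assuming an existing FlintSpark entry point `flint` and a hypothetical table and columns:

// Sketch only: `flint`, the table, and the column names are assumptions.
flint
  .skippingIndex()
  .onTable("spark_catalog.default.alb_logs")
  .addValueSet("elb_status_code") // empty properties -> default limit
  .addValueSet("client_ip", Map("limit" -> "20")) // explicit size limit
  .create()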
@@ -32,6 +32,11 @@ trait FlintSparkSkippingStrategy {
    */
   val columnType: String

+  /**
+   * Skipping algorithm properties.
+   */
+  val properties: Map[String, String] = Map.empty
+
   /**
    * @return
    *   output schema mapping from Flint field name to Flint field type
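Because the default lives on the trait, existing strategies keep an empty property map for free, and serialization can treat every strategy kind uniformly. A small sketch of that uniform access, assuming the Flint classes above are on the classpath and a hypothetical column:

// Uniform access through the trait: the metadata serialization above
// simply calls col.properties.asJava regardless of strategy kind.
val strategy: FlintSparkSkippingStrategy = ValueSetSkippingStrategy(
  columnName = "status", // hypothetical column
  columnType = "int",
  properties = Map("limit" -> "5"))

println(strategy.properties) // Map(limit -> 5)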
@@ -8,6 +8,7 @@ package org.opensearch.flint.spark.skipping.valueset
 import org.opensearch.flint.spark.function.CollectSetLimit.collect_set_limit
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{SkippingKind, VALUE_SET}
+import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.DEFAULT_VALUE_SET_SIZE_LIMIT

 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, Literal}
 import org.apache.spark.sql.functions._
@@ -19,19 +20,21 @@ case class ValueSetSkippingStrategy(
     override val kind: SkippingKind = VALUE_SET,
     override val columnName: String,
     override val columnType: String,
-    limit: Int = 100)
+    override val properties: Map[String, String] = Map.empty)
     extends FlintSparkSkippingStrategy {

   override def outputSchema(): Map[String, String] =
     Map(columnName -> columnType)

   override def getAggregators: Seq[Expression] = {
+    val limit = getValueSetSizeLimit()
     val aggregator =
       if (limit == 0) {
         collect_set(columnName)
       } else {
-        val collectSetLimit = collect_set_limit(columnName, limit + 1)
-        when(size(collectSetLimit) === limit + 1, lit(null))
+        // val limitPlusOne = limit + 1
+        val collectSetLimit = collect_set(columnName) // collect_set_limit(columnName, limitPlusOne)
+        when(size(collectSetLimit) > limit, lit(null))
           .otherwise(collectSetLimit)
       }
     Seq(aggregator.expr)
@@ -47,4 +50,13 @@ case class ValueSetSkippingStrategy(
         Some((col(columnName) === value).expr)
       case _ => None
     }
+
+  private def getValueSetSizeLimit(): Int =
+    properties.get("limit").map(_.toInt).getOrElse(DEFAULT_VALUE_SET_SIZE_LIMIT)
 }
+
+object ValueSetSkippingStrategy {
+
+  /** Default limit for value set size collected */
+  val DEFAULT_VALUE_SET_SIZE_LIMIT = 100
+}
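The limit is now resolved at aggregation time: absent a "limit" property, the strategy falls back to DEFAULT_VALUE_SET_SIZE_LIMIT. A small sketch of both paths, with hypothetical column values:

// Empty properties: getValueSetSizeLimit() falls back to the default (100).
val defaultLimit = ValueSetSkippingStrategy(columnName = "status", columnType = "int")

val bounded = ValueSetSkippingStrategy(
  columnName = "status",
  columnType = "int",
  properties = Map("limit" -> "10"))
// With the backup code above, getAggregators collects the full value set
// but nulls it out once more than 10 distinct values appear, disabling
// skipping for that column rather than storing an oversized set.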
@@ -46,7 +46,8 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
           if (colTypeCtx.valueSetType().limit == null) {
             indexBuilder.addValueSet(colName)
           } else {
-            indexBuilder.addValueSet(colName, colTypeCtx.valueSetType().limit.getText.toInt)
+            indexBuilder
+              .addValueSet(colName, Map("limit" -> colTypeCtx.valueSetType().limit.getText))
           }
         } else {
           val skipType = SkippingKind.withName(colTypeCtx.skipType.getText)
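End to end, the SQL grammar's optional VALUE_SET limit now flows through as a string property rather than a parsed Int. A sketch of the SQL path this visitor serves, assuming a SparkSession configured with the Flint SQL extensions and a hypothetical table:

// Assumes `spark` has the Flint extensions enabled; table and column
// names are illustrative.
spark.sql("""
  CREATE SKIPPING INDEX ON alb_logs
  ( elb_status_code VALUE_SET(10) )
""")
// The visitor above turns VALUE_SET(10) into
// indexBuilder.addValueSet("elb_status_code", Map("limit" -> "10")).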
[Diffs for the remaining two changed files did not load.]