Add conjunction check for skipping and covering index
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Nov 6, 2023
1 parent bc50661 commit cf4034a
Showing 6 changed files with 88 additions and 10 deletions.
@@ -0,0 +1,41 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.flint.spark

import org.apache.spark.sql.catalyst.expressions.{Expression, Or}
import org.apache.spark.sql.functions.expr

/**
 * Flint Spark index utility methods.
 */
object FlintSparkIndexUtils {

  /**
   * Is the given Spark predicate string a conjunction?
   *
   * @param condition
   *   predicate condition string
   * @return
   *   true if the condition is a conjunction (contains no OR), otherwise false
   */
  def isConjunction(condition: String): Boolean = {
    isConjunction(expr(condition).expr)
  }

  /**
   * Is the given Spark predicate a conjunction?
   *
   * @param condition
   *   predicate condition expression
   * @return
   *   true if the condition is a conjunction (contains no OR), otherwise false
   */
  def isConjunction(condition: Expression): Boolean = {
    // Treat the predicate as a conjunction if no Or node appears
    // anywhere in its expression tree
    condition.collectFirst { case Or(_, _) =>
      true
    }.isEmpty
  }
}
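For reference, a quick sketch of how this helper behaves, assuming an active SparkSession so that expr can parse the strings (the return values follow from the collectFirst logic above; this sketch is not part of the commit's diff):

import org.opensearch.flint.spark.FlintSparkIndexUtils.isConjunction

isConjunction("a = 1 AND b = 2 AND c = 3")  // true: pure AND chain, no Or node
isConjunction("a = 1 OR b = 2")             // false: top-level OR
isConjunction("a = 1 AND (b = 2 OR c = 3)") // false: collectFirst walks the whole
                                            // tree, so a nested OR is rejected too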
@@ -11,6 +11,7 @@ import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.spark._
import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder}
import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
+import org.opensearch.flint.spark.FlintSparkIndexUtils.isConjunction
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{getFlintIndexName, COVERING_INDEX_TYPE}

import org.apache.spark.sql._
@@ -34,6 +35,7 @@ case class FlintSparkCoveringIndex(
extends FlintSparkIndex {

require(indexedColumns.nonEmpty, "indexed columns must not be empty")
+  require(filterCondition.forall(isConjunction), "filtering condition must be a conjunction")

override val kind: String = COVERING_INDEX_TYPE

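One detail worth noting: filterCondition is an Option[String], and Option.forall returns true for None, so an index defined without any filtering condition still passes the new require check. A minimal sketch of that semantics (illustrative values, not part of this commit's diff):

import org.opensearch.flint.spark.FlintSparkIndexUtils.isConjunction

val noFilter: Option[String] = None
val andFilter = Some("a = 1 AND b = 2")
val orFilter = Some("a = 1 OR b = 2")

noFilter.forall(isConjunction)  // true: no condition, nothing to validate
andFilter.forall(isConjunction) // true: conjunction accepted
orFilter.forall(isConjunction)  // false: require throws IllegalArgumentException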
@@ -6,10 +6,11 @@
package org.opensearch.flint.spark.skipping

import org.opensearch.flint.spark.FlintSpark
+import org.opensearch.flint.spark.FlintSparkIndexUtils.isConjunction
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, SKIPPING_INDEX_TYPE}

import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{And, Expression, Or, Predicate}
+import org.apache.spark.sql.catalyst.expressions.{And, Expression, Predicate}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
@@ -33,7 +34,7 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan]
_,
Some(table),
false))
-        if hasNoDisjunction(condition) && !location.isInstanceOf[FlintSparkSkippingFileIndex] =>
+        if isConjunction(condition) && !location.isInstanceOf[FlintSparkSkippingFileIndex] =>
val index = flint.describeIndex(getIndexName(table))
if (index.exists(_.kind == SKIPPING_INDEX_TYPE)) {
val skippingIndex = index.get.asInstanceOf[FlintSparkSkippingIndex]
@@ -67,12 +68,6 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan]
getSkippingIndexName(qualifiedTableName)
}

-  private def hasNoDisjunction(condition: Expression): Boolean = {
-    condition.collectFirst { case Or(_, _) =>
-      true
-    }.isEmpty
-  }
-
private def rewriteToIndexFilter(
index: FlintSparkSkippingIndex,
condition: Expression): Option[Expression] = {
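Only conjunctions can be handled here because rewriteToIndexFilter can translate each conjunct independently and safely drop any conjunct it has no skipping strategy for: weakening an AND filter only makes the index skip fewer files, while dropping one branch of an OR would wrongly narrow results. A minimal sketch of that recursion, assuming a hypothetical per-predicate translate function (this is not the commit's actual implementation):

import org.apache.spark.sql.catalyst.expressions.{And, Expression}

// Sketch: rewrite a conjunctive filter conjunct by conjunct.
// Untranslatable conjuncts are dropped: safe under AND, unsafe under OR.
def rewriteConjunction(
    condition: Expression,
    translate: Expression => Option[Expression]): Option[Expression] =
  condition match {
    case And(left, right) =>
      val parts = rewriteConjunction(left, translate).toSeq ++
        rewriteConjunction(right, translate).toSeq
      parts.reduceOption(And(_, _)) // AND together whatever conjuncts translated
    case other => translate(other)
  }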
@@ -11,14 +11,15 @@ import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.spark._
import org.opensearch.flint.spark.FlintSparkIndex._
import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
+import org.opensearch.flint.spark.FlintSparkIndexUtils.isConjunction
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE}
import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy
import org.opensearch.flint.spark.skipping.partition.PartitionSkippingStrategy
import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.dsl.expressions.DslExpression
-import org.apache.spark.sql.functions.{col, input_file_name, sha1}
+import org.apache.spark.sql.functions.{col, expr, input_file_name, sha1}

/**
* Flint skipping index in Spark.
@@ -36,6 +37,7 @@ case class FlintSparkSkippingIndex(
extends FlintSparkIndex {

require(indexedColumns.nonEmpty, "indexed columns must not be empty")
+  require(filterCondition.forall(isConjunction), "filtering condition must be a conjunction")

/** Skipping index type */
override val kind: String = SKIPPING_INDEX_TYPE
@@ -85,7 +87,11 @@ case class FlintSparkSkippingIndex(

// Add optional filtering condition
if (filterCondition.isDefined) {
-      job = job.where(filterCondition.get)
+      if (isConjunction(expr(filterCondition.get).expr)) { // TODO: do the same for covering and add UT/IT
+        job = job.where(filterCondition.get)
+      } else {
+        throw new IllegalStateException("Filtering condition is not a conjunction")
+      }
}

job
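Note that this runtime check overlaps with the constructor require above, acting as a defensive guard on the job-build path, and the TODO suggests unifying the two later. For context, expr (from org.apache.spark.sql.functions) is what turns the stored filter string back into a Catalyst tree for isConjunction to walk. A rough sketch of the conjunctive case, assuming an active SparkSession (hypothetical column names; tree rendering approximate):

import org.apache.spark.sql.functions.expr

// expr(...) parses the SQL string into an unresolved Catalyst expression;
// a chain of ANDs becomes nested And nodes with no Or anywhere
val parsed = expr("year = 2023 AND month = 4").expr
// roughly: And(EqualTo('year, 2023), EqualTo('month, 4))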
@@ -24,6 +24,24 @@ class FlintSparkCoveringIndexSuite extends FlintSuite {
}
}

test("should succeed if filtering condition is conjunction") {
new FlintSparkCoveringIndex(
"ci",
"test",
Map("name" -> "string"),
Some("test_field1 = 1 AND test_field2 = 2"))
}

test("should fail if filtering condition is not conjunction") {
assertThrows[IllegalArgumentException] {
new FlintSparkCoveringIndex(
"ci",
"test",
Map("name" -> "string"),
Some("test_field1 = 1 OR test_field2 = 2"))
}
}

test("should fail if no indexed column given") {
assertThrows[IllegalArgumentException] {
new FlintSparkCoveringIndex("ci", "default.test", Map.empty)
@@ -49,6 +49,22 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
"columnType" -> "integer").asJava)
}

test("should succeed if filtering condition is conjunction") {
new FlintSparkSkippingIndex(
testTable,
Seq(mock[FlintSparkSkippingStrategy]),
Some("test_field1 = 1 AND test_field2 = 2"))
}

test("should fail if filtering condition is not conjunction") {
assertThrows[IllegalArgumentException] {
new FlintSparkSkippingIndex(
testTable,
Seq(mock[FlintSparkSkippingStrategy]),
Some("test_field1 = 1 OR test_field2 = 2"))
}
}

test("can build index building job with unique ID column") {
val indexCol = mock[FlintSparkSkippingStrategy]
when(indexCol.outputSchema()).thenReturn(Map("name" -> "string"))
