Fix broken IT
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Jan 23, 2024
1 parent da88787 commit b148c6f
Showing 2 changed files with 2 additions and 47 deletions.
@@ -47,10 +47,9 @@ object ExpressionUtils {
     // Disable Flint rule to avoid stackoverflow during analysis and optimization
     withFlintOptimizerDisabled {
       val analyzed = sessionState.analyzer.execute(filter)
-      val optimized = sessionState.optimizer.execute(analyzed)
 
       // Unwrap to get resolved expr
-      optimized
+      analyzed
         .asInstanceOf[Filter]
         .condition
         .asInstanceOf[PredicateWrapper]
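
For context, a minimal standalone sketch of the resolution pattern this hunk touches, assuming a plain SparkSession and an existing table; the object and method names (ResolveSketch, resolveCondition) are illustrative, not Flint's API. The point mirrored from the hunk is stopping after the analyzer instead of also running the optimizer, whose extra pass is what could re-enter custom rules and overflow the stack.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.Filter

object ResolveSketch {
  // Hypothetical helper: resolve an expression string against a table's
  // schema using only the analyzer, without an optimizer pass
  def resolveCondition(spark: SparkSession, table: String, expr: String): Expression = {
    val state = spark.sessionState
    // Build an unresolved Filter(condition, relation) plan from raw strings
    val plan = state.sqlParser.parsePlan(s"SELECT * FROM $table")
    val unresolved = Filter(state.sqlParser.parseExpression(expr), plan)
    // Analysis alone binds column references; unwrap the resolved condition
    val analyzed = state.analyzer.execute(unresolved)
    analyzed.asInstanceOf[Filter].condition
  }
}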
@@ -12,14 +12,13 @@ import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL}
 import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingFileIndex
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
-import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.resolveExprString
 import org.scalatest.matchers.{Matcher, MatchResult}
 import org.scalatest.matchers.must.Matchers._
 import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
 
 import org.apache.spark.sql.{Column, Row}
 import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.flint.config.FlintSparkConf._
 import org.apache.spark.sql.functions.{col, isnull}
 
@@ -663,49 +662,6 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
     deleteTestIndex(testIndex)
   }
 
-  test("build skipping index for nested field and rewrite applicable query") {
-    val testTable = "spark_catalog.default.nested_field_table"
-    val testIndex = getSkippingIndexName(testTable)
-    sql(s"""
-           | CREATE TABLE $testTable
-           | (
-           |   int_col INT,
-           |   struct_col STRUCT<field1: STRUCT<subfield:STRING>, field2: INT>
-           | )
-           | USING JSON
-           |""".stripMargin)
-    sql(s"""
-           | INSERT INTO $testTable
-           | VALUES (
-           |   30,
-           |   STRUCT(STRUCT("subfieldValue1"),123)
-           | )
-           |""".stripMargin)
-
-    flint
-      .skippingIndex()
-      .onTable(testTable)
-      .addValueSet("struct_col.field1.subfield")
-      .create()
-    flint.refreshIndex(testIndex, FULL)
-
-    val query = sql(s"""
-                       | SELECT struct_col
-                       | FROM $testTable
-                       | WHERE struct_col.field1.subfield = "subfieldValue1"
-                       |""".stripMargin)
-
-    val relation = query.queryExecution.analyzed.find(_.isInstanceOf[LogicalRelation]).get
-    val indexCol = new Column(resolveExprString("struct_col.field1.subfield", relation))
-
-    query.queryExecution.executedPlan should
-      useFlintSparkSkippingFileIndex(
-        hasIndexFilter(isnull(indexCol) || indexCol === "subfieldValue1"))
-    checkAnswer(query, Seq(Row(30)))
-
-    deleteTestIndex(testIndex)
-  }
-
   // Custom matcher to check if a SparkPlan uses FlintSparkSkippingFileIndex
   def useFlintSparkSkippingFileIndex(
       subMatcher: Matcher[FlintSparkSkippingFileIndex]): Matcher[SparkPlan] = {
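
The custom matcher kept at the bottom of this hunk follows the standard ScalaTest pattern: implement Matcher[T].apply and return a MatchResult. A minimal sketch of that pattern with a simpler, hypothetical predicate (haveLengthOver is made up for illustration; only the Matcher/MatchResult shapes come from ScalaTest):

import org.scalatest.matchers.{MatchResult, Matcher}

object LengthMatchers {
  // Hypothetical matcher: passes when the string is longer than n characters
  def haveLengthOver(n: Int): Matcher[String] = new Matcher[String] {
    override def apply(left: String): MatchResult =
      MatchResult(
        left.length > n,
        s"'$left' had length ${left.length}, which is not over $n",
        s"'$left' had length over $n")
  }
}

With should-Matchers in scope this reads as, for example, "abc" should LengthMatchers.haveLengthOver(2), the same way the suite's plan assertions read.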
