Skip to content

Commit

Permalink
Fix hybrid scan bug
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Nov 10, 2023
1 parent 7733b61 commit e61f3dd
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory}
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.sql.functions.isnull
import org.apache.spark.sql.functions.not
import org.apache.spark.sql.types.StructType

/**
Expand Down Expand Up @@ -76,8 +76,7 @@ case class FlintSparkSkippingFileIndex(
partitions
.flatMap(_.files.map(f => f.getPath.toUri.toString))
.toDF(FILE_PATH_COLUMN)
.join(indexScan, Seq(FILE_PATH_COLUMN), "left")
.filter(isnull(indexScan(FILE_PATH_COLUMN)) || new Column(indexFilter))
.join(indexScan.filter(not(new Column(indexFilter))), Seq(FILE_PATH_COLUMN), "left_anti")
.select(FILE_PATH_COLUMN)
.collect()
.map(_.getString(0))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,20 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
}
}

test("should not rewrite original query if no where clause") {
val query =
s"""
| SELECT name
| FROM $testTable
|""".stripMargin

val actual = sql(query).queryExecution.optimizedPlan
withFlintOptimizerDisabled {
val expect = sql(query).queryExecution.optimizedPlan
actual shouldBe expect
}
}

test("can build partition skipping index and rewrite applicable query") {
flint
.skippingIndex()
Expand Down Expand Up @@ -394,7 +408,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
}
}

test("should rewrite applicable query to scan latest source files if partial index") {
test("should rewrite applicable query to scan unknown source files if partial index") {
flint
.skippingIndex()
.onTable(testTable)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ import org.json4s.native.Serialization
import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.storage.FlintOpenSearchClient
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
import org.scalatest.matchers.must.Matchers.defined
import org.scalatest.matchers.must.Matchers.{defined, include}
import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the}

import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.SimpleMode
import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
import org.apache.spark.sql.flint.config.FlintSparkConf.CHECKPOINT_MANDATORY

Expand Down Expand Up @@ -55,21 +56,82 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
indexData.count() shouldBe 2
}

test("create skipping index with filtering condition") {
sql(s"""
| CREATE SKIPPING INDEX ON $testTable
| (
| year PARTITION,
| name VALUE_SET,
| age MIN_MAX
| )
| WHERE address = 'Portland'
| WITH (auto_refresh = true)
| """.stripMargin)
test("create skipping index with auto refresh and filtering condition") {
val testTimeSeriesTable = "spark_catalog.default.partial_skipping_sql_test"
val testFlintTimeSeriesTable = getSkippingIndexName(testTimeSeriesTable)

withTable(testTimeSeriesTable) {
createTimeSeriesTable(testTimeSeriesTable)
sql(s""" CREATE SKIPPING INDEX ON $testTimeSeriesTable
| ( address VALUE_SET )
| WHERE time >= '2023-10-01 01:00:00'
| WITH (auto_refresh = true)""".stripMargin)
flint.describeIndex(testFlintTimeSeriesTable) shouldBe defined

// Only 2 rows indexed
val indexData = awaitStreamingDataComplete(testFlintTimeSeriesTable)
indexData.count() shouldBe 2

// Query without filter condition
sql(s"SELECT * FROM $testTimeSeriesTable").count shouldBe 5

// File indexed should be included
var query = sql(s""" SELECT name FROM $testTimeSeriesTable
| WHERE time > '2023-10-01 00:05:00'
| AND address = 'Portland' """.stripMargin)
query.queryExecution.explainString(SimpleMode) should include("FlintSparkSkippingFileIndex")
checkAnswer(query, Seq(Row("C"), Row("D")))

// File not indexed should be included too
query = sql(s""" SELECT name FROM $testTimeSeriesTable
| WHERE time > '2023-10-01 00:05:00'
| AND address = 'Seattle' """.stripMargin)
query.queryExecution.explainString(SimpleMode) should include("FlintSparkSkippingFileIndex")
checkAnswer(query, Seq(Row("B")))

flint.deleteIndex(testFlintTimeSeriesTable)
}
}

val indexData = awaitStreamingDataComplete(testIndex)
flint.describeIndex(testIndex) shouldBe defined
indexData.count() shouldBe 1
  test("create skipping index with manual refresh and filtering condition") {
    // Partial (filtered) skipping index refreshed manually: rows outside the
    // WHERE clause — and files added after the refresh — are never indexed,
    // but queries must still return their rows.
    val testTimeSeriesTable = "spark_catalog.default.partial_skipping_sql_test"
    val testFlintTimeSeriesTable = getSkippingIndexName(testTimeSeriesTable)

    withTable(testTimeSeriesTable) {
      createTimeSeriesTable(testTimeSeriesTable)
      sql(s""" CREATE SKIPPING INDEX ON $testTimeSeriesTable
           | ( address VALUE_SET )
           | WHERE time >= '2023-10-01 01:00:00' AND age = 15
           | """.stripMargin)
      sql(s"REFRESH SKIPPING INDEX ON $testTimeSeriesTable")

      // Only 1 row satisfies both predicates (time and age), so the manual
      // refresh indexes a single row.
      flint.describeIndex(testFlintTimeSeriesTable) shouldBe defined
      val indexData = flint.queryIndex(testFlintTimeSeriesTable)
      indexData.count() shouldBe 1

      // Rows from files not covered by the partial index must be returned too.
      sql(s"SELECT * FROM $testTimeSeriesTable").count shouldBe 5
      var query = sql(s""" SELECT name FROM $testTimeSeriesTable
           | WHERE time > '2023-10-01 00:05:00'
           | AND address = 'Portland' """.stripMargin)
      query.queryExecution.explainString(SimpleMode) should include("FlintSparkSkippingFileIndex")
      checkAnswer(query, Seq(Row("C"), Row("D")))

      // Generate new data after the refresh, producing a source file the
      // index has never seen.
      sql(s""" INSERT INTO $testTimeSeriesTable VALUES
           | (TIMESTAMP '2023-10-01 04:00:00', 'F', 30, 'Vancouver')""".stripMargin)

      // The latest (unindexed) file must be included without another refresh.
      sql(s"SELECT * FROM $testTimeSeriesTable").count shouldBe 6
      query = sql(s""" SELECT name FROM $testTimeSeriesTable
           | WHERE time > '2023-10-01 00:05:00'
           | AND address = 'Vancouver' """.stripMargin)
      query.queryExecution.explainString(SimpleMode) should include("FlintSparkSkippingFileIndex")
      checkAnswer(query, Seq(Row("E"), Row("F")))

      flint.deleteIndex(testFlintTimeSeriesTable)
    }
  }

test("create skipping index with streaming job options") {
Expand Down

0 comments on commit e61f3dd

Please sign in to comment.