Skip to content

Commit

Permalink
Add IT to verify skipping index
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Jun 17, 2024
1 parent e5c6e10 commit a6f1d8e
Showing 1 changed file with 40 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,16 @@
package org.opensearch.flint.spark.iceberg

import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
import org.scalatest.matchers.should.Matchers

import org.apache.spark.sql.{ExplainSuiteHelper, Row}
import org.apache.spark.sql.flint.config.FlintSparkConf.OPTIMIZER_RULE_COVERING_INDEX_ENABLED

class FlintSparkIcebergMetadataITSuite extends FlintSparkIcebergSuite with Matchers {
class FlintSparkIcebergMetadataITSuite
extends FlintSparkIcebergSuite
with ExplainSuiteHelper
with Matchers {

private val testIcebergTable = "spark_catalog.default.covering_sql_iceberg_test"
private val testFlintIndex = getFlintIndexName("all", testIcebergTable)
Expand Down Expand Up @@ -84,6 +89,40 @@ class FlintSparkIcebergMetadataITSuite extends FlintSparkIcebergSuite with Match
flint.queryIndex(testFlintIndex).count() shouldBe 3
}

// Iceberg relation doesn't support FlinkSparkSkippingFileIndex
ignore("data compaction impact on skipping index") {
deleteTestIndex(testFlintIndex)

sql(s"""
| CREATE SKIPPING INDEX ON $testIcebergTable (
| status_code VALUE_SET
| )
| WITH (
| auto_refresh = true
| )
|""".stripMargin)

val testFlintSkippingIndex = getSkippingIndexName(testIcebergTable)
val job = spark.streams.active.find(_.name == testFlintSkippingIndex)
awaitStreamingComplete(job.get.id.toString)

// Skipping index works before compaction
val query = sql(s"SELECT action FROM $testIcebergTable WHERE status_code = '400'")
checkKeywordsExistsInExplain(query, "FlintSparkSkippingFileIndex")
checkAnswer(query, Row("Reject"))

sql(s"""
| CALL spark_catalog.system.rewrite_data_files (
| table => 'covering_sql_iceberg_test',
| options => map('rewrite-all', true)
| )
| """.stripMargin)

// Skipping index is invalid afterwards
awaitStreamingComplete(job.get.id.toString)
checkAnswer(query, Row("Reject"))
}

test("schema evolution") {
flint.queryIndex(testFlintIndex).count() shouldBe 3

Expand Down

0 comments on commit a6f1d8e

Please sign in to comment.