Skip to content

Commit

Permalink
Add more IT
Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Jun 7, 2024
1 parent 2ac641a commit a1e809f
Showing 1 changed file with 38 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper

import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
import org.apache.spark.sql.flint.config.FlintSparkConf._
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
import org.apache.spark.sql.util.MockEnvironment
import org.apache.spark.util.ThreadUtils

Expand Down Expand Up @@ -195,6 +197,42 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest {
}
}

test("create skipping index with auto refresh and streaming job early exit") {
  // Listener that stops the streaming query as soon as it starts, simulating
  // a streaming job that exits early right after submission.
  val listener = new StreamingQueryListener {
    override def onQueryStarted(event: QueryStartedEvent): Unit = {
      logInfo("Stopping streaming job intentionally")
      // Use foreach instead of .get: if the query has already terminated (or the
      // name doesn't match due to a race), .get would throw NoSuchElementException
      // inside the listener callback thread and be silently swallowed.
      spark.streams.active.find(_.name == event.name).foreach(_.stop())
    }
    override def onQueryProgress(event: QueryProgressEvent): Unit = {}
    override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
  }

  try {
    spark.streams.addListener(listener)
    val query =
      s"""
         | CREATE SKIPPING INDEX ON $testTable
         | (name VALUE_SET)
         | WITH (auto_refresh = true)
         | """.stripMargin
    val jobRunId = "00ff4o3b5091080q"
    threadLocalFuture.set(startJob(query, jobRunId))

    // Assert the streaming job must exit after the listener stops it.
    // NOTE(review): fixed sleeps are fragile for coordination; consider a
    // polling/eventually-style wait if this proves flaky in CI.
    Thread.sleep(5000)
    pollForResultAndAssert(_ => true, jobRunId)
    spark.streams.active.exists(_.name == testIndex) shouldBe false

    // Assert the Flint index log entry transitioned to FAILED state after a
    // short grace period for the state update to be written.
    Thread.sleep(2000L)
    val latestId = Base64.getEncoder.encodeToString(testIndex.getBytes)
    latestLogEntry(latestId) should contain("state" -> "failed")
  } finally {
    // Always detach the listener so it cannot kill queries started by later tests.
    spark.streams.removeListener(listener)
  }
}

test("create skipping index with non-existent table") {
val query =
s"""
Expand Down

0 comments on commit a1e809f

Please sign in to comment.