Commit
Add more IT
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Jun 7, 2024
1 parent 2ac641a commit 14df017
Showing 1 changed file with 44 additions and 1 deletion.
@@ -17,13 +17,15 @@ import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
 import org.opensearch.action.get.GetRequest
 import org.opensearch.client.RequestOptions
 import org.opensearch.flint.core.FlintOptions
-import org.opensearch.flint.spark.FlintSparkSuite
+import org.opensearch.flint.spark.{FlintSparkIndexMonitor, FlintSparkSuite}
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
 import org.scalatest.matchers.must.Matchers.{contain, defined}
 import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
 
 import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
 import org.apache.spark.sql.flint.config.FlintSparkConf._
+import org.apache.spark.sql.streaming.StreamingQueryListener
+import org.apache.spark.sql.streaming.StreamingQueryListener._
 import org.apache.spark.sql.util.MockEnvironment
 import org.apache.spark.util.ThreadUtils
 
@@ -46,6 +48,11 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest {
 
   protected override def beforeEach(): Unit = {
     super.beforeEach()
+
+    // Clear the tracker because awaitMonitor assumes a single index name in it
+    FlintSparkIndexMonitor.indexMonitorTracker.values.foreach(_.cancel(true))
+    FlintSparkIndexMonitor.indexMonitorTracker.clear()
+
     createPartitionedMultiRowAddressTable(testTable)
   }
 
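The cleanup above matters because the tracker is shared across tests in the suite. Below is a minimal sketch of the assumed single-entry invariant, using a hypothetical awaitOnly helper and a plain ConcurrentHashMap rather than FlintSparkIndexMonitor's real internals, which may differ:

import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture}
import scala.collection.JavaConverters._

object MonitorTrackerSketch {
  // Hypothetical stand-in for FlintSparkIndexMonitor.indexMonitorTracker,
  // keyed by Flint index name
  val tracker = new ConcurrentHashMap[String, ScheduledFuture[_]]()

  // An awaitMonitor-style lookup that takes no name can only resolve
  // a monitor task if exactly one entry is registered
  def awaitOnly(): ScheduledFuture[_] = {
    val names = tracker.keySet().asScala
    require(names.size == 1, s"expected a single monitor, found: $names")
    tracker.get(names.head)
  }

  // The beforeEach cleanup: cancel leftover tasks, then empty the tracker
  // so the next test starts from a clean slate
  def reset(): Unit = {
    tracker.values().asScala.foreach(_.cancel(true))
    tracker.clear()
  }
}

Without reset(), a monitor task left over from an earlier test would make an awaitOnly-style resolution ambiguous, which is exactly what the new beforeEach logic guards against.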
@@ -195,6 +202,42 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest {
     }
   }
 
test("create skipping index with auto refresh and streaming job early exit") {
// Custom listener to force streaming job to fail at the beginning
val listener = new StreamingQueryListener {
override def onQueryStarted(event: QueryStartedEvent): Unit = {
logInfo("Stopping streaming job intentionally")
spark.streams.active.find(_.name == event.name).get.stop()
}
override def onQueryProgress(event: QueryProgressEvent): Unit = {}
override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

try {
spark.streams.addListener(listener)
val query =
s"""
| CREATE SKIPPING INDEX ON $testTable
| (name VALUE_SET)
| WITH (auto_refresh = true)
| """.stripMargin
val jobRunId = "00ff4o3b5091080q"
threadLocalFuture.set(startJob(query, jobRunId))

// Assert streaming job must exit
Thread.sleep(5000)
pollForResultAndAssert(_ => true, jobRunId)
spark.streams.active.exists(_.name == testIndex) shouldBe false

// Assert Flint index transitioned to FAILED state after waiting seconds
Thread.sleep(2000L)
val latestId = Base64.getEncoder.encodeToString(testIndex.getBytes)
latestLogEntry(latestId) should contain("state" -> "failed")
} finally {
spark.streams.removeListener(listener)
}
}

test("create skipping index with non-existent table") {
val query =
s"""
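One possible hardening of the new test, sketched under assumptions: the fixed Thread.sleep(5000) and Thread.sleep(2000L) calls bound how quickly the assertions can run and may flake on slow hosts. ScalaTest's Eventually (not used by this commit) could poll the same conditions instead. The fragment below is not part of the change and assumes it runs inside the test body, where spark, testIndex, latestId, and the matchers are in scope:

import org.scalatest.concurrent.Eventually._
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._

// Poll until both conditions hold, instead of sleeping for a fixed interval
eventually(Timeout(10.seconds)) {
  // The streaming job must have exited after the listener stopped it
  spark.streams.active.exists(_.name == testIndex) shouldBe false
  // The Flint index log entry must show the FAILED state
  latestLogEntry(latestId) should contain("state" -> "failed")
}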
