Skip to content

Commit

Permalink
Fix FlintJob IT due to multi threading
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed May 31, 2024
1 parent a176115 commit bafba6b
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class FlintSparkIndexMonitor(
.flatMap(name => spark.streams.active.find(_.name == name))
.orElse(spark.streams.active.headOption)

if (job.isDefined) {
if (job.isDefined) { // must be present after DataFrameWriter.start() called in refreshIndex API
val name = job.get.name // use streaming job name because indexName maybe None
logInfo(s"Awaiting streaming job $name until terminated")
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,23 +152,22 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest {
val query =
s"""
| CREATE SKIPPING INDEX ON $testTable
| (
| year PARTITION,
| name VALUE_SET,
| age MIN_MAX
| )
| ( year PARTITION )
| WITH (auto_refresh = true)
| """.stripMargin
val jobRunId = "00ff4o3b5091080q"
threadLocalFuture.set(startJob(query, jobRunId))

// Waiting from streaming job start and complete current batch
// Waiting from streaming job start and complete current batch in Future thread in startJob
// Otherwise, active job will be None here
Thread.sleep(5000L)
pollForResultAndAssert(_ => true, jobRunId)
val activeJob = spark.streams.active.find(_.name == testIndex)
activeJob shouldBe defined
awaitStreamingComplete(activeJob.get.id.toString)

// Wait in case JobOperator has not reached condition check before awaitTermination
Thread.sleep(5000L)
try {
// Set Flint index readonly to simulate streaming job exception
setFlintIndexReadOnly(true)
Expand Down Expand Up @@ -328,6 +327,7 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest {
}

private def setFlintIndexReadOnly(readonly: Boolean): Unit = {
logInfo(s"Updating index $testIndex setting with readonly [$readonly]")
openSearchClient
.indices()
.putSettings(
Expand Down

0 comments on commit bafba6b

Please sign in to comment.