diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala
index 9a89e9637..434feb980 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala
@@ -103,7 +103,7 @@ class FlintSparkIndexMonitor(
         .flatMap(name => spark.streams.active.find(_.name == name))
         .orElse(spark.streams.active.headOption)
 
-      if (job.isDefined) {
+      if (job.isDefined) { // must be present after DataFrameWriter.start() is called in refreshIndex API
        val name = job.get.name // use streaming job name because indexName maybe None
        logInfo(s"Awaiting streaming job $name until terminated")
        try {
diff --git a/integ-test/src/test/scala/org/apache/spark/sql/FlintJobITSuite.scala b/integ-test/src/test/scala/org/apache/spark/sql/FlintJobITSuite.scala
index 4ff43d347..bcb1a16c5 100644
--- a/integ-test/src/test/scala/org/apache/spark/sql/FlintJobITSuite.scala
+++ b/integ-test/src/test/scala/org/apache/spark/sql/FlintJobITSuite.scala
@@ -152,23 +152,22 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest {
     val query =
       s"""
          | CREATE SKIPPING INDEX ON $testTable
-         | (
-         |   year PARTITION,
-         |   name VALUE_SET,
-         |   age MIN_MAX
-         | )
+         | ( year PARTITION )
          | WITH (auto_refresh = true)
          | """.stripMargin
     val jobRunId = "00ff4o3b5091080q"
     threadLocalFuture.set(startJob(query, jobRunId))
 
-    // Waiting from streaming job start and complete current batch
+    // Wait for the streaming job to start and complete its current batch in the Future thread in startJob
+    // Otherwise, the active job will be None here
     Thread.sleep(5000L)
     pollForResultAndAssert(_ => true, jobRunId)
 
     val activeJob = spark.streams.active.find(_.name == testIndex)
     activeJob shouldBe defined
     awaitStreamingComplete(activeJob.get.id.toString)
+    // Wait in case JobOperator has not reached its condition check before awaitTermination
+    Thread.sleep(5000L)
     try {
       // Set Flint index readonly to simulate streaming job exception
       setFlintIndexReadOnly(true)
@@ -328,6 +327,7 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest {
   }
 
   private def setFlintIndexReadOnly(readonly: Boolean): Unit = {
+    logInfo(s"Updating index $testIndex setting with readonly [$readonly]")
     openSearchClient
       .indices()
       .putSettings(
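For context, below is a minimal standalone Scala sketch of the lookup-and-await pattern that the FlintSparkIndexMonitor hunk relies on. The helper name awaitStreamingJob and its parameters are hypothetical and introduced only for illustration; spark.streams.active, StreamingQuery.name, and awaitTermination() are standard Spark Structured Streaming APIs. This is not the project's actual method, only an approximation of the logic shown in the hunk.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical helper illustrating the pattern in the monitor hunk above:
// prefer the active streaming query whose name matches the index, fall back
// to any active query, then block until the query terminates.
def awaitStreamingJob(spark: SparkSession, indexName: Option[String]): Unit = {
  val job: Option[StreamingQuery] = indexName
    .flatMap(name => spark.streams.active.find(_.name == name))
    .orElse(spark.streams.active.headOption)

  job match {
    case Some(query) =>
      // Per the comment added in the diff, the query should exist once
      // DataFrameWriter.start() has been called by the refresh API.
      println(s"Awaiting streaming job ${query.name} until terminated")
      query.awaitTermination()
    case None =>
      println("No active streaming job found")
  }
}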