From a1e809f69dcf1806447076626ca612acb41da191 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Fri, 7 Jun 2024 15:00:43 -0700 Subject: [PATCH] Add more IT Signed-off-by: Chen Dai --- .../apache/spark/sql/FlintJobITSuite.scala | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/integ-test/src/test/scala/org/apache/spark/sql/FlintJobITSuite.scala b/integ-test/src/test/scala/org/apache/spark/sql/FlintJobITSuite.scala index 990b9e449..f1192717a 100644 --- a/integ-test/src/test/scala/org/apache/spark/sql/FlintJobITSuite.scala +++ b/integ-test/src/test/scala/org/apache/spark/sql/FlintJobITSuite.scala @@ -24,6 +24,8 @@ import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE import org.apache.spark.sql.flint.config.FlintSparkConf._ +import org.apache.spark.sql.streaming.StreamingQueryListener +import org.apache.spark.sql.streaming.StreamingQueryListener._ import org.apache.spark.sql.util.MockEnvironment import org.apache.spark.util.ThreadUtils @@ -195,6 +197,42 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest { } } + test("create skipping index with auto refresh and streaming job early exit") { + // Custom listener to force streaming job to fail at the beginning + val listener = new StreamingQueryListener { + override def onQueryStarted(event: QueryStartedEvent): Unit = { + logInfo("Stopping streaming job intentionally") + spark.streams.active.find(_.name == event.name).get.stop() + } + override def onQueryProgress(event: QueryProgressEvent): Unit = {} + override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {} + } + + try { + spark.streams.addListener(listener) + val query = + s""" + | CREATE SKIPPING INDEX ON $testTable + | (name VALUE_SET) + | WITH (auto_refresh = true) + | """.stripMargin + val jobRunId = "00ff4o3b5091080q" + threadLocalFuture.set(startJob(query, jobRunId)) + + // Assert streaming job must exit + Thread.sleep(5000) + pollForResultAndAssert(_ => true, jobRunId) + spark.streams.active.exists(_.name == testIndex) shouldBe false + + // Assert Flint index transitioned to FAILED state after waiting seconds + Thread.sleep(2000L) + val latestId = Base64.getEncoder.encodeToString(testIndex.getBytes) + latestLogEntry(latestId) should contain("state" -> "failed") + } finally { + spark.streams.removeListener(listener) + } + } + test("create skipping index with non-existent table") { val query = s"""