diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala index feb821a2a..a5744271f 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala @@ -5,10 +5,7 @@ package org.opensearch.flint.spark -import java.util.concurrent.Executors - import scala.collection.JavaConverters._ -import scala.concurrent.{ExecutionContext, Future} import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest import org.opensearch.client.RequestOptions @@ -21,7 +18,6 @@ import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex import org.scalatest.matchers.should.Matchers import org.apache.spark.sql.Row -import org.apache.spark.sql.functions.col class FlintSparkIndexSqlITSuite extends FlintSparkSuite with Matchers { @@ -38,13 +34,6 @@ class FlintSparkIndexSqlITSuite extends FlintSparkSuite with Matchers { private val testMvIndex = s"spark_catalog.default.$testMvIndexShortName" private val testMvFlintIndex = FlintSparkMaterializedView.getFlintIndexName(testMvIndex) - /** - * Creates a custom `ExecutionContext` with a fixed thread pool for handling asynchronous - * operations in tests. TODO: move to base class if more tests require this. - */ - implicit val executionContext: ExecutionContext = - ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1)) - override def beforeAll(): Unit = { super.beforeAll() createTimeSeriesTable(testTableQualifiedName) @@ -135,7 +124,7 @@ class FlintSparkIndexSqlITSuite extends FlintSparkSuite with Matchers { outputError shouldBe empty // Trigger next micro batch after 5 seconds with index readonly - Future { + new Thread(() => { Thread.sleep(5000) openSearchClient .indices() @@ -145,7 +134,7 @@ class FlintSparkIndexSqlITSuite extends FlintSparkSuite with Matchers { RequestOptions.DEFAULT) sql( s"INSERT INTO $testTableQualifiedName VALUES (TIMESTAMP '2023-10-01 04:00:00', 'F', 25, 'Vancouver')") - } + }).start() // Await to store exception and verify if it's as expected flint.flintIndexMonitor.awaitMonitor(Some(testSkippingFlintIndex))