Skip to content

Commit

Permalink
Remove thread pool in IT
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Jul 22, 2024
1 parent dc7fdb8 commit 8bfcf5c
Showing 1 changed file with 2 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@

package org.opensearch.flint.spark

import java.util.concurrent.Executors

import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}

import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.opensearch.client.RequestOptions
Expand All @@ -21,7 +18,6 @@ import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.scalatest.matchers.should.Matchers

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.col

class FlintSparkIndexSqlITSuite extends FlintSparkSuite with Matchers {

Expand All @@ -38,13 +34,6 @@ class FlintSparkIndexSqlITSuite extends FlintSparkSuite with Matchers {
private val testMvIndex = s"spark_catalog.default.$testMvIndexShortName"
private val testMvFlintIndex = FlintSparkMaterializedView.getFlintIndexName(testMvIndex)

/**
* Creates a custom `ExecutionContext` with a fixed thread pool for handling asynchronous
* operations in tests. TODO: move to base class if more tests require this.
*/
implicit val executionContext: ExecutionContext =
ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))

override def beforeAll(): Unit = {
super.beforeAll()
createTimeSeriesTable(testTableQualifiedName)
Expand Down Expand Up @@ -135,7 +124,7 @@ class FlintSparkIndexSqlITSuite extends FlintSparkSuite with Matchers {
outputError shouldBe empty

// Trigger next micro batch after 5 seconds with index readonly
Future {
new Thread(() => {
Thread.sleep(5000)
openSearchClient
.indices()
Expand All @@ -145,7 +134,7 @@ class FlintSparkIndexSqlITSuite extends FlintSparkSuite with Matchers {
RequestOptions.DEFAULT)
sql(
s"INSERT INTO $testTableQualifiedName VALUES (TIMESTAMP '2023-10-01 04:00:00', 'F', 25, 'Vancouver')")
}
}).start()

// Await to store exception and verify if it's as expected
flint.flintIndexMonitor.awaitMonitor(Some(testSkippingFlintIndex))
Expand Down

0 comments on commit 8bfcf5c

Please sign in to comment.