Terminate streaming job when index data is deleted (#500)
* Stop streaming job and monitor task upon data deletion

Signed-off-by: Chen Dai <[email protected]>

* Add more assertions in IT

Signed-off-by: Chen Dai <[email protected]>

* Address PR comments

Signed-off-by: Chen Dai <[email protected]>

---------

Signed-off-by: Chen Dai <[email protected]>
dai-chen authored Aug 8, 2024
1 parent c2e4020 commit bcd1942
Showing 4 changed files with 56 additions and 14 deletions.
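In short, this change makes the index monitor verify on each heartbeat that the Flint index's backing data still exists, and tear down both the streaming job and the monitor task when it does not. A condensed sketch of the new control flow (not the verbatim implementation; it reuses only names that appear in the FlintSparkIndexMonitor diff below):

// Condensed sketch of the monitor task after this commit. All names
// (flintClient, flintMetadataLogService, stop* helpers) are the ones
// shown in the diff below.
def monitorTask(indexName: String): Unit = {
  if (isStreamingJobActive(indexName)) {
    // Job is healthy: record a heartbeat in the metadata log.
    flintMetadataLogService.recordHeartbeat(indexName)
    // New check: if the index data was deleted out-of-band, the
    // streaming job is orphaned and must be terminated.
    if (!flintClient.exists(indexName)) {
      stopStreamingJobAndMonitor(indexName)
    }
  } else {
    // Job already gone: cancel this monitor task.
    stopMonitor(indexName)
  }
}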
@@ -58,7 +58,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w

  /** Flint Spark index monitor */
  val flintIndexMonitor: FlintSparkIndexMonitor =
-   new FlintSparkIndexMonitor(spark, flintMetadataLogService)
+   new FlintSparkIndexMonitor(spark, flintClient, flintMetadataLogService)

  /**
   * Create index builder for creating index with fluent API.
@@ -18,6 +18,7 @@ import dev.failsafe.event.ExecutionAttemptedEvent
import dev.failsafe.function.CheckedRunnable
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.{FAILED, REFRESHING}
import org.opensearch.flint.common.metadata.log.FlintMetadataLogService
+import org.opensearch.flint.core.FlintClient
import org.opensearch.flint.core.logging.ExceptionMessages.extractRootCause
import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil}

@@ -31,11 +32,14 @@ import org.apache.spark.sql.flint.newDaemonThreadPoolScheduledExecutor
 *
 * @param spark
 *   Spark session
+* @param flintClient
+*   Flint client
 * @param flintMetadataLogService
 *   Flint metadata log service
 */
class FlintSparkIndexMonitor(
    spark: SparkSession,
+   flintClient: FlintClient,
    flintMetadataLogService: FlintMetadataLogService)
    extends Logging {

@@ -158,6 +162,11 @@
    if (isStreamingJobActive(indexName)) {
      logInfo("Streaming job is still active")
      flintMetadataLogService.recordHeartbeat(indexName)
+
+     if (!flintClient.exists(indexName)) {
+       logWarning("Streaming job is active but data is deleted")
+       stopStreamingJobAndMonitor(indexName)
+     }
    } else {
      logError("Streaming job is not active. Cancelling monitor task")
      stopMonitor(indexName)
@@ -172,10 +181,7 @@

    // Stop streaming job and its monitor if max retry limit reached
    if (errorCnt >= MAX_ERROR_COUNT) {
-     logInfo(s"Terminating streaming job and index monitor for $indexName")
-     stopStreamingJob(indexName)
-     stopMonitor(indexName)
-     logInfo(s"Streaming job and index monitor terminated")
+     stopStreamingJobAndMonitor(indexName)
    }
  }
}
@@ -184,6 +190,13 @@
  private def isStreamingJobActive(indexName: String): Boolean =
    spark.streams.active.exists(_.name == indexName)

+ private def stopStreamingJobAndMonitor(indexName: String): Unit = {
+   logInfo(s"Terminating streaming job and index monitor for $indexName")
+   stopStreamingJob(indexName)
+   stopMonitor(indexName)
+   logInfo(s"Streaming job and index monitor terminated")
+ }
+
  private def stopStreamingJob(indexName: String): Unit = {
    val job = spark.streams.active.find(_.name == indexName)
    if (job.isDefined) {
@@ -9,9 +9,11 @@ import java.util.Base64
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters.mapAsJavaMapConverter
+import scala.concurrent.duration.{DurationInt, FiniteDuration}

import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{doAnswer, spy}
+import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.opensearch.client.RequestOptions
import org.opensearch.flint.OpenSearchTransactionSuite
@@ -148,25 +150,38 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc
    spark.streams.active.exists(_.name == testFlintIndex) shouldBe false
  }

+ test("monitor task and streaming job should terminate if data index is deleted") {
+   val task = FlintSparkIndexMonitor.indexMonitorTracker(testFlintIndex)
+   openSearchClient
+     .indices()
+     .delete(new DeleteIndexRequest(testFlintIndex), RequestOptions.DEFAULT)
+
+   // Wait for index monitor execution and assert
+   waitForMonitorTaskRun()
+   task.isCancelled shouldBe true
+   spark.streams.active.exists(_.name == testFlintIndex) shouldBe false
+
+   // Assert index state is still refreshing
+   val latestLog = latestLogEntry(testLatestId)
+   latestLog should contain("state" -> "refreshing")
+ }
+
  test("await monitor terminated without exception should stay refreshing state") {
    // Setup a timer to terminate the streaming job
-   new Thread(() => {
-     Thread.sleep(3000L)
+   asyncAfter(3.seconds) {
      spark.streams.active.find(_.name == testFlintIndex).get.stop()
-   }).start()
+   }

    // Await until streaming job terminated
    flint.flintIndexMonitor.awaitMonitor()

-   // Assert index state is active now
+   // Assert index state is still refreshing
    val latestLog = latestLogEntry(testLatestId)
    latestLog should contain("state" -> "refreshing")
  }

  test("await monitor terminated with exception should update index state to failed with error") {
-   new Thread(() => {
-     Thread.sleep(3000L)
-
+   asyncAfter(3.seconds) {
      // Set Flint index readonly to simulate streaming job exception
      val settings = Map("index.blocks.write" -> true)
      val request = new UpdateSettingsRequest(testFlintIndex).settings(settings.asJava)
@@ -178,7 +193,7 @@
          | PARTITION (year=2023, month=6)
          | VALUES ('Test', 35, 'Vancouver')
          | """.stripMargin)
-   }).start()
+   }

    // Await until streaming job terminated
    flint.flintIndexMonitor.awaitMonitor()
@@ -204,6 +219,19 @@
    latestLog should not contain "error"
  }

+ private def async(block: => Unit): Unit = {
+   new Thread(() => {
+     block
+   }).start()
+ }
+
+ private def asyncAfter(delay: FiniteDuration)(block: => Unit): Unit = {
+   new Thread(() => {
+     Thread.sleep(delay.toMillis)
+     block
+   }).start()
+ }

  private def getLatestTimestamp: (Long, Long) = {
    val latest = latestLogEntry(testLatestId)
    (latest("jobStartTime").asInstanceOf[Long], latest("lastUpdateTime").asInstanceOf[Long])
@@ -69,7 +69,8 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
      flint.deleteIndex(testIndex)
      flint.vacuumIndex(testIndex)
    } catch {
-     case _: IllegalStateException =>
+     // Forcefully delete index data and log entry in case of any errors, such as version conflict
+     case _: Exception =>
        if (openSearchClient
            .indices()
            .exists(new GetIndexRequest(testIndex), RequestOptions.DEFAULT)) {