Stop streaming job and monitor task upon data deleted
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Jul 29, 2024
1 parent c6ab291 commit 1c13448
Showing 3 changed files with 51 additions and 13 deletions.
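
In brief, the monitor's scheduled heartbeat task now verifies that the Flint data index still exists before recording a heartbeat; if the index was deleted while its streaming job is still running, the monitor stops both the job and its own task instead of heartbeating forever. A condensed view of the new logic, assembled from the FlintSparkIndexMonitor hunks below (retry and error-count handling elided):

```scala
// Condensed from the diff below; not a standalone snippet.
if (isStreamingJobActive(indexName)) {
  logInfo("Streaming job is still active")

  if (flintClient.exists(indexName)) {
    flintMetadataLogService.recordHeartbeat(indexName) // normal path: index data still present
  } else {
    logInfo("Streaming job is active but data is deleted")
    stopStreamingJobAndMonitor(indexName) // data gone: stop job and cancel monitor task
  }
} else {
  logError("Streaming job is not active. Cancelling monitor task")
  stopMonitor(indexName)
}
```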
FlintSpark.scala
@@ -58,7 +58,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
 
   /** Flint Spark index monitor */
   val flintIndexMonitor: FlintSparkIndexMonitor =
-    new FlintSparkIndexMonitor(spark, flintMetadataLogService)
+    new FlintSparkIndexMonitor(spark, flintClient, flintMetadataLogService)
 
   /**
    * Create index builder for creating index with fluent API.
FlintSparkIndexMonitor.scala
@@ -18,6 +18,7 @@ import dev.failsafe.event.ExecutionAttemptedEvent
 import dev.failsafe.function.CheckedRunnable
 import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.{FAILED, REFRESHING}
 import org.opensearch.flint.common.metadata.log.FlintMetadataLogService
+import org.opensearch.flint.core.FlintClient
 import org.opensearch.flint.core.logging.ExceptionMessages.extractRootCause
 import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil}
 
@@ -36,6 +37,7 @@ import org.apache.spark.sql.flint.newDaemonThreadPoolScheduledExecutor
  */
 class FlintSparkIndexMonitor(
     spark: SparkSession,
+    flintClient: FlintClient,
     flintMetadataLogService: FlintMetadataLogService)
     extends Logging {
 
@@ -157,7 +159,13 @@ class FlintSparkIndexMonitor(
       try {
         if (isStreamingJobActive(indexName)) {
           logInfo("Streaming job is still active")
-          flintMetadataLogService.recordHeartbeat(indexName)
+
+          if (flintClient.exists(indexName)) {
+            flintMetadataLogService.recordHeartbeat(indexName)
+          } else {
+            logInfo("Streaming job is active but data is deleted")
+            stopStreamingJobAndMonitor(indexName)
+          }
         } else {
           logError("Streaming job is not active. Cancelling monitor task")
           stopMonitor(indexName)
@@ -172,10 +180,7 @@ class FlintSparkIndexMonitor(
 
       // Stop streaming job and its monitor if max retry limit reached
      if (errorCnt >= MAX_ERROR_COUNT) {
-        logInfo(s"Terminating streaming job and index monitor for $indexName")
-        stopStreamingJob(indexName)
-        stopMonitor(indexName)
-        logInfo(s"Streaming job and index monitor terminated")
+        stopStreamingJobAndMonitor(indexName)
       }
     }
   }
@@ -184,6 +189,13 @@ class FlintSparkIndexMonitor(
   private def isStreamingJobActive(indexName: String): Boolean =
     spark.streams.active.exists(_.name == indexName)
 
+  private def stopStreamingJobAndMonitor(indexName: String): Unit = {
+    logInfo(s"Terminating streaming job and index monitor for $indexName")
+    stopStreamingJob(indexName)
+    stopMonitor(indexName)
+    logInfo(s"Streaming job and index monitor terminated")
+  }
+
   private def stopStreamingJob(indexName: String): Unit = {
     val job = spark.streams.active.find(_.name == indexName)
     if (job.isDefined) {
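
For intuition, here is a minimal self-contained sketch of the pattern the monitor task implements: a scheduled heartbeat that cancels itself and shuts down once the resource it guards disappears. This is an illustration only; `HeartbeatMonitorSketch` and `dataExists` are hypothetical stand-ins, and a plain `ScheduledExecutorService` replaces Flint's daemon thread pool and Failsafe retry wrapper.

```scala
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

// Illustrative sketch; none of these names exist in the Flint codebase.
object HeartbeatMonitorSketch extends App {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()
  private val dataExists = new AtomicBoolean(true) // stands in for flintClient.exists(indexName)

  @volatile private var task: ScheduledFuture[_] = _
  task = scheduler.scheduleWithFixedDelay(
    () => {
      if (dataExists.get()) {
        println("recording heartbeat") // stands in for flintMetadataLogService.recordHeartbeat
      } else {
        println("data deleted: stopping streaming job and cancelling monitor task")
        task.cancel(false) // the task cancels itself, as stopMonitor does via indexMonitorTracker
        scheduler.shutdown()
      }
    },
    1, 1, TimeUnit.SECONDS)

  Thread.sleep(3500)
  dataExists.set(false) // simulate the index being deleted out from under the job
}
```

The real monitor is more defensive than this sketch: as the second hunk above shows, transient failures only terminate the job and monitor after errorCnt reaches MAX_ERROR_COUNT.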
FlintSparkIndexMonitorITSuite.scala
@@ -9,9 +9,11 @@ import java.util.Base64
 import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters.mapAsJavaMapConverter
+import scala.concurrent.duration.{DurationInt, FiniteDuration}
 
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.{doAnswer, spy}
+import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
 import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
 import org.opensearch.client.RequestOptions
 import org.opensearch.flint.OpenSearchTransactionSuite
@@ -148,12 +150,25 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc
     spark.streams.active.exists(_.name == testFlintIndex) shouldBe false
   }
 
+  test("monitor task and streaming job should terminate if data index is deleted") {
+    val task = FlintSparkIndexMonitor.indexMonitorTracker(testFlintIndex)
+    async {
+      openSearchClient
+        .indices()
+        .delete(new DeleteIndexRequest(testFlintIndex), RequestOptions.DEFAULT)
+    }
+
+    // Wait for index monitor execution and assert
+    Thread.sleep(5000)
+    task.isCancelled shouldBe true
+    spark.streams.active.exists(_.name == testFlintIndex) shouldBe false
+  }
+
   test("await monitor terminated without exception should stay refreshing state") {
     // Setup a timer to terminate the streaming job
-    new Thread(() => {
-      Thread.sleep(3000L)
+    asyncAfter(3.seconds) {
       spark.streams.active.find(_.name == testFlintIndex).get.stop()
-    }).start()
+    }
 
     // Await until streaming job terminated
     flint.flintIndexMonitor.awaitMonitor()
@@ -164,9 +179,7 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc
   }
 
   test("await monitor terminated with exception should update index state to failed with error") {
-    new Thread(() => {
-      Thread.sleep(3000L)
-
+    asyncAfter(3.seconds) {
       // Set Flint index readonly to simulate streaming job exception
       val settings = Map("index.blocks.write" -> true)
       val request = new UpdateSettingsRequest(testFlintIndex).settings(settings.asJava)
@@ -178,7 +191,7 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc
         | PARTITION (year=2023, month=6)
         | VALUES ('Test', 35, 'Vancouver')
         | """.stripMargin)
-    }).start()
+    }
 
     // Await until streaming job terminated
     flint.flintIndexMonitor.awaitMonitor()
@@ -204,6 +217,19 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc
     latestLog should not contain "error"
   }
 
+  private def async(block: => Unit): Unit = {
+    new Thread(() => {
+      block
+    }).start()
+  }
+
+  private def asyncAfter(delay: FiniteDuration)(block: => Unit): Unit = {
+    new Thread(() => {
+      Thread.sleep(delay.toMillis)
+      block
+    }).start()
+  }
+
   private def getLatestTimestamp: (Long, Long) = {
     val latest = latestLogEntry(testLatestId)
     (latest("jobStartTime").asInstanceOf[Long], latest("lastUpdateTime").asInstanceOf[Long])
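
The two private helpers added at the end of the suite replace the hand-rolled thread-and-sleep boilerplate the earlier tests used: `async` runs a block on a background thread immediately (used for the index deletion), while `asyncAfter` delays it by a `FiniteDuration`. The refreshing-state test shows the before-and-after pattern:

```scala
// Before: inline thread boilerplate, repeated in each test
new Thread(() => {
  Thread.sleep(3000L)
  spark.streams.active.find(_.name == testFlintIndex).get.stop()
}).start()

// After: the same delayed action via the new helper
asyncAfter(3.seconds) {
  spark.streams.active.find(_.name == testFlintIndex).get.stop()
}
```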
