From b4fd306dc4ab7ef23536249fd0e5c4db90433470 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Wed, 15 Nov 2023 14:01:27 -0800
Subject: [PATCH 1/4] Don't throw exception and change to daemon thread pool

Signed-off-by: Chen Dai
---
 .../org/apache/spark/sql/flint/package.scala  | 20 ++++-
 .../flint/spark/FlintSparkIndexMonitor.scala  |  9 +-
 .../spark/FlintSparkIndexMonitorITSuite.scala | 83 +++++++++++++++++++
 3 files changed, 107 insertions(+), 5 deletions(-)
 create mode 100644 integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala

diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/package.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/package.scala
index eba99b809..cf2cd2b6e 100644
--- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/package.scala
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/package.scala
@@ -5,15 +5,33 @@
 
 package org.apache.spark.sql
 
+import java.util.concurrent.ScheduledExecutorService
+
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connector.catalog._
-import org.apache.spark.util.ShutdownHookManager
+import org.apache.spark.util.{ShutdownHookManager, ThreadUtils}
 
 /**
  * Flint utility methods that rely on access to private code in Spark SQL package.
  */
 package object flint {
 
+  /**
+   * Create a daemon thread pool with the given thread name prefix and size.
+   *
+   * @param threadNamePrefix
+   *   thread name prefix
+   * @param numThreads
+   *   thread pool size
+   * @return
+   *   thread pool executor
+   */
+  def newDaemonThreadPoolScheduledExecutor(
+      threadNamePrefix: String,
+      numThreads: Int): ScheduledExecutorService = {
+    ThreadUtils.newDaemonThreadPoolScheduledExecutor(threadNamePrefix, numThreads)
+  }
+
   /**
    * Add shutdown hook to SparkContext with default priority.
    *
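The helper above only forwards to Spark's ThreadUtils, whose significant property is that every pool thread is marked as a daemon. Daemon threads do not keep the JVM alive, so a leftover heartbeat thread can no longer block Spark application exit. A rough sketch of what such a factory boils down to (illustrative only; Spark's real implementation is built on Guava's ThreadFactoryBuilder):

    import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory}
    import java.util.concurrent.atomic.AtomicInteger

    // Sketch: a scheduled pool whose threads are daemons. The key line is
    // setDaemon(true), which Executors.newScheduledThreadPool(1) alone never sets.
    def daemonScheduledExecutor(prefix: String, numThreads: Int): ScheduledExecutorService = {
      val counter = new AtomicInteger(0)
      val factory = new ThreadFactory {
        override def newThread(r: Runnable): Thread = {
          val t = new Thread(r, s"$prefix-${counter.getAndIncrement()}")
          t.setDaemon(true) // daemon threads don't block JVM exit
          t
        }
      }
      Executors.newScheduledThreadPool(numThreads, factory)
    }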
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala
index 28e46cb29..5c4c7376c 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala
@@ -5,7 +5,7 @@
 
 package org.opensearch.flint.spark
 
-import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledFuture, TimeUnit}
+import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}
 
 import scala.collection.concurrent.{Map, TrieMap}
 import scala.sys.addShutdownHook
@@ -15,6 +15,7 @@ import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.{
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.flint.newDaemonThreadPoolScheduledExecutor
 
 /**
  * Flint Spark index state monitor.
@@ -62,9 +63,8 @@ class FlintSparkIndexMonitor(
             logInfo("Index monitor task is cancelled")
           }
         } catch {
-          case e: Exception =>
+          case e: Throwable =>
            logError("Failed to update index log entry", e)
-            throw new IllegalStateException("Failed to update index log entry")
         }
       },
      15, // Delay to ensure final logging completes first, otherwise version conflicts occur
@@ -100,7 +100,8 @@ object FlintSparkIndexMonitor extends Logging {
   * Thread-safe ExecutorService globally shared by all FlintSpark instances and shut down
   * when the Spark application exits. Non-final variable for test convenience.
   */
-  var executor: ScheduledExecutorService = Executors.newScheduledThreadPool(1)
+  var executor: ScheduledExecutorService =
+    newDaemonThreadPoolScheduledExecutor("flint-index-heartbeat", 1)
 
  /**
   * Tracker that stores each task's future handle, which is required to cancel the task later.
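The catch-block change in this file is the behavioral core of the patch: a periodic task on a ScheduledExecutorService is silently descheduled the first time an execution throws, so the old rethrow turned any transient OpenSearch hiccup into a permanently dead heartbeat. Catching Throwable and only logging keeps the schedule alive. A self-contained sketch of that executor behavior (demo names are illustrative, not part of the patch):

    import java.util.concurrent.{Executors, TimeUnit}

    object ScheduledFailureDemo extends App {
      val executor = Executors.newScheduledThreadPool(1)
      var runs = 0

      executor.scheduleWithFixedDelay(
        () => {
          runs += 1
          // Any Throwable escaping the task suppresses all later executions
          if (runs == 2) throw new IllegalStateException("transient failure")
        },
        0, 100, TimeUnit.MILLISECONDS)

      Thread.sleep(1000)
      println(s"runs = $runs") // prints "runs = 2": the schedule died on the throw
      executor.shutdown()
    }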
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala
new file mode 100644
index 000000000..4296f93c4
--- /dev/null
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala
@@ -0,0 +1,83 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark
+
+import java.util.Base64
+import java.util.concurrent.TimeUnit
+
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.{doAnswer, spy}
+import org.opensearch.flint.OpenSearchTransactionSuite
+import org.opensearch.flint.spark.FlintSpark.RefreshMode._
+import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
+import org.scalatest.matchers.should.Matchers
+
+import org.apache.spark.sql.flint.newDaemonThreadPoolScheduledExecutor
+
+class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matchers {
+
+  /** Test table and index name */
+  private val testTable = "spark_catalog.default.flint_index_monitor_test"
+  private val testFlintIndex = getSkippingIndexName(testTable)
+  private val testLatestId: String = Base64.getEncoder.encodeToString(testFlintIndex.getBytes)
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    createPartitionedTable(testTable)
+
+    // Replace the executor with a spy on a real one and change its delay
+    val realExecutor = newDaemonThreadPoolScheduledExecutor("flint-index-heartbeat", 1)
+    FlintSparkIndexMonitor.executor = spy(realExecutor)
+    doAnswer(invocation => {
+      // Delay 5 seconds to wait for the index refresh to finish
+      realExecutor.scheduleWithFixedDelay(invocation.getArgument(0), 5, 1, TimeUnit.SECONDS)
+    }).when(FlintSparkIndexMonitor.executor)
+      .scheduleWithFixedDelay(any[Runnable], any[Long], any[Long], any[TimeUnit])
+  }
+
+  test("test") {
+    flint
+      .skippingIndex()
+      .onTable(testTable)
+      .addValueSet("name")
+      .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
+      .create()
+    flint.refreshIndex(testFlintIndex, INCREMENTAL)
+
+    // Wait for refresh to complete and monitor thread to start
+    val jobId = spark.streams.active.find(_.name == testFlintIndex).get.id.toString
+    awaitStreamingComplete(jobId)
+    Thread.sleep(5000L)
+
+    var (prevJobStartTime, prevLastUpdateTime) = getLatestTimestamp
+
+    // jobStartTime should stay the same while lastUpdateTime keeps updating
+    3 times { (jobStartTime, lastUpdateTime) =>
+      jobStartTime shouldBe prevJobStartTime
+      lastUpdateTime should be > prevLastUpdateTime
+      prevLastUpdateTime = lastUpdateTime
+    }
+  }
+
+  private def getLatestTimestamp: (Long, Long) = {
+    val latest = latestLogEntry(testLatestId)
+    (latest("jobStartTime").asInstanceOf[Long], latest("lastUpdateTime").asInstanceOf[Long])
+  }
+
+  private implicit class intWithTimes(n: Int) {
+    def times(f: (Long, Long) => Unit): Unit = {
+      1 to n foreach { _ =>
+        {
+          // Sleep longer than the monitor interval of 1 second
+          Thread.sleep(3000)
+
+          val (jobStartTime, lastUpdateTime) = getLatestTimestamp
+          f(jobStartTime, lastUpdateTime)
+        }
+      }
+    }
+  }
+}
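The beforeAll above relies on a Mockito idiom worth spelling out: spy() wraps a real object so that unstubbed calls pass through, while doAnswer(...).when(spy).method(matchers) intercepts a single method and can delegate back to the real instance with rewritten arguments; here the initial delay is stretched to 5 seconds so the heartbeat never races the index refresh. A stripped-down version of the same idiom (hypothetical demo names, not part of the patch):

    import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

    import org.mockito.ArgumentMatchers.any
    import org.mockito.Mockito.{doAnswer, spy}

    object SpySchedulerDemo extends App {
      val real: ScheduledExecutorService = Executors.newScheduledThreadPool(1)
      val spied: ScheduledExecutorService = spy(real)

      // Intercept scheduleWithFixedDelay and delegate with a stretched initial
      // delay; every other method on the spy still passes through untouched.
      doAnswer(invocation =>
        real.scheduleWithFixedDelay(invocation.getArgument(0), 5, 1, TimeUnit.SECONDS))
        .when(spied)
        .scheduleWithFixedDelay(any[Runnable], any[Long], any[Long], any[TimeUnit])

      spied.scheduleWithFixedDelay(() => println("tick"), 0, 1, TimeUnit.SECONDS)
      Thread.sleep(6500) // prints "tick" at roughly 5s and 6s, proving the rewrite
      real.shutdownNow()
    }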
From 02d3827c8544eb31db18407e91b44a9236235443 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Wed, 15 Nov 2023 15:52:01 -0800
Subject: [PATCH 2/4] Add more IT

Signed-off-by: Chen Dai
---
 .../spark/FlintSparkIndexMonitorITSuite.scala | 49 +++++++++++++++++--
 1 file changed, 45 insertions(+), 4 deletions(-)

diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala
index 4296f93c4..301379f5d 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala
@@ -8,8 +8,12 @@ package org.opensearch.flint.spark
 import java.util.Base64
 import java.util.concurrent.TimeUnit
 
+import scala.collection.JavaConverters.mapAsJavaMapConverter
+
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.{doAnswer, spy}
+import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
+import org.opensearch.client.RequestOptions
 import org.opensearch.flint.OpenSearchTransactionSuite
 import org.opensearch.flint.spark.FlintSpark.RefreshMode._
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
@@ -27,6 +31,10 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc
   override def beforeAll(): Unit = {
     super.beforeAll()
     createPartitionedTable(testTable)
+  }
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
 
     // Replace the executor with a spy on a real one and change its delay
     val realExecutor = newDaemonThreadPoolScheduledExecutor("flint-index-heartbeat", 1)
@@ -36,9 +44,8 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc
       realExecutor.scheduleWithFixedDelay(invocation.getArgument(0), 5, 1, TimeUnit.SECONDS)
     }).when(FlintSparkIndexMonitor.executor)
       .scheduleWithFixedDelay(any[Runnable], any[Long], any[Long], any[TimeUnit])
-  }
 
-  test("test") {
+    // Create an auto refreshed index for testing
     flint
       .skippingIndex()
       .onTable(testTable)
       .addValueSet("name")
       .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
       .create()
     flint.refreshIndex(testFlintIndex, INCREMENTAL)
@@ -51,10 +58,16 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc
     val jobId = spark.streams.active.find(_.name == testFlintIndex).get.id.toString
     awaitStreamingComplete(jobId)
     Thread.sleep(5000L)
+  }
 
-    var (prevJobStartTime, prevLastUpdateTime) = getLatestTimestamp
+  override def afterEach(): Unit = {
+    flint.deleteIndex(testFlintIndex)
+    FlintSparkIndexMonitor.executor.shutdownNow()
+    super.afterEach()
+  }
 
-    // jobStartTime should stay the same while lastUpdateTime keeps updating
+  test("job start time should not change and last update time keep updated") {
+    var (prevJobStartTime, prevLastUpdateTime) = getLatestTimestamp
     3 times { (jobStartTime, lastUpdateTime) =>
       jobStartTime shouldBe prevJobStartTime
       lastUpdateTime should be > prevLastUpdateTime
       prevLastUpdateTime = lastUpdateTime
     }
   }
 
+  test("monitor task should not terminate if any exception occurs") {
+    // Block writes on the metadata log index
+    setWriteBlockOnMetadataLogIndex(true)
+    Thread.sleep(2000)
+
+    // Monitor task should stop working after blocking writes
+    var (_, prevLastUpdateTime) = getLatestTimestamp
+    1 times { (_, lastUpdateTime) =>
+      lastUpdateTime shouldBe prevLastUpdateTime
+    }
+
+    // Unblock writes and wait for the monitor task to attempt another update
+    setWriteBlockOnMetadataLogIndex(false)
+    Thread.sleep(2000)
+
+    // Monitor task continues working after unblocking writes
+    3 times { (_, lastUpdateTime) =>
+      lastUpdateTime should be > prevLastUpdateTime
+      prevLastUpdateTime = lastUpdateTime
+    }
+  }
+
   private def getLatestTimestamp: (Long, Long) = {
     val latest = latestLogEntry(testLatestId)
     (latest("jobStartTime").asInstanceOf[Long], latest("lastUpdateTime").asInstanceOf[Long])
@@ -80,4 +115,10 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc
       }
     }
   }
+
+  private def setWriteBlockOnMetadataLogIndex(isBlock: Boolean): Unit = {
+    val request = new UpdateSettingsRequest(testMetaLogIndex)
+      .settings(Map("blocks.write" -> isBlock).asJava) // Blocking write operations
+    openSearchClient.indices().putSettings(request, RequestOptions.DEFAULT)
+  }
 }
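setWriteBlockOnMetadataLogIndex drives the new negative test by toggling the index-level blocks.write setting (normalized by OpenSearch to index.blocks.write). While the block is on, every indexing or update call against the metadata log index is rejected with a ClusterBlockException, which is precisely the kind of exception the monitor task must now survive. The same toggle as a standalone sketch against an arbitrary index (illustrative; assumes a running cluster and a RestHighLevelClient named client):

    import scala.collection.JavaConverters.mapAsJavaMapConverter

    import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
    import org.opensearch.client.{RequestOptions, RestHighLevelClient}

    // While blocked, index/update/delete operations on the index fail with
    // ClusterBlockException until the block is lifted again.
    def setWriteBlock(client: RestHighLevelClient, index: String, block: Boolean): Unit = {
      val request = new UpdateSettingsRequest(index)
        .settings(Map("index.blocks.write" -> block).asJava)
      client.indices().putSettings(request, RequestOptions.DEFAULT)
    }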
From 6ada7d5926d380fb95cde87b4456fd1e6a7f5bcf Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Wed, 15 Nov 2023 21:15:51 -0800
Subject: [PATCH 3/4] Add more IT

Signed-off-by: Chen Dai
---
 .../spark/FlintSparkIndexMonitorITSuite.scala | 55 ++++++++++++++-----
 1 file changed, 41 insertions(+), 14 deletions(-)

diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala
index 301379f5d..5c34ca71d 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala
@@ -12,6 +12,7 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter
 
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.{doAnswer, spy}
+import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
 import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
 import org.opensearch.client.RequestOptions
 import org.opensearch.flint.OpenSearchTransactionSuite
@@ -31,10 +32,6 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc
   override def beforeAll(): Unit = {
     super.beforeAll()
     createPartitionedTable(testTable)
-  }
-
-  override def beforeEach(): Unit = {
-    super.beforeEach()
 
     // Replace the executor with a spy on a real one and change its delay
     val realExecutor = newDaemonThreadPoolScheduledExecutor("flint-index-heartbeat", 1)
@@ -44,8 +41,10 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc
       realExecutor.scheduleWithFixedDelay(invocation.getArgument(0), 5, 1, TimeUnit.SECONDS)
     }).when(FlintSparkIndexMonitor.executor)
       .scheduleWithFixedDelay(any[Runnable], any[Long], any[Long], any[TimeUnit])
+  }
 
-    // Create an auto refreshed index for testing
+  override def beforeEach(): Unit = {
+    super.beforeEach()
     flint
       .skippingIndex()
       .onTable(testTable)
       .addValueSet("name")
       .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
       .create()
     flint.refreshIndex(testFlintIndex, INCREMENTAL)
 
-    // Wait for refresh to complete and monitor thread to start
+    // Wait for refresh to complete and another 5 seconds to make sure the monitor thread starts
     val jobId = spark.streams.active.find(_.name == testFlintIndex).get.id.toString
     awaitStreamingComplete(jobId)
     Thread.sleep(5000L)
   }
 
   override def afterEach(): Unit = {
-    flint.deleteIndex(testFlintIndex)
-    FlintSparkIndexMonitor.executor.shutdownNow()
-    super.afterEach()
+    // Cancel tasks to avoid conflicts with the delete operation, since they run frequently
+    FlintSparkIndexMonitor.indexMonitorTracker.values.foreach(_.cancel(true))
+    FlintSparkIndexMonitor.indexMonitorTracker.clear()
+
+    try {
+      flint.deleteIndex(testFlintIndex)
+    } catch {
+      // Index may end up in failed state in some tests
+      case _: IllegalStateException =>
+        openSearchClient
+          .indices()
+          .delete(new DeleteIndexRequest(testFlintIndex), RequestOptions.DEFAULT)
+    } finally {
+      super.afterEach()
+    }
   }
 
-  test("job start time should not change and last update time keep updated") {
+  test("job start time should not change and last update time should keep updated") {
     var (prevJobStartTime, prevLastUpdateTime) = getLatestTimestamp
     3 times { (jobStartTime, lastUpdateTime) =>
       jobStartTime shouldBe prevJobStartTime
       lastUpdateTime should be > prevLastUpdateTime
       prevLastUpdateTime = lastUpdateTime
     }
   }
 
@@ -75,10 +86,22 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc
+  test("monitor task should terminate if streaming job is inactive") {
+    val task = FlintSparkIndexMonitor.indexMonitorTracker(testFlintIndex)
+
+    // Stop streaming job intentionally
+    spark.streams.active.find(_.name == testFlintIndex).get.stop()
+    waitForMonitorTaskRun()
+
+    // Index state transitions to failed and task is cancelled
+    latestLogEntry(testLatestId) should contain("state" -> "failed")
+    task.isCancelled shouldBe true
+  }
+
   test("monitor task should not terminate if any exception occurs") {
     // Block writes on the metadata log index
     setWriteBlockOnMetadataLogIndex(true)
-    Thread.sleep(2000)
+    waitForMonitorTaskRun()
 
     // Monitor task should stop working after blocking writes
     var (_, prevLastUpdateTime) = getLatestTimestamp
@@ -88,7 +111,7 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc
     // Unblock writes and wait for the monitor task to attempt another update
     setWriteBlockOnMetadataLogIndex(false)
-    Thread.sleep(2000)
+    waitForMonitorTaskRun()
 
     // Monitor task continues working after unblocking writes
     3 times { (_, lastUpdateTime) =>
@@ -106,8 +129,7 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc
     def times(f: (Long, Long) => Unit): Unit = {
       1 to n foreach { _ =>
         {
-          // Sleep longer than the monitor interval of 1 second
-          Thread.sleep(3000)
+          waitForMonitorTaskRun()
 
           val (jobStartTime, lastUpdateTime) = getLatestTimestamp
           f(jobStartTime, lastUpdateTime)
         }
       }
     }
   }
 
+  private def waitForMonitorTaskRun(): Unit = {
+    // Interval longer than the monitor schedule to make sure it has finished another run
+    Thread.sleep(3000L)
+  }
+
   private def setWriteBlockOnMetadataLogIndex(isBlock: Boolean): Unit = {
     val request = new UpdateSettingsRequest(testMetaLogIndex)
       .settings(Map("blocks.write" -> isBlock).asJava) // Blocking write operations
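One design note on waitForMonitorTaskRun: it centralizes the formerly scattered sleeps, but still pins every wait at a flat 3 seconds. On a slow CI host a suite like this could poll instead; a sketch of such a helper under this suite's own assumptions (getLatestTimestamp and ScalaTest's fail are in scope, names hypothetical):

    // Poll until lastUpdateTime advances past a known value instead of sleeping
    // a fixed interval; fail the test if nothing happens within the timeout.
    private def awaitHeartbeatAfter(prev: Long, timeoutMillis: Long = 10000L): Long = {
      val deadline = System.currentTimeMillis() + timeoutMillis
      while (System.currentTimeMillis() < deadline) {
        val (_, lastUpdateTime) = getLatestTimestamp
        if (lastUpdateTime > prev) return lastUpdateTime
        Thread.sleep(500)
      }
      fail(s"lastUpdateTime did not advance past $prev within $timeoutMillis ms")
    }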
From e26a90997ee3620b021c7dfa1bdf6961f584bd94 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Wed, 15 Nov 2023 21:32:05 -0800
Subject: [PATCH 4/4] Add more IT

Signed-off-by: Chen Dai
---
 .../spark/FlintSparkIndexMonitorITSuite.scala | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)

diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala
index 5c34ca71d..4af147939 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala
@@ -86,10 +86,25 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc
     }
   }
 
+  test("job start time should not change until index is recovered") {
+    val (prevJobStartTime, _) = getLatestTimestamp
+
+    // Stop streaming job and wait for monitor task to stop
+    spark.streams.active.find(_.name == testFlintIndex).get.stop()
+    waitForMonitorTaskRun()
+
+    // Restart streaming job and monitor task
+    flint.recoverIndex(testFlintIndex)
+    waitForMonitorTaskRun()
+
+    val (jobStartTime, _) = getLatestTimestamp
+    jobStartTime should be > prevJobStartTime
+  }
+
   test("monitor task should terminate if streaming job is inactive") {
     val task = FlintSparkIndexMonitor.indexMonitorTracker(testFlintIndex)
 
-    // Stop streaming job intentionally
+    // Stop streaming job and wait for monitor task to stop
     spark.streams.active.find(_.name == testFlintIndex).get.stop()
     waitForMonitorTaskRun()
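Both stopped-job tests read FlintSparkIndexMonitor.indexMonitorTracker, the TrieMap of index name to ScheduledFuture that patch 1's monitor maintains (the "Tracker that stores each task's future handle" comment). A condensed sketch of that bookkeeping and of the cancel(true)/isCancelled semantics the assertions depend on (illustration only, simplified shape):

    import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}

    import scala.collection.concurrent.TrieMap

    object TrackerDemo extends App {
      val executor = Executors.newScheduledThreadPool(1)
      // One future per index name, so a heartbeat can later be cancelled by name
      val tracker = TrieMap.empty[String, ScheduledFuture[_]]

      val task =
        executor.scheduleWithFixedDelay(() => println("heartbeat"), 0, 100, TimeUnit.MILLISECONDS)
      tracker.put("my_index", task)

      Thread.sleep(350)
      // cancel(true) interrupts a run in progress and stops future executions;
      // the future then reports isCancelled == true, as the tests assert
      tracker.values.foreach(_.cancel(true))
      tracker.clear()
      println(task.isCancelled) // true
      executor.shutdown()
    }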