From d1280cfd86d99002fd4b6ea21ce626f497833799 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Tue, 7 Nov 2023 08:11:35 -0800 Subject: [PATCH] Add more info in Flint metadata log (#125) * Add missing latestId field Signed-off-by: Chen Dai * Add create time and UT Signed-off-by: Chen Dai * Change FlintSpark API to update job start time Signed-off-by: Chen Dai * Remove unused EMR info in Flint metadata Signed-off-by: Chen Dai * Singleton scheduler executor service and add shutdown hook Signed-off-by: Chen Dai * Support recreate index from logical deleted index Signed-off-by: Chen Dai * Add revert transient log capability for optimistic transaction Signed-off-by: Chen Dai * Add transaction IT with recover command Signed-off-by: Chen Dai * Extract new index monitor class Signed-off-by: Chen Dai * Fix flaky IT due to background scheduler task Signed-off-by: Chen Dai --------- Signed-off-by: Chen Dai --- .../log/DefaultOptimisticTransaction.java | 46 +++++-- .../metadata/log/FlintMetadataLogEntry.scala | 13 +- .../storage/FlintOpenSearchMetadataLog.java | 2 + .../org/apache/spark/sql/flint/package.scala | 13 ++ .../opensearch/flint/spark/FlintSpark.scala | 58 ++------- .../flint/spark/FlintSparkIndex.scala | 22 ---- .../flint/spark/FlintSparkIndexMonitor.scala | 122 ++++++++++++++++++ .../flint/OpenSearchTransactionSuite.scala | 5 - .../flint/core/FlintTransactionITSuite.scala | 112 +++++++++++++++- .../spark/FlintSparkIndexJobITSuite.scala | 2 +- .../flint/spark/FlintSparkSuite.scala | 15 +++ .../spark/FlintSparkTransactionITSuite.scala | 80 +++++++++++- 12 files changed, 400 insertions(+), 90 deletions(-) create mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java index 2019d8812..48782a303 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java @@ -5,6 +5,8 @@ package org.opensearch.flint.core.metadata.log; +import static java.util.logging.Level.SEVERE; +import static java.util.logging.Level.WARNING; import static org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState$; import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; @@ -76,23 +78,46 @@ public T commit(Function operation) { metadataLog.getLatest().orElseGet(() -> metadataLog.add(emptyLogEntry())); // Perform initial log check - if (initialCondition.test(latest)) { + if (!initialCondition.test(latest)) { + LOG.warning("Initial log entry doesn't satisfy precondition " + latest); + throw new IllegalStateException( + "Transaction failed due to initial log precondition not satisfied"); + } - // Append optional transient log - if (transientAction != null) { - latest = metadataLog.add(transientAction.apply(latest)); - } + // Append optional transient log + FlintMetadataLogEntry initialLog = latest; + if (transientAction != null) { + latest = metadataLog.add(transientAction.apply(latest)); + + // Copy latest seqNo and primaryTerm to initialLog for potential rollback use + initialLog = initialLog.copy( + initialLog.id(), + latest.seqNo(), + latest.primaryTerm(), + initialLog.createTime(), + initialLog.state(), + initialLog.dataSource(), + initialLog.error()); + } - // Perform operation + // Perform operation + try { T result = operation.apply(latest); // Append final log metadataLog.add(finalAction.apply(latest)); return result; - } else { - LOG.warning("Initial log entry doesn't satisfy precondition " + latest); - throw new IllegalStateException( - "Transaction failed due to initial log precondition not satisfied"); + } catch (Exception e) { + LOG.log(SEVERE, "Rolling back transient log due to transaction operation failure", e); + try { + // Roll back transient log if any + if (transientAction != null) { + metadataLog.add(initialLog); + } + } catch (Exception ex) { + LOG.log(WARNING, "Failed to rollback transient log", ex); + } + throw new IllegalStateException("Failed to commit transaction operation"); } } @@ -101,6 +126,7 @@ private FlintMetadataLogEntry emptyLogEntry() { "", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, + 0L, IndexState$.MODULE$.EMPTY(), dataSourceName, ""); diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala index 1761a142a..fea9974c6 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala @@ -29,8 +29,13 @@ case class FlintMetadataLogEntry( id: String, seqNo: Long, primaryTerm: Long, + /** + * This is currently used as streaming job start time. In future, this should represent the + * create timestamp of the log entry + */ + createTime: Long, state: IndexState, - dataSource: String, // TODO: get from Spark conf + dataSource: String, error: String) { def this(id: String, seqNo: Long, primaryTerm: Long, map: java.util.Map[String, AnyRef]) { @@ -38,6 +43,8 @@ case class FlintMetadataLogEntry( id, seqNo, primaryTerm, + /* getSourceAsMap() may use Integer or Long even though it's always long in index mapping */ + map.get("jobStartTime").asInstanceOf[Number].longValue(), IndexState.from(map.get("state").asInstanceOf[String]), map.get("dataSourceName").asInstanceOf[String], map.get("error").asInstanceOf[String]) @@ -48,12 +55,14 @@ case class FlintMetadataLogEntry( s""" |{ | "version": "1.0", + | "latestId": "$id", | "type": "flintindexstate", | "state": "$state", | "applicationId": "${sys.env.getOrElse("SERVERLESS_EMR_VIRTUAL_CLUSTER_ID", "unknown")}", | "jobId": "${sys.env.getOrElse("SERVERLESS_EMR_JOB_ID", "unknown")}", | "dataSourceName": "$dataSource", - | "lastUpdateTime": "${System.currentTimeMillis()}", + | "jobStartTime": $createTime, + | "lastUpdateTime": ${System.currentTimeMillis()}, | "error": "$error" |} |""".stripMargin diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java index dc2efc595..07029d608 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java @@ -98,6 +98,7 @@ private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) { latestId, logEntry.seqNo(), logEntry.primaryTerm(), + logEntry.createTime(), logEntry.state(), logEntry.dataSource(), logEntry.error()); @@ -135,6 +136,7 @@ private FlintMetadataLogEntry writeLogEntry( logEntry.id(), response.getSeqNo(), response.getPrimaryTerm(), + logEntry.createTime(), logEntry.state(), logEntry.dataSource(), logEntry.error()); diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/package.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/package.scala index 0bac6ac73..eba99b809 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/package.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/package.scala @@ -7,12 +7,25 @@ package org.apache.spark.sql import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.connector.catalog._ +import org.apache.spark.util.ShutdownHookManager /** * Flint utility methods that rely on access to private code in Spark SQL package. */ package object flint { + /** + * Add shutdown hook to SparkContext with default priority. + * + * @param hook + * hook with the code to run during shutdown + * @return + * a handle that can be used to unregister the shutdown hook. + */ + def addShutdownHook(hook: () => Unit): AnyRef = { + ShutdownHookManager.addShutdownHook(hook) + } + /** * Convert the given logical plan to Spark data frame. * diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 461011ce5..e9331113a 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -5,8 +5,6 @@ package org.opensearch.flint.spark -import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledFuture, TimeUnit} - import scala.collection.JavaConverters._ import org.json4s.{Formats, NoTypeHints} @@ -46,9 +44,6 @@ class FlintSpark(val spark: SparkSession) extends Logging { /** Required by json4s parse function */ implicit val formats: Formats = Serialization.formats(NoTypeHints) + SkippingKindSerializer - /** Scheduler for updating index state regularly as needed, such as incremental refreshing */ - var executor: ScheduledExecutorService = Executors.newScheduledThreadPool(1) - /** * Data source name. Assign empty string in case of backward compatibility. TODO: remove this in * future @@ -56,6 +51,10 @@ class FlintSpark(val spark: SparkSession) extends Logging { private val dataSourceName: String = spark.conf.getOption("spark.flint.datasource.name").getOrElse("") + /** Flint Spark index monitor */ + private val flintIndexMonitor: FlintSparkIndexMonitor = + new FlintSparkIndexMonitor(spark, flintClient, dataSourceName) + /** * Create index builder for creating index with fluent API. * @@ -106,7 +105,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { try { flintClient .startTransaction(indexName, dataSourceName) - .initialLog(latest => latest.state == EMPTY) + .initialLog(latest => latest.state == EMPTY || latest.state == DELETED) .transientLog(latest => latest.copy(state = CREATING)) .finalLog(latest => latest.copy(state = ACTIVE)) .commit(latest => @@ -144,7 +143,8 @@ class FlintSpark(val spark: SparkSession) extends Logging { flintClient .startTransaction(indexName, dataSourceName) .initialLog(latest => latest.state == ACTIVE) - .transientLog(latest => latest.copy(state = REFRESHING)) + .transientLog(latest => + latest.copy(state = REFRESHING, createTime = System.currentTimeMillis())) .finalLog(latest => { // Change state to active if full, otherwise update index state regularly if (mode == FULL) { @@ -152,8 +152,8 @@ class FlintSpark(val spark: SparkSession) extends Logging { latest.copy(state = ACTIVE) } else { // Schedule regular update and return log entry as refreshing state - logInfo("Scheduling index state updater") - scheduleIndexStateUpdate(indexName) + logInfo("Scheduling index state monitor") + flintIndexMonitor.startMonitor(indexName) latest } }) @@ -223,6 +223,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { .finalLog(latest => latest.copy(state = DELETED)) .commit(_ => { // TODO: share same transaction for now + flintIndexMonitor.stopMonitor(indexName) stopRefreshingJob(indexName) flintClient.deleteIndex(indexName) true @@ -252,9 +253,10 @@ class FlintSpark(val spark: SparkSession) extends Logging { flintClient .startTransaction(indexName, dataSourceName) .initialLog(latest => Set(ACTIVE, REFRESHING, FAILED).contains(latest.state)) - .transientLog(latest => latest.copy(state = RECOVERING)) + .transientLog(latest => + latest.copy(state = RECOVERING, createTime = System.currentTimeMillis())) .finalLog(latest => { - scheduleIndexStateUpdate(indexName) + flintIndexMonitor.startMonitor(indexName) latest.copy(state = REFRESHING) }) .commit(_ => doRefreshIndex(index.get, indexName, INCREMENTAL)) @@ -287,40 +289,6 @@ class FlintSpark(val spark: SparkSession) extends Logging { private def isIncrementalRefreshing(indexName: String): Boolean = spark.streams.active.exists(_.name == indexName) - private def scheduleIndexStateUpdate(indexName: String): Unit = { - var task: ScheduledFuture[_] = null // avoid forward reference compile error at task.cancel() - task = executor.scheduleAtFixedRate( - () => { - logInfo(s"Scheduler triggers index log entry update for $indexName") - try { - if (isIncrementalRefreshing(indexName)) { - logInfo("Streaming job is still active") - flintClient - .startTransaction(indexName, dataSourceName) - .initialLog(latest => latest.state == REFRESHING) - .finalLog(latest => latest) // timestamp will update automatically - .commit(_ => {}) - } else { - logError("Streaming job is not active. Cancelling update task") - flintClient - .startTransaction(indexName, dataSourceName) - .initialLog(_ => true) - .finalLog(latest => latest.copy(state = FAILED)) - .commit(_ => {}) - task.cancel(true) - logInfo("Update task is cancelled") - } - } catch { - case e: Exception => - logError("Failed to update index log entry", e) - throw new IllegalStateException("Failed to update index log entry") - } - }, - 15, // Delay to ensure final logging is complete first, otherwise version conflicts - 60, // TODO: make interval configurable - TimeUnit.SECONDS) - } - // TODO: move to separate class private def doRefreshIndex( index: FlintSparkIndex, diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala index fe5329739..8a7ef5b3e 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala @@ -90,22 +90,6 @@ object FlintSparkIndex { def flintIndexNamePrefix(fullTableName: String): String = s"flint_${fullTableName.replace(".", "_")}_" - /** - * Populate environment variables to persist in Flint metadata. - * - * @return - * env key value mapping to populate - */ - def populateEnvToMetadata: Map[String, String] = { - // TODO: avoid hardcoding env name below by providing another config - val envNames = Seq("SERVERLESS_EMR_VIRTUAL_CLUSTER_ID", "SERVERLESS_EMR_JOB_ID") - envNames - .flatMap(key => - Option(System.getenv(key)) - .map(value => key -> value)) - .toMap - } - /** * Create Flint metadata builder with common fields. * @@ -120,12 +104,6 @@ object FlintSparkIndex { builder.kind(index.kind) builder.options(index.options.optionsWithDefault.mapValues(_.asInstanceOf[AnyRef]).asJava) - // Index properties - val envs = populateEnvToMetadata - if (envs.nonEmpty) { - builder.addProperty("env", envs.asJava) - } - // Optional index settings val settings = index.options.indexSettings() if (settings.isDefined) { diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala new file mode 100644 index 000000000..28e46cb29 --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala @@ -0,0 +1,122 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark + +import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledFuture, TimeUnit} + +import scala.collection.concurrent.{Map, TrieMap} +import scala.sys.addShutdownHook + +import org.opensearch.flint.core.FlintClient +import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.{FAILED, REFRESHING} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession + +/** + * Flint Spark index state monitor. + * + * @param spark + * Spark session + * @param flintClient + * Flint client + * @param dataSourceName + * data source name + */ +class FlintSparkIndexMonitor( + spark: SparkSession, + flintClient: FlintClient, + dataSourceName: String) + extends Logging { + + /** + * Start monitoring task on the given Flint index. + * + * @param indexName + * Flint index name + */ + def startMonitor(indexName: String): Unit = { + val task = FlintSparkIndexMonitor.executor.scheduleWithFixedDelay( + () => { + logInfo(s"Scheduler trigger index monitor task for $indexName") + try { + if (isStreamingJobActive(indexName)) { + logInfo("Streaming job is still active") + flintClient + .startTransaction(indexName, dataSourceName) + .initialLog(latest => latest.state == REFRESHING) + .finalLog(latest => latest) // timestamp will update automatically + .commit(_ => {}) + } else { + logError("Streaming job is not active. Cancelling monitor task") + flintClient + .startTransaction(indexName, dataSourceName) + .initialLog(_ => true) + .finalLog(latest => latest.copy(state = FAILED)) + .commit(_ => {}) + + stopMonitor(indexName) + logInfo("Index monitor task is cancelled") + } + } catch { + case e: Exception => + logError("Failed to update index log entry", e) + throw new IllegalStateException("Failed to update index log entry") + } + }, + 15, // Delay to ensure final logging is complete first, otherwise version conflicts + 60, // TODO: make interval configurable + TimeUnit.SECONDS) + + FlintSparkIndexMonitor.indexMonitorTracker.put(indexName, task) + } + + /** + * Cancel scheduled task on the given Flint index. + * + * @param indexName + * Flint index name + */ + def stopMonitor(indexName: String): Unit = { + logInfo(s"Cancelling scheduled task for index $indexName") + val task = FlintSparkIndexMonitor.indexMonitorTracker.remove(indexName) + if (task.isDefined) { + task.get.cancel(true) + } else { + logInfo(s"Cannot find scheduled task") + } + } + + private def isStreamingJobActive(indexName: String): Boolean = + spark.streams.active.exists(_.name == indexName) +} + +object FlintSparkIndexMonitor extends Logging { + + /** + * Thread-safe ExecutorService globally shared by all FlintSpark instance and will be shutdown + * in Spark application upon exit. Non-final variable for test convenience. + */ + var executor: ScheduledExecutorService = Executors.newScheduledThreadPool(1) + + /** + * Tracker that stores task future handle which is required to cancel the task in future. + */ + val indexMonitorTracker: Map[String, ScheduledFuture[_]] = + new TrieMap[String, ScheduledFuture[_]]() + + /* + * Register shutdown hook to SparkContext with default priority (higher than SparkContext.close itself) + */ + addShutdownHook(() => { + logInfo("Shutdown scheduled executor service") + try { + executor.shutdownNow() + } catch { + case e: Exception => logWarning("Failed to shutdown scheduled executor service", e) + } + }) +} diff --git a/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala index 35b6688e5..1e7077799 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala @@ -6,7 +6,6 @@ package org.opensearch.flint import java.util.Collections -import java.util.concurrent.ScheduledExecutorService import scala.collection.JavaConverters.mapAsScalaMapConverter @@ -21,7 +20,6 @@ import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.IndexState import org.opensearch.flint.core.storage.FlintOpenSearchClient._ import org.opensearch.flint.spark.FlintSparkSuite -import org.scalatestplus.mockito.MockitoSugar.mock /** * Transaction test base suite that creates the metadata log index which enables transaction @@ -35,9 +33,6 @@ trait OpenSearchTransactionSuite extends FlintSparkSuite { override def beforeAll(): Unit = { super.beforeAll() spark.conf.set("spark.flint.datasource.name", testDataSourceName) - - // Replace executor to avoid impact on IT - flint.executor = mock[ScheduledExecutorService] } override def beforeEach(): Unit = { diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala index a6a1dd889..a8b5a1fa2 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala @@ -27,6 +27,56 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava)) } + test("empty metadata log entry content") { + flintClient + .startTransaction(testFlintIndex, testDataSourceName) + .initialLog(latest => { + latest.id shouldBe testLatestId + latest.state shouldBe EMPTY + latest.createTime shouldBe 0L + latest.dataSource shouldBe testDataSourceName + latest.error shouldBe "" + true + }) + .finalLog(latest => latest) + .commit(_ => {}) + } + + test("should preserve original values when transition") { + val testCreateTime = 1234567890123L + createLatestLogEntry( + FlintMetadataLogEntry( + id = testLatestId, + seqNo = UNASSIGNED_SEQ_NO, + primaryTerm = UNASSIGNED_PRIMARY_TERM, + createTime = testCreateTime, + state = ACTIVE, + dataSource = testDataSourceName, + error = "")) + + flintClient + .startTransaction(testFlintIndex, testDataSourceName) + .initialLog(latest => { + latest.id shouldBe testLatestId + latest.createTime shouldBe testCreateTime + latest.dataSource shouldBe testDataSourceName + latest.error shouldBe "" + true + }) + .transientLog(latest => latest.copy(state = DELETING)) + .finalLog(latest => latest.copy(state = DELETED)) + .commit(latest => { + latest.id shouldBe testLatestId + latest.createTime shouldBe testCreateTime + latest.dataSource shouldBe testDataSourceName + latest.error shouldBe "" + }) + + latestLogEntry(testLatestId) should (contain("latestId" -> testLatestId) and + contain("dataSourceName" -> testDataSourceName) and + contain("error" -> "")) + } + test("should transit from initial to final log if initial log is empty") { flintClient .startTransaction(testFlintIndex, testDataSourceName) @@ -59,8 +109,9 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { id = testLatestId, seqNo = UNASSIGNED_SEQ_NO, primaryTerm = UNASSIGNED_PRIMARY_TERM, + createTime = 1234567890123L, state = ACTIVE, - dataSource = "mys3", + dataSource = testDataSourceName, error = "")) flintClient @@ -81,10 +132,13 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { flintClient .startTransaction(testFlintIndex, testDataSourceName) .initialLog(_ => false) - .transientLog(latest => latest) + .transientLog(latest => latest.copy(state = ACTIVE)) .finalLog(latest => latest) .commit(_ => {}) } + + // Initial empty log should not be changed + latestLogEntry(testLatestId) should contain("state" -> "empty") } test("should fail if initial log entry updated by others when updating transient log entry") { @@ -119,4 +173,58 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { }) } } + + test("should rollback to initial log if transaction operation failed") { + // Use create index scenario in this test case + the[IllegalStateException] thrownBy { + flintClient + .startTransaction(testFlintIndex, testDataSourceName) + .initialLog(_ => true) + .transientLog(latest => latest.copy(state = CREATING)) + .finalLog(latest => latest.copy(state = ACTIVE)) + .commit(_ => throw new RuntimeException("Mock operation error")) + } + + // Should rollback to initial empty log + latestLogEntry(testLatestId) should contain("state" -> "empty") + } + + test("should rollback to initial log if updating final log failed") { + // Use refresh index scenario in this test case + createLatestLogEntry( + FlintMetadataLogEntry( + id = testLatestId, + seqNo = UNASSIGNED_SEQ_NO, + primaryTerm = UNASSIGNED_PRIMARY_TERM, + createTime = 1234567890123L, + state = ACTIVE, + dataSource = testDataSourceName, + error = "")) + + the[IllegalStateException] thrownBy { + flintClient + .startTransaction(testFlintIndex, testDataSourceName) + .initialLog(_ => true) + .transientLog(latest => latest.copy(state = REFRESHING)) + .finalLog(_ => throw new RuntimeException("Mock final log error")) + .commit(_ => {}) + } + + // Should rollback to initial active log + latestLogEntry(testLatestId) should contain("state" -> "active") + } + + test("should not necessarily rollback if transaction operation failed but no transient action") { + // Use create index scenario in this test case + the[IllegalStateException] thrownBy { + flintClient + .startTransaction(testFlintIndex, testDataSourceName) + .initialLog(_ => true) + .finalLog(latest => latest.copy(state = ACTIVE)) + .commit(_ => throw new RuntimeException("Mock operation error")) + } + + // Should rollback to initial empty log + latestLogEntry(testLatestId) should contain("state" -> "empty") + } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala index 27693c63b..365aab83d 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala @@ -19,7 +19,7 @@ import org.scalatest.matchers.should.Matchers class FlintSparkIndexJobITSuite extends OpenSearchTransactionSuite with Matchers { /** Test table and index name */ - private val testTable = "spark_catalog.default.test" + private val testTable = "spark_catalog.default.index_job_test" private val testIndex = getSkippingIndexName(testTable) private val latestId = Base64.getEncoder.encodeToString(testIndex.getBytes) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala index 168279eb3..29b8b95a6 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala @@ -5,7 +5,15 @@ package org.opensearch.flint.spark +import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture} + +import scala.concurrent.duration.TimeUnit + +import org.mockito.ArgumentMatchers.any +import org.mockito.Mockito.when +import org.mockito.invocation.InvocationOnMock import org.opensearch.flint.OpenSearchSuite +import org.scalatestplus.mockito.MockitoSugar.mock import org.apache.spark.FlintSuite import org.apache.spark.sql.QueryTest @@ -29,6 +37,13 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit // Disable mandatory checkpoint for test convenience setFlintSparkConf(CHECKPOINT_MANDATORY, "false") + + // Replace executor to avoid impact on IT. + // TODO: Currently no IT test scheduler so no need to restore it back. + val mockExecutor = mock[ScheduledExecutorService] + when(mockExecutor.scheduleWithFixedDelay(any[Runnable], any[Long], any[Long], any[TimeUnit])) + .thenAnswer((_: InvocationOnMock) => mock[ScheduledFuture[_]]) + FlintSparkIndexMonitor.executor = mockExecutor } protected def awaitStreamingComplete(jobId: String): Unit = { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala index 5376617dd..294449a48 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala @@ -10,9 +10,12 @@ import java.util.Base64 import org.json4s.{Formats, NoTypeHints} import org.json4s.native.JsonMethods.parse import org.json4s.native.Serialization +import org.opensearch.action.get.GetRequest import org.opensearch.client.RequestOptions import org.opensearch.client.indices.GetIndexRequest import org.opensearch.flint.OpenSearchTransactionSuite +import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry +import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.DELETED import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL} import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName import org.scalatest.matchers.should.Matchers @@ -41,7 +44,10 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match .addPartitions("year", "month") .create() - latestLogEntry(testLatestId) should contain("state" -> "active") + latestLogEntry(testLatestId) should (contain("latestId" -> testLatestId) + and contain("state" -> "active") + and contain("jobStartTime" -> 0) + and contain("dataSourceName" -> testDataSourceName)) implicit val formats: Formats = Serialization.formats(NoTypeHints) val mapping = @@ -63,7 +69,9 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match .create() flint.refreshIndex(testFlintIndex, FULL) - latestLogEntry(testLatestId) should contain("state" -> "active") + val latest = latestLogEntry(testLatestId) + latest should contain("state" -> "active") + latest("jobStartTime").asInstanceOf[Number].longValue() should be > 0L } test("incremental refresh index") { @@ -71,9 +79,23 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match .skippingIndex() .onTable(testTable) .addPartitions("year", "month") + .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) .create() flint.refreshIndex(testFlintIndex, INCREMENTAL) - latestLogEntry(testLatestId) should contain("state" -> "refreshing") + + // Job start time should be assigned + var latest = latestLogEntry(testLatestId) + latest should contain("state" -> "refreshing") + val prevStartTime = latest("jobStartTime").asInstanceOf[Number].longValue() + prevStartTime should be > 0L + + // Restart streaming job + spark.streams.active.head.stop() + flint.recoverIndex(testFlintIndex) + + // Make sure job start time is updated + latest = latestLogEntry(testLatestId) + latest("jobStartTime").asInstanceOf[Number].longValue() should be > prevStartTime } test("delete index") { @@ -86,4 +108,56 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match latestLogEntry(testLatestId) should contain("state" -> "deleted") } + + test("should recreate index if logical deleted") { + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .create() + + // Simulate that user deletes index data manually + flint.deleteIndex(testFlintIndex) + latestLogEntry(testLatestId) should contain("state" -> "deleted") + + // Simulate that user recreate the index + flint + .skippingIndex() + .onTable(testTable) + .addValueSet("name") + .create() + } + + test("should not recreate index if index data still exists") { + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .create() + + // Simulate that PPL plugin leaves index data as logical deleted + deleteLogically(testLatestId) + latestLogEntry(testLatestId) should contain("state" -> "deleted") + + // Simulate that user recreate the index but forgot to cleanup index data + the[IllegalStateException] thrownBy { + flint + .skippingIndex() + .onTable(testTable) + .addValueSet("name") + .create() + } should have message s"Flint index $testFlintIndex already exists" + } + + private def deleteLogically(latestId: String): Unit = { + val response = openSearchClient + .get(new GetRequest(testMetaLogIndex, latestId), RequestOptions.DEFAULT) + + val latest = new FlintMetadataLogEntry( + latestId, + response.getSeqNo, + response.getPrimaryTerm, + response.getSourceAsMap) + updateLatestLogEntry(latest, DELETED) + } }