From 39c9adbf518dee0b26375029266d24e73f5e5349 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 1 Feb 2024 13:12:56 -0800 Subject: [PATCH 1/3] Clean up metadata log in recover index API Signed-off-by: Chen Dai --- .../opensearch/flint/spark/FlintSpark.scala | 14 ++++++++++++++ .../spark/FlintSparkTransactionITSuite.scala | 19 +++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index c197a0bd4..dc85affb1 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -303,6 +303,20 @@ class FlintSpark(val spark: SparkSession) extends Logging { } } else { logInfo("Index to be recovered either doesn't exist or not auto refreshed") + if (index.isEmpty) { + /* + * If execution reaches this point, it indicates that the Flint index is corrupted. + * In such cases, clean up the metadata log, as the index data no longer exists. + * There is a very small possibility that users may recreate the index in the + * interim, but metadata log get deleted by this cleanup process. + */ + logWarning("Cleaning up metadata log as index data has been deleted") + flintClient + .startTransaction(indexName, dataSourceName) + .initialLog(_ => true) + .finalLog(_ => NO_LOG_ENTRY) + .commit(_ => {}) + } false } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala index fc4e4638d..a2b93648e 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala @@ -142,4 +142,23 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match .create() } should have message s"Flint index $testFlintIndex already exists" } + + test("should clean up metadata log entry if index data has been deleted") { + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) + .create() + flint.refreshIndex(testFlintIndex) + + // Simulate the situation that user delete index data directly and then refresh exits + spark.streams.active.find(_.name == testFlintIndex).get.stop() + deleteIndex(testFlintIndex) + + // Index state is refreshing and expect recover API clean it up + latestLogEntry(testLatestId) should contain("state" -> "refreshing") + flint.recoverIndex(testFlintIndex) + latestLogEntry(testLatestId) shouldBe empty + } } From de0e9ca7752c19dae6ced0d112f921771a520607 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 1 Feb 2024 13:41:31 -0800 Subject: [PATCH 2/3] Await termination only if there is streaming job running Signed-off-by: Chen Dai --- .../src/main/scala/org/apache/spark/sql/JobOperator.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala index c60d250ea..a702d2c64 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala @@ -84,8 +84,8 @@ case class JobOperator( } try { - // Stop SparkSession if streaming job succeeds - if (!exceptionThrown && streaming) { + // Wait for streaming job complete if no error and there is streaming job running + if (!exceptionThrown && streaming && spark.streams.active.nonEmpty) { // wait if any child thread to finish before the main thread terminates spark.streams.awaitAnyTermination() } From 33cffa9c7f1b86c0408cc0dbe016c03bc351a598 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Fri, 2 Feb 2024 16:01:08 -0800 Subject: [PATCH 3/3] Update user manual Signed-off-by: Chen Dai --- docs/index.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/index.md b/docs/index.md index 5f9d594de..575c76a34 100644 --- a/docs/index.md +++ b/docs/index.md @@ -298,6 +298,8 @@ WITH ( Currently Flint index job ID is same as internal Flint index name in [OpenSearch](./index.md#OpenSearch) section below. +- **Recover Job**: Initiates a restart of the index refresh job and transition the Flint index to the 'refreshing' state. Additionally, it includes functionality to clean up the metadata log entry in the event that the Flint data index is no longer present in OpenSearch. + ```sql RECOVER INDEX JOB ```