diff --git a/docs/index.md b/docs/index.md index ea6778f39..3ceac9088 100644 --- a/docs/index.md +++ b/docs/index.md @@ -266,6 +266,8 @@ WITH ( Currently Flint index job ID is same as internal Flint index name in [OpenSearch](./index.md#OpenSearch) section below. +- **Recover Job**: Initiates a restart of the index refresh job and transition the Flint index to the 'refreshing' state. Additionally, it includes functionality to clean up the metadata log entry in the event that the Flint data index is no longer present in OpenSearch. + ```sql RECOVER INDEX JOB ``` diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java index 48782a303..5576a0a06 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java @@ -104,8 +104,13 @@ public T commit(Function operation) { try { T result = operation.apply(latest); - // Append final log - metadataLog.add(finalAction.apply(latest)); + // Append final log or purge log entries + FlintMetadataLogEntry finalLog = finalAction.apply(latest); + if (finalLog == NO_LOG_ENTRY) { + metadataLog.purge(); + } else { + metadataLog.add(finalLog); + } return result; } catch (Exception e) { LOG.log(SEVERE, "Rolling back transient log due to transaction operation failure", e); @@ -131,4 +136,4 @@ private FlintMetadataLogEntry emptyLogEntry() { dataSourceName, ""); } -} +} \ No newline at end of file diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLog.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLog.java index 278d078df..bbbfd86b2 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLog.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLog.java @@ -26,4 +26,9 @@ public interface FlintMetadataLog { * @return latest log entry */ Optional getLatest(); + + /** + * Remove all log entries. + */ + void purge(); } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/OptimisticTransaction.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/OptimisticTransaction.java index 3a490a87b..d2b1e7952 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/OptimisticTransaction.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/OptimisticTransaction.java @@ -20,6 +20,11 @@ */ public interface OptimisticTransaction { + /** + * Constant that indicate log entry should be purged. + */ + FlintMetadataLogEntry NO_LOG_ENTRY = null; + /** * @param initialCondition initial precondition that the subsequent transition and action can proceed * @return this transaction diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java index f51e8a628..ab38a5f60 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java @@ -5,8 +5,17 @@ package org.opensearch.flint.core.storage; +import static java.util.logging.Level.SEVERE; +import static org.opensearch.action.support.WriteRequest.RefreshPolicy; + +import java.io.IOException; +import java.util.Base64; +import java.util.Optional; +import java.util.logging.Logger; import org.opensearch.OpenSearchException; import org.opensearch.action.DocWriteResponse; +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.delete.DeleteResponse; import org.opensearch.action.get.GetRequest; import org.opensearch.action.get.GetResponse; import org.opensearch.action.index.IndexRequest; @@ -19,14 +28,6 @@ import org.opensearch.flint.core.metadata.log.FlintMetadataLog; import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry; -import java.io.IOException; -import java.util.Base64; -import java.util.Optional; -import java.util.logging.Logger; - -import static java.util.logging.Level.SEVERE; -import static org.opensearch.action.support.WriteRequest.RefreshPolicy; - /** * Flint metadata log in OpenSearch store. For now use single doc instead of maintaining history * of metadata log. @@ -98,6 +99,20 @@ public Optional getLatest() { } } + @Override + public void purge() { + LOG.info("Purging log entry with id " + latestId); + try (RestHighLevelClient client = flintClient.createClient()) { + DeleteResponse response = + client.delete( + new DeleteRequest(metaLogIndexName, latestId), RequestOptions.DEFAULT); + + LOG.info("Purged log entry with result " + response.getResult()); + } catch (Exception e) { + throw new IllegalStateException("Failed to purge log entry", e); + } + } + private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) { LOG.info("Creating log entry " + logEntry); // Assign doc ID here diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 175436fbf..03bc3a685 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -11,6 +11,7 @@ import org.json4s.{Formats, NoTypeHints} import org.json4s.native.Serialization import org.opensearch.flint.core.{FlintClient, FlintClientBuilder} import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._ +import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL, RefreshMode} import org.opensearch.flint.spark.FlintSparkIndex.{quotedTableName, ID_COLUMN, StreamingRefresh} import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex @@ -270,6 +271,20 @@ class FlintSpark(val spark: SparkSession) extends Logging { } } else { logInfo("Index to be recovered either doesn't exist or not auto refreshed") + if (index.isEmpty) { + /* + * If execution reaches this point, it indicates that the Flint index is corrupted. + * In such cases, clean up the metadata log, as the index data no longer exists. + * There is a very small possibility that users may recreate the index in the + * interim, but metadata log get deleted by this cleanup process. + */ + logWarning("Cleaning up metadata log as index data has been deleted") + flintClient + .startTransaction(indexName, dataSourceName) + .initialLog(_ => true) + .finalLog(_ => NO_LOG_ENTRY) + .commit(_ => {}) + } false } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala index 56227533a..8fa60f8ad 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala @@ -159,6 +159,25 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match } should have message s"Flint index $testFlintIndex already exists" } + test("should clean up metadata log entry if index data has been deleted") { + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) + .create() + flint.refreshIndex(testFlintIndex, INCREMENTAL) + + // Simulate the situation that user delete index data directly and then refresh exits + spark.streams.active.find(_.name == testFlintIndex).get.stop() + deleteIndex(testFlintIndex) + + // Index state is refreshing and expect recover API clean it up + latestLogEntry(testLatestId) should contain("state" -> "refreshing") + flint.recoverIndex(testFlintIndex) + latestLogEntry(testLatestId) shouldBe empty + } + private def deleteLogically(latestId: String): Unit = { val response = openSearchClient .get(new GetRequest(testMetaLogIndex, latestId), RequestOptions.DEFAULT) diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala index 3b5aa474a..a2edbe98e 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala @@ -83,8 +83,8 @@ case class JobOperator( } try { - // Stop SparkSession if streaming job succeeds - if (!exceptionThrown && streaming) { + // Wait for streaming job complete if no error and there is streaming job running + if (!exceptionThrown && streaming && spark.streams.active.nonEmpty) { // wait if any child thread to finish before the main thread terminates spark.streams.awaitAnyTermination() }