diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java index 48782a303..f3ef364b3 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java @@ -104,8 +104,13 @@ public T commit(Function operation) { try { T result = operation.apply(latest); - // Append final log - metadataLog.add(finalAction.apply(latest)); + // Append final log or purge log entries + FlintMetadataLogEntry finalLog = finalAction.apply(latest); + if (finalLog == NO_LOG_ENTRY) { + metadataLog.purge(); + } else { + metadataLog.add(finalLog); + } return result; } catch (Exception e) { LOG.log(SEVERE, "Rolling back transient log due to transaction operation failure", e); diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLog.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLog.java index 278d078df..bbbfd86b2 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLog.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLog.java @@ -26,4 +26,9 @@ public interface FlintMetadataLog { * @return latest log entry */ Optional getLatest(); + + /** + * Remove all log entries. 
+ */ + void purge(); } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala index eb93c7fde..e6ae565d2 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala @@ -85,6 +85,7 @@ object FlintMetadataLogEntry { val DELETED: IndexState.Value = Value("deleted") val FAILED: IndexState.Value = Value("failed") val RECOVERING: IndexState.Value = Value("recovering") + val VACUUMING: IndexState.Value = Value("vacuuming") val UNKNOWN: IndexState.Value = Value("unknown") def from(s: String): IndexState.Value = { diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/OptimisticTransaction.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/OptimisticTransaction.java index 3a490a87b..d1992959c 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/OptimisticTransaction.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/OptimisticTransaction.java @@ -20,6 +20,11 @@ */ public interface OptimisticTransaction { + /** + * Constant that indicates the log entry should be purged. 
+ */ + FlintMetadataLogEntry NO_LOG_ENTRY = null; + /** * @param initialCondition initial precondition that the subsequent transition and action can proceed * @return this transaction @@ -33,7 +38,7 @@ public interface OptimisticTransaction { OptimisticTransaction transientLog(Function action); /** - * @param action action to generate final log entry + * @param action action to generate final log entry (will delete entire metadata log if NO_LOG_ENTRY) * @return this transaction */ OptimisticTransaction finalLog(Function action); @@ -45,29 +50,4 @@ public interface OptimisticTransaction { * @return result */ T commit(Function operation); - - /** - * No optimistic transaction. - */ - class NoOptimisticTransaction implements OptimisticTransaction { - @Override - public OptimisticTransaction initialLog(Predicate initialCondition) { - return this; - } - - @Override - public OptimisticTransaction transientLog(Function action) { - return this; - } - - @Override - public OptimisticTransaction finalLog(Function action) { - return this; - } - - @Override - public T commit(Function operation) { - return operation.apply(null); - } - }; } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java index f51e8a628..f7eed019f 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java @@ -5,8 +5,17 @@ package org.opensearch.flint.core.storage; +import static java.util.logging.Level.SEVERE; +import static org.opensearch.action.support.WriteRequest.RefreshPolicy; + +import java.io.IOException; +import java.util.Base64; +import java.util.Optional; +import java.util.logging.Logger; import org.opensearch.OpenSearchException; import org.opensearch.action.DocWriteResponse; +import 
org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.delete.DeleteResponse; import org.opensearch.action.get.GetRequest; import org.opensearch.action.get.GetResponse; import org.opensearch.action.index.IndexRequest; @@ -19,14 +28,6 @@ import org.opensearch.flint.core.metadata.log.FlintMetadataLog; import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry; -import java.io.IOException; -import java.util.Base64; -import java.util.Optional; -import java.util.logging.Logger; - -import static java.util.logging.Level.SEVERE; -import static org.opensearch.action.support.WriteRequest.RefreshPolicy; - /** * Flint metadata log in OpenSearch store. For now use single doc instead of maintaining history * of metadata log. @@ -98,6 +99,20 @@ public Optional getLatest() { } } + @Override + public void purge() { + LOG.info("Purging log entry with id " + latestId); + try (RestHighLevelClient client = flintClient.createClient()) { + DeleteResponse response = + client.delete( + new DeleteRequest(metaLogIndexName, latestId), RequestOptions.DEFAULT); + + LOG.info("Purged log entry with result " + response.getResult()); + } catch (Exception e) { + throw new IllegalStateException("Failed to purge metadata log entry", e); + } + } + private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) { LOG.info("Creating log entry " + logEntry); // Assign doc ID here diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 47ade0f87..82096e322 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -11,6 +11,7 @@ import org.json4s.{Formats, NoTypeHints} import org.json4s.native.Serialization import org.opensearch.flint.core.{FlintClient, FlintClientBuilder} import 
org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._ +import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL, RefreshMode} import org.opensearch.flint.spark.FlintSparkIndex.{ID_COLUMN, StreamingRefresh} import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex @@ -225,7 +226,6 @@ class FlintSpark(val spark: SparkSession) extends Logging { // TODO: share same transaction for now flintIndexMonitor.stopMonitor(indexName) stopRefreshingJob(indexName) - flintClient.deleteIndex(indexName) true }) } catch { @@ -239,6 +239,38 @@ } } + /** + * Delete a Flint index physically. + * + * @param indexName + * index name + * @return + * true if the index exists and was deleted, otherwise false + */ + def vacuumIndex(indexName: String): Boolean = { + logInfo(s"Vacuuming Flint index $indexName") + if (flintClient.exists(indexName)) { + try { + flintClient + .startTransaction(indexName, dataSourceName) + .initialLog(latest => latest.state == DELETED) + .transientLog(latest => latest.copy(state = VACUUMING)) + .finalLog(_ => NO_LOG_ENTRY) + .commit(_ => { + flintClient.deleteIndex(indexName) + true + }) + } catch { + case e: Exception => + logError("Failed to vacuum Flint index", e) + throw new IllegalStateException("Failed to vacuum Flint index", e) + } + } else { + logInfo("Flint index to vacuum doesn't exist") + false + } + } + /** * Recover index job. 
* diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala index c79069b9b..702efd9cb 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala @@ -32,6 +32,7 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite { // Delete all test indices flint.deleteIndex(testFlintIndex) + flint.vacuumIndex(testFlintIndex) } test("create covering index with metadata successfully") { @@ -127,5 +128,6 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite { .addIndexColumns("address") .create() flint.deleteIndex(getFlintIndexName(newIndex, testTable)) + flint.vacuumIndex(getFlintIndexName(newIndex, testTable)) } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala index 27419b616..bf9da583a 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala @@ -39,6 +39,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { // Delete all test indices flint.deleteIndex(testFlintIndex) + flint.vacuumIndex(testFlintIndex) } test("create covering index with auto refresh") { @@ -253,7 +254,9 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { checkAnswer(result, Seq(Row(testIndex), Row("idx_address"))) flint.deleteIndex(getFlintIndexName("idx_address", testTable)) + flint.vacuumIndex(getFlintIndexName("idx_address", testTable)) flint.deleteIndex(getSkippingIndexName(testTable)) + flint.vacuumIndex(getSkippingIndexName(testTable)) } test("describe covering index") { diff --git 
a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala index 8df2bc472..ef98cb3d6 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala @@ -37,6 +37,7 @@ class FlintSparkIndexJobITSuite extends OpenSearchTransactionSuite with Matchers */ try { flint.deleteIndex(testIndex) + flint.vacuumIndex(testIndex) } catch { case _: IllegalStateException => deleteIndex(testIndex) } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobSqlITSuite.scala index ddbfeeb16..8f152e4f0 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobSqlITSuite.scala @@ -118,6 +118,7 @@ class FlintSparkIndexJobSqlITSuite extends FlintSparkSuite with Matchers { test(new AssertionHelper(flintIndexName, checkpointDir)) } finally { flint.deleteIndex(flintIndexName) + flint.vacuumIndex(flintIndexName) } } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala index 4af147939..ddb737a01 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala @@ -66,6 +66,7 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc try { flint.deleteIndex(testFlintIndex) + flint.vacuumIndex(testFlintIndex) } catch { // Index maybe end up with failed state in some test case _: IllegalStateException => diff --git 
a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala index 1b16a9e16..3690fe4a3 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala @@ -39,6 +39,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { override def afterEach(): Unit = { super.afterEach() flint.deleteIndex(testFlintIndex) + flint.vacuumIndex(testFlintIndex) } test("create materialized view with metadata successfully") { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala index f9bd3967a..ce08f7bfd 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala @@ -45,6 +45,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { override def afterEach(): Unit = { super.afterEach() flint.deleteIndex(testFlintIndex) + flint.vacuumIndex(testFlintIndex) } test("create materialized view with auto refresh") { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index e3fb467e6..3673df2f6 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -39,6 +39,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { // Delete all test indices flint.deleteIndex(testIndex) + flint.vacuumIndex(testIndex) } 
test("create skipping index with metadata successfully") { @@ -582,6 +583,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { |""".stripMargin) flint.deleteIndex(testIndex) + flint.vacuumIndex(testIndex) } test("can build skipping index for varchar and char and rewrite applicable query") { @@ -628,6 +630,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { col("varchar_col") === "sample varchar" && col("char_col") === paddedChar)) flint.deleteIndex(testIndex) + flint.vacuumIndex(testIndex) } // Custom matcher to check if a SparkPlan uses FlintSparkSkippingFileIndex diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala index 21de15de7..a339a6913 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala @@ -37,6 +37,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { super.afterEach() flint.deleteIndex(testIndex) + flint.vacuumIndex(testIndex) } test("create skipping index with auto refresh") { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala index 56227533a..ffa8284b2 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala @@ -14,8 +14,6 @@ import org.opensearch.action.get.GetRequest import org.opensearch.client.RequestOptions import org.opensearch.client.indices.GetIndexRequest import org.opensearch.flint.OpenSearchTransactionSuite -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry -import 
org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.DELETED import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL} import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName import org.scalatest.matchers.should.Matchers @@ -41,6 +39,7 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match */ try { flint.deleteIndex(testFlintIndex) + flint.vacuumIndex(testFlintIndex) } catch { case _: IllegalStateException => deleteIndex(testFlintIndex) } @@ -108,34 +107,25 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match latest("jobStartTime").asInstanceOf[Number].longValue() should be > prevStartTime } - test("delete index") { - flint - .skippingIndex() - .onTable(testTable) - .addPartitions("year", "month") - .create() - flint.deleteIndex(testFlintIndex) - - latestLogEntry(testLatestId) should contain("state" -> "deleted") - } - - test("should recreate index if logical deleted") { + test("delete and vacuum index") { flint .skippingIndex() .onTable(testTable) .addPartitions("year", "month") .create() - // Simulate that user deletes index data manually + // Logical delete index flint.deleteIndex(testFlintIndex) latestLogEntry(testLatestId) should contain("state" -> "deleted") - // Simulate that user recreate the index - flint - .skippingIndex() - .onTable(testTable) - .addValueSet("name") - .create() + // Vacuum index data and metadata log + flint.vacuumIndex(testFlintIndex) + openSearchClient + .indices() + .exists(new GetIndexRequest(testFlintIndex), RequestOptions.DEFAULT) shouldBe false + openSearchClient.exists( + new GetRequest(testMetaLogIndex, testLatestId), + RequestOptions.DEFAULT) shouldBe false } test("should not recreate index if index data still exists") { @@ -146,7 +136,7 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match .create() // Simulate that PPL plugin leaves index data as logical deleted - 
deleteLogically(testLatestId) + flint.deleteIndex(testFlintIndex) latestLogEntry(testLatestId) should contain("state" -> "deleted") // Simulate that user recreate the index but forgot to cleanup index data @@ -158,16 +148,4 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match .create() } should have message s"Flint index $testFlintIndex already exists" } - - private def deleteLogically(latestId: String): Unit = { - val response = openSearchClient - .get(new GetRequest(testMetaLogIndex, latestId), RequestOptions.DEFAULT) - - val latest = new FlintMetadataLogEntry( - latestId, - response.getSeqNo, - response.getPrimaryTerm, - response.getSourceAsMap) - updateLatestLogEntry(latest, DELETED) - } }