From e721546626093bd00e3608d4375383d5cb9d5490 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Tue, 5 Dec 2023 14:18:13 -0800 Subject: [PATCH] Implement metadata log purge Signed-off-by: Chen Dai --- .../log/DefaultOptimisticTransaction.java | 9 ++++-- .../core/metadata/log/FlintMetadataLog.java | 5 +++ .../metadata/log/OptimisticTransaction.java | 32 ++++--------------- .../storage/FlintOpenSearchMetadataLog.java | 31 +++++++++++++----- .../opensearch/flint/spark/FlintSpark.scala | 3 +- .../spark/FlintSparkTransactionITSuite.scala | 14 ++++---- 6 files changed, 51 insertions(+), 43 deletions(-) diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java index 48782a303..f3ef364b3 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java @@ -104,8 +104,13 @@ public T commit(Function operation) { try { T result = operation.apply(latest); - // Append final log - metadataLog.add(finalAction.apply(latest)); + // Append final log or purge log entries + FlintMetadataLogEntry finalLog = finalAction.apply(latest); + if (finalLog == NO_LOG_ENTRY) { + metadataLog.purge(); + } else { + metadataLog.add(finalLog); + } return result; } catch (Exception e) { LOG.log(SEVERE, "Rolling back transient log due to transaction operation failure", e); diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLog.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLog.java index 278d078df..bbbfd86b2 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLog.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLog.java @@ -26,4 +26,9 @@ public interface FlintMetadataLog { * @return latest log entry */ Optional getLatest(); + + /** + * Remove all log entries. + */ + void purge(); } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/OptimisticTransaction.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/OptimisticTransaction.java index 3a490a87b..d1992959c 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/OptimisticTransaction.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/OptimisticTransaction.java @@ -20,6 +20,11 @@ */ public interface OptimisticTransaction { + /** + * Constant that indicate log entry should be purged. + */ + FlintMetadataLogEntry NO_LOG_ENTRY = null; + /** * @param initialCondition initial precondition that the subsequent transition and action can proceed * @return this transaction @@ -33,7 +38,7 @@ public interface OptimisticTransaction { OptimisticTransaction transientLog(Function action); /** - * @param action action to generate final log entry + * @param action action to generate final log entry (will delete entire metadata log if NO_LOG_ENTRY) * @return this transaction */ OptimisticTransaction finalLog(Function action); @@ -45,29 +50,4 @@ public interface OptimisticTransaction { * @return result */ T commit(Function operation); - - /** - * No optimistic transaction. - */ - class NoOptimisticTransaction implements OptimisticTransaction { - @Override - public OptimisticTransaction initialLog(Predicate initialCondition) { - return this; - } - - @Override - public OptimisticTransaction transientLog(Function action) { - return this; - } - - @Override - public OptimisticTransaction finalLog(Function action) { - return this; - } - - @Override - public T commit(Function operation) { - return operation.apply(null); - } - }; } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java index f51e8a628..f7eed019f 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java @@ -5,8 +5,17 @@ package org.opensearch.flint.core.storage; +import static java.util.logging.Level.SEVERE; +import static org.opensearch.action.support.WriteRequest.RefreshPolicy; + +import java.io.IOException; +import java.util.Base64; +import java.util.Optional; +import java.util.logging.Logger; import org.opensearch.OpenSearchException; import org.opensearch.action.DocWriteResponse; +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.delete.DeleteResponse; import org.opensearch.action.get.GetRequest; import org.opensearch.action.get.GetResponse; import org.opensearch.action.index.IndexRequest; @@ -19,14 +28,6 @@ import org.opensearch.flint.core.metadata.log.FlintMetadataLog; import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry; -import java.io.IOException; -import java.util.Base64; -import java.util.Optional; -import java.util.logging.Logger; - -import static java.util.logging.Level.SEVERE; -import static org.opensearch.action.support.WriteRequest.RefreshPolicy; - /** * Flint metadata log in OpenSearch store. For now use single doc instead of maintaining history * of metadata log. @@ -98,6 +99,20 @@ public Optional getLatest() { } } + @Override + public void purge() { + LOG.info("Purging log entry with id " + latestId); + try (RestHighLevelClient client = flintClient.createClient()) { + DeleteResponse response = + client.delete( + new DeleteRequest(metaLogIndexName, latestId), RequestOptions.DEFAULT); + + LOG.info("Purged log entry with result " + response.getResult()); + } catch (Exception e) { + throw new IllegalStateException("Failed to purge metadata log entry", e); + } + } + private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) { LOG.info("Creating log entry " + logEntry); // Assign doc ID here diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 97e500167..6ebb57f6b 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -11,6 +11,7 @@ import org.json4s.{Formats, NoTypeHints} import org.json4s.native.Serialization import org.opensearch.flint.core.{FlintClient, FlintClientBuilder} import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._ +import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL, RefreshMode} import org.opensearch.flint.spark.FlintSparkIndex.{ID_COLUMN, StreamingRefresh} import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex @@ -253,7 +254,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { flintClient .startTransaction(indexName, dataSourceName) .initialLog(latest => latest.state == DELETED) - .finalLog(latest => latest.copy(state = DELETED)) // TODO: vacuum metadata log too? + .finalLog(_ => NO_LOG_ENTRY) .commit(_ => { flintClient.deleteIndex(indexName) true diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala index 75d31f9d3..ffa8284b2 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala @@ -10,6 +10,7 @@ import java.util.Base64 import org.json4s.{Formats, NoTypeHints} import org.json4s.native.JsonMethods.parse import org.json4s.native.Serialization +import org.opensearch.action.get.GetRequest import org.opensearch.client.RequestOptions import org.opensearch.client.indices.GetIndexRequest import org.opensearch.flint.OpenSearchTransactionSuite @@ -117,13 +118,14 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match flint.deleteIndex(testFlintIndex) latestLogEntry(testLatestId) should contain("state" -> "deleted") - // Vacuum and recreate index + // Vacuum index data and metadata log flint.vacuumIndex(testFlintIndex) - flint - .skippingIndex() - .onTable(testTable) - .addPartitions("year", "month") - .create() + openSearchClient + .indices() + .exists(new GetIndexRequest(testFlintIndex), RequestOptions.DEFAULT) shouldBe false + openSearchClient.exists( + new GetRequest(testMetaLogIndex, testLatestId), + RequestOptions.DEFAULT) shouldBe false } test("should not recreate index if index data still exists") {