diff --git a/docs/img/flint-core-index-state-transition.png b/docs/img/flint-core-index-state-transition.png
new file mode 100644
index 000000000..57ef31e54
Binary files /dev/null and b/docs/img/flint-core-index-state-transition.png differ
diff --git a/docs/img/flint-spark-index-state-transition.png b/docs/img/flint-spark-index-state-transition.png
new file mode 100644
index 000000000..19c0b0c80
Binary files /dev/null and b/docs/img/flint-spark-index-state-transition.png differ
diff --git a/docs/index.md b/docs/index.md
index cfbf0cbbb..7a1c44a0e 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -110,6 +110,12 @@
 writer.close()
 ```
 
+### Index State Transition
+
+Flint index state transition:
+
+![FlintCoreIndexState](./img/flint-core-index-state-transition.png)
+
 ### API
 
 High level API is dependent on query engine implementation. Please see Query Engine Integration section for details.
@@ -435,8 +441,17 @@ flint.materializedView()
     .create()
 
 flint.refreshIndex("flint_spark_catalog_default_alb_logs_metrics")
+
+flint.deleteIndex("flint_spark_catalog_default_alb_logs_skipping_index")
+flint.vacuumIndex("flint_spark_catalog_default_alb_logs_skipping_index")
 ```
 
+#### Index State Transition
+
+Flint Spark index state transition:
+
+![FlintSparkIndexState](./img/flint-spark-index-state-transition.png)
+
 #### Skipping Index Provider SPI
 
 ```scala
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java
index 48782a303..f3ef364b3 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java
@@ -104,8 +104,13 @@ public T commit(Function<FlintMetadataLogEntry, T> operation) {
     try {
       T result = operation.apply(latest);
 
-      // Append final log
-      metadataLog.add(finalAction.apply(latest));
+      // Append final log or purge log entries
+      FlintMetadataLogEntry finalLog = finalAction.apply(latest);
+      if (finalLog == NO_LOG_ENTRY) {
+        metadataLog.purge();
+      } else {
+        metadataLog.add(finalLog);
+      }
       return result;
     } catch (Exception e) {
       LOG.log(SEVERE, "Rolling back transient log due to transaction operation failure", e);
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLog.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLog.java
index 278d078df..bbbfd86b2 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLog.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLog.java
@@ -26,4 +26,9 @@ public interface FlintMetadataLog<T> {
    * @return latest log entry
    */
   Optional<T> getLatest();
+
+  /**
+   * Remove all log entries.
+   */
+  void purge();
 }
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala
index eb93c7fde..e6ae565d2 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala
@@ -85,6 +85,7 @@ object FlintMetadataLogEntry {
     val DELETED: IndexState.Value = Value("deleted")
     val FAILED: IndexState.Value = Value("failed")
     val RECOVERING: IndexState.Value = Value("recovering")
+    val VACUUMING: IndexState.Value = Value("vacuuming")
     val UNKNOWN: IndexState.Value = Value("unknown")
 
     def from(s: String): IndexState.Value = {
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/OptimisticTransaction.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/OptimisticTransaction.java
index 3a490a87b..d1992959c 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/OptimisticTransaction.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/OptimisticTransaction.java
@@ -20,6 +20,11 @@
  */
 public interface OptimisticTransaction<T> {
 
+  /**
+   * Constant that indicates the log entry should be purged.
+   */
+  FlintMetadataLogEntry NO_LOG_ENTRY = null;
+
   /**
    * @param initialCondition initial precondition that the subsequent transition and action can proceed
    * @return this transaction
@@ -33,7 +38,7 @@
   OptimisticTransaction<T> transientLog(Function<FlintMetadataLogEntry, FlintMetadataLogEntry> action);
 
   /**
-   * @param action action to generate final log entry
+   * @param action action to generate final log entry (the entire metadata log is deleted if NO_LOG_ENTRY is returned)
    * @return this transaction
    */
   OptimisticTransaction<T> finalLog(Function<FlintMetadataLogEntry, FlintMetadataLogEntry> action);
@@ -45,29 +50,4 @@
    * @return result
    */
   T commit(Function<FlintMetadataLogEntry, T> operation);
-
-  /**
-   * No optimistic transaction.
-   */
-  class NoOptimisticTransaction<T> implements OptimisticTransaction<T> {
-    @Override
-    public OptimisticTransaction<T> initialLog(Predicate<FlintMetadataLogEntry> initialCondition) {
-      return this;
-    }
-
-    @Override
-    public OptimisticTransaction<T> transientLog(Function<FlintMetadataLogEntry, FlintMetadataLogEntry> action) {
-      return this;
-    }
-
-    @Override
-    public OptimisticTransaction<T> finalLog(Function<FlintMetadataLogEntry, FlintMetadataLogEntry> action) {
-      return this;
-    }
-
-    @Override
-    public T commit(Function<FlintMetadataLogEntry, T> operation) {
-      return operation.apply(null);
-    }
-  };
 }
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java
index f51e8a628..ab38a5f60 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java
@@ -5,8 +5,17 @@
 
 package org.opensearch.flint.core.storage;
 
+import static java.util.logging.Level.SEVERE;
+import static org.opensearch.action.support.WriteRequest.RefreshPolicy;
+
+import java.io.IOException;
+import java.util.Base64;
+import java.util.Optional;
+import java.util.logging.Logger;
 import org.opensearch.OpenSearchException;
 import org.opensearch.action.DocWriteResponse;
+import org.opensearch.action.delete.DeleteRequest;
+import org.opensearch.action.delete.DeleteResponse;
 import org.opensearch.action.get.GetRequest;
 import org.opensearch.action.get.GetResponse;
 import org.opensearch.action.index.IndexRequest;
@@ -19,14 +28,6 @@
 import org.opensearch.flint.core.metadata.log.FlintMetadataLog;
 import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry;
 
-import java.io.IOException;
-import java.util.Base64;
-import java.util.Optional;
-import java.util.logging.Logger;
-
-import static java.util.logging.Level.SEVERE;
-import static org.opensearch.action.support.WriteRequest.RefreshPolicy;
-
 /**
  * Flint metadata log in OpenSearch store. For now use single doc instead of maintaining history
  * of metadata log.
@@ -98,6 +99,20 @@ public Optional<FlintMetadataLogEntry> getLatest() {
     }
   }
 
+  @Override
+  public void purge() {
+    LOG.info("Purging log entry with id " + latestId);
+    try (RestHighLevelClient client = flintClient.createClient()) {
+      DeleteResponse response =
+          client.delete(
+              new DeleteRequest(metaLogIndexName, latestId), RequestOptions.DEFAULT);
+
+      LOG.info("Purged log entry with result " + response.getResult());
+    } catch (Exception e) {
+      throw new IllegalStateException("Failed to purge log entry", e);
+    }
+  }
+
   private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) {
     LOG.info("Creating log entry " + logEntry);
     // Assign doc ID here
diff --git a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
index cb2e14144..a5f0f993b 100644
--- a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
+++ b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
@@ -26,6 +26,7 @@ skippingIndexStatement
     | refreshSkippingIndexStatement
     | describeSkippingIndexStatement
     | dropSkippingIndexStatement
+    | vacuumSkippingIndexStatement
     ;
 
 createSkippingIndexStatement
@@ -48,12 +49,17 @@ dropSkippingIndexStatement
     : DROP SKIPPING INDEX ON tableName
     ;
 
+vacuumSkippingIndexStatement
+    : VACUUM SKIPPING INDEX ON tableName
+    ;
+
 coveringIndexStatement
     : createCoveringIndexStatement
     | refreshCoveringIndexStatement
     | showCoveringIndexStatement
     | describeCoveringIndexStatement
     | dropCoveringIndexStatement
+    | vacuumCoveringIndexStatement
     ;
 
 createCoveringIndexStatement
@@ -80,6 +86,10 @@ dropCoveringIndexStatement
     : DROP INDEX indexName ON tableName
     ;
 
+vacuumCoveringIndexStatement
+    : VACUUM INDEX indexName ON tableName
+    ;
+
 materializedViewStatement
     : createMaterializedViewStatement
     | refreshMaterializedViewStatement
@@ -110,6 +120,10 @@ dropMaterializedViewStatement
     : DROP MATERIALIZED VIEW mvName=multipartIdentifier
     ;
 
+vacuumMaterializedViewStatement
+    : VACUUM MATERIALIZED VIEW mvName=multipartIdentifier
+    ;
+
 indexJobManagementStatement
     : recoverIndexJobStatement
     ;
diff --git a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
index fe6fd3c66..82c890a61 100644
--- a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
+++ b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
@@ -174,6 +174,7 @@ RECOVER: 'RECOVER';
 REFRESH: 'REFRESH';
 SHOW: 'SHOW';
 TRUE: 'TRUE';
+VACUUM: 'VACUUM';
 VIEW: 'VIEW';
 VIEWS: 'VIEWS';
 WHERE: 'WHERE';
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index 47ade0f87..122fea601 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -11,6 +11,7 @@ import org.json4s.{Formats, NoTypeHints}
 import org.json4s.native.Serialization
 import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
 import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._
+import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
 import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL, RefreshMode}
 import org.opensearch.flint.spark.FlintSparkIndex.{ID_COLUMN, StreamingRefresh}
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
@@ -239,6 +240,38 @@ class FlintSpark(val spark: SparkSession) extends Logging {
     }
   }
 
+  /**
+   * Delete a Flint index and its metadata log physically.
+   *
+   * @param indexName
+   *   index name
+   * @return
+   *   true if the index exists and is deleted, otherwise false
+   */
+  def vacuumIndex(indexName: String): Boolean = {
+    logInfo(s"Vacuuming Flint index $indexName")
+    if (flintClient.exists(indexName)) {
+      try {
+        flintClient
+          .startTransaction(indexName, dataSourceName)
+          .initialLog(latest => latest.state == DELETED)
+          .transientLog(latest => latest.copy(state = VACUUMING))
+          .finalLog(_ => NO_LOG_ENTRY)
+          .commit(_ => {
+            flintClient.deleteIndex(indexName)
+            true
+          })
+      } catch {
+        case e: Exception =>
+          logError("Failed to vacuum Flint index", e)
+          throw new IllegalStateException("Failed to vacuum Flint index")
+      }
+    } else {
+      logInfo("Flint index to vacuum doesn't exist")
+      false
+    }
+  }
+
   /**
    * Recover index job.
    *
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala
index 83a816a58..9b4816e71 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala
@@ -112,6 +112,15 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
     }
   }
 
+  override def visitVacuumCoveringIndexStatement(
+      ctx: VacuumCoveringIndexStatementContext): Command = {
+    FlintSparkSqlCommand() { flint =>
+      val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
+      flint.vacuumIndex(flintIndexName)
+      Seq.empty
+    }
+  }
+
   private def getFlintIndexName(
       flint: FlintSpark,
       indexNameCtx: RuleNode,
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala
index 1a990b5b0..3ab164023 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala
@@ -108,6 +108,14 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
     }
   }
 
+  override def visitVacuumMaterializedViewStatement(
+      ctx: VacuumMaterializedViewStatementContext): Command = {
+    FlintSparkSqlCommand() { flint =>
+      flint.vacuumIndex(getFlintIndexName(flint, ctx.mvName))
+      Seq.empty
+    }
+  }
+
   private def getFlintIndexName(flint: FlintSpark, mvNameCtx: RuleNode): String = {
     val fullMvName = getFullTableName(flint, mvNameCtx)
     FlintSparkMaterializedView.getFlintIndexName(fullMvName)
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
index 2b0bb6c48..46cf7eebd 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
@@ -98,6 +98,15 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
       Seq.empty
     }
 
+  override def visitVacuumSkippingIndexStatement(
+      ctx: VacuumSkippingIndexStatementContext): Command = {
+    FlintSparkSqlCommand() { flint =>
+      val indexName = getSkippingIndexName(flint, ctx.tableName)
+      flint.vacuumIndex(indexName)
+      Seq.empty
+    }
+  }
+
   private def getSkippingIndexName(flint: FlintSpark, tableNameCtx: RuleNode): String =
     FlintSparkSkippingIndex.getSkippingIndexName(getFullTableName(flint, tableNameCtx))
 }
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala
index 56227533a..643a35516 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala
@@ -119,6 +119,24 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match
     latestLogEntry(testLatestId) should contain("state" -> "deleted")
   }
 
+  test("vacuum index") {
+    flint
+      .skippingIndex()
+      .onTable(testTable)
+      .addPartitions("year", "month")
+      .create()
+    deleteLogically(testLatestId)
+    flint.vacuumIndex(testFlintIndex)
+
+    // Both index data and metadata log should be vacuumed
+    openSearchClient
+      .indices()
+      .exists(new GetIndexRequest(testFlintIndex), RequestOptions.DEFAULT) shouldBe false
+    openSearchClient.exists(
+      new GetRequest(testMetaLogIndex, testLatestId),
+      RequestOptions.DEFAULT) shouldBe false
+  }
+
   test("should recreate index if logical deleted") {
     flint
       .skippingIndex()
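
Reviewer note (illustrative, not part of the patch): the sketch below shows how the new VACUUM statements defined in FlintSparkSqlExtensions.g4 above could be issued through Spark SQL. The extension class name and the table, covering index, and materialized view names are assumptions modeled on the existing examples in docs/index.md; OpenSearch connection options are omitted.

```scala
// Illustrative sketch only. Assumes the Flint SQL extensions are registered on the
// session and that each index/view below has already been dropped (logically deleted),
// since vacuum requires the metadata log to be in the "deleted" state.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("flint-vacuum-sketch")
  .config("spark.sql.extensions", "org.opensearch.flint.spark.FlintSparkExtensions")
  .getOrCreate()

// Physically remove index data and purge the metadata log entry for each Flint index.
spark.sql("VACUUM SKIPPING INDEX ON spark_catalog.default.alb_logs")
spark.sql("VACUUM INDEX elb_and_requestUri ON spark_catalog.default.alb_logs")
spark.sql("VACUUM MATERIALIZED VIEW spark_catalog.default.alb_logs_metrics")
```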
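Reviewer note (illustrative, not part of the patch): a minimal sketch of the programmatic flow described by the docs/index.md addition, reusing the `spark` session from the sketch above. The index must be deleted logically before vacuuming, matching the initialLog(latest.state == DELETED) precondition in FlintSpark.vacuumIndex; the index name is the skipping-index example name from the docs.

```scala
// Illustrative sketch only.
import org.opensearch.flint.spark.FlintSpark

val flint = new FlintSpark(spark)
val indexName = "flint_spark_catalog_default_alb_logs_skipping_index"

// Logical delete first: index data is kept and the log entry moves to "deleted".
flint.deleteIndex(indexName)

// Physical delete: transitions through "vacuuming", drops the OpenSearch index,
// and purges the metadata log entry (finalLog returns NO_LOG_ENTRY).
flint.vacuumIndex(indexName)
```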
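Reviewer note (illustrative, not part of the patch): a small sketch of the new VACUUMING value added to FlintMetadataLogEntry.IndexState. It assumes IndexState.from resolves the stored string "vacuuming" back to the enum value, as the Value("vacuuming") definition suggests.

```scala
// Illustrative sketch only.
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState

// "vacuuming" is the transient state written while an index is being vacuumed;
// it is expected to map back to IndexState.VACUUMING when read from the log.
assert(IndexState.from("vacuuming") == IndexState.VACUUMING)
```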