Add vacuum index API and SQL support (#189)
* Add vacuum API

Signed-off-by: Chen Dai <[email protected]>

* Add vacuuming state

Signed-off-by: Chen Dai <[email protected]>

* Add vacuum SQL support and IT

Signed-off-by: Chen Dai <[email protected]>

* Add vacuum transaction IT

Signed-off-by: Chen Dai <[email protected]>

* Remove SQL IT due to dependency on logical delete

Signed-off-by: Chen Dai <[email protected]>

* Update javadoc and user manual

Signed-off-by: Chen Dai <[email protected]>

* Upload image

Signed-off-by: Chen Dai <[email protected]>

---------

Signed-off-by: Chen Dai <[email protected]>
dai-chen authored Jan 5, 2024
1 parent 1f89aaf commit 804b3aa
Showing 15 changed files with 149 additions and 36 deletions.
Binary file added docs/img/flint-core-index-state-transition.png
Binary file added docs/img/flint-spark-index-state-transition.png
15 changes: 15 additions & 0 deletions docs/index.md
@@ -110,6 +110,12 @@ writer.close()

```

### Index State Transition

Flint index state transition:

![FlintCoreIndexState](./img/flint-core-index-state-transition.png)

### API

The high-level API depends on the query engine implementation. Please see the Query Engine Integration section for details.
@@ -435,8 +441,17 @@ flint.materializedView()
.create()

flint.refreshIndex("flint_spark_catalog_default_alb_logs_metrics")

flint.deleteIndex("flint_spark_catalog_default_alb_logs_skipping_index")
flint.vacuumIndex("flint_spark_catalog_default_alb_logs_skipping_index")
```
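
This commit also exposes vacuum through SQL (see the `FlintSparkSqlExtensions.g4` changes below). A minimal sketch of the equivalent statements issued via `spark.sql`, assuming hypothetical names (`alb_logs` table, `elb_and_requestUri` covering index, `alb_logs_metrics` materialized view):

```scala
// Sketch only: table, index, and view names are hypothetical.
// Each statement resolves to flint.vacuumIndex(...) through the AST builders in this commit.
spark.sql("VACUUM SKIPPING INDEX ON alb_logs")
spark.sql("VACUUM INDEX elb_and_requestUri ON alb_logs")
spark.sql("VACUUM MATERIALIZED VIEW alb_logs_metrics")
```

Note that vacuum only proceeds on an index already in the `deleted` state, so a logical delete (e.g. `flint.deleteIndex(...)`) must come first.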

#### Index State Transition

Flint Spark index state transition:

![FlintSparkIndexState](./img/flint-spark-index-state-transition.png)

#### Skipping Index Provider SPI

```scala
@@ -104,8 +104,13 @@ public T commit(Function<FlintMetadataLogEntry, T> operation) {
try {
T result = operation.apply(latest);

// Append final log
metadataLog.add(finalAction.apply(latest));
// Append final log or purge log entries
FlintMetadataLogEntry finalLog = finalAction.apply(latest);
if (finalLog == NO_LOG_ENTRY) {
metadataLog.purge();
} else {
metadataLog.add(finalLog);
}
return result;
} catch (Exception e) {
LOG.log(SEVERE, "Rolling back transient log due to transaction operation failure", e);
@@ -26,4 +26,9 @@ public interface FlintMetadataLog<T> {
* @return latest log entry
*/
Optional<T> getLatest();

/**
* Remove all log entries.
*/
void purge();
}
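
For illustration, a hypothetical in-memory implementation of this interface that shows the `purge` contract (not part of this commit; it assumes the interface also declares `add` returning the stored entry, a shape the surrounding diff suggests but does not show):

```scala
import java.util.Optional

// Hypothetical sketch: a trivial in-memory metadata log.
class InMemoryMetadataLog[T] extends FlintMetadataLog[T] {
  private var entries: List[T] = Nil

  // Prepend so the head is always the latest entry
  override def add(logEntry: T): T = { entries = logEntry :: entries; logEntry }

  override def getLatest(): Optional[T] =
    entries.headOption.map(e => Optional.of(e)).getOrElse(Optional.empty[T]())

  // Remove all log entries, as the vacuum final action requires
  override def purge(): Unit = entries = Nil
}
```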
@@ -85,6 +85,7 @@ object FlintMetadataLogEntry {
val DELETED: IndexState.Value = Value("deleted")
val FAILED: IndexState.Value = Value("failed")
val RECOVERING: IndexState.Value = Value("recovering")
val VACUUMING: IndexState.Value = Value("vacuuming")
val UNKNOWN: IndexState.Value = Value("unknown")

def from(s: String): IndexState.Value = {
@@ -20,6 +20,11 @@
*/
public interface OptimisticTransaction<T> {

/**
* Constant that indicates the log entry should be purged.
*/
FlintMetadataLogEntry NO_LOG_ENTRY = null;

/**
* @param initialCondition initial precondition under which the subsequent transition and action can proceed
* @return this transaction
@@ -33,7 +38,7 @@ public interface OptimisticTransaction<T> {
OptimisticTransaction<T> transientLog(Function<FlintMetadataLogEntry, FlintMetadataLogEntry> action);

/**
* @param action action to generate final log entry
* @param action action to generate final log entry (will delete entire metadata log if NO_LOG_ENTRY)
* @return this transaction
*/
OptimisticTransaction<T> finalLog(Function<FlintMetadataLogEntry, FlintMetadataLogEntry> action);
@@ -45,29 +50,4 @@
* @return result
*/
T commit(Function<FlintMetadataLogEntry, T> operation);

/**
* No optimistic transaction.
*/
class NoOptimisticTransaction<T> implements OptimisticTransaction<T> {
@Override
public OptimisticTransaction<T> initialLog(Predicate<FlintMetadataLogEntry> initialCondition) {
return this;
}

@Override
public OptimisticTransaction<T> transientLog(Function<FlintMetadataLogEntry, FlintMetadataLogEntry> action) {
return this;
}

@Override
public OptimisticTransaction<T> finalLog(Function<FlintMetadataLogEntry, FlintMetadataLogEntry> action) {
return this;
}

@Override
public T commit(Function<FlintMetadataLogEntry, T> operation) {
return operation.apply(null);
}
};
}
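
To make the `NO_LOG_ENTRY` contract concrete, here is a sketch of how the vacuum path drives this interface; it mirrors `FlintSpark.vacuumIndex` further down in this diff, with hypothetical index and data source names:

```scala
// Sketch: check precondition, mark transient VACUUMING state, purge the metadata log on commit.
flintClient
  .startTransaction("flint_test_skipping_index", "myglue") // hypothetical names
  .initialLog(latest => latest.state == DELETED) // only a logically deleted index may be vacuumed
  .transientLog(latest => latest.copy(state = VACUUMING))
  .finalLog(_ => NO_LOG_ENTRY) // commit() then calls metadataLog.purge() instead of add()
  .commit(_ => flintClient.deleteIndex("flint_test_skipping_index"))
```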
@@ -5,8 +5,17 @@

package org.opensearch.flint.core.storage;

import static java.util.logging.Level.SEVERE;
import static org.opensearch.action.support.WriteRequest.RefreshPolicy;

import java.io.IOException;
import java.util.Base64;
import java.util.Optional;
import java.util.logging.Logger;
import org.opensearch.OpenSearchException;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.index.IndexRequest;
@@ -19,14 +28,6 @@
import org.opensearch.flint.core.metadata.log.FlintMetadataLog;
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry;

import java.io.IOException;
import java.util.Base64;
import java.util.Optional;
import java.util.logging.Logger;

import static java.util.logging.Level.SEVERE;
import static org.opensearch.action.support.WriteRequest.RefreshPolicy;

/**
* Flint metadata log in OpenSearch store. For now this uses a single doc instead of maintaining
* a history of metadata log entries.
@@ -98,6 +99,20 @@ public Optional<FlintMetadataLogEntry> getLatest() {
}
}

@Override
public void purge() {
LOG.info("Purging log entry with id " + latestId);
try (RestHighLevelClient client = flintClient.createClient()) {
DeleteResponse response =
client.delete(
new DeleteRequest(metaLogIndexName, latestId), RequestOptions.DEFAULT);

LOG.info("Purged log entry with result " + response.getResult());
} catch (Exception e) {
throw new IllegalStateException("Failed to purge log entry", e);
}
}

private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) {
LOG.info("Creating log entry " + logEntry);
// Assign doc ID here
14 changes: 14 additions & 0 deletions flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
@@ -26,6 +26,7 @@ skippingIndexStatement
| refreshSkippingIndexStatement
| describeSkippingIndexStatement
| dropSkippingIndexStatement
| vacuumSkippingIndexStatement
;

createSkippingIndexStatement
@@ -48,12 +49,17 @@ dropSkippingIndexStatement
: DROP SKIPPING INDEX ON tableName
;

vacuumSkippingIndexStatement
: VACUUM SKIPPING INDEX ON tableName
;

coveringIndexStatement
: createCoveringIndexStatement
| refreshCoveringIndexStatement
| showCoveringIndexStatement
| describeCoveringIndexStatement
| dropCoveringIndexStatement
| vacuumCoveringIndexStatement
;

createCoveringIndexStatement
@@ -80,6 +86,10 @@ dropCoveringIndexStatement
: DROP INDEX indexName ON tableName
;

vacuumCoveringIndexStatement
: VACUUM INDEX indexName ON tableName
;

materializedViewStatement
: createMaterializedViewStatement
| refreshMaterializedViewStatement
@@ -110,6 +120,10 @@ dropMaterializedViewStatement
: DROP MATERIALIZED VIEW mvName=multipartIdentifier
;

vacuumMaterializedViewStatement
: VACUUM MATERIALIZED VIEW mvName=multipartIdentifier
;

indexJobManagementStatement
: recoverIndexJobStatement
;
1 change: 1 addition & 0 deletions flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
@@ -174,6 +174,7 @@ RECOVER: 'RECOVER';
REFRESH: 'REFRESH';
SHOW: 'SHOW';
TRUE: 'TRUE';
VACUUM: 'VACUUM';
VIEW: 'VIEW';
VIEWS: 'VIEWS';
WHERE: 'WHERE';
@@ -11,6 +11,7 @@ import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.Serialization
import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._
import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL, RefreshMode}
import org.opensearch.flint.spark.FlintSparkIndex.{ID_COLUMN, StreamingRefresh}
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
@@ -239,6 +240,38 @@ class FlintSpark(val spark: SparkSession) extends Logging {
}
}

/**
* Delete a Flint index physically.
*
* @param indexName
* index name
* @return
* true if the index existed and was deleted, otherwise false
*/
def vacuumIndex(indexName: String): Boolean = {
logInfo(s"Vacuuming Flint index $indexName")
if (flintClient.exists(indexName)) {
try {
flintClient
.startTransaction(indexName, dataSourceName)
.initialLog(latest => latest.state == DELETED)
.transientLog(latest => latest.copy(state = VACUUMING))
.finalLog(_ => NO_LOG_ENTRY)
.commit(_ => {
flintClient.deleteIndex(indexName)
true
})
} catch {
case e: Exception =>
logError("Failed to vacuum Flint index", e)
throw new IllegalStateException("Failed to vacuum Flint index")
}
} else {
logInfo("Flint index to vacuum doesn't exist")
false
}
}

/**
* Recover index job.
*
@@ -112,6 +112,15 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
}
}

override def visitVacuumCoveringIndexStatement(
ctx: VacuumCoveringIndexStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
flint.vacuumIndex(flintIndexName)
Seq.empty
}
}

private def getFlintIndexName(
flint: FlintSpark,
indexNameCtx: RuleNode,
@@ -108,6 +108,14 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
}
}

override def visitVacuumMaterializedViewStatement(
ctx: VacuumMaterializedViewStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
flint.vacuumIndex(getFlintIndexName(flint, ctx.mvName))
Seq.empty
}
}

private def getFlintIndexName(flint: FlintSpark, mvNameCtx: RuleNode): String = {
val fullMvName = getFullTableName(flint, mvNameCtx)
FlintSparkMaterializedView.getFlintIndexName(fullMvName)
@@ -98,6 +98,15 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
Seq.empty
}

override def visitVacuumSkippingIndexStatement(
ctx: VacuumSkippingIndexStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
val indexName = getSkippingIndexName(flint, ctx.tableName)
flint.vacuumIndex(indexName)
Seq.empty
}
}

private def getSkippingIndexName(flint: FlintSpark, tableNameCtx: RuleNode): String =
FlintSparkSkippingIndex.getSkippingIndexName(getFullTableName(flint, tableNameCtx))
}
@@ -119,6 +119,24 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match
latestLogEntry(testLatestId) should contain("state" -> "deleted")
}

test("vacuum index") {
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.create()
deleteLogically(testLatestId)
flint.vacuumIndex(testFlintIndex)

// Both index data and metadata log should be vacuumed
openSearchClient
.indices()
.exists(new GetIndexRequest(testFlintIndex), RequestOptions.DEFAULT) shouldBe false
openSearchClient.exists(
new GetRequest(testMetaLogIndex, testLatestId),
RequestOptions.DEFAULT) shouldBe false
}

test("should recreate index if logical deleted") {
flint
.skippingIndex()
