Implement logical index deletion and vacuum index API #182

Closed
@@ -104,8 +104,13 @@ public T commit(Function<FlintMetadataLogEntry, T> operation) {
try {
T result = operation.apply(latest);

// Append final log
metadataLog.add(finalAction.apply(latest));
// Append final log or purge log entries
FlintMetadataLogEntry finalLog = finalAction.apply(latest);
if (finalLog == NO_LOG_ENTRY) {
metadataLog.purge();
} else {
metadataLog.add(finalLog);
}
return result;
} catch (Exception e) {
LOG.log(SEVERE, "Rolling back transient log due to transaction operation failure", e);
@@ -26,4 +26,9 @@ public interface FlintMetadataLog<T> {
* @return latest log entry
*/
Optional<T> getLatest();

/**
* Remove all log entries.
*/
void purge();
}
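
The new purge() method completes the metadata log lifecycle: a log that can be appended to can now also be removed entirely. A minimal in-memory stand-in (hypothetical, not part of this change) illustrates the intended contract, namely that getLatest() is empty again after purge():

// Illustration only: a hypothetical in-memory log, not the real FlintMetadataLog implementation.
class InMemoryLog[T] {
  private var entries: List[T] = Nil
  def add(entry: T): Unit = entries = entries :+ entry   // append an entry
  def getLatest: Option[T] = entries.lastOption           // latest entry, if any
  def purge(): Unit = entries = Nil                       // after purge, getLatest is None
}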
@@ -85,6 +85,7 @@ object FlintMetadataLogEntry {
val DELETED: IndexState.Value = Value("deleted")
val FAILED: IndexState.Value = Value("failed")
val RECOVERING: IndexState.Value = Value("recovering")
val VACUUMING: IndexState.Value = Value("vacuuming")
val UNKNOWN: IndexState.Value = Value("unknown")

def from(s: String): IndexState.Value = {
@@ -20,6 +20,11 @@
*/
public interface OptimisticTransaction<T> {

/**
* Constant that indicates the log entry should be purged.
*/
FlintMetadataLogEntry NO_LOG_ENTRY = null;

/**
* @param initialCondition initial precondition that the subsequent transition and action can proceed
* @return this transaction
@@ -33,7 +38,7 @@ public interface OptimisticTransaction<T> {
OptimisticTransaction<T> transientLog(Function<FlintMetadataLogEntry, FlintMetadataLogEntry> action);

/**
* @param action action to generate final log entry
* @param action action to generate the final log entry (the entire metadata log is purged if this returns NO_LOG_ENTRY)
* @return this transaction
*/
OptimisticTransaction<T> finalLog(Function<FlintMetadataLogEntry, FlintMetadataLogEntry> action);
@@ -45,29 +50,4 @@
* @return result
*/
T commit(Function<FlintMetadataLogEntry, T> operation);

/**
* No optimistic transaction.
*/
class NoOptimisticTransaction<T> implements OptimisticTransaction<T> {
@Override
public OptimisticTransaction<T> initialLog(Predicate<FlintMetadataLogEntry> initialCondition) {
return this;
}

@Override
public OptimisticTransaction<T> transientLog(Function<FlintMetadataLogEntry, FlintMetadataLogEntry> action) {
return this;
}

@Override
public OptimisticTransaction<T> finalLog(Function<FlintMetadataLogEntry, FlintMetadataLogEntry> action) {
return this;
}

@Override
public T commit(Function<FlintMetadataLogEntry, T> operation) {
return operation.apply(null);
}
};
}
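
NO_LOG_ENTRY lets a final-log action signal "remove the whole metadata log" instead of writing a terminal entry; the commit() change above calls metadataLog.purge() when it sees this marker. A sketch of a purge-terminated transaction, mirroring the vacuumIndex implementation further down (the index and data source names are placeholders):

// Sketch only; "flint_myglue_default_test_index" and "myglue" are placeholder names.
flintClient
  .startTransaction("flint_myglue_default_test_index", "myglue")
  .initialLog(latest => latest.state == DELETED)           // only a logically deleted index may be vacuumed
  .transientLog(latest => latest.copy(state = VACUUMING))  // visible while physical deletion is in flight
  .finalLog(_ => NO_LOG_ENTRY)                             // commit() purges the log instead of appending
  .commit(_ => flintClient.deleteIndex("flint_myglue_default_test_index"))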
@@ -5,8 +5,17 @@

package org.opensearch.flint.core.storage;

import static java.util.logging.Level.SEVERE;
import static org.opensearch.action.support.WriteRequest.RefreshPolicy;

import java.io.IOException;
import java.util.Base64;
import java.util.Optional;
import java.util.logging.Logger;
import org.opensearch.OpenSearchException;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.index.IndexRequest;
@@ -19,14 +28,6 @@
import org.opensearch.flint.core.metadata.log.FlintMetadataLog;
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry;

import java.io.IOException;
import java.util.Base64;
import java.util.Optional;
import java.util.logging.Logger;

import static java.util.logging.Level.SEVERE;
import static org.opensearch.action.support.WriteRequest.RefreshPolicy;

/**
* Flint metadata log in OpenSearch store. For now use single doc instead of maintaining history
* of metadata log.
@@ -98,6 +99,20 @@ public Optional<FlintMetadataLogEntry> getLatest() {
}
}

@Override
public void purge() {
LOG.info("Purging log entry with id " + latestId);
try (RestHighLevelClient client = flintClient.createClient()) {
DeleteResponse response =
client.delete(
new DeleteRequest(metaLogIndexName, latestId), RequestOptions.DEFAULT);

LOG.info("Purged log entry with result " + response.getResult());
} catch (Exception e) {
throw new IllegalStateException("Failed to purge metadata log entry", e);
}
}

private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) {
LOG.info("Creating log entry " + logEntry);
// Assign doc ID here
@@ -11,6 +11,7 @@ import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.Serialization
import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._
import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL, RefreshMode}
import org.opensearch.flint.spark.FlintSparkIndex.{ID_COLUMN, StreamingRefresh}
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
@@ -225,7 +226,6 @@ class FlintSpark(val spark: SparkSession) extends Logging {
// TODO: share same transaction for now
flintIndexMonitor.stopMonitor(indexName)
stopRefreshingJob(indexName)
flintClient.deleteIndex(indexName)
true
})
} catch {
@@ -239,6 +239,38 @@
}
}

/**
* Delete a Flint index physically.
*
* @param indexName
* index name
* @return
* true if the index exists and was deleted, otherwise false
*/
def vacuumIndex(indexName: String): Boolean = {
logInfo(s"Vacuuming Flint index $indexName")
if (flintClient.exists(indexName)) {
try {
flintClient
.startTransaction(indexName, dataSourceName)
.initialLog(latest => latest.state == DELETED)
.transientLog(latest => latest.copy(state = VACUUMING))
.finalLog(_ => NO_LOG_ENTRY)
.commit(_ => {
flintClient.deleteIndex(indexName)
true
})
} catch {
case e: Exception =>
logError("Failed to vacuum Flint index", e)
throw new IllegalStateException("Failed to vacuum Flint index")
}
} else {
logInfo("Flint index to vacuum doesn't exist")
false
}
}

/**
* Recover index job.
*
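Note that deleteIndex now stops at the logical delete (its physical flintClient.deleteIndex call is removed above), while vacuumIndex performs the physical cleanup. A hedged usage sketch following the pattern the updated test suites use (table and index names are placeholders):

// Sketch only; the table name and derived index name are placeholders.
val flint = new FlintSpark(spark)
val indexName = getSkippingIndexName("spark_catalog.default.test")

flint.deleteIndex(indexName)  // logical delete: stop refresh job and mark the log entry "deleted", keep index data
flint.vacuumIndex(indexName)  // physical delete: drop the OpenSearch index and purge its metadata log entry
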
@@ -32,6 +32,7 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {

// Delete all test indices
flint.deleteIndex(testFlintIndex)
flint.vacuumIndex(testFlintIndex)
}

test("create covering index with metadata successfully") {
@@ -127,5 +128,6 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
.addIndexColumns("address")
.create()
flint.deleteIndex(getFlintIndexName(newIndex, testTable))
flint.vacuumIndex(getFlintIndexName(newIndex, testTable))
}
}
@@ -39,6 +39,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {

// Delete all test indices
flint.deleteIndex(testFlintIndex)
flint.vacuumIndex(testFlintIndex)
}

test("create covering index with auto refresh") {
@@ -253,7 +254,9 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
checkAnswer(result, Seq(Row(testIndex), Row("idx_address")))

flint.deleteIndex(getFlintIndexName("idx_address", testTable))
flint.vacuumIndex(getFlintIndexName("idx_address", testTable))
flint.deleteIndex(getSkippingIndexName(testTable))
flint.vacuumIndex(getSkippingIndexName(testTable))
}

test("describe covering index") {
@@ -37,6 +37,7 @@ class FlintSparkIndexJobITSuite extends OpenSearchTransactionSuite with Matchers
*/
try {
flint.deleteIndex(testIndex)
flint.vacuumIndex(testIndex)
} catch {
case _: IllegalStateException => deleteIndex(testIndex)
}
@@ -118,6 +118,7 @@ class FlintSparkIndexJobSqlITSuite extends FlintSparkSuite with Matchers {
test(new AssertionHelper(flintIndexName, checkpointDir))
} finally {
flint.deleteIndex(flintIndexName)
flint.vacuumIndex(flintIndexName)
}
}
}
@@ -66,6 +66,7 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc

try {
flint.deleteIndex(testFlintIndex)
flint.vacuumIndex(testFlintIndex)
} catch {
// Index maybe end up with failed state in some test
case _: IllegalStateException =>
@@ -39,6 +39,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
override def afterEach(): Unit = {
super.afterEach()
flint.deleteIndex(testFlintIndex)
flint.vacuumIndex(testFlintIndex)
}

test("create materialized view with metadata successfully") {
@@ -45,6 +45,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
override def afterEach(): Unit = {
super.afterEach()
flint.deleteIndex(testFlintIndex)
flint.vacuumIndex(testFlintIndex)
}

test("create materialized view with auto refresh") {
@@ -39,6 +39,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {

// Delete all test indices
flint.deleteIndex(testIndex)
flint.vacuumIndex(testIndex)
}

test("create skipping index with metadata successfully") {
@@ -582,6 +583,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
|""".stripMargin)

flint.deleteIndex(testIndex)
flint.vacuumIndex(testIndex)
}

test("can build skipping index for varchar and char and rewrite applicable query") {
@@ -628,6 +630,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
col("varchar_col") === "sample varchar" && col("char_col") === paddedChar))

flint.deleteIndex(testIndex)
flint.vacuumIndex(testIndex)
}

// Custom matcher to check if a SparkPlan uses FlintSparkSkippingFileIndex
@@ -37,6 +37,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
super.afterEach()

flint.deleteIndex(testIndex)
flint.vacuumIndex(testIndex)
}

test("create skipping index with auto refresh") {
@@ -14,8 +14,6 @@ import org.opensearch.action.get.GetRequest
import org.opensearch.client.RequestOptions
import org.opensearch.client.indices.GetIndexRequest
import org.opensearch.flint.OpenSearchTransactionSuite
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.DELETED
import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
import org.scalatest.matchers.should.Matchers
@@ -41,6 +39,7 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match
*/
try {
flint.deleteIndex(testFlintIndex)
flint.vacuumIndex(testFlintIndex)
} catch {
case _: IllegalStateException => deleteIndex(testFlintIndex)
}
@@ -108,34 +107,25 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match
latest("jobStartTime").asInstanceOf[Number].longValue() should be > prevStartTime
}

test("delete index") {
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.create()
flint.deleteIndex(testFlintIndex)

latestLogEntry(testLatestId) should contain("state" -> "deleted")
}

test("should recreate index if logical deleted") {
test("delete and vacuum index") {
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.create()

// Simulate that user deletes index data manually
// Logical delete index
flint.deleteIndex(testFlintIndex)
latestLogEntry(testLatestId) should contain("state" -> "deleted")

// Simulate that user recreate the index
flint
.skippingIndex()
.onTable(testTable)
.addValueSet("name")
.create()
// Vacuum index data and metadata log
flint.vacuumIndex(testFlintIndex)
openSearchClient
.indices()
.exists(new GetIndexRequest(testFlintIndex), RequestOptions.DEFAULT) shouldBe false
openSearchClient.exists(
new GetRequest(testMetaLogIndex, testLatestId),
RequestOptions.DEFAULT) shouldBe false
}

test("should not recreate index if index data still exists") {
Expand All @@ -146,7 +136,7 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match
.create()

// Simulate that PPL plugin leaves index data as logical deleted
deleteLogically(testLatestId)
flint.deleteIndex(testFlintIndex)
latestLogEntry(testLatestId) should contain("state" -> "deleted")

// Simulate that user recreate the index but forgot to cleanup index data
@@ -158,16 +148,4 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match
.create()
} should have message s"Flint index $testFlintIndex already exists"
}

private def deleteLogically(latestId: String): Unit = {
val response = openSearchClient
.get(new GetRequest(testMetaLogIndex, latestId), RequestOptions.DEFAULT)

val latest = new FlintMetadataLogEntry(
latestId,
response.getSeqNo,
response.getPrimaryTerm,
response.getSourceAsMap)
updateLatestLogEntry(latest, DELETED)
}
}