Skip to content

Commit

Permalink
Implement metadata log purge
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Dec 5, 2023
1 parent db53420 commit e721546
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,13 @@ public T commit(Function<FlintMetadataLogEntry, T> operation) {
try {
T result = operation.apply(latest);

// Append final log
metadataLog.add(finalAction.apply(latest));
// Append final log or purge log entries
FlintMetadataLogEntry finalLog = finalAction.apply(latest);
if (finalLog == NO_LOG_ENTRY) {
metadataLog.purge();
} else {
metadataLog.add(finalLog);
}
return result;
} catch (Exception e) {
LOG.log(SEVERE, "Rolling back transient log due to transaction operation failure", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,9 @@ public interface FlintMetadataLog<T> {
* @return latest log entry
*/
Optional<T> getLatest();

/**
* Remove all log entries.
*/
void purge();
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
*/
public interface OptimisticTransaction<T> {

/**
* Constant that a final log action returns to indicate that the metadata log should be purged instead of appended to.
*/
FlintMetadataLogEntry NO_LOG_ENTRY = null;

/**
* @param initialCondition initial precondition that the subsequent transition and action can proceed
* @return this transaction
Expand All @@ -33,7 +38,7 @@ public interface OptimisticTransaction<T> {
OptimisticTransaction<T> transientLog(Function<FlintMetadataLogEntry, FlintMetadataLogEntry> action);

/**
* @param action action to generate final log entry
* @param action action to generate the final log entry (the entire metadata log is deleted if the action returns NO_LOG_ENTRY)
* @return this transaction
*/
OptimisticTransaction<T> finalLog(Function<FlintMetadataLogEntry, FlintMetadataLogEntry> action);
Expand All @@ -45,29 +50,4 @@ public interface OptimisticTransaction<T> {
* @return result
*/
T commit(Function<FlintMetadataLogEntry, T> operation);

/**
* No optimistic transaction.
* <p>
* A pass-through implementation: the three configuration methods ignore their argument and
* return this instance, and {@code commit} simply runs the given operation.
*
* @param <T> result type produced by the committed operation
*/
class NoOptimisticTransaction<T> implements OptimisticTransaction<T> {
// The initial condition is ignored; returns this instance so calls can be chained.
@Override
public OptimisticTransaction<T> initialLog(Predicate<FlintMetadataLogEntry> initialCondition) {
return this;
}

// The transient log action is ignored; returns this instance so calls can be chained.
@Override
public OptimisticTransaction<T> transientLog(Function<FlintMetadataLogEntry, FlintMetadataLogEntry> action) {
return this;
}

// The final log action is ignored; returns this instance so calls can be chained.
@Override
public OptimisticTransaction<T> finalLog(Function<FlintMetadataLogEntry, FlintMetadataLogEntry> action) {
return this;
}

// Applies the operation to a null log entry and returns its result unchanged,
// so the operation must tolerate a null argument.
@Override
public T commit(Function<FlintMetadataLogEntry, T> operation) {
return operation.apply(null);
}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,17 @@

package org.opensearch.flint.core.storage;

import static java.util.logging.Level.SEVERE;
import static org.opensearch.action.support.WriteRequest.RefreshPolicy;

import java.io.IOException;
import java.util.Base64;
import java.util.Optional;
import java.util.logging.Logger;
import org.opensearch.OpenSearchException;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.index.IndexRequest;
Expand All @@ -19,14 +28,6 @@
import org.opensearch.flint.core.metadata.log.FlintMetadataLog;
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry;

import java.io.IOException;
import java.util.Base64;
import java.util.Optional;
import java.util.logging.Logger;

import static java.util.logging.Level.SEVERE;
import static org.opensearch.action.support.WriteRequest.RefreshPolicy;

/**
* Flint metadata log in OpenSearch store. For now use single doc instead of maintaining history
* of metadata log.
Expand Down Expand Up @@ -98,6 +99,20 @@ public Optional<FlintMetadataLogEntry> getLatest() {
}
}

/**
 * Purges the metadata log by deleting the document with ID {@code latestId} from the
 * index named {@code metaLogIndexName}.
 *
 * @throws IllegalStateException if any exception is raised while creating the client,
 *     executing the delete request, or closing the client
 */
@Override
public void purge() {
  LOG.info("Purging log entry with id " + latestId);
  try (RestHighLevelClient client = flintClient.createClient()) {
    // Build the request separately so the delete call itself stays on one line.
    DeleteRequest deleteRequest = new DeleteRequest(metaLogIndexName, latestId);
    DeleteResponse deleteResponse = client.delete(deleteRequest, RequestOptions.DEFAULT);

    LOG.info("Purged log entry with result " + deleteResponse.getResult());
  } catch (Exception e) {
    // Wrap every failure, including one raised on client close, in an unchecked exception.
    throw new IllegalStateException("Failed to purge metadata log entry", e);
  }
}

private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) {
LOG.info("Creating log entry " + logEntry);
// Assign doc ID here
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.Serialization
import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._
import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL, RefreshMode}
import org.opensearch.flint.spark.FlintSparkIndex.{ID_COLUMN, StreamingRefresh}
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
Expand Down Expand Up @@ -253,7 +254,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
flintClient
.startTransaction(indexName, dataSourceName)
.initialLog(latest => latest.state == DELETED)
.finalLog(latest => latest.copy(state = DELETED)) // TODO: vacuum metadata log too?
.finalLog(_ => NO_LOG_ENTRY)
.commit(_ => {
flintClient.deleteIndex(indexName)
true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import java.util.Base64
import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization
import org.opensearch.action.get.GetRequest
import org.opensearch.client.RequestOptions
import org.opensearch.client.indices.GetIndexRequest
import org.opensearch.flint.OpenSearchTransactionSuite
Expand Down Expand Up @@ -117,13 +118,14 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match
flint.deleteIndex(testFlintIndex)
latestLogEntry(testLatestId) should contain("state" -> "deleted")

// Vacuum and recreate index
// Vacuum index data and metadata log
flint.vacuumIndex(testFlintIndex)
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.create()
openSearchClient
.indices()
.exists(new GetIndexRequest(testFlintIndex), RequestOptions.DEFAULT) shouldBe false
openSearchClient.exists(
new GetRequest(testMetaLogIndex, testLatestId),
RequestOptions.DEFAULT) shouldBe false
}

test("should not recreate index if index data still exists") {
Expand Down

0 comments on commit e721546

Please sign in to comment.