Skip to content

Commit

Permalink
Cherry pick vacuum index changes
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Feb 7, 2024
1 parent 4fe86d9 commit fd2781a
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,13 @@ public T commit(Function<FlintMetadataLogEntry, T> operation) {
try {
T result = operation.apply(latest);

// Append final log
metadataLog.add(finalAction.apply(latest));
// Append final log or purge log entries
FlintMetadataLogEntry finalLog = finalAction.apply(latest);
if (finalLog == NO_LOG_ENTRY) {
metadataLog.purge();
} else {
metadataLog.add(finalLog);
}
return result;
} catch (Exception e) {
LOG.log(SEVERE, "Rolling back transient log due to transaction operation failure", e);
Expand All @@ -131,4 +136,4 @@ private FlintMetadataLogEntry emptyLogEntry() {
dataSourceName,
"");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,9 @@ public interface FlintMetadataLog<T> {
* @return latest log entry
*/
Optional<T> getLatest();

/**
* Remove all log entries.
*/
void purge();
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
*/
public interface OptimisticTransaction<T> {

/**
* Constant that indicate log entry should be purged.
*/
FlintMetadataLogEntry NO_LOG_ENTRY = null;

/**
* @param initialCondition initial precondition that the subsequent transition and action can proceed
* @return this transaction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,17 @@

package org.opensearch.flint.core.storage;

import static java.util.logging.Level.SEVERE;
import static org.opensearch.action.support.WriteRequest.RefreshPolicy;

import java.io.IOException;
import java.util.Base64;
import java.util.Optional;
import java.util.logging.Logger;
import org.opensearch.OpenSearchException;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.index.IndexRequest;
Expand All @@ -19,14 +28,6 @@
import org.opensearch.flint.core.metadata.log.FlintMetadataLog;
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry;

import java.io.IOException;
import java.util.Base64;
import java.util.Optional;
import java.util.logging.Logger;

import static java.util.logging.Level.SEVERE;
import static org.opensearch.action.support.WriteRequest.RefreshPolicy;

/**
* Flint metadata log in OpenSearch store. For now use single doc instead of maintaining history
* of metadata log.
Expand Down Expand Up @@ -98,6 +99,20 @@ public Optional<FlintMetadataLogEntry> getLatest() {
}
}

@Override
public void purge() {
LOG.info("Purging log entry with id " + latestId);
try (RestHighLevelClient client = flintClient.createClient()) {
DeleteResponse response =
client.delete(
new DeleteRequest(metaLogIndexName, latestId), RequestOptions.DEFAULT);

LOG.info("Purged log entry with result " + response.getResult());
} catch (Exception e) {
throw new IllegalStateException("Failed to purge log entry", e);
}
}

private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) {
LOG.info("Creating log entry " + logEntry);
// Assign doc ID here
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.Serialization
import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._
import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL, RefreshMode}
import org.opensearch.flint.spark.FlintSparkIndex.{quotedTableName, ID_COLUMN, StreamingRefresh}
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
Expand Down

0 comments on commit fd2781a

Please sign in to comment.