[Backport 0.1] Fix recover index bug when Flint data index is deleted accidentally #247

Merged: 2 commits, Feb 14, 2024

Changes from all commits

docs/index.md (2 additions, 0 deletions)

@@ -266,6 +266,8 @@ WITH (

Currently the Flint index job ID is the same as the internal Flint index name described in the [OpenSearch](./index.md#OpenSearch) section below.

- **Recover Job**: Restarts the index refresh job and transitions the Flint index to the 'refreshing' state. It also cleans up the metadata log entry if the Flint data index is no longer present in OpenSearch. A usage sketch follows the statement below.

```sql
RECOVER INDEX JOB <id>
```
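
As a usage sketch, assuming the Flint Spark extension is enabled in the session and using a hypothetical job ID (a real one is the internal Flint index name of the index being recovered):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("flint-recover-example").getOrCreate()

// Hypothetical job ID; substitute the internal Flint index name of the index
// whose refresh job should be restarted (or whose log should be cleaned up).
spark.sql("RECOVER INDEX JOB flint_spark_catalog_default_alb_logs_skipping_index")
```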

In the transaction commit path, the final action can now signal a purge instead of appending a final log entry:

```diff
@@ -104,8 +104,13 @@ public T commit(Function<FlintMetadataLogEntry, T> operation) {
     try {
       T result = operation.apply(latest);

-      // Append final log
-      metadataLog.add(finalAction.apply(latest));
+      // Append final log or purge log entries
+      FlintMetadataLogEntry finalLog = finalAction.apply(latest);
+      if (finalLog == NO_LOG_ENTRY) {
+        metadataLog.purge();
+      } else {
+        metadataLog.add(finalLog);
+      }
       return result;
     } catch (Exception e) {
       LOG.log(SEVERE, "Rolling back transient log due to transaction operation failure", e);
@@ -131,4 +136,4 @@ private FlintMetadataLogEntry emptyLogEntry() {
         dataSourceName,
         "");
   }
-}
+}
```

The FlintMetadataLog interface gains the purge() operation:

```diff
@@ -26,4 +26,9 @@ public interface FlintMetadataLog<T> {
    * @return latest log entry
    */
   Optional<T> getLatest();
+
+  /**
+   * Remove all log entries.
+   */
+  void purge();
 }
```
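
To make the new contract concrete, here is a minimal in-memory sketch of the interface. The class is hypothetical, and the add signature (returning the stored entry) is an assumption, since only getLatest and purge are visible in the hunk above:

```scala
import java.util.Optional

import scala.collection.mutable.ArrayBuffer

import org.opensearch.flint.core.metadata.log.FlintMetadataLog

// Hypothetical in-memory log, useful only to illustrate the purge() contract:
// add appends an entry, getLatest returns the newest one, purge drops them all.
class InMemoryMetadataLog[T] extends FlintMetadataLog[T] {
  private val entries = ArrayBuffer.empty[T]

  override def add(logEntry: T): T = {
    entries += logEntry
    logEntry
  }

  override def getLatest(): Optional[T] =
    entries.lastOption.map(Optional.of(_)).getOrElse(Optional.empty[T]())

  override def purge(): Unit = entries.clear()
}
```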

The OptimisticTransaction interface defines the sentinel that a final-log function returns to request the purge:

```diff
@@ -20,6 +20,11 @@
  */
 public interface OptimisticTransaction<T> {

+  /**
+   * Constant that indicates the log entry should be purged.
+   */
+  FlintMetadataLogEntry NO_LOG_ENTRY = null;
+
   /**
    * @param initialCondition initial precondition that the subsequent transition and action can proceed
    * @return this transaction
```
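
With this sentinel, a final-log function can say "delete the log entry" instead of "write this entry". A sketch of the calling pattern, mirroring the FlintSpark cleanup further down in this PR (the method is hypothetical; flintClient, indexName, and dataSourceName stand for whatever the caller has in scope):

```scala
import org.opensearch.flint.core.FlintClient
import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY

// Run a no-op operation whose final-log function returns NO_LOG_ENTRY, so that
// commit() invokes metadataLog.purge() instead of metadataLog.add().
def purgeMetadataLog(flintClient: FlintClient, indexName: String, dataSourceName: String): Unit = {
  flintClient
    .startTransaction(indexName, dataSourceName)
    .initialLog(_ => true)
    .finalLog(_ => NO_LOG_ENTRY)
    .commit(_ => {})
}
```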

The OpenSearch-backed metadata log implements purge() by deleting the latest log document; the import block is also regrouped:

```diff
@@ -5,8 +5,17 @@

 package org.opensearch.flint.core.storage;

+import static java.util.logging.Level.SEVERE;
+import static org.opensearch.action.support.WriteRequest.RefreshPolicy;
+
+import java.io.IOException;
+import java.util.Base64;
+import java.util.Optional;
+import java.util.logging.Logger;
 import org.opensearch.OpenSearchException;
 import org.opensearch.action.DocWriteResponse;
+import org.opensearch.action.delete.DeleteRequest;
+import org.opensearch.action.delete.DeleteResponse;
 import org.opensearch.action.get.GetRequest;
 import org.opensearch.action.get.GetResponse;
 import org.opensearch.action.index.IndexRequest;
@@ -19,14 +28,6 @@
 import org.opensearch.flint.core.metadata.log.FlintMetadataLog;
 import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry;

-import java.io.IOException;
-import java.util.Base64;
-import java.util.Optional;
-import java.util.logging.Logger;
-
-import static java.util.logging.Level.SEVERE;
-import static org.opensearch.action.support.WriteRequest.RefreshPolicy;
-
 /**
  * Flint metadata log in OpenSearch store. For now use single doc instead of maintaining history
  * of metadata log.
@@ -98,6 +99,20 @@ public Optional<FlintMetadataLogEntry> getLatest() {
     }
   }

+  @Override
+  public void purge() {
+    LOG.info("Purging log entry with id " + latestId);
+    try (RestHighLevelClient client = flintClient.createClient()) {
+      DeleteResponse response =
+          client.delete(
+              new DeleteRequest(metaLogIndexName, latestId), RequestOptions.DEFAULT);
+
+      LOG.info("Purged log entry with result " + response.getResult());
+    } catch (Exception e) {
+      throw new IllegalStateException("Failed to purge log entry", e);
+    }
+  }
+
   private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) {
     LOG.info("Creating log entry " + logEntry);
     // Assign doc ID here
```
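
One property of the implementation, assuming the metadata log index itself still exists: the OpenSearch delete API does not throw when the document is already gone; it reports NOT_FOUND in the response, so purging an already-purged entry is harmless. A sketch under those assumptions (the helper name and parameters are placeholders):

```scala
import org.opensearch.action.DocWriteResponse
import org.opensearch.action.delete.DeleteRequest
import org.opensearch.client.{RequestOptions, RestHighLevelClient}

// Placeholder helper: returns true if the log doc existed and was removed,
// false if it was already gone (the delete reports NOT_FOUND, no exception).
def purgeOnce(client: RestHighLevelClient, metaLogIndexName: String, latestId: String): Boolean = {
  val response = client.delete(new DeleteRequest(metaLogIndexName, latestId), RequestOptions.DEFAULT)
  response.getResult != DocWriteResponse.Result.NOT_FOUND
}
```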

FlintSpark.recoverIndex() uses the sentinel to clean up the orphaned metadata log entry when the index data no longer exists:

```diff
@@ -11,6 +11,7 @@ import org.json4s.{Formats, NoTypeHints}
 import org.json4s.native.Serialization
 import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
 import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._
+import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
 import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL, RefreshMode}
 import org.opensearch.flint.spark.FlintSparkIndex.{quotedTableName, ID_COLUMN, StreamingRefresh}
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
@@ -270,6 +271,20 @@ class FlintSpark(val spark: SparkSession) extends Logging {
       }
     } else {
       logInfo("Index to be recovered either doesn't exist or not auto refreshed")
+      if (index.isEmpty) {
+        /*
+         * If execution reaches this point, it indicates that the Flint index is corrupted.
+         * In such cases, clean up the metadata log, as the index data no longer exists.
+         * There is a very small possibility that users may recreate the index in the
+         * interim, in which case its metadata log gets deleted by this cleanup process.
+         */
+        logWarning("Cleaning up metadata log as index data has been deleted")
+        flintClient
+          .startTransaction(indexName, dataSourceName)
+          .initialLog(_ => true)
+          .finalLog(_ => NO_LOG_ENTRY)
+          .commit(_ => {})
+      }
       false
     }
   }
```
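
From the caller's point of view, recoverIndex now returns false for a missing index and, as a side effect, purges the stale metadata log entry. A minimal usage sketch (the index name is a placeholder):

```scala
import org.apache.spark.sql.SparkSession
import org.opensearch.flint.spark.FlintSpark

val spark = SparkSession.builder().getOrCreate()
val flint = new FlintSpark(spark)

// Placeholder index name. If its data index was deleted out-of-band, this call
// returns false and removes the orphaned metadata log entry instead of leaving
// the index stuck in the 'refreshing' state.
val restarted = flint.recoverIndex("flint_spark_catalog_default_alb_logs_skipping_index")
```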

An integration test covers the new cleanup path:

```diff
@@ -159,6 +159,25 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match
     } should have message s"Flint index $testFlintIndex already exists"
   }

+  test("should clean up metadata log entry if index data has been deleted") {
+    flint
+      .skippingIndex()
+      .onTable(testTable)
+      .addPartitions("year", "month")
+      .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
+      .create()
+    flint.refreshIndex(testFlintIndex, INCREMENTAL)
+
+    // Simulate the situation where the user deletes the index data directly and the refresh job then exits
+    spark.streams.active.find(_.name == testFlintIndex).get.stop()
+    deleteIndex(testFlintIndex)
+
+    // Index state is refreshing; expect the recover API to clean it up
+    latestLogEntry(testLatestId) should contain("state" -> "refreshing")
+    flint.recoverIndex(testFlintIndex)
+    latestLogEntry(testLatestId) shouldBe empty
+  }
+
   private def deleteLogically(latestId: String): Unit = {
     val response = openSearchClient
       .get(new GetRequest(testMetaLogIndex, latestId), RequestOptions.DEFAULT)
```

Finally, JobOperator waits for streaming termination only when a streaming query is actually running:

```diff
@@ -84,8 +84,8 @@ case class JobOperator(
     }

     try {
-      // Stop SparkSession if streaming job succeeds
-      if (!exceptionThrown && streaming) {
+      // Wait for the streaming job to complete if no error occurred and a streaming job is running
+      if (!exceptionThrown && streaming && spark.streams.active.nonEmpty) {
         // wait for any child thread to finish before the main thread terminates
         spark.streams.awaitAnyTermination()
       }
```
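
The added spark.streams.active.nonEmpty guard matters because, when no streaming query was ever started (for example, recovery found the index data deleted and launched no refresh), awaitAnyTermination() has nothing to wait for and the old code could block indefinitely. A sketch of the guarded wait (the helper name is hypothetical):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper mirroring the fix: block on streaming termination only
// when at least one streaming query is actually running.
def awaitStreamsIfAny(spark: SparkSession): Unit = {
  if (spark.streams.active.nonEmpty) {
    // Returns once any active streaming query terminates
    spark.streams.awaitAnyTermination()
  }
}
```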