Skip to content

Commit

Permalink
Add revert transient log capability for optimistic transaction
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Nov 3, 2023
1 parent fbbbd9e commit 967fffe
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.flint.core.metadata.log;

import static java.util.logging.Level.SEVERE;
import static java.util.logging.Level.WARNING;
import static org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState$;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
Expand Down Expand Up @@ -76,23 +78,45 @@ public T commit(Function<FlintMetadataLogEntry, T> operation) {
metadataLog.getLatest().orElseGet(() -> metadataLog.add(emptyLogEntry()));

// Perform initial log check
if (initialCondition.test(latest)) {
if (!initialCondition.test(latest)) {
LOG.warning("Initial log entry doesn't satisfy precondition " + latest);
throw new IllegalStateException(
"Transaction failed due to initial log precondition not satisfied");
}

// Append optional transient log
if (transientAction != null) {
latest = metadataLog.add(transientAction.apply(latest));
}
// Append optional transient log
FlintMetadataLogEntry initialLog = latest;
if (transientAction != null) {
latest = metadataLog.add(transientAction.apply(latest));

// Copy latest seqNo and primaryTerm to initialLog for potential revert use
initialLog = initialLog.copy(
initialLog.id(),
latest.seqNo(),
latest.primaryTerm(),
initialLog.createTime(),
initialLog.state(),
initialLog.dataSource(),
initialLog.error());
}

// Perform operation
// Perform operation
try {
T result = operation.apply(latest);

// Append final log
metadataLog.add(finalAction.apply(latest));
return result;
} else {
LOG.warning("Initial log entry doesn't satisfy precondition " + latest);
throw new IllegalStateException(
"Transaction failed due to initial log precondition not satisfied");
} catch (Exception e) {
LOG.log(SEVERE, "Reverting transient log due to transaction operation failure", e);
try {
if (transientAction != null) {
metadataLog.add(initialLog);
}
} catch (Exception ex) {
LOG.log(WARNING, "Failed to revert transient log", ex);
}
throw new IllegalStateException("Failed to commit transaction operation");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,13 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
flintClient
.startTransaction(testFlintIndex, testDataSourceName)
.initialLog(_ => false)
.transientLog(latest => latest)
.transientLog(latest => latest.copy(state = ACTIVE))
.finalLog(latest => latest)
.commit(_ => {})
}

// Initial empty log should not be changed
latestLogEntry(testLatestId) should contain("state" -> "empty")
}

test("should fail if initial log entry updated by others when updating transient log entry") {
Expand Down Expand Up @@ -170,4 +173,58 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
})
}
}

test("should revert to initial log if transaction operation failed") {
// Use create index scenario in this test case
the[IllegalStateException] thrownBy {
flintClient
.startTransaction(testFlintIndex, testDataSourceName)
.initialLog(_ => true)
.transientLog(latest => latest.copy(state = CREATING))
.finalLog(latest => latest.copy(state = ACTIVE))
.commit(_ => throw new RuntimeException("Mock operation error"))
}

// Should revert to initial empty log
latestLogEntry(testLatestId) should contain("state" -> "empty")
}

test("should revert to initial log if updating final log failed") {
// Use refresh index scenario in this test case
createLatestLogEntry(
FlintMetadataLogEntry(
id = testLatestId,
seqNo = UNASSIGNED_SEQ_NO,
primaryTerm = UNASSIGNED_PRIMARY_TERM,
createTime = 1234567890123L,
state = ACTIVE,
dataSource = testDataSourceName,
error = ""))

the[IllegalStateException] thrownBy {
flintClient
.startTransaction(testFlintIndex, testDataSourceName)
.initialLog(_ => true)
.transientLog(latest => latest.copy(state = REFRESHING))
.finalLog(_ => throw new RuntimeException("Mock final log error"))
.commit(_ => {})
}

// Should revert to initial active log
latestLogEntry(testLatestId) should contain("state" -> "active")
}

test("should not necessarily revert if transaction operation failed but no transient action") {
// Use create index scenario in this test case
the[IllegalStateException] thrownBy {
flintClient
.startTransaction(testFlintIndex, testDataSourceName)
.initialLog(_ => true)
.finalLog(latest => latest.copy(state = ACTIVE))
.commit(_ => throw new RuntimeException("Mock operation error"))
}

// Should revert to initial empty log
latestLogEntry(testLatestId) should contain("state" -> "empty")
}
}

0 comments on commit 967fffe

Please sign in to comment.