diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java index da0aa4234..f16d57157 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java @@ -5,6 +5,8 @@ package org.opensearch.flint.core.metadata.log; +import static java.util.logging.Level.SEVERE; +import static java.util.logging.Level.WARNING; import static org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState$; import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; @@ -76,23 +78,45 @@ public T commit(Function operation) { metadataLog.getLatest().orElseGet(() -> metadataLog.add(emptyLogEntry())); // Perform initial log check - if (initialCondition.test(latest)) { + if (!initialCondition.test(latest)) { + LOG.warning("Initial log entry doesn't satisfy precondition " + latest); + throw new IllegalStateException( + "Transaction failed due to initial log precondition not satisfied"); + } - // Append optional transient log - if (transientAction != null) { - latest = metadataLog.add(transientAction.apply(latest)); - } + // Append optional transient log + FlintMetadataLogEntry initialLog = latest; + if (transientAction != null) { + latest = metadataLog.add(transientAction.apply(latest)); + + // Copy latest seqNo and primaryTerm to initialLog for potential revert use + initialLog = initialLog.copy( + initialLog.id(), + latest.seqNo(), + latest.primaryTerm(), + initialLog.createTime(), + initialLog.state(), + initialLog.dataSource(), + initialLog.error()); + } - // Perform operation + // Perform operation + try { T result = operation.apply(latest); // Append final log metadataLog.add(finalAction.apply(latest)); return result; - } else { - LOG.warning("Initial log entry doesn't satisfy precondition " + latest); - throw new IllegalStateException( - "Transaction failed due to initial log precondition not satisfied"); + } catch (Exception e) { + LOG.log(SEVERE, "Reverting transient log due to transaction operation failure", e); + try { + if (transientAction != null) { + metadataLog.add(initialLog); + } + } catch (Exception ex) { + LOG.log(WARNING, "Failed to revert transient log", ex); + } + throw new IllegalStateException("Failed to commit transaction operation"); } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala index 78e47b2f0..66eae2755 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala @@ -132,10 +132,13 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { flintClient .startTransaction(testFlintIndex, testDataSourceName) .initialLog(_ => false) - .transientLog(latest => latest) + .transientLog(latest => latest.copy(state = ACTIVE)) .finalLog(latest => latest) .commit(_ => {}) } + + // Initial empty log should not be changed + latestLogEntry(testLatestId) should contain("state" -> "empty") } test("should fail if initial log entry updated by others when updating transient log entry") { @@ -170,4 +173,58 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { }) } } + + test("should revert to initial log if transaction operation failed") { + // Use create index scenario in this test case + the[IllegalStateException] thrownBy { + flintClient + .startTransaction(testFlintIndex, testDataSourceName) + .initialLog(_ => true) + .transientLog(latest => latest.copy(state = CREATING)) + .finalLog(latest => latest.copy(state = ACTIVE)) + .commit(_ => throw new RuntimeException("Mock operation error")) + } + + // Should revert to initial empty log + latestLogEntry(testLatestId) should contain("state" -> "empty") + } + + test("should revert to initial log if updating final log failed") { + // Use refresh index scenario in this test case + createLatestLogEntry( + FlintMetadataLogEntry( + id = testLatestId, + seqNo = UNASSIGNED_SEQ_NO, + primaryTerm = UNASSIGNED_PRIMARY_TERM, + createTime = 1234567890123L, + state = ACTIVE, + dataSource = testDataSourceName, + error = "")) + + the[IllegalStateException] thrownBy { + flintClient + .startTransaction(testFlintIndex, testDataSourceName) + .initialLog(_ => true) + .transientLog(latest => latest.copy(state = REFRESHING)) + .finalLog(_ => throw new RuntimeException("Mock final log error")) + .commit(_ => {}) + } + + // Should revert to initial active log + latestLogEntry(testLatestId) should contain("state" -> "active") + } + + test("should not necessarily revert if transaction operation failed but no transient action") { + // Use create index scenario in this test case + the[IllegalStateException] thrownBy { + flintClient + .startTransaction(testFlintIndex, testDataSourceName) + .initialLog(_ => true) + .finalLog(latest => latest.copy(state = ACTIVE)) + .commit(_ => throw new RuntimeException("Mock operation error")) + } + + // Should revert to initial empty log + latestLogEntry(testLatestId) should contain("state" -> "empty") + } }