From e5e05bbb9b26f1d4cdcabc99cc3727c78645466b Mon Sep 17 00:00:00 2001 From: Mitsunori Komatsu Date: Mon, 16 Dec 2024 11:48:24 +0900 Subject: [PATCH] Refactoring the callbacks for the group commit error handling in `CommitHandler` (#2390) --- .../consensuscommit/CommitHandler.java | 29 ++++++++++------ .../CommitHandlerWithGroupCommit.java | 14 ++++---- .../consensuscommit/CommitHandlerTest.java | 33 +++++++++++++------ 3 files changed, 49 insertions(+), 27 deletions(-) diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java index 6a511683c7..c8d6079a34 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java @@ -51,9 +51,20 @@ public CommitHandler( this.parallelExecutor = checkNotNull(parallelExecutor); } - protected void onPrepareFailure(Snapshot snapshot) {} + /** + * A callback invoked when any exception occurs before committing transactions. + * + * @param snapshot the failed snapshot. + */ + protected void onFailureBeforeCommit(Snapshot snapshot) {} - protected void onValidateFailure(Snapshot snapshot) {} + private void safelyCallOnFailureBeforeCommit(Snapshot snapshot) { + try { + onFailureBeforeCommit(snapshot); + } catch (Exception e) { + logger.warn("Failed to call the callback. Transaction ID: {}", snapshot.getId(), e); + } + } private Optional> invokeBeforePreparationSnapshotHook(Snapshot snapshot) throws UnknownTransactionStatusException, CommitException { @@ -65,11 +76,9 @@ private Optional> invokeBeforePreparationSnapshotHook(Snapshot snap return Optional.of( beforePreparationSnapshotHook.handle(tableMetadataManager, snapshot.getReadWriteSets())); } catch (Exception e) { + safelyCallOnFailureBeforeCommit(snapshot); abortState(snapshot.getId()); rollbackRecords(snapshot); - // TODO: This method is actually a part of preparation phase. But the callback method name - // `onPrepareFailure()` should be renamed to more reasonable one. - onPrepareFailure(snapshot); throw new CommitException( CoreError.HANDLING_BEFORE_PREPARATION_SNAPSHOT_HOOK_FAILED.buildMessage(e.getMessage()), e, @@ -87,11 +96,9 @@ private void waitBeforePreparationSnapshotHookFuture( try { snapshotHookFuture.get(); } catch (Exception e) { + safelyCallOnFailureBeforeCommit(snapshot); abortState(snapshot.getId()); rollbackRecords(snapshot); - // TODO: This method is actually a part of validation phase. But the callback method name - // `onValidateFailure()` should be renamed to more reasonable one. - onValidateFailure(snapshot); throw new CommitException( CoreError.HANDLING_BEFORE_PREPARATION_SNAPSHOT_HOOK_FAILED.buildMessage(e.getMessage()), e, @@ -104,6 +111,7 @@ public void commit(Snapshot snapshot) throws CommitException, UnknownTransaction try { prepare(snapshot); } catch (PreparationException e) { + safelyCallOnFailureBeforeCommit(snapshot); abortState(snapshot.getId()); rollbackRecords(snapshot); if (e instanceof PreparationConflictException) { @@ -111,13 +119,14 @@ public void commit(Snapshot snapshot) throws CommitException, UnknownTransaction } throw new CommitException(e.getMessage(), e, e.getTransactionId().orElse(null)); } catch (Exception e) { - onPrepareFailure(snapshot); + safelyCallOnFailureBeforeCommit(snapshot); throw e; } try { validate(snapshot); } catch (ValidationException e) { + safelyCallOnFailureBeforeCommit(snapshot); abortState(snapshot.getId()); rollbackRecords(snapshot); if (e instanceof ValidationConflictException) { @@ -125,7 +134,7 @@ public void commit(Snapshot snapshot) throws CommitException, UnknownTransaction } throw new CommitException(e.getMessage(), e, e.getTransactionId().orElse(null)); } catch (Exception e) { - onValidateFailure(snapshot); + safelyCallOnFailureBeforeCommit(snapshot); throw e; } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java index addca7809b..4f47f73b5f 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java @@ -38,12 +38,7 @@ public CommitHandlerWithGroupCommit( } @Override - protected void onPrepareFailure(Snapshot snapshot) { - cancelGroupCommitIfNeeded(snapshot.getId()); - } - - @Override - protected void onValidateFailure(Snapshot snapshot) { + protected void onFailureBeforeCommit(Snapshot snapshot) { cancelGroupCommitIfNeeded(snapshot.getId()); } @@ -77,7 +72,12 @@ private void commitStateViaGroupCommit(Snapshot snapshot) } private void cancelGroupCommitIfNeeded(String id) { - groupCommitter.remove(id); + try { + groupCommitter.remove(id); + } catch (Exception e) { + logger.warn( + "Unexpectedly failed to remove the snapshot ID from the group committer. ID: {}", id); + } } @Override diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java index fdf84d0c15..3136b10c77 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java @@ -183,8 +183,7 @@ public void commit_SnapshotWithDifferentPartitionPutsGiven_ShouldCommitRespectiv verify(storage, times(4)).mutate(anyList()); verifyCoordinatorPutState(TransactionState.COMMITTED); verifySnapshotHook(withSnapshotHook, readWriteSets); - verify(handler, never()).onPrepareFailure(any()); - verify(handler, never()).onValidateFailure(any()); + verify(handler, never()).onFailureBeforeCommit(any()); } @ParameterizedTest @@ -206,8 +205,7 @@ public void commit_SnapshotWithSamePartitionPutsGiven_ShouldCommitAtOnce(boolean verify(storage, times(2)).mutate(anyList()); verifyCoordinatorPutState(TransactionState.COMMITTED); verifySnapshotHook(withSnapshotHook, readWriteSets); - verify(handler, never()).onPrepareFailure(any()); - verify(handler, never()).onValidateFailure(any()); + verify(handler, never()).onFailureBeforeCommit(any()); } @Test @@ -230,6 +228,7 @@ public void commit_NoMutationExceptionThrownInPrepareRecords_ShouldThrowCCExcept verify(coordinator, never()) .putState(new Coordinator.State(anyId(), TransactionState.COMMITTED)); verify(handler).rollbackRecords(snapshot); + verify(handler).onFailureBeforeCommit(snapshot); } @Test @@ -252,6 +251,7 @@ public void commit_RetriableExecutionExceptionThrownInPrepareRecords_ShouldThrow verify(coordinator, never()) .putState(new Coordinator.State(anyId(), TransactionState.COMMITTED)); verify(handler).rollbackRecords(snapshot); + verify(handler).onFailureBeforeCommit(snapshot); } @Test @@ -274,6 +274,7 @@ public void commit_ExceptionThrownInPrepareRecords_ShouldAbortAndRollbackRecords verify(coordinator, never()) .putState(new Coordinator.State(anyId(), TransactionState.COMMITTED)); verify(handler).rollbackRecords(snapshot); + verify(handler).onFailureBeforeCommit(snapshot); } @Test @@ -303,6 +304,7 @@ public void commit_ExceptionThrownInPrepareRecords_ShouldAbortAndRollbackRecords .putState(new Coordinator.State(anyId(), TransactionState.COMMITTED)); verify(coordinator).getState(anyId()); verify(handler).rollbackRecords(snapshot); + verify(handler).onFailureBeforeCommit(snapshot); } @Test @@ -330,6 +332,7 @@ public void commit_ExceptionThrownInPrepareRecords_ShouldAbortAndRollbackRecords .putState(new Coordinator.State(anyId(), TransactionState.COMMITTED)); verify(coordinator).getState(anyId()); verify(handler, never()).rollbackRecords(snapshot); + verify(handler).onFailureBeforeCommit(snapshot); } @Test @@ -357,6 +360,7 @@ public void commit_ExceptionThrownInPrepareRecords_ShouldAbortAndRollbackRecords .putState(new Coordinator.State(anyId(), TransactionState.COMMITTED)); verify(coordinator).getState(anyId()); verify(handler, never()).rollbackRecords(snapshot); + verify(handler).onFailureBeforeCommit(snapshot); } @Test @@ -382,6 +386,7 @@ public void commit_ExceptionThrownInPrepareRecords_ShouldAbortAndRollbackRecords verify(coordinator, never()) .putState(new Coordinator.State(anyId(), TransactionState.COMMITTED)); verify(handler, never()).rollbackRecords(snapshot); + verify(handler).onFailureBeforeCommit(snapshot); } @Test @@ -405,6 +410,7 @@ public void commit_ValidationConflictExceptionThrownInValidation_ShouldAbortAndR verify(coordinator, never()) .putState(new Coordinator.State(anyId(), TransactionState.COMMITTED)); verify(handler).rollbackRecords(snapshot); + verify(handler).onFailureBeforeCommit(snapshot); } @Test @@ -428,6 +434,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords() verify(coordinator, never()) .putState(new Coordinator.State(anyId(), TransactionState.COMMITTED)); verify(handler).rollbackRecords(snapshot); + verify(handler).onFailureBeforeCommit(snapshot); } @Test @@ -458,6 +465,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords() .putState(new Coordinator.State(anyId(), TransactionState.COMMITTED)); verify(coordinator).getState(anyId()); verify(handler).rollbackRecords(snapshot); + verify(handler).onFailureBeforeCommit(snapshot); } @Test @@ -486,6 +494,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords() .putState(new Coordinator.State(anyId(), TransactionState.COMMITTED)); verify(coordinator).getState(anyId()); verify(handler, never()).rollbackRecords(snapshot); + verify(handler).onFailureBeforeCommit(snapshot); } @Test @@ -514,6 +523,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords() .putState(new Coordinator.State(anyId(), TransactionState.COMMITTED)); verify(coordinator).getState(anyId()); verify(handler, never()).rollbackRecords(snapshot); + verify(handler).onFailureBeforeCommit(snapshot); } @Test @@ -540,6 +550,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords() verify(coordinator, never()) .putState(new Coordinator.State(anyId(), TransactionState.COMMITTED)); verify(handler, never()).rollbackRecords(snapshot); + verify(handler).onFailureBeforeCommit(snapshot); } @Test @@ -564,6 +575,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords() verifyCoordinatorPutState(TransactionState.COMMITTED); verify(coordinator).getState(anyId()); verify(handler, never()).rollbackRecords(snapshot); + verify(handler, never()).onFailureBeforeCommit(any()); } @Test @@ -587,6 +599,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords() verifyCoordinatorPutState(TransactionState.COMMITTED); verify(coordinator).getState(anyId()); verify(handler).rollbackRecords(snapshot); + verify(handler, never()).onFailureBeforeCommit(any()); } @Test @@ -609,6 +622,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords() verifyCoordinatorPutState(TransactionState.COMMITTED); verify(coordinator).getState(anyId()); verify(handler, never()).rollbackRecords(snapshot); + verify(handler, never()).onFailureBeforeCommit(any()); } @Test @@ -631,6 +645,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords() verifyCoordinatorPutState(TransactionState.COMMITTED); verify(coordinator).getState(anyId()); verify(handler, never()).rollbackRecords(snapshot); + verify(handler, never()).onFailureBeforeCommit(any()); } @Test @@ -649,6 +664,7 @@ public void commit_ExceptionThrownInCoordinatorCommit_ShouldThrowUnknown() verify(storage, times(2)).mutate(anyList()); verifyCoordinatorPutState(TransactionState.COMMITTED); verify(handler, never()).rollbackRecords(snapshot); + verify(handler, never()).onFailureBeforeCommit(any()); } @Test @@ -687,8 +703,7 @@ public Future handle( // This means `commit()` waited until the callback was completed before throwing // an exception from `commitState()`. assertThat(Duration.between(start, end)).isGreaterThanOrEqualTo(Duration.ofSeconds(2)); - verify(handler, never()).onPrepareFailure(any()); - verify(handler, never()).onValidateFailure(any()); + verify(handler, never()).onFailureBeforeCommit(any()); } @Test @@ -710,8 +725,7 @@ public void commit_FailingSnapshotHookGiven_ShouldThrowCommitException() verify(coordinator, never()) .putState(new Coordinator.State(anyId(), TransactionState.COMMITTED)); verify(handler).rollbackRecords(snapshot); - verify(handler).onPrepareFailure(any()); - verify(handler, never()).onValidateFailure(any()); + verify(handler).onFailureBeforeCommit(any()); } @Test @@ -735,8 +749,7 @@ public void commit_FailingSnapshotHookFutureGiven_ShouldThrowCommitException() verify(coordinator, never()) .putState(new Coordinator.State(anyId(), TransactionState.COMMITTED)); verify(handler).rollbackRecords(snapshot); - verify(handler, never()).onPrepareFailure(any()); - verify(handler).onValidateFailure(snapshot); + verify(handler).onFailureBeforeCommit(snapshot); } protected void doThrowExceptionWhenCoordinatorPutState(