Skip to content

Commit

Permalink
IGNITE-23303 Fix tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
ascherbakoff committed Nov 18, 2024
1 parent c8c31db commit e60db81
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
*/
public class IgniteStripedReadWriteLock implements ReadWriteLock {
/** Default concurrency. */
public static int CONCURRENCY = Math.max(1, Runtime.getRuntime().availableProcessors() / 2);
private static final int CONCURRENCY = Math.max(1, Runtime.getRuntime().availableProcessors() / 2);

/** Index generator. */
private static final AtomicInteger IDX_GEN = new AtomicInteger();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1738,7 +1738,7 @@ private CompletableFuture<TransactionResult> finishTransaction(
throw new TransactionException(commit ? TX_COMMIT_ERR : TX_ROLLBACK_ERR, ex);
}

TransactionResult result = (TransactionResult) ((ApplyCommandResult) txOutcome).result;
TransactionResult result = (TransactionResult) ((ResultWrapper) txOutcome).result;

markFinished(txId, result.transactionState(), result.commitTimestamp());

Expand Down Expand Up @@ -2620,8 +2620,8 @@ private static <T> boolean allElementsAreNull(List<T> list) {
* @param cmd Raft command.
* @return Raft future or raft decorated future with command that was processed.
*/
private CompletableFuture<ApplyCommandResult<Object>> applyCmdWithExceptionHandling(Command cmd) {
CompletableFuture<ApplyCommandResult<Object>> resultFuture = new CompletableFuture<>();
private CompletableFuture<ResultWrapper<Object>> applyCmdWithExceptionHandling(Command cmd) {
CompletableFuture<ResultWrapper<Object>> resultFuture = new CompletableFuture<>();

applyCmdWithRetryOnSafeTimeReorderException(cmd, resultFuture);

Expand Down Expand Up @@ -2677,7 +2677,7 @@ private <T> void applyCmdWithRetryOnSafeTimeReorderException(Command cmd, Comple
resultFuture.completeExceptionally(ex);
}
} else {
resultFuture.complete((T) new ApplyCommandResult<>(cmd, res));
resultFuture.complete((T) new ResultWrapper<>(cmd, res));
}
});
}
Expand Down Expand Up @@ -2751,7 +2751,7 @@ private CompletableFuture<ApplyResult> applyUpdateCommand(

if (updateCommandResult != null && updateCommandResult.isPrimaryInPeersAndLearners()) {
return safeTime.waitFor(((UpdateCommand) res.getCommand()).safeTime()).thenApply(ignored -> null)
.thenApply(ret -> new ApplyResult(cmd.safeTime(), null));
.thenApply(ret -> new ApplyResult(((UpdateCommand) res.getCommand()).safeTime(), null));
} else {
if (!SKIP_UPDATES) {
// We don't need to take the partition snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why.
Expand All @@ -2768,7 +2768,8 @@ private CompletableFuture<ApplyResult> applyUpdateCommand(
);
}

return completedFuture(new ApplyResult(cmd.safeTime(), null));
// getCommand provides actual assigned safeTime (may be reassigned due to reorder)
return completedFuture(new ApplyResult(((UpdateCommand) res.getCommand()).safeTime(), null));
}
});
}
Expand Down Expand Up @@ -2885,7 +2886,7 @@ private CompletableFuture<ApplyResult> applyUpdateAllCommand(
}
if (updateCommandResult.isPrimaryInPeersAndLearners()) {
return safeTime.waitFor(((UpdateAllCommand) res.getCommand()).safeTime())
.thenApply(ret -> new ApplyResult(cmd.safeTime(), null));
.thenApply(ret -> new ApplyResult(((UpdateAllCommand) res.getCommand()).safeTime(), null));
} else {
// We don't need to take the partition snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why.
storageUpdateHandler.handleUpdateAll(
Expand All @@ -2898,7 +2899,7 @@ private CompletableFuture<ApplyResult> applyUpdateAllCommand(
indexIdsAtRwTxBeginTs(txId)
);

return completedFuture(new ApplyResult(cmd.safeTime(), null));
return completedFuture(new ApplyResult(((UpdateAllCommand) res.getCommand()).safeTime(), null));
}
});
}
Expand Down Expand Up @@ -4147,11 +4148,11 @@ private static Map<TablePartitionId, String> asTablePartitionIdStringMap(Map<Tab
* Wrapper for the update(All)Command processing result that besides result itself stores actual command that was processed. It helps to
* manage commands substitutions on SafeTimeReorderException where cloned command with adjusted safeTime is sent.
*/
private static class ApplyCommandResult<T> {
private static class ResultWrapper<T> {
private final Command command;
private final T result;

ApplyCommandResult(Command command, T result) {
ResultWrapper(Command command, T result) {
this.command = command;
this.result = result;
}
Expand Down

0 comments on commit e60db81

Please sign in to comment.