From aeb88b94347071b3938c6b8f7cf97d13a8f91960 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Tue, 30 Jan 2024 13:52:42 -0800 Subject: [PATCH] Remove compounding retries within PrimaryShardReplicationSource (#12043) This change removes retries within PrimaryShardReplicationSource and relies on retries in one place at the start of replication. This is done within SegmentReplicationTargetService's processLatestReceivedCheckpoint after a failure/success occurs. The timeout on these retries is the cause of flaky failures from SegmentReplication's bwc test within IndexingIT, that can occur on node disconnect. The retries will persist for over ~1m to the same primary node that has been relocated/shut down and cause the test to timeout. This change also includes simplifications to the cancellation flow on the target service before the shard is closed. Previously we "request" a cancel that does not remove the target from the ongoing replications collection until a cancellation failure is thrown. The transport calls from PrimaryShardReplicationSource are no longer wrapped in CancellableThreads by the client so a call to "cancel" will not throw. Instead we now immediately remove the target and decref/close it. Signed-off-by: Marc Handalian Signed-off-by: Shivansh Arora --- .../org/opensearch/upgrades/IndexingIT.java | 1 - .../replication/SegmentReplicationIT.java | 61 +++++++++++++++++++ ...plicationUsingRemoteStoreDisruptionIT.java | 53 +++++++--------- .../PrimaryShardReplicationSource.java | 46 ++++++-------- .../replication/SegmentReplicationTarget.java | 22 +++---- .../SegmentReplicationTargetService.java | 28 ++++++--- .../PrimaryShardReplicationSourceTests.java | 38 ------------ .../SegmentReplicationTargetServiceTests.java | 7 ++- 8 files changed, 133 insertions(+), 123 deletions(-) diff --git a/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java b/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java index f963f8d221bb5..1577260e145d4 100644 --- a/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java +++ b/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java @@ -262,7 +262,6 @@ public void testIndexing() throws Exception { * @throws Exception if index creation fail * @throws UnsupportedOperationException if cluster type is unknown */ - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/7679") public void testIndexingWithSegRep() throws Exception { if (UPGRADE_FROM_VERSION.before(Version.V_2_4_0)) { logger.info("--> Skip test for version {} where segment replication feature is not available", UPGRADE_FROM_VERSION); diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 5511bc7945d65..4a848e92800cb 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -594,6 +594,67 @@ public void testCancellation() throws Exception { assertDocCounts(docCount, primaryNode); } + public void testCancellationDuringGetCheckpointInfo() throws Exception { + cancelDuringReplicaAction(SegmentReplicationSourceService.Actions.GET_CHECKPOINT_INFO); + } + + public void testCancellationDuringGetSegments() throws Exception { + cancelDuringReplicaAction(SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES); + } + + private void cancelDuringReplicaAction(String actionToblock) throws Exception { + // this test stubs transport calls specific to node-node replication. + assumeFalse( + "Skipping the test as its not compatible with segment replication with remote store.", + segmentReplicationWithRemoteEnabled() + ); + final String primaryNode = internalCluster().startDataOnlyNode(); + createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()); + ensureYellow(INDEX_NAME); + + final String replicaNode = internalCluster().startDataOnlyNode(); + ensureGreen(INDEX_NAME); + final SegmentReplicationTargetService targetService = internalCluster().getInstance( + SegmentReplicationTargetService.class, + replicaNode + ); + final IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME); + CountDownLatch startCancellationLatch = new CountDownLatch(1); + CountDownLatch latch = new CountDownLatch(1); + + MockTransportService primaryTransportService = (MockTransportService) internalCluster().getInstance( + TransportService.class, + primaryNode + ); + primaryTransportService.addRequestHandlingBehavior(actionToblock, (handler, request, channel, task) -> { + logger.info("action {}", actionToblock); + try { + startCancellationLatch.countDown(); + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + // index a doc and trigger replication + client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + + // remove the replica and ensure it is cleaned up. + startCancellationLatch.await(); + SegmentReplicationTarget target = targetService.get(replicaShard.shardId()); + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)) + ); + assertEquals("Replication not closed: " + target.getId(), 0, target.refCount()); + assertEquals("Store has a positive refCount", 0, replicaShard.store().refCount()); + // stop the replica, this will do additional checks on shutDown to ensure the replica and its store are closed properly + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNode)); + latch.countDown(); + } + public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception { final String primaryNode = internalCluster().startDataOnlyNode(); createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()); diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreDisruptionIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreDisruptionIT.java index 8372135fc55c4..3d8d001b17ddf 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreDisruptionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreDisruptionIT.java @@ -23,8 +23,6 @@ import org.opensearch.indices.replication.SegmentReplicationState; import org.opensearch.indices.replication.SegmentReplicationTarget; import org.opensearch.indices.replication.SegmentReplicationTargetService; -import org.opensearch.indices.replication.common.ReplicationCollection; -import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.disruption.SlowClusterStateProcessing; @@ -33,6 +31,8 @@ import java.util.Set; import java.util.concurrent.TimeUnit; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + /** * This class runs tests with remote store + segRep while blocking file downloads */ @@ -59,22 +59,18 @@ public void testCancelReplicationWhileSyncingSegments() throws Exception { indexSingleDoc(); refresh(INDEX_NAME); waitForBlock(replicaNode, REPOSITORY_NAME, TimeValue.timeValueSeconds(10)); - final SegmentReplicationState state = targetService.getOngoingEventSegmentReplicationState(indexShard.shardId()); - assertEquals(SegmentReplicationState.Stage.GET_FILES, state.getStage()); - ReplicationCollection.ReplicationRef segmentReplicationTargetReplicationRef = targetService.get( - state.getReplicationId() - ); - final SegmentReplicationTarget segmentReplicationTarget = segmentReplicationTargetReplicationRef.get(); - // close the target ref here otherwise it will hold a refcount - segmentReplicationTargetReplicationRef.close(); + SegmentReplicationTarget segmentReplicationTarget = targetService.get(indexShard.shardId()); assertNotNull(segmentReplicationTarget); + assertEquals(SegmentReplicationState.Stage.GET_FILES, segmentReplicationTarget.state().getStage()); assertTrue(segmentReplicationTarget.refCount() > 0); - internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode)); - assertBusy(() -> { - assertTrue(indexShard.routingEntry().primary()); - assertNull(targetService.getOngoingEventSegmentReplicationState(indexShard.shardId())); - assertEquals("Target should be closed", 0, segmentReplicationTarget.refCount()); - }); + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)) + ); + assertNull(targetService.getOngoingEventSegmentReplicationState(indexShard.shardId())); + assertEquals("Target should be closed", 0, segmentReplicationTarget.refCount()); unblockNode(REPOSITORY_NAME, replicaNode); cleanupRepo(); } @@ -85,7 +81,6 @@ public void testCancelReplicationWhileFetchingMetadata() throws Exception { final Set dataNodeNames = internalCluster().getDataNodeNames(); final String replicaNode = getNode(dataNodeNames, false); - final String primaryNode = getNode(dataNodeNames, true); SegmentReplicationTargetService targetService = internalCluster().getInstance(SegmentReplicationTargetService.class, replicaNode); ensureGreen(INDEX_NAME); @@ -94,22 +89,18 @@ public void testCancelReplicationWhileFetchingMetadata() throws Exception { indexSingleDoc(); refresh(INDEX_NAME); waitForBlock(replicaNode, REPOSITORY_NAME, TimeValue.timeValueSeconds(10)); - final SegmentReplicationState state = targetService.getOngoingEventSegmentReplicationState(indexShard.shardId()); - assertEquals(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO, state.getStage()); - ReplicationCollection.ReplicationRef segmentReplicationTargetReplicationRef = targetService.get( - state.getReplicationId() - ); - final SegmentReplicationTarget segmentReplicationTarget = segmentReplicationTargetReplicationRef.get(); - // close the target ref here otherwise it will hold a refcount - segmentReplicationTargetReplicationRef.close(); + SegmentReplicationTarget segmentReplicationTarget = targetService.get(indexShard.shardId()); assertNotNull(segmentReplicationTarget); + assertEquals(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO, segmentReplicationTarget.state().getStage()); assertTrue(segmentReplicationTarget.refCount() > 0); - internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode)); - assertBusy(() -> { - assertTrue(indexShard.routingEntry().primary()); - assertNull(targetService.getOngoingEventSegmentReplicationState(indexShard.shardId())); - assertEquals("Target should be closed", 0, segmentReplicationTarget.refCount()); - }); + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)) + ); + assertNull(targetService.get(indexShard.shardId())); + assertEquals("Target should be closed", 0, segmentReplicationTarget.refCount()); unblockNode(REPOSITORY_NAME, replicaNode); cleanupRepo(); } diff --git a/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java index 02fc8feefd698..a17779810239a 100644 --- a/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java @@ -8,16 +8,14 @@ package org.opensearch.indices.replication; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionListenerResponseHandler; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.core.action.ActionListener; -import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.recovery.RecoverySettings; -import org.opensearch.indices.recovery.RetryableTransportClient; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportRequestOptions; import org.opensearch.transport.TransportService; @@ -35,9 +33,7 @@ */ public class PrimaryShardReplicationSource implements SegmentReplicationSource { - private static final Logger logger = LogManager.getLogger(PrimaryShardReplicationSource.class); - - private final RetryableTransportClient transportClient; + private final TransportService transportService; private final DiscoveryNode sourceNode; private final DiscoveryNode targetNode; @@ -52,12 +48,7 @@ public PrimaryShardReplicationSource( DiscoveryNode sourceNode ) { this.targetAllocationId = targetAllocationId; - this.transportClient = new RetryableTransportClient( - transportService, - sourceNode, - recoverySettings.internalActionRetryTimeout(), - logger - ); + this.transportService = transportService; this.sourceNode = sourceNode; this.targetNode = targetNode; this.recoverySettings = recoverySettings; @@ -69,10 +60,14 @@ public void getCheckpointMetadata( ReplicationCheckpoint checkpoint, ActionListener listener ) { - final Writeable.Reader reader = CheckpointInfoResponse::new; - final ActionListener responseListener = ActionListener.map(listener, r -> r); final CheckpointInfoRequest request = new CheckpointInfoRequest(replicationId, targetAllocationId, targetNode, checkpoint); - transportClient.executeRetryableAction(GET_CHECKPOINT_INFO, request, responseListener, reader); + transportService.sendRequest( + sourceNode, + GET_CHECKPOINT_INFO, + request, + TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionRetryTimeout()).build(), + new ActionListenerResponseHandler<>(listener, CheckpointInfoResponse::new, ThreadPool.Names.GENERIC) + ); } @Override @@ -88,8 +83,6 @@ public void getSegmentFiles( // MultiFileWriter takes care of progress tracking for downloads in this scenario // TODO: Move state management and tracking into replication methods and use chunking and data // copy mechanisms only from MultiFileWriter - final Writeable.Reader reader = GetSegmentFilesResponse::new; - final ActionListener responseListener = ActionListener.map(listener, r -> r); final GetSegmentFilesRequest request = new GetSegmentFilesRequest( replicationId, targetAllocationId, @@ -97,20 +90,17 @@ public void getSegmentFiles( filesToFetch, checkpoint ); - final TransportRequestOptions options = TransportRequestOptions.builder() - .withTimeout(recoverySettings.internalActionLongTimeout()) - .build(); - transportClient.executeRetryableAction(GET_SEGMENT_FILES, request, options, responseListener, reader); + transportService.sendRequest( + sourceNode, + GET_SEGMENT_FILES, + request, + TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(), + new ActionListenerResponseHandler<>(listener, GetSegmentFilesResponse::new, ThreadPool.Names.GENERIC) + ); } @Override public String getDescription() { return sourceNode.getName(); } - - @Override - public void cancel() { - transportClient.cancel(); - } - } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index cc71ef816e525..af764556b7549 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -83,6 +83,16 @@ protected void closeInternal() { } } + @Override + protected void onCancel(String reason) { + try { + notifyListener(new ReplicationFailedException(reason), false); + } finally { + source.cancel(); + cancellableThreads.cancel(reason); + } + } + @Override protected String getPrefix() { return REPLICATION_PREFIX + UUIDs.randomBase64UUID() + "."; @@ -320,16 +330,4 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse) } } } - - /** - * Trigger a cancellation, this method will not close the target a subsequent call to #fail is required from target service. - */ - @Override - public void cancel(String reason) { - if (finished.get() == false) { - logger.trace(new ParameterizedMessage("Cancelling replication for target {}", description())); - cancellableThreads.cancel(reason); - source.cancel(); - } - } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index d6db154a4e0e3..f28f829545d59 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -84,10 +84,6 @@ public class SegmentReplicationTargetService extends AbstractLifecycleComponent private final ClusterService clusterService; private final TransportService transportService; - public ReplicationRef get(long replicationId) { - return onGoingReplications.get(replicationId); - } - /** * The internal actions * @@ -158,6 +154,7 @@ protected void doStart() { @Override protected void doStop() { if (DiscoveryNode.isDataNode(clusterService.getSettings())) { + assert onGoingReplications.size() == 0 : "Replication collection should be empty on shutdown"; clusterService.removeListener(this); } } @@ -201,7 +198,7 @@ public void clusterChanged(ClusterChangedEvent event) { @Override public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { if (indexShard != null && indexShard.indexSettings().isSegRepEnabled()) { - onGoingReplications.requestCancel(indexShard.shardId(), "Shard closing"); + onGoingReplications.cancelForShard(indexShard.shardId(), "Shard closing"); latestReceivedCheckpoint.remove(shardId); } } @@ -223,7 +220,7 @@ public void afterIndexShardStarted(IndexShard indexShard) { @Override public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting oldRouting, ShardRouting newRouting) { if (oldRouting != null && indexShard.indexSettings().isSegRepEnabled() && oldRouting.primary() == false && newRouting.primary()) { - onGoingReplications.requestCancel(indexShard.shardId(), "Shard has been promoted to primary"); + onGoingReplications.cancelForShard(indexShard.shardId(), "Shard has been promoted to primary"); latestReceivedCheckpoint.remove(indexShard.shardId()); } } @@ -255,6 +252,14 @@ public SegmentReplicationState getSegmentReplicationState(ShardId shardId) { .orElseGet(() -> getlatestCompletedEventSegmentReplicationState(shardId)); } + public ReplicationRef get(long replicationId) { + return onGoingReplications.get(replicationId); + } + + public SegmentReplicationTarget get(ShardId shardId) { + return onGoingReplications.getOngoingReplicationTarget(shardId); + } + /** * Invoked when a new checkpoint is received from a primary shard. * It checks if a new checkpoint should be processed or not and starts replication if needed. @@ -454,7 +459,13 @@ protected boolean processLatestReceivedCheckpoint(IndexShard replicaShard, Threa latestPublishedCheckpoint ) ); - Runnable runnable = () -> onNewCheckpoint(latestReceivedCheckpoint.get(replicaShard.shardId()), replicaShard); + Runnable runnable = () -> { + // if we retry ensure the shard is not in the process of being closed. + // it will be removed from indexService's collection before the shard is actually marked as closed. + if (indicesService.getShardOrNull(replicaShard.shardId()) != null) { + onNewCheckpoint(latestReceivedCheckpoint.get(replicaShard.shardId()), replicaShard); + } + }; // Checks if we are using same thread and forks if necessary. if (thread == Thread.currentThread()) { threadPool.generic().execute(runnable); @@ -548,9 +559,6 @@ public ReplicationRunner(long replicationId) { @Override public void onFailure(Exception e) { - try (final ReplicationRef ref = onGoingReplications.get(replicationId)) { - logger.error(() -> new ParameterizedMessage("Error during segment replication, {}", ref.get().description()), e); - } onGoingReplications.fail(replicationId, new ReplicationFailedException("Unexpected Error during replication", e), false); } diff --git a/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java b/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java index e4dd32e5c6f70..2531790ede4af 100644 --- a/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java @@ -15,7 +15,6 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.CancellableThreads; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.action.ActionListener; import org.opensearch.index.shard.IndexShard; @@ -27,12 +26,9 @@ import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.transport.CapturingTransport; import org.opensearch.transport.TransportService; -import org.junit.Assert; import java.util.Arrays; import java.util.Collections; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import static org.mockito.Mockito.mock; @@ -165,40 +161,6 @@ public void testTransportTimeoutForGetSegmentFilesAction() { assertEquals(recoverySettings.internalActionLongTimeout(), capturedRequest.options.timeout()); } - public void testGetSegmentFiles_CancelWhileRequestOpen() throws InterruptedException { - CountDownLatch latch = new CountDownLatch(1); - final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint( - indexShard.shardId(), - PRIMARY_TERM, - SEGMENTS_GEN, - VERSION, - Codec.getDefault().getName() - ); - StoreFileMetadata testMetadata = new StoreFileMetadata("testFile", 1L, "checksum", Version.LATEST); - replicationSource.getSegmentFiles( - REPLICATION_ID, - checkpoint, - Arrays.asList(testMetadata), - mock(IndexShard.class), - (fileName, bytesRecovered) -> {}, - new ActionListener<>() { - @Override - public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { - Assert.fail("onFailure response expected."); - } - - @Override - public void onFailure(Exception e) { - assertEquals(e.getClass(), CancellableThreads.ExecutionCancelledException.class); - latch.countDown(); - } - } - ); - replicationSource.cancel(); - latch.await(2, TimeUnit.SECONDS); - assertEquals("listener should have resolved in a failure", 0, latch.getCount()); - } - private DiscoveryNode newDiscoveryNode(String nodeName) { return new DiscoveryNode( nodeName, diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index f284a425a417b..3c72dda2d8b5d 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -620,6 +620,7 @@ public void testForceSegmentSyncHandlerWithFailure_AlreadyClosedException_swallo } public void testTargetCancelledBeforeStartInvoked() { + final String cancelReason = "test"; final SegmentReplicationTarget target = new SegmentReplicationTarget( replicaShard, primaryShard.getLatestReplicationCheckpoint(), @@ -633,12 +634,12 @@ public void onReplicationDone(SegmentReplicationState state) { @Override public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { // failures leave state object in last entered stage. - assertEquals(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO, state.getStage()); - assertTrue(e.getCause() instanceof CancellableThreads.ExecutionCancelledException); + assertEquals(SegmentReplicationState.Stage.INIT, state.getStage()); + assertEquals(cancelReason, e.getMessage()); } } ); - target.cancel("test"); + target.cancel(cancelReason); sut.startReplication(target); }