diff --git a/CHANGELOG.md b/CHANGELOG.md index 335903743baa2..0882b3491f257 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,6 +49,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Fixed cancellation of segment replication events ([#4225](https://github.com/opensearch-project/OpenSearch/pull/4225)) - [Segment Replication] Add check to cancel ongoing replication with old primary on onNewCheckpoint on replica ([#4363](https://github.com/opensearch-project/OpenSearch/pull/4363)) - [Segment Replication] Bump segment infos counter before commit during replica promotion ([#4365](https://github.com/opensearch-project/OpenSearch/pull/4365)) +- [Segment Replication] Update flaky testOnNewCheckpointFromNewPrimaryCancelOngoingReplication unit test ([#4414](https://github.com/opensearch-project/OpenSearch/pull/4414)) - [Segment Replication] Extend FileChunkWriter to allow cancel on transport client ([#4386](https://github.com/opensearch-project/OpenSearch/pull/4386)) - [Segment Replication] Fix NoSuchFileExceptions with segment replication when computing primary metadata snapshots ([#4366](https://github.com/opensearch-project/OpenSearch/pull/4366)) diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 122d53ab8cd32..26bec2203c599 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -158,9 +158,9 @@ public void startReplication(ActionListener listener) { final StepListener getFilesListener = new StepListener<>(); final StepListener finalizeListener = new StepListener<>(); + cancellableThreads.checkForCancel(); logger.trace("[shardId {}] Replica starting replication [id {}]", shardId().getId(), getId()); // Get list of files to copy from this checkpoint. - cancellableThreads.checkForCancel(); state.setStage(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO); source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener); diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index a674ab6151f79..7437cb22e44d1 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -15,6 +15,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.CancellableThreads; import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; @@ -29,6 +30,7 @@ import java.util.concurrent.TimeUnit; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.mockito.Mockito.doAnswer; @@ -37,6 +39,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.eq; +import static org.opensearch.indices.replication.SegmentReplicationState.Stage.CANCELLED; public class SegmentReplicationTargetServiceTests extends IndexShardTestCase { @@ -215,24 +218,25 @@ public void testOnNewCheckpointFromNewPrimaryCancelOngoingReplication() throws I // Mocking response when startReplication is called on targetSpy we send a new checkpoint to serviceSpy and later reduce countdown // of latch. doAnswer(invocation -> { - final ActionListener listener = invocation.getArgument(0); + // short circuit loop on new checkpoint request + doReturn(null).when(serviceSpy).startReplication(eq(newPrimaryCheckpoint), eq(replicaShard), any()); // a new checkpoint arrives before we've completed. serviceSpy.onNewCheckpoint(newPrimaryCheckpoint, replicaShard); - listener.onResponse(null); - latch.countDown(); + try { + invocation.callRealMethod(); + } catch (CancellableThreads.ExecutionCancelledException e) { + latch.countDown(); + } return null; }).when(targetSpy).startReplication(any()); - doNothing().when(targetSpy).onDone(); // start replication. This adds the target to on-ongoing replication collection serviceSpy.startReplication(targetSpy); - + latch.await(); // wait for the new checkpoint to arrive, before the listener completes. - latch.await(5, TimeUnit.SECONDS); - doNothing().when(targetSpy).startReplication(any()); + assertEquals(CANCELLED, targetSpy.state().getStage()); verify(targetSpy, times(1)).cancel("Cancelling stuck target after new primary"); verify(serviceSpy, times(1)).startReplication(eq(newPrimaryCheckpoint), eq(replicaShard), any()); - closeShards(replicaShard); } public void testNewCheckpointBehindCurrentCheckpoint() { diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index d9b9df0f8f155..31802c7e5aba5 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -1201,6 +1201,7 @@ public void getCheckpointMetadata( listener.onResponse( new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) ); + copyState.decRef(); } catch (IOException e) { logger.error("Unexpected error computing CopyState", e); Assert.fail("Failed to compute copyState");