diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index d9f318e78597c..ae722b8742e94 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -57,14 +57,9 @@ public ReplicationCheckpoint getCheckpoint() { return this.checkpoint; } - public SegmentReplicationTarget( - ReplicationCheckpoint checkpoint, - IndexShard indexShard, - SegmentReplicationSource source, - ReplicationListener listener - ) { + public SegmentReplicationTarget(IndexShard indexShard, SegmentReplicationSource source, ReplicationListener listener) { super("replication_target", indexShard, new ReplicationLuceneIndex(), listener); - this.checkpoint = checkpoint; + this.checkpoint = indexShard.getLatestReplicationCheckpoint(); this.source = source; this.state = new SegmentReplicationState( indexShard.routingEntry(), @@ -101,7 +96,7 @@ public SegmentReplicationState state() { } public SegmentReplicationTarget retryCopy() { - return new SegmentReplicationTarget(checkpoint, indexShard, source, listener); + return new SegmentReplicationTarget(indexShard, source, listener); } @Override diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 1858449e13ae8..2d76c7afe64cf 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -222,7 +222,7 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe } final Thread thread = Thread.currentThread(); if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) { - startReplication(receivedCheckpoint, replicaShard, new SegmentReplicationListener() { + startReplication(replicaShard, new SegmentReplicationListener() { @Override public void onReplicationDone(SegmentReplicationState state) { logger.trace( @@ -272,17 +272,8 @@ public void onReplicationFailure( } } - public SegmentReplicationTarget startReplication( - final ReplicationCheckpoint checkpoint, - final IndexShard indexShard, - final SegmentReplicationListener listener - ) { - final SegmentReplicationTarget target = new SegmentReplicationTarget( - checkpoint, - indexShard, - sourceFactory.get(indexShard), - listener - ); + public SegmentReplicationTarget startReplication(final IndexShard indexShard, final SegmentReplicationListener listener) { + final SegmentReplicationTarget target = new SegmentReplicationTarget(indexShard, sourceFactory.get(indexShard), listener); startReplication(target); return target; } @@ -400,57 +391,49 @@ public void messageReceived(final ForceSyncRequest request, TransportChannel cha channel.sendResponse(TransportResponse.Empty.INSTANCE); return; } - startReplication( - ReplicationCheckpoint.empty(request.getShardId(), indexShard.getDefaultCodecName()), - indexShard, - new SegmentReplicationTargetService.SegmentReplicationListener() { - @Override - public void onReplicationDone(SegmentReplicationState state) { - logger.trace( - () -> new ParameterizedMessage( - "[shardId {}] [replication id {}] Replication complete to {}, timing data: {}", - indexShard.shardId().getId(), - state.getReplicationId(), - indexShard.getLatestReplicationCheckpoint(), - state.getTimingData() - ) - ); - try { - // Promote engine type for primary target - if (indexShard.recoveryState().getPrimary() == true) { - indexShard.resetToWriteableEngine(); - } - channel.sendResponse(TransportResponse.Empty.INSTANCE); - } catch (InterruptedException | TimeoutException | IOException e) { - throw new RuntimeException(e); + startReplication(indexShard, new SegmentReplicationTargetService.SegmentReplicationListener() { + @Override + public void onReplicationDone(SegmentReplicationState state) { + logger.trace( + () -> new ParameterizedMessage( + "[shardId {}] [replication id {}] Replication complete to {}, timing data: {}", + indexShard.shardId().getId(), + state.getReplicationId(), + indexShard.getLatestReplicationCheckpoint(), + state.getTimingData() + ) + ); + try { + // Promote engine type for primary target + if (indexShard.recoveryState().getPrimary() == true) { + indexShard.resetToWriteableEngine(); } + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } catch (InterruptedException | TimeoutException | IOException e) { + throw new RuntimeException(e); } + } - @Override - public void onReplicationFailure( - SegmentReplicationState state, - ReplicationFailedException e, - boolean sendShardFailure - ) { - logger.trace( - () -> new ParameterizedMessage( - "[shardId {}] [replication id {}] Replication failed, timing data: {}", - indexShard.shardId().getId(), - state.getReplicationId(), - state.getTimingData() - ) - ); - if (sendShardFailure == true) { - indexShard.failShard("replication failure", e); - } - try { - channel.sendResponse(e); - } catch (IOException ex) { - throw new RuntimeException(ex); - } + @Override + public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { + logger.trace( + () -> new ParameterizedMessage( + "[shardId {}] [replication id {}] Replication failed, timing data: {}", + indexShard.shardId().getId(), + state.getReplicationId(), + state.getTimingData() + ) + ); + if (sendShardFailure == true) { + indexShard.failShard("replication failure", e); + } + try { + channel.sendResponse(e); + } catch (IOException ex) { + throw new RuntimeException(ex); } } - ); + }); } } diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 0d95f40652523..764d60dbd5b82 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -40,6 +40,7 @@ import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.SnapshotMatchers; import org.opensearch.index.translog.Translog; +import org.opensearch.indices.IndicesService; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.indices.replication.CheckpointInfoResponse; @@ -294,7 +295,7 @@ public void testPublishCheckpointAfterRelocationHandOff() throws IOException { public void testRejectCheckpointOnShardRoutingPrimary() throws IOException { IndexShard primaryShard = newStartedShard(true); SegmentReplicationTargetService sut; - sut = prepareForReplication(primaryShard, null); + sut = prepareForReplication(primaryShard, null, mock(TransportService.class), mock(IndicesService.class)); SegmentReplicationTargetService spy = spy(sut); // Starting a new shard in PrimaryMode and shard routing primary. @@ -314,7 +315,7 @@ public void testRejectCheckpointOnShardRoutingPrimary() throws IOException { spy.onNewCheckpoint(new ReplicationCheckpoint(primaryShard.shardId(), 0L, 0L, 0L, Codec.getDefault().getName()), spyShard); // Verify that checkpoint is not processed as shard routing is primary. - verify(spy, times(0)).startReplication(any(), any(), any()); + verify(spy, times(0)).startReplication(any(), any()); closeShards(primaryShard); } @@ -1027,7 +1028,10 @@ private void assertDocCounts(IndexShard indexShard, int expectedPersistedDocCoun private void resolveCheckpointInfoResponseListener(ActionListener listener, IndexShard primary) { try { - final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primary.shardId, primary.getDefaultCodecName()), primary); + final CopyState copyState = new CopyState( + ReplicationCheckpoint.empty(primary.shardId, primary.getLatestReplicationCheckpoint().getCodec()), + primary + ); listener.onResponse( new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) ); @@ -1041,7 +1045,6 @@ private void startReplicationAndAssertCancellation(IndexShard replica, SegmentRe throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); final SegmentReplicationTarget target = targetService.startReplication( - ReplicationCheckpoint.empty(replica.shardId, replica.getDefaultCodecName()), replica, new SegmentReplicationTargetService.SegmentReplicationListener() { @Override diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 357a88c27fc46..caf20dcd60d1b 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -11,22 +11,34 @@ import org.junit.Assert; import org.mockito.Mockito; import org.opensearch.OpenSearchException; +import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.CancellableThreads; -import org.opensearch.index.codec.CodecService; import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.replication.TestReplicationSource; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; + +import org.opensearch.indices.IndicesService; +import org.opensearch.indices.recovery.ForceSyncRequest; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.EmptyTransportResponseHandler; +import org.opensearch.transport.TransportRequestOptions; +import org.opensearch.transport.TransportResponse; +import org.opensearch.transport.TransportService; +import org.opensearch.test.transport.CapturingTransport; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -37,6 +49,13 @@ import static org.mockito.Mockito.when; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.times; import static org.mockito.Mockito.spy; @@ -56,6 +75,14 @@ public class SegmentReplicationTargetServiceTests extends IndexShardTestCase { private ReplicationCheckpoint newPrimaryCheckpoint; + private TransportService transportService; + private TestThreadPool testThreadPool; + private DiscoveryNode localNode; + + private IndicesService indicesService; + + private static long TRANSPORT_TIMEOUT = 30000;// 30sec + @Override public void setUp() throws Exception { super.setUp(); @@ -63,43 +90,66 @@ public void setUp() throws Exception { .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName()) .build(); - CodecService codecService = new CodecService(null, null); - String defaultCodecName = codecService.codec(CodecService.DEFAULT_CODEC).getName(); primaryShard = newStartedShard(true, settings); + String primaryCodec = primaryShard.getLatestReplicationCheckpoint().getCodec(); replicaShard = newShard(false, settings, new NRTReplicationEngineFactory()); recoverReplica(replicaShard, primaryShard, true, getReplicationFunc(replicaShard)); - checkpoint = new ReplicationCheckpoint(replicaShard.shardId(), 0L, 0L, 0L, defaultCodecName); + checkpoint = new ReplicationCheckpoint( + replicaShard.shardId(), + 0L, + 0L, + 0L, + replicaShard.getLatestReplicationCheckpoint().getCodec() + ); SegmentReplicationSourceFactory replicationSourceFactory = mock(SegmentReplicationSourceFactory.class); replicationSource = mock(SegmentReplicationSource.class); when(replicationSourceFactory.get(replicaShard)).thenReturn(replicationSource); - sut = prepareForReplication(primaryShard, null); + testThreadPool = new TestThreadPool("test", Settings.EMPTY); + localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT); + CapturingTransport transport = new CapturingTransport(); + transportService = transport.createTransportService( + Settings.EMPTY, + testThreadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundAddress -> localNode, + null, + Collections.emptySet() + ); + transportService.start(); + transportService.acceptIncomingRequests(); + + indicesService = mock(IndicesService.class); + + sut = prepareForReplication(primaryShard, null, transportService, indicesService); initialCheckpoint = replicaShard.getLatestReplicationCheckpoint(); aheadCheckpoint = new ReplicationCheckpoint( initialCheckpoint.getShardId(), initialCheckpoint.getPrimaryTerm(), initialCheckpoint.getSegmentsGen(), initialCheckpoint.getSegmentInfosVersion() + 1, - defaultCodecName + primaryCodec ); newPrimaryCheckpoint = new ReplicationCheckpoint( initialCheckpoint.getShardId(), initialCheckpoint.getPrimaryTerm() + 1, initialCheckpoint.getSegmentsGen(), initialCheckpoint.getSegmentInfosVersion() + 1, - defaultCodecName + primaryCodec ); } @Override public void tearDown() throws Exception { closeShards(primaryShard, replicaShard); + ThreadPool.terminate(testThreadPool, 30, TimeUnit.SECONDS); + testThreadPool = null; super.tearDown(); } public void testsSuccessfulReplication_listenerCompletes() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); - sut.startReplication(checkpoint, replicaShard, new SegmentReplicationTargetService.SegmentReplicationListener() { + sut.startReplication(replicaShard, new SegmentReplicationTargetService.SegmentReplicationListener() { @Override public void onReplicationDone(SegmentReplicationState state) { assertEquals(SegmentReplicationState.Stage.DONE, state.getStage()); @@ -142,7 +192,6 @@ public void getSegmentFiles( } }; final SegmentReplicationTarget target = new SegmentReplicationTarget( - checkpoint, replicaShard, source, new SegmentReplicationTargetService.SegmentReplicationListener() { @@ -168,14 +217,13 @@ public void onReplicationFailure(SegmentReplicationState state, ReplicationFaile public void testAlreadyOnNewCheckpoint() { SegmentReplicationTargetService spy = spy(sut); spy.onNewCheckpoint(replicaShard.getLatestReplicationCheckpoint(), replicaShard); - verify(spy, times(0)).startReplication(any(), any(), any()); + verify(spy, times(0)).startReplication(any(), any()); } public void testShardAlreadyReplicating() throws InterruptedException { // Create a spy of Target Service so that we can verify invocation of startReplication call with specific checkpoint on it. SegmentReplicationTargetService serviceSpy = spy(sut); final SegmentReplicationTarget target = new SegmentReplicationTarget( - initialCheckpoint, replicaShard, replicationSource, mock(SegmentReplicationTargetService.SegmentReplicationListener.class) @@ -201,7 +249,7 @@ public void testShardAlreadyReplicating() throws InterruptedException { // wait for the new checkpoint to arrive, before the listener completes. latch.await(30, TimeUnit.SECONDS); verify(targetSpy, times(0)).cancel(any()); - verify(serviceSpy, times(0)).startReplication(eq(aheadCheckpoint), eq(replicaShard), any()); + verify(serviceSpy, times(0)).startReplication(eq(replicaShard), any()); } public void testOnNewCheckpointFromNewPrimaryCancelOngoingReplication() throws IOException, InterruptedException { @@ -210,7 +258,6 @@ public void testOnNewCheckpointFromNewPrimaryCancelOngoingReplication() throws I // Create a Mockito spy of target to stub response of few method calls. final SegmentReplicationTarget targetSpy = spy( new SegmentReplicationTarget( - initialCheckpoint, replicaShard, replicationSource, mock(SegmentReplicationTargetService.SegmentReplicationListener.class) @@ -222,7 +269,7 @@ public void testOnNewCheckpointFromNewPrimaryCancelOngoingReplication() throws I // of latch. doAnswer(invocation -> { // short circuit loop on new checkpoint request - doReturn(null).when(serviceSpy).startReplication(eq(newPrimaryCheckpoint), eq(replicaShard), any()); + doReturn(null).when(serviceSpy).startReplication(eq(replicaShard), any()); // a new checkpoint arrives before we've completed. serviceSpy.onNewCheckpoint(newPrimaryCheckpoint, replicaShard); try { @@ -239,20 +286,20 @@ public void testOnNewCheckpointFromNewPrimaryCancelOngoingReplication() throws I // wait for the new checkpoint to arrive, before the listener completes. assertEquals(CANCELLED, targetSpy.state().getStage()); verify(targetSpy, times(1)).cancel("Cancelling stuck target after new primary"); - verify(serviceSpy, times(1)).startReplication(eq(newPrimaryCheckpoint), eq(replicaShard), any()); + verify(serviceSpy, times(1)).startReplication(eq(replicaShard), any()); } public void testNewCheckpointBehindCurrentCheckpoint() { SegmentReplicationTargetService spy = spy(sut); spy.onNewCheckpoint(checkpoint, replicaShard); - verify(spy, times(0)).startReplication(any(), any(), any()); + verify(spy, times(0)).startReplication(any(), any()); } public void testShardNotStarted() throws IOException { SegmentReplicationTargetService spy = spy(sut); IndexShard shard = newShard(false); spy.onNewCheckpoint(checkpoint, shard); - verify(spy, times(0)).startReplication(any(), any(), any()); + verify(spy, times(0)).startReplication(any(), any()); closeShards(shard); } @@ -268,7 +315,43 @@ public void testRejectCheckpointOnShardPrimaryMode() throws IOException { spy.onNewCheckpoint(aheadCheckpoint, spyShard); // Verify that checkpoint is not processed as shard is in PrimaryMode. - verify(spy, times(0)).startReplication(any(), any(), any()); + verify(spy, times(0)).startReplication(any(), any()); closeShards(primaryShard); } + + public void testForceSegmentSyncHandler() throws Exception { + ForceSyncRequest forceSyncRequest = new ForceSyncRequest(1L, 1L, replicaShard.shardId()); + when(indicesService.getShardOrNull(forceSyncRequest.getShardId())).thenReturn(replicaShard); + TransportResponse response = transportService.submitRequest( + localNode, + SegmentReplicationTargetService.Actions.FORCE_SYNC, + forceSyncRequest, + TransportRequestOptions.builder().withTimeout(TRANSPORT_TIMEOUT).build(), + EmptyTransportResponseHandler.INSTANCE_SAME + ).txGet(); + assertEquals(TransportResponse.Empty.INSTANCE, response); + } + + public void testForceSegmentSyncHandlerWithFailure() throws Exception { + IndexShard spyReplicaShard = spy(replicaShard); + ForceSyncRequest forceSyncRequest = new ForceSyncRequest(1L, 1L, replicaShard.shardId()); + when(indicesService.getShardOrNull(forceSyncRequest.getShardId())).thenReturn(spyReplicaShard); + IOException exception = new IOException("dummy failure"); + doThrow(exception).when(spyReplicaShard).finalizeReplication(any()); + + // prevent shard failure to avoid test setup assertion + doNothing().when(spyReplicaShard).failShard(eq("replication failure"), any()); + Exception finalizeException = expectThrows(Exception.class, () -> { + transportService.submitRequest( + localNode, + SegmentReplicationTargetService.Actions.FORCE_SYNC, + forceSyncRequest, + TransportRequestOptions.builder().withTimeout(TRANSPORT_TIMEOUT).build(), + EmptyTransportResponseHandler.INSTANCE_SAME + ).txGet(); + }); + Throwable nestedException = finalizeException.getCause().getCause(); + assertTrue(nestedException instanceof IOException); + assertTrue(nestedException.getMessage().contains("dummy failure")); + } } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java index b36dbdba40be8..fa7e8ada71746 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java @@ -8,7 +8,6 @@ package org.opensearch.indices.replication; -import org.apache.lucene.codecs.Codec; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.document.StringField; @@ -108,7 +107,7 @@ public void setUp() throws Exception { spyIndexShard.getPendingPrimaryTerm(), testSegmentInfos.getGeneration(), testSegmentInfos.version, - Codec.getDefault().getName() + indexShard.getLatestReplicationCheckpoint().getCodec() ); } @@ -141,7 +140,7 @@ public void getSegmentFiles( SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( SegmentReplicationTargetService.SegmentReplicationListener.class ); - segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener); + segrepTarget = new SegmentReplicationTarget(spyIndexShard, segrepSource, segRepListener); segrepTarget.startReplication(new ActionListener() { @Override @@ -189,7 +188,7 @@ public void getSegmentFiles( SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( SegmentReplicationTargetService.SegmentReplicationListener.class ); - segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener); + segrepTarget = new SegmentReplicationTarget(spyIndexShard, segrepSource, segRepListener); segrepTarget.startReplication(new ActionListener() { @Override @@ -232,7 +231,7 @@ public void getSegmentFiles( SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( SegmentReplicationTargetService.SegmentReplicationListener.class ); - segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener); + segrepTarget = new SegmentReplicationTarget(spyIndexShard, segrepSource, segRepListener); segrepTarget.startReplication(new ActionListener() { @Override @@ -275,7 +274,7 @@ public void getSegmentFiles( SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( SegmentReplicationTargetService.SegmentReplicationListener.class ); - segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener); + segrepTarget = new SegmentReplicationTarget(spyIndexShard, segrepSource, segRepListener); doThrow(exception).when(spyIndexShard).finalizeReplication(any()); @@ -320,7 +319,7 @@ public void getSegmentFiles( SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( SegmentReplicationTargetService.SegmentReplicationListener.class ); - segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener); + segrepTarget = new SegmentReplicationTarget(spyIndexShard, segrepSource, segRepListener); doThrow(exception).when(spyIndexShard).finalizeReplication(any()); @@ -364,7 +363,7 @@ public void getSegmentFiles( SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( SegmentReplicationTargetService.SegmentReplicationListener.class ); - segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener); + segrepTarget = new SegmentReplicationTarget(spyIndexShard, segrepSource, segRepListener); when(spyIndexShard.getSegmentMetadataMap()).thenReturn(SI_SNAPSHOT_DIFFERENT); segrepTarget.startReplication(new ActionListener() { @Override @@ -416,7 +415,7 @@ public void getSegmentFiles( SegmentReplicationTargetService.SegmentReplicationListener.class ); - segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener); + segrepTarget = new SegmentReplicationTarget(spyIndexShard, segrepSource, segRepListener); when(spyIndexShard.getSegmentMetadataMap()).thenReturn(storeMetadataSnapshots.get(0).asMap()); segrepTarget.startReplication(new ActionListener() { @Override diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index a63ab1fc4ea13..250898ba1ce0e 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -1283,13 +1283,17 @@ public static Engine.Warmer createTestWarmer(IndexSettings indexSettings) { * @param primaryShard {@link IndexShard} - The primary shard to replicate from. * @param target {@link IndexShard} - The target replica shard in segment replication. */ - public final SegmentReplicationTargetService prepareForReplication(IndexShard primaryShard, IndexShard target) { + public final SegmentReplicationTargetService prepareForReplication( + IndexShard primaryShard, + IndexShard target, + TransportService transportService, + IndicesService indicesService + ) { final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); - final IndicesService indicesService = mock(IndicesService.class); final SegmentReplicationTargetService targetService = new SegmentReplicationTargetService( threadPool, new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), - mock(TransportService.class), + transportService, sourceFactory, indicesService ); @@ -1302,7 +1306,7 @@ public void getCheckpointMetadata( ) { try { final CopyState copyState = new CopyState( - ReplicationCheckpoint.empty(primaryShard.shardId, primaryShard.getDefaultCodecName()), + ReplicationCheckpoint.empty(primaryShard.shardId, primaryShard.getLatestReplicationCheckpoint().getCodec()), primaryShard ); listener.onResponse( @@ -1356,9 +1360,13 @@ public final List replicateSegments(IndexShard primary } List ids = new ArrayList<>(); for (IndexShard replica : replicaShards) { - final SegmentReplicationTargetService targetService = prepareForReplication(primaryShard, replica); + final SegmentReplicationTargetService targetService = prepareForReplication( + primaryShard, + replica, + mock(TransportService.class), + mock(IndicesService.class) + ); final SegmentReplicationTarget target = targetService.startReplication( - ReplicationCheckpoint.empty(replica.shardId, replica.getDefaultCodecName()), replica, new SegmentReplicationTargetService.SegmentReplicationListener() { @Override