Skip to content

Commit

Permalink
[Segment Replication] Use deterministic mechanism to have concurrent …
Browse files Browse the repository at this point in the history
…invocation of segment replication (opensearch-project#8937)

* [Segment Replication] Use deterministic mechanism to have concurrent invocation of segment replication

Signed-off-by: Suraj Singh <[email protected]>

* Clean up

Signed-off-by: Suraj Singh <[email protected]>

---------

Signed-off-by: Suraj Singh <[email protected]>
  • Loading branch information
dreamer-89 authored and baba-devv committed Jul 28, 2023
1 parent f35f32c commit 1317a7f
Showing 1 changed file with 46 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.opensearch.indices.recovery.ForceSyncRequest;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.CopyState;
import org.opensearch.indices.replication.common.ReplicationCollection;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
Expand All @@ -49,6 +50,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.atLeastOnce;
Expand All @@ -70,10 +72,7 @@ public class SegmentReplicationTargetServiceTests extends IndexShardTestCase {
private IndexShard replicaShard;
private IndexShard primaryShard;
private ReplicationCheckpoint checkpoint;
private SegmentReplicationSource replicationSource;
private SegmentReplicationTargetService sut;

private ReplicationCheckpoint initialCheckpoint;
private ReplicationCheckpoint aheadCheckpoint;

private ReplicationCheckpoint newPrimaryCheckpoint;
Expand All @@ -83,11 +82,10 @@ public class SegmentReplicationTargetServiceTests extends IndexShardTestCase {
private DiscoveryNode localNode;

private IndicesService indicesService;
private ClusterService clusterService;

private SegmentReplicationState state;

private static long TRANSPORT_TIMEOUT = 30000;// 30sec
private static final long TRANSPORT_TIMEOUT = 30000;// 30sec

@Override
public void setUp() throws Exception {
Expand All @@ -107,9 +105,6 @@ public void setUp() throws Exception {
0L,
replicaShard.getLatestReplicationCheckpoint().getCodec()
);
SegmentReplicationSourceFactory replicationSourceFactory = mock(SegmentReplicationSourceFactory.class);
replicationSource = mock(SegmentReplicationSource.class);
when(replicationSourceFactory.get(replicaShard)).thenReturn(replicationSource);

testThreadPool = new TestThreadPool("test", Settings.EMPTY);
localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT);
Expand All @@ -126,7 +121,7 @@ public void setUp() throws Exception {
transportService.acceptIncomingRequests();

indicesService = mock(IndicesService.class);
clusterService = mock(ClusterService.class);
ClusterService clusterService = mock(ClusterService.class);
ClusterState clusterState = mock(ClusterState.class);
RoutingTable mockRoutingTable = mock(RoutingTable.class);
when(clusterService.state()).thenReturn(clusterState);
Expand All @@ -135,7 +130,7 @@ public void setUp() throws Exception {

when(clusterState.nodes()).thenReturn(DiscoveryNodes.builder().add(localNode).build());
sut = prepareForReplication(primaryShard, replicaShard, transportService, indicesService, clusterService);
initialCheckpoint = replicaShard.getLatestReplicationCheckpoint();
ReplicationCheckpoint initialCheckpoint = replicaShard.getLatestReplicationCheckpoint();
aheadCheckpoint = new ReplicationCheckpoint(
initialCheckpoint.getShardId(),
initialCheckpoint.getPrimaryTerm(),
Expand Down Expand Up @@ -242,7 +237,46 @@ public void testAlreadyOnNewCheckpoint() {
}

public void testShardAlreadyReplicating() {
sut.startReplication(replicaShard, mock(SegmentReplicationTargetService.SegmentReplicationListener.class));
CountDownLatch blockGetCheckpointMetadata = new CountDownLatch(1);
SegmentReplicationSource source = new TestReplicationSource() {
@Override
public void getCheckpointMetadata(
long replicationId,
ReplicationCheckpoint checkpoint,
ActionListener<CheckpointInfoResponse> listener
) {
try {
blockGetCheckpointMetadata.await();
final CopyState copyState = new CopyState(
ReplicationCheckpoint.empty(primaryShard.shardId(), primaryShard.getLatestReplicationCheckpoint().getCodec()),
primaryShard
);
listener.onResponse(
new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes())
);
} catch (InterruptedException | IOException e) {
throw new RuntimeException(e);
}
}

@Override
public void getSegmentFiles(
long replicationId,
ReplicationCheckpoint checkpoint,
List<StoreFileMetadata> filesToFetch,
IndexShard indexShard,
ActionListener<GetSegmentFilesResponse> listener
) {
listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList()));
}
};
final SegmentReplicationTarget target = spy(
new SegmentReplicationTarget(replicaShard, source, mock(SegmentReplicationTargetService.SegmentReplicationListener.class))
);
// Start first round of segment replication.
sut.startReplication(target);

// Start second round of segment replication, this should fail to start as first round is still in-progress
sut.startReplication(replicaShard, new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
Expand All @@ -255,6 +289,7 @@ public void onReplicationFailure(SegmentReplicationState state, ReplicationFaile
assertFalse(sendShardFailure);
}
});
blockGetCheckpointMetadata.countDown();
}

public void testOnNewCheckpointFromNewPrimaryCancelOngoingReplication() throws InterruptedException {
Expand Down

0 comments on commit 1317a7f

Please sign in to comment.