Skip to content

Commit

Permalink
Addressing comments
Browse files Browse the repository at this point in the history
Signed-off-by: Shourya Dutta Biswas <[email protected]>
  • Loading branch information
shourya035 committed Mar 26, 2024
1 parent 0a5ef5b commit c517504
Show file tree
Hide file tree
Showing 44 changed files with 687 additions and 545 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,8 @@ public static final IndexShard newIndexShard(
() -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL,
nodeId,
null,
false
false,
IndexShardTestUtils.getFakeDiscoveryNodes(initializingShardRouting)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,30 @@
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexService;
import org.opensearch.index.remote.RemoteSegmentStats;
import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.InternalSettingsPlugin;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false)
public class DocrepToRemoteDualReplicationIT extends MigrationBaseTestCase {
private String REMOTE_PRI_DOCREP_REP = "remote-primary-docrep-replica";
private String REMOTE_PRI_DOCREP_REMOTE_REP = "remote-primary-docrep-remote-replica";
private String FAILOVER_REMOTE_TO_DOCREP = "failover-remote-to-docrep";
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteDualMigrationIT extends MigrationBaseTestCase {
private final String REMOTE_PRI_DOCREP_REP = "remote-primary-docrep-replica";
private final String REMOTE_PRI_DOCREP_REMOTE_REP = "remote-primary-docrep-remote-replica";
private final String FAILOVER_REMOTE_TO_DOCREP = "failover-remote-to-docrep";

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
    // InternalSettingsPlugin exposes internal-only index settings (such as the
    // retention lease sync interval used by these tests) at index-creation time.
    Collection<Class<? extends Plugin>> requiredPlugins = List.of(InternalSettingsPlugin.class);
    return requiredPlugins;
}

/*
Scenario:
Expand All @@ -43,12 +52,10 @@ public class DocrepToRemoteDualReplicationIT extends MigrationBaseTestCase {
- Assert primary-replica consistency
*/
public void testRemotePrimaryDocRepReplica() throws Exception {
internalCluster().setBootstrapClusterManagerNodeIndex(0);
internalCluster().startClusterManagerOnlyNode();

logger.info("---> Starting 2 docrep data nodes");
internalCluster().startDataOnlyNode();
internalCluster().startDataOnlyNode();
internalCluster().startDataOnlyNodes(2);
internalCluster().validateClusterFormed();
assertEquals(internalCluster().client().admin().cluster().prepareGetRepositories().get().repositories().size(), 0);

Expand Down Expand Up @@ -95,16 +102,8 @@ public void testRemotePrimaryDocRepReplica() throws Exception {
logger.info("---> Indexing another {} docs", secondBatch);
indexBulk(REMOTE_PRI_DOCREP_REP, secondBatch);
// Defensive check to ensure that doc count in replica shard catches up to the primary copy
flush(REMOTE_PRI_DOCREP_REP);
Map<ShardRouting, ShardStats> shardStatsMap = internalCluster().client()
.admin()
.indices()
.prepareStats(REMOTE_PRI_DOCREP_REP)
.setDocs(true)
.get()
.asMap();
DiscoveryNodes nodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes();
assertReplicaAndPrimaryConsistencySingle(shardStatsMap, initialBatch, secondBatch, nodes);
refreshAndWaitForReplication(REMOTE_PRI_DOCREP_REP);
assertReplicaAndPrimaryConsistency(REMOTE_PRI_DOCREP_REP, initialBatch, secondBatch);
}

/*
Expand All @@ -120,7 +119,6 @@ public void testRemotePrimaryDocRepReplica() throws Exception {
- Assert primary-replica consistency
*/
public void testRemotePrimaryDocRepAndRemoteReplica() throws Exception {
internalCluster().setBootstrapClusterManagerNodeIndex(0);
internalCluster().startClusterManagerOnlyNode();

logger.info("---> Starting 1 docrep data nodes");
Expand All @@ -129,7 +127,10 @@ public void testRemotePrimaryDocRepAndRemoteReplica() throws Exception {
assertEquals(internalCluster().client().admin().cluster().prepareGetRepositories().get().repositories().size(), 0);

logger.info("---> Creating index with 0 replica");
Settings zeroReplicas = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build();
Settings zeroReplicas = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms")
.build();
createIndex(REMOTE_PRI_DOCREP_REMOTE_REP, zeroReplicas);
ensureGreen(REMOTE_PRI_DOCREP_REMOTE_REP);
initDocRepToRemoteMigration();
Expand Down Expand Up @@ -187,24 +188,16 @@ public void testRemotePrimaryDocRepAndRemoteReplica() throws Exception {
logger.info("---> Indexing another {} docs", secondBatch);
indexBulk(REMOTE_PRI_DOCREP_REMOTE_REP, secondBatch);
// Defensive check to ensure that doc count in replica shard catches up to the primary copy
flush(REMOTE_PRI_DOCREP_REMOTE_REP);

Map<ShardRouting, ShardStats> shardStatsMap = internalCluster().client()
.admin()
.indices()
.prepareStats(REMOTE_PRI_DOCREP_REMOTE_REP)
.setDocs(true)
.get()
.asMap();
DiscoveryNodes nodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes();
assertReplicaAndPrimaryConsistencyMultiCopy(shardStatsMap, firstBatch, secondBatch, nodes);
refreshAndWaitForReplication(REMOTE_PRI_DOCREP_REMOTE_REP);
assertReplicaAndPrimaryConsistency(REMOTE_PRI_DOCREP_REMOTE_REP, firstBatch, secondBatch);
}

public void testRetentionLeasePresentOnDocrepCopyButNotOnRemote() throws Exception {
/*
Checks that retention leases are published on the primary shard and its docrep copies, but not on remote copies
*/
public void testRetentionLeasePresentOnDocrepReplicaButNotRemote() throws Exception {
testRemotePrimaryDocRepAndRemoteReplica();
DiscoveryNodes nodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes();
// Retention lease background sync job runs every 30 seconds by default.
// Waiting for 40 seconds in assertBusy to ensure that the background task runs
assertBusy(() -> {
for (ShardStats shardStats : internalCluster().client()
.admin()
Expand All @@ -227,7 +220,7 @@ public void testRetentionLeasePresentOnDocrepCopyButNotOnRemote() throws Excepti
assertRetentionLeaseConsistency(shardStats, retentionLeases);
}
}
}, 40, TimeUnit.SECONDS);
});
}

/*
Expand All @@ -242,7 +235,6 @@ public void testRetentionLeasePresentOnDocrepCopyButNotOnRemote() throws Excepti
- Index some more docs to ensure working of failed-over primary
*/
public void testFailoverRemotePrimaryToDocrepReplica() throws Exception {
internalCluster().setBootstrapClusterManagerNodeIndex(0);
internalCluster().startClusterManagerOnlyNode();

logger.info("---> Starting 1 docrep data nodes");
Expand Down Expand Up @@ -300,7 +292,7 @@ public void testFailoverRemotePrimaryToDocrepReplica() throws Exception {
);
ensureGreen(FAILOVER_REMOTE_TO_DOCREP);

flush(FAILOVER_REMOTE_TO_DOCREP);
refreshAndWaitForReplication(FAILOVER_REMOTE_TO_DOCREP);
Map<ShardRouting, ShardStats> shardStatsMap = internalCluster().client()
.admin()
.indices()
Expand All @@ -316,7 +308,7 @@ public void testFailoverRemotePrimaryToDocrepReplica() throws Exception {
initialPrimaryDocCount = shardStatsMap.get(shardRouting).getStats().getDocs().getCount();
}
}
assertReplicaAndPrimaryConsistencySingle(shardStatsMap, firstBatch, 0, nodes);
assertReplicaAndPrimaryConsistency(FAILOVER_REMOTE_TO_DOCREP, firstBatch, 0);

logger.info("---> Stop remote store enabled node");
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(remoteNodeName));
Expand All @@ -336,10 +328,10 @@ public void testFailoverRemotePrimaryToDocrepReplica() throws Exception {

logger.info("---> Index some more docs to ensure that the failed over primary is ingesting new docs");
int secondBatch = randomIntBetween(1, 10);
logger.info("---> Indexing {} more docs", firstBatch);
logger.info("---> Indexing {} more docs", secondBatch);
indexingService = new SyncIndexingService(FAILOVER_REMOTE_TO_DOCREP, secondBatch);
indexingService.startIndexing();
flush(FAILOVER_REMOTE_TO_DOCREP);
refreshAndWaitForReplication(FAILOVER_REMOTE_TO_DOCREP);

shardStatsMap = internalCluster().client().admin().indices().prepareStats(FAILOVER_REMOTE_TO_DOCREP).setDocs(true).get().asMap();
assertEquals(1, shardStatsMap.size());
Expand All @@ -348,13 +340,88 @@ public void testFailoverRemotePrimaryToDocrepReplica() throws Exception {
);
}

private void assertReplicaAndPrimaryConsistencyMultiCopy(
Map<ShardRouting, ShardStats> shardStatsMap,
int firstBatch,
int secondBatch,
DiscoveryNodes nodes
) throws Exception {
/*
Scenario:
- Starts 1 docrep backed data node
- Creates an index with 0 replica
- Starts 1 remote backed data node
- Move primary copy from docrep to remote through _cluster/reroute
- Expands index to 1 replica
- Stops remote enabled node
- Ensure doc count is same after failover
- Index some more docs to ensure working of failed-over primary
- Starts another remote node
- Move primary copy from docrep to remote through _cluster/reroute
- Ensure that remote store is seeded in the new remote node by asserting remote uploads from that node > 0
*/
public void testFailoverRemotePrimaryToDocrepReplicaReseedToRemotePrimary() throws Exception {
    // Re-use the failover scenario to arrive at a docrep primary that previously lived on a remote node
    testFailoverRemotePrimaryToDocrepReplica();

    logger.info("---> Removing replica copy");
    assertAcked(
        internalCluster().client()
            .admin()
            .indices()
            .prepareUpdateSettings(FAILOVER_REMOTE_TO_DOCREP)
            .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0))
            .get()
    );
    ensureGreen(FAILOVER_REMOTE_TO_DOCREP);

    logger.info("---> Starting a new remote enabled node");
    addRemote = true;
    String remoteNodeName = internalCluster().startDataOnlyNode();
    internalCluster().validateClusterFormed();
    // JUnit's assertEquals takes (expected, actual); the arguments were previously
    // swapped, which produces misleading failure messages. Both segment and translog
    // repositories must be registered once a remote-enabled node joins.
    assertEquals(
        2,
        internalCluster().client()
            .admin()
            .cluster()
            .prepareGetRepositories(REPOSITORY_NAME, REPOSITORY_2_NAME)
            .get()
            .repositories()
            .size()
    );

    String primaryShardHostingNode = primaryNodeName(FAILOVER_REMOTE_TO_DOCREP);
    logger.info("---> Moving primary copy from {} to remote enabled node {}", primaryShardHostingNode, remoteNodeName);
    assertAcked(
        internalCluster().client()
            .admin()
            .cluster()
            .prepareReroute()
            .add(new MoveAllocationCommand(FAILOVER_REMOTE_TO_DOCREP, 0, primaryShardHostingNode, remoteNodeName))
            .get()
    );
    ensureGreen(FAILOVER_REMOTE_TO_DOCREP);

    Map<ShardRouting, ShardStats> shardStatsMap = internalCluster().client()
        .admin()
        .indices()
        .prepareStats(FAILOVER_REMOTE_TO_DOCREP)
        .get()
        .asMap();
    DiscoveryNodes discoveryNodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes();
    // Single shard copy expected after the replica was removed above
    assertEquals(1, shardStatsMap.size());
    // Seeding check: the primary relocated onto the remote node must have uploaded segments
    shardStatsMap.forEach((shardRouting, shardStats) -> {
        if (discoveryNodes.get(shardRouting.currentNodeId()).isRemoteStoreNode()) {
            RemoteSegmentStats remoteSegmentStats = shardStats.getStats().getSegments().getRemoteSegmentStats();
            assertTrue(remoteSegmentStats.getTotalUploadTime() > 0);
            assertTrue(remoteSegmentStats.getUploadBytesSucceeded() > 0);
        }
    });
}

private void assertReplicaAndPrimaryConsistency(String indexName, int firstBatch, int secondBatch) throws Exception {
assertBusy(() -> {
Map<ShardRouting, ShardStats> shardStatsMap = internalCluster().client()
.admin()
.indices()
.prepareStats(indexName)
.setDocs(true)
.get()
.asMap();
DiscoveryNodes nodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes();
for (ShardRouting shardRouting : shardStatsMap.keySet()) {
CommonStats shardStats = shardStatsMap.get(shardRouting).getStats();
if (shardRouting.primary()) {
Expand Down Expand Up @@ -383,36 +450,6 @@ private void assertReplicaAndPrimaryConsistencyMultiCopy(
});
}

/*
 * Asserts primary/replica doc-count consistency for a single-replica index and that
 * remote segment stats match the node types: the primary is hosted on a remote-store
 * node (segment uploads > 0) while the replica is on a docrep node (no downloads).
 *
 * NOTE(review): the ShardStats snapshot in shardStatsMap is taken by the caller BEFORE
 * assertBusy runs; the retry loop keeps re-asserting the same stale map instead of
 * re-fetching stats, so waiting cannot actually observe the replica catching up —
 * confirm, and prefer re-fetching the stats inside the lambda.
 */
private void assertReplicaAndPrimaryConsistencySingle(
Map<ShardRouting, ShardStats> shardStatsMap,
int initialBatch,
int secondBatch,
DiscoveryNodes nodes
) throws Exception {
assertBusy(() -> {
// Doc counts picked off the snapshot; remain 0 until the matching copy is seen
long primaryDocCount = 0, replicaDocCount = 0;
for (ShardRouting shardRouting : shardStatsMap.keySet()) {
CommonStats shardStats = shardStatsMap.get(shardRouting).getStats();
if (shardRouting.primary()) {
primaryDocCount = shardStats.getDocs().getCount();
// Primary must be on a remote-store enabled node and have uploaded segments
assertTrue(nodes.get(shardRouting.currentNodeId()).isRemoteStoreNode());
RemoteSegmentStats remoteSegmentStats = shardStats.getSegments().getRemoteSegmentStats();
assertTrue(remoteSegmentStats.getUploadBytesSucceeded() > 0);
assertTrue(remoteSegmentStats.getTotalUploadTime() > 0);
} else {
replicaDocCount = shardStats.getDocs().getCount();
// Replica stays on a docrep node and must never have touched the remote store
assertFalse(nodes.get(shardRouting.currentNodeId()).isRemoteStoreNode());
RemoteSegmentStats remoteSegmentStats = shardStats.getSegments().getRemoteSegmentStats();
assertEquals(0, remoteSegmentStats.getDownloadBytesStarted());
assertEquals(0, remoteSegmentStats.getTotalDownloadTime());
}
}
// Replica must hold the full doc set and match the primary exactly
assertTrue(replicaDocCount > 0);
assertEquals(replicaDocCount, initialBatch + secondBatch);
assertEquals(primaryDocCount, replicaDocCount);
});
}

private static void assertRetentionLeaseConsistency(ShardStats shardStats, RetentionLeases retentionLeases) {
long maxSeqNo = shardStats.getSeqNoStats().getMaxSeqNo();
assertTrue(retentionLeases.leases().stream().allMatch(l -> l.retainingSequenceNumber() == maxSeqNo + 1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,10 +363,11 @@ protected ReplicationOperation.Replicas<BulkShardRequest> primaryTermValidationR
/**
* This {@link org.opensearch.action.support.replication.TransportReplicationAction.ReplicasProxy} implementation is
* used for primary term validation and is only relevant for TransportShardBulkAction replication action.
*
* <p>
* Visible for tests
* @opensearch.internal
*/
private final class PrimaryTermValidationProxy extends WriteActionReplicasProxy {
public final class PrimaryTermValidationProxy extends WriteActionReplicasProxy {

@Override
public void performOn(
Expand Down Expand Up @@ -442,7 +443,7 @@ protected long primaryOperationSize(BulkShardRequest request) {

@Override
public ReplicationMode getReplicationMode(IndexShard indexShard) {
if (indexShard.routingEntry().isAssignedToRemoteStoreNode()) {
if (indexShard.indexSettings().isRemoteNode()) {
return ReplicationMode.PRIMARY_TERM_VALIDATION;
}
return super.getReplicationMode(indexShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.action.support.replication;

import org.opensearch.action.support.replication.ReplicationOperation.ReplicaResponse;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.core.action.ActionListener;

Expand All @@ -31,14 +32,18 @@ public class ReplicationModeAwareProxy<ReplicaRequest extends ReplicationRequest
*/
private final ReplicationOperation.Replicas<ReplicaRequest> primaryTermValidationProxy;

private final DiscoveryNodes discoveryNodes;

/**
 * @param replicationModeOverride replication mode applied to replicas that pass the remote-store check; non-null
 * @param discoveryNodes current cluster nodes, used to resolve whether a replica's hosting node is remote-store enabled; non-null
 * @param replicasProxy proxy that performs actual replication on replicas
 * @param primaryTermValidationProxy proxy that performs primary-term validation only
 */
public ReplicationModeAwareProxy(
    ReplicationMode replicationModeOverride,
    DiscoveryNodes discoveryNodes,
    ReplicationOperation.Replicas<ReplicaRequest> replicasProxy,
    ReplicationOperation.Replicas<ReplicaRequest> primaryTermValidationProxy
) {
    super(replicasProxy);
    this.replicationModeOverride = Objects.requireNonNull(replicationModeOverride);
    this.primaryTermValidationProxy = Objects.requireNonNull(primaryTermValidationProxy);
    // Fail fast on construction: discoveryNodes is dereferenced on every
    // determineReplicationMode call; null-check kept consistent with the other fields.
    this.discoveryNodes = Objects.requireNonNull(discoveryNodes);
}

@Override
Expand All @@ -58,20 +63,20 @@ protected void performOnReplicaProxy(
}

@Override
ReplicationMode determineReplicationMode(ShardRouting targetShardRouting, ShardRouting primaryRouting) {
ReplicationMode determineReplicationMode(ShardRouting shardRouting, ShardRouting primaryRouting) {
// If the current routing is the primary, then it does not need to be replicated
if (targetShardRouting.isSameAllocation(primaryRouting)) {
if (shardRouting.isSameAllocation(primaryRouting)) {
return ReplicationMode.NO_REPLICATION;
}
// Perform full replication during primary relocation
if (primaryRouting.relocating() && targetShardRouting.isSameAllocation(primaryRouting.getTargetRelocatingShard())) {
if (primaryRouting.relocating() && shardRouting.isSameAllocation(primaryRouting.getTargetRelocatingShard())) {
return ReplicationMode.FULL_REPLICATION;
}
/*
Perform full replication if replica is hosted on a non-remote node.
Only applicable during remote migration
*/
if (targetShardRouting.isAssignedToRemoteStoreNode() == false) {
if (discoveryNodes.get(shardRouting.currentNodeId()).isRemoteStoreNode() == false) {
return ReplicationMode.FULL_REPLICATION;
}
return replicationModeOverride;
Expand Down
Loading

0 comments on commit c517504

Please sign in to comment.