From c517504017ee72ae0dbd67234926ef7e416c0824 Mon Sep 17 00:00:00 2001 From: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com> Date: Tue, 26 Mar 2024 19:59:34 +0530 Subject: [PATCH] Addressing comments Signed-off-by: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com> --- .../opensearch/index/shard/IndexShardIT.java | 3 +- ...tionIT.java => RemoteDualMigrationIT.java} | 191 +++++++++++------- .../action/bulk/TransportShardBulkAction.java | 7 +- .../ReplicationModeAwareProxy.java | 13 +- .../TransportReplicationAction.java | 11 +- .../cluster/routing/ShardRouting.java | 9 - .../org/opensearch/index/IndexService.java | 7 +- .../index/remote/RemoteStoreUtils.java | 1 + .../index/seqno/ReplicationTracker.java | 55 ++--- .../opensearch/index/shard/IndexShard.java | 46 ++++- .../opensearch/indices/IndicesService.java | 7 +- .../cluster/IndicesClusterStateService.java | 31 +-- .../RecoverySourceHandlerFactory.java | 3 +- .../checkpoint/PublishCheckpointAction.java | 5 +- ...portVerifyShardBeforeCloseActionTests.java | 32 --- .../flush/TransportShardFlushActionTests.java | 32 --- ...sportVerifyShardIndexBlockActionTests.java | 23 --- .../TransportShardRefreshActionTests.java | 32 --- .../bulk/TransportShardBulkActionTests.java | 29 +-- ...TransportResyncReplicationActionTests.java | 23 --- .../ReplicationModeAwareProxyTests.java | 130 ++++++++++++ .../ReplicationOperationTests.java | 73 ++++--- .../TransportReplicationActionTests.java | 7 + .../index/remote/RemoteStoreTestsHelper.java | 23 +-- .../RetentionLeasesReplicationTests.java | 4 +- .../GlobalCheckpointSyncActionTests.java | 14 -- ...PeerRecoveryRetentionLeaseExpiryTests.java | 3 +- ...ReplicationTrackerRetentionLeaseTests.java | 48 +++-- .../seqno/ReplicationTrackerTestCase.java | 23 ++- .../index/seqno/ReplicationTrackerTests.java | 32 +-- ...tentionLeaseBackgroundSyncActionTests.java | 23 --- .../seqno/RetentionLeaseSyncActionTests.java | 23 --- .../index/shard/IndexShardTests.java | 21 +- .../shard/PrimaryReplicaSyncerTests.java | 6 +- ...overyWithRemoteTranslogOnPrimaryTests.java | 17 +- ...dicesLifecycleListenerSingleNodeTests.java | 4 +- ...actIndicesClusterStateServiceTestCase.java | 7 +- .../PeerRecoverySourceServiceTests.java | 5 +- .../PeerRecoveryTargetServiceTests.java | 5 +- .../PublishCheckpointActionTests.java | 25 +-- .../index/engine/EngineTestCase.java | 3 +- ...enSearchIndexLevelReplicationTestCase.java | 44 +++- .../index/shard/IndexShardTestCase.java | 65 ++++-- .../index/shard/IndexShardTestUtils.java | 67 ++++++ 44 files changed, 687 insertions(+), 545 deletions(-) rename server/src/internalClusterTest/java/org/opensearch/remotemigration/{DocrepToRemoteDualReplicationIT.java => RemoteDualMigrationIT.java} (75%) create mode 100644 server/src/test/java/org/opensearch/action/support/replication/ReplicationModeAwareProxyTests.java create mode 100644 test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestUtils.java diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java index 7e0c1630a76e4..a76fc3af91cce 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java @@ -714,7 +714,8 @@ public static final IndexShard newIndexShard( () -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, nodeId, null, - false + false, + IndexShardTestUtils.getFakeDiscoveryNodes(initializingShardRouting) ); } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/DocrepToRemoteDualReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualMigrationIT.java similarity index 75% rename from server/src/internalClusterTest/java/org/opensearch/remotemigration/DocrepToRemoteDualReplicationIT.java rename to server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualMigrationIT.java index 2599c021a0fbd..9e0fbaaeda1e2 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/DocrepToRemoteDualReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualMigrationIT.java @@ -16,21 +16,30 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexService; import org.opensearch.index.remote.RemoteSegmentStats; import org.opensearch.index.seqno.RetentionLeases; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.InternalSettingsPlugin; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; +import java.util.Collection; +import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; -@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false) -public class DocrepToRemoteDualReplicationIT extends MigrationBaseTestCase { - private String REMOTE_PRI_DOCREP_REP = "remote-primary-docrep-replica"; - private String REMOTE_PRI_DOCREP_REMOTE_REP = "remote-primary-docrep-remote-replica"; - private String FAILOVER_REMOTE_TO_DOCREP = "failover-remote-to-docrep"; +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class RemoteDualMigrationIT extends MigrationBaseTestCase { + private final String REMOTE_PRI_DOCREP_REP = "remote-primary-docrep-replica"; + private final String REMOTE_PRI_DOCREP_REMOTE_REP = "remote-primary-docrep-remote-replica"; + private final String FAILOVER_REMOTE_TO_DOCREP = "failover-remote-to-docrep"; + + @Override + protected Collection> nodePlugins() { + return List.of(InternalSettingsPlugin.class); + } /* Scenario: @@ -43,12 +52,10 @@ public class DocrepToRemoteDualReplicationIT extends MigrationBaseTestCase { - Assert primary-replica consistency */ public void testRemotePrimaryDocRepReplica() throws Exception { - internalCluster().setBootstrapClusterManagerNodeIndex(0); internalCluster().startClusterManagerOnlyNode(); logger.info("---> Starting 2 docrep data nodes"); - internalCluster().startDataOnlyNode(); - internalCluster().startDataOnlyNode(); + internalCluster().startDataOnlyNodes(2); internalCluster().validateClusterFormed(); assertEquals(internalCluster().client().admin().cluster().prepareGetRepositories().get().repositories().size(), 0); @@ -95,16 +102,8 @@ public void testRemotePrimaryDocRepReplica() throws Exception { logger.info("---> Indexing another {} docs", secondBatch); indexBulk(REMOTE_PRI_DOCREP_REP, secondBatch); // Defensive check to ensure that doc count in replica shard catches up to the primary copy - flush(REMOTE_PRI_DOCREP_REP); - Map shardStatsMap = internalCluster().client() - .admin() - .indices() - .prepareStats(REMOTE_PRI_DOCREP_REP) - .setDocs(true) - .get() - .asMap(); - DiscoveryNodes nodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes(); - assertReplicaAndPrimaryConsistencySingle(shardStatsMap, initialBatch, secondBatch, nodes); + refreshAndWaitForReplication(REMOTE_PRI_DOCREP_REP); + assertReplicaAndPrimaryConsistency(REMOTE_PRI_DOCREP_REP, initialBatch, secondBatch); } /* @@ -120,7 +119,6 @@ public void testRemotePrimaryDocRepReplica() throws Exception { - Assert primary-replica consistency */ public void testRemotePrimaryDocRepAndRemoteReplica() throws Exception { - internalCluster().setBootstrapClusterManagerNodeIndex(0); internalCluster().startClusterManagerOnlyNode(); logger.info("---> Starting 1 docrep data nodes"); @@ -129,7 +127,10 @@ public void testRemotePrimaryDocRepAndRemoteReplica() throws Exception { assertEquals(internalCluster().client().admin().cluster().prepareGetRepositories().get().repositories().size(), 0); logger.info("---> Creating index with 0 replica"); - Settings zeroReplicas = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build(); + Settings zeroReplicas = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms") + .build(); createIndex(REMOTE_PRI_DOCREP_REMOTE_REP, zeroReplicas); ensureGreen(REMOTE_PRI_DOCREP_REMOTE_REP); initDocRepToRemoteMigration(); @@ -187,24 +188,16 @@ public void testRemotePrimaryDocRepAndRemoteReplica() throws Exception { logger.info("---> Indexing another {} docs", secondBatch); indexBulk(REMOTE_PRI_DOCREP_REMOTE_REP, secondBatch); // Defensive check to ensure that doc count in replica shard catches up to the primary copy - flush(REMOTE_PRI_DOCREP_REMOTE_REP); - - Map shardStatsMap = internalCluster().client() - .admin() - .indices() - .prepareStats(REMOTE_PRI_DOCREP_REMOTE_REP) - .setDocs(true) - .get() - .asMap(); - DiscoveryNodes nodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes(); - assertReplicaAndPrimaryConsistencyMultiCopy(shardStatsMap, firstBatch, secondBatch, nodes); + refreshAndWaitForReplication(REMOTE_PRI_DOCREP_REMOTE_REP); + assertReplicaAndPrimaryConsistency(REMOTE_PRI_DOCREP_REMOTE_REP, firstBatch, secondBatch); } - public void testRetentionLeasePresentOnDocrepCopyButNotOnRemote() throws Exception { + /* + Checks if retention leases are published on primary shard and it's docrep copies, but not on remote copies + */ + public void testRetentionLeasePresentOnDocrepReplicaButNotRemote() throws Exception { testRemotePrimaryDocRepAndRemoteReplica(); DiscoveryNodes nodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes(); - // Retention lease background sync job runs every 30mins. - // Waiting for 40 seconds in assertBusy to ensure that the background task runs assertBusy(() -> { for (ShardStats shardStats : internalCluster().client() .admin() @@ -227,7 +220,7 @@ public void testRetentionLeasePresentOnDocrepCopyButNotOnRemote() throws Excepti assertRetentionLeaseConsistency(shardStats, retentionLeases); } } - }, 40, TimeUnit.SECONDS); + }); } /* @@ -242,7 +235,6 @@ public void testRetentionLeasePresentOnDocrepCopyButNotOnRemote() throws Excepti - Index some more docs to ensure working of failed-over primary */ public void testFailoverRemotePrimaryToDocrepReplica() throws Exception { - internalCluster().setBootstrapClusterManagerNodeIndex(0); internalCluster().startClusterManagerOnlyNode(); logger.info("---> Starting 1 docrep data nodes"); @@ -300,7 +292,7 @@ public void testFailoverRemotePrimaryToDocrepReplica() throws Exception { ); ensureGreen(FAILOVER_REMOTE_TO_DOCREP); - flush(FAILOVER_REMOTE_TO_DOCREP); + refreshAndWaitForReplication(FAILOVER_REMOTE_TO_DOCREP); Map shardStatsMap = internalCluster().client() .admin() .indices() @@ -316,7 +308,7 @@ public void testFailoverRemotePrimaryToDocrepReplica() throws Exception { initialPrimaryDocCount = shardStatsMap.get(shardRouting).getStats().getDocs().getCount(); } } - assertReplicaAndPrimaryConsistencySingle(shardStatsMap, firstBatch, 0, nodes); + assertReplicaAndPrimaryConsistency(FAILOVER_REMOTE_TO_DOCREP, firstBatch, 0); logger.info("---> Stop remote store enabled node"); internalCluster().stopRandomNode(InternalTestCluster.nameFilter(remoteNodeName)); @@ -336,10 +328,10 @@ public void testFailoverRemotePrimaryToDocrepReplica() throws Exception { logger.info("---> Index some more docs to ensure that the failed over primary is ingesting new docs"); int secondBatch = randomIntBetween(1, 10); - logger.info("---> Indexing {} more docs", firstBatch); + logger.info("---> Indexing {} more docs", secondBatch); indexingService = new SyncIndexingService(FAILOVER_REMOTE_TO_DOCREP, secondBatch); indexingService.startIndexing(); - flush(FAILOVER_REMOTE_TO_DOCREP); + refreshAndWaitForReplication(FAILOVER_REMOTE_TO_DOCREP); shardStatsMap = internalCluster().client().admin().indices().prepareStats(FAILOVER_REMOTE_TO_DOCREP).setDocs(true).get().asMap(); assertEquals(1, shardStatsMap.size()); @@ -348,13 +340,88 @@ public void testFailoverRemotePrimaryToDocrepReplica() throws Exception { ); } - private void assertReplicaAndPrimaryConsistencyMultiCopy( - Map shardStatsMap, - int firstBatch, - int secondBatch, - DiscoveryNodes nodes - ) throws Exception { + /* + Scenario: + - Starts 1 docrep backed data node + - Creates an index with 0 replica + - Starts 1 remote backed data node + - Move primary copy from docrep to remote through _cluster/reroute + - Expands index to 1 replica + - Stops remote enabled node + - Ensure doc count is same after failover + - Index some more docs to ensure working of failed-over primary + - Starts another remote node + - Move primary copy from docrep to remote through _cluster/reroute + - Ensure that remote store is seeded in the new remote node by asserting remote uploads from that node > 0 + */ + public void testFailoverRemotePrimaryToDocrepReplicaReseedToRemotePrimary() throws Exception { + testFailoverRemotePrimaryToDocrepReplica(); + + logger.info("---> Removing replica copy"); + assertAcked( + internalCluster().client() + .admin() + .indices() + .prepareUpdateSettings(FAILOVER_REMOTE_TO_DOCREP) + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)) + .get() + ); + ensureGreen(FAILOVER_REMOTE_TO_DOCREP); + + logger.info("---> Starting a new remote enabled node"); + addRemote = true; + String remoteNodeName = internalCluster().startDataOnlyNode(); + internalCluster().validateClusterFormed(); + assertEquals( + internalCluster().client() + .admin() + .cluster() + .prepareGetRepositories(REPOSITORY_NAME, REPOSITORY_2_NAME) + .get() + .repositories() + .size(), + 2 + ); + + String primaryShardHostingNode = primaryNodeName(FAILOVER_REMOTE_TO_DOCREP); + logger.info("---> Moving primary copy from {} to remote enabled node {}", primaryShardHostingNode, remoteNodeName); + assertAcked( + internalCluster().client() + .admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(FAILOVER_REMOTE_TO_DOCREP, 0, primaryShardHostingNode, remoteNodeName)) + .get() + ); + ensureGreen(FAILOVER_REMOTE_TO_DOCREP); + + Map shardStatsMap = internalCluster().client() + .admin() + .indices() + .prepareStats(FAILOVER_REMOTE_TO_DOCREP) + .get() + .asMap(); + DiscoveryNodes discoveryNodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes(); + assertEquals(1, shardStatsMap.size()); + shardStatsMap.forEach((shardRouting, shardStats) -> { + if (discoveryNodes.get(shardRouting.currentNodeId()).isRemoteStoreNode()) { + RemoteSegmentStats remoteSegmentStats = shardStats.getStats().getSegments().getRemoteSegmentStats(); + assertTrue(remoteSegmentStats.getTotalUploadTime() > 0); + assertTrue(remoteSegmentStats.getUploadBytesSucceeded() > 0); + } + }); + } + + private void assertReplicaAndPrimaryConsistency(String indexName, int firstBatch, int secondBatch) throws Exception { assertBusy(() -> { + Map shardStatsMap = internalCluster().client() + .admin() + .indices() + .prepareStats(indexName) + .setDocs(true) + .get() + .asMap(); + DiscoveryNodes nodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes(); for (ShardRouting shardRouting : shardStatsMap.keySet()) { CommonStats shardStats = shardStatsMap.get(shardRouting).getStats(); if (shardRouting.primary()) { @@ -383,36 +450,6 @@ private void assertReplicaAndPrimaryConsistencyMultiCopy( }); } - private void assertReplicaAndPrimaryConsistencySingle( - Map shardStatsMap, - int initialBatch, - int secondBatch, - DiscoveryNodes nodes - ) throws Exception { - assertBusy(() -> { - long primaryDocCount = 0, replicaDocCount = 0; - for (ShardRouting shardRouting : shardStatsMap.keySet()) { - CommonStats shardStats = shardStatsMap.get(shardRouting).getStats(); - if (shardRouting.primary()) { - primaryDocCount = shardStats.getDocs().getCount(); - assertTrue(nodes.get(shardRouting.currentNodeId()).isRemoteStoreNode()); - RemoteSegmentStats remoteSegmentStats = shardStats.getSegments().getRemoteSegmentStats(); - assertTrue(remoteSegmentStats.getUploadBytesSucceeded() > 0); - assertTrue(remoteSegmentStats.getTotalUploadTime() > 0); - } else { - replicaDocCount = shardStats.getDocs().getCount(); - assertFalse(nodes.get(shardRouting.currentNodeId()).isRemoteStoreNode()); - RemoteSegmentStats remoteSegmentStats = shardStats.getSegments().getRemoteSegmentStats(); - assertEquals(0, remoteSegmentStats.getDownloadBytesStarted()); - assertEquals(0, remoteSegmentStats.getTotalDownloadTime()); - } - } - assertTrue(replicaDocCount > 0); - assertEquals(replicaDocCount, initialBatch + secondBatch); - assertEquals(primaryDocCount, replicaDocCount); - }); - } - private static void assertRetentionLeaseConsistency(ShardStats shardStats, RetentionLeases retentionLeases) { long maxSeqNo = shardStats.getSeqNoStats().getMaxSeqNo(); assertTrue(retentionLeases.leases().stream().allMatch(l -> l.retainingSequenceNumber() == maxSeqNo + 1)); diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java index 2ecbfa1d19080..18cf4e33b8c0a 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java @@ -363,10 +363,11 @@ protected ReplicationOperation.Replicas primaryTermValidationR /** * This {@link org.opensearch.action.support.replication.TransportReplicationAction.ReplicasProxy} implementation is * used for primary term validation and is only relevant for TransportShardBulkAction replication action. - * + *

+ * Visible for tests * @opensearch.internal */ - private final class PrimaryTermValidationProxy extends WriteActionReplicasProxy { + public final class PrimaryTermValidationProxy extends WriteActionReplicasProxy { @Override public void performOn( @@ -442,7 +443,7 @@ protected long primaryOperationSize(BulkShardRequest request) { @Override public ReplicationMode getReplicationMode(IndexShard indexShard) { - if (indexShard.routingEntry().isAssignedToRemoteStoreNode()) { + if (indexShard.indexSettings().isRemoteNode()) { return ReplicationMode.PRIMARY_TERM_VALIDATION; } return super.getReplicationMode(indexShard); diff --git a/server/src/main/java/org/opensearch/action/support/replication/ReplicationModeAwareProxy.java b/server/src/main/java/org/opensearch/action/support/replication/ReplicationModeAwareProxy.java index 6dd596642a0f0..bd7eb90c5dbb0 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/ReplicationModeAwareProxy.java +++ b/server/src/main/java/org/opensearch/action/support/replication/ReplicationModeAwareProxy.java @@ -9,6 +9,7 @@ package org.opensearch.action.support.replication; import org.opensearch.action.support.replication.ReplicationOperation.ReplicaResponse; +import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.core.action.ActionListener; @@ -31,14 +32,18 @@ public class ReplicationModeAwareProxy primaryTermValidationProxy; + private final DiscoveryNodes discoveryNodes; + public ReplicationModeAwareProxy( ReplicationMode replicationModeOverride, + DiscoveryNodes discoveryNodes, ReplicationOperation.Replicas replicasProxy, ReplicationOperation.Replicas primaryTermValidationProxy ) { super(replicasProxy); this.replicationModeOverride = Objects.requireNonNull(replicationModeOverride); this.primaryTermValidationProxy = Objects.requireNonNull(primaryTermValidationProxy); + this.discoveryNodes = discoveryNodes; } @Override @@ -58,20 +63,20 @@ protected void performOnReplicaProxy( } @Override - ReplicationMode determineReplicationMode(ShardRouting targetShardRouting, ShardRouting primaryRouting) { + ReplicationMode determineReplicationMode(ShardRouting shardRouting, ShardRouting primaryRouting) { // If the current routing is the primary, then it does not need to be replicated - if (targetShardRouting.isSameAllocation(primaryRouting)) { + if (shardRouting.isSameAllocation(primaryRouting)) { return ReplicationMode.NO_REPLICATION; } // Perform full replication during primary relocation - if (primaryRouting.relocating() && targetShardRouting.isSameAllocation(primaryRouting.getTargetRelocatingShard())) { + if (primaryRouting.relocating() && shardRouting.isSameAllocation(primaryRouting.getTargetRelocatingShard())) { return ReplicationMode.FULL_REPLICATION; } /* Perform full replication if replica is hosted on a non-remote node. Only applicable during remote migration */ - if (targetShardRouting.isAssignedToRemoteStoreNode() == false) { + if (discoveryNodes.get(shardRouting.currentNodeId()).isRemoteStoreNode() == false) { return ReplicationMode.FULL_REPLICATION; } return replicationModeOverride; diff --git a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java index 55b72777e536b..6451dae25b6d2 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java @@ -356,7 +356,7 @@ public void performOn( * @return the overridden replication mode. */ public ReplicationMode getReplicationMode(IndexShard indexShard) { - if (indexShard.routingEntry().isAssignedToRemoteStoreNode()) { + if (indexShard.indexSettings().isRemoteNode()) { return ReplicationMode.NO_REPLICATION; } return ReplicationMode.FULL_REPLICATION; @@ -642,8 +642,13 @@ public void handleException(TransportException exp) { primaryRequest.getPrimaryTerm(), initialRetryBackoffBound, retryTimeout, - indexShard.routingEntry().isAssignedToRemoteStoreNode() - ? new ReplicationModeAwareProxy<>(getReplicationMode(indexShard), replicasProxy, termValidationProxy) + indexShard.indexSettings().isRemoteNode() + ? new ReplicationModeAwareProxy<>( + getReplicationMode(indexShard), + clusterState.getNodes(), + replicasProxy, + termValidationProxy + ) : new FanoutReplicationProxy<>(replicasProxy) ).execute(); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java b/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java index 474dc712dfd77..743b0128fbc16 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java @@ -75,8 +75,6 @@ public class ShardRouting implements Writeable, ToXContentObject { private final long expectedShardSize; @Nullable private final ShardRouting targetRelocatingShard; - @Nullable - private Boolean assignedToRemoteStoreNode; /** * A constructor to internally create shard routing instances, note, the internal flag should only be set to true @@ -881,11 +879,4 @@ public boolean unassignedReasonIndexCreated() { return false; } - public boolean isAssignedToRemoteStoreNode() { - return assignedToRemoteStoreNode != null && assignedToRemoteStoreNode; - } - - public void setAssignedToRemoteStoreNode(boolean assignedToRemoteStoreNode) { - this.assignedToRemoteStoreNode = assignedToRemoteStoreNode; - } } diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 11dc4474cfa42..eb2ff39e4af19 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -44,6 +44,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.CheckedFunction; @@ -461,7 +462,8 @@ public synchronized IndexShard createShard( final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, final RepositoriesService repositoriesService, final DiscoveryNode targetNode, - @Nullable DiscoveryNode sourceNode + @Nullable DiscoveryNode sourceNode, + DiscoveryNodes discoveryNodes ) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); /* @@ -551,7 +553,8 @@ public synchronized IndexShard createShard( clusterRemoteTranslogBufferIntervalSupplier, nodeEnv.nodeId(), recoverySettings, - seedRemote + seedRemote, + discoveryNodes ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java b/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java index 6738b90d87222..b4c33d781af86 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java @@ -100,4 +100,5 @@ public static void verifyNoMultipleWriters(List mdFiles, Function isShardOnRemoteEnabledNode; + /** * Get all retention leases tracked on this shard. * @@ -957,7 +959,7 @@ private boolean invariant() { // all tracked shard copies have a corresponding peer-recovery retention lease for (final ShardRouting shardRouting : routingTable.assignedShards()) { final CheckpointState cps = checkpoints.get(shardRouting.allocationId().getId()); - if (cps.tracked && cps.replicated && shardRouting.isAssignedToRemoteStoreNode() == false) { + if (cps.tracked && cps.replicated) { assert retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting)) : "no retention lease for tracked shard [" + shardRouting + "] in " + retentionLeases; assert PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals( @@ -999,7 +1001,8 @@ public ReplicationTracker( final LongConsumer onGlobalCheckpointUpdated, final LongSupplier currentTimeMillisSupplier, final BiConsumer> onSyncRetentionLeases, - final Supplier safeCommitInfoSupplier + final Supplier safeCommitInfoSupplier, + final Function checkForRemoteDiscoveryNode ) { this( shardId, @@ -1011,7 +1014,8 @@ public ReplicationTracker( currentTimeMillisSupplier, onSyncRetentionLeases, safeCommitInfoSupplier, - x -> {} + x -> {}, + checkForRemoteDiscoveryNode ); } @@ -1037,7 +1041,8 @@ public ReplicationTracker( final LongSupplier currentTimeMillisSupplier, final BiConsumer> onSyncRetentionLeases, final Supplier safeCommitInfoSupplier, - final Consumer onReplicationGroupUpdated + final Consumer onReplicationGroupUpdated, + final Function isShardOnRemoteEnabledNode ) { super(shardId, indexSettings); assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint; @@ -1060,6 +1065,7 @@ public ReplicationTracker( this.safeCommitInfoSupplier = safeCommitInfoSupplier; this.onReplicationGroupUpdated = onReplicationGroupUpdated; this.latestReplicationCheckpoint = indexSettings.isSegRepEnabledOrRemoteNode() ? ReplicationCheckpoint.empty(shardId) : null; + this.isShardOnRemoteEnabledNode = isShardOnRemoteEnabledNode; assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false; assert invariant(); } @@ -1088,11 +1094,12 @@ private ReplicationGroup calculateReplicationGroup() { } else { newVersion = replicationGroup.getVersion() + 1; } - - assert indexSettings().isRemoteTranslogStoreEnabled() + assert indexSettings.isRemoteNode() // Handle migration cases. Ignore assertion if any of the shard copies in the replication group is assigned to a remote node || (replicationGroup != null - && replicationGroup.getReplicationTargets().stream().anyMatch(ShardRouting::isAssignedToRemoteStoreNode)) + && replicationGroup.getReplicationTargets() + .stream() + .anyMatch(shardRouting -> isShardOnRemoteEnabledNode.apply(shardRouting.currentNodeId()))) || checkpoints.entrySet().stream().filter(e -> e.getValue().tracked).allMatch(e -> e.getValue().replicated) : "In absence of remote translog store, all tracked shards must have replication mode as LOGICAL_REPLICATION"; @@ -1252,13 +1259,8 @@ private void createReplicationLagTimers() { && replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false && isPrimaryRelocation(allocationId) == false && latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint) - /* - Handle remote store migration cases. Replication Lag timers would be created if the node on which primary is hosted is either: - - Segrep enabled without remote store - - Destination replica shard is hosted on a remote store enabled node (Remote store enabled nodes have segrep enabled implicitly) - */ && (indexSettings.isSegRepLocalEnabled() == true - || routingTable.getByAllocationId(allocationId).isAssignedToRemoteStoreNode() == true)) { + || isShardOnRemoteEnabledNode.apply(routingTable.getByAllocationId(allocationId).currentNodeId()))) { cps.checkpointTimers.computeIfAbsent(latestReplicationCheckpoint, ignored -> new SegmentReplicationLagTimer()); logger.trace( () -> new ParameterizedMessage( @@ -1376,9 +1378,7 @@ private void addPeerRecoveryRetentionLeaseForSolePrimary() { final ShardRouting primaryShard = routingTable.primaryShard(); final String leaseId = getPeerRecoveryRetentionLeaseId(primaryShard); if (retentionLeases.get(leaseId) == null) { - if (replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard)) - || indexSettings().isRemoteTranslogStoreEnabled() - || primaryShard.isAssignedToRemoteStoreNode()) { + if (replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard)) || indexSettings.isRemoteNode()) { assert primaryShard.allocationId().getId().equals(shardAllocationId) : routingTable.assignedShards() + " vs " + shardAllocationId; @@ -1457,8 +1457,9 @@ public synchronized void updateFromClusterManager( + " as in-sync but it does not exist locally"; final long localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO; final long globalCheckpoint = localCheckpoint; - final boolean assignedToRemoteStoreNode = routingTable.getByAllocationId(initializingId) - .isAssignedToRemoteStoreNode(); + final boolean assignedToRemoteStoreNode = indexSettings.isRemoteNode() + || (routingTable.getByAllocationId(initializingId) != null + && isShardOnRemoteEnabledNode.apply(routingTable.getByAllocationId(initializingId).currentNodeId())); checkpoints.put( initializingId, new CheckpointState( @@ -1478,7 +1479,9 @@ public synchronized void updateFromClusterManager( for (String initializingId : initializingAllocationIds) { final long localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO; final long globalCheckpoint = localCheckpoint; - final boolean assignedToRemoteStoreNode = routingTable.getByAllocationId(initializingId).isAssignedToRemoteStoreNode(); + final boolean assignedToRemoteStoreNode = indexSettings.isRemoteNode() + || (routingTable.getByAllocationId(initializingId) != null + && isShardOnRemoteEnabledNode.apply(routingTable.getByAllocationId(initializingId).currentNodeId())); checkpoints.put( initializingId, new CheckpointState( @@ -1494,8 +1497,9 @@ public synchronized void updateFromClusterManager( final long localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO; final long globalCheckpoint = localCheckpoint; // Handling cases where node leaves the cluster but the insyncAids are not yet updated - final ShardRouting shardRouting = routingTable.getByAllocationId(inSyncId); - final boolean assignedToRemoteStoreNode = shardRouting != null && shardRouting.isAssignedToRemoteStoreNode() == true; + final boolean assignedToRemoteStoreNode = indexSettings.isRemoteNode() + || (routingTable.getByAllocationId(inSyncId) != null + && isShardOnRemoteEnabledNode.apply(routingTable.getByAllocationId(inSyncId).currentNodeId())); checkpoints.put( inSyncId, new CheckpointState( @@ -1534,12 +1538,11 @@ private boolean isReplicated( String primaryTargetAllocationId, boolean assignedToRemoteStoreNode ) { - /* - - If assigned to a remote node, returns true if given allocation id matches the primary or it's relocation target allocation primary and primary target allocation id. - - During an ongoing remote migration, the above-mentioned checks are considered when the shard is assigned to a remote store backed node - */ + // If assigned to a remote node, returns true if given allocation id matches the primary or it's relocation target allocation + // primary and primary target allocation id. if (assignedToRemoteStoreNode == true) { - return (allocationId.equals(primaryAllocationId) || allocationId.equals(primaryTargetAllocationId)); + boolean toReturn = allocationId.equals(primaryAllocationId) || allocationId.equals(primaryTargetAllocationId); + return toReturn; } // For other case which is local translog, return true as the requests are replicated to all shards in the replication group. return true; diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 2c7c9b55548d8..84e737a98b657 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -67,6 +67,8 @@ import org.opensearch.cluster.metadata.DataStream; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.MappingMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource; @@ -355,6 +357,7 @@ Runnable getGlobalCheckpointSyncer() { On source remote node , it will be REMOTE_MIGRATING_UNSEEDED when relocating from docrep node */ private final ShardMigrationState shardMigrationState; + private DiscoveryNodes discoveryNodes; public IndexShard( final ShardRouting shardRouting, @@ -384,7 +387,8 @@ public IndexShard( final Supplier clusterRemoteTranslogBufferIntervalSupplier, final String nodeId, final RecoverySettings recoverySettings, - boolean seedRemote + boolean seedRemote, + final DiscoveryNodes discoveryNodes ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -446,7 +450,8 @@ public IndexShard( threadPool::absoluteTimeInMillis, (retentionLeases, listener) -> retentionLeaseSyncer.sync(shardId, aId, getPendingPrimaryTerm(), retentionLeases, listener), this::getSafeCommitInfo, - pendingReplicationActions + pendingReplicationActions, + isShardOnRemoteEnabledNode ); // the query cache is a node-level thing, however we want the most popular filters @@ -483,6 +488,7 @@ public boolean shouldCache(Query query) { this.recoverySettings = recoverySettings; this.fileDownloader = new RemoteStoreFileDownloader(shardRouting.shardId(), threadPool, recoverySettings); this.shardMigrationState = getShardMigrationState(indexSettings, seedRemote); + this.discoveryNodes = discoveryNodes; } public ThreadPool getThreadPool() { @@ -503,6 +509,33 @@ public boolean shouldSeedRemoteStore() { return shardMigrationState == REMOTE_MIGRATING_UNSEEDED; } + /** + * To be delegated to {@link ReplicationTracker} so that relevant remote store based + * operations can be ignored during engine migration + *

+ * Has explicit null checks to ensure that the {@link ReplicationTracker#invariant()} + * checks does not fail during a cluster manager state update when the latest replication group + * calculation is not yet done and the cached replication group details are available + */ + public Function isShardOnRemoteEnabledNode = (shardId) -> { + DiscoveryNode discoveryNode = this.discoveryNodes.get(shardId); + if (discoveryNode != null) { + logger.trace( + "ShardID {} is assigned to Node {} which has remote_enabled as {}", + shardId, + discoveryNode, + discoveryNode.isRemoteStoreNode() + ); + return discoveryNode.isRemoteStoreNode(); + } + return false; + }; + + // Only to be used for Unit Tests + public void setDiscoveryNodes(DiscoveryNodes discoveryNodes) { + this.discoveryNodes = discoveryNodes; + } + public boolean isRemoteSeeded() { return shardMigrationState == REMOTE_MIGRATING_SEEDED; } @@ -609,8 +642,10 @@ public void updateShardState( final BiConsumer> primaryReplicaSyncer, final long applyingClusterStateVersion, final Set inSyncAllocationIds, - final IndexShardRoutingTable routingTable + final IndexShardRoutingTable routingTable, + DiscoveryNodes discoveryNodes ) throws IOException { + this.discoveryNodes = discoveryNodes; final ShardRouting currentRouting; synchronized (mutex) { currentRouting = this.shardRouting; @@ -3488,9 +3523,8 @@ public void updateGlobalCheckpointOnReplica(final long globalCheckpoint, final S * When remote translog is enabled for an index, replication operation is limited to primary term validation and does not * update local checkpoint at replica, so the local checkpoint at replica can be less than globalCheckpoint. */ - assert (state() != IndexShardState.POST_RECOVERY && state() != IndexShardState.STARTED) - || indexSettings.isRemoteTranslogStoreEnabled() - || indexSettings.isRemoteNode() : "supposedly in-sync shard copy received a global checkpoint [" + assert (state() != IndexShardState.POST_RECOVERY && state() != IndexShardState.STARTED) || indexSettings.isRemoteNode() + : "supposedly in-sync shard copy received a global checkpoint [" + globalCheckpoint + "] " + "that is higher than its local checkpoint [" diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 9bc81c1826c2d..93589a9914f50 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -54,6 +54,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.service.ClusterService; @@ -1035,7 +1036,8 @@ public IndexShard createShard( final RetentionLeaseSyncer retentionLeaseSyncer, final DiscoveryNode targetNode, final DiscoveryNode sourceNode, - final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory + final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, + final DiscoveryNodes discoveryNodes ) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); ensureChangesAllowed(); @@ -1050,7 +1052,8 @@ public IndexShard createShard( remoteStoreStatsTrackerFactory, repositoriesService, targetNode, - sourceNode + sourceNode, + discoveryNodes ); indexShard.addShardFailureCallback(onShardFailure); indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> { diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index 6190f20863b6e..e26aed52d563b 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -669,11 +669,7 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR try { final long primaryTerm = state.metadata().index(shardRouting.index()).primaryTerm(shardRouting.id()); logger.debug("{} creating shard with primary term [{}]", shardRouting.shardId(), primaryTerm); - DiscoveryNode targetNode = nodes.getLocalNode(); - // Set remote attributes on Shard routing if the target node for the shards has remote attributes - if (targetNode.isRemoteStoreNode()) { - shardRouting.setAssignedToRemoteStoreNode(true); - } + DiscoveryNode localNode = nodes.getLocalNode(); indicesService.createShard( shardRouting, checkpointPublisher, @@ -683,9 +679,10 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR failedShardHandler, globalCheckpointSyncer, retentionLeaseSyncer, - targetNode, + localNode, sourceNode, - remoteStoreStatsTrackerFactory + remoteStoreStatsTrackerFactory, + nodes ); } catch (Exception e) { failAndRemoveShard(shardRouting, true, "failed to create shard", e, state); @@ -713,14 +710,14 @@ private void updateShard( primaryTerm = indexMetadata.primaryTerm(shard.shardId().id()); final Set inSyncIds = indexMetadata.inSyncAllocationIds(shard.shardId().id()); final IndexShardRoutingTable indexShardRoutingTable = routingTable.shardRoutingTable(shardRouting.shardId()); - updateShardRoutingWithNode(indexShardRoutingTable, nodes); shard.updateShardState( shardRouting, primaryTerm, primaryReplicaSyncer::resync, clusterState.version(), inSyncIds, - indexShardRoutingTable + indexShardRoutingTable, + nodes ); } catch (Exception e) { failAndRemoveShard(shardRouting, true, "failed updating shard routing entry", e, clusterState); @@ -928,7 +925,8 @@ void updateShardState( BiConsumer> primaryReplicaSyncer, long applyingClusterStateVersion, Set inSyncAllocationIds, - IndexShardRoutingTable routingTable + IndexShardRoutingTable routingTable, + DiscoveryNodes discoveryNodes ) throws IOException; } @@ -1046,7 +1044,8 @@ T createShard( RetentionLeaseSyncer retentionLeaseSyncer, DiscoveryNode targetNode, @Nullable DiscoveryNode sourceNode, - RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory + RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, + DiscoveryNodes discoveryNodes ) throws IOException; /** @@ -1102,14 +1101,4 @@ enum IndexRemovalReason { REOPENED, } } - - private void updateShardRoutingWithNode(IndexShardRoutingTable indexShardRoutingTable, DiscoveryNodes nodes) { - for (ShardRouting routing : indexShardRoutingTable.assignedShards()) { - if (routing.relocating()) { - final ShardRouting targetRouting = routing.getTargetRelocatingShard(); - targetRouting.setAssignedToRemoteStoreNode(nodes.get(targetRouting.currentNodeId()).isRemoteStoreNode()); - } - routing.setAssignedToRemoteStoreNode(nodes.get(routing.currentNodeId()).isRemoteStoreNode()); - } - } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandlerFactory.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandlerFactory.java index 0ccb1ac2133cf..fbaa7d94c3129 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandlerFactory.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandlerFactory.java @@ -24,7 +24,8 @@ public static RecoverySourceHandler create( RecoverySettings recoverySettings ) { boolean isReplicaRecoveryWithRemoteTranslog = request.isPrimaryRelocation() == false - && (shard.isRemoteTranslogEnabled() || shard.isMigratingToRemote()); + && (shard.isRemoteTranslogEnabled() || shard.isMigratingToRemote()) + && request.targetNode().isRemoteStoreNode(); if (isReplicaRecoveryWithRemoteTranslog) { return new RemoteStorePeerRecoverySourceHandler( shard, diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java index 9a94d23c7b030..2d5893d144e6a 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java @@ -98,7 +98,7 @@ protected void doExecute(Task task, PublishCheckpointRequest request, ActionList @Override public ReplicationMode getReplicationMode(IndexShard indexShard) { - if (indexShard.routingEntry().isAssignedToRemoteStoreNode()) { + if (indexShard.indexSettings().isRemoteNode()) { return ReplicationMode.FULL_REPLICATION; } return super.getReplicationMode(indexShard); @@ -199,6 +199,9 @@ protected void shardOperationOnReplica(PublishCheckpointRequest request, IndexSh Objects.requireNonNull(replica); ActionListener.completeWith(listener, () -> { logger.trace(() -> new ParameterizedMessage("Checkpoint {} received on replica {}", request, replica.shardId())); + if (replica.indexSettings().isRemoteNode() == false) { + logger.trace("Received segrep checkpoint on a docrep shard copy during an ongoing remote migration. NoOp."); + } if (request.getCheckpoint().getShardId().equals(replica.shardId())) { replicationService.onNewCheckpoint(request.getCheckpoint(), replica); } diff --git a/server/src/test/java/org/opensearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/opensearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java index 36b85d3d12591..5ca5f53f180be 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java @@ -83,7 +83,6 @@ import static org.opensearch.action.support.replication.ClusterStateCreationUtils.state; import static org.opensearch.index.remote.RemoteStoreTestsHelper.createIndexSettings; -import static org.opensearch.index.remote.RemoteStoreTestsHelper.createShardRouting; import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.opensearch.test.ClusterServiceUtils.setState; import static org.hamcrest.Matchers.arrayWithSize; @@ -334,45 +333,14 @@ public void testUnavailableShardsMarkedAsStale() throws Exception { public void testGetReplicationModeWithRemoteTranslog() { TransportVerifyShardBeforeCloseAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); when(indexShard.indexSettings()).thenReturn(createIndexSettings(true)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(true, true)); assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); } public void testGetReplicationModeWithLocalTranslog() { TransportVerifyShardBeforeCloseAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(true, false)); - assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); - } - - public void testGetReplicationModeDuringMigration() { - TransportVerifyShardBeforeCloseAction action = createAction(); - final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(true, true)); - assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); - } - - public void testGetReplicationModeDuringMigrationOnRemotePrimary() { - TransportVerifyShardBeforeCloseAction action = createAction(); - final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(true)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(true, true)); - assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); - } - - public void testGetReplicationModeDuringMigrationOnDocrepReplica() { - TransportVerifyShardBeforeCloseAction action = createAction(); - final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(false, false)); assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); } diff --git a/server/src/test/java/org/opensearch/action/admin/indices/flush/TransportShardFlushActionTests.java b/server/src/test/java/org/opensearch/action/admin/indices/flush/TransportShardFlushActionTests.java index 10db1d6819389..c9d3a6c4c7605 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/flush/TransportShardFlushActionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/flush/TransportShardFlushActionTests.java @@ -21,7 +21,6 @@ import org.opensearch.transport.TransportService; import static org.opensearch.index.remote.RemoteStoreTestsHelper.createIndexSettings; -import static org.opensearch.index.remote.RemoteStoreTestsHelper.createShardRouting; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -30,45 +29,14 @@ public class TransportShardFlushActionTests extends OpenSearchTestCase { public void testGetReplicationModeWithRemoteTranslog() { TransportShardFlushAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); when(indexShard.indexSettings()).thenReturn(createIndexSettings(true)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(true, true)); assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); } public void testGetReplicationModeWithLocalTranslog() { TransportShardFlushAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(true, false)); - assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); - } - - public void testGetReplicationModeDuringMigration() { - TransportShardFlushAction action = createAction(); - final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(true)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(true, true)); - assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); - } - - public void testGetReplicationModeDuringMigrationOnRemotePrimary() { - TransportShardFlushAction action = createAction(); - final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(true)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(true, true)); - assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); - } - - public void testGetReplicationModeDuringMigrationOnDocrepReplica() { - TransportShardFlushAction action = createAction(); - final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(false, false)); assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); } diff --git a/server/src/test/java/org/opensearch/action/admin/indices/readonly/TransportVerifyShardIndexBlockActionTests.java b/server/src/test/java/org/opensearch/action/admin/indices/readonly/TransportVerifyShardIndexBlockActionTests.java index 147698a6f0c5d..90498d6d35700 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/readonly/TransportVerifyShardIndexBlockActionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/readonly/TransportVerifyShardIndexBlockActionTests.java @@ -21,7 +21,6 @@ import org.opensearch.transport.TransportService; import static org.opensearch.index.remote.RemoteStoreTestsHelper.createIndexSettings; -import static org.opensearch.index.remote.RemoteStoreTestsHelper.createShardRouting; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -30,36 +29,14 @@ public class TransportVerifyShardIndexBlockActionTests extends OpenSearchTestCas public void testGetReplicationModeWithRemoteTranslog() { TransportVerifyShardIndexBlockAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); when(indexShard.indexSettings()).thenReturn(createIndexSettings(true)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(true, true)); assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); } public void testGetReplicationModeWithLocalTranslog() { TransportVerifyShardIndexBlockAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(true, false)); - assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); - } - - public void testGetReplicationModeDuringMigrationOnRemotePrimary() { - TransportVerifyShardIndexBlockAction action = createAction(); - final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(true)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(true, true)); - assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); - } - - public void testGetReplicationModeDuringMigrationOnDocrepReplica() { - TransportVerifyShardIndexBlockAction action = createAction(); - final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(false, false)); assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); } diff --git a/server/src/test/java/org/opensearch/action/admin/indices/refresh/TransportShardRefreshActionTests.java b/server/src/test/java/org/opensearch/action/admin/indices/refresh/TransportShardRefreshActionTests.java index 77d6828938490..bc0b7e5cf14b2 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/refresh/TransportShardRefreshActionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/refresh/TransportShardRefreshActionTests.java @@ -21,7 +21,6 @@ import org.opensearch.transport.TransportService; import static org.opensearch.index.remote.RemoteStoreTestsHelper.createIndexSettings; -import static org.opensearch.index.remote.RemoteStoreTestsHelper.createShardRouting; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -30,45 +29,14 @@ public class TransportShardRefreshActionTests extends OpenSearchTestCase { public void testGetReplicationModeWithRemoteTranslog() { TransportShardRefreshAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); when(indexShard.indexSettings()).thenReturn(createIndexSettings(true)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(true, true)); assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); } public void testGetReplicationModeWithLocalTranslog() { TransportShardRefreshAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(true, false)); - assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); - } - - public void testGetReplicationModeDuringMigration() { - TransportShardRefreshAction action = createAction(); - final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(true)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(true, true)); - assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); - } - - public void testGetReplicationModeDuringMigrationOnRemotePrimary() { - TransportShardRefreshAction action = createAction(); - final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(true)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(true, true)); - assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); - } - - public void testGetReplicationModeDuringMigrationOnDocrepReplica() { - TransportShardRefreshAction action = createAction(); - final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(false, false)); assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); } diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java index 0d1b7cf1ac0af..6331861c3dcb9 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java @@ -107,6 +107,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.LongSupplier; +import static org.opensearch.index.remote.RemoteStoreTestsHelper.createIndexSettings; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.not; @@ -1237,38 +1238,14 @@ public void testHandlePrimaryTermValidationRequestSuccess() { public void testGetReplicationModeWithRemoteTranslog() { TransportShardBulkAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - ShardRouting mockShardRouting = mock(ShardRouting.class); - when(indexShard.routingEntry()).thenReturn(mockShardRouting); - when(mockShardRouting.isAssignedToRemoteStoreNode()).thenReturn(true); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); + when(indexShard.indexSettings()).thenReturn(createIndexSettings(true)); assertEquals(ReplicationMode.PRIMARY_TERM_VALIDATION, action.getReplicationMode(indexShard)); } public void testGetReplicationModeWithLocalTranslog() { TransportShardBulkAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - ShardRouting mockShardRouting = mock(ShardRouting.class); - when(indexShard.routingEntry()).thenReturn(mockShardRouting); - when(mockShardRouting.isAssignedToRemoteStoreNode()).thenReturn(false); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); - assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); - } - - public void testGetReplicationModeDuringDualReplicationRemoteStore() { - TransportShardBulkAction action = createAction(); - final IndexShard indexShard = mock(IndexShard.class); - ShardRouting mockShardRouting = mock(ShardRouting.class); - when(indexShard.routingEntry()).thenReturn(mockShardRouting); - when(mockShardRouting.isAssignedToRemoteStoreNode()).thenReturn(true); - assertEquals(ReplicationMode.PRIMARY_TERM_VALIDATION, action.getReplicationMode(indexShard)); - } - - public void testGetReplicationModeDuringDualReplicationDocrep() { - TransportShardBulkAction action = createAction(); - final IndexShard indexShard = mock(IndexShard.class); - ShardRouting mockShardRouting = mock(ShardRouting.class); - when(indexShard.routingEntry()).thenReturn(mockShardRouting); - when(mockShardRouting.isAssignedToRemoteStoreNode()).thenReturn(false); + when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); } diff --git a/server/src/test/java/org/opensearch/action/resync/TransportResyncReplicationActionTests.java b/server/src/test/java/org/opensearch/action/resync/TransportResyncReplicationActionTests.java index 90c3b49838128..a2fefd6278321 100644 --- a/server/src/test/java/org/opensearch/action/resync/TransportResyncReplicationActionTests.java +++ b/server/src/test/java/org/opensearch/action/resync/TransportResyncReplicationActionTests.java @@ -85,7 +85,6 @@ import static java.util.Collections.emptyMap; import static org.opensearch.action.support.replication.ClusterStateCreationUtils.state; import static org.opensearch.index.remote.RemoteStoreTestsHelper.createIndexSettings; -import static org.opensearch.index.remote.RemoteStoreTestsHelper.createShardRouting; import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.opensearch.test.ClusterServiceUtils.setState; import static org.opensearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR; @@ -235,36 +234,14 @@ public void testResyncDoesNotBlockOnPrimaryAction() throws Exception { public void testGetReplicationModeWithRemoteTranslog() { final TransportResyncReplicationAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); when(indexShard.indexSettings()).thenReturn(createIndexSettings(true)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(true, true)); assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); } public void testGetReplicationModeWithLocalTranslog() { final TransportResyncReplicationAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(true, false)); - assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); - } - - public void testGetReplicationModeDuringMigrationOnRemotePrimary() { - final TransportResyncReplicationAction action = createAction(); - final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(true)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(true, true)); - assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); - } - - public void testGetReplicationModeDuringMigrationOnDocrepReplica() { - final TransportResyncReplicationAction action = createAction(); - final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(false, false)); assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); } diff --git a/server/src/test/java/org/opensearch/action/support/replication/ReplicationModeAwareProxyTests.java b/server/src/test/java/org/opensearch/action/support/replication/ReplicationModeAwareProxyTests.java new file mode 100644 index 0000000000000..b977cc741b731 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/support/replication/ReplicationModeAwareProxyTests.java @@ -0,0 +1,130 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.support.replication; + +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.AllocationId; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.TestShardRouting; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.shard.IndexShardTestUtils; +import org.opensearch.test.OpenSearchTestCase; + +import static org.mockito.Mockito.mock; + +public class ReplicationModeAwareProxyTests extends OpenSearchTestCase { + public void testDetermineReplicationModeTargetRoutingCurrentPrimary() { + ShardRouting targetRouting = TestShardRouting.newShardRouting( + new ShardId(new Index("test_index", "_na_"), 0), + "dummy-node", + null, + true, + ShardRoutingState.STARTED, + AllocationId.newInitializing("abc") + ); + ShardRouting primaryRouting = TestShardRouting.newShardRouting( + new ShardId(new Index("test_index", "_na_"), 0), + "dummy-node", + null, + true, + ShardRoutingState.STARTED, + AllocationId.newInitializing("abc") + ); + final ReplicationModeAwareProxy replicationModeAwareProxy = new ReplicationModeAwareProxy( + ReplicationMode.NO_REPLICATION, + DiscoveryNodes.builder().add(IndexShardTestUtils.getFakeRemoteEnabledNode("dummy-node")).build(), + mock(TransportReplicationAction.ReplicasProxy.class), + mock(TransportReplicationAction.ReplicasProxy.class) + ); + assertEquals(ReplicationMode.NO_REPLICATION, replicationModeAwareProxy.determineReplicationMode(targetRouting, primaryRouting)); + } + + public void testDetermineReplicationModeTargetRoutingRelocatingPrimary() { + AllocationId primaryId = AllocationId.newRelocation(AllocationId.newInitializing()); + AllocationId relocationTargetId = AllocationId.newTargetRelocation(primaryId); + ShardRouting targetRouting = TestShardRouting.newShardRouting( + new ShardId(new Index("test_index", "_na_"), 0), + "dummy-node-2", + null, + true, + ShardRoutingState.INITIALIZING, + relocationTargetId + ); + ShardRouting primaryRouting = TestShardRouting.newShardRouting( + new ShardId(new Index("test_index", "_na_"), 0), + "dummy-node", + "dummy-node-2", + true, + ShardRoutingState.RELOCATING, + primaryId + ); + final ReplicationModeAwareProxy replicationModeAwareProxy = new ReplicationModeAwareProxy( + ReplicationMode.NO_REPLICATION, + DiscoveryNodes.builder() + .add(IndexShardTestUtils.getFakeRemoteEnabledNode(targetRouting.currentNodeId())) + .add(IndexShardTestUtils.getFakeRemoteEnabledNode(primaryRouting.currentNodeId())) + .build(), + mock(TransportReplicationAction.ReplicasProxy.class), + mock(TransportReplicationAction.ReplicasProxy.class) + ); + assertEquals(ReplicationMode.FULL_REPLICATION, replicationModeAwareProxy.determineReplicationMode(targetRouting, primaryRouting)); + } + + public void testDetermineReplicationModeTargetRoutingDocrepShard() { + ShardRouting primaryRouting = TestShardRouting.newShardRouting( + new ShardId(new Index("test_index", "_na_"), 0), + "dummy-node", + false, + ShardRoutingState.STARTED + ); + ShardRouting targetRouting = TestShardRouting.newShardRouting( + new ShardId(new Index("test_index", "_na_"), 0), + "dummy-node-2", + true, + ShardRoutingState.STARTED + ); + final ReplicationModeAwareProxy replicationModeAwareProxy = new ReplicationModeAwareProxy( + ReplicationMode.NO_REPLICATION, + DiscoveryNodes.builder() + .add(IndexShardTestUtils.getFakeRemoteEnabledNode(primaryRouting.currentNodeId())) + .add(IndexShardTestUtils.getFakeDiscoNode(targetRouting.currentNodeId())) + .build(), + mock(TransportReplicationAction.ReplicasProxy.class), + mock(TransportReplicationAction.ReplicasProxy.class) + ); + assertEquals(ReplicationMode.FULL_REPLICATION, replicationModeAwareProxy.determineReplicationMode(targetRouting, primaryRouting)); + } + + public void testDetermineReplicationModeTargetRoutingRemoteShard() { + ShardRouting primaryRouting = TestShardRouting.newShardRouting( + new ShardId(new Index("test_index", "_na_"), 0), + "dummy-node", + false, + ShardRoutingState.STARTED + ); + ShardRouting targetRouting = TestShardRouting.newShardRouting( + new ShardId(new Index("test_index", "_na_"), 0), + "dummy-node-2", + true, + ShardRoutingState.STARTED + ); + final ReplicationModeAwareProxy replicationModeAwareProxy = new ReplicationModeAwareProxy( + ReplicationMode.NO_REPLICATION, + DiscoveryNodes.builder() + .add(IndexShardTestUtils.getFakeRemoteEnabledNode(targetRouting.currentNodeId())) + .add(IndexShardTestUtils.getFakeRemoteEnabledNode(primaryRouting.currentNodeId())) + .build(), + mock(TransportReplicationAction.ReplicasProxy.class), + mock(TransportReplicationAction.ReplicasProxy.class) + ); + assertEquals(ReplicationMode.NO_REPLICATION, replicationModeAwareProxy.determineReplicationMode(targetRouting, primaryRouting)); + } +} diff --git a/server/src/test/java/org/opensearch/action/support/replication/ReplicationOperationTests.java b/server/src/test/java/org/opensearch/action/support/replication/ReplicationOperationTests.java index b6c666a0c95b8..28be9507e70bd 100644 --- a/server/src/test/java/org/opensearch/action/support/replication/ReplicationOperationTests.java +++ b/server/src/test/java/org/opensearch/action/support/replication/ReplicationOperationTests.java @@ -59,6 +59,7 @@ import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.shard.IndexShardNotStartedException; import org.opensearch.index.shard.IndexShardState; +import org.opensearch.index.shard.IndexShardTestUtils; import org.opensearch.index.shard.ReplicationGroup; import org.opensearch.node.NodeClosedException; import org.opensearch.test.OpenSearchTestCase; @@ -209,16 +210,11 @@ public void testReplicationWithRemoteTranslogEnabled() throws Exception { ShardRoutingState.STARTED, primaryId ); - primaryShard.setAssignedToRemoteStoreNode(true); initializingIds.forEach(aId -> { - ShardRouting routing = newShardRouting(shardId, nodeIdFromAllocationId(aId), null, false, ShardRoutingState.INITIALIZING, aId); - routing.setAssignedToRemoteStoreNode(true); - builder.addShard(routing); + builder.addShard(newShardRouting(shardId, nodeIdFromAllocationId(aId), null, false, ShardRoutingState.INITIALIZING, aId)); }); activeIds.stream().filter(aId -> !aId.equals(primaryId)).forEach(aId -> { - ShardRouting routing = newShardRouting(shardId, nodeIdFromAllocationId(aId), null, false, ShardRoutingState.STARTED, aId); - routing.setAssignedToRemoteStoreNode(true); - builder.addShard(routing); + builder.addShard(newShardRouting(shardId, nodeIdFromAllocationId(aId), null, false, ShardRoutingState.INITIALIZING, aId)); }); builder.addShard(primaryShard); IndexShardRoutingTable routingTable = builder.build(); @@ -242,7 +238,12 @@ public void testReplicationWithRemoteTranslogEnabled() throws Exception { listener, replicasProxy, 0, - new ReplicationModeAwareProxy<>(ReplicationMode.NO_REPLICATION, replicasProxy, replicasProxy) + new ReplicationModeAwareProxy<>( + ReplicationMode.NO_REPLICATION, + buildRemoteStoreEnabledDiscoveryNodes(routingTable), + replicasProxy, + replicasProxy + ) ); op.execute(); assertTrue("request was not processed on primary", request.processedOnPrimary.get()); @@ -275,23 +276,11 @@ public void testPrimaryToPrimaryReplicationWithRemoteTranslogEnabled() throws Ex ShardRoutingState.RELOCATING, primaryId ); - primaryShard.setAssignedToRemoteStoreNode(true); initializingIds.forEach(aId -> { - ShardRouting shardRouting = newShardRouting( - shardId, - nodeIdFromAllocationId(aId), - null, - false, - ShardRoutingState.INITIALIZING, - aId - ); - shardRouting.setAssignedToRemoteStoreNode(true); - builder.addShard(shardRouting); + builder.addShard(newShardRouting(shardId, nodeIdFromAllocationId(aId), null, false, ShardRoutingState.INITIALIZING, aId)); }); activeIds.forEach(aId -> { - ShardRouting shardRouting = newShardRouting(shardId, nodeIdFromAllocationId(aId), null, false, ShardRoutingState.STARTED, aId); - shardRouting.setAssignedToRemoteStoreNode(true); - builder.addShard(shardRouting); + builder.addShard(newShardRouting(shardId, nodeIdFromAllocationId(aId), null, false, ShardRoutingState.STARTED, aId)); }); builder.addShard(primaryShard); IndexShardRoutingTable routingTable = builder.build(); @@ -319,7 +308,12 @@ public void testPrimaryToPrimaryReplicationWithRemoteTranslogEnabled() throws Ex listener, replicasProxy, 0, - new ReplicationModeAwareProxy<>(ReplicationMode.NO_REPLICATION, replicasProxy, replicasProxy) + new ReplicationModeAwareProxy<>( + ReplicationMode.NO_REPLICATION, + buildRemoteStoreEnabledDiscoveryNodes(routingTable), + replicasProxy, + replicasProxy + ) ); op.execute(); assertTrue("request was not processed on primary", request.processedOnPrimary.get()); @@ -413,18 +407,12 @@ public void testReplicationInDualModeWithDocrepReplica() throws Exception { ShardRoutingState.STARTED, primaryId ); - // Marking primary as assigned to remote - primaryShard.setAssignedToRemoteStoreNode(true); initializingIds.forEach(aId -> { ShardRouting routing = newShardRouting(shardId, nodeIdFromAllocationId(aId), null, false, ShardRoutingState.INITIALIZING, aId); - // Ensuring all other shard copies are docrep - routing.setAssignedToRemoteStoreNode(false); builder.addShard(routing); }); activeIds.stream().filter(aId -> !aId.equals(primaryId)).forEach(aId -> { ShardRouting routing = newShardRouting(shardId, nodeIdFromAllocationId(aId), null, false, ShardRoutingState.STARTED, aId); - // Ensuring all other shard copies are docrep - routing.setAssignedToRemoteStoreNode(false); builder.addShard(routing); }); builder.addShard(primaryShard); @@ -449,7 +437,12 @@ public void testReplicationInDualModeWithDocrepReplica() throws Exception { listener, replicasProxy, 0, - new ReplicationModeAwareProxy<>(ReplicationMode.NO_REPLICATION, replicasProxy, replicasProxy) + new ReplicationModeAwareProxy<>( + ReplicationMode.NO_REPLICATION, + buildMixedModeEnabledDiscoveryNodes(routingTable), + replicasProxy, + replicasProxy + ) ); op.execute(); assertTrue("request was not processed on primary", request.processedOnPrimary.get()); @@ -901,6 +894,26 @@ private Set getExpectedReplicas(ShardId shardId, ClusterState stat return expectedReplicas; } + private DiscoveryNodes buildRemoteStoreEnabledDiscoveryNodes(IndexShardRoutingTable routingTable) { + DiscoveryNodes.Builder builder = DiscoveryNodes.builder(); + for (ShardRouting shardRouting : routingTable) { + builder.add(IndexShardTestUtils.getFakeRemoteEnabledNode(shardRouting.currentNodeId())); + } + return builder.build(); + } + + private DiscoveryNodes buildMixedModeEnabledDiscoveryNodes(IndexShardRoutingTable routingTable) { + DiscoveryNodes.Builder builder = DiscoveryNodes.builder(); + for (ShardRouting shardRouting : routingTable) { + if (shardRouting.primary()) { + builder.add(IndexShardTestUtils.getFakeRemoteEnabledNode(shardRouting.currentNodeId())); + } else { + builder.add(IndexShardTestUtils.getFakeDiscoNode(shardRouting.currentNodeId())); + } + } + return builder.build(); + } + public static class Request extends ReplicationRequest { public AtomicBoolean processedOnPrimary = new AtomicBoolean(); public AtomicBoolean runPostReplicationActionsOnPrimary = new AtomicBoolean(); diff --git a/server/src/test/java/org/opensearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/opensearch/action/support/replication/TransportReplicationActionTests.java index dad0fa0efd3ec..4a18778cc0b2b 100644 --- a/server/src/test/java/org/opensearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/replication/TransportReplicationActionTests.java @@ -78,6 +78,7 @@ import org.opensearch.core.transport.TransportResponse; import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.IndexService; +import org.opensearch.index.remote.RemoteStoreTestsHelper; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardClosedException; import org.opensearch.index.shard.IndexShardState; @@ -1589,9 +1590,15 @@ private IndexService mockIndexService(final IndexMetadata indexMetadata, Cluster @SuppressWarnings("unchecked") private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService) { + return mockIndexShard(shardId, clusterService, false); + } + + @SuppressWarnings("unchecked") + private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService, boolean remote) { final IndexShard indexShard = mock(IndexShard.class); when(indexShard.shardId()).thenReturn(shardId); when(indexShard.state()).thenReturn(IndexShardState.STARTED); + when(indexShard.indexSettings()).thenReturn(RemoteStoreTestsHelper.createIndexSettings(remote)); doAnswer(invocation -> { ActionListener callback = (ActionListener) invocation.getArguments()[0]; if (isPrimaryMode.get()) { diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteStoreTestsHelper.java b/server/src/test/java/org/opensearch/index/remote/RemoteStoreTestsHelper.java index 5d9580518677b..043b4493e8989 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteStoreTestsHelper.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteStoreTestsHelper.java @@ -9,9 +9,6 @@ package org.opensearch.index.remote; import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.cluster.routing.ShardRouting; -import org.opensearch.cluster.routing.ShardRoutingState; -import org.opensearch.cluster.routing.TestShardRouting; import org.opensearch.common.settings.Settings; import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; @@ -21,7 +18,6 @@ import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.IndexSettingsModule; -import static org.opensearch.test.OpenSearchTestCase.randomAlphaOfLength; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -44,27 +40,20 @@ static IndexShard createIndexShard(ShardId shardId, boolean remoteStoreEnabled) } public static IndexSettings createIndexSettings(boolean remote) { + return createIndexSettings(remote, Settings.EMPTY); + } + + public static IndexSettings createIndexSettings(boolean remote, Settings settings) { IndexSettings indexSettings; if (remote) { Settings nodeSettings = Settings.builder() .put("node.name", "xyz") .put("node.attr.remote_store.translog.repository", "seg_repo") .build(); - indexSettings = IndexSettingsModule.newIndexSettings(new Index("test_index", "_na_"), Settings.EMPTY, nodeSettings); + indexSettings = IndexSettingsModule.newIndexSettings(new Index("test_index", "_na_"), settings, nodeSettings); } else { - indexSettings = IndexSettingsModule.newIndexSettings("test_index", Settings.EMPTY); + indexSettings = IndexSettingsModule.newIndexSettings("test_index", settings); } return indexSettings; } - - public static ShardRouting createShardRouting(boolean isPrimary, boolean remote) { - ShardRouting shardRouting = TestShardRouting.newShardRouting( - new ShardId(new Index("test_index", "_na_"), 0), - randomAlphaOfLength(4), - isPrimary, - ShardRoutingState.STARTED - ); - shardRouting.setAssignedToRemoteStoreNode(remote); - return shardRouting; - } } diff --git a/server/src/test/java/org/opensearch/index/replication/RetentionLeasesReplicationTests.java b/server/src/test/java/org/opensearch/index/replication/RetentionLeasesReplicationTests.java index 8c59e92a3fe8a..904c9a70e61e0 100644 --- a/server/src/test/java/org/opensearch/index/replication/RetentionLeasesReplicationTests.java +++ b/server/src/test/java/org/opensearch/index/replication/RetentionLeasesReplicationTests.java @@ -45,6 +45,7 @@ import org.opensearch.index.seqno.RetentionLeaseUtils; import org.opensearch.index.seqno.RetentionLeases; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardTestUtils; import org.opensearch.test.VersionUtils; import java.util.ArrayList; @@ -182,7 +183,8 @@ public void testTurnOffTranslogRetentionAfterAllShardStarted() throws Exception null, 1L, group.getPrimary().getReplicationGroup().getInSyncAllocationIds(), - group.getPrimary().getReplicationGroup().getRoutingTable() + group.getPrimary().getReplicationGroup().getRoutingTable(), + IndexShardTestUtils.getFakeDiscoveryNodes(shard.routingEntry()) ); } group.syncGlobalCheckpoint(); diff --git a/server/src/test/java/org/opensearch/index/seqno/GlobalCheckpointSyncActionTests.java b/server/src/test/java/org/opensearch/index/seqno/GlobalCheckpointSyncActionTests.java index 0e10ab70f292f..47cf025d3ec68 100644 --- a/server/src/test/java/org/opensearch/index/seqno/GlobalCheckpointSyncActionTests.java +++ b/server/src/test/java/org/opensearch/index/seqno/GlobalCheckpointSyncActionTests.java @@ -55,7 +55,6 @@ import java.util.Collections; import static org.opensearch.index.remote.RemoteStoreTestsHelper.createIndexSettings; -import static org.opensearch.index.remote.RemoteStoreTestsHelper.createShardRouting; import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -160,30 +159,17 @@ public void testTranslogSyncAfterGlobalCheckpointSync() throws Exception { public void testGetReplicationModeWithRemoteTranslog() { final GlobalCheckpointSyncAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); when(indexShard.indexSettings()).thenReturn(createIndexSettings(true)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(true, true)); assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); } public void testGetReplicationModeWithLocalTranslog() { final GlobalCheckpointSyncAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(true, false)); assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); } - public void testGetReplicationModeDuringMigration() { - final GlobalCheckpointSyncAction action = createAction(); - final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); - when(indexShard.routingEntry()).thenReturn(createShardRouting(true, true)); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(true)); - assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); - } - private GlobalCheckpointSyncAction createAction() { final IndicesService indicesService = mock(IndicesService.class); return new GlobalCheckpointSyncAction( diff --git a/server/src/test/java/org/opensearch/index/seqno/PeerRecoveryRetentionLeaseExpiryTests.java b/server/src/test/java/org/opensearch/index/seqno/PeerRecoveryRetentionLeaseExpiryTests.java index ca80c7b9c4884..7a9f1d7baa12e 100644 --- a/server/src/test/java/org/opensearch/index/seqno/PeerRecoveryRetentionLeaseExpiryTests.java +++ b/server/src/test/java/org/opensearch/index/seqno/PeerRecoveryRetentionLeaseExpiryTests.java @@ -93,7 +93,8 @@ public void setUpReplicationTracker() throws InterruptedException { value -> {}, currentTimeMillis::get, (leases, listener) -> {}, - () -> safeCommitInfo + () -> safeCommitInfo, + sId -> false ); replicationTracker.updateFromClusterManager( 1L, diff --git a/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java b/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java index 3cd60ac973709..fdbe89422a2aa 100644 --- a/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java +++ b/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java @@ -84,7 +84,8 @@ public void testAddOrRenewRetentionLease() { value -> {}, () -> 0L, (leases, listener) -> {}, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE, + NON_REMOTE_DISCOVERY_NODE ); replicationTracker.updateFromClusterManager( randomNonNegativeLong(), @@ -132,7 +133,8 @@ public void testAddDuplicateRetentionLease() { value -> {}, () -> 0L, (leases, listener) -> {}, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE, + NON_REMOTE_DISCOVERY_NODE ); replicationTracker.updateFromClusterManager( randomNonNegativeLong(), @@ -164,7 +166,8 @@ public void testRenewNotFoundRetentionLease() { value -> {}, () -> 0L, (leases, listener) -> {}, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE, + NON_REMOTE_DISCOVERY_NODE ); replicationTracker.updateFromClusterManager( randomNonNegativeLong(), @@ -202,7 +205,8 @@ public void testAddRetentionLeaseCausesRetentionLeaseSync() { equalTo(retainingSequenceNumbers) ); }, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE, + NON_REMOTE_DISCOVERY_NODE ); reference.set(replicationTracker); replicationTracker.updateFromClusterManager( @@ -241,7 +245,8 @@ public void testRemoveRetentionLease() { value -> {}, () -> 0L, (leases, listener) -> {}, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE, + NON_REMOTE_DISCOVERY_NODE ); replicationTracker.updateFromClusterManager( randomNonNegativeLong(), @@ -305,7 +310,8 @@ public void testCloneRetentionLease() { assertTrue(synced.compareAndSet(false, true)); listener.onResponse(new ReplicationResponse()); }, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE, + NON_REMOTE_DISCOVERY_NODE ); replicationTrackerRef.set(replicationTracker); replicationTracker.updateFromClusterManager( @@ -351,7 +357,8 @@ public void testCloneNonexistentRetentionLease() { value -> {}, () -> 0L, (leases, listener) -> {}, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE, + NON_REMOTE_DISCOVERY_NODE ); replicationTracker.updateFromClusterManager( randomNonNegativeLong(), @@ -380,7 +387,8 @@ public void testCloneDuplicateRetentionLease() { value -> {}, () -> 0L, (leases, listener) -> {}, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE, + NON_REMOTE_DISCOVERY_NODE ); replicationTracker.updateFromClusterManager( randomNonNegativeLong(), @@ -413,7 +421,8 @@ public void testRemoveNotFound() { value -> {}, () -> 0L, (leases, listener) -> {}, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE, + NON_REMOTE_DISCOVERY_NODE ); replicationTracker.updateFromClusterManager( randomNonNegativeLong(), @@ -451,7 +460,8 @@ public void testRemoveRetentionLeaseCausesRetentionLeaseSync() { equalTo(retainingSequenceNumbers) ); }, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE, + NON_REMOTE_DISCOVERY_NODE ); reference.set(replicationTracker); replicationTracker.updateFromClusterManager( @@ -504,7 +514,8 @@ private void runExpirationTest(final boolean primaryMode) { value -> {}, currentTimeMillis::get, (leases, listener) -> {}, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE, + NON_REMOTE_DISCOVERY_NODE ); replicationTracker.updateFromClusterManager( randomNonNegativeLong(), @@ -583,7 +594,8 @@ public void testReplicaIgnoresOlderRetentionLeasesVersion() { value -> {}, () -> 0L, (leases, listener) -> {}, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE, + NON_REMOTE_DISCOVERY_NODE ); replicationTracker.updateFromClusterManager( randomNonNegativeLong(), @@ -636,7 +648,8 @@ public void testLoadAndPersistRetentionLeases() throws IOException { value -> {}, () -> 0L, (leases, listener) -> {}, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE, + NON_REMOTE_DISCOVERY_NODE ); replicationTracker.updateFromClusterManager( randomNonNegativeLong(), @@ -671,7 +684,8 @@ public void testUnnecessaryPersistenceOfRetentionLeases() throws IOException { value -> {}, () -> 0L, (leases, listener) -> {}, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE, + NON_REMOTE_DISCOVERY_NODE ); replicationTracker.updateFromClusterManager( randomNonNegativeLong(), @@ -724,7 +738,8 @@ public void testPersistRetentionLeasesUnderConcurrency() throws IOException { value -> {}, () -> 0L, (leases, listener) -> {}, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE, + NON_REMOTE_DISCOVERY_NODE ); replicationTracker.updateFromClusterManager( randomNonNegativeLong(), @@ -788,7 +803,8 @@ public void testRenewLeaseWithLowerRetainingSequenceNumber() throws Exception { value -> {}, () -> 0L, (leases, listener) -> {}, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE, + NON_REMOTE_DISCOVERY_NODE ); replicationTracker.updateFromClusterManager( randomNonNegativeLong(), diff --git a/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTestCase.java b/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTestCase.java index e61d27695a5e5..daeefeff59c94 100644 --- a/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTestCase.java +++ b/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTestCase.java @@ -40,11 +40,13 @@ import org.opensearch.common.settings.Settings; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.engine.SafeCommitInfo; +import org.opensearch.index.remote.RemoteStoreTestsHelper; import org.opensearch.test.IndexSettingsModule; import org.opensearch.test.OpenSearchTestCase; import java.util.Collections; import java.util.Set; +import java.util.function.Function; import java.util.function.LongConsumer; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -57,18 +59,20 @@ ReplicationTracker newTracker( final AllocationId allocationId, final LongConsumer updatedGlobalCheckpoint, final LongSupplier currentTimeMillisSupplier, - final Settings settings + final Settings settings, + final boolean remote ) { return new ReplicationTracker( new ShardId("test", "_na_", 0), allocationId.getId(), - IndexSettingsModule.newIndexSettings("test", settings), + remote ? RemoteStoreTestsHelper.createIndexSettings(true, settings) : IndexSettingsModule.newIndexSettings("test", settings), randomNonNegativeLong(), UNASSIGNED_SEQ_NO, updatedGlobalCheckpoint, currentTimeMillisSupplier, (leases, listener) -> {}, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE, + remote ? REMOTE_DISCOVERY_NODE : NON_REMOTE_DISCOVERY_NODE ); } @@ -80,8 +84,21 @@ ReplicationTracker newTracker( return newTracker(allocationId, updatedGlobalCheckpoint, currentTimeMillisSupplier, Settings.EMPTY); } + ReplicationTracker newTracker( + final AllocationId allocationId, + final LongConsumer updatedGlobalCheckpoint, + final LongSupplier currentTimeMillisSupplier, + final Settings settings + ) { + return newTracker(allocationId, updatedGlobalCheckpoint, currentTimeMillisSupplier, settings, false); + } + static final Supplier OPS_BASED_RECOVERY_ALWAYS_REASONABLE = () -> SafeCommitInfo.EMPTY; + static final Function NON_REMOTE_DISCOVERY_NODE = shardId -> false; + + static final Function REMOTE_DISCOVERY_NODE = shardId -> true; + static String nodeIdFromAllocationId(final AllocationId allocationId) { return "n-" + allocationId.getId().substring(0, 8); } diff --git a/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java b/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java index 7971591e82bab..233a99cbe4a73 100644 --- a/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java @@ -446,6 +446,10 @@ public void testWaitForAllocationIdToBeInSync() throws Exception { private AtomicLong updatedGlobalCheckpoint = new AtomicLong(UNASSIGNED_SEQ_NO); + private ReplicationTracker newTracker(final AllocationId allocationId, Settings settings, boolean remote) { + return newTracker(allocationId, updatedGlobalCheckpoint::set, () -> 0L, settings, remote); + } + private ReplicationTracker newTracker(final AllocationId allocationId, Settings settings) { return newTracker(allocationId, updatedGlobalCheckpoint::set, () -> 0L, settings); } @@ -759,7 +763,8 @@ public void testPrimaryContextHandoff() throws IOException { onUpdate, () -> 0L, onNewRetentionLease, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE, + NON_REMOTE_DISCOVERY_NODE ); ReplicationTracker newPrimary = new ReplicationTracker( shardId, @@ -770,7 +775,8 @@ public void testPrimaryContextHandoff() throws IOException { onUpdate, () -> 0L, onNewRetentionLease, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE, + NON_REMOTE_DISCOVERY_NODE ); Set allocationIds = new HashSet<>(Arrays.asList(oldPrimary.shardAllocationId, newPrimary.shardAllocationId)); @@ -1300,7 +1306,7 @@ public void testGlobalCheckpointUpdateWithRemoteTranslogEnabled() { .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true") .build(); - final ReplicationTracker tracker = newTracker(primaryId, settings); + final ReplicationTracker tracker = newTracker(primaryId, settings, true); assertThat(tracker.getGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); long primaryLocalCheckpoint = activeWithCheckpoints.get(primaryId); @@ -1378,7 +1384,7 @@ public void testUpdateFromClusterManagerWithRemoteTranslogEnabled() { .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true") .build(); - final ReplicationTracker tracker = newTracker(primaryId, settings); + final ReplicationTracker tracker = newTracker(primaryId, settings, true); assertThat(tracker.getGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); long primaryLocalCheckpoint = activeWithCheckpoints.get(primaryId); @@ -1476,7 +1482,7 @@ public void testMarkAllocationIdAsInSyncWithRemoteTranslogEnabled() throws Excep .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true") .build(); - final ReplicationTracker tracker = newTracker(primaryId, settings); + final ReplicationTracker tracker = newTracker(primaryId, settings, true); tracker.updateFromClusterManager(initialClusterStateVersion, ids(active), routingTable(initializing, primaryId)); final long localCheckpoint = randomLongBetween(0, Long.MAX_VALUE - 1); tracker.activatePrimaryMode(localCheckpoint); @@ -1504,7 +1510,7 @@ public void testMissingActiveIdsDoesNotPreventAdvanceWithRemoteTranslogEnabled() .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true") .build(); - final ReplicationTracker tracker = newTracker(primaryId, settings); + final ReplicationTracker tracker = newTracker(primaryId, settings, true); tracker.updateFromClusterManager(randomNonNegativeLong(), ids(active.keySet()), routingTable(initializing.keySet(), primaryId)); tracker.activatePrimaryMode(NO_OPS_PERFORMED); List initializingRandomSubset = randomSubsetOf(initializing.keySet()); @@ -1537,7 +1543,7 @@ public void testMissingInSyncIdsDoesNotPreventAdvanceWithRemoteTranslogEnabled() .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true") .build(); - final ReplicationTracker tracker = newTracker(primaryId, settings); + final ReplicationTracker tracker = newTracker(primaryId, settings, true); tracker.updateFromClusterManager(randomNonNegativeLong(), ids(active.keySet()), routingTable(initializing.keySet(), primaryId)); tracker.activatePrimaryMode(NO_OPS_PERFORMED); randomSubsetOf(randomIntBetween(1, initializing.size() - 1), initializing.keySet()).forEach( @@ -1606,8 +1612,8 @@ public void testInSyncIdsAreRemovedIfNotValidatedByClusterManagerWithRemoteTrans .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true") .build(); - final ReplicationTracker tracker = newTracker(primaryId, settings); - tracker.updateFromClusterManager(initialClusterStateVersion, ids(active), routingTable(initializing, primaryId)); + final ReplicationTracker tracker = newTracker(primaryId, settings, true); + tracker.updateFromClusterManager(initialClusterStateVersion, ids(active), routingTable(initializing, active, primaryId)); tracker.activatePrimaryMode(NO_OPS_PERFORMED); if (randomBoolean()) { initializingToStay.keySet().forEach(k -> markAsTrackingAndInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED)); @@ -1655,7 +1661,7 @@ public void testUpdateAllocationIdsFromClusterManagerWithRemoteTranslogEnabled() .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true") .build(); - final ReplicationTracker tracker = newTracker(primaryId, settings); + final ReplicationTracker tracker = newTracker(primaryId, settings, true); tracker.updateFromClusterManager(initialClusterStateVersion, ids(activeAllocationIds), routingTable); tracker.activatePrimaryMode(NO_OPS_PERFORMED); assertThat(tracker.getReplicationGroup().getInSyncAllocationIds(), equalTo(ids(activeAllocationIds))); @@ -2080,7 +2086,8 @@ public void testPrimaryContextHandoffWithRemoteTranslogEnabled() throws IOExcept onUpdate, () -> 0L, onNewRetentionLease, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE, + REMOTE_DISCOVERY_NODE ); ReplicationTracker newPrimary = new ReplicationTracker( shardId, @@ -2091,7 +2098,8 @@ public void testPrimaryContextHandoffWithRemoteTranslogEnabled() throws IOExcept onUpdate, () -> 0L, onNewRetentionLease, - OPS_BASED_RECOVERY_ALWAYS_REASONABLE + OPS_BASED_RECOVERY_ALWAYS_REASONABLE, + REMOTE_DISCOVERY_NODE ); Set allocationIds = new HashSet<>(Arrays.asList(oldPrimary.shardAllocationId, newPrimary.shardAllocationId)); diff --git a/server/src/test/java/org/opensearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java b/server/src/test/java/org/opensearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java index 6d49d6b44ef1c..d5d7163b66698 100644 --- a/server/src/test/java/org/opensearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java +++ b/server/src/test/java/org/opensearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java @@ -61,7 +61,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.opensearch.index.remote.RemoteStoreTestsHelper.createIndexSettings; -import static org.opensearch.index.remote.RemoteStoreTestsHelper.createShardRouting; import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Mockito.mock; @@ -217,36 +216,14 @@ public void testBlocks() { public void testGetReplicationModeWithRemoteTranslog() { final RetentionLeaseBackgroundSyncAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); when(indexShard.indexSettings()).thenReturn(createIndexSettings(true)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(true, true)); assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); } public void testGetReplicationModeWithLocalTranslog() { final RetentionLeaseBackgroundSyncAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(true, false)); - assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); - } - - public void testGetReplicationModeDuringMigrationOnRemotePrimary() { - final RetentionLeaseBackgroundSyncAction action = createAction(); - final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(true)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(true, true)); - assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); - } - - public void testGetReplicationModeDuringMigrationOnDocrepReplica() { - final RetentionLeaseBackgroundSyncAction action = createAction(); - final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(false, false)); assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); } diff --git a/server/src/test/java/org/opensearch/index/seqno/RetentionLeaseSyncActionTests.java b/server/src/test/java/org/opensearch/index/seqno/RetentionLeaseSyncActionTests.java index a08d42df8f773..7610b8bc39296 100644 --- a/server/src/test/java/org/opensearch/index/seqno/RetentionLeaseSyncActionTests.java +++ b/server/src/test/java/org/opensearch/index/seqno/RetentionLeaseSyncActionTests.java @@ -61,7 +61,6 @@ import static java.util.Collections.emptyMap; import static org.opensearch.index.remote.RemoteStoreTestsHelper.createIndexSettings; -import static org.opensearch.index.remote.RemoteStoreTestsHelper.createShardRouting; import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Mockito.mock; @@ -217,36 +216,14 @@ public void testBlocks() { public void testGetReplicationModeWithRemoteTranslog() { final RetentionLeaseSyncAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); when(indexShard.indexSettings()).thenReturn(createIndexSettings(true)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(true, true)); assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); } public void testGetReplicationModeWithLocalTranslog() { final RetentionLeaseSyncAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(true, false)); - assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); - } - - public void testGetReplicationModeDuringMigrationOnRemotePrimary() { - final RetentionLeaseSyncAction action = createAction(); - final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(true)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(true, true)); - assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); - } - - public void testGetReplicationModeDuringMigrationOnDocrepReplica() { - final RetentionLeaseSyncAction action = createAction(); - final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(false, false)); assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); } diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 537bfcf8f8a6b..e5bfa8caee79a 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -697,7 +697,8 @@ public void testPrimaryPromotionRollsGeneration() throws Exception { (shard, listener) -> {}, 0L, Collections.singleton(primaryRouting.allocationId().getId()), - new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build() + new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build(), + IndexShardTestUtils.getFakeDiscoveryNodes(primaryRouting) ); /* @@ -764,7 +765,8 @@ public void testOperationPermitsOnPrimaryShards() throws Exception { }, 0L, Collections.singleton(indexShard.routingEntry().allocationId().getId()), - new IndexShardRoutingTable.Builder(indexShard.shardId()).addShard(primaryRouting).build() + new IndexShardRoutingTable.Builder(indexShard.shardId()).addShard(primaryRouting).build(), + IndexShardTestUtils.getFakeDiscoveryNodes(primaryRouting) ); latch.await(); assertThat(indexShard.getActiveOperationsCount(), is(oneOf(0, IndexShard.OPERATIONS_BLOCKED))); @@ -1446,7 +1448,8 @@ public void onFailure(Exception e) { (s, r) -> resyncLatch.countDown(), 1L, Collections.singleton(newRouting.allocationId().getId()), - new IndexShardRoutingTable.Builder(newRouting.shardId()).addShard(newRouting).build() + new IndexShardRoutingTable.Builder(newRouting.shardId()).addShard(newRouting).build(), + IndexShardTestUtils.getFakeDiscoveryNodes(newRouting) ); resyncLatch.await(); assertThat(indexShard.getLocalCheckpoint(), equalTo(maxSeqNo)); @@ -3284,7 +3287,7 @@ public void testRecoverFromTranslog() throws IOException { Translog.Snapshot snapshot = TestTranslog.newSnapshotFromOperations(operations); primary.markAsRecovering( "store", - new RecoveryState(primary.routingEntry(), getFakeDiscoNode(primary.routingEntry().currentNodeId()), null) + new RecoveryState(primary.routingEntry(), IndexShardTestUtils.getFakeDiscoNode(primary.routingEntry().currentNodeId()), null) ); recoverFromStore(primary); @@ -4029,15 +4032,19 @@ public void testReadSnapshotAndCheckIndexConcurrently() throws Exception { if (isPrimary) { newShard.markAsRecovering( "store", - new RecoveryState(newShard.routingEntry(), getFakeDiscoNode(newShard.routingEntry().currentNodeId()), null) + new RecoveryState( + newShard.routingEntry(), + IndexShardTestUtils.getFakeDiscoNode(newShard.routingEntry().currentNodeId()), + null + ) ); } else { newShard.markAsRecovering( "peer", new RecoveryState( newShard.routingEntry(), - getFakeDiscoNode(newShard.routingEntry().currentNodeId()), - getFakeDiscoNode(newShard.routingEntry().currentNodeId()) + IndexShardTestUtils.getFakeDiscoNode(newShard.routingEntry().currentNodeId()), + IndexShardTestUtils.getFakeDiscoNode(newShard.routingEntry().currentNodeId()) ) ); } diff --git a/server/src/test/java/org/opensearch/index/shard/PrimaryReplicaSyncerTests.java b/server/src/test/java/org/opensearch/index/shard/PrimaryReplicaSyncerTests.java index b1bcaac2c1947..09903a8b44cb5 100644 --- a/server/src/test/java/org/opensearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/PrimaryReplicaSyncerTests.java @@ -111,7 +111,8 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception { null, 1000L, Collections.singleton(allocationId), - new IndexShardRoutingTable.Builder(shard.shardId()).addShard(shard.routingEntry()).build() + new IndexShardRoutingTable.Builder(shard.shardId()).addShard(shard.routingEntry()).build(), + IndexShardTestUtils.getFakeDiscoveryNodes(shard.routingEntry()) ); shard.updateLocalCheckpointForShard(allocationId, globalCheckPoint); assertEquals(globalCheckPoint, shard.getLastKnownGlobalCheckpoint()); @@ -190,7 +191,8 @@ public void testSyncerOnClosingShard() throws Exception { null, 1000L, Collections.singleton(allocationId), - new IndexShardRoutingTable.Builder(shard.shardId()).addShard(shard.routingEntry()).build() + new IndexShardRoutingTable.Builder(shard.shardId()).addShard(shard.routingEntry()).build(), + IndexShardTestUtils.getFakeDiscoveryNodes(shard.routingEntry()) ); CountDownLatch syncCalledLatch = new CountDownLatch(1); diff --git a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java index fc0c71895fa9f..20a82218e70f4 100644 --- a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java +++ b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java @@ -57,7 +57,6 @@ public void testStartSequenceForReplicaRecovery() throws Exception { replica.failShard("test", null); final ShardRouting replicaRouting = replica.routingEntry(); - replicaRouting.setAssignedToRemoteStoreNode(true); final IndexMetadata newIndexMetadata = IndexMetadata.builder(replica.indexSettings().getIndexMetadata()) .primaryTerm(replicaRouting.shardId().id(), replica.getOperationPrimaryTerm() + 1) .build(); @@ -66,16 +65,14 @@ public void testStartSequenceForReplicaRecovery() throws Exception { int moreDocs = shards.indexDocs(randomIntBetween(20, 100)); shards.flush(); - ShardRouting newShardRouting = newShardRouting( - replicaRouting.shardId(), - replicaRouting.currentNodeId(), - false, - ShardRoutingState.INITIALIZING, - RecoverySource.PeerRecoverySource.INSTANCE - ); - newShardRouting.setAssignedToRemoteStoreNode(true); IndexShard newReplicaShard = newShard( - newShardRouting, + newShardRouting( + replicaRouting.shardId(), + replicaRouting.currentNodeId(), + false, + ShardRoutingState.INITIALIZING, + RecoverySource.PeerRecoverySource.INSTANCE + ), replica.shardPath(), newIndexMetadata, null, diff --git a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java index 0e16e81b1bb70..0428bdf0655b0 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java @@ -34,6 +34,7 @@ import org.opensearch.Version; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingHelper; @@ -164,7 +165,8 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem null, null, localNode, - null + null, + DiscoveryNodes.builder().add(localNode).build() ); IndexShardTestCase.updateRoutingEntry(shard, newRouting); assertEquals(5, counter.get()); diff --git a/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index c455101ff4549..0490228a5cc16 100644 --- a/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -35,6 +35,7 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.RoutingNode; import org.opensearch.cluster.routing.ShardRouting; @@ -264,7 +265,8 @@ public MockIndexShard createShard( final RetentionLeaseSyncer retentionLeaseSyncer, final DiscoveryNode targetNode, final DiscoveryNode sourceNode, - final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory + final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, + final DiscoveryNodes discoveryNodes ) throws IOException { failRandomly(); RecoveryState recoveryState = new RecoveryState(shardRouting, targetNode, sourceNode); @@ -387,7 +389,8 @@ public void updateShardState( BiConsumer> primaryReplicaSyncer, long applyingClusterStateVersion, Set inSyncAllocationIds, - IndexShardRoutingTable routingTable + IndexShardRoutingTable routingTable, + DiscoveryNodes discoveryNodes ) throws IOException { failRandomly(); assertThat(this.shardId(), equalTo(shardRouting.shardId())); diff --git a/server/src/test/java/org/opensearch/indices/recovery/PeerRecoverySourceServiceTests.java b/server/src/test/java/org/opensearch/indices/recovery/PeerRecoverySourceServiceTests.java index 4fbae4b0d53ca..ded174fb98eef 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/PeerRecoverySourceServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/PeerRecoverySourceServiceTests.java @@ -38,6 +38,7 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; +import org.opensearch.index.shard.IndexShardTestUtils; import org.opensearch.index.store.Store; import org.opensearch.indices.IndicesService; import org.opensearch.test.NodeRoles; @@ -65,8 +66,8 @@ public void testDuplicateRecoveries() throws IOException { StartRecoveryRequest startRecoveryRequest = new StartRecoveryRequest( primary.shardId(), randomAlphaOfLength(10), - getFakeDiscoNode("source"), - getFakeDiscoNode("target"), + IndexShardTestUtils.getFakeDiscoNode("source"), + IndexShardTestUtils.getFakeDiscoNode("target"), Store.MetadataSnapshot.EMPTY, randomBoolean(), randomLong(), diff --git a/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java index 1e6cc43703672..a8e5a02011538 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -56,6 +56,7 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; +import org.opensearch.index.shard.IndexShardTestUtils; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.Translog; @@ -92,8 +93,8 @@ public void testWriteFileChunksConcurrently() throws Exception { mdFiles.add(md); } final IndexShard targetShard = newShard(false); - final DiscoveryNode pNode = getFakeDiscoNode(sourceShard.routingEntry().currentNodeId()); - final DiscoveryNode rNode = getFakeDiscoNode(targetShard.routingEntry().currentNodeId()); + final DiscoveryNode pNode = IndexShardTestUtils.getFakeDiscoNode(sourceShard.routingEntry().currentNodeId()); + final DiscoveryNode rNode = IndexShardTestUtils.getFakeDiscoNode(targetShard.routingEntry().currentNodeId()); targetShard.markAsRecovering("test-peer-recovery", new RecoveryState(targetShard.routingEntry(), rNode, pNode)); final RecoveryTarget recoveryTarget = new RecoveryTarget(targetShard, null, null, threadPool); final PlainActionFuture receiveFileInfoFuture = new PlainActionFuture<>(); diff --git a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java index 1a6d5ad45f2b7..7a67d7ffd1f5d 100644 --- a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java @@ -36,7 +36,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.opensearch.index.remote.RemoteStoreTestsHelper.createIndexSettings; -import static org.opensearch.index.remote.RemoteStoreTestsHelper.createShardRouting; import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Mockito.mock; @@ -130,7 +129,7 @@ public void testPublishCheckpointActionOnReplica() { final ShardId shardId = new ShardId(index, id); when(indexShard.shardId()).thenReturn(shardId); - + when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); final SegmentReplicationTargetService mockTargetService = mock(SegmentReplicationTargetService.class); final PublishCheckpointAction action = new PublishCheckpointAction( @@ -165,36 +164,14 @@ public void testPublishCheckpointActionOnReplica() { public void testGetReplicationModeWithRemoteTranslog() { final PublishCheckpointAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); when(indexShard.indexSettings()).thenReturn(createIndexSettings(true)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(true, true)); assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); } public void testGetReplicationModeWithLocalTranslog() { final PublishCheckpointAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(true, false)); - assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); - } - - public void testGetReplicationModeDuringMigrationOnRemotePrimary() { - final PublishCheckpointAction action = createAction(); - final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); - when(indexShard.indexSettings()).thenReturn(createIndexSettings(true)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(true, true)); - assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); - } - - public void testGetReplicationModeDuringMigrationOnDocrepReplica() { - final PublishCheckpointAction action = createAction(); - final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); when(indexShard.indexSettings()).thenReturn(createIndexSettings(false)); - when(indexShard.routingEntry()).thenReturn(createShardRouting(false, false)); assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); } diff --git a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java index 43289a7c89524..1cb5501810c5d 100644 --- a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java @@ -898,7 +898,8 @@ public EngineConfig config( update -> {}, () -> 0L, (leases, listener) -> listener.onResponse(new ReplicationResponse()), - () -> SafeCommitInfo.EMPTY + () -> SafeCommitInfo.EMPTY, + sId -> false ); globalCheckpointSupplier = replicationTracker; retentionLeasesSupplier = replicationTracker::getRetentionLeases; diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java index d7bf51cb0ff9f..6a1e2d138adb9 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java @@ -64,6 +64,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.AllocationId; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.RecoverySource; @@ -96,6 +97,7 @@ import org.opensearch.index.seqno.RetentionLeases; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; +import org.opensearch.index.shard.IndexShardTestUtils; import org.opensearch.index.shard.PrimaryReplicaSyncer; import org.opensearch.index.shard.ShardPath; import org.opensearch.index.translog.Translog; @@ -108,6 +110,7 @@ import java.io.IOException; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; @@ -340,6 +343,30 @@ public synchronized void startAll() throws IOException { startReplicas(replicas.size()); } + public synchronized DiscoveryNodes generateFakeDiscoveryNodes() { + DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder(); + if (primary.indexSettings() != null && primary.indexSettings().isRemoteNode()) { + builder.add(IndexShardTestUtils.getFakeRemoteEnabledNode(primary.routingEntry().currentNodeId())); + } else { + builder.add(IndexShardTestUtils.getFakeDiscoNode(primary.routingEntry().currentNodeId())); + } + for (IndexShard replica: replicas) { + if (replica.indexSettings() != null && replica.indexSettings().isRemoteNode()) { + builder.add(IndexShardTestUtils.getFakeRemoteEnabledNode(replica.routingEntry().currentNodeId())); + } else { + builder.add(IndexShardTestUtils.getFakeDiscoNode(replica.routingEntry().currentNodeId())); + } + } + return builder.build(); + } + + public synchronized void updateDiscoveryNodesOnShards(DiscoveryNodes discoveryNodes) { + primary.setDiscoveryNodes(discoveryNodes); + for (IndexShard replica: replicas) { + replica.setDiscoveryNodes(discoveryNodes); + } + } + public synchronized int startReplicas(int numOfReplicasToStart) throws IOException { if (primary.routingEntry().initializing()) { startPrimary(); @@ -368,7 +395,6 @@ public void startPrimary(boolean remote) throws IOException { activeIds.addAll(activeIds()); activeIds.add(primary.routingEntry().allocationId().getId()); ShardRouting startedRoutingEntry = ShardRoutingHelper.moveToStarted(primary.routingEntry()); - startedRoutingEntry.setAssignedToRemoteStoreNode(remote); IndexShardRoutingTable routingTable = routingTable(shr -> shr == primary.routingEntry() ? startedRoutingEntry : shr); primary.updateShardState( startedRoutingEntry, @@ -376,7 +402,8 @@ public void startPrimary(boolean remote) throws IOException { null, currentClusterStateVersion.incrementAndGet(), activeIds, - routingTable + routingTable, + generateFakeDiscoveryNodes() ); for (final IndexShard replica : replicas) { recoverReplica(replica); @@ -390,9 +417,6 @@ public IndexShard addReplica() throws IOException { public IndexShard addReplica(Path remotePath) throws IOException { final ShardRouting replicaRouting = createShardRouting("s" + replicaId.incrementAndGet(), false); - if (remotePath != null) { - replicaRouting.setAssignedToRemoteStoreNode(true); - } final IndexShard replica = newShard( replicaRouting, indexMetadata, @@ -413,6 +437,7 @@ assert shardRoutings().stream().anyMatch(shardRouting -> shardRouting.isSameAllo if (replicationTargets != null) { replicationTargets.addReplica(replica); } + updateDiscoveryNodesOnShards(generateFakeDiscoveryNodes()); updateAllocationIDsOnPrimary(); } @@ -500,7 +525,8 @@ public synchronized void promoteReplicaToPrimary( primaryReplicaSyncer, currentClusterStateVersion.incrementAndGet(), activeIds(), - routingTable + routingTable, + generateFakeDiscoveryNodes() ); } @@ -646,14 +672,16 @@ public void syncGlobalCheckpoint() { } private void updateAllocationIDsOnPrimary() throws IOException { - primary.updateShardState( primary.routingEntry(), primary.getPendingPrimaryTerm(), null, currentClusterStateVersion.incrementAndGet(), activeIds(), - routingTable(Function.identity()) + routingTable(Function.identity()), + primary.indexSettings().isRemoteTranslogStoreEnabled() ? + IndexShardTestUtils.getFakeRemoteEnabledDiscoveryNodes(routingTable(Function.identity()).getShards()) : + IndexShardTestUtils.getFakeDiscoveryNodes(routingTable(Function.identity()).getShards()) ); } diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index a2f9eb677c0ac..64006ae52fee8 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -48,6 +48,7 @@ import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.ShardRouting; @@ -618,12 +619,14 @@ protected IndexShard newShard( IndexingOperationListener... listeners ) throws IOException { Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build(); + DiscoveryNodes discoveryNodes = IndexShardTestUtils.getFakeDiscoveryNodes(routing); // To simulate that the node is remote backed if (indexMetadata.getSettings().get(IndexMetadata.SETTING_REMOTE_STORE_ENABLED) == "true") { nodeSettings = Settings.builder() .put("node.name", routing.currentNodeId()) .put("node.attr.remote_store.translog.repository", "seg_repo") .build(); + discoveryNodes = DiscoveryNodes.builder().add(IndexShardTestUtils.getFakeRemoteEnabledNode(routing.currentNodeId())).build(); } final IndexSettings indexSettings = new IndexSettings(indexMetadata, nodeSettings); final IndexShard indexShard; @@ -711,7 +714,8 @@ protected IndexShard newShard( () -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, "dummy-node", DefaultRecoverySettings.INSTANCE, - false + false, + discoveryNodes ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); if (remoteStoreStatsTrackerFactory != null) { @@ -985,7 +989,7 @@ protected void closeShards(Iterable shards) throws IOException { protected void recoverShardFromStore(IndexShard primary) throws IOException { primary.markAsRecovering( "store", - new RecoveryState(primary.routingEntry(), getFakeDiscoNode(primary.routingEntry().currentNodeId()), null) + new RecoveryState(primary.routingEntry(), IndexShardTestUtils.getFakeDiscoNode(primary.routingEntry().currentNodeId()), null) ); recoverFromStore(primary); updateRoutingEntry(primary, ShardRoutingHelper.moveToStarted(primary.routingEntry())); @@ -1002,7 +1006,15 @@ public static void updateRoutingEntry(IndexShard shard, ShardRouting shardRoutin null, currentClusterStateVersion.incrementAndGet(), inSyncIds, - newRoutingTable + newRoutingTable, + DiscoveryNodes.builder().add(new DiscoveryNode( + shardRouting.currentNodeId(), + shardRouting.currentNodeId(), + buildNewFakeTransportAddress(), + Collections.emptyMap(), + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + )).build() ); } @@ -1016,17 +1028,6 @@ protected void recoveryEmptyReplica(IndexShard replica, boolean startReplica) th } } - protected DiscoveryNode getFakeDiscoNode(String id) { - return new DiscoveryNode( - id, - id, - buildNewFakeTransportAddress(), - Collections.emptyMap(), - DiscoveryNodeRole.BUILT_IN_ROLES, - Version.CURRENT - ); - } - protected void recoverReplica(IndexShard replica, IndexShard primary, boolean startReplica) throws IOException { recoverReplica(replica, primary, startReplica, getReplicationFunc(replica)); } @@ -1103,7 +1104,7 @@ protected void recoverReplica( * @param targetSupplier supplies an instance of {@link RecoveryTarget} * @param markAsRecovering set to {@code false} if the replica is marked as recovering */ - protected final void recoverUnstartedReplica( + public final void recoverUnstartedReplica( final IndexShard replica, final IndexShard primary, final BiFunction targetSupplier, @@ -1112,8 +1113,18 @@ protected final void recoverUnstartedReplica( final IndexShardRoutingTable routingTable, final Function, List> replicatePrimaryFunction ) throws IOException { - final DiscoveryNode pNode = getFakeDiscoNode(primary.routingEntry().currentNodeId()); - final DiscoveryNode rNode = getFakeDiscoNode(replica.routingEntry().currentNodeId()); + final DiscoveryNode pNode; + final DiscoveryNode rNode; + if (primary.isRemoteTranslogEnabled()) { + pNode = IndexShardTestUtils.getFakeRemoteEnabledNode(primary.routingEntry().currentNodeId()); + } else { + pNode = IndexShardTestUtils.getFakeDiscoNode(primary.routingEntry().currentNodeId()); + } + if (replica.isRemoteTranslogEnabled()) { + rNode = IndexShardTestUtils.getFakeRemoteEnabledNode(replica.routingEntry().currentNodeId()); + } else { + rNode = IndexShardTestUtils.getFakeDiscoNode(replica.routingEntry().currentNodeId()); + } if (markAsRecovering) { replica.markAsRecovering("remote", new RecoveryState(replica.routingEntry(), pNode, rNode)); } else { @@ -1154,7 +1165,10 @@ protected final void recoverUnstartedReplica( null, currentClusterStateVersion.incrementAndGet(), inSyncIds, - routingTable + routingTable, + primary.isRemoteTranslogEnabled() ? + IndexShardTestUtils.getFakeRemoteEnabledDiscoveryNodes(routingTable.getShards()) : + IndexShardTestUtils.getFakeDiscoveryNodes(routingTable.getShards()) ); try { PlainActionFuture future = new PlainActionFuture<>(); @@ -1188,7 +1202,10 @@ protected void startReplicaAfterRecovery( null, currentClusterStateVersion.incrementAndGet(), inSyncIdsWithReplica, - newRoutingTable + newRoutingTable, + primary.indexSettings.isRemoteTranslogStoreEnabled() ? + IndexShardTestUtils.getFakeRemoteEnabledDiscoveryNodes(routingTable.shards()): + IndexShardTestUtils.getFakeDiscoveryNodes(routingTable.shards()) ); replica.updateShardState( replica.routingEntry().moveToStarted(), @@ -1196,7 +1213,10 @@ protected void startReplicaAfterRecovery( null, currentClusterStateVersion.get(), inSyncIdsWithReplica, - newRoutingTable + newRoutingTable, + replica.indexSettings.isRemoteTranslogStoreEnabled() ? + IndexShardTestUtils.getFakeRemoteEnabledDiscoveryNodes(routingTable.shards()): + IndexShardTestUtils.getFakeDiscoveryNodes(routingTable.shards()) ); } @@ -1225,7 +1245,8 @@ protected void promoteReplica(IndexShard replica, Set inSyncIds, IndexSh ), currentClusterStateVersion.incrementAndGet(), inSyncIds, - newRoutingTable + newRoutingTable, + IndexShardTestUtils.getFakeDiscoveryNodes(routingEntry) ); } @@ -1369,7 +1390,7 @@ protected void recoverShardFromSnapshot(final IndexShard shard, final Snapshot s final Version version = Version.CURRENT; final ShardId shardId = shard.shardId(); final IndexId indexId = new IndexId(shardId.getIndex().getName(), shardId.getIndex().getUUID()); - final DiscoveryNode node = getFakeDiscoNode(shard.routingEntry().currentNodeId()); + final DiscoveryNode node = IndexShardTestUtils.getFakeDiscoNode(shard.routingEntry().currentNodeId()); final RecoverySource.SnapshotRecoverySource recoverySource = new RecoverySource.SnapshotRecoverySource( UUIDs.randomBase64UUID(), snapshot, diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestUtils.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestUtils.java new file mode 100644 index 0000000000000..0742b850a96fe --- /dev/null +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestUtils.java @@ -0,0 +1,67 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.shard; + +import org.opensearch.Version; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class IndexShardTestUtils { + public static DiscoveryNode getFakeDiscoNode(String id) { + return new DiscoveryNode( + id, + id, + IndexShardTestCase.buildNewFakeTransportAddress(), + Collections.emptyMap(), + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + } + + public static DiscoveryNode getFakeRemoteEnabledNode(String id) { + Map remoteNodeAttributes = new HashMap(); + remoteNodeAttributes.put(RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, "test-repo"); + return new DiscoveryNode( + id, + id, + IndexShardTestCase.buildNewFakeTransportAddress(), + remoteNodeAttributes, + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + } + + public static DiscoveryNodes getFakeDiscoveryNodes(List shardRoutings) { + DiscoveryNodes.Builder builder = DiscoveryNodes.builder(); + for (ShardRouting routing: shardRoutings) { + builder.add(getFakeDiscoNode(routing.currentNodeId())); + } + return builder.build(); + } + + public static DiscoveryNodes getFakeRemoteEnabledDiscoveryNodes(List shardRoutings) { + DiscoveryNodes.Builder builder = DiscoveryNodes.builder(); + for (ShardRouting routing: shardRoutings) { + builder.add(getFakeRemoteEnabledNode(routing.currentNodeId())); + } + return builder.build(); + } + + public static DiscoveryNodes getFakeDiscoveryNodes(ShardRouting shardRouting) { + return DiscoveryNodes.builder().add(getFakeDiscoNode(shardRouting.currentNodeId())).build(); + } +}