From c7670d8f4974e8f2094f36676105c9eb6e9af953 Mon Sep 17 00:00:00 2001 From: Rahul Karajgikar <50844303+rahulkarajgikar@users.noreply.github.com> Date: Mon, 29 Jul 2024 18:18:44 +0530 Subject: [PATCH] [Batch Fetch] Fix for hasInitiatedFetching to fix allocation explain and manual reroute APIs (#14972) * Fix for hasInitiatedFetching() in batch mode Signed-off-by: Rahul Karajgikar --- CHANGELOG.md | 1 + .../gateway/RecoveryFromGatewayIT.java | 160 +++++++++++++++++- .../gateway/AsyncShardBatchFetch.java | 8 + .../gateway/ReplicaShardBatchAllocator.java | 2 +- .../gateway/ShardsBatchGatewayAllocator.java | 31 +++- 5 files changed, 191 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e88a084f7d7f6..d4c8c955bced4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 2.x] ### Added +- Fix for hasInitiatedFetching to fix allocation explain and manual reroute APIs (([#14972](https://github.com/opensearch-project/OpenSearch/pull/14972)) ### Dependencies - Bump `org.apache.commons:commons-lang3` from 3.14.0 to 3.15.0 ([#14861](https://github.com/opensearch-project/OpenSearch/pull/14861)) diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java index 4085cc3890f30..eccc903dfac82 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java @@ -57,6 +57,7 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.UnassignedInfo; +import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.opensearch.cluster.routing.allocation.AllocationDecision; import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator; import org.opensearch.cluster.service.ClusterService; @@ -797,11 +798,26 @@ public void testBatchModeEnabledWithoutTimeout() throws Exception { ); assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings())); assertEquals(1, gatewayAllocator.getNumberOfStartedShardBatches()); - assertEquals(1, gatewayAllocator.getNumberOfStoreShardBatches()); + // Replica shard would be marked ineligible since there are no data nodes. + // It would then be removed from any batch and batches would get deleted, so we would have 0 replica batches + assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches()); - // Now start both data nodes and ensure batch mode is working - logger.info("--> restarting the stopped nodes"); + // Now start one data node + logger.info("--> restarting the first stopped node"); internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(0)).put(node0DataPathSettings).build()); + ensureStableCluster(2); + ensureYellow("test"); + assertEquals(0, gatewayAllocator.getNumberOfStartedShardBatches()); + assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches()); + assertEquals(0, gatewayAllocator.getNumberOfInFlightFetches()); + + // calling reroute and asserting on reroute response + logger.info("--> calling reroute while cluster is yellow"); + clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get(); + assertTrue(clusterRerouteResponse.isAcknowledged()); + + // Now start last data node and ensure batch mode is working and cluster goes green + logger.info("--> restarting the second stopped node"); internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(1)).put(node1DataPathSettings).build()); ensureStableCluster(3); ensureGreen("test"); @@ -842,11 +858,26 @@ public void testBatchModeEnabledWithSufficientTimeoutAndClusterGreen() throws Ex ); assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings())); assertEquals(1, gatewayAllocator.getNumberOfStartedShardBatches()); - assertEquals(1, gatewayAllocator.getNumberOfStoreShardBatches()); + // Replica shard would be marked ineligible since there are no data nodes. + // It would then be removed from any batch and batches would get deleted, so we would have 0 replica batches + assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches()); - // Now start both data nodes and ensure batch mode is working - logger.info("--> restarting the stopped nodes"); + // Now start one data nodes and ensure batch mode is working + logger.info("--> restarting the first stopped node"); internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(0)).put(node0DataPathSettings).build()); + ensureStableCluster(2); + ensureYellow("test"); + assertEquals(0, gatewayAllocator.getNumberOfStartedShardBatches()); + assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches()); + assertEquals(0, gatewayAllocator.getNumberOfInFlightFetches()); + + // calling reroute and asserting on reroute response + logger.info("--> calling reroute while cluster is yellow"); + clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get(); + assertTrue(clusterRerouteResponse.isAcknowledged()); + + // Now start last data node and ensure batch mode is working and cluster goes green + logger.info("--> restarting the second stopped node"); internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(1)).put(node1DataPathSettings).build()); ensureStableCluster(3); ensureGreen("test"); @@ -907,7 +938,9 @@ public void testBatchModeEnabledWithInSufficientTimeoutButClusterGreen() throws assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings())); assertEquals(10, gatewayAllocator.getNumberOfStartedShardBatches()); - assertEquals(10, gatewayAllocator.getNumberOfStoreShardBatches()); + // All replica shards would be marked ineligible since there are no data nodes. + // They would then be removed from any batch and batches would get deleted, so we would have 0 replica batches + assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches()); health = client(internalCluster().getClusterManagerName()).admin().cluster().health(Requests.clusterHealthRequest()).actionGet(); assertFalse(health.isTimedOut()); assertEquals(RED, health.getStatus()); @@ -1051,6 +1084,18 @@ public void testMultipleReplicaShardAssignmentWithDelayedAllocationAndDifferentN ensureGreen("test"); } + public void testAllocationExplainReturnsNoWhenExtraReplicaShardInNonBatchMode() throws Exception { + // Non batch mode - This test is to validate that we don't return AWAITING_INFO in allocation explain API when the deciders are + // returning NO + this.allocationExplainReturnsNoWhenExtraReplicaShard(false); + } + + public void testAllocationExplainReturnsNoWhenExtraReplicaShardInBatchMode() throws Exception { + // Batch mode - This test is to validate that we don't return AWAITING_INFO in allocation explain API when the deciders are + // returning NO + this.allocationExplainReturnsNoWhenExtraReplicaShard(true); + } + public void testNBatchesCreationAndAssignment() throws Exception { // we will reduce batch size to 5 to make sure we have enough batches to test assignment // Total number of primary shards = 50 (50 indices*1) @@ -1104,7 +1149,9 @@ public void testNBatchesCreationAndAssignment() throws Exception { ); assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings())); assertEquals(10, gatewayAllocator.getNumberOfStartedShardBatches()); - assertEquals(10, gatewayAllocator.getNumberOfStoreShardBatches()); + // All replica shards would be marked ineligible since there are no data nodes. + // They would then be removed from any batch and batches would get deleted, so we would have 0 replica batches + assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches()); health = client(internalCluster().getClusterManagerName()).admin().cluster().health(Requests.clusterHealthRequest()).actionGet(); assertFalse(health.isTimedOut()); assertEquals(RED, health.getStatus()); @@ -1193,7 +1240,9 @@ public void testCulpritShardInBatch() throws Exception { ); assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings())); assertEquals(1, gatewayAllocator.getNumberOfStartedShardBatches()); - assertEquals(1, gatewayAllocator.getNumberOfStoreShardBatches()); + // Replica shard would be marked ineligible since there are no data nodes. + // It would then be removed from any batch and batches would get deleted, so we would have 0 replica batches + assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches()); assertTrue(clusterRerouteResponse.isAcknowledged()); health = client(internalCluster().getClusterManagerName()).admin().cluster().health(Requests.clusterHealthRequest()).actionGet(); assertFalse(health.isTimedOut()); @@ -1511,4 +1560,97 @@ private List findNodesWithShard(final boolean primary) { Collections.shuffle(requiredStartedShards, random()); return requiredStartedShards.stream().map(shard -> state.nodes().get(shard.currentNodeId()).getName()).collect(Collectors.toList()); } + + private void allocationExplainReturnsNoWhenExtraReplicaShard(boolean batchModeEnabled) throws Exception { + internalCluster().startClusterManagerOnlyNodes( + 1, + Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), batchModeEnabled).build() + ); + internalCluster().startDataOnlyNodes(5); + createIndex( + "test", + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 4).build() + ); + ensureGreen("test"); + ensureStableCluster(6); + + // Stop one of the nodes to make the cluster yellow + // We cannot directly create an index with replica = data node count because then the whole flow will get skipped due to + // INDEX_CREATED + List nodesWithReplicaShards = findNodesWithShard(false); + Settings replicaNodeDataPathSettings = internalCluster().dataPathSettings(nodesWithReplicaShards.get(0)); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodesWithReplicaShards.get(0))); + + ensureStableCluster(5); + ensureYellow("test"); + + logger.info("--> calling allocation explain API"); + // shard should have decision NO because there is no valid node for the extra replica to go to + AllocateUnassignedDecision aud = client().admin() + .cluster() + .prepareAllocationExplain() + .setIndex("test") + .setShard(0) + .setPrimary(false) + .get() + .getExplanation() + .getShardAllocationDecision() + .getAllocateDecision(); + + assertEquals(AllocationDecision.NO, aud.getAllocationDecision()); + assertEquals("cannot allocate because allocation is not permitted to any of the nodes", aud.getExplanation()); + + // Now creating a new index with too many replicas and trying again + createIndex( + "test2", + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 5).build() + ); + + ensureYellowAndNoInitializingShards("test2"); + + logger.info("--> calling allocation explain API again"); + // shard should have decision NO because there are 6 replicas and 4 data nodes + aud = client().admin() + .cluster() + .prepareAllocationExplain() + .setIndex("test2") + .setShard(0) + .setPrimary(false) + .get() + .getExplanation() + .getShardAllocationDecision() + .getAllocateDecision(); + + assertEquals(AllocationDecision.NO, aud.getAllocationDecision()); + assertEquals("cannot allocate because allocation is not permitted to any of the nodes", aud.getExplanation()); + + logger.info("--> restarting the stopped node"); + internalCluster().startDataOnlyNode( + Settings.builder().put("node.name", nodesWithReplicaShards.get(0)).put(replicaNodeDataPathSettings).build() + ); + + ensureStableCluster(6); + ensureGreen("test"); + + logger.info("--> calling allocation explain API 3rd time"); + // shard should still have decision NO because there are 6 replicas and 5 data nodes + aud = client().admin() + .cluster() + .prepareAllocationExplain() + .setIndex("test2") + .setShard(0) + .setPrimary(false) + .get() + .getExplanation() + .getShardAllocationDecision() + .getAllocateDecision(); + + assertEquals(AllocationDecision.NO, aud.getAllocationDecision()); + assertEquals("cannot allocate because allocation is not permitted to any of the nodes", aud.getExplanation()); + + internalCluster().startDataOnlyNodes(1); + + ensureStableCluster(7); + ensureGreen("test2"); + } } diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java index 4f39a39cea678..df642a9f5a743 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java @@ -80,6 +80,14 @@ public synchronized void clearShard(ShardId shardId) { this.cache.deleteShard(shardId); } + public boolean hasEmptyCache() { + return this.cache.getCache().isEmpty(); + } + + public AsyncShardFetchCache getCache() { + return this.cache; + } + /** * Cache implementation of transport actions returning batch of shards related data in the response. * Store node level responses of transport actions like {@link TransportNodesListGatewayStartedShardsBatch} or diff --git a/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java b/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java index 7c75f2a5d1a8f..0818b187271cb 100644 --- a/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java @@ -183,7 +183,7 @@ private AllocateUnassignedDecision getUnassignedShardAllocationDecision( if (allocationDecision.type() != Decision.Type.YES && (!explain || !hasInitiatedFetching(shardRouting))) { // only return early if we are not in explain mode, or we are in explain mode but we have not // yet attempted to fetch any shard data - logger.trace("{}: ignoring allocation, can't be allocated on any node", shardRouting); + logger.trace("{}: ignoring allocation, can't be allocated on any node. Decision: {}", shardRouting, allocationDecision.type()); return AllocateUnassignedDecision.no( UnassignedInfo.AllocationStatus.fromDecision(allocationDecision.type()), result.v2() != null ? new ArrayList<>(result.v2().values()) : null diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index 55f5388d8f454..673ed8dbaa1c3 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -576,8 +576,37 @@ protected AsyncShardFetch.FetchResult