From 7cf09a5afb9d3ae6bf4a988fb9999880d6e7ead1 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Mon, 2 Sep 2024 13:18:37 +0530 Subject: [PATCH 01/16] Schedule reroute after allocator timed out Signed-off-by: Rishab Nahata --- .../org/opensearch/cluster/ClusterModule.java | 6 +++++ .../allocator/BalancedShardsAllocator.java | 27 +++++++++++++++++++ .../gateway/ShardsBatchGatewayAllocator.java | 24 +++++++++++++++++ .../main/java/org/opensearch/node/Node.java | 1 + 4 files changed, 58 insertions(+) diff --git a/server/src/main/java/org/opensearch/cluster/ClusterModule.java b/server/src/main/java/org/opensearch/cluster/ClusterModule.java index bb51c42252448..719eae37fa129 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterModule.java @@ -53,6 +53,7 @@ import org.opensearch.cluster.metadata.ViewMetadata; import org.opensearch.cluster.metadata.WeightedRoutingMetadata; import org.opensearch.cluster.routing.DelayedAllocationService; +import org.opensearch.cluster.routing.RerouteService; import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator; import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; @@ -474,4 +475,9 @@ public void setExistingShardsAllocators(GatewayAllocator gatewayAllocator, Shard allocationService.setExistingShardsAllocators(existingShardsAllocators); } + public void setRerouteServiceForAllocator(RerouteService rerouteService) { + if (shardsAllocator instanceof BalancedShardsAllocator) { + ((BalancedShardsAllocator) shardsAllocator).setRerouteService(rerouteService); + } + } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index a5193ca602f04..b87280f8f2e34 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -35,6 +35,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.util.IntroSorter; +import org.opensearch.cluster.routing.RerouteService; import org.opensearch.cluster.routing.RoutingNode; import org.opensearch.cluster.routing.RoutingNodes; import org.opensearch.cluster.routing.ShardMovementStrategy; @@ -49,12 +50,14 @@ import org.opensearch.cluster.routing.allocation.RebalanceParameter; import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.cluster.routing.allocation.ShardAllocationDecision; +import org.opensearch.common.Priority; import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.action.ActionListener; import java.util.HashMap; import java.util.HashSet; @@ -202,6 +205,7 @@ public class BalancedShardsAllocator implements ShardsAllocator { private volatile boolean ignoreThrottleInRestore; private volatile TimeValue allocatorTimeout; private long startTime; + private RerouteService rerouteService; public BalancedShardsAllocator(Settings settings) { this(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); @@ -231,6 +235,11 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting clusterSettings.addSettingsUpdateConsumer(ALLOCATOR_TIMEOUT_SETTING, this::setAllocatorTimeout); } + public void setRerouteService(RerouteService rerouteService) { + assert this.rerouteService == null : "RerouteService is already set"; + this.rerouteService = rerouteService; + } + /** * Changes in deprecated setting SHARD_MOVE_PRIMARY_FIRST_SETTING affect value of its replacement setting SHARD_MOVEMENT_STRATEGY_SETTING. */ @@ -342,6 +351,7 @@ public void allocate(RoutingAllocation allocation) { localShardsBalancer.allocateUnassigned(); localShardsBalancer.moveShards(); localShardsBalancer.balance(); + scheduleRerouteIfAllocatorTimedOut(); final ShardsBalancer remoteShardsBalancer = new RemoteShardsBalancer(logger, allocation); remoteShardsBalancer.allocateUnassigned(); @@ -404,6 +414,23 @@ private void failAllocationOfNewPrimaries(RoutingAllocation allocation) { } } + private void scheduleRerouteIfAllocatorTimedOut() { + if(allocatorTimedOut()) { + if (rerouteService == null) { + logger.info("RerouteService not set to schedule reroute after allocator time out"); + return; + } + rerouteService.reroute( + "reroute after balanced shards allocator timed out", + Priority.HIGH, + ActionListener.wrap( + r -> logger.trace("reroute after balanced shards allocator timed out completed"), + e -> logger.debug("reroute after balanced shards allocator timed out failed", e) + ) + ); + } + } + /** * Returns the currently configured delta threshold */ diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index d18304ea73ed0..a9d6017fa47e2 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -290,6 +290,18 @@ public void onTimeout() { @Override public void run() { primaryBatchShardAllocator.allocateUnassignedBatch(shardsBatch.getBatchedShardRoutings(), allocation); + if (timedOutPrimaryShardIds.isEmpty() == false) { + logger.trace("scheduling reroute after existing shards allocator timed out for primary shards"); + assert rerouteService != null; + rerouteService.reroute( + "reroute after existing shards allocator timed out", + Priority.HIGH, + ActionListener.wrap( + r -> logger.trace("reroute after existing shards allocator timed out completed"), + e -> logger.debug("reroute after existing shards allocator timed out failed", e) + ) + ); + } } })); return new BatchRunnableExecutor(runnables, () -> primaryShardsBatchGatewayAllocatorTimeout) { @@ -320,6 +332,18 @@ public void run() { public void onComplete() { logger.trace("Triggering oncomplete after timeout for [{}] replica shards", timedOutReplicaShardIds.size()); replicaBatchShardAllocator.allocateUnassignedBatchOnTimeout(timedOutReplicaShardIds, allocation, false); + if (timedOutReplicaShardIds.isEmpty() == false) { + logger.trace("scheduling reroute after existing shards allocator timed out for replica shards"); + assert rerouteService != null; + rerouteService.reroute( + "reroute after existing shards allocator timed out", + Priority.HIGH, + ActionListener.wrap( + r -> logger.trace("reroute after existing shards allocator timed out completed"), + e -> logger.debug("reroute after existing shards allocator timed out failed", e) + ) + ); + } } }; } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index ea656af6110e5..2a010133f5740 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -867,6 +867,7 @@ protected Node( final RerouteService rerouteService = new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute); rerouteServiceReference.set(rerouteService); clusterService.setRerouteService(rerouteService); + clusterModule.setRerouteServiceForAllocator(rerouteService); final RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings()); From 3e2bcae55624a2c755a3bb85535a22adb4c98ae1 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 3 Sep 2024 01:52:24 +0530 Subject: [PATCH 02/16] Refactor Signed-off-by: Rishab Nahata --- .../src/main/java/org/opensearch/cluster/ClusterModule.java | 2 +- .../routing/allocation/allocator/BalancedShardsAllocator.java | 1 + .../cluster/routing/allocation/allocator/ShardsAllocator.java | 4 ++++ 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/cluster/ClusterModule.java b/server/src/main/java/org/opensearch/cluster/ClusterModule.java index 719eae37fa129..5b2b3df986a4f 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterModule.java @@ -477,7 +477,7 @@ public void setExistingShardsAllocators(GatewayAllocator gatewayAllocator, Shard public void setRerouteServiceForAllocator(RerouteService rerouteService) { if (shardsAllocator instanceof BalancedShardsAllocator) { - ((BalancedShardsAllocator) shardsAllocator).setRerouteService(rerouteService); + shardsAllocator.setRerouteService(rerouteService); } } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index b87280f8f2e34..95e8fefacdff6 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -235,6 +235,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting clusterSettings.addSettingsUpdateConsumer(ALLOCATOR_TIMEOUT_SETTING, this::setAllocatorTimeout); } + @Override public void setRerouteService(RerouteService rerouteService) { assert this.rerouteService == null : "RerouteService is already set"; this.rerouteService = rerouteService; diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsAllocator.java index 29e9acca4e6c2..4172c58d47e75 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsAllocator.java @@ -32,6 +32,7 @@ package org.opensearch.cluster.routing.allocation.allocator; +import org.opensearch.cluster.routing.RerouteService; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.opensearch.cluster.routing.allocation.MoveDecision; @@ -73,4 +74,7 @@ public interface ShardsAllocator { * the cluster explain API, then this method should throw a {@code UnsupportedOperationException}. */ ShardAllocationDecision decideShardAllocation(ShardRouting shard, RoutingAllocation allocation); + + default void setRerouteService(RerouteService rerouteService) { + } } From a8717d9c049a4556b152a64b17e52e03c20ea05f Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 3 Sep 2024 02:01:41 +0530 Subject: [PATCH 03/16] Fix spotless Signed-off-by: Rishab Nahata --- .../routing/allocation/allocator/BalancedShardsAllocator.java | 2 +- .../cluster/routing/allocation/allocator/ShardsAllocator.java | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 95e8fefacdff6..5f906f4277dca 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -416,7 +416,7 @@ private void failAllocationOfNewPrimaries(RoutingAllocation allocation) { } private void scheduleRerouteIfAllocatorTimedOut() { - if(allocatorTimedOut()) { + if (allocatorTimedOut()) { if (rerouteService == null) { logger.info("RerouteService not set to schedule reroute after allocator time out"); return; diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsAllocator.java index 4172c58d47e75..38aafff6ce3e8 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsAllocator.java @@ -75,6 +75,5 @@ public interface ShardsAllocator { */ ShardAllocationDecision decideShardAllocation(ShardRouting shard, RoutingAllocation allocation); - default void setRerouteService(RerouteService rerouteService) { - } + default void setRerouteService(RerouteService rerouteService) {} } From edc02b0ef5b2fc3da8ab860efbc50399f95c9d24 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 3 Sep 2024 02:47:38 +0530 Subject: [PATCH 04/16] Minor change Signed-off-by: Rishab Nahata --- .../org/opensearch/gateway/ShardsBatchGatewayAllocator.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index a9d6017fa47e2..e6dffccf606ae 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -290,9 +290,8 @@ public void onTimeout() { @Override public void run() { primaryBatchShardAllocator.allocateUnassignedBatch(shardsBatch.getBatchedShardRoutings(), allocation); - if (timedOutPrimaryShardIds.isEmpty() == false) { + if (timedOutPrimaryShardIds.isEmpty() == false && rerouteService != null) { logger.trace("scheduling reroute after existing shards allocator timed out for primary shards"); - assert rerouteService != null; rerouteService.reroute( "reroute after existing shards allocator timed out", Priority.HIGH, @@ -332,9 +331,8 @@ public void run() { public void onComplete() { logger.trace("Triggering oncomplete after timeout for [{}] replica shards", timedOutReplicaShardIds.size()); replicaBatchShardAllocator.allocateUnassignedBatchOnTimeout(timedOutReplicaShardIds, allocation, false); - if (timedOutReplicaShardIds.isEmpty() == false) { + if (timedOutReplicaShardIds.isEmpty() == false && rerouteService != null) { logger.trace("scheduling reroute after existing shards allocator timed out for replica shards"); - assert rerouteService != null; rerouteService.reroute( "reroute after existing shards allocator timed out", Priority.HIGH, From 3ba4633697d41a96881dddfbfca0b53dec9f56dc Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 3 Sep 2024 10:03:42 +0530 Subject: [PATCH 05/16] Trigger Build Signed-off-by: Rishab Nahata From f299939720bf1c2408921476549d37367ae40639 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 3 Sep 2024 22:04:42 +0530 Subject: [PATCH 06/16] Fix bug and add test Signed-off-by: Rishab Nahata --- .../gateway/ShardsBatchGatewayAllocator.java | 37 +++++++++++++------ .../gateway/GatewayAllocatorTests.java | 37 ++++++++++++++----- .../TestShardBatchGatewayAllocator.java | 4 +- 3 files changed, 55 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index e6dffccf606ae..116a7f94a512e 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -198,6 +198,17 @@ protected ShardsBatchGatewayAllocator(long batchSize) { this.replicaShardsBatchGatewayAllocatorTimeout = null; } + protected ShardsBatchGatewayAllocator(RerouteService rerouteService) { + this.rerouteService = rerouteService; + this.batchStartedAction = null; + this.primaryShardBatchAllocator = null; + this.batchStoreAction = null; + this.replicaShardBatchAllocator = null; + this.maxBatchSize = DEFAULT_SHARD_BATCH_SIZE; + this.primaryShardsBatchGatewayAllocatorTimeout = null; + this.replicaShardsBatchGatewayAllocatorTimeout = null; + } + // for tests @Override @@ -290,17 +301,6 @@ public void onTimeout() { @Override public void run() { primaryBatchShardAllocator.allocateUnassignedBatch(shardsBatch.getBatchedShardRoutings(), allocation); - if (timedOutPrimaryShardIds.isEmpty() == false && rerouteService != null) { - logger.trace("scheduling reroute after existing shards allocator timed out for primary shards"); - rerouteService.reroute( - "reroute after existing shards allocator timed out", - Priority.HIGH, - ActionListener.wrap( - r -> logger.trace("reroute after existing shards allocator timed out completed"), - e -> logger.debug("reroute after existing shards allocator timed out failed", e) - ) - ); - } } })); return new BatchRunnableExecutor(runnables, () -> primaryShardsBatchGatewayAllocatorTimeout) { @@ -308,6 +308,18 @@ public void run() { public void onComplete() { logger.trace("Triggering oncomplete after timeout for [{}] primary shards", timedOutPrimaryShardIds.size()); primaryBatchShardAllocator.allocateUnassignedBatchOnTimeout(timedOutPrimaryShardIds, allocation, true); + if (timedOutPrimaryShardIds.isEmpty() == false) { + logger.trace("scheduling reroute after existing shards allocator timed out for primary shards"); + assert rerouteService != null; + rerouteService.reroute( + "reroute after existing shards allocator timed out", + Priority.HIGH, + ActionListener.wrap( + r -> logger.trace("reroute after existing shards allocator timed out completed"), + e -> logger.debug("reroute after existing shards allocator timed out failed", e) + ) + ); + } } }; } else { @@ -331,8 +343,9 @@ public void run() { public void onComplete() { logger.trace("Triggering oncomplete after timeout for [{}] replica shards", timedOutReplicaShardIds.size()); replicaBatchShardAllocator.allocateUnassignedBatchOnTimeout(timedOutReplicaShardIds, allocation, false); - if (timedOutReplicaShardIds.isEmpty() == false && rerouteService != null) { + if (timedOutReplicaShardIds.isEmpty() == false) { logger.trace("scheduling reroute after existing shards allocator timed out for replica shards"); + assert rerouteService != null; rerouteService.reroute( "reroute after existing shards allocator timed out", Priority.HIGH, diff --git a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java index c7eae77d6deba..19f18066125d3 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java @@ -22,6 +22,7 @@ import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.RecoverySource; +import org.opensearch.cluster.routing.RerouteService; import org.opensearch.cluster.routing.RoutingNodes; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.ShardRouting; @@ -30,6 +31,8 @@ import org.opensearch.cluster.routing.UnassignedInfo; import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Priority; import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; @@ -37,7 +40,9 @@ import org.opensearch.common.util.set.Sets; import org.opensearch.core.index.shard.ShardId; import org.opensearch.snapshots.SnapshotShardSizeInfo; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.gateway.TestShardBatchGatewayAllocator; +import org.opensearch.threadpool.TestThreadPool; import org.junit.Before; import java.util.ArrayList; @@ -47,13 +52,13 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.opensearch.gateway.ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING; import static org.opensearch.gateway.ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY; import static org.opensearch.gateway.ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING; import static org.opensearch.gateway.ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; public class GatewayAllocatorTests extends OpenSearchAllocationTestCase { @@ -426,22 +431,34 @@ public void testReplicaAllocatorTimeout() { assertEquals(-1, REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(build).getMillis()); } - public void testCollectTimedOutShards() throws InterruptedException { + public void testCollectTimedOutShardsAndScheduleReroute() throws InterruptedException { createIndexAndUpdateClusterState(2, 5, 2); - CountDownLatch latch = new CountDownLatch(10); - testShardsBatchGatewayAllocator = new TestShardBatchGatewayAllocator(latch); + TestThreadPool threadPool = new TestThreadPool(getTestName()); + ; + ClusterService clusterService = ClusterServiceUtils.createClusterService(clusterState, threadPool); + final CountDownLatch rerouteLatch = new CountDownLatch(2); + final RerouteService rerouteService = (reason, priority, listener) -> { + listener.onResponse(clusterService.state()); + assertThat(rerouteLatch.getCount(), greaterThanOrEqualTo(0L)); + assertEquals("reroute after existing shards allocator timed out", reason); + assertEquals(Priority.HIGH, priority); + rerouteLatch.countDown(); + }; + CountDownLatch timedOutShardsLatch = new CountDownLatch(20); + testShardsBatchGatewayAllocator = new TestShardBatchGatewayAllocator(timedOutShardsLatch, rerouteService); testShardsBatchGatewayAllocator.setPrimaryBatchAllocatorTimeout(TimeValue.ZERO); testShardsBatchGatewayAllocator.setReplicaBatchAllocatorTimeout(TimeValue.ZERO); BatchRunnableExecutor executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, true); executor.run(); - assertTrue(latch.await(1, TimeUnit.MINUTES)); - latch = new CountDownLatch(10); - testShardsBatchGatewayAllocator = new TestShardBatchGatewayAllocator(latch); - testShardsBatchGatewayAllocator.setPrimaryBatchAllocatorTimeout(TimeValue.ZERO); - testShardsBatchGatewayAllocator.setReplicaBatchAllocatorTimeout(TimeValue.ZERO); + assertEquals(timedOutShardsLatch.getCount(), 10); + assertEquals(1, rerouteLatch.getCount()); executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, false); executor.run(); - assertTrue(latch.await(1, TimeUnit.MINUTES)); + assertEquals(timedOutShardsLatch.getCount(), 0); + assertEquals(0, rerouteLatch.getCount()); + final boolean terminated = terminate(threadPool); + assert terminated; + clusterService.close(); } private void createIndexAndUpdateClusterState(int count, int numberOfShards, int numberOfReplicas) { diff --git a/test/framework/src/main/java/org/opensearch/test/gateway/TestShardBatchGatewayAllocator.java b/test/framework/src/main/java/org/opensearch/test/gateway/TestShardBatchGatewayAllocator.java index 156b1d7c620e6..c134c9ca72d5d 100644 --- a/test/framework/src/main/java/org/opensearch/test/gateway/TestShardBatchGatewayAllocator.java +++ b/test/framework/src/main/java/org/opensearch/test/gateway/TestShardBatchGatewayAllocator.java @@ -10,6 +10,7 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.RerouteService; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.opensearch.cluster.routing.allocation.RoutingAllocation; @@ -39,7 +40,8 @@ public TestShardBatchGatewayAllocator() { } - public TestShardBatchGatewayAllocator(CountDownLatch latch) { + public TestShardBatchGatewayAllocator(CountDownLatch latch, RerouteService rerouteService) { + super(rerouteService); this.latch = latch; } From 55da99dea6562725ce15d92a682a1a617ec2fb12 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Wed, 4 Sep 2024 00:28:39 +0530 Subject: [PATCH 07/16] Add tests for BSA Signed-off-by: Rishab Nahata --- ...TimeBoundBalancedShardsAllocatorTests.java | 220 ++++++++++++------ .../gateway/GatewayAllocatorTests.java | 1 - 2 files changed, 153 insertions(+), 68 deletions(-) diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java index a10c305686638..afdd35796e2d8 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java @@ -8,6 +8,8 @@ package org.opensearch.cluster.routing.allocation.allocator; +import org.junit.After; +import org.junit.Before; import org.opensearch.Version; import org.opensearch.cluster.ClusterInfo; import org.opensearch.cluster.ClusterName; @@ -17,6 +19,7 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.RerouteService; import org.opensearch.cluster.routing.RoutingNodes; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.ShardRouting; @@ -26,41 +29,72 @@ import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; import org.opensearch.cluster.routing.allocation.decider.Decision; import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Priority; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.test.ClusterServiceUtils; +import org.opensearch.threadpool.TestThreadPool; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.mockito.Mockito.mock; import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.opensearch.cluster.routing.ShardRoutingState.STARTED; import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.ALLOCATOR_TIMEOUT_SETTING; public class TimeBoundBalancedShardsAllocatorTests extends OpenSearchAllocationTestCase { + private TestThreadPool threadPool; + private ClusterService clusterService; + private ClusterState state; + private final DiscoveryNode node1 = newNode("node1", "node1", Collections.singletonMap("zone", "1a")); private final DiscoveryNode node2 = newNode("node2", "node2", Collections.singletonMap("zone", "1b")); private final DiscoveryNode node3 = newNode("node3", "node3", Collections.singletonMap("zone", "1c")); - public void testAllUnassignedShardsAllocatedWhenNoTimeOut() { + @After + @Override + public void tearDown() throws Exception { + super.tearDown(); + if (threadPool != null) { + final boolean terminated = terminate(threadPool); + assert terminated; + } + if (clusterService != null) { + clusterService.close(); + } + } + + public void setupStateAndService(Metadata metadata, RoutingTable routingTable) { + state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) + .build(); + threadPool = new TestThreadPool(getTestName()); + clusterService = ClusterServiceUtils.createClusterService(state, threadPool); + } + + public void testAllUnassignedShardsAllocatedWhenNoTimeOutAndRerouteNotScheduled() { int numberOfIndices = 2; int numberOfShards = 5; int numberOfReplicas = 1; int totalPrimaryCount = numberOfIndices * numberOfShards; int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1)); Settings.Builder settings = Settings.builder(); - // passing total shard count for timed out latch such that no shard times out - BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings.build(), new CountDownLatch(totalShardCount)); + // passing sufficiently high count for timeout latch to simulate no time out + BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings.build(), new CountDownLatch(Integer.MAX_VALUE)); Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas); RoutingTable routingTable = buildRoutingTable(metadata); - ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) - .metadata(metadata) - .routingTable(routingTable) - .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) - .build(); + setupStateAndService(metadata, routingTable); RoutingAllocation allocation = new RoutingAllocation( yesAllocationDeciders(), new RoutingNodes(state, false), @@ -69,6 +103,14 @@ public void testAllUnassignedShardsAllocatedWhenNoTimeOut() { null, System.nanoTime() ); + AtomicBoolean rerouteScheduled = new AtomicBoolean(false); + final RerouteService rerouteService = (reason, priority, listener) -> { + listener.onResponse(clusterService.state()); + assertEquals("reroute after balanced shards allocator timed out", reason); + assertEquals(Priority.HIGH, priority); + rerouteScheduled.compareAndSet(false, true); + }; + allocator.setRerouteService(rerouteService); allocator.allocate(allocation); List initializingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING); int node1Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId()); @@ -77,9 +119,10 @@ public void testAllUnassignedShardsAllocatedWhenNoTimeOut() { assertEquals(totalShardCount, initializingShards.size()); assertEquals(0, allocation.routingNodes().unassigned().ignored().size()); assertEquals(totalPrimaryCount, node1Recoveries + node2Recoveries + node3Recoveries); + assertFalse(rerouteScheduled.get()); } - public void testAllUnassignedShardsIgnoredWhenTimedOut() { + public void testAllUnassignedShardsIgnoredWhenTimedOutAndRerouteScheduled() { int numberOfIndices = 2; int numberOfShards = 5; int numberOfReplicas = 1; @@ -89,11 +132,7 @@ public void testAllUnassignedShardsIgnoredWhenTimedOut() { BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings.build(), new CountDownLatch(0)); Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas); RoutingTable routingTable = buildRoutingTable(metadata); - ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) - .metadata(metadata) - .routingTable(routingTable) - .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) - .build(); + setupStateAndService(metadata, routingTable); RoutingAllocation allocation = new RoutingAllocation( yesAllocationDeciders(), new RoutingNodes(state, false), @@ -102,6 +141,14 @@ public void testAllUnassignedShardsIgnoredWhenTimedOut() { null, System.nanoTime() ); + AtomicBoolean rerouteScheduled = new AtomicBoolean(false); + final RerouteService rerouteService = (reason, priority, listener) -> { + listener.onResponse(clusterService.state()); + assertEquals("reroute after balanced shards allocator timed out", reason); + assertEquals(Priority.HIGH, priority); + rerouteScheduled.compareAndSet(false, true); + }; + allocator.setRerouteService(rerouteService); allocator.allocate(allocation); List initializingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING); int node1Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId()); @@ -110,9 +157,10 @@ public void testAllUnassignedShardsIgnoredWhenTimedOut() { assertEquals(0, initializingShards.size()); assertEquals(totalShardCount, allocation.routingNodes().unassigned().ignored().size()); assertEquals(0, node1Recoveries + node2Recoveries + node3Recoveries); + assertTrue(rerouteScheduled.get()); } - public void testAllocatePartialPrimaryShardsUntilTimedOut() { + public void testAllocatePartialPrimaryShardsUntilTimedOutAndRerouteScheduled() { int numberOfIndices = 2; int numberOfShards = 5; int numberOfReplicas = 1; @@ -123,11 +171,7 @@ public void testAllocatePartialPrimaryShardsUntilTimedOut() { BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings.build(), new CountDownLatch(shardsToAllocate)); Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas); RoutingTable routingTable = buildRoutingTable(metadata); - ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) - .metadata(metadata) - .routingTable(routingTable) - .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) - .build(); + setupStateAndService(metadata, routingTable); RoutingAllocation allocation = new RoutingAllocation( yesAllocationDeciders(), new RoutingNodes(state, false), @@ -136,6 +180,14 @@ public void testAllocatePartialPrimaryShardsUntilTimedOut() { null, System.nanoTime() ); + AtomicBoolean rerouteScheduled = new AtomicBoolean(false); + final RerouteService rerouteService = (reason, priority, listener) -> { + listener.onResponse(clusterService.state()); + assertEquals("reroute after balanced shards allocator timed out", reason); + assertEquals(Priority.HIGH, priority); + rerouteScheduled.compareAndSet(false, true); + }; + allocator.setRerouteService(rerouteService); allocator.allocate(allocation); List initializingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING); int node1Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId()); @@ -144,9 +196,10 @@ public void testAllocatePartialPrimaryShardsUntilTimedOut() { assertEquals(shardsToAllocate, initializingShards.size()); assertEquals(totalShardCount - shardsToAllocate, allocation.routingNodes().unassigned().ignored().size()); assertEquals(shardsToAllocate, node1Recoveries + node2Recoveries + node3Recoveries); + assertTrue(rerouteScheduled.get()); } - public void testAllocateAllPrimaryShardsAndPartialReplicaShardsUntilTimedOut() { + public void testAllocateAllPrimaryShardsAndPartialReplicaShardsUntilTimedOutAndRerouteScheduled() { int numberOfIndices = 2; int numberOfShards = 5; int numberOfReplicas = 1; @@ -158,11 +211,7 @@ public void testAllocateAllPrimaryShardsAndPartialReplicaShardsUntilTimedOut() { BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings.build(), new CountDownLatch(shardsToAllocate)); Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas); RoutingTable routingTable = buildRoutingTable(metadata); - ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) - .metadata(metadata) - .routingTable(routingTable) - .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) - .build(); + setupStateAndService(metadata, routingTable); RoutingAllocation allocation = new RoutingAllocation( yesAllocationDeciders(), new RoutingNodes(state, false), @@ -171,6 +220,14 @@ public void testAllocateAllPrimaryShardsAndPartialReplicaShardsUntilTimedOut() { null, System.nanoTime() ); + AtomicBoolean rerouteScheduled = new AtomicBoolean(false); + final RerouteService rerouteService = (reason, priority, listener) -> { + listener.onResponse(clusterService.state()); + assertEquals("reroute after balanced shards allocator timed out", reason); + assertEquals(Priority.HIGH, priority); + rerouteScheduled.compareAndSet(false, true); + }; + allocator.setRerouteService(rerouteService); allocator.allocate(allocation); List initializingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING); int node1Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId()); @@ -179,20 +236,17 @@ public void testAllocateAllPrimaryShardsAndPartialReplicaShardsUntilTimedOut() { assertEquals(shardsToAllocate, initializingShards.size()); assertEquals(totalShardCount - shardsToAllocate, allocation.routingNodes().unassigned().ignored().size()); assertEquals(numberOfShards * numberOfIndices, node1Recoveries + node2Recoveries + node3Recoveries); + assertTrue(rerouteScheduled.get()); } - public void testAllShardsMoveWhenExcludedAndTimeoutNotBreached() { + public void testAllShardsMoveWhenExcludedAndTimeoutNotBreachedAndRerouteNotScheduled() { int numberOfIndices = 3; int numberOfShards = 5; int numberOfReplicas = 1; int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1)); Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas); RoutingTable routingTable = buildRoutingTable(metadata); - ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) - .metadata(metadata) - .routingTable(routingTable) - .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) - .build(); + setupStateAndService(metadata, routingTable); MockAllocationService allocationService = createAllocationService(); state = applyStartedShardsUntilNoChange(state, allocationService); // check all shards allocated @@ -200,8 +254,7 @@ public void testAllShardsMoveWhenExcludedAndTimeoutNotBreached() { assertEquals(totalShardCount, state.getRoutingNodes().shardsWithState(STARTED).size()); int node1ShardCount = state.getRoutingNodes().node("node1").size(); Settings settings = Settings.builder().put("cluster.routing.allocation.exclude.zone", "1a").build(); - int shardsToMove = 10 + 1000; // such that time out is never breached - BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings, new CountDownLatch(shardsToMove)); + BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings, new CountDownLatch(Integer.MAX_VALUE)); // such that it never times out RoutingAllocation allocation = new RoutingAllocation( allocationDecidersForExcludeAPI(settings), new RoutingNodes(state, false), @@ -210,30 +263,35 @@ public void testAllShardsMoveWhenExcludedAndTimeoutNotBreached() { null, System.nanoTime() ); + AtomicBoolean rerouteScheduled = new AtomicBoolean(false); + final RerouteService rerouteService = (reason, priority, listener) -> { + listener.onResponse(clusterService.state()); + assertEquals("reroute after balanced shards allocator timed out", reason); + assertEquals(Priority.HIGH, priority); + rerouteScheduled.compareAndSet(false, true); + }; + allocator.setRerouteService(rerouteService); allocator.allocate(allocation); List relocatingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.RELOCATING); assertEquals(node1ShardCount, relocatingShards.size()); + assertFalse(rerouteScheduled.get()); } - public void testNoShardsMoveWhenExcludedAndTimeoutBreached() { + public void testNoShardsMoveWhenExcludedAndTimeoutBreachedAndRerouteScheduled() { int numberOfIndices = 3; int numberOfShards = 5; int numberOfReplicas = 1; int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1)); Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas); RoutingTable routingTable = buildRoutingTable(metadata); - ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) - .metadata(metadata) - .routingTable(routingTable) - .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) - .build(); + setupStateAndService(metadata, routingTable); MockAllocationService allocationService = createAllocationService(); state = applyStartedShardsUntilNoChange(state, allocationService); // check all shards allocated assertEquals(0, state.getRoutingNodes().shardsWithState(INITIALIZING).size()); assertEquals(totalShardCount, state.getRoutingNodes().shardsWithState(STARTED).size()); Settings settings = Settings.builder().put("cluster.routing.allocation.exclude.zone", "1a").build(); - int shardsToMove = 0; // such that time out is never breached + int shardsToMove = 0; // such that time out is breached BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings, new CountDownLatch(shardsToMove)); RoutingAllocation allocation = new RoutingAllocation( allocationDecidersForExcludeAPI(settings), @@ -243,23 +301,28 @@ public void testNoShardsMoveWhenExcludedAndTimeoutBreached() { null, System.nanoTime() ); + AtomicBoolean rerouteScheduled = new AtomicBoolean(false); + final RerouteService rerouteService = (reason, priority, listener) -> { + listener.onResponse(clusterService.state()); + assertEquals("reroute after balanced shards allocator timed out", reason); + assertEquals(Priority.HIGH, priority); + rerouteScheduled.compareAndSet(false, true); + }; + allocator.setRerouteService(rerouteService); allocator.allocate(allocation); List relocatingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.RELOCATING); assertEquals(0, relocatingShards.size()); + assertTrue(rerouteScheduled.get()); } - public void testPartialShardsMoveWhenExcludedAndTimeoutBreached() { + public void testPartialShardsMoveWhenExcludedAndTimeoutBreachedAndRerouteScheduled() { int numberOfIndices = 3; int numberOfShards = 5; int numberOfReplicas = 1; int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1)); Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas); RoutingTable routingTable = buildRoutingTable(metadata); - ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) - .metadata(metadata) - .routingTable(routingTable) - .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) - .build(); + setupStateAndService(metadata, routingTable); MockAllocationService allocationService = createAllocationService(); state = applyStartedShardsUntilNoChange(state, allocationService); // check all shards allocated @@ -279,23 +342,28 @@ public void testPartialShardsMoveWhenExcludedAndTimeoutBreached() { null, System.nanoTime() ); + AtomicBoolean rerouteScheduled = new AtomicBoolean(false); + final RerouteService rerouteService = (reason, priority, listener) -> { + listener.onResponse(clusterService.state()); + assertEquals("reroute after balanced shards allocator timed out", reason); + assertEquals(Priority.HIGH, priority); + rerouteScheduled.compareAndSet(false, true); + }; + allocator.setRerouteService(rerouteService); allocator.allocate(allocation); List relocatingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.RELOCATING); assertEquals(shardsToMove / 3, relocatingShards.size()); + assertTrue(rerouteScheduled.get()); } - public void testClusterRebalancedWhenNotTimedOut() { + public void testClusterRebalancedWhenNotTimedOutAndRerouteNotScheduled() { int numberOfIndices = 1; int numberOfShards = 15; int numberOfReplicas = 1; int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1)); Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas); RoutingTable routingTable = buildRoutingTable(metadata); - ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) - .metadata(metadata) - .routingTable(routingTable) - .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) - .build(); + setupStateAndService(metadata, routingTable); MockAllocationService allocationService = createAllocationService( Settings.builder().put("cluster.routing.allocation.exclude.zone", "1a").build() ); // such that no shards are allocated to node1 @@ -306,8 +374,7 @@ public void testClusterRebalancedWhenNotTimedOut() { assertEquals(totalShardCount, state.getRoutingNodes().shardsWithState(STARTED).size()); assertEquals(0, node1ShardCount); Settings newSettings = Settings.builder().put("cluster.routing.allocation.exclude.zone", "").build(); - int shardsToMove = 1000; // such that time out is never breached - BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(newSettings, new CountDownLatch(shardsToMove)); + BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(newSettings, new CountDownLatch(Integer.MAX_VALUE)); // such that it never times out RoutingAllocation allocation = new RoutingAllocation( allocationDecidersForExcludeAPI(newSettings), new RoutingNodes(state, false), @@ -316,23 +383,28 @@ public void testClusterRebalancedWhenNotTimedOut() { null, System.nanoTime() ); + AtomicBoolean rerouteScheduled = new AtomicBoolean(false); + final RerouteService rerouteService = (reason, priority, listener) -> { + listener.onResponse(clusterService.state()); + assertEquals("reroute after balanced shards allocator timed out", reason); + assertEquals(Priority.HIGH, priority); + rerouteScheduled.compareAndSet(false, true); + }; + allocator.setRerouteService(rerouteService); allocator.allocate(allocation); List relocatingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.RELOCATING); assertEquals(totalShardCount / 3, relocatingShards.size()); + assertFalse(rerouteScheduled.get()); } - public void testClusterNotRebalancedWhenTimedOut() { + public void testClusterNotRebalancedWhenTimedOutAndRerouteScheduled() { int numberOfIndices = 1; int numberOfShards = 15; int numberOfReplicas = 1; int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1)); Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas); RoutingTable routingTable = buildRoutingTable(metadata); - ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) - .metadata(metadata) - .routingTable(routingTable) - .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) - .build(); + setupStateAndService(metadata, routingTable); MockAllocationService allocationService = createAllocationService( Settings.builder().put("cluster.routing.allocation.exclude.zone", "1a").build() ); // such that no shards are allocated to node1 @@ -353,23 +425,28 @@ public void testClusterNotRebalancedWhenTimedOut() { null, System.nanoTime() ); + AtomicBoolean rerouteScheduled = new AtomicBoolean(false); + final RerouteService rerouteService = (reason, priority, listener) -> { + listener.onResponse(clusterService.state()); + assertEquals("reroute after balanced shards allocator timed out", reason); + assertEquals(Priority.HIGH, priority); + rerouteScheduled.compareAndSet(false, true); + }; + allocator.setRerouteService(rerouteService); allocator.allocate(allocation); List relocatingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.RELOCATING); assertEquals(0, relocatingShards.size()); + assertTrue(rerouteScheduled.get()); } - public void testClusterPartialRebalancedWhenTimedOut() { + public void testClusterPartialRebalancedWhenTimedOutAndRerouteScheduled() { int numberOfIndices = 1; int numberOfShards = 15; int numberOfReplicas = 1; int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1)); Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas); RoutingTable routingTable = buildRoutingTable(metadata); - ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) - .metadata(metadata) - .routingTable(routingTable) - .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) - .build(); + setupStateAndService(metadata, routingTable); MockAllocationService allocationService = createAllocationService( Settings.builder().put("cluster.routing.allocation.exclude.zone", "1a").build() ); // such that no shards are allocated to node1 @@ -404,9 +481,18 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca null, System.nanoTime() ); + AtomicBoolean rerouteScheduled = new AtomicBoolean(false); + final RerouteService rerouteService = (reason, priority, listener) -> { + listener.onResponse(clusterService.state()); + assertEquals("reroute after balanced shards allocator timed out", reason); + assertEquals(Priority.HIGH, priority); + rerouteScheduled.compareAndSet(false, true); + }; + allocator.setRerouteService(rerouteService); allocator.allocate(allocation); List relocatingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.RELOCATING); assertEquals(3, relocatingShards.size()); + assertTrue(rerouteScheduled.get()); } public void testAllocatorNeverTimedOutIfValueIsMinusOne() { diff --git a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java index 19f18066125d3..3222d19ecd066 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java @@ -434,7 +434,6 @@ public void testReplicaAllocatorTimeout() { public void testCollectTimedOutShardsAndScheduleReroute() throws InterruptedException { createIndexAndUpdateClusterState(2, 5, 2); TestThreadPool threadPool = new TestThreadPool(getTestName()); - ; ClusterService clusterService = ClusterServiceUtils.createClusterService(clusterState, threadPool); final CountDownLatch rerouteLatch = new CountDownLatch(2); final RerouteService rerouteService = (reason, priority, listener) -> { From 838944ad22119146028644ac1b143af8c696e3d6 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Wed, 4 Sep 2024 00:34:18 +0530 Subject: [PATCH 08/16] Fix spotless Signed-off-by: Rishab Nahata --- .../TimeBoundBalancedShardsAllocatorTests.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java index afdd35796e2d8..ee50074bd9f5c 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java @@ -8,8 +8,6 @@ package org.opensearch.cluster.routing.allocation.allocator; -import org.junit.After; -import org.junit.Before; import org.opensearch.Version; import org.opensearch.cluster.ClusterInfo; import org.opensearch.cluster.ClusterName; @@ -33,9 +31,9 @@ import org.opensearch.common.Priority; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; -import org.opensearch.repositories.RepositoriesService; import org.opensearch.test.ClusterServiceUtils; import org.opensearch.threadpool.TestThreadPool; +import org.junit.After; import java.util.Arrays; import java.util.Collections; @@ -44,8 +42,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.mockito.Mockito.mock; import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.opensearch.cluster.routing.ShardRoutingState.STARTED; import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.ALLOCATOR_TIMEOUT_SETTING; @@ -254,7 +250,7 @@ public void testAllShardsMoveWhenExcludedAndTimeoutNotBreachedAndRerouteNotSched assertEquals(totalShardCount, state.getRoutingNodes().shardsWithState(STARTED).size()); int node1ShardCount = state.getRoutingNodes().node("node1").size(); Settings settings = Settings.builder().put("cluster.routing.allocation.exclude.zone", "1a").build(); - BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings, new CountDownLatch(Integer.MAX_VALUE)); // such that it never times out + BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings, new CountDownLatch(Integer.MAX_VALUE)); RoutingAllocation allocation = new RoutingAllocation( allocationDecidersForExcludeAPI(settings), new RoutingNodes(state, false), @@ -374,7 +370,7 @@ public void testClusterRebalancedWhenNotTimedOutAndRerouteNotScheduled() { assertEquals(totalShardCount, state.getRoutingNodes().shardsWithState(STARTED).size()); assertEquals(0, node1ShardCount); Settings newSettings = Settings.builder().put("cluster.routing.allocation.exclude.zone", "").build(); - BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(newSettings, new CountDownLatch(Integer.MAX_VALUE)); // such that it never times out + BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(newSettings, new CountDownLatch(Integer.MAX_VALUE)); RoutingAllocation allocation = new RoutingAllocation( allocationDecidersForExcludeAPI(newSettings), new RoutingNodes(state, false), From c1889083827bcae332d473613335a5f884d05f5b Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Wed, 4 Sep 2024 01:06:38 +0530 Subject: [PATCH 09/16] Minor Signed-off-by: Rishab Nahata --- .../allocation/allocator/BalancedShardsAllocator.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 5f906f4277dca..785636fa7ff2a 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -417,10 +417,7 @@ private void failAllocationOfNewPrimaries(RoutingAllocation allocation) { private void scheduleRerouteIfAllocatorTimedOut() { if (allocatorTimedOut()) { - if (rerouteService == null) { - logger.info("RerouteService not set to schedule reroute after allocator time out"); - return; - } + assert rerouteService != null : "RerouteService not set to schedule reroute after allocator time out"; rerouteService.reroute( "reroute after balanced shards allocator timed out", Priority.HIGH, From 535fb03709a2c517c79cbe50113b75cba64a97f0 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Wed, 4 Sep 2024 01:19:55 +0530 Subject: [PATCH 10/16] Test Signed-off-by: Rishab Nahata --- .../org/opensearch/cluster/ClusterModuleTests.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java b/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java index 97706927ba857..f8240e775cfa5 100644 --- a/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java +++ b/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java @@ -337,6 +337,19 @@ public void testQueryGroupMetadataRegister() { ); } + public void testRerouteServiceSetForBalancedShardsAllocator() { + ClusterModule clusterModule = new ClusterModule( + Settings.EMPTY, + clusterService, + Collections.emptyList(), + clusterInfoService, + null, + threadContext, + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) + ); + clusterModule.setRerouteServiceForAllocator((reason, priority, listener) -> listener.onResponse(clusterService.state())); + } + private static ClusterPlugin existingShardsAllocatorPlugin(final String allocatorName) { return new ClusterPlugin() { @Override From 349044d3f6b3fc56824a9c9d13b06022d8d1253d Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Wed, 4 Sep 2024 12:40:59 +0530 Subject: [PATCH 11/16] Minor Signed-off-by: Rishab Nahata --- .../src/main/java/org/opensearch/cluster/ClusterModule.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/ClusterModule.java b/server/src/main/java/org/opensearch/cluster/ClusterModule.java index 5b2b3df986a4f..79d091e9b632e 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterModule.java @@ -476,8 +476,6 @@ public void setExistingShardsAllocators(GatewayAllocator gatewayAllocator, Shard } public void setRerouteServiceForAllocator(RerouteService rerouteService) { - if (shardsAllocator instanceof BalancedShardsAllocator) { - shardsAllocator.setRerouteService(rerouteService); - } + shardsAllocator.setRerouteService(rerouteService); } } From d19c0cd612704b771044ae28ac9a15e2ceef99fa Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Wed, 4 Sep 2024 13:29:25 +0530 Subject: [PATCH 12/16] Minor refactor Signed-off-by: Rishab Nahata --- .../gateway/ShardsBatchGatewayAllocator.java | 17 +++-------------- .../gateway/GatewayAllocatorTests.java | 2 +- .../gateway/TestShardBatchGatewayAllocator.java | 6 +++--- 3 files changed, 7 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index 116a7f94a512e..5e2dcbcd70b40 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -184,27 +184,16 @@ public void cleanCaches() { // for tests protected ShardsBatchGatewayAllocator() { - this(DEFAULT_SHARD_BATCH_SIZE); + this(DEFAULT_SHARD_BATCH_SIZE, null); } - protected ShardsBatchGatewayAllocator(long batchSize) { - this.rerouteService = null; - this.batchStartedAction = null; - this.primaryShardBatchAllocator = null; - this.batchStoreAction = null; - this.replicaShardBatchAllocator = null; - this.maxBatchSize = batchSize; - this.primaryShardsBatchGatewayAllocatorTimeout = null; - this.replicaShardsBatchGatewayAllocatorTimeout = null; - } - - protected ShardsBatchGatewayAllocator(RerouteService rerouteService) { + protected ShardsBatchGatewayAllocator(long batchSize, RerouteService rerouteService) { this.rerouteService = rerouteService; this.batchStartedAction = null; this.primaryShardBatchAllocator = null; this.batchStoreAction = null; this.replicaShardBatchAllocator = null; - this.maxBatchSize = DEFAULT_SHARD_BATCH_SIZE; + this.maxBatchSize = batchSize; this.primaryShardsBatchGatewayAllocatorTimeout = null; this.replicaShardsBatchGatewayAllocatorTimeout = null; } diff --git a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java index 3222d19ecd066..61b53d688d330 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java @@ -444,7 +444,7 @@ public void testCollectTimedOutShardsAndScheduleReroute() throws InterruptedExce rerouteLatch.countDown(); }; CountDownLatch timedOutShardsLatch = new CountDownLatch(20); - testShardsBatchGatewayAllocator = new TestShardBatchGatewayAllocator(timedOutShardsLatch, rerouteService); + testShardsBatchGatewayAllocator = new TestShardBatchGatewayAllocator(timedOutShardsLatch, 1000, rerouteService); testShardsBatchGatewayAllocator.setPrimaryBatchAllocatorTimeout(TimeValue.ZERO); testShardsBatchGatewayAllocator.setReplicaBatchAllocatorTimeout(TimeValue.ZERO); BatchRunnableExecutor executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, true); diff --git a/test/framework/src/main/java/org/opensearch/test/gateway/TestShardBatchGatewayAllocator.java b/test/framework/src/main/java/org/opensearch/test/gateway/TestShardBatchGatewayAllocator.java index c134c9ca72d5d..c2ff228a6bf3a 100644 --- a/test/framework/src/main/java/org/opensearch/test/gateway/TestShardBatchGatewayAllocator.java +++ b/test/framework/src/main/java/org/opensearch/test/gateway/TestShardBatchGatewayAllocator.java @@ -40,13 +40,13 @@ public TestShardBatchGatewayAllocator() { } - public TestShardBatchGatewayAllocator(CountDownLatch latch, RerouteService rerouteService) { - super(rerouteService); + public TestShardBatchGatewayAllocator(CountDownLatch latch, long maxBatchSize, RerouteService rerouteService) { + super(maxBatchSize, rerouteService); this.latch = latch; } public TestShardBatchGatewayAllocator(long maxBatchSize) { - super(maxBatchSize); + super(maxBatchSize, null); } Map> knownAllocations = new HashMap<>(); From 00b809ea7b0ac2a93acdd62b7aaf72c64faae0ae Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Wed, 4 Sep 2024 17:01:07 +0530 Subject: [PATCH 13/16] Randomly fail reroute Signed-off-by: Rishab Nahata --- ...TimeBoundBalancedShardsAllocatorTests.java | 61 ++++++++++++++++--- .../gateway/GatewayAllocatorTests.java | 9 ++- 2 files changed, 58 insertions(+), 12 deletions(-) diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java index ee50074bd9f5c..45a0bd7b18afd 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java @@ -8,6 +8,7 @@ package org.opensearch.cluster.routing.allocation.allocator; +import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.cluster.ClusterInfo; import org.opensearch.cluster.ClusterName; @@ -101,7 +102,11 @@ public void testAllUnassignedShardsAllocatedWhenNoTimeOutAndRerouteNotScheduled( ); AtomicBoolean rerouteScheduled = new AtomicBoolean(false); final RerouteService rerouteService = (reason, priority, listener) -> { - listener.onResponse(clusterService.state()); + if (randomBoolean()) { + listener.onFailure(new OpenSearchException("simulated")); + } else { + listener.onResponse(clusterService.state()); + } assertEquals("reroute after balanced shards allocator timed out", reason); assertEquals(Priority.HIGH, priority); rerouteScheduled.compareAndSet(false, true); @@ -139,7 +144,11 @@ public void testAllUnassignedShardsIgnoredWhenTimedOutAndRerouteScheduled() { ); AtomicBoolean rerouteScheduled = new AtomicBoolean(false); final RerouteService rerouteService = (reason, priority, listener) -> { - listener.onResponse(clusterService.state()); + if (randomBoolean()) { + listener.onFailure(new OpenSearchException("simulated")); + } else { + listener.onResponse(clusterService.state()); + } assertEquals("reroute after balanced shards allocator timed out", reason); assertEquals(Priority.HIGH, priority); rerouteScheduled.compareAndSet(false, true); @@ -178,7 +187,11 @@ public void testAllocatePartialPrimaryShardsUntilTimedOutAndRerouteScheduled() { ); AtomicBoolean rerouteScheduled = new AtomicBoolean(false); final RerouteService rerouteService = (reason, priority, listener) -> { - listener.onResponse(clusterService.state()); + if (randomBoolean()) { + listener.onFailure(new OpenSearchException("simulated")); + } else { + listener.onResponse(clusterService.state()); + } assertEquals("reroute after balanced shards allocator timed out", reason); assertEquals(Priority.HIGH, priority); rerouteScheduled.compareAndSet(false, true); @@ -218,7 +231,11 @@ public void testAllocateAllPrimaryShardsAndPartialReplicaShardsUntilTimedOutAndR ); AtomicBoolean rerouteScheduled = new AtomicBoolean(false); final RerouteService rerouteService = (reason, priority, listener) -> { - listener.onResponse(clusterService.state()); + if (randomBoolean()) { + listener.onFailure(new OpenSearchException("simulated")); + } else { + listener.onResponse(clusterService.state()); + } assertEquals("reroute after balanced shards allocator timed out", reason); assertEquals(Priority.HIGH, priority); rerouteScheduled.compareAndSet(false, true); @@ -261,7 +278,11 @@ public void testAllShardsMoveWhenExcludedAndTimeoutNotBreachedAndRerouteNotSched ); AtomicBoolean rerouteScheduled = new AtomicBoolean(false); final RerouteService rerouteService = (reason, priority, listener) -> { - listener.onResponse(clusterService.state()); + if (randomBoolean()) { + listener.onFailure(new OpenSearchException("simulated")); + } else { + listener.onResponse(clusterService.state()); + } assertEquals("reroute after balanced shards allocator timed out", reason); assertEquals(Priority.HIGH, priority); rerouteScheduled.compareAndSet(false, true); @@ -299,7 +320,11 @@ public void testNoShardsMoveWhenExcludedAndTimeoutBreachedAndRerouteScheduled() ); AtomicBoolean rerouteScheduled = new AtomicBoolean(false); final RerouteService rerouteService = (reason, priority, listener) -> { - listener.onResponse(clusterService.state()); + if (randomBoolean()) { + listener.onFailure(new OpenSearchException("simulated")); + } else { + listener.onResponse(clusterService.state()); + } assertEquals("reroute after balanced shards allocator timed out", reason); assertEquals(Priority.HIGH, priority); rerouteScheduled.compareAndSet(false, true); @@ -340,7 +365,11 @@ public void testPartialShardsMoveWhenExcludedAndTimeoutBreachedAndRerouteSchedul ); AtomicBoolean rerouteScheduled = new AtomicBoolean(false); final RerouteService rerouteService = (reason, priority, listener) -> { - listener.onResponse(clusterService.state()); + if (randomBoolean()) { + listener.onFailure(new OpenSearchException("simulated")); + } else { + listener.onResponse(clusterService.state()); + } assertEquals("reroute after balanced shards allocator timed out", reason); assertEquals(Priority.HIGH, priority); rerouteScheduled.compareAndSet(false, true); @@ -381,7 +410,11 @@ public void testClusterRebalancedWhenNotTimedOutAndRerouteNotScheduled() { ); AtomicBoolean rerouteScheduled = new AtomicBoolean(false); final RerouteService rerouteService = (reason, priority, listener) -> { - listener.onResponse(clusterService.state()); + if (randomBoolean()) { + listener.onFailure(new OpenSearchException("simulated")); + } else { + listener.onResponse(clusterService.state()); + } assertEquals("reroute after balanced shards allocator timed out", reason); assertEquals(Priority.HIGH, priority); rerouteScheduled.compareAndSet(false, true); @@ -423,7 +456,11 @@ public void testClusterNotRebalancedWhenTimedOutAndRerouteScheduled() { ); AtomicBoolean rerouteScheduled = new AtomicBoolean(false); final RerouteService rerouteService = (reason, priority, listener) -> { - listener.onResponse(clusterService.state()); + if (randomBoolean()) { + listener.onFailure(new OpenSearchException("simulated")); + } else { + listener.onResponse(clusterService.state()); + } assertEquals("reroute after balanced shards allocator timed out", reason); assertEquals(Priority.HIGH, priority); rerouteScheduled.compareAndSet(false, true); @@ -479,7 +516,11 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca ); AtomicBoolean rerouteScheduled = new AtomicBoolean(false); final RerouteService rerouteService = (reason, priority, listener) -> { - listener.onResponse(clusterService.state()); + if (randomBoolean()) { + listener.onFailure(new OpenSearchException("simulated")); + } else { + listener.onResponse(clusterService.state()); + } assertEquals("reroute after balanced shards allocator timed out", reason); assertEquals(Priority.HIGH, priority); rerouteScheduled.compareAndSet(false, true); diff --git a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java index 61b53d688d330..d9120e1a6b37f 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.action.support.nodes.BaseNodeResponse; import org.opensearch.cluster.ClusterInfo; @@ -437,7 +438,11 @@ public void testCollectTimedOutShardsAndScheduleReroute() throws InterruptedExce ClusterService clusterService = ClusterServiceUtils.createClusterService(clusterState, threadPool); final CountDownLatch rerouteLatch = new CountDownLatch(2); final RerouteService rerouteService = (reason, priority, listener) -> { - listener.onResponse(clusterService.state()); + if (randomBoolean()) { + listener.onFailure(new OpenSearchException("simulated")); + } else { + listener.onResponse(clusterService.state()); + } assertThat(rerouteLatch.getCount(), greaterThanOrEqualTo(0L)); assertEquals("reroute after existing shards allocator timed out", reason); assertEquals(Priority.HIGH, priority); @@ -454,7 +459,7 @@ public void testCollectTimedOutShardsAndScheduleReroute() throws InterruptedExce executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, false); executor.run(); assertEquals(timedOutShardsLatch.getCount(), 0); - assertEquals(0, rerouteLatch.getCount()); + assertEquals(0, rerouteLatch.getCount()); // even with failure it doesn't leak any listeners final boolean terminated = terminate(threadPool); assert terminated; clusterService.close(); From 05d0c125c7329e2a73637cf3a159b91bbc5752bd Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Wed, 4 Sep 2024 20:35:19 +0530 Subject: [PATCH 14/16] Trigger Build Signed-off-by: Rishab Nahata From b172ab500b7af1514b0e80d7fdefd822432983aa Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Wed, 4 Sep 2024 21:03:58 +0530 Subject: [PATCH 15/16] Trigger Build Signed-off-by: Rishab Nahata From 3e1129c357d52d8f0c95a74bbb9a7d2f4122824e Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Wed, 4 Sep 2024 22:38:08 +0530 Subject: [PATCH 16/16] Trigger Build Signed-off-by: Rishab Nahata