
Commit 681087a

Make reroute iteration time-bound for large shard allocations (opensearch-project#14848)

* Make reroute iteration time-bound for large shard allocations

Signed-off-by: Bukhtawar Khan <[email protected]>
Co-authored-by: Rishab Nahata <[email protected]>
2 people authored and wangdongyu.danny committed Aug 22, 2024
1 parent 7661078 commit 681087a
Showing 15 changed files with 645 additions and 16 deletions.
CHANGELOG.md (1 change: 1 addition & 0 deletions)
@@ -62,6 +62,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Allow @InternalApi annotation on classes not meant to be constructed outside of the OpenSearch core ([#14575](https://github.com/opensearch-project/OpenSearch/pull/14575))
- Add @InternalApi annotation to japicmp exclusions ([#14597](https://github.com/opensearch-project/OpenSearch/pull/14597))
- Allow system index warning in OpenSearchRestTestCase.refreshAllIndices ([#14635](https://github.com/opensearch-project/OpenSearch/pull/14635))
- Make reroute iteration time-bound for large shard allocations ([#14848](https://github.com/opensearch-project/OpenSearch/pull/14848))

### Deprecated
- Deprecate batch_size parameter on bulk API ([#14725](https://github.com/opensearch-project/OpenSearch/pull/14725))
@@ -769,7 +769,7 @@ public void testMessyElectionsStillMakeClusterGoGreen() throws Exception {
ensureGreen("test");
}

public void testBatchModeEnabled() throws Exception {
public void testBatchModeEnabledWithoutTimeout() throws Exception {
internalCluster().startClusterManagerOnlyNodes(
1,
Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true).build()
@@ -810,6 +810,132 @@ public void testBatchModeEnabled() throws Exception {
assertEquals(0, gatewayAllocator.getNumberOfInFlightFetches());
}

public void testBatchModeEnabledWithSufficientTimeoutAndClusterGreen() throws Exception {
internalCluster().startClusterManagerOnlyNodes(
1,
Settings.builder()
.put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true)
.put(ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "20s")
.put(ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "20s")
.build()
);
List<String> dataOnlyNodes = internalCluster().startDataOnlyNodes(2);
createIndex(
"test",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()
);
ensureGreen("test");
Settings node0DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(0));
Settings node1DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(1));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(0)));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(1)));
ensureRed("test");
ensureStableCluster(1);

logger.info("--> Now do a protective reroute");
ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get();
assertTrue(clusterRerouteResponse.isAcknowledged());

ShardsBatchGatewayAllocator gatewayAllocator = internalCluster().getInstance(
ShardsBatchGatewayAllocator.class,
internalCluster().getClusterManagerName()
);
assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings()));
assertEquals(1, gatewayAllocator.getNumberOfStartedShardBatches());
assertEquals(1, gatewayAllocator.getNumberOfStoreShardBatches());

// Now start both data nodes and ensure batch mode is working
logger.info("--> restarting the stopped nodes");
internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(0)).put(node0DataPathSettings).build());
internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(1)).put(node1DataPathSettings).build());
ensureStableCluster(3);
ensureGreen("test");
assertEquals(0, gatewayAllocator.getNumberOfStartedShardBatches());
assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches());
assertEquals(0, gatewayAllocator.getNumberOfInFlightFetches());
}

public void testBatchModeEnabledWithInSufficientTimeoutButClusterGreen() throws Exception {

internalCluster().startClusterManagerOnlyNodes(
1,
Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true).build()
);
List<String> dataOnlyNodes = internalCluster().startDataOnlyNodes(2);
createNIndices(50, "test"); // this will create 50p, 50r shards
ensureStableCluster(3);
IndicesStatsResponse indicesStats = dataNodeClient().admin().indices().prepareStats().get();
assertThat(indicesStats.getSuccessfulShards(), equalTo(100));
ClusterHealthResponse health = client().admin()
.cluster()
.health(Requests.clusterHealthRequest().waitForGreenStatus().timeout("1m"))
.actionGet();
assertFalse(health.isTimedOut());
assertEquals(GREEN, health.getStatus());

String clusterManagerName = internalCluster().getClusterManagerName();
Settings clusterManagerDataPathSettings = internalCluster().dataPathSettings(clusterManagerName);
Settings node0DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(0));
Settings node1DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(1));

internalCluster().stopCurrentClusterManagerNode();
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(0)));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(1)));

// Now start cluster manager node and post that verify batches created
internalCluster().startClusterManagerOnlyNodes(
1,
Settings.builder()
.put("node.name", clusterManagerName)
.put(clusterManagerDataPathSettings)
.put(ShardsBatchGatewayAllocator.GATEWAY_ALLOCATOR_BATCH_SIZE.getKey(), 5)
.put(ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "10ms")
.put(ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "10ms")
.put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true)
.build()
);
ensureStableCluster(1);

logger.info("--> Now do a protective reroute"); // to avoid any race condition in test
ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get();
assertTrue(clusterRerouteResponse.isAcknowledged());

ShardsBatchGatewayAllocator gatewayAllocator = internalCluster().getInstance(
ShardsBatchGatewayAllocator.class,
internalCluster().getClusterManagerName()
);

assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings()));
assertEquals(10, gatewayAllocator.getNumberOfStartedShardBatches());
assertEquals(10, gatewayAllocator.getNumberOfStoreShardBatches());
health = client(internalCluster().getClusterManagerName()).admin().cluster().health(Requests.clusterHealthRequest()).actionGet();
assertFalse(health.isTimedOut());
assertEquals(RED, health.getStatus());
assertEquals(100, health.getUnassignedShards());
assertEquals(0, health.getInitializingShards());
assertEquals(0, health.getActiveShards());
assertEquals(0, health.getRelocatingShards());
assertEquals(0, health.getNumberOfDataNodes());

// Now start both data nodes and ensure batch mode is working
logger.info("--> restarting the stopped nodes");
internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(0)).put(node0DataPathSettings).build());
internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(1)).put(node1DataPathSettings).build());
ensureStableCluster(3);

// wait for cluster to turn green
health = client().admin().cluster().health(Requests.clusterHealthRequest().waitForGreenStatus().timeout("5m")).actionGet();
assertFalse(health.isTimedOut());
assertEquals(GREEN, health.getStatus());
assertEquals(0, health.getUnassignedShards());
assertEquals(0, health.getInitializingShards());
assertEquals(100, health.getActiveShards());
assertEquals(0, health.getRelocatingShards());
assertEquals(2, health.getNumberOfDataNodes());
assertEquals(0, gatewayAllocator.getNumberOfStartedShardBatches());
assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches());
}

public void testBatchModeDisabled() throws Exception {
internalCluster().startClusterManagerOnlyNodes(
1,
@@ -72,6 +72,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -617,10 +618,10 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) {

private void allocateAllUnassignedShards(RoutingAllocation allocation) {
ExistingShardsAllocator allocator = existingShardsAllocators.get(ShardsBatchGatewayAllocator.ALLOCATOR_NAME);
allocator.allocateAllUnassignedShards(allocation, true);
Optional.ofNullable(allocator.allocateAllUnassignedShards(allocation, true)).ifPresent(Runnable::run);
allocator.afterPrimariesBeforeReplicas(allocation);
// Replicas Assignment
allocator.allocateAllUnassignedShards(allocation, false);
Optional.ofNullable(allocator.allocateAllUnassignedShards(allocation, false)).ifPresent(Runnable::run);
}

private void disassociateDeadNodes(RoutingAllocation allocation) {
@@ -41,6 +41,7 @@
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.gateway.ShardsBatchGatewayAllocator;

import java.util.ArrayList;
import java.util.List;

/**
@@ -108,14 +109,16 @@ void allocateUnassigned(
*
* Allocation service will currently run the default implementation of it implemented by {@link ShardsBatchGatewayAllocator}
*/
default void allocateAllUnassignedShards(RoutingAllocation allocation, boolean primary) {
default Runnable allocateAllUnassignedShards(RoutingAllocation allocation, boolean primary) {
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
List<Runnable> runnables = new ArrayList<>();
while (iterator.hasNext()) {
ShardRouting shardRouting = iterator.next();
if (shardRouting.primary() == primary) {
allocateUnassigned(shardRouting, allocation, iterator);
runnables.add(() -> allocateUnassigned(shardRouting, allocation, iterator));
}
}
return () -> runnables.forEach(Runnable::run);
}

/**
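The javadoc above notes that the allocation service runs the ShardsBatchGatewayAllocator implementation of this method; the signature change means an allocator now hands its work back as a single deferred Runnable instead of executing each allocateUnassigned(...) call inline, and the caller decides if and when to run it. A minimal, self-contained sketch of that deferral pattern (plain Java with illustrative names, not OpenSearch APIs):

import java.util.ArrayList;
import java.util.List;

public class DeferredBatchSketch {
    public static void main(String[] args) {
        // Collect the per-shard work without running it, mirroring how the default
        // implementation above wraps each allocateUnassigned(...) call in a Runnable.
        List<Runnable> deferred = new ArrayList<>();
        for (int shard = 0; shard < 3; shard++) {
            final int s = shard;
            deferred.add(() -> System.out.println("allocating shard " + s));
        }
        // The method now returns one composite Runnable; nothing has executed yet.
        Runnable composite = () -> deferred.forEach(Runnable::run);
        // The caller (AllocationService in this commit) triggers the work explicitly,
        // which is what makes wrapping it in a time-bounded executor possible.
        composite.run();
    }
}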
@@ -343,6 +343,8 @@ public void apply(Settings value, Settings current, Settings previous) {
GatewayService.RECOVER_AFTER_NODES_SETTING,
GatewayService.RECOVER_AFTER_TIME_SETTING,
ShardsBatchGatewayAllocator.GATEWAY_ALLOCATOR_BATCH_SIZE,
ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING,
ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING,
PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD,
NetworkModule.HTTP_DEFAULT_TYPE_SETTING,
NetworkModule.TRANSPORT_DEFAULT_TYPE_SETTING,
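Registering the two timeout settings here makes them recognized cluster settings. Assuming they are also declared dynamic (their Setting.Property flags are not part of the hunks shown), the budgets could be adjusted on a live cluster without hard-coding the key strings; a hypothetical helper:

import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.gateway.ShardsBatchGatewayAllocator;

public class BatchAllocatorTimeoutTuning {
    // Illustrative only: raise both budgets to 20s, the value the "sufficient timeout"
    // test above sets at node startup.
    static void relaxTimeouts(Client client) {
        client.admin()
            .cluster()
            .prepareUpdateSettings()
            .setPersistentSettings(
                Settings.builder()
                    .put(ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "20s")
                    .put(ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "20s")
                    .build()
            )
            .get();
    }
}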
@@ -0,0 +1,66 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.util;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.Randomness;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.TimeoutAwareRunnable;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
* A {@link Runnable} that iteratively executes a batch of {@link TimeoutAwareRunnable}s. If the elapsed time exceeds the timeout defined by {@link TimeValue} timeout, then all subsequent {@link TimeoutAwareRunnable}s will have their {@link TimeoutAwareRunnable#onTimeout} method invoked and will not be run.
*
* @opensearch.internal
*/
public class BatchRunnableExecutor implements Runnable {

private final Supplier<TimeValue> timeoutSupplier;

private final List<TimeoutAwareRunnable> timeoutAwareRunnables;

private static final Logger logger = LogManager.getLogger(BatchRunnableExecutor.class);

public BatchRunnableExecutor(List<TimeoutAwareRunnable> timeoutAwareRunnables, Supplier<TimeValue> timeoutSupplier) {
this.timeoutSupplier = timeoutSupplier;
this.timeoutAwareRunnables = timeoutAwareRunnables;
}

// for tests
public List<TimeoutAwareRunnable> getTimeoutAwareRunnables() {
return this.timeoutAwareRunnables;
}

@Override
public void run() {
logger.debug("Starting execution of runnable of size [{}]", timeoutAwareRunnables.size());
long startTime = System.nanoTime();
if (timeoutAwareRunnables.isEmpty()) {
return;
}
Randomness.shuffle(timeoutAwareRunnables);
for (TimeoutAwareRunnable runnable : timeoutAwareRunnables) {
if (timeoutSupplier.get().nanos() < 0 || System.nanoTime() - startTime < timeoutSupplier.get().nanos()) {
runnable.run();
} else {
logger.debug("Executing timeout for runnable of size [{}]", timeoutAwareRunnables.size());
runnable.onTimeout();
}
}
logger.debug(
"Time taken to execute timed runnables in this cycle:[{}ms]",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)
);
}

}
@@ -0,0 +1,19 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.util.concurrent;

/**
* Runnable that is aware of a timeout
*
* @opensearch.internal
*/
public interface TimeoutAwareRunnable extends Runnable {

void onTimeout();
}
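Taken together, the two new classes above form a simple time-budget primitive: each unit of work is a TimeoutAwareRunnable, and BatchRunnableExecutor runs them in shuffled order until the supplied budget is exhausted, after which the remaining runnables only get onTimeout(). A small, self-contained usage sketch (the work items and the 50ms budget are illustrative, not taken from this commit):

import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.BatchRunnableExecutor;
import org.opensearch.common.util.concurrent.TimeoutAwareRunnable;

import java.util.ArrayList;
import java.util.List;

public class BatchRunnableExecutorSketch {
    public static void main(String[] args) {
        List<TimeoutAwareRunnable> work = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            final int batch = i;
            work.add(new TimeoutAwareRunnable() {
                @Override
                public void run() {
                    System.out.println("allocating batch " + batch);
                }

                @Override
                public void onTimeout() {
                    // In the allocator, this is where unallocated shards of the batch would be
                    // marked throttled so a later reroute retries them (see the
                    // allocateUnassignedBatchOnTimeout hunk below).
                    System.out.println("budget exhausted, deferring batch " + batch);
                }
            });
        }
        // A negative timeout (e.g. TimeValue.MINUS_ONE) disables the bound, per the
        // nanos() < 0 check in run(); any positive value caps the wall-clock time spent.
        new BatchRunnableExecutor(work, () -> TimeValue.timeValueMillis(50)).run();
    }
}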
@@ -36,16 +36,20 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.AllocationDecision;
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.opensearch.cluster.routing.allocation.NodeAllocationResult;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.core.index.shard.ShardId;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
* An abstract class that implements basic functionality for allocating
@@ -78,6 +82,23 @@ public void allocateUnassigned(
executeDecision(shardRouting, allocateUnassignedDecision, allocation, unassignedAllocationHandler);
}

protected void allocateUnassignedBatchOnTimeout(List<ShardRouting> shardRoutings, RoutingAllocation allocation, boolean primary) {
Set<ShardId> shardIdsFromBatch = new HashSet<>();
for (ShardRouting shardRouting : shardRoutings) {
ShardId shardId = shardRouting.shardId();
shardIdsFromBatch.add(shardId);
}
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
while (iterator.hasNext()) {
ShardRouting unassignedShard = iterator.next();
AllocateUnassignedDecision allocationDecision;
if (unassignedShard.primary() == primary && shardIdsFromBatch.contains(unassignedShard.shardId())) {
allocationDecision = AllocateUnassignedDecision.throttle(null);
executeDecision(unassignedShard, allocationDecision, allocation, iterator);
}
}
}

protected void executeDecision(
ShardRouting shardRouting,
AllocateUnassignedDecision allocateUnassignedDecision,
(The remaining changed files in this commit are not rendered here.)
