[Backport 2.16] Make reroute iteration time-bound for large shard allocations #14954

Merged 1 commit on Jul 24, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -60,6 +60,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Allow @InternalApi annotation on classes not meant to be constructed outside of the OpenSearch core ([#14575](https://github.com/opensearch-project/OpenSearch/pull/14575))
- Add @InternalApi annotation to japicmp exclusions ([#14597](https://github.com/opensearch-project/OpenSearch/pull/14597))
- Allow system index warning in OpenSearchRestTestCase.refreshAllIndices ([#14635](https://github.com/opensearch-project/OpenSearch/pull/14635))
- Make reroute iteration time-bound for large shard allocations ([#14848](https://github.com/opensearch-project/OpenSearch/pull/14848))

### Deprecated
- Deprecate batch_size parameter on bulk API ([#14725](https://github.com/opensearch-project/OpenSearch/pull/14725))
@@ -769,7 +769,7 @@ public void testMessyElectionsStillMakeClusterGoGreen() throws Exception {
ensureGreen("test");
}

public void testBatchModeEnabled() throws Exception {
public void testBatchModeEnabledWithoutTimeout() throws Exception {
internalCluster().startClusterManagerOnlyNodes(
1,
Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true).build()
@@ -810,6 +810,132 @@ public void testBatchModeEnabled() throws Exception {
assertEquals(0, gatewayAllocator.getNumberOfInFlightFetches());
}

public void testBatchModeEnabledWithSufficientTimeoutAndClusterGreen() throws Exception {
internalCluster().startClusterManagerOnlyNodes(
1,
Settings.builder()
.put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true)
.put(ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "20s")
.put(ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "20s")
.build()
);
List<String> dataOnlyNodes = internalCluster().startDataOnlyNodes(2);
createIndex(
"test",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()
);
ensureGreen("test");
Settings node0DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(0));
Settings node1DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(1));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(0)));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(1)));
ensureRed("test");
ensureStableCluster(1);

logger.info("--> Now do a protective reroute");
ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get();
assertTrue(clusterRerouteResponse.isAcknowledged());

ShardsBatchGatewayAllocator gatewayAllocator = internalCluster().getInstance(
ShardsBatchGatewayAllocator.class,
internalCluster().getClusterManagerName()
);
assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings()));
assertEquals(1, gatewayAllocator.getNumberOfStartedShardBatches());
assertEquals(1, gatewayAllocator.getNumberOfStoreShardBatches());

// Now start both data nodes and ensure batch mode is working
logger.info("--> restarting the stopped nodes");
internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(0)).put(node0DataPathSettings).build());
internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(1)).put(node1DataPathSettings).build());
ensureStableCluster(3);
ensureGreen("test");
assertEquals(0, gatewayAllocator.getNumberOfStartedShardBatches());
assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches());
assertEquals(0, gatewayAllocator.getNumberOfInFlightFetches());
}

public void testBatchModeEnabledWithInSufficientTimeoutButClusterGreen() throws Exception {

internalCluster().startClusterManagerOnlyNodes(
1,
Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true).build()
);
List<String> dataOnlyNodes = internalCluster().startDataOnlyNodes(2);
createNIndices(50, "test"); // this will create 50p, 50r shards
ensureStableCluster(3);
IndicesStatsResponse indicesStats = dataNodeClient().admin().indices().prepareStats().get();
assertThat(indicesStats.getSuccessfulShards(), equalTo(100));
ClusterHealthResponse health = client().admin()
.cluster()
.health(Requests.clusterHealthRequest().waitForGreenStatus().timeout("1m"))
.actionGet();
assertFalse(health.isTimedOut());
assertEquals(GREEN, health.getStatus());

String clusterManagerName = internalCluster().getClusterManagerName();
Settings clusterManagerDataPathSettings = internalCluster().dataPathSettings(clusterManagerName);
Settings node0DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(0));
Settings node1DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(1));

internalCluster().stopCurrentClusterManagerNode();
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(0)));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(1)));

// Now start cluster manager node and post that verify batches created
internalCluster().startClusterManagerOnlyNodes(
1,
Settings.builder()
.put("node.name", clusterManagerName)
.put(clusterManagerDataPathSettings)
.put(ShardsBatchGatewayAllocator.GATEWAY_ALLOCATOR_BATCH_SIZE.getKey(), 5)
.put(ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "10ms")
.put(ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "10ms")
.put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true)
.build()
);
ensureStableCluster(1);

logger.info("--> Now do a protective reroute"); // to avoid any race condition in test
ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get();
assertTrue(clusterRerouteResponse.isAcknowledged());

ShardsBatchGatewayAllocator gatewayAllocator = internalCluster().getInstance(
ShardsBatchGatewayAllocator.class,
internalCluster().getClusterManagerName()
);

assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings()));
assertEquals(10, gatewayAllocator.getNumberOfStartedShardBatches());
assertEquals(10, gatewayAllocator.getNumberOfStoreShardBatches());
health = client(internalCluster().getClusterManagerName()).admin().cluster().health(Requests.clusterHealthRequest()).actionGet();
assertFalse(health.isTimedOut());
assertEquals(RED, health.getStatus());
assertEquals(100, health.getUnassignedShards());
assertEquals(0, health.getInitializingShards());
assertEquals(0, health.getActiveShards());
assertEquals(0, health.getRelocatingShards());
assertEquals(0, health.getNumberOfDataNodes());

// Now start both data nodes and ensure batch mode is working
logger.info("--> restarting the stopped nodes");
internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(0)).put(node0DataPathSettings).build());
internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(1)).put(node1DataPathSettings).build());
ensureStableCluster(3);

// wait for cluster to turn green
health = client().admin().cluster().health(Requests.clusterHealthRequest().waitForGreenStatus().timeout("5m")).actionGet();
assertFalse(health.isTimedOut());
assertEquals(GREEN, health.getStatus());
assertEquals(0, health.getUnassignedShards());
assertEquals(0, health.getInitializingShards());
assertEquals(100, health.getActiveShards());
assertEquals(0, health.getRelocatingShards());
assertEquals(2, health.getNumberOfDataNodes());
assertEquals(0, gatewayAllocator.getNumberOfStartedShardBatches());
assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches());
}

public void testBatchModeDisabled() throws Exception {
internalCluster().startClusterManagerOnlyNodes(
1,
server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java
@@ -72,6 +72,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -617,10 +618,10 @@

private void allocateAllUnassignedShards(RoutingAllocation allocation) {
ExistingShardsAllocator allocator = existingShardsAllocators.get(ShardsBatchGatewayAllocator.ALLOCATOR_NAME);
allocator.allocateAllUnassignedShards(allocation, true);
Optional.ofNullable(allocator.allocateAllUnassignedShards(allocation, true)).ifPresent(Runnable::run);

allocator.afterPrimariesBeforeReplicas(allocation);
// Replicas Assignment
allocator.allocateAllUnassignedShards(allocation, false);
Optional.ofNullable(allocator.allocateAllUnassignedShards(allocation, false)).ifPresent(Runnable::run);

}

private void disassociateDeadNodes(RoutingAllocation allocation) {
@@ -41,6 +41,7 @@
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.gateway.ShardsBatchGatewayAllocator;

import java.util.ArrayList;
import java.util.List;

/**
Expand Down Expand Up @@ -108,14 +109,16 @@ void allocateUnassigned(
*
* Allocation service will currently run the default implementation of it implemented by {@link ShardsBatchGatewayAllocator}
*/
default void allocateAllUnassignedShards(RoutingAllocation allocation, boolean primary) {
default Runnable allocateAllUnassignedShards(RoutingAllocation allocation, boolean primary) {
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
List<Runnable> runnables = new ArrayList<>();
while (iterator.hasNext()) {
ShardRouting shardRouting = iterator.next();
if (shardRouting.primary() == primary) {
allocateUnassigned(shardRouting, allocation, iterator);
runnables.add(() -> allocateUnassigned(shardRouting, allocation, iterator));
}
}
return () -> runnables.forEach(Runnable::run);
}

/**
@@ -343,6 +343,8 @@ public void apply(Settings value, Settings current, Settings previous) {
GatewayService.RECOVER_AFTER_NODES_SETTING,
GatewayService.RECOVER_AFTER_TIME_SETTING,
ShardsBatchGatewayAllocator.GATEWAY_ALLOCATOR_BATCH_SIZE,
ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING,
ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING,
PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD,
NetworkModule.HTTP_DEFAULT_TYPE_SETTING,
NetworkModule.TRANSPORT_DEFAULT_TYPE_SETTING,
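For context, and not part of this diff: a minimal sketch of how the two newly registered timeout settings might be supplied together with batch mode, mirroring the integration tests earlier in this PR. The class name and the "20s" values are illustrative, not shipped defaults.

import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.opensearch.common.settings.Settings;
import org.opensearch.gateway.ShardsBatchGatewayAllocator;

public class BatchAllocatorTimeoutSettingsExample {
    // Illustrative only: node settings that enable batched shard allocation and bound
    // each primary/replica allocation round to 20 seconds.
    static Settings timeBoundBatchSettings() {
        return Settings.builder()
            .put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true)
            .put(ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "20s")
            .put(ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "20s")
            .build();
    }
}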
@@ -0,0 +1,66 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.util;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.Randomness;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.TimeoutAwareRunnable;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
* A {@link Runnable} that iteratively executes a batch of {@link TimeoutAwareRunnable}s. If the elapsed time exceeds the timeout defined by {@link TimeValue} timeout, then all subsequent {@link TimeoutAwareRunnable}s will have their {@link TimeoutAwareRunnable#onTimeout} method invoked and will not be run.
*
* @opensearch.internal
*/
public class BatchRunnableExecutor implements Runnable {

private final Supplier<TimeValue> timeoutSupplier;

private final List<TimeoutAwareRunnable> timeoutAwareRunnables;

private static final Logger logger = LogManager.getLogger(BatchRunnableExecutor.class);

public BatchRunnableExecutor(List<TimeoutAwareRunnable> timeoutAwareRunnables, Supplier<TimeValue> timeoutSupplier) {
this.timeoutSupplier = timeoutSupplier;
this.timeoutAwareRunnables = timeoutAwareRunnables;
}

// for tests
public List<TimeoutAwareRunnable> getTimeoutAwareRunnables() {
return this.timeoutAwareRunnables;
}

@Override
public void run() {
logger.debug("Starting execution of runnable of size [{}]", timeoutAwareRunnables.size());
long startTime = System.nanoTime();
if (timeoutAwareRunnables.isEmpty()) {
return;
}
Randomness.shuffle(timeoutAwareRunnables);
for (TimeoutAwareRunnable runnable : timeoutAwareRunnables) {
if (timeoutSupplier.get().nanos() < 0 || System.nanoTime() - startTime < timeoutSupplier.get().nanos()) {
runnable.run();
} else {
logger.debug("Executing timeout for runnable of size [{}]", timeoutAwareRunnables.size());
runnable.onTimeout();
}
}
logger.debug(
"Time taken to execute timed runnables in this cycle:[{}ms]",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)
);
}

}
@@ -0,0 +1,19 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.util.concurrent;

/**
* Runnable that is aware of a timeout
*
* @opensearch.internal
*/
public interface TimeoutAwareRunnable extends Runnable {

void onTimeout();
}
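To show how the two new classes above compose, here is a minimal, self-contained sketch that is not part of the diff; the helper name, batch labels, and printed messages are invented for the example.

import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.BatchRunnableExecutor;
import org.opensearch.common.util.concurrent.TimeoutAwareRunnable;

import java.util.ArrayList;
import java.util.List;

public class BatchRunnableExecutorExample {

    // A trivial TimeoutAwareRunnable that either does its work or reports being skipped.
    static TimeoutAwareRunnable unit(String name) {
        return new TimeoutAwareRunnable() {
            @Override
            public void run() {
                System.out.println(name + " executed");
            }

            @Override
            public void onTimeout() {
                System.out.println(name + " skipped because the batch timed out");
            }
        };
    }

    public static void main(String[] args) {
        // The list must be mutable because BatchRunnableExecutor shuffles it before executing.
        List<TimeoutAwareRunnable> batch = new ArrayList<>(
            List.of(unit("batch-1"), unit("batch-2"), unit("batch-3"))
        );
        // The timeout is re-read from the Supplier on every iteration, so a value changed at
        // runtime takes effect mid-loop; a negative TimeValue disables the cutoff entirely.
        new BatchRunnableExecutor(batch, () -> TimeValue.timeValueMillis(10)).run();
    }
}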
@@ -36,16 +36,20 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.AllocationDecision;
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.opensearch.cluster.routing.allocation.NodeAllocationResult;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.core.index.shard.ShardId;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
* An abstract class that implements basic functionality for allocating
@@ -78,6 +82,23 @@ public void allocateUnassigned(
executeDecision(shardRouting, allocateUnassignedDecision, allocation, unassignedAllocationHandler);
}

protected void allocateUnassignedBatchOnTimeout(List<ShardRouting> shardRoutings, RoutingAllocation allocation, boolean primary) {
Set<ShardId> shardIdsFromBatch = new HashSet<>();
for (ShardRouting shardRouting : shardRoutings) {
ShardId shardId = shardRouting.shardId();
shardIdsFromBatch.add(shardId);
}
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
while (iterator.hasNext()) {
ShardRouting unassignedShard = iterator.next();
AllocateUnassignedDecision allocationDecision;
if (unassignedShard.primary() == primary && shardIdsFromBatch.contains(unassignedShard.shardId())) {
allocationDecision = AllocateUnassignedDecision.throttle(null);
executeDecision(unassignedShard, allocationDecision, allocation, iterator);
}
}
}

protected void executeDecision(
ShardRouting shardRouting,
AllocateUnassignedDecision allocateUnassignedDecision,
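Putting the pieces together, the following is a hypothetical sketch, not code from this diff, of how an allocator extending the class above might return a BatchRunnableExecutor from allocateAllUnassignedShards(). ShardsBatch, batchesToAllocate, allocateUnassignedBatch, getBatchedShardRoutings, and the two timeout getters are illustrative names only; the actual wiring is implemented in ShardsBatchGatewayAllocator.

@Override
public BatchRunnableExecutor allocateAllUnassignedShards(RoutingAllocation allocation, boolean primary) {
    // Wrap each shard batch in a TimeoutAwareRunnable: the normal path allocates the batch,
    // while the timeout path marks its shards as throttled so they stay unassigned and are
    // picked up again on a later reroute.
    List<TimeoutAwareRunnable> runnables = new ArrayList<>();
    for (ShardsBatch batch : batchesToAllocate(primary)) {
        runnables.add(new TimeoutAwareRunnable() {
            @Override
            public void run() {
                allocateUnassignedBatch(batch, allocation);
            }

            @Override
            public void onTimeout() {
                allocateUnassignedBatchOnTimeout(batch.getBatchedShardRoutings(), allocation, primary);
            }
        });
    }
    Supplier<TimeValue> timeout = primary ? this::primaryBatchTimeout : this::replicaBatchTimeout;
    return new BatchRunnableExecutor(runnables, timeout);
}

AllocationService then runs whatever Runnable comes back through Optional.ofNullable(...).ifPresent(Runnable::run), as shown in the earlier hunk, so an implementation that returns null simply contributes no deferred work.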