Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make reroute iteration time-bound for large shard allocations #14848

Merged
merged 35 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
6d2a9d3
Simplify batch allocators with timeouts
Bukhtawar Jul 21, 2024
ef8232a
Add settings for allocator timeout and add additional logs
imRishN Jul 22, 2024
bf9cf16
Change to TimeoutAwareRunnable
Bukhtawar Jul 22, 2024
de8f6aa
Introduce BatchRunnableExecutor
Bukhtawar Jul 22, 2024
d3d78cf
Null checks
Bukhtawar Jul 22, 2024
1c5c211
nitpick
Bukhtawar Jul 22, 2024
3d8b9e4
Fix NPE bug
imRishN Jul 22, 2024
cf9c9dd
Remove timeout from decide allocate unassigned
imRishN Jul 22, 2024
25588fa
Add UT for BatchRunnableExecutor and BatchAllocators
imRishN Jul 22, 2024
d5ede50
Add cluster green IT
imRishN Jul 22, 2024
b9a9027
Minor fix and spotless apply
imRishN Jul 22, 2024
0cde01c
Add changelog
imRishN Jul 22, 2024
3b4baae
Remove redundant code
imRishN Jul 23, 2024
109616a
Add min value for timeout
imRishN Jul 23, 2024
5c40c99
Fix spotless check
imRishN Jul 23, 2024
f5501fa
Switch to shuffle
Bukhtawar Jul 23, 2024
341fe5f
Switch to shuffle
Bukhtawar Jul 23, 2024
e760f08
Revert "Add min value for timeout"
imRishN Jul 23, 2024
23a8fba
Trigger Build
imRishN Jul 23, 2024
f789037
Fix BatchRunnableExecutorTest test
imRishN Jul 23, 2024
0d0ac70
Trigger Build
imRishN Jul 23, 2024
65b3159
Change log levels and minor comment fix
imRishN Jul 23, 2024
65ea950
Minor fix
imRishN Jul 23, 2024
7a440af
Fix naming convention
imRishN Jul 23, 2024
aa11ddf
Replace null check with Optional
imRishN Jul 23, 2024
6cfdfae
Remove unnecessary logs
imRishN Jul 23, 2024
e83db71
Fix spotless
imRishN Jul 23, 2024
ad77ffa
Update server/src/main/java/org/opensearch/common/util/BatchRunnableE…
imRishN Jul 23, 2024
cc6b5dd
Add null check test
imRishN Jul 23, 2024
d03f8cf
Resolve PR Comments
imRishN Jul 23, 2024
21c4c1c
Fix spotless
imRishN Jul 23, 2024
25ca951
Fix
imRishN Jul 24, 2024
b1a612e
Add ExistingShardsAllocatorTests
imRishN Jul 24, 2024
07befa2
Add tests for ShardsBatchGatewayAllocator
imRishN Jul 24, 2024
6ddb155
Trigger Build
imRishN Jul 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public class AllocateUnassignedDecision extends AbstractAllocationDecision {
private final long remainingDelayInMillis;
private final long configuredDelayInMillis;

private AllocateUnassignedDecision(
public AllocateUnassignedDecision(
Bukhtawar marked this conversation as resolved.
Show resolved Hide resolved
AllocationStatus allocationStatus,
DiscoveryNode assignedNode,
String allocationId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.TimeoutAwareRunnable;
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.gateway.PriorityComparator;
import org.opensearch.gateway.ShardsBatchGatewayAllocator;
Expand All @@ -73,7 +74,10 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static java.util.Collections.emptyList;
Expand Down Expand Up @@ -565,8 +569,14 @@ private void reroute(RoutingAllocation allocation) {
long rerouteStartTimeNS = System.nanoTime();
removeDelayMarkers(allocation);

long startTime = System.nanoTime();
Bukhtawar marked this conversation as resolved.
Show resolved Hide resolved
Bukhtawar marked this conversation as resolved.
Show resolved Hide resolved
allocateExistingUnassignedShards(allocation); // try to allocate existing shard copies first
logger.info("Completing allocate unassigned, elapsed time: [{}]",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
startTime = System.nanoTime();
shardsAllocator.allocate(allocation);
logger.info("Shard allocator to allocate all shards, elapsed time: [{}]",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
clusterManagerMetrics.recordLatency(
clusterManagerMetrics.rerouteHistogram,
(double) Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - rerouteStartTimeNS))
Expand Down Expand Up @@ -617,10 +627,26 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) {

private void allocateAllUnassignedShards(RoutingAllocation allocation) {
ExistingShardsAllocator allocator = existingShardsAllocators.get(ShardsBatchGatewayAllocator.ALLOCATOR_NAME);
allocator.allocateAllUnassignedShards(allocation, true);
executeTimedRunnables(allocator.allocateAllUnassignedShards(allocation, true), () -> allocator.getPrimaryBatchAllocatorTimeout().millis(), true);
allocator.afterPrimariesBeforeReplicas(allocation);
// Replicas Assignment
allocator.allocateAllUnassignedShards(allocation, false);
executeTimedRunnables(allocator.allocateAllUnassignedShards(allocation, false), () -> allocator.getReplicaBatchAllocatorTimeout().millis(), false);
}

private void executeTimedRunnables(List<TimeoutAwareRunnable> runnables, Supplier<Long> maxRunTimeSupplier, boolean primary) {
logger.info("Executing timed runnables for primary [{}] of size [{}]", primary, runnables.size());
Collections.shuffle(runnables);
long startTime = System.nanoTime();
for (TimeoutAwareRunnable workQueue : runnables) {
if (System.nanoTime() - startTime < TimeValue.timeValueMillis(maxRunTimeSupplier.get()).nanos()) {
logger.info("Starting primary [{}] batch to allocate", primary);
Bukhtawar marked this conversation as resolved.
Show resolved Hide resolved
workQueue.run();
} else {
logger.info("Timing out primary [{}] batch to allocate", primary);
workQueue.onTimeout();
}
}
logger.info("Time taken to execute timed runnables in this cycle:[{}ms]", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
}

private void disassociateDeadNodes(RoutingAllocation allocation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,14 @@
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.common.Nullable;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.TimeoutAwareRunnable;
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.gateway.ShardsBatchGatewayAllocator;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
* Searches for, and allocates, shards for which there is an existing on-disk copy somewhere in the cluster. The default implementation is
Expand Down Expand Up @@ -108,14 +112,34 @@ void allocateUnassigned(
*
* Allocation service will currently run the default implementation of it implemented by {@link ShardsBatchGatewayAllocator}
*/
default void allocateAllUnassignedShards(RoutingAllocation allocation, boolean primary) {
default List<TimeoutAwareRunnable> allocateAllUnassignedShards(RoutingAllocation allocation, boolean primary) {
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
List<TimeoutAwareRunnable> runnables = new ArrayList<>();
while (iterator.hasNext()) {
ShardRouting shardRouting = iterator.next();
if (shardRouting.primary() == primary) {
allocateUnassigned(shardRouting, allocation, iterator);
runnables.add(new TimeoutAwareRunnable() {
@Override
public void onTimeout() {
//do nothing
}

@Override
public void run() {
allocateUnassigned(shardRouting, allocation, iterator);
}
});
}
}
return runnables;
}

/**
 * Time budget for running the primary-shard batch runnables returned by
 * {@code allocateAllUnassignedShards(allocation, true)}.
 *
 * @return {@link TimeValue#MINUS_ONE} in this default implementation; implementations may override it
 */
default TimeValue getPrimaryBatchAllocatorTimeout() {
return TimeValue.MINUS_ONE;
}

/**
 * Time budget for running the replica-shard batch runnables returned by
 * {@code allocateAllUnassignedShards(allocation, false)}.
 *
 * @return {@link TimeValue#MINUS_ONE} in this default implementation; implementations may override it
 */
default TimeValue getReplicaBatchAllocatorTimeout() {
return TimeValue.MINUS_ONE;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;

import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -162,6 +163,13 @@ public class BalancedShardsAllocator implements ShardsAllocator {
Property.NodeScope
);

/**
 * Dynamic, node-scoped time setting {@code cluster.routing.allocation.allocate_unassigned_timeout};
 * defaults to {@link TimeValue#MINUS_ONE}.
 * <p>
 * The current value is handed to every {@code LocalShardsBalancer} this allocator creates.
 */
// NOTE(review): the visible consumer checks `System.nanoTime() - startTime > allocateUnassignedTimeout.nanos()`;
// with the -1 default that comparison looks like it is true immediately. Confirm that a negative value is
// meant to (and does) disable the timeout.
public static final Setting<TimeValue> ALLOCATE_UNASSIGNED_TIMEOUT_SETTING = Setting.timeSetting(
"cluster.routing.allocation.allocate_unassigned_timeout",
TimeValue.MINUS_ONE,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private volatile boolean movePrimaryFirst;
private volatile ShardMovementStrategy shardMovementStrategy;

Expand All @@ -172,6 +180,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
private volatile float shardBalanceFactor;
private volatile WeightFunction weightFunction;
private volatile float threshold;
private volatile TimeValue allocateUnassignedTimeout;

public BalancedShardsAllocator(Settings settings) {
this(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
Expand All @@ -187,6 +196,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings));
setPreferPrimaryShardRebalance(PREFER_PRIMARY_SHARD_REBALANCE.get(settings));
setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings));
setAllocateUnassignedTimeout(ALLOCATE_UNASSIGNED_TIMEOUT_SETTING.get(settings));
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance);
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst);
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy);
Expand All @@ -195,6 +205,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
clusterSettings.addSettingsUpdateConsumer(PRIMARY_SHARD_REBALANCE_BUFFER, this::updatePreferPrimaryShardBalanceBuffer);
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_REBALANCE, this::setPreferPrimaryShardRebalance);
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold);
clusterSettings.addSettingsUpdateConsumer(ALLOCATE_UNASSIGNED_TIMEOUT_SETTING, this::setAllocateUnassignedTimeout);
}

/**
Expand Down Expand Up @@ -269,6 +280,10 @@ private void setThreshold(float threshold) {
this.threshold = threshold;
}

/**
 * Stores the timeout that is passed to newly created {@code LocalShardsBalancer} instances.
 * Called once from the constructor and registered as the update consumer for
 * {@code ALLOCATE_UNASSIGNED_TIMEOUT_SETTING}.
 *
 * @param allocateUnassignedTimeout the new timeout value
 */
private void setAllocateUnassignedTimeout(TimeValue allocateUnassignedTimeout) {
this.allocateUnassignedTimeout = allocateUnassignedTimeout;
}

@Override
public void allocate(RoutingAllocation allocation) {
if (allocation.routingNodes().size() == 0) {
Expand All @@ -282,7 +297,8 @@ public void allocate(RoutingAllocation allocation) {
weightFunction,
threshold,
preferPrimaryShardBalance,
preferPrimaryShardRebalance
preferPrimaryShardRebalance,
allocateUnassignedTimeout
);
localShardsBalancer.allocateUnassigned();
localShardsBalancer.moveShards();
Expand All @@ -297,19 +313,21 @@ public void allocate(RoutingAllocation allocation) {

@Override
public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, final RoutingAllocation allocation) {
long startTime = System.nanoTime();
ShardsBalancer localShardsBalancer = new LocalShardsBalancer(
logger,
allocation,
shardMovementStrategy,
weightFunction,
threshold,
preferPrimaryShardBalance,
preferPrimaryShardRebalance
preferPrimaryShardRebalance,
allocateUnassignedTimeout
);
AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN;
MoveDecision moveDecision = MoveDecision.NOT_TAKEN;
if (shard.unassigned()) {
allocateUnassignedDecision = localShardsBalancer.decideAllocateUnassigned(shard);
allocateUnassignedDecision = localShardsBalancer.decideAllocateUnassigned(shard, startTime);
} else {
moveDecision = localShardsBalancer.decideMove(shard);
if (moveDecision.isDecisionTaken() && moveDecision.canRemain()) {
Expand Down Expand Up @@ -558,7 +576,7 @@ public Balancer(
float threshold,
boolean preferPrimaryBalance
) {
super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false);
super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false, TimeValue.MINUS_ONE);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.gateway.PriorityComparator;

import java.util.ArrayList;
Expand All @@ -40,6 +41,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

Expand All @@ -60,6 +62,7 @@ public class LocalShardsBalancer extends ShardsBalancer {

private final boolean preferPrimaryBalance;
private final boolean preferPrimaryRebalance;
private final TimeValue allocateUnassignedTimeout;
private final BalancedShardsAllocator.WeightFunction weight;

private final float threshold;
Expand All @@ -77,7 +80,8 @@ public LocalShardsBalancer(
BalancedShardsAllocator.WeightFunction weight,
float threshold,
boolean preferPrimaryBalance,
boolean preferPrimaryRebalance
boolean preferPrimaryRebalance,
TimeValue allocateUnassignedTimeout
) {
this.logger = logger;
this.allocation = allocation;
Expand All @@ -94,6 +98,7 @@ public LocalShardsBalancer(
this.preferPrimaryBalance = preferPrimaryBalance;
this.preferPrimaryRebalance = preferPrimaryRebalance;
this.shardMovementStrategy = shardMovementStrategy;
this.allocateUnassignedTimeout = allocateUnassignedTimeout;
}

/**
Expand Down Expand Up @@ -742,6 +747,7 @@ private Map<String, BalancedShardsAllocator.ModelNode> buildModelFromAssigned()
*/
@Override
void allocateUnassigned() {
long startTime = System.nanoTime();
RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned();
assert !nodes.isEmpty();
if (logger.isTraceEnabled()) {
Expand Down Expand Up @@ -797,7 +803,7 @@ void allocateUnassigned() {
do {
for (int i = 0; i < primaryLength; i++) {
ShardRouting shard = primary[i];
final AllocateUnassignedDecision allocationDecision = decideAllocateUnassigned(shard);
final AllocateUnassignedDecision allocationDecision = decideAllocateUnassigned(shard, startTime);
final String assignedNodeId = allocationDecision.getTargetNode() != null
? allocationDecision.getTargetNode().getId()
: null;
Expand Down Expand Up @@ -870,6 +876,8 @@ void allocateUnassigned() {
secondaryLength = 0;
} while (primaryLength > 0);
// clear everything we have either added it or moved to ignoreUnassigned
logger.debug("Time taken in allocate unassigned [{}]",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
}

/**
Expand All @@ -879,12 +887,17 @@ void allocateUnassigned() {
* is of type {@link Decision.Type#NO}, then the assigned node will be null.
*/
@Override
AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard) {
AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard, long startTime) {
if (shard.assignedToNode()) {
// we only make decisions for unassigned shards here
return AllocateUnassignedDecision.NOT_TAKEN;
}

if (System.nanoTime() - startTime > allocateUnassignedTimeout.nanos()) {
logger.info("Timed out while running Local shard balancer allocate unassigned - outer loop");
return AllocateUnassignedDecision.throttle(null);
}

final boolean explain = allocation.debugDecision();
Decision shardLevelDecision = allocation.deciders().canAllocate(shard, allocation);
if (shardLevelDecision.type() == Decision.Type.NO && explain == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ private Map<String, Integer> calculateNodePrimaryShardCount(List<RoutingNode> re
}

@Override
AllocateUnassignedDecision decideAllocateUnassigned(ShardRouting shardRouting) {
AllocateUnassignedDecision decideAllocateUnassigned(ShardRouting shardRouting, long startTime) {
throw new UnsupportedOperationException("remote shards balancer does not support decision operations");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public abstract class ShardsBalancer {
* @param shardRouting the shard for which the decision has to be made
* @return the allocation decision
*/
abstract AllocateUnassignedDecision decideAllocateUnassigned(ShardRouting shardRouting);
abstract AllocateUnassignedDecision decideAllocateUnassigned(ShardRouting shardRouting, long startTime);

/**
* Makes a decision on whether to move a started shard to another node.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ public void apply(Settings value, Settings current, Settings previous) {
BalancedShardsAllocator.SHARD_MOVE_PRIMARY_FIRST_SETTING,
BalancedShardsAllocator.SHARD_MOVEMENT_STRATEGY_SETTING,
BalancedShardsAllocator.THRESHOLD_SETTING,
BalancedShardsAllocator.ALLOCATE_UNASSIGNED_TIMEOUT_SETTING,
BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING,
BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING,
BreakerSettings.CIRCUIT_BREAKER_TYPE,
Expand Down Expand Up @@ -342,6 +343,8 @@ public void apply(Settings value, Settings current, Settings previous) {
GatewayService.RECOVER_AFTER_NODES_SETTING,
GatewayService.RECOVER_AFTER_TIME_SETTING,
ShardsBatchGatewayAllocator.GATEWAY_ALLOCATOR_BATCH_SIZE,
ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING,
ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING,
PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD,
NetworkModule.HTTP_DEFAULT_TYPE_SETTING,
NetworkModule.TRANSPORT_DEFAULT_TYPE_SETTING,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.util.concurrent;

/**
 * A {@link Runnable} that additionally exposes a callback for the case where it is not run because the
 * time budget of the code driving it has been exhausted.
 * <p>
 * For a given execution cycle the driver is expected to invoke either {@link #run()} or {@link #onTimeout()}.
 */
public interface TimeoutAwareRunnable extends Runnable {

    /**
     * Invoked in place of {@link #run()} when the time budget was used up before this task could be started.
     * Implementations use it to handle the skipped work; an empty implementation is valid.
     */
    void onTimeout();
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.core.index.shard.ShardId;
Expand Down Expand Up @@ -121,6 +122,19 @@ public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAll
logger.trace("Finished shard allocation execution for unassigned primary shards: {}", shardRoutings.size());
}

public void allocateUnassignedBatchOnTimeout(List<ShardRouting> shardRoutings, RoutingAllocation allocation) {
Set<ShardRouting> batchShardRoutingSet = new HashSet<>(shardRoutings);
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
while (iterator.hasNext()) {
Bukhtawar marked this conversation as resolved.
Show resolved Hide resolved
ShardRouting unassignedShard = iterator.next();
AllocateUnassignedDecision allocationDecision;
if (unassignedShard.primary() && batchShardRoutingSet.contains(unassignedShard)) {
allocationDecision = new AllocateUnassignedDecision(UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED, null, null, null, false, 0L, 0L);
executeDecision(unassignedShard, allocationDecision, allocation, iterator);
}
}
}
Bukhtawar marked this conversation as resolved.
Show resolved Hide resolved

/**
* Transforms {@link FetchResult} of {@link NodeGatewayStartedShardsBatch} to {@link List} of {@link TransportNodesListGatewayStartedShards.NodeGatewayStartedShards}.
* <p>
Expand Down
Loading
Loading