Skip to content

Commit

Permalink
Make AllocationService#adaptAutoExpandReplicas Faster (elastic#83092)
Browse files Browse the repository at this point in the history
This method is getting fairly expensive for large cluster states.
In most cases it is not necessary to actually compute the `RoutingAllocation`,
so I made its creation lazy to avoid needlessly building routing nodes.
Also, parsing the auto-expand-replicas setting gets quite expensive when looping
over thousands of shards in this method, so I moved the parsed auto-expand setting
value into the index metadata.
These changes make the method disappear from profiling in most cases and help
make reroute a bit faster still.
  • Loading branch information
original-brownbear authored Jan 26, 2022
1 parent c775642 commit 788df35
Show file tree
Hide file tree
Showing 27 changed files with 123 additions and 163 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/83092.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 83092
summary: Make `AllocationService#adaptAutoExpandReplicas` Faster
area: Allocation
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
Expand Down Expand Up @@ -92,11 +91,9 @@ protected void masterOperation(
final ClusterState state,
final ActionListener<ClusterAllocationExplainResponse> listener
) {
final RoutingNodes routingNodes = state.getRoutingNodes();
final ClusterInfo clusterInfo = clusterInfoService.getClusterInfo();
final RoutingAllocation allocation = new RoutingAllocation(
allocationDeciders,
routingNodes,
state,
clusterInfo,
snapshotsInfoService.snapshotShardSizes(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.function.Supplier;

import static org.elasticsearch.cluster.metadata.MetadataIndexStateService.isIndexVerifiedBeforeClosed;

Expand Down Expand Up @@ -100,28 +101,27 @@ public boolean expandToAllNodes() {
return maxReplicas == Integer.MAX_VALUE;
}

private OptionalInt getDesiredNumberOfReplicas(IndexMetadata indexMetadata, RoutingAllocation allocation) {
if (enabled) {
int numMatchingDataNodes = 0;
for (DiscoveryNode discoveryNode : allocation.nodes().getDataNodes().values()) {
Decision decision = allocation.deciders().shouldAutoExpandToNode(indexMetadata, discoveryNode, allocation);
if (decision.type() != Decision.Type.NO) {
numMatchingDataNodes++;
}
public OptionalInt getDesiredNumberOfReplicas(IndexMetadata indexMetadata, RoutingAllocation allocation) {
assert enabled : "should only be called when enabled";
int numMatchingDataNodes = 0;
for (DiscoveryNode discoveryNode : allocation.nodes().getDataNodes().values()) {
Decision decision = allocation.deciders().shouldAutoExpandToNode(indexMetadata, discoveryNode, allocation);
if (decision.type() != Decision.Type.NO) {
numMatchingDataNodes++;
}
}

final int min = minReplicas();
final int max = getMaxReplicas(numMatchingDataNodes);
int numberOfReplicas = numMatchingDataNodes - 1;
if (numberOfReplicas < min) {
numberOfReplicas = min;
} else if (numberOfReplicas > max) {
numberOfReplicas = max;
}
final int min = minReplicas();
final int max = getMaxReplicas(numMatchingDataNodes);
int numberOfReplicas = numMatchingDataNodes - 1;
if (numberOfReplicas < min) {
numberOfReplicas = min;
} else if (numberOfReplicas > max) {
numberOfReplicas = max;
}

if (numberOfReplicas >= min && numberOfReplicas <= max) {
return OptionalInt.of(numberOfReplicas);
}
if (numberOfReplicas >= min && numberOfReplicas <= max) {
return OptionalInt.of(numberOfReplicas);
}
return OptionalInt.empty();
}
Expand All @@ -137,12 +137,22 @@ public String toString() {
* The map has the desired number of replicas as key and the indices to update as value, as this allows the result
* of this method to be directly applied to RoutingTable.Builder#updateNumberOfReplicas.
*/
public static Map<Integer, List<String>> getAutoExpandReplicaChanges(Metadata metadata, RoutingAllocation allocation) {
public static Map<Integer, List<String>> getAutoExpandReplicaChanges(
Metadata metadata,
Supplier<RoutingAllocation> allocationSupplier
) {
Map<Integer, List<String>> nrReplicasChanged = new HashMap<>();

// RoutingAllocation is fairly expensive to compute, only lazy create it via the supplier if we actually need it
RoutingAllocation allocation = null;
for (final IndexMetadata indexMetadata : metadata) {
if (indexMetadata.getState() == IndexMetadata.State.OPEN || isIndexVerifiedBeforeClosed(indexMetadata)) {
AutoExpandReplicas autoExpandReplicas = SETTING.get(indexMetadata.getSettings());
AutoExpandReplicas autoExpandReplicas = indexMetadata.getAutoExpandReplicas();
if (autoExpandReplicas.enabled() == false) {
continue;
}
if (allocation == null) {
allocation = allocationSupplier.get();
}
autoExpandReplicas.getDesiredNumberOfReplicas(indexMetadata, allocation).ifPresent(numberOfReplicas -> {
if (numberOfReplicas != indexMetadata.getNumberOfReplicas()) {
nrReplicasChanged.computeIfAbsent(numberOfReplicas, ArrayList::new).add(indexMetadata.getIndex().getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,8 @@ public static APIBlock readFrom(StreamInput input) throws IOException {

private final LifecycleExecutionState lifecycleExecutionState;

private final AutoExpandReplicas autoExpandReplicas;

private IndexMetadata(
final Index index,
final long version,
Expand Down Expand Up @@ -536,7 +538,8 @@ private IndexMetadata(
final boolean ignoreDiskWatermarks,
@Nullable final List<String> tierPreference,
final int shardsPerNodeLimit,
final LifecycleExecutionState lifecycleExecutionState
final LifecycleExecutionState lifecycleExecutionState,
final AutoExpandReplicas autoExpandReplicas
) {
this.index = index;
this.version = version;
Expand Down Expand Up @@ -578,6 +581,7 @@ private IndexMetadata(
this.tierPreference = tierPreference;
this.shardsPerNodeLimit = shardsPerNodeLimit;
this.lifecycleExecutionState = lifecycleExecutionState;
this.autoExpandReplicas = autoExpandReplicas;
assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards;
}

Expand Down Expand Up @@ -618,7 +622,8 @@ IndexMetadata withMappingMetadata(MappingMetadata mapping) {
this.ignoreDiskWatermarks,
this.tierPreference,
this.shardsPerNodeLimit,
this.lifecycleExecutionState
this.lifecycleExecutionState,
this.autoExpandReplicas
);
}

Expand Down Expand Up @@ -746,6 +751,10 @@ public LifecycleExecutionState getLifecycleExecutionState() {
return lifecycleExecutionState;
}

public AutoExpandReplicas getAutoExpandReplicas() {
return autoExpandReplicas;
}

/**
* Return the concrete mapping for this index or {@code null} if this index has no mappings at all.
*/
Expand Down Expand Up @@ -1612,7 +1621,8 @@ public IndexMetadata build() {
DiskThresholdDecider.SETTING_IGNORE_DISK_WATERMARKS.get(settings),
tierPreference,
ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING.get(settings),
lifecycleExecutionState
lifecycleExecutionState,
AutoExpandReplicas.SETTING.get(settings)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static java.util.Collections.emptyList;
Expand All @@ -71,7 +72,7 @@ public class AllocationService {
private Map<String, ExistingShardsAllocator> existingShardsAllocators;
private final ShardsAllocator shardsAllocator;
private final ClusterInfoService clusterInfoService;
private SnapshotsInfoService snapshotsInfoService;
private final SnapshotsInfoService snapshotsInfoService;

// only for tests that use the GatewayAllocator as the unique ExistingShardsAllocator
public AllocationService(
Expand Down Expand Up @@ -298,17 +299,16 @@ public ClusterState disassociateDeadNodes(ClusterState clusterState, boolean rer
* Returns an updated cluster state if changes were necessary, or the identical cluster if no changes were required.
*/
public ClusterState adaptAutoExpandReplicas(ClusterState clusterState) {
RoutingAllocation allocation = new RoutingAllocation(
final Supplier<RoutingAllocation> allocationSupplier = () -> new RoutingAllocation(
allocationDeciders,
clusterState.getRoutingNodes(),
clusterState,
clusterInfoService.getClusterInfo(),
snapshotsInfoService.snapshotShardSizes(),
currentNanoTime()
);
final Map<Integer, List<String>> autoExpandReplicaChanges = AutoExpandReplicas.getAutoExpandReplicaChanges(
clusterState.metadata(),
allocation
allocationSupplier
);
if (autoExpandReplicaChanges.isEmpty()) {
return clusterState;
Expand Down Expand Up @@ -336,7 +336,7 @@ public ClusterState adaptAutoExpandReplicas(ClusterState clusterState) {
.routingTable(routingTableBuilder.build())
.metadata(metadataBuilder)
.build();
assert AutoExpandReplicas.getAutoExpandReplicaChanges(fixedState.metadata(), allocation).isEmpty();
assert AutoExpandReplicas.getAutoExpandReplicaChanges(fixedState.metadata(), allocationSupplier).isEmpty();
return fixedState;
}
}
Expand Down Expand Up @@ -514,7 +514,7 @@ private boolean hasDeadNodes(RoutingAllocation allocation) {

private void reroute(RoutingAllocation allocation) {
assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See disassociateDeadNodes";
assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metadata(), allocation).isEmpty()
assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metadata(), () -> allocation).isEmpty()
: "auto-expand replicas out of sync with number of nodes in the cluster";
assert assertInitialized();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.snapshots.RestoreService.RestoreInProgressUpdater;
import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
Expand All @@ -42,15 +43,10 @@ public class RoutingAllocation {

private final AllocationDeciders deciders;

@Nullable
private final RoutingNodes routingNodes;

private final Metadata metadata;

private final RoutingTable routingTable;

private final DiscoveryNodes nodes;

private final ImmutableOpenMap<String, ClusterState.Custom> customs;
private final ClusterState clusterState;

private final ClusterInfo clusterInfo;

Expand All @@ -75,36 +71,41 @@ public class RoutingAllocation {
restoreInProgressUpdater
);

private final Map<String, SingleNodeShutdownMetadata> nodeShutdowns;
private final Map<String, SingleNodeShutdownMetadata> nodeReplacementTargets;

public RoutingAllocation(
AllocationDeciders deciders,
ClusterState clusterState,
ClusterInfo clusterInfo,
SnapshotShardSizeInfo shardSizeInfo,
long currentNanoTime
) {
this(deciders, null, clusterState, clusterInfo, shardSizeInfo, currentNanoTime);
}

/**
* Creates a new {@link RoutingAllocation}
* @param deciders {@link AllocationDeciders} to used to make decisions for routing allocations
* @param routingNodes Routing nodes in the current cluster
* @param deciders {@link AllocationDeciders} to used to make decisions for routing allocations
* @param routingNodes Routing nodes in the current cluster or {@code null} if using those in the given cluster state
* @param clusterState cluster state before rerouting
* @param currentNanoTime the nano time to use for all delay allocation calculation (typically {@link System#nanoTime()})
*/
public RoutingAllocation(
AllocationDeciders deciders,
RoutingNodes routingNodes,
@Nullable RoutingNodes routingNodes,
ClusterState clusterState,
ClusterInfo clusterInfo,
SnapshotShardSizeInfo shardSizeInfo,
long currentNanoTime
) {
this.deciders = deciders;
this.routingNodes = routingNodes;
this.metadata = clusterState.metadata();
this.routingTable = clusterState.routingTable();
this.nodes = clusterState.nodes();
this.customs = clusterState.customs();
this.clusterState = clusterState;
this.clusterInfo = clusterInfo;
this.shardSizeInfo = shardSizeInfo;
this.currentNanoTime = currentNanoTime;
this.nodeShutdowns = metadata.nodeShutdowns();
Map<String, SingleNodeShutdownMetadata> targetNameToShutdown = new HashMap<>();
for (SingleNodeShutdownMetadata shutdown : this.nodeShutdowns.values()) {
for (SingleNodeShutdownMetadata shutdown : clusterState.metadata().nodeShutdowns().values()) {
if (shutdown.getType() == SingleNodeShutdownMetadata.Type.REPLACE) {
targetNameToShutdown.put(shutdown.getTargetNodeName(), shutdown);
}
Expand All @@ -130,31 +131,34 @@ public AllocationDeciders deciders() {
* @return current routing table
*/
public RoutingTable routingTable() {
return routingTable;
return clusterState.routingTable();
}

/**
* Get current routing nodes
* @return routing nodes
*/
public RoutingNodes routingNodes() {
return routingNodes;
if (routingNodes != null) {
return routingNodes;
}
return clusterState.getRoutingNodes();
}

/**
* Get metadata of routing nodes
* @return Metadata of routing nodes
*/
public Metadata metadata() {
return metadata;
return clusterState.metadata();
}

/**
* Get discovery nodes in current routing
* @return discovery nodes
*/
public DiscoveryNodes nodes() {
return nodes;
return clusterState.nodes();
}

public ClusterInfo clusterInfo() {
Expand All @@ -169,7 +173,7 @@ public SnapshotShardSizeInfo snapshotShardSizeInfo() {
* Returns the map of node id to shutdown metadata currently in the cluster
*/
public Map<String, SingleNodeShutdownMetadata> nodeShutdowns() {
return this.nodeShutdowns;
return metadata().nodeShutdowns();
}

/**
Expand All @@ -181,11 +185,11 @@ public Map<String, SingleNodeShutdownMetadata> replacementTargetShutdowns() {

@SuppressWarnings("unchecked")
public <T extends ClusterState.Custom> T custom(String key) {
return (T) customs.get(key);
return (T) clusterState.customs().get(key);
}

public ImmutableOpenMap<String, ClusterState.Custom> getCustoms() {
return customs;
return clusterState.getCustoms();
}

public void ignoreDisable(boolean ignoreDisable) {
Expand Down Expand Up @@ -267,7 +271,7 @@ public RoutingChangesObserver changes() {
* Returns updated {@link Metadata} based on the changes that were made to the routing nodes
*/
public Metadata updateMetadataWithRoutingChanges(RoutingTable newRoutingTable) {
return indexMetadataUpdater.applyChanges(metadata, newRoutingTable);
return indexMetadataUpdater.applyChanges(metadata(), newRoutingTable);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING;

/**
* This {@link AllocationDecider} controls shard allocation based on
Expand Down Expand Up @@ -164,7 +163,7 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout
final boolean debug = allocation.debugDecision();
final IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index());

if (INDEX_AUTO_EXPAND_REPLICAS_SETTING.get(indexMetadata.getSettings()).expandToAllNodes()) {
if (indexMetadata.getAutoExpandReplicas().expandToAllNodes()) {
return YES_AUTO_EXPAND_ALL;
}

Expand Down
Loading

0 comments on commit 788df35

Please sign in to comment.