Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allocation service changes for batch assignment #8888

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,13 @@ public ClusterModule(
this.shardsAllocator = createShardsAllocator(settings, clusterService.getClusterSettings(), clusterPlugins);
this.clusterService = clusterService;
this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadContext);
this.allocationService = new AllocationService(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService);
this.allocationService = new AllocationService(
allocationDeciders,
shardsAllocator,
clusterInfoService,
snapshotsInfoService,
settings
);
}

public static List<Entry> getNamedWriteables() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterInfoService;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.RestoreInProgress;
Expand All @@ -55,13 +56,16 @@
import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.common.settings.Settings;
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.gateway.PriorityComparator;
import org.opensearch.gateway.ShardsBatchGatewayAllocator;
import org.opensearch.snapshots.SnapshotsInfoService;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
Expand All @@ -73,6 +77,7 @@
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
import static org.opensearch.cluster.routing.allocation.ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE;

/**
* This service manages the node allocation of a cluster. For this reason the
Expand All @@ -87,6 +92,7 @@ public class AllocationService {
private static final Logger logger = LogManager.getLogger(AllocationService.class);

private final AllocationDeciders allocationDeciders;
private Settings settings;
private Map<String, ExistingShardsAllocator> existingShardsAllocators;
private final ShardsAllocator shardsAllocator;
private final ClusterInfoService clusterInfoService;
Expand All @@ -109,11 +115,23 @@ public AllocationService(
ShardsAllocator shardsAllocator,
ClusterInfoService clusterInfoService,
SnapshotsInfoService snapshotsInfoService
) {
this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService, Settings.EMPTY);
}

public AllocationService(
AllocationDeciders allocationDeciders,
ShardsAllocator shardsAllocator,
ClusterInfoService clusterInfoService,
SnapshotsInfoService snapshotsInfoService,
Settings settings

) {
this.allocationDeciders = allocationDeciders;
this.shardsAllocator = shardsAllocator;
this.clusterInfoService = clusterInfoService;
this.snapshotsInfoService = snapshotsInfoService;
this.settings = settings;
}

/**
Expand Down Expand Up @@ -548,6 +566,24 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) {
existingShardsAllocator.beforeAllocation(allocation);
}

/*
Use batch mode if enabled and there is no custom allocator set for Allocation service
*/
Boolean batchModeEnabled = EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(settings);
if (batchModeEnabled && allocation.nodes().getMinNodeVersion().onOrAfter(Version.CURRENT) && existingShardsAllocators.size() == 2) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the check for ==2 here is for GatewayAllocator & BatchGatewayAllocator?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this version check should be against 3.0.0

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

/*
If we do not have any custom allocator set then we will be using ShardsBatchGatewayAllocator
Currently AllocationService will not run any custom Allocator that implements allocateAllUnassignedShards
*/
ExistingShardsAllocator allocator = existingShardsAllocators.get(ShardsBatchGatewayAllocator.ALLOCATOR_NAME);
allocator.allocateAllUnassignedShards(allocation, true);
allocator.afterPrimariesBeforeReplicas(allocation);
// Replicas Assignment
allocator.allocateAllUnassignedShards(allocation, false);
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved
return;
}
logger.warn("Falling back to single shard assignment since batch mode disable or multiple custom allocators set");

final RoutingNodes.UnassignedShards.UnassignedIterator primaryIterator = allocation.routingNodes().unassigned().iterator();
while (primaryIterator.hasNext()) {
final ShardRouting shardRouting = primaryIterator.next();
Expand Down Expand Up @@ -608,9 +644,9 @@ private void applyStartedShards(RoutingAllocation routingAllocation, List<ShardR
: "shard started for unknown index (shard entry: " + startedShard + ")";
assert startedShard == routingNodes.getByAllocationId(startedShard.shardId(), startedShard.allocationId().getId())
: "shard routing to start does not exist in routing table, expected: "
+ startedShard
+ " but was: "
+ routingNodes.getByAllocationId(startedShard.shardId(), startedShard.allocationId().getId());
+ startedShard
+ " but was: "
+ routingNodes.getByAllocationId(startedShard.shardId(), startedShard.allocationId().getId());
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved

routingNodes.startShard(logger, startedShard, routingAllocation.changes());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@
import org.opensearch.common.Nullable;
import org.opensearch.common.settings.Setting;
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.gateway.ShardsBatchGatewayAllocator;

import java.util.List;

/**
* Searches for, and allocates, shards for which there is an existing on-disk copy somewhere in the cluster. The default implementation is
* {@link GatewayAllocator}, but plugins can supply their own implementations too.
* {@link GatewayAllocator} and {@link ShardsBatchGatewayAllocator}, but plugins can supply their own implementations too.
*
* @opensearch.internal
*/
Expand All @@ -60,6 +61,26 @@ public interface ExistingShardsAllocator {
Setting.Property.PrivateIndex
);

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java doc here, we are adding a new public setting. Highly recommend adding multi-line comment explaining the purpose of the setting.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack

/**
* Boolean setting to enable/disable batch allocation of unassigned shards already existing on disk.
* This will allow sending all Unassigned Shards to the ExistingShard Allocator to make decision to allocate
* in one or more go.
*
* Enable this setting if your ExistingShardAllocator is implementing the
* {@link ExistingShardsAllocator#allocateAllUnassignedShards(RoutingAllocation, boolean)} method.
* The default implementation of this method is not optimized and assigns shards one by one.
Comment on lines +69 to +71
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By enabling this setting, GatewayAllocator will use batch implementation by default. In case of custom ExistingShardAllocator , the performance benefits will be visible if {@link ExistingShardsAllocator#allocateAllUnassignedShards(RoutingAllocation, boolean)} method is implemented efficiently for batch mode.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, if custom Allocator is set we wont be even running allocateAllUnassignedShards in AllocationService.

This was discussed here

*
* If no plugin overrides {@link ExistingShardsAllocator} then default implementation will be use for it , i.e,
* {@link ShardsBatchGatewayAllocator}.
*
* This setting is experimental at this point.
*/
Setting<Boolean> EXISTING_SHARDS_ALLOCATOR_BATCH_MODE = Setting.boolSetting(
"cluster.allocator.existing_shards_allocator.batch_enabled",
false,
Setting.Property.NodeScope
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved
);

/**
* Called before starting a round of allocation, allowing the allocator to invalidate some caches if appropriate.
*/
Expand All @@ -80,6 +101,23 @@ void allocateUnassigned(
UnassignedAllocationHandler unassignedAllocationHandler
);

/**
* Allocate all unassigned shards in the given {@link RoutingAllocation} for which this {@link ExistingShardsAllocator} is responsible.
* Default implementation calls {@link #allocateUnassigned(ShardRouting, RoutingAllocation, UnassignedAllocationHandler)} for each Unassigned shard
* and is kept here for backward compatibility.
*
* Allocation service will currently run the default implementation of it implemented by {@link ShardsBatchGatewayAllocator}
*/
default void allocateAllUnassignedShards(RoutingAllocation allocation, boolean primary) {
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
while (iterator.hasNext()) {
ShardRouting shardRouting = iterator.next();
if (shardRouting.primary() == primary) {
allocateUnassigned(shardRouting, allocation, iterator);
}
}
}

/**
* Returns an explanation for a single unassigned shard.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.OperationRouting;
import org.opensearch.cluster.routing.allocation.DiskThresholdSettings;
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
Expand Down Expand Up @@ -246,6 +247,7 @@ public void apply(Settings value, Settings current, Settings previous) {
DanglingIndicesState.AUTO_IMPORT_DANGLING_INDICES_SETTING,
EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING,
EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING,
ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE_ENABLED,
FilterAllocationDecider.CLUSTER_ROUTING_INCLUDE_GROUP_SETTING,
FilterAllocationDecider.CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING,
FilterAllocationDecider.CLUSTER_ROUTING_REQUIRE_GROUP_SETTING,
Expand Down
Loading