Skip to content

Commit

Permalink
Changes to implement interface for ShardBatchGatewayAllocator
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Chandani <[email protected]>
  • Loading branch information
Gaurav614 committed Jan 15, 2024
1 parent ebc2371 commit 0b665aa
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -442,8 +442,10 @@ protected void configure() {
bind(ShardsAllocator.class).toInstance(shardsAllocator);
}

public void setExistingShardsAllocators(Map<String, GatewayAllocator> gatewayAllocators) {
final Map<String, ExistingShardsAllocator> existingShardsAllocators = new HashMap<>(gatewayAllocators);
public void setExistingShardsAllocators(GatewayAllocator gatewayAllocator, ShardsBatchGatewayAllocator shardsBatchGatewayAllocator) {
final Map<String, ExistingShardsAllocator> existingShardsAllocators = new HashMap<>();
existingShardsAllocators.put(GatewayAllocator.ALLOCATOR_NAME, gatewayAllocator);
existingShardsAllocators.put(ShardsBatchGatewayAllocator.ALLOCATOR_NAME, shardsBatchGatewayAllocator);
for (ClusterPlugin clusterPlugin : clusterPlugins) {
for (Map.Entry<String, ExistingShardsAllocator> existingShardsAllocatorEntry : clusterPlugin.getExistingShardsAllocators()
.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,6 @@
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.gateway;

import org.apache.logging.log4j.LogManager;
Expand All @@ -44,6 +20,7 @@
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.opensearch.cluster.routing.allocation.FailedShard;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.common.Priority;
Expand All @@ -57,7 +34,6 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.indices.store.ShardAttributes;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch;

import java.util.Collections;
Expand All @@ -74,11 +50,11 @@
import java.util.stream.StreamSupport;

/**
* Allocator for the Shards batch gateway
* Allocator for the gateway
*
* @opensearch.internal
*/
public class ShardsBatchGatewayAllocator extends GatewayAllocator {
public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator {

public static final String ALLOCATOR_NAME = "shards_batch_gateway_allocator";

Expand Down Expand Up @@ -117,13 +93,10 @@ public class ShardsBatchGatewayAllocator extends GatewayAllocator {
@Inject
public ShardsBatchGatewayAllocator(
RerouteService rerouteService,
TransportNodesListGatewayStartedShards startedAction,
TransportNodesListShardStoreMetadata storeAction,
TransportNodesListGatewayStartedBatchShards batchStartedAction,
TransportNodesListShardStoreMetadataBatch batchStoreAction,
Settings settings
) {
super(rerouteService, startedAction, storeAction);
this.rerouteService = rerouteService;
this.primaryBatchShardAllocator = new InternalPrimaryBatchShardAllocator();
this.replicaBatchShardAllocator = new InternalReplicaBatchShardAllocator();
Expand Down Expand Up @@ -207,7 +180,16 @@ public void afterPrimariesBeforeReplicas(RoutingAllocation allocation) {
}

@Override
public void allocateUnassignedBatch(final RoutingAllocation allocation, boolean primary) {
public void allocateUnassigned(
ShardRouting shardRouting,
RoutingAllocation allocation,
UnassignedAllocationHandler unassignedAllocationHandler
) {
throw new UnsupportedOperationException("ShardsBatchGatewayAllocator does not support allocating unassigned shards");
}

@Override
public void allocateAllUnassignedShards(final RoutingAllocation allocation, boolean primary) {

assert primaryBatchShardAllocator != null;
assert replicaBatchShardAllocator != null;
Expand Down
14 changes: 4 additions & 10 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -1188,16 +1188,10 @@ protected Node(
// We allocate copies of existing shards by looking for a viable copy of the shard in the cluster and assigning the shard there.
// The search for viable copies is triggered by an allocation attempt (i.e. a reroute) and is performed asynchronously. When it
// completes we trigger another reroute to try the allocation again. This means there is a circular dependency: the allocation
// service needs access to the existing shards allocators (e.g. the GatewayAllocator) which need to be able to trigger a
// reroute, which needs to call into the allocation service. We close the loop here:
// create Hashmap for existing Allocators
Map<String, GatewayAllocator> gatewayAllocatorMap = new HashMap<>() {
{
put(GatewayAllocator.ALLOCATOR_NAME, injector.getInstance(GatewayAllocator.class));
put(ShardsBatchGatewayAllocator.ALLOCATOR_NAME, injector.getInstance(ShardsBatchGatewayAllocator.class));
}
};
clusterModule.setExistingShardsAllocators(gatewayAllocatorMap);
// service needs access to the existing shards allocators (e.g. the GatewayAllocator, ShardsBatchGatewayAllocator) which
// need to be able to trigger a reroute, which needs to call into the allocation service. We close the loop here:
clusterModule.setExistingShardsAllocators(injector.getInstance(GatewayAllocator.class),
injector.getInstance(ShardsBatchGatewayAllocator.class));

List<LifecycleComponent> pluginLifecycleComponents = pluginComponents.stream()
.filter(p -> p instanceof LifecycleComponent)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.snapshots.EmptySnapshotsInfoService;
import org.opensearch.test.gateway.TestShardBatchGatewayAllocator;
import org.opensearch.test.gateway.TestGatewayAllocator;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -94,7 +94,7 @@ public void testSerializeRequest() throws IOException {
public void testClusterStateUpdateTask() {
AllocationService allocationService = new AllocationService(
new AllocationDeciders(Collections.singleton(new MaxRetryAllocationDecider())),
new TestShardBatchGatewayAllocator(),
new TestGatewayAllocator(),
new BalancedShardsAllocator(Settings.EMPTY),
EmptyClusterInfoService.INSTANCE,
EmptySnapshotsInfoService.INSTANCE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.plugins.ClusterPlugin;
import org.opensearch.test.gateway.TestGatewayAllocator;
import org.opensearch.test.gateway.TestShardBatchGatewayAllocator;

import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -292,11 +293,8 @@ public void testRejectsReservedExistingShardsAllocatorName() {
null,
threadContext
);
expectThrows(IllegalArgumentException.class, () -> clusterModule.setExistingShardsAllocators(new HashMap<>() {
{
put(GatewayAllocator.ALLOCATOR_NAME, new TestGatewayAllocator());
}
}));
expectThrows(IllegalArgumentException.class, () -> clusterModule.setExistingShardsAllocators(new TestGatewayAllocator(),
new TestShardBatchGatewayAllocator()));
}

public void testRejectsDuplicateExistingShardsAllocatorName() {
Expand All @@ -308,11 +306,8 @@ public void testRejectsDuplicateExistingShardsAllocatorName() {
null,
threadContext
);
expectThrows(IllegalArgumentException.class, () -> clusterModule.setExistingShardsAllocators(new HashMap<>() {
{
put(GatewayAllocator.ALLOCATOR_NAME, new TestGatewayAllocator());
}
}));
expectThrows(IllegalArgumentException.class, () -> clusterModule.setExistingShardsAllocators(new TestGatewayAllocator(),
new TestShardBatchGatewayAllocator()));
}

private static ClusterPlugin existingShardsAllocatorPlugin(final String allocatorName) {
Expand Down

0 comments on commit 0b665aa

Please sign in to comment.