From 0b665aa3bb83b57c3a3b7608be333a7dd5267bfe Mon Sep 17 00:00:00 2001 From: Gaurav Chandani Date: Mon, 15 Jan 2024 10:47:38 +0530 Subject: [PATCH] Changes to implement interface for ShardBatchGatewayAllocator Signed-off-by: Gaurav Chandani --- .../org/opensearch/cluster/ClusterModule.java | 6 ++- .../gateway/ShardsBatchGatewayAllocator.java | 44 ++++++------------- .../main/java/org/opensearch/node/Node.java | 14 ++---- .../cluster/reroute/ClusterRerouteTests.java | 4 +- .../cluster/ClusterModuleTests.java | 15 +++---- 5 files changed, 28 insertions(+), 55 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/ClusterModule.java b/server/src/main/java/org/opensearch/cluster/ClusterModule.java index 08cd353a9eb17..08a93221f63da 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterModule.java @@ -442,8 +442,10 @@ protected void configure() { bind(ShardsAllocator.class).toInstance(shardsAllocator); } - public void setExistingShardsAllocators(Map gatewayAllocators) { - final Map existingShardsAllocators = new HashMap<>(gatewayAllocators); + public void setExistingShardsAllocators(GatewayAllocator gatewayAllocator, ShardsBatchGatewayAllocator shardsBatchGatewayAllocator) { + final Map existingShardsAllocators = new HashMap<>(); + existingShardsAllocators.put(GatewayAllocator.ALLOCATOR_NAME, gatewayAllocator); + existingShardsAllocators.put(ShardsBatchGatewayAllocator.ALLOCATOR_NAME, shardsBatchGatewayAllocator); for (ClusterPlugin clusterPlugin : clusterPlugins) { for (Map.Entry existingShardsAllocatorEntry : clusterPlugin.getExistingShardsAllocators() .entrySet()) { diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index 13d2b518d25ef..ecae15b31752e 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -6,30 +6,6 @@ * compatible open source license. */ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - package org.opensearch.gateway; import org.apache.logging.log4j.LogManager; @@ -44,6 +20,7 @@ import org.opensearch.cluster.routing.RoutingNodes; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; +import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator; import org.opensearch.cluster.routing.allocation.FailedShard; import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.common.Priority; @@ -57,7 +34,6 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.index.shard.ShardId; import org.opensearch.indices.store.ShardAttributes; -import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch; import java.util.Collections; @@ -74,11 +50,11 @@ import java.util.stream.StreamSupport; /** - * Allocator for the Shards batch gateway + * Allocator for the gateway * * @opensearch.internal */ -public class ShardsBatchGatewayAllocator extends GatewayAllocator { +public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator { public static final String ALLOCATOR_NAME = "shards_batch_gateway_allocator"; @@ -117,13 +93,10 @@ public class ShardsBatchGatewayAllocator extends GatewayAllocator { @Inject public ShardsBatchGatewayAllocator( RerouteService rerouteService, - TransportNodesListGatewayStartedShards startedAction, - TransportNodesListShardStoreMetadata storeAction, TransportNodesListGatewayStartedBatchShards batchStartedAction, TransportNodesListShardStoreMetadataBatch batchStoreAction, Settings settings ) { - super(rerouteService, startedAction, storeAction); this.rerouteService = rerouteService; this.primaryBatchShardAllocator = new InternalPrimaryBatchShardAllocator(); this.replicaBatchShardAllocator = new InternalReplicaBatchShardAllocator(); @@ -207,7 +180,16 @@ public void afterPrimariesBeforeReplicas(RoutingAllocation allocation) { } @Override - public void allocateUnassignedBatch(final RoutingAllocation allocation, boolean primary) { + public void allocateUnassigned( + ShardRouting shardRouting, + RoutingAllocation allocation, + UnassignedAllocationHandler unassignedAllocationHandler + ) { + throw new UnsupportedOperationException("ShardsBatchGatewayAllocator does not support allocating unassigned shards"); + } + + @Override + public void allocateAllUnassignedShards(final RoutingAllocation allocation, boolean primary) { assert primaryBatchShardAllocator != null; assert replicaBatchShardAllocator != null; diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 43be1cdf72eaa..298d8fe36952e 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1188,16 +1188,10 @@ protected Node( // We allocate copies of existing shards by looking for a viable copy of the shard in the cluster and assigning the shard there. // The search for viable copies is triggered by an allocation attempt (i.e. a reroute) and is performed asynchronously. When it // completes we trigger another reroute to try the allocation again. This means there is a circular dependency: the allocation - // service needs access to the existing shards allocators (e.g. the GatewayAllocator) which need to be able to trigger a - // reroute, which needs to call into the allocation service. We close the loop here: - // create Hashmap for existing Allocators - Map gatewayAllocatorMap = new HashMap<>() { - { - put(GatewayAllocator.ALLOCATOR_NAME, injector.getInstance(GatewayAllocator.class)); - put(ShardsBatchGatewayAllocator.ALLOCATOR_NAME, injector.getInstance(ShardsBatchGatewayAllocator.class)); - } - }; - clusterModule.setExistingShardsAllocators(gatewayAllocatorMap); + // service needs access to the existing shards allocators (e.g. the GatewayAllocator, ShardsBatchGatewayAllocator) which + // need to be able to trigger a reroute, which needs to call into the allocation service. We close the loop here: + clusterModule.setExistingShardsAllocators(injector.getInstance(GatewayAllocator.class), + injector.getInstance(ShardsBatchGatewayAllocator.class)); List pluginLifecycleComponents = pluginComponents.stream() .filter(p -> p instanceof LifecycleComponent) diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/reroute/ClusterRerouteTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/reroute/ClusterRerouteTests.java index 6764693ec86ed..bfa33628d822a 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/reroute/ClusterRerouteTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/reroute/ClusterRerouteTests.java @@ -55,7 +55,7 @@ import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.snapshots.EmptySnapshotsInfoService; -import org.opensearch.test.gateway.TestShardBatchGatewayAllocator; +import org.opensearch.test.gateway.TestGatewayAllocator; import java.io.IOException; import java.util.Collections; @@ -94,7 +94,7 @@ public void testSerializeRequest() throws IOException { public void testClusterStateUpdateTask() { AllocationService allocationService = new AllocationService( new AllocationDeciders(Collections.singleton(new MaxRetryAllocationDecider())), - new TestShardBatchGatewayAllocator(), + new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE, EmptySnapshotsInfoService.INSTANCE diff --git a/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java b/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java index f64e6b146e001..50a4f87f78d09 100644 --- a/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java +++ b/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java @@ -71,6 +71,7 @@ import org.opensearch.gateway.GatewayAllocator; import org.opensearch.plugins.ClusterPlugin; import org.opensearch.test.gateway.TestGatewayAllocator; +import org.opensearch.test.gateway.TestShardBatchGatewayAllocator; import java.util.Arrays; import java.util.Collection; @@ -292,11 +293,8 @@ public void testRejectsReservedExistingShardsAllocatorName() { null, threadContext ); - expectThrows(IllegalArgumentException.class, () -> clusterModule.setExistingShardsAllocators(new HashMap<>() { - { - put(GatewayAllocator.ALLOCATOR_NAME, new TestGatewayAllocator()); - } - })); + expectThrows(IllegalArgumentException.class, () -> clusterModule.setExistingShardsAllocators(new TestGatewayAllocator(), + new TestShardBatchGatewayAllocator())); } public void testRejectsDuplicateExistingShardsAllocatorName() { @@ -308,11 +306,8 @@ public void testRejectsDuplicateExistingShardsAllocatorName() { null, threadContext ); - expectThrows(IllegalArgumentException.class, () -> clusterModule.setExistingShardsAllocators(new HashMap<>() { - { - put(GatewayAllocator.ALLOCATOR_NAME, new TestGatewayAllocator()); - } - })); + expectThrows(IllegalArgumentException.class, () -> clusterModule.setExistingShardsAllocators(new TestGatewayAllocator(), + new TestShardBatchGatewayAllocator())); } private static ClusterPlugin existingShardsAllocatorPlugin(final String allocatorName) {