diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java index ba03532a9aa2f..26714f9a65329 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java @@ -56,7 +56,7 @@ import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; import org.opensearch.env.NodeEnvironment; -import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard; +import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.GatewayStartedShard; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; import org.opensearch.index.MergePolicyProvider; @@ -818,9 +818,9 @@ public void testShardFetchCorruptedShardsUsingBatchAction() throws Exception { .get(discoveryNodes[0].getId()) .getNodeGatewayStartedShardsBatch() .get(shardId); - assertNotNull(gatewayStartedShard.storeException()); - assertNotNull(gatewayStartedShard.allocationId()); - assertTrue(gatewayStartedShard.primary()); + assertNotNull(gatewayStartedShard.get().storeException()); + assertNotNull(gatewayStartedShard.get().allocationId()); + assertTrue(gatewayStartedShard.get().primary()); } public void testSingleShardStoreFetchUsingBatchAction() throws ExecutionException, InterruptedException { @@ -949,9 +949,9 @@ private void assertNodeStoreFilesMetadataSuccessCase( } private void assertNodeGatewayStartedShardsHappyCase(GatewayStartedShard gatewayStartedShard) { - assertNull(gatewayStartedShard.storeException()); - assertNotNull(gatewayStartedShard.allocationId()); - assertTrue(gatewayStartedShard.primary()); + assertNull(gatewayStartedShard.get().storeException()); + assertNotNull(gatewayStartedShard.get().allocationId()); + assertTrue(gatewayStartedShard.get().primary()); } private void prepareIndex(String indexName, int numberOfPrimaryShards) { diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java index 9b961b71653c2..18133f9b4bd1a 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java @@ -194,8 +194,8 @@ public Map getCacheData(DiscoveryNodes nodes, Set fail */ private void fillReverseIdMap() { arrayToShardId.clear(); - for (ShardId shardId : shardIdToArray.keySet()) { - arrayToShardId.putIfAbsent(shardIdToArray.get(shardId), shardId); + for (Map.Entry indexMapping : shardIdToArray.entrySet()) { + arrayToShardId.putIfAbsent(indexMapping.getValue(), indexMapping.getKey()); } } diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java deleted file mode 100644 index 8d222903b6f29..0000000000000 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.gateway; - -import org.apache.logging.log4j.Logger; -import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.routing.RoutingNodes; -import org.opensearch.cluster.routing.ShardRouting; -import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; -import org.opensearch.cluster.routing.allocation.RoutingAllocation; -import org.opensearch.gateway.AsyncShardFetch.FetchResult; -import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.NodeGatewayStartedShard; -import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * PrimaryShardBatchAllocator is similar to {@link org.opensearch.gateway.PrimaryShardAllocator} only difference is - * that it can allocate multiple unassigned primary shards wherein PrimaryShardAllocator can only allocate single - * unassigned shard. - * The primary shard batch allocator allocates multiple unassigned primary shards to nodes that hold - * valid copies of the unassigned primaries. It does this by iterating over all unassigned - * primary shards in the routing table and fetching shard metadata from each node in the cluster - * that holds a copy of the shard. The shard metadata from each node is compared against the - * set of valid allocation IDs and for all valid shard copies (if any), the primary shard batch allocator - * executes the allocation deciders to chose a copy to assign the primary shard to. - *

- * Note that the PrimaryShardBatchAllocator does *not* allocate primaries on index creation - * (see {@link org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator}), - * nor does it allocate primaries when a primary shard failed and there is a valid replica - * copy that can immediately be promoted to primary, as this takes place in {@link RoutingNodes#failShard}. - * - * @opensearch.internal - */ -public abstract class PrimaryShardBatchAllocator extends PrimaryShardAllocator { - - abstract protected FetchResult fetchData( - List eligibleShards, - List inEligibleShards, - RoutingAllocation allocation - ); - - protected FetchResult fetchData( - ShardRouting shard, - RoutingAllocation allocation - ) { - logger.error("fetchData for single shard called via batch allocator, shard id {}", shard.shardId()); - throw new IllegalStateException("PrimaryShardBatchAllocator should only be used for a batch of shards"); - } - - @Override - public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassignedShard, RoutingAllocation allocation, Logger logger) { - return makeAllocationDecision(Collections.singletonList(unassignedShard), allocation, logger).get(unassignedShard); - } - - /** - * Build allocation decisions for all the shards present in the batch identified by batchId. - * - * @param shards set of shards given for allocation - * @param allocation current allocation of all the shards - * @param logger logger used for logging - * @return shard to allocation decision map - */ - @Override - public HashMap makeAllocationDecision( - List shards, - RoutingAllocation allocation, - Logger logger - ) { - HashMap shardAllocationDecisions = new HashMap<>(); - List eligibleShards = new ArrayList<>(); - List inEligibleShards = new ArrayList<>(); - // identify ineligible shards - for (ShardRouting shard : shards) { - AllocateUnassignedDecision decision = getInEligibleShardDecision(shard, allocation); - if (decision != null) { - inEligibleShards.add(shard); - shardAllocationDecisions.put(shard, decision); - } else { - eligibleShards.add(shard); - } - } - // Do not call fetchData if there are no eligible shards - if (eligibleShards.isEmpty()) { - return shardAllocationDecisions; - } - // only fetch data for eligible shards - final FetchResult shardsState = fetchData(eligibleShards, inEligibleShards, allocation); - - // process the received data - for (ShardRouting unassignedShard : eligibleShards) { - List nodeShardStates = adaptToNodeShardStates(unassignedShard, shardsState); - // get allocation decision for this shard - shardAllocationDecisions.put(unassignedShard, getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger)); - } - return shardAllocationDecisions; - } - - /** - * Transforms {@link FetchResult} of {@link NodeGatewayStartedShardsBatch} to {@link List} of {@link TransportNodesListGatewayStartedShards.NodeGatewayStartedShards}. - *

- * Returns null if {@link FetchResult} does not have any data. - *

- * shardsState contain the Data, there key is DiscoveryNode but value is Map of ShardId - * and NodeGatewayStartedShardsBatch so to get one shard level data (from all the nodes), we'll traverse the map - * and construct the nodeShardState along the way before making any allocation decision. As metadata for a - * particular shard is needed from all the discovery nodes. - * - * @param unassignedShard unassigned shard - * @param shardsState fetch data result for the whole batch - * @return shard state returned from each node - */ - private static List adaptToNodeShardStates( - ShardRouting unassignedShard, - FetchResult shardsState - ) { - if (!shardsState.hasData()) { - return null; - } - List nodeShardStates = new ArrayList<>(); - Map nodeResponses = shardsState.getData(); - - // build data for a shard from all the nodes - nodeResponses.forEach((node, nodeGatewayStartedShardsBatch) -> { - TransportNodesGatewayStartedShardHelper.GatewayStartedShard shardData = nodeGatewayStartedShardsBatch - .getNodeGatewayStartedShardsBatch() - .get(unassignedShard.shardId()); - nodeShardStates.add( - new NodeGatewayStartedShard( - shardData.allocationId(), - shardData.primary(), - shardData.replicationCheckpoint(), - shardData.storeException(), - node - ) - ); - }); - return nodeShardStates; - } -} diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java b/server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java index 1f16f784515d5..914119185ddd8 100644 --- a/server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java @@ -240,6 +240,10 @@ public static Boolean isEmpty(GatewayStartedShard response) { && response.storeException() == null && response.replicationCheckpoint() == null; } + + public Boolean isEmpty() { + return allocationId == null && primary == false && storeException == null && replicationCheckpoint == null; + } } /** diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShardsBatch.java b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShardsBatch.java index 70e7df785155f..f187b97b82319 100644 --- a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShardsBatch.java +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShardsBatch.java @@ -26,7 +26,6 @@ import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.NodeEnvironment; -import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard; import org.opensearch.indices.IndicesService; import org.opensearch.indices.store.ShardAttributes; import org.opensearch.threadpool.ThreadPool; @@ -141,15 +140,18 @@ protected NodeGatewayStartedShardsBatch nodeOperation(NodeRequest request) { try { shardsOnNode.put( shardId, - getShardInfoOnLocalNode( - logger, - shardId, - namedXContentRegistry, - nodeEnv, - indicesService, - shardAttr.getValue().getCustomDataPath(), - settings, - clusterService + new GatewayStartedShard( + getShardInfoOnLocalNode( + logger, + shardId, + namedXContentRegistry, + nodeEnv, + indicesService, + shardAttr.getValue().getCustomDataPath(), + settings, + clusterService + ), + null ) ); } catch (Exception e) { @@ -158,7 +160,10 @@ protected NodeGatewayStartedShardsBatch nodeOperation(NodeRequest request) { shardsOnNode.put(shardId, null); } else { // return actual exception as it is for unknown exceptions - shardsOnNode.put(shardId, new GatewayStartedShard(null, false, null, e)); + shardsOnNode.put( + shardId, + new GatewayStartedShard(new TransportNodesGatewayStartedShardHelper.GatewayStartedShard(null, false, null, null), e) + ); } } } @@ -251,6 +256,50 @@ public void writeTo(StreamOutput out) throws IOException { } } + public static class GatewayStartedShard { + private final TransportNodesGatewayStartedShardHelper.GatewayStartedShard gatewayStartedShard; + private final Exception transportError; + + public GatewayStartedShard( + TransportNodesGatewayStartedShardHelper.GatewayStartedShard gatewayStartedShard, + Exception transportError + ) { + this.gatewayStartedShard = gatewayStartedShard; + this.transportError = transportError; + } + + public GatewayStartedShard(StreamInput in) throws IOException { + this.gatewayStartedShard = new TransportNodesGatewayStartedShardHelper.GatewayStartedShard(in); + if (in.readBoolean()) { + this.transportError = in.readException(); + } else { + this.transportError = null; + } + } + + public void writeTo(StreamOutput out) throws IOException { + gatewayStartedShard.writeTo(out); + if (transportError != null) { + out.writeBoolean(true); + out.writeException(transportError); + } else { + out.writeBoolean(false); + } + } + + public static boolean isEmpty(GatewayStartedShard gatewayStartedShard) { + return gatewayStartedShard.get().isEmpty() && gatewayStartedShard.getTransportError() == null; + } + + public Exception getTransportError() { + return transportError; + } + + public TransportNodesGatewayStartedShardHelper.GatewayStartedShard get() { + return gatewayStartedShard; + } + } + /** * This is the response from a single node, this is used in {@link NodesGatewayStartedShardsBatch} for creating * node to its response mapping for this transport request. diff --git a/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java deleted file mode 100644 index 4796def2b8902..0000000000000 --- a/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java +++ /dev/null @@ -1,340 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ -package org.opensearch.gateway; - -import org.apache.lucene.codecs.Codec; -import org.opensearch.Version; -import org.opensearch.cluster.ClusterName; -import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.OpenSearchAllocationTestCase; -import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.cluster.metadata.Metadata; -import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.node.DiscoveryNodes; -import org.opensearch.cluster.routing.RoutingNodes; -import org.opensearch.cluster.routing.RoutingTable; -import org.opensearch.cluster.routing.ShardRouting; -import org.opensearch.cluster.routing.UnassignedInfo; -import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; -import org.opensearch.cluster.routing.allocation.AllocationDecision; -import org.opensearch.cluster.routing.allocation.RoutingAllocation; -import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; -import org.opensearch.common.Nullable; -import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.set.Sets; -import org.opensearch.core.index.shard.ShardId; -import org.opensearch.env.Environment; -import org.opensearch.index.IndexSettings; -import org.opensearch.index.codec.CodecService; -import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; -import org.opensearch.test.IndexSettingsModule; -import org.junit.Before; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import static org.opensearch.cluster.routing.UnassignedInfo.Reason.CLUSTER_RECOVERED; - -public class PrimaryShardBatchAllocatorTests extends OpenSearchAllocationTestCase { - - private final ShardId shardId = new ShardId("test", "_na_", 0); - private static Set shardsInBatch; - private final DiscoveryNode node1 = newNode("node1"); - private final DiscoveryNode node2 = newNode("node2"); - private final DiscoveryNode node3 = newNode("node3"); - private TestBatchAllocator batchAllocator; - - public static void setUpShards(int numberOfShards) { - shardsInBatch = new HashSet<>(); - for (int shardNumber = 0; shardNumber < numberOfShards; shardNumber++) { - ShardId shardId = new ShardId("test", "_na_", shardNumber); - shardsInBatch.add(shardId); - } - } - - @Before - public void buildTestAllocator() { - this.batchAllocator = new TestBatchAllocator(); - } - - private void allocateAllUnassigned(final RoutingAllocation allocation) { - final RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator(); - while (iterator.hasNext()) { - batchAllocator.allocateUnassigned(iterator.next(), allocation, iterator); - } - } - - private void allocateAllUnassignedBatch(final RoutingAllocation allocation) { - final RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator(); - List shardsToBatch = new ArrayList<>(); - while (iterator.hasNext()) { - shardsToBatch.add(iterator.next()); - } - batchAllocator.allocateUnassignedBatch(shardsToBatch, allocation); - } - - public void testMakeAllocationDecisionDataFetching() { - final RoutingAllocation allocation = routingAllocationWithOnePrimary(noAllocationDeciders(), CLUSTER_RECOVERED, "allocId1"); - - List shards = new ArrayList<>(); - allocateAllUnassignedBatch(allocation); - ShardRouting shard = allocation.routingTable().getIndicesRouting().get("test").shard(shardId.id()).primaryShard(); - shards.add(shard); - HashMap allDecisions = batchAllocator.makeAllocationDecision(shards, allocation, logger); - // verify we get decisions for all the shards - assertEquals(shards.size(), allDecisions.size()); - assertEquals(shards, new ArrayList<>(allDecisions.keySet())); - assertEquals(AllocationDecision.AWAITING_INFO, allDecisions.get(shard).getAllocationDecision()); - } - - public void testMakeAllocationDecisionForReplicaShard() { - final RoutingAllocation allocation = routingAllocationWithOnePrimary(noAllocationDeciders(), CLUSTER_RECOVERED, "allocId1"); - - List replicaShards = allocation.routingTable().getIndicesRouting().get("test").shard(shardId.id()).replicaShards(); - List shards = new ArrayList<>(replicaShards); - HashMap allDecisions = batchAllocator.makeAllocationDecision(shards, allocation, logger); - // verify we get decisions for all the shards - assertEquals(shards.size(), allDecisions.size()); - assertEquals(shards, new ArrayList<>(allDecisions.keySet())); - assertFalse(allDecisions.get(replicaShards.get(0)).isDecisionTaken()); - } - - public void testMakeAllocationDecisionDataFetched() { - final RoutingAllocation allocation = routingAllocationWithOnePrimary(noAllocationDeciders(), CLUSTER_RECOVERED, "allocId1"); - - List shards = new ArrayList<>(); - ShardRouting shard = allocation.routingTable().getIndicesRouting().get("test").shard(shardId.id()).primaryShard(); - shards.add(shard); - batchAllocator.addData(node1, "allocId1", true, new ReplicationCheckpoint(shardId, 20, 101, 1, Codec.getDefault().getName())); - HashMap allDecisions = batchAllocator.makeAllocationDecision(shards, allocation, logger); - // verify we get decisions for all the shards - assertEquals(shards.size(), allDecisions.size()); - assertEquals(shards, new ArrayList<>(allDecisions.keySet())); - assertEquals(AllocationDecision.YES, allDecisions.get(shard).getAllocationDecision()); - } - - public void testMakeAllocationDecisionDataFetchedMultipleShards() { - setUpShards(2); - final RoutingAllocation allocation = routingAllocationWithMultiplePrimaries( - noAllocationDeciders(), - CLUSTER_RECOVERED, - 2, - 0, - "allocId-0", - "allocId-1" - ); - List shards = new ArrayList<>(); - for (ShardId shardId : shardsInBatch) { - ShardRouting shard = allocation.routingTable().getIndicesRouting().get("test").shard(shardId.id()).primaryShard(); - allocation.routingTable().getIndicesRouting().get("test").shard(shardId.id()).primaryShard().recoverySource(); - shards.add(shard); - batchAllocator.addShardData( - node1, - "allocId-" + shardId.id(), - shardId, - true, - new ReplicationCheckpoint(shardId, 20, 101, 1, Codec.getDefault().getName()), - null - ); - } - HashMap allDecisions = batchAllocator.makeAllocationDecision(shards, allocation, logger); - // verify we get decisions for all the shards - assertEquals(shards.size(), allDecisions.size()); - assertEquals(new HashSet<>(shards), allDecisions.keySet()); - for (ShardRouting shard : shards) { - assertEquals(AllocationDecision.YES, allDecisions.get(shard).getAllocationDecision()); - } - } - - private RoutingAllocation routingAllocationWithOnePrimary( - AllocationDeciders deciders, - UnassignedInfo.Reason reason, - String... activeAllocationIds - ) { - Metadata metadata = Metadata.builder() - .put( - IndexMetadata.builder(shardId.getIndexName()) - .settings(settings(Version.CURRENT)) - .numberOfShards(1) - .numberOfReplicas(1) - .putInSyncAllocationIds(shardId.id(), Sets.newHashSet(activeAllocationIds)) - ) - .build(); - RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - switch (reason) { - - case INDEX_CREATED: - routingTableBuilder.addAsNew(metadata.index(shardId.getIndex())); - break; - case CLUSTER_RECOVERED: - routingTableBuilder.addAsRecovery(metadata.index(shardId.getIndex())); - break; - case INDEX_REOPENED: - routingTableBuilder.addAsFromCloseToOpen(metadata.index(shardId.getIndex())); - break; - default: - throw new IllegalArgumentException("can't do " + reason + " for you. teach me"); - } - ClusterState state = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) - .metadata(metadata) - .routingTable(routingTableBuilder.build()) - .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) - .build(); - return new RoutingAllocation(deciders, new RoutingNodes(state, false), state, null, null, System.nanoTime()); - } - - private RoutingAllocation routingAllocationWithMultiplePrimaries( - AllocationDeciders deciders, - UnassignedInfo.Reason reason, - int numberOfShards, - int replicas, - String... activeAllocationIds - ) { - Iterator shardIterator = shardsInBatch.iterator(); - Metadata metadata = Metadata.builder() - .put( - IndexMetadata.builder(shardId.getIndexName()) - .settings(settings(Version.CURRENT)) - .numberOfShards(numberOfShards) - .numberOfReplicas(replicas) - .putInSyncAllocationIds(shardIterator.next().id(), Sets.newHashSet(activeAllocationIds[0])) - .putInSyncAllocationIds(shardIterator.next().id(), Sets.newHashSet(activeAllocationIds[1])) - ) - .build(); - - RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - for (ShardId shardIdFromBatch : shardsInBatch) { - switch (reason) { - case INDEX_CREATED: - routingTableBuilder.addAsNew(metadata.index(shardIdFromBatch.getIndex())); - break; - case CLUSTER_RECOVERED: - routingTableBuilder.addAsRecovery(metadata.index(shardIdFromBatch.getIndex())); - break; - case INDEX_REOPENED: - routingTableBuilder.addAsFromCloseToOpen(metadata.index(shardIdFromBatch.getIndex())); - break; - default: - throw new IllegalArgumentException("can't do " + reason + " for you. teach me"); - } - } - ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) - .metadata(metadata) - .routingTable(routingTableBuilder.build()) - .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) - .build(); - return new RoutingAllocation(deciders, new RoutingNodes(state, false), state, null, null, System.nanoTime()); - } - - class TestBatchAllocator extends PrimaryShardBatchAllocator { - - private Map data; - - public TestBatchAllocator clear() { - data = null; - return this; - } - - public TestBatchAllocator addData( - DiscoveryNode node, - String allocationId, - boolean primary, - ReplicationCheckpoint replicationCheckpoint - ) { - return addData(node, allocationId, primary, replicationCheckpoint, null); - } - - public TestBatchAllocator addData(DiscoveryNode node, String allocationId, boolean primary) { - Settings nodeSettings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build(); - IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", nodeSettings); - return addData( - node, - allocationId, - primary, - ReplicationCheckpoint.empty(shardId, new CodecService(null, indexSettings, null).codec("default").getName()), - null - ); - } - - public TestBatchAllocator addData(DiscoveryNode node, String allocationId, boolean primary, @Nullable Exception storeException) { - Settings nodeSettings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build(); - IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", nodeSettings); - return addData( - node, - allocationId, - primary, - ReplicationCheckpoint.empty(shardId, new CodecService(null, indexSettings, null).codec("default").getName()), - storeException - ); - } - - public TestBatchAllocator addData( - DiscoveryNode node, - String allocationId, - boolean primary, - ReplicationCheckpoint replicationCheckpoint, - @Nullable Exception storeException - ) { - if (data == null) { - data = new HashMap<>(); - } - Map shardData = Map.of( - shardId, - new TransportNodesGatewayStartedShardHelper.GatewayStartedShard( - allocationId, - primary, - replicationCheckpoint, - storeException - ) - ); - data.put(node, new TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch(node, shardData)); - return this; - } - - public TestBatchAllocator addShardData( - DiscoveryNode node, - String allocationId, - ShardId shardId, - boolean primary, - ReplicationCheckpoint replicationCheckpoint, - @Nullable Exception storeException - ) { - if (data == null) { - data = new HashMap<>(); - } - Map shardData = new HashMap<>(); - shardData.put( - shardId, - new TransportNodesGatewayStartedShardHelper.GatewayStartedShard( - allocationId, - primary, - replicationCheckpoint, - storeException - ) - ); - if (data.get(node) != null) shardData.putAll(data.get(node).getNodeGatewayStartedShardsBatch()); - data.put(node, new TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch(node, shardData)); - return this; - } - - @Override - protected AsyncShardFetch.FetchResult fetchData( - List shardsEligibleForFetch, - List inEligibleShards, - RoutingAllocation allocation - ) { - return new AsyncShardFetch.FetchResult<>(data, Collections.>emptyMap()); - } - } -} diff --git a/server/src/test/java/org/opensearch/gateway/ShardBatchCacheTests.java b/server/src/test/java/org/opensearch/gateway/ShardBatchCacheTests.java index 647b85a9a1a34..085793162b3c4 100644 --- a/server/src/test/java/org/opensearch/gateway/ShardBatchCacheTests.java +++ b/server/src/test/java/org/opensearch/gateway/ShardBatchCacheTests.java @@ -16,7 +16,7 @@ import org.opensearch.cluster.routing.TestShardRouting; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.core.index.shard.ShardId; -import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard; +import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.GatewayStartedShard; import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch; import org.opensearch.indices.store.ShardAttributes; @@ -53,9 +53,9 @@ public void setupShardBatchCache(String batchId, int numberOfShards) { GatewayStartedShard.class, NodeGatewayStartedShardsBatch::new, NodeGatewayStartedShardsBatch::getNodeGatewayStartedShardsBatch, - () -> new GatewayStartedShard(null, false, null, null), + () -> new GatewayStartedShard(new TransportNodesGatewayStartedShardHelper.GatewayStartedShard(null, false, null, null), null), this::removeShard, - GatewayStartedShard::storeException, + GatewayStartedShard::getTransportError, GatewayStartedShard::isEmpty ); } @@ -128,7 +128,7 @@ public void testPutData() { ); assertEquals(2, fetchData.size()); assertEquals(10, fetchData.get(node1).getNodeGatewayStartedShardsBatch().size()); - assertEquals("alloc-1", fetchData.get(node1).getNodeGatewayStartedShardsBatch().get(shard).allocationId()); + assertEquals("alloc-1", fetchData.get(node1).getNodeGatewayStartedShardsBatch().get(shard).get().allocationId()); assertEquals(10, fetchData.get(node2).getNodeGatewayStartedShardsBatch().size()); assertTrue(GatewayStartedShard.isEmpty(fetchData.get(node2).getNodeGatewayStartedShardsBatch().get(shard))); @@ -176,10 +176,22 @@ private Map getPrimaryResponse(List shard shardData.put(shard, null); break; case EMPTY: - shardData.put(shard, new GatewayStartedShard(null, false, null, null)); + shardData.put( + shard, + new GatewayStartedShard( + new TransportNodesGatewayStartedShardHelper.GatewayStartedShard(null, false, null, null), + null + ) + ); break; case VALID: - shardData.put(shard, new GatewayStartedShard("alloc-" + allocationId++, false, null, null)); + shardData.put( + shard, + new GatewayStartedShard( + new TransportNodesGatewayStartedShardHelper.GatewayStartedShard("alloc-" + allocationId++, false, null, null), + null + ) + ); break; default: throw new AssertionError("unknown response type"); @@ -195,10 +207,19 @@ private Map getFailedPrimaryResponse(List if (failedShardsCount-- > 0) { shardData.put( shard, - new GatewayStartedShard("alloc-" + allocationId++, false, null, new OpenSearchRejectedExecutionException()) + new GatewayStartedShard( + new TransportNodesGatewayStartedShardHelper.GatewayStartedShard("alloc-" + allocationId++, false, null, null), + new OpenSearchRejectedExecutionException() + ) ); } else { - shardData.put(shard, new GatewayStartedShard("alloc-" + allocationId++, false, null, null)); + shardData.put( + shard, + new GatewayStartedShard( + new TransportNodesGatewayStartedShardHelper.GatewayStartedShard("alloc-" + allocationId++, false, null, null), + null + ) + ); } } return shardData;