diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/TransportNodesListGatewayStartedBatchShardsIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/TransportNodesListGatewayStartedBatchShardsIT.java
index 590cb66e2d14e..bc15bb79f778e 100644
--- a/server/src/internalClusterTest/java/org/opensearch/gateway/TransportNodesListGatewayStartedBatchShardsIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/gateway/TransportNodesListGatewayStartedBatchShardsIT.java
@@ -20,6 +20,7 @@
 import org.opensearch.core.index.shard.ShardId;
 import org.opensearch.env.NodeEnvironment;
 import org.opensearch.index.shard.ShardPath;
+import org.opensearch.indices.store.ShardAttributes;
 import org.opensearch.test.OpenSearchIntegTestCase;
 
 import java.io.IOException;
@@ -39,14 +40,14 @@ public class TransportNodesListGatewayStartedBatchShardsIT extends OpenSearchInt
 
     public void testSingleShardFetch() throws Exception {
         String indexName = "test";
-        Map<ShardId, String> shardIdCustomDataPathMap = prepareRequestMap(new String[] { indexName }, 1);
+        Map<ShardId, ShardAttributes> shardAttributesMap = prepareRequestMap(new String[] { indexName }, 1);
         ClusterSearchShardsResponse searchShardsResponse = client().admin().cluster().prepareSearchShards(indexName).get();
 
         TransportNodesListGatewayStartedBatchShards.NodesGatewayStartedShardsBatch response;
         response = ActionTestUtils.executeBlocking(
             internalCluster().getInstance(TransportNodesListGatewayStartedBatchShards.class),
-            new TransportNodesListGatewayStartedBatchShards.Request(searchShardsResponse.getNodes(), shardIdCustomDataPathMap)
+            new TransportNodesListGatewayStartedBatchShards.Request(searchShardsResponse.getNodes(), shardAttributesMap)
         );
         final Index index = resolveIndex(indexName);
         final ShardId shardId = new ShardId(index, 0);
@@ -63,7 +64,7 @@ public void testShardFetchMultiNodeMultiIndexes() throws Exception {
         String indexName1 = "test1";
         String indexName2 = "test2";
         // assign one primary shard each to the data nodes
-        Map<ShardId, String> shardIdCustomDataPathMap = prepareRequestMap(
+        Map<ShardId, ShardAttributes> shardAttributesMap = prepareRequestMap(
             new String[] { indexName1, indexName2 },
             internalCluster().numDataNodes()
         );
@@ -72,7 +73,7 @@ public void testShardFetchMultiNodeMultiIndexes() throws Exception {
         TransportNodesListGatewayStartedBatchShards.NodesGatewayStartedShardsBatch response;
         response = ActionTestUtils.executeBlocking(
             internalCluster().getInstance(TransportNodesListGatewayStartedBatchShards.class),
-            new TransportNodesListGatewayStartedBatchShards.Request(searchShardsResponse.getNodes(), shardIdCustomDataPathMap)
+            new TransportNodesListGatewayStartedBatchShards.Request(searchShardsResponse.getNodes(), shardAttributesMap)
         );
         for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) {
             ShardId shardId = clusterSearchShardsGroup.getShardId();
@@ -88,7 +89,7 @@ public void testShardFetchCorruptedShards() throws Exception {
         String indexName = "test";
-        Map<ShardId, String> shardIdCustomDataPathMap = prepareRequestMap(new String[] { indexName }, 1);
+        Map<ShardId, ShardAttributes> shardAttributes = prepareRequestMap(new String[] { indexName }, 1);
         ClusterSearchShardsResponse searchShardsResponse = client().admin().cluster().prepareSearchShards(indexName).get();
         final Index index = resolveIndex(indexName);
         final ShardId shardId = new ShardId(index, 0);
@@ -97,7 +98,7 @@ public void testShardFetchCorruptedShards() throws Exception {
         internalCluster().restartNode(searchShardsResponse.getNodes()[0].getName());
         response = ActionTestUtils.executeBlocking(
             internalCluster().getInstance(TransportNodesListGatewayStartedBatchShards.class),
-            new TransportNodesListGatewayStartedBatchShards.Request(getDiscoveryNodes(), shardIdCustomDataPathMap)
+            new TransportNodesListGatewayStartedBatchShards.Request(getDiscoveryNodes(), shardAttributes)
         );
         DiscoveryNode[] discoveryNodes = getDiscoveryNodes();
         TransportNodesListGatewayStartedBatchShards.NodeGatewayStartedShards nodeGatewayStartedShards = response.getNodesMap()
@@ -137,8 +138,8 @@ private void prepareIndex(String indexName, int numberOfPrimaryShards) {
         flush(indexName);
     }
 
-    private Map<ShardId, String> prepareRequestMap(String[] indices, int primaryShardCount) {
-        Map<ShardId, String> shardIdCustomDataPathMap = new HashMap<>();
+    private Map<ShardId, ShardAttributes> prepareRequestMap(String[] indices, int primaryShardCount) {
+        Map<ShardId, ShardAttributes> shardAttributesMap = new HashMap<>();
         for (String indexName : indices) {
             prepareIndex(indexName, primaryShardCount);
             final Index index = resolveIndex(indexName);
@@ -147,10 +148,10 @@ private Map prepareRequestMap(String[] indices, int primaryShar
         );
             for (int shardIdNum = 0; shardIdNum < primaryShardCount; shardIdNum++) {
                 final ShardId shardId = new ShardId(index, shardIdNum);
-                shardIdCustomDataPathMap.put(shardId, customDataPath);
+                shardAttributesMap.put(shardId, new ShardAttributes(shardId, customDataPath));
             }
         }
-        return shardIdCustomDataPathMap;
+        return shardAttributesMap;
     }
 
     private void corruptShard(String nodeName, ShardId shardId) throws IOException, InterruptedException {
diff --git a/server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java b/server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java
index d7859fb17bcb6..04166c88a00ad 100644
--- a/server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java
+++ b/server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java
@@ -223,7 +223,7 @@ protected synchronized void processAsyncFetch(
                 List<FailedNodeException> failures,
                 long fetchingRound
             ) {
-                fetchResponses.add(new Response(shardToCustomDataPath.keySet().iterator().next(), responses, failures));
+                fetchResponses.add(new Response(shardAttributesMap.keySet().iterator().next(), responses, failures));
                 if (expectedOps.countDown()) {
                     finish();
                 }
diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java
index eed46c2940f44..fb187fc04bc93 100644
--- a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java
+++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java
@@ -46,6 +46,7 @@
 import org.opensearch.core.action.ActionListener;
 import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
 import org.opensearch.core.index.shard.ShardId;
+import org.opensearch.indices.store.ShardAttributes;
 import org.opensearch.transport.ReceiveTimeoutTransportException;
 
 import java.util.ArrayList;
@@ -68,6 +69,7 @@
  * and once the results are back, it makes sure to schedule a reroute to make sure those results will
  * be taken into account.
  *
+ * It comes in two modes, to single fetch a shard or fetch a batch of shards.
  * @opensearch.internal
  */
 public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Releasable {
@@ -76,19 +78,18 @@ public abstract class AsyncShardFetch implements Rel
      * An action that lists the relevant shard data that needs to be fetched.
      */
     public interface Lister<NodesResponse extends BaseNodesResponse<NodeResponse>, NodeResponse extends BaseNodeResponse> {
-        void list(Map<ShardId, String> shardIdsWithCustomDataPath, DiscoveryNode[] nodes, ActionListener<NodesResponse> listener);
+        void list(Map<ShardId, ShardAttributes> shardAttributesMap, DiscoveryNode[] nodes, ActionListener<NodesResponse> listener);
     }
 
     protected final Logger logger;
     protected final String type;
-
-    protected final Map<ShardId, String> shardToCustomDataPath;
+    protected final Map<ShardId, ShardAttributes> shardAttributesMap;
     private final Lister<BaseNodesResponse<T>, T> action;
     private final Map<String, NodeEntry<T>> cache = new HashMap<>();
     private final AtomicLong round = new AtomicLong();
     private boolean closed;
-    private final String logKey;
+    private final String reroutingKey;
     private final Map<ShardId, Set<String>> shardToIgnoreNodes = new HashMap<>();
 
     private final boolean enableBatchMode;
@@ -103,26 +104,35 @@ protected AsyncShardFetch(
     ) {
         this.logger = logger;
         this.type = type;
-        shardToCustomDataPath = new HashMap<>();
-        shardToCustomDataPath.put(shardId, customDataPath);
+        shardAttributesMap = new HashMap<>();
+        shardAttributesMap.put(shardId, new ShardAttributes(shardId, customDataPath));
         this.action = (Lister<BaseNodesResponse<T>, T>) action;
-        this.logKey = "ShardId=[" + shardId.toString() + "]";
+        this.reroutingKey = "ShardId=[" + shardId.toString() + "]";
         enableBatchMode = false;
     }
 
+    /**
+     * Added to fetch a batch of shards from nodes
+     *
+     * @param logger Logger
+     * @param type type of action
+     * @param shardAttributesMap Map of {@link ShardId} to {@link ShardAttributes} to perform fetching on them
+     * @param action Transport Action
+     * @param batchId For the given ShardAttributesMap, we expect them to tie with a single batch id for logging and later identification
+     */
     @SuppressWarnings("unchecked")
     protected AsyncShardFetch(
         Logger logger,
         String type,
-        Map<ShardId, String> shardToCustomDataPath,
+        Map<ShardId, ShardAttributes> shardAttributesMap,
         Lister<? extends BaseNodesResponse<T>, T> action,
         String batchId
     ) {
         this.logger = logger;
         this.type = type;
-        this.shardToCustomDataPath = shardToCustomDataPath;
+        this.shardAttributesMap = shardAttributesMap;
         this.action = (Lister<BaseNodesResponse<T>, T>) action;
-        this.logKey = "BatchID=[" + batchId + "]";
+        this.reroutingKey = "BatchID=[" + batchId + "]";
         enableBatchMode = true;
     }
@@ -153,15 +163,19 @@ public synchronized int getNumberOfInFlightFetches() {
      */
     public synchronized FetchResult<T> fetchData(DiscoveryNodes nodes, Map<ShardId, Set<String>> ignoreNodes) {
         if (closed) {
-            throw new IllegalStateException(logKey + ": can't fetch data on closed async fetch");
+            throw new IllegalStateException(reroutingKey + ": can't fetch data on closed async fetch");
         }
 
         if (enableBatchMode == false) {
             // we will do assertions here on ignoreNodes
-            assert ignoreNodes.size() <= 1 : "Can only have at-most one shard";
-            if (ignoreNodes.size() == 1) {
-                assert shardToCustomDataPath.containsKey(ignoreNodes.keySet().iterator().next())
-                    : "ShardId should be same as initialised in fetcher";
+            if (ignoreNodes.size() > 1) {
+                throw new IllegalStateException("Fetching Shard Data, " + reroutingKey + ": can only have at most one shard for non-batch mode");
+            }
+            if (ignoreNodes.size() == 1) {
+                if (shardAttributesMap.containsKey(ignoreNodes.keySet().iterator().next()) == false) {
+                    throw new IllegalStateException("Shard Id must be same as initialized in AsyncShardFetch. Expecting = " + reroutingKey);
+                }
             }
         }
@@ -221,16 +235,10 @@ public synchronized FetchResult fetchData(DiscoveryNodes nodes, Map
             }
         }
 
-        if (failedNodes.isEmpty() == false
-            || allIgnoreNodesMap.values().stream().anyMatch(ignoreNodeSet -> ignoreNodeSet.isEmpty() == false)) {
-            reroute(
-                logKey,
-                "nodes failed ["
-                    + failedNodes.size()
-                    + "], ignored ["
-                    + allIgnoreNodesMap.values().stream().mapToInt(Set::size).sum()
-                    + "]"
-            );
+
+        if (failedNodes.isEmpty() == false || allIgnoreNodesMap.values().stream().anyMatch(ignoreNodeSet -> ignoreNodeSet.isEmpty() == false)) {
+            reroute(reroutingKey, "nodes failed [" + failedNodes.size() + "], ignored ["
+                + allIgnoreNodesMap.values().stream().mapToInt(Set::size).sum() + "]");
         }
 
         return new FetchResult<>(fetchData, allIgnoreNodesMap);
@@ -246,10 +254,10 @@ public synchronized FetchResult fetchData(DiscoveryNodes nodes, Map
     protected synchronized void processAsyncFetch(List<T> responses, List<FailedNodeException> failures, long fetchingRound) {
         if (closed) {
             // we are closed, no need to process this async fetch at all
-            logger.trace("{} ignoring fetched [{}] results, already closed", logKey, type);
+            logger.trace("{} ignoring fetched [{}] results, already closed", reroutingKey, type);
             return;
         }
-        logger.trace("{} processing fetched [{}] results", logKey, type);
+        logger.trace("{} processing fetched [{}] results", reroutingKey, type);
 
         if (responses != null) {
             for (T response : responses) {
@@ -259,7 +267,7 @@ protected synchronized void processAsyncFetch(List responses, List
                         assert nodeEntry.getFetchingRound() > fetchingRound : "node entries only replaced by newer rounds";
                         logger.trace(
                             "{} received response for [{}] from node {} for an older fetching round (expected: {} but was: {})",
-                            logKey,
+                            reroutingKey,
                             nodeEntry.getNodeId(),
                             type,
                             nodeEntry.getFetchingRound(),
@@ -268,14 +276,14 @@ protected synchronized void processAsyncFetch(List responses, List
 
         if (failures != null) {
             for (FailedNodeException failure : failures) {
-                logger.trace("{} processing failure {} for [{}]", logKey, failure, type);
+                logger.trace("{} processing failure {} for [{}]", reroutingKey, failure, type);
                 NodeEntry<T> nodeEntry = cache.get(failure.nodeId());
                 if (nodeEntry != null) {
                     if (nodeEntry.getFetchingRound() != fetchingRound) {
                         assert nodeEntry.getFetchingRound() > fetchingRound : "node entries only replaced by newer rounds";
                         logger.trace(
                             "{} received failure for [{}] from node {} for an older fetching round (expected: {} but was: {})",
-                            logKey,
+                            reroutingKey,
                             nodeEntry.getNodeId(),
                             type,
                             nodeEntry.getFetchingRound(),
@@ -308,7 +316,7 @@ protected synchronized void processAsyncFetch(List responses, List
                         logger.warn(
                             () -> new ParameterizedMessage(
                                 "{}: failed to list shard for {} on node [{}]",
-                                logKey,
+                                reroutingKey,
                                 type,
                                 failure.nodeId()
                             ),
@@ -320,7 +328,7 @@ protected synchronized void processAsyncFetch(List responses, List
      */
     // visible for testing
     void asyncFetch(final DiscoveryNode[] nodes, long fetchingRound) {
-        logger.trace("{} fetching [{}] from {}", logKey, type, nodes);
-        action.list(shardToCustomDataPath, nodes, new ActionListener<BaseNodesResponse<T>>() {
+        logger.trace("{} fetching [{}] from {}", reroutingKey, type, nodes);
+        action.list(shardAttributesMap, nodes, new ActionListener<BaseNodesResponse<T>>() {
             @Override
             public void onResponse(BaseNodesResponse<T> response) {
                 processAsyncFetch(response.getNodes(), response.failures(), fetchingRound);
diff --git a/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java
index 452d31186cc4e..9ce7ad9953d7f 100644
--- a/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java
+++ b/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java
@@ -57,6 +57,7 @@
 import org.opensearch.common.util.set.Sets;
 import org.opensearch.core.action.ActionListener;
 import org.opensearch.core.index.shard.ShardId;
+import org.opensearch.indices.store.ShardAttributes;
 import org.opensearch.indices.store.TransportNodesListShardStoreMetadata;
 import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch;
 import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch;
@@ -331,10 +332,9 @@ else if (shardRouting.primary() == primary) {
         while (iterator.hasNext()) {
             ShardRouting currentShard = iterator.next();
             if (batchSize > 0) {
-                ShardEntry sharEntry = new ShardEntry(
-                    IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(currentShard.index()).getSettings()),
-                    currentShard
-                );
+                ShardEntry sharEntry = new ShardEntry(
+                    new ShardAttributes(currentShard.shardId(), IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(currentShard.index()).getSettings())),
+                    currentShard
+                );
                 addToCurrentBatch.put(currentShard.shardId(), sharEntry);
                 batchSize--;
                 iterator.remove();
@@ -505,8 +505,8 @@ private static void clearCacheForPrimary(
         AsyncShardFetch<TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata> fetch,
         RoutingAllocation allocation
     ) {
-        assert fetch.shardToCustomDataPath.size() == 1 : "expected only one shard";
-        ShardId shardId = fetch.shardToCustomDataPath.keySet().iterator().next();
+        assert fetch.shardAttributesMap.size() == 1 : "expected only one shard";
+        ShardId shardId = fetch.shardAttributesMap.keySet().iterator().next();
         ShardRouting primary = allocation.routingNodes().activePrimary(shardId);
         if (primary != null) {
             fetch.clearCacheForNode(primary.currentNodeId());
@@ -552,42 +552,40 @@ class InternalAsyncFetch extends AsyncShardFetch
         }
 
         @Override
-        protected void reroute(String logKey, String reason) {
-            logger.trace("{} scheduling reroute for {}", logKey, reason);
+        protected void reroute(String reroutingKey, String reason) {
+            logger.trace("{} scheduling reroute for {}", reroutingKey, reason);
             assert rerouteService != null;
             rerouteService.reroute(
                 "async_shard_fetch",
                 Priority.HIGH,
                 ActionListener.wrap(
-                    r -> logger.trace("{} scheduled reroute completed for {}", logKey, reason),
-                    e -> logger.debug(new ParameterizedMessage("{} scheduled reroute failed for {}", logKey, reason), e)
+                    r -> logger.trace("{} scheduled reroute completed for {}", reroutingKey, reason),
+                    e -> logger.debug(new ParameterizedMessage("{} scheduled reroute failed for {}", reroutingKey, reason), e)
                 )
             );
         }
     }
 
     class InternalBatchAsyncFetch<T extends BaseNodeResponse> extends AsyncShardFetch<T> {
-
-        InternalBatchAsyncFetch(
-            Logger logger,
-            String type,
-            Map<ShardId, String> map,
-            AsyncShardFetch.Lister<? extends BaseNodesResponse<T>, T> action,
-            String batchUUId
+        InternalBatchAsyncFetch(
+            Logger logger,
+            String type,
+            Map<ShardId, ShardAttributes> map,
+            AsyncShardFetch.Lister<? extends BaseNodesResponse<T>, T> action,
+            String batchUUId
         ) {
             super(logger, type, map, action, batchUUId);
         }
 
         @Override
-        protected void reroute(String logKey, String reason) {
-            logger.trace("{} scheduling reroute for {}", logKey, reason);
+        protected void reroute(String reroutingKey, String reason) {
+            logger.trace("{} scheduling reroute for {}", reroutingKey, reason);
             assert rerouteService != null;
             rerouteService.reroute(
                 "async_shard_batch_fetch",
                 Priority.HIGH,
                 ActionListener.wrap(
-                    r -> logger.trace("{} scheduled reroute completed for {}", logKey, reason),
-                    e -> logger.debug(new ParameterizedMessage("{} scheduled reroute failed for {}", logKey, reason), e)
+                    r -> logger.trace("{} scheduled reroute completed for {}", reroutingKey, reason),
+                    e -> logger.debug(new ParameterizedMessage("{} scheduled reroute failed for {}", reroutingKey, reason), e)
                 )
             );
         }
@@ -670,7 +668,7 @@ protected AsyncShardFetch.FetchResult
         }
         Map<ShardId, Set<String>> shardToIgnoreNodes = new HashMap<>();
-        for (ShardId shardId : shardsBatch.asyncBatch.shardToCustomDataPath.keySet()) {
+        for (ShardId shardId : shardsBatch.asyncBatch.shardAttributesMap.keySet()) {
             shardToIgnoreNodes.put(shardId, allocation.getIgnoreNodes(shardId));
         }
         AsyncShardFetch asyncFetcher = shardsBatch.getAsyncFetcher();
@@ -767,7 +765,7 @@ protected AsyncShardFetch.FetchResult fetchData(
             return new AsyncShardFetch.FetchResult<>(null, Collections.emptyMap());
         }
         Map<ShardId, Set<String>> shardToIgnoreNodes = new HashMap<>();
-        for (ShardId shardId : shardsBatch.asyncBatch.shardToCustomDataPath.keySet()) {
+        for (ShardId shardId : shardsBatch.asyncBatch.shardAttributesMap.keySet()) {
             shardToIgnoreNodes.put(shardId, allocation.getIgnoreNodes(shardId));
         }
         AsyncShardFetch asyncFetcher = shardsBatch.getAsyncFetcher();
@@ -806,9 +804,10 @@ public ShardsBatch(String batchId, Map shardsWithInfo, bool
             this.batchId = batchId;
             this.batchInfo = new HashMap<>(shardsWithInfo);
             // create a ShardId -> customDataPath map for async fetch
-            Map<ShardId, String> shardIdsMap = batchInfo.entrySet()
-                .stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getCustomDataPath()));
+            Map<ShardId, ShardAttributes> shardIdsMap = batchInfo.entrySet()
+                .stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getShardAttributes()));
             this.primary = primary;
             if (primary) {
                 asyncBatch = new InternalBatchAsyncFetch<>(logger, "batch_shards_started", shardIdsMap, batchStartedAction, batchId);
@@ -820,9 +819,9 @@ public ShardsBatch(String batchId, Map shardsWithInfo, bool
 
         private void removeFromBatch(ShardRouting shard) {
             batchInfo.remove(shard.shardId());
-            asyncBatch.shardToCustomDataPath.remove(shard.shardId());
+            asyncBatch.shardAttributesMap.remove(shard.shardId());
             // assert that fetcher and shards are the same as batched shards
-            assert batchInfo.size() == asyncBatch.shardToCustomDataPath.size() : "Shards size is not equal to fetcher size";
+            assert batchInfo.size() == asyncBatch.shardAttributesMap.size() : "Shards size is not equal to fetcher size";
         }
 
         public Set<ShardRouting> getBatchedShardRoutings() {
@@ -874,7 +873,7 @@ public String toString() {
      */
     private class ShardEntry {
 
-        private final String customDataPath;
+        private final ShardAttributes shardAttributes;
 
         public ShardEntry setShardRouting(ShardRouting shardRouting) {
             this.shardRouting = shardRouting;
@@ -883,8 +882,8 @@ public ShardEntry setShardRouting(ShardRouting shardRouting) {
 
         private ShardRouting shardRouting;
 
-        public ShardEntry(String customDataPath, ShardRouting shardRouting) {
-            this.customDataPath = customDataPath;
+        public ShardEntry(ShardAttributes shardAttributes, ShardRouting shardRouting) {
+            this.shardAttributes = shardAttributes;
             this.shardRouting = shardRouting;
         }
@@ -892,8 +891,8 @@ public ShardRouting getShardRouting() {
             return shardRouting;
         }
 
-        public String getCustomDataPath() {
-            return customDataPath;
+        public ShardAttributes getShardAttributes() {
+            return shardAttributes;
         }
     }
diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedBatchShards.java b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedBatchShards.java
index 63094352ed175..1066d85d781a0 100644
--- a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedBatchShards.java
+++ b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedBatchShards.java
@@ -47,7 +47,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.stream.Collectors;
 
 /**
  * This transport action is used to fetch all unassigned shard version from each node during primary allocation in {@link GatewayAllocator}.
@@ -107,11 +106,11 @@ public TransportNodesListGatewayStartedBatchShards(
 
     @Override
     public void list(
-        Map<ShardId, String> shardIdsWithCustomDataPath,
+        Map<ShardId, ShardAttributes> shardAttributesMap,
         DiscoveryNode[] nodes,
         ActionListener<NodesGatewayStartedShardsBatch> listener
     ) {
-        execute(new Request(nodes, shardIdsWithCustomDataPath), listener);
+        execute(new Request(nodes, shardAttributesMap), listener);
     }
 
     @Override
@@ -143,7 +142,7 @@ protected NodesGatewayStartedShardsBatch newResponse(
     @Override
     protected NodeGatewayStartedShardsBatch nodeOperation(NodeRequest request) {
         Map<ShardId, NodeGatewayStartedShards> shardsOnNode = new HashMap<>();
-        for (ShardAttributes shardAttr : request.shardAttributes) {
+        for (ShardAttributes shardAttr : request.shardAttributes.values()) {
             final ShardId shardId = shardAttr.getShardId();
             try {
                 logger.trace("{} loading local shard state info", shardId);
@@ -229,29 +228,25 @@ protected NodeGatewayStartedShardsBatch nodeOperation(NodeRequest request) {
      * @opensearch.internal
      */
    public static class Request extends BaseNodesRequest<Request> {
-        private final List<ShardAttributes> shardAttributes;
+        private final Map<ShardId, ShardAttributes> shardAttributes;
 
         public Request(StreamInput in) throws IOException {
             super(in);
-            shardAttributes = in.readList(ShardAttributes::new);
+            shardAttributes = in.readMap(ShardId::new, ShardAttributes::new);
         }
 
-        public Request(DiscoveryNode[] nodes, Map<ShardId, String> shardIdStringMap) {
+        public Request(DiscoveryNode[] nodes, Map<ShardId, ShardAttributes> shardAttributes) {
             super(nodes);
-            this.shardAttributes = Objects.requireNonNull(shardIdStringMap)
-                .entrySet()
-                .stream()
-                .map(entry -> new ShardAttributes(entry.getKey(), entry.getValue()))
-                .collect(Collectors.toList());
+            this.shardAttributes = Objects.requireNonNull(shardAttributes);
         }
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);
-            out.writeList(shardAttributes);
+            out.writeMap(shardAttributes, (o, k) -> k.writeTo(o), (o, v) -> v.writeTo(o));
         }
 
-        public List<ShardAttributes> getShardAttributes() {
+        public Map<ShardId, ShardAttributes> getShardAttributes() {
             return shardAttributes;
         }
     }
@@ -294,11 +289,11 @@ protected void writeNodesTo(StreamOutput out, List
      */
     public static class NodeRequest extends TransportRequest {
 
-        private final List<ShardAttributes> shardAttributes;
+        private final Map<ShardId, ShardAttributes> shardAttributes;
 
         public NodeRequest(StreamInput in) throws IOException {
             super(in);
-            shardAttributes = in.readList(ShardAttributes::new);
+            shardAttributes = in.readMap(ShardId::new, ShardAttributes::new);
         }
 
         public NodeRequest(Request request) {
@@ -308,7 +303,7 @@ public NodeRequest(Request request) {
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);
-            out.writeList(shardAttributes);
+            out.writeMap(shardAttributes, (o, k) -> k.writeTo(o), (o, v) -> v.writeTo(o));
         }
     }
diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java
index 03ba5fd62eb6a..d90be0449470d 100644
--- a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java
+++ b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java
@@ -62,6 +62,7 @@
 import org.opensearch.index.store.Store;
 import org.opensearch.indices.IndicesService;
 import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
+import org.opensearch.indices.store.ShardAttributes;
 import org.opensearch.threadpool.ThreadPool;
 import org.opensearch.transport.TransportRequest;
 import org.opensearch.transport.TransportService;
@@ -125,14 +126,10 @@ public TransportNodesListGatewayStartedShards(
     }
 
     @Override
-    public void list(
-        Map<ShardId, String> shardIdsWithCustomDataPath,
-        DiscoveryNode[] nodes,
-        ActionListener<NodesGatewayStartedShards> listener
-    ) {
-        assert shardIdsWithCustomDataPath.size() == 1 : "only one shard should be specified";
-        final ShardId shardId = shardIdsWithCustomDataPath.keySet().iterator().next();
-        final String customDataPath = shardIdsWithCustomDataPath.get(shardId);
+    public void list(Map<ShardId, ShardAttributes> shardAttributesMap, DiscoveryNode[] nodes, ActionListener<NodesGatewayStartedShards> listener) {
+        assert shardAttributesMap.size() == 1 : "only one shard should be specified";
+        final ShardId shardId = shardAttributesMap.keySet().iterator().next();
+        final String customDataPath = shardAttributesMap.get(shardId).getCustomDataPath();
         execute(new Request(shardId, customDataPath, nodes), listener);
     }
diff --git a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java
index 8d9df22ef360e..5341305cc83d8 100644
--- a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java
+++ b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java
@@ -121,14 +121,10 @@ public TransportNodesListShardStoreMetadata(
     }
 
     @Override
-    public void list(
-        Map<ShardId, String> shardIdsWithCustomDataPath,
-        DiscoveryNode[] nodes,
-        ActionListener<NodesStoreFilesMetadata> listener
-    ) {
-        assert shardIdsWithCustomDataPath.size() == 1 : "only one shard should be specified";
-        final ShardId shardId = shardIdsWithCustomDataPath.keySet().iterator().next();
-        final String customDataPath = shardIdsWithCustomDataPath.get(shardId);
+    public void list(Map<ShardId, ShardAttributes> shardAttributes, DiscoveryNode[] nodes, ActionListener<NodesStoreFilesMetadata> listener) {
+        assert shardAttributes.size() == 1 : "only one shard should be specified";
+        final ShardId shardId = shardAttributes.keySet().iterator().next();
+        final String customDataPath = shardAttributes.get(shardId).getCustomDataPath();
         execute(new Request(shardId, customDataPath, nodes), listener);
     }
diff --git a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java
index f851b7e2abff2..c297debfdf626 100644
--- a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java
+++ b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java
@@ -47,7 +47,6 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 /**
  * Transport action for fetching the batch of shard stores Metadata from a list of transport nodes
@@ -102,20 +101,11 @@ public TransportNodesListShardStoreMetadataBatch(
 
     @Override
     public void list(
-        Map<ShardId, String> shardIdsWithCustomDataPath,
+        Map<ShardId, ShardAttributes> shardAttributes,
         DiscoveryNode[] nodes,
         ActionListener<NodesStoreFilesMetadataBatch> listener
     ) {
-        execute(
-            new TransportNodesListShardStoreMetadataBatch.Request(
-                shardIdsWithCustomDataPath.entrySet()
-                    .stream()
-                    .map(entry -> new ShardAttributes(entry.getKey(), entry.getValue()))
-                    .collect(Collectors.toList()),
-                nodes
-            ),
-            listener
-        );
+        execute(new TransportNodesListShardStoreMetadataBatch.Request(shardAttributes, nodes), listener);
     }
 
     @Override
@@ -142,10 +132,7 @@ protected NodeStoreFilesMetadataBatch nodeOperation(NodeRequest request) {
         try {
             return new NodeStoreFilesMetadataBatch(clusterService.localNode(), listStoreMetadata(request));
         } catch (IOException e) {
-            throw new OpenSearchException(
-                "Failed to list store metadata for shards [" + request.getShardAttributes().stream().map(ShardAttributes::getShardId) + "]",
-                e
-            );
+            throw new OpenSearchException("Failed to list store metadata for shards [" + request.getShardAttributes().keySet() + "]", e);
         }
     }
@@ -155,7 +142,7 @@ protected NodeStoreFilesMetadataBatch nodeOperation(NodeRequest request) {
      */
     private Map<ShardId, NodeStoreFilesMetadata> listStoreMetadata(NodeRequest request) throws IOException {
         Map<ShardId, NodeStoreFilesMetadata> shardStoreMetadataMap = new HashMap<ShardId, NodeStoreFilesMetadata>();
-        for (ShardAttributes shardAttributes : request.getShardAttributes()) {
+        for (ShardAttributes shardAttributes : request.getShardAttributes().values()) {
             final ShardId shardId = shardAttributes.getShardId();
             logger.trace("listing store meta data for {}", shardId);
             long startTimeNS = System.nanoTime();
@@ -281,26 +268,26 @@ private Map listStoreMetadata(NodeRequest reque
      */
     public static class Request extends BaseNodesRequest<Request> {
 
-        private final List<ShardAttributes> shardAttributes;
+        private final Map<ShardId, ShardAttributes> shardAttributes;
 
         public Request(StreamInput in) throws IOException {
             super(in);
-            shardAttributes = in.readList(ShardAttributes::new);
+            shardAttributes = in.readMap(ShardId::new, ShardAttributes::new);
         }
 
-        public Request(List<ShardAttributes> shardAttributes, DiscoveryNode[] nodes) {
+        public Request(Map<ShardId, ShardAttributes> shardAttributes, DiscoveryNode[] nodes) {
             super(nodes);
             this.shardAttributes = Objects.requireNonNull(shardAttributes);
         }
 
-        public List<ShardAttributes> getShardAttributes() {
+        public Map<ShardId, ShardAttributes> getShardAttributes() {
             return shardAttributes;
         }
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);
-            out.writeList(shardAttributes);
+            out.writeMap(shardAttributes, (o, k) -> k.writeTo(o), (o, v) -> v.writeTo(o));
         }
     }
@@ -398,11 +385,11 @@ public String toString() {
      */
     public static class NodeRequest extends TransportRequest {
 
-        private final List<ShardAttributes> shardAttributes;
+        private final Map<ShardId, ShardAttributes> shardAttributes;
 
         public NodeRequest(StreamInput in) throws IOException {
             super(in);
-            shardAttributes = in.readList(ShardAttributes::new);
+            shardAttributes = in.readMap(ShardId::new, ShardAttributes::new);
         }
 
         public NodeRequest(Request request) {
@@ -412,10 +399,10 @@ public NodeRequest(Request request) {
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);
-            out.writeList(shardAttributes);
+            out.writeMap(shardAttributes, (o, k) -> k.writeTo(o), (o, v) -> v.writeTo(o));
         }
 
-        public List<ShardAttributes> getShardAttributes() {
+        public Map<ShardId, ShardAttributes> getShardAttributes() {
             return shardAttributes;
         }
     }
diff --git a/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java b/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java
index db97c3ece94ba..d4570db599a6c 100644
--- a/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java
+++ b/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java
@@ -39,6 +39,7 @@
 import org.opensearch.cluster.node.DiscoveryNodeRole;
 import org.opensearch.cluster.node.DiscoveryNodes;
 import org.opensearch.core.index.shard.ShardId;
+import org.opensearch.indices.store.ShardAttributes;
 import org.opensearch.test.OpenSearchTestCase;
 import org.opensearch.threadpool.TestThreadPool;
 import org.opensearch.threadpool.ThreadPool;
@@ -46,6 +47,7 @@
 import org.junit.Before;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
@@ -84,7 +86,16 @@ public class AsyncShardFetchTests extends OpenSearchTestCase {
     public void setUp() throws Exception {
         super.setUp();
         this.threadPool = new TestThreadPool(getTestName());
-        this.test = new TestFetch(threadPool);
+        if (randomBoolean()) {
+            this.test = new TestFetch(threadPool);
+        } else {
+            HashMap<ShardId, ShardAttributes> shardToCustomDataPath = new HashMap<>();
+            ShardId shardId0 = new ShardId("index1", "index_uuid1", 0);
+            ShardId shardId1 = new ShardId("index2", "index_uuid2", 0);
+            shardToCustomDataPath.put(shardId0, new ShardAttributes(shardId0, ""));
+            shardToCustomDataPath.put(shardId1, new ShardAttributes(shardId1, ""));
+            this.test = new TestFetch(threadPool, shardToCustomDataPath);
+        }
     }
 
     @After
@@ -403,6 +414,12 @@ static class Entry {
             this.threadPool = threadPool;
         }
 
+        TestFetch(ThreadPool threadPool, Map<ShardId, ShardAttributes> shardAttributesMap) {
+            super(LogManager.getLogger(TestFetch.class), "test", shardAttributesMap, null, "test-batch");
+            this.threadPool = threadPool;
+        }
+
         public void addSimulation(String nodeId, Response response) {
             simulations.put(nodeId, new Entry(response, null));
         }
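
Reviewer note (not part of the patch): the minimal sketch below shows how a caller is expected to build the ShardId-to-ShardAttributes map that replaces the old ShardId-to-customDataPath map, and how it is handed to the batch transport action. It relies only on signatures visible in this diff (ShardAttributes(ShardId, String) and TransportNodesListGatewayStartedBatchShards.Request(DiscoveryNode[], Map<ShardId, ShardAttributes>)); the helper class and method names are illustrative.

import java.util.HashMap;
import java.util.Map;

import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.gateway.TransportNodesListGatewayStartedBatchShards;
import org.opensearch.indices.store.ShardAttributes;

// Illustrative helper, not part of the change set.
final class ShardAttributesMapSketch {

    // Mirrors prepareRequestMap(...) in the integration test: one ShardAttributes entry per shard id.
    static Map<ShardId, ShardAttributes> buildShardAttributesMap(ShardId[] shardIds, String customDataPath) {
        Map<ShardId, ShardAttributes> shardAttributesMap = new HashMap<>();
        for (ShardId shardId : shardIds) {
            shardAttributesMap.put(shardId, new ShardAttributes(shardId, customDataPath));
        }
        return shardAttributesMap;
    }

    // The batch request now accepts the map directly; no intermediate List<ShardAttributes> is built.
    static TransportNodesListGatewayStartedBatchShards.Request buildRequest(
        DiscoveryNode[] nodes,
        Map<ShardId, ShardAttributes> shardAttributesMap
    ) {
        return new TransportNodesListGatewayStartedBatchShards.Request(nodes, shardAttributesMap);
    }

    private ShardAttributesMapSketch() {}
}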