From 65229fce55e24f250d93204ecab836d40c75c827 Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Tue, 9 Apr 2024 13:46:08 +0530 Subject: [PATCH] Cleanup java doc and review comments changes Signed-off-by: Aman Khare --- .../gateway/AsyncShardBatchFetch.java | 44 ++++++++++--------- ...ansportNodesGatewayStartedShardHelper.java | 2 +- .../gateway/ShardBatchCacheTests.java | 8 ++-- 3 files changed, 29 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java index 8a292e0925686..ca769235a8053 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java @@ -20,14 +20,16 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; -import java.util.function.Function; +import java.util.function.Predicate; + +import reactor.util.annotation.NonNull; /** * Implementation of AsyncShardFetch with batching support. This class is responsible for executing the fetch * part using the base class {@link AsyncShardFetch}. Other functionalities needed for a batch are only written here. - * Cleanup of failed shards is necessary in a batch and based on that a reroute should be triggered to take care of - * those in the next run. This separation also takes care of the extra generic type V which is only needed for batch - * transport actions like {@link TransportNodesListGatewayStartedShardsBatch}. + * This separation also takes care of the extra generic type V which is only needed for batch + * transport actions like {@link TransportNodesListGatewayStartedShardsBatch} and + * {@link org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch}. * * @param Response type of the transport action. * @param Data type of shard level response. @@ -44,8 +46,8 @@ public abstract class AsyncShardBatchFetch extend AsyncShardFetch.Lister, T> action, String batchId, Class clazz, - V emptyResponse, - Function isEmptyResponse, + V emptyShardResponse, + Predicate emptyShardResponsePredicate, ShardBatchResponseFactory responseFactory ) { super( @@ -60,8 +62,8 @@ public abstract class AsyncShardBatchFetch extend shardAttributesMap, "BatchID=[" + batchId + "]", clazz, - emptyResponse, - isEmptyResponse, + emptyShardResponse, + emptyShardResponsePredicate, responseFactory ) ); @@ -82,7 +84,8 @@ public void clearShard(ShardId shardId) { * Cache implementation of transport actions returning batch of shards related data in the response. * Store node level responses of transport actions like {@link TransportNodesListGatewayStartedShardsBatch} or * {@link org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch} with memory efficient caching - * approach. + * approach. This cache class is not thread safe, all of its methods are being called from + * {@link AsyncShardFetch} class which has synchronized blocks present to handle multiple threads. * * @param Response type of transport action. * @param Data type of shard level response. @@ -94,7 +97,7 @@ static class ShardBatchCache extends AsyncShardFe private final Class shardResponseClass; private final ShardBatchResponseFactory responseFactory; private final V emptyResponse; - private final Function isEmpty; + private final Predicate emptyShardResponsePredicate; private final Logger logger; public ShardBatchCache( @@ -104,12 +107,12 @@ public ShardBatchCache( String logKey, Class clazz, V emptyResponse, - Function isEmptyResponse, + Predicate emptyShardResponsePredicate, ShardBatchResponseFactory responseFactory ) { super(Loggers.getLogger(logger, "_" + logKey), type); this.batchSize = shardAttributesMap.size(); - this.isEmpty = isEmptyResponse; + this.emptyShardResponsePredicate = emptyShardResponsePredicate; cache = new HashMap<>(); shardIdToArray = new HashMap<>(); fillShardIdKeys(shardAttributesMap.keySet()); @@ -120,6 +123,7 @@ public ShardBatchCache( } @Override + @NonNull public Map getCache() { return cache; } @@ -136,7 +140,7 @@ public void deleteShard(ShardId shardId) { @Override public void initData(DiscoveryNode node) { - cache.put(node.getId(), new NodeEntry<>(node.getId(), shardResponseClass, batchSize, isEmpty)); + cache.put(node.getId(), new NodeEntry<>(node.getId(), shardResponseClass, batchSize, emptyShardResponsePredicate)); } /** @@ -163,9 +167,9 @@ private HashMap getBatchData(NodeEntry nodeEntry) { V[] nodeShardEntries = nodeEntry.getData(); boolean[] emptyResponses = nodeEntry.getEmptyShardResponse(); HashMap shardData = new HashMap<>(); - for (Map.Entry shardIdIndex : shardIdToArray.entrySet()) { - ShardId shardId = shardIdIndex.getKey(); - Integer arrIndex = shardIdIndex.getValue(); + for (Map.Entry shardIdEntry : shardIdToArray.entrySet()) { + ShardId shardId = shardIdEntry.getKey(); + Integer arrIndex = shardIdEntry.getValue(); if (emptyResponses[arrIndex]) { shardData.put(shardId, emptyResponse); } else if (nodeShardEntries[arrIndex] != null) { @@ -194,13 +198,13 @@ static class NodeEntry extends BaseNodeEntry { // actually needed in allocation/explain API response. So instead of storing full empty response object // in cache, it's better to just store a boolean and create that object on the fly just before // decision-making. - private final Function isEmpty; + private final Predicate emptyShardResponsePredicate; - NodeEntry(String nodeId, Class clazz, int batchSize, Function isEmptyResponse) { + NodeEntry(String nodeId, Class clazz, int batchSize, Predicate emptyShardResponsePredicate) { super(nodeId); this.shardData = (V[]) Array.newInstance(clazz, batchSize); this.emptyShardResponse = new boolean[batchSize]; - this.isEmpty = isEmptyResponse; + this.emptyShardResponsePredicate = emptyShardResponsePredicate; } void doneFetching(Map shardDataFromNode, Map shardIdKey) { @@ -225,7 +229,7 @@ private void fillShardData(Map shardDataFromNode, Map shardData : shardDataFromNode.entrySet()) { if (shardData.getValue() != null) { ShardId shardId = shardData.getKey(); - if (isEmpty.apply(shardData.getValue())) { + if (emptyShardResponsePredicate.test(shardData.getValue())) { this.emptyShardResponse[shardIdKey.get(shardId)] = true; this.shardData[shardIdKey.get(shardId)] = null; } else { diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java b/server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java index 71c2c2cd2a444..2ddae1d8410c9 100644 --- a/server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java @@ -233,7 +233,7 @@ public String toString() { return buf.toString(); } - public static Boolean isEmpty(GatewayStartedShard gatewayStartedShard) { + public static boolean isEmpty(GatewayStartedShard gatewayStartedShard) { return gatewayStartedShard.allocationId() == null && gatewayStartedShard.primary() == false && gatewayStartedShard.storeException() == null diff --git a/server/src/test/java/org/opensearch/gateway/ShardBatchCacheTests.java b/server/src/test/java/org/opensearch/gateway/ShardBatchCacheTests.java index e6f6f43a03fe8..1b42a31a4fd84 100644 --- a/server/src/test/java/org/opensearch/gateway/ShardBatchCacheTests.java +++ b/server/src/test/java/org/opensearch/gateway/ShardBatchCacheTests.java @@ -119,6 +119,10 @@ public void testPutData() { this.shardCache.putData(node1, new NodeGatewayStartedShardsBatch(node1, getPrimaryResponse(shardsInBatch, ResponseType.VALID))); this.shardCache.putData(node2, new NodeGatewayStartedShardsBatch(node1, getPrimaryResponse(shardsInBatch, ResponseType.EMPTY))); + // assert that fetching is done as both node's responses are stored in cache + assertFalse(this.shardCache.getCache().get(node1.getId()).isFetching()); + assertFalse(this.shardCache.getCache().get(node2.getId()).isFetching()); + Map fetchData = shardCache.getCacheData( DiscoveryNodes.builder().add(node1).add(node2).build(), null @@ -201,10 +205,6 @@ private Map getFailedPrimaryResponse(List return shardData; } - public void removeShard(ShardId shardId) { - // batchInfo.remove(shardId); - } - private void fillShards(Map shardAttributesMap, int numberOfShards) { shardsInBatch = BatchTestUtil.setUpShards(numberOfShards); for (ShardId shardId : shardsInBatch) {