From dca5b7ce8018578cc19c1f80dcff5b05235f9811 Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Tue, 5 Mar 2024 17:58:48 +0530 Subject: [PATCH] Add NodeCache interface to make code more readable and remove failed shard handling from single shard cache Signed-off-by: Aman Khare --- .../opensearch/gateway/BaseShardCache.java | 78 ++++++------------- .../org/opensearch/gateway/NodeCache.java | 71 +++++++++++++++++ .../org/opensearch/gateway/ShardCache.java | 15 ++-- 3 files changed, 99 insertions(+), 65 deletions(-) create mode 100644 server/src/main/java/org/opensearch/gateway/NodeCache.java diff --git a/server/src/main/java/org/opensearch/gateway/BaseShardCache.java b/server/src/main/java/org/opensearch/gateway/BaseShardCache.java index bc8df9043be2b..d0fe209bbd00f 100644 --- a/server/src/main/java/org/opensearch/gateway/BaseShardCache.java +++ b/server/src/main/java/org/opensearch/gateway/BaseShardCache.java @@ -17,7 +17,6 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; -import org.opensearch.core.index.shard.ShardId; import org.opensearch.transport.ReceiveTimeoutTransportException; import java.util.ArrayList; @@ -27,19 +26,13 @@ import java.util.Map; import java.util.Set; -import reactor.util.annotation.NonNull; - /** * Common functionalities of a cache for storing shard metadata. Cache maintains node level responses. - * Setting up the cache is required from implementation class. While set up, we need 3 functionalities from the user. - * initData : how to initialize an entry of shard cache for a node. - * putData : how to store the response of transport action in the cache. - * getData : how to populate the stored data for any shard allocators like {@link PrimaryShardAllocator} or - * {@link ReplicaShardAllocator} + * Setting up the cache is required from implementation class. * * @param Response type of transport action which has the data to be stored in the cache. */ -public abstract class BaseShardCache { +public abstract class BaseShardCache implements NodeCache { private final Logger logger; private final String logKey; private final String type; @@ -50,44 +43,10 @@ protected BaseShardCache(Logger logger, String logKey, String type) { this.type = type; } - /** - * Initialize cache's entry for a node. - * - * @param node for which node we need to initialize the cache. - */ - public abstract void initData(DiscoveryNode node); - - /** - * Store the response in the cache from node. - * - * @param node node from which we got the response. - * @param response shard metadata coming from node. - */ - public abstract void putData(DiscoveryNode node, K response); - - /** - * Populate the response from cache. - * - * @param node node for which we need the response. - * @return actual response. - */ - public abstract K getData(DiscoveryNode node); - - /** - * Provide the list of shards which got failures, these shards should be retried - * @return list of failed shards - */ - public abstract List getFailedShards(); - - @NonNull - public abstract Map getCache(); - - public abstract void clearShardCache(ShardId shardId); - /** * Returns the number of fetches that are currently ongoing. */ - public int getInflightFetches() { + int getInflightFetches() { int count = 0; for (BaseNodeEntry nodeEntry : getCache().values()) { if (nodeEntry.isFetching()) { @@ -101,7 +60,7 @@ public int getInflightFetches() { * Fills the shard fetched data with new (data) nodes and a fresh NodeEntry, and removes from * it nodes that are no longer part of the state. */ - public void fillShardCacheWithDataNodes(DiscoveryNodes nodes) { + void fillShardCacheWithDataNodes(DiscoveryNodes nodes) { // verify that all current data nodes are there for (final DiscoveryNode node : nodes.getDataNodes().values()) { if (getCache().containsKey(node.getId()) == false) { @@ -116,7 +75,7 @@ public void fillShardCacheWithDataNodes(DiscoveryNodes nodes) { * Finds all the nodes that need to be fetched. Those are nodes that have no * data, and are not in fetch mode. */ - public List findNodesToFetch() { + List findNodesToFetch() { List nodesToFetch = new ArrayList<>(); for (BaseNodeEntry nodeEntry : getCache().values()) { if (nodeEntry.hasData() == false && nodeEntry.isFetching() == false) { @@ -129,7 +88,7 @@ public List findNodesToFetch() { /** * Are there any nodes that are fetching data? */ - public boolean hasAnyNodeFetching() { + boolean hasAnyNodeFetching() { for (BaseNodeEntry nodeEntry : getCache().values()) { if (nodeEntry.isFetching()) { return true; @@ -146,7 +105,7 @@ public boolean hasAnyNodeFetching() { * @param failedNodes return failedNodes with the nodes where fetch has failed. * @return Map of cache data for every DiscoveryNode. */ - public Map populateCache(DiscoveryNodes nodes, Set failedNodes) { + Map populateCache(DiscoveryNodes nodes, Set failedNodes) { Map fetchData = new HashMap<>(); for (Iterator> it = getCache().entrySet().iterator(); it.hasNext();) { Map.Entry entry = (Map.Entry) it.next(); @@ -171,7 +130,7 @@ public Map populateCache(DiscoveryNodes nodes, Set fai return fetchData; } - public void processResponses(List responses, long fetchingRound) { + void processResponses(List responses, long fetchingRound) { for (K response : responses) { BaseNodeEntry nodeEntry = getCache().get(response.getNode().getId()); if (nodeEntry != null) { @@ -218,9 +177,7 @@ private void handleNodeFailure(BaseNodeEntry nodeEntry, FailedNodeException fail // if the entry is there, for the right fetching round and not marked as failed already, process it Throwable unwrappedCause = ExceptionsHelper.unwrapCause(failure.getCause()); // if the request got rejected or timed out, we need to try it again next time... - if (unwrappedCause instanceof OpenSearchRejectedExecutionException - || unwrappedCause instanceof ReceiveTimeoutTransportException - || unwrappedCause instanceof OpenSearchTimeoutException) { + if (retryableException(unwrappedCause)) { nodeEntry.restartFetching(); } else { logger.warn( @@ -232,7 +189,13 @@ private void handleNodeFailure(BaseNodeEntry nodeEntry, FailedNodeException fail } } - public void processFailures(List failures, long fetchingRound) { + boolean retryableException(Throwable unwrappedCause) { + return unwrappedCause instanceof OpenSearchRejectedExecutionException + || unwrappedCause instanceof ReceiveTimeoutTransportException + || unwrappedCause instanceof OpenSearchTimeoutException; + } + + void processFailures(List failures, long fetchingRound) { for (FailedNodeException failure : failures) { logger.trace("{} processing failure {} for [{}]", logKey, failure, type); BaseNodeEntry nodeEntry = getCache().get(failure.nodeId()); @@ -242,11 +205,16 @@ public void processFailures(List failures, long fetchingRou } } - public void remove(String nodeId) { + /** + * Common function for removing whole node entry. + * + * @param nodeId nodeId to be cleaned. + */ + void remove(String nodeId) { this.getCache().remove(nodeId); } - public void markAsFetching(List nodeIds, long fetchingRound) { + void markAsFetching(List nodeIds, long fetchingRound) { for (String nodeId : nodeIds) { getCache().get(nodeId).markAsFetching(fetchingRound); } diff --git a/server/src/main/java/org/opensearch/gateway/NodeCache.java b/server/src/main/java/org/opensearch/gateway/NodeCache.java new file mode 100644 index 0000000000000..25f97e72a5d0c --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/NodeCache.java @@ -0,0 +1,71 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway; + +import org.opensearch.action.support.nodes.BaseNodeResponse; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; + +import java.util.Map; + +import reactor.util.annotation.NonNull; + +/** + * Store node level responses of transport actions like {@link TransportNodesListGatewayStartedShards} or + * {@link TransportNodesListShardStoreMetadata} using the given functionalities. + *

+ * initData : how to initialize an entry of shard cache for a node. + * putData : how to store the response of transport action in the cache. + * getData : how to populate the stored data for any shard allocators like {@link PrimaryShardAllocator} or + * {@link ReplicaShardAllocator} + * + * @param Response type of transport action which has the data to be stored in the cache. + */ +public interface NodeCache { + + /** + * Initialize cache's entry for a node. + * + * @param node for which node we need to initialize the cache. + */ + void initData(DiscoveryNode node); + + /** + * Store the response in the cache from node. + * + * @param node node from which we got the response. + * @param response shard metadata coming from node. + */ + void putData(DiscoveryNode node, K response); + + /** + * Populate the response from cache. + * + * @param node node for which we need the response. + * @return actual response. + */ + K getData(DiscoveryNode node); + + /** + * Get actual map object of the cache + * + * @return map of nodeId and NodeEntry extending BaseNodeEntry + */ + @NonNull + Map getCache(); + + /** + * Cleanup cached data for this shard once it's started. Cleanup only happens at shard level. Node entries will + * automatically be cleaned up once shards are assigned. + * + * @param shardId for which we need to free up the cached data. + */ + void deleteData(ShardId shardId); +} diff --git a/server/src/main/java/org/opensearch/gateway/ShardCache.java b/server/src/main/java/org/opensearch/gateway/ShardCache.java index 9669c0e24e7c9..a494c68074ba9 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardCache.java +++ b/server/src/main/java/org/opensearch/gateway/ShardCache.java @@ -14,11 +14,11 @@ import org.opensearch.common.Nullable; import org.opensearch.core.index.shard.ShardId; -import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; +import reactor.util.annotation.NonNull; + /** * Cache implementation of transport actions returning single shard related data in the response. * @@ -48,20 +48,15 @@ public K getData(DiscoveryNode node) { return cache.get(node.getId()).getValue(); } + @NonNull @Override public Map getCache() { return cache; } @Override - public void clearShardCache(ShardId shardId) { - cache.clear(); - } - - @Override - public List getFailedShards() { - // Single shard cache does not need to return that shard itself because handleFailure will take care of retries - return Collections.emptyList(); + public void deleteData(ShardId shardId) { + cache.clear(); // single shard cache can clear the full map } /**