From 0daa63ef9b66b0e9d2823972d3a054b9e3bca19e Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Wed, 28 Feb 2024 14:30:00 +0530 Subject: [PATCH] Add support for handling failed shards in the cache Signed-off-by: Aman Khare --- .../org/opensearch/gateway/AsyncShardFetch.java | 14 ++++++++++++++ .../org/opensearch/gateway/BaseShardCache.java | 7 +++++++ .../java/org/opensearch/gateway/ShardCache.java | 8 ++++++++ 3 files changed, 29 insertions(+) diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java index 068b6c620983c..e35c3f9414c97 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java @@ -170,6 +170,7 @@ public synchronized FetchResult fetchData(DiscoveryNodes nodes, Map failedShards = clearFailedShards(); List nodeIds = cache.findNodesToFetch(); if (nodeIds.isEmpty() == false) { // mark all node as fetching and go ahead and async fetch them @@ -205,12 +206,25 @@ public synchronized FetchResult fetchData(DiscoveryNodes nodes, Map(fetchData, allIgnoreNodesMap); } } + private List clearFailedShards() { + // get failed shards from previous fetch and remove them + List failedShards = cache.getFailedShards(); + if (failedShards !=null && failedShards.isEmpty() == false ) { + shardAttributesMap.keySet().removeIf(failedShards::contains); + } + return failedShards; + } + /** * Called by the response handler of the async action to fetch data. Verifies that its still working * on the same cache generation, otherwise the results are discarded. It then goes and fills the relevant data for diff --git a/server/src/main/java/org/opensearch/gateway/BaseShardCache.java b/server/src/main/java/org/opensearch/gateway/BaseShardCache.java index f9de73fbaf9ef..c34d00b0877d1 100644 --- a/server/src/main/java/org/opensearch/gateway/BaseShardCache.java +++ b/server/src/main/java/org/opensearch/gateway/BaseShardCache.java @@ -73,6 +73,13 @@ protected BaseShardCache(Logger logger, String logKey, String type) { */ public abstract K getData(DiscoveryNode node); + /** + * Provide the list of shards which got failures, these shards should be removed + * @return list of failed shards + */ + public abstract List getFailedShards(); + + @NonNull public abstract Map getCache(); diff --git a/server/src/main/java/org/opensearch/gateway/ShardCache.java b/server/src/main/java/org/opensearch/gateway/ShardCache.java index 8526b8cfb84ac..a5b23df3bf3b8 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardCache.java +++ b/server/src/main/java/org/opensearch/gateway/ShardCache.java @@ -14,7 +14,9 @@ import org.opensearch.common.Nullable; import org.opensearch.core.index.shard.ShardId; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; /** @@ -55,6 +57,12 @@ public void clearShardCache(ShardId shardId) { cache.clear(); } + @Override + public List getFailedShards() { + // Single shard cache does not need to return that shard itself because handleFailure will take care of retries + return Collections.emptyList(); + } + /** * A node entry, holding the state of the fetched data for a specific shard * for a giving node.