Skip to content

Commit

Permalink
Add support for handling failed shards in the cache
Browse files Browse the repository at this point in the history
Signed-off-by: Aman Khare <[email protected]>
  • Loading branch information
Aman Khare committed Feb 28, 2024
1 parent a6921f0 commit 0daa63e
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 0 deletions.
14 changes: 14 additions & 0 deletions server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ public synchronized FetchResult<T> fetchData(DiscoveryNodes nodes, Map<ShardId,
}

cache.fillShardCacheWithDataNodes(nodes);
List<ShardId> failedShards = clearFailedShards();
List<String> nodeIds = cache.findNodesToFetch();
if (nodeIds.isEmpty() == false) {
// mark all node as fetching and go ahead and async fetch them
Expand Down Expand Up @@ -205,12 +206,25 @@ public synchronized FetchResult<T> fetchData(DiscoveryNodes nodes, Map<ShardId,
+ allIgnoreNodesMap.values().stream().mapToInt(Set::size).sum()
+ "]"
);
} else if (failedShards.isEmpty() == false) {
// trigger a reroute if any shards failed, to make sure they're picked up in the next run
logger.trace("triggering another reroute for failed shards in {}", reroutingKey);
reroute("shards-failed", "shards failed in "+ reroutingKey);
}

return new FetchResult<>(fetchData, allIgnoreNodesMap);
}
}

/**
 * Removes the shards that failed in the previous fetch from {@code shardAttributesMap}
 * and reports which shards were removed.
 *
 * @return the failed shards reported by the cache; never {@code null}. An empty list is
 *         returned when the cache reports no failures, so callers may invoke
 *         {@code isEmpty()} on the result without a null check.
 */
private List<ShardId> clearFailedShards() {
    // get failed shards from previous fetch and remove them
    List<ShardId> failedShards = cache.getFailedShards();
    if (failedShards == null || failedShards.isEmpty()) {
        // Normalize a null/empty result: the caller dereferences the returned list directly.
        return List.of();
    }
    shardAttributesMap.keySet().removeIf(failedShards::contains);
    return failedShards;
}

/**
* Called by the response handler of the async action to fetch data. Verifies that it's still working
* on the same cache generation, otherwise the results are discarded. It then goes and fills the relevant data for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ protected BaseShardCache(Logger logger, String logKey, String type) {
*/
public abstract K getData(DiscoveryNode node);

/**
 * Provides the list of shards for which failures were recorded. The caller is expected to
 * remove these shards from the set of shards it is tracking.
 *
 * @return list of failed shards
 */
public abstract List<ShardId> getFailedShards();


@NonNull
public abstract Map<String, ? extends BaseNodeEntry> getCache();

Expand Down
8 changes: 8 additions & 0 deletions server/src/main/java/org/opensearch/gateway/ShardCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
import org.opensearch.common.Nullable;
import org.opensearch.core.index.shard.ShardId;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
Expand Down Expand Up @@ -55,6 +57,12 @@ public void clearShardCache(ShardId shardId) {
cache.clear();
}

/**
 * Reports the failed shards tracked by this cache, which is always none for the
 * single-shard cache.
 *
 * @return an empty list
 */
@Override
public List<ShardId> getFailedShards() {
    // The single shard cache never reports its own shard as failed;
    // retries for it are handled by handleFailure instead.
    final List<ShardId> noFailedShards = Collections.emptyList();
    return noFailedShards;
}

/**
* A node entry, holding the state of the fetched data for a specific shard
* for a given node.
Expand Down

0 comments on commit 0daa63e

Please sign in to comment.