Add NodeCache interface to make code more readable and remove failed shard handling from single shard cache

Signed-off-by: Aman Khare <[email protected]>
Aman Khare committed Mar 5, 2024
1 parent 11e145d commit dca5b7c
Showing 3 changed files with 99 additions and 65 deletions.
78 changes: 23 additions & 55 deletions server/src/main/java/org/opensearch/gateway/BaseShardCache.java
@@ -17,7 +17,6 @@
 import org.opensearch.cluster.node.DiscoveryNode;
 import org.opensearch.cluster.node.DiscoveryNodes;
 import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
-import org.opensearch.core.index.shard.ShardId;
 import org.opensearch.transport.ReceiveTimeoutTransportException;
 
 import java.util.ArrayList;
@@ -27,19 +26,13 @@
 import java.util.Map;
 import java.util.Set;
 
-import reactor.util.annotation.NonNull;
-
 /**
  * Common functionalities of a cache for storing shard metadata. Cache maintains node level responses.
- * Setting up the cache is required from implementation class. While set up, we need 3 functionalities from the user.
- * initData : how to initialize an entry of shard cache for a node.
- * putData : how to store the response of transport action in the cache.
- * getData : how to populate the stored data for any shard allocators like {@link PrimaryShardAllocator} or
- * {@link ReplicaShardAllocator}
+ * Setting up the cache is required from implementation class.
  *
  * @param <K> Response type of transport action which has the data to be stored in the cache.
  */
-public abstract class BaseShardCache<K extends BaseNodeResponse> {
+public abstract class BaseShardCache<K extends BaseNodeResponse> implements NodeCache<K> {
     private final Logger logger;
     private final String logKey;
     private final String type;
@@ -50,44 +43,10 @@ protected BaseShardCache(Logger logger, String logKey, String type) {
         this.type = type;
     }
 
-    /**
-     * Initialize cache's entry for a node.
-     *
-     * @param node for which node we need to initialize the cache.
-     */
-    public abstract void initData(DiscoveryNode node);
-
-    /**
-     * Store the response in the cache from node.
-     *
-     * @param node node from which we got the response.
-     * @param response shard metadata coming from node.
-     */
-    public abstract void putData(DiscoveryNode node, K response);
-
-    /**
-     * Populate the response from cache.
-     *
-     * @param node node for which we need the response.
-     * @return actual response.
-     */
-    public abstract K getData(DiscoveryNode node);
-
-    /**
-     * Provide the list of shards which got failures, these shards should be retried
-     * @return list of failed shards
-     */
-    public abstract List<ShardId> getFailedShards();
-
-    @NonNull
-    public abstract Map<String, ? extends BaseNodeEntry> getCache();
-
-    public abstract void clearShardCache(ShardId shardId);
-
     /**
      * Returns the number of fetches that are currently ongoing.
      */
-    public int getInflightFetches() {
+    int getInflightFetches() {
         int count = 0;
         for (BaseNodeEntry nodeEntry : getCache().values()) {
             if (nodeEntry.isFetching()) {
@@ -101,7 +60,7 @@ public int getInflightFetches() {
      * Fills the shard fetched data with new (data) nodes and a fresh NodeEntry, and removes from
      * it nodes that are no longer part of the state.
      */
-    public void fillShardCacheWithDataNodes(DiscoveryNodes nodes) {
+    void fillShardCacheWithDataNodes(DiscoveryNodes nodes) {
         // verify that all current data nodes are there
         for (final DiscoveryNode node : nodes.getDataNodes().values()) {
             if (getCache().containsKey(node.getId()) == false) {
@@ -116,7 +75,7 @@ public void fillShardCacheWithDataNodes(DiscoveryNodes nodes) {
      * Finds all the nodes that need to be fetched. Those are nodes that have no
      * data, and are not in fetch mode.
      */
-    public List<String> findNodesToFetch() {
+    List<String> findNodesToFetch() {
         List<String> nodesToFetch = new ArrayList<>();
         for (BaseNodeEntry nodeEntry : getCache().values()) {
             if (nodeEntry.hasData() == false && nodeEntry.isFetching() == false) {
@@ -129,7 +88,7 @@ public List<String> findNodesToFetch() {
     /**
     * Are there any nodes that are fetching data?
     */
-    public boolean hasAnyNodeFetching() {
+    boolean hasAnyNodeFetching() {
         for (BaseNodeEntry nodeEntry : getCache().values()) {
             if (nodeEntry.isFetching()) {
                 return true;
@@ -146,7 +105,7 @@ public boolean hasAnyNodeFetching() {
      * @param failedNodes return failedNodes with the nodes where fetch has failed.
      * @return Map of cache data for every DiscoveryNode.
      */
-    public Map<DiscoveryNode, K> populateCache(DiscoveryNodes nodes, Set<String> failedNodes) {
+    Map<DiscoveryNode, K> populateCache(DiscoveryNodes nodes, Set<String> failedNodes) {
         Map<DiscoveryNode, K> fetchData = new HashMap<>();
         for (Iterator<? extends Map.Entry<String, ? extends BaseNodeEntry>> it = getCache().entrySet().iterator(); it.hasNext();) {
             Map.Entry<String, BaseNodeEntry> entry = (Map.Entry<String, BaseNodeEntry>) it.next();
@@ -171,7 +130,7 @@ public Map<DiscoveryNode, K> populateCache(DiscoveryNodes nodes, Set<String> failedNodes) {
         return fetchData;
     }
 
-    public void processResponses(List<K> responses, long fetchingRound) {
+    void processResponses(List<K> responses, long fetchingRound) {
         for (K response : responses) {
             BaseNodeEntry nodeEntry = getCache().get(response.getNode().getId());
             if (nodeEntry != null) {
@@ -218,9 +177,7 @@ private void handleNodeFailure(BaseNodeEntry nodeEntry, FailedNodeException failure, long fetchingRound) {
         // if the entry is there, for the right fetching round and not marked as failed already, process it
         Throwable unwrappedCause = ExceptionsHelper.unwrapCause(failure.getCause());
         // if the request got rejected or timed out, we need to try it again next time...
-        if (unwrappedCause instanceof OpenSearchRejectedExecutionException
-            || unwrappedCause instanceof ReceiveTimeoutTransportException
-            || unwrappedCause instanceof OpenSearchTimeoutException) {
+        if (retryableException(unwrappedCause)) {
             nodeEntry.restartFetching();
         } else {
             logger.warn(
@@ -232,7 +189,13 @@ private void handleNodeFailure(BaseNodeEntry nodeEntry, FailedNodeException failure, long fetchingRound) {
         }
     }
 
-    public void processFailures(List<FailedNodeException> failures, long fetchingRound) {
+    boolean retryableException(Throwable unwrappedCause) {
+        return unwrappedCause instanceof OpenSearchRejectedExecutionException
+            || unwrappedCause instanceof ReceiveTimeoutTransportException
+            || unwrappedCause instanceof OpenSearchTimeoutException;
+    }
+
+    void processFailures(List<FailedNodeException> failures, long fetchingRound) {
         for (FailedNodeException failure : failures) {
             logger.trace("{} processing failure {} for [{}]", logKey, failure, type);
             BaseNodeEntry nodeEntry = getCache().get(failure.nodeId());
@@ -242,11 +205,16 @@ public void processFailures(List<FailedNodeException> failures, long fetchingRound) {
         }
     }
 
-    public void remove(String nodeId) {
+    /**
+     * Common function for removing whole node entry.
+     *
+     * @param nodeId nodeId to be cleaned.
+     */
+    void remove(String nodeId) {
         this.getCache().remove(nodeId);
     }
 
-    public void markAsFetching(List<String> nodeIds, long fetchingRound) {
+    void markAsFetching(List<String> nodeIds, long fetchingRound) {
        for (String nodeId : nodeIds) {
            getCache().get(nodeId).markAsFetching(fetchingRound);
        }
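Note: the extracted retryableException helper centralizes the transient-failure check that handleNodeFailure previously inlined: rejected or timed-out requests restart fetching, while any other cause marks the node entry as failed. Below is a minimal self-contained sketch of that classification pattern, using JDK exceptions in place of the OpenSearch-specific ones; the class and method names are illustrative, not part of this commit.

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;

class RetryClassificationSketch {

    // Stand-in for retryableException: transient causes should be fetched
    // again on the next round instead of being marked as failed.
    static boolean retryable(Throwable unwrappedCause) {
        return unwrappedCause instanceof RejectedExecutionException
            || unwrappedCause instanceof TimeoutException;
    }

    public static void main(String[] args) {
        System.out.println(retryable(new RejectedExecutionException("queue full"))); // true -> restart fetching
        System.out.println(retryable(new IllegalArgumentException("bad request")));  // false -> mark entry as failed
    }
}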
71 changes: 71 additions & 0 deletions server/src/main/java/org/opensearch/gateway/NodeCache.java
@@ -0,0 +1,71 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.gateway;
+
+import org.opensearch.action.support.nodes.BaseNodeResponse;
+import org.opensearch.cluster.node.DiscoveryNode;
+import org.opensearch.core.index.shard.ShardId;
+import org.opensearch.indices.store.TransportNodesListShardStoreMetadata;
+
+import java.util.Map;
+
+import reactor.util.annotation.NonNull;
+
+/**
+ * Store node level responses of transport actions like {@link TransportNodesListGatewayStartedShards} or
+ * {@link TransportNodesListShardStoreMetadata} using the given functionalities.
+ * <p>
+ * initData : how to initialize an entry of shard cache for a node.
+ * putData : how to store the response of transport action in the cache.
+ * getData : how to populate the stored data for any shard allocators like {@link PrimaryShardAllocator} or
+ * {@link ReplicaShardAllocator}
+ *
+ * @param <K> Response type of transport action which has the data to be stored in the cache.
+ */
+public interface NodeCache<K extends BaseNodeResponse> {
+
+    /**
+     * Initialize cache's entry for a node.
+     *
+     * @param node for which node we need to initialize the cache.
+     */
+    void initData(DiscoveryNode node);
+
+    /**
+     * Store the response in the cache from node.
+     *
+     * @param node node from which we got the response.
+     * @param response shard metadata coming from node.
+     */
+    void putData(DiscoveryNode node, K response);
+
+    /**
+     * Populate the response from cache.
+     *
+     * @param node node for which we need the response.
+     * @return actual response.
+     */
+    K getData(DiscoveryNode node);
+
+    /**
+     * Get actual map object of the cache
+     *
+     * @return map of nodeId and NodeEntry extending BaseNodeEntry
+     */
+    @NonNull
+    Map<String, ? extends BaseShardCache.BaseNodeEntry> getCache();
+
+    /**
+     * Cleanup cached data for this shard once it's started. Cleanup only happens at shard level. Node entries will
+     * automatically be cleaned up once shards are assigned.
+     *
+     * @param shardId for which we need to free up the cached data.
+     */
+    void deleteData(ShardId shardId);
+}
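Note: the contract is small enough to sketch with a plain map. The following self-contained example models the initData/putData/getData lifecycle the interface documents, with a simple record standing in for a BaseNodeResponse subclass and String node ids standing in for DiscoveryNode; all names here are hypothetical.

import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a transport response carrying shard metadata.
record NodeShardMetadata(String nodeId, String allocationId) {}

class MapBackedNodeCacheSketch {
    // nodeId -> last response received from that node (null until one arrives)
    private final Map<String, NodeShardMetadata> cache = new HashMap<>();

    // initData: create an entry for a node before any response exists.
    void initData(String nodeId) {
        cache.put(nodeId, null);
    }

    // putData: record the transport action's response for the node.
    void putData(String nodeId, NodeShardMetadata response) {
        cache.put(nodeId, response);
    }

    // getData: hand the stored response to a caller such as a shard allocator.
    NodeShardMetadata getData(String nodeId) {
        return cache.get(nodeId);
    }
}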
15 changes: 5 additions & 10 deletions server/src/main/java/org/opensearch/gateway/ShardCache.java
@@ -14,11 +14,11 @@
 import org.opensearch.common.Nullable;
 import org.opensearch.core.index.shard.ShardId;
 
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
+import reactor.util.annotation.NonNull;
+
 /**
  * Cache implementation of transport actions returning single shard related data in the response.
  *
@@ -48,20 +48,15 @@ public K getData(DiscoveryNode node) {
         return cache.get(node.getId()).getValue();
     }
 
+    @NonNull
     @Override
     public Map<String, ? extends BaseNodeEntry> getCache() {
         return cache;
     }
 
     @Override
-    public void clearShardCache(ShardId shardId) {
-        cache.clear();
-    }
-
-    @Override
-    public List<ShardId> getFailedShards() {
-        // Single shard cache does not need to return that shard itself because handleFailure will take care of retries
-        return Collections.emptyList();
+    public void deleteData(ShardId shardId) {
+        cache.clear(); // single shard cache can clear the full map
     }
 
     /**
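Note: the deleteData override works because ShardCache stores responses for exactly one shard, so removing that shard's data is the same as emptying the whole node map. A small illustrative sketch of that invariant, with simplified types that are not the OpenSearch classes:

import java.util.HashMap;
import java.util.Map;

class SingleShardCacheSketch<V> {
    private final String shardId; // the single shard this cache serves
    private final Map<String, V> byNodeId = new HashMap<>();

    SingleShardCacheSketch(String shardId) {
        this.shardId = shardId;
    }

    void putData(String nodeId, V value) {
        byNodeId.put(nodeId, value);
    }

    // Every entry belongs to the same shard, so a shard-level delete
    // degenerates to clearing the entire map.
    void deleteData(String requestedShardId) {
        if (shardId.equals(requestedShardId)) {
            byNodeId.clear();
        }
    }
}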
