Skip to content

Commit

Permalink
Cleanup java doc and review comments changes
Browse files Browse the repository at this point in the history
Signed-off-by: Aman Khare <[email protected]>
  • Loading branch information
Aman Khare committed Apr 9, 2024
1 parent d61477f commit 65229fc
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;

import reactor.util.annotation.NonNull;

/**
* Implementation of AsyncShardFetch with batching support. This class is responsible for executing the fetch
* part using the base class {@link AsyncShardFetch}. Other functionalities needed for a batch are only written here.
* Cleanup of failed shards is necessary in a batch and based on that a reroute should be triggered to take care of
* those in the next run. This separation also takes care of the extra generic type V which is only needed for batch
* transport actions like {@link TransportNodesListGatewayStartedShardsBatch}.
* This separation also takes care of the extra generic type V which is only needed for batch
* transport actions like {@link TransportNodesListGatewayStartedShardsBatch} and
* {@link org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch}.
*
* @param <T> Response type of the transport action.
* @param <V> Data type of shard level response.
Expand All @@ -44,8 +46,8 @@ public abstract class AsyncShardBatchFetch<T extends BaseNodeResponse, V> extend
AsyncShardFetch.Lister<? extends BaseNodesResponse<T>, T> action,
String batchId,
Class<V> clazz,
V emptyResponse,
Function<V, Boolean> isEmptyResponse,
V emptyShardResponse,
Predicate<V> emptyShardResponsePredicate,
ShardBatchResponseFactory<T, V> responseFactory
) {
super(
Expand All @@ -60,8 +62,8 @@ public abstract class AsyncShardBatchFetch<T extends BaseNodeResponse, V> extend
shardAttributesMap,
"BatchID=[" + batchId + "]",
clazz,
emptyResponse,
isEmptyResponse,
emptyShardResponse,
emptyShardResponsePredicate,
responseFactory
)
);
Expand All @@ -82,7 +84,8 @@ public void clearShard(ShardId shardId) {
* Cache implementation of transport actions returning batch of shards related data in the response.
* Store node level responses of transport actions like {@link TransportNodesListGatewayStartedShardsBatch} or
* {@link org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch} with memory efficient caching
* approach.
* approach. This cache class is not thread safe, all of its methods are being called from
* {@link AsyncShardFetch} class which has synchronized blocks present to handle multiple threads.
*
* @param <T> Response type of transport action.
* @param <V> Data type of shard level response.
Expand All @@ -94,7 +97,7 @@ static class ShardBatchCache<T extends BaseNodeResponse, V> extends AsyncShardFe
private final Class<V> shardResponseClass;
private final ShardBatchResponseFactory<T, V> responseFactory;
private final V emptyResponse;
private final Function<V, Boolean> isEmpty;
private final Predicate<V> emptyShardResponsePredicate;
private final Logger logger;

public ShardBatchCache(
Expand All @@ -104,12 +107,12 @@ public ShardBatchCache(
String logKey,
Class<V> clazz,
V emptyResponse,
Function<V, Boolean> isEmptyResponse,
Predicate<V> emptyShardResponsePredicate,
ShardBatchResponseFactory<T, V> responseFactory
) {
super(Loggers.getLogger(logger, "_" + logKey), type);
this.batchSize = shardAttributesMap.size();
this.isEmpty = isEmptyResponse;
this.emptyShardResponsePredicate = emptyShardResponsePredicate;
cache = new HashMap<>();
shardIdToArray = new HashMap<>();
fillShardIdKeys(shardAttributesMap.keySet());
Expand All @@ -120,6 +123,7 @@ public ShardBatchCache(
}

@Override
@NonNull
public Map<String, ? extends BaseNodeEntry> getCache() {
return cache;
}
Expand All @@ -136,7 +140,7 @@ public void deleteShard(ShardId shardId) {

@Override
public void initData(DiscoveryNode node) {
cache.put(node.getId(), new NodeEntry<>(node.getId(), shardResponseClass, batchSize, isEmpty));
cache.put(node.getId(), new NodeEntry<>(node.getId(), shardResponseClass, batchSize, emptyShardResponsePredicate));
}

/**
Expand All @@ -163,9 +167,9 @@ private HashMap<ShardId, V> getBatchData(NodeEntry<V> nodeEntry) {
V[] nodeShardEntries = nodeEntry.getData();
boolean[] emptyResponses = nodeEntry.getEmptyShardResponse();
HashMap<ShardId, V> shardData = new HashMap<>();
for (Map.Entry<ShardId, Integer> shardIdIndex : shardIdToArray.entrySet()) {
ShardId shardId = shardIdIndex.getKey();
Integer arrIndex = shardIdIndex.getValue();
for (Map.Entry<ShardId, Integer> shardIdEntry : shardIdToArray.entrySet()) {
ShardId shardId = shardIdEntry.getKey();
Integer arrIndex = shardIdEntry.getValue();
if (emptyResponses[arrIndex]) {
shardData.put(shardId, emptyResponse);
} else if (nodeShardEntries[arrIndex] != null) {
Expand Down Expand Up @@ -194,13 +198,13 @@ static class NodeEntry<V> extends BaseNodeEntry {
// actually needed in allocation/explain API response. So instead of storing full empty response object
// in cache, it's better to just store a boolean and create that object on the fly just before
// decision-making.
private final Function<V, Boolean> isEmpty;
private final Predicate<V> emptyShardResponsePredicate;

NodeEntry(String nodeId, Class<V> clazz, int batchSize, Function<V, Boolean> isEmptyResponse) {
NodeEntry(String nodeId, Class<V> clazz, int batchSize, Predicate<V> emptyShardResponsePredicate) {
super(nodeId);
this.shardData = (V[]) Array.newInstance(clazz, batchSize);
this.emptyShardResponse = new boolean[batchSize];
this.isEmpty = isEmptyResponse;
this.emptyShardResponsePredicate = emptyShardResponsePredicate;
}

void doneFetching(Map<ShardId, V> shardDataFromNode, Map<ShardId, Integer> shardIdKey) {
Expand All @@ -225,7 +229,7 @@ private void fillShardData(Map<ShardId, V> shardDataFromNode, Map<ShardId, Integ
for (Map.Entry<ShardId, V> shardData : shardDataFromNode.entrySet()) {
if (shardData.getValue() != null) {
ShardId shardId = shardData.getKey();
if (isEmpty.apply(shardData.getValue())) {
if (emptyShardResponsePredicate.test(shardData.getValue())) {
this.emptyShardResponse[shardIdKey.get(shardId)] = true;
this.shardData[shardIdKey.get(shardId)] = null;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ public String toString() {
return buf.toString();
}

public static Boolean isEmpty(GatewayStartedShard gatewayStartedShard) {
public static boolean isEmpty(GatewayStartedShard gatewayStartedShard) {
return gatewayStartedShard.allocationId() == null
&& gatewayStartedShard.primary() == false
&& gatewayStartedShard.storeException() == null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ public void testPutData() {
this.shardCache.putData(node1, new NodeGatewayStartedShardsBatch(node1, getPrimaryResponse(shardsInBatch, ResponseType.VALID)));
this.shardCache.putData(node2, new NodeGatewayStartedShardsBatch(node1, getPrimaryResponse(shardsInBatch, ResponseType.EMPTY)));

// assert that fetching is done as both node's responses are stored in cache
assertFalse(this.shardCache.getCache().get(node1.getId()).isFetching());
assertFalse(this.shardCache.getCache().get(node2.getId()).isFetching());

Map<DiscoveryNode, NodeGatewayStartedShardsBatch> fetchData = shardCache.getCacheData(
DiscoveryNodes.builder().add(node1).add(node2).build(),
null
Expand Down Expand Up @@ -201,10 +205,6 @@ private Map<ShardId, GatewayStartedShard> getFailedPrimaryResponse(List<ShardId>
return shardData;
}

public void removeShard(ShardId shardId) {
// batchInfo.remove(shardId);
}

private void fillShards(Map<ShardId, ShardAttributes> shardAttributesMap, int numberOfShards) {
shardsInBatch = BatchTestUtil.setUpShards(numberOfShards);
for (ShardId shardId : shardsInBatch) {
Expand Down

0 comments on commit 65229fc

Please sign in to comment.