Skip to content

Commit

Permalink
Add ShardBatchCache to support caching for TransportNodesListGatewayS…
Browse files Browse the repository at this point in the history
…tartedShardsBatch (#12504) (#13156)

(cherry picked from commit 7103e56)

Signed-off-by: Aman Khare <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent bcd2d8a commit b20bd49
Show file tree
Hide file tree
Showing 15 changed files with 621 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public static Map<ShardId, ShardAttributes> prepareRequestMap(String[] indices,
);
for (int shardIdNum = 0; shardIdNum < primaryShardCount; shardIdNum++) {
final ShardId shardId = new ShardId(index, shardIdNum);
shardIdShardAttributesMap.put(shardId, new ShardAttributes(shardId, customDataPath));
shardIdShardAttributesMap.put(shardId, new ShardAttributes(customDataPath));
}
}
return shardIdShardAttributesMap;
Expand Down
243 changes: 243 additions & 0 deletions server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway;

import org.apache.logging.log4j.Logger;
import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.action.support.nodes.BaseNodesResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.logging.Loggers;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.indices.store.ShardAttributes;

import java.lang.reflect.Array;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

import reactor.util.annotation.NonNull;

/**
* Implementation of AsyncShardFetch with batching support. This class is responsible for executing the fetch
* part using the base class {@link AsyncShardFetch}. Other functionalities needed for a batch are only written here.
* This separation also takes care of the extra generic type V which is only needed for batch
* transport actions like {@link TransportNodesListGatewayStartedShardsBatch} and
* {@link org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch}.
*
* @param <T> Response type of the transport action.
* @param <V> Data type of shard level response.
*
* @opensearch.internal
*/
public abstract class AsyncShardBatchFetch<T extends BaseNodeResponse, V> extends AsyncShardFetch<T> {

@SuppressWarnings("unchecked")
AsyncShardBatchFetch(
Logger logger,
String type,
Map<ShardId, ShardAttributes> shardAttributesMap,
AsyncShardFetch.Lister<? extends BaseNodesResponse<T>, T> action,
String batchId,
Class<V> clazz,
V emptyShardResponse,
Predicate<V> emptyShardResponsePredicate,
ShardBatchResponseFactory<T, V> responseFactory
) {
super(
logger,
type,
shardAttributesMap,
action,
batchId,
new ShardBatchCache<>(
logger,
type,
shardAttributesMap,
"BatchID=[" + batchId + "]",
clazz,
emptyShardResponse,
emptyShardResponsePredicate,
responseFactory
)
);
}

/**
* Remove a shard from the cache maintaining a full batch of shards. This is needed to clear the shard once it's
* assigned or failed.
*
* @param shardId shardId to be removed from the batch.
*/
public synchronized void clearShard(ShardId shardId) {
this.shardAttributesMap.remove(shardId);
this.cache.deleteShard(shardId);
}

/**
* Cache implementation of transport actions returning batch of shards related data in the response.
* Store node level responses of transport actions like {@link TransportNodesListGatewayStartedShardsBatch} or
* {@link org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch} with memory efficient caching
* approach. This cache class is not thread safe, all of its methods are being called from
* {@link AsyncShardFetch} class which has synchronized blocks present to handle multiple threads.
*
* @param <T> Response type of transport action.
* @param <V> Data type of shard level response.
*/
static class ShardBatchCache<T extends BaseNodeResponse, V> extends AsyncShardFetchCache<T> {
private final Map<String, NodeEntry<V>> cache;
private final Map<ShardId, Integer> shardIdToArray;
private final int batchSize;
private final Class<V> shardResponseClass;
private final ShardBatchResponseFactory<T, V> responseFactory;
private final V emptyResponse;
private final Predicate<V> emptyShardResponsePredicate;
private final Logger logger;

public ShardBatchCache(
Logger logger,
String type,
Map<ShardId, ShardAttributes> shardAttributesMap,
String logKey,
Class<V> clazz,
V emptyResponse,
Predicate<V> emptyShardResponsePredicate,
ShardBatchResponseFactory<T, V> responseFactory
) {
super(Loggers.getLogger(logger, "_" + logKey), type);
this.batchSize = shardAttributesMap.size();
this.emptyShardResponsePredicate = emptyShardResponsePredicate;
cache = new HashMap<>();
shardIdToArray = new HashMap<>();
fillShardIdKeys(shardAttributesMap.keySet());
this.shardResponseClass = clazz;
this.emptyResponse = emptyResponse;
this.logger = logger;
this.responseFactory = responseFactory;
}

@Override
@NonNull
public Map<String, ? extends BaseNodeEntry> getCache() {
return cache;
}

@Override
public void deleteShard(ShardId shardId) {
if (shardIdToArray.containsKey(shardId)) {
Integer shardIdIndex = shardIdToArray.remove(shardId);
for (String nodeId : cache.keySet()) {
cache.get(nodeId).clearShard(shardIdIndex);
}
}
}

@Override
public void initData(DiscoveryNode node) {
cache.put(node.getId(), new NodeEntry<>(node.getId(), shardResponseClass, batchSize, emptyShardResponsePredicate));
}

/**
* Put the response received from data nodes into the cache.
* Get shard level data from batch, then filter out if any shards received failures.
* After that complete storing the data at node level and mark fetching as done.
*
* @param node node from which we got the response.
* @param response shard metadata coming from node.
*/
@Override
public void putData(DiscoveryNode node, T response) {
NodeEntry<V> nodeEntry = cache.get(node.getId());
Map<ShardId, V> batchResponse = responseFactory.getShardBatchData(response);
nodeEntry.doneFetching(batchResponse, shardIdToArray);
}

@Override
public T getData(DiscoveryNode node) {
return this.responseFactory.getNewResponse(node, getBatchData(cache.get(node.getId())));
}

private HashMap<ShardId, V> getBatchData(NodeEntry<V> nodeEntry) {
V[] nodeShardEntries = nodeEntry.getData();
boolean[] emptyResponses = nodeEntry.getEmptyShardResponse();
HashMap<ShardId, V> shardData = new HashMap<>();
for (Map.Entry<ShardId, Integer> shardIdEntry : shardIdToArray.entrySet()) {
ShardId shardId = shardIdEntry.getKey();
Integer arrIndex = shardIdEntry.getValue();
if (emptyResponses[arrIndex]) {
shardData.put(shardId, emptyResponse);
} else if (nodeShardEntries[arrIndex] != null) {
// ignore null responses here
shardData.put(shardId, nodeShardEntries[arrIndex]);
}
}
return shardData;
}

private void fillShardIdKeys(Set<ShardId> shardIds) {
int shardIdIndex = 0;
for (ShardId shardId : shardIds) {
this.shardIdToArray.putIfAbsent(shardId, shardIdIndex++);
}
}

/**
* A node entry, holding the state of the fetched data for a specific shard
* for a giving node.
*/
static class NodeEntry<V> extends BaseNodeEntry {
private final V[] shardData;
private final boolean[] emptyShardResponse; // we can not rely on null entries of the shardData array,
// those null entries means that we need to ignore those entries. Empty responses on the other hand are
// actually needed in allocation/explain API response. So instead of storing full empty response object
// in cache, it's better to just store a boolean and create that object on the fly just before
// decision-making.
private final Predicate<V> emptyShardResponsePredicate;

NodeEntry(String nodeId, Class<V> clazz, int batchSize, Predicate<V> emptyShardResponsePredicate) {
super(nodeId);
this.shardData = (V[]) Array.newInstance(clazz, batchSize);
this.emptyShardResponse = new boolean[batchSize];
this.emptyShardResponsePredicate = emptyShardResponsePredicate;
}

void doneFetching(Map<ShardId, V> shardDataFromNode, Map<ShardId, Integer> shardIdKey) {
fillShardData(shardDataFromNode, shardIdKey);
super.doneFetching();
}

void clearShard(Integer shardIdIndex) {
this.shardData[shardIdIndex] = null;
emptyShardResponse[shardIdIndex] = false;
}

V[] getData() {
return this.shardData;
}

boolean[] getEmptyShardResponse() {
return emptyShardResponse;
}

private void fillShardData(Map<ShardId, V> shardDataFromNode, Map<ShardId, Integer> shardIdKey) {
for (Map.Entry<ShardId, V> shardData : shardDataFromNode.entrySet()) {
if (shardData.getValue() != null) {
ShardId shardId = shardData.getKey();
if (emptyShardResponsePredicate.test(shardData.getValue())) {
this.emptyShardResponse[shardIdKey.get(shardId)] = true;
this.shardData[shardIdKey.get(shardId)] = null;
} else {
this.shardData[shardIdKey.get(shardId)] = shardData.getValue();
}
}
}
}
}
}
}
11 changes: 6 additions & 5 deletions server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ public interface Lister<NodesResponse extends BaseNodesResponse<NodeResponse>, N
protected final String type;
protected final Map<ShardId, ShardAttributes> shardAttributesMap;
private final Lister<BaseNodesResponse<T>, T> action;
private final AsyncShardFetchCache<T> cache;
protected final AsyncShardFetchCache<T> cache;
private final AtomicLong round = new AtomicLong();
private boolean closed;
private final String reroutingKey;
final String reroutingKey;
private final Map<ShardId, Set<String>> shardToIgnoreNodes = new HashMap<>();

@SuppressWarnings("unchecked")
Expand All @@ -99,7 +99,7 @@ protected AsyncShardFetch(
this.logger = logger;
this.type = type;
shardAttributesMap = new HashMap<>();
shardAttributesMap.put(shardId, new ShardAttributes(shardId, customDataPath));
shardAttributesMap.put(shardId, new ShardAttributes(customDataPath));
this.action = (Lister<BaseNodesResponse<T>, T>) action;
this.reroutingKey = "ShardId=[" + shardId.toString() + "]";
cache = new ShardCache<>(logger, reroutingKey, type);
Expand All @@ -120,14 +120,15 @@ protected AsyncShardFetch(
String type,
Map<ShardId, ShardAttributes> shardAttributesMap,
Lister<? extends BaseNodesResponse<T>, T> action,
String batchId
String batchId,
AsyncShardFetchCache<T> cache
) {
this.logger = logger;
this.type = type;
this.shardAttributesMap = shardAttributesMap;
this.action = (Lister<BaseNodesResponse<T>, T>) action;
this.reroutingKey = "BatchID=[" + batchId + "]";
cache = new ShardCache<>(logger, reroutingKey, type);
this.cache = cache;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
* @opensearch.internal
*/
public abstract class AsyncShardFetchCache<K extends BaseNodeResponse> {

private final Logger logger;
private final String type;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.gateway.AsyncShardFetch.FetchResult;
import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard;
import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.NodeGatewayStartedShard;
import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch;

Expand Down Expand Up @@ -132,9 +133,7 @@ private static List<NodeGatewayStartedShard> adaptToNodeShardStates(

// build data for a shard from all the nodes
nodeResponses.forEach((node, nodeGatewayStartedShardsBatch) -> {
TransportNodesGatewayStartedShardHelper.GatewayStartedShard shardData = nodeGatewayStartedShardsBatch
.getNodeGatewayStartedShardsBatch()
.get(unassignedShard.shardId());
GatewayStartedShard shardData = nodeGatewayStartedShardsBatch.getNodeGatewayStartedShardsBatch().get(unassignedShard.shardId());
nodeShardStates.add(
new NodeGatewayStartedShard(
shardData.allocationId(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway;

import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard;
import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch;

import java.util.Map;

/**
* A factory class to create new responses of batch transport actions like
* {@link TransportNodesListGatewayStartedShardsBatch} or {@link org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch}
*
* @param <T> Node level response returned by batch transport actions.
* @param <V> Shard level metadata returned by batch transport actions.
*/
public class ShardBatchResponseFactory<T extends BaseNodeResponse, V> {
private final boolean primary;

public ShardBatchResponseFactory(boolean primary) {
this.primary = primary;
}

public T getNewResponse(DiscoveryNode node, Map<ShardId, V> shardData) {
if (primary) {
return (T) new NodeGatewayStartedShardsBatch(node, (Map<ShardId, GatewayStartedShard>) shardData);
} else {
return (T) new NodeStoreFilesMetadataBatch(node, (Map<ShardId, NodeStoreFilesMetadata>) shardData);
}
}

public Map<ShardId, V> getShardBatchData(T response) {
if (primary) {
return (Map<ShardId, V>) ((NodeGatewayStartedShardsBatch) response).getNodeGatewayStartedShardsBatch();
} else {
return (Map<ShardId, V>) ((NodeStoreFilesMetadataBatch) response).getNodeStoreFilesMetadataBatch();
}
}

}
Loading

0 comments on commit b20bd49

Please sign in to comment.