Skip to content

Commit

Permalink
spotless apply
Browse files Browse the repository at this point in the history
Signed-off-by: Aman Khare <[email protected]>
  • Loading branch information
Aman Khare committed Feb 23, 2024
1 parent c26c6c1 commit 608c8ae
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 28 deletions.
11 changes: 1 addition & 10 deletions server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,27 +32,20 @@
package org.opensearch.gateway;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.action.FailedNodeException;
import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.action.support.nodes.BaseNodesResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.common.Nullable;
import org.opensearch.common.lease.Releasable;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.indices.store.ShardAttributes;
import org.opensearch.transport.ReceiveTimeoutTransportException;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -183,9 +176,7 @@ public synchronized FetchResult<T> fetchData(DiscoveryNodes nodes, Map<ShardId,
// use a unique round id to detect stale responses in processAsyncFetch
final long fetchingRound = round.incrementAndGet();
cache.markAsFetching(nodeIds, fetchingRound);
DiscoveryNode[] discoNodesToFetch = nodeIds.stream()
.map(nodes::get)
.toArray(DiscoveryNode[]::new);
DiscoveryNode[] discoNodesToFetch = nodeIds.stream().map(nodes::get).toArray(DiscoveryNode[]::new);
asyncFetch(discoNodesToFetch, fetchingRound);
}

Expand Down
24 changes: 6 additions & 18 deletions server/src/main/java/org/opensearch/gateway/BaseShardCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.transport.ReceiveTimeoutTransportException;
import reactor.util.annotation.NonNull;

import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -28,6 +27,8 @@
import java.util.Map;
import java.util.Set;

import reactor.util.annotation.NonNull;

/**
* Common functionalities of a cache for storing shard metadata. Cache maintains node level responses.
* Setting up the cache is required from implementation class. While set up, we need 3 functionalities from the user.
Expand Down Expand Up @@ -141,7 +142,7 @@ public boolean hasAnyNodeFetching() {
*/
public Map<DiscoveryNode, K> populateCache(DiscoveryNodes nodes, Set<String> failedNodes) {
Map<DiscoveryNode, K> fetchData = new HashMap<>();
for (Iterator<? extends Map.Entry<String, ? extends BaseNodeEntry>> it = getCache().entrySet().iterator(); it.hasNext(); ) {
for (Iterator<? extends Map.Entry<String, ? extends BaseNodeEntry>> it = getCache().entrySet().iterator(); it.hasNext();) {
Map.Entry<String, BaseNodeEntry> entry = (Map.Entry<String, BaseNodeEntry>) it.next();
String nodeId = entry.getKey();
BaseNodeEntry nodeEntry = entry.getValue();
Expand Down Expand Up @@ -170,8 +171,7 @@ public void processResponses(List<K> responses, long fetchingRound) {
if (nodeEntry != null) {
if (validateNodeResponse(nodeEntry, fetchingRound)) {
// if the entry is there, for the right fetching round and not marked as failed already, process it
logger.trace("{} marking {} as done for [{}], result is [{}]", logKey, nodeEntry.getNodeId(), type,
response);
logger.trace("{} marking {} as done for [{}], result is [{}]", logKey, nodeEntry.getNodeId(), type, response);
putData(response.getNode(), response);
}
}
Expand All @@ -191,13 +191,7 @@ public boolean validateNodeResponse(BaseNodeEntry nodeEntry, long fetchingRound)
);
return false;
} else if (nodeEntry.isFailed()) {
logger.trace(
"{} node {} has failed for [{}] (failure [{}])",
logKey,
nodeEntry.getNodeId(),
type,
nodeEntry.getFailure()
);
logger.trace("{} node {} has failed for [{}] (failure [{}])", logKey, nodeEntry.getNodeId(), type, nodeEntry.getFailure());
return false;
}
return true;
Expand All @@ -224,12 +218,7 @@ public void handleNodeFailure(BaseNodeEntry nodeEntry, FailedNodeException failu
nodeEntry.restartFetching();
} else {
logger.warn(
() -> new ParameterizedMessage(
"{}: failed to list shard for {} on node [{}]",
logKey,
type,
failure.nodeId()
),
() -> new ParameterizedMessage("{}: failed to list shard for {} on node [{}]", logKey, type, failure.nodeId()),
failure
);
nodeEntry.doneFetching(failure.getCause());
Expand Down Expand Up @@ -257,7 +246,6 @@ public void markAsFetching(List<String> nodeIds, long fetchingRound) {
}
}


/**
* A node entry, holding only node level fetching related information.
* Actual metadata of shard is stored in child classes.
Expand Down

0 comments on commit 608c8ae

Please sign in to comment.