Skip to content

Commit

Permalink
Add fail open login to all write paths with search retries
Browse files Browse the repository at this point in the history
  • Loading branch information
Anshu Agarwal committed Nov 9, 2022
1 parent e64c59e commit d0015fb
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,7 @@ public void testFailOpenOnSearch() throws IOException {
NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().execute().actionGet();
for (NodeStats stat : nodeStats.getNodes()) {
SearchStats.Stats searchStats = stat.getIndices().getSearch().getTotal();
if (stat.getNode().isDataNode())
{
if (stat.getNode().isDataNode()) {
Assert.assertTrue(searchStats.getQueryCount() > 0L);
Assert.assertTrue(searchStats.getFetchCount() > 0L);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.opensearch.cluster.routing.GroupShardsIterator;
import org.opensearch.cluster.routing.ShardIterator;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.WeightedRoutingHelper;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.inject.Inject;
Expand Down Expand Up @@ -261,24 +262,31 @@ private void onFailure(ShardRouting shardRouting, Exception e) {
tryNext(e, false);
}

private ShardRouting nextRoutingOrNull() {
private ShardRouting nextRoutingOrNull(Exception failure) {
if (shardsIt.size() == 0 || shardIndex >= shardsIt.size()) {
return null;
}
ShardRouting next = shardsIt.get(shardIndex).nextOrNull();

if (next != null
&& WeightedRoutingHelper.shardInWeighedAwayAZ(next, clusterService.state())
&& !WeightedRoutingHelper.isInternalFailure(failure)) {
next = shardsIt.get(shardIndex).nextOrNull();
}

if (next != null) {
return next;
}
moveToNextShard();
return nextRoutingOrNull();
return nextRoutingOrNull(failure);
}

private void moveToNextShard() {
++shardIndex;
}

private void tryNext(@Nullable final Exception lastFailure, boolean canMatchShard) {
ShardRouting shardRouting = nextRoutingOrNull();
ShardRouting shardRouting = nextRoutingOrNull(lastFailure);
if (shardRouting == null) {
if (canMatchShard == false) {
listener.onResponse(new FieldCapabilitiesIndexResponse(request.index(), Collections.emptyMap(), false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.util.concurrent.AtomicArray;
import org.opensearch.index.shard.ShardId;
import org.opensearch.node.NodeClosedException;
import org.opensearch.search.SearchPhaseResult;
import org.opensearch.search.SearchShardTarget;
import org.opensearch.search.internal.AliasFilter;
Expand Down Expand Up @@ -455,7 +456,8 @@ private boolean isInternalFailure(Exception e) {
final Throwable cause = ExceptionsHelper.unwrapCause(e);
if (e instanceof NoShardAvailableActionException
|| e instanceof UnavailableShardsException
|| e instanceof NodeNotConnectedException) {
|| e instanceof NodeNotConnectedException
|| e instanceof NodeClosedException) {
return true;
}
return false;
Expand All @@ -464,20 +466,23 @@ private boolean isInternalFailure(Exception e) {
private boolean shardInWeighedAwayAZ(SearchShardTarget nextShard) {
DiscoveryNode targetNode = clusterState.nodes().get(nextShard.getNodeId());
WeightedRoutingMetadata weightedRoutingMetadata = clusterState.metadata().weightedRoutingMetadata();
WeightedRouting weightedRouting = weightedRoutingMetadata.getWeightedRouting();
if (weightedRouting != null)
{
// Fetch weighted routing attributes with weight set as zero
Stream<String> keys = weightedRouting.weights()
.entrySet()
.stream()
.filter(entry -> entry.getValue().intValue() == 0)
.map(Map.Entry::getKey);
if (keys != null && targetNode.getAttributes().get("zone").equals(keys.findFirst().get())) {
return true;
if (weightedRoutingMetadata != null) {
WeightedRouting weightedRouting = weightedRoutingMetadata.getWeightedRouting();
if (weightedRouting != null) {
// Fetch weighted routing attributes with weight set as zero
Stream<String> keys = weightedRouting.weights()
.entrySet()
.stream()
.filter(entry -> entry.getValue().intValue() == 0)
.map(Map.Entry::getKey);
if (keys != null && targetNode.getAttributes().get("zone").equals(keys.findFirst().get())) {
return true;
}
}

}
return false;

}

private void onShardFailure(final int shardIndex, @Nullable SearchShardTarget shard, final SearchShardIterator shardIt, Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.cluster.routing.GroupShardsIterator;
import org.opensearch.cluster.routing.ShardIterator;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.WeightedRoutingHelper;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -251,6 +252,12 @@ void onOperation(@Nullable ShardRouting shard, final ShardIterator shardIt, int
// will work (it will just override it...)
setFailure(shardIt, shardIndex, e);
ShardRouting nextShard = shardIt.nextOrNull();

if (nextShard != null
&& WeightedRoutingHelper.shardInWeighedAwayAZ(nextShard, clusterState)
&& !WeightedRoutingHelper.isInternalFailure(e)) {
nextShard = shardIt.nextOrNull();
}
if (nextShard != null) {
if (e != null) {
if (logger.isTraceEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardsIterator;
import org.opensearch.cluster.routing.WeightedRoutingHelper;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -244,7 +245,12 @@ private void perform(@Nullable final Exception currentFailure) {
lastFailure = currentFailure;
this.lastFailure = currentFailure;
}
final ShardRouting shardRouting = shardIt.nextOrNull();
ShardRouting shardRouting = shardIt.nextOrNull();
if (shardRouting != null
&& WeightedRoutingHelper.shardInWeighedAwayAZ(shardRouting, clusterService.state())
&& !WeightedRoutingHelper.isInternalFailure(currentFailure)) {
shardRouting = shardIt.nextOrNull();
}
if (shardRouting == null) {
Exception failure = lastFailure;
if (failure == null || isShardNotAvailableException(failure)) {
Expand Down Expand Up @@ -273,6 +279,7 @@ private void perform(@Nullable final Exception currentFailure) {
);
}
final Writeable.Reader<Response> reader = getResponseReader();
ShardRouting finalShardRouting = shardRouting;
transportService.sendRequest(
node,
transportShardAction,
Expand All @@ -296,7 +303,7 @@ public void handleResponse(final Response response) {

@Override
public void handleException(TransportException exp) {
onFailure(shardRouting, exp);
onFailure(finalShardRouting, exp);
}
}
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing;

import org.opensearch.ExceptionsHelper;
import org.opensearch.action.NoShardAvailableActionException;
import org.opensearch.action.UnavailableShardsException;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.transport.NodeNotConnectedException;

import java.util.Map;
import java.util.stream.Stream;

public class WeightedRoutingHelper {

public static boolean isInternalFailure(Exception e) {
final Throwable cause = ExceptionsHelper.unwrapCause(e);
if (e instanceof NoShardAvailableActionException
|| e instanceof UnavailableShardsException
|| e instanceof NodeNotConnectedException) {
return true;
}
return false;
}

public static boolean shardInWeighedAwayAZ(ShardRouting nextShard, ClusterState clusterState) {
DiscoveryNode targetNode = clusterState.nodes().get(nextShard.currentNodeId());
WeightedRoutingMetadata weightedRoutingMetadata = clusterState.metadata().weightedRoutingMetadata();

if (weightedRoutingMetadata != null) {
WeightedRouting weightedRouting = weightedRoutingMetadata.getWeightedRouting();
if (weightedRouting != null) {
// Fetch weighted routing attributes with weight set as zero
Stream<String> keys = weightedRouting.weights()
.entrySet()
.stream()
.filter(entry -> entry.getValue().intValue() == 0)
.map(Map.Entry::getKey);
if (keys != null && targetNode.getAttributes().get("zone").equals(keys.findFirst().get())) {
return true;
}
}
}

return false;
}

}

0 comments on commit d0015fb

Please sign in to comment.