From d0015fb0b7318e81c3f28c352ea9c39068360bca Mon Sep 17 00:00:00 2001 From: Anshu Agarwal Date: Wed, 9 Nov 2022 19:18:59 +0530 Subject: [PATCH] Add fail open logic to all write paths with search retries --- .../search/SearchWeightedRoutingIT.java | 3 +- ...TransportFieldCapabilitiesIndexAction.java | 14 ++++- .../search/AbstractSearchAsyncAction.java | 29 ++++++---- .../broadcast/TransportBroadcastAction.java | 7 +++ .../shard/TransportSingleShardAction.java | 11 +++- .../routing/WeightedRoutingHelper.java | 56 +++++++++++++++++++ 6 files changed, 101 insertions(+), 19 deletions(-) create mode 100644 server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingHelper.java diff --git a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java index 1da789965ea2d..3b78b87c30204 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java @@ -246,8 +246,7 @@ public void testFailOpenOnSearch() throws IOException { NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().execute().actionGet(); for (NodeStats stat : nodeStats.getNodes()) { SearchStats.Stats searchStats = stat.getIndices().getSearch().getTotal(); - if (stat.getNode().isDataNode()) - { + if (stat.getNode().isDataNode()) { Assert.assertTrue(searchStats.getQueryCount() > 0L); Assert.assertTrue(searchStats.getFetchCount() > 0L); } diff --git a/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java b/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java index 7d9ab4ff93f59..a53a867705d28 100644 --- a/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java +++
b/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java @@ -51,6 +51,7 @@ import org.opensearch.cluster.routing.GroupShardsIterator; import org.opensearch.cluster.routing.ShardIterator; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.WeightedRoutingHelper; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import org.opensearch.common.inject.Inject; @@ -261,16 +262,23 @@ private void onFailure(ShardRouting shardRouting, Exception e) { tryNext(e, false); } - private ShardRouting nextRoutingOrNull() { + private ShardRouting nextRoutingOrNull(Exception failure) { if (shardsIt.size() == 0 || shardIndex >= shardsIt.size()) { return null; } ShardRouting next = shardsIt.get(shardIndex).nextOrNull(); + + if (next != null + && WeightedRoutingHelper.shardInWeighedAwayAZ(next, clusterService.state()) + && !WeightedRoutingHelper.isInternalFailure(failure)) { + next = shardsIt.get(shardIndex).nextOrNull(); + } + if (next != null) { return next; } moveToNextShard(); - return nextRoutingOrNull(); + return nextRoutingOrNull(failure); } private void moveToNextShard() { @@ -278,7 +286,7 @@ private void moveToNextShard() { } private void tryNext(@Nullable final Exception lastFailure, boolean canMatchShard) { - ShardRouting shardRouting = nextRoutingOrNull(); + ShardRouting shardRouting = nextRoutingOrNull(lastFailure); if (shardRouting == null) { if (canMatchShard == false) { listener.onResponse(new FieldCapabilitiesIndexResponse(request.index(), Collections.emptyMap(), false)); diff --git a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java index c2dc822c237cd..4a36755210f5e 100644 --- a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java +++ 
b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java @@ -54,6 +54,7 @@ import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.common.util.concurrent.AtomicArray; import org.opensearch.index.shard.ShardId; +import org.opensearch.node.NodeClosedException; import org.opensearch.search.SearchPhaseResult; import org.opensearch.search.SearchShardTarget; import org.opensearch.search.internal.AliasFilter; @@ -455,7 +456,8 @@ private boolean isInternalFailure(Exception e) { final Throwable cause = ExceptionsHelper.unwrapCause(e); if (e instanceof NoShardAvailableActionException || e instanceof UnavailableShardsException - || e instanceof NodeNotConnectedException) { + || e instanceof NodeNotConnectedException + || e instanceof NodeClosedException) { return true; } return false; @@ -464,20 +466,23 @@ private boolean isInternalFailure(Exception e) { private boolean shardInWeighedAwayAZ(SearchShardTarget nextShard) { DiscoveryNode targetNode = clusterState.nodes().get(nextShard.getNodeId()); WeightedRoutingMetadata weightedRoutingMetadata = clusterState.metadata().weightedRoutingMetadata(); - WeightedRouting weightedRouting = weightedRoutingMetadata.getWeightedRouting(); - if (weightedRouting != null) - { - // Fetch weighted routing attributes with weight set as zero - Stream keys = weightedRouting.weights() - .entrySet() - .stream() - .filter(entry -> entry.getValue().intValue() == 0) - .map(Map.Entry::getKey); - if (keys != null && targetNode.getAttributes().get("zone").equals(keys.findFirst().get())) { - return true; + if (weightedRoutingMetadata != null) { + WeightedRouting weightedRouting = weightedRoutingMetadata.getWeightedRouting(); + if (weightedRouting != null) { + // Fetch weighted routing attributes with weight set as zero + Stream keys = weightedRouting.weights() + .entrySet() + .stream() + .filter(entry -> entry.getValue().intValue() == 0) + .map(Map.Entry::getKey); + if (keys != null && 
targetNode.getAttributes().get("zone").equals(keys.findFirst().get())) { + return true; + } } + } return false; + } private void onShardFailure(final int shardIndex, @Nullable SearchShardTarget shard, final SearchShardIterator shardIt, Exception e) { diff --git a/server/src/main/java/org/opensearch/action/support/broadcast/TransportBroadcastAction.java b/server/src/main/java/org/opensearch/action/support/broadcast/TransportBroadcastAction.java index a69853dc6a3c0..ccbcdc7db86ba 100644 --- a/server/src/main/java/org/opensearch/action/support/broadcast/TransportBroadcastAction.java +++ b/server/src/main/java/org/opensearch/action/support/broadcast/TransportBroadcastAction.java @@ -47,6 +47,7 @@ import org.opensearch.cluster.routing.GroupShardsIterator; import org.opensearch.cluster.routing.ShardIterator; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.WeightedRoutingHelper; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import org.opensearch.common.io.stream.StreamInput; @@ -251,6 +252,12 @@ void onOperation(@Nullable ShardRouting shard, final ShardIterator shardIt, int // will work (it will just override it...) 
setFailure(shardIt, shardIndex, e); ShardRouting nextShard = shardIt.nextOrNull(); + + if (nextShard != null + && WeightedRoutingHelper.shardInWeighedAwayAZ(nextShard, clusterState) + && !WeightedRoutingHelper.isInternalFailure(e)) { + nextShard = shardIt.nextOrNull(); + } if (nextShard != null) { if (e != null) { if (logger.isTraceEnabled()) { diff --git a/server/src/main/java/org/opensearch/action/support/single/shard/TransportSingleShardAction.java b/server/src/main/java/org/opensearch/action/support/single/shard/TransportSingleShardAction.java index df39bd29493dd..d06056bbd5af0 100644 --- a/server/src/main/java/org/opensearch/action/support/single/shard/TransportSingleShardAction.java +++ b/server/src/main/java/org/opensearch/action/support/single/shard/TransportSingleShardAction.java @@ -49,6 +49,7 @@ import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardsIterator; +import org.opensearch.cluster.routing.WeightedRoutingHelper; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import org.opensearch.common.io.stream.StreamInput; @@ -244,7 +245,12 @@ private void perform(@Nullable final Exception currentFailure) { lastFailure = currentFailure; this.lastFailure = currentFailure; } - final ShardRouting shardRouting = shardIt.nextOrNull(); + ShardRouting shardRouting = shardIt.nextOrNull(); + if (shardRouting != null + && WeightedRoutingHelper.shardInWeighedAwayAZ(shardRouting, clusterService.state()) + && !WeightedRoutingHelper.isInternalFailure(currentFailure)) { + shardRouting = shardIt.nextOrNull(); + } if (shardRouting == null) { Exception failure = lastFailure; if (failure == null || isShardNotAvailableException(failure)) { @@ -273,6 +279,7 @@ private void perform(@Nullable final Exception currentFailure) { ); } final Writeable.Reader reader = getResponseReader(); + ShardRouting finalShardRouting = shardRouting; 
transportService.sendRequest( node, transportShardAction, @@ -296,7 +303,7 @@ public void handleResponse(final Response response) { @Override public void handleException(TransportException exp) { - onFailure(shardRouting, exp); + onFailure(finalShardRouting, exp); } } );
diff --git a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingHelper.java b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingHelper.java
new file mode 100644
index 0000000000000..e91e130d41afc
--- /dev/null
+++ b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingHelper.java
@@ -0,0 +1,80 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.cluster.routing;
+
+import org.opensearch.ExceptionsHelper;
+import org.opensearch.action.NoShardAvailableActionException;
+import org.opensearch.action.UnavailableShardsException;
+import org.opensearch.cluster.ClusterState;
+import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
+import org.opensearch.cluster.node.DiscoveryNode;
+import org.opensearch.node.NodeClosedException;
+import org.opensearch.transport.NodeNotConnectedException;
+
+/**
+ * Helpers for the fail-open retry paths: classify a request failure and check
+ * whether a shard copy sits on a node whose zone has been weighed away.
+ */
+public class WeightedRoutingHelper {
+
+    /**
+     * Reports whether a failure is a shard/node availability error, i.e. one of
+     * {@link NoShardAvailableActionException}, {@link UnavailableShardsException},
+     * {@link NodeNotConnectedException} or {@link NodeClosedException}.
+     *
+     * @param e the failure to classify; may be {@code null}, which is never an internal failure
+     * @return {@code true} if the unwrapped cause is one of the types listed above
+     */
+    public static boolean isInternalFailure(Exception e) {
+        if (e == null) {
+            return false;
+        }
+        // Test the unwrapped cause, not the wrapper, so a wrapped availability error still counts.
+        final Throwable cause = ExceptionsHelper.unwrapCause(e);
+        return cause instanceof NoShardAvailableActionException
+            || cause instanceof UnavailableShardsException
+            || cause instanceof NodeNotConnectedException
+            || cause instanceof NodeClosedException;
+    }
+
+    /**
+     * Reports whether the node currently holding {@code nextShard} has a {@code zone}
+     * attribute whose weight in the cluster's weighted routing is zero.
+     *
+     * @param nextShard the shard copy about to be tried
+     * @param clusterState the cluster state supplying nodes and weighted routing metadata
+     * @return {@code true} if the shard's node is in a zero-weight zone; {@code false} when
+     *         no weighted routing is set, the node is unknown or it has no {@code zone} attribute
+     */
+    public static boolean shardInWeighedAwayAZ(ShardRouting nextShard, ClusterState clusterState) {
+        WeightedRoutingMetadata weightedRoutingMetadata = clusterState.metadata().weightedRoutingMetadata();
+        if (weightedRoutingMetadata == null) {
+            return false;
+        }
+        WeightedRouting weightedRouting = weightedRoutingMetadata.getWeightedRouting();
+        if (weightedRouting == null) {
+            return false;
+        }
+        DiscoveryNode targetNode = clusterState.nodes().get(nextShard.currentNodeId());
+        if (targetNode == null) {
+            return false;
+        }
+        // NOTE(review): the awareness attribute is hard-coded to "zone"; confirm it matches the configured one.
+        String nodeZone = targetNode.getAttributes().get("zone");
+        if (nodeZone == null) {
+            return false;
+        }
+        // Match against every zero-weight zone, and compare as double so a fractional weight is not truncated to 0.
+        return weightedRouting.weights()
+            .entrySet()
+            .stream()
+            .anyMatch(entry -> entry.getValue().doubleValue() == 0 && nodeZone.equals(entry.getKey()));
+    }
+
+}