diff --git a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java index 097775b7ab4ac..3b78b87c30204 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java @@ -15,18 +15,22 @@ import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse; import org.opensearch.action.search.SearchResponse; +import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.WeightedRouting; import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; +import org.opensearch.common.collect.ImmutableOpenMap; import org.opensearch.common.settings.Settings; import org.opensearch.index.query.QueryBuilders; import org.opensearch.index.search.stats.SearchStats; +import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -158,4 +162,95 @@ public void testSearchWithWRRShardRouting() throws IOException { } } + public void testFailOpenOnSearch() throws IOException { + + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .build(); + + int nodeCountPerAZ = 1; + + logger.info("--> starting a dedicated cluster manager node"); + internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build()); + + logger.info("--> starting 1 nodes on zones 'a' & 'b' & 'c'"); + List nodes_in_zone_a = internalCluster().startDataOnlyNodes( + nodeCountPerAZ, + Settings.builder().put(commonSettings).put("node.attr.zone", "a").build() + ); + List nodes_in_zone_b = internalCluster().startDataOnlyNodes( + nodeCountPerAZ, + Settings.builder().put(commonSettings).put("node.attr.zone", "b").build() + ); + List nodes_in_zone_c = internalCluster().startDataOnlyNodes( + nodeCountPerAZ, + Settings.builder().put(commonSettings).put("node.attr.zone", "c").build() + ); + + logger.info("--> waiting for nodes to form a cluster"); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("4").execute().actionGet(); + assertThat(health.isTimedOut(), equalTo(false)); + + ensureGreen(); + + assertAcked( + prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 2)) + ); + ensureGreen(); + logger.info("--> creating indices for test"); + for (int i = 0; i < 100; i++) { + client().prepareIndex("test").setId("" + i).setSource("field_" + i, "value_" + i).get(); + } + refresh("test"); + + ClusterState state1 = internalCluster().clusterService().state(); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + + ClusterPutWeightedRoutingResponse response = client().admin() + .cluster() + .prepareWeightedRouting() + .setWeightedRouting(weightedRouting) + .get(); + assertEquals(response.isAcknowledged(), true); + + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodes_in_zone_a.get(0))); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodes_in_zone_b.get(0))); + ensureStableCluster(2); + + Set hitNodes = new HashSet<>(); + // making search requests + for (int i = 0; i < 50; i++) { + SearchResponse searchResponse = internalCluster().smartClient() + .prepareSearch("test") + .setQuery(QueryBuilders.matchAllQuery()) + .get(); + assertEquals(searchResponse.getFailedShards(), 0); + for (int j = 0; j < searchResponse.getHits().getHits().length; j++) { + hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId()); + } + } + + ImmutableOpenMap dataNodes = internalCluster().clusterService().state().nodes().getDataNodes(); + List nodeIdsFromZoneWithWeightZero = new ArrayList<>(); + for (Iterator it = dataNodes.valuesIt(); it.hasNext();) { + DiscoveryNode node = it.next(); + if (node.getAttributes().get("zone").equals("c")) { + nodeIdsFromZoneWithWeightZero.add(node.getId()); + } + } + + NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().execute().actionGet(); + for (NodeStats stat : nodeStats.getNodes()) { + SearchStats.Stats searchStats = stat.getIndices().getSearch().getTotal(); + if (stat.getNode().isDataNode()) { + Assert.assertTrue(searchStats.getQueryCount() > 0L); + Assert.assertTrue(searchStats.getFetchCount() > 0L); + } + + } + } } diff --git a/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java b/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java index 7d9ab4ff93f59..a53a867705d28 100644 --- a/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java +++ b/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java @@ -51,6 +51,7 @@ import org.opensearch.cluster.routing.GroupShardsIterator; import org.opensearch.cluster.routing.ShardIterator; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.WeightedRoutingHelper; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import org.opensearch.common.inject.Inject; @@ -261,16 +262,23 @@ private void onFailure(ShardRouting shardRouting, Exception e) { tryNext(e, false); } - private ShardRouting nextRoutingOrNull() { + private ShardRouting nextRoutingOrNull(Exception failure) { if (shardsIt.size() == 0 || shardIndex >= shardsIt.size()) { return null; } ShardRouting next = shardsIt.get(shardIndex).nextOrNull(); + + if (next != null + && WeightedRoutingHelper.shardInWeighedAwayAZ(next, clusterService.state()) + && !WeightedRoutingHelper.isInternalFailure(failure)) { + next = shardsIt.get(shardIndex).nextOrNull(); + } + if (next != null) { return next; } moveToNextShard(); - return nextRoutingOrNull(); + return nextRoutingOrNull(failure); } private void moveToNextShard() { @@ -278,7 +286,7 @@ private void moveToNextShard() { } private void tryNext(@Nullable final Exception lastFailure, boolean canMatchShard) { - ShardRouting shardRouting = nextRoutingOrNull(); + ShardRouting shardRouting = nextRoutingOrNull(lastFailure); if (shardRouting == null) { if (canMatchShard == false) { listener.onResponse(new FieldCapabilitiesIndexResponse(request.index(), Collections.emptyMap(), false)); diff --git a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java index 0876bf93a557b..4a36755210f5e 100644 --- a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java @@ -41,21 +41,27 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.NoShardAvailableActionException; import org.opensearch.action.ShardOperationFailedException; +import org.opensearch.action.UnavailableShardsException; import org.opensearch.action.support.TransportActions; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.WeightedRoutingMetadata; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.GroupShardsIterator; +import org.opensearch.cluster.routing.WeightedRouting; import org.opensearch.common.Nullable; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.common.util.concurrent.AtomicArray; import org.opensearch.index.shard.ShardId; +import org.opensearch.node.NodeClosedException; import org.opensearch.search.SearchPhaseResult; import org.opensearch.search.SearchShardTarget; import org.opensearch.search.internal.AliasFilter; import org.opensearch.search.internal.InternalSearchResponse; import org.opensearch.search.internal.SearchContext; import org.opensearch.search.internal.ShardSearchRequest; +import org.opensearch.transport.NodeNotConnectedException; import org.opensearch.transport.Transport; import java.util.ArrayDeque; @@ -70,6 +76,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * This is an abstract base class that encapsulates the logic to fan out to all shards in provided {@link GroupShardsIterator} @@ -445,11 +452,48 @@ ShardSearchFailure[] buildShardFailures() { return failures; } + private boolean isInternalFailure(Exception e) { + final Throwable cause = ExceptionsHelper.unwrapCause(e); + if (e instanceof NoShardAvailableActionException + || e instanceof UnavailableShardsException + || e instanceof NodeNotConnectedException + || e instanceof NodeClosedException) { + return true; + } + return false; + } + + private boolean shardInWeighedAwayAZ(SearchShardTarget nextShard) { + DiscoveryNode targetNode = clusterState.nodes().get(nextShard.getNodeId()); + WeightedRoutingMetadata weightedRoutingMetadata = clusterState.metadata().weightedRoutingMetadata(); + if (weightedRoutingMetadata != null) { + WeightedRouting weightedRouting = weightedRoutingMetadata.getWeightedRouting(); + if (weightedRouting != null) { + // Fetch weighted routing attributes with weight set as zero + Stream keys = weightedRouting.weights() + .entrySet() + .stream() + .filter(entry -> entry.getValue().intValue() == 0) + .map(Map.Entry::getKey); + if (keys != null && targetNode.getAttributes().get("zone").equals(keys.findFirst().get())) { + return true; + } + } + + } + return false; + + } + private void onShardFailure(final int shardIndex, @Nullable SearchShardTarget shard, final SearchShardIterator shardIt, Exception e) { // we always add the shard failure for a specific shard instance // we do make sure to clean it on a successful response from a shard onShardFailure(shardIndex, shard, e); - final SearchShardTarget nextShard = shardIt.nextOrNull(); + SearchShardTarget nextShard = shardIt.nextOrNull(); + + if (nextShard != null && shardInWeighedAwayAZ(nextShard) && !isInternalFailure(e)) { + nextShard = shardIt.nextOrNull(); + } final boolean lastShard = nextShard == null; logger.debug( () -> new ParameterizedMessage( diff --git a/server/src/main/java/org/opensearch/action/support/broadcast/TransportBroadcastAction.java b/server/src/main/java/org/opensearch/action/support/broadcast/TransportBroadcastAction.java index a69853dc6a3c0..ccbcdc7db86ba 100644 --- a/server/src/main/java/org/opensearch/action/support/broadcast/TransportBroadcastAction.java +++ b/server/src/main/java/org/opensearch/action/support/broadcast/TransportBroadcastAction.java @@ -47,6 +47,7 @@ import org.opensearch.cluster.routing.GroupShardsIterator; import org.opensearch.cluster.routing.ShardIterator; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.WeightedRoutingHelper; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import org.opensearch.common.io.stream.StreamInput; @@ -251,6 +252,12 @@ void onOperation(@Nullable ShardRouting shard, final ShardIterator shardIt, int // will work (it will just override it...) setFailure(shardIt, shardIndex, e); ShardRouting nextShard = shardIt.nextOrNull(); + + if (nextShard != null + && WeightedRoutingHelper.shardInWeighedAwayAZ(nextShard, clusterState) + && !WeightedRoutingHelper.isInternalFailure(e)) { + nextShard = shardIt.nextOrNull(); + } if (nextShard != null) { if (e != null) { if (logger.isTraceEnabled()) { diff --git a/server/src/main/java/org/opensearch/action/support/single/shard/TransportSingleShardAction.java b/server/src/main/java/org/opensearch/action/support/single/shard/TransportSingleShardAction.java index df39bd29493dd..d06056bbd5af0 100644 --- a/server/src/main/java/org/opensearch/action/support/single/shard/TransportSingleShardAction.java +++ b/server/src/main/java/org/opensearch/action/support/single/shard/TransportSingleShardAction.java @@ -49,6 +49,7 @@ import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardsIterator; +import org.opensearch.cluster.routing.WeightedRoutingHelper; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import org.opensearch.common.io.stream.StreamInput; @@ -244,7 +245,12 @@ private void perform(@Nullable final Exception currentFailure) { lastFailure = currentFailure; this.lastFailure = currentFailure; } - final ShardRouting shardRouting = shardIt.nextOrNull(); + ShardRouting shardRouting = shardIt.nextOrNull(); + if (shardRouting != null + && WeightedRoutingHelper.shardInWeighedAwayAZ(shardRouting, clusterService.state()) + && !WeightedRoutingHelper.isInternalFailure(currentFailure)) { + shardRouting = shardIt.nextOrNull(); + } if (shardRouting == null) { Exception failure = lastFailure; if (failure == null || isShardNotAvailableException(failure)) { @@ -273,6 +279,7 @@ private void perform(@Nullable final Exception currentFailure) { ); } final Writeable.Reader reader = getResponseReader(); + ShardRouting finalShardRouting = shardRouting; transportService.sendRequest( node, transportShardAction, @@ -296,7 +303,7 @@ public void handleResponse(final Response response) { @Override public void handleException(TransportException exp) { - onFailure(shardRouting, exp); + onFailure(finalShardRouting, exp); } } ); diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java index 9026e7068e9fe..cf9e23a64edfe 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java @@ -57,6 +57,7 @@ import java.util.Optional; import java.util.Set; import java.util.function.Predicate; +import java.util.stream.Stream; import static java.util.Collections.emptyMap; @@ -317,6 +318,20 @@ public ShardIterator activeInitializingShardsWeightedIt(WeightedRouting weighted List orderedInitializingShards = getInitializingShardsByWeight(weightedRouting, nodes, defaultWeight); ordered.addAll(orderedInitializingShards); } + + // append shards for attribute value with weight zero to the end, so that shard search requests can be tried on + // shard copies in case of request failure from other attribute values. + Stream keys = weightedRouting.weights() + .entrySet() + .stream() + .filter(entry -> entry.getValue().intValue() == 0) + .map(Map.Entry::getKey); + if (keys != null) { + ShardIterator iterator = onlyNodeSelectorActiveInitializingShardsIt("zone:" + keys.findFirst().get(), nodes); + while (iterator.remaining() > 0) { + ordered.add(iterator.nextOrNull()); + } + } return new PlainShardIterator(shardId, ordered); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingHelper.java b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingHelper.java new file mode 100644 index 0000000000000..e91e130d41afc --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingHelper.java @@ -0,0 +1,56 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing; + +import org.opensearch.ExceptionsHelper; +import org.opensearch.action.NoShardAvailableActionException; +import org.opensearch.action.UnavailableShardsException; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.WeightedRoutingMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.transport.NodeNotConnectedException; + +import java.util.Map; +import java.util.stream.Stream; + +public class WeightedRoutingHelper { + + public static boolean isInternalFailure(Exception e) { + final Throwable cause = ExceptionsHelper.unwrapCause(e); + if (e instanceof NoShardAvailableActionException + || e instanceof UnavailableShardsException + || e instanceof NodeNotConnectedException) { + return true; + } + return false; + } + + public static boolean shardInWeighedAwayAZ(ShardRouting nextShard, ClusterState clusterState) { + DiscoveryNode targetNode = clusterState.nodes().get(nextShard.currentNodeId()); + WeightedRoutingMetadata weightedRoutingMetadata = clusterState.metadata().weightedRoutingMetadata(); + + if (weightedRoutingMetadata != null) { + WeightedRouting weightedRouting = weightedRoutingMetadata.getWeightedRouting(); + if (weightedRouting != null) { + // Fetch weighted routing attributes with weight set as zero + Stream keys = weightedRouting.weights() + .entrySet() + .stream() + .filter(entry -> entry.getValue().intValue() == 0) + .map(Map.Entry::getKey); + if (keys != null && targetNode.getAttributes().get("zone").equals(keys.findFirst().get())) { + return true; + } + } + } + + return false; + } + +}