[Weighted Shard Routing] Fail open requests on search shard failures #5072

Merged 37 commits, Jan 10, 2023 (changes shown from 32 commits)
02f1fa8
Fail open changes
Nov 9, 2022
9e69711
Refactor code
Nov 10, 2022
5ee2b02
Merge branch 'main' into feature/poc-fail-open
Nov 10, 2022
c0e5b67
Fix test
Nov 10, 2022
f3006a5
Add integ test with network disruption and refactor code
Nov 10, 2022
683786d
Merge branch 'main' into feature/poc-fail-open
Dec 6, 2022
2834af8
Add log statement
Dec 6, 2022
4ad2bed
Add integ test for search aggregations and fail open flag
Dec 20, 2022
644fa60
Refactor integ tests
Dec 21, 2022
7e0df4a
Add integ test for multiget with fail open
Dec 21, 2022
a22b341
Add changelog
Dec 21, 2022
614d85e
Refactor code
Dec 27, 2022
a02f332
Make fail open enabled by default
Dec 27, 2022
aac9d4b
Fail open on unassigned shard copies
Dec 28, 2022
836af1c
Add tests
Dec 30, 2022
2dc6699
Fix tests
Dec 30, 2022
16e295e
Merge branch 'main' into feature/poc-fail-open
Dec 30, 2022
605a4dc
Fix precommit build
Jan 2, 2023
4a90174
Fix test
Jan 2, 2023
9ce9be8
Change internal error logic to check for 5xx status
Jan 3, 2023
e76deba
Fix test
Jan 3, 2023
048eea6
Merge branch 'main' into feature/poc-fail-open
Jan 4, 2023
a429c21
Fix integ test failure
Jan 4, 2023
d2cf1a2
Merge branch 'main' into feature/poc-fail-open
Jan 5, 2023
d0a48e3
Address review comments
Jan 6, 2023
fca7ae9
Fix precommit failure
Jan 6, 2023
14499b0
Merge branch 'main' into feature/poc-fail-open
Jan 8, 2023
6f7aafa
Merge branch 'main' into feature/poc-fail-open
Jan 9, 2023
4c8e50c
Fix tests
Jan 9, 2023
df3e416
Modify changelog
Jan 9, 2023
cc6d1e9
Address review comments
Jan 9, 2023
7ca22c5
Remove duplicate shards from routing iterator
Jan 9, 2023
f376eab
add test to validate request state persistence
Jan 9, 2023
33a04b5
fix test comment
Jan 9, 2023
ef961bd
Address review comments
Jan 9, 2023
97ea739
log exception
Jan 9, 2023
7df7358
Address review comments
Jan 10, 2023
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

# CHANGELOG

All notable changes to this project are documented in this file.
Expand All @@ -15,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Test] Add IAE test for deprecated edgeNGram analyzer name ([#5040](https://github.com/opensearch-project/OpenSearch/pull/5040))
- Allow mmap to use new JDK-19 preview APIs in Apache Lucene 9.4+ ([#5151](https://github.com/opensearch-project/OpenSearch/pull/5151))
- Add feature flag for extensions ([#5211](https://github.com/opensearch-project/OpenSearch/pull/5211))
- Support to fail open requests on search shard failures with weighted traffic routing ([#5072](https://github.com/opensearch-project/OpenSearch/pull/5072))
- Added jackson dependency to server ([#5366](https://github.com/opensearch-project/OpenSearch/pull/5366))
- Adding support to register settings dynamically ([#5495](https://github.com/opensearch-project/OpenSearch/pull/5495))
- Added experimental support for extensions ([#5347](https://github.com/opensearch-project/OpenSearch/pull/5347)), ([#5518](https://github.com/opensearch-project/OpenSearch/pull/5518), ([#5597](https://github.com/opensearch-project/OpenSearch/pull/5597)), ([#5615](https://github.com/opensearch-project/OpenSearch/pull/5615)))
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.FailAwareWeightedRouting;
import org.opensearch.cluster.routing.GroupShardsIterator;
import org.opensearch.cluster.routing.ShardIterator;
import org.opensearch.cluster.routing.ShardRouting;
Expand Down Expand Up @@ -261,24 +262,25 @@ private void onFailure(ShardRouting shardRouting, Exception e) {
tryNext(e, false);
}

private ShardRouting nextRoutingOrNull() {
private ShardRouting nextRoutingOrNull(Exception failure) {
if (shardsIt.size() == 0 || shardIndex >= shardsIt.size()) {
return null;
}
ShardRouting next = shardsIt.get(shardIndex).nextOrNull();
ShardRouting next = new FailAwareWeightedRouting(failure, clusterService.state()).findNext(shardsIt.get(shardIndex));

if (next != null) {
return next;
}
moveToNextShard();
return nextRoutingOrNull();
return nextRoutingOrNull(failure);
}

private void moveToNextShard() {
++shardIndex;
}

private void tryNext(@Nullable final Exception lastFailure, boolean canMatchShard) {
ShardRouting shardRouting = nextRoutingOrNull();
ShardRouting shardRouting = nextRoutingOrNull(lastFailure);
if (shardRouting == null) {
if (canMatchShard == false) {
listener.onResponse(new FieldCapabilitiesIndexResponse(request.index(), Collections.emptyMap(), false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.action.ShardOperationFailedException;
import org.opensearch.action.support.TransportActions;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.routing.FailAwareWeightedRouting;
import org.opensearch.cluster.routing.GroupShardsIterator;
import org.opensearch.common.Nullable;
import org.opensearch.common.lease.Releasable;
Expand Down Expand Up @@ -449,7 +450,8 @@ private void onShardFailure(final int shardIndex, @Nullable SearchShardTarget sh
// we always add the shard failure for a specific shard instance
// we do make sure to clean it on a successful response from a shard
onShardFailure(shardIndex, shard, e);
final SearchShardTarget nextShard = shardIt.nextOrNull();
SearchShardTarget nextShard = new FailAwareWeightedRouting(e, clusterState).findNext(shardIt);

final boolean lastShard = nextShard == null;
logger.debug(
() -> new ParameterizedMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public String getClusterAlias() {
return clusterAlias;
}

SearchShardTarget nextOrNull() {
public SearchShardTarget nextOrNull() {
final String nodeId = targetNodesIterator.nextOrNull();
if (nodeId != null) {
return new SearchShardTarget(nodeId, shardId, clusterAlias, originalIndices);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.FailAwareWeightedRouting;
import org.opensearch.cluster.routing.GroupShardsIterator;
import org.opensearch.cluster.routing.ShardIterator;
import org.opensearch.cluster.routing.ShardRouting;
Expand Down Expand Up @@ -250,7 +251,8 @@ void onOperation(@Nullable ShardRouting shard, final ShardIterator shardIt, int
// we set the shard failure always, even if its the first in the replication group, and the next one
// will work (it will just override it...)
setFailure(shardIt, shardIndex, e);
ShardRouting nextShard = shardIt.nextOrNull();
ShardRouting nextShard = new FailAwareWeightedRouting(e, clusterService.state()).findNext(shardIt);

if (nextShard != null) {
if (e != null) {
if (logger.isTraceEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.FailAwareWeightedRouting;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardsIterator;
import org.opensearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -244,7 +245,8 @@ private void perform(@Nullable final Exception currentFailure) {
lastFailure = currentFailure;
this.lastFailure = currentFailure;
}
final ShardRouting shardRouting = shardIt.nextOrNull();
ShardRouting shardRouting = new FailAwareWeightedRouting(currentFailure, clusterService.state()).findNext(shardIt);

if (shardRouting == null) {
Exception failure = lastFailure;
if (failure == null || isShardNotAvailableException(failure)) {
Expand Down Expand Up @@ -273,6 +275,7 @@ private void perform(@Nullable final Exception currentFailure) {
);
}
final Writeable.Reader<Response> reader = getResponseReader();
ShardRouting finalShardRouting = shardRouting;
transportService.sendRequest(
node,
transportShardAction,
Expand All @@ -296,7 +299,7 @@ public void handleResponse(final Response response) {

@Override
public void handleException(TransportException exp) {
onFailure(shardRouting, exp);
onFailure(finalShardRouting, exp);
}
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class WeightedRoutingMetadata extends AbstractNamedDiffable<Metadata.Cust
public static final String VERSION = "_version";
public static final long INITIAL_VERSION = -1;
public static final long VERSION_UNSET_VALUE = -2;
public static final int WEIGHED_AWAY_WEIGHT = 0;

public long getVersion() {
return version;
Expand Down
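The `WEIGHED_AWAY_WEIGHT` constant above marks routing attribute values (for example, zones) whose weight is zero and which therefore receive no regular traffic. As an illustrative sketch (names and types here are simplified and hypothetical, not the PR's API), detecting weighed-away attribute values from a weights map might look like:

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class WeighedAwaySketch {
    // mirrors WeightedRoutingMetadata.WEIGHED_AWAY_WEIGHT in the PR
    static final int WEIGHED_AWAY_WEIGHT = 0;

    // returns the attribute values (e.g. zone names) whose weight is zero
    static Set<String> weighedAwayValues(Map<String, Double> weights) {
        return weights.entrySet().stream()
            .filter(e -> e.getValue().intValue() == WEIGHED_AWAY_WEIGHT)
            .map(Map.Entry::getKey)
            .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        Map<String, Double> weights = Map.of("zone-a", 1.0, "zone-b", 1.0, "zone-c", 0.0);
        System.out.println(weighedAwayValues(weights)); // [zone-c]
    }
}
```

The real implementation additionally resolves the node's routing attribute value from `DiscoveryNode.getAttributes()` before comparing it against this set.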
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.action.search.SearchShardIterator;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.index.shard.ShardId;
import org.opensearch.rest.RestStatus;
import org.opensearch.search.SearchShardTarget;

import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

/**
 * This class contains the logic to find the next shard copy on which to retry a search request after a failure from another shard copy.
 * It decides whether retryable shard search requests can be tried on shard copies present on data
 * nodes whose attribute value weight for weighted shard routing is set to zero.
 */

public class FailAwareWeightedRouting {

private static final Logger logger = LogManager.getLogger(FailAwareWeightedRouting.class);

/* exception causing failure for previous shard copy */
private Exception exception;

private ClusterState clusterState;

private final static List<RestStatus> internalErrorRestStatusList = List.of(
RestStatus.INTERNAL_SERVER_ERROR,
RestStatus.NOT_IMPLEMENTED,
RestStatus.BAD_GATEWAY,
RestStatus.SERVICE_UNAVAILABLE,
RestStatus.GATEWAY_TIMEOUT,
RestStatus.HTTP_VERSION_NOT_SUPPORTED,
RestStatus.INSUFFICIENT_STORAGE
);
Collaborator: Please remove

  1. RestStatus.NOT_IMPLEMENTED
  2. RestStatus.HTTP_VERSION_NOT_SUPPORTED
  3. RestStatus.INSUFFICIENT_STORAGE


public FailAwareWeightedRouting(Exception e, ClusterState clusterState) {
this.exception = e;
this.clusterState = clusterState;
}
Collaborator (@Bukhtawar, Jan 9, 2023): Please use only the cluster state in the constructor so that it could be used as a singleton instance, and move the exception to the findNext method and log the same exception there.

Contributor Author: I think we can't make FailAwareWeightedRouting a singleton, since even the cluster state can change between requests.

Collaborator: Yeah, bad edit; I initially meant ClusterService for the singleton but forgot to remove it.

Contributor Author: Got it. I have made FailAwareWeightedRouting a singleton, but we can't pass ClusterService in the constructor since a ClusterService instance is not available in AbstractSearchAsyncAction.


/**
 * @return true if the exception is due to cluster availability issues
 */
private boolean isInternalFailure() {
if (exception instanceof OpenSearchException) {
// checking for 5xx failures
return internalErrorRestStatusList.contains(((OpenSearchException) exception).status());
}
return false;
}

/**
 * This function checks whether the shard is present on a data node whose weighted routing weight is set to 0.
 * In such cases we fail open if shard search requests for the shard to the other shard copies fail with a
 * non-retryable exception.
 *
 * @param nodeId the node with the shard copy
 * @return true if the node has an attribute value with shard routing weight set to zero, else false
 */
private boolean isWeighedAway(String nodeId) {
DiscoveryNode node = clusterState.nodes().get(nodeId);
WeightedRoutingMetadata weightedRoutingMetadata = clusterState.metadata().weightedRoutingMetadata();
if (weightedRoutingMetadata != null) {
WeightedRouting weightedRouting = weightedRoutingMetadata.getWeightedRouting();
// TODO: check weighted routing has weights set after merging versioning changes
if (weightedRouting != null) {
// Fetch weighted routing attributes with weight set as zero
Stream<String> keys = weightedRouting.weights()
.entrySet()
.stream()
.filter(entry -> entry.getValue().intValue() == WeightedRoutingMetadata.WEIGHED_AWAY_WEIGHT)
.map(Map.Entry::getKey);

for (Object key : keys.toArray()) {
if (node.getAttributes().get(weightedRouting.attributeName()).equals(key.toString())) {
return true;
}
}
}
}
return false;
}

/**
 * This function returns the next shard copy on which to retry a search request after a failure from the previous copy returned
 * by the iterator. It has the logic to fail open, i.e., to request shard copies present on nodes with weighted shard
 * routing weight set to zero.
 *
 * @param shardIt shard iterator containing the order in which shard copies for a shard need to be requested
 * @return the next shard copy
 */
public SearchShardTarget findNext(final SearchShardIterator shardIt) {
SearchShardTarget next = shardIt.nextOrNull();
while (next != null && isWeighedAway(next.getNodeId())) {
Collaborator: Can we simply check whether there are shard routings that are not part of the original iterator (without the appended weight-zero shards) which can be used to retry/execute requests? The reason is there could be a race between when the request was first executed and when the request got retried (typically during concurrent failover requests):

T0 -> Weights(2:1:0)
T1 -> Requests failed on zone1 and then zone2
T2 -> Weights(0:1:2)
T3 -> A retry on the third zone will be computed, but this check (isWeighedAway) will fail it and prevent the request from executing successfully

Collaborator: I realise that this check is put in place to avoid the shuffler picking up zones that are weighed away, but can we do something to avoid the independent check between when we add a shard to the iterator and when we actually consume it here?

Contributor Author: Yeah, this check is for weighed-away zones, to stop the iterator from picking shards on weighed-away AZ nodes in cases where we don't want to fail open.

As for avoiding the independent check between adding shards to the iterator and consuming them here: I think most cases will need only one check, but in the worst case, where we do need to fail open, we may have to iterate through all the shards and do the isWeighedAway check. This won't be a very costly operation since the size of the list is fixed. Even if we precompute, the weights can change in between, which would require one more check.

if (canFailOpen(next.getShardId())) {
logFailOpen(next.getShardId());
break;
}
next = shardIt.nextOrNull();
}
return next;
}

/**
 * This function returns the next shard copy on which to retry a search request after a failure from the previous copy returned
 * by the iterator. It has the logic to fail open, i.e., to request shard copies present on nodes with weighted shard
 * routing weight set to zero.
 *
 * @param shardsIt shard iterator containing the order in which shard copies for a shard need to be requested
 * @return the next shard copy
 */
public ShardRouting findNext(final ShardsIterator shardsIt) {
ShardRouting next = shardsIt.nextOrNull();

while (next != null && isWeighedAway(next.currentNodeId())) {
if (canFailOpen(next.shardId())) {
logFailOpen(next.shardId());
break;
}
next = shardsIt.nextOrNull();
}
return next;
}

private void logFailOpen(ShardId shardID) {
logger.info(() -> new ParameterizedMessage("{}: Fail open executed", shardID));
Collaborator: nit: would prefer this inline

}

/**
 * @return true if we can fail open, i.e., request shard copies present on nodes with weighted shard
 * routing weight set to zero
 */
private boolean canFailOpen(ShardId shardId) {
return isInternalFailure() || hasInActiveShardCopies(shardId);
}

private boolean hasInActiveShardCopies(ShardId shardId) {
List<ShardRouting> shards = clusterState.routingTable().shardRoutingTable(shardId).shards();
for (ShardRouting shardRouting : shards) {
if (!shardRouting.active()) {
return true;
}
}
return false;
}

}
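Putting the pieces of this class together, the fail-open decision can be modeled in a small standalone sketch (hypothetical names and simplified types; the real class consults ClusterState, RestStatus, and the shard iterators): a weighed-away shard copy is retried only when the previous failure was a 5xx internal error, or when the shard has inactive copies.

```java
import java.util.List;
import java.util.Set;

public class FailOpenSketch {
    // a subset of 5xx statuses treated as internal cluster-availability
    // failures (the review thread asks to trim the PR's original list)
    static final Set<Integer> INTERNAL_5XX = Set.of(500, 502, 503, 504);

    static boolean isInternalFailure(int restStatus) {
        return INTERNAL_5XX.contains(restStatus);
    }

    // shardCopyStates: true = active copy, false = unassigned/initializing
    static boolean hasInactiveCopies(List<Boolean> shardCopyStates) {
        return shardCopyStates.stream().anyMatch(active -> !active);
    }

    // fail open when the last failure looks like an availability issue,
    // or when some shard copies are not active anyway
    static boolean canFailOpen(int lastFailureStatus, List<Boolean> shardCopyStates) {
        return isInternalFailure(lastFailureStatus) || hasInactiveCopies(shardCopyStates);
    }

    public static void main(String[] args) {
        // 503 from the previous copy: fail open even with all copies active
        System.out.println(canFailOpen(503, List.of(true, true)));   // true
        // 400 (client error) with all copies active: do not fail open
        System.out.println(canFailOpen(400, List.of(true, true)));   // false
        // 400 but one copy inactive: fail open
        System.out.println(canFailOpen(400, List.of(true, false)));  // true
    }
}
```

In findNext, this decision gates whether the iteration breaks out on a weighed-away copy (fail open) or keeps skipping to the next copy.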
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@

package org.opensearch.cluster.routing;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.Nullable;
Expand All @@ -51,12 +54,14 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Stream;

import static java.util.Collections.emptyMap;

Expand Down Expand Up @@ -89,6 +94,8 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
private volatile Map<WeightedRoutingKey, List<ShardRouting>> activeShardsByWeight = emptyMap();
private volatile Map<WeightedRoutingKey, List<ShardRouting>> initializingShardsByWeight = emptyMap();

private static final Logger logger = LogManager.getLogger(IndexShardRoutingTable.class);

/**
* The initializing list, including ones that are initializing on a target node because of relocation.
* If we can come up with a better variable name, it would be nice...
Expand Down Expand Up @@ -305,19 +312,50 @@ public ShardIterator activeInitializingShardsRankedIt(
*
* @param weightedRouting entity
* @param nodes discovered nodes in the cluster
* @param isFailOpenEnabled if true, shards search requests in case of failures are tried on shard copies present
* in node attribute value with weight zero
* @return an iterator over active and initializing shards, ordered by weighted round-robin
* scheduling policy. Making sure that initializing shards are the last to iterate through.
*/
public ShardIterator activeInitializingShardsWeightedIt(WeightedRouting weightedRouting, DiscoveryNodes nodes, double defaultWeight) {
public ShardIterator activeInitializingShardsWeightedIt(
WeightedRouting weightedRouting,
DiscoveryNodes nodes,
double defaultWeight,
boolean isFailOpenEnabled
) {
final int seed = shuffler.nextSeed();
List<ShardRouting> ordered = new ArrayList<>();
List<ShardRouting> orderedActiveShards = getActiveShardsByWeight(weightedRouting, nodes, defaultWeight);
List<ShardRouting> orderedListWithDistinctShards;
ordered.addAll(shuffler.shuffle(orderedActiveShards, seed));
if (!allInitializingShards.isEmpty()) {
List<ShardRouting> orderedInitializingShards = getInitializingShardsByWeight(weightedRouting, nodes, defaultWeight);
ordered.addAll(orderedInitializingShards);
}
return new PlainShardIterator(shardId, ordered);

// append shards for attribute value with weight zero, so that shard search requests can be tried on
// shard copies in case of request failure from other attribute values.
if (isFailOpenEnabled) {
try {
Stream<String> keys = weightedRouting.weights()
.entrySet()
.stream()
.filter(entry -> entry.getValue().intValue() == WeightedRoutingMetadata.WEIGHED_AWAY_WEIGHT)
.map(Map.Entry::getKey);
keys.forEach(key -> {
ShardIterator iterator = onlyNodeSelectorActiveInitializingShardsIt(weightedRouting.attributeName() + ":" + key, nodes);
while (iterator.remaining() > 0) {
ordered.add(iterator.nextOrNull());
}
});
} catch (IllegalArgumentException e) {
// this exception is thrown by {@link onlyNodeSelectorActiveInitializingShardsIt} in case count of shard
// copies found is zero
logger.debug("no shard copies found for shard id [{}] for node attribute with weight zero", shardId);
}
}
orderedListWithDistinctShards = new ArrayList<>(new LinkedHashSet<>(ordered));
Collaborator: Suggest ordered.stream().distinct().collect(Collectors.toList())

return new PlainShardIterator(shardId, orderedListWithDistinctShards);
}

/**
Expand Down
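The iterator construction above appends shard copies from weight-zero nodes last, so they are only tried after all weighed-in copies, and then removes duplicates while preserving order (the review suggests stream().distinct() as an equivalent). A minimal sketch of that ordering logic, using hypothetical shard labels in place of ShardRouting:

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

public class WeightedOrderSketch {
    // weightedIn: shuffled active/initializing copies on weighed-in nodes;
    // weighedAway: copies on weight-zero nodes, appended as a last resort
    static List<String> buildOrder(List<String> weightedIn, List<String> weighedAway) {
        List<String> ordered = new ArrayList<>(weightedIn);
        ordered.addAll(weighedAway);
        // LinkedHashSet keeps the first occurrence of each copy,
        // dropping duplicates while preserving insertion order
        return new ArrayList<>(new LinkedHashSet<>(ordered));
    }

    public static void main(String[] args) {
        List<String> order = buildOrder(
            List.of("shard0-zoneA", "shard0-zoneB"),
            List.of("shard0-zoneC", "shard0-zoneB") // zoneB copy is a duplicate
        );
        System.out.println(order); // [shard0-zoneA, shard0-zoneB, shard0-zoneC]
    }
}
```

Either the LinkedHashSet approach or stream().distinct() preserves encounter order, so the weighed-away copies stay at the tail of the iterator.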