[Weighted Shard Routing] Fail open requests on search shard failures #5072

Merged: 37 commits on Jan 10, 2023
Commits (37)
02f1fa8
Fail open changes
Nov 9, 2022
9e69711
Refactor code
Nov 10, 2022
5ee2b02
Merge branch 'main' into feature/poc-fail-open
Nov 10, 2022
c0e5b67
Fix test
Nov 10, 2022
f3006a5
Add integ test with network disruption and refactor code
Nov 10, 2022
683786d
Merge branch 'main' into feature/poc-fail-open
Dec 6, 2022
2834af8
Add log statement
Dec 6, 2022
4ad2bed
Add integ test for search aggregations and fail open flag
Dec 20, 2022
644fa60
Refactor integ tests
Dec 21, 2022
7e0df4a
Add integ test for multiget with fail open
Dec 21, 2022
a22b341
Add changelog
Dec 21, 2022
614d85e
Refactor code
Dec 27, 2022
a02f332
Make fail open enabled by default
Dec 27, 2022
aac9d4b
Fail open on unassigned shard copies
Dec 28, 2022
836af1c
Add tests
Dec 30, 2022
2dc6699
Fix tests
Dec 30, 2022
16e295e
Merge branch 'main' into feature/poc-fail-open
Dec 30, 2022
605a4dc
Fix precommit build
Jan 2, 2023
4a90174
Fix test
Jan 2, 2023
9ce9be8
Change internal error logic to check for 5xx status
Jan 3, 2023
e76deba
Fix test
Jan 3, 2023
048eea6
Merge branch 'main' into feature/poc-fail-open
Jan 4, 2023
a429c21
Fix integ test failure
Jan 4, 2023
d2cf1a2
Merge branch 'main' into feature/poc-fail-open
Jan 5, 2023
d0a48e3
Address review comments
Jan 6, 2023
fca7ae9
Fix precommit failure
Jan 6, 2023
14499b0
Merge branch 'main' into feature/poc-fail-open
Jan 8, 2023
6f7aafa
Merge branch 'main' into feature/poc-fail-open
Jan 9, 2023
4c8e50c
Fix tests
Jan 9, 2023
df3e416
Modify changelog
Jan 9, 2023
cc6d1e9
Address review comments
Jan 9, 2023
7ca22c5
Remove duplicate shards from routing iterator
Jan 9, 2023
f376eab
add test to validate request state persistence
Jan 9, 2023
33a04b5
fix test comment
Jan 9, 2023
ef961bd
Address review comments
Jan 9, 2023
97ea739
log exception
Jan 9, 2023
7df7358
Address review comments
Jan 10, 2023
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,4 @@

# CHANGELOG

All notable changes to this project are documented in this file.
@@ -15,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Test] Add IAE test for deprecated edgeNGram analyzer name ([#5040](https://github.com/opensearch-project/OpenSearch/pull/5040))
- Allow mmap to use new JDK-19 preview APIs in Apache Lucene 9.4+ ([#5151](https://github.com/opensearch-project/OpenSearch/pull/5151))
- Add feature flag for extensions ([#5211](https://github.com/opensearch-project/OpenSearch/pull/5211))
- Support to fail open requests on search shard failures with weighted traffic routing ([#5072](https://github.com/opensearch-project/OpenSearch/pull/5072))
- Added jackson dependency to server ([#5366](https://github.com/opensearch-project/OpenSearch/pull/5366))
- Adding support to register settings dynamically ([#5495](https://github.com/opensearch-project/OpenSearch/pull/5495))
- Added experimental support for extensions ([#5347](https://github.com/opensearch-project/OpenSearch/pull/5347)), ([#5518](https://github.com/opensearch-project/OpenSearch/pull/5518), ([#5597](https://github.com/opensearch-project/OpenSearch/pull/5597)), ([#5615](https://github.com/opensearch-project/OpenSearch/pull/5615)))

Large diffs are not rendered by default.

@@ -48,6 +48,7 @@
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.FailAwareWeightedRouting;
import org.opensearch.cluster.routing.GroupShardsIterator;
import org.opensearch.cluster.routing.ShardIterator;
import org.opensearch.cluster.routing.ShardRouting;
@@ -261,24 +262,25 @@ private void onFailure(ShardRouting shardRouting, Exception e) {
tryNext(e, false);
}

private ShardRouting nextRoutingOrNull() {
private ShardRouting nextRoutingOrNull(Exception failure) {
if (shardsIt.size() == 0 || shardIndex >= shardsIt.size()) {
return null;
}
ShardRouting next = shardsIt.get(shardIndex).nextOrNull();
ShardRouting next = FailAwareWeightedRouting.getInstance().findNext(shardsIt.get(shardIndex), clusterService.state(), failure);

if (next != null) {
return next;
}
moveToNextShard();
return nextRoutingOrNull();
return nextRoutingOrNull(failure);
}

private void moveToNextShard() {
++shardIndex;
}

private void tryNext(@Nullable final Exception lastFailure, boolean canMatchShard) {
ShardRouting shardRouting = nextRoutingOrNull();
ShardRouting shardRouting = nextRoutingOrNull(lastFailure);
if (shardRouting == null) {
if (canMatchShard == false) {
listener.onResponse(new FieldCapabilitiesIndexResponse(request.index(), Collections.emptyMap(), false));
@@ -43,6 +43,7 @@
import org.opensearch.action.ShardOperationFailedException;
import org.opensearch.action.support.TransportActions;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.routing.FailAwareWeightedRouting;
import org.opensearch.cluster.routing.GroupShardsIterator;
import org.opensearch.common.Nullable;
import org.opensearch.common.lease.Releasable;
@@ -449,7 +450,8 @@ private void onShardFailure(final int shardIndex, @Nullable SearchShardTarget sh
// we always add the shard failure for a specific shard instance
// we do make sure to clean it on a successful response from a shard
onShardFailure(shardIndex, shard, e);
final SearchShardTarget nextShard = shardIt.nextOrNull();
SearchShardTarget nextShard = FailAwareWeightedRouting.getInstance().findNext(shardIt, clusterState, e);

final boolean lastShard = nextShard == null;
logger.debug(
() -> new ParameterizedMessage(
@@ -119,7 +119,7 @@ public String getClusterAlias() {
return clusterAlias;
}

SearchShardTarget nextOrNull() {
public SearchShardTarget nextOrNull() {
final String nodeId = targetNodesIterator.nextOrNull();
if (nodeId != null) {
return new SearchShardTarget(nodeId, shardId, clusterAlias, originalIndices);
@@ -44,6 +44,7 @@
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.FailAwareWeightedRouting;
import org.opensearch.cluster.routing.GroupShardsIterator;
import org.opensearch.cluster.routing.ShardIterator;
import org.opensearch.cluster.routing.ShardRouting;
@@ -250,7 +251,8 @@ void onOperation(@Nullable ShardRouting shard, final ShardIterator shardIt, int
// we set the shard failure always, even if its the first in the replication group, and the next one
// will work (it will just override it...)
setFailure(shardIt, shardIndex, e);
ShardRouting nextShard = shardIt.nextOrNull();
ShardRouting nextShard = FailAwareWeightedRouting.getInstance().findNext(shardIt, clusterService.state(), e);

if (nextShard != null) {
if (e != null) {
if (logger.isTraceEnabled()) {
@@ -47,6 +47,7 @@
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.FailAwareWeightedRouting;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardsIterator;
import org.opensearch.cluster.service.ClusterService;
@@ -244,7 +245,8 @@ private void perform(@Nullable final Exception currentFailure) {
lastFailure = currentFailure;
this.lastFailure = currentFailure;
}
final ShardRouting shardRouting = shardIt.nextOrNull();
ShardRouting shardRouting = FailAwareWeightedRouting.getInstance().findNext(shardIt, clusterService.state(), currentFailure);

if (shardRouting == null) {
Exception failure = lastFailure;
if (failure == null || isShardNotAvailableException(failure)) {
@@ -273,6 +275,7 @@ private void perform(@Nullable final Exception currentFailure) {
);
}
final Writeable.Reader<Response> reader = getResponseReader();
ShardRouting finalShardRouting = shardRouting;
transportService.sendRequest(
node,
transportShardAction,
@@ -296,7 +299,7 @@ public void handleResponse(final Response response) {

@Override
public void handleException(TransportException exp) {
onFailure(shardRouting, exp);
onFailure(finalShardRouting, exp);
}
}
);
@@ -39,6 +39,7 @@ public class WeightedRoutingMetadata extends AbstractNamedDiffable<Metadata.Cust
public static final String VERSION = "_version";
public static final long INITIAL_VERSION = -1;
public static final long VERSION_UNSET_VALUE = -2;
public static final int WEIGHED_AWAY_WEIGHT = 0;

public long getVersion() {
return version;
@@ -0,0 +1,156 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.action.search.SearchShardIterator;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.index.shard.ShardId;
import org.opensearch.rest.RestStatus;
import org.opensearch.search.SearchShardTarget;

import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

/**
* This class contains the logic to find the next shard copy on which to retry a search request after a failure on
* another shard copy. It decides whether retryable shard search requests can be tried on shard copies present in data
* nodes whose attribute value weight for weighted shard routing is set to zero.
*/

public enum FailAwareWeightedRouting {
INSTANCE;

private static final Logger logger = LogManager.getLogger(FailAwareWeightedRouting.class);

private final static List<RestStatus> internalErrorRestStatusList = List.of(
RestStatus.INTERNAL_SERVER_ERROR,
RestStatus.BAD_GATEWAY,
RestStatus.SERVICE_UNAVAILABLE,
RestStatus.GATEWAY_TIMEOUT
);

public static FailAwareWeightedRouting getInstance() {
return INSTANCE;

}
Review comment from a Collaborator on lines +33 to +48:

I don't think this is the most apt way to create singletons; you could achieve it with a static property:

    public static final FailAwareWeightedRouting INSTANCE = new FailAwareWeightedRouting();
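
For context, a minimal sketch of the two idioms being weighed here, reusing the class name from the PR purely for illustration; each snippet is a standalone alternative, not meant to compile alongside the other:

    // Enum-based singleton, the approach taken in this PR: the JVM guarantees exactly
    // one instance, and the idiom resists reflective or serialized re-instantiation.
    public enum FailAwareWeightedRouting {
        INSTANCE;

        public static FailAwareWeightedRouting getInstance() {
            return INSTANCE;
        }
    }

    // Static-field singleton, the reviewer's suggestion: a plain final class exposing a
    // single eagerly created instance as a constant, with a private constructor.
    public final class FailAwareWeightedRouting {
        public static final FailAwareWeightedRouting INSTANCE = new FailAwareWeightedRouting();

        private FailAwareWeightedRouting() {}
    }

Either form yields a single shared instance; the enum spelling is simply a more defensive way to express the same design.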


/**
* *
* @return true if exception is due to cluster availability issues
*/
private boolean isInternalFailure(Exception exception) {
if (exception instanceof OpenSearchException) {
// checking for 5xx failures
return internalErrorRestStatusList.contains(((OpenSearchException) exception).status());
}
return false;
}

/**
* This function checks whether the shard is present on a data node with weighted routing weight set to 0.
* In such cases we fail open if shard search requests for the shard fail on the other shard copies with a
* non-retryable exception.
*
* @param nodeId the node with the shard copy
* @return true if the node has attribute value with shard routing weight set to zero, else false
*/
private boolean isWeighedAway(String nodeId, ClusterState clusterState) {
DiscoveryNode node = clusterState.nodes().get(nodeId);
WeightedRoutingMetadata weightedRoutingMetadata = clusterState.metadata().weightedRoutingMetadata();
if (weightedRoutingMetadata != null) {
WeightedRouting weightedRouting = weightedRoutingMetadata.getWeightedRouting();
if (weightedRouting != null && weightedRouting.isSet()) {
// Fetch weighted routing attributes with weight set as zero
Stream<String> keys = weightedRouting.weights()
.entrySet()
.stream()
.filter(entry -> entry.getValue().intValue() == WeightedRoutingMetadata.WEIGHED_AWAY_WEIGHT)
.map(Map.Entry::getKey);

for (Object key : keys.toArray()) {
if (node.getAttributes().get(weightedRouting.attributeName()).equals(key.toString())) {
return true;
}
}
}
}
return false;
}

/**
* This function returns next shard copy to retry search request in case of failure from previous copy returned
* by the iterator. It has the logic to fail open, i.e. to request shard copies present in nodes with weighted shard
* routing weight set to zero.
*
* @param shardIt Shard Iterator containing order in which shard copies for a shard need to be requested
* @return the next shard copy
*/
public SearchShardTarget findNext(final SearchShardIterator shardIt, ClusterState clusterState, Exception exception) {
SearchShardTarget next = shardIt.nextOrNull();
while (next != null && isWeighedAway(next.getNodeId(), clusterState)) {
SearchShardTarget nextShard = next;
if (canFailOpen(nextShard.getShardId(), exception, clusterState)) {
logger.info(
() -> new ParameterizedMessage("{}: Fail open executed due to exception {}", nextShard.getShardId(), exception)
);
break;
}
next = shardIt.nextOrNull();
}
return next;
}

/**
* This function returns next shard copy to retry search request in case of failure from previous copy returned
* by the iterator. It has the logic to fail open, i.e. to request shard copies present in nodes with weighted shard
* routing weight set to zero.
*
* @param shardsIt Shard Iterator containing order in which shard copies for a shard need to be requested
* @return the next shard copy
*/
public ShardRouting findNext(final ShardsIterator shardsIt, ClusterState clusterState, Exception exception) {
ShardRouting next = shardsIt.nextOrNull();

while (next != null && isWeighedAway(next.currentNodeId(), clusterState)) {
ShardRouting nextShard = next;
if (canFailOpen(nextShard.shardId(), exception, clusterState)) {
logger.info(() -> new ParameterizedMessage("{}: Fail open executed due to exception {}", nextShard.shardId(), exception));
Review comment from a Collaborator:

nit: incorrect usage of logger with exception
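
For reference, a minimal sketch of the usage the nit points at, assuming the standard Log4j 2 API; the class and method names here are illustrative only. The exception should be passed as a separate Throwable argument so the full stack trace is recorded, instead of being interpolated into the ParameterizedMessage:

    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.apache.logging.log4j.message.ParameterizedMessage;

    class FailOpenLoggingSketch {
        private static final Logger logger = LogManager.getLogger(FailOpenLoggingSketch.class);

        void logFailOpen(Object shardId, Exception exception) {
            // Passing the exception as the trailing Throwable argument lets Log4j 2 log the
            // stack trace; putting it into the message pattern would only log its toString().
            logger.info(() -> new ParameterizedMessage("{}: Fail open executed", shardId), exception);
        }
    }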

break;
}
next = shardsIt.nextOrNull();
}
return next;
}

/**
* *
* @return true if can fail open ie request shard copies present in nodes with weighted shard
* routing weight set to zero
*/
private boolean canFailOpen(ShardId shardId, Exception exception, ClusterState clusterState) {
return isInternalFailure(exception) || hasInActiveShardCopies(clusterState, shardId);
}

private boolean hasInActiveShardCopies(ClusterState clusterState, ShardId shardId) {
List<ShardRouting> shards = clusterState.routingTable().shardRoutingTable(shardId).shards();
for (ShardRouting shardRouting : shards) {
if (!shardRouting.active()) {
return true;
}
}
return false;
}
}
@@ -32,6 +32,9 @@

package org.opensearch.cluster.routing;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.Nullable;
@@ -57,6 +60,8 @@
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.Collections.emptyMap;

@@ -89,6 +94,8 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
private volatile Map<WeightedRoutingKey, List<ShardRouting>> activeShardsByWeight = emptyMap();
private volatile Map<WeightedRoutingKey, List<ShardRouting>> initializingShardsByWeight = emptyMap();

private static final Logger logger = LogManager.getLogger(IndexShardRoutingTable.class);

/**
* The initializing list, including ones that are initializing on a target node because of relocation.
* If we can come up with a better variable name, it would be nice...
@@ -305,19 +312,50 @@ public ShardIterator activeInitializingShardsRankedIt(
*
* @param weightedRouting entity
* @param nodes discovered nodes in the cluster
* @param isFailOpenEnabled if true, shards search requests in case of failures are tried on shard copies present
* in node attribute value with weight zero
* @return an iterator over active and initializing shards, ordered by weighted round-robin
* scheduling policy. Making sure that initializing shards are the last to iterate through.
*/
public ShardIterator activeInitializingShardsWeightedIt(WeightedRouting weightedRouting, DiscoveryNodes nodes, double defaultWeight) {
public ShardIterator activeInitializingShardsWeightedIt(
WeightedRouting weightedRouting,
DiscoveryNodes nodes,
double defaultWeight,
boolean isFailOpenEnabled
) {
final int seed = shuffler.nextSeed();
List<ShardRouting> ordered = new ArrayList<>();
List<ShardRouting> orderedActiveShards = getActiveShardsByWeight(weightedRouting, nodes, defaultWeight);
List<ShardRouting> orderedListWithDistinctShards;
ordered.addAll(shuffler.shuffle(orderedActiveShards, seed));
if (!allInitializingShards.isEmpty()) {
List<ShardRouting> orderedInitializingShards = getInitializingShardsByWeight(weightedRouting, nodes, defaultWeight);
ordered.addAll(orderedInitializingShards);
}
return new PlainShardIterator(shardId, ordered);

// append shards for attribute value with weight zero, so that shard search requests can be tried on
// shard copies in case of request failure from other attribute values.
if (isFailOpenEnabled) {
try {
Stream<String> keys = weightedRouting.weights()
.entrySet()
.stream()
.filter(entry -> entry.getValue().intValue() == WeightedRoutingMetadata.WEIGHED_AWAY_WEIGHT)
.map(Map.Entry::getKey);
keys.forEach(key -> {
ShardIterator iterator = onlyNodeSelectorActiveInitializingShardsIt(weightedRouting.attributeName() + ":" + key, nodes);
while (iterator.remaining() > 0) {
ordered.add(iterator.nextOrNull());
}
});
} catch (IllegalArgumentException e) {
// this exception is thrown by {@link onlyNodeSelectorActiveInitializingShardsIt} in case count of shard
// copies found is zero
logger.debug("no shard copies found for shard id [{}] for node attribute with weight zero", shardId);
}
}
orderedListWithDistinctShards = ordered.stream().distinct().collect(Collectors.toList());
return new PlainShardIterator(shardId, orderedListWithDistinctShards);
}

/**