-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Weighted Shard Routing] Fail open requests on search shard failures #5072
Changes from 23 commits
02f1fa8
9e69711
5ee2b02
c0e5b67
f3006a5
683786d
2834af8
4ad2bed
644fa60
7e0df4a
a22b341
614d85e
a02f332
aac9d4b
836af1c
2dc6699
16e295e
605a4dc
4a90174
9ce9be8
e76deba
048eea6
a429c21
d2cf1a2
d0a48e3
fca7ae9
14499b0
6f7aafa
4c8e50c
df3e416
cc6d1e9
7ca22c5
f376eab
33a04b5
ef961bd
97ea739
7df7358
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -43,6 +43,7 @@ | |
import org.opensearch.action.ShardOperationFailedException; | ||
import org.opensearch.action.support.TransportActions; | ||
import org.opensearch.cluster.ClusterState; | ||
import org.opensearch.cluster.routing.FailOpenRouting; | ||
import org.opensearch.cluster.routing.GroupShardsIterator; | ||
import org.opensearch.common.Nullable; | ||
import org.opensearch.common.lease.Releasable; | ||
|
@@ -449,7 +450,8 @@ private void onShardFailure(final int shardIndex, @Nullable SearchShardTarget sh | |
// we always add the shard failure for a specific shard instance | ||
// we do make sure to clean it on a successful response from a shard | ||
onShardFailure(shardIndex, shard, e); | ||
final SearchShardTarget nextShard = shardIt.nextOrNull(); | ||
SearchShardTarget nextShard = new FailOpenRouting(e, clusterState).findNext(shardIt); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we verify that if weights are set only then branch off to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right now, that is not the case. In all cases, the code flow moves to |
||
|
||
final boolean lastShard = nextShard == null; | ||
logger.debug( | ||
() -> new ParameterizedMessage( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,154 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.cluster.routing; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.apache.logging.log4j.message.ParameterizedMessage; | ||
import org.opensearch.OpenSearchException; | ||
import org.opensearch.action.search.SearchShardIterator; | ||
import org.opensearch.cluster.ClusterState; | ||
import org.opensearch.cluster.metadata.WeightedRoutingMetadata; | ||
import org.opensearch.cluster.node.DiscoveryNode; | ||
import org.opensearch.index.shard.ShardId; | ||
import org.opensearch.search.SearchShardTarget; | ||
|
||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.stream.Stream; | ||
|
||
/** | ||
* This class contains logic to find next shard to retry search request in case of failure from other shard copy. | ||
* This decides if retryable shard search requests can be tried on shard copies present in data | ||
* nodes whose attribute value weight for weighted shard routing is set to zero. | ||
*/ | ||
|
||
public class FailOpenRouting { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Lets write UTs for this class. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: rename to |
||
|
||
private static final Logger logger = LogManager.getLogger(FailOpenRouting.class); | ||
|
||
/* exception causing failure for previous shard copy */ | ||
private Exception exception; | ||
|
||
private ClusterState clusterState; | ||
|
||
public FailOpenRouting(Exception e, ClusterState clusterState) { | ||
this.exception = e; | ||
this.clusterState = clusterState; | ||
} | ||
|
||
/** | ||
* * | ||
* @return true if exception is due to cluster availability issues | ||
*/ | ||
private boolean isInternalFailure() { | ||
if (exception instanceof OpenSearchException) { | ||
return ((OpenSearchException) exception).status().getStatus() / 100 == 5; | ||
} | ||
return false; | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please lets be more specific here, prefer using something like RestStatus#INTERNAL_SERVER_ERROR |
||
|
||
/** | ||
* This function checks if the shard is present in data node with weighted routing weight set to 0, | ||
* In such cases we fail open, if shard search request for the shard from other shard copies fail with non | ||
* retryable exception. | ||
* | ||
* @param nodeId the node with the shard copy | ||
* @return true if the node has attribute value with shard routing weight set to zero, else false | ||
*/ | ||
private boolean shardInNodeWithZeroWeightedRouting(String nodeId) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: Rename to |
||
DiscoveryNode node = clusterState.nodes().get(nodeId); | ||
WeightedRoutingMetadata weightedRoutingMetadata = clusterState.metadata().weightedRoutingMetadata(); | ||
if (weightedRoutingMetadata != null) { | ||
WeightedRouting weightedRouting = weightedRoutingMetadata.getWeightedRouting(); | ||
// TODO: check weighted routing has weights set after merging versioning changes | ||
if (weightedRouting != null) { | ||
// Fetch weighted routing attributes with weight set as zero | ||
Stream<String> keys = weightedRouting.weights() | ||
.entrySet() | ||
.stream() | ||
.filter(entry -> entry.getValue().intValue() == 0) | ||
.map(Map.Entry::getKey); | ||
|
||
for (Object key : keys.toArray()) { | ||
if (node.getAttributes().get(weightedRouting.attributeName()).equals(key.toString())) { | ||
return true; | ||
} | ||
} | ||
} | ||
} | ||
return false; | ||
} | ||
|
||
/** | ||
* This function returns next shard copy to retry search request in case of failure from previous copy returned | ||
* by the iterator. It has the logic to fail open ie request shard copies present in nodes with weighted shard | ||
* routing weight set to zero | ||
* | ||
* @param shardIt Shard Iterator containing order in which shard copies for a shard need to be requested | ||
* @return the next shard copy | ||
*/ | ||
public SearchShardTarget findNext(final SearchShardIterator shardIt) { | ||
SearchShardTarget next = shardIt.nextOrNull(); | ||
while (next != null && shardInNodeWithZeroWeightedRouting(next.getNodeId())) { | ||
|
||
if (canFailOpen(next.getShardId())) { | ||
logFailOpen(next.getShardId()); | ||
break; | ||
} | ||
next = shardIt.nextOrNull(); | ||
} | ||
return next; | ||
} | ||
|
||
/** | ||
* This function returns next shard copy to retry search request in case of failure from previous copy returned | ||
* by the iterator. It has the logic to fail open ie request shard copies present in nodes with weighted shard | ||
* routing weight set to zero | ||
* | ||
* @param shardsIt Shard Iterator containing order in which shard copies for a shard need to be requested | ||
* @return the next shard copy | ||
*/ | ||
public ShardRouting findNext(final ShardsIterator shardsIt) { | ||
ShardRouting next = shardsIt.nextOrNull(); | ||
|
||
while (next != null && shardInNodeWithZeroWeightedRouting(next.currentNodeId())) { | ||
if (canFailOpen(next.shardId())) { | ||
logFailOpen(next.shardId()); | ||
break; | ||
} | ||
next = shardsIt.nextOrNull(); | ||
} | ||
return next; | ||
} | ||
|
||
private void logFailOpen(ShardId shardID) { | ||
logger.info(() -> new ParameterizedMessage("{}: Fail open executed", shardID)); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit : do we need a function here or can inline it directly in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. since this is used at multiple place, I think its better to make a function |
||
|
||
/** | ||
* * | ||
* @return true if can fail open ie request shard copies present in nodes with weighted shard | ||
* routing weight set to zero | ||
*/ | ||
private boolean canFailOpen(ShardId shardId) { | ||
return isInternalFailure() || hasInActiveShardCopies(shardId); | ||
} | ||
|
||
private boolean hasInActiveShardCopies(ShardId shardId) { | ||
List<ShardRouting> shards = clusterState.routingTable().shardRoutingTable(shardId).shards(); | ||
for (ShardRouting shardRouting : shards) { | ||
if (!shardRouting.active()) { | ||
return true; | ||
} | ||
} | ||
return false; | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,6 +32,8 @@ | |
|
||
package org.opensearch.cluster.routing; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.opensearch.cluster.node.DiscoveryNode; | ||
import org.opensearch.cluster.node.DiscoveryNodes; | ||
import org.opensearch.common.Nullable; | ||
|
@@ -57,6 +59,7 @@ | |
import java.util.Optional; | ||
import java.util.Set; | ||
import java.util.function.Predicate; | ||
import java.util.stream.Stream; | ||
|
||
import static java.util.Collections.emptyMap; | ||
|
||
|
@@ -89,6 +92,8 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> { | |
private volatile Map<WeightedRoutingKey, List<ShardRouting>> activeShardsByWeight = emptyMap(); | ||
private volatile Map<WeightedRoutingKey, List<ShardRouting>> initializingShardsByWeight = emptyMap(); | ||
|
||
private static final Logger logger = LogManager.getLogger(IndexShardRoutingTable.class); | ||
|
||
/** | ||
* The initializing list, including ones that are initializing on a target node because of relocation. | ||
* If we can come up with a better variable name, it would be nice... | ||
|
@@ -305,10 +310,17 @@ public ShardIterator activeInitializingShardsRankedIt( | |
* | ||
* @param weightedRouting entity | ||
* @param nodes discovered nodes in the cluster | ||
* @param isFailOpenEnabled if true, shards search requests in case of failures are tried on shard copies present | ||
* in node attribute value with weight zero | ||
* @return an iterator over active and initializing shards, ordered by weighted round-robin | ||
* scheduling policy. Making sure that initializing shards are the last to iterate through. | ||
*/ | ||
public ShardIterator activeInitializingShardsWeightedIt(WeightedRouting weightedRouting, DiscoveryNodes nodes, double defaultWeight) { | ||
public ShardIterator activeInitializingShardsWeightedIt( | ||
WeightedRouting weightedRouting, | ||
DiscoveryNodes nodes, | ||
double defaultWeight, | ||
boolean isFailOpenEnabled | ||
) { | ||
final int seed = shuffler.nextSeed(); | ||
List<ShardRouting> ordered = new ArrayList<>(); | ||
List<ShardRouting> orderedActiveShards = getActiveShardsByWeight(weightedRouting, nodes, defaultWeight); | ||
|
@@ -317,6 +329,29 @@ public ShardIterator activeInitializingShardsWeightedIt(WeightedRouting weighted | |
List<ShardRouting> orderedInitializingShards = getInitializingShardsByWeight(weightedRouting, nodes, defaultWeight); | ||
ordered.addAll(orderedInitializingShards); | ||
} | ||
|
||
// append shards for attribute value with weight zero, so that shard search requests can be tried on | ||
// shard copies in case of request failure from other attribute values. | ||
if (isFailOpenEnabled) { | ||
try { | ||
Stream<String> keys = weightedRouting.weights() | ||
.entrySet() | ||
.stream() | ||
.filter(entry -> entry.getValue().intValue() == 0) | ||
.map(Map.Entry::getKey); | ||
keys.forEach(key -> { | ||
ShardIterator iterator = onlyNodeSelectorActiveInitializingShardsIt(weightedRouting.attributeName() + ":" + key, nodes); | ||
while (iterator.remaining() > 0) { | ||
ordered.add(iterator.nextOrNull()); | ||
} | ||
}); | ||
} catch (IllegalArgumentException e) { | ||
// this exception is thrown by {@link onlyNodeSelectorActiveInitializingShardsIt} in case count of shard | ||
// copies found is zero | ||
logger.debug("no shard copies found for shard id [{}] for node attribute with weight zero", shardId); | ||
} | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I just realised we aren't maintaining any state across request and hence we might just be implementing an incorrect version of weighted routing. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. State is maintained across requests using seed https://github.com/opensearch-project/OpenSearch/blob/main/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java#L312 But since the shuffler sits in the IndexShardRoutingTable class, the state would be lost in case of modifications to Created a follow up task for this #5754 |
||
return new PlainShardIterator(shardId, ordered); | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please reword this as