[Weighted Shard Routing] Fail open requests on search shard failures #5072

Merged: 37 commits merged into main from feature/poc-fail-open on Jan 10, 2023.
The diff below shows changes from 5 of the 37 commits.

Commits (37)
02f1fa8
Fail open changes
Nov 9, 2022
9e69711
Refactor code
Nov 10, 2022
5ee2b02
Merge branch 'main' into feature/poc-fail-open
Nov 10, 2022
c0e5b67
Fix test
Nov 10, 2022
f3006a5
Add integ test with network disruption and refactor code
Nov 10, 2022
683786d
Merge branch 'main' into feature/poc-fail-open
Dec 6, 2022
2834af8
Add log statement
Dec 6, 2022
4ad2bed
Add integ test for search aggregations and fail open flag
Dec 20, 2022
644fa60
Refactor integ tests
Dec 21, 2022
7e0df4a
Add integ test for multiget with fail open
Dec 21, 2022
a22b341
Add changelog
Dec 21, 2022
614d85e
Refactor code
Dec 27, 2022
a02f332
Make fail open enabled by default
Dec 27, 2022
aac9d4b
Fail open on unassigned shard copies
Dec 28, 2022
836af1c
Add tests
Dec 30, 2022
2dc6699
Fix tests
Dec 30, 2022
16e295e
Merge branch 'main' into feature/poc-fail-open
Dec 30, 2022
605a4dc
Fix precommit build
Jan 2, 2023
4a90174
Fix test
Jan 2, 2023
9ce9be8
Change internal error logic to check for 5xx status
Jan 3, 2023
e76deba
Fix test
Jan 3, 2023
048eea6
Merge branch 'main' into feature/poc-fail-open
Jan 4, 2023
a429c21
Fix integ test failure
Jan 4, 2023
d2cf1a2
Merge branch 'main' into feature/poc-fail-open
Jan 5, 2023
d0a48e3
Address review comments
Jan 6, 2023
fca7ae9
Fix precommit failure
Jan 6, 2023
14499b0
Merge branch 'main' into feature/poc-fail-open
Jan 8, 2023
6f7aafa
Merge branch 'main' into feature/poc-fail-open
Jan 9, 2023
4c8e50c
Fix tests
Jan 9, 2023
df3e416
Modify changelog
Jan 9, 2023
cc6d1e9
Address review comments
Jan 9, 2023
7ca22c5
Remove duplicate shards from routing iterator
Jan 9, 2023
f376eab
Add test to validate request state persistence
Jan 9, 2023
33a04b5
fix test comment
Jan 9, 2023
ef961bd
Address review comments
Jan 9, 2023
97ea739
log exception
Jan 9, 2023
7df7358
Address review comments
Jan 10, 2023
@@ -15,31 +15,45 @@
import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.WeightedRouting;
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.search.stats.SearchStats;
import org.opensearch.plugins.Plugin;
import org.opensearch.snapshots.mockstore.MockRepository;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.disruption.NetworkDisruption;
import org.opensearch.test.transport.MockTransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, minNumDataNodes = 3)
public class SearchWeightedRoutingIT extends OpenSearchIntegTestCase {

@Override
-protected int numberOfReplicas() {
-    return 2;
+protected Collection<Class<? extends Plugin>> nodePlugins() {
+    return Arrays.asList(MockTransportService.TestPlugin.class, MockRepository.Plugin.class);
}

public void testSearchWithWRRShardRouting() throws IOException {
@@ -101,7 +115,8 @@ public void testSearchWithWRRShardRouting() throws IOException {
hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId());
}
}
-// search should not go to nodes in zone c
+// search should not go to nodes in zone c with weight zero in case
+// shard copies are available in other zones
assertThat(hitNodes.size(), lessThanOrEqualTo(4));
DiscoveryNodes dataNodes = internalCluster().clusterService().state().nodes();
List<String> nodeIdsFromZoneWithWeightZero = new ArrayList<>();
@@ -158,4 +173,249 @@ public void testSearchWithWRRShardRouting() throws IOException {
}
}

/**
* A shard search request is served by data nodes in the AZ whose weight is set to 0
* when shard copies are not available in other AZs.
* This is tested by setting up a 4-node cluster with one data node per AZ.
* The weighted shard routing weight is set to 0 for az-c.
* Data nodes in zones a and b are stopped, and
* assertions verify that shard search requests do not fail.
* @throws IOException
*/
public void testFailOpenByStoppingDataNodes() throws IOException {

Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

int nodeCountPerAZ = 1;

logger.info("--> starting a dedicated cluster manager node");
internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build());

logger.info("--> starting 1 nodes on zones 'a' & 'b' & 'c'");
List<String> nodes_in_zone_a = internalCluster().startDataOnlyNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()
);
List<String> nodes_in_zone_b = internalCluster().startDataOnlyNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build()
);
List<String> nodes_in_zone_c = internalCluster().startDataOnlyNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()
);

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("4").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

ensureGreen();

assertAcked(
prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 2))
);
ensureGreen();

logger.info("--> creating indices for test");
for (int i = 0; i < 100; i++) {
client().prepareIndex("test").setId("" + i).setSource("field_" + i, "value_" + i).get();
}
refresh("test");

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);

ClusterPutWeightedRoutingResponse response = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.get();
assertTrue(response.isAcknowledged());

logger.info("--> data nodes in zone a and b are stopped");
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodes_in_zone_a.get(0)));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodes_in_zone_b.get(0)));
ensureStableCluster(2);

Set<String> hitNodes = new HashSet<>();
// making search requests
for (int i = 0; i < 50; i++) {
SearchResponse searchResponse = internalCluster().smartClient()
.prepareSearch("test")
.setQuery(QueryBuilders.matchAllQuery())
.get();
// assert that searches do not fail and are served by data node in zone c
assertEquals(0, searchResponse.getFailedShards());
for (int j = 0; j < searchResponse.getHits().getHits().length; j++) {
hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId());
}
}

ImmutableOpenMap<String, DiscoveryNode> dataNodes = internalCluster().clusterService().state().nodes().getDataNodes();
String dataNodeInZoneCID = null;

for (Iterator<DiscoveryNode> it = dataNodes.valuesIt(); it.hasNext();) {
DiscoveryNode node = it.next();
if (node.getAttributes().get("zone").equals("c")) {
dataNodeInZoneCID = node.getId();
break;
}
}

NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().execute().actionGet();
for (NodeStats stat : nodeStats.getNodes()) {
SearchStats.Stats searchStats = stat.getIndices().getSearch().getTotal();
if (stat.getNode().isDataNode()) {
Assert.assertTrue(searchStats.getQueryCount() > 0L);
Assert.assertTrue(searchStats.getFetchCount() > 0L);
assertTrue(stat.getNode().getId().equals(dataNodeInZoneCID));
}
}
}

/**
* A shard search request is served by data nodes in the AZ whose weight is set to 0
* when shard copies are not available in other AZs.
* This is tested by setting up a 4-node cluster with one data node per AZ.
* The weighted shard routing weight is set to 0 for az-c.
* The index is created with one replica copy, and a network disruption is introduced
* that makes the node in zone a unresponsive.
* Since each shard has two copies, there can be a few shards for which no copy exists in zone b.
* Assertions verify that such shard search requests are served by the data node in zone c.
* @throws Exception
*/
public void testFailOpenWithUnresponsiveNetworkDisruption() throws Exception {

Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

int nodeCountPerAZ = 1;

logger.info("--> starting a dedicated cluster manager node");
internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build());

logger.info("--> starting 1 nodes on zones 'a' & 'b' & 'c'");
List<String> nodes_in_zone_a = internalCluster().startDataOnlyNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()
);
List<String> nodes_in_zone_b = internalCluster().startDataOnlyNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build()
);
List<String> nodes_in_zone_c = internalCluster().startDataOnlyNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()
);

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("4").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

ensureGreen();

assertAcked(
prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 10).put("index" + ".number_of_replicas", 1))
);
ensureGreen();
logger.info("--> creating indices for test");
for (int i = 0; i < 50; i++) {
client().prepareIndex("test").setId("" + i).setSource("field_" + i, "value_" + i).get();
}
refresh("test");

ClusterState state1 = internalCluster().clusterService().state();

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);

ClusterPutWeightedRoutingResponse response = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.get();
assertTrue(response.isAcknowledged());

logger.info("--> creating network partition disruption");
final String clusterManagerNode1 = internalCluster().getClusterManagerName();
Set<String> nodesInOneSide = Stream.of(clusterManagerNode1, nodes_in_zone_b.get(0), nodes_in_zone_c.get(0))
.collect(Collectors.toCollection(HashSet::new));
Set<String> nodesInOtherSide = Stream.of(nodes_in_zone_a.get(0)).collect(Collectors.toCollection(HashSet::new));

NetworkDisruption networkDisruption = new NetworkDisruption(
new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide),
NetworkDisruption.UNRESPONSIVE
);
internalCluster().setDisruptionScheme(networkDisruption);

logger.info("--> network disruption is started");
networkDisruption.startDisrupting();

Set<String> hitNodes = new HashSet<>();
Future<SearchResponse>[] responses = new Future[50];
logger.info("--> making search requests");
for (int i = 0; i < 50; i++) {
responses[i] = internalCluster().client(nodes_in_zone_b.get(0))
.prepareSearch("test")
.setQuery(QueryBuilders.matchAllQuery())
.execute();
}

logger.info("--> network disruption is stopped");
networkDisruption.stopDisrupting();

for (int i = 0; i < 50; i++) {
try {
SearchResponse searchResponse = responses[i].get();
assertEquals(0, searchResponse.getFailedShards());
for (int j = 0; j < searchResponse.getHits().getHits().length; j++) {
hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId());
}
} catch (Exception t) {
fail("search should not fail");
}
}

ImmutableOpenMap<String, DiscoveryNode> dataNodes = internalCluster().clusterService().state().nodes().getDataNodes();
String dataNodeInZoneAId = null;
String dataNodeInZoneBId = null;
String dataNodeInZoneCId = null;
for (Iterator<DiscoveryNode> it = dataNodes.valuesIt(); it.hasNext();) {
DiscoveryNode node = it.next();
switch (node.getAttributes().get("zone")) {
case "a":
dataNodeInZoneAId = node.getId();
break;
case "b":
dataNodeInZoneBId = node.getId();
break;
case "c":
dataNodeInZoneCId = node.getId();
}
}

NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().execute().actionGet();
for (NodeStats stat : nodeStats.getNodes()) {
SearchStats.Stats searchStats = stat.getIndices().getSearch().getTotal();
if (stat.getNode().isDataNode()) {
if (stat.getNode().getId().equals(dataNodeInZoneBId)) {
Assert.assertTrue(searchStats.getQueryCount() > 0L);
Assert.assertTrue(searchStats.getFetchCount() > 0L);
} else if (stat.getNode().getId().equals(dataNodeInZoneCId)) {
Assert.assertTrue(searchStats.getQueryCount() > 0L);
} else {
// search requests do not hit data node in zone a
assertEquals(0, searchStats.getQueryCount());
assertEquals(0, searchStats.getFetchCount());
}
}
}
}
}
@@ -51,6 +51,7 @@
import org.opensearch.cluster.routing.GroupShardsIterator;
import org.opensearch.cluster.routing.ShardIterator;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.WeightedRoutingHelper;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.inject.Inject;
@@ -261,24 +262,43 @@ private void onFailure(ShardRouting shardRouting, Exception e) {
tryNext(e, false);
}

-private ShardRouting nextRoutingOrNull() {
+private ShardRouting nextRoutingOrNull(Exception failure) {
if (shardsIt.size() == 0 || shardIndex >= shardsIt.size()) {
return null;
}
ShardRouting next = shardsIt.get(shardIndex).nextOrNull();

// This checks whether the shard copy is on a data node whose weighted routing weight is set to 0.
// In that case we fail open, i.e. we still route to the weighed-away copy if the shard search
// request failed on the other shard copies with a non-retryable exception.
Review comment (Collaborator): A note: we should test that cross-cluster search works with this as well.
while (next != null && WeightedRoutingHelper.shardInWeighedAwayAZ(next.currentNodeId(), clusterService.state())) {
if (WeightedRoutingHelper.isInternalFailure(failure)) {
Review comment (Collaborator): I strongly feel we should also allow searches on weighed-away nodes when there are unassigned copies of that shard. This would prevent unnecessary 4xx responses to the user.
Review reply (Contributor, author): Yeah, makes sense. I can add that check.
Review reply (Collaborator): Let's avoid redundant handling of logic.
ShardRouting shardToFailOpen = next;
logger.info(() -> new ParameterizedMessage("{}: Fail open executed", shardToFailOpen.shardId()));
break;
}
next = shardsIt.get(shardIndex).nextOrNull();
}

if (next != null) {
return next;
}
moveToNextShard();
-return nextRoutingOrNull();
+return nextRoutingOrNull(failure);
}

private void moveToNextShard() {
++shardIndex;
}

private void tryNext(@Nullable final Exception lastFailure, boolean canMatchShard) {
-ShardRouting shardRouting = nextRoutingOrNull();
+ShardRouting shardRouting = nextRoutingOrNull(lastFailure);
if (shardRouting == null) {
if (canMatchShard == false) {
listener.onResponse(new FieldCapabilitiesIndexResponse(request.index(), Collections.emptyMap(), false));
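The hunks above and below call into the new WeightedRoutingHelper, whose own implementation is not part of the hunks shown in this view. As orientation, here is a minimal sketch of what its two checks plausibly do, inferred from the call sites and from the commit "Change internal error logic to check for 5xx status"; the method names come from the diff, but the bodies below are an assumption, not the PR's verbatim code.

import org.opensearch.ExceptionsHelper;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.WeightedRouting;

// Hypothetical sketch; the PR's actual helper may differ in details.
public final class WeightedRoutingHelper {

    // True if the shard copy lives on a node whose awareness-attribute value
    // (e.g. zone "c") currently has a weighted routing weight of 0.
    public static boolean shardInWeighedAwayAZ(String nodeId, ClusterState state) {
        DiscoveryNode node = state.nodes().get(nodeId);
        WeightedRoutingMetadata metadata = state.metadata().custom(WeightedRoutingMetadata.TYPE);
        if (node == null || metadata == null) {
            return false;
        }
        WeightedRouting weightedRouting = metadata.getWeightedRouting();
        String attributeValue = node.getAttributes().get(weightedRouting.attributeName());
        Double weight = weightedRouting.weights().get(attributeValue);
        return weight != null && weight == 0.0;
    }

    // Fail open only on internal (5xx) failures; 4xx client errors are still
    // surfaced to the caller instead of being retried on weighed-away copies.
    public static boolean isInternalFailure(Exception e) {
        return e != null && ExceptionsHelper.status(e).getStatus() >= 500;
    }
}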
@@ -44,6 +44,7 @@
import org.opensearch.action.support.TransportActions;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.routing.GroupShardsIterator;
import org.opensearch.cluster.routing.WeightedRoutingHelper;
import org.opensearch.common.Nullable;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
@@ -449,7 +450,24 @@ private void onShardFailure(final int shardIndex, @Nullable SearchShardTarget sh
// we always add the shard failure for a specific shard instance
// we do make sure to clean it on a successful response from a shard
onShardFailure(shardIndex, shard, e);
-final SearchShardTarget nextShard = shardIt.nextOrNull();
+SearchShardTarget nextShard = shardIt.nextOrNull();
// This checks whether the shard copy is on a data node whose weighted routing weight is set to 0.
// In that case we fail open, i.e. we still route to the weighed-away copy if the shard search
// request failed on the other shard copies with a non-retryable exception.
while (nextShard != null && WeightedRoutingHelper.shardInWeighedAwayAZ(nextShard.getNodeId(), clusterState)) {
if (WeightedRoutingHelper.isInternalFailure(e)) {
SearchShardTarget shardToFailOpen = nextShard;
logger.info(() -> new ParameterizedMessage("{}: Fail open executed", shardToFailOpen.getShardId()));
break;
}
nextShard = shardIt.nextOrNull();
}
final boolean lastShard = nextShard == null;
logger.debug(
() -> new ParameterizedMessage(
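Both call sites above inline the same per-copy decision as a while-loop over the shard iterator. As a summary, here is a condensed, hypothetical standalone form of that decision; the class and method names are illustrative only, and the PR keeps this logic inline rather than factoring it out.

import java.util.Iterator;
import java.util.function.Function;

import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.routing.WeightedRoutingHelper;

// Condensed sketch of the fail-open routing decision (illustrative only).
final class FailOpenRouting {

    // Returns the next shard copy worth trying, or null if none remain.
    static <T> T nextEligibleCopy(
        Iterator<T> copies,
        Function<T, String> nodeIdOf,
        ClusterState state,
        Exception lastFailure
    ) {
        while (copies.hasNext()) {
            T copy = copies.next();
            // Copies outside weighed-away zones are always eligible.
            if (WeightedRoutingHelper.shardInWeighedAwayAZ(nodeIdOf.apply(copy), state) == false) {
                return copy;
            }
            // A weighed-away copy is eligible only as a fail-open target,
            // i.e. after an internal (5xx) failure on the other copies.
            if (WeightedRoutingHelper.isInternalFailure(lastFailure)) {
                return copy;
            }
            // Otherwise skip the weighed-away copy and keep iterating.
        }
        return null;
    }
}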