Skip to content

Commit

Permalink
Cluster local health call to throw exception if node is decommissione…
Browse files Browse the repository at this point in the history
…d or weighed away (opensearch-project#6232)

[Backport 2.x] Cluster local health call to throw exception if node is decommissioned or weighed away (opensearch-project#6232)
Signed-off-by: Anshu Agarwal <[email protected]>
Co-authored-by: Anshu Agarwal <[email protected]>
  • Loading branch information
anshu1106 authored Feb 8, 2023
1 parent 14ed58e commit 6d492d1
Show file tree
Hide file tree
Showing 15 changed files with 273 additions and 61 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Refactor] Use local opensearch.common.SetOnce instead of lucene's utility class ([#5947](https://github.com/opensearch-project/OpenSearch/pull/5947))
- Cluster health call to throw decommissioned exception for local decommissioned node([#6008](https://github.com/opensearch-project/OpenSearch/pull/6008))
- [Refactor] core.common to new opensearch-common library ([#5976](https://github.com/opensearch-project/OpenSearch/pull/5976))
- Cluster local health call to throw exception if node is decommissioned or weighed away ([#6198](https://github.com/opensearch-project/OpenSearch/pull/6198))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ public boolean innerMatch(LogEvent event) {
.cluster()
.prepareHealth()
.setLocal(true)
.setEnsureNodeCommissioned(true)
.setEnsureNodeWeighedIn(true)
.execute()
.actionGet();
assertFalse(activeNodeLocalHealth.isTimedOut());
Expand All @@ -329,7 +329,7 @@ public boolean innerMatch(LogEvent event) {
.cluster()
.prepareHealth()
.setLocal(true)
.setEnsureNodeCommissioned(true)
.setEnsureNodeWeighedIn(true)
.execute()
.actionGet()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.common.settings.Settings;
import org.opensearch.rest.RestStatus;
import org.opensearch.snapshots.mockstore.MockRepository;
Expand Down Expand Up @@ -546,4 +547,135 @@ public void testPutAndDeleteWithVersioning() throws Exception {
);
assertEquals(RestStatus.CONFLICT, deleteException.status());
}

/**
 * Verifies the behaviour of a local cluster health request with {@code ensureNodeWeighedIn} set:
 * <ul>
 *   <li>a node in a zone with non-zero weight gets a regular health response;</li>
 *   <li>a node in a zone with weight 0.0 gets a {@link NodeWeighedAwayException};</li>
 *   <li>a request for a missing index from the weighed away node times out with RED status;</li>
 *   <li>while the cluster manager is partitioned away, both nodes get a response that reports
 *       no discovered cluster manager;</li>
 *   <li>after the weights are deleted, the previously weighed away node gets a regular response.</li>
 * </ul>
 */
public void testClusterHealthResponseWithEnsureNodeWeighedInParam() throws Exception {
    Settings commonSettings = Settings.builder()
        .put("cluster.routing.allocation.awareness.attributes", "zone")
        .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
        .build();

    logger.info("--> starting 3 nodes on different zones");
    int nodeCountPerAZ = 1;

    logger.info("--> starting a dedicated cluster manager node");
    String clusterManager = internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build());

    logger.info("--> starting 1 node on each of zones 'a' & 'b' & 'c'");
    List<String> nodesInZoneA = internalCluster().startDataOnlyNodes(
        nodeCountPerAZ,
        Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()
    );
    List<String> nodesInZoneB = internalCluster().startDataOnlyNodes(
        nodeCountPerAZ,
        Settings.builder().put(commonSettings).put("node.attr.zone", "b").build()
    );
    List<String> nodesInZoneC = internalCluster().startDataOnlyNodes(
        nodeCountPerAZ,
        Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()
    );

    logger.info("--> waiting for nodes to form a cluster");
    // 1 cluster manager node + 3 data nodes
    ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("4").execute().actionGet();
    assertThat(health.isTimedOut(), equalTo(false));

    ensureGreen();

    logger.info("--> setting shard routing weights for weighted round robin");

    // Zone 'c' gets weight 0.0, i.e. it is weighed away.
    Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0);
    WeightedRouting weightedRouting = new WeightedRouting("zone", weights);
    ClusterPutWeightedRoutingResponse response = client().admin()
        .cluster()
        .prepareWeightedRouting()
        .setWeightedRouting(weightedRouting)
        .setVersion(-1)
        .get();
    assertTrue(response.isAcknowledged());
    assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

    // Check cluster health for weighed in node, health check should return a response with 200 status code
    ClusterHealthResponse nodeLocalHealth = client(nodesInZoneA.get(0)).admin()
        .cluster()
        .prepareHealth()
        .setLocal(true)
        .setEnsureNodeWeighedIn(true)
        .get();
    assertFalse(nodeLocalHealth.isTimedOut());

    // Check cluster health for weighed away node, health check should respond with an exception
    NodeWeighedAwayException ex = expectThrows(
        NodeWeighedAwayException.class,
        () -> client(nodesInZoneC.get(0)).admin().cluster().prepareHealth().setLocal(true).setEnsureNodeWeighedIn(true).get()
    );
    assertTrue(ex.getMessage().contains("local node is weighed away"));

    logger.info("--> running cluster health on an index that does not exist");
    // Even on the weighed away node, a request for a missing index is expected to time out
    // with RED status instead of failing with NodeWeighedAwayException.
    ClusterHealthResponse healthResponse = client(nodesInZoneC.get(0)).admin()
        .cluster()
        .prepareHealth("test1")
        .setLocal(true)
        .setEnsureNodeWeighedIn(true)
        .setTimeout("1s")
        .execute()
        .actionGet();
    assertThat(healthResponse.isTimedOut(), equalTo(true));
    assertThat(healthResponse.getStatus(), equalTo(ClusterHealthStatus.RED));
    assertThat(healthResponse.getIndices().isEmpty(), equalTo(true));

    // Partition all data nodes away from the cluster manager node.
    Set<String> nodesInOneSide = Stream.of(nodesInZoneA.get(0), nodesInZoneB.get(0), nodesInZoneC.get(0))
        .collect(Collectors.toCollection(HashSet::new));
    Set<String> nodesInOtherSide = Stream.of(clusterManager).collect(Collectors.toCollection(HashSet::new));

    NetworkDisruption networkDisruption = new NetworkDisruption(
        new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide),
        NetworkDisruption.DISCONNECT
    );
    internalCluster().setDisruptionScheme(networkDisruption);

    logger.info("--> network disruption is started");
    networkDisruption.startDisrupting();

    // wait for leader checker to fail
    // NOTE(review): fixed sleep; polling for the condition would be less timing-sensitive - confirm before changing
    Thread.sleep(13000);

    // Check cluster health for weighed in node when cluster manager is not discovered, health check should
    // return a response with 200 status code
    nodeLocalHealth = client(nodesInZoneA.get(0)).admin().cluster().prepareHealth().setLocal(true).setEnsureNodeWeighedIn(true).get();
    assertFalse(nodeLocalHealth.isTimedOut());
    assertFalse(nodeLocalHealth.hasDiscoveredClusterManager());

    // Check cluster health for weighed away node when cluster manager is not discovered, health check should
    // return a response with 200 status code with cluster manager discovered as false
    // ensure_node_weighed_in is not executed if cluster manager is not discovered
    nodeLocalHealth = client(nodesInZoneC.get(0)).admin().cluster().prepareHealth().setLocal(true).setEnsureNodeWeighedIn(true).get();
    assertFalse(nodeLocalHealth.isTimedOut());
    assertFalse(nodeLocalHealth.hasDiscoveredClusterManager());

    networkDisruption.stopDisrupting();
    // give the cluster a moment to recover after the partition is healed
    Thread.sleep(1000);

    // delete weights
    ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().setVersion(0).get();
    assertTrue(deleteResponse.isAcknowledged());

    // Check local cluster health: with the weights removed, the node in zone 'c' must get a regular response again
    nodeLocalHealth = client(nodesInZoneC.get(0)).admin().cluster().prepareHealth().setLocal(true).setEnsureNodeWeighedIn(true).get();
    assertFalse(nodeLocalHealth.isTimedOut());
    assertTrue(nodeLocalHealth.hasDiscoveredClusterManager());
}
}
2 changes: 2 additions & 0 deletions server/src/main/java/org/opensearch/OpenSearchException.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.opensearch.action.support.replication.ReplicationOperation;
import org.opensearch.cluster.action.shard.ShardStateAction;
import org.opensearch.cluster.routing.NodeWeighedAwayException;
import org.opensearch.cluster.routing.PreferenceBasedSearchNotAllowedException;
import org.opensearch.cluster.routing.UnsupportedWeightedRoutingStateException;
import org.opensearch.cluster.service.ClusterManagerThrottlingException;
Expand Down Expand Up @@ -1655,6 +1656,7 @@ private enum OpenSearchExceptionHandle {
168,
V_2_6_0
),
NODE_WEIGHED_AWAY_EXCEPTION(NodeWeighedAwayException.class, NodeWeighedAwayException::new, 169, V_2_6_0),
INDEX_CREATE_BLOCK_EXCEPTION(
org.opensearch.cluster.block.IndexCreateBlockException.class,
org.opensearch.cluster.block.IndexCreateBlockException::new,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class ClusterHealthRequest extends ClusterManagerNodeReadRequest<ClusterH
private ActiveShardCount waitForActiveShards = ActiveShardCount.NONE;
private String waitForNodes = "";
private Priority waitForEvents = null;
private boolean ensureNodeCommissioned = false;
private boolean ensureNodeWeighedIn = false;
/**
* Only used by the high-level REST Client. Controls the details level of the health information returned.
* The default value is 'cluster'.
Expand Down Expand Up @@ -105,7 +105,7 @@ public ClusterHealthRequest(StreamInput in) throws IOException {
level = in.readEnum(Level.class);
}
if (in.getVersion().onOrAfter(Version.V_2_6_0)) {
ensureNodeCommissioned = in.readBoolean();
ensureNodeWeighedIn = in.readBoolean();
}
}

Expand Down Expand Up @@ -142,7 +142,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeEnum(level);
}
if (out.getVersion().onOrAfter(Version.V_2_6_0)) {
out.writeBoolean(ensureNodeCommissioned);
out.writeBoolean(ensureNodeWeighedIn);
}
}

Expand Down Expand Up @@ -328,8 +328,8 @@ public String getAwarenessAttribute() {
return awarenessAttribute;
}

public final ClusterHealthRequest ensureNodeCommissioned(boolean ensureNodeCommissioned) {
this.ensureNodeCommissioned = ensureNodeCommissioned;
public final ClusterHealthRequest ensureNodeWeighedIn(boolean ensureNodeWeighedIn) {
this.ensureNodeWeighedIn = ensureNodeWeighedIn;
return this;
}

Expand All @@ -338,8 +338,8 @@ public final ClusterHealthRequest ensureNodeCommissioned(boolean ensureNodeCommi
* @return <code>true</code> if local information is to be returned only when local node is also commissioned and weighed in
* <code>false</code> to not check whether the local node is commissioned or weighed in for a local request
*/
public final boolean ensureNodeCommissioned() {
return ensureNodeCommissioned;
public final boolean ensureNodeWeighedIn() {
return ensureNodeWeighedIn;
}

@Override
Expand All @@ -349,8 +349,8 @@ public ActionRequestValidationException validate() {
} else if (!level.equals(Level.AWARENESS_ATTRIBUTES) && awarenessAttribute != null) {
return addValidationError("level=awareness_attributes is required with awareness_attribute parameter", null);
}
if (ensureNodeCommissioned && local == false) {
return addValidationError("not a local request to ensure local node commissioned", null);
if (ensureNodeWeighedIn && local == false) {
return addValidationError("not a local request to ensure local node commissioned or weighed in", null);
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ public ClusterHealthRequestBuilder setLevel(String level) {
/**
* Specifies if the local request should ensure that the local node is commissioned and weighed in
*/
public final ClusterHealthRequestBuilder setEnsureNodeCommissioned(boolean ensureNodeCommissioned) {
request.ensureNodeCommissioned(ensureNodeCommissioned);
public final ClusterHealthRequestBuilder setEnsureNodeWeighedIn(boolean ensureNodeCommissioned) {
request.ensureNodeWeighedIn(ensureNodeCommissioned);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.NodeWeighedAwayException;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.WeightedRoutingUtils;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Strings;
Expand Down Expand Up @@ -140,12 +143,13 @@ protected void clusterManagerOperation(
final ClusterState unusedState,
final ActionListener<ClusterHealthResponse> listener
) {
if (request.ensureNodeCommissioned()
if (request.ensureNodeWeighedIn()
&& discovery instanceof Coordinator
&& ((Coordinator) discovery).localNodeCommissioned() == false) {
listener.onFailure(new NodeDecommissionedException("local node is decommissioned"));
return;
}

final int waitCount = getWaitCount(request);

if (request.waitForEvents() != null) {
Expand Down Expand Up @@ -274,7 +278,18 @@ private void executeHealth(

final Predicate<ClusterState> validationPredicate = newState -> validateRequest(request, newState, waitCount);
if (validationPredicate.test(currentState)) {
listener.onResponse(getResponse(request, currentState, waitCount, TimeoutState.OK));
ClusterHealthResponse clusterHealthResponse = getResponse(request, currentState, waitCount, TimeoutState.OK);
if (request.ensureNodeWeighedIn() && clusterHealthResponse.hasDiscoveredClusterManager()) {
DiscoveryNode localNode = clusterService.state().getNodes().getLocalNode();
assert request.local() == true : "local node request false for request for local node weighed in";
boolean weighedAway = WeightedRoutingUtils.isWeighedAway(localNode.getId(), clusterService.state());
if (weighedAway) {
listener.onFailure(new NodeWeighedAwayException("local node is weighed away"));
return;
}
}

listener.onResponse(clusterHealthResponse);
} else {
final ClusterStateObserver observer = new ClusterStateObserver(
currentState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,11 @@
import org.opensearch.OpenSearchException;
import org.opensearch.action.search.SearchShardIterator;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.index.shard.ShardId;
import org.opensearch.rest.RestStatus;
import org.opensearch.search.SearchShardTarget;

import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

/**
* This class contains logic to find next shard to retry search request in case of failure from other shard copy.
Expand Down Expand Up @@ -58,37 +54,6 @@ private boolean isInternalFailure(Exception exception) {
return false;
}

/**
 * This function checks if the shard is present in data node with weighted routing weight set to 0,
 * In such cases we fail open, if shard search request for the shard from other shard copies fail with non
 * retryable exception.
 * <p>
 * A node that is not present in the cluster state, or that does not carry the weighted routing
 * awareness attribute, is never reported as weighed away.
 *
 * @param nodeId the node with the shard copy
 * @param clusterState the cluster state providing the node attributes and the weighted routing metadata
 * @return true if the node has attribute value with shard routing weight set to zero, else false
 */
private boolean isWeighedAway(String nodeId, ClusterState clusterState) {
    DiscoveryNode node = clusterState.nodes().get(nodeId);
    if (node == null) {
        // The node is not part of this cluster state, so there are no attributes to match.
        return false;
    }
    WeightedRoutingMetadata weightedRoutingMetadata = clusterState.metadata().weightedRoutingMetadata();
    if (weightedRoutingMetadata == null) {
        return false;
    }
    WeightedRouting weightedRouting = weightedRoutingMetadata.getWeightedRouting();
    if (weightedRouting == null || weightedRouting.isSet() == false) {
        return false;
    }
    String nodeAttributeValue = node.getAttributes().get(weightedRouting.attributeName());
    if (nodeAttributeValue == null) {
        // The node does not define the awareness attribute used for weighted routing.
        return false;
    }
    // The node is weighed away when its attribute value is one of the values with weight set as zero.
    return weightedRouting.weights()
        .entrySet()
        .stream()
        .anyMatch(
            entry -> entry.getValue().intValue() == WeightedRoutingMetadata.WEIGHED_AWAY_WEIGHT
                && nodeAttributeValue.equals(entry.getKey())
        );
}

/**
* This function returns next shard copy to retry search request in case of failure from previous copy returned
* by the iterator. It has the logic to fail open ie request shard copies present in nodes with weighted shard
Expand All @@ -99,7 +64,7 @@ private boolean isWeighedAway(String nodeId, ClusterState clusterState) {
*/
public SearchShardTarget findNext(final SearchShardIterator shardIt, ClusterState clusterState, Exception exception) {
SearchShardTarget next = shardIt.nextOrNull();
while (next != null && isWeighedAway(next.getNodeId(), clusterState)) {
while (next != null && WeightedRoutingUtils.isWeighedAway(next.getNodeId(), clusterState)) {
SearchShardTarget nextShard = next;
if (canFailOpen(nextShard.getShardId(), exception, clusterState)) {
logger.info(() -> new ParameterizedMessage("{}: Fail open executed due to exception", nextShard.getShardId()), exception);
Expand All @@ -122,7 +87,7 @@ public SearchShardTarget findNext(final SearchShardIterator shardIt, ClusterStat
public ShardRouting findNext(final ShardsIterator shardsIt, ClusterState clusterState, Exception exception) {
ShardRouting next = shardsIt.nextOrNull();

while (next != null && isWeighedAway(next.currentNodeId(), clusterState)) {
while (next != null && WeightedRoutingUtils.isWeighedAway(next.currentNodeId(), clusterState)) {
ShardRouting nextShard = next;
if (canFailOpen(nextShard.shardId(), exception, clusterState)) {
logger.info(() -> new ParameterizedMessage("{}: Fail open executed due to exception", nextShard.shardId()), exception);
Expand Down
Loading

0 comments on commit 6d492d1

Please sign in to comment.