From 032bd5ef47e66a65f60f8562ec5ecfcd1895d512 Mon Sep 17 00:00:00 2001 From: pranikum <109206473+pranikum@users.noreply.github.com> Date: Thu, 1 Sep 2022 13:30:27 +0530 Subject: [PATCH] Call WRR API's Signed-off-by: pranikum <109206473+pranikum@users.noreply.github.com> --- .../decommission/DecommissionController.java | 46 +++-- .../decommission/DecommissionService.java | 6 +- .../metadata/WeightedRoundRobinMetadata.java | 159 ++++++++++++++++++ 3 files changed, 194 insertions(+), 17 deletions(-) create mode 100644 server/src/main/java/org/opensearch/cluster/metadata/WeightedRoundRobinMetadata.java diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java index fee34e3ccd836..6b599aab60ec3 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java @@ -25,6 +25,7 @@ import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.opensearch.action.admin.cluster.shards.routing.wrr.put.ClusterPutWRRWeightsAction; import org.opensearch.action.admin.cluster.shards.routing.wrr.put.ClusterPutWRRWeightsRequest; +import org.opensearch.action.admin.cluster.shards.routing.wrr.put.ClusterPutWRRWeightsResponse; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateObserver; import org.opensearch.cluster.ClusterStateTaskConfig; @@ -33,7 +34,6 @@ import org.opensearch.cluster.coordination.NodeRemovalClusterStateTaskExecutor; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.routing.WRRWeights; import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; @@ -46,6 +46,7 @@ import org.opensearch.transport.TransportService; import java.io.IOException; +import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; @@ -145,35 +146,51 @@ public ClearVotingConfigExclusionsResponse read(StreamInput in) throws IOExcepti public void handleNodesDecommissionRequest( Set nodesToBeDecommissioned, + List zones, String reason, TimeValue timeout, ActionListener nodesRemovedListener ) { - setWeightForDecommissionedZone(); + setWeightForDecommissionedZone(zones); checkHttpStatsForDecommissionedNodes(nodesToBeDecommissioned, reason, timeout, nodesRemovedListener); } - private void setWeightForDecommissionedZone() { + private void setWeightForDecommissionedZone(List zones) { + ClusterState clusterState = clusterService.getClusterApplierService().state(); - final ClusterPutWRRWeightsRequest clusterWeightRequest = new ClusterPutWRRWeightsRequest(nodes); + DecommissionAttributeMetadata decommissionAttributeMetadata = clusterState.metadata().custom(DecommissionAttributeMetadata.TYPE); + assert decommissionAttributeMetadata.status().equals(DecommissionStatus.DECOMMISSION_INIT) + : "unexpected status encountered while decommissioning nodes"; + DecommissionAttribute decommissionAttribute = decommissionAttributeMetadata.decommissionAttribute(); + + Map weights = new HashMap<>(); + zones.forEach(zone -> { + if (zone.equalsIgnoreCase(decommissionAttribute.attributeValue())) { + weights.put(zone, "0"); + } else { + weights.put(zone, "1"); + } + }); + + // WRR API will validate invalid weights + final ClusterPutWRRWeightsRequest clusterWeightRequest = new ClusterPutWRRWeightsRequest(); clusterWeightRequest.attributeName("zone"); - WRRWeights wrrWeights = new WRRWeights("zone", Map.of()) - nodesStatsRequest.clear(); - nodesStatsRequest.addMetric(NodesStatsRequest.Metric.HTTP.metricName()); + clusterWeightRequest.setWRRWeight(weights); transportService.sendRequest( transportService.getLocalNode(), ClusterPutWRRWeightsAction.NAME, - nodesStatsRequest, - new TransportResponseHandler() { + clusterWeightRequest, + new TransportResponseHandler() { @Override - public void handleResponse(NodesStatsResponse response) { - listener.onResponse(response); + public void handleResponse(ClusterPutWRRWeightsResponse response) { + logger.info("Weights were set successfully set."); } @Override public void handleException(TransportException exp) { - listener.onFailure(exp); + logger.info("Exception occurred while setting weights.Exception Messages - ", + exp.unwrapCause().getMessage()); } @Override @@ -182,12 +199,11 @@ public String executor() { } @Override - public NodesStatsResponse read(StreamInput in) throws IOException { - return new NodesStatsResponse(in); + public ClusterPutWRRWeightsResponse read(StreamInput in) throws IOException { + return new ClusterPutWRRWeightsResponse(in); } }); } - } void updateClusterStatusForDecommissioning( Set nodesToBeDecommissioned, diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index 3903c928363d0..b3b8feeabe836 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -265,8 +265,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS private void initiateGracefulDecommission() { - - decommissionController.updateMetadataWithDecommissionStatus( DecommissionStatus.DECOMMISSION_IN_PROGRESS, new ActionListener() { @@ -300,9 +298,13 @@ private void failDecommissionedNodes(ClusterState state) { : "unexpected status encountered while decommissioning nodes"; DecommissionAttribute decommissionAttribute = decommissionAttributeMetadata.decommissionAttribute(); + // Awareness values refers to all zones in the cluster + List awarenessValues = forcedAwarenessAttributes.get(decommissionAttribute.attributeName()); + // execute nodes decommissioning decommissionController.handleNodesDecommissionRequest( nodesWithDecommissionAttribute(state, decommissionAttribute, false), + awarenessValues, "nodes-decommissioned", TimeValue.timeValueSeconds(30L), // TODO - read timeout from request while integrating with API new ActionListener() { diff --git a/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoundRobinMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoundRobinMetadata.java new file mode 100644 index 0000000000000..e092054d50ffb --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoundRobinMetadata.java @@ -0,0 +1,159 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.metadata; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.OpenSearchParseException; +import org.opensearch.Version; +import org.opensearch.cluster.AbstractNamedDiffable; +import org.opensearch.cluster.NamedDiff; +import org.opensearch.cluster.routing.WRRWeights; +import org.opensearch.common.Strings; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.ToXContent; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.Map; + +/** + * Contains metadata for weighted round-robin shard routing weights + * + * @opensearch.internal + */ +public class WeightedRoundRobinMetadata extends AbstractNamedDiffable implements Metadata.Custom { + private static final Logger logger = LogManager.getLogger(WeightedRoundRobinMetadata.class); + public static final String TYPE = "wrr_shard_routing"; + private WRRWeights wrrWeight; + + public WRRWeights getWrrWeight() { + return wrrWeight; + } + + public void setWrrWeight(WRRWeights wrrWeight) { + this.wrrWeight = wrrWeight; + } + + public WeightedRoundRobinMetadata(StreamInput in) throws IOException { + this.wrrWeight = new WRRWeights(in); + } + + public WeightedRoundRobinMetadata(WRRWeights wrrWeight) { + this.wrrWeight = wrrWeight; + } + + @Override + public EnumSet context() { + return Metadata.API_AND_GATEWAY; + } + + @Override + public String getWriteableName() { + return TYPE; + } + + @Override + public Version getMinimalSupportedVersion() { + return Version.V_2_3_0; + + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + wrrWeight.writeTo(out); + } + + public static NamedDiff readDiffFrom(StreamInput in) throws IOException { + return readDiffFrom(Metadata.Custom.class, TYPE, in); + } + + public static WeightedRoundRobinMetadata fromXContent(XContentParser parser) throws IOException { + String attrKey = null; + Object attrValue; + String attributeName = null; + Map weights = new HashMap<>(); + WRRWeights wrrWeight = null; + XContentParser.Token token; + // move to the first alias + parser.nextToken(); + String awarenessKeyword = null; + + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + // parses awareness object + awarenessKeyword = parser.currentName(); + // awareness object contains object with awareness attribute name and its details + if (parser.nextToken() != XContentParser.Token.START_OBJECT) { + throw new OpenSearchParseException("failed to parse wrr metadata [{}], expected object", awarenessKeyword); + } + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + attributeName = parser.currentName(); + // awareness attribute object contain wrr weight details + if (parser.nextToken() != XContentParser.Token.START_OBJECT) { + throw new OpenSearchParseException("failed to parse wrr metadata [{}], expected object", attributeName); + } + // parse weights, corresponding attribute key and puts it in a map + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + attrKey = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_STRING) { + attrValue = parser.text(); + weights.put(attrKey, attrValue); + } else { + throw new OpenSearchParseException("failed to parse wrr metadata attribute [{}], unknown type", attributeName); + } + } + } + } else { + throw new OpenSearchParseException("failed to parse wrr metadata attribute [{}]", attributeName); + } + } + wrrWeight = new WRRWeights(attributeName, weights); + return new WeightedRoundRobinMetadata(wrrWeight); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + WeightedRoundRobinMetadata that = (WeightedRoundRobinMetadata) o; + return wrrWeight.equals(that.wrrWeight); + } + + @Override + public int hashCode() { + return wrrWeight.hashCode(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + toXContent(wrrWeight, builder); + return builder; + } + + public static void toXContent(WRRWeights wrrWeight, XContentBuilder builder) throws IOException { + builder.startObject("awareness"); + builder.startObject(wrrWeight.attributeName()); + for (Map.Entry entry : wrrWeight.weights().entrySet()) { + builder.field(entry.getKey(), entry.getValue()); + } + builder.endObject(); + builder.endObject(); + } + + @Override + public String toString() { + return Strings.toString(this); + } +} \ No newline at end of file