Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for deserilization bug in weighted round robin metadata #11679

Merged
merged 8 commits into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix remote shards balancer and remove unused variables ([#11167](https://github.com/opensearch-project/OpenSearch/pull/11167))
- Fix parsing of flat object fields with dots in keys ([#11425](https://github.com/opensearch-project/OpenSearch/pull/11425))
- Fix bug where replication lag grows post primary relocation ([#11238](https://github.com/opensearch-project/OpenSearch/pull/11238))
- Fix for deserilization bug in weighted round-robin metadata ([#11679](https://github.com/opensearch-project/OpenSearch/pull/11679))
- Fix noop_update_total metric in indexing stats cannot be updated by bulk API ([#11485](https://github.com/opensearch-project/OpenSearch/pull/11485))
- Fix for stuck update action in a bulk with `retry_on_conflict` property ([#11152](https://github.com/opensearch-project/OpenSearch/issues/11152))
- Fix template setting override for replication type ([#11417](https://github.com/opensearch-project/OpenSearch/pull/11417))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.discovery.ClusterManagerNotDiscoveredException;
import org.opensearch.plugins.Plugin;
import org.opensearch.snapshots.mockstore.MockRepository;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.disruption.NetworkDisruption;
import org.opensearch.test.transport.MockTransportService;
Expand Down Expand Up @@ -715,4 +717,144 @@ public void testClusterHealthResponseWithEnsureNodeWeighedInParam() throws Excep
assertFalse(nodeLocalHealth.isTimedOut());
assertTrue(nodeLocalHealth.hasDiscoveredClusterManager());
}

public void testReadWriteWeightedRoutingMetadataOnNodeRestart() throws Exception {
anshu1106 marked this conversation as resolved.
Show resolved Hide resolved
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build());

logger.info("--> starting 1 nodes on zones 'a' & 'b' & 'c'");
List<String> nodes_in_zone_a = internalCluster().startDataOnlyNodes(
1,
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()
);
List<String> nodes_in_zone_b = internalCluster().startDataOnlyNodes(
1,
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build()
);
List<String> nodes_in_zone_c = internalCluster().startDataOnlyNodes(
1,
Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()
);

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("4").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

ensureGreen();

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 2.0, "c", 3.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);

ClusterPutWeightedRoutingResponse response = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertEquals(response.isAcknowledged(), true);

ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().setVersion(0).get();
assertTrue(deleteResponse.isAcknowledged());

// check weighted routing metadata after node restart, ensure node comes healthy after restart
internalCluster().restartNode(nodes_in_zone_a.get(0), new InternalTestCluster.RestartCallback());
ensureGreen();
assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

//make sure restarted node joins the cluster
assertEquals(3, internalCluster().clusterService().state().nodes().getDataNodes().size());
assertNotNull(
internalCluster().client(nodes_in_zone_a.get(0))
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);
assertNotNull(
internalCluster().client(nodes_in_zone_b.get(0))
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);
assertNotNull(
internalCluster().client(nodes_in_zone_c.get(0))
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);
assertNotNull(
internalCluster().client(internalCluster().getClusterManagerName())
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);

internalCluster().restartNode(internalCluster().getClusterManagerName(), new InternalTestCluster.RestartCallback());
ensureGreen();
assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// make sure restarted node joins the cluster
assertEquals(3, internalCluster().clusterService().state().nodes().getDataNodes().size());
assertNotNull(
internalCluster().client(nodes_in_zone_a.get(0))
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);
assertNotNull(
internalCluster().client(nodes_in_zone_b.get(0))
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);
assertNotNull(
internalCluster().client(nodes_in_zone_c.get(0))
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);
assertNotNull(
internalCluster().client(internalCluster().getClusterManagerName())
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
* Contains metadata for weighted routing
Expand Down Expand Up @@ -99,7 +100,7 @@ public static NamedDiff<Metadata.Custom> readDiffFrom(StreamInput in) throws IOE
public static WeightedRoutingMetadata fromXContent(XContentParser parser) throws IOException {
String attrKey = null;
Double attrValue;
String attributeName = null;
String attributeName = "";
anshu1106 marked this conversation as resolved.
Show resolved Hide resolved
Map<String, Double> weights = new HashMap<>();
WeightedRouting weightedRouting;
XContentParser.Token token;
Expand Down Expand Up @@ -162,12 +163,12 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
WeightedRoutingMetadata that = (WeightedRoutingMetadata) o;
return weightedRouting.equals(that.weightedRouting);
return weightedRouting.equals(that.weightedRouting) && version == that.version;
}

@Override
public int hashCode() {
return weightedRouting.hashCode();
return Objects.hash(weightedRouting.hashCode(), version);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public boolean isSet() {

@Override
public void writeTo(StreamOutput out) throws IOException {

out.writeString(attributeName);
out.writeGenericValue(weights);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,60 @@

package org.opensearch.cluster.metadata;

import org.opensearch.cluster.ClusterModule;
import org.opensearch.cluster.Diff;
import org.opensearch.cluster.routing.WeightedRouting;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.test.AbstractXContentTestCase;
import org.opensearch.test.AbstractDiffableSerializationTestCase;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class WeightedRoutingMetadataTests extends AbstractXContentTestCase<WeightedRoutingMetadata> {
public class WeightedRoutingMetadataTests extends AbstractDiffableSerializationTestCase<Metadata.Custom> {

@Override
protected Writeable.Reader<Metadata.Custom> instanceReader() {
return WeightedRoutingMetadata::new;
}

@Override
protected WeightedRoutingMetadata createTestInstance() {
String attributeName = "zone";
Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);
if (randomBoolean()) {
weights = new HashMap<>();
attributeName = "";
}
WeightedRouting weightedRouting = new WeightedRouting(attributeName, weights);
WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, -1);

return weightedRoutingMetadata;
}

@Override
protected NamedWriteableRegistry getNamedWriteableRegistry() {
return new NamedWriteableRegistry(ClusterModule.getNamedWriteables());
}

@Override
protected WeightedRoutingMetadata doParseInstance(XContentParser parser) throws IOException {
return WeightedRoutingMetadata.fromXContent(parser);
}

@Override
protected boolean supportsUnknownFields() {
return false;
protected Metadata.Custom makeTestChanges(Metadata.Custom testInstance) {

WeightedRouting weightedRouting = new WeightedRouting("", new HashMap<>());
WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, -1);
return weightedRoutingMetadata;
}

@Override
protected Writeable.Reader<Diff<Metadata.Custom>> diffReader() {
return WeightedRoutingMetadata::readDiffFrom;
}

}
Loading