From 38d3171fd58a14627e07b6ca3a5e1cab1087c56a Mon Sep 17 00:00:00 2001 From: Anshu Agarwal Date: Mon, 12 Feb 2024 11:51:52 +0530 Subject: [PATCH] Fix for deserilization bug in weighted round robin metadata (#11679) * Fix for deserialization bug in weighted round robin metadata Signed-off-by: Anshu Agarwal * Add changelog Signed-off-by: Anshu Agarwal * Add null check Signed-off-by: Anshu Agarwal * Add integ test Signed-off-by: Anshu Agarwal * spotless fix Signed-off-by: Anshu Agarwal --------- Signed-off-by: Anshu Agarwal Co-authored-by: Anshu Agarwal --- CHANGELOG.md | 1 + .../cluster/routing/WeightedRoutingIT.java | 142 ++++++++++++++++++ .../metadata/WeightedRoutingMetadata.java | 7 +- .../cluster/routing/WeightedRouting.java | 1 + .../WeightedRoutingMetadataTests.java | 41 ++++- 5 files changed, 184 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d79a202af68c0..0ee5bff71496e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -151,6 +151,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Removed ### Fixed +- Fix for deserilization bug in weighted round-robin metadata ([#11679](https://github.com/opensearch-project/OpenSearch/pull/11679)) ### Security diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java index 2e0dd579d6910..d6d22c95ee5a2 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java @@ -13,12 +13,14 @@ import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse; +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; import org.opensearch.cluster.health.ClusterHealthStatus; import org.opensearch.common.settings.Settings; import org.opensearch.core.rest.RestStatus; import org.opensearch.discovery.ClusterManagerNotDiscoveredException; import org.opensearch.plugins.Plugin; import org.opensearch.snapshots.mockstore.MockRepository; +import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.disruption.NetworkDisruption; import org.opensearch.test.transport.MockTransportService; @@ -715,4 +717,144 @@ public void testClusterHealthResponseWithEnsureNodeWeighedInParam() throws Excep assertFalse(nodeLocalHealth.isTimedOut()); assertTrue(nodeLocalHealth.hasDiscoveredClusterManager()); } + + public void testReadWriteWeightedRoutingMetadataOnNodeRestart() throws Exception { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .build(); + + internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build()); + + logger.info("--> starting 1 nodes on zones 'a' & 'b' & 'c'"); + List nodes_in_zone_a = internalCluster().startDataOnlyNodes( + 1, + Settings.builder().put(commonSettings).put("node.attr.zone", "a").build() + ); + List nodes_in_zone_b = internalCluster().startDataOnlyNodes( + 1, + Settings.builder().put(commonSettings).put("node.attr.zone", "b").build() + ); + List nodes_in_zone_c = internalCluster().startDataOnlyNodes( + 1, + Settings.builder().put(commonSettings).put("node.attr.zone", "c").build() + ); + + logger.info("--> waiting for nodes to form a cluster"); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("4").execute().actionGet(); + assertThat(health.isTimedOut(), equalTo(false)); + + ensureGreen(); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 2.0, "c", 3.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + + ClusterPutWeightedRoutingResponse response = client().admin() + .cluster() + .prepareWeightedRouting() + .setWeightedRouting(weightedRouting) + .setVersion(-1) + .get(); + assertEquals(response.isAcknowledged(), true); + + ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().setVersion(0).get(); + assertTrue(deleteResponse.isAcknowledged()); + + // check weighted routing metadata after node restart, ensure node comes healthy after restart + internalCluster().restartNode(nodes_in_zone_a.get(0), new InternalTestCluster.RestartCallback()); + ensureGreen(); + assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); + + // make sure restarted node joins the cluster + assertEquals(3, internalCluster().clusterService().state().nodes().getDataNodes().size()); + assertNotNull( + internalCluster().client(nodes_in_zone_a.get(0)) + .admin() + .cluster() + .state(new ClusterStateRequest().local(true)) + .get() + .getState() + .metadata() + .weightedRoutingMetadata() + ); + assertNotNull( + internalCluster().client(nodes_in_zone_b.get(0)) + .admin() + .cluster() + .state(new ClusterStateRequest().local(true)) + .get() + .getState() + .metadata() + .weightedRoutingMetadata() + ); + assertNotNull( + internalCluster().client(nodes_in_zone_c.get(0)) + .admin() + .cluster() + .state(new ClusterStateRequest().local(true)) + .get() + .getState() + .metadata() + .weightedRoutingMetadata() + ); + assertNotNull( + internalCluster().client(internalCluster().getClusterManagerName()) + .admin() + .cluster() + .state(new ClusterStateRequest().local(true)) + .get() + .getState() + .metadata() + .weightedRoutingMetadata() + ); + + internalCluster().restartNode(internalCluster().getClusterManagerName(), new InternalTestCluster.RestartCallback()); + ensureGreen(); + assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); + + // make sure restarted node joins the cluster + assertEquals(3, internalCluster().clusterService().state().nodes().getDataNodes().size()); + assertNotNull( + internalCluster().client(nodes_in_zone_a.get(0)) + .admin() + .cluster() + .state(new ClusterStateRequest().local(true)) + .get() + .getState() + .metadata() + .weightedRoutingMetadata() + ); + assertNotNull( + internalCluster().client(nodes_in_zone_b.get(0)) + .admin() + .cluster() + .state(new ClusterStateRequest().local(true)) + .get() + .getState() + .metadata() + .weightedRoutingMetadata() + ); + assertNotNull( + internalCluster().client(nodes_in_zone_c.get(0)) + .admin() + .cluster() + .state(new ClusterStateRequest().local(true)) + .get() + .getState() + .metadata() + .weightedRoutingMetadata() + ); + assertNotNull( + internalCluster().client(internalCluster().getClusterManagerName()) + .admin() + .cluster() + .state(new ClusterStateRequest().local(true)) + .get() + .getState() + .metadata() + .weightedRoutingMetadata() + ); + + } } diff --git a/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java index bc24dd22f5c6e..b303c3a2034d5 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java @@ -26,6 +26,7 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.Map; +import java.util.Objects; /** * Contains metadata for weighted routing @@ -99,7 +100,7 @@ public static NamedDiff readDiffFrom(StreamInput in) throws IOE public static WeightedRoutingMetadata fromXContent(XContentParser parser) throws IOException { String attrKey = null; Double attrValue; - String attributeName = null; + String attributeName = ""; Map weights = new HashMap<>(); WeightedRouting weightedRouting; XContentParser.Token token; @@ -162,12 +163,12 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; WeightedRoutingMetadata that = (WeightedRoutingMetadata) o; - return weightedRouting.equals(that.weightedRouting); + return weightedRouting.equals(that.weightedRouting) && version == that.version; } @Override public int hashCode() { - return weightedRouting.hashCode(); + return Objects.hash(weightedRouting.hashCode(), version); } @Override diff --git a/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java b/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java index 2b93a1483b801..468fac08d2946 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java @@ -54,6 +54,7 @@ public boolean isSet() { @Override public void writeTo(StreamOutput out) throws IOException { + out.writeString(attributeName); out.writeGenericValue(weights); } diff --git a/server/src/test/java/org/opensearch/cluster/metadata/WeightedRoutingMetadataTests.java b/server/src/test/java/org/opensearch/cluster/metadata/WeightedRoutingMetadataTests.java index b78d1b56364eb..e19bde5d53d8a 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/WeightedRoutingMetadataTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/WeightedRoutingMetadataTests.java @@ -8,29 +8,60 @@ package org.opensearch.cluster.metadata; +import org.opensearch.cluster.ClusterModule; +import org.opensearch.cluster.Diff; import org.opensearch.cluster.routing.WeightedRouting; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.xcontent.XContentParser; -import org.opensearch.test.AbstractXContentTestCase; +import org.opensearch.test.AbstractDiffableSerializationTestCase; import java.io.IOException; +import java.util.HashMap; import java.util.Map; -public class WeightedRoutingMetadataTests extends AbstractXContentTestCase { +public class WeightedRoutingMetadataTests extends AbstractDiffableSerializationTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return WeightedRoutingMetadata::new; + } + @Override protected WeightedRoutingMetadata createTestInstance() { + String attributeName = "zone"; Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); - WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + if (randomBoolean()) { + weights = new HashMap<>(); + attributeName = ""; + } + WeightedRouting weightedRouting = new WeightedRouting(attributeName, weights); WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, -1); + return weightedRoutingMetadata; } + @Override + protected NamedWriteableRegistry getNamedWriteableRegistry() { + return new NamedWriteableRegistry(ClusterModule.getNamedWriteables()); + } + @Override protected WeightedRoutingMetadata doParseInstance(XContentParser parser) throws IOException { return WeightedRoutingMetadata.fromXContent(parser); } @Override - protected boolean supportsUnknownFields() { - return false; + protected Metadata.Custom makeTestChanges(Metadata.Custom testInstance) { + + WeightedRouting weightedRouting = new WeightedRouting("", new HashMap<>()); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, -1); + return weightedRoutingMetadata; } + + @Override + protected Writeable.Reader> diffReader() { + return WeightedRoutingMetadata::readDiffFrom; + } + }