From 70ff19cc655ffab7faf5dd05ce23c5fb426239b9 Mon Sep 17 00:00:00 2001 From: Anshu Agarwal Date: Tue, 10 Jan 2023 17:24:02 +0530 Subject: [PATCH] Add support for API versioning and fail open in weighted shard routing (#5784) * [Weighted Routing] Add support for discovered master and remove local weights in the response (#5680) * Add support for discovered master and remove local weights in the weighted routing API response Signed-off-by: Anshu Agarwal * [Weighted Shard Routing] API versioning (#5255) * Support API versioning for weighted shard routing Signed-off-by: Anshu Agarwal * [Weighted Shard Routing] Fail open requests on search shard failures (#5072) * Fail open requests on search shard failures ( Signed-off-by: Anshu Agarwal * Address fail open comments (#5778) [Weighted Shard Routing] Refactor and fix singleton in FailAwareWeightedRouting Signed-off-by: Anshu Agarwal * remove unintended changes in changelog Signed-off-by: Anshu Agarwal * remove unintended changes from changelog Signed-off-by: Anshu Agarwal Signed-off-by: Anshu Agarwal Co-authored-by: Anshu Agarwal --- CHANGELOG.md | 3 + .../AwarenessAttributeDecommissionRestIT.java | 1 + .../AwarenessAttributeDecommissionIT.java | 7 + .../cluster/routing/WeightedRoutingIT.java | 224 +++++- .../search/SearchWeightedRoutingIT.java | 646 +++++++++++++++++- .../ClusterDeleteWeightedRoutingRequest.java | 109 ++- ...erDeleteWeightedRoutingRequestBuilder.java | 11 + .../ClusterGetWeightedRoutingResponse.java | 110 ++- .../TransportGetWeightedRoutingAction.java | 17 +- .../put/ClusterPutWeightedRoutingRequest.java | 69 +- ...usterPutWeightedRoutingRequestBuilder.java | 4 + ...TransportFieldCapabilitiesIndexAction.java | 10 +- .../search/AbstractSearchAsyncAction.java | 4 +- .../action/search/SearchShardIterator.java | 2 +- .../broadcast/TransportBroadcastAction.java | 4 +- .../shard/TransportSingleShardAction.java | 7 +- .../java/org/opensearch/client/Requests.java | 4 +- .../metadata/WeightedRoutingMetadata.java 
| 67 +- .../routing/FailAwareWeightedRouting.java | 153 +++++ .../routing/IndexShardRoutingTable.java | 42 +- .../cluster/routing/OperationRouting.java | 19 +- .../cluster/routing/WeightedRouting.java | 10 + .../routing/WeightedRoutingService.java | 67 +- .../common/settings/ClusterSettings.java | 1 + ...estClusterDeleteWeightedRoutingAction.java | 20 +- .../RestClusterPutWeightedRoutingAction.java | 2 +- ...ClusterPutWeightedRoutingRequestTests.java | 21 +- ...lusterGetWeightedRoutingResponseTests.java | 2 +- ...ransportGetWeightedRoutingActionTests.java | 8 +- .../DecommissionServiceTests.java | 2 +- .../WeightedRoutingMetadataTests.java | 2 +- .../FailAwareWeightedRoutingTests.java | 266 ++++++++ .../routing/OperationRoutingTests.java | 5 +- .../routing/WeightedRoutingServiceTests.java | 6 +- .../structure/RoutingIteratorTests.java | 135 +++- ...tClusterAddWeightedRoutingActionTests.java | 11 +- ...usterDeleteWeightedRoutingActionTests.java | 79 +++ 37 files changed, 1972 insertions(+), 178 deletions(-) create mode 100644 server/src/main/java/org/opensearch/cluster/routing/FailAwareWeightedRouting.java create mode 100644 server/src/test/java/org/opensearch/cluster/routing/FailAwareWeightedRoutingTests.java create mode 100644 server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingActionTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 4df0edacaa3a4..6f4dc194ff33a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,14 +14,17 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Update to Gradle 7.6 ([#5382](https://github.com/opensearch-project/OpenSearch/pull/5382)) - Reject bulk requests with invalid actions ([#5299](https://github.com/opensearch-project/OpenSearch/issues/5299)) - Add max_shard_size parameter for shrink API ([#5229](https://github.com/opensearch-project/OpenSearch/pull/5229)) +- Support to fail open requests on search shard failures with weighted traffic routing 
([#5072](https://github.com/opensearch-project/OpenSearch/pull/5072)) - Added jackson dependency to server ([#5366] (https://github.com/opensearch-project/OpenSearch/pull/5366)) - Adding support to register settings dynamically ([#5495](https://github.com/opensearch-project/OpenSearch/pull/5495)) - Adding auto release workflow ([#5582](https://github.com/opensearch-project/OpenSearch/pull/5582)) - Added experimental support for extensions ([#5347](https://github.com/opensearch-project/OpenSearch/pull/5347)), ([#5518](https://github.com/opensearch-project/OpenSearch/pull/5518)), ([#5597](https://github.com/opensearch-project/OpenSearch/pull/5597)), ([#5615](https://github.com/opensearch-project/OpenSearch/pull/5615))) - Add CI bundle pattern to distribution download ([#5348](https://github.com/opensearch-project/OpenSearch/pull/5348)) +- Support versioning for Weighted routing apis([#5255](https://github.com/opensearch-project/OpenSearch/pull/5255)) - Added @gbbafna as an OpenSearch maintainer ([#5668](https://github.com/opensearch-project/OpenSearch/pull/5668)) - Experimental support for extended backward compatiblity in searchable snapshots ([#5429](https://github.com/opensearch-project/OpenSearch/pull/5429)) - Added Request level Durability using Remote Translog functionality ([#5757](https://github.com/opensearch-project/OpenSearch/pull/5757)) +- Add support for discovered cluster manager and remove local weights ([#5680](https://github.com/opensearch-project/OpenSearch/pull/5680)) ### Dependencies - Bump bcpg-fips from 1.0.5.1 to 1.0.7.1 ([#5148](https://github.com/opensearch-project/OpenSearch/pull/5148)) diff --git a/qa/smoke-test-http/src/test/java/org/opensearch/http/AwarenessAttributeDecommissionRestIT.java b/qa/smoke-test-http/src/test/java/org/opensearch/http/AwarenessAttributeDecommissionRestIT.java index 4d9115b8962ea..b7228a75984fa 100644 --- a/qa/smoke-test-http/src/test/java/org/opensearch/http/AwarenessAttributeDecommissionRestIT.java +++ 
b/qa/smoke-test-http/src/test/java/org/opensearch/http/AwarenessAttributeDecommissionRestIT.java @@ -91,6 +91,7 @@ public void testRestStatusForAcknowledgedDecommission() throws IOException { .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); assertTrue(weightedRoutingResponse.isAcknowledged()); diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java index 8dde5f3dc3a93..07580f17a67bc 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java @@ -223,6 +223,7 @@ public void testInvariantsAndLogsOnDecommissionedNodes() throws Exception { .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); assertTrue(weightedRoutingResponse.isAcknowledged()); @@ -453,6 +454,7 @@ private void assertNodesRemovedAfterZoneDecommission(boolean originalClusterMana .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); assertTrue(weightedRoutingResponse.isAcknowledged()); @@ -565,6 +567,7 @@ public void testDecommissionFailedWhenDifferentAttributeAlreadyDecommissioned() .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); assertTrue(weightedRoutingResponse.isAcknowledged()); @@ -649,6 +652,7 @@ public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionEx .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); assertTrue(weightedRoutingResponse.isAcknowledged()); @@ -769,6 +773,7 @@ public void testDecommissionFailedWhenAttributeNotWeighedAway() throws Exception .cluster() 
.prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); assertTrue(weightedRoutingResponse.isAcknowledged()); @@ -809,6 +814,7 @@ public void testDecommissionFailedWithOnlyOneAttributeValueForLeader() throws Ex .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); assertTrue(weightedRoutingResponse.isAcknowledged()); @@ -922,6 +928,7 @@ public void testDecommissionAcknowledgedIfWeightsNotSetForNonRoutingNode() throw .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); assertTrue(weightedRoutingResponse.isAcknowledged()); diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java index bba07d878a42c..d3c0fa9ae73af 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java @@ -8,22 +8,39 @@ package org.opensearch.cluster.routing; +import org.opensearch.ResourceNotFoundException; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse; import org.opensearch.common.settings.Settings; +import org.opensearch.rest.RestStatus; +import org.opensearch.snapshots.mockstore.MockRepository; import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.disruption.NetworkDisruption; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.transport.MockTransportService; import java.io.IOException; +import java.util.Arrays; 
+import java.util.HashSet; +import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.hamcrest.Matchers.equalTo; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, minNumDataNodes = 3) public class WeightedRoutingIT extends OpenSearchIntegTestCase { + @Override + protected Collection> nodePlugins() { + return Arrays.asList(MockTransportService.TestPlugin.class, MockRepository.Plugin.class); + } + public void testPutWeightedRouting() { Settings commonSettings = Settings.builder() .put("cluster.routing.allocation.awareness.attributes", "zone") @@ -64,6 +81,7 @@ public void testPutWeightedRouting() { .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); assertEquals(response.isAcknowledged(), true); @@ -73,6 +91,7 @@ public void testPutWeightedRouting() { .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(0) .get(); assertEquals(response.isAcknowledged(), true); } @@ -157,6 +176,7 @@ public void testGetWeightedRouting_WeightsNotSet() { .setAwarenessAttribute("zone") .get(); assertNull(weightedRoutingResponse.weights()); + assertNull(weightedRoutingResponse.getDiscoveredClusterManager()); } public void testGetWeightedRouting_WeightsAreSet() throws IOException { @@ -199,6 +219,7 @@ public void testGetWeightedRouting_WeightsAreSet() throws IOException { .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); assertEquals(response.isAcknowledged(), true); @@ -209,8 +230,9 @@ public void testGetWeightedRouting_WeightsAreSet() throws IOException { .setAwarenessAttribute("zone") .get(); assertEquals(weightedRouting, weightedRoutingResponse.weights()); + assertTrue(weightedRoutingResponse.getDiscoveredClusterManager()); - // get api to fetch local node weight for a node in zone 
a + // get api to fetch local weighted routing for a node in zone a weightedRoutingResponse = internalCluster().client(randomFrom(nodes_in_zone_a.get(0), nodes_in_zone_a.get(1))) .admin() .cluster() @@ -219,9 +241,9 @@ public void testGetWeightedRouting_WeightsAreSet() throws IOException { .setRequestLocal(true) .get(); assertEquals(weightedRouting, weightedRoutingResponse.weights()); - assertEquals("1.0", weightedRoutingResponse.getLocalNodeWeight()); + assertTrue(weightedRoutingResponse.getDiscoveredClusterManager()); - // get api to fetch local node weight for a node in zone b + // get api to fetch local weighted routing for a node in zone b weightedRoutingResponse = internalCluster().client(randomFrom(nodes_in_zone_b.get(0), nodes_in_zone_b.get(1))) .admin() .cluster() @@ -230,9 +252,9 @@ public void testGetWeightedRouting_WeightsAreSet() throws IOException { .setRequestLocal(true) .get(); assertEquals(weightedRouting, weightedRoutingResponse.weights()); - assertEquals("2.0", weightedRoutingResponse.getLocalNodeWeight()); + assertTrue(weightedRoutingResponse.getDiscoveredClusterManager()); - // get api to fetch local node weight for a node in zone c + // get api to fetch local weighted routing for a node in zone c weightedRoutingResponse = internalCluster().client(randomFrom(nodes_in_zone_c.get(0), nodes_in_zone_c.get(1))) .admin() .cluster() @@ -241,7 +263,82 @@ public void testGetWeightedRouting_WeightsAreSet() throws IOException { .setRequestLocal(true) .get(); assertEquals(weightedRouting, weightedRoutingResponse.weights()); - assertEquals("3.0", weightedRoutingResponse.getLocalNodeWeight()); + assertTrue(weightedRoutingResponse.getDiscoveredClusterManager()); + + } + + public void testGetWeightedRouting_ClusterManagerNotDiscovered() throws Exception { + + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + 
.put("cluster.fault_detection.leader_check.timeout", 10000 + "ms") + .put("cluster.fault_detection.leader_check.retry_count", 1) + .build(); + + int nodeCountPerAZ = 1; + + logger.info("--> starting a dedicated cluster manager node"); + String clusterManager = internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build()); + + logger.info("--> starting 2 nodes on zones 'a' & 'b' & 'c'"); + List nodes_in_zone_a = internalCluster().startDataOnlyNodes( + nodeCountPerAZ, + Settings.builder().put(commonSettings).put("node.attr.zone", "a").build() + ); + List nodes_in_zone_b = internalCluster().startDataOnlyNodes( + nodeCountPerAZ, + Settings.builder().put(commonSettings).put("node.attr.zone", "b").build() + ); + List nodes_in_zone_c = internalCluster().startDataOnlyNodes( + nodeCountPerAZ, + Settings.builder().put(commonSettings).put("node.attr.zone", "c").build() + ); + + logger.info("--> waiting for nodes to form a cluster"); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("4").execute().actionGet(); + assertThat(health.isTimedOut(), equalTo(false)); + + ensureGreen(); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 2.0, "c", 3.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + // put api call to set weights + ClusterPutWeightedRoutingResponse response = client().admin() + .cluster() + .prepareWeightedRouting() + .setWeightedRouting(weightedRouting) + .setVersion(-1) + .get(); + assertEquals(response.isAcknowledged(), true); + + Set nodesInOneSide = Stream.of(nodes_in_zone_a.get(0), nodes_in_zone_b.get(0), nodes_in_zone_c.get(0)) + .collect(Collectors.toCollection(HashSet::new)); + Set nodesInOtherSide = Stream.of(clusterManager).collect(Collectors.toCollection(HashSet::new)); + + NetworkDisruption networkDisruption = new NetworkDisruption( + new NetworkDisruption.TwoPartitions(nodesInOneSide, 
nodesInOtherSide), + NetworkDisruption.DISCONNECT + ); + internalCluster().setDisruptionScheme(networkDisruption); + + logger.info("--> network disruption is started"); + networkDisruption.startDisrupting(); + + // wait for leader checker to fail + Thread.sleep(13000); + + // get api to fetch local weighted routing for a node in zone a or b + ClusterGetWeightedRoutingResponse weightedRoutingResponse = internalCluster().client( + randomFrom(nodes_in_zone_a.get(0), nodes_in_zone_b.get(0)) + ).admin().cluster().prepareGetWeightedRouting().setAwarenessAttribute("zone").setRequestLocal(true).get(); + assertEquals(weightedRouting, weightedRoutingResponse.weights()); + assertFalse(weightedRoutingResponse.getDiscoveredClusterManager()); + + logger.info("--> network disruption is stopped"); + networkDisruption.stopDisrupting(); + } public void testWeightedRoutingMetadataOnOSProcessRestart() throws Exception { @@ -270,6 +367,7 @@ public void testWeightedRoutingMetadataOnOSProcessRestart() throws Exception { .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); assertEquals(response.isAcknowledged(), true); @@ -307,12 +405,14 @@ public void testDeleteWeightedRouting_WeightsNotSet() { assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); // delete weighted routing metadata - ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().get(); - assertTrue(deleteResponse.isAcknowledged()); - assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); + ResourceNotFoundException exception = expectThrows( + ResourceNotFoundException.class, + () -> client().admin().cluster().prepareDeleteWeightedRouting().setVersion(-1).get() + ); + assertEquals(RestStatus.NOT_FOUND, exception.status()); } - public void testDeleteWeightedRouting_WeightsAreSet() { + public void testDeleteWeightedRouting_WeightsAreSet() throws 
IOException { Settings commonSettings = Settings.builder() .put("cluster.routing.allocation.awareness.attributes", "zone") .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") @@ -339,13 +439,111 @@ public void testDeleteWeightedRouting_WeightsAreSet() { .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); - assertEquals(response.isAcknowledged(), true); + assertTrue(response.isAcknowledged()); assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); // delete weighted routing metadata - ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().get(); + ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().setVersion(0).get(); assertTrue(deleteResponse.isAcknowledged()); - assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); + } + + public void testPutAndDeleteWithVersioning() throws Exception { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .build(); + + logger.info("--> starting 6 nodes on different zones"); + int nodeCountPerAZ = 2; + + logger.info("--> starting a dedicated cluster manager node"); + internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build()); + + logger.info("--> starting 1 nodes on zones 'a' & 'b' & 'c'"); + internalCluster().startDataOnlyNodes(nodeCountPerAZ, Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()); + internalCluster().startDataOnlyNodes(nodeCountPerAZ, Settings.builder().put(commonSettings).put("node.attr.zone", "b").build()); + internalCluster().startDataOnlyNodes(nodeCountPerAZ, Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()); + + logger.info("--> waiting 
for nodes to form a cluster"); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("7").execute().actionGet(); + assertThat(health.isTimedOut(), equalTo(false)); + + ensureGreen(); + + logger.info("--> setting shard routing weights for weighted round robin"); + + Map weights = Map.of("a", 1.0, "b", 2.0, "c", 3.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + ClusterPutWeightedRoutingResponse response = client().admin() + .cluster() + .prepareWeightedRouting() + .setWeightedRouting(weightedRouting) + .setVersion(-1) + .get(); + assertTrue(response.isAcknowledged()); + assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); + + // update weights api call with correct version number + weights = Map.of("a", 1.0, "b", 2.0, "c", 4.0); + weightedRouting = new WeightedRouting("zone", weights); + response = client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting).setVersion(0).get(); + assertTrue(response.isAcknowledged()); + + // update weights api call with incorrect version number + weights = Map.of("a", 1.0, "b", 2.0, "c", 4.0); + WeightedRouting weightedRouting1 = new WeightedRouting("zone", weights); + UnsupportedWeightedRoutingStateException exception = expectThrows( + UnsupportedWeightedRoutingStateException.class, + () -> client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting1).setVersion(100).get() + ); + assertEquals(exception.status(), RestStatus.CONFLICT); + + // get weights call + ClusterGetWeightedRoutingResponse weightedRoutingResponse = client().admin() + .cluster() + .prepareGetWeightedRouting() + .setAwarenessAttribute("zone") + .get(); + + // update weights call using version returned by get api call + weights = Map.of("a", 1.0, "b", 2.0, "c", 5.0); + weightedRouting = new WeightedRouting("zone", weights); + response = client().admin() + .cluster() + .prepareWeightedRouting() + 
.setWeightedRouting(weightedRouting) + .setVersion(weightedRoutingResponse.getVersion()) + .get(); + assertTrue(response.isAcknowledged()); + assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); + + // delete weights by awareness attribute + ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin() + .cluster() + .prepareDeleteWeightedRouting() + .setAwarenessAttribute("zone") + .setVersion(2) + .get(); + assertTrue(deleteResponse.isAcknowledged()); + + // update weights again and make sure that version number got updated on delete + weights = Map.of("a", 1.0, "b", 2.0, "c", 6.0); + weightedRouting = new WeightedRouting("zone", weights); + response = client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting).setVersion(3).get(); + assertTrue(response.isAcknowledged()); + assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); + + // delete weights + deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().setVersion(4).get(); + assertTrue(deleteResponse.isAcknowledged()); + + // delete weights call, incorrect version number + UnsupportedWeightedRoutingStateException deleteException = expectThrows( + UnsupportedWeightedRoutingStateException.class, + () -> client().admin().cluster().prepareDeleteWeightedRouting().setVersion(7).get() + ); + assertEquals(RestStatus.CONFLICT, deleteException.status()); } } diff --git a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java index 097775b7ab4ac..b0afbc6983c95 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java @@ -14,38 +14,60 @@ import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; import 
org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse; +import org.opensearch.action.get.MultiGetRequest; +import org.opensearch.action.get.MultiGetResponse; +import org.opensearch.action.index.IndexRequestBuilder; import org.opensearch.action.search.SearchResponse; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.WeightedRouting; import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; +import org.opensearch.common.collect.ImmutableOpenMap; import org.opensearch.common.settings.Settings; import org.opensearch.index.query.QueryBuilders; import org.opensearch.index.search.stats.SearchStats; +import org.opensearch.plugins.Plugin; +import org.opensearch.search.aggregations.Aggregations; +import org.opensearch.search.aggregations.bucket.terms.Terms; +import org.opensearch.snapshots.mockstore.MockRepository; +import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.disruption.NetworkDisruption; +import org.opensearch.test.transport.MockTransportService; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.search.aggregations.AggregationBuilders.terms; 
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, minNumDataNodes = 3) public class SearchWeightedRoutingIT extends OpenSearchIntegTestCase { + @Override - protected int numberOfReplicas() { - return 2; + protected Collection> nodePlugins() { + return Arrays.asList(MockTransportService.TestPlugin.class, MockRepository.Plugin.class); } public void testSearchWithWRRShardRouting() throws IOException { Settings commonSettings = Settings.builder() .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone" + ".values", "a,b,c") .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone") + .put("cluster.routing.weighted.fail_open", false) .build(); logger.info("--> starting 6 nodes on different zones"); @@ -86,6 +108,7 @@ public void testSearchWithWRRShardRouting() throws IOException { .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); assertEquals(response.isAcknowledged(), true); @@ -101,7 +124,8 @@ public void testSearchWithWRRShardRouting() throws IOException { hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId()); } } - // search should not go to nodes in zone c + // search should not go to nodes in zone c with weight zero in case + // shard copies are available in other zones assertThat(hitNodes.size(), lessThanOrEqualTo(4)); DiscoveryNodes dataNodes = internalCluster().clusterService().state().nodes(); List nodeIdsFromZoneWithWeightZero = new ArrayList<>(); @@ -129,7 +153,7 @@ public void testSearchWithWRRShardRouting() throws IOException { logger.info("--> deleted shard routing weights for weighted round robin"); - ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().get(); + ClusterDeleteWeightedRoutingResponse deleteResponse = 
client().admin().cluster().prepareDeleteWeightedRouting().setVersion(0).get(); assertEquals(deleteResponse.isAcknowledged(), true); hitNodes = new HashSet<>(); @@ -158,4 +182,618 @@ public void testSearchWithWRRShardRouting() throws IOException { } } + private Map<String, List<String>> setupCluster(int nodeCountPerAZ, Settings commonSettings) { + // Starts one dedicated cluster manager plus nodeCountPerAZ data nodes in each of zones a, b and c; returns zone -> started nodes. + logger.info("--> starting a dedicated cluster manager node"); + internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build()); + + logger.info("--> starting {} nodes on zones 'a' & 'b' & 'c'", nodeCountPerAZ); + Map<String, List<String>> nodeMap = new HashMap<>(); + List<String> nodes_in_zone_a = internalCluster().startDataOnlyNodes( + nodeCountPerAZ, + Settings.builder().put(commonSettings).put("node.attr.zone", "a").build() + ); + nodeMap.put("a", nodes_in_zone_a); + List<String> nodes_in_zone_b = internalCluster().startDataOnlyNodes( + nodeCountPerAZ, + Settings.builder().put(commonSettings).put("node.attr.zone", "b").build() + ); + nodeMap.put("b", nodes_in_zone_b); + + List<String> nodes_in_zone_c = internalCluster().startDataOnlyNodes( + nodeCountPerAZ, + Settings.builder().put(commonSettings).put("node.attr.zone", "c").build() + ); + nodeMap.put("c", nodes_in_zone_c); + // expected cluster size: nodeCountPerAZ data nodes in each of the 3 zones plus the dedicated cluster manager + logger.info("--> waiting for nodes to form a cluster"); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes(String.valueOf(3 * nodeCountPerAZ + 1)).execute().actionGet(); + assertThat(health.isTimedOut(), equalTo(false)); + + ensureGreen(); + return nodeMap; + + } + + private void setUpIndexing(int numShards, int numReplicas) { + assertAcked( + prepareCreate("test").setSettings( + Settings.builder().put("index.number_of_shards", numShards).put("index.number_of_replicas", numReplicas) + ) + ); + ensureGreen(); + + logger.info("--> creating indices for test"); + for (int i = 0; i < 100; i++) { + client().prepareIndex("test").setId("" + i).setSource("field_" + i, "value_" + i).get(); + } + refresh("test"); + } + + private void setShardRoutingWeights(Map<String, Double> weights) { + WeightedRouting 
weightedRouting = new WeightedRouting("zone", weights); + + ClusterPutWeightedRoutingResponse response = client().admin() + .cluster() + .prepareWeightedRouting() + .setWeightedRouting(weightedRouting) + .setVersion(-1) + .get(); + assertEquals(response.isAcknowledged(), true); + } + + /** + * Shard routing request fail without fail-open if there are no healthy nodes in active az to serve request + * This is tested by setting up a 3 node cluster with one data node per az. + * Weighted shard routing weight is set as 0 for az-c. + * Data nodes in zone a and b are stopped, + * assertions are put to check that search requests fail. + * @throws Exception throws Exception + */ + public void testShardRoutingByStoppingDataNodes_FailOpenDisabled() throws Exception { + + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", false) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 1; + int numReplicas = 2; + setUpIndexing(numShards, numReplicas); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + logger.info("--> data nodes in zone a and b are stopped"); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeMap.get("a").get(0))); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeMap.get("b").get(0))); + ensureStableCluster(2); + + Set hitNodes = new HashSet<>(); + + // Make Search Requests + Future[] responses = new Future[50]; + for (int i = 0; i < 50; i++) { + responses[i] = internalCluster().smartClient().prepareSearch("test").setQuery(QueryBuilders.matchAllQuery()).execute(); + } + int failedCount = 0; + for (int i = 0; i < 50; i++) { + try { + SearchResponse searchResponse 
= responses[i].get(); + assertEquals(0, searchResponse.getFailedShards()); + for (int j = 0; j < searchResponse.getHits().getHits().length; j++) { + hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId()); + } + } catch (Exception t) { + failedCount++; + } + } + + Assert.assertTrue(failedCount > 0); + logger.info("--> failed request count is [{}]", failedCount); + assertNoSearchInAZ("c"); + } + + /** + * Shard routing request with fail open enabled is served by data nodes in az with weight set as 0, + * in case shard copies are not available in other azs (with fail open enabled) + * This is tested by setting up a 3 node cluster with one data node per az. + * Weighted shard routing weight is set as 0 for az-c. + * Data nodes in zone a and b are stopped, + * assertions are put to make sure shard search requests do not fail. + * @throws Exception throws exception + */ + public void testShardRoutingByStoppingDataNodes_FailOpenEnabled() throws Exception { + + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", true) + .build(); + + int nodeCountPerAZ = 1; + Map<String, List<String>> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 1; + int numReplicas = 2; + setUpIndexing(numShards, numReplicas); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + logger.info("--> data nodes in zone a and b are stopped"); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeMap.get("a").get(0))); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeMap.get("b").get(0))); + ensureStableCluster(2); + + Set<String> hitNodes = new HashSet<>(); + + // Make Search Requests + Future<SearchResponse>[] responses = new Future[50]; + for (int i = 0; i < 50; i++) { + 
responses[i] = internalCluster().smartClient().prepareSearch("test").setQuery(QueryBuilders.matchAllQuery()).execute(); + } + int failedCount = 0; + for (int i = 0; i < 50; i++) { + try { + SearchResponse searchResponse = responses[i].get(); + assertEquals(0, searchResponse.getFailedShards()); + for (int j = 0; j < searchResponse.getHits().getHits().length; j++) { + hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId()); + } + } catch (Exception t) { + failedCount++; + } + } + + Assert.assertTrue(failedCount == 0); + assertSearchInAZ("c"); + } + + /** + * Shard routing request with fail open disabled is not served by data nodes in az with weight set as 0, + * in case shard copies are not available in other azs. + * This is tested by setting up a 3 node cluster with one data node per az. + * Weighted shard routing weight is set as 0 for az-c. + * Indices are created with one replica copy and network disruption is introduced, + * which makes data node in zone-a unresponsive. + * Since there are two copies of a shard, there can be few shards for which copy doesn't exist in zone b. + * Assertions are put to make sure such shard search requests are not served by data node in zone c. 
+ * @throws IOException throws exception + */ + public void testShardRoutingWithNetworkDisruption_FailOpenDisabled() throws Exception { + + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", false) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 10; + int numReplicas = 1; + setUpIndexing(numShards, numReplicas); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + logger.info("--> creating network partition disruption"); + final String clusterManagerNode1 = internalCluster().getClusterManagerName(); + Set nodesInOneSide = Stream.of(clusterManagerNode1, nodeMap.get("b").get(0), nodeMap.get("c").get(0)) + .collect(Collectors.toCollection(HashSet::new)); + Set nodesInOtherSide = Stream.of(nodeMap.get("a").get(0)).collect(Collectors.toCollection(HashSet::new)); + + NetworkDisruption networkDisruption = new NetworkDisruption( + new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide), + NetworkDisruption.UNRESPONSIVE + ); + internalCluster().setDisruptionScheme(networkDisruption); + + logger.info("--> network disruption is started"); + networkDisruption.startDisrupting(); + + Set hitNodes = new HashSet<>(); + Future[] responses = new Future[50]; + logger.info("--> making search requests"); + for (int i = 0; i < 50; i++) { + responses[i] = internalCluster().client(nodeMap.get("b").get(0)) + .prepareSearch("test") + .setQuery(QueryBuilders.matchAllQuery()) + .execute(); + } + + logger.info("--> network disruption is stopped"); + networkDisruption.stopDisrupting(); + + int failedShardCount = 0; + + for (int i = 0; i < 50; i++) { + try { + SearchResponse searchResponse = responses[i].get(); + 
failedShardCount += searchResponse.getFailedShards(); + for (int j = 0; j < searchResponse.getHits().getHits().length; j++) { + hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId()); + } + } catch (Exception t) { + fail("search should not fail"); + } + } + Assert.assertTrue(failedShardCount > 0); + // assert that no search request goes to az with weight zero + assertNoSearchInAZ("c"); + } + + /** + * Shard routing request is served by data nodes in az with weight set as 0, + * in case shard copies are not available in other azs.(with fail open enabled) + * This is tested by setting up a 3 node cluster with one data node per az. + * Weighted shard routing weight is set as 0 for az-c. + * Indices are created with one replica copy and network disruption is introduced, + * which makes data node in zone-a unresponsive. + * Since there are two copies of a shard, there can be few shards for which copy doesn't exist in zone b. + * Assertions are put to make sure such shard search requests are served by data node in zone c. 
+ * @throws IOException throws exception + */ + public void testShardRoutingWithNetworkDisruption_FailOpenEnabled() throws Exception { + + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", true) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 10; + int numReplicas = 1; + setUpIndexing(numShards, numReplicas); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + logger.info("--> creating network partition disruption"); + final String clusterManagerNode1 = internalCluster().getClusterManagerName(); + Set nodesInOneSide = Stream.of(clusterManagerNode1, nodeMap.get("b").get(0)).collect(Collectors.toCollection(HashSet::new)); + Set nodesInOtherSide = Stream.of(nodeMap.get("a").get(0)).collect(Collectors.toCollection(HashSet::new)); + + NetworkDisruption networkDisruption = new NetworkDisruption( + new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide), + NetworkDisruption.UNRESPONSIVE + ); + internalCluster().setDisruptionScheme(networkDisruption); + + logger.info("--> network disruption is started"); + networkDisruption.startDisrupting(); + + Set hitNodes = new HashSet<>(); + Future[] responses = new Future[50]; + logger.info("--> making search requests"); + for (int i = 0; i < 50; i++) { + responses[i] = internalCluster().client(nodeMap.get("b").get(0)) + .prepareSearch("test") + .setSize(100) + .setQuery(QueryBuilders.matchAllQuery()) + .execute(); + } + + logger.info("--> network disruption is stopped"); + networkDisruption.stopDisrupting(); + + for (int i = 0; i < 50; i++) { + try { + SearchResponse searchResponse = responses[i].get(); + assertEquals(searchResponse.getFailedShards(), 
0); + for (int j = 0; j < searchResponse.getHits().getHits().length; j++) { + hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId()); + } + } catch (Exception t) { + fail("search should not fail"); + } + } + + assertSearchInAZ("b"); + assertSearchInAZ("c"); + assertNoSearchInAZ("a"); + } + + private void assertNoSearchInAZ(String az) { + ImmutableOpenMap dataNodes = internalCluster().clusterService().state().nodes().getDataNodes(); + String dataNodeId = null; + + for (Iterator it = dataNodes.valuesIt(); it.hasNext();) { + DiscoveryNode node = it.next(); + if (node.getAttributes().get("zone").equals(az)) { + dataNodeId = node.getId(); + break; + } + } + + NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().execute().actionGet(); + for (NodeStats stat : nodeStats.getNodes()) { + SearchStats.Stats searchStats = stat.getIndices().getSearch().getTotal(); + if (stat.getNode().isDataNode()) { + if (stat.getNode().getId().equals(dataNodeId)) { + assertEquals(0, searchStats.getQueryCount()); + assertEquals(0, searchStats.getFetchCount()); + } + } + } + } + + private void assertSearchInAZ(String az) { + ImmutableOpenMap dataNodes = internalCluster().clusterService().state().nodes().getDataNodes(); + String dataNodeId = null; + + for (Iterator it = dataNodes.valuesIt(); it.hasNext();) { + DiscoveryNode node = it.next(); + if (node.getAttributes().get("zone").equals(az)) { + dataNodeId = node.getId(); + break; + } + } + + NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().execute().actionGet(); + for (NodeStats stat : nodeStats.getNodes()) { + SearchStats.Stats searchStats = stat.getIndices().getSearch().getTotal(); + if (stat.getNode().isDataNode()) { + if (stat.getNode().getId().equals(dataNodeId)) { + Assert.assertTrue(searchStats.getFetchCount() > 0L); + Assert.assertTrue(searchStats.getQueryCount() > 0L); + } + } + } + } + + /** + * Shard routing request is served by data nodes in az with weight set as 
0, + * in case shard copies are not available in other azs. + * This is tested by setting up a 3 node cluster with one data node per az. + * Weighted shard routing weight is set as 0 for az-c. + * Indices are created with one replica copy and network disruption is introduced, + * which makes node in zone-a unresponsive. + * Since there are two copies of a shard, there can be few shards for which copy doesn't exist in zone b. + * Assertions are put to make sure such shard search requests are served by data node in zone c. + * @throws IOException throws exception + */ + public void testSearchAggregationWithNetworkDisruption_FailOpenEnabled() throws Exception { + + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", true) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + assertAcked( + prepareCreate("index").setMapping("f", "type=keyword") + .setSettings(Settings.builder().put("index" + ".number_of_shards", 10).put("index" + ".number_of_replicas", 1)) + ); + + int numDocs = 10; + List docs = new ArrayList<>(); + for (int i = 0; i < numDocs; ++i) { + docs.add(client().prepareIndex("index").setSource("f", Integer.toString(i / 3))); + } + indexRandom(true, docs); + ensureGreen(); + refresh("index"); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + logger.info("--> creating network partition disruption"); + final String clusterManagerNode1 = internalCluster().getClusterManagerName(); + Set nodesInOneSide = Stream.of(clusterManagerNode1, nodeMap.get("b").get(0), nodeMap.get("c").get(0)) + .collect(Collectors.toCollection(HashSet::new)); + Set nodesInOtherSide = 
Stream.of(nodeMap.get("a").get(0)).collect(Collectors.toCollection(HashSet::new)); + + NetworkDisruption networkDisruption = new NetworkDisruption( + new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide), + NetworkDisruption.UNRESPONSIVE + ); + internalCluster().setDisruptionScheme(networkDisruption); + + logger.info("--> network disruption is started"); + networkDisruption.startDisrupting(); + + Set hitNodes = new HashSet<>(); + Future[] responses = new Future[51]; + int size = 17; + logger.info("--> making search requests"); + for (int i = 0; i < 50; i++) { + responses[i] = internalCluster().client(nodeMap.get("b").get(0)) + .prepareSearch("index") + .setSize(20) + .addAggregation(terms("f").field("f")) + .execute(); + } + + logger.info("--> network disruption is stopped"); + networkDisruption.stopDisrupting(); + + for (int i = 0; i < 50; i++) { + try { + SearchResponse searchResponse = responses[i].get(); + Aggregations aggregations = searchResponse.getAggregations(); + assertNotNull(aggregations); + Terms terms = aggregations.get("f"); + assertEquals(0, searchResponse.getFailedShards()); + assertEquals(Math.min(numDocs, 3L), terms.getBucketByKey("0").getDocCount()); + for (int j = 0; j < searchResponse.getHits().getHits().length; j++) { + hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId()); + } + } catch (Exception t) { + fail("search should not fail"); + } + } + assertSearchInAZ("b"); + assertSearchInAZ("c"); + assertNoSearchInAZ("a"); + + assertBusy( + () -> assertThat(client().admin().indices().prepareStats().get().getTotal().getSearch().getOpenContexts(), equalTo(0L)), + 60, + TimeUnit.SECONDS + ); + } + + /** + * MultiGet with fail open enabled. 
No request failure on network disruption + * @throws IOException throws exception + */ + public void testMultiGetWithNetworkDisruption_FailOpenEnabled() throws Exception { + + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", true) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 10; + int numReplicas = 1; + setUpIndexing(numShards, numReplicas); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + logger.info("--> creating network partition disruption"); + final String clusterManagerNode1 = internalCluster().getClusterManagerName(); + Set nodesInOneSide = Stream.of(clusterManagerNode1, nodeMap.get("b").get(0), nodeMap.get("c").get(0)) + .collect(Collectors.toCollection(HashSet::new)); + Set nodesInOtherSide = Stream.of(nodeMap.get("a").get(0)).collect(Collectors.toCollection(HashSet::new)); + + NetworkDisruption networkDisruption = new NetworkDisruption( + new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide), + NetworkDisruption.UNRESPONSIVE + ); + internalCluster().setDisruptionScheme(networkDisruption); + + logger.info("--> network disruption is started"); + networkDisruption.startDisrupting(); + + Future[] responses = new Future[50]; + logger.info("--> making search requests"); + int index1, index2; + for (int i = 0; i < 50; i++) { + index1 = randomIntBetween(0, 9); + index2 = randomIntBetween(0, 9); + responses[i] = client().prepareMultiGet() + .add(new MultiGetRequest.Item("test", "" + index1)) + .add(new MultiGetRequest.Item("test", "" + index2)) + .execute(); + } + + logger.info("--> network disruption is stopped"); + networkDisruption.stopDisrupting(); + + for (int i = 0; i 
< 50; i++) { + try { + MultiGetResponse multiGetResponse = responses[i].get(); + assertThat(multiGetResponse.getResponses().length, equalTo(2)); + assertThat(multiGetResponse.getResponses()[0].isFailed(), equalTo(false)); + assertThat(multiGetResponse.getResponses()[1].isFailed(), equalTo(false)); + } catch (Exception t) { + fail("search should not fail"); + } + } + + assertBusy( + () -> assertThat(client().admin().indices().prepareStats().get().getTotal().getSearch().getOpenContexts(), equalTo(0L)), + 60, + TimeUnit.SECONDS + ); + } + + /** + * MultiGet with fail open disabled. Assert that some requests do fail. + * @throws IOException throws exception + */ + public void testMultiGetWithNetworkDisruption_FailOpenDisabled() throws Exception { + + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", false) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 10; + int numReplicas = 1; + setUpIndexing(numShards, numReplicas); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + logger.info("--> creating network partition disruption"); + final String clusterManagerNode1 = internalCluster().getClusterManagerName(); + Set nodesInOneSide = Stream.of(clusterManagerNode1, nodeMap.get("b").get(0), nodeMap.get("c").get(0)) + .collect(Collectors.toCollection(HashSet::new)); + Set nodesInOtherSide = Stream.of(nodeMap.get("a").get(0)).collect(Collectors.toCollection(HashSet::new)); + + NetworkDisruption networkDisruption = new NetworkDisruption( + new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide), + NetworkDisruption.UNRESPONSIVE + ); + internalCluster().setDisruptionScheme(networkDisruption); + + 
logger.info("--> network disruption is started"); + networkDisruption.startDisrupting(); + + Future[] responses = new Future[50]; + logger.info("--> making search requests"); + int index1, index2; + for (int i = 0; i < 50; i++) { + index1 = randomIntBetween(0, 9); + index2 = randomIntBetween(0, 9); + responses[i] = client().prepareMultiGet() + .add(new MultiGetRequest.Item("test", "" + index1)) + .add(new MultiGetRequest.Item("test", "" + index2)) + .execute(); + } + + logger.info("--> network disruption is stopped"); + networkDisruption.stopDisrupting(); + int failedCount = 0; + for (int i = 0; i < 50; i++) { + try { + MultiGetResponse multiGetResponse = responses[i].get(); + assertThat(multiGetResponse.getResponses().length, equalTo(2)); + if (multiGetResponse.getResponses()[0].isFailed() || multiGetResponse.getResponses()[1].isFailed()) { + failedCount++; + } + } catch (Exception t) { + fail("search should not fail"); + } + } + + Assert.assertTrue(failedCount > 0); + + assertBusy( + () -> assertThat(client().admin().indices().prepareStats().get().getTotal().getSearch().getOpenContexts(), equalTo(0L)), + 60, + TimeUnit.SECONDS + ); + } + } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequest.java index 71eab8ff35a2d..5451cec1db21d 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequest.java @@ -8,12 +8,26 @@ package org.opensearch.action.admin.cluster.shards.routing.weighted.delete; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.OpenSearchGenerationException; +import 
org.opensearch.OpenSearchParseException; import org.opensearch.action.ActionRequestValidationException; import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest; +import org.opensearch.cluster.metadata.WeightedRoutingMetadata; +import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.DeprecationHandler; +import org.opensearch.common.xcontent.NamedXContentRegistry; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.common.xcontent.XContentHelper; +import org.opensearch.common.xcontent.XContentParser; +import org.opensearch.common.xcontent.XContentType; import java.io.IOException; +import java.util.Map; /** * Request to delete weights for weighted round-robin shard routing policy. @@ -21,10 +35,42 @@ * @opensearch.internal */ public class ClusterDeleteWeightedRoutingRequest extends ClusterManagerNodeRequest { - public ClusterDeleteWeightedRoutingRequest() {} + private static final Logger logger = LogManager.getLogger(ClusterDeleteWeightedRoutingRequest.class); + + private long version; + private String awarenessAttribute; + + public void setVersion(long version) { + this.version = version; + } + + ClusterDeleteWeightedRoutingRequest() { + this.version = WeightedRoutingMetadata.VERSION_UNSET_VALUE; + } public ClusterDeleteWeightedRoutingRequest(StreamInput in) throws IOException { super(in); + version = in.readLong(); + if (in.available() != 0) { + awarenessAttribute = in.readString(); + } + } + + public long getVersion() { + return version; + } + + public String getAwarenessAttribute() { + return awarenessAttribute; + } + + public void setAwarenessAttribute(String awarenessAttribute) { + this.awarenessAttribute = awarenessAttribute; + } + + public ClusterDeleteWeightedRoutingRequest(String awarenessAttribute) { + 
this.awarenessAttribute = awarenessAttribute; + this.version = WeightedRoutingMetadata.VERSION_UNSET_VALUE; } @Override @@ -32,13 +78,72 @@ public ActionRequestValidationException validate() { return null; } + /** + * @param source weights definition from request body + * @return this request + */ + public ClusterDeleteWeightedRoutingRequest source(Map source) { + try { + if (source.isEmpty()) { + throw new OpenSearchParseException(("Empty request body")); + } + XContentBuilder builder = XContentFactory.jsonBuilder(); + builder.map(source); + setRequestBody(BytesReference.bytes(builder), builder.contentType()); + } catch (IOException e) { + throw new OpenSearchGenerationException("Failed to generate [" + source + "]", e); + } + return this; + } + + public void setRequestBody(BytesReference source, XContentType contentType) { + try ( + XContentParser parser = XContentHelper.createParser( + NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + source, + contentType + ) + ) { + String versionAttr = null; + XContentParser.Token token; + // move to the first alias + parser.nextToken(); + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + String fieldName = parser.currentName(); + if (fieldName != null && fieldName.equals(WeightedRoutingMetadata.VERSION)) { + versionAttr = parser.currentName(); + } else { + throw new OpenSearchParseException( + "failed to parse delete weighted routing request body [{}], unknown type", + fieldName + ); + } + } else if (token == XContentParser.Token.VALUE_STRING) { + if (versionAttr != null && versionAttr.equals(WeightedRoutingMetadata.VERSION)) { + this.version = Long.parseLong(parser.text()); + } + } else { + throw new OpenSearchParseException("failed to parse delete weighted routing request body"); + } + } + } catch (IOException e) { + logger.error("error while parsing delete request for weighted routing request object", e); + } + } + 
@Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); + out.writeLong(version); + if (awarenessAttribute != null) { + out.writeString(awarenessAttribute); + } } @Override public String toString() { - return "ClusterDeleteWeightedRoutingRequest"; + return "ClusterDeleteWeightedRoutingRequest{" + "version=" + version + ", awarenessAttribute=" + awarenessAttribute + "}"; } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequestBuilder.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequestBuilder.java index 19976ac6b07aa..bb34fea589534 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequestBuilder.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequestBuilder.java @@ -24,4 +24,15 @@ public class ClusterDeleteWeightedRoutingRequestBuilder extends ClusterManagerNo public ClusterDeleteWeightedRoutingRequestBuilder(OpenSearchClient client, ClusterDeleteWeightedRoutingAction action) { super(client, action, new ClusterDeleteWeightedRoutingRequest()); } + + public ClusterDeleteWeightedRoutingRequestBuilder setVersion(long version) { + request.setVersion(version); + return this; + } + + public ClusterDeleteWeightedRoutingRequestBuilder setAwarenessAttribute(String attribute) { + request.setAwarenessAttribute(attribute); + return this; + } + } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingResponse.java index bb77576b63d20..9a2858f17c53e 100644 ---
a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingResponse.java @@ -11,6 +11,7 @@ import org.opensearch.OpenSearchParseException; import org.opensearch.action.ActionResponse; +import org.opensearch.cluster.metadata.WeightedRoutingMetadata; import org.opensearch.cluster.routing.WeightedRouting; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; @@ -31,26 +32,46 @@ * @opensearch.internal */ public class ClusterGetWeightedRoutingResponse extends ActionResponse implements ToXContentObject { - private WeightedRouting weightedRouting; - private String localNodeWeight; - private static final String NODE_WEIGHT = "node_weight"; - public String getLocalNodeWeight() { - return localNodeWeight; + private static final String WEIGHTS = "weights"; + private long version; + private final Boolean discoveredClusterManager; + + private static final String DISCOVERED_CLUSTER_MANAGER = "discovered_cluster_manager"; + + public WeightedRouting getWeightedRouting() { + return weightedRouting; + } + + private final WeightedRouting weightedRouting; + + public long getVersion() { + return version; + } + + public Boolean getDiscoveredClusterManager() { + return discoveredClusterManager; } ClusterGetWeightedRoutingResponse() { this.weightedRouting = null; + this.discoveredClusterManager = null; } - public ClusterGetWeightedRoutingResponse(String localNodeWeight, WeightedRouting weightedRouting) { - this.localNodeWeight = localNodeWeight; + public ClusterGetWeightedRoutingResponse(WeightedRouting weightedRouting, Boolean discoveredClusterManager, long version) { + this.discoveredClusterManager = discoveredClusterManager; this.weightedRouting = weightedRouting; + this.version = version; } ClusterGetWeightedRoutingResponse(StreamInput in) throws 
IOException { if (in.available() != 0) { this.weightedRouting = new WeightedRouting(in); + this.version = in.readLong(); + this.discoveredClusterManager = in.readOptionalBoolean(); + } else { + this.weightedRouting = null; + this.discoveredClusterManager = null; } } @@ -67,6 +88,10 @@ public WeightedRouting weights() { public void writeTo(StreamOutput out) throws IOException { if (weightedRouting != null) { weightedRouting.writeTo(out); + out.writeLong(version); + // The stream constructor always reads the optional flag right after the version, + // so emit it together with the routing (even when null) to keep both sides symmetric. + out.writeOptionalBoolean(discoveredClusterManager); } } @@ -74,11 +99,15 @@ public void writeTo(StreamOutput out) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); if (this.weightedRouting != null) { + builder.startObject(WEIGHTS); for (Map.Entry entry : weightedRouting.weights().entrySet()) { builder.field(entry.getKey(), entry.getValue().toString()); } - if (localNodeWeight != null) { - builder.field(NODE_WEIGHT, localNodeWeight); + + builder.endObject(); + builder.field(WeightedRoutingMetadata.VERSION, version); + if (discoveredClusterManager != null) { + builder.field(DISCOVERED_CLUSTER_MANAGER, discoveredClusterManager); } } builder.endObject(); @@ -88,26 +117,59 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws public static ClusterGetWeightedRoutingResponse fromXContent(XContentParser parser) throws IOException { ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); XContentParser.Token token; - String attrKey = null, attrValue = null; - String localNodeWeight = null; + String attrKey = null, attrValue; + Boolean discoveredClusterManager = null; Map weights = new HashMap<>(); + long version = WeightedRoutingMetadata.VERSION_UNSET_VALUE; + String weightsAttr; + String fieldName = null; while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME)
{ - attrKey = parser.currentName(); - } else if (token == XContentParser.Token.VALUE_STRING) { - attrValue = parser.text(); - if (attrKey != null && attrKey.equals(NODE_WEIGHT)) { - localNodeWeight = attrValue; - } else if (attrKey != null) { - weights.put(attrKey, Double.parseDouble(attrValue)); + fieldName = parser.currentName(); + if (fieldName != null + && (fieldName.equals(WeightedRoutingMetadata.VERSION) || fieldName.equals(DISCOVERED_CLUSTER_MANAGER))) { + continue; + } else if (fieldName != null && fieldName.equals(WEIGHTS)) { + weightsAttr = parser.currentName(); + } else { + throw new OpenSearchParseException("failed to parse weighted routing request object", fieldName); } - } else { - throw new OpenSearchParseException("failed to parse weighted routing response"); - } + if (parser.nextToken() != XContentParser.Token.START_OBJECT) { + throw new OpenSearchParseException( + "failed to parse weighted routing request object [{}], expected object", + weightsAttr + ); + } + + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + attrKey = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_STRING) { + attrValue = parser.text(); + if (attrKey != null) { + weights.put(attrKey, Double.parseDouble(attrValue)); + } + } else { + throw new OpenSearchParseException("failed to parse weighted routing request attribute [{}]", attrKey); + } + } + } else if (token == XContentParser.Token.VALUE_NUMBER + && fieldName != null + && fieldName.equals(WeightedRoutingMetadata.VERSION)) { + version = parser.longValue(); + + } else if (token == XContentParser.Token.VALUE_BOOLEAN + && fieldName != null + && fieldName.equals(DISCOVERED_CLUSTER_MANAGER)) { + discoveredClusterManager = Boolean.parseBoolean(parser.text()); + } else { + throw new OpenSearchParseException("failed to parse weighted routing request"); + } } + WeightedRouting weightedRouting = new WeightedRouting("", weights); - return 
new ClusterGetWeightedRoutingResponse(localNodeWeight, weightedRouting); + return new ClusterGetWeightedRoutingResponse(weightedRouting, discoveredClusterManager, version); } @Override @@ -115,11 +177,11 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; ClusterGetWeightedRoutingResponse that = (ClusterGetWeightedRoutingResponse) o; - return weightedRouting.equals(that.weightedRouting) && localNodeWeight.equals(that.localNodeWeight); + return Objects.equals(weightedRouting, that.weightedRouting) && Objects.equals(discoveredClusterManager, that.discoveredClusterManager); } @Override public int hashCode() { - return Objects.hash(weightedRouting, localNodeWeight); + return Objects.hash(weightedRouting, discoveredClusterManager); } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/TransportGetWeightedRoutingAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/TransportGetWeightedRoutingAction.java index 9421967a5df26..280fca29944e3 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/TransportGetWeightedRoutingAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/TransportGetWeightedRoutingAction.java @@ -20,7 +20,6 @@ import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.WeightedRoutingMetadata; -import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.WeightedRouting; import org.opensearch.cluster.routing.WeightedRoutingService; import org.opensearch.cluster.service.ClusterService; @@ -89,19 +88,13 @@ protected void clusterManagerOperation( weightedRoutingService.verifyAwarenessAttribute(request.getAwarenessAttribute()); WeightedRoutingMetadata weightedRoutingMetadata = state.metadata().custom(WeightedRoutingMetadata.TYPE);
ClusterGetWeightedRoutingResponse clusterGetWeightedRoutingResponse = new ClusterGetWeightedRoutingResponse(); - String weight = null; if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting() != null) { WeightedRouting weightedRouting = weightedRoutingMetadata.getWeightedRouting(); - if (request.local()) { - DiscoveryNode localNode = state.getNodes().getLocalNode(); - if (localNode.getAttributes().get(request.getAwarenessAttribute()) != null) { - String attrVal = localNode.getAttributes().get(request.getAwarenessAttribute()); - if (weightedRouting.weights().containsKey(attrVal)) { - weight = weightedRouting.weights().get(attrVal).toString(); - } - } - } - clusterGetWeightedRoutingResponse = new ClusterGetWeightedRoutingResponse(weight, weightedRouting); + clusterGetWeightedRoutingResponse = new ClusterGetWeightedRoutingResponse( + weightedRouting, + state.nodes().getClusterManagerNodeId() != null, + weightedRoutingMetadata.getVersion() + ); } listener.onResponse(clusterGetWeightedRoutingResponse); } catch (Exception ex) { diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java index 8adbf13a000c5..cba4d0e8e796c 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java @@ -14,6 +14,7 @@ import org.opensearch.OpenSearchParseException; import org.opensearch.action.ActionRequestValidationException; import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest; +import org.opensearch.cluster.metadata.WeightedRoutingMetadata; import org.opensearch.cluster.routing.WeightedRouting; import 
org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.stream.StreamInput; @@ -43,6 +44,15 @@ public class ClusterPutWeightedRoutingRequest extends ClusterManagerNodeRequest< private WeightedRouting weightedRouting; private String attributeName; + private long version; + + public void version(long version) { + this.version = version; + } + + public long getVersion() { + return this.version; + } public ClusterPutWeightedRoutingRequest() {} @@ -62,13 +72,14 @@ public void attributeName(String attributeName) { public ClusterPutWeightedRoutingRequest(StreamInput in) throws IOException { super(in); weightedRouting = new WeightedRouting(in); + version = in.readLong(); } public ClusterPutWeightedRoutingRequest(String attributeName) { this.attributeName = attributeName; } - public void setWeightedRouting(Map source) { + public void setWeightedRouting(Map source) { try { if (source.isEmpty()) { throw new OpenSearchParseException(("Empty request body")); @@ -96,22 +107,56 @@ public void setWeightedRouting(BytesReference source, XContentType contentType) XContentParser.Token token; // move to the first alias parser.nextToken(); + String versionAttr = null; + String weightsAttr; + long version = WeightedRoutingMetadata.VERSION_UNSET_VALUE; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { - attrValue = parser.currentName(); - } else if (token == XContentParser.Token.VALUE_STRING) { - attrWeight = Double.parseDouble(parser.text()); - weights.put(attrValue, attrWeight); + String fieldName = parser.currentName(); + if (fieldName != null && fieldName.equals(WeightedRoutingMetadata.VERSION)) { + versionAttr = parser.currentName(); + continue; + } else if (fieldName != null && fieldName.equals("weights")) { + weightsAttr = parser.currentName(); + } else { + throw new OpenSearchParseException("failed to parse weighted routing request object [{}]", fieldName); + } + if (parser.nextToken() != 
XContentParser.Token.START_OBJECT) { + throw new OpenSearchParseException( + "failed to parse weighted routing request object [{}], expected object", + weightsAttr + ); + } + + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + attrValue = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_STRING) { + attrWeight = Double.parseDouble(parser.text()); + weights.put(attrValue, attrWeight); + } else { + throw new OpenSearchParseException( + "failed to parse weighted routing request attribute [{}], " + "unknown type", + attrWeight + ); + } + } + } else if (token == XContentParser.Token.VALUE_NUMBER) { + if (versionAttr != null && versionAttr.equals(WeightedRoutingMetadata.VERSION)) { + version = parser.longValue(); + } } else { throw new OpenSearchParseException( - "failed to parse weighted routing request attribute [{}], " + "unknown type", - attrWeight + "failed to parse weighted routing request " + "[{}], unknown " + "type", + attributeName ); } } this.weightedRouting = new WeightedRouting(this.attributeName, weights); + this.version = version; } catch (IOException e) { - logger.error("error while parsing put for weighted routing request object", e); + logger.error("error while parsing put weighted routing request object", e); } } @@ -127,6 +172,9 @@ public ActionRequestValidationException validate() { if (weightedRouting.weights() == null || weightedRouting.weights().isEmpty()) { validationException = addValidationError("Weights are missing", validationException); } + if (version == WeightedRoutingMetadata.VERSION_UNSET_VALUE) { + validationException = addValidationError("Version is missing", validationException); + } int countValueWithZeroWeights = 0; double weight; try { @@ -164,7 +212,7 @@ public ActionRequestValidationException validate() { * @param source weights definition from request body * @return this request */ - public ClusterPutWeightedRoutingRequest source(Map 
source) { + public ClusterPutWeightedRoutingRequest source(Map source) { setWeightedRouting(source); return this; } @@ -173,11 +221,12 @@ public ClusterPutWeightedRoutingRequest source(Map source) { public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); weightedRouting.writeTo(out); + out.writeLong(version); } @Override public String toString() { - return "ClusterPutWeightedRoutingRequest{" + "weightedRouting= " + weightedRouting.toString() + "}"; + return "ClusterPutWeightedRoutingRequest{" + "weightedRouting= " + weightedRouting.toString() + "version= " + version + "}"; } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestBuilder.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestBuilder.java index b437f4c54d8d6..adfb2cf02f6d9 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestBuilder.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestBuilder.java @@ -30,4 +30,8 @@ public ClusterPutWeightedRoutingRequestBuilder setWeightedRouting(WeightedRoutin return this; } + public ClusterPutWeightedRoutingRequestBuilder setVersion(long version) { + request.version(version); + return this; + } } diff --git a/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java b/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java index 99962741299ca..e33e03b90ddfc 100644 --- a/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java +++ b/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java @@ -48,6 +48,7 @@ import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import 
org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.FailAwareWeightedRouting; import org.opensearch.cluster.routing.GroupShardsIterator; import org.opensearch.cluster.routing.ShardIterator; import org.opensearch.cluster.routing.ShardRouting; @@ -262,16 +263,17 @@ private void onFailure(ShardRouting shardRouting, Exception e) { tryNext(e, false); } - private ShardRouting nextRoutingOrNull() { + private ShardRouting nextRoutingOrNull(Exception failure) { if (shardsIt.size() == 0 || shardIndex >= shardsIt.size()) { return null; } - ShardRouting next = shardsIt.get(shardIndex).nextOrNull(); + ShardRouting next = FailAwareWeightedRouting.getInstance().findNext(shardsIt.get(shardIndex), clusterService.state(), failure); + if (next != null) { return next; } moveToNextShard(); - return nextRoutingOrNull(); + return nextRoutingOrNull(failure); } private void moveToNextShard() { @@ -279,7 +281,7 @@ private void moveToNextShard() { } private void tryNext(@Nullable final Exception lastFailure, boolean canMatchShard) { - ShardRouting shardRouting = nextRoutingOrNull(); + ShardRouting shardRouting = nextRoutingOrNull(lastFailure); if (shardRouting == null) { if (canMatchShard == false) { listener.onResponse(new FieldCapabilitiesIndexResponse(request.index(), Collections.emptyMap(), false)); diff --git a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java index 0876bf93a557b..1a37406e19f14 100644 --- a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java @@ -43,6 +43,7 @@ import org.opensearch.action.ShardOperationFailedException; import org.opensearch.action.support.TransportActions; import org.opensearch.cluster.ClusterState; +import 
org.opensearch.cluster.routing.FailAwareWeightedRouting; import org.opensearch.cluster.routing.GroupShardsIterator; import org.opensearch.common.Nullable; import org.opensearch.common.lease.Releasable; @@ -449,7 +450,8 @@ private void onShardFailure(final int shardIndex, @Nullable SearchShardTarget sh // we always add the shard failure for a specific shard instance // we do make sure to clean it on a successful response from a shard onShardFailure(shardIndex, shard, e); - final SearchShardTarget nextShard = shardIt.nextOrNull(); + SearchShardTarget nextShard = FailAwareWeightedRouting.getInstance().findNext(shardIt, clusterState, e); + final boolean lastShard = nextShard == null; logger.debug( () -> new ParameterizedMessage( diff --git a/server/src/main/java/org/opensearch/action/search/SearchShardIterator.java b/server/src/main/java/org/opensearch/action/search/SearchShardIterator.java index 72951f60c286e..45e4c1a54eeba 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchShardIterator.java +++ b/server/src/main/java/org/opensearch/action/search/SearchShardIterator.java @@ -119,7 +119,7 @@ public String getClusterAlias() { return clusterAlias; } - SearchShardTarget nextOrNull() { + public SearchShardTarget nextOrNull() { final String nodeId = targetNodesIterator.nextOrNull(); if (nodeId != null) { return new SearchShardTarget(nodeId, shardId, clusterAlias, originalIndices); diff --git a/server/src/main/java/org/opensearch/action/support/broadcast/TransportBroadcastAction.java b/server/src/main/java/org/opensearch/action/support/broadcast/TransportBroadcastAction.java index a69853dc6a3c0..10645c744b2f3 100644 --- a/server/src/main/java/org/opensearch/action/support/broadcast/TransportBroadcastAction.java +++ b/server/src/main/java/org/opensearch/action/support/broadcast/TransportBroadcastAction.java @@ -44,6 +44,7 @@ import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.node.DiscoveryNode; import 
org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.FailAwareWeightedRouting; import org.opensearch.cluster.routing.GroupShardsIterator; import org.opensearch.cluster.routing.ShardIterator; import org.opensearch.cluster.routing.ShardRouting; @@ -250,7 +251,8 @@ void onOperation(@Nullable ShardRouting shard, final ShardIterator shardIt, int // we set the shard failure always, even if its the first in the replication group, and the next one // will work (it will just override it...) setFailure(shardIt, shardIndex, e); - ShardRouting nextShard = shardIt.nextOrNull(); + ShardRouting nextShard = FailAwareWeightedRouting.getInstance().findNext(shardIt, clusterService.state(), e); + if (nextShard != null) { if (e != null) { if (logger.isTraceEnabled()) { diff --git a/server/src/main/java/org/opensearch/action/support/single/shard/TransportSingleShardAction.java b/server/src/main/java/org/opensearch/action/support/single/shard/TransportSingleShardAction.java index df39bd29493dd..d8c4913e595a4 100644 --- a/server/src/main/java/org/opensearch/action/support/single/shard/TransportSingleShardAction.java +++ b/server/src/main/java/org/opensearch/action/support/single/shard/TransportSingleShardAction.java @@ -47,6 +47,7 @@ import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.FailAwareWeightedRouting; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardsIterator; import org.opensearch.cluster.service.ClusterService; @@ -244,7 +245,8 @@ private void perform(@Nullable final Exception currentFailure) { lastFailure = currentFailure; this.lastFailure = currentFailure; } - final ShardRouting shardRouting = shardIt.nextOrNull(); + ShardRouting shardRouting = FailAwareWeightedRouting.getInstance().findNext(shardIt, clusterService.state(), currentFailure); + if 
(shardRouting == null) { Exception failure = lastFailure; if (failure == null || isShardNotAvailableException(failure)) { @@ -273,6 +275,7 @@ private void perform(@Nullable final Exception currentFailure) { ); } final Writeable.Reader reader = getResponseReader(); + ShardRouting finalShardRouting = shardRouting; transportService.sendRequest( node, transportShardAction, @@ -296,7 +299,7 @@ public void handleResponse(final Response response) { @Override public void handleException(TransportException exp) { - onFailure(shardRouting, exp); + onFailure(finalShardRouting, exp); } } ); diff --git a/server/src/main/java/org/opensearch/client/Requests.java b/server/src/main/java/org/opensearch/client/Requests.java index 21f2a2d906602..cad5bac8acf0d 100644 --- a/server/src/main/java/org/opensearch/client/Requests.java +++ b/server/src/main/java/org/opensearch/client/Requests.java @@ -578,8 +578,8 @@ public static ClusterGetWeightedRoutingRequest getWeightedRoutingRequest(String * * @return delete weight request */ - public static ClusterDeleteWeightedRoutingRequest deleteWeightedRoutingRequest() { - return new ClusterDeleteWeightedRoutingRequest(); + public static ClusterDeleteWeightedRoutingRequest deleteWeightedRoutingRequest(String attributeName) { + return new ClusterDeleteWeightedRoutingRequest(attributeName); } /** diff --git a/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java index 07cdc949c4529..c7136785606f6 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java @@ -36,6 +36,16 @@ public class WeightedRoutingMetadata extends AbstractNamedDiffable weights = new HashMap<>(); - WeightedRouting weightedRouting = null; + WeightedRouting weightedRouting; XContentParser.Token token; - String awarenessField = null; + String 
awarenessField; + String versionAttr = null; + long version = VERSION_UNSET_VALUE; while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { - awarenessField = parser.currentName(); + String attr = parser.currentName(); + if (attr != null && attr.equals(VERSION)) { + versionAttr = parser.currentName(); + continue; + } else { + awarenessField = parser.currentName(); + } if (parser.nextToken() != XContentParser.Token.START_OBJECT) { - throw new OpenSearchParseException( - "failed to parse weighted routing metadata [{}], expected " + "object", - awarenessField - ); + throw new OpenSearchParseException("failed to parse weighted routing metadata [{}], expected object", awarenessField); } while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { attributeName = parser.currentName(); if (parser.nextToken() != XContentParser.Token.START_OBJECT) { throw new OpenSearchParseException( - "failed to parse weighted routing metadata [{}], expected" + " object", + "failed to parse weighted routing metadata [{}], expected object", attributeName ); } while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { attrKey = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_NUMBER) { - attrValue = Double.parseDouble(parser.text()); - weights.put(attrKey, attrValue); + if (attrKey != null && attrKey.equals(VERSION)) { + version = Long.parseLong(parser.text()); + } else { + attrValue = Double.parseDouble(parser.text()); + weights.put(attrKey, attrValue); + } + } else { throw new OpenSearchParseException( "failed to parse weighted routing metadata attribute " + "[{}], unknown type", @@ -123,10 +147,14 @@ public static WeightedRoutingMetadata fromXContent(XContentParser parser) throws } } } + } else if (token == XContentParser.Token.VALUE_NUMBER) { + if (versionAttr != null && versionAttr.equals(VERSION)) { + version = 
Long.parseLong(parser.text()); + } } } weightedRouting = new WeightedRouting(attributeName, weights); - return new WeightedRoutingMetadata(weightedRouting); + return new WeightedRoutingMetadata(weightedRouting, version); } @Override @@ -144,18 +172,21 @@ public int hashCode() { @Override public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { - toXContent(weightedRouting, builder); + toXContent(weightedRouting, builder, version); return builder; } - public static void toXContent(WeightedRouting weightedRouting, XContentBuilder builder) throws IOException { + public static void toXContent(WeightedRouting weightedRouting, XContentBuilder builder, long version) throws IOException { builder.startObject(AWARENESS); - builder.startObject(weightedRouting.attributeName()); - for (Map.Entry entry : weightedRouting.weights().entrySet()) { - builder.field(entry.getKey(), entry.getValue()); + if (weightedRouting.isSet()) { + builder.startObject(weightedRouting.attributeName()); + for (Map.Entry entry : weightedRouting.weights().entrySet()) { + builder.field(entry.getKey(), entry.getValue()); + } + builder.endObject(); } builder.endObject(); - builder.endObject(); + builder.field(VERSION, version); } @Override diff --git a/server/src/main/java/org/opensearch/cluster/routing/FailAwareWeightedRouting.java b/server/src/main/java/org/opensearch/cluster/routing/FailAwareWeightedRouting.java new file mode 100644 index 0000000000000..0ddaed2157514 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/FailAwareWeightedRouting.java @@ -0,0 +1,153 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.cluster.routing; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.OpenSearchException; +import org.opensearch.action.search.SearchShardIterator; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.WeightedRoutingMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.index.shard.ShardId; +import org.opensearch.rest.RestStatus; +import org.opensearch.search.SearchShardTarget; + +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +/** + * This class contains logic to find next shard to retry search request in case of failure from other shard copy. + * This decides if retryable shard search requests can be tried on shard copies present in data + * nodes whose attribute value weight for weighted shard routing is set to zero. + */ + +public class FailAwareWeightedRouting { + + public static final FailAwareWeightedRouting INSTANCE = new FailAwareWeightedRouting(); + private static final Logger logger = LogManager.getLogger(FailAwareWeightedRouting.class); + + private final static List internalErrorRestStatusList = List.of( + RestStatus.INTERNAL_SERVER_ERROR, + RestStatus.BAD_GATEWAY, + RestStatus.SERVICE_UNAVAILABLE, + RestStatus.GATEWAY_TIMEOUT + ); + + public static FailAwareWeightedRouting getInstance() { + return INSTANCE; + } + + /** + * * + * @return true if exception is due to cluster availability issues + */ + private boolean isInternalFailure(Exception exception) { + if (exception instanceof OpenSearchException) { + // checking for 5xx failures + return internalErrorRestStatusList.contains(((OpenSearchException) exception).status()); + } + return false; + } + + /** + * This function checks if the shard is present in data node with weighted routing weight set to 0, + * In such cases we fail open, if shard search request 
for the shard from other shard copies fail with non + * retryable exception. + * + * @param nodeId the node with the shard copy + * @return true if the node has attribute value with shard routing weight set to zero, else false + */ + private boolean isWeighedAway(String nodeId, ClusterState clusterState) { + DiscoveryNode node = clusterState.nodes().get(nodeId); + WeightedRoutingMetadata weightedRoutingMetadata = clusterState.metadata().weightedRoutingMetadata(); + if (weightedRoutingMetadata != null) { + WeightedRouting weightedRouting = weightedRoutingMetadata.getWeightedRouting(); + if (weightedRouting != null && weightedRouting.isSet()) { + // Fetch weighted routing attributes with weight set as zero + Stream keys = weightedRouting.weights() + .entrySet() + .stream() + .filter(entry -> entry.getValue().intValue() == WeightedRoutingMetadata.WEIGHED_AWAY_WEIGHT) + .map(Map.Entry::getKey); + + for (Object key : keys.toArray()) { + if (node.getAttributes().get(weightedRouting.attributeName()).equals(key.toString())) { + return true; + } + } + } + } + return false; + } + + /** + * This function returns next shard copy to retry search request in case of failure from previous copy returned + * by the iterator. 
It has the logic to fail open ie request shard copies present in nodes with weighted shard + * routing weight set to zero + * + * @param shardIt Shard Iterator containing order in which shard copies for a shard need to be requested + * @return the next shard copy + */ + public SearchShardTarget findNext(final SearchShardIterator shardIt, ClusterState clusterState, Exception exception) { + SearchShardTarget next = shardIt.nextOrNull(); + while (next != null && isWeighedAway(next.getNodeId(), clusterState)) { + SearchShardTarget nextShard = next; + if (canFailOpen(nextShard.getShardId(), exception, clusterState)) { + logger.info(() -> new ParameterizedMessage("{}: Fail open executed due to exception", nextShard.getShardId()), exception); + break; + } + next = shardIt.nextOrNull(); + } + return next; + } + + /** + * This function returns next shard copy to retry search request in case of failure from previous copy returned + * by the iterator. It has the logic to fail open ie request shard copies present in nodes with weighted shard + * routing weight set to zero + * + * @param shardsIt Shard Iterator containing order in which shard copies for a shard need to be requested + * @return the next shard copy + */ + public ShardRouting findNext(final ShardsIterator shardsIt, ClusterState clusterState, Exception exception) { + ShardRouting next = shardsIt.nextOrNull(); + + while (next != null && isWeighedAway(next.currentNodeId(), clusterState)) { + ShardRouting nextShard = next; + if (canFailOpen(nextShard.shardId(), exception, clusterState)) { + logger.info(() -> new ParameterizedMessage("{}: Fail open executed due to exception", nextShard.shardId()), exception); + break; + } + next = shardsIt.nextOrNull(); + } + return next; + } + + /** + * * + * @return true if can fail open ie request shard copies present in nodes with weighted shard + * routing weight set to zero + */ + private boolean canFailOpen(ShardId shardId, Exception exception, ClusterState clusterState) { + 
return isInternalFailure(exception) || hasInActiveShardCopies(clusterState, shardId); + } + + private boolean hasInActiveShardCopies(ClusterState clusterState, ShardId shardId) { + List shards = clusterState.routingTable().shardRoutingTable(shardId).shards(); + for (ShardRouting shardRouting : shards) { + if (!shardRouting.active()) { + return true; + } + } + return false; + } +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java index 9026e7068e9fe..207570c1d56b2 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java @@ -32,6 +32,9 @@ package org.opensearch.cluster.routing; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.metadata.WeightedRoutingMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.common.Nullable; @@ -57,6 +60,8 @@ import java.util.Optional; import java.util.Set; import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static java.util.Collections.emptyMap; @@ -89,6 +94,8 @@ public class IndexShardRoutingTable implements Iterable { private volatile Map> activeShardsByWeight = emptyMap(); private volatile Map> initializingShardsByWeight = emptyMap(); + private static final Logger logger = LogManager.getLogger(IndexShardRoutingTable.class); + /** * The initializing list, including ones that are initializing on a target node because of relocation. * If we can come up with a better variable name, it would be nice... 
@@ -305,19 +312,50 @@ public ShardIterator activeInitializingShardsRankedIt( * * @param weightedRouting entity * @param nodes discovered nodes in the cluster + * @param isFailOpenEnabled if true, shards search requests in case of failures are tried on shard copies present + * in node attribute value with weight zero * @return an iterator over active and initializing shards, ordered by weighted round-robin * scheduling policy. Making sure that initializing shards are the last to iterate through. */ - public ShardIterator activeInitializingShardsWeightedIt(WeightedRouting weightedRouting, DiscoveryNodes nodes, double defaultWeight) { + public ShardIterator activeInitializingShardsWeightedIt( + WeightedRouting weightedRouting, + DiscoveryNodes nodes, + double defaultWeight, + boolean isFailOpenEnabled + ) { final int seed = shuffler.nextSeed(); List ordered = new ArrayList<>(); List orderedActiveShards = getActiveShardsByWeight(weightedRouting, nodes, defaultWeight); + List orderedListWithDistinctShards; ordered.addAll(shuffler.shuffle(orderedActiveShards, seed)); if (!allInitializingShards.isEmpty()) { List orderedInitializingShards = getInitializingShardsByWeight(weightedRouting, nodes, defaultWeight); ordered.addAll(orderedInitializingShards); } - return new PlainShardIterator(shardId, ordered); + + // append shards for attribute value with weight zero, so that shard search requests can be tried on + // shard copies in case of request failure from other attribute values. 
+ if (isFailOpenEnabled) { + try { + Stream keys = weightedRouting.weights() + .entrySet() + .stream() + .filter(entry -> entry.getValue().intValue() == WeightedRoutingMetadata.WEIGHED_AWAY_WEIGHT) + .map(Map.Entry::getKey); + keys.forEach(key -> { + ShardIterator iterator = onlyNodeSelectorActiveInitializingShardsIt(weightedRouting.attributeName() + ":" + key, nodes); + while (iterator.remaining() > 0) { + ordered.add(iterator.nextOrNull()); + } + }); + } catch (IllegalArgumentException e) { + // this exception is thrown by {@link onlyNodeSelectorActiveInitializingShardsIt} in case count of shard + // copies found is zero + logger.debug("no shard copies found for shard id [{}] for node attribute with weight zero", shardId); + } + } + orderedListWithDistinctShards = ordered.stream().distinct().collect(Collectors.toList()); + return new PlainShardIterator(shardId, orderedListWithDistinctShards); } /** diff --git a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java index 9026da667ccb0..d7df1a2c2181b 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java @@ -83,10 +83,18 @@ public class OperationRouting { Setting.Property.Dynamic, Setting.Property.NodeScope ); + + public static final Setting WEIGHTED_ROUTING_FAILOPEN_ENABLED = Setting.boolSetting( + "cluster.routing.weighted.fail_open", + true, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); private volatile List awarenessAttributes; private volatile boolean useAdaptiveReplicaSelection; private volatile boolean ignoreAwarenessAttr; private volatile double weightedRoutingDefaultWeight; + private volatile boolean isFailOpenEnabled; public OperationRouting(Settings settings, ClusterSettings clusterSettings) { // whether to ignore awareness attributes when routing requests @@ -98,9 +106,11 @@ public 
OperationRouting(Settings settings, ClusterSettings clusterSettings) { ); this.useAdaptiveReplicaSelection = USE_ADAPTIVE_REPLICA_SELECTION_SETTING.get(settings); this.weightedRoutingDefaultWeight = WEIGHTED_ROUTING_DEFAULT_WEIGHT.get(settings); + this.isFailOpenEnabled = WEIGHTED_ROUTING_FAILOPEN_ENABLED.get(settings); clusterSettings.addSettingsUpdateConsumer(USE_ADAPTIVE_REPLICA_SELECTION_SETTING, this::setUseAdaptiveReplicaSelection); clusterSettings.addSettingsUpdateConsumer(IGNORE_AWARENESS_ATTRIBUTES_SETTING, this::setIgnoreAwarenessAttributes); clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_DEFAULT_WEIGHT, this::setWeightedRoutingDefaultWeight); + clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_FAILOPEN_ENABLED, this::setFailOpenEnabled); } void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) { @@ -115,6 +125,10 @@ void setWeightedRoutingDefaultWeight(double weightedRoutingDefaultWeight) { this.weightedRoutingDefaultWeight = weightedRoutingDefaultWeight; } + void setFailOpenEnabled(boolean isFailOpenEnabled) { + this.isFailOpenEnabled = isFailOpenEnabled; + } + public boolean isIgnoreAwarenessAttr() { return ignoreAwarenessAttr; } @@ -324,11 +338,12 @@ private ShardIterator shardRoutings( @Nullable Map nodeCounts, @Nullable WeightedRoutingMetadata weightedRoutingMetadata ) { - if (weightedRoutingMetadata != null) { + if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet()) { return indexShard.activeInitializingShardsWeightedIt( weightedRoutingMetadata.getWeightedRouting(), nodes, - getWeightedRoutingDefaultWeight() + getWeightedRoutingDefaultWeight(), + isFailOpenEnabled ); } else if (ignoreAwarenessAttributes()) { if (useAdaptiveReplicaSelection) { diff --git a/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java b/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java index df2d8d595eaab..bff8dd833f2de 100644 --- 
a/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java @@ -13,6 +13,7 @@ import org.opensearch.common.io.stream.Writeable; import java.io.IOException; +import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -25,6 +26,11 @@ public class WeightedRouting implements Writeable { private String attributeName; private Map weights; + public WeightedRouting() { + this.attributeName = ""; + this.weights = new HashMap<>(3); + } + public WeightedRouting(String attributeName, Map weights) { this.attributeName = attributeName; this.weights = weights; @@ -40,6 +46,10 @@ public WeightedRouting(StreamInput in) throws IOException { weights = (Map) in.readGenericValue(); } + public boolean isSet() { + return (!this.attributeName.isEmpty() && !this.weights.isEmpty()); + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(attributeName); diff --git a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java index 895b790f8499a..9992930a1a7f6 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java @@ -13,6 +13,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.ResourceNotFoundException; import org.opensearch.action.ActionListener; import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingRequest; import org.opensearch.action.ActionRequestValidationException; @@ -77,13 +78,16 @@ public WeightedRoutingService( CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING, this::setForcedAwarenessAttributes ); + } public void 
registerWeightedRoutingMetadata( final ClusterPutWeightedRoutingRequest request, final ActionListener listener ) { - final WeightedRoutingMetadata newWeightedRoutingMetadata = new WeightedRoutingMetadata(request.getWeightedRouting()); + final WeightedRouting newWeightedRouting = new WeightedRouting(request.getWeightedRouting()); + + final long requestVersion = request.getVersion(); clusterService.submitStateUpdateTask("update_weighted_routing", new ClusterStateUpdateTask(Priority.URGENT) { @Override public ClusterState execute(ClusterState currentState) { @@ -94,19 +98,21 @@ public ClusterState execute(ClusterState currentState) { Metadata metadata = currentState.metadata(); Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata()); WeightedRoutingMetadata weightedRoutingMetadata = metadata.custom(WeightedRoutingMetadata.TYPE); + ensureNoVersionConflict(requestVersion, weightedRoutingMetadata); if (weightedRoutingMetadata == null) { - logger.info("put weighted routing weights in metadata [{}]", request.getWeightedRouting()); - weightedRoutingMetadata = new WeightedRoutingMetadata(request.getWeightedRouting()); + logger.info("add weighted routing weights in metadata [{}]", newWeightedRouting); + weightedRoutingMetadata = new WeightedRoutingMetadata(newWeightedRouting, requestVersion + 1); } else { - if (!checkIfSameWeightsInMetadata(newWeightedRoutingMetadata, weightedRoutingMetadata)) { - logger.info("updated weighted routing weights [{}] in metadata", request.getWeightedRouting()); - weightedRoutingMetadata = new WeightedRoutingMetadata(newWeightedRoutingMetadata.getWeightedRouting()); + if (!newWeightedRouting.equals(weightedRoutingMetadata.getWeightedRouting())) { + logger.info("updated weighted routing weights [{}] in metadata", newWeightedRouting); + weightedRoutingMetadata = new WeightedRoutingMetadata(newWeightedRouting, requestVersion + 1); } else { + logger.info("weights are same, not updating weighted routing weights [{}] in metadata", 
newWeightedRouting); return currentState; } } mdBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); - logger.info("building cluster state with weighted routing weights [{}]", request.getWeightedRouting()); + logger.info("building cluster state with weighted routing weights [{}]", newWeightedRouting); return ClusterState.builder(currentState).metadata(mdBuilder).build(); } @@ -124,23 +130,37 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS }); } - private boolean checkIfSameWeightsInMetadata( - WeightedRoutingMetadata newWeightedRoutingMetadata, - WeightedRoutingMetadata oldWeightedRoutingMetadata - ) { - return newWeightedRoutingMetadata.getWeightedRouting().equals(oldWeightedRoutingMetadata.getWeightedRouting()); - } - public void deleteWeightedRoutingMetadata( final ClusterDeleteWeightedRoutingRequest request, final ActionListener listener ) { + final long requestVersion = request.getVersion(); + final String awarenessAttribute = request.getAwarenessAttribute(); clusterService.submitStateUpdateTask("delete_weighted_routing", new ClusterStateUpdateTask(Priority.URGENT) { @Override public ClusterState execute(ClusterState currentState) { logger.info("Deleting weighted routing metadata from the cluster state"); + Metadata metadata = currentState.metadata(); Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata()); - mdBuilder.removeCustom(WeightedRoutingMetadata.TYPE); + WeightedRoutingMetadata weightedRoutingMetadata = metadata.custom(WeightedRoutingMetadata.TYPE); + ensureNoVersionConflict(requestVersion, weightedRoutingMetadata); + + if ((weightedRoutingMetadata != null && awarenessAttribute == null) + || (weightedRoutingMetadata != null + && weightedRoutingMetadata.getWeightedRouting().attributeName().equals(awarenessAttribute))) { + weightedRoutingMetadata = new WeightedRoutingMetadata(new WeightedRouting(), weightedRoutingMetadata.getVersion() + 1); + } else { + throw new 
ResourceNotFoundException( + String.format( + Locale.ROOT, + "weighted routing metadata does not have weights set for awareness attribute %s", + awarenessAttribute + ) + ); + } + + mdBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); + logger.info("building cluster state with weighted routing weights deleted"); return ClusterState.builder(currentState).metadata(mdBuilder).build(); } @@ -153,7 +173,6 @@ public void onFailure(String source, Exception e) { @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { logger.debug("cluster weighted routing metadata change is processed by all the nodes"); - assert newState.metadata().weightedRoutingMetadata() == null; listener.onResponse(new ClusterDeleteWeightedRoutingResponse(true)); } }); @@ -183,7 +202,7 @@ public void verifyAwarenessAttribute(String attributeName) { if (getAwarenessAttributes().contains(attributeName) == false) { ActionRequestValidationException validationException = null; validationException = addValidationError( - String.format(Locale.ROOT, "invalid awareness attribute %s requested for updating weighted routing", attributeName), + String.format(Locale.ROOT, "invalid awareness attribute %s requested for weighted routing", attributeName), validationException ); throw validationException; @@ -270,4 +289,18 @@ private void ensureDecommissionedAttributeHasZeroWeight(ClusterState state, Clus ); } } + + private void ensureNoVersionConflict(long requestedVersion, WeightedRoutingMetadata weightedRoutingMetadata) { + if ((weightedRoutingMetadata == null && requestedVersion != WeightedRoutingMetadata.INITIAL_VERSION) + || (weightedRoutingMetadata != null && requestedVersion != weightedRoutingMetadata.getVersion())) { + throw new UnsupportedWeightedRoutingStateException( + String.format( + Locale.ROOT, + "requested version is %s but cluster weighted routing metadata is at a " + "different version %s ", + requestedVersion, + 
weightedRoutingMetadata != null ? weightedRoutingMetadata.getVersion() : WeightedRoutingMetadata.INITIAL_VERSION + ) + ); + } + } } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index f97df4e7cdd5e..0668acf3d066e 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -540,6 +540,7 @@ public void apply(Settings value, Settings current, Settings previous) { OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, OperationRouting.IGNORE_AWARENESS_ATTRIBUTES_SETTING, OperationRouting.WEIGHTED_ROUTING_DEFAULT_WEIGHT, + OperationRouting.WEIGHTED_ROUTING_FAILOPEN_ENABLED, IndexGraveyard.SETTING_MAX_TOMBSTONES, PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING, EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING, diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingAction.java index 9742cc373d520..d9dedf8d14506 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingAction.java @@ -20,7 +20,8 @@ import java.io.IOException; import java.util.List; -import static java.util.Collections.singletonList; +import static java.util.Arrays.asList; +import static java.util.Collections.unmodifiableList; import static org.opensearch.rest.RestRequest.Method.DELETE; /** @@ -35,7 +36,12 @@ public class RestClusterDeleteWeightedRoutingAction extends BaseRestHandler { @Override public List routes() { - return singletonList(new Route(DELETE, "/_cluster/routing/awareness/weights")); + return unmodifiableList( 
+ asList( + new Route(DELETE, "/_cluster/routing/awareness/weights"), + new Route(DELETE, "/_cluster/routing/awareness/{attribute}/weights") + ) + ); } @Override @@ -45,9 +51,17 @@ public String getName() { @Override protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { - ClusterDeleteWeightedRoutingRequest clusterDeleteWeightedRoutingRequest = Requests.deleteWeightedRoutingRequest(); + ClusterDeleteWeightedRoutingRequest clusterDeleteWeightedRoutingRequest = createRequest(request); return channel -> client.admin() .cluster() .deleteWeightedRouting(clusterDeleteWeightedRoutingRequest, new RestToXContentListener<>(channel)); } + + public static ClusterDeleteWeightedRoutingRequest createRequest(RestRequest request) throws IOException { + ClusterDeleteWeightedRoutingRequest deleteWeightedRoutingRequest = Requests.deleteWeightedRoutingRequest( + request.param("attribute") + ); + request.applyContentParser(p -> deleteWeightedRoutingRequest.source(p.mapStrings())); + return deleteWeightedRoutingRequest; + } } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterPutWeightedRoutingAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterPutWeightedRoutingAction.java index 1cf44e665cf84..5f845b7a66c1f 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterPutWeightedRoutingAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterPutWeightedRoutingAction.java @@ -51,7 +51,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli public static ClusterPutWeightedRoutingRequest createRequest(RestRequest request) throws IOException { ClusterPutWeightedRoutingRequest putWeightedRoutingRequest = Requests.putWeightedRoutingRequest(request.param("attribute")); - request.applyContentParser(p -> putWeightedRoutingRequest.source(p.mapStrings())); + request.applyContentParser(p -> 
putWeightedRoutingRequest.source(p.mapOrdered())); return putWeightedRoutingRequest; } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java index 5e456158941b8..9d0ed8e03d7f2 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java @@ -19,16 +19,18 @@ public class ClusterPutWeightedRoutingRequestTests extends OpenSearchTestCase { public void testSetWeightedRoutingWeight() { - String reqString = "{\"us-east-1c\" : \"0\", \"us-east-1b\":\"1\",\"us-east-1a\":\"1\"}"; + String reqString = "{\"weights\":{\"us-east-1c\":\"0\",\"us-east-1b\":\"1\",\"us-east-1a\":\"1\"},\"_version\":1}"; ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone"); Map weights = Map.of("us-east-1a", 1.0, "us-east-1b", 1.0, "us-east-1c", 0.0); WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON); - assertEquals(request.getWeightedRouting(), weightedRouting); + assertEquals(weightedRouting, request.getWeightedRouting()); + assertEquals(1, request.getVersion()); } public void testValidate_ValuesAreProper() { - String reqString = "{\"us-east-1c\" : \"1\", \"us-east-1b\":\"0\",\"us-east-1a\":\"1\"}"; + String reqString = "{\"weights\":{\"us-east-1c\":\"0\",\"us-east-1b\":\"1\",\"us-east-1a\":\"1\"},\"_version\":1}"; ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone"); request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON); ActionRequestValidationException actionRequestValidationException = 
request.validate(); @@ -45,7 +47,7 @@ public void testValidate_MissingWeights() { } public void testValidate_AttributeMissing() { - String reqString = "{\"us-east-1c\" : \"0\", \"us-east-1b\":\"1\",\"us-east-1a\":\"1\"}"; + String reqString = "{\"weights\":{\"us-east-1c\":\"0\",\"us-east-1b\":\"1\",\"us-east-1a\": \"1\"},\"_version\":1}"; ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest(); request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON); ActionRequestValidationException actionRequestValidationException = request.validate(); @@ -54,7 +56,7 @@ public void testValidate_AttributeMissing() { } public void testValidate_MoreThanHalfWithZeroWeight() { - String reqString = "{\"us-east-1c\" : \"0\", \"us-east-1b\":\"0\",\"us-east-1a\":\"1\"}"; + String reqString = "{\"weights\":{\"us-east-1c\":\"0\",\"us-east-1b\":\"0\",\"us-east-1a\": \"1\"}," + "\"_version\":1}"; ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone"); request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON); ActionRequestValidationException actionRequestValidationException = request.validate(); @@ -63,4 +65,13 @@ public void testValidate_MoreThanHalfWithZeroWeight() { actionRequestValidationException.getMessage().contains("Maximum expected number of routing weights having zero weight is [1]") ); } + + public void testValidate_VersionMissing() { + String reqString = "{\"weights\":{\"us-east-1c\": \"0\",\"us-east-1b\": \"1\",\"us-east-1a\": \"1\"}}"; + ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone"); + request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON); + ActionRequestValidationException actionRequestValidationException = request.validate(); + assertNotNull(actionRequestValidationException); + assertTrue(actionRequestValidationException.getMessage().contains("Version is missing")); + } } diff --git 
a/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/ClusterGetWeightedRoutingResponseTests.java b/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/ClusterGetWeightedRoutingResponseTests.java index e9add55ca774b..6107c03e5a891 100644 --- a/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/ClusterGetWeightedRoutingResponseTests.java +++ b/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/ClusterGetWeightedRoutingResponseTests.java @@ -21,7 +21,7 @@ public class ClusterGetWeightedRoutingResponseTests extends AbstractXContentTest protected ClusterGetWeightedRoutingResponse createTestInstance() { Map weights = Map.of("zone_A", 1.0, "zone_B", 0.0, "zone_C", 1.0); WeightedRouting weightedRouting = new WeightedRouting("", weights); - ClusterGetWeightedRoutingResponse response = new ClusterGetWeightedRoutingResponse("1", weightedRouting); + ClusterGetWeightedRoutingResponse response = new ClusterGetWeightedRoutingResponse(weightedRouting, true, 0); return response; } diff --git a/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/TransportGetWeightedRoutingActionTests.java b/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/TransportGetWeightedRoutingActionTests.java index f28e932e068ac..df5b6566b503e 100644 --- a/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/TransportGetWeightedRoutingActionTests.java +++ b/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/TransportGetWeightedRoutingActionTests.java @@ -174,7 +174,7 @@ private ClusterState setLocalNode(ClusterState clusterState, String nodeId) { private ClusterState setWeightedRoutingWeights(ClusterState clusterState, Map weights) { WeightedRouting weightedRouting = new WeightedRouting("zone", weights); - WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting); 
+ WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, 0); Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build(); @@ -191,7 +191,6 @@ public void testGetWeightedRouting_WeightsNotSetInMetadata() { ClusterState state = clusterService.state(); ClusterGetWeightedRoutingResponse response = ActionTestUtils.executeBlocking(transportGetWeightedRoutingAction, request.request()); - assertEquals(response.getLocalNodeWeight(), null); assertEquals(response.weights(), null); } @@ -231,7 +230,8 @@ public void testGetWeightedRoutingLocalWeight_WeightsSetInMetadata() { ClusterServiceUtils.setState(clusterService, builder); ClusterGetWeightedRoutingResponse response = ActionTestUtils.executeBlocking(transportGetWeightedRoutingAction, request.request()); - assertEquals("0.0", response.getLocalNodeWeight()); + assertEquals(true, response.getDiscoveredClusterManager()); + assertEquals(weights, response.getWeightedRouting().weights()); } public void testGetWeightedRoutingLocalWeight_WeightsNotSetInMetadata() { @@ -250,7 +250,7 @@ public void testGetWeightedRoutingLocalWeight_WeightsNotSetInMetadata() { ClusterServiceUtils.setState(clusterService, builder); ClusterGetWeightedRoutingResponse response = ActionTestUtils.executeBlocking(transportGetWeightedRoutingAction, request.request()); - assertEquals(null, response.getLocalNodeWeight()); + assertEquals(null, response.getWeightedRouting()); } @After diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java index 989ba70533de0..c4124e027dc93 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java +++ 
b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java @@ -332,7 +332,7 @@ public void onFailure(Exception e) { private void setWeightedRoutingWeights(Map weights) { ClusterState clusterState = clusterService.state(); WeightedRouting weightedRouting = new WeightedRouting("zone", weights); - WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, 0); Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build(); diff --git a/server/src/test/java/org/opensearch/cluster/metadata/WeightedRoutingMetadataTests.java b/server/src/test/java/org/opensearch/cluster/metadata/WeightedRoutingMetadataTests.java index a0a9d2bd9586b..17b682618b1a8 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/WeightedRoutingMetadataTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/WeightedRoutingMetadataTests.java @@ -20,7 +20,7 @@ public class WeightedRoutingMetadataTests extends AbstractXContentTestCase weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); WeightedRouting weightedRouting = new WeightedRouting("zone", weights); - WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, -1); return weightedRoutingMetadata; } diff --git a/server/src/test/java/org/opensearch/cluster/routing/FailAwareWeightedRoutingTests.java b/server/src/test/java/org/opensearch/cluster/routing/FailAwareWeightedRoutingTests.java new file mode 100644 index 0000000000000..c9c616dab0dbc --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/FailAwareWeightedRoutingTests.java @@ -0,0 +1,266 
@@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing; + +import org.opensearch.Version; +import org.opensearch.action.OriginalIndicesTests; +import org.opensearch.action.search.SearchShardIterator; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.WeightedRoutingMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException; +import org.opensearch.index.shard.ShardId; +import org.opensearch.search.SearchShardTarget; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.transport.NodeNotConnectedException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static java.util.Collections.singletonMap; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED; + +public class FailAwareWeightedRoutingTests extends OpenSearchTestCase { + + private ClusterState setUpCluster() { + ClusterState clusterState = ClusterState.builder(new ClusterName("test")).build(); + + // set up nodes + DiscoveryNode nodeA = new DiscoveryNode( + "node_zone_a", + 
buildNewFakeTransportAddress(), + singletonMap("zone", "a"), + Collections.singleton(DiscoveryNodeRole.DATA_ROLE), + Version.CURRENT + ); + DiscoveryNode nodeB = new DiscoveryNode( + "node_zone_b", + buildNewFakeTransportAddress(), + singletonMap("zone", "b"), + Collections.singleton(DiscoveryNodeRole.DATA_ROLE), + Version.CURRENT + ); + DiscoveryNode nodeC = new DiscoveryNode( + "node_zone_c", + buildNewFakeTransportAddress(), + singletonMap("zone", "c"), + Collections.singleton(DiscoveryNodeRole.DATA_ROLE), + Version.CURRENT + ); + + DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes()); + + nodeBuilder.add(nodeA); + nodeBuilder.add(nodeB); + nodeBuilder.add(nodeC); + clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).build(); + + // set up weighted routing weights + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, 0); + Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); + metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); + clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build(); + + return clusterState; + + } + + public void testFindNextWithoutFailOpen() throws IOException { + + ClusterState clusterState = setUpCluster(); + + // set up index + IndexMetadata indexMetadata = IndexMetadata.builder("test") + .settings( + Settings.builder() + .put(SETTING_VERSION_CREATED, Version.CURRENT) + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 2) + .put(SETTING_CREATION_DATE, System.currentTimeMillis()) + ) + .build(); + + ShardRouting shardRoutingA = TestShardRouting.newShardRouting("test", 0, "node_zone_a", true, ShardRoutingState.STARTED); + ShardRouting shardRoutingB = TestShardRouting.newShardRouting("test", 0, "node_zone_b", false, 
ShardRoutingState.STARTED); + ShardRouting shardRoutingC = TestShardRouting.newShardRouting("test", 0, "node_zone_c", false, ShardRoutingState.STARTED); + + Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); + metadataBuilder.put(indexMetadata, false).generateClusterUuidIfNeeded(); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(indexMetadata.getIndex()); + + final ShardId shardId = new ShardId("test", "_na_", 0); + IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId); + indexShardRoutingBuilder.addShard(shardRoutingA); + indexShardRoutingBuilder.addShard(shardRoutingB); + indexShardRoutingBuilder.addShard(shardRoutingC); + + indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder.build()); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + routingTableBuilder.add(indexRoutingTableBuilder.build()); + clusterState = ClusterState.builder(clusterState).routingTable(routingTableBuilder.build()).build(); + + List shardRoutings = new ArrayList<>(); + shardRoutings.add(shardRoutingA); + shardRoutings.add(shardRoutingB); + shardRoutings.add(shardRoutingC); + + String clusterAlias = randomBoolean() ? 
null : randomAlphaOfLengthBetween(5, 10); + SearchShardIterator searchShardIterator = new SearchShardIterator( + clusterAlias, + shardId, + shardRoutings, + OriginalIndicesTests.randomOriginalIndices() + ); + + searchShardIterator.nextOrNull(); + searchShardIterator.nextOrNull(); + + // fail open is not executed since fail open conditions aren't met + SearchShardTarget next = FailAwareWeightedRouting.getInstance() + .findNext(searchShardIterator, clusterState, new OpenSearchRejectedExecutionException()); + assertNull(next); + } + + public void testFindNextWithFailOpenDueTo5xx() throws IOException { + + ClusterState clusterState = setUpCluster(); + + // set up index + IndexMetadata indexMetadata = IndexMetadata.builder("test") + .settings( + Settings.builder() + .put(SETTING_VERSION_CREATED, Version.CURRENT) + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 2) + .put(SETTING_CREATION_DATE, System.currentTimeMillis()) + ) + .build(); + + ShardRouting shardRoutingA = TestShardRouting.newShardRouting("test", 0, "node_zone_a", true, ShardRoutingState.STARTED); + ShardRouting shardRoutingB = TestShardRouting.newShardRouting("test", 0, "node_zone_b", false, ShardRoutingState.STARTED); + ShardRouting shardRoutingC = TestShardRouting.newShardRouting("test", 0, "node_zone_c", false, ShardRoutingState.STARTED); + + List shardRoutings = new ArrayList<>(); + shardRoutings.add(shardRoutingA); + shardRoutings.add(shardRoutingB); + shardRoutings.add(shardRoutingC); + + Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); + metadataBuilder.put(indexMetadata, false).generateClusterUuidIfNeeded(); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(indexMetadata.getIndex()); + + final ShardId shardId = new ShardId("test", "_na_", 0); + IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId); + + indexShardRoutingBuilder.addShard(shardRoutingA); +
indexShardRoutingBuilder.addShard(shardRoutingB); + indexShardRoutingBuilder.addShard(shardRoutingC); + + indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder.build()); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + + routingTableBuilder.add(indexRoutingTableBuilder.build()); + clusterState = ClusterState.builder(clusterState).routingTable(routingTableBuilder.build()).build(); + + String clusterAlias = randomBoolean() ? null : randomAlphaOfLengthBetween(5, 10); + + SearchShardIterator searchShardIterator = new SearchShardIterator( + clusterAlias, + shardId, + shardRoutings, + OriginalIndicesTests.randomOriginalIndices() + ); + + searchShardIterator.nextOrNull(); + searchShardIterator.nextOrNull(); + + // Node in zone b is disconnected + DiscoveryNode node = clusterState.nodes().get("node_zone_b"); + // fail open is executed and shard present in node with weighted routing weight zero is returned + SearchShardTarget next = FailAwareWeightedRouting.getInstance() + .findNext(searchShardIterator, clusterState, new NodeNotConnectedException(node, "Node is not " + "connected")); + assertNotNull(next); + assertEquals("node_zone_c", next.getNodeId()); + } + + public void testFindNextWithFailOpenDueToUnassignedShard() throws IOException { + + ClusterState clusterState = setUpCluster(); + + IndexMetadata indexMetadata = IndexMetadata.builder("test") + .settings( + Settings.builder() + .put(SETTING_VERSION_CREATED, Version.CURRENT) + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 2) + .put(SETTING_CREATION_DATE, System.currentTimeMillis()) + ) + .build(); + ShardRouting shardRoutingB = TestShardRouting.newShardRouting("test", 0, "node_zone_b", true, ShardRoutingState.STARTED); + + ShardRouting shardRoutingA = TestShardRouting.newShardRouting("test", 0, null, false, ShardRoutingState.UNASSIGNED); + + ShardRouting shardRoutingC = TestShardRouting.newShardRouting("test", 0, "node_zone_c", false, ShardRoutingState.STARTED); 
+ + List shardRoutings = new ArrayList<>(); + shardRoutings.add(shardRoutingA); + shardRoutings.add(shardRoutingB); + shardRoutings.add(shardRoutingC); + + Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); + metadataBuilder.put(indexMetadata, false).generateClusterUuidIfNeeded(); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(indexMetadata.getIndex()); + + final ShardId shardId = new ShardId("test", "_na_", 0); + IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId); + + indexShardRoutingBuilder.addShard(shardRoutingA); + indexShardRoutingBuilder.addShard(shardRoutingB); + indexShardRoutingBuilder.addShard(shardRoutingC); + + indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder.build()); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + + routingTableBuilder.add(indexRoutingTableBuilder.build()); + clusterState = ClusterState.builder(clusterState).routingTable(routingTableBuilder.build()).build(); + + String clusterAlias = randomBoolean() ? 
null : randomAlphaOfLengthBetween(5, 10); + + SearchShardIterator searchShardIterator = new SearchShardIterator( + clusterAlias, + shardId, + shardRoutings, + OriginalIndicesTests.randomOriginalIndices() + ); + + searchShardIterator.nextOrNull(); + searchShardIterator.nextOrNull(); + + // since there is an unassigned shard in the cluster, fail open is executed and shard present in node with + // weighted routing weight zero is returned + SearchShardTarget next = FailAwareWeightedRouting.getInstance() + .findNext(searchShardIterator, clusterState, new OpenSearchRejectedExecutionException()); + assertNotNull(next); + assertEquals("node_zone_c", next.getNodeId()); + } +} diff --git a/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java b/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java index 014f2d237a306..ffdb2d39fb817 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java @@ -789,7 +789,7 @@ private ClusterState clusterStateForWeightedRouting(String[] indexNames, int num private ClusterState setWeightedRoutingWeights(ClusterState clusterState, Map weights) { WeightedRouting weightedRouting = new WeightedRouting("zone", weights); - WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, 0); Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build(); @@ -904,6 +904,7 @@ public void testWeightedOperationRoutingWeightUndefinedForOneZone() throws Excep Settings setting = Settings.builder() .put("cluster.routing.allocation.awareness.attributes", "zone") 
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", false) .build(); threadPool = new TestThreadPool("testThatOnlyNodesSupport"); @@ -1158,7 +1159,7 @@ private ClusterState updateStatetoTestWeightedRouting( // add weighted routing weights in metadata Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); WeightedRouting weightedRouting = new WeightedRouting("zone", weights); - WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, 0); metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); clusterState.metadata(metadataBuilder); clusterState.routingTable(routingTableBuilder.build()); diff --git a/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java index 089fb453ca2c0..1f892b993d4d6 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java @@ -175,7 +175,7 @@ private ClusterState setLocalNode(ClusterState clusterState, String nodeId) { private ClusterState setWeightedRoutingWeights(ClusterState clusterState, Map weights) { WeightedRouting weightedRouting = new WeightedRouting("zone", weights); - WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, 0); Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build(); @@ -260,13 +260,13 @@ public void testDeleteWeightedRoutingMetadata() 
throws InterruptedException { ClusterState.Builder builder = ClusterState.builder(state); ClusterServiceUtils.setState(clusterService, builder); - ClusterDeleteWeightedRoutingRequest clusterDeleteWeightedRoutingRequest = new ClusterDeleteWeightedRoutingRequest(); + ClusterDeleteWeightedRoutingRequest clusterDeleteWeightedRoutingRequest = new ClusterDeleteWeightedRoutingRequest("zone"); + clusterDeleteWeightedRoutingRequest.setVersion(0); final CountDownLatch countDownLatch = new CountDownLatch(1); ActionListener listener = new ActionListener() { @Override public void onResponse(ClusterDeleteWeightedRoutingResponse clusterDeleteWeightedRoutingResponse) { assertTrue(clusterDeleteWeightedRoutingResponse.isAcknowledged()); - assertNull(clusterService.state().metadata().weightedRoutingMetadata()); countDownLatch.countDown(); } diff --git a/server/src/test/java/org/opensearch/cluster/structure/RoutingIteratorTests.java b/server/src/test/java/org/opensearch/cluster/structure/RoutingIteratorTests.java index 8f5aa1b764551..4196b8882fa8f 100644 --- a/server/src/test/java/org/opensearch/cluster/structure/RoutingIteratorTests.java +++ b/server/src/test/java/org/opensearch/cluster/structure/RoutingIteratorTests.java @@ -550,22 +550,20 @@ public void testWeightedRoutingWithDifferentWeights() { ShardIterator shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); assertEquals(2, shardIterator.size()); ShardRouting shardRouting; - while (shardIterator.remaining() > 0) { - shardRouting = shardIterator.nextOrNull(); - assertNotNull(shardRouting); - assertFalse(Arrays.asList("node3").contains(shardRouting.currentNodeId())); - } + shardRouting = shardIterator.nextOrNull(); + assertNotNull(shardRouting); + assertFalse(Arrays.asList("node3").contains(shardRouting.currentNodeId())); weights = Map.of("zone1", 
1.0, "zone2", 1.0, "zone3", 1.0); weightedRouting = new WeightedRouting("zone", weights); shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); assertEquals(3, shardIterator.size()); weights = Map.of("zone1", -1.0, "zone2", 0.0, "zone3", 1.0); @@ -573,21 +571,21 @@ public void testWeightedRoutingWithDifferentWeights() { shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); assertEquals(1, shardIterator.size()); - while (shardIterator.remaining() > 0) { - shardRouting = shardIterator.nextOrNull(); - assertNotNull(shardRouting); - assertFalse(Arrays.asList("node2", "node1").contains(shardRouting.currentNodeId())); - } + shardRouting = shardIterator.nextOrNull(); + assertNotNull(shardRouting); + assertFalse(Arrays.asList("node2", "node1").contains(shardRouting.currentNodeId())); - weights = Map.of("zone1", 0.0, "zone2", 0.0, "zone3", 0.0); + weights = Map.of("zone1", 3.0, "zone2", 2.0, "zone3", 0.0); weightedRouting = new WeightedRouting("zone", weights); shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1); - assertEquals(0, shardIterator.size()); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true); + assertEquals(3, shardIterator.size()); + shardRouting = shardIterator.nextOrNull(); + assertNotNull(shardRouting); } finally { terminate(threadPool); } @@ -646,14 +644,12 @@ public void testWeightedRoutingInMemoryStore() { ShardIterator shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 
1); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); assertEquals(2, shardIterator.size()); ShardRouting shardRouting; - while (shardIterator.remaining() > 0) { - shardRouting = shardIterator.nextOrNull(); - assertNotNull(shardRouting); - assertFalse(Arrays.asList("node3").contains(shardRouting.currentNodeId())); - } + shardRouting = shardIterator.nextOrNull(); + assertNotNull(shardRouting); + assertFalse(Arrays.asList("node3").contains(shardRouting.currentNodeId())); // Make iterator call with same WeightedRouting instance assertNotNull( @@ -662,13 +658,11 @@ public void testWeightedRoutingInMemoryStore() { shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); assertEquals(2, shardIterator.size()); - while (shardIterator.remaining() > 0) { - shardRouting = shardIterator.nextOrNull(); - assertNotNull(shardRouting); - assertFalse(Arrays.asList("node3").contains(shardRouting.currentNodeId())); - } + shardRouting = shardIterator.nextOrNull(); + assertNotNull(shardRouting); + assertFalse(Arrays.asList("node3").contains(shardRouting.currentNodeId())); // Make iterator call with new instance of WeightedRouting but same weights Map weights1 = Map.of("zone1", 1.0, "zone2", 1.0, "zone3", 0.0); @@ -679,13 +673,11 @@ public void testWeightedRoutingInMemoryStore() { shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); assertEquals(2, shardIterator.size()); - while (shardIterator.remaining() > 0) { - shardRouting = shardIterator.nextOrNull(); - assertNotNull(shardRouting); - assertFalse(Arrays.asList("node3").contains(shardRouting.currentNodeId())); - } + shardRouting = 
shardIterator.nextOrNull(); + assertNotNull(shardRouting); + assertFalse(Arrays.asList("node3").contains(shardRouting.currentNodeId())); // Make iterator call with different weights Map weights2 = Map.of("zone1", 1.0, "zone2", 0.0, "zone3", 1.0); @@ -696,13 +688,82 @@ public void testWeightedRoutingInMemoryStore() { shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); assertEquals(2, shardIterator.size()); - while (shardIterator.remaining() > 0) { + shardRouting = shardIterator.nextOrNull(); + assertNotNull(shardRouting); + assertFalse(Arrays.asList("node2").contains(shardRouting.currentNodeId())); + + } finally { + terminate(threadPool); + } + } + + /** + * Test to validate that shard routing state is maintained across requests + */ + public void testWeightedRoutingShardState() { + TestThreadPool threadPool = null; + try { + Settings.Builder settings = Settings.builder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put("cluster.routing.allocation.awareness.attributes", "zone"); + AllocationService strategy = createAllocationService(settings.build()); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2)) + .build(); + + RoutingTable routingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .build(); + + threadPool = new TestThreadPool("testThatOnlyNodesSupport"); + ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); + + Map node1Attributes = new HashMap<>(); + node1Attributes.put("zone", "zone1"); + Map node2Attributes = new HashMap<>(); + 
node2Attributes.put("zone", "zone2"); + Map node3Attributes = new HashMap<>(); + node3Attributes.put("zone", "zone3"); + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder() + .add(newNode("node1", unmodifiableMap(node1Attributes))) + .add(newNode("node2", unmodifiableMap(node2Attributes))) + .add(newNode("node3", unmodifiableMap(node3Attributes))) + .localNodeId("node1") + ) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + Map weights = Map.of("zone1", 3.0, "zone2", 2.0, "zone3", 0.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + + Map requestCount = new HashMap<>(); + + for (int i = 0; i < 5; i++) { + ShardIterator shardIterator = clusterState.routingTable() + .index("test") + .shard(0) + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true); + + assertEquals(3, shardIterator.size()); + ShardRouting shardRouting; shardRouting = shardIterator.nextOrNull(); assertNotNull(shardRouting); - assertFalse(Arrays.asList("node2").contains(shardRouting.currentNodeId())); + requestCount.put(shardRouting.currentNodeId(), requestCount.getOrDefault(shardRouting.currentNodeId(), 0) + 1); } + assertEquals(3, requestCount.get("node1").intValue()); + assertEquals(2, requestCount.get("node2").intValue()); + } finally { terminate(threadPool); } diff --git a/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterAddWeightedRoutingActionTests.java b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterAddWeightedRoutingActionTests.java index a4cd6224217b7..582fbfce315b2 100644 --- a/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterAddWeightedRoutingActionTests.java +++ 
b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterAddWeightedRoutingActionTests.java @@ -34,14 +34,15 @@ public void setupAction() { } public void testCreateRequest_SupportedRequestBody() throws IOException { - String req = "{\"us-east-1c\" : \"1\", \"us-east-1d\":\"1.0\", \"us-east-1a\":\"0.0\"}"; + String req = "{\"weights\":{\"us-east-1c\":\"0\",\"us-east-1b\":\"1\",\"us-east-1a\":\"1\"},\"_version\":1}"; RestRequest restRequest = buildRestRequest(req); ClusterPutWeightedRoutingRequest clusterPutWeightedRoutingRequest = RestClusterPutWeightedRoutingAction.createRequest(restRequest); assertEquals("zone", clusterPutWeightedRoutingRequest.getWeightedRouting().attributeName()); assertNotNull(clusterPutWeightedRoutingRequest.getWeightedRouting().weights()); - assertEquals("1.0", clusterPutWeightedRoutingRequest.getWeightedRouting().weights().get("us-east-1c").toString()); - assertEquals("1.0", clusterPutWeightedRoutingRequest.getWeightedRouting().weights().get("us-east-1d").toString()); - assertEquals("0.0", clusterPutWeightedRoutingRequest.getWeightedRouting().weights().get("us-east-1a").toString()); + assertEquals("0.0", clusterPutWeightedRoutingRequest.getWeightedRouting().weights().get("us-east-1c").toString()); + assertEquals("1.0", clusterPutWeightedRoutingRequest.getWeightedRouting().weights().get("us-east-1b").toString()); + assertEquals("1.0", clusterPutWeightedRoutingRequest.getWeightedRouting().weights().get("us-east-1a").toString()); + assertEquals(1, clusterPutWeightedRoutingRequest.getVersion()); } public void testCreateRequest_UnsupportedRequestBody() throws IOException { @@ -54,7 +55,7 @@ public void testCreateRequest_UnsupportedRequestBody() throws IOException { public void testCreateRequest_MalformedRequestBody() throws IOException { Map params = new HashMap<>(); - String req = "{\"us-east-1c\" : \1\", \"us-east-1d\":\"1\", \"us-east-1a\":\"0\"}"; + String req = 
"{\"weights\":{\"us-east-1c\":\"0,\"us-east-1b\":\"1\",\"us-east-1a\":\"1\"},\"_version\":1}"; RestRequest restRequest = buildRestRequest(req); assertThrows(JsonParseException.class, () -> RestClusterPutWeightedRoutingAction.createRequest(restRequest)); } diff --git a/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingActionTests.java b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingActionTests.java new file mode 100644 index 0000000000000..2589d68e4cf0b --- /dev/null +++ b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingActionTests.java @@ -0,0 +1,88 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.admin.cluster; + +import org.junit.Before; +import org.opensearch.OpenSearchParseException; +import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingRequest; +import org.opensearch.common.bytes.BytesArray; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.rest.RestRequest; +import org.opensearch.test.rest.FakeRestRequest; +import org.opensearch.test.rest.RestActionTestCase; + +import java.io.IOException; + +import static java.util.Collections.singletonMap; + +/** + * Unit tests for request parsing in {@link RestClusterDeleteWeightedRoutingAction#createRequest}. + */ +public class RestClusterDeleteWeightedRoutingActionTests extends RestActionTestCase { + private RestClusterDeleteWeightedRoutingAction action; + + /** Registers the delete handler under test with the test controller. */ + @Before + public void setupAction() { + action = new RestClusterDeleteWeightedRoutingAction(); + controller().registerHandler(action); + } + + /** A body carrying {@code _version} yields a request with that version, with and without the awareness attribute param. */ + public void testDeleteRequest_SupportedRequestBody() throws IOException { + String req = "{\"_version\":2}"; + RestRequest restRequest = buildRestRequest(req); + ClusterDeleteWeightedRoutingRequest clusterDeleteWeightedRoutingRequest
= RestClusterDeleteWeightedRoutingAction.createRequest( + restRequest + ); + assertEquals(2, clusterDeleteWeightedRoutingRequest.getVersion()); + + restRequest = buildRestRequestWithAwarenessAttribute(req); + clusterDeleteWeightedRoutingRequest = RestClusterDeleteWeightedRoutingAction.createRequest(restRequest); + assertEquals("zone", clusterDeleteWeightedRoutingRequest.getAwarenessAttribute()); + assertEquals(2, clusterDeleteWeightedRoutingRequest.getVersion()); + } + + /** A body whose only field is the unrecognised {@code _ver} is rejected with {@link OpenSearchParseException}. */ + public void testDeleteRequest_BadRequest() throws IOException { + String req = "{\"_ver\":2}"; + RestRequest restRequest = buildRestRequest(req); + assertThrows(OpenSearchParseException.class, () -> RestClusterDeleteWeightedRoutingAction.createRequest(restRequest)); + + RestRequest restRequest2 = buildRestRequestWithAwarenessAttribute(req); + assertThrows(OpenSearchParseException.class, () -> RestClusterDeleteWeightedRoutingAction.createRequest(restRequest2)); + } + + /** Builds a DELETE request for the zone-scoped weights path with the {@code attribute} param set to {@code zone}. */ + private RestRequest buildRestRequestWithAwarenessAttribute(String content) { + return new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.DELETE) + .withPath("/_cluster/routing/awareness/zone/weights") + .withParams(singletonMap("attribute", "zone")) + .withContent(new BytesArray(content), XContentType.JSON) + .build(); + } + + /** Builds a DELETE request for the attribute-less weights path carrying the given JSON body. */ + private RestRequest buildRestRequest(String content) { + return new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.DELETE) + .withPath("/_cluster/routing/awareness/weights") + .withContent(new BytesArray(content), XContentType.JSON) + .build(); + } + + /** An empty JSON object is rejected with {@link OpenSearchParseException} on both request paths. */ + public void testCreateRequest_EmptyRequestBody() throws IOException { + String req = "{}"; + RestRequest restRequest = buildRestRequest(req); + assertThrows(OpenSearchParseException.class, () -> RestClusterDeleteWeightedRoutingAction.createRequest(restRequest)); + + RestRequest restRequest2 = buildRestRequestWithAwarenessAttribute(req); + assertThrows(OpenSearchParseException.class, () ->
RestClusterDeleteWeightedRoutingAction.createRequest(restRequest2)); + } +}