From a9709d2ac9fae800fa4125afa73bd51682cf3845 Mon Sep 17 00:00:00 2001 From: Shivansh Arora <31575408+shiv0408@users.noreply.github.com> Date: Mon, 29 Apr 2024 13:03:48 +0530 Subject: [PATCH] [Backport 2.x] Async Batch shards changes for GatewayAllocator (#13431) * Async Batch shards changes for GatewayAllocator (#8746) Changes for create/update/delete batches for batch mode for async fetch for both primary & replica. It also added the node scope setting to enable/ disable batch mode. Signed-off-by: Gaurav Chandani Signed-off-by: Shivansh Arora Signed-off-by: Aman Khare (cherry picked from commit a69cd0864a673579a9cb2190da765bc1ebfd0a22) --- CHANGELOG.md | 1 + .../gateway/RecoveryFromGatewayIT.java | 331 ++++++++ .../org/opensearch/cluster/ClusterModule.java | 14 +- .../cluster/routing/RoutingNode.java | 4 + .../routing/allocation/AllocationService.java | 41 + .../allocation/ExistingShardsAllocator.java | 40 +- .../common/settings/ClusterSettings.java | 4 + .../gateway/BaseGatewayShardAllocator.java | 2 + .../gateway/ReplicaShardBatchAllocator.java | 2 +- .../gateway/ShardsBatchGatewayAllocator.java | 721 ++++++++++++++++++ ...sportNodesListShardStoreMetadataBatch.java | 19 +- .../main/java/org/opensearch/node/Node.java | 10 +- .../cluster/ClusterModuleTests.java | 11 +- .../gateway/GatewayAllocatorTests.java | 360 +++++++++ .../gateway/ShardBatchCacheTests.java | 13 +- .../test/gateway/TestGatewayAllocator.java | 1 + .../TestShardBatchGatewayAllocator.java | 144 ++++ 17 files changed, 1698 insertions(+), 20 deletions(-) create mode 100644 server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java create mode 100644 server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java create mode 100644 test/framework/src/main/java/org/opensearch/test/gateway/TestShardBatchGatewayAllocator.java diff --git a/CHANGELOG.md b/CHANGELOG.md index efac82b24aa83..be772d5720a66 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add cluster setting to dynamically configure the buckets for filter rewrite optimization. ([#13179](https://github.com/opensearch-project/OpenSearch/pull/13179)) - Make search query counters dynamic to support all query types ([#12601](https://github.com/opensearch-project/OpenSearch/pull/12601)) - [Tiered Caching] Add a dynamic setting to disable/enable disk cache. ([#13373](https://github.com/opensearch-project/OpenSearch/pull/13373)) +- Batch mode for async fetching shard information in GatewayAllocator for unassigned shards ([#8746](https://github.com/opensearch-project/OpenSearch/pull/8746)) ### Dependencies - Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896)) diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java index ba03532a9aa2f..bc0557ddc2afa 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java @@ -32,24 +32,31 @@ package org.opensearch.gateway; +import org.apache.lucene.index.CorruptIndexException; import org.opensearch.Version; import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction; import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest; import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction; import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest; +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.reroute.ClusterRerouteResponse; import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsGroup; import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsResponse; +import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse; import org.opensearch.action.admin.indices.recovery.RecoveryResponse; import org.opensearch.action.admin.indices.stats.IndexStats; +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; import org.opensearch.action.admin.indices.stats.ShardStats; import org.opensearch.action.support.ActionTestUtils; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.client.Requests; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.coordination.ElectionSchedulerFactory; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.UnassignedInfo; +import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; import org.opensearch.common.xcontent.XContentFactory; @@ -62,6 +69,7 @@ import org.opensearch.index.MergePolicyProvider; import org.opensearch.index.engine.Engine; import org.opensearch.index.query.QueryBuilders; +import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardPath; import org.opensearch.indices.IndicesService; import org.opensearch.indices.recovery.RecoveryState; @@ -71,6 +79,7 @@ import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper; import org.opensearch.plugins.Plugin; import org.opensearch.test.InternalSettingsPlugin; +import org.opensearch.test.InternalTestCluster; import org.opensearch.test.InternalTestCluster.RestartCallback; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; @@ -94,6 +103,8 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.opensearch.cluster.coordination.ClusterBootstrapService.INITIAL_CLUSTER_MANAGER_NODES_SETTING; +import static org.opensearch.cluster.health.ClusterHealthStatus.GREEN; +import static org.opensearch.cluster.health.ClusterHealthStatus.RED; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; @@ -750,6 +761,276 @@ public void testMessyElectionsStillMakeClusterGoGreen() throws Exception { ensureGreen("test"); } + public void testBatchModeEnabled() throws Exception { + internalCluster().startClusterManagerOnlyNodes( + 1, + Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true).build() + ); + List dataOnlyNodes = internalCluster().startDataOnlyNodes(2); + createIndex( + "test", + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build() + ); + ensureGreen("test"); + Settings node0DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(0)); + Settings node1DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(1)); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(0))); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(1))); + ensureRed("test"); + ensureStableCluster(1); + + logger.info("--> Now do a protective reroute"); + ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get(); + assertTrue(clusterRerouteResponse.isAcknowledged()); + + ShardsBatchGatewayAllocator gatewayAllocator = internalCluster().getInstance( + ShardsBatchGatewayAllocator.class, + internalCluster().getClusterManagerName() + ); + assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings())); + assertEquals(1, gatewayAllocator.getNumberOfStartedShardBatches()); + assertEquals(1, gatewayAllocator.getNumberOfStoreShardBatches()); + + // Now start both data nodes and ensure batch mode is working + logger.info("--> restarting the stopped nodes"); + internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(0)).put(node0DataPathSettings).build()); + internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(1)).put(node1DataPathSettings).build()); + ensureStableCluster(3); + ensureGreen("test"); + assertEquals(0, gatewayAllocator.getNumberOfStartedShardBatches()); + assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches()); + assertEquals(0, gatewayAllocator.getNumberOfInFlightFetches()); + } + + public void testBatchModeDisabled() throws Exception { + internalCluster().startClusterManagerOnlyNodes( + 1, + Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), false).build() + ); + List dataOnlyNodes = internalCluster().startDataOnlyNodes(2); + createIndex( + "test", + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build() + ); + + ensureGreen("test"); + Settings node0DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(0)); + Settings node1DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(1)); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(0))); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(1))); + ensureStableCluster(1); + + logger.info("--> Now do a protective reroute"); + ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get(); + assertTrue(clusterRerouteResponse.isAcknowledged()); + + ShardsBatchGatewayAllocator gatewayAllocator = internalCluster().getInstance( + ShardsBatchGatewayAllocator.class, + internalCluster().getClusterManagerName() + ); + ensureRed("test"); + + assertFalse(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings())); + + // assert no batches created + assertEquals(0, gatewayAllocator.getNumberOfStartedShardBatches()); + assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches()); + + logger.info("--> restarting the stopped nodes"); + internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(0)).put(node0DataPathSettings).build()); + internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(1)).put(node1DataPathSettings).build()); + ensureStableCluster(3); + ensureGreen("test"); + } + + public void testNBatchesCreationAndAssignment() throws Exception { + // we will reduce batch size to 5 to make sure we have enough batches to test assignment + // Total number of primary shards = 50 (50 indices*1) + // Total number of replica shards = 50 (50 indices*1) + // Total batches creation for primaries and replicas will be 10 each + + internalCluster().startClusterManagerOnlyNodes( + 1, + Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true).build() + ); + List dataOnlyNodes = internalCluster().startDataOnlyNodes(2); + createNIndices(50, "test"); + ensureStableCluster(3); + IndicesStatsResponse indicesStats = dataNodeClient().admin().indices().prepareStats().get(); + assertThat(indicesStats.getSuccessfulShards(), equalTo(100)); + ClusterHealthResponse health = client().admin() + .cluster() + .health(Requests.clusterHealthRequest().waitForGreenStatus().timeout("1m")) + .actionGet(); + assertFalse(health.isTimedOut()); + assertEquals(GREEN, health.getStatus()); + + String clusterManagerName = internalCluster().getClusterManagerName(); + Settings clusterManagerDataPathSettings = internalCluster().dataPathSettings(clusterManagerName); + Settings node0DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(0)); + Settings node1DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(1)); + + internalCluster().stopCurrentClusterManagerNode(); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(0))); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(1))); + + // Now start cluster manager node and post that verify batches created + internalCluster().startClusterManagerOnlyNodes( + 1, + Settings.builder() + .put("node.name", clusterManagerName) + .put(clusterManagerDataPathSettings) + .put(ShardsBatchGatewayAllocator.GATEWAY_ALLOCATOR_BATCH_SIZE.getKey(), 5) + .put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true) + .build() + ); + ensureStableCluster(1); + + logger.info("--> Now do a protective reroute"); // to avoid any race condition in test + ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get(); + assertTrue(clusterRerouteResponse.isAcknowledged()); + + ShardsBatchGatewayAllocator gatewayAllocator = internalCluster().getInstance( + ShardsBatchGatewayAllocator.class, + internalCluster().getClusterManagerName() + ); + assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings())); + assertEquals(10, gatewayAllocator.getNumberOfStartedShardBatches()); + assertEquals(10, gatewayAllocator.getNumberOfStoreShardBatches()); + health = client(internalCluster().getClusterManagerName()).admin().cluster().health(Requests.clusterHealthRequest()).actionGet(); + assertFalse(health.isTimedOut()); + assertEquals(RED, health.getStatus()); + assertEquals(100, health.getUnassignedShards()); + assertEquals(0, health.getInitializingShards()); + assertEquals(0, health.getActiveShards()); + assertEquals(0, health.getRelocatingShards()); + assertEquals(0, health.getNumberOfDataNodes()); + + // Now start both data nodes and ensure batch mode is working + logger.info("--> restarting the stopped nodes"); + internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(0)).put(node0DataPathSettings).build()); + internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(1)).put(node1DataPathSettings).build()); + ensureStableCluster(3); + + // wait for cluster to turn green + health = client().admin().cluster().health(Requests.clusterHealthRequest().waitForGreenStatus().timeout("5m")).actionGet(); + assertFalse(health.isTimedOut()); + assertEquals(GREEN, health.getStatus()); + assertEquals(0, health.getUnassignedShards()); + assertEquals(0, health.getInitializingShards()); + assertEquals(100, health.getActiveShards()); + assertEquals(0, health.getRelocatingShards()); + assertEquals(2, health.getNumberOfDataNodes()); + assertEquals(0, gatewayAllocator.getNumberOfStartedShardBatches()); + assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches()); + } + + public void testCulpritShardInBatch() throws Exception { + internalCluster().startClusterManagerOnlyNodes( + 1, + Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true).build() + ); + List dataOnlyNodes = internalCluster().startDataOnlyNodes(3); + createNIndices(4, "test"); + ensureStableCluster(4); + ClusterHealthResponse health = client().admin() + .cluster() + .health(Requests.clusterHealthRequest().waitForGreenStatus().timeout("5m")) + .actionGet(); + assertFalse(health.isTimedOut()); + assertEquals(GREEN, health.getStatus()); + assertEquals(8, health.getActiveShards()); + + String culpritShardIndexName = "test0"; + Index idx = resolveIndex(culpritShardIndexName); + for (String node : internalCluster().nodesInclude(culpritShardIndexName)) { + IndicesService indexServices = internalCluster().getInstance(IndicesService.class, node); + IndexService indexShards = indexServices.indexServiceSafe(idx); + Integer shardId = 0; + IndexShard shard = indexShards.getShard(0); + logger.debug("--> failing shard [{}] on node [{}]", shardId, node); + shard.failShard("test", new CorruptIndexException("test corrupted", "")); + logger.debug("--> failed shard [{}] on node [{}]", shardId, node); + } + + String clusterManagerName = internalCluster().getClusterManagerName(); + Settings clusterManagerDataPathSettings = internalCluster().dataPathSettings(clusterManagerName); + Settings node0DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(0)); + Settings node1DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(1)); + Settings node2DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(2)); + + internalCluster().stopCurrentClusterManagerNode(); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(0))); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(1))); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(2))); + + // Now start cluster manager node and post that verify batches created + internalCluster().startClusterManagerOnlyNodes( + 1, + Settings.builder() + .put("node.name", clusterManagerName) + .put(clusterManagerDataPathSettings) + .put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true) + .build() + ); + ensureStableCluster(1); + + logger.info("--> Now do a protective reroute"); // to avoid any race condition in test + ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get(); + assertTrue(clusterRerouteResponse.isAcknowledged()); + + ShardsBatchGatewayAllocator gatewayAllocator = internalCluster().getInstance( + ShardsBatchGatewayAllocator.class, + internalCluster().getClusterManagerName() + ); + assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings())); + assertEquals(1, gatewayAllocator.getNumberOfStartedShardBatches()); + assertEquals(1, gatewayAllocator.getNumberOfStoreShardBatches()); + assertTrue(clusterRerouteResponse.isAcknowledged()); + health = client(internalCluster().getClusterManagerName()).admin().cluster().health(Requests.clusterHealthRequest()).actionGet(); + assertFalse(health.isTimedOut()); + assertEquals(RED, health.getStatus()); + assertEquals(8, health.getUnassignedShards()); + assertEquals(0, health.getInitializingShards()); + assertEquals(0, health.getActiveShards()); + assertEquals(0, health.getRelocatingShards()); + assertEquals(0, health.getNumberOfDataNodes()); + + logger.info("--> restarting the stopped nodes"); + internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(0)).put(node0DataPathSettings).build()); + internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(1)).put(node1DataPathSettings).build()); + internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(2)).put(node2DataPathSettings).build()); + ensureStableCluster(4); + + health = client().admin().cluster().health(Requests.clusterHealthRequest().waitForGreenStatus().timeout("1m")).actionGet(); + + assertEquals(RED, health.getStatus()); + assertTrue(health.isTimedOut()); + assertEquals(0, health.getNumberOfPendingTasks()); + assertEquals(0, health.getNumberOfInFlightFetch()); + assertEquals(6, health.getActiveShards()); + assertEquals(2, health.getUnassignedShards()); + assertEquals(0, health.getInitializingShards()); + assertEquals(0, health.getRelocatingShards()); + assertEquals(3, health.getNumberOfDataNodes()); + } + + private void createNIndices(int n, String prefix) { + + for (int i = 0; i < n; i++) { + createIndex( + prefix + i, + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build() + ); + // index doc2 + client().prepareIndex(prefix + i).setId("1").setSource("foo", "bar").get(); + + // index doc 2 + client().prepareIndex(prefix + i).setId("2").setSource("foo2", "bar2").get(); + ensureGreen(prefix + i); + } + } + public void testSingleShardFetchUsingBatchAction() { String indexName = "test"; int numOfShards = 1; @@ -909,6 +1190,56 @@ public void testShardStoreFetchCorruptedIndexUsingBatchAction() throws Exception assertNodeStoreFilesMetadataSuccessCase(nodeStoreFilesMetadata.get(shardId2), shardId2); } + public void testDeleteRedIndexInBatchMode() throws Exception { + internalCluster().startClusterManagerOnlyNodes( + 1, + Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true).build() + ); + List dataOnlyNodes = internalCluster().startDataOnlyNodes( + 2, + Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true).build() + ); + createIndex( + "test", + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build() + ); + createIndex( + "test1", + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build() + ); + createIndex( + "test2", + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build() + ); + createIndex( + "testg", + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build() + ); + + ensureGreen("test", "test1", "test2", "testg"); + internalCluster().stopRandomDataNode(); + ensureStableCluster(2); + + ShardsBatchGatewayAllocator gatewayAllocator = internalCluster().getInstance( + ShardsBatchGatewayAllocator.class, + internalCluster().getClusterManagerName() + ); + ensureRed("test", "test1", "test2"); + + assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings())); + + logger.info("--> Now do a reroute so batches are created"); // to avoid any race condition in test + ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get(); + assertTrue(clusterRerouteResponse.isAcknowledged()); + + AcknowledgedResponse deleteIndexResponse = client().admin().indices().prepareDelete("test").get(); + assertTrue(deleteIndexResponse.isAcknowledged()); + + ensureYellow("testg"); + IndicesExistsResponse indexExistResponse = client().admin().indices().prepareExists("test").get(); + assertFalse(indexExistResponse.isExists()); + } + private void prepareIndices(String[] indices, int numberOfPrimaryShards, int numberOfReplicaShards) { for (String index : indices) { createIndex( diff --git a/server/src/main/java/org/opensearch/cluster/ClusterModule.java b/server/src/main/java/org/opensearch/cluster/ClusterModule.java index e5aa51bbc0cf7..e3c4bc80eb4a7 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterModule.java @@ -92,6 +92,7 @@ import org.opensearch.core.common.io.stream.Writeable.Reader; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.gateway.GatewayAllocator; +import org.opensearch.gateway.ShardsBatchGatewayAllocator; import org.opensearch.ingest.IngestMetadata; import org.opensearch.persistent.PersistentTasksCustomMetadata; import org.opensearch.persistent.PersistentTasksNodeService; @@ -152,7 +153,13 @@ public ClusterModule( this.shardsAllocator = createShardsAllocator(settings, clusterService.getClusterSettings(), clusterPlugins); this.clusterService = clusterService; this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadContext); - this.allocationService = new AllocationService(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService); + this.allocationService = new AllocationService( + allocationDeciders, + shardsAllocator, + clusterInfoService, + snapshotsInfoService, + settings + ); } public static List getNamedWriteables() { @@ -420,6 +427,7 @@ public AllocationService getAllocationService() { @Override protected void configure() { bind(GatewayAllocator.class).asEagerSingleton(); + bind(ShardsBatchGatewayAllocator.class).asEagerSingleton(); bind(AllocationService.class).toInstance(allocationService); bind(ClusterService.class).toInstance(clusterService); bind(NodeConnectionsService.class).asEagerSingleton(); @@ -439,10 +447,10 @@ protected void configure() { bind(ShardsAllocator.class).toInstance(shardsAllocator); } - public void setExistingShardsAllocators(GatewayAllocator gatewayAllocator) { + public void setExistingShardsAllocators(GatewayAllocator gatewayAllocator, ShardsBatchGatewayAllocator shardsBatchGatewayAllocator) { final Map existingShardsAllocators = new HashMap<>(); existingShardsAllocators.put(GatewayAllocator.ALLOCATOR_NAME, gatewayAllocator); - + existingShardsAllocators.put(ShardsBatchGatewayAllocator.ALLOCATOR_NAME, shardsBatchGatewayAllocator); for (ClusterPlugin clusterPlugin : clusterPlugins) { for (Map.Entry existingShardsAllocatorEntry : clusterPlugin.getExistingShardsAllocators() .entrySet()) { diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java index 15ec41d5c3fbb..24c3077960444 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java @@ -204,6 +204,10 @@ public int size() { return shards.size(); } + public Collection getInitializingShards() { + return initializingShards; + } + /** * Add a new shard to this node * @param shard Shard to create on this Node diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java index 8779828aea425..ae496e27ebf35 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java @@ -35,6 +35,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.Version; import org.opensearch.cluster.ClusterInfoService; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.RestoreInProgress; @@ -54,8 +55,10 @@ import org.opensearch.cluster.routing.allocation.command.AllocationCommands; import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; import org.opensearch.cluster.routing.allocation.decider.Decision; +import org.opensearch.common.settings.Settings; import org.opensearch.gateway.GatewayAllocator; import org.opensearch.gateway.PriorityComparator; +import org.opensearch.gateway.ShardsBatchGatewayAllocator; import org.opensearch.snapshots.SnapshotsInfoService; import java.util.ArrayList; @@ -73,6 +76,7 @@ import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING; +import static org.opensearch.cluster.routing.allocation.ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE; /** * This service manages the node allocation of a cluster. For this reason the @@ -87,6 +91,7 @@ public class AllocationService { private static final Logger logger = LogManager.getLogger(AllocationService.class); private final AllocationDeciders allocationDeciders; + private Settings settings; private Map existingShardsAllocators; private final ShardsAllocator shardsAllocator; private final ClusterInfoService clusterInfoService; @@ -109,11 +114,23 @@ public AllocationService( ShardsAllocator shardsAllocator, ClusterInfoService clusterInfoService, SnapshotsInfoService snapshotsInfoService + ) { + this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService, Settings.EMPTY); + } + + public AllocationService( + AllocationDeciders allocationDeciders, + ShardsAllocator shardsAllocator, + ClusterInfoService clusterInfoService, + SnapshotsInfoService snapshotsInfoService, + Settings settings + ) { this.allocationDeciders = allocationDeciders; this.shardsAllocator = shardsAllocator; this.clusterInfoService = clusterInfoService; this.snapshotsInfoService = snapshotsInfoService; + this.settings = settings; } /** @@ -548,6 +565,22 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) { existingShardsAllocator.beforeAllocation(allocation); } + /* + Use batch mode if enabled and there is no custom allocator set for Allocation service + */ + Boolean batchModeEnabled = EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(settings); + if (batchModeEnabled + && allocation.nodes().getMinNodeVersion().onOrAfter(Version.V_2_14_0) + && existingShardsAllocators.size() == 2) { + /* + If we do not have any custom allocator set then we will be using ShardsBatchGatewayAllocator + Currently AllocationService will not run any custom Allocator that implements allocateAllUnassignedShards + */ + allocateAllUnassignedShards(allocation); + return; + } + logger.warn("Falling back to single shard assignment since batch mode disable or multiple custom allocators set"); + final RoutingNodes.UnassignedShards.UnassignedIterator primaryIterator = allocation.routingNodes().unassigned().iterator(); while (primaryIterator.hasNext()) { final ShardRouting shardRouting = primaryIterator.next(); @@ -569,6 +602,14 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) { } } + private void allocateAllUnassignedShards(RoutingAllocation allocation) { + ExistingShardsAllocator allocator = existingShardsAllocators.get(ShardsBatchGatewayAllocator.ALLOCATOR_NAME); + allocator.allocateAllUnassignedShards(allocation, true); + allocator.afterPrimariesBeforeReplicas(allocation); + // Replicas Assignment + allocator.allocateAllUnassignedShards(allocation, false); + } + private void disassociateDeadNodes(RoutingAllocation allocation) { for (Iterator it = allocation.routingNodes().mutableIterator(); it.hasNext();) { RoutingNode node = it.next(); diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java index f1889cdf780d4..fb2a37237f8b6 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java @@ -39,12 +39,13 @@ import org.opensearch.common.Nullable; import org.opensearch.common.settings.Setting; import org.opensearch.gateway.GatewayAllocator; +import org.opensearch.gateway.ShardsBatchGatewayAllocator; import java.util.List; /** * Searches for, and allocates, shards for which there is an existing on-disk copy somewhere in the cluster. The default implementation is - * {@link GatewayAllocator}, but plugins can supply their own implementations too. + * {@link GatewayAllocator} and {@link ShardsBatchGatewayAllocator}, but plugins can supply their own implementations too. * * @opensearch.internal */ @@ -60,6 +61,26 @@ public interface ExistingShardsAllocator { Setting.Property.PrivateIndex ); + /** + * Boolean setting to enable/disable batch allocation of unassigned shards already existing on disk. + * This will allow sending all Unassigned Shards to the ExistingShard Allocator to make decision to allocate + * in one or more go. + * + * Enable this setting if your ExistingShardAllocator is implementing the + * {@link ExistingShardsAllocator#allocateAllUnassignedShards(RoutingAllocation, boolean)} method. + * The default implementation of this method is not optimized and assigns shards one by one. + * + * If no plugin overrides {@link ExistingShardsAllocator} then default implementation will be use for it , i.e, + * {@link ShardsBatchGatewayAllocator}. + * + * This setting is experimental at this point. + */ + Setting EXISTING_SHARDS_ALLOCATOR_BATCH_MODE = Setting.boolSetting( + "cluster.allocator.existing_shards_allocator.batch_enabled", + false, + Setting.Property.NodeScope + ); + /** * Called before starting a round of allocation, allowing the allocator to invalidate some caches if appropriate. */ @@ -80,6 +101,23 @@ void allocateUnassigned( UnassignedAllocationHandler unassignedAllocationHandler ); + /** + * Allocate all unassigned shards in the given {@link RoutingAllocation} for which this {@link ExistingShardsAllocator} is responsible. + * Default implementation calls {@link #allocateUnassigned(ShardRouting, RoutingAllocation, UnassignedAllocationHandler)} for each Unassigned shard + * and is kept here for backward compatibility. + * + * Allocation service will currently run the default implementation of it implemented by {@link ShardsBatchGatewayAllocator} + */ + default void allocateAllUnassignedShards(RoutingAllocation allocation, boolean primary) { + RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator(); + while (iterator.hasNext()) { + ShardRouting shardRouting = iterator.next(); + if (shardRouting.primary() == primary) { + allocateUnassigned(shardRouting, allocation, iterator); + } + } + } + /** * Returns an explanation for a single unassigned shard. */ diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 4753c18648bf9..2140fc59f35cd 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -65,6 +65,7 @@ import org.opensearch.cluster.routing.OperationRouting; import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance; import org.opensearch.cluster.routing.allocation.DiskThresholdSettings; +import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator; import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; @@ -103,6 +104,7 @@ import org.opensearch.gateway.DanglingIndicesState; import org.opensearch.gateway.GatewayService; import org.opensearch.gateway.PersistedClusterStateService; +import org.opensearch.gateway.ShardsBatchGatewayAllocator; import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.http.HttpTransportSettings; import org.opensearch.index.IndexModule; @@ -269,6 +271,7 @@ public void apply(Settings value, Settings current, Settings previous) { DanglingIndicesState.AUTO_IMPORT_DANGLING_INDICES_SETTING, EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING, EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING, + ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE, FilterAllocationDecider.CLUSTER_ROUTING_INCLUDE_GROUP_SETTING, FilterAllocationDecider.CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING, FilterAllocationDecider.CLUSTER_ROUTING_REQUIRE_GROUP_SETTING, @@ -331,6 +334,7 @@ public void apply(Settings value, Settings current, Settings previous) { GatewayService.RECOVER_AFTER_MASTER_NODES_SETTING, GatewayService.RECOVER_AFTER_NODES_SETTING, GatewayService.RECOVER_AFTER_TIME_SETTING, + ShardsBatchGatewayAllocator.GATEWAY_ALLOCATOR_BATCH_SIZE, PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD, NetworkModule.HTTP_DEFAULT_TYPE_SETTING, NetworkModule.TRANSPORT_DEFAULT_TYPE_SETTING, diff --git a/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java b/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java index e0831293fc7e1..eed5de65258fc 100644 --- a/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java @@ -135,6 +135,8 @@ private void executeDecision( } } + public void allocateUnassignedBatch(String batchId, RoutingAllocation allocation) {} + protected long getExpectedShardSize(ShardRouting shardRouting, RoutingAllocation allocation) { if (shardRouting.primary()) { if (shardRouting.recoverySource().getType() == RecoverySource.Type.SNAPSHOT) { diff --git a/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java b/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java index 3459f1591b633..be7867b7823f6 100644 --- a/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java @@ -56,7 +56,7 @@ public void processExistingRecoveries(RoutingAllocation allocation, List GATEWAY_ALLOCATOR_BATCH_SIZE = Setting.longSetting( + "cluster.allocator.gateway.batch_size", + DEFAULT_SHARD_BATCH_SIZE, + 1, + 10000, + Setting.Property.NodeScope + ); + + private final RerouteService rerouteService; + private final PrimaryShardBatchAllocator primaryShardBatchAllocator; + private final ReplicaShardBatchAllocator replicaShardBatchAllocator; + private Set lastSeenEphemeralIds = Collections.emptySet(); + + // visible for testing + protected final ConcurrentMap batchIdToStartedShardBatch = ConcurrentCollections.newConcurrentMap(); + + // visible for testing + protected final ConcurrentMap batchIdToStoreShardBatch = ConcurrentCollections.newConcurrentMap(); + private final TransportNodesListGatewayStartedShardsBatch batchStartedAction; + private final TransportNodesListShardStoreMetadataBatch batchStoreAction; + + @Inject + public ShardsBatchGatewayAllocator( + RerouteService rerouteService, + TransportNodesListGatewayStartedShardsBatch batchStartedAction, + TransportNodesListShardStoreMetadataBatch batchStoreAction, + Settings settings + ) { + this.rerouteService = rerouteService; + this.primaryShardBatchAllocator = new InternalPrimaryBatchShardAllocator(); + this.replicaShardBatchAllocator = new InternalReplicaBatchShardAllocator(); + this.batchStartedAction = batchStartedAction; + this.batchStoreAction = batchStoreAction; + this.maxBatchSize = GATEWAY_ALLOCATOR_BATCH_SIZE.get(settings); + } + + @Override + public void cleanCaches() { + Stream.of(batchIdToStartedShardBatch, batchIdToStoreShardBatch).forEach(b -> { + Releasables.close(b.values().stream().map(shardsBatch -> shardsBatch.asyncBatch).collect(Collectors.toList())); + b.clear(); + }); + } + + // for tests + protected ShardsBatchGatewayAllocator() { + this.rerouteService = null; + this.batchStartedAction = null; + this.primaryShardBatchAllocator = null; + this.batchStoreAction = null; + this.replicaShardBatchAllocator = null; + this.maxBatchSize = DEFAULT_SHARD_BATCH_SIZE; + } + + // for tests + + @Override + public int getNumberOfInFlightFetches() { + int count = 0; + for (ShardsBatch batch : batchIdToStartedShardBatch.values()) { + count += (batch.getNumberOfInFlightFetches() * batch.getBatchedShards().size()); + } + for (ShardsBatch batch : batchIdToStoreShardBatch.values()) { + count += (batch.getNumberOfInFlightFetches() * batch.getBatchedShards().size()); + } + + return count; + } + + @Override + public void applyStartedShards(final List startedShards, final RoutingAllocation allocation) { + for (ShardRouting startedShard : startedShards) { + safelyRemoveShardFromBothBatch(startedShard); + } + } + + @Override + public void applyFailedShards(final List failedShards, final RoutingAllocation allocation) { + for (FailedShard failedShard : failedShards) { + safelyRemoveShardFromBothBatch(failedShard.getRoutingEntry()); + } + } + + @Override + public void beforeAllocation(final RoutingAllocation allocation) { + assert primaryShardBatchAllocator != null; + assert replicaShardBatchAllocator != null; + ensureAsyncFetchStorePrimaryRecency(allocation); + } + + @Override + public void afterPrimariesBeforeReplicas(RoutingAllocation allocation) { + assert replicaShardBatchAllocator != null; + List> storedShardBatches = batchIdToStoreShardBatch.values() + .stream() + .map(ShardsBatch::getBatchedShardRoutings) + .collect(Collectors.toList()); + if (allocation.routingNodes().hasInactiveShards()) { + // cancel existing recoveries if we have a better match + replicaShardBatchAllocator.processExistingRecoveries(allocation, storedShardBatches); + } + } + + @Override + public void allocateUnassigned( + ShardRouting shardRouting, + RoutingAllocation allocation, + UnassignedAllocationHandler unassignedAllocationHandler + ) { + throw new UnsupportedOperationException("ShardsBatchGatewayAllocator does not support allocating unassigned shards"); + } + + @Override + public void allocateAllUnassignedShards(final RoutingAllocation allocation, boolean primary) { + + assert primaryShardBatchAllocator != null; + assert replicaShardBatchAllocator != null; + innerAllocateUnassignedBatch(allocation, primaryShardBatchAllocator, replicaShardBatchAllocator, primary); + } + + protected void innerAllocateUnassignedBatch( + RoutingAllocation allocation, + PrimaryShardBatchAllocator primaryBatchShardAllocator, + ReplicaShardBatchAllocator replicaBatchShardAllocator, + boolean primary + ) { + // create batches for unassigned shards + Set batchesToAssign = createAndUpdateBatches(allocation, primary); + if (batchesToAssign.isEmpty()) { + return; + } + if (primary) { + batchIdToStartedShardBatch.values() + .stream() + .filter(batch -> batchesToAssign.contains(batch.batchId)) + .forEach( + shardsBatch -> primaryBatchShardAllocator.allocateUnassignedBatch(shardsBatch.getBatchedShardRoutings(), allocation) + ); + } else { + batchIdToStoreShardBatch.values() + .stream() + .filter(batch -> batchesToAssign.contains(batch.batchId)) + .forEach(batch -> replicaBatchShardAllocator.allocateUnassignedBatch(batch.getBatchedShardRoutings(), allocation)); + } + } + + // visible for testing + protected Set createAndUpdateBatches(RoutingAllocation allocation, boolean primary) { + Set batchesToBeAssigned = new HashSet<>(); + RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned(); + ConcurrentMap currentBatches = primary ? batchIdToStartedShardBatch : batchIdToStoreShardBatch; + // get all batched shards + Map currentBatchedShards = new HashMap<>(); + for (Map.Entry batchEntry : currentBatches.entrySet()) { + batchEntry.getValue().getBatchedShards().forEach(shardId -> currentBatchedShards.put(shardId, batchEntry.getKey())); + } + + Set newShardsToBatch = Sets.newHashSet(); + Set batchedShardsToAssign = Sets.newHashSet(); + // add all unassigned shards to the batch if they are not already in a batch + unassigned.forEach(shardRouting -> { + if ((currentBatchedShards.containsKey(shardRouting.shardId()) == false) && (shardRouting.primary() == primary)) { + assert shardRouting.unassigned(); + newShardsToBatch.add(shardRouting); + } + // if shard is already batched update to latest shardRouting information in the batches + // Replica shard assignment can be cancelled if we get a better match. These ShardRouting objects also + // store other information like relocating node, targetRelocatingShard etc. And it can be updated after + // batches are created. If we don't update the ShardRouting object, stale data would be passed from the + // batch. This stale data can end up creating a same decision which has already been taken, and we'll see + // failure in executeDecision of BaseGatewayShardAllocator. Previous non-batch mode flow also used to + // pass ShardRouting object directly from unassignedIterator, so we're following the same behaviour. + else if (shardRouting.primary() == primary) { + String batchId = currentBatchedShards.get(shardRouting.shardId()); + batchesToBeAssigned.add(batchId); + currentBatches.get(batchId).batchInfo.get(shardRouting.shardId()).setShardRouting(shardRouting); + batchedShardsToAssign.add(shardRouting.shardId()); + } + }); + + allocation.routingNodes().forEach(routingNode -> routingNode.getInitializingShards().forEach(shardRouting -> { + if (currentBatchedShards.containsKey(shardRouting.shardId()) && shardRouting.primary() == primary) { + batchedShardsToAssign.add(shardRouting.shardId()); + // Set updated shard routing in batch if it already exists + String batchId = currentBatchedShards.get(shardRouting.shardId()); + currentBatches.get(batchId).batchInfo.get(shardRouting.shardId()).setShardRouting(shardRouting); + } + })); + + refreshShardBatches(currentBatches, batchedShardsToAssign, primary); + + Iterator iterator = newShardsToBatch.iterator(); + assert maxBatchSize > 0 : "Shards batch size must be greater than 0"; + + long batchSize = maxBatchSize; + Map perBatchShards = new HashMap<>(); + while (iterator.hasNext()) { + ShardRouting currentShard = iterator.next(); + ShardEntry shardEntry = new ShardEntry( + new ShardAttributes( + IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(currentShard.index()).getSettings()) + ), + currentShard + ); + perBatchShards.put(currentShard.shardId(), shardEntry); + batchSize--; + iterator.remove(); + // add to batch if batch size full or last shard in unassigned list + if (batchSize == 0 || iterator.hasNext() == false) { + String batchUUId = UUIDs.base64UUID(); + ShardsBatch shardsBatch = new ShardsBatch(batchUUId, perBatchShards, primary); + // add the batch to list of current batches + addBatch(shardsBatch, primary); + batchesToBeAssigned.add(batchUUId); + perBatchShards.clear(); + batchSize = maxBatchSize; + } + } + return batchesToBeAssigned; + } + + private void refreshShardBatches( + ConcurrentMap currentBatches, + Set batchedShardsToAssign, + boolean primary + ) { + // cleanup shard from batches if they are not present in unassigned list from allocation object. This is + // needed as AllocationService.reroute can also be called directly by API flows for example DeleteIndices. + // So, as part of calling reroute, those shards will be removed from allocation object. It'll handle the + // scenarios where shards can be removed from unassigned list without "start" or "failed" event. + for (Map.Entry batchEntry : currentBatches.entrySet()) { + Iterator shardIdIterator = batchEntry.getValue().getBatchedShards().iterator(); + while (shardIdIterator.hasNext()) { + ShardId shardId = shardIdIterator.next(); + if (batchedShardsToAssign.contains(shardId) == false) { + shardIdIterator.remove(); + batchEntry.getValue().clearShardFromCache(shardId); + } + } + ConcurrentMap batches = primary ? batchIdToStartedShardBatch : batchIdToStoreShardBatch; + deleteBatchIfEmpty(batches, batchEntry.getValue().getBatchId()); + } + } + + private void addBatch(ShardsBatch shardsBatch, boolean primary) { + ConcurrentMap batches = primary ? batchIdToStartedShardBatch : batchIdToStoreShardBatch; + if (batches.containsKey(shardsBatch.getBatchId())) { + throw new IllegalStateException("Batch already exists. BatchId = " + shardsBatch.getBatchId()); + } + batches.put(shardsBatch.getBatchId(), shardsBatch); + } + + /** + * Safely remove a shard from the appropriate batch depending on if it is primary or replica + * If the shard is not in a batch, this is a no-op. + * Cleans the batch if it is empty after removing the shard. + * This method should be called when removing the shard from the batch instead {@link ShardsBatch#removeFromBatch(ShardRouting)} + * so that we can clean up the batch if it is empty and release the fetching resources + * + * @param shardRouting shard to be removed + * @param primary from which batch shard needs to be removed + */ + protected void safelyRemoveShardFromBatch(ShardRouting shardRouting, boolean primary) { + String batchId = primary ? getBatchId(shardRouting, true) : getBatchId(shardRouting, false); + if (batchId == null) { + logger.debug("Shard[{}] is not batched", shardRouting); + return; + } + ConcurrentMap batches = primary ? batchIdToStartedShardBatch : batchIdToStoreShardBatch; + ShardsBatch batch = batches.get(batchId); + batch.removeFromBatch(shardRouting); + deleteBatchIfEmpty(batches, batchId); + } + + /** + * Safely remove shard from both the batches irrespective of its primary or replica, + * For the corresponding shardId. The method intends to clean up the batch if it is empty + * after removing the shard + * @param shardRouting shard to remove + */ + protected void safelyRemoveShardFromBothBatch(ShardRouting shardRouting) { + safelyRemoveShardFromBatch(shardRouting, true); + safelyRemoveShardFromBatch(shardRouting, false); + } + + private void deleteBatchIfEmpty(ConcurrentMap batches, String batchId) { + if (batches.containsKey(batchId)) { + ShardsBatch batch = batches.get(batchId); + if (batch.getBatchedShards().isEmpty()) { + Releasables.close(batch.getAsyncFetcher()); + batches.remove(batchId); + } + } + } + + protected String getBatchId(ShardRouting shardRouting, boolean primary) { + ConcurrentMap batches = primary ? batchIdToStartedShardBatch : batchIdToStoreShardBatch; + + return batches.entrySet() + .stream() + .filter(entry -> entry.getValue().getBatchedShards().contains(shardRouting.shardId())) + .findFirst() + .map(Map.Entry::getKey) + .orElse(null); + } + + @Override + public AllocateUnassignedDecision explainUnassignedShardAllocation(ShardRouting unassignedShard, RoutingAllocation routingAllocation) { + assert unassignedShard.unassigned(); + assert routingAllocation.debugDecision(); + if (getBatchId(unassignedShard, unassignedShard.primary()) == null) { + createAndUpdateBatches(routingAllocation, unassignedShard.primary()); + } + assert getBatchId(unassignedShard, unassignedShard.primary()) != null; + if (unassignedShard.primary()) { + assert primaryShardBatchAllocator != null; + return primaryShardBatchAllocator.makeAllocationDecision(unassignedShard, routingAllocation, logger); + } else { + assert replicaShardBatchAllocator != null; + return replicaShardBatchAllocator.makeAllocationDecision(unassignedShard, routingAllocation, logger); + } + } + + /** + * Clear the fetched data for the primary to ensure we do not cancel recoveries based on excessively stale data. + */ + private void ensureAsyncFetchStorePrimaryRecency(RoutingAllocation allocation) { + DiscoveryNodes nodes = allocation.nodes(); + if (hasNewNodes(nodes)) { + final Set newEphemeralIds = StreamSupport.stream(Spliterators.spliterator(nodes.getDataNodes().entrySet(), 0), false) + .map(node -> node.getValue().getEphemeralId()) + .collect(Collectors.toSet()); + // Invalidate the cache if a data node has been added to the cluster. This ensures that we do not cancel a recovery if a node + // drops out, we fetch the shard data, then some indexing happens and then the node rejoins the cluster again. There are other + // ways we could decide to cancel a recovery based on stale data (e.g. changing allocation filters or a primary failure) but + // making the wrong decision here is not catastrophic so we only need to cover the common case. + + logger.trace( + () -> new ParameterizedMessage( + "new nodes {} found, clearing primary async-fetch-store cache", + Sets.difference(newEphemeralIds, lastSeenEphemeralIds) + ) + ); + batchIdToStoreShardBatch.values().forEach(batch -> clearCacheForBatchPrimary(batch, allocation)); + + // recalc to also (lazily) clear out old nodes. + this.lastSeenEphemeralIds = newEphemeralIds; + } + } + + private static void clearCacheForBatchPrimary(ShardsBatch batch, RoutingAllocation allocation) { + // We need to clear the cache for the primary shard to ensure we do not cancel recoveries based on excessively + // stale data. We do this by clearing the cache of nodes for all the active primaries of replicas in the current batch. + // Although this flow can be optimized by only clearing the cache for the primary shard but currently + // when we want to fetch data we do for complete node, for doing this a new fetch flow will also handle just + // fetching the data for a single shard on the node and fill that up in our cache + // Opened issue #13352 - to track the improvement + List primaries = batch.getBatchedShards() + .stream() + .map(allocation.routingNodes()::activePrimary) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + AsyncShardBatchFetch fetch = batch.getAsyncFetcher(); + primaries.forEach(shardRouting -> fetch.clearCacheForNode(shardRouting.currentNodeId())); + } + + private boolean hasNewNodes(DiscoveryNodes nodes) { + for (final DiscoveryNode node : nodes.getDataNodes().values()) { + if (lastSeenEphemeralIds.contains(node.getEphemeralId()) == false) { + return true; + } + } + return false; + } + + class InternalBatchAsyncFetch extends AsyncShardBatchFetch { + InternalBatchAsyncFetch( + Logger logger, + String type, + Map map, + AsyncShardFetch.Lister, T> action, + String batchUUId, + Class clazz, + V emptyShardResponse, + Predicate emptyShardResponsePredicate, + ShardBatchResponseFactory responseFactory + ) { + super(logger, type, map, action, batchUUId, clazz, emptyShardResponse, emptyShardResponsePredicate, responseFactory); + } + + @Override + protected void reroute(String reroutingKey, String reason) { + logger.trace("{} scheduling reroute for {}", reroutingKey, reason); + assert rerouteService != null; + rerouteService.reroute( + "async_shard_batch_fetch", + Priority.HIGH, + ActionListener.wrap( + r -> logger.trace("{} scheduled reroute completed for {}", reroutingKey, reason), + e -> logger.debug(new ParameterizedMessage("{} scheduled reroute failed for {}", reroutingKey, reason), e) + ) + ); + } + } + + class InternalPrimaryBatchShardAllocator extends PrimaryShardBatchAllocator { + + @Override + @SuppressWarnings("unchecked") + protected AsyncShardFetch.FetchResult fetchData( + List eligibleShards, + List inEligibleShards, + RoutingAllocation allocation + ) { + return (AsyncShardFetch.FetchResult< + TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch>) fetchDataAndCleanIneligibleShards( + eligibleShards, + inEligibleShards, + allocation + ); + } + + } + + class InternalReplicaBatchShardAllocator extends ReplicaShardBatchAllocator { + @Override + @SuppressWarnings("unchecked") + protected AsyncShardFetch.FetchResult fetchData( + List eligibleShards, + List inEligibleShards, + RoutingAllocation allocation + ) { + return (AsyncShardFetch.FetchResult< + TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch>) fetchDataAndCleanIneligibleShards( + eligibleShards, + inEligibleShards, + allocation + ); + } + + @Override + protected boolean hasInitiatedFetching(ShardRouting shard) { + String batchId = getBatchId(shard, shard.primary()); + return batchId != null; + } + } + + AsyncShardFetch.FetchResult fetchDataAndCleanIneligibleShards( + List eligibleShards, + List inEligibleShards, + RoutingAllocation allocation + ) { + // get batch id for anyone given shard. We are assuming all shards will have same batchId + ShardRouting shardRouting = eligibleShards.iterator().hasNext() ? eligibleShards.iterator().next() : null; + shardRouting = shardRouting == null && inEligibleShards.iterator().hasNext() ? inEligibleShards.iterator().next() : shardRouting; + if (shardRouting == null) { + return new AsyncShardFetch.FetchResult<>(null, Collections.emptyMap()); + } + String batchId = getBatchId(shardRouting, shardRouting.primary()); + if (batchId == null) { + logger.debug("Shard {} has no batch id", shardRouting); + throw new IllegalStateException("Shard " + shardRouting + " has no batch id. Shard should batched before fetching"); + } + ConcurrentMap batches = shardRouting.primary() ? batchIdToStartedShardBatch : batchIdToStoreShardBatch; + if (batches.containsKey(batchId) == false) { + logger.debug("Batch {} has no shards batch", batchId); + throw new IllegalStateException("Batch " + batchId + " has no shards batch"); + } + + ShardsBatch shardsBatch = batches.get(batchId); + // remove in eligible shards which allocator is not responsible for + inEligibleShards.forEach(sr -> safelyRemoveShardFromBatch(sr, sr.primary())); + + if (shardsBatch.getBatchedShards().isEmpty() && eligibleShards.isEmpty()) { + logger.debug("Batch {} is empty", batchId); + return new AsyncShardFetch.FetchResult<>(null, Collections.emptyMap()); + } + Map> shardToIgnoreNodes = new HashMap<>(); + for (ShardId shardId : shardsBatch.asyncBatch.shardAttributesMap.keySet()) { + shardToIgnoreNodes.put(shardId, allocation.getIgnoreNodes(shardId)); + } + AsyncShardBatchFetch asyncFetcher = shardsBatch.getAsyncFetcher(); + AsyncShardFetch.FetchResult fetchResult = asyncFetcher.fetchData( + allocation.nodes(), + shardToIgnoreNodes + ); + if (fetchResult.hasData()) { + fetchResult.processAllocation(allocation); + } + + return fetchResult; + } + + /** + * Holds information about a batch of shards to be allocated. + * Async fetcher is used to fetch the data for the batch. + *

+ * Visible for testing + */ + public class ShardsBatch { + private final String batchId; + private final boolean primary; + + private final InternalBatchAsyncFetch asyncBatch; + + private final Map batchInfo; + + public ShardsBatch(String batchId, Map shardsWithInfo, boolean primary) { + this.batchId = batchId; + this.batchInfo = new HashMap<>(shardsWithInfo); + // create a ShardId -> customDataPath map for async fetch + Map shardIdsMap = batchInfo.entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getShardAttributes())); + this.primary = primary; + if (this.primary) { + asyncBatch = new InternalBatchAsyncFetch<>( + logger, + "batch_shards_started", + shardIdsMap, + batchStartedAction, + batchId, + GatewayStartedShard.class, + new GatewayStartedShard(null, false, null, null), + GatewayStartedShard::isEmpty, + new ShardBatchResponseFactory<>(true) + ); + } else { + asyncBatch = new InternalBatchAsyncFetch<>( + logger, + "batch_shards_store", + shardIdsMap, + batchStoreAction, + batchId, + NodeStoreFilesMetadata.class, + new NodeStoreFilesMetadata(new StoreFilesMetadata(null, Store.MetadataSnapshot.EMPTY, Collections.emptyList()), null), + NodeStoreFilesMetadata::isEmpty, + new ShardBatchResponseFactory<>(false) + ); + } + } + + protected void removeShard(ShardId shardId) { + this.batchInfo.remove(shardId); + } + + private TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata buildEmptyReplicaShardResponse() { + return new TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata( + new TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata( + null, + Store.MetadataSnapshot.EMPTY, + Collections.emptyList() + ), + null + ); + } + + private void removeFromBatch(ShardRouting shard) { + removeShard(shard.shardId()); + clearShardFromCache(shard.shardId()); + // assert that fetcher and shards are the same as batched shards + assert batchInfo.size() == asyncBatch.shardAttributesMap.size() : "Shards size is not equal to fetcher size"; + } + + private void clearShardFromCache(ShardId shardId) { + asyncBatch.clearShard(shardId); + } + + public List getBatchedShardRoutings() { + return batchInfo.values().stream().map(ShardEntry::getShardRouting).collect(Collectors.toList()); + } + + public Set getBatchedShards() { + return batchInfo.keySet(); + } + + public String getBatchId() { + return batchId; + } + + public AsyncShardBatchFetch getAsyncFetcher() { + return asyncBatch; + } + + public int getNumberOfInFlightFetches() { + return asyncBatch.getNumberOfInFlightFetches(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o instanceof ShardsBatch == false) { + return false; + } + ShardsBatch shardsBatch = (ShardsBatch) o; + return batchId.equals(shardsBatch.getBatchId()) && batchInfo.keySet().equals(shardsBatch.getBatchedShards()); + } + + @Override + public int hashCode() { + return Objects.hash(batchId); + } + + @Override + public String toString() { + return "batchId: " + batchId; + } + + } + + /** + * Holds information about a shard to be allocated in a batch. + */ + static class ShardEntry { + + private final ShardAttributes shardAttributes; + + private ShardRouting shardRouting; + + public ShardEntry(ShardAttributes shardAttributes, ShardRouting shardRouting) { + this.shardAttributes = shardAttributes; + this.shardRouting = shardRouting; + } + + public ShardRouting getShardRouting() { + return shardRouting; + } + + public ShardAttributes getShardAttributes() { + return shardAttributes; + } + + public ShardEntry setShardRouting(ShardRouting shardRouting) { + this.shardRouting = shardRouting; + return this; + } + } + + public int getNumberOfStartedShardBatches() { + return batchIdToStartedShardBatch.size(); + } + + public int getNumberOfStoreShardBatches() { + return batchIdToStoreShardBatch.size(); + } +} diff --git a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java index 8a4dfdfcf7635..d8282182787b5 100644 --- a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java +++ b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java @@ -276,7 +276,7 @@ public void writeTo(StreamOutput out) throws IOException { } } - boolean isEmpty(NodeStoreFilesMetadata response) { + public static boolean isEmpty(NodeStoreFilesMetadata response) { return response.storeFilesMetadata() == null || response.storeFilesMetadata().isEmpty() && response.getStoreFileFetchException() == null; } @@ -329,7 +329,13 @@ public static class NodeStoreFilesMetadataBatch extends BaseNodeResponse { protected NodeStoreFilesMetadataBatch(StreamInput in) throws IOException { super(in); - this.nodeStoreFilesMetadataBatch = in.readMap(ShardId::new, NodeStoreFilesMetadata::new); + this.nodeStoreFilesMetadataBatch = in.readMap(ShardId::new, i -> { + if (i.readBoolean()) { + return new NodeStoreFilesMetadata(i); + } else { + return null; + } + }); } public NodeStoreFilesMetadataBatch(DiscoveryNode node, Map nodeStoreFilesMetadataBatch) { @@ -344,7 +350,14 @@ public Map getNodeStoreFilesMetadataBatch() { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeMap(nodeStoreFilesMetadataBatch, (o, k) -> k.writeTo(o), (o, v) -> v.writeTo(o)); + out.writeMap(nodeStoreFilesMetadataBatch, (o, k) -> k.writeTo(o), (o, v) -> { + if (v != null) { + o.writeBoolean(true); + v.writeTo(o); + } else { + o.writeBoolean(false); + } + }); } } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index a8bc25b8c3935..696496abf255e 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -135,6 +135,7 @@ import org.opensearch.gateway.GatewayService; import org.opensearch.gateway.MetaStateService; import org.opensearch.gateway.PersistedClusterStateService; +import org.opensearch.gateway.ShardsBatchGatewayAllocator; import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.http.HttpServerTransport; import org.opensearch.identity.IdentityService; @@ -1328,9 +1329,12 @@ protected Node( // We allocate copies of existing shards by looking for a viable copy of the shard in the cluster and assigning the shard there. // The search for viable copies is triggered by an allocation attempt (i.e. a reroute) and is performed asynchronously. When it // completes we trigger another reroute to try the allocation again. This means there is a circular dependency: the allocation - // service needs access to the existing shards allocators (e.g. the GatewayAllocator) which need to be able to trigger a - // reroute, which needs to call into the allocation service. We close the loop here: - clusterModule.setExistingShardsAllocators(injector.getInstance(GatewayAllocator.class)); + // service needs access to the existing shards allocators (e.g. the GatewayAllocator, ShardsBatchGatewayAllocator) which + // need to be able to trigger a reroute, which needs to call into the allocation service. We close the loop here: + clusterModule.setExistingShardsAllocators( + injector.getInstance(GatewayAllocator.class), + injector.getInstance(ShardsBatchGatewayAllocator.class) + ); List pluginLifecycleComponents = pluginComponents.stream() .filter(p -> p instanceof LifecycleComponent) diff --git a/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java b/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java index b30ebaf183084..557e4dc2ca8c5 100644 --- a/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java +++ b/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java @@ -73,6 +73,7 @@ import org.opensearch.gateway.GatewayAllocator; import org.opensearch.plugins.ClusterPlugin; import org.opensearch.test.gateway.TestGatewayAllocator; +import org.opensearch.test.gateway.TestShardBatchGatewayAllocator; import java.util.Arrays; import java.util.Collection; @@ -296,7 +297,10 @@ public void testRejectsReservedExistingShardsAllocatorName() { null, threadContext ); - expectThrows(IllegalArgumentException.class, () -> clusterModule.setExistingShardsAllocators(new TestGatewayAllocator())); + expectThrows( + IllegalArgumentException.class, + () -> clusterModule.setExistingShardsAllocators(new TestGatewayAllocator(), new TestShardBatchGatewayAllocator()) + ); } public void testRejectsDuplicateExistingShardsAllocatorName() { @@ -308,7 +312,10 @@ public void testRejectsDuplicateExistingShardsAllocatorName() { null, threadContext ); - expectThrows(IllegalArgumentException.class, () -> clusterModule.setExistingShardsAllocators(new TestGatewayAllocator())); + expectThrows( + IllegalArgumentException.class, + () -> clusterModule.setExistingShardsAllocators(new TestGatewayAllocator(), new TestShardBatchGatewayAllocator()) + ); } private static ClusterPlugin existingShardsAllocatorPlugin(final String allocatorName) { diff --git a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java new file mode 100644 index 0000000000000..bb59a5792ec8c --- /dev/null +++ b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java @@ -0,0 +1,360 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.Version; +import org.opensearch.action.support.nodes.BaseNodeResponse; +import org.opensearch.cluster.ClusterInfo; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.OpenSearchAllocationTestCase; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.snapshots.SnapshotShardSizeInfo; +import org.opensearch.test.gateway.TestShardBatchGatewayAllocator; +import org.junit.Before; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +public class GatewayAllocatorTests extends OpenSearchAllocationTestCase { + + private final Logger logger = LogManager.getLogger(GatewayAllocatorTests.class); + TestShardBatchGatewayAllocator testShardsBatchGatewayAllocator = null; + ClusterState clusterState = null; + RoutingAllocation testAllocation = null; + String indexPrefix = "TEST"; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + testShardsBatchGatewayAllocator = new TestShardBatchGatewayAllocator(); + } + + public void testSingleBatchCreation() { + createIndexAndUpdateClusterState(1, 3, 1); + createBatchesAndAssert(1); + } + + public void testTwoBatchCreation() { + createIndexAndUpdateClusterState(2, 1020, 1); + createBatchesAndAssert(2); + + List listOfBatches = new ArrayList<>( + testShardsBatchGatewayAllocator.getBatchIdToStartedShardBatch().values() + ); + assertNotEquals(listOfBatches.get(0), listOfBatches.get(1)); + + // test for replicas + listOfBatches = new ArrayList<>(testShardsBatchGatewayAllocator.getBatchIdToStoreShardBatch().values()); + assertNotEquals(listOfBatches.get(0), listOfBatches.get(1)); + } + + public void testNonDuplicationOfBatch() { + createIndexAndUpdateClusterState(1, 3, 1); + Tuple, Set> batches = createBatchesAndAssert(1); + assertEquals(1, batches.v1().size()); + assertEquals(1, batches.v2().size()); + + // again try to create batch and verify no new batch is created since shard is already batched and no new unassigned shard + assertEquals(batches.v1(), testShardsBatchGatewayAllocator.createAndUpdateBatches(testAllocation, true)); + assertEquals(batches.v2(), testShardsBatchGatewayAllocator.createAndUpdateBatches(testAllocation, false)); + } + + public void testCorrectnessOfBatch() { + createIndexAndUpdateClusterState(2, 1020, 1); + createBatchesAndAssert(2); + Set shardsSet1 = clusterState.routingTable() + .index(indexPrefix + 0) + .getShards() + .values() + .stream() + .map(IndexShardRoutingTable::getShardId) + .collect(Collectors.toSet()); + Set shardsSet2 = clusterState.routingTable() + .index(indexPrefix + 1) + .getShards() + .values() + .stream() + .map(IndexShardRoutingTable::getShardId) + .collect(Collectors.toSet()); + shardsSet1.addAll(shardsSet2); + + Set shardsInAllbatches = testShardsBatchGatewayAllocator.getBatchIdToStartedShardBatch() + .values() + .stream() + .map(ShardsBatchGatewayAllocator.ShardsBatch::getBatchedShards) + .flatMap(Set::stream) + .collect(Collectors.toSet()); + assertEquals(shardsInAllbatches, shardsSet1); + shardsInAllbatches = testShardsBatchGatewayAllocator.getBatchIdToStoreShardBatch() + .values() + .stream() + .map(ShardsBatchGatewayAllocator.ShardsBatch::getBatchedShards) + .flatMap(Set::stream) + .collect(Collectors.toSet()); + assertEquals(shardsInAllbatches, shardsSet1); + + Set primariesInAllBatches = testShardsBatchGatewayAllocator.getBatchIdToStartedShardBatch() + .values() + .stream() + .map(ShardsBatchGatewayAllocator.ShardsBatch::getBatchedShardRoutings) + .flatMap(List::stream) + .collect(Collectors.toSet()); + primariesInAllBatches.forEach(shardRouting -> assertTrue(shardRouting.unassigned() && shardRouting.primary() == true)); + + Set replicasInAllBatches = testShardsBatchGatewayAllocator.getBatchIdToStoreShardBatch() + .values() + .stream() + .map(ShardsBatchGatewayAllocator.ShardsBatch::getBatchedShardRoutings) + .flatMap(List::stream) + .collect(Collectors.toSet()); + + replicasInAllBatches.forEach(shardRouting -> assertTrue(shardRouting.unassigned() && shardRouting.primary() == false)); + } + + public void testAsyncFetcherCreationInBatch() { + createIndexAndUpdateClusterState(1, 3, 1); + Tuple, Set> batchesTuple = createBatchesAndAssert(1); + Set primaryBatches = batchesTuple.v1(); + Set replicaBatches = batchesTuple.v2(); + + ShardsBatchGatewayAllocator.ShardsBatch shardsBatch = testShardsBatchGatewayAllocator.getBatchIdToStartedShardBatch() + .get(primaryBatches.iterator().next()); + AsyncShardFetch asyncFetcher = shardsBatch.getAsyncFetcher(); + // assert asyncFetcher is not null + assertNotNull(asyncFetcher); + shardsBatch = testShardsBatchGatewayAllocator.getBatchIdToStoreShardBatch().get(replicaBatches.iterator().next()); + asyncFetcher = shardsBatch.getAsyncFetcher(); + assertNotNull(asyncFetcher); + } + + public void testSafelyRemoveShardFromBatch() { + createIndexAndUpdateClusterState(2, 1023, 1); + + Tuple, Set> batchesTuple = createBatchesAndAssert(2); + Set primaryBatches = batchesTuple.v1(); + Set replicaBatches = batchesTuple.v2(); + + ShardsBatchGatewayAllocator.ShardsBatch primaryShardsBatch = testShardsBatchGatewayAllocator.getBatchIdToStartedShardBatch() + .get(primaryBatches.iterator().next()); + ShardRouting primaryShardRouting = primaryShardsBatch.getBatchedShardRoutings().iterator().next(); + assertEquals(2, replicaBatches.size()); + ShardsBatchGatewayAllocator.ShardsBatch replicaShardsBatch = testShardsBatchGatewayAllocator.getBatchIdToStoreShardBatch() + .get(replicaBatches.iterator().next()); + ShardRouting replicaShardRouting = replicaShardsBatch.getBatchedShardRoutings().iterator().next(); + + // delete 1 shard routing from each batch + testShardsBatchGatewayAllocator.safelyRemoveShardFromBatch(primaryShardRouting); + + testShardsBatchGatewayAllocator.safelyRemoveShardFromBatch(replicaShardRouting); + // verify that shard routing is removed from both batches + assertFalse(primaryShardsBatch.getBatchedShards().contains(primaryShardRouting.shardId())); + assertFalse(replicaShardsBatch.getBatchedShards().contains(replicaShardRouting.shardId())); + + // try to remove that shard again to see if its no op and doent result in exception + testShardsBatchGatewayAllocator.safelyRemoveShardFromBatch(primaryShardRouting); + testShardsBatchGatewayAllocator.safelyRemoveShardFromBatch(replicaShardRouting); + + // now remove all shard routings to verify that batch only gets deleted + primaryShardsBatch.getBatchedShardRoutings().forEach(testShardsBatchGatewayAllocator::safelyRemoveShardFromBatch); + replicaShardsBatch.getBatchedShardRoutings().forEach(testShardsBatchGatewayAllocator::safelyRemoveShardFromBatch); + + assertFalse(testShardsBatchGatewayAllocator.getBatchIdToStartedShardBatch().containsKey(primaryShardsBatch.getBatchId())); + assertFalse(testShardsBatchGatewayAllocator.getBatchIdToStoreShardBatch().containsKey(replicaShardsBatch.getBatchId())); + assertEquals(1, testShardsBatchGatewayAllocator.getBatchIdToStartedShardBatch().size()); + assertEquals(1, testShardsBatchGatewayAllocator.getBatchIdToStoreShardBatch().size()); + } + + public void testSafelyRemoveShardFromBothBatch() { + createIndexAndUpdateClusterState(1, 3, 1); + createBatchesAndAssert(1); + ShardsBatchGatewayAllocator.ShardsBatch primaryShardsBatch = testShardsBatchGatewayAllocator.getBatchIdToStartedShardBatch() + .values() + .iterator() + .next(); + ShardsBatchGatewayAllocator.ShardsBatch replicaShardsBatch = testShardsBatchGatewayAllocator.getBatchIdToStoreShardBatch() + .values() + .iterator() + .next(); + + ShardRouting anyPrimary = primaryShardsBatch.getBatchedShardRoutings().iterator().next(); + // remove first shard routing from both batches + testShardsBatchGatewayAllocator.safelyRemoveShardFromBothBatch(anyPrimary); + + // verify that shard routing is removed from both batches + assertFalse(primaryShardsBatch.getBatchedShards().contains(anyPrimary.shardId())); + assertFalse(replicaShardsBatch.getBatchedShards().contains(anyPrimary.shardId())); + + // try to remove that shard again to see if its no op and doesnt result in exception + testShardsBatchGatewayAllocator.safelyRemoveShardFromBothBatch(anyPrimary); + + // now remove all shard routings to verify that batch gets deleted + primaryShardsBatch.getBatchedShardRoutings().forEach(testShardsBatchGatewayAllocator::safelyRemoveShardFromBothBatch); + replicaShardsBatch.getBatchedShardRoutings().forEach(testShardsBatchGatewayAllocator::safelyRemoveShardFromBothBatch); + + assertFalse(testShardsBatchGatewayAllocator.getBatchIdToStartedShardBatch().containsKey(primaryShardsBatch.getBatchId())); + assertFalse(testShardsBatchGatewayAllocator.getBatchIdToStoreShardBatch().containsKey(replicaShardsBatch.getBatchId())); + assertEquals(0, testShardsBatchGatewayAllocator.getBatchIdToStartedShardBatch().size()); + assertEquals(0, testShardsBatchGatewayAllocator.getBatchIdToStoreShardBatch().size()); + } + + public void testGetBatchIdExisting() { + createIndexAndUpdateClusterState(2, 1020, 1); + // get all shardsRoutings for test index + List allShardRoutings1 = clusterState.routingTable() + .index(indexPrefix + 0) + .getShards() + .values() + .stream() + .map(IndexShardRoutingTable::getShards) + .flatMap(List::stream) + .collect(Collectors.toList()); + List allShardRouting2 = clusterState.routingTable() + .index(indexPrefix + 1) + .getShards() + .values() + .stream() + .map(IndexShardRoutingTable::getShards) + .flatMap(List::stream) + .collect(Collectors.toList()); + + Tuple, Set> batchesTuple = createBatchesAndAssert(2); + Set primaryBatches = batchesTuple.v1(); + Set replicaBatches = batchesTuple.v2(); + + // create a map of shards to batch id for primaries + + Map shardIdToBatchIdForStartedShards = new HashMap<>(); + allShardRoutings1.addAll(allShardRouting2); + assertEquals(4080, allShardRoutings1.size()); + for (ShardRouting shardRouting : allShardRoutings1) { + for (String batchId : primaryBatches) { + if (shardRouting.primary() == true + && testShardsBatchGatewayAllocator.getBatchIdToStartedShardBatch() + .get(batchId) + .getBatchedShards() + .contains(shardRouting.shardId())) { + if (shardIdToBatchIdForStartedShards.containsKey(shardRouting.shardId())) { + fail("found duplicate shard routing for shard. One shard cant be in multiple batches " + shardRouting.shardId()); + } + assertTrue(shardRouting.primary()); + shardIdToBatchIdForStartedShards.put(shardRouting.shardId(), batchId); + } + } + } + Map shardIdToBatchIdForStoreShards = new HashMap<>(); + + for (ShardRouting shardRouting : allShardRoutings1) { + for (String batchId : replicaBatches) { + if (shardRouting.primary() == false + && testShardsBatchGatewayAllocator.getBatchIdToStoreShardBatch() + .get(batchId) + .getBatchedShards() + .contains(shardRouting.shardId())) { + if (shardIdToBatchIdForStoreShards.containsKey(shardRouting.shardId())) { + fail("found duplicate shard routing for shard. One shard cant be in multiple batches " + shardRouting.shardId()); + } + assertFalse(shardRouting.primary()); + shardIdToBatchIdForStoreShards.put(shardRouting.shardId(), batchId); + } + } + } + + assertEquals(4080, shardIdToBatchIdForStartedShards.size() + shardIdToBatchIdForStoreShards.size()); + // now compare the maps with getBatchId() call + for (ShardRouting shardRouting : allShardRoutings1) { + if (shardRouting.primary()) { + assertEquals( + shardIdToBatchIdForStartedShards.get(shardRouting.shardId()), + testShardsBatchGatewayAllocator.getBatchId(shardRouting, true) + ); + } else { + assertEquals( + shardIdToBatchIdForStoreShards.get(shardRouting.shardId()), + testShardsBatchGatewayAllocator.getBatchId(shardRouting, false) + ); + } + } + } + + public void testGetBatchIdNonExisting() { + createIndexAndUpdateClusterState(1, 1, 1); + List allShardRoutings = clusterState.routingTable() + .index(indexPrefix + 0) + .getShards() + .values() + .stream() + .map(IndexShardRoutingTable::getShards) + .flatMap(List::stream) + .collect(Collectors.toList()); + allShardRoutings.forEach(shard -> assertNull(testShardsBatchGatewayAllocator.getBatchId(shard, shard.primary()))); + } + + private void createIndexAndUpdateClusterState(int count, int numberOfShards, int numberOfReplicas) { + if (count == 0) return; + Metadata.Builder metadata = Metadata.builder(); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + for (int i = 0; i < count; i++) { + String indexName = indexPrefix + i; + metadata.put( + IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas) + ); + } + for (int i = 0; i < count; i++) { + String indexName = indexPrefix + i; + routingTableBuilder = routingTableBuilder.addAsNew(metadata.build().index(indexName)); + } + clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata.build()) + .routingTable(routingTableBuilder.build()) + .build(); + testAllocation = new RoutingAllocation( + new AllocationDeciders(Collections.emptyList()), + new RoutingNodes(clusterState, false), + clusterState, + ClusterInfo.EMPTY, + SnapshotShardSizeInfo.EMPTY, + System.nanoTime() + ); + } + + // call this after index creation and update cluster state + private Tuple, Set> createBatchesAndAssert(int expectedBatchSize) { + Set primaryBatches = testShardsBatchGatewayAllocator.createAndUpdateBatches(testAllocation, true); + Set replicaBatches = testShardsBatchGatewayAllocator.createAndUpdateBatches(testAllocation, false); + assertEquals(expectedBatchSize, primaryBatches.size()); + assertEquals(expectedBatchSize, replicaBatches.size()); + assertEquals(expectedBatchSize, testShardsBatchGatewayAllocator.getBatchIdToStartedShardBatch().size()); + assertEquals(expectedBatchSize, testShardsBatchGatewayAllocator.getBatchIdToStoreShardBatch().size()); + assertEquals(testShardsBatchGatewayAllocator.getBatchIdToStartedShardBatch().keySet(), primaryBatches); + assertEquals(testShardsBatchGatewayAllocator.getBatchIdToStoreShardBatch().keySet(), replicaBatches); + return new Tuple<>(primaryBatches, replicaBatches); + } +} diff --git a/server/src/test/java/org/opensearch/gateway/ShardBatchCacheTests.java b/server/src/test/java/org/opensearch/gateway/ShardBatchCacheTests.java index 1b42a31a4fd84..12030ad41d508 100644 --- a/server/src/test/java/org/opensearch/gateway/ShardBatchCacheTests.java +++ b/server/src/test/java/org/opensearch/gateway/ShardBatchCacheTests.java @@ -29,8 +29,7 @@ public class ShardBatchCacheTests extends OpenSearchAllocationTestCase { private static final String BATCH_ID = "b1"; private final DiscoveryNode node1 = newNode("node1"); private final DiscoveryNode node2 = newNode("node2"); - // Needs to be enabled once ShardsBatchGatewayAllocator is pushed - // private final Map batchInfo = new HashMap<>(); + private final Map batchInfo = new HashMap<>(); private AsyncShardBatchFetch.ShardBatchCache shardCache; private List shardsInBatch = new ArrayList<>(); private static final int NUMBER_OF_SHARDS_DEFAULT = 10; @@ -162,7 +161,7 @@ public void testShardsDataWithException() { null ); - // assertEquals(5, batchInfo.size()); + assertEquals(10, batchInfo.size()); assertEquals(2, fetchData.size()); assertEquals(10, fetchData.get(node1).getNodeGatewayStartedShardsBatch().size()); assertTrue(fetchData.get(node2).getNodeGatewayStartedShardsBatch().isEmpty()); @@ -210,10 +209,10 @@ private void fillShards(Map shardAttributesMap, int nu for (ShardId shardId : shardsInBatch) { ShardAttributes attr = new ShardAttributes(""); shardAttributesMap.put(shardId, attr); - // batchInfo.put( - // shardId, - // new ShardsBatchGatewayAllocator.ShardEntry(attr, randomShardRouting(shardId.getIndexName(), shardId.id())) - // ); + batchInfo.put( + shardId, + new ShardsBatchGatewayAllocator.ShardEntry(attr, randomShardRouting(shardId.getIndexName(), shardId.id())) + ); } } diff --git a/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java b/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java index b1695ff00e0cc..b9f52a62f823a 100644 --- a/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java +++ b/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java @@ -183,4 +183,5 @@ public String getReplicationCheckPointKey(ShardId shardId, String nodeName) { public void addReplicationCheckpoint(ShardId shardId, String nodeName, ReplicationCheckpoint replicationCheckpoint) { shardIdNodeToReplicationCheckPointMap.putIfAbsent(getReplicationCheckPointKey(shardId, nodeName), replicationCheckpoint); } + } diff --git a/test/framework/src/main/java/org/opensearch/test/gateway/TestShardBatchGatewayAllocator.java b/test/framework/src/main/java/org/opensearch/test/gateway/TestShardBatchGatewayAllocator.java new file mode 100644 index 0000000000000..53a4e90adb976 --- /dev/null +++ b/test/framework/src/main/java/org/opensearch/test/gateway/TestShardBatchGatewayAllocator.java @@ -0,0 +1,144 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.test.gateway; + +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; +import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.gateway.AsyncShardFetch; +import org.opensearch.gateway.PrimaryShardBatchAllocator; +import org.opensearch.gateway.ReplicaShardBatchAllocator; +import org.opensearch.gateway.ShardsBatchGatewayAllocator; +import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper; +import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class TestShardBatchGatewayAllocator extends ShardsBatchGatewayAllocator { + + Map> knownAllocations = new HashMap<>(); + DiscoveryNodes currentNodes = DiscoveryNodes.EMPTY_NODES; + Map shardIdNodeToReplicationCheckPointMap = new HashMap<>(); + + PrimaryShardBatchAllocator primaryBatchShardAllocator = new PrimaryShardBatchAllocator() { + @Override + protected AsyncShardFetch.FetchResult fetchData( + List eligibleShards, + List inEligibleShards, + RoutingAllocation allocation + ) { + Map foundShards = new HashMap<>(); + HashMap> shardsToIgnoreNodes = new HashMap<>(); + for (Map.Entry> entry : knownAllocations.entrySet()) { + String nodeId = entry.getKey(); + Map shardsOnNode = entry.getValue(); + HashMap adaptedResponse = new HashMap<>(); + + for (ShardRouting shardRouting : eligibleShards) { + ShardId shardId = shardRouting.shardId(); + Set ignoreNodes = allocation.getIgnoreNodes(shardId); + + if (shardsOnNode.containsKey(shardId) && ignoreNodes.contains(nodeId) == false && currentNodes.nodeExists(nodeId)) { + TransportNodesGatewayStartedShardHelper.GatewayStartedShard nodeShard = + new TransportNodesGatewayStartedShardHelper.GatewayStartedShard( + shardsOnNode.get(shardId).allocationId().getId(), + shardsOnNode.get(shardId).primary(), + getReplicationCheckpoint(shardId, nodeId) + ); + adaptedResponse.put(shardId, nodeShard); + shardsToIgnoreNodes.put(shardId, ignoreNodes); + } + foundShards.put( + currentNodes.get(nodeId), + new TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch( + currentNodes.get(nodeId), + adaptedResponse + ) + ); + } + } + return new AsyncShardFetch.FetchResult<>(foundShards, shardsToIgnoreNodes); + } + }; + + ReplicaShardBatchAllocator replicaBatchShardAllocator = new ReplicaShardBatchAllocator() { + + @Override + protected AsyncShardFetch.FetchResult fetchData( + List eligibleShards, + List inEligibleShards, + RoutingAllocation allocation + ) { + return new AsyncShardFetch.FetchResult<>(Collections.emptyMap(), Collections.emptyMap()); + } + + @Override + protected boolean hasInitiatedFetching(ShardRouting shard) { + return true; + } + }; + + @Override + public void allocateAllUnassignedShards(RoutingAllocation allocation, boolean primary) { + currentNodes = allocation.nodes(); + innerAllocateUnassignedBatch(allocation, primaryBatchShardAllocator, replicaBatchShardAllocator, primary); + } + + @Override + public void beforeAllocation(RoutingAllocation allocation) {} + + @Override + public void afterPrimariesBeforeReplicas(RoutingAllocation allocation) {} + + public Set createAndUpdateBatches(RoutingAllocation allocation, boolean primary) { + return super.createAndUpdateBatches(allocation, primary); + } + + public void safelyRemoveShardFromBatch(ShardRouting shard) { + super.safelyRemoveShardFromBatch(shard, shard.primary()); + } + + public void safelyRemoveShardFromBothBatch(ShardRouting shardRouting) { + super.safelyRemoveShardFromBothBatch(shardRouting); + } + + public String getBatchId(ShardRouting shard, boolean primary) { + return super.getBatchId(shard, primary); + } + + public Map getBatchIdToStartedShardBatch() { + return batchIdToStartedShardBatch; + } + + public Map getBatchIdToStoreShardBatch() { + return batchIdToStoreShardBatch; + } + + @Override + public AllocateUnassignedDecision explainUnassignedShardAllocation(ShardRouting unassignedShard, RoutingAllocation routingAllocation) { + return super.explainUnassignedShardAllocation(unassignedShard, routingAllocation); + } + + protected ReplicationCheckpoint getReplicationCheckpoint(ShardId shardId, String nodeName) { + return shardIdNodeToReplicationCheckPointMap.getOrDefault(getReplicationCheckPointKey(shardId, nodeName), null); + } + + public String getReplicationCheckPointKey(ShardId shardId, String nodeName) { + return shardId.toString() + "_" + nodeName; + } +}