From b5392d71b46abf4ad3f94eb8c7e237361ffca36b Mon Sep 17 00:00:00 2001 From: rajiv-kv <157019998+rajiv-kv@users.noreply.github.com> Date: Wed, 4 Dec 2024 00:58:30 +0530 Subject: [PATCH] Support prefix list for remote repository attributes (#16271) (#16765) * Support prefix list for remote repository attributes Signed-off-by: Rajiv Kumar Vaidyanathan --- CHANGELOG.md | 1 + .../RemotePublicationConfigurationIT.java | 274 +--------------- .../RemoteRepositoryConfigurationIT.java | 308 ++++++++++++++++++ .../coordination/JoinTaskExecutor.java | 19 +- .../metadata/MetadataCreateIndexService.java | 8 +- .../cluster/node/DiscoveryNode.java | 32 +- .../InternalRemoteRoutingTableService.java | 5 +- .../remote/RemoteClusterStateService.java | 6 +- .../index/remote/RemoteIndexPathUploader.java | 20 +- .../RemoteMigrationIndexMetadataUpdater.java | 8 +- .../remotestore/RemoteStoreNodeAttribute.java | 248 +++++++++++--- .../RemoteStorePinnedTimestampService.java | 7 +- .../cluster/node/DiscoveryNodeTests.java | 2 +- .../remote/RemoteIndexPathUploaderTests.java | 14 +- .../node/RemoteStoreNodeAttributeTests.java | 54 +++ .../test/OpenSearchIntegTestCase.java | 2 +- .../test/RemoteStoreAttributeConstants.java | 19 ++ 17 files changed, 635 insertions(+), 392 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteRepositoryConfigurationIT.java create mode 100644 test/framework/src/main/java/org/opensearch/test/RemoteStoreAttributeConstants.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 537c0bede6ba5..54c9988f14450 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Make IndexStoreListener a pluggable interface ([#16583](https://github.com/opensearch-project/OpenSearch/pull/16583)) - Add a flag in QueryShardContext to differentiate inner hit query ([#16600](https://github.com/opensearch-project/OpenSearch/pull/16600)) - Add vertical scaling and SoftReference for snapshot repository data cache ([#16489](https://github.com/opensearch-project/OpenSearch/pull/16489)) +- Support prefix list for remote repository attributes([#16271](https://github.com/opensearch-project/OpenSearch/pull/16271)) - Add new configuration setting `synonym_analyzer`, to the `synonym` and `synonym_graph` filters, enabling the specification of a custom analyzer for reading the synonym file ([#16488](https://github.com/opensearch-project/OpenSearch/pull/16488)). - Add stats for remote publication failure and move download failure stats to remote methods([#16682](https://github.com/opensearch-project/OpenSearch/pull/16682/)) diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemotePublicationConfigurationIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemotePublicationConfigurationIT.java index 57bf9eccbf5b4..1b5d924fa0b62 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemotePublicationConfigurationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemotePublicationConfigurationIT.java @@ -8,34 +8,9 @@ package org.opensearch.gateway.remote; -import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; -import org.opensearch.common.settings.Settings; -import org.opensearch.plugins.Plugin; -import org.opensearch.remotemigration.MigrationBaseTestCase; -import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin; -import org.opensearch.repositories.blobstore.BlobStoreRepository; -import org.opensearch.repositories.fs.FsRepository; -import org.opensearch.repositories.fs.ReloadableFsRepository; -import org.opensearch.test.InternalSettingsPlugin; import org.opensearch.test.OpenSearchIntegTestCase; -import org.opensearch.test.transport.MockTransportService; -import org.junit.Assert; import org.junit.Before; -import java.util.Collection; -import java.util.Locale; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; -import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING; -import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; - /** * Tests the compatibility between types of nodes based on the configured repositories * Non Remote node [No Repositories configured] @@ -44,260 +19,15 @@ * Remote Node With Routing Table [Cluster State + Segment + Translog + Routing Table] */ @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) -public class RemotePublicationConfigurationIT extends MigrationBaseTestCase { - private final String REMOTE_PRI_DOCREP_REP = "remote-primary-docrep-replica"; - - @Override - protected Collection> nodePlugins() { - /* Adding the following mock plugins: - - InternalSettingsPlugin : To override default intervals of retention lease and global ckp sync - - MockFsRepositoryPlugin and MockTransportService.TestPlugin: To ensure remote interactions are not no-op and retention leases are properly propagated - */ - return Stream.concat( - super.nodePlugins().stream(), - Stream.of(InternalSettingsPlugin.class, MockFsRepositoryPlugin.class, MockTransportService.TestPlugin.class) - ).collect(Collectors.toList()); - } - +public class RemotePublicationConfigurationIT extends RemoteRepositoryConfigurationIT { @Before public void setUp() throws Exception { if (segmentRepoPath == null || translogRepoPath == null) { segmentRepoPath = randomRepoPath().toAbsolutePath(); translogRepoPath = randomRepoPath().toAbsolutePath(); } + super.remoteRepoPrefix = "remote_publication"; super.setUp(); } - public Settings.Builder remotePublishConfiguredNodeSetting() { - String stateRepoSettingsAttributeKeyPrefix = String.format( - Locale.getDefault(), - "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, - REPOSITORY_NAME - ); - String prefixModeVerificationSuffix = BlobStoreRepository.PREFIX_MODE_VERIFICATION_SETTING.getKey(); - String stateRepoTypeAttributeKey = String.format( - Locale.getDefault(), - "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, - REPOSITORY_NAME - ); - String routingTableRepoTypeAttributeKey = String.format( - Locale.getDefault(), - "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, - ROUTING_TABLE_REPO_NAME - ); - String routingTableRepoSettingsAttributeKeyPrefix = String.format( - Locale.getDefault(), - "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, - ROUTING_TABLE_REPO_NAME - ); - - Settings.Builder builder = Settings.builder() - .put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, REPOSITORY_NAME) - .put(stateRepoTypeAttributeKey, ReloadableFsRepository.TYPE) - .put(stateRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath) - .put(stateRepoSettingsAttributeKeyPrefix + prefixModeVerificationSuffix, true) - .put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) - .put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, ROUTING_TABLE_REPO_NAME) - .put(routingTableRepoTypeAttributeKey, FsRepository.TYPE) - .put(routingTableRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath); - return builder; - } - - public Settings.Builder remoteWithRoutingTableNodeSetting() { - // Remote Cluster with Routing table - - return Settings.builder() - .put( - remoteStoreClusterSettings( - REPOSITORY_NAME, - segmentRepoPath, - ReloadableFsRepository.TYPE, - REPOSITORY_2_NAME, - translogRepoPath, - ReloadableFsRepository.TYPE, - REPOSITORY_NAME, - segmentRepoPath, - ReloadableFsRepository.TYPE - ) - ) - .put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true); - } - - public void testRemoteClusterStateServiceNotInitialized_WhenNodeAttributesNotPresent() { - internalCluster().startClusterManagerOnlyNode(); - internalCluster().startDataOnlyNodes(2); - - ensureStableCluster(3); - ensureGreen(); - - internalCluster().getDataOrClusterManagerNodeInstances(RemoteClusterStateService.class).forEach(Assert::assertNull); - } - - public void testServiceInitialized_WhenNodeAttributesPresent() { - internalCluster().startClusterManagerOnlyNode( - buildRemoteStateNodeAttributes(REPOSITORY_NAME, segmentRepoPath, ReloadableFsRepository.TYPE) - ); - internalCluster().startDataOnlyNodes( - 2, - buildRemoteStateNodeAttributes(REPOSITORY_NAME, segmentRepoPath, ReloadableFsRepository.TYPE) - ); - - ensureStableCluster(3); - ensureGreen(); - - internalCluster().getDataOrClusterManagerNodeInstances(RemoteClusterStateService.class).forEach(Assert::assertNotNull); - } - - public void testRemotePublishConfigNodeJoinNonRemoteCluster() throws Exception { - internalCluster().startClusterManagerOnlyNode(); - internalCluster().startDataOnlyNodes(2); - - Settings.Builder build = remotePublishConfiguredNodeSetting(); - internalCluster().startClusterManagerOnlyNode(build.build()); - internalCluster().startDataOnlyNodes(2, build.build()); - - ensureStableCluster(6); - ensureGreen(); - } - - public void testRemotePublishConfigNodeJoinRemoteCluster() throws Exception { - // Remote Cluster without Routing table - setAddRemote(true); - internalCluster().startClusterManagerOnlyNode(); - internalCluster().startDataOnlyNodes(2); - setAddRemote(false); - - ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); - updateSettingsRequest.persistentSettings( - Settings.builder() - .put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store") - .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed") - ); - assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); - Settings.Builder build = remotePublishConfiguredNodeSetting(); - internalCluster().startClusterManagerOnlyNode(build.build()); - ensureStableCluster(4); - ensureGreen(); - } - - public void testRemoteNodeWithRoutingTableJoinRemoteCluster() throws Exception { - setAddRemote(true); - internalCluster().startClusterManagerOnlyNode(); - internalCluster().startDataOnlyNodes(2); - setAddRemote(false); - - ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); - updateSettingsRequest.persistentSettings( - Settings.builder() - .put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store") - .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed") - ); - assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); - - // Remote Repo with Routing table - Settings settings = remoteWithRoutingTableNodeSetting().build(); - internalCluster().startClusterManagerOnlyNode(settings); - ensureStableCluster(4); - ensureGreen(); - } - - public void testNonRemoteNodeJoinRemoteWithRoutingCluster() throws Exception { - Settings settings = remoteWithRoutingTableNodeSetting().build(); - internalCluster().startClusterManagerOnlyNode(settings); - internalCluster().startDataOnlyNodes(2, settings); - - ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); - updateSettingsRequest.persistentSettings( - Settings.builder() - .put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store") - .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed") - ); - assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); - - internalCluster().startClusterManagerOnlyNode(); - ensureStableCluster(4); - ensureGreen(); - } - - public void testRemotePublishConfigNodeJoinRemoteWithRoutingCluster() throws Exception { - Settings settings = remoteWithRoutingTableNodeSetting().build(); - internalCluster().startClusterManagerOnlyNode(settings); - internalCluster().startDataOnlyNodes(2, settings); - - ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); - updateSettingsRequest.persistentSettings( - Settings.builder() - .put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store") - .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed") - ); - assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); - - internalCluster().startClusterManagerOnlyNode(remotePublishConfiguredNodeSetting().build()); - - ensureStableCluster(4); - ensureGreen(); - } - - public void testNonRemoteNodeJoiningPublishConfigCluster() throws Exception { - Settings.Builder build = remotePublishConfiguredNodeSetting(); - internalCluster().startClusterManagerOnlyNode(build.build()); - internalCluster().startDataOnlyNodes(2, build.build()); - - internalCluster().startClusterManagerOnlyNode(); - - ensureStableCluster(4); - ensureGreen(); - } - - public void testRemoteNodeJoiningPublishConfigCluster() throws Exception { - Settings.Builder build = remotePublishConfiguredNodeSetting(); - internalCluster().startClusterManagerOnlyNode(build.build()); - internalCluster().startDataOnlyNodes(2, build.build()); - - ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); - updateSettingsRequest.persistentSettings( - Settings.builder() - .put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store") - .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed") - ); - assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); - - setAddRemote(true); - internalCluster().startClusterManagerOnlyNode(); - ensureStableCluster(4); - ensureGreen(); - } - - public void testRemoteNodeWithRoutingTableJoiningPublishConfigCluster() throws Exception { - Settings.Builder build = remotePublishConfiguredNodeSetting(); - internalCluster().startClusterManagerOnlyNode(build.build()); - internalCluster().startDataOnlyNodes(2, build.build()); - - ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); - updateSettingsRequest.persistentSettings( - Settings.builder() - .put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store") - .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed") - ); - - Settings settings = Settings.builder() - .put( - buildRemoteStoreNodeAttributes( - REPOSITORY_NAME, - segmentRepoPath, - REPOSITORY_2_NAME, - translogRepoPath, - ROUTING_TABLE_REPO_NAME, - segmentRepoPath, - false - ) - ) - .put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) - .build(); - assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); - internalCluster().startClusterManagerOnlyNode(settings); - - ensureStableCluster(4); - ensureGreen(); - } } diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteRepositoryConfigurationIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteRepositoryConfigurationIT.java new file mode 100644 index 0000000000000..48afa85dc5691 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteRepositoryConfigurationIT.java @@ -0,0 +1,308 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.common.settings.Settings; +import org.opensearch.plugins.Plugin; +import org.opensearch.remotemigration.MigrationBaseTestCase; +import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.repositories.fs.FsRepository; +import org.opensearch.repositories.fs.ReloadableFsRepository; +import org.opensearch.test.InternalSettingsPlugin; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.transport.MockTransportService; +import org.junit.Assert; +import org.junit.Before; + +import java.util.Collection; +import java.util.Locale; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +/** + * Tests the compatibility between types of nodes based on the configured repositories + * Non Remote node [No Repositories configured] + * Remote Publish Configured Node [Cluster State + Routing Table] + * Remote Node [Cluster State + Segment + Translog] + * Remote Node With Routing Table [Cluster State + Segment + Translog + Routing Table] + */ +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class RemoteRepositoryConfigurationIT extends MigrationBaseTestCase { + private final String REMOTE_PRI_DOCREP_REP = "remote-primary-docrep-replica"; + + protected String remoteRepoPrefix = "remote_store"; + + @Override + protected Collection> nodePlugins() { + /* Adding the following mock plugins: + - InternalSettingsPlugin : To override default intervals of retention lease and global ckp sync + - MockFsRepositoryPlugin and MockTransportService.TestPlugin: To ensure remote interactions are not no-op and retention leases are properly propagated + */ + return Stream.concat( + super.nodePlugins().stream(), + Stream.of(InternalSettingsPlugin.class, MockFsRepositoryPlugin.class, MockTransportService.TestPlugin.class) + ).collect(Collectors.toList()); + } + + @Before + public void setUp() throws Exception { + if (segmentRepoPath == null || translogRepoPath == null) { + segmentRepoPath = randomRepoPath().toAbsolutePath(); + translogRepoPath = randomRepoPath().toAbsolutePath(); + } + super.setUp(); + } + + public Settings.Builder remotePublishConfiguredNodeSetting() { + String stateRepoSettingsAttributeKeyPrefix = String.format( + Locale.getDefault(), + "node.attr." + REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, + remoteRepoPrefix, + REPOSITORY_NAME + ); + String prefixModeVerificationSuffix = BlobStoreRepository.PREFIX_MODE_VERIFICATION_SETTING.getKey(); + String stateRepoTypeAttributeKey = String.format( + Locale.getDefault(), + "node.attr." + REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, + remoteRepoPrefix, + REPOSITORY_NAME + ); + String routingTableRepoTypeAttributeKey = String.format( + Locale.getDefault(), + "node.attr." + REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, + remoteRepoPrefix, + ROUTING_TABLE_REPO_NAME + ); + String routingTableRepoSettingsAttributeKeyPrefix = String.format( + Locale.getDefault(), + "node.attr." + REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, + remoteRepoPrefix, + ROUTING_TABLE_REPO_NAME + ); + + Settings.Builder builder = Settings.builder() + .put("node.attr." + remoteRepoPrefix + ".state.repository", REPOSITORY_NAME) + .put(stateRepoTypeAttributeKey, ReloadableFsRepository.TYPE) + .put(stateRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath) + .put(stateRepoSettingsAttributeKeyPrefix + prefixModeVerificationSuffix, true) + .put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) + .put("node.attr." + remoteRepoPrefix + ".routing_table.repository", ROUTING_TABLE_REPO_NAME) + .put(routingTableRepoTypeAttributeKey, ReloadableFsRepository.TYPE) + .put(routingTableRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath); + return builder; + } + + public Settings.Builder remoteWithRoutingTableNodeSetting() { + // Remote Cluster with Routing table + return Settings.builder() + .put( + buildRemoteStoreNodeAttributes( + REPOSITORY_NAME, + segmentRepoPath, + REPOSITORY_2_NAME, + translogRepoPath, + ROUTING_TABLE_REPO_NAME, + segmentRepoPath, + false + ) + ) + .put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true); + } + + public void testRemoteClusterStateServiceNotInitialized_WhenNodeAttributesNotPresent() { + internalCluster().startClusterManagerOnlyNode(); + internalCluster().startDataOnlyNodes(2); + + ensureStableCluster(3); + ensureGreen(); + + internalCluster().getDataOrClusterManagerNodeInstances(RemoteClusterStateService.class).forEach(Assert::assertNull); + } + + public void testServiceInitialized_WhenNodeAttributesPresent() { + internalCluster().startClusterManagerOnlyNode( + buildRemoteStateNodeAttributes(REPOSITORY_NAME, segmentRepoPath, ReloadableFsRepository.TYPE) + ); + internalCluster().startDataOnlyNodes( + 2, + buildRemoteStateNodeAttributes(REPOSITORY_NAME, segmentRepoPath, ReloadableFsRepository.TYPE) + ); + + ensureStableCluster(3); + ensureGreen(); + + internalCluster().getDataOrClusterManagerNodeInstances(RemoteClusterStateService.class).forEach(Assert::assertNotNull); + } + + public void testRemotePublishConfigNodeJoinNonRemoteCluster() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + internalCluster().startDataOnlyNodes(2); + + Settings.Builder build = remotePublishConfiguredNodeSetting(); + internalCluster().startClusterManagerOnlyNode(build.build()); + internalCluster().startDataOnlyNodes(2, build.build()); + + ensureStableCluster(6); + ensureGreen(); + } + + public void testRemotePublishConfigNodeJoinRemoteCluster() throws Exception { + // Remote Cluster without Routing table + setAddRemote(true); + internalCluster().startClusterManagerOnlyNode(); + internalCluster().startDataOnlyNodes(2); + setAddRemote(false); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings( + Settings.builder() + .put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store") + .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed") + ); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + Settings.Builder build = remotePublishConfiguredNodeSetting(); + internalCluster().startClusterManagerOnlyNode(build.build()); + ensureStableCluster(4); + ensureGreen(); + } + + public void testRemoteNodeWithRoutingTableJoinRemoteCluster() throws Exception { + setAddRemote(true); + internalCluster().startClusterManagerOnlyNode(); + internalCluster().startDataOnlyNodes(2); + setAddRemote(false); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings( + Settings.builder() + .put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store") + .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed") + ); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + // Remote Repo with Routing table + Settings settings = remoteWithRoutingTableNodeSetting().build(); + + internalCluster().startClusterManagerOnlyNode(settings); + ensureStableCluster(4); + ensureGreen(); + } + + public void testNonRemoteNodeJoinRemoteWithRoutingCluster() throws Exception { + Settings settings = remoteWithRoutingTableNodeSetting().build(); + internalCluster().startClusterManagerOnlyNode(settings); + internalCluster().startDataOnlyNodes(2, settings); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings( + Settings.builder() + .put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store") + .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed") + ); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + internalCluster().startClusterManagerOnlyNode(); + ensureStableCluster(4); + ensureGreen(); + } + + public void testRemotePublishConfigNodeJoinRemoteWithRoutingCluster() throws Exception { + Settings settings = remoteWithRoutingTableNodeSetting().build(); + internalCluster().startClusterManagerOnlyNode(settings); + internalCluster().startDataOnlyNodes(2, settings); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings( + Settings.builder() + .put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store") + .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed") + ); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + internalCluster().startClusterManagerOnlyNode(remotePublishConfiguredNodeSetting().build()); + + ensureStableCluster(4); + ensureGreen(); + } + + public void testNonRemoteNodeJoiningPublishConfigCluster() throws Exception { + Settings.Builder build = remotePublishConfiguredNodeSetting(); + internalCluster().startClusterManagerOnlyNode(build.build()); + internalCluster().startDataOnlyNodes(2, build.build()); + + internalCluster().startClusterManagerOnlyNode(); + + ensureStableCluster(4); + ensureGreen(); + } + + public void testRemoteNodeJoiningPublishConfigCluster() throws Exception { + Settings.Builder build = remotePublishConfiguredNodeSetting(); + internalCluster().startClusterManagerOnlyNode(build.build()); + internalCluster().startDataOnlyNodes(2, build.build()); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings( + Settings.builder() + .put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store") + .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed") + ); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + setAddRemote(true); + internalCluster().startClusterManagerOnlyNode(); + ensureStableCluster(4); + ensureGreen(); + } + + public void testRemoteNodeWithRoutingTableJoiningPublishConfigCluster() throws Exception { + Settings.Builder build = remotePublishConfiguredNodeSetting(); + internalCluster().startClusterManagerOnlyNode(build.build()); + internalCluster().startDataOnlyNodes(2, build.build()); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings( + Settings.builder() + .put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store") + .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed") + ); + + Settings settings = Settings.builder() + .put( + buildRemoteStoreNodeAttributes( + REPOSITORY_NAME, + segmentRepoPath, + ReloadableFsRepository.TYPE, + REPOSITORY_2_NAME, + translogRepoPath, + FsRepository.TYPE, + ROUTING_TABLE_REPO_NAME, + segmentRepoPath, + ReloadableFsRepository.TYPE, + false + ) + ) + .put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) + .build(); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + internalCluster().startClusterManagerOnlyNode(settings); + + ensureStableCluster(4); + ensureGreen(); + } +} diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java index e52ac11e07532..56592f24c292a 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java @@ -57,6 +57,7 @@ import org.opensearch.transport.TransportService; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -71,7 +72,8 @@ import static org.opensearch.cluster.decommission.DecommissionHelper.nodeCommissioned; import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_CLUSTER_PUBLICATION_REPO_NAME_ATTRIBUTES; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.getClusterStateRepoName; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.getRoutingTableRepoName; import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode; import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode.MIXED; import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode.STRICT; @@ -600,7 +602,12 @@ private static void ensureRemoteClusterStateNodesCompatibility(DiscoveryNode joi .findFirst(); if (remotePublicationNode.isPresent() && joiningNode.isRemoteStatePublicationEnabled()) { - ensureRepositoryCompatibility(joiningNode, remotePublicationNode.get(), REMOTE_CLUSTER_PUBLICATION_REPO_NAME_ATTRIBUTES); + List repos = Arrays.asList( + getClusterStateRepoName(remotePublicationNode.get().getAttributes()), + getRoutingTableRepoName(remotePublicationNode.get().getAttributes()) + ); + + ensureRepositoryCompatibility(joiningNode, remotePublicationNode.get(), repos); } } @@ -629,16 +636,12 @@ private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNod List reposToSkip = new ArrayList<>(1); // find a remote node which has routing table configured Optional remoteRoutingTableNode = existingNodes.stream() - .filter( - node -> node.isRemoteStoreNode() - && node.getAttributes().get(RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY) != null - ) + .filter(node -> node.isRemoteStoreNode() && RemoteStoreNodeAttribute.getRoutingTableRepoName(node.getAttributes()) != null) .findFirst(); // If none of the existing nodes have routing table repo, then we skip this repo check if present in joining node. // This ensures a new node with remote routing table repo is able to join the cluster. if (remoteRoutingTableNode.isEmpty()) { - String joiningNodeRepoName = joiningNode.getAttributes() - .get(RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY); + String joiningNodeRepoName = getRoutingTableRepoName(joiningNode.getAttributes()); if (joiningNodeRepoName != null) { reposToSkip.add(joiningNodeRepoName); } diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java index cdb3e5f615e22..05588620348aa 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java @@ -1185,12 +1185,8 @@ public static void updateRemoteStoreSettings( .findFirst(); if (remoteNode.isPresent()) { - translogRepo = remoteNode.get() - .getAttributes() - .get(RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY); - segmentRepo = remoteNode.get() - .getAttributes() - .get(RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY); + translogRepo = RemoteStoreNodeAttribute.getTranslogRepoName(remoteNode.get().getAttributes()); + segmentRepo = RemoteStoreNodeAttribute.getSegmentRepoName(remoteNode.get().getAttributes()); if (segmentRepo != null && translogRepo != null) { settingsBuilder.put(SETTING_REMOTE_STORE_ENABLED, true) .put(SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, segmentRepo) diff --git a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java index 4f19e6de5084e..bef19541703f2 100644 --- a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java +++ b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java @@ -46,6 +46,7 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.index.translog.BufferedChecksumStreamOutput; import org.opensearch.node.Node; +import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; import java.io.IOException; import java.util.Collections; @@ -64,10 +65,9 @@ import java.util.stream.Stream; import static org.opensearch.node.NodeRoleSettings.NODE_ROLES_SETTING; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isClusterStateRepoConfigured; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRoutingTableRepoConfigured; /** * A discovery node represents a node that is part of the cluster. @@ -555,8 +555,7 @@ public boolean isSearchNode() { * @return true if the node contains remote store node attributes, false otherwise */ public boolean isRemoteStoreNode() { - return this.getAttributes().keySet().stream().anyMatch(key -> key.equals(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY)) - && this.getAttributes().keySet().stream().anyMatch(key -> key.equals(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY)); + return isClusterStateRepoConfigured(this.getAttributes()) && RemoteStoreNodeAttribute.isSegmentRepoConfigured(this.getAttributes()); } /** @@ -564,11 +563,7 @@ public boolean isRemoteStoreNode() { * @return true if the node contains remote cluster state node attribute and remote routing table node attribute */ public boolean isRemoteStatePublicationEnabled() { - return this.getAttributes() - .keySet() - .stream() - .anyMatch(key -> (key.equals(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY))) - && this.getAttributes().keySet().stream().anyMatch(key -> key.equals(REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY)); + return isClusterStateRepoConfigured(this.getAttributes()) && isRoutingTableRepoConfigured(this.getAttributes()); } /** @@ -632,13 +627,16 @@ public String toString() { sb.append('}'); } if (!attributes.isEmpty()) { - sb.append( - attributes.entrySet() - .stream() - .filter(entry -> !entry.getKey().startsWith(REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX)) // filter remote_store attributes - // from logging to reduce noise. - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)) - ); + sb.append(attributes.entrySet().stream().filter(entry -> { + for (String prefix : REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX) { + if (entry.getKey().startsWith(prefix)) { + return false; + } + } + return true; + }) // filter remote_store attributes + // from logging to reduce noise. + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); } return sb.toString(); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java index ad4f28f568def..e2f2ad3bad610 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java @@ -33,7 +33,6 @@ import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable; import org.opensearch.gateway.remote.routingtable.RemoteRoutingTableDiff; import org.opensearch.index.translog.transfer.BlobStoreTransferService; -import org.opensearch.node.Node; import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; @@ -234,9 +233,7 @@ protected void doClose() throws IOException { @Override protected void doStart() { assert isRemoteRoutingTableConfigured(settings) == true : "Remote routing table is not enabled"; - final String remoteStoreRepo = settings.get( - Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY - ); + final String remoteStoreRepo = RemoteStoreNodeAttribute.getRoutingTableRepoName(settings); assert remoteStoreRepo != null : "Remote routing table repository is not configured"; final Repository repository = repositoriesService.get().repository(remoteStoreRepo); assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository"; diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 5cee0fb7c0121..7c65516132b50 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -61,7 +61,6 @@ import org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata; import org.opensearch.gateway.remote.routingtable.RemoteRoutingTableDiff; import org.opensearch.index.translog.transfer.BlobStoreTransferService; -import org.opensearch.node.Node; import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; @@ -1065,9 +1064,8 @@ public void close() throws IOException { public void start() { assert isRemoteClusterStateConfigured(settings) == true : "Remote cluster state is not enabled"; - final String remoteStoreRepo = settings.get( - Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY - ); + final String remoteStoreRepo = RemoteStoreNodeAttribute.getClusterStateRepoName(settings); + assert remoteStoreRepo != null : "Remote Cluster State repository is not configured"; final Repository repository = repositoriesService.get().repository(remoteStoreRepo); assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository"; diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java b/server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java index 2a76a5b966884..18b6d6184d1b0 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java @@ -26,7 +26,6 @@ import org.opensearch.gateway.remote.RemoteStateTransferException; import org.opensearch.index.remote.RemoteStoreEnums.PathType; import org.opensearch.indices.RemoteStoreSettings; -import org.opensearch.node.Node; import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; @@ -70,11 +69,6 @@ public class RemoteIndexPathUploader extends IndexMetadataUploadListener { private static final String TIMEOUT_EXCEPTION_MSG = "Timed out waiting while uploading remote index path file for indexes=%s"; private static final String UPLOAD_EXCEPTION_MSG = "Exception occurred while uploading remote index paths for indexes=%s"; - static final String TRANSLOG_REPO_NAME_KEY = Node.NODE_ATTRIBUTES.getKey() - + RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY; - static final String SEGMENT_REPO_NAME_KEY = Node.NODE_ATTRIBUTES.getKey() - + RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY; - private static final Logger logger = LogManager.getLogger(RemoteIndexPathUploader.class); private final Settings settings; @@ -226,9 +220,8 @@ private void writePathToRemoteStore( } } - private Repository validateAndGetRepository(String repoSetting) { - final String repo = settings.get(repoSetting); - assert repo != null : "Remote " + repoSetting + " repository is not configured"; + private Repository validateAndGetRepository(String repo) { + assert repo != null : "Remote repository is not configured"; final Repository repository = repositoriesService.get().repository(repo); assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository"; return repository; @@ -240,15 +233,16 @@ public void start() { // If remote store data attributes are not present than we skip this. return; } - translogRepository = (BlobStoreRepository) validateAndGetRepository(TRANSLOG_REPO_NAME_KEY); - segmentRepository = (BlobStoreRepository) validateAndGetRepository(SEGMENT_REPO_NAME_KEY); + + translogRepository = (BlobStoreRepository) validateAndGetRepository(RemoteStoreNodeAttribute.getRemoteStoreTranslogRepo(settings)); + segmentRepository = (BlobStoreRepository) validateAndGetRepository(RemoteStoreNodeAttribute.getRemoteStoreSegmentRepo(settings)); } private boolean isTranslogSegmentRepoSame() { // TODO - The current comparison checks the repository name. But it is also possible that the repository are same // by attributes, but different by name. We need to handle this. - String translogRepoName = settings.get(TRANSLOG_REPO_NAME_KEY); - String segmentRepoName = settings.get(SEGMENT_REPO_NAME_KEY); + String translogRepoName = RemoteStoreNodeAttribute.getRemoteStoreTranslogRepo(settings); + String segmentRepoName = RemoteStoreNodeAttribute.getRemoteStoreSegmentRepo(settings); return Objects.equals(translogRepoName, segmentRepoName); } diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdater.java b/server/src/main/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdater.java index cc51fcd2f18f6..1f9ffca4460b7 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdater.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdater.java @@ -18,6 +18,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.index.remote.RemoteStoreEnums.PathType; import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; import java.util.List; import java.util.Map; @@ -30,8 +31,6 @@ import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; import static org.opensearch.index.remote.RemoteStoreUtils.determineRemoteStoreCustomMetadataDuringMigration; import static org.opensearch.index.remote.RemoteStoreUtils.getRemoteStoreRepoName; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY; /** * Utils for checking and mutating cluster state during remote migration @@ -74,8 +73,9 @@ public void maybeAddRemoteIndexSettings(IndexMetadata.Builder indexMetadataBuild index ); Map remoteRepoNames = getRemoteStoreRepoName(discoveryNodes); - String segmentRepoName = remoteRepoNames.get(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY); - String tlogRepoName = remoteRepoNames.get(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY); + String segmentRepoName = RemoteStoreNodeAttribute.getSegmentRepoName(remoteRepoNames); + String tlogRepoName = RemoteStoreNodeAttribute.getTranslogRepoName(remoteRepoNames); + assert Objects.nonNull(segmentRepoName) && Objects.nonNull(tlogRepoName) : "Remote repo names cannot be null"; Settings.Builder indexSettingsBuilder = Settings.builder().put(currentIndexSettings); updateRemoteStoreSettings(indexSettingsBuilder, segmentRepoName, tlogRepoName); diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java index b1b6259e4ca18..89d06753063b7 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java @@ -12,12 +12,14 @@ import org.opensearch.cluster.metadata.RepositoriesMetadata; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.Settings; import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.node.Node; import org.opensearch.repositories.blobstore.BlobStoreRepository; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -35,10 +37,28 @@ */ public class RemoteStoreNodeAttribute { - public static final String REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX = "remote_store"; + public static final List REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX = List.of("remote_store", "remote_publication"); + + // TO-DO the string constants are used only for tests and can be moved to test package + public static final String REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY = "remote_store.state.repository"; public static final String REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY = "remote_store.segment.repository"; public static final String REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY = "remote_store.translog.repository"; - public static final String REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY = "remote_store.state.repository"; + public static final String REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY = "remote_store.routing_table.repository"; + + public static final List REMOTE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEYS = REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX.stream() + .map(prefix -> prefix + ".state.repository") + .collect(Collectors.toList()); + + public static final List REMOTE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEYS = REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX.stream() + .map(prefix -> prefix + ".routing_table.repository") + .collect(Collectors.toList()); + public static final List REMOTE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEYS = REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX.stream() + .map(prefix -> prefix + ".segment.repository") + .collect(Collectors.toList()); + public static final List REMOTE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEYS = REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX.stream() + .map(prefix -> prefix + ".translog.repository") + .collect(Collectors.toList()); + public static final String REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT = "remote_store.repository.%s.type"; public static final String REMOTE_STORE_REPOSITORY_CRYPTO_ATTRIBUTE_KEY_FORMAT = "remote_store.repository.%s." + CryptoMetadata.CRYPTO_METADATA_KEY; @@ -46,18 +66,19 @@ public class RemoteStoreNodeAttribute { + "." + CryptoMetadata.SETTINGS_KEY; public static final String REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX = "remote_store.repository.%s.settings."; - public static final String REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY = "remote_store.routing_table.repository"; - private final RepositoriesMetadata repositoriesMetadata; + public static final String REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT = "%s.repository.%s.type"; + public static final String REPOSITORY_CRYPTO_ATTRIBUTE_KEY_FORMAT = "%s.repository.%s." + CryptoMetadata.CRYPTO_METADATA_KEY; + public static final String REPOSITORY_CRYPTO_SETTINGS_PREFIX = REPOSITORY_CRYPTO_ATTRIBUTE_KEY_FORMAT + + "." + + CryptoMetadata.SETTINGS_KEY; + public static final String REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX = "%s.repository.%s.settings."; - public static List SUPPORTED_DATA_REPO_NAME_ATTRIBUTES = List.of( - REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, - REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY - ); + private final RepositoriesMetadata repositoriesMetadata; - public static List REMOTE_CLUSTER_PUBLICATION_REPO_NAME_ATTRIBUTES = List.of( - REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, - REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY + public static List> SUPPORTED_DATA_REPO_NAME_ATTRIBUTES = Arrays.asList( + REMOTE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEYS, + REMOTE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEYS ); /** @@ -76,8 +97,17 @@ private String validateAttributeNonNull(DiscoveryNode node, String attributeKey) return attributeValue; } - private CryptoMetadata buildCryptoMetadata(DiscoveryNode node, String repositoryName) { - String metadataKey = String.format(Locale.getDefault(), REMOTE_STORE_REPOSITORY_CRYPTO_ATTRIBUTE_KEY_FORMAT, repositoryName); + private Tuple validateAttributeNonNull(DiscoveryNode node, List attributeKeys) { + Tuple attributeValue = getValue(node.getAttributes(), attributeKeys); + if (attributeValue == null || attributeValue.v1() == null || attributeValue.v1().isEmpty()) { + throw new IllegalStateException("joining node [" + node + "] doesn't have the node attribute [" + attributeKeys.get(0) + "]"); + } + + return attributeValue; + } + + private CryptoMetadata buildCryptoMetadata(DiscoveryNode node, String repositoryName, String prefix) { + String metadataKey = String.format(Locale.getDefault(), REPOSITORY_CRYPTO_ATTRIBUTE_KEY_FORMAT, prefix, repositoryName); boolean isRepoEncrypted = node.getAttributes().keySet().stream().anyMatch(key -> key.startsWith(metadataKey)); if (isRepoEncrypted == false) { return null; @@ -86,11 +116,7 @@ private CryptoMetadata buildCryptoMetadata(DiscoveryNode node, String repository String keyProviderName = validateAttributeNonNull(node, metadataKey + "." + CryptoMetadata.KEY_PROVIDER_NAME_KEY); String keyProviderType = validateAttributeNonNull(node, metadataKey + "." + CryptoMetadata.KEY_PROVIDER_TYPE_KEY); - String settingsAttributeKeyPrefix = String.format( - Locale.getDefault(), - REMOTE_STORE_REPOSITORY_CRYPTO_SETTINGS_PREFIX, - repositoryName - ); + String settingsAttributeKeyPrefix = String.format(Locale.getDefault(), REPOSITORY_CRYPTO_SETTINGS_PREFIX, prefix, repositoryName); Map settingsMap = node.getAttributes() .keySet() @@ -104,10 +130,11 @@ private CryptoMetadata buildCryptoMetadata(DiscoveryNode node, String repository return new CryptoMetadata(keyProviderName, keyProviderType, settings.build()); } - private Map validateSettingsAttributesNonNull(DiscoveryNode node, String repositoryName) { + private Map validateSettingsAttributesNonNull(DiscoveryNode node, String repositoryName, String prefix) { String settingsAttributeKeyPrefix = String.format( Locale.getDefault(), - REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, + REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, + prefix, repositoryName ); Map settingsMap = node.getAttributes() @@ -125,17 +152,17 @@ private Map validateSettingsAttributesNonNull(DiscoveryNode node return settingsMap; } - private RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String name) { + private RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String name, String prefix) { String type = validateAttributeNonNull( node, - String.format(Locale.getDefault(), REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, name) + String.format(Locale.getDefault(), REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, prefix, name) ); - Map settingsMap = validateSettingsAttributesNonNull(node, name); + Map settingsMap = validateSettingsAttributesNonNull(node, name, prefix); Settings.Builder settings = Settings.builder(); settingsMap.forEach(settings::put); - CryptoMetadata cryptoMetadata = buildCryptoMetadata(node, name); + CryptoMetadata cryptoMetadata = buildCryptoMetadata(node, name, prefix); // Repository metadata built here will always be for a system repository. settings.put(BlobStoreRepository.SYSTEM_REPOSITORY_SETTING.getKey(), true); @@ -144,53 +171,104 @@ private RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String na } private RepositoriesMetadata buildRepositoriesMetadata(DiscoveryNode node) { - Set repositoryNames = getValidatedRepositoryNames(node); + Map repositoryNamesWithPrefix = getValidatedRepositoryNames(node); List repositoryMetadataList = new ArrayList<>(); - for (String repositoryName : repositoryNames) { - repositoryMetadataList.add(buildRepositoryMetadata(node, repositoryName)); + for (Map.Entry repository : repositoryNamesWithPrefix.entrySet()) { + repositoryMetadataList.add(buildRepositoryMetadata(node, repository.getKey(), repository.getValue())); } return new RepositoriesMetadata(repositoryMetadataList); } - private Set getValidatedRepositoryNames(DiscoveryNode node) { - Set repositoryNames = new HashSet<>(); - if (node.getAttributes().containsKey(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY) - || node.getAttributes().containsKey(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY)) { - repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY)); - repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY)); - repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY)); - } else if (node.getAttributes().containsKey(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY)) { - repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY)); + private static Tuple getValue(Map attributes, List keys) { + for (String key : keys) { + if (attributes.containsKey(key)) { + return new Tuple<>(attributes.get(key), key); + } } - if (node.getAttributes().containsKey(REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY)) { - repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY)); + return null; + } + + private Map getValidatedRepositoryNames(DiscoveryNode node) { + Set> repositoryNames = new HashSet<>(); + if (containsKey(node.getAttributes(), REMOTE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEYS) + || containsKey(node.getAttributes(), REMOTE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEYS)) { + repositoryNames.add(validateAttributeNonNull(node, REMOTE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEYS)); + repositoryNames.add(validateAttributeNonNull(node, REMOTE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEYS)); + repositoryNames.add(validateAttributeNonNull(node, REMOTE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEYS)); + } else if (containsKey(node.getAttributes(), REMOTE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEYS)) { + repositoryNames.add(validateAttributeNonNull(node, REMOTE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEYS)); + } + if (containsKey(node.getAttributes(), REMOTE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEYS)) { + repositoryNames.add(validateAttributeNonNull(node, REMOTE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEYS)); } - return repositoryNames; + Map repoNamesWithPrefix = new HashMap<>(); + repositoryNames.forEach(t -> { + String[] attrKeyParts = t.v2().split("\\."); + repoNamesWithPrefix.put(t.v1(), attrKeyParts[0]); + }); + + return repoNamesWithPrefix; } public static boolean isRemoteStoreAttributePresent(Settings settings) { - return settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX).isEmpty() == false; + for (String prefix : REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX) { + if (settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + prefix).isEmpty() == false) { + return true; + } + } + return false; } public static boolean isRemoteDataAttributePresent(Settings settings) { - return settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY).isEmpty() == false - || settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY).isEmpty() == false; + return isSegmentRepoConfigured(settings) || isTranslogRepoConfigured(settings); + } + + public static boolean isSegmentRepoConfigured(Settings settings) { + for (String prefix : REMOTE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEYS) { + if (settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + prefix).isEmpty() == false) { + return true; + } + } + return false; + } + + public static boolean isTranslogRepoConfigured(Settings settings) { + for (String prefix : REMOTE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEYS) { + if (settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + prefix).isEmpty() == false) { + return true; + } + } + return false; } public static boolean isRemoteClusterStateConfigured(Settings settings) { - return settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY) - .isEmpty() == false; + for (String prefix : REMOTE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEYS) { + if (settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + prefix).isEmpty() == false) { + return true; + } + } + return false; } public static String getRemoteStoreSegmentRepo(Settings settings) { - return settings.get(Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY); + for (String prefix : REMOTE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEYS) { + if (settings.get(Node.NODE_ATTRIBUTES.getKey() + prefix) != null) { + return settings.get(Node.NODE_ATTRIBUTES.getKey() + prefix); + } + } + return null; } public static String getRemoteStoreTranslogRepo(Settings settings) { - return settings.get(Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY); + for (String prefix : REMOTE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEYS) { + if (settings.get(Node.NODE_ATTRIBUTES.getKey() + prefix) != null) { + return settings.get(Node.NODE_ATTRIBUTES.getKey() + prefix); + } + } + return null; } public static boolean isRemoteStoreClusterStateEnabled(Settings settings) { @@ -198,8 +276,12 @@ public static boolean isRemoteStoreClusterStateEnabled(Settings settings) { } private static boolean isRemoteRoutingTableAttributePresent(Settings settings) { - return settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY) - .isEmpty() == false; + for (String prefix : REMOTE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEYS) { + if (settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + prefix).isEmpty() == false) { + return true; + } + } + return false; } public static boolean isRemoteRoutingTableConfigured(Settings settings) { @@ -219,21 +301,83 @@ public RepositoriesMetadata getRepositoriesMetadata() { public static Map getDataRepoNames(DiscoveryNode node) { assert remoteDataAttributesPresent(node.getAttributes()); Map dataRepoNames = new HashMap<>(); - for (String supportedRepoAttribute : SUPPORTED_DATA_REPO_NAME_ATTRIBUTES) { - dataRepoNames.put(supportedRepoAttribute, node.getAttributes().get(supportedRepoAttribute)); + for (List supportedRepoAttribute : SUPPORTED_DATA_REPO_NAME_ATTRIBUTES) { + Tuple value = getValue(node.getAttributes(), supportedRepoAttribute); + if (value != null && value.v1() != null) { + dataRepoNames.put(value.v2(), value.v1()); + } } return dataRepoNames; } private static boolean remoteDataAttributesPresent(Map nodeAttrs) { - for (String supportedRepoAttributes : SUPPORTED_DATA_REPO_NAME_ATTRIBUTES) { - if (nodeAttrs.get(supportedRepoAttributes) == null || nodeAttrs.get(supportedRepoAttributes).isEmpty()) { + for (List supportedRepoAttribute : SUPPORTED_DATA_REPO_NAME_ATTRIBUTES) { + Tuple value = getValue(nodeAttrs, supportedRepoAttribute); + if (value == null || value.v1() == null) { return false; } } return true; } + public static String getClusterStateRepoName(Map repos) { + return getValueFromAnyKey(repos, REMOTE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEYS); + } + + public static String getRoutingTableRepoName(Map repos) { + return getValueFromAnyKey(repos, REMOTE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEYS); + } + + public static String getSegmentRepoName(Map repos) { + return getValueFromAnyKey(repos, REMOTE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEYS); + } + + public static String getTranslogRepoName(Map repos) { + return getValueFromAnyKey(repos, REMOTE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEYS); + } + + private static String getValueFromAnyKey(Map repos, List keys) { + for (String key : keys) { + if (repos.get(key) != null) { + return repos.get(key); + } + } + return null; + } + + public static String getClusterStateRepoName(Settings settings) { + return getValueFromAnyKey(settings, REMOTE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEYS); + } + + public static String getRoutingTableRepoName(Settings settings) { + return getValueFromAnyKey(settings, REMOTE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEYS); + } + + private static String getValueFromAnyKey(Settings settings, List keys) { + for (String key : keys) { + if (settings.get(Node.NODE_ATTRIBUTES.getKey() + key) != null) { + return settings.get(Node.NODE_ATTRIBUTES.getKey() + key); + } + } + return null; + } + + public static boolean isClusterStateRepoConfigured(Map attributes) { + return containsKey(attributes, REMOTE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEYS); + } + + public static boolean isRoutingTableRepoConfigured(Map attributes) { + return containsKey(attributes, REMOTE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEYS); + } + + public static boolean isSegmentRepoConfigured(Map attributes) { + return containsKey(attributes, REMOTE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEYS); + } + + private static boolean containsKey(Map attributes, List keys) { + return keys.stream().filter(k -> attributes.containsKey(k)).findFirst().isPresent(); + } + @Override public int hashCode() { // The hashCode is generated by computing the hash of all the repositoryMetadata present in diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java index 98fcad0e6c496..f5b372ddd9b80 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java @@ -21,7 +21,6 @@ import org.opensearch.common.util.concurrent.AbstractAsyncTask; import org.opensearch.core.action.ActionListener; import org.opensearch.indices.RemoteStoreSettings; -import org.opensearch.node.Node; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.repositories.blobstore.BlobStoreRepository; @@ -42,6 +41,8 @@ import java.util.function.Supplier; import java.util.stream.Collectors; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.getRemoteStoreSegmentRepo; + /** * Service for managing pinned timestamps in a remote store. * This service handles pinning and unpinning of timestamps, as well as periodic updates of the pinned timestamps set. @@ -86,9 +87,7 @@ public void start() { } private static BlobContainer validateAndCreateBlobContainer(Settings settings, RepositoriesService repositoriesService) { - final String remoteStoreRepo = settings.get( - Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY - ); + final String remoteStoreRepo = getRemoteStoreSegmentRepo(settings); assert remoteStoreRepo != null : "Remote Segment Store repository is not configured"; final Repository repository = repositoriesService.repository(remoteStoreRepo); assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository"; diff --git a/server/src/test/java/org/opensearch/cluster/node/DiscoveryNodeTests.java b/server/src/test/java/org/opensearch/cluster/node/DiscoveryNodeTests.java index 9f406101203a0..47cb6404e5bc8 100644 --- a/server/src/test/java/org/opensearch/cluster/node/DiscoveryNodeTests.java +++ b/server/src/test/java/org/opensearch/cluster/node/DiscoveryNodeTests.java @@ -100,7 +100,7 @@ public void testRemoteStoreRedactionInToString() { roles, Version.CURRENT ); - assertFalse(node.toString().contains(RemoteStoreNodeAttribute.REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX)); + assertFalse(node.toString().contains(RemoteStoreNodeAttribute.REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX.get(0))); } public void testDiscoveryNodeIsCreatedWithHostFromInetAddress() throws Exception { diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteIndexPathUploaderTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteIndexPathUploaderTests.java index d6519d9db8ee6..2e6523a4a64a0 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteIndexPathUploaderTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteIndexPathUploaderTests.java @@ -80,11 +80,16 @@ public class RemoteIndexPathUploaderTests extends OpenSearchTestCase { private final AtomicLong successCount = new AtomicLong(); private final AtomicLong failureCount = new AtomicLong(); + static final String TRANSLOG_REPO_NAME_KEY = Node.NODE_ATTRIBUTES.getKey() + + RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY; + static final String SEGMENT_REPO_NAME_KEY = Node.NODE_ATTRIBUTES.getKey() + + RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY; + @Before public void setup() { settings = Settings.builder() - .put(RemoteIndexPathUploader.TRANSLOG_REPO_NAME_KEY, TRANSLOG_REPO_NAME) - .put(RemoteIndexPathUploader.SEGMENT_REPO_NAME_KEY, TRANSLOG_REPO_NAME) + .put(TRANSLOG_REPO_NAME_KEY, TRANSLOG_REPO_NAME) + .put(SEGMENT_REPO_NAME_KEY, TRANSLOG_REPO_NAME) .put(CLUSTER_STATE_REPO_KEY, TRANSLOG_REPO_NAME) .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) .build(); @@ -247,10 +252,7 @@ public void testInterceptWithSameRepo() throws IOException { } public void testInterceptWithDifferentRepo() throws IOException { - Settings settings = Settings.builder() - .put(this.settings) - .put(RemoteIndexPathUploader.SEGMENT_REPO_NAME_KEY, SEGMENT_REPO_NAME) - .build(); + Settings settings = Settings.builder().put(this.settings).put(SEGMENT_REPO_NAME_KEY, SEGMENT_REPO_NAME).build(); when(repositoriesService.repository(SEGMENT_REPO_NAME)).thenReturn(repository); RemoteIndexPathUploader remoteIndexPathUploader = new RemoteIndexPathUploader( threadPool, diff --git a/server/src/test/java/org/opensearch/node/RemoteStoreNodeAttributeTests.java b/server/src/test/java/org/opensearch/node/RemoteStoreNodeAttributeTests.java index de7f8977686a7..537a5a5739b75 100644 --- a/server/src/test/java/org/opensearch/node/RemoteStoreNodeAttributeTests.java +++ b/server/src/test/java/org/opensearch/node/RemoteStoreNodeAttributeTests.java @@ -32,12 +32,66 @@ import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REPOSITORY_CRYPTO_ATTRIBUTE_KEY_FORMAT; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REPOSITORY_CRYPTO_SETTINGS_PREFIX; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; +import static org.opensearch.test.RemoteStoreAttributeConstants.REMOTE_PUBLICATION_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.opensearch.test.RemoteStoreAttributeConstants.REMOTE_PUBLICATION_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.opensearch.test.RemoteStoreAttributeConstants.REMOTE_PUBLICATION_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY; public class RemoteStoreNodeAttributeTests extends OpenSearchTestCase { static private final String KEY_ARN = "arn:aws:kms:us-east-1:123456789:key/6e9aa906-2cc3-4924-8ded-f385c78d9dcf"; static private final String REGION = "us-east-1"; + public void testCryptoMetadataForPublication() throws UnknownHostException { + String repoName = "remote-store-A"; + String prefix = "remote_publication"; + String repoTypeSettingKey = String.format(Locale.ROOT, REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, prefix, repoName); + String repoSettingsKey = String.format(Locale.ROOT, REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, prefix, repoName); + String repoCryptoMetadataKey = String.format(Locale.ROOT, REPOSITORY_CRYPTO_ATTRIBUTE_KEY_FORMAT, prefix, repoName); + String repoCryptoMetadataSettingsKey = String.format(Locale.ROOT, REPOSITORY_CRYPTO_SETTINGS_PREFIX, prefix, repoName); + Map attr = Map.of( + REMOTE_PUBLICATION_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, + repoName, + REMOTE_PUBLICATION_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, + repoName, + REMOTE_PUBLICATION_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, + repoName, + repoTypeSettingKey, + "s3", + repoSettingsKey, + "abc", + repoSettingsKey + "base_path", + "xyz", + repoCryptoMetadataKey + ".key_provider_name", + "store-test", + repoCryptoMetadataKey + ".key_provider_type", + "aws-kms", + repoCryptoMetadataSettingsKey + ".region", + REGION, + repoCryptoMetadataSettingsKey + ".key_arn", + KEY_ARN + ); + DiscoveryNode node = new DiscoveryNode( + "C", + new TransportAddress(InetAddress.getByName("localhost"), 9876), + attr, + emptySet(), + Version.CURRENT + ); + + RemoteStoreNodeAttribute remoteStoreNodeAttribute = new RemoteStoreNodeAttribute(node); + assertEquals(remoteStoreNodeAttribute.getRepositoriesMetadata().repositories().size(), 1); + RepositoryMetadata repositoryMetadata = remoteStoreNodeAttribute.getRepositoriesMetadata().repositories().get(0); + Settings.Builder settings = Settings.builder(); + settings.put("region", REGION); + settings.put("key_arn", KEY_ARN); + CryptoMetadata cryptoMetadata = new CryptoMetadata("store-test", "aws-kms", settings.build()); + assertEquals(cryptoMetadata, repositoryMetadata.cryptoMetadata()); + } + public void testCryptoMetadata() throws UnknownHostException { String repoName = "remote-store-A"; String repoTypeSettingKey = String.format(Locale.ROOT, REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, repoName); diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index ffc5fdac210a4..d84c60989948f 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -2881,7 +2881,7 @@ private static Settings buildRemoteStoreNodeAttributes( ); } - private static Settings buildRemoteStoreNodeAttributes( + protected static Settings buildRemoteStoreNodeAttributes( String segmentRepoName, Path segmentRepoPath, String segmentRepoType, diff --git a/test/framework/src/main/java/org/opensearch/test/RemoteStoreAttributeConstants.java b/test/framework/src/main/java/org/opensearch/test/RemoteStoreAttributeConstants.java new file mode 100644 index 0000000000000..0e7ebb9f871f6 --- /dev/null +++ b/test/framework/src/main/java/org/opensearch/test/RemoteStoreAttributeConstants.java @@ -0,0 +1,19 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.test; + +public class RemoteStoreAttributeConstants { + + public static final String REMOTE_PUBLICATION_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY = "remote_publication.state.repository"; + public static final String REMOTE_PUBLICATION_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY = "remote_publication.segment.repository"; + public static final String REMOTE_PUBLICATION_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY = "remote_publication.translog.repository"; + public static final String REMOTE_PUBLICATION_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY = + "remote_publication.routing_table.repository"; + +}