diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteRoutingTableServiceIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteRoutingTableServiceIT.java new file mode 100644 index 0000000000000..13fd6e91135f5 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteRoutingTableServiceIT.java @@ -0,0 +1,260 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.junit.Before; +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.remote.RemoteStoreEnums; +import org.opensearch.index.remote.RemoteStorePathStrategy; +import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; + +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; +import java.util.Set; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; +import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.*; +import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class RemoteRoutingTableServiceIT extends RemoteStoreBaseIntegTestCase { + private static final String INDEX_NAME = "test-index"; + + RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class); + RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance(RemoteClusterStateService.class); + + BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME); + + @Before + public void setup() { + asyncUploadMockFsRepo = false; + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) + .put(REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING.getKey(), RemoteStoreEnums.PathType.HASHED_PREFIX.toString()) + .put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, REPOSITORY_NAME) + .put(REMOTE_PUBLICATION_EXPERIMENTAL, true) + .build(); + } + + private final RemoteStoreEnums.PathType pathType = RemoteStoreEnums.PathType.HASHED_PREFIX; + + public void testRemoteRoutingTableIndexLifecycle() throws Exception { + prepareCluster(1, 3, INDEX_NAME, 1, 1); + ensureGreen(INDEX_NAME); + + BlobPath baseMetadataPath = getBaseMetadataPath(repository); + List indexRoutingTables = new ArrayList<>(getClusterState().routingTable().indicesRouting().values()); + BlobPath indexRoutingPath = getIndexRoutingPath(baseMetadataPath.add(INDEX_ROUTING_PATH_TOKEN), indexRoutingTables.get(0).getIndex().getUUID()); + + AtomicInteger indexRoutingFiles = new AtomicInteger(); + assertBusy(() -> { + indexRoutingFiles.set(repository.blobStore().blobContainer(indexRoutingPath).listBlobs().size()); + assertTrue(indexRoutingFiles.get() > 0); + }); + + RemoteManifestManager remoteManifestManager = remoteClusterStateService.getRemoteManifestManager(); + verifyUpdatesInManifestFile(remoteManifestManager); + + List routingTableVersions = getRoutingTableVersionsFromAllNodes(); + assertTrue(areRoutingTableVersionsSame(routingTableVersions)); + + // Update index settings + updateIndexSettings(INDEX_NAME, IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2); + assertBusy(() -> { + int indexRoutingFilesAfterUpdate = repository.blobStore().blobContainer(indexRoutingPath).listBlobs().size(); + assertTrue(indexRoutingFilesAfterUpdate > indexRoutingFiles.get()); + }); + + verifyUpdatesInManifestFile(remoteManifestManager); + + routingTableVersions = getRoutingTableVersionsFromAllNodes(); + assertTrue(areRoutingTableVersionsSame(routingTableVersions)); + + // Delete the index and assert its deletion + deleteIndexAndVerify(remoteManifestManager); + + routingTableVersions = getRoutingTableVersionsFromAllNodes(); + assertTrue(areRoutingTableVersionsSame(routingTableVersions)); + } + + public void testRemoteRoutingTableIndexNodeRestart() throws Exception { + prepareCluster(1, 3, INDEX_NAME, 1, 1); + ensureGreen(INDEX_NAME); + + BlobPath baseMetadataPath = getBaseMetadataPath(repository); + List indexRoutingTables = new ArrayList<>(getClusterState().routingTable().indicesRouting().values()); + BlobPath indexRoutingPath = getIndexRoutingPath(baseMetadataPath.add(INDEX_ROUTING_PATH_TOKEN), indexRoutingTables.get(0).getIndex().getUUID()); + + AtomicInteger indexRoutingFiles = new AtomicInteger(); + assertBusy(() -> { + indexRoutingFiles.set(repository.blobStore().blobContainer(indexRoutingPath).listBlobs().size()); + assertTrue(indexRoutingFiles.get() > 0); + }); + + List routingTableVersions = getRoutingTableVersionsFromAllNodes(); + assertTrue(areRoutingTableVersionsSame(routingTableVersions)); + + // Ensure node comes healthy after restart + Set dataNodes = internalCluster().getDataNodeNames(); + internalCluster().restartNode(randomFrom(dataNodes)); + ensureGreen(); + ensureGreen(INDEX_NAME); + + // ensure restarted node joins and the cluster is stable + assertEquals(3, internalCluster().clusterService().state().nodes().getDataNodes().size()); + ensureStableCluster(4); + + assertBusy(() -> { + int indexRoutingFilesAfterNodeDrop = repository.blobStore().blobContainer(indexRoutingPath).listBlobs().size(); + assertTrue(indexRoutingFilesAfterNodeDrop > indexRoutingFiles.get()); + }); + + RemoteManifestManager remoteManifestManager = remoteClusterStateService.getRemoteManifestManager(); + verifyUpdatesInManifestFile(remoteManifestManager); + } + + public void testRemoteRoutingTableIndexMasterRestart() throws Exception { + prepareCluster(1, 3, INDEX_NAME, 1, 1); + ensureGreen(INDEX_NAME); + + BlobPath baseMetadataPath = getBaseMetadataPath(repository); + List indexRoutingTables = new ArrayList<>(getClusterState().routingTable().indicesRouting().values()); + BlobPath indexRoutingPath = getIndexRoutingPath(baseMetadataPath.add(INDEX_ROUTING_PATH_TOKEN), indexRoutingTables.get(0).getIndex().getUUID()); + + AtomicInteger indexRoutingFiles = new AtomicInteger(); + assertBusy(() -> { + indexRoutingFiles.set(repository.blobStore().blobContainer(indexRoutingPath).listBlobs().size()); + assertTrue(indexRoutingFiles.get() > 0); + }); + + List routingTableVersions = getRoutingTableVersionsFromAllNodes(); + assertTrue(areRoutingTableVersionsSame(routingTableVersions)); + + // Ensure node comes healthy after restart + String clusterManagerName = internalCluster().getClusterManagerName(); + internalCluster().restartNode(clusterManagerName); + ensureGreen(); + ensureGreen(INDEX_NAME); + + // ensure master is elected and the cluster is stable + assertEquals(1, internalCluster().clusterService().state().nodes().getClusterManagerNode()); + ensureStableCluster(4); + + assertBusy(() -> { + int indexRoutingFilesAfterNodeDrop = repository.blobStore().blobContainer(indexRoutingPath).listBlobs().size(); + assertTrue(indexRoutingFilesAfterNodeDrop > indexRoutingFiles.get()); + }); + + RemoteManifestManager remoteManifestManager = remoteClusterStateService.getRemoteManifestManager(); + verifyUpdatesInManifestFile(remoteManifestManager); + } + + private BlobPath getBaseMetadataPath(BlobStoreRepository repository) { + return repository.basePath() + .add(Base64.getUrlEncoder().withoutPadding().encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8))) + .add("cluster-state") + .add(getClusterState().metadata().clusterUUID()); + } + + private BlobPath getIndexRoutingPath(BlobPath indexRoutingPath, String indexUUID) { + RemoteStoreEnums.PathHashAlgorithm pathHashAlgo = RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64; + return pathType.path( + RemoteStorePathStrategy.PathInput.builder() + .basePath(indexRoutingPath) + .indexUUID(indexUUID) + .build(), + pathHashAlgo + ); + } + + private void verifyUpdatesInManifestFile(RemoteManifestManager remoteManifestManager) { + Optional latestManifest = remoteManifestManager.getLatestClusterMetadataManifest( + getClusterState().getClusterName().value(), + getClusterState().getMetadata().clusterUUID() + ); + assertTrue(latestManifest.isPresent()); + ClusterMetadataManifest manifest = latestManifest.get(); + assertTrue(manifest.getDiffManifest().getIndicesRoutingUpdated().contains(INDEX_NAME)); + assertTrue(manifest.getDiffManifest().getIndicesDeleted().isEmpty()); + } + + private List getRoutingTableVersionsFromAllNodes() throws ExecutionException, InterruptedException { + String[] allNodes = internalCluster().getNodeNames(); + List routingTableVersions = new ArrayList<>(); + for (String node : allNodes) { + RoutingTable routingTable = internalCluster().client(node) + .admin() + .cluster() + .state(new ClusterStateRequest().local(true)) + .get() + .getState() + .routingTable(); + routingTableVersions.add(routingTable.version()); + } + return routingTableVersions; + } + + private void updateIndexSettings(String indexName, String settingKey, int settingValue) { + client().admin() + .indices() + .prepareUpdateSettings(indexName) + .setSettings(Settings.builder().put(settingKey, settingValue)) + .execute() + .actionGet(); + } + + private void deleteIndexAndVerify(RemoteManifestManager remoteManifestManager) { + client().admin().indices().prepareDelete(INDEX_NAME).execute().actionGet(); + assertFalse(client().admin().indices().prepareExists(INDEX_NAME).get().isExists()); + + Optional latestManifest = remoteManifestManager.getLatestClusterMetadataManifest( + getClusterState().getClusterName().value(), + getClusterState().getMetadata().clusterUUID() + ); + assertTrue(latestManifest.isPresent()); + ClusterMetadataManifest manifest = latestManifest.get(); + assertTrue(manifest.getDiffManifest().getIndicesRoutingUpdated().isEmpty()); + assertTrue(manifest.getDiffManifest().getIndicesDeleted().contains(INDEX_NAME)); + } + + private boolean areRoutingTableVersionsSame(List routingTableVersions) { + if (routingTableVersions == null || routingTableVersions.isEmpty()) { + return false; + } + + Long firstVersion = routingTableVersions.get(0); + for (Long routingTableVersion : routingTableVersions) { + if (!firstVersion.equals(routingTableVersion)) { + logger.info("Responses are not the same: {} {}", firstVersion, routingTableVersion); + return false; + } + } + return true; + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 74abe9cd257b4..349b6fbcf834b 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -1437,7 +1437,7 @@ public void setRemoteStateReadTimeout(TimeValue remoteStateReadTimeout) { this.remoteStateReadTimeout = remoteStateReadTimeout; } - private BlobStoreTransferService getBlobStoreTransferService() { + public BlobStoreTransferService getBlobStoreTransferService() { if (blobStoreTransferService == null) { blobStoreTransferService = new BlobStoreTransferService(getBlobStore(), threadpool); }