diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteRoutingTableServiceIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteRoutingTableServiceIT.java new file mode 100644 index 0000000000000..370da763285a2 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteRoutingTableServiceIT.java @@ -0,0 +1,180 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.junit.Before; +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.remote.RemoteStoreEnums; +import org.opensearch.index.remote.RemoteStorePathStrategy; +import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; + +import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.*; +import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; + +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class RemoteRoutingTableServiceIT extends RemoteStoreBaseIntegTestCase { + private static String INDEX_NAME = "test-index"; + + @Before + public void setup() { + asyncUploadMockFsRepo = false; + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder().put(super.nodeSettings(nodeOrdinal)) + .put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) + .put(REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING.getKey(), RemoteStoreEnums.PathType.HASHED_PREFIX.toString()) + .put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, REPOSITORY_NAME) + .put(REMOTE_PUBLICATION_EXPERIMENTAL, true) + .build(); + } + + private RemoteStoreEnums.PathType pathType = RemoteStoreEnums.PathType.HASHED_PREFIX; + + public void testRemoteRoutingTableIndexLifecycle() throws Exception { + prepareCluster(1, 3, INDEX_NAME, 1, 1); + ensureGreen(INDEX_NAME); + + RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class); + RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance(RemoteClusterStateService.class); + + BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME); + BlobPath baseMetadataPath = repository.basePath() + .add( + Base64.getUrlEncoder() + .withoutPadding() + .encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8)) + ) + .add("cluster-state") + .add(getClusterState().metadata().clusterUUID()); + + BlobPath indexRoutingPath = baseMetadataPath.add(INDEX_ROUTING_PATH_TOKEN); + List indexRoutingTable = new ArrayList<>(getClusterState().routingTable().indicesRouting().values()); + RemoteStoreEnums.PathHashAlgorithm pathHashAlgo = RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64; + BlobPath path = pathType.path( + RemoteStorePathStrategy.PathInput.builder().basePath(indexRoutingPath).indexUUID(indexRoutingTable.get(0).getIndex().getUUID()).build(), + pathHashAlgo + ); + AtomicInteger indexRoutingFiles = new AtomicInteger(); + assertBusy(() -> { + indexRoutingFiles.set(repository.blobStore().blobContainer(path).listBlobs().size()); + assertTrue(indexRoutingFiles.get() > 0); + }); + + RemoteManifestManager remoteManifestManager = remoteClusterStateService.getRemoteManifestManager(); + Optional latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(getClusterState().getClusterName().value(), getClusterState().getMetadata().clusterUUID()); + assertTrue(latestManifest.isPresent()); + ClusterMetadataManifest manifest = latestManifest.get(); + assertTrue(manifest.getDiffManifest().getIndicesRoutingUpdated().contains(INDEX_NAME)); + assertTrue(manifest.getDiffManifest().getIndicesDeleted().isEmpty()); + + String[] allNodes = internalCluster().getNodeNames(); + List routingTableVersions = new ArrayList<>(); + + // Retrieve cluster state from all nodes + for (String node : allNodes) { + RoutingTable routingTable = internalCluster().client(node) + .admin() + .cluster() + .state(new ClusterStateRequest().local(true)) + .get().getState().routingTable(); + routingTableVersions.add(routingTable.version()); + } + assertTrue(areRoutingTableVersionsSame(routingTableVersions)); + + // Update index settings + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)) + .execute() + .actionGet(); + assertBusy(() -> { + int indexRoutingFilesAfterUpdate = repository.blobStore().blobContainer(path).listBlobs().size(); + assertTrue(indexRoutingFilesAfterUpdate > indexRoutingFiles.get()); + }); + + latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(getClusterState().getClusterName().value(), getClusterState().getMetadata().clusterUUID()); + assertTrue(latestManifest.isPresent()); + manifest = latestManifest.get(); + assertTrue(manifest.getDiffManifest().getIndicesRoutingUpdated().contains(INDEX_NAME)); + assertTrue(manifest.getDiffManifest().getIndicesDeleted().isEmpty()); + + allNodes = internalCluster().getNodeNames(); + routingTableVersions = new ArrayList<>(); + // Retrieve updated cluster state from all nodes again + for (String node : allNodes) { + RoutingTable routingTable = internalCluster().client(node) + .admin() + .cluster() + .state(new ClusterStateRequest().local(true)) + .get().getState().routingTable(); + routingTableVersions.add(routingTable.version()); + } + assertTrue(areRoutingTableVersionsSame(routingTableVersions)); + + // Delete the index and assert its deletion + client().admin().indices().prepareDelete(INDEX_NAME).execute().actionGet(); + assertFalse(client().admin().indices().prepareExists(INDEX_NAME).get().isExists()); + + latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(getClusterState().getClusterName().value(), getClusterState().getMetadata().clusterUUID()); + assertTrue(latestManifest.isPresent()); + manifest = latestManifest.get(); + assertTrue(manifest.getDiffManifest().getIndicesRoutingUpdated().isEmpty()); + assertTrue(manifest.getDiffManifest().getIndicesDeleted().contains(INDEX_NAME)); + + allNodes = internalCluster().getNodeNames(); + routingTableVersions = new ArrayList<>(); + // Retrieve updated cluster state from all nodes again + for (String node : allNodes) { + RoutingTable routingTable = internalCluster().client(node) + .admin() + .cluster() + .state(new ClusterStateRequest().local(true)) + .get().getState().routingTable(); + routingTableVersions.add(routingTable.version()); + } + assertTrue(areRoutingTableVersionsSame(routingTableVersions)); + } + + private boolean areRoutingTableVersionsSame(List routingTableVersions) { + if (routingTableVersions == null || routingTableVersions.isEmpty()) { + return false; + } + + Long firstVersion = routingTableVersions.get(0); + for (Long routingTableVersion : routingTableVersions) { + if (!firstVersion.equals(routingTableVersion)) { + logger.info("Responses are not same {} {}", firstVersion, routingTableVersion); + return false; + } + } + return true; + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 74abe9cd257b4..349b6fbcf834b 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -1437,7 +1437,7 @@ public void setRemoteStateReadTimeout(TimeValue remoteStateReadTimeout) { this.remoteStateReadTimeout = remoteStateReadTimeout; } - private BlobStoreTransferService getBlobStoreTransferService() { + public BlobStoreTransferService getBlobStoreTransferService() { if (blobStoreTransferService == null) { blobStoreTransferService = new BlobStoreTransferService(getBlobStore(), threadpool); }