Skip to content

Commit

Permalink
Add integration tests for RemoteRoutingTable.
Browse files Browse the repository at this point in the history
Signed-off-by: Shailendra Singh <[email protected]>
  • Loading branch information
Shailendra Singh committed Jul 4, 2024
1 parent 58d1164 commit 615a179
Show file tree
Hide file tree
Showing 2 changed files with 261 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote;

import org.junit.Before;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.index.remote.RemoteStorePathStrategy;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;

import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Set;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.*;
import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteRoutingTableServiceIT extends RemoteStoreBaseIntegTestCase {
private static final String INDEX_NAME = "test-index";

RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);
RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance(RemoteClusterStateService.class);

BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME);

@Before
public void setup() {
asyncUploadMockFsRepo = false;
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
.put(REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING.getKey(), RemoteStoreEnums.PathType.HASHED_PREFIX.toString())
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, REPOSITORY_NAME)
.put(REMOTE_PUBLICATION_EXPERIMENTAL, true)
.build();
}

private final RemoteStoreEnums.PathType pathType = RemoteStoreEnums.PathType.HASHED_PREFIX;

public void testRemoteRoutingTableIndexLifecycle() throws Exception {
prepareCluster(1, 3, INDEX_NAME, 1, 1);
ensureGreen(INDEX_NAME);

BlobPath baseMetadataPath = getBaseMetadataPath(repository);
List<IndexRoutingTable> indexRoutingTables = new ArrayList<>(getClusterState().routingTable().indicesRouting().values());
BlobPath indexRoutingPath = getIndexRoutingPath(baseMetadataPath.add(INDEX_ROUTING_PATH_TOKEN), indexRoutingTables.get(0).getIndex().getUUID());

AtomicInteger indexRoutingFiles = new AtomicInteger();
assertBusy(() -> {
indexRoutingFiles.set(repository.blobStore().blobContainer(indexRoutingPath).listBlobs().size());
assertTrue(indexRoutingFiles.get() > 0);
});

RemoteManifestManager remoteManifestManager = remoteClusterStateService.getRemoteManifestManager();
verifyUpdatesInManifestFile(remoteManifestManager);

List<Long> routingTableVersions = getRoutingTableVersionsFromAllNodes();
assertTrue(areRoutingTableVersionsSame(routingTableVersions));

// Update index settings
updateIndexSettings(INDEX_NAME, IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2);
assertBusy(() -> {
int indexRoutingFilesAfterUpdate = repository.blobStore().blobContainer(indexRoutingPath).listBlobs().size();
assertTrue(indexRoutingFilesAfterUpdate > indexRoutingFiles.get());
});

verifyUpdatesInManifestFile(remoteManifestManager);

routingTableVersions = getRoutingTableVersionsFromAllNodes();
assertTrue(areRoutingTableVersionsSame(routingTableVersions));

// Delete the index and assert its deletion
deleteIndexAndVerify(remoteManifestManager);

routingTableVersions = getRoutingTableVersionsFromAllNodes();
assertTrue(areRoutingTableVersionsSame(routingTableVersions));
}

public void testRemoteRoutingTableIndexNodeRestart() throws Exception {
prepareCluster(1, 3, INDEX_NAME, 1, 1);
ensureGreen(INDEX_NAME);

BlobPath baseMetadataPath = getBaseMetadataPath(repository);
List<IndexRoutingTable> indexRoutingTables = new ArrayList<>(getClusterState().routingTable().indicesRouting().values());
BlobPath indexRoutingPath = getIndexRoutingPath(baseMetadataPath.add(INDEX_ROUTING_PATH_TOKEN), indexRoutingTables.get(0).getIndex().getUUID());

AtomicInteger indexRoutingFiles = new AtomicInteger();
assertBusy(() -> {
indexRoutingFiles.set(repository.blobStore().blobContainer(indexRoutingPath).listBlobs().size());
assertTrue(indexRoutingFiles.get() > 0);
});

List<Long> routingTableVersions = getRoutingTableVersionsFromAllNodes();
assertTrue(areRoutingTableVersionsSame(routingTableVersions));

// Ensure node comes healthy after restart
Set<String> dataNodes = internalCluster().getDataNodeNames();
internalCluster().restartNode(randomFrom(dataNodes));
ensureGreen();
ensureGreen(INDEX_NAME);

// ensure restarted node joins and the cluster is stable
assertEquals(3, internalCluster().clusterService().state().nodes().getDataNodes().size());
ensureStableCluster(4);

assertBusy(() -> {
int indexRoutingFilesAfterNodeDrop = repository.blobStore().blobContainer(indexRoutingPath).listBlobs().size();
assertTrue(indexRoutingFilesAfterNodeDrop > indexRoutingFiles.get());
});

RemoteManifestManager remoteManifestManager = remoteClusterStateService.getRemoteManifestManager();
verifyUpdatesInManifestFile(remoteManifestManager);
}

public void testRemoteRoutingTableIndexMasterRestart() throws Exception {
prepareCluster(1, 3, INDEX_NAME, 1, 1);
ensureGreen(INDEX_NAME);

BlobPath baseMetadataPath = getBaseMetadataPath(repository);
List<IndexRoutingTable> indexRoutingTables = new ArrayList<>(getClusterState().routingTable().indicesRouting().values());
BlobPath indexRoutingPath = getIndexRoutingPath(baseMetadataPath.add(INDEX_ROUTING_PATH_TOKEN), indexRoutingTables.get(0).getIndex().getUUID());

AtomicInteger indexRoutingFiles = new AtomicInteger();
assertBusy(() -> {
indexRoutingFiles.set(repository.blobStore().blobContainer(indexRoutingPath).listBlobs().size());
assertTrue(indexRoutingFiles.get() > 0);
});

List<Long> routingTableVersions = getRoutingTableVersionsFromAllNodes();
assertTrue(areRoutingTableVersionsSame(routingTableVersions));

// Ensure node comes healthy after restart
String clusterManagerName = internalCluster().getClusterManagerName();
internalCluster().restartNode(clusterManagerName);
ensureGreen();
ensureGreen(INDEX_NAME);

// ensure master is elected and the cluster is stable
assertEquals(1, internalCluster().clusterService().state().nodes().getClusterManagerNode());
ensureStableCluster(4);

assertBusy(() -> {
int indexRoutingFilesAfterNodeDrop = repository.blobStore().blobContainer(indexRoutingPath).listBlobs().size();
assertTrue(indexRoutingFilesAfterNodeDrop > indexRoutingFiles.get());
});

RemoteManifestManager remoteManifestManager = remoteClusterStateService.getRemoteManifestManager();
verifyUpdatesInManifestFile(remoteManifestManager);
}

private BlobPath getBaseMetadataPath(BlobStoreRepository repository) {
return repository.basePath()
.add(Base64.getUrlEncoder().withoutPadding().encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8)))
.add("cluster-state")
.add(getClusterState().metadata().clusterUUID());
}

private BlobPath getIndexRoutingPath(BlobPath indexRoutingPath, String indexUUID) {
RemoteStoreEnums.PathHashAlgorithm pathHashAlgo = RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64;
return pathType.path(
RemoteStorePathStrategy.PathInput.builder()
.basePath(indexRoutingPath)
.indexUUID(indexUUID)
.build(),
pathHashAlgo
);
}

private void verifyUpdatesInManifestFile(RemoteManifestManager remoteManifestManager) {
Optional<ClusterMetadataManifest> latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
getClusterState().getClusterName().value(),
getClusterState().getMetadata().clusterUUID()
);
assertTrue(latestManifest.isPresent());
ClusterMetadataManifest manifest = latestManifest.get();
assertTrue(manifest.getDiffManifest().getIndicesRoutingUpdated().contains(INDEX_NAME));
assertTrue(manifest.getDiffManifest().getIndicesDeleted().isEmpty());
}

private List<Long> getRoutingTableVersionsFromAllNodes() throws ExecutionException, InterruptedException {
String[] allNodes = internalCluster().getNodeNames();
List<Long> routingTableVersions = new ArrayList<>();
for (String node : allNodes) {
RoutingTable routingTable = internalCluster().client(node)
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.routingTable();
routingTableVersions.add(routingTable.version());
}
return routingTableVersions;
}

private void updateIndexSettings(String indexName, String settingKey, int settingValue) {
client().admin()
.indices()
.prepareUpdateSettings(indexName)
.setSettings(Settings.builder().put(settingKey, settingValue))
.execute()
.actionGet();
}

private void deleteIndexAndVerify(RemoteManifestManager remoteManifestManager) {
client().admin().indices().prepareDelete(INDEX_NAME).execute().actionGet();
assertFalse(client().admin().indices().prepareExists(INDEX_NAME).get().isExists());

Optional<ClusterMetadataManifest> latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
getClusterState().getClusterName().value(),
getClusterState().getMetadata().clusterUUID()
);
assertTrue(latestManifest.isPresent());
ClusterMetadataManifest manifest = latestManifest.get();
assertTrue(manifest.getDiffManifest().getIndicesRoutingUpdated().isEmpty());
assertTrue(manifest.getDiffManifest().getIndicesDeleted().contains(INDEX_NAME));
}

private boolean areRoutingTableVersionsSame(List<Long> routingTableVersions) {
if (routingTableVersions == null || routingTableVersions.isEmpty()) {
return false;
}

Long firstVersion = routingTableVersions.get(0);
for (Long routingTableVersion : routingTableVersions) {
if (!firstVersion.equals(routingTableVersion)) {
logger.info("Responses are not the same: {} {}", firstVersion, routingTableVersion);
return false;
}
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1437,7 +1437,7 @@ public void setRemoteStateReadTimeout(TimeValue remoteStateReadTimeout) {
this.remoteStateReadTimeout = remoteStateReadTimeout;
}

private BlobStoreTransferService getBlobStoreTransferService() {
public BlobStoreTransferService getBlobStoreTransferService() {
if (blobStoreTransferService == null) {
blobStoreTransferService = new BlobStoreTransferService(getBlobStore(), threadpool);
}
Expand Down

0 comments on commit 615a179

Please sign in to comment.