Skip to content

Commit

Permalink
Port Changes
Browse files Browse the repository at this point in the history
Signed-off-by: Lakshya Taragi <[email protected]>
  • Loading branch information
ltaragi committed Apr 8, 2024
1 parent 9b0f578 commit 8bc1b40
Show file tree
Hide file tree
Showing 7 changed files with 851 additions and 49 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotemigration;

import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.Optional;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.index.IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode.MIXED;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode.STRICT;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.Direction.NONE;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.Direction.REMOTE_STORE;
import static org.opensearch.remotemigration.RemoteStoreMigrationAllocationIT.assertNodeInCluster;
import static org.opensearch.remotemigration.RemoteStoreMigrationAllocationIT.prepareIndexWithoutReplica;
import static org.opensearch.remotemigration.RemoteStoreMigrationAllocationIT.setClusterMode;
import static org.opensearch.remotemigration.RemoteStoreMigrationAllocationIT.setDirection;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false)
public class RemoteStoreMigrationSettingsUpdateIT extends MigrationBaseTestCase {

private Client client;

// remote store backed index setting tests

public void testNewIndexIsRemoteStoreBackedForRemoteStoreDirectionAndMixedMode() {
logger.info(" --> initialize cluster: gives non remote cluster manager");
initializeCluster(false);

String indexName1 = "test_index_1";
String indexName2 = "test_index_2";

logger.info(" --> add non-remote node");
addRemote = false;
String nonRemoteNodeName = internalCluster().startNode();
internalCluster().validateClusterFormed();
assertNodeInCluster(nonRemoteNodeName);

logger.info(" --> create an index");
prepareIndexWithoutReplica(Optional.of(indexName1));

logger.info(" --> verify that non remote-backed index is created");
assertNonRemoteStoreBackedIndex(indexName1);

logger.info(" --> set mixed cluster compatibility mode and remote_store direction");
setClusterMode(MIXED.mode);
setDirection(REMOTE_STORE.direction);

logger.info(" --> add remote node");
addRemote = true;
String remoteNodeName = internalCluster().startNode();
internalCluster().validateClusterFormed();
assertNodeInCluster(remoteNodeName);

logger.info(" --> create another index");
prepareIndexWithoutReplica(Optional.of(indexName2));

logger.info(" --> verify that remote backed index is created");
assertRemoteStoreBackedIndex(indexName2);
}

// verify that the created index is not remote store backed
private void assertNonRemoteStoreBackedIndex(String indexName) {
Settings indexSettings = client.admin().indices().prepareGetIndex().execute().actionGet().getSettings().get(indexName);
assertEquals(ReplicationType.DOCUMENT.toString(), indexSettings.get(SETTING_REPLICATION_TYPE));
assertNull(indexSettings.get(SETTING_REMOTE_STORE_ENABLED));
assertNull(indexSettings.get(SETTING_REMOTE_SEGMENT_STORE_REPOSITORY));
assertNull(indexSettings.get(SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY));
}

// verify that the created index is remote store backed
private void assertRemoteStoreBackedIndex(String indexName) {
Settings indexSettings = client.admin().indices().prepareGetIndex().execute().actionGet().getSettings().get(indexName);
assertEquals(ReplicationType.SEGMENT.toString(), indexSettings.get(SETTING_REPLICATION_TYPE));
assertEquals("true", indexSettings.get(SETTING_REMOTE_STORE_ENABLED));
assertEquals(REPOSITORY_NAME, indexSettings.get(SETTING_REMOTE_SEGMENT_STORE_REPOSITORY));
assertEquals(REPOSITORY_2_NAME, indexSettings.get(SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY));
assertEquals(
IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL,
INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(indexSettings)
);
}

private void initializeCluster(boolean remoteClusterManager) {
addRemote = remoteClusterManager;
internalCluster().setBootstrapClusterManagerNodeIndex(0);
internalCluster().startNodes(1);
client = internalCluster().client();
setClusterMode(STRICT.mode);
setDirection(NONE.direction);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.cluster.block.ClusterBlock;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
Expand Down Expand Up @@ -100,7 +101,6 @@
import org.opensearch.indices.ShardLimitValidator;
import org.opensearch.indices.SystemIndices;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -144,6 +144,7 @@
import static org.opensearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.isMigratingToRemoteStore;

/**
* Service responsible for submitting create index requests
Expand Down Expand Up @@ -945,7 +946,7 @@ static Settings aggregateIndexSettings(
indexSettingsBuilder.put(SETTING_INDEX_UUID, UUIDs.randomBase64UUID());

updateReplicationStrategy(indexSettingsBuilder, request.settings(), settings, combinedTemplateSettings);
updateRemoteStoreSettings(indexSettingsBuilder, settings);
updateRemoteStoreSettings(indexSettingsBuilder, currentState, clusterSettings, settings, request.index());

if (sourceMetadata != null) {
assert request.resizeType() != null;
Expand Down Expand Up @@ -1023,23 +1024,50 @@ private static void updateReplicationStrategy(
/**
* Updates index settings to enable remote store by default based on node attributes
* @param settingsBuilder index settings builder to be updated with relevant settings
* @param clusterState state of cluster
* @param clusterSettings cluster level settings
* @param nodeSettings node level settings
* @param indexName name of index
*/
private static void updateRemoteStoreSettings(Settings.Builder settingsBuilder, Settings clusterSettings) {
if (isRemoteDataAttributePresent(clusterSettings)) {
settingsBuilder.put(SETTING_REMOTE_STORE_ENABLED, true)
.put(
SETTING_REMOTE_SEGMENT_STORE_REPOSITORY,
clusterSettings.get(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY
)
)
.put(
SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY,
clusterSettings.get(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY
)
);
public static void updateRemoteStoreSettings(
Settings.Builder settingsBuilder,
ClusterState clusterState,
ClusterSettings clusterSettings,
Settings nodeSettings,
String indexName
) {
if (isRemoteDataAttributePresent(nodeSettings) || isMigratingToRemoteStore(clusterSettings)) {
String segmentRepo, translogRepo;

Optional<DiscoveryNode> remoteNode = clusterState.nodes()
.getNodes()
.values()
.stream()
.filter(DiscoveryNode::isRemoteStoreNode)
.findFirst();

if (remoteNode.isPresent()) {
translogRepo = remoteNode.get()
.getAttributes()
.get(RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY);
segmentRepo = remoteNode.get()
.getAttributes()
.get(RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY);
if (segmentRepo != null && translogRepo != null) {
if (isMigratingToRemoteStore(clusterSettings)) {
settingsBuilder.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT);
}
settingsBuilder.put(SETTING_REMOTE_STORE_ENABLED, true)
.put(SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, segmentRepo)
.put(SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, translogRepo);
} else {
ValidationException validationException = new ValidationException();
validationException.addValidationErrors(
Collections.singletonList("Cluster is migrating to remote store but no remote node found, failing index creation")
);
throw new IndexCreationException(indexName, validationException);
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.repositories.RepositoriesService;
Expand Down Expand Up @@ -223,4 +224,18 @@ public RepositoriesMetadata updateRepositoriesMetadata(DiscoveryNode joiningNode
return existingRepositories;
}
}

/**
* To check if the cluster is undergoing remote store migration
* @param clusterSettings cluster level settings
* @return
* <code>true</code> For <code>REMOTE_STORE</code> migration direction and <code>MIXED</code> compatibility mode,
* <code>false</code> otherwise
*/
public static boolean isMigratingToRemoteStore(ClusterSettings clusterSettings) {
boolean isMixedMode = clusterSettings.get(REMOTE_STORE_COMPATIBILITY_MODE_SETTING).equals(CompatibilityMode.MIXED);
boolean isRemoteStoreMigrationDirection = clusterSettings.get(MIGRATION_DIRECTION_SETTING).equals(Direction.REMOTE_STORE);

return (isMixedMode && isRemoteStoreMigrationDirection);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -922,16 +922,16 @@ private DiscoveryNode newDiscoveryNode(Map<String, String> attributes) {
);
}

private static final String SEGMENT_REPO = "segment-repo";
private static final String TRANSLOG_REPO = "translog-repo";
public static final String SEGMENT_REPO = "segment-repo";
public static final String TRANSLOG_REPO = "translog-repo";
private static final String CLUSTER_STATE_REPO = "cluster-state-repo";
private static final String COMMON_REPO = "remote-repo";

private Map<String, String> remoteStoreNodeAttributes(String segmentRepoName, String translogRepoName) {
public static Map<String, String> remoteStoreNodeAttributes(String segmentRepoName, String translogRepoName) {
return remoteStoreNodeAttributes(segmentRepoName, translogRepoName, CLUSTER_STATE_REPO);
}

private Map<String, String> remoteStoreNodeAttributes(String segmentRepoName, String translogRepoName, String clusterStateRepo) {
private static Map<String, String> remoteStoreNodeAttributes(String segmentRepoName, String translogRepoName, String clusterStateRepo) {
String segmentRepositoryTypeAttributeKey = String.format(
Locale.getDefault(),
REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
Expand Down Expand Up @@ -968,7 +968,7 @@ private Map<String, String> remoteStoreNodeAttributes(String segmentRepoName, St
};
}

private Map<String, String> remoteStateNodeAttributes(String clusterStateRepo) {
private static Map<String, String> remoteStateNodeAttributes(String clusterStateRepo) {
String clusterStateRepositoryTypeAttributeKey = String.format(
Locale.getDefault(),
REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
Expand Down
Loading

0 comments on commit 8bc1b40

Please sign in to comment.