Skip to content

Commit

Permalink
[Segment Replication] Introduce cluster level setting `cluster.index.…
Browse files Browse the repository at this point in the history
…restrict.replication.type` to prevent replication type setting override during index creations (#11583)

* Add new cluster level setting that prevents index level replication type setting overrides

Signed-off-by: Suraj Singh <[email protected]>

* Block restore on mis-matching replication type setting

Signed-off-by: Suraj Singh <[email protected]>

* Address review comments and rebase

Signed-off-by: Suraj Singh <[email protected]>

* Address review comments

Signed-off-by: Suraj Singh <[email protected]>

* Use appropriate variable names

Signed-off-by: Suraj Singh <[email protected]>

* Fix failing integ test

Signed-off-by: Suraj Singh <[email protected]>

---------

Signed-off-by: Suraj Singh <[email protected]>
  • Loading branch information
dreamer-89 authored Dec 21, 2023
1 parent 2c8ee19 commit 6a6ab32
Show file tree
Hide file tree
Showing 7 changed files with 346 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [BWC and API enforcement] Introduce checks for enforcing the API restrictions ([#11175](https://github.com/opensearch-project/OpenSearch/pull/11175))
- Create separate transport action for render search template action ([#11170](https://github.com/opensearch-project/OpenSearch/pull/11170))
- Add additional handling in SearchTemplateRequest when simulate is set to true ([#11591](https://github.com/opensearch-project/OpenSearch/pull/11591))
- Introduce cluster level setting `cluster.index.restrict.replication.type` to prevent replication type setting override during index creations([#11583](https://github.com/opensearch-project/OpenSearch/pull/11583))

### Dependencies
- Bump Lucene from 9.7.0 to 9.8.0 ([10276](https://github.com/opensearch-project/OpenSearch/pull/10276))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,26 @@

import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.opensearch.action.admin.indices.shrink.ResizeType;
import org.opensearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.index.Index;
import org.opensearch.index.IndexModule;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.indices.IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_SETTING_REPLICATION_TYPE;
import static org.hamcrest.Matchers.hasSize;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationClusterSettingIT extends OpenSearchIntegTestCase {
Expand All @@ -29,6 +39,9 @@ public class SegmentReplicationClusterSettingIT extends OpenSearchIntegTestCase
protected static final int SHARD_COUNT = 1;
protected static final int REPLICA_COUNT = 1;

protected static final String REPLICATION_MISMATCH_VALIDATION_ERROR =
"Validation Failed: 1: index setting [index.replication.type] is not allowed to be set as [cluster.index.restrict.replication.type=true];";

@Override
public Settings indexSettings() {
return Settings.builder()
Expand All @@ -44,14 +57,6 @@ protected boolean addMockInternalEngine() {
return false;
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(CLUSTER_SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
}

public void testIndexReplicationSettingOverridesSegRepClusterSetting() throws Exception {
Settings settings = Settings.builder().put(CLUSTER_SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build();
final String ANOTHER_INDEX = "test-index";
Expand Down Expand Up @@ -123,4 +128,125 @@ public void testIndexReplicationSettingOverridesDocRepClusterSetting() throws Ex
assertEquals(indicesService.indexService(anotherIndex).getIndexSettings().isSegRepEnabled(), false);
}

public void testReplicationTypesOverrideNotAllowed_IndexAPI() {
// Generate mutually exclusive replication strategies at cluster and index level
List<ReplicationType> replicationStrategies = getRandomReplicationTypesAsList();
ReplicationType clusterLevelReplication = replicationStrategies.get(0);
ReplicationType indexLevelReplication = replicationStrategies.get(1);
Settings nodeSettings = Settings.builder()
.put(CLUSTER_SETTING_REPLICATION_TYPE, clusterLevelReplication)
.put(CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING.getKey(), true)
.build();
internalCluster().startClusterManagerOnlyNode(nodeSettings);
internalCluster().startDataOnlyNode(nodeSettings);
Settings indexSettings = Settings.builder().put(indexSettings()).put(SETTING_REPLICATION_TYPE, indexLevelReplication).build();
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> createIndex(INDEX_NAME, indexSettings));
assertEquals(REPLICATION_MISMATCH_VALIDATION_ERROR, exception.getMessage());
}

public void testReplicationTypesOverrideNotAllowed_WithTemplates() {
// Generate mutually exclusive replication strategies at cluster and index level
List<ReplicationType> replicationStrategies = getRandomReplicationTypesAsList();
ReplicationType clusterLevelReplication = replicationStrategies.get(0);
ReplicationType templateReplicationType = replicationStrategies.get(1);
Settings nodeSettings = Settings.builder()
.put(CLUSTER_SETTING_REPLICATION_TYPE, clusterLevelReplication)
.put(CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING.getKey(), true)
.build();
internalCluster().startClusterManagerOnlyNode(nodeSettings);
internalCluster().startDataOnlyNode(nodeSettings);
internalCluster().startDataOnlyNode(nodeSettings);
logger.info(
"--> Create index with template replication {} and cluster level replication {}",
templateReplicationType,
clusterLevelReplication
);
// Create index template
client().admin()
.indices()
.preparePutTemplate("template_1")
.setPatterns(Collections.singletonList("test-idx*"))
.setSettings(Settings.builder().put(indexSettings()).put(SETTING_REPLICATION_TYPE, templateReplicationType).build())
.setOrder(0)
.get();

GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates().get();
assertThat(response.getIndexTemplates(), hasSize(1));
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> createIndex(INDEX_NAME));
assertEquals(REPLICATION_MISMATCH_VALIDATION_ERROR, exception.getMessage());
}

public void testReplicationTypesOverrideNotAllowed_WithResizeAction() {
// Generate mutually exclusive replication strategies at cluster and index level
List<ReplicationType> replicationStrategies = getRandomReplicationTypesAsList();
ReplicationType clusterLevelReplication = replicationStrategies.get(0);
ReplicationType indexLevelReplication = replicationStrategies.get(1);
Settings nodeSettings = Settings.builder()
.put(CLUSTER_SETTING_REPLICATION_TYPE, clusterLevelReplication)
.put(CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING.getKey(), true)
.build();
internalCluster().startClusterManagerOnlyNode(nodeSettings);
internalCluster().startDataOnlyNode(nodeSettings);
internalCluster().startDataOnlyNode(nodeSettings);
logger.info(
"--> Create index with index level replication {} and cluster level replication {}",
indexLevelReplication,
clusterLevelReplication
);

// Define resize action and target shard count.
List<Tuple<ResizeType, Integer>> resizeActionsList = new ArrayList<>();
final int initialShardCount = 2;
resizeActionsList.add(new Tuple<>(ResizeType.SPLIT, 2 * initialShardCount));
resizeActionsList.add(new Tuple<>(ResizeType.SHRINK, SHARD_COUNT));
resizeActionsList.add(new Tuple<>(ResizeType.CLONE, initialShardCount));

Tuple<ResizeType, Integer> resizeActionTuple = resizeActionsList.get(random().nextInt(resizeActionsList.size()));
final String targetIndexName = resizeActionTuple.v1().name().toLowerCase(Locale.ROOT) + "-target";

logger.info("--> Performing resize action {} with shard count {}", resizeActionTuple.v1(), resizeActionTuple.v2());

Settings indexSettings = Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, initialShardCount)
.put(SETTING_REPLICATION_TYPE, clusterLevelReplication)
.build();
createIndex(INDEX_NAME, indexSettings);

// Block writes
client().admin().indices().prepareUpdateSettings(INDEX_NAME).setSettings(Settings.builder().put("index.blocks.write", true)).get();
ensureGreen();

// Validate resize action fails
IllegalArgumentException exception = expectThrows(
IllegalArgumentException.class,
() -> client().admin()
.indices()
.prepareResizeIndex(INDEX_NAME, targetIndexName)
.setResizeType(resizeActionTuple.v1())
.setSettings(
Settings.builder()
.put("index.number_of_replicas", 0)
.put("index.number_of_shards", resizeActionTuple.v2())
.putNull("index.blocks.write")
.put(SETTING_REPLICATION_TYPE, indexLevelReplication)
.build()
)
.get()
);
assertEquals(REPLICATION_MISMATCH_VALIDATION_ERROR, exception.getMessage());
}

/**
* Generate a list of ReplicationType with random ordering
*
* @return List of ReplicationType values
*/
private List<ReplicationType> getRandomReplicationTypesAsList() {
List<ReplicationType> replicationStrategies = List.of(ReplicationType.SEGMENT, ReplicationType.DOCUMENT);
int randomReplicationIndex = random().nextInt(replicationStrategies.size());
ReplicationType clusterLevelReplication = replicationStrategies.get(randomReplicationIndex);
ReplicationType indexLevelReplication = replicationStrategies.get(1 - randomReplicationIndex);
return List.of(clusterLevelReplication, indexLevelReplication);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.TimeUnit;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.indices.IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_SETTING_REPLICATION_TYPE;
import static org.opensearch.indices.replication.SegmentReplicationBaseIT.waitForSearchableDocs;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand All @@ -47,6 +48,9 @@ public class SegmentReplicationSnapshotIT extends AbstractSnapshotIntegTestCase
private static final String REPOSITORY_NAME = "test-segrep-repo";
private static final String SNAPSHOT_NAME = "test-segrep-snapshot";

protected static final String REPLICATION_MISMATCH_VALIDATION_ERROR =
"Validation Failed: 1: index setting [index.replication.type] is not allowed to be set as [cluster.index.restrict.replication.type=true];";

public Settings segRepEnableIndexSettings() {
return getShardSettings().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build();
}
Expand Down Expand Up @@ -306,4 +310,63 @@ public void testSnapshotRestoreOnIndexWithSegRepClusterSetting() throws Exceptio
IndicesService indicesService = internalCluster().getInstance(IndicesService.class);
assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabled(), false);
}

/**
* 1. Create index in DOCUMENT replication type
* 2. Snapshot index
* 3. Add new set of nodes with `cluster.indices.replication.strategy` set to SEGMENT and `cluster.index.restrict.replication.type`
* set to true.
* 4. Perform restore on new set of nodes to validate restored index has `DOCUMENT` replication.
*/
public void testSnapshotRestoreOnRestrictReplicationSetting() throws Exception {
final int documentCount = scaledRandomIntBetween(1, 10);
String originalClusterManagerNode = internalCluster().startClusterManagerOnlyNode();

// Starting two nodes with primary and replica shards respectively.
final String primaryNode = internalCluster().startDataOnlyNode();
prepareCreate(
INDEX_NAME,
Settings.builder()
// we want to override cluster replication setting by passing a index replication setting
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT)
).get();
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replicaNode = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

for (int i = 0; i < documentCount; i++) {
client().prepareIndex(INDEX_NAME).setId(String.valueOf(i)).setSource("foo", "bar").get();
}

createSnapshot();

// Delete index
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get());
assertFalse("index [" + INDEX_NAME + "] should have been deleted", indexExists(INDEX_NAME));

// Start new set of nodes with cluster level replication type setting and restrict replication type setting.
Settings settings = Settings.builder()
.put(CLUSTER_SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING.getKey(), true)
.build();

// Start new cluster manager node
String newClusterManagerNode = internalCluster().startClusterManagerOnlyNode(settings);

// Remove older nodes
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(originalClusterManagerNode));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNode));

String newPrimaryNode = internalCluster().startDataOnlyNode(settings);
String newReplicaNode = internalCluster().startDataOnlyNode(settings);

// Perform snapshot restore
logger.info("--> Performing snapshot restore to target index");

IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> restoreSnapshotWithSettings(null));
assertEquals(REPLICATION_MISMATCH_VALIDATION_ERROR, exception.getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,10 @@ static Settings aggregateIndexSettings(
);
}

List<String> validationErrors = new ArrayList<>();
validateIndexReplicationTypeSettings(indexSettingsBuilder.build(), clusterSettings).ifPresent(validationErrors::add);
validateErrors(request.index(), validationErrors);

Settings indexSettings = indexSettingsBuilder.build();
/*
* We can not validate settings until we have applied templates, otherwise we do not know the actual settings
Expand Down Expand Up @@ -1246,7 +1250,11 @@ private void validate(CreateIndexClusterStateUpdateRequest request, ClusterState
public void validateIndexSettings(String indexName, final Settings settings, final boolean forbidPrivateIndexSettings)
throws IndexCreationException {
List<String> validationErrors = getIndexSettingsValidationErrors(settings, forbidPrivateIndexSettings, indexName);
validateIndexReplicationTypeSettings(settings, clusterService.getClusterSettings()).ifPresent(validationErrors::add);
validateErrors(indexName, validationErrors);
}

private static void validateErrors(String indexName, List<String> validationErrors) {
if (validationErrors.isEmpty() == false) {
ValidationException validationException = new ValidationException();
validationException.addValidationErrors(validationErrors);
Expand Down Expand Up @@ -1322,6 +1330,27 @@ private static List<String> validateIndexCustomPath(Settings settings, @Nullable
return validationErrors;
}

/**
* Validates {@code index.replication.type} is matches with cluster level setting {@code cluster.indices.replication.strategy}
* when {@code cluster.index.restrict.replication.type} is set to true.
*
* @param requestSettings settings passed in during index create request
* @param clusterSettings cluster setting
*/
private static Optional<String> validateIndexReplicationTypeSettings(Settings requestSettings, ClusterSettings clusterSettings) {
if (clusterSettings.get(IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING)
&& requestSettings.hasValue(SETTING_REPLICATION_TYPE)
&& requestSettings.get(INDEX_REPLICATION_TYPE_SETTING.getKey())
.equals(clusterSettings.get(CLUSTER_REPLICATION_TYPE_SETTING).name()) == false) {
return Optional.of(
"index setting [index.replication.type] is not allowed to be set as ["
+ IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING.getKey()
+ "=true]"
);
}
return Optional.empty();
}

/**
* Validates the settings and mappings for shrinking an index.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,8 @@ public void apply(Settings value, Settings current, Settings previous) {
AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE,
CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE,
CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT,
CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT
CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT,
IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING
)
)
);
Expand Down
13 changes: 13 additions & 0 deletions server/src/main/java/org/opensearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,19 @@ public class IndicesService extends AbstractLifecycleComponent
Property.Final
);

/**
* If enabled, this setting enforces that indexes will be created with a replication type matching the cluster setting
* defined in cluster.indices.replication.strategy by rejecting any request that specifies a replication type that
* does not match the cluster setting. If disabled, a user can choose a replication type on a per-index basis using
* the index.replication.type setting.
*/
public static final Setting<Boolean> CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING = Setting.boolSetting(
"cluster.index.restrict.replication.type",
false,
Property.NodeScope,
Property.Final
);

/**
* The node's settings.
*/
Expand Down
Loading

0 comments on commit 6a6ab32

Please sign in to comment.