Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication] Introduce cluster level setting cluster.index.restrict.replication.type to prevent replication type setting override during index creations #11583

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [BWC and API enforcement] Introduce checks for enforcing the API restrictions ([#11175](https://github.com/opensearch-project/OpenSearch/pull/11175))
- Create separate transport action for render search template action ([#11170](https://github.com/opensearch-project/OpenSearch/pull/11170))
- Add additional handling in SearchTemplateRequest when simulate is set to true ([#11591](https://github.com/opensearch-project/OpenSearch/pull/11591))
- Introduce cluster level setting `cluster.force.index.replication.type` to prevent replication type setting override during index creations([#11583](https://github.com/opensearch-project/OpenSearch/pull/11583))

### Dependencies
- Bump Lucene from 9.7.0 to 9.8.0 ([10276](https://github.com/opensearch-project/OpenSearch/pull/10276))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,31 @@

import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.opensearch.action.admin.indices.shrink.ResizeType;
import org.opensearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.index.Index;
import org.opensearch.index.IndexModule;
import org.opensearch.index.query.TermsQueryBuilder;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.indices.IndicesService.CLUSTER_FORCE_INDEX_REPLICATION_TYPE_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_SETTING_REPLICATION_TYPE;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.hasSize;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationClusterSettingIT extends OpenSearchIntegTestCase {
Expand All @@ -29,6 +44,9 @@ public class SegmentReplicationClusterSettingIT extends OpenSearchIntegTestCase
protected static final int SHARD_COUNT = 1;
protected static final int REPLICA_COUNT = 1;

protected static final String REPLICATION_MISMATCH_VALIDATION_ERROR =
"Validation Failed: 1: index setting [index.replication.type] is not allowed to be set as [cluster.force.index.replication.type=true];";

@Override
public Settings indexSettings() {
return Settings.builder()
Expand All @@ -44,14 +62,6 @@ protected boolean addMockInternalEngine() {
return false;
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(CLUSTER_SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
}

public void testIndexReplicationSettingOverridesSegRepClusterSetting() throws Exception {
Settings settings = Settings.builder().put(CLUSTER_SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build();
final String ANOTHER_INDEX = "test-index";
Expand Down Expand Up @@ -123,4 +133,160 @@ public void testIndexReplicationSettingOverridesDocRepClusterSetting() throws Ex
assertEquals(indicesService.indexService(anotherIndex).getIndexSettings().isSegRepEnabled(), false);
}

public void testDifferentReplicationTypes_CreateIndex_StrictMode() throws Throwable {
dreamer-89 marked this conversation as resolved.
Show resolved Hide resolved
final int documentCount = scaledRandomIntBetween(1, 10);
BiConsumer<List<ReplicationType>, List<String>> consumer = (replicationList, dataNodesList) -> {
Settings indexSettings = Settings.builder().put(indexSettings()).put(SETTING_REPLICATION_TYPE, replicationList.get(1)).build();
createIndex(INDEX_NAME, indexSettings);
};
executeTest(true, consumer, INDEX_NAME, documentCount);
}

public void testDifferentReplicationTypes_IndexTemplates_StrictMode() throws Throwable {
final int documentCount = scaledRandomIntBetween(1, 10);

BiConsumer<List<ReplicationType>, List<String>> consumer = (replicationList, dataNodesList) -> {
client().admin()
.indices()
.preparePutTemplate("template_1")
.setPatterns(Collections.singletonList("test-idx*"))
.setSettings(Settings.builder().put(indexSettings()).put(SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT).build())
.setOrder(0)
.get();

GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates().get();
assertThat(response.getIndexTemplates(), hasSize(1));

createIndex(INDEX_NAME);
};
executeTest(true, consumer, INDEX_NAME, documentCount);
}

public void testMismatchingReplicationType_ResizeAction_StrictMode() throws Throwable {
// Define resize action and target shard count.
List<Tuple<ResizeType, Integer>> resizeActionsList = new ArrayList<>();
final int initialShardCount = 2;
resizeActionsList.add(new Tuple<>(ResizeType.SPLIT, 2 * initialShardCount));
resizeActionsList.add(new Tuple<>(ResizeType.SHRINK, SHARD_COUNT));
resizeActionsList.add(new Tuple<>(ResizeType.CLONE, initialShardCount));

Tuple<ResizeType, Integer> resizeActionTuple = resizeActionsList.get(random().nextInt(resizeActionsList.size()));
final String targetIndexName = resizeActionTuple.v1().name().toLowerCase(Locale.ROOT) + "-target";
final int documentCount = scaledRandomIntBetween(1, 10);

logger.info("--> Performing resize action {} with shard count {}", resizeActionTuple.v1(), resizeActionTuple.v2());

// Create an index using current cluster level settings
BiConsumer<List<ReplicationType>, List<String>> consumer = (replicationList, dataNodesList) -> {
Settings indexSettings = Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, initialShardCount)
.put(SETTING_REPLICATION_TYPE, replicationList.get(0))
.build();
createIndex(INDEX_NAME, indexSettings);

for (int i = 0; i < documentCount; i++) {
client().prepareIndex(INDEX_NAME).setId(String.valueOf(i)).setSource("foo", "bar").get();
dreamer-89 marked this conversation as resolved.
Show resolved Hide resolved
}

// Block writes
client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put("index.blocks.write", true))
.get();
ensureGreen();

try {
for (String node : dataNodesList) {
assertBusy(() -> {
assertHitCount(
client(node).prepareSearch(INDEX_NAME).setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(),
documentCount
);
}, 30, TimeUnit.SECONDS);
}
} catch (Exception e) {
throw new RuntimeException(e);
}

// Test create index fails
client().admin()
.indices()
.prepareResizeIndex(INDEX_NAME, targetIndexName)
.setResizeType(resizeActionTuple.v1())
.setSettings(
Settings.builder()
.put("index.number_of_replicas", 0)
.put("index.number_of_shards", resizeActionTuple.v2())
.putNull("index.blocks.write")
.put(SETTING_REPLICATION_TYPE, replicationList.get(1))
.build()
)
.get();
};
executeTest(true, consumer, targetIndexName, documentCount);
}

// Creates a cluster with mis-matching cluster level and index level replication strategies and validates that index
// creation fails when cluster level setting `cluster.force.index.replication.type` is set to true and creation goes
// through when it is false.
private void executeTest(
dreamer-89 marked this conversation as resolved.
Show resolved Hide resolved
boolean restrictIndexLevelReplicationTypeSetting,
dreamer-89 marked this conversation as resolved.
Show resolved Hide resolved
BiConsumer<List<ReplicationType>, List<String>> createIndexRunnable,
String targetIndexName,
int documentCount
) throws Throwable {
// Generate mutually exclusive replication strategies at cluster and index level
List<ReplicationType> replicationStrategies = getRandomReplicationTypesAsList();
ReplicationType clusterLevelReplication = replicationStrategies.get(0);
ReplicationType indexLevelReplication = replicationStrategies.get(1);

Settings settings = Settings.builder()
.put(CLUSTER_SETTING_REPLICATION_TYPE, clusterLevelReplication)
.put(CLUSTER_FORCE_INDEX_REPLICATION_TYPE_SETTING.getKey(), restrictIndexLevelReplicationTypeSetting)
.build();
internalCluster().startClusterManagerOnlyNode(settings);
final String dataNodeOne = internalCluster().startDataOnlyNode(settings);
final String dataNodeTwo = internalCluster().startDataOnlyNode(settings);
List<String> dataNodes = Arrays.asList(dataNodeOne, dataNodeTwo);

logger.info(
"--> Create index with cluster level setting {} and index level replication setting {}",
clusterLevelReplication,
indexLevelReplication
);

if (restrictIndexLevelReplicationTypeSetting) {
try {
createIndexRunnable.accept(replicationStrategies, dataNodes);
} catch (IllegalArgumentException exception) {
assertEquals(REPLICATION_MISMATCH_VALIDATION_ERROR, exception.getMessage());
}
} else {
createIndexRunnable.accept(replicationStrategies, dataNodes);
ensureGreen(targetIndexName);
for (String node : dataNodes) {
assertBusy(() -> {
assertHitCount(
client(node).prepareSearch(targetIndexName).setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(),
documentCount
);
});
}
}
}

/**
* Generate a list of ReplicationType with random ordering
*
* @return List of ReplicationType values
*/
private List<ReplicationType> getRandomReplicationTypesAsList() {
List<ReplicationType> replicationStrategies = List.of(ReplicationType.SEGMENT, ReplicationType.DOCUMENT);
int randomReplicationIndex = random().nextInt(replicationStrategies.size());
ReplicationType clusterLevelReplication = replicationStrategies.get(randomReplicationIndex);
ReplicationType indexLevelReplication = replicationStrategies.get(1 - randomReplicationIndex);
return List.of(clusterLevelReplication, indexLevelReplication);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.TimeUnit;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.indices.IndicesService.CLUSTER_FORCE_INDEX_REPLICATION_TYPE_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_SETTING_REPLICATION_TYPE;
import static org.opensearch.indices.replication.SegmentReplicationBaseIT.waitForSearchableDocs;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand All @@ -47,6 +48,9 @@ public class SegmentReplicationSnapshotIT extends AbstractSnapshotIntegTestCase
private static final String REPOSITORY_NAME = "test-segrep-repo";
private static final String SNAPSHOT_NAME = "test-segrep-snapshot";

protected static final String REPLICATION_MISMATCH_VALIDATION_ERROR =
"Validation Failed: 1: index setting [index.replication.type] is not allowed to be set as [cluster.force.index.replication.type=true];";

public Settings segRepEnableIndexSettings() {
return getShardSettings().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build();
}
Expand Down Expand Up @@ -306,4 +310,63 @@ public void testSnapshotRestoreOnIndexWithSegRepClusterSetting() throws Exceptio
IndicesService indicesService = internalCluster().getInstance(IndicesService.class);
assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabled(), false);
}

/**
* 1. Create index in DOCUMENT replication type
* 2. Snapshot index
* 3. Add new set of nodes with `cluster.indices.replication.strategy` set to SEGMENT and `cluster.force.index.replication.type`
* set to true.
* 4. Perform restore on new set of nodes to validate restored index has `DOCUMENT` replication.
*/
public void testSnapshotRestoreOnRestrictReplicationSetting() throws Exception {
final int documentCount = scaledRandomIntBetween(1, 10);
String originalClusterManagerNode = internalCluster().startClusterManagerOnlyNode();

// Starting two nodes with primary and replica shards respectively.
final String primaryNode = internalCluster().startDataOnlyNode();
prepareCreate(
INDEX_NAME,
Settings.builder()
// we want to override cluster replication setting by passing a index replication setting
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT)
).get();
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replicaNode = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

for (int i = 0; i < documentCount; i++) {
client().prepareIndex(INDEX_NAME).setId(String.valueOf(i)).setSource("foo", "bar").get();
}

createSnapshot();

// Delete index
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get());
assertFalse("index [" + INDEX_NAME + "] should have been deleted", indexExists(INDEX_NAME));

// Start new set of nodes with cluster level replication type setting and restrict replication type setting.
Settings settings = Settings.builder()
.put(CLUSTER_SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(CLUSTER_FORCE_INDEX_REPLICATION_TYPE_SETTING.getKey(), true)
.build();

// Start new cluster manager node
String newClusterManagerNode = internalCluster().startClusterManagerOnlyNode(settings);

// Remove older nodes
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(originalClusterManagerNode));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNode));

String newPrimaryNode = internalCluster().startDataOnlyNode(settings);
String newReplicaNode = internalCluster().startDataOnlyNode(settings);

// Perform snapshot restore
logger.info("--> Performing snapshot restore to target index");

IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> restoreSnapshotWithSettings(null));
assertEquals(REPLICATION_MISMATCH_VALIDATION_ERROR, exception.getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,10 @@ static Settings aggregateIndexSettings(
);
}

List<String> validationErrors = new ArrayList<>();
validateIndexReplicationTypeSettings(indexSettingsBuilder.build(), clusterSettings).ifPresent(validationErrors::add);
validateErrors(request.index(), validationErrors);

Settings indexSettings = indexSettingsBuilder.build();
/*
* We can not validate settings until we have applied templates, otherwise we do not know the actual settings
Expand Down Expand Up @@ -1246,7 +1250,11 @@ private void validate(CreateIndexClusterStateUpdateRequest request, ClusterState
public void validateIndexSettings(String indexName, final Settings settings, final boolean forbidPrivateIndexSettings)
throws IndexCreationException {
List<String> validationErrors = getIndexSettingsValidationErrors(settings, forbidPrivateIndexSettings, indexName);
validateIndexReplicationTypeSettings(settings, clusterService.getClusterSettings()).ifPresent(validationErrors::add);
validateErrors(indexName, validationErrors);
}

private static void validateErrors(String indexName, List<String> validationErrors) {
if (validationErrors.isEmpty() == false) {
ValidationException validationException = new ValidationException();
validationException.addValidationErrors(validationErrors);
Expand Down Expand Up @@ -1322,6 +1330,27 @@ private static List<String> validateIndexCustomPath(Settings settings, @Nullable
return validationErrors;
}

/**
* Validates {@code index.replication.type} is matches with cluster level setting {@code cluster.indices.replication.strategy}
* when {@code cluster.force.index.replication.type} is set to true.
*
* @param requestSettings settings passed in during index create request
* @param clusterSettings cluster setting
*/
private static Optional<String> validateIndexReplicationTypeSettings(Settings requestSettings, ClusterSettings clusterSettings) {
if (clusterSettings.get(IndicesService.CLUSTER_FORCE_INDEX_REPLICATION_TYPE_SETTING)
&& requestSettings.hasValue(SETTING_REPLICATION_TYPE)
&& requestSettings.get(INDEX_REPLICATION_TYPE_SETTING.getKey())
.equals(clusterSettings.get(CLUSTER_REPLICATION_TYPE_SETTING).name()) == false) {
return Optional.of(
"index setting [index.replication.type] is not allowed to be set as ["
+ IndicesService.CLUSTER_FORCE_INDEX_REPLICATION_TYPE_SETTING.getKey()
+ "=true]"
);
}
return Optional.empty();
}

/**
* Validates the settings and mappings for shrinking an index.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,8 @@ public void apply(Settings value, Settings current, Settings previous) {
AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE,
CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE,
CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT,
CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT
CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT,
IndicesService.CLUSTER_FORCE_INDEX_REPLICATION_TYPE_SETTING
)
)
);
Expand Down
13 changes: 13 additions & 0 deletions server/src/main/java/org/opensearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,19 @@ public class IndicesService extends AbstractLifecycleComponent
Property.Final
);

/**
* If enabled, this setting enforces that indexes will be created with a replication type matching the cluster setting
* defined in cluster.indices.replication.strategy by rejecting any request that specifies a replication type that
* does not match the cluster setting. If disabled, a user can choose a replication type on a per-index basis using
* the index.replication.type setting.
*/
public static final Setting<Boolean> CLUSTER_FORCE_INDEX_REPLICATION_TYPE_SETTING = Setting.boolSetting(
"cluster.force.index.replication.type",
false,
Property.NodeScope,
Property.Final
);

/**
* The node's settings.
*/
Expand Down
Loading
Loading