Skip to content

Commit

Permalink
Added support for feature flags in opensearch.yml (#4959) (#5941)
Browse files Browse the repository at this point in the history
This change introduces a static store "settings"
in FeatureFlags.java file to enable isEnabled method to
fetch flag settings defined in opensearch.yml.

Signed-off-by: Nagaraj Tantri <[email protected]>

Signed-off-by: Nagaraj Tantri <[email protected]>

Signed-off-by: Nagaraj Tantri <[email protected]>
Co-authored-by: Nagaraj Tantri <[email protected]>
  • Loading branch information
andrross and ntantri authored Jan 19, 2023
1 parent 77455fa commit cdb2b26
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 34 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
- Add query for initialized extensions ([#5658](https://github.com/opensearch-project/OpenSearch/pull/5658))
- Added support for feature flags in opensearch.yml ([#4959](https://github.com/opensearch-project/OpenSearch/pull/4959))

### Added
- Added cluster manager throttling stats in nodes/stats API ([#5790](https://github.com/opensearch-project/OpenSearch/pull/5790))
Expand Down
23 changes: 23 additions & 0 deletions distribution/src/config/opensearch.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,26 @@ ${path.logs}
# Require explicit names when deleting indices:
#
#action.destructive_requires_name: true
#
# ---------------------------------- Experimental Features -----------------------------------
#
# Gates the visibility of the index setting that allows changing of replication type.
# Once the feature is ready for production release, this feature flag can be removed.
#
#opensearch.experimental.feature.replication_type.enabled: false
#
#
# Gates the visibility of the index setting that allows persisting data to remote store along with local disk.
# Once the feature is ready for production release, this feature flag can be removed.
#
#opensearch.experimental.feature.remote_store.enabled: false
#
#
# Gates the functionality of a new parameter to the snapshot restore API
# that allows for creation of a new index type that searches a snapshot
# directly in a remote repository without restoring all index data to disk
# ahead of time.
#
#opensearch.experimental.feature.searchable_snapshot.enabled: false
#
#
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.opensearch.indices.replication;

import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.junit.BeforeClass;
import org.opensearch.OpenSearchCorruptionException;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.indices.segments.IndexShardSegments;
Expand Down Expand Up @@ -66,11 +65,6 @@ public class SegmentReplicationIT extends OpenSearchIntegTestCase {
private static final int SHARD_COUNT = 1;
private static final int REPLICA_COUNT = 1;

@BeforeClass
public static void assumeFeatureFlag() {
assumeTrue("Segment replication Feature flag is enabled", Boolean.parseBoolean(System.getProperty(FeatureFlags.REPLICATION_TYPE)));
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
Expand All @@ -92,11 +86,16 @@ protected boolean addMockInternalEngine() {
return false;
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build();
}

public void testPrimaryStopped_ReplicaPromoted() throws Exception {
final String primary = internalCluster().startNode();
final String primary = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startNode(featureFlagSettings());
ensureGreen(INDEX_NAME);

client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
Expand All @@ -123,7 +122,7 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception {
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);

// start another node, index another doc and replicate.
String nodeC = internalCluster().startNode();
String nodeC = internalCluster().startNode(featureFlagSettings());
ensureGreen(INDEX_NAME);
client().prepareIndex(INDEX_NAME).setId("4").setSource("baz", "baz").get();
refresh(INDEX_NAME);
Expand All @@ -134,10 +133,10 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception {
}

public void testRestartPrimary() throws Exception {
final String primary = internalCluster().startNode();
final String primary = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startNode(featureFlagSettings());
ensureGreen(INDEX_NAME);

assertEquals(getNodeContainingPrimaryShard().getName(), primary);
Expand All @@ -163,10 +162,10 @@ public void testRestartPrimary() throws Exception {

public void testCancelPrimaryAllocation() throws Exception {
// this test cancels allocation on the primary - promoting the new replica and recreating the former primary as a replica.
final String primary = internalCluster().startNode();
final String primary = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startNode(featureFlagSettings());
ensureGreen(INDEX_NAME);

final int initialDocCount = 1;
Expand Down Expand Up @@ -197,10 +196,13 @@ public void testCancelPrimaryAllocation() throws Exception {

/**
* This test verfies that replica shard is not added to the cluster when doing a round of segment replication fails during peer recovery.
*
* TODO: Ignoring this test as its flaky and needs separate fix
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testAddNewReplicaFailure() throws Exception {
logger.info("--> starting [Primary Node] ...");
final String primaryNode = internalCluster().startNode();
final String primaryNode = internalCluster().startNode(featureFlagSettings());

logger.info("--> creating test index ...");
prepareCreate(
Expand All @@ -223,7 +225,7 @@ public void testAddNewReplicaFailure() throws Exception {
assertThat(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(20L));

logger.info("--> start empty node to add replica shard");
final String replicaNode = internalCluster().startNode();
final String replicaNode = internalCluster().startNode(featureFlagSettings());

// Mock transport service to add behaviour of throwing corruption exception during segment replication process.
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(
Expand Down Expand Up @@ -265,8 +267,8 @@ public void testAddNewReplicaFailure() throws Exception {
}

public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
final String nodeA = internalCluster().startNode(featureFlagSettings());
final String nodeB = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

Expand Down Expand Up @@ -306,8 +308,8 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
}

public void testIndexReopenClose() throws Exception {
final String primary = internalCluster().startNode();
final String replica = internalCluster().startNode();
final String primary = internalCluster().startNode(featureFlagSettings());
final String replica = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

Expand Down Expand Up @@ -351,8 +353,8 @@ public void testMultipleShards() throws Exception {
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
final String nodeA = internalCluster().startNode(featureFlagSettings());
final String nodeB = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME, indexSettings);
ensureGreen(INDEX_NAME);

Expand Down Expand Up @@ -392,8 +394,8 @@ public void testMultipleShards() throws Exception {
}

public void testReplicationAfterForceMerge() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
final String nodeA = internalCluster().startNode(featureFlagSettings());
final String nodeB = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

Expand Down Expand Up @@ -437,11 +439,11 @@ public void testReplicationAfterForceMerge() throws Exception {
}

public void testCancellation() throws Exception {
final String primaryNode = internalCluster().startNode();
final String primaryNode = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build());
ensureYellow(INDEX_NAME);

final String replicaNode = internalCluster().startNode();
final String replicaNode = internalCluster().startNode(featureFlagSettings());

final SegmentReplicationSourceService segmentReplicationSourceService = internalCluster().getInstance(
SegmentReplicationSourceService.class,
Expand Down Expand Up @@ -496,7 +498,7 @@ public void testCancellation() throws Exception {
}

public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
final String primaryNode = internalCluster().startNode();
final String primaryNode = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
ensureGreen(INDEX_NAME);

Expand All @@ -519,7 +521,7 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
);
final String replicaNode = internalCluster().startNode();
final String replicaNode = internalCluster().startNode(featureFlagSettings());
ensureGreen(INDEX_NAME);

assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);
Expand All @@ -534,8 +536,8 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
}

public void testDeleteOperations() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
final String nodeA = internalCluster().startNode(featureFlagSettings());
final String nodeB = internalCluster().startNode(featureFlagSettings());

createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);
Expand Down Expand Up @@ -647,10 +649,10 @@ public void testDropPrimaryDuringReplication() throws Exception {
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 6)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode();
final String primaryNode = internalCluster().startDataOnlyNode(Settings.EMPTY);
final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(featureFlagSettings());
final String primaryNode = internalCluster().startDataOnlyNode(featureFlagSettings());
createIndex(INDEX_NAME, settings);
internalCluster().startDataOnlyNodes(6);
internalCluster().startDataOnlyNodes(6, featureFlagSettings());
ensureGreen(INDEX_NAME);

int initialDocCount = scaledRandomIntBetween(100, 200);
Expand All @@ -673,7 +675,7 @@ public void testDropPrimaryDuringReplication() throws Exception {
ensureYellow(INDEX_NAME);

// start another replica.
internalCluster().startDataOnlyNode();
internalCluster().startDataOnlyNode(featureFlagSettings());
ensureGreen(INDEX_NAME);

// index another doc and refresh - without this the new replica won't catch up.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.settings;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.util.FeatureFlags;

/**
* Encapsulates all valid feature flag level settings.
*
* @opensearch.internal
*/
public class FeatureFlagSettings extends AbstractScopedSettings {

protected FeatureFlagSettings(
Settings settings,
Set<Setting<?>> settingsSet,
Set<SettingUpgrader<?>> settingUpgraders,
Property scope
) {
super(settings, settingsSet, settingUpgraders, scope);
}

public static final Set<Setting<?>> BUILT_IN_FEATURE_FLAGS = Collections.unmodifiableSet(
new HashSet<>(
Arrays.asList(
FeatureFlags.REPLICATION_TYPE_SETTING,
FeatureFlags.REMOTE_STORE_SETTING,
FeatureFlags.SEARCHABLE_SNAPSHOT_SETTING,
FeatureFlags.EXTENSIONS_SETTING
)
)
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ public SettingsModule(
for (Setting<?> setting : IndexScopedSettings.BUILT_IN_INDEX_SETTINGS) {
registerSetting(setting);
}
for (Setting<?> setting : FeatureFlagSettings.BUILT_IN_FEATURE_FLAGS) {
registerSetting(setting);
}

for (Map.Entry<String, List<Setting>> featureFlaggedSetting : IndexScopedSettings.FEATURE_FLAGGED_INDEX_SETTINGS.entrySet()) {
if (FeatureFlags.isEnabled(featureFlaggedSetting.getKey())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@

package org.opensearch.common.util;

import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;

/**
* Utility class to manage feature flags. Feature flags are system properties that must be set on the JVM.
* These are used to gate the visibility/availability of incomplete features. Fore more information, see
Expand Down Expand Up @@ -50,12 +54,39 @@ public class FeatureFlags {
*/
public static final String EXTENSIONS = "opensearch.experimental.feature.extensions.enabled";

/**
* Should store the settings from opensearch.yml.
*/
private static Settings settings;

/**
* This method is responsible to map settings from opensearch.yml to local stored
* settings value. That is used for the existing isEnabled method.
*
* @param openSearchSettings The settings stored in opensearch.yml.
*/
public static void initializeFeatureFlags(Settings openSearchSettings) {
settings = openSearchSettings;
}

/**
* Used to test feature flags whose values are expected to be booleans.
* This method returns true if the value is "true" (case-insensitive),
* and false otherwise.
*/
public static boolean isEnabled(String featureFlagName) {
return "true".equalsIgnoreCase(System.getProperty(featureFlagName));
if ("true".equalsIgnoreCase(System.getProperty(featureFlagName))) {
// TODO: Remove the if condition once FeatureFlags are only supported via opensearch.yml
return true;
}
return settings != null && settings.getAsBoolean(featureFlagName, false);
}

public static final Setting<Boolean> REPLICATION_TYPE_SETTING = Setting.boolSetting(REPLICATION_TYPE, false, Property.NodeScope);

public static final Setting<Boolean> REMOTE_STORE_SETTING = Setting.boolSetting(REMOTE_STORE, false, Property.NodeScope);

public static final Setting<Boolean> SEARCHABLE_SNAPSHOT_SETTING = Setting.boolSetting(SEARCHABLE_SNAPSHOT, false, Property.NodeScope);

public static final Setting<Boolean> EXTENSIONS_SETTING = Setting.boolSetting(EXTENSIONS, false, Property.NodeScope);
}
3 changes: 3 additions & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,9 @@ protected Node(

final Settings settings = pluginsService.updatedSettings();

// Ensure to initialize Feature Flags via the settings from opensearch.yml
FeatureFlags.initializeFeatureFlags(settings);

final Set<DiscoveryNodeRole> additionalRoles = pluginsService.filterPlugins(Plugin.class)
.stream()
.map(Plugin::getRoles)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
import org.opensearch.common.network.NetworkAddress;
import org.opensearch.common.network.NetworkModule;
import org.opensearch.common.regex.Regex;
import org.opensearch.common.settings.FeatureFlagSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -769,6 +770,20 @@ public Settings indexSettings() {
return builder.build();
}

/**
* Setting all feature flag settings at base IT, which can be overridden later by individual
* IT classes.
*
* @return Feature flag settings.
*/
protected Settings featureFlagSettings() {
Settings.Builder featureSettings = Settings.builder();
for (Setting builtInFlag : FeatureFlagSettings.BUILT_IN_FEATURE_FLAGS) {
featureSettings.put(builtInFlag.getKey(), builtInFlag.getDefaultRaw(Settings.EMPTY));
}
return featureSettings.build();
}

/**
* Creates one or more indices and asserts that the indices are acknowledged. If one of the indices
* already exists this method will fail and wipe all the indices created so far.
Expand Down

0 comments on commit cdb2b26

Please sign in to comment.