Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[2.x] Revert "Integrate IO Based AdmissionController to AdmissionControl Fr… #12671

Merged
merged 1 commit into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- The org.opensearch.bootstrap.Security should support codebase for JAR files with classifiers ([#12586](https://github.com/opensearch-project/OpenSearch/issues/12586))
- Remote reindex: Add support for configurable retry mechanism ([#12561](https://github.com/opensearch-project/OpenSearch/pull/12561))
- Tracing for deep search path ([#12103](https://github.com/opensearch-project/OpenSearch/pull/12103))
- [Admission Control] Integrated IO Based AdmissionController to AdmissionControl Framework ([#12583](https://github.com/opensearch-project/OpenSearch/pull/12583))

### Dependencies
- Bump `com.squareup.okio:okio` from 3.7.0 to 3.8.0 ([#12290](https://github.com/opensearch-project/OpenSearch/pull/12290))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,292 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ratelimitting.admissioncontrol;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.admin.indices.stats.ShardStats;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.SearchPhaseExecutionException;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.UUIDs;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.node.resource.tracker.ResourceTrackerSettings;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;
import org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings;
import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.After;
import org.junit.Before;

import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

import static org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE;
import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT;
import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 1)
public class AdmissionControlMultiNodeIT extends OpenSearchIntegTestCase {

public static final Settings settings = Settings.builder()
.put(ResourceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500))
.put(ResourceTrackerSettings.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500))
.put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED)
.put(SEARCH_CPU_USAGE_LIMIT.getKey(), 0)
.put(INDEXING_CPU_USAGE_LIMIT.getKey(), 0)
.build();

private static final Logger LOGGER = LogManager.getLogger(AdmissionControlMultiNodeIT.class);

public static final String INDEX_NAME = "test_index";

@Before
public void init() {
assertAcked(
prepareCreate(
INDEX_NAME,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
)
);
ensureGreen(INDEX_NAME);
}

@After
public void cleanup() {
client().admin().indices().prepareDelete(INDEX_NAME).get();
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(settings).build();
}

public void testAdmissionControlRejectionOnEnforced() {
Tuple<String, String> primaryReplicaNodeNames = getPrimaryReplicaNodeNames(INDEX_NAME);
String primaryName = primaryReplicaNodeNames.v1();
String replicaName = primaryReplicaNodeNames.v2();
String coordinatingOnlyNode = getCoordinatingOnlyNode();
AdmissionControlService admissionControlServicePrimary = internalCluster().getInstance(AdmissionControlService.class, primaryName);
AdmissionControlService admissionControlServiceReplica = internalCluster().getInstance(AdmissionControlService.class, replicaName);
final BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < 3; ++i) {
IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID())
.source(Collections.singletonMap("key", randomAlphaOfLength(50)));
bulkRequest.add(request);
}
BulkResponse res = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet();
assertEquals(429, res.getItems()[0].getFailure().getStatus().getStatus());
AdmissionControllerStats admissionControlPrimaryStats = admissionControlServicePrimary.stats()
.getAdmissionControllerStatsList()
.get(0);
assertEquals(admissionControlPrimaryStats.rejectionCount.get(AdmissionControlActionType.INDEXING.getType()).longValue(), 1);
Arrays.stream(res.getItems()).forEach(bulkItemResponse -> {
assertTrue(bulkItemResponse.getFailureMessage().contains("OpenSearchRejectedExecutionException"));
});
SearchResponse searchResponse;
try {
searchResponse = client(coordinatingOnlyNode).prepareSearch(INDEX_NAME).get();
} catch (Exception exception) {
assertTrue(((SearchPhaseExecutionException) exception).getDetailedMessage().contains("OpenSearchRejectedExecutionException"));
}
AdmissionControllerStats primaryStats = admissionControlServicePrimary.stats().getAdmissionControllerStatsList().get(0);
assertEquals(primaryStats.rejectionCount.get(AdmissionControlActionType.SEARCH.getType()).longValue(), 1);
}

public void testAdmissionControlEnforcedOnNonACEnabledActions() throws ExecutionException, InterruptedException {
String coordinatingOnlyNode = getCoordinatingOnlyNode();
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();

updateSettingsRequest.transientSettings(
Settings.builder()
.put(
CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(),
AdmissionControlMode.ENFORCED.getMode()
)
);
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
NodesStatsRequest nodesStatsRequest = new NodesStatsRequest();
nodesStatsRequest.clear()
.indices(true)
.addMetrics(
NodesStatsRequest.Metric.JVM.metricName(),
NodesStatsRequest.Metric.OS.metricName(),
NodesStatsRequest.Metric.FS.metricName(),
NodesStatsRequest.Metric.PROCESS.metricName(),
NodesStatsRequest.Metric.ADMISSION_CONTROL.metricName()
);
NodesStatsResponse nodesStatsResponse = client(coordinatingOnlyNode).admin().cluster().nodesStats(nodesStatsRequest).actionGet();
ClusterHealthResponse clusterHealthResponse = client().admin().cluster().health(new ClusterHealthRequest()).actionGet();
assertEquals(200, clusterHealthResponse.status().getStatus());
assertFalse(nodesStatsResponse.hasFailures());
}

public void testAdmissionControlRejectionOnMonitor() {
Tuple<String, String> primaryReplicaNodeNames = getPrimaryReplicaNodeNames(INDEX_NAME);
String primaryName = primaryReplicaNodeNames.v1();
String replicaName = primaryReplicaNodeNames.v2();
String coordinatingOnlyNode = getCoordinatingOnlyNode();

AdmissionControlService admissionControlServicePrimary = internalCluster().getInstance(AdmissionControlService.class, primaryName);
AdmissionControlService admissionControlServiceReplica = internalCluster().getInstance(AdmissionControlService.class, replicaName);

ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();

updateSettingsRequest.transientSettings(
Settings.builder()
.put(
CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(),
AdmissionControlMode.MONITOR.getMode()
)
);
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

final BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < 3; ++i) {
IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID())
.source(Collections.singletonMap("key", randomAlphaOfLength(50)));
bulkRequest.add(request);
}
BulkResponse res = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet();
assertFalse(res.hasFailures());
AdmissionControllerStats admissionControlPrimaryStats = admissionControlServicePrimary.stats()
.getAdmissionControllerStatsList()
.get(0);
AdmissionControllerStats admissionControlReplicaStats = admissionControlServiceReplica.stats()
.getAdmissionControllerStatsList()
.get(0);
long primaryRejectionCount = admissionControlPrimaryStats.rejectionCount.getOrDefault(
AdmissionControlActionType.INDEXING.getType(),
new AtomicLong(0).longValue()
);
long replicaRejectionCount = admissionControlReplicaStats.rejectionCount.getOrDefault(
AdmissionControlActionType.INDEXING.getType(),
new AtomicLong(0).longValue()
);
assertEquals(primaryRejectionCount, 1);
assertEquals(replicaRejectionCount, 0);
SearchResponse searchResponse;
searchResponse = client(coordinatingOnlyNode).prepareSearch(INDEX_NAME).get();
admissionControlPrimaryStats = admissionControlServicePrimary.stats().getAdmissionControllerStatsList().get(0);
admissionControlReplicaStats = admissionControlServiceReplica.stats().getAdmissionControllerStatsList().get(0);
primaryRejectionCount = admissionControlPrimaryStats.getRejectionCount()
.getOrDefault(AdmissionControlActionType.SEARCH.getType(), new AtomicLong(0).longValue());
replicaRejectionCount = admissionControlReplicaStats.getRejectionCount()
.getOrDefault(AdmissionControlActionType.SEARCH.getType(), new AtomicLong(0).longValue());
assertTrue(primaryRejectionCount == 1 || replicaRejectionCount == 1);
assertFalse(primaryRejectionCount == 1 && replicaRejectionCount == 1);
}

public void testAdmissionControlRejectionOnDisabled() {
Tuple<String, String> primaryReplicaNodeNames = getPrimaryReplicaNodeNames(INDEX_NAME);
String primaryName = primaryReplicaNodeNames.v1();
String replicaName = primaryReplicaNodeNames.v2();
String coordinatingOnlyNode = getCoordinatingOnlyNode();

AdmissionControlService admissionControlServicePrimary = internalCluster().getInstance(AdmissionControlService.class, primaryName);
AdmissionControlService admissionControlServiceReplica = internalCluster().getInstance(AdmissionControlService.class, replicaName);

ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();

updateSettingsRequest.transientSettings(
Settings.builder()
.put(
CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(),
AdmissionControlMode.DISABLED.getMode()
)
);
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

final BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < 3; ++i) {
IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID())
.source(Collections.singletonMap("key", randomAlphaOfLength(50)));
bulkRequest.add(request);
}
BulkResponse res = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet();
assertFalse(res.hasFailures());
AdmissionControllerStats admissionControlPrimaryStats = admissionControlServicePrimary.stats()
.getAdmissionControllerStatsList()
.get(0);
AdmissionControllerStats admissionControlReplicaStats = admissionControlServiceReplica.stats()
.getAdmissionControllerStatsList()
.get(0);
long primaryRejectionCount = admissionControlPrimaryStats.rejectionCount.getOrDefault(
AdmissionControlActionType.INDEXING.getType(),
new AtomicLong(0).longValue()
);
long replicaRejectionCount = admissionControlReplicaStats.rejectionCount.getOrDefault(
AdmissionControlActionType.INDEXING.getType(),
new AtomicLong(0).longValue()
);
assertEquals(primaryRejectionCount, 0);
assertEquals(replicaRejectionCount, 0);
SearchResponse searchResponse;
searchResponse = client(coordinatingOnlyNode).prepareSearch(INDEX_NAME).get();
admissionControlPrimaryStats = admissionControlServicePrimary.stats().getAdmissionControllerStatsList().get(0);
admissionControlReplicaStats = admissionControlServiceReplica.stats().getAdmissionControllerStatsList().get(0);
primaryRejectionCount = admissionControlPrimaryStats.getRejectionCount()
.getOrDefault(AdmissionControlActionType.SEARCH.getType(), new AtomicLong(0).longValue());
replicaRejectionCount = admissionControlReplicaStats.getRejectionCount()
.getOrDefault(AdmissionControlActionType.SEARCH.getType(), new AtomicLong(0).longValue());
assertTrue(primaryRejectionCount == 0 && replicaRejectionCount == 0);
}

private Tuple<String, String> getPrimaryReplicaNodeNames(String indexName) {
IndicesStatsResponse response = client().admin().indices().prepareStats(indexName).get();
String primaryId = Stream.of(response.getShards())
.map(ShardStats::getShardRouting)
.filter(ShardRouting::primary)
.findAny()
.get()
.currentNodeId();
String replicaId = Stream.of(response.getShards())
.map(ShardStats::getShardRouting)
.filter(sr -> sr.primary() == false)
.findAny()
.get()
.currentNodeId();
DiscoveryNodes nodes = client().admin().cluster().prepareState().get().getState().nodes();
String primaryName = nodes.get(primaryId).getName();
String replicaName = nodes.get(replicaId).getName();
return new Tuple<>(primaryName, replicaName);
}

private String getCoordinatingOnlyNode() {
return client().admin()
.cluster()
.prepareState()
.get()
.getState()
.nodes()
.getCoordinatingOnlyNodes()
.values()
.iterator()
.next()
.getName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@
import org.opensearch.plugins.PluginsService;
import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings;
import org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings;
import org.opensearch.ratelimitting.admissioncontrol.settings.IoBasedAdmissionControllerSettings;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.script.ScriptService;
Expand Down Expand Up @@ -713,10 +712,7 @@ public void apply(Settings value, Settings current, Settings previous) {
AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE,
CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE,
CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT,
CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT,
IoBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE,
IoBasedAdmissionControllerSettings.SEARCH_IO_USAGE_LIMIT,
IoBasedAdmissionControllerSettings.INDEXING_IO_USAGE_LIMIT
CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.node.resource.tracker;

import org.apache.lucene.util.Constants;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -70,9 +69,6 @@ public IoUsageStats getIoUsageStats() {
* Checks if all of the resource usage trackers are ready
*/
public boolean isReady() {
if (Constants.LINUX) {
return memoryUsageTracker.isReady() && cpuUsageTracker.isReady() && ioUsageTracker.isReady();
}
return memoryUsageTracker.isReady() && cpuUsageTracker.isReady();
}

Expand Down
Loading
Loading