Skip to content

Commit

Permalink
Integrate with CPU admission controller for cluster-manager Read API's.
Browse files Browse the repository at this point in the history
The admission control is enforced at the transport layer.
  • Loading branch information
rajiv-kv committed Feb 29, 2024
1 parent fb41756 commit 68968cf
Show file tree
Hide file tree
Showing 8 changed files with 273 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ratelimitting.admissioncontrol;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.opensearch.action.admin.indices.alias.get.GetAliasesResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.node.ResourceUsageCollectorService;
import org.opensearch.node.resource.tracker.ResourceTrackerSettings;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;
import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;

import static org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE;
import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.CLUSTER_INFO_CPU_USAGE_LIMIT;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class AdmissionForClusterManagerIT extends OpenSearchIntegTestCase {

private static final Logger LOGGER = LogManager.getLogger(AdmissionForClusterManagerIT.class);

public static final String INDEX_NAME = "test_index";

private String clusterManagerNodeId;
private ResourceUsageCollectorService cMResourceCollector;

private static final Settings DISABLE_ADMISSION_CONTROL = Settings.builder()
.put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.DISABLED)
.build();

private static final Settings ENFORCE_ADMISSION_CONTROL = Settings.builder()
.put(ResourceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500))
.put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED)
.put(CLUSTER_INFO_CPU_USAGE_LIMIT.getKey(), 50)
.build();

@Before
public void init() {
String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(
Settings.builder().put(DISABLE_ADMISSION_CONTROL).build()
);
internalCluster().startDataOnlyNode(Settings.builder().put(DISABLE_ADMISSION_CONTROL).build());

ensureClusterSizeConsistency();
ensureGreen();

// Disable the automatic resource collection
clusterManagerNodeId = internalCluster().clusterService(clusterManagerNode).localNode().getId();
cMResourceCollector = internalCluster().getClusterManagerNodeInstance(ResourceUsageCollectorService.class);
cMResourceCollector.stop();

// Enable admission control
client().admin().cluster().prepareUpdateSettings().setTransientSettings(ENFORCE_ADMISSION_CONTROL).execute().actionGet();
}

public void testAdmissionControlEnforced() throws InterruptedException {
cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 99);

// Write API on ClusterManager
assertAcked(prepareCreate("test").setMapping("field", "type=text").setAliases("{\"alias1\" : {}}"));

// Read API on ClusterManager
GetAliasesRequest aliasesRequest = new GetAliasesRequest();
aliasesRequest.aliases("alias1");
try {
dataNodeClient().admin().indices().getAliases(aliasesRequest).actionGet();
fail("expected failure");
} catch (Exception e) {
assertTrue(e instanceof OpenSearchRejectedExecutionException);
}

client().admin().cluster().prepareUpdateSettings().setTransientSettings(DISABLE_ADMISSION_CONTROL).execute().actionGet();

GetAliasesResponse getAliasesResponse = dataNodeClient().admin().indices().getAliases(aliasesRequest).actionGet();
assertThat(getAliasesResponse.getAliases().get("test").size(), equalTo(1));

AdmissionControlService admissionControlServicePrimary = internalCluster().getClusterManagerNodeInstance(
AdmissionControlService.class
);
AdmissionControllerStats admissionStats = admissionControlServicePrimary.stats().getAdmissionControllerStatsList().get(0);
assertEquals(admissionStats.rejectionCount.get(AdmissionControlActionType.CLUSTER_INFO.getType()).longValue(), 1);
assertNull(admissionStats.rejectionCount.get(AdmissionControlActionType.SEARCH.getType()));
assertNull(admissionStats.rejectionCount.get(AdmissionControlActionType.INDEXING.getType()));
}

public void testAdmissionControlEnabledOnNoBreach() throws InterruptedException {
// CPU usage is less than threshold 50%
cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 35);

// Write API on ClusterManager
assertAcked(prepareCreate("test").setMapping("field", "type=text").setAliases("{\"alias1\" : {}}").execute().actionGet());

// Read API on ClusterManager
GetAliasesRequest aliasesRequest = new GetAliasesRequest();
aliasesRequest.aliases("alias1");
GetAliasesResponse getAliasesResponse = dataNodeClient().admin().indices().getAliases(aliasesRequest).actionGet();
assertThat(getAliasesResponse.getAliases().get("test").size(), equalTo(1));
}

public void testAdmissionControlDisabledOnBreach() throws InterruptedException {
client().admin().cluster().prepareUpdateSettings().setTransientSettings(DISABLE_ADMISSION_CONTROL).execute().actionGet();

cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 97);

// Write API on ClusterManager
assertAcked(prepareCreate("test").setMapping("field", "type=text").setAliases("{\"alias1\" : {}}").execute().actionGet());

// Read API on ClusterManager
GetAliasesRequest aliasesRequest = new GetAliasesRequest();
aliasesRequest.aliases("alias1");
GetAliasesResponse getAliasesResponse = dataNodeClient().admin().indices().getAliases(aliasesRequest).actionGet();
assertThat(getAliasesResponse.getAliases().get("test").size(), equalTo(1));

}

@Override
public void tearDown() throws Exception {
client().admin().cluster().prepareUpdateSettings().setTransientSettings(DISABLE_ADMISSION_CONTROL).execute().actionGet();
super.tearDown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.opensearch.action.ActionRequest;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportChannel;
Expand Down Expand Up @@ -65,7 +66,7 @@ protected HandledTransportAction(
Writeable.Reader<Request> requestReader,
String executor
) {
this(actionName, true, transportService, actionFilters, requestReader, executor);
this(actionName, true, null, transportService, actionFilters, requestReader, executor);
}

protected HandledTransportAction(
Expand All @@ -75,19 +76,59 @@ protected HandledTransportAction(
ActionFilters actionFilters,
Writeable.Reader<Request> requestReader
) {
this(actionName, canTripCircuitBreaker, transportService, actionFilters, requestReader, ThreadPool.Names.SAME);
this(actionName, canTripCircuitBreaker, null, transportService, actionFilters, requestReader, ThreadPool.Names.SAME);
}

protected HandledTransportAction(
String actionName,
boolean canTripCircuitBreaker,
AdmissionControlActionType admissionControlActionType,
TransportService transportService,
ActionFilters actionFilters,
Writeable.Reader<Request> requestReader
) {
this(
actionName,
canTripCircuitBreaker,
admissionControlActionType,
transportService,
actionFilters,
requestReader,
ThreadPool.Names.SAME
);
}

protected HandledTransportAction(
String actionName,
boolean canTripCircuitBreaker,
AdmissionControlActionType admissionControlActionType,
TransportService transportService,
ActionFilters actionFilters,
Writeable.Reader<Request> requestReader,
String executor
) {
super(actionName, actionFilters, transportService.getTaskManager());
transportService.registerRequestHandler(actionName, executor, false, canTripCircuitBreaker, requestReader, new TransportHandler());

if (admissionControlActionType != null) {
transportService.registerRequestHandler(
actionName,
executor,
false,
canTripCircuitBreaker,
admissionControlActionType,
requestReader,
new TransportHandler()
);
} else {
transportService.registerRequestHandler(
actionName,
executor,
false,
canTripCircuitBreaker,
requestReader,
new TransportHandler()
);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.discovery.ClusterManagerNotDiscoveredException;
import org.opensearch.node.NodeClosedException;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ConnectTransportException;
Expand Down Expand Up @@ -97,7 +98,7 @@ protected TransportClusterManagerNodeAction(
Writeable.Reader<Request> request,
IndexNameExpressionResolver indexNameExpressionResolver
) {
this(actionName, true, transportService, clusterService, threadPool, actionFilters, request, indexNameExpressionResolver);
this(actionName, true, null, transportService, clusterService, threadPool, actionFilters, request, indexNameExpressionResolver);
}

protected TransportClusterManagerNodeAction(
Expand All @@ -110,7 +111,31 @@ protected TransportClusterManagerNodeAction(
Writeable.Reader<Request> request,
IndexNameExpressionResolver indexNameExpressionResolver
) {
super(actionName, canTripCircuitBreaker, transportService, actionFilters, request);
this(
actionName,
canTripCircuitBreaker,
null,
transportService,
clusterService,
threadPool,
actionFilters,
request,
indexNameExpressionResolver
);
}

protected TransportClusterManagerNodeAction(
String actionName,
boolean canTripCircuitBreaker,
AdmissionControlActionType admissionControlActionType,
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
Writeable.Reader<Request> request,
IndexNameExpressionResolver indexNameExpressionResolver
) {
super(actionName, canTripCircuitBreaker, admissionControlActionType, transportService, actionFilters, request);
this.transportService = transportService;
this.clusterService = clusterService;
this.threadPool = threadPool;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand All @@ -59,12 +60,46 @@ protected TransportClusterManagerNodeReadAction(
Writeable.Reader<Request> request,
IndexNameExpressionResolver indexNameExpressionResolver
) {
this(actionName, true, transportService, clusterService, threadPool, actionFilters, request, indexNameExpressionResolver);
this(
actionName,
true,
AdmissionControlActionType.CLUSTER_INFO,
transportService,
clusterService,
threadPool,
actionFilters,
request,
indexNameExpressionResolver
);
}

protected TransportClusterManagerNodeReadAction(
String actionName,
boolean checkSizeLimit,
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
Writeable.Reader<Request> request,
IndexNameExpressionResolver indexNameExpressionResolver
) {
super(
actionName,
checkSizeLimit,
null,
transportService,
clusterService,
threadPool,
actionFilters,
request,
indexNameExpressionResolver
);
}

protected TransportClusterManagerNodeReadAction(
String actionName,
boolean checkSizeLimit,
AdmissionControlActionType admissionControlActionType,
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
Expand All @@ -75,6 +110,7 @@ protected TransportClusterManagerNodeReadAction(
super(
actionName,
checkSizeLimit,
admissionControlActionType,
transportService,
clusterService,
threadPool,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,7 @@ public void apply(Settings value, Settings current, Settings previous) {
CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE,
CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT,
CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT,
CpuBasedAdmissionControllerSettings.CLUSTER_INFO_CPU_USAGE_LIMIT,
IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING,

// Concurrent segment search settings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ private void applyForTransportLayer(String actionName, AdmissionControlActionTyp
throw new OpenSearchRejectedExecutionException(
String.format(
Locale.ROOT,
"CPU usage admission controller rejected the request for action [%s] as CPU limit reached",
"CPU usage admission controller rejected the request for action [%s] as CPU limit reached for action-type [%s]",
actionName,
admissionControlActionType.name()
)
);
Expand Down Expand Up @@ -112,6 +113,8 @@ private long getCpuRejectionThreshold(AdmissionControlActionType admissionContro
return this.settings.getSearchCPULimit();
case INDEXING:
return this.settings.getIndexingCPULimit();
case CLUSTER_INFO:
return this.settings.getClusterInfoCPULimit();
default:
throw new IllegalArgumentException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
*/
public enum AdmissionControlActionType {
INDEXING("indexing"),
SEARCH("search");
SEARCH("search"),
CLUSTER_INFO("cluster_info");

private final String type;

Expand All @@ -38,6 +39,8 @@ public static AdmissionControlActionType fromName(String name) {
return INDEXING;
case "search":
return SEARCH;
case "cluster_info":
return CLUSTER_INFO;
default:
throw new IllegalArgumentException("Not Supported TransportAction Type: " + name);
}
Expand Down
Loading

0 comments on commit 68968cf

Please sign in to comment.