Skip to content

Commit

Permalink
Integrate with CPU admission controller for cluster-manager Read API'…
Browse files Browse the repository at this point in the history
…s. (#12496) (#12829)

* Integrate with CPU admission controller for cluster-manager Read API's.
The admission control is enforced at the transport layer.

Signed-off-by: Rajiv Kumar Vaidyanathan <[email protected]>
  • Loading branch information
rajiv-kv authored Mar 21, 2024
1 parent 8e8d0c9 commit 7139740
Show file tree
Hide file tree
Showing 14 changed files with 402 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Introduce a new setting `index.check_pending_flush.enabled` to expose the ability to disable the check for pending flushes by write threads ([#12710](https://github.com/opensearch-project/OpenSearch/pull/12710))
- Built-in secure transports support ([#12435](https://github.com/opensearch-project/OpenSearch/pull/12435))
- Lightweight Transport action to verify local term before fetching cluster-state from remote ([#12252](https://github.com/opensearch-project/OpenSearch/pull/12252/))
- Integrate with admission controller for cluster-manager Read API. ([#12496](https://github.com/opensearch-project/OpenSearch/pull/12496))

### Dependencies
- Bump `com.google.api.grpc:proto-google-common-protos` from 2.25.1 to 2.37.1 ([#12289](https://github.com/opensearch-project/OpenSearch/pull/12289), [#12365](https://github.com/opensearch-project/OpenSearch/pull/12365))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ratelimitting.admissioncontrol;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.opensearch.action.admin.indices.alias.get.GetAliasesResponse;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.node.IoUsageStats;
import org.opensearch.node.ResourceUsageCollectorService;
import org.opensearch.node.resource.tracker.ResourceTrackerSettings;
import org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;
import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats;
import org.opensearch.rest.AbstractRestChannel;
import org.opensearch.rest.RestResponse;
import org.opensearch.rest.action.admin.indices.RestGetAliasesAction;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.rest.FakeRestRequest;
import org.junit.Before;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import static org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE;
import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.CLUSTER_ADMIN_CPU_USAGE_LIMIT;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class AdmissionForClusterManagerIT extends OpenSearchIntegTestCase {

private static final Logger LOGGER = LogManager.getLogger(AdmissionForClusterManagerIT.class);

public static final String INDEX_NAME = "test_index";

private String clusterManagerNodeId;
private String datanode;
private ResourceUsageCollectorService cMResourceCollector;

private static final Settings DISABLE_ADMISSION_CONTROL = Settings.builder()
.put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.DISABLED.getMode())
.build();

private static final Settings ENFORCE_ADMISSION_CONTROL = Settings.builder()
.put(ResourceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500))
.put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED)
.put(CLUSTER_ADMIN_CPU_USAGE_LIMIT.getKey(), 50)
.build();

@Before
public void init() {
String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(
Settings.builder().put(DISABLE_ADMISSION_CONTROL).build()
);
datanode = internalCluster().startDataOnlyNode(Settings.builder().put(DISABLE_ADMISSION_CONTROL).build());

ensureClusterSizeConsistency();
ensureGreen();

// Disable the automatic resource collection
clusterManagerNodeId = internalCluster().clusterService(clusterManagerNode).localNode().getId();
cMResourceCollector = internalCluster().getClusterManagerNodeInstance(ResourceUsageCollectorService.class);
cMResourceCollector.stop();

// Enable admission control
client().admin().cluster().prepareUpdateSettings().setTransientSettings(ENFORCE_ADMISSION_CONTROL).execute().actionGet();
}

public void testAdmissionControlEnforced() throws Exception {
cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 99, new IoUsageStats(98));

// Write API on ClusterManager
assertAcked(prepareCreate("test").setMapping("field", "type=text").setAliases("{\"alias1\" : {}}"));

// Read API on ClusterManager
GetAliasesRequest aliasesRequest = new GetAliasesRequest();
aliasesRequest.aliases("alias1");
try {
dataNodeClient().admin().indices().getAliases(aliasesRequest).actionGet();
fail("expected failure");
} catch (Exception e) {
assertTrue(e instanceof OpenSearchRejectedExecutionException);
assertTrue(e.getMessage().contains("CPU usage admission controller rejected the request"));
assertTrue(e.getMessage().contains("[indices:admin/aliases/get]"));
assertTrue(e.getMessage().contains("action-type [CLUSTER_ADMIN]"));
}

client().admin().cluster().prepareUpdateSettings().setTransientSettings(DISABLE_ADMISSION_CONTROL).execute().actionGet();
GetAliasesResponse getAliasesResponse = dataNodeClient().admin().indices().getAliases(aliasesRequest).actionGet();
assertThat(getAliasesResponse.getAliases().get("test").size(), equalTo(1));

AdmissionControlService admissionControlServiceCM = internalCluster().getClusterManagerNodeInstance(AdmissionControlService.class);

AdmissionControllerStats admissionStats = getAdmissionControlStats(admissionControlServiceCM).get(
CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER
);

assertEquals(admissionStats.rejectionCount.get(AdmissionControlActionType.CLUSTER_ADMIN.getType()).longValue(), 1);
assertNull(admissionStats.rejectionCount.get(AdmissionControlActionType.SEARCH.getType()));
assertNull(admissionStats.rejectionCount.get(AdmissionControlActionType.INDEXING.getType()));
}

public void testAdmissionControlEnabledOnNoBreach() throws InterruptedException {
// CPU usage is less than threshold 50%
cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 35, new IoUsageStats(98));

// Write API on ClusterManager
assertAcked(prepareCreate("test").setMapping("field", "type=text").setAliases("{\"alias1\" : {}}").execute().actionGet());

// Read API on ClusterManager
GetAliasesRequest aliasesRequest = new GetAliasesRequest();
aliasesRequest.aliases("alias1");
GetAliasesResponse getAliasesResponse = dataNodeClient().admin().indices().getAliases(aliasesRequest).actionGet();
assertThat(getAliasesResponse.getAliases().get("test").size(), equalTo(1));
}

public void testAdmissionControlMonitorOnBreach() throws InterruptedException {
admissionControlDisabledOnBreach(
Settings.builder().put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.MONITOR.getMode()).build()
);
}

public void testAdmissionControlDisabledOnBreach() throws InterruptedException {
admissionControlDisabledOnBreach(DISABLE_ADMISSION_CONTROL);
}

public void admissionControlDisabledOnBreach(Settings admission) throws InterruptedException {
client().admin().cluster().prepareUpdateSettings().setTransientSettings(admission).execute().actionGet();

cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 97, new IoUsageStats(98));

// Write API on ClusterManager
assertAcked(prepareCreate("test").setMapping("field", "type=text").setAliases("{\"alias1\" : {}}").execute().actionGet());

// Read API on ClusterManager
GetAliasesRequest aliasesRequest = new GetAliasesRequest();
aliasesRequest.aliases("alias1");
GetAliasesResponse getAliasesResponse = dataNodeClient().admin().indices().getAliases(aliasesRequest).actionGet();
assertThat(getAliasesResponse.getAliases().get("test").size(), equalTo(1));

}

public void testAdmissionControlResponseStatus() throws Exception {
cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 99, new IoUsageStats(98));

// Write API on ClusterManager
assertAcked(prepareCreate("test").setMapping("field", "type=text").setAliases("{\"alias1\" : {}}"));

// Read API on ClusterManager
FakeRestRequest aliasesRequest = new FakeRestRequest();
aliasesRequest.params().put("name", "alias1");
CountDownLatch waitForResponse = new CountDownLatch(1);
AtomicReference<RestResponse> aliasResponse = new AtomicReference<>();
AbstractRestChannel channel = new AbstractRestChannel(aliasesRequest, true) {

@Override
public void sendResponse(RestResponse response) {
waitForResponse.countDown();
aliasResponse.set(response);
}
};

RestGetAliasesAction restHandler = internalCluster().getInstance(RestGetAliasesAction.class, datanode);
restHandler.handleRequest(aliasesRequest, channel, internalCluster().getInstance(NodeClient.class, datanode));

waitForResponse.await();
assertEquals(RestStatus.TOO_MANY_REQUESTS, aliasResponse.get().status());
}

@Override
public void tearDown() throws Exception {
client().admin().cluster().prepareUpdateSettings().setTransientSettings(DISABLE_ADMISSION_CONTROL).execute().actionGet();
super.tearDown();
}

Map<String, AdmissionControllerStats> getAdmissionControlStats(AdmissionControlService admissionControlService) {
Map<String, AdmissionControllerStats> acStats = new HashMap<>();
for (AdmissionControllerStats admissionControllerStats : admissionControlService.stats().getAdmissionControllerStatsList()) {
acStats.put(admissionControllerStats.getAdmissionControllerName(), admissionControllerStats);
}
return acStats;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.opensearch.action.ActionRequest;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportChannel;
Expand Down Expand Up @@ -65,7 +66,7 @@ protected HandledTransportAction(
Writeable.Reader<Request> requestReader,
String executor
) {
this(actionName, true, transportService, actionFilters, requestReader, executor);
this(actionName, true, null, transportService, actionFilters, requestReader, executor);
}

protected HandledTransportAction(
Expand All @@ -75,19 +76,49 @@ protected HandledTransportAction(
ActionFilters actionFilters,
Writeable.Reader<Request> requestReader
) {
this(actionName, canTripCircuitBreaker, transportService, actionFilters, requestReader, ThreadPool.Names.SAME);
this(actionName, canTripCircuitBreaker, null, transportService, actionFilters, requestReader, ThreadPool.Names.SAME);
}

protected HandledTransportAction(
String actionName,
boolean canTripCircuitBreaker,
AdmissionControlActionType admissionControlActionType,
TransportService transportService,
ActionFilters actionFilters,
Writeable.Reader<Request> requestReader
) {
this(
actionName,
canTripCircuitBreaker,
admissionControlActionType,
transportService,
actionFilters,
requestReader,
ThreadPool.Names.SAME
);
}

protected HandledTransportAction(
String actionName,
boolean canTripCircuitBreaker,
AdmissionControlActionType admissionControlActionType,
TransportService transportService,
ActionFilters actionFilters,
Writeable.Reader<Request> requestReader,
String executor
) {
super(actionName, actionFilters, transportService.getTaskManager());
transportService.registerRequestHandler(actionName, executor, false, canTripCircuitBreaker, requestReader, new TransportHandler());

transportService.registerRequestHandler(
actionName,
executor,
false,
canTripCircuitBreaker,
admissionControlActionType,
requestReader,
new TransportHandler()
);

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.discovery.ClusterManagerNotDiscoveredException;
import org.opensearch.node.NodeClosedException;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ConnectTransportException;
Expand Down Expand Up @@ -105,7 +106,7 @@ protected TransportClusterManagerNodeAction(
Writeable.Reader<Request> request,
IndexNameExpressionResolver indexNameExpressionResolver
) {
this(actionName, true, transportService, clusterService, threadPool, actionFilters, request, indexNameExpressionResolver);
this(actionName, true, null, transportService, clusterService, threadPool, actionFilters, request, indexNameExpressionResolver);
}

protected TransportClusterManagerNodeAction(
Expand All @@ -118,7 +119,31 @@ protected TransportClusterManagerNodeAction(
Writeable.Reader<Request> request,
IndexNameExpressionResolver indexNameExpressionResolver
) {
super(actionName, canTripCircuitBreaker, transportService, actionFilters, request);
this(
actionName,
canTripCircuitBreaker,
null,
transportService,
clusterService,
threadPool,
actionFilters,
request,
indexNameExpressionResolver
);
}

protected TransportClusterManagerNodeAction(
String actionName,
boolean canTripCircuitBreaker,
AdmissionControlActionType admissionControlActionType,
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
Writeable.Reader<Request> request,
IndexNameExpressionResolver indexNameExpressionResolver
) {
super(actionName, canTripCircuitBreaker, admissionControlActionType, transportService, actionFilters, request);
this.transportService = transportService;
this.clusterService = clusterService;
this.threadPool = threadPool;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand All @@ -59,12 +60,46 @@ protected TransportClusterManagerNodeReadAction(
Writeable.Reader<Request> request,
IndexNameExpressionResolver indexNameExpressionResolver
) {
this(actionName, true, transportService, clusterService, threadPool, actionFilters, request, indexNameExpressionResolver);
this(
actionName,
true,
AdmissionControlActionType.CLUSTER_ADMIN,
transportService,
clusterService,
threadPool,
actionFilters,
request,
indexNameExpressionResolver
);
}

protected TransportClusterManagerNodeReadAction(
String actionName,
boolean checkSizeLimit,
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
Writeable.Reader<Request> request,
IndexNameExpressionResolver indexNameExpressionResolver
) {
super(
actionName,
checkSizeLimit,
null,
transportService,
clusterService,
threadPool,
actionFilters,
request,
indexNameExpressionResolver
);
}

protected TransportClusterManagerNodeReadAction(
String actionName,
boolean checkSizeLimit,
AdmissionControlActionType admissionControlActionType,
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
Expand All @@ -75,6 +110,7 @@ protected TransportClusterManagerNodeReadAction(
super(
actionName,
checkSizeLimit,
admissionControlActionType,
transportService,
clusterService,
threadPool,
Expand Down
Loading

0 comments on commit 7139740

Please sign in to comment.