diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e615018e8fea..b6ec71503a7c8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -53,6 +53,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Remove concurrent segment search feature flag for GA launch ([#12074](https://github.com/opensearch-project/OpenSearch/pull/12074)) - Enable Fuzzy codec for doc id fields using a bloom filter ([#11022](https://github.com/opensearch-project/OpenSearch/pull/11022)) - [Metrics Framework] Adds support for Histogram metric ([#12062](https://github.com/opensearch-project/OpenSearch/pull/12062)) +- [AdmissionControl] Added changes for AdmissionControl Interceptor and AdmissionControlService for RateLimiting ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286)) +- [Admission Control] Integrate CPU AC with ResourceUsageCollector and add CPU AC stats to nodes/stats ([#10887](https://github.com/opensearch-project/OpenSearch/pull/10887)) ### Dependencies - Bumps jetty version to 9.4.52.v20230823 to fix GMS-2023-1857 ([#9822](https://github.com/opensearch-project/OpenSearch/pull/9822)) diff --git a/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlMultiNodeIT.java b/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlMultiNodeIT.java new file mode 100644 index 0000000000000..0af3d31f9e846 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlMultiNodeIT.java @@ -0,0 +1,292 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.cluster.health.ClusterHealthRequest; +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.action.admin.indices.stats.ShardStats; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.search.SearchPhaseExecutionException; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.common.UUIDs; +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.node.resource.tracker.ResourceTrackerSettings; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; +import org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings; +import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.After; +import org.junit.Before; + +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; + +import static org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE; +import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT; +import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 1) +public class AdmissionControlMultiNodeIT extends OpenSearchIntegTestCase { + + public static final Settings settings = Settings.builder() + .put(ResourceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500)) + .put(ResourceTrackerSettings.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500)) + .put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED) + .put(SEARCH_CPU_USAGE_LIMIT.getKey(), 0) + .put(INDEXING_CPU_USAGE_LIMIT.getKey(), 0) + .build(); + + private static final Logger LOGGER = LogManager.getLogger(AdmissionControlMultiNodeIT.class); + + public static final String INDEX_NAME = "test_index"; + + @Before + public void init() { + assertAcked( + prepareCreate( + INDEX_NAME, + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + ) + ); + ensureGreen(INDEX_NAME); + } + + @After + public void cleanup() { + client().admin().indices().prepareDelete(INDEX_NAME).get(); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(settings).build(); + } + + public void testAdmissionControlRejectionOnEnforced() { + Tuple primaryReplicaNodeNames = getPrimaryReplicaNodeNames(INDEX_NAME); + String primaryName = primaryReplicaNodeNames.v1(); + String replicaName = primaryReplicaNodeNames.v2(); + String coordinatingOnlyNode = getCoordinatingOnlyNode(); + AdmissionControlService admissionControlServicePrimary = internalCluster().getInstance(AdmissionControlService.class, primaryName); + AdmissionControlService admissionControlServiceReplica = internalCluster().getInstance(AdmissionControlService.class, replicaName); + final BulkRequest bulkRequest = new BulkRequest(); + for (int i = 0; i < 3; ++i) { + IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID()) + .source(Collections.singletonMap("key", randomAlphaOfLength(50))); + bulkRequest.add(request); + } + BulkResponse res = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet(); + assertEquals(429, res.getItems()[0].getFailure().getStatus().getStatus()); + AdmissionControllerStats admissionControlPrimaryStats = admissionControlServicePrimary.stats() + .getAdmissionControllerStatsList() + .get(0); + assertEquals(admissionControlPrimaryStats.rejectionCount.get(AdmissionControlActionType.INDEXING.getType()).longValue(), 1); + Arrays.stream(res.getItems()).forEach(bulkItemResponse -> { + assertTrue(bulkItemResponse.getFailureMessage().contains("OpenSearchRejectedExecutionException")); + }); + SearchResponse searchResponse; + try { + searchResponse = client(coordinatingOnlyNode).prepareSearch(INDEX_NAME).get(); + } catch (Exception exception) { + assertTrue(((SearchPhaseExecutionException) exception).getDetailedMessage().contains("OpenSearchRejectedExecutionException")); + } + AdmissionControllerStats primaryStats = admissionControlServicePrimary.stats().getAdmissionControllerStatsList().get(0); + assertEquals(primaryStats.rejectionCount.get(AdmissionControlActionType.SEARCH.getType()).longValue(), 1); + } + + public void testAdmissionControlEnforcedOnNonACEnabledActions() throws ExecutionException, InterruptedException { + String coordinatingOnlyNode = getCoordinatingOnlyNode(); + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + + updateSettingsRequest.transientSettings( + Settings.builder() + .put( + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.ENFORCED.getMode() + ) + ); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + NodesStatsRequest nodesStatsRequest = new NodesStatsRequest(); + nodesStatsRequest.clear() + .indices(true) + .addMetrics( + NodesStatsRequest.Metric.JVM.metricName(), + NodesStatsRequest.Metric.OS.metricName(), + NodesStatsRequest.Metric.FS.metricName(), + NodesStatsRequest.Metric.PROCESS.metricName(), + NodesStatsRequest.Metric.ADMISSION_CONTROL.metricName() + ); + NodesStatsResponse nodesStatsResponse = client(coordinatingOnlyNode).admin().cluster().nodesStats(nodesStatsRequest).actionGet(); + ClusterHealthResponse clusterHealthResponse = client().admin().cluster().health(new ClusterHealthRequest()).actionGet(); + assertEquals(200, clusterHealthResponse.status().getStatus()); + assertFalse(nodesStatsResponse.hasFailures()); + } + + public void testAdmissionControlRejectionOnMonitor() { + Tuple primaryReplicaNodeNames = getPrimaryReplicaNodeNames(INDEX_NAME); + String primaryName = primaryReplicaNodeNames.v1(); + String replicaName = primaryReplicaNodeNames.v2(); + String coordinatingOnlyNode = getCoordinatingOnlyNode(); + + AdmissionControlService admissionControlServicePrimary = internalCluster().getInstance(AdmissionControlService.class, primaryName); + AdmissionControlService admissionControlServiceReplica = internalCluster().getInstance(AdmissionControlService.class, replicaName); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + + updateSettingsRequest.transientSettings( + Settings.builder() + .put( + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.MONITOR.getMode() + ) + ); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + final BulkRequest bulkRequest = new BulkRequest(); + for (int i = 0; i < 3; ++i) { + IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID()) + .source(Collections.singletonMap("key", randomAlphaOfLength(50))); + bulkRequest.add(request); + } + BulkResponse res = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet(); + assertFalse(res.hasFailures()); + AdmissionControllerStats admissionControlPrimaryStats = admissionControlServicePrimary.stats() + .getAdmissionControllerStatsList() + .get(0); + AdmissionControllerStats admissionControlReplicaStats = admissionControlServiceReplica.stats() + .getAdmissionControllerStatsList() + .get(0); + long primaryRejectionCount = admissionControlPrimaryStats.rejectionCount.getOrDefault( + AdmissionControlActionType.INDEXING.getType(), + new AtomicLong(0).longValue() + ); + long replicaRejectionCount = admissionControlReplicaStats.rejectionCount.getOrDefault( + AdmissionControlActionType.INDEXING.getType(), + new AtomicLong(0).longValue() + ); + assertEquals(primaryRejectionCount, 1); + assertEquals(replicaRejectionCount, 0); + SearchResponse searchResponse; + searchResponse = client(coordinatingOnlyNode).prepareSearch(INDEX_NAME).get(); + admissionControlPrimaryStats = admissionControlServicePrimary.stats().getAdmissionControllerStatsList().get(0); + admissionControlReplicaStats = admissionControlServiceReplica.stats().getAdmissionControllerStatsList().get(0); + primaryRejectionCount = admissionControlPrimaryStats.getRejectionCount() + .getOrDefault(AdmissionControlActionType.SEARCH.getType(), new AtomicLong(0).longValue()); + replicaRejectionCount = admissionControlReplicaStats.getRejectionCount() + .getOrDefault(AdmissionControlActionType.SEARCH.getType(), new AtomicLong(0).longValue()); + assertTrue(primaryRejectionCount == 1 || replicaRejectionCount == 1); + assertFalse(primaryRejectionCount == 1 && replicaRejectionCount == 1); + } + + public void testAdmissionControlRejectionOnDisabled() { + Tuple primaryReplicaNodeNames = getPrimaryReplicaNodeNames(INDEX_NAME); + String primaryName = primaryReplicaNodeNames.v1(); + String replicaName = primaryReplicaNodeNames.v2(); + String coordinatingOnlyNode = getCoordinatingOnlyNode(); + + AdmissionControlService admissionControlServicePrimary = internalCluster().getInstance(AdmissionControlService.class, primaryName); + AdmissionControlService admissionControlServiceReplica = internalCluster().getInstance(AdmissionControlService.class, replicaName); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + + updateSettingsRequest.transientSettings( + Settings.builder() + .put( + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.DISABLED.getMode() + ) + ); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + final BulkRequest bulkRequest = new BulkRequest(); + for (int i = 0; i < 3; ++i) { + IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID()) + .source(Collections.singletonMap("key", randomAlphaOfLength(50))); + bulkRequest.add(request); + } + BulkResponse res = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet(); + assertFalse(res.hasFailures()); + AdmissionControllerStats admissionControlPrimaryStats = admissionControlServicePrimary.stats() + .getAdmissionControllerStatsList() + .get(0); + AdmissionControllerStats admissionControlReplicaStats = admissionControlServiceReplica.stats() + .getAdmissionControllerStatsList() + .get(0); + long primaryRejectionCount = admissionControlPrimaryStats.rejectionCount.getOrDefault( + AdmissionControlActionType.INDEXING.getType(), + new AtomicLong(0).longValue() + ); + long replicaRejectionCount = admissionControlReplicaStats.rejectionCount.getOrDefault( + AdmissionControlActionType.INDEXING.getType(), + new AtomicLong(0).longValue() + ); + assertEquals(primaryRejectionCount, 0); + assertEquals(replicaRejectionCount, 0); + SearchResponse searchResponse; + searchResponse = client(coordinatingOnlyNode).prepareSearch(INDEX_NAME).get(); + admissionControlPrimaryStats = admissionControlServicePrimary.stats().getAdmissionControllerStatsList().get(0); + admissionControlReplicaStats = admissionControlServiceReplica.stats().getAdmissionControllerStatsList().get(0); + primaryRejectionCount = admissionControlPrimaryStats.getRejectionCount() + .getOrDefault(AdmissionControlActionType.SEARCH.getType(), new AtomicLong(0).longValue()); + replicaRejectionCount = admissionControlReplicaStats.getRejectionCount() + .getOrDefault(AdmissionControlActionType.SEARCH.getType(), new AtomicLong(0).longValue()); + assertTrue(primaryRejectionCount == 0 && replicaRejectionCount == 0); + } + + private Tuple getPrimaryReplicaNodeNames(String indexName) { + IndicesStatsResponse response = client().admin().indices().prepareStats(indexName).get(); + String primaryId = Stream.of(response.getShards()) + .map(ShardStats::getShardRouting) + .filter(ShardRouting::primary) + .findAny() + .get() + .currentNodeId(); + String replicaId = Stream.of(response.getShards()) + .map(ShardStats::getShardRouting) + .filter(sr -> sr.primary() == false) + .findAny() + .get() + .currentNodeId(); + DiscoveryNodes nodes = client().admin().cluster().prepareState().get().getState().nodes(); + String primaryName = nodes.get(primaryId).getName(); + String replicaName = nodes.get(replicaId).getName(); + return new Tuple<>(primaryName, replicaName); + } + + private String getCoordinatingOnlyNode() { + return client().admin() + .cluster() + .prepareState() + .get() + .getState() + .nodes() + .getCoordinatingOnlyNodes() + .values() + .iterator() + .next() + .getName(); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java index 58801c660e03f..0e8e12d65b892 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java @@ -59,6 +59,7 @@ import org.opensearch.monitor.process.ProcessStats; import org.opensearch.node.AdaptiveSelectionStats; import org.opensearch.node.NodesResourceUsageStats; +import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControlStats; import org.opensearch.repositories.RepositoriesStats; import org.opensearch.script.ScriptCacheStats; import org.opensearch.script.ScriptStats; @@ -155,6 +156,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { @Nullable private RepositoriesStats repositoriesStats; + @Nullable + private AdmissionControlStats admissionControlStats; + public NodeStats(StreamInput in) throws IOException { super(in); timestamp = in.readVLong(); @@ -238,6 +242,12 @@ public NodeStats(StreamInput in) throws IOException { } else { repositoriesStats = null; } + // TODO: change to V_2_12_0 on main after backport to 2.x + if (in.getVersion().onOrAfter(Version.V_2_12_0)) { + admissionControlStats = in.readOptionalWriteable(AdmissionControlStats::new); + } else { + admissionControlStats = null; + } } public NodeStats( @@ -267,7 +277,8 @@ public NodeStats( @Nullable TaskCancellationStats taskCancellationStats, @Nullable SearchPipelineStats searchPipelineStats, @Nullable SegmentReplicationRejectionStats segmentReplicationRejectionStats, - @Nullable RepositoriesStats repositoriesStats + @Nullable RepositoriesStats repositoriesStats, + @Nullable AdmissionControlStats admissionControlStats ) { super(node); this.timestamp = timestamp; @@ -296,6 +307,7 @@ public NodeStats( this.searchPipelineStats = searchPipelineStats; this.segmentReplicationRejectionStats = segmentReplicationRejectionStats; this.repositoriesStats = repositoriesStats; + this.admissionControlStats = admissionControlStats; } public long getTimestamp() { @@ -448,6 +460,11 @@ public RepositoriesStats getRepositoriesStats() { return repositoriesStats; } + @Nullable + public AdmissionControlStats getAdmissionControlStats() { + return admissionControlStats; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -506,6 +523,10 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_12_0)) { out.writeOptionalWriteable(repositoriesStats); } + + if (out.getVersion().onOrAfter(Version.V_2_12_0)) { + out.writeOptionalWriteable(admissionControlStats); + } } @Override @@ -605,6 +626,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (getRepositoriesStats() != null) { getRepositoriesStats().toXContent(builder, params); } + if (getAdmissionControlStats() != null) { + getAdmissionControlStats().toXContent(builder, params); + } return builder; } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java index 57ad1da062129..27d20c9775091 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java @@ -249,7 +249,8 @@ public enum Metric { SEARCH_PIPELINE("search_pipeline"), RESOURCE_USAGE_STATS("resource_usage_stats"), SEGMENT_REPLICATION_BACKPRESSURE("segment_replication_backpressure"), - REPOSITORIES("repositories"); + REPOSITORIES("repositories"), + ADMISSION_CONTROL("admission_control"); private String metricName; diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java index f93dd50e28a72..2363b3583fc25 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java @@ -127,7 +127,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) { NodesStatsRequest.Metric.SEARCH_PIPELINE.containedIn(metrics), NodesStatsRequest.Metric.RESOURCE_USAGE_STATS.containedIn(metrics), NodesStatsRequest.Metric.SEGMENT_REPLICATION_BACKPRESSURE.containedIn(metrics), - NodesStatsRequest.Metric.REPOSITORIES.containedIn(metrics) + NodesStatsRequest.Metric.REPOSITORIES.containedIn(metrics), + NodesStatsRequest.Metric.ADMISSION_CONTROL.containedIn(metrics) ); } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java index 58ff1a5741069..144080bb581a5 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -171,6 +171,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq false, false, false, + false, false ); List shardsStats = new ArrayList<>(); diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java index 6adffd34adf78..a7a13afd2597c 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java @@ -98,6 +98,7 @@ import org.opensearch.indices.IndicesService; import org.opensearch.indices.SystemIndices; import org.opensearch.node.NodeClosedException; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.tasks.Task; import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.threadpool.ThreadPool; @@ -180,7 +181,8 @@ public TransportShardBulkAction( false, indexingPressureService, systemIndices, - tracer + tracer, + AdmissionControlActionType.INDEXING ); this.updateHelper = updateHelper; this.mappingUpdatedAction = mappingUpdatedAction; diff --git a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java index a25ff0a0201a1..68194e0ee6d12 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java @@ -47,6 +47,7 @@ import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.transport.TransportResponse; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.search.SearchPhaseResult; import org.opensearch.search.SearchService; import org.opensearch.search.dfs.DfsSearchResult; @@ -548,6 +549,9 @@ public static void registerRequestHandler(TransportService transportService, Sea transportService.registerRequestHandler( DFS_ACTION_NAME, ThreadPool.Names.SAME, + false, + true, + AdmissionControlActionType.SEARCH, ShardSearchRequest::new, (request, channel, task) -> searchService.executeDfsPhase( request, @@ -562,6 +566,9 @@ public static void registerRequestHandler(TransportService transportService, Sea transportService.registerRequestHandler( QUERY_ACTION_NAME, ThreadPool.Names.SAME, + false, + true, + AdmissionControlActionType.SEARCH, ShardSearchRequest::new, (request, channel, task) -> { searchService.executeQueryPhase( @@ -581,6 +588,9 @@ public static void registerRequestHandler(TransportService transportService, Sea transportService.registerRequestHandler( QUERY_ID_ACTION_NAME, ThreadPool.Names.SAME, + false, + true, + AdmissionControlActionType.SEARCH, QuerySearchRequest::new, (request, channel, task) -> { searchService.executeQueryPhase( @@ -639,6 +649,7 @@ public static void registerRequestHandler(TransportService transportService, Sea ThreadPool.Names.SAME, true, true, + AdmissionControlActionType.SEARCH, ShardFetchSearchRequest::new, (request, channel, task) -> { searchService.executeFetchPhase( diff --git a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java index b68bd13cfed80..95f998e2d89c2 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java @@ -82,6 +82,7 @@ import org.opensearch.indices.IndexClosedException; import org.opensearch.indices.IndicesService; import org.opensearch.node.NodeClosedException; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.ConnectTransportException; @@ -134,6 +135,12 @@ public abstract class TransportReplicationAction< Setting.Property.NodeScope ); + /** + * Making primary and replica actions suffixes as constant + */ + public static final String PRIMARY_ACTION_SUFFIX = "[p]"; + public static final String REPLICA_ACTION_SUFFIX = "[r]"; + protected final ThreadPool threadPool; protected final TransportService transportService; protected final ClusterService clusterService; @@ -195,6 +202,40 @@ protected TransportReplicationAction( String executor, boolean syncGlobalCheckpointAfterOperation, boolean forceExecutionOnPrimary + ) { + this( + settings, + actionName, + transportService, + clusterService, + indicesService, + threadPool, + shardStateAction, + actionFilters, + requestReader, + replicaRequestReader, + executor, + syncGlobalCheckpointAfterOperation, + forceExecutionOnPrimary, + null + ); + } + + protected TransportReplicationAction( + Settings settings, + String actionName, + TransportService transportService, + ClusterService clusterService, + IndicesService indicesService, + ThreadPool threadPool, + ShardStateAction shardStateAction, + ActionFilters actionFilters, + Writeable.Reader requestReader, + Writeable.Reader replicaRequestReader, + String executor, + boolean syncGlobalCheckpointAfterOperation, + boolean forceExecutionOnPrimary, + AdmissionControlActionType admissionControlActionType ) { super(actionName, actionFilters, transportService.getTaskManager()); this.threadPool = threadPool; @@ -204,8 +245,8 @@ protected TransportReplicationAction( this.shardStateAction = shardStateAction; this.executor = executor; - this.transportPrimaryAction = actionName + "[p]"; - this.transportReplicaAction = actionName + "[r]"; + this.transportPrimaryAction = actionName + PRIMARY_ACTION_SUFFIX; + this.transportReplicaAction = actionName + REPLICA_ACTION_SUFFIX; this.initialRetryBackoffBound = REPLICATION_INITIAL_RETRY_BACKOFF_BOUND.get(settings); this.retryTimeout = REPLICATION_RETRY_TIMEOUT.get(settings); @@ -213,14 +254,8 @@ protected TransportReplicationAction( transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, requestReader, this::handleOperationRequest); - transportService.registerRequestHandler( - transportPrimaryAction, - executor, - forceExecutionOnPrimary, - true, - in -> new ConcreteShardRequest<>(requestReader, in), - this::handlePrimaryRequest - ); + // This method will register Primary Request Handler Based on AdmissionControlActionType + registerPrimaryRequestHandler(requestReader, admissionControlActionType); // we must never reject on because of thread pool capacity on replicas transportService.registerRequestHandler( @@ -241,6 +276,38 @@ protected TransportReplicationAction( clusterSettings.addSettingsUpdateConsumer(REPLICATION_RETRY_TIMEOUT, (v) -> retryTimeout = v); } + /** + * This method will register handler as based on admissionControlActionType and AdmissionControlHandler will be + * invoked for registered action + * @param requestReader instance of the request reader + * @param admissionControlActionType type of AdmissionControlActionType + */ + private void registerPrimaryRequestHandler( + Writeable.Reader requestReader, + AdmissionControlActionType admissionControlActionType + ) { + if (admissionControlActionType != null) { + transportService.registerRequestHandler( + transportPrimaryAction, + executor, + forceExecutionOnPrimary, + true, + admissionControlActionType, + in -> new ConcreteShardRequest<>(requestReader, in), + this::handlePrimaryRequest + ); + } else { + transportService.registerRequestHandler( + transportPrimaryAction, + executor, + forceExecutionOnPrimary, + true, + in -> new ConcreteShardRequest<>(requestReader, in), + this::handlePrimaryRequest + ); + } + } + @Override protected void doExecute(Task task, Request request, ActionListener listener) { assert request.shardId() != null : "request shardId must be set"; diff --git a/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java index 9ebfa8cfd0df8..27f9e6dee83de 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java @@ -59,6 +59,7 @@ import org.opensearch.index.translog.Translog.Location; import org.opensearch.indices.IndicesService; import org.opensearch.indices.SystemIndices; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.telemetry.tracing.Span; import org.opensearch.telemetry.tracing.SpanBuilder; import org.opensearch.telemetry.tracing.SpanScope; @@ -104,7 +105,8 @@ protected TransportWriteAction( boolean forceExecutionOnPrimary, IndexingPressureService indexingPressureService, SystemIndices systemIndices, - Tracer tracer + Tracer tracer, + AdmissionControlActionType admissionControlActionType ) { // We pass ThreadPool.Names.SAME to the super class as we control the dispatching to the // ThreadPool.Names.WRITE/ThreadPool.Names.SYSTEM_WRITE thread pools in this class. @@ -121,7 +123,8 @@ protected TransportWriteAction( replicaRequest, ThreadPool.Names.SAME, true, - forceExecutionOnPrimary + forceExecutionOnPrimary, + admissionControlActionType ); this.executorFunction = executorFunction; this.indexingPressureService = indexingPressureService; @@ -129,6 +132,43 @@ protected TransportWriteAction( this.tracer = tracer; } + protected TransportWriteAction( + Settings settings, + String actionName, + TransportService transportService, + ClusterService clusterService, + IndicesService indicesService, + ThreadPool threadPool, + ShardStateAction shardStateAction, + ActionFilters actionFilters, + Writeable.Reader request, + Writeable.Reader replicaRequest, + Function executorFunction, + boolean forceExecutionOnPrimary, + IndexingPressureService indexingPressureService, + SystemIndices systemIndices, + Tracer tracer + ) { + this( + settings, + actionName, + transportService, + clusterService, + indicesService, + threadPool, + shardStateAction, + actionFilters, + request, + replicaRequest, + executorFunction, + forceExecutionOnPrimary, + indexingPressureService, + systemIndices, + tracer, + null + ); + } + protected String executor(IndexShard shard) { return executorFunction.apply(shard); } diff --git a/server/src/main/java/org/opensearch/common/network/NetworkModule.java b/server/src/main/java/org/opensearch/common/network/NetworkModule.java index 0734659d8ee72..2edf3967c61b0 100644 --- a/server/src/main/java/org/opensearch/common/network/NetworkModule.java +++ b/server/src/main/java/org/opensearch/common/network/NetworkModule.java @@ -55,6 +55,7 @@ import org.opensearch.http.HttpServerTransport; import org.opensearch.index.shard.PrimaryReplicaSyncer.ResyncTask; import org.opensearch.plugins.NetworkPlugin; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.tasks.RawTaskStatus; import org.opensearch.tasks.Task; import org.opensearch.telemetry.tracing.Tracer; @@ -131,7 +132,7 @@ public final class NetworkModule { private final Map> transportFactories = new HashMap<>(); private final Map> transportHttpFactories = new HashMap<>(); - private final List transportIntercetors = new ArrayList<>(); + private final List transportInterceptors = new ArrayList<>(); /** * Creates a network module that custom networking classes can be plugged into. @@ -149,7 +150,8 @@ public NetworkModule( NetworkService networkService, HttpServerTransport.Dispatcher dispatcher, ClusterSettings clusterSettings, - Tracer tracer + Tracer tracer, + List transportInterceptors ) { this.settings = settings; for (NetworkPlugin plugin : plugins) { @@ -180,14 +182,18 @@ public NetworkModule( for (Map.Entry> entry : transportFactory.entrySet()) { registerTransport(entry.getKey(), entry.getValue()); } - List transportInterceptors = plugin.getTransportInterceptors( + List pluginTransportInterceptors = plugin.getTransportInterceptors( namedWriteableRegistry, threadPool.getThreadContext() ); - for (TransportInterceptor interceptor : transportInterceptors) { + for (TransportInterceptor interceptor : pluginTransportInterceptors) { registerTransportInterceptor(interceptor); } } + // Adding last because interceptors are triggered from last to first order from the list + if (transportInterceptors != null) { + transportInterceptors.forEach(this::registerTransportInterceptor); + } } /** Adds a transport implementation that can be selected by setting {@link #TRANSPORT_TYPE_KEY}. */ @@ -264,7 +270,7 @@ public Supplier getTransportSupplier() { * Registers a new {@link TransportInterceptor} */ private void registerTransportInterceptor(TransportInterceptor interceptor) { - this.transportIntercetors.add(Objects.requireNonNull(interceptor, "interceptor must not be null")); + this.transportInterceptors.add(Objects.requireNonNull(interceptor, "interceptor must not be null")); } /** @@ -272,7 +278,7 @@ private void registerTransportInterceptor(TransportInterceptor interceptor) { * @see #registerTransportInterceptor(TransportInterceptor) */ public TransportInterceptor getTransportInterceptor() { - return new CompositeTransportInterceptor(this.transportIntercetors); + return new CompositeTransportInterceptor(this.transportInterceptors); } static final class CompositeTransportInterceptor implements TransportInterceptor { @@ -295,6 +301,30 @@ public TransportRequestHandler interceptHandler( return actualHandler; } + /** + * Intercept the transport action and perform admission control if applicable + * @param action The action the request handler is associated with + * @param executor The executor the request handling will be executed on + * @param forceExecution Force execution on the executor queue and never reject it + * @param actualHandler The handler itself that implements the request handling + * @param admissionControlActionType Admission control based on resource usage limits of provided action type + * @return returns the actual TransportRequestHandler after intercepting all previous handlers + * @param + */ + @Override + public TransportRequestHandler interceptHandler( + String action, + String executor, + boolean forceExecution, + TransportRequestHandler actualHandler, + AdmissionControlActionType admissionControlActionType + ) { + for (TransportInterceptor interceptor : this.transportInterceptors) { + actualHandler = interceptor.interceptHandler(action, executor, forceExecution, actualHandler, admissionControlActionType); + } + return actualHandler; + } + @Override public AsyncSender interceptSender(AsyncSender sender) { for (TransportInterceptor interceptor : this.transportInterceptors) { diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 91ed8980e8a46..6b7fba30a2cd0 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -136,6 +136,8 @@ import org.opensearch.persistent.PersistentTasksClusterService; import org.opensearch.persistent.decider.EnableAssignmentDecider; import org.opensearch.plugins.PluginsService; +import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings; +import org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings; import org.opensearch.repositories.fs.FsRepository; import org.opensearch.rest.BaseRestHandler; import org.opensearch.script.ScriptService; @@ -703,7 +705,12 @@ public void apply(Settings value, Settings current, Settings previous) { // Concurrent segment search settings SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING, - SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING + SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING, + IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING, + AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE, + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, + CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT, + CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT ) ) ); diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 171ea2c80808c..f579f4bd4d3ae 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -198,6 +198,8 @@ import org.opensearch.plugins.SearchPlugin; import org.opensearch.plugins.SystemIndexPlugin; import org.opensearch.plugins.TelemetryPlugin; +import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService; +import org.opensearch.ratelimitting.admissioncontrol.transport.AdmissionControlTransportInterceptor; import org.opensearch.repositories.RepositoriesModule; import org.opensearch.repositories.RepositoriesService; import org.opensearch.rest.RestController; @@ -896,6 +898,29 @@ protected Node( final RestController restController = actionModule.getRestController(); + final NodeResourceUsageTracker nodeResourceUsageTracker = new NodeResourceUsageTracker( + threadPool, + settings, + clusterService.getClusterSettings() + ); + final ResourceUsageCollectorService resourceUsageCollectorService = new ResourceUsageCollectorService( + nodeResourceUsageTracker, + clusterService, + threadPool + ); + + final AdmissionControlService admissionControlService = new AdmissionControlService( + settings, + clusterService, + threadPool, + resourceUsageCollectorService + ); + + AdmissionControlTransportInterceptor admissionControlTransportInterceptor = new AdmissionControlTransportInterceptor( + admissionControlService + ); + + List transportInterceptors = List.of(admissionControlTransportInterceptor); final NetworkModule networkModule = new NetworkModule( settings, pluginsService.filterPlugins(NetworkPlugin.class), @@ -908,7 +933,8 @@ protected Node( networkService, restController, clusterService.getClusterSettings(), - tracer + tracer, + transportInterceptors ); Collection>> indexTemplateMetadataUpgraders = pluginsService.filterPlugins( Plugin.class @@ -1090,16 +1116,6 @@ protected Node( transportService.getTaskManager(), taskCancellationMonitoringSettings ); - final NodeResourceUsageTracker nodeResourceUsageTracker = new NodeResourceUsageTracker( - threadPool, - settings, - clusterService.getClusterSettings() - ); - final ResourceUsageCollectorService resourceUsageCollectorService = new ResourceUsageCollectorService( - nodeResourceUsageTracker, - clusterService, - threadPool - ); this.nodeService = new NodeService( settings, threadPool, @@ -1124,7 +1140,8 @@ protected Node( taskCancellationMonitoringService, resourceUsageCollectorService, segmentReplicationStatsTracker, - repositoryService + repositoryService, + admissionControlService ); final SearchService searchService = newSearchService( @@ -1186,6 +1203,7 @@ protected Node( b.bind(IndexingPressureService.class).toInstance(indexingPressureService); b.bind(TaskResourceTrackingService.class).toInstance(taskResourceTrackingService); b.bind(SearchBackpressureService.class).toInstance(searchBackpressureService); + b.bind(AdmissionControlService.class).toInstance(admissionControlService); b.bind(UsageService.class).toInstance(usageService); b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService()); b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry); diff --git a/server/src/main/java/org/opensearch/node/NodeService.java b/server/src/main/java/org/opensearch/node/NodeService.java index 325683b80394a..15cc8f3d20bb3 100644 --- a/server/src/main/java/org/opensearch/node/NodeService.java +++ b/server/src/main/java/org/opensearch/node/NodeService.java @@ -54,6 +54,7 @@ import org.opensearch.ingest.IngestService; import org.opensearch.monitor.MonitorService; import org.opensearch.plugins.PluginsService; +import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService; import org.opensearch.repositories.RepositoriesService; import org.opensearch.script.ScriptService; import org.opensearch.search.aggregations.support.AggregationUsageService; @@ -96,6 +97,7 @@ public class NodeService implements Closeable { private final FileCache fileCache; private final TaskCancellationMonitoringService taskCancellationMonitoringService; private final RepositoriesService repositoriesService; + private final AdmissionControlService admissionControlService; private final SegmentReplicationStatsTracker segmentReplicationStatsTracker; NodeService( @@ -122,7 +124,8 @@ public class NodeService implements Closeable { TaskCancellationMonitoringService taskCancellationMonitoringService, ResourceUsageCollectorService resourceUsageCollectorService, SegmentReplicationStatsTracker segmentReplicationStatsTracker, - RepositoriesService repositoriesService + RepositoriesService repositoriesService, + AdmissionControlService admissionControlService ) { this.settings = settings; this.threadPool = threadPool; @@ -147,6 +150,7 @@ public class NodeService implements Closeable { this.taskCancellationMonitoringService = taskCancellationMonitoringService; this.resourceUsageCollectorService = resourceUsageCollectorService; this.repositoriesService = repositoriesService; + this.admissionControlService = admissionControlService; clusterService.addStateApplier(ingestService); clusterService.addStateApplier(searchPipelineService); this.segmentReplicationStatsTracker = segmentReplicationStatsTracker; @@ -231,7 +235,8 @@ public NodeStats stats( boolean searchPipelineStats, boolean resourceUsageStats, boolean segmentReplicationTrackerStats, - boolean repositoriesStats + boolean repositoriesStats, + boolean admissionControl ) { // for indices stats we want to include previous allocated shards stats as well (it will // only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats) @@ -262,7 +267,8 @@ public NodeStats stats( taskCancellation ? this.taskCancellationMonitoringService.stats() : null, searchPipelineStats ? this.searchPipelineService.stats() : null, segmentReplicationTrackerStats ? this.segmentReplicationStatsTracker.getTotalRejectionStats() : null, - repositoriesStats ? this.repositoriesService.getRepositoriesStats() : null + repositoriesStats ? this.repositoriesService.getRepositoriesStats() : null, + admissionControl ? this.admissionControlService.stats() : null ); } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java new file mode 100644 index 0000000000000..adca6992833bd --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java @@ -0,0 +1,140 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Settings; +import org.opensearch.node.ResourceUsageCollectorService; +import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController; +import org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; +import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControlStats; +import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats; +import org.opensearch.threadpool.ThreadPool; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER; + +/** + * Admission control Service that bootstraps and manages all the Admission Controllers in OpenSearch. + */ +public class AdmissionControlService { + private final ThreadPool threadPool; + public final AdmissionControlSettings admissionControlSettings; + private final ConcurrentMap admissionControllers; + private static final Logger logger = LogManager.getLogger(AdmissionControlService.class); + private final ClusterService clusterService; + private final Settings settings; + private final ResourceUsageCollectorService resourceUsageCollectorService; + + /** + * + * @param settings Immutable settings instance + * @param clusterService ClusterService Instance + * @param threadPool ThreadPool Instance + * @param resourceUsageCollectorService Instance used to get node resource usage stats + */ + public AdmissionControlService( + Settings settings, + ClusterService clusterService, + ThreadPool threadPool, + ResourceUsageCollectorService resourceUsageCollectorService + ) { + this.threadPool = threadPool; + this.admissionControlSettings = new AdmissionControlSettings(clusterService.getClusterSettings(), settings); + this.admissionControllers = new ConcurrentHashMap<>(); + this.clusterService = clusterService; + this.settings = settings; + this.resourceUsageCollectorService = resourceUsageCollectorService; + this.initialise(); + } + + /** + * Initialise and Register all the admissionControllers + */ + private void initialise() { + // Initialise different type of admission controllers + registerAdmissionController(CPU_BASED_ADMISSION_CONTROLLER); + } + + /** + * + * @param action Transport action name + * @param admissionControlActionType admissionControllerActionType value + */ + public void applyTransportAdmissionControl(String action, AdmissionControlActionType admissionControlActionType) { + this.admissionControllers.forEach( + (name, admissionController) -> { admissionController.apply(action, admissionControlActionType); } + ); + } + + /** + * + * @param admissionControllerName admissionControllerName to register into the service. + */ + public void registerAdmissionController(String admissionControllerName) { + AdmissionController admissionController = this.controllerFactory(admissionControllerName); + this.admissionControllers.put(admissionControllerName, admissionController); + } + + /** + * @return AdmissionController Instance + */ + private AdmissionController controllerFactory(String admissionControllerName) { + switch (admissionControllerName) { + case CPU_BASED_ADMISSION_CONTROLLER: + return new CpuBasedAdmissionController( + admissionControllerName, + this.resourceUsageCollectorService, + this.clusterService, + this.settings + ); + default: + throw new IllegalArgumentException("Not Supported AdmissionController : " + admissionControllerName); + } + } + + /** + * + * @return list of the registered admissionControllers + */ + public List getAdmissionControllers() { + return new ArrayList<>(this.admissionControllers.values()); + } + + /** + * + * @param controllerName name of the admissionController + * @return instance of the AdmissionController Instance + */ + public AdmissionController getAdmissionController(String controllerName) { + return this.admissionControllers.getOrDefault(controllerName, null); + } + + /** + * Return admission control stats + */ + public AdmissionControlStats stats() { + List statsList = new ArrayList<>(); + if (this.admissionControllers.size() > 0) { + this.admissionControllers.forEach((controllerName, admissionController) -> { + AdmissionControllerStats admissionControllerStats = new AdmissionControllerStats(admissionController); + statsList.add(admissionControllerStats); + }); + return new AdmissionControlStats(statsList); + } + return null; + } +} diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSettings.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSettings.java new file mode 100644 index 0000000000000..b557190ab54ac --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSettings.java @@ -0,0 +1,83 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol; + +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; + +/** + * Settings related to admission control. + * @opensearch.internal + */ +public final class AdmissionControlSettings { + + /** + * Default parameters for the AdmissionControlSettings + */ + public static class Defaults { + public static final String MODE = "disabled"; + } + + /** + * Feature level setting to operate in shadow-mode or in enforced-mode. If enforced field is set + * rejection will be performed, otherwise only rejection metrics will be populated. + */ + public static final Setting ADMISSION_CONTROL_TRANSPORT_LAYER_MODE = new Setting<>( + "admission_control.transport.mode", + Defaults.MODE, + AdmissionControlMode::fromName, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + private volatile AdmissionControlMode transportLayeradmissionControlMode; + + /** + * @param clusterSettings clusterSettings Instance + * @param settings settings instance + */ + public AdmissionControlSettings(ClusterSettings clusterSettings, Settings settings) { + this.transportLayeradmissionControlMode = ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.get(settings); + clusterSettings.addSettingsUpdateConsumer(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE, this::setAdmissionControlTransportLayerMode); + } + + /** + * + * @param admissionControlMode update the mode of admission control feature + */ + private void setAdmissionControlTransportLayerMode(AdmissionControlMode admissionControlMode) { + this.transportLayeradmissionControlMode = admissionControlMode; + } + + /** + * + * @return return the default mode of the admissionControl + */ + public AdmissionControlMode getAdmissionControlTransportLayerMode() { + return this.transportLayeradmissionControlMode; + } + + /** + * + * @return true based on the admission control feature is enforced else false + */ + public Boolean isTransportLayerAdmissionControlEnforced() { + return this.transportLayeradmissionControlMode == AdmissionControlMode.ENFORCED; + } + + /** + * + * @return true based on the admission control feature is enabled else false + */ + public Boolean isTransportLayerAdmissionControlEnabled() { + return this.transportLayeradmissionControlMode != AdmissionControlMode.DISABLED; + } +} diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java new file mode 100644 index 0000000000000..2246ce34dd399 --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java @@ -0,0 +1,105 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.controllers; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.util.concurrent.ConcurrentCollections; +import org.opensearch.node.ResourceUsageCollectorService; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Abstract class for Admission Controller in OpenSearch, which aims to provide resource based request admission control. + * It provides methods for any tracking-object that can be incremented (such as memory size), + * and admission control can be applied if configured limit has been reached + */ +public abstract class AdmissionController { + + private final String admissionControllerName; + final ResourceUsageCollectorService resourceUsageCollectorService; + public final Map rejectionCountMap; + public final ClusterService clusterService; + + /** + * @param admissionControllerName name of the admissionController + * @param resourceUsageCollectorService instance used to get resource usage stats of the node + * @param clusterService instance of the clusterService + */ + public AdmissionController( + String admissionControllerName, + ResourceUsageCollectorService resourceUsageCollectorService, + ClusterService clusterService + ) { + this.admissionControllerName = admissionControllerName; + this.resourceUsageCollectorService = resourceUsageCollectorService; + this.clusterService = clusterService; + this.rejectionCountMap = ConcurrentCollections.newConcurrentMap(); + } + + /** + * Return the current state of the admission controller + * @return true if admissionController is enabled for the transport layer else false + */ + public boolean isEnabledForTransportLayer(AdmissionControlMode admissionControlMode) { + return admissionControlMode != AdmissionControlMode.DISABLED; + } + + /** + * + * @return true if admissionController is Enforced Mode else false + */ + public Boolean isAdmissionControllerEnforced(AdmissionControlMode admissionControlMode) { + return admissionControlMode == AdmissionControlMode.ENFORCED; + } + + /** + * Apply admission control based on the resource usage for an action + */ + public abstract void apply(String action, AdmissionControlActionType admissionControlActionType); + + /** + * @return name of the admission-controller + */ + public String getName() { + return this.admissionControllerName; + } + + /** + * Add rejection count to the rejection count metric tracked by the admission controller + */ + public void addRejectionCount(String admissionControlActionType, long count) { + if (!this.rejectionCountMap.containsKey(admissionControlActionType)) { + this.rejectionCountMap.put(admissionControlActionType, new AtomicLong(0)); + } + this.rejectionCountMap.get(admissionControlActionType).getAndAdd(count); + } + + /** + * @return current value of the rejection count metric tracked by the admission-controller. + */ + public long getRejectionCount(String admissionControlActionType) { + if (this.rejectionCountMap.containsKey(admissionControlActionType)) { + return this.rejectionCountMap.get(admissionControlActionType).get(); + } + return 0; + } + + /** + * Get rejection stats of the admission controller + */ + public Map getRejectionStats() { + Map rejectionStats = new HashMap<>(); + rejectionCountMap.forEach((actionType, count) -> rejectionStats.put(actionType, count.get())); + return rejectionStats; + } +} diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CpuBasedAdmissionController.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CpuBasedAdmissionController.java new file mode 100644 index 0000000000000..5c180346c05e1 --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CpuBasedAdmissionController.java @@ -0,0 +1,125 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.controllers; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.node.NodeResourceUsageStats; +import org.opensearch.node.ResourceUsageCollectorService; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; +import org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings; + +import java.util.Locale; +import java.util.Optional; + +/** + * Class for CPU Based Admission Controller in OpenSearch, which aims to provide CPU utilisation admission control. + * It provides methods to apply admission control if configured limit has been reached + */ +public class CpuBasedAdmissionController extends AdmissionController { + public static final String CPU_BASED_ADMISSION_CONTROLLER = "global_cpu_usage"; + private static final Logger LOGGER = LogManager.getLogger(CpuBasedAdmissionController.class); + public CpuBasedAdmissionControllerSettings settings; + + /** + * @param admissionControllerName Name of the admission controller + * @param resourceUsageCollectorService Instance used to get node resource usage stats + * @param clusterService ClusterService Instance + * @param settings Immutable settings instance + */ + public CpuBasedAdmissionController( + String admissionControllerName, + ResourceUsageCollectorService resourceUsageCollectorService, + ClusterService clusterService, + Settings settings + ) { + super(admissionControllerName, resourceUsageCollectorService, clusterService); + this.settings = new CpuBasedAdmissionControllerSettings(clusterService.getClusterSettings(), settings); + } + + /** + * Apply admission control based on process CPU usage + * @param action is the transport action + */ + @Override + public void apply(String action, AdmissionControlActionType admissionControlActionType) { + if (this.isEnabledForTransportLayer(this.settings.getTransportLayerAdmissionControllerMode())) { + this.applyForTransportLayer(action, admissionControlActionType); + } + } + + /** + * Apply transport layer admission control if configured limit has been reached + */ + private void applyForTransportLayer(String actionName, AdmissionControlActionType admissionControlActionType) { + if (isLimitsBreached(actionName, admissionControlActionType)) { + this.addRejectionCount(admissionControlActionType.getType(), 1); + if (this.isAdmissionControllerEnforced(this.settings.getTransportLayerAdmissionControllerMode())) { + throw new OpenSearchRejectedExecutionException( + String.format( + Locale.ROOT, + "CPU usage admission controller rejected the request for action [%s] as CPU limit reached", + admissionControlActionType.name() + ) + ); + } + } + } + + /** + * Check if the configured resource usage limits are breached for the action + */ + private boolean isLimitsBreached(String actionName, AdmissionControlActionType admissionControlActionType) { + // check if cluster state is ready + if (clusterService.state() != null && clusterService.state().nodes() != null) { + long maxCpuLimit = this.getCpuRejectionThreshold(admissionControlActionType); + Optional nodePerformanceStatistics = this.resourceUsageCollectorService.getNodeStatistics( + this.clusterService.state().nodes().getLocalNodeId() + ); + if (nodePerformanceStatistics.isPresent()) { + double cpuUsage = nodePerformanceStatistics.get().getCpuUtilizationPercent(); + if (cpuUsage >= maxCpuLimit) { + LOGGER.warn( + "CpuBasedAdmissionController limit reached as the current CPU " + + "usage [{}] exceeds the allowed limit [{}] for transport action [{}] in admissionControlMode [{}]", + cpuUsage, + maxCpuLimit, + actionName, + this.settings.getTransportLayerAdmissionControllerMode() + ); + return true; + } + } + } + return false; + } + + /** + * Get CPU rejection threshold based on action type + */ + private long getCpuRejectionThreshold(AdmissionControlActionType admissionControlActionType) { + switch (admissionControlActionType) { + case SEARCH: + return this.settings.getSearchCPULimit(); + case INDEXING: + return this.settings.getIndexingCPULimit(); + default: + throw new IllegalArgumentException( + String.format( + Locale.ROOT, + "Admission control not Supported for AdmissionControlActionType: %s", + admissionControlActionType.getType() + ) + ); + } + } +} diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/package-info.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/package-info.java new file mode 100644 index 0000000000000..23746cc61a203 --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * This package contains classes related to the different admission controllers + */ +package org.opensearch.ratelimitting.admissioncontrol.controllers; diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionType.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionType.java new file mode 100644 index 0000000000000..8cf6e973ceb64 --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionType.java @@ -0,0 +1,45 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.enums; + +import java.util.Locale; + +/** + * Enums that defines the type of the transport requests + */ +public enum AdmissionControlActionType { + INDEXING("indexing"), + SEARCH("search"); + + private final String type; + + AdmissionControlActionType(String uriType) { + this.type = uriType; + } + + /** + * + * @return type of the request + */ + public String getType() { + return type; + } + + public static AdmissionControlActionType fromName(String name) { + name = name.toLowerCase(Locale.ROOT); + switch (name) { + case "indexing": + return INDEXING; + case "search": + return SEARCH; + default: + throw new IllegalArgumentException("Not Supported TransportAction Type: " + name); + } + } +} diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlMode.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlMode.java new file mode 100644 index 0000000000000..2ae2436ba84e7 --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlMode.java @@ -0,0 +1,66 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.enums; + +import java.util.Locale; + +/** + * Defines the AdmissionControlMode + */ +public enum AdmissionControlMode { + /** + * AdmissionController is completely disabled. + */ + DISABLED("disabled"), + + /** + * AdmissionController only monitors the rejection criteria for the requests. + */ + MONITOR("monitor_only"), + + /** + * AdmissionController monitors and rejects tasks that exceed resource usage thresholds. + */ + ENFORCED("enforced"); + + private final String mode; + + /** + * @param mode update mode of the admission controller + */ + AdmissionControlMode(String mode) { + this.mode = mode; + } + + /** + * + * @return mode of the admission controller + */ + public String getMode() { + return this.mode; + } + + /** + * + * @param name is the mode of the current + * @return Enum of AdmissionControlMode based on the mode + */ + public static AdmissionControlMode fromName(String name) { + switch (name.toLowerCase(Locale.ROOT)) { + case "disabled": + return DISABLED; + case "monitor_only": + return MONITOR; + case "enforced": + return ENFORCED; + default: + throw new IllegalArgumentException("Invalid AdmissionControlMode: " + name); + } + } +} diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/package-info.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/package-info.java new file mode 100644 index 0000000000000..98b08ebd0a7bf --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * This package contains enums related to the different admission controller feature + */ +package org.opensearch.ratelimitting.admissioncontrol.enums; diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/package-info.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/package-info.java new file mode 100644 index 0000000000000..b3dc229f86fb6 --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * This package contains base classes needed for the admissionController Feature + */ +package org.opensearch.ratelimitting.admissioncontrol; diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CpuBasedAdmissionControllerSettings.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CpuBasedAdmissionControllerSettings.java new file mode 100644 index 0000000000000..1bddd1446a4c4 --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CpuBasedAdmissionControllerSettings.java @@ -0,0 +1,98 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.settings; + +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; + +/** + * Settings related to cpu based admission controller. + * @opensearch.internal + */ +public class CpuBasedAdmissionControllerSettings { + + /** + * Default parameters for the CpuBasedAdmissionControllerSettings + */ + public static class Defaults { + public static final long CPU_USAGE_LIMIT = 95; + } + + private AdmissionControlMode transportLayerMode; + private Long searchCPULimit; + private Long indexingCPULimit; + /** + * Feature level setting to operate in shadow-mode or in enforced-mode. If enforced field is set + * rejection will be performed, otherwise only rejection metrics will be populated. + */ + public static final Setting CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE = new Setting<>( + "admission_control.transport.cpu_usage.mode_override", + AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE, + AdmissionControlMode::fromName, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * This setting used to set the CPU Limits for the search requests by default it will use default IO usage limit + */ + public static final Setting SEARCH_CPU_USAGE_LIMIT = Setting.longSetting( + "admission_control.search.cpu_usage.limit", + Defaults.CPU_USAGE_LIMIT, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * This setting used to set the CPU limits for the indexing requests by default it will use default IO usage limit + */ + public static final Setting INDEXING_CPU_USAGE_LIMIT = Setting.longSetting( + "admission_control.indexing.cpu_usage.limit", + Defaults.CPU_USAGE_LIMIT, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + // currently limited to one setting will add further more settings in follow-up PR's + public CpuBasedAdmissionControllerSettings(ClusterSettings clusterSettings, Settings settings) { + this.transportLayerMode = CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.get(settings); + clusterSettings.addSettingsUpdateConsumer(CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, this::setTransportLayerMode); + this.searchCPULimit = SEARCH_CPU_USAGE_LIMIT.get(settings); + this.indexingCPULimit = INDEXING_CPU_USAGE_LIMIT.get(settings); + clusterSettings.addSettingsUpdateConsumer(INDEXING_CPU_USAGE_LIMIT, this::setIndexingCPULimit); + clusterSettings.addSettingsUpdateConsumer(SEARCH_CPU_USAGE_LIMIT, this::setSearchCPULimit); + } + + private void setTransportLayerMode(AdmissionControlMode admissionControlMode) { + this.transportLayerMode = admissionControlMode; + } + + public AdmissionControlMode getTransportLayerAdmissionControllerMode() { + return transportLayerMode; + } + + public Long getSearchCPULimit() { + return searchCPULimit; + } + + public Long getIndexingCPULimit() { + return indexingCPULimit; + } + + public void setIndexingCPULimit(Long indexingCPULimit) { + this.indexingCPULimit = indexingCPULimit; + } + + public void setSearchCPULimit(Long searchCPULimit) { + this.searchCPULimit = searchCPULimit; + } +} diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/package-info.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/package-info.java new file mode 100644 index 0000000000000..a024ccc756745 --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/package-info.java @@ -0,0 +1,11 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +/** + * This package contains settings related classes for the different admission controllers + */ +package org.opensearch.ratelimitting.admissioncontrol.settings; diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStats.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStats.java new file mode 100644 index 0000000000000..39909c571c63e --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStats.java @@ -0,0 +1,67 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.stats; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.List; + +/** + * Class for admission control stats used as part of node stats + * @opensearch.internal + */ +public class AdmissionControlStats implements ToXContentFragment, Writeable { + + private final List admissionControllerStatsList; + + /** + * + * @param admissionControllerStatsList list of admissionControllerStats + */ + public AdmissionControlStats(List admissionControllerStatsList) { + this.admissionControllerStatsList = admissionControllerStatsList; + } + + /** + * + * @param in the stream to read from + * @throws IOException if an I/O error occurs + */ + public AdmissionControlStats(StreamInput in) throws IOException { + this.admissionControllerStatsList = in.readList(AdmissionControllerStats::new); + } + + /** + * Write this into the {@linkplain StreamOutput}. + * + * @param out the output stream to write entity content to + */ + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeList(this.admissionControllerStatsList); + } + + public List getAdmissionControllerStatsList() { + return admissionControllerStatsList; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject("admission_control"); + for (AdmissionControllerStats admissionControllerStats : this.admissionControllerStatsList) { + builder.field(admissionControllerStats.getAdmissionControllerName(), admissionControllerStats); + } + return builder.endObject(); + } +} diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStats.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStats.java new file mode 100644 index 0000000000000..3895cac3eaa07 --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStats.java @@ -0,0 +1,74 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.stats; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController; + +import java.io.IOException; +import java.util.Map; + +/** + * Class for admission controller ( such as CPU ) stats which includes rejection count for each action type + * @opensearch.internal + */ +public class AdmissionControllerStats implements Writeable, ToXContentFragment { + public Map rejectionCount; + public String admissionControllerName; + + public AdmissionControllerStats(AdmissionController admissionController) { + this.rejectionCount = admissionController.getRejectionStats(); + this.admissionControllerName = admissionController.getName(); + } + + public AdmissionControllerStats(StreamInput in) throws IOException { + this.rejectionCount = in.readMap(StreamInput::readString, StreamInput::readLong); + this.admissionControllerName = in.readString(); + } + + public String getAdmissionControllerName() { + return admissionControllerName; + } + + public Map getRejectionCount() { + return rejectionCount; + } + + /** + * Writes this instance into a {@link StreamOutput} + * @param out the {@link StreamOutput} to write to + * @throws IOException if an error occurs while writing to the StreamOutput + */ + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeMap(this.rejectionCount, StreamOutput::writeString, StreamOutput::writeLong); + out.writeString(this.admissionControllerName); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.startObject("transport"); + { + builder.startObject("rejection_count"); + { + for (Map.Entry rejectionCountEntry : this.rejectionCount.entrySet()) { + builder.field(rejectionCountEntry.getKey(), rejectionCountEntry.getValue()); + } + } + builder.endObject(); + } + builder.endObject(); + return builder.endObject(); + } +} diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/package-info.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/package-info.java new file mode 100644 index 0000000000000..7c96dcd569d64 --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * This package contains stats related classes for the admissionController Feature + */ +package org.opensearch.ratelimitting.admissioncontrol.stats; diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandler.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandler.java new file mode 100644 index 0000000000000..1e8f309234f90 --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandler.java @@ -0,0 +1,70 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.transport; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportChannel; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportRequestHandler; + +/** + * AdmissionControl Handler to intercept Transport Requests. + * @param Transport Request + */ +public class AdmissionControlTransportHandler implements TransportRequestHandler { + + private final String action; + private final TransportRequestHandler actualHandler; + protected final Logger log = LogManager.getLogger(this.getClass()); + AdmissionControlService admissionControlService; + boolean forceExecution; + AdmissionControlActionType admissionControlActionType; + + public AdmissionControlTransportHandler( + String action, + TransportRequestHandler actualHandler, + AdmissionControlService admissionControlService, + boolean forceExecution, + AdmissionControlActionType admissionControlActionType + ) { + super(); + this.action = action; + this.actualHandler = actualHandler; + this.admissionControlService = admissionControlService; + this.forceExecution = forceExecution; + this.admissionControlActionType = admissionControlActionType; + } + + /** + * @param request Transport Request that landed on the node + * @param channel Transport channel allows to send a response to a request + * @param task Current task that is executing + * @throws Exception when admission control rejected the requests + */ + @Override + public void messageReceived(T request, TransportChannel channel, Task task) throws Exception { + // skip admission control if force execution is true + if (!this.forceExecution) { + // intercept the transport requests here and apply admission control + try { + this.admissionControlService.applyTransportAdmissionControl(this.action, this.admissionControlActionType); + } catch (final OpenSearchRejectedExecutionException openSearchRejectedExecutionException) { + log.warn(openSearchRejectedExecutionException.getMessage()); + channel.sendResponse(openSearchRejectedExecutionException); + return; + } + } + actualHandler.messageReceived(request, channel, task); + } +} diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportInterceptor.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportInterceptor.java new file mode 100644 index 0000000000000..ae1520bca769d --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportInterceptor.java @@ -0,0 +1,48 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.transport; + +import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; +import org.opensearch.transport.TransportInterceptor; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportRequestHandler; + +/** + * This class allows throttling by intercepting requests on both the sender and the receiver side. + */ +public class AdmissionControlTransportInterceptor implements TransportInterceptor { + + AdmissionControlService admissionControlService; + + public AdmissionControlTransportInterceptor(AdmissionControlService admissionControlService) { + this.admissionControlService = admissionControlService; + } + + /** + * + * @return admissionController handler to intercept transport requests + */ + @Override + public TransportRequestHandler interceptHandler( + String action, + String executor, + boolean forceExecution, + TransportRequestHandler actualHandler, + AdmissionControlActionType admissionControlActionType + ) { + return new AdmissionControlTransportHandler<>( + action, + actualHandler, + this.admissionControlService, + forceExecution, + admissionControlActionType + ); + } +} diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/package-info.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/package-info.java new file mode 100644 index 0000000000000..f97f31bc7b1db --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/package-info.java @@ -0,0 +1,11 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +/** + * This package contains transport related classes for the admissionController Feature + */ +package org.opensearch.ratelimitting.admissioncontrol.transport; diff --git a/server/src/main/java/org/opensearch/ratelimitting/package-info.java b/server/src/main/java/org/opensearch/ratelimitting/package-info.java new file mode 100644 index 0000000000000..c04358e14284f --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Base OpenSearch Throttling package + */ +package org.opensearch.ratelimitting; diff --git a/server/src/main/java/org/opensearch/transport/TransportInterceptor.java b/server/src/main/java/org/opensearch/transport/TransportInterceptor.java index 9ee2db6d39893..e8efbeb7de3f9 100644 --- a/server/src/main/java/org/opensearch/transport/TransportInterceptor.java +++ b/server/src/main/java/org/opensearch/transport/TransportInterceptor.java @@ -35,6 +35,7 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.core.common.io.stream.Writeable.Reader; import org.opensearch.core.transport.TransportResponse; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; /** * This interface allows plugins to intercept requests on both the sender and the receiver side. @@ -57,6 +58,19 @@ default TransportRequestHandler interceptHandler return actualHandler; } + /** + * This is called for handlers that needs admission control support + */ + default TransportRequestHandler interceptHandler( + String action, + String executor, + boolean forceExecution, + TransportRequestHandler actualHandler, + AdmissionControlActionType admissionControlActionType + ) { + return interceptHandler(action, executor, forceExecution, actualHandler); + } + /** * This is called up-front providing the actual low level {@link AsyncSender} that performs the low level send request. * The returned sender is used to send all requests that come in via diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index 210fd400703a4..c04d34856814a 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -66,6 +66,7 @@ import org.opensearch.core.service.ReportingService; import org.opensearch.core.transport.TransportResponse; import org.opensearch.node.NodeClosedException; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskManager; import org.opensearch.telemetry.tracing.Span; @@ -1214,6 +1215,40 @@ public void registerRequestHandler( transport.registerRequestHandler(reg); } + /** + * Registers a new request handler with admission control support + * + * @param action The action the request handler is associated with + * @param executor The executor the request handling will be executed on + * @param forceExecution Force execution on the executor queue and never reject it + * @param canTripCircuitBreaker Check the request size and raise an exception in case the limit is breached. + * @param admissionControlActionType Admission control based on resource usage limits of provided action type + * @param requestReader The request class that will be used to construct new instances for streaming + * @param handler The handler itself that implements the request handling + */ + public void registerRequestHandler( + String action, + String executor, + boolean forceExecution, + boolean canTripCircuitBreaker, + AdmissionControlActionType admissionControlActionType, + Writeable.Reader requestReader, + TransportRequestHandler handler + ) { + validateActionName(action); + handler = interceptor.interceptHandler(action, executor, forceExecution, handler, admissionControlActionType); + RequestHandlerRegistry reg = new RequestHandlerRegistry<>( + action, + requestReader, + taskManager, + handler, + executor, + forceExecution, + canTripCircuitBreaker + ); + transport.registerRequestHandler(reg); + } + /** * called by the {@link Transport} implementation when an incoming request arrives but before * any parsing of it has happened (with the exception of the requestId and action) diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index 253639c09b606..cf5cdf720eb4a 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -66,6 +66,11 @@ import org.opensearch.node.NodeResourceUsageStats; import org.opensearch.node.NodesResourceUsageStats; import org.opensearch.node.ResponseCollectorService; +import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController; +import org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; +import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControlStats; +import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats; import org.opensearch.script.ScriptCacheStats; import org.opensearch.script.ScriptStats; import org.opensearch.test.OpenSearchTestCase; @@ -540,15 +545,44 @@ public void testSerialization() throws IOException { assertEquals(replicationStats.getTotalBytesBehind(), deserializedReplicationStats.getTotalBytesBehind()); assertEquals(replicationStats.getMaxReplicationLag(), deserializedReplicationStats.getMaxReplicationLag()); } + AdmissionControlStats admissionControlStats = nodeStats.getAdmissionControlStats(); + AdmissionControlStats deserializedAdmissionControlStats = deserializedNodeStats.getAdmissionControlStats(); + if (admissionControlStats == null) { + assertNull(deserializedAdmissionControlStats); + } else { + assertEquals( + admissionControlStats.getAdmissionControllerStatsList().size(), + deserializedAdmissionControlStats.getAdmissionControllerStatsList().size() + ); + AdmissionControllerStats admissionControllerStats = admissionControlStats.getAdmissionControllerStatsList().get(0); + AdmissionControllerStats deserializedAdmissionControllerStats = deserializedAdmissionControlStats + .getAdmissionControllerStatsList() + .get(0); + assertEquals( + admissionControllerStats.getAdmissionControllerName(), + deserializedAdmissionControllerStats.getAdmissionControllerName() + ); + assertEquals(1, (long) admissionControllerStats.getRejectionCount().get(AdmissionControlActionType.SEARCH.getType())); + assertEquals( + admissionControllerStats.getRejectionCount().get(AdmissionControlActionType.SEARCH.getType()), + deserializedAdmissionControllerStats.getRejectionCount().get(AdmissionControlActionType.SEARCH.getType()) + ); + + assertEquals(2, (long) admissionControllerStats.getRejectionCount().get(AdmissionControlActionType.INDEXING.getType())); + assertEquals( + admissionControllerStats.getRejectionCount().get(AdmissionControlActionType.INDEXING.getType()), + deserializedAdmissionControllerStats.getRejectionCount().get(AdmissionControlActionType.INDEXING.getType()) + ); + } } } } - public static NodeStats createNodeStats() { + public static NodeStats createNodeStats() throws IOException { return createNodeStats(false); } - public static NodeStats createNodeStats(boolean remoteStoreStats) { + public static NodeStats createNodeStats(boolean remoteStoreStats) throws IOException { DiscoveryNode node = new DiscoveryNode( "test_node", buildNewFakeTransportAddress(), @@ -861,6 +895,26 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) { clusterManagerThrottlingStats = new ClusterManagerThrottlingStats(); clusterManagerThrottlingStats.onThrottle("test-task", randomInt()); } + + AdmissionControlStats admissionControlStats = null; + if (frequently()) { + AdmissionController admissionController = new AdmissionController( + CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER, + null, + null + ) { + @Override + public void apply(String action, AdmissionControlActionType admissionControlActionType) { + return; + } + }; + admissionController.addRejectionCount(AdmissionControlActionType.SEARCH.getType(), 1); + admissionController.addRejectionCount(AdmissionControlActionType.INDEXING.getType(), 2); + AdmissionControllerStats stats = new AdmissionControllerStats(admissionController); + List statsList = new ArrayList(); + statsList.add(stats); + admissionControlStats = new AdmissionControlStats(statsList); + } ScriptCacheStats scriptCacheStats = scriptStats != null ? scriptStats.toScriptCacheStats() : null; WeightedRoutingStats weightedRoutingStats = null; @@ -898,7 +952,8 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) { null, null, segmentReplicationRejectionStats, - null + null, + admissionControlStats ); } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java index a0c45f95ef7c0..40a30342b86b9 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java @@ -41,6 +41,7 @@ import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.test.OpenSearchTestCase; +import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; @@ -87,7 +88,13 @@ public void testNetworkTypesToXContent() throws Exception { } public void testIngestStats() throws Exception { - NodeStats nodeStats = randomValueOtherThanMany(n -> n.getIngestStats() == null, NodeStatsTests::createNodeStats); + NodeStats nodeStats = randomValueOtherThanMany(n -> n.getIngestStats() == null, () -> { + try { + return NodeStatsTests.createNodeStats(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); SortedMap processorStats = new TreeMap<>(); nodeStats.getIngestStats().getProcessorStats().values().forEach(stats -> { diff --git a/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java b/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java index f037b75dc16a3..ff47ec3015697 100644 --- a/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java +++ b/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java @@ -193,6 +193,7 @@ public void testFillDiskUsage() { null, null, null, + null, null ), new NodeStats( @@ -222,6 +223,7 @@ public void testFillDiskUsage() { null, null, null, + null, null ), new NodeStats( @@ -251,6 +253,7 @@ public void testFillDiskUsage() { null, null, null, + null, null ) ); @@ -311,6 +314,7 @@ public void testFillDiskUsageSomeInvalidValues() { null, null, null, + null, null ), new NodeStats( @@ -340,6 +344,7 @@ public void testFillDiskUsageSomeInvalidValues() { null, null, null, + null, null ), new NodeStats( @@ -369,6 +374,7 @@ public void testFillDiskUsageSomeInvalidValues() { null, null, null, + null, null ) ); diff --git a/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java b/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java index 0ca118fe422a5..de4bdcac6c2b2 100644 --- a/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java +++ b/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java @@ -57,6 +57,7 @@ import org.opensearch.transport.TransportRequest; import org.opensearch.transport.TransportRequestHandler; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -124,7 +125,7 @@ public Map> getTransports( return Collections.singletonMap("custom", custom); } }; - NetworkModule module = newNetworkModule(settings, plugin); + NetworkModule module = newNetworkModule(settings, null, plugin); assertSame(custom, module.getTransportSupplier()); } @@ -135,7 +136,7 @@ public void testRegisterHttpTransport() { .build(); Supplier custom = FakeHttpTransport::new; - NetworkModule module = newNetworkModule(settings, new NetworkPlugin() { + NetworkModule module = newNetworkModule(settings, null, new NetworkPlugin() { @Override public Map> getHttpTransports( Settings settings, @@ -155,7 +156,7 @@ public Map> getHttpTransports( assertSame(custom, module.getHttpServerTransportSupplier()); settings = Settings.builder().put(NetworkModule.TRANSPORT_TYPE_KEY, "local").build(); - NetworkModule newModule = newNetworkModule(settings); + NetworkModule newModule = newNetworkModule(settings, null); expectThrows(IllegalStateException.class, () -> newModule.getHttpServerTransportSupplier()); } @@ -169,7 +170,7 @@ public void testOverrideDefault() { Supplier customTransport = () -> null; // content doesn't matter we check reference equality Supplier custom = FakeHttpTransport::new; Supplier def = FakeHttpTransport::new; - NetworkModule module = newNetworkModule(settings, new NetworkPlugin() { + NetworkModule module = newNetworkModule(settings, null, new NetworkPlugin() { @Override public Map> getTransports( Settings settings, @@ -214,7 +215,7 @@ public void testDefaultKeys() { Supplier custom = FakeHttpTransport::new; Supplier def = FakeHttpTransport::new; Supplier customTransport = () -> null; - NetworkModule module = newNetworkModule(settings, new NetworkPlugin() { + NetworkModule module = newNetworkModule(settings, null, new NetworkPlugin() { @Override public Map> getTransports( Settings settings, @@ -273,7 +274,7 @@ public TransportRequestHandler interceptHandler( return actualHandler; } }; - NetworkModule module = newNetworkModule(settings, new NetworkPlugin() { + NetworkModule module = newNetworkModule(settings, null, new NetworkPlugin() { @Override public List getTransportInterceptors( NamedWriteableRegistry namedWriteableRegistry, @@ -295,7 +296,7 @@ public List getTransportInterceptors( assertSame(((NetworkModule.CompositeTransportInterceptor) transportInterceptor).transportInterceptors.get(0), interceptor); NullPointerException nullPointerException = expectThrows(NullPointerException.class, () -> { - newNetworkModule(settings, new NetworkPlugin() { + newNetworkModule(settings, null, new NetworkPlugin() { @Override public List getTransportInterceptors( NamedWriteableRegistry namedWriteableRegistry, @@ -309,7 +310,202 @@ public List getTransportInterceptors( assertEquals("interceptor must not be null", nullPointerException.getMessage()); } - private NetworkModule newNetworkModule(Settings settings, NetworkPlugin... plugins) { + public void testRegisterCoreInterceptor() { + Settings settings = Settings.builder().put(NetworkModule.TRANSPORT_TYPE_KEY, "local").build(); + AtomicInteger called = new AtomicInteger(0); + + TransportInterceptor interceptor = new TransportInterceptor() { + @Override + public TransportRequestHandler interceptHandler( + String action, + String executor, + boolean forceExecution, + TransportRequestHandler actualHandler + ) { + called.incrementAndGet(); + if ("foo/bar/boom".equals(action)) { + assertTrue(forceExecution); + } else { + assertFalse(forceExecution); + } + return actualHandler; + } + }; + + List coreTransportInterceptors = new ArrayList<>(); + coreTransportInterceptors.add(interceptor); + + NetworkModule module = newNetworkModule(settings, coreTransportInterceptors); + + TransportInterceptor transportInterceptor = module.getTransportInterceptor(); + assertEquals(0, called.get()); + transportInterceptor.interceptHandler("foo/bar/boom", null, true, null); + assertEquals(1, called.get()); + transportInterceptor.interceptHandler("foo/baz/boom", null, false, null); + assertEquals(2, called.get()); + assertTrue(transportInterceptor instanceof NetworkModule.CompositeTransportInterceptor); + assertEquals(((NetworkModule.CompositeTransportInterceptor) transportInterceptor).transportInterceptors.size(), 1); + assertSame(((NetworkModule.CompositeTransportInterceptor) transportInterceptor).transportInterceptors.get(0), interceptor); + } + + public void testInterceptorOrder() { + Settings settings = Settings.builder().put(NetworkModule.TRANSPORT_TYPE_KEY, "local").build(); + AtomicInteger called = new AtomicInteger(0); + AtomicInteger called1 = new AtomicInteger(0); + + TransportInterceptor interceptor = new TransportInterceptor() { + @Override + public TransportRequestHandler interceptHandler( + String action, + String executor, + boolean forceExecution, + TransportRequestHandler actualHandler + ) { + called.incrementAndGet(); + if ("foo/bar/boom".equals(action)) { + assertTrue(forceExecution); + } else { + assertFalse(forceExecution); + } + return actualHandler; + } + }; + + TransportInterceptor interceptor1 = new TransportInterceptor() { + @Override + public TransportRequestHandler interceptHandler( + String action, + String executor, + boolean forceExecution, + TransportRequestHandler actualHandler + ) { + called1.incrementAndGet(); + if ("foo/bar/boom".equals(action)) { + assertTrue(forceExecution); + } else { + assertFalse(forceExecution); + } + return actualHandler; + } + }; + + List coreTransportInterceptors = new ArrayList<>(); + coreTransportInterceptors.add(interceptor1); + + NetworkModule module = newNetworkModule(settings, coreTransportInterceptors, new NetworkPlugin() { + @Override + public List getTransportInterceptors( + NamedWriteableRegistry namedWriteableRegistry, + ThreadContext threadContext + ) { + assertNotNull(threadContext); + return Collections.singletonList(interceptor); + } + }); + + TransportInterceptor transportInterceptor = module.getTransportInterceptor(); + assertEquals(((NetworkModule.CompositeTransportInterceptor) transportInterceptor).transportInterceptors.size(), 2); + + assertEquals(0, called.get()); + assertEquals(0, called1.get()); + transportInterceptor.interceptHandler("foo/bar/boom", null, true, null); + assertEquals(1, called.get()); + assertEquals(1, called1.get()); + transportInterceptor.interceptHandler("foo/baz/boom", null, false, null); + assertEquals(2, called.get()); + assertEquals(2, called1.get()); + } + + public void testInterceptorOrderException() { + Settings settings = Settings.builder().put(NetworkModule.TRANSPORT_TYPE_KEY, "local").build(); + AtomicInteger called = new AtomicInteger(0); + AtomicInteger called1 = new AtomicInteger(0); + + TransportInterceptor interceptor = new TransportInterceptor() { + @Override + public TransportRequestHandler interceptHandler( + String action, + String executor, + boolean forceExecution, + TransportRequestHandler actualHandler + ) { + called.incrementAndGet(); + if ("foo/bar/boom".equals(action)) { + assertTrue(forceExecution); + } else { + assertFalse(forceExecution); + } + return actualHandler; + } + }; + + TransportInterceptor interceptor1 = new TransportInterceptor() { + @Override + public TransportRequestHandler interceptHandler( + String action, + String executor, + boolean forceExecution, + TransportRequestHandler actualHandler + ) { + called1.incrementAndGet(); + throw new RuntimeException("Handler Invoke Failed"); + } + }; + + List coreTransportInterceptors = new ArrayList<>(); + coreTransportInterceptors.add(interceptor1); + + NetworkModule module = newNetworkModule(settings, coreTransportInterceptors, new NetworkPlugin() { + @Override + public List getTransportInterceptors( + NamedWriteableRegistry namedWriteableRegistry, + ThreadContext threadContext + ) { + assertNotNull(threadContext); + return Collections.singletonList(interceptor); + } + }); + + TransportInterceptor transportInterceptor = module.getTransportInterceptor(); + assertEquals(((NetworkModule.CompositeTransportInterceptor) transportInterceptor).transportInterceptors.size(), 2); + + assertEquals(0, called.get()); + assertEquals(0, called1.get()); + try { + transportInterceptor.interceptHandler("foo/bar/boom", null, true, null); + } catch (Exception e) { + assertEquals(1, called.get()); + assertEquals(1, called1.get()); + } + + coreTransportInterceptors = new ArrayList<>(); + coreTransportInterceptors.add(interceptor); + module = newNetworkModule(settings, coreTransportInterceptors, new NetworkPlugin() { + @Override + public List getTransportInterceptors( + NamedWriteableRegistry namedWriteableRegistry, + ThreadContext threadContext + ) { + assertNotNull(threadContext); + return Collections.singletonList(interceptor1); + } + }); + + transportInterceptor = module.getTransportInterceptor(); + + try { + transportInterceptor.interceptHandler("foo/baz/boom", null, false, null); + } catch (Exception e) { + assertEquals(1, called.get()); + assertEquals(2, called1.get()); + } + } + + private NetworkModule newNetworkModule( + Settings settings, + List coreTransportInterceptors, + NetworkPlugin... plugins + ) { return new NetworkModule( settings, Arrays.asList(plugins), @@ -322,7 +518,8 @@ private NetworkModule newNetworkModule(Settings settings, NetworkPlugin... plugi null, new NullDispatcher(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - NoopTracer.INSTANCE + NoopTracer.INSTANCE, + coreTransportInterceptors ); } } diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java new file mode 100644 index 0000000000000..7a67ffc8c7c5d --- /dev/null +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java @@ -0,0 +1,158 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController; +import org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; +import org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +import java.util.List; + +public class AdmissionControlServiceTests extends OpenSearchTestCase { + private ClusterService clusterService; + private ThreadPool threadPool; + private AdmissionControlService admissionControlService; + private String action = ""; + + @Override + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool("admission_controller_settings_test"); + clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + action = "indexing"; + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + + public void testWhenAdmissionControllerRegistered() { + admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null); + assertEquals(admissionControlService.getAdmissionControllers().size(), 1); + } + + public void testRegisterInvalidAdmissionController() { + String test = "TEST"; + admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null); + assertEquals(admissionControlService.getAdmissionControllers().size(), 1); + IllegalArgumentException ex = expectThrows( + IllegalArgumentException.class, + () -> admissionControlService.registerAdmissionController(test) + ); + assertEquals(ex.getMessage(), "Not Supported AdmissionController : " + test); + } + + public void testAdmissionControllerSettings() { + admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null); + AdmissionControlSettings admissionControlSettings = admissionControlService.admissionControlSettings; + List admissionControllerList = admissionControlService.getAdmissionControllers(); + assertEquals(admissionControllerList.size(), 1); + CpuBasedAdmissionController cpuBasedAdmissionController = (CpuBasedAdmissionController) admissionControlService + .getAdmissionController(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER); + assertEquals( + admissionControlSettings.isTransportLayerAdmissionControlEnabled(), + cpuBasedAdmissionController.isEnabledForTransportLayer( + cpuBasedAdmissionController.settings.getTransportLayerAdmissionControllerMode() + ) + ); + + Settings settings = Settings.builder() + .put(AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.DISABLED.getMode()) + .build(); + clusterService.getClusterSettings().applySettings(settings); + assertEquals( + admissionControlSettings.isTransportLayerAdmissionControlEnabled(), + cpuBasedAdmissionController.isEnabledForTransportLayer( + cpuBasedAdmissionController.settings.getTransportLayerAdmissionControllerMode() + ) + ); + assertFalse(admissionControlSettings.isTransportLayerAdmissionControlEnabled()); + + Settings newSettings = Settings.builder() + .put(settings) + .put( + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.ENFORCED.getMode() + ) + .build(); + clusterService.getClusterSettings().applySettings(newSettings); + assertFalse(admissionControlSettings.isTransportLayerAdmissionControlEnabled()); + assertTrue( + cpuBasedAdmissionController.isEnabledForTransportLayer( + cpuBasedAdmissionController.settings.getTransportLayerAdmissionControllerMode() + ) + ); + } + + public void testApplyAdmissionControllerDisabled() { + this.action = "indices:data/write/bulk[s][p]"; + admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null); + admissionControlService.applyTransportAdmissionControl(this.action, null); + List admissionControllerList = admissionControlService.getAdmissionControllers(); + admissionControllerList.forEach(admissionController -> { + assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); + }); + } + + public void testApplyAdmissionControllerEnabled() { + this.action = "indices:data/write/bulk[s][p]"; + admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null); + admissionControlService.applyTransportAdmissionControl(this.action, null); + assertEquals( + admissionControlService.getAdmissionController(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER) + .getRejectionCount(AdmissionControlActionType.INDEXING.getType()), + 0 + ); + + Settings settings = Settings.builder() + .put( + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.MONITOR.getMode() + ) + .build(); + clusterService.getClusterSettings().applySettings(settings); + List admissionControllerList = admissionControlService.getAdmissionControllers(); + assertEquals(admissionControllerList.size(), 1); + } + + public void testApplyAdmissionControllerEnforced() { + this.action = "indices:data/write/bulk[s][p]"; + admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null); + admissionControlService.applyTransportAdmissionControl(this.action, null); + assertEquals( + admissionControlService.getAdmissionController(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER) + .getRejectionCount(AdmissionControlActionType.INDEXING.getType()), + 0 + ); + + Settings settings = Settings.builder() + .put( + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.MONITOR.getMode() + ) + .build(); + clusterService.getClusterSettings().applySettings(settings); + List admissionControllerList = admissionControlService.getAdmissionControllers(); + assertEquals(admissionControllerList.size(), 1); + } +} diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSettingsTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSettingsTests.java new file mode 100644 index 0000000000000..c11ee1cc608f6 --- /dev/null +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSettingsTests.java @@ -0,0 +1,103 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +import java.util.List; +import java.util.Set; + +public class AdmissionControlSettingsTests extends OpenSearchTestCase { + private ClusterService clusterService; + private ThreadPool threadPool; + + @Override + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool("admission_controller_settings_test"); + clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + + public void testSettingsExists() { + Set> settings = ClusterSettings.BUILT_IN_CLUSTER_SETTINGS; + assertTrue( + "All the admission controller settings should be supported built in settings", + settings.containsAll(List.of(AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE)) + ); + } + + public void testDefaultSettings() { + AdmissionControlSettings admissionControlSettings = new AdmissionControlSettings( + clusterService.getClusterSettings(), + Settings.EMPTY + ); + + assertFalse(admissionControlSettings.isTransportLayerAdmissionControlEnabled()); + assertFalse(admissionControlSettings.isTransportLayerAdmissionControlEnforced()); + assertEquals(admissionControlSettings.getAdmissionControlTransportLayerMode().getMode(), AdmissionControlSettings.Defaults.MODE); + } + + public void testGetConfiguredSettings() { + Settings settings = Settings.builder() + .put(AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED.getMode()) + .build(); + + AdmissionControlSettings admissionControlSettings = new AdmissionControlSettings(clusterService.getClusterSettings(), settings); + + assertTrue(admissionControlSettings.isTransportLayerAdmissionControlEnabled()); + assertTrue(admissionControlSettings.isTransportLayerAdmissionControlEnforced()); + } + + public void testUpdateAfterGetDefaultSettings() { + AdmissionControlSettings admissionControlSettings = new AdmissionControlSettings( + clusterService.getClusterSettings(), + Settings.EMPTY + ); + Settings settings = Settings.builder() + .put(AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.MONITOR.getMode()) + .build(); + clusterService.getClusterSettings().applySettings(settings); + assertTrue(admissionControlSettings.isTransportLayerAdmissionControlEnabled()); + assertFalse(admissionControlSettings.isTransportLayerAdmissionControlEnforced()); + } + + public void testUpdateAfterGetConfiguredSettings() { + Settings settings = Settings.builder() + .put(AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.MONITOR.getMode()) + .build(); + + AdmissionControlSettings admissionControlSettings = new AdmissionControlSettings(clusterService.getClusterSettings(), settings); + + Settings newSettings = Settings.builder() + .put(AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED.getMode()) + .build(); + + clusterService.getClusterSettings().applySettings(newSettings); + + assertTrue(admissionControlSettings.isTransportLayerAdmissionControlEnabled()); + assertTrue(admissionControlSettings.isTransportLayerAdmissionControlEnforced()); + } +} diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSingleNodeTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSingleNodeTests.java new file mode 100644 index 0000000000000..a1694b2c3cee2 --- /dev/null +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSingleNodeTests.java @@ -0,0 +1,203 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol; + +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.action.bulk.BulkRequestBuilder; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.search.SearchPhaseExecutionException; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.common.action.ActionFuture; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.node.ResourceUsageCollectorService; +import org.opensearch.node.resource.tracker.ResourceTrackerSettings; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; +import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats; +import org.opensearch.test.OpenSearchSingleNodeTestCase; +import org.junit.After; + +import static org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE; +import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT; +import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.hamcrest.Matchers.is; + +/** + * Single node integration tests for admission control + */ +public class AdmissionControlSingleNodeTests extends OpenSearchSingleNodeTestCase { + + @Override + protected boolean resetNodeAfterTest() { + return true; + } + + @After + public void cleanup() { + assertAcked( + client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().putNull("*")) + .setTransientSettings(Settings.builder().putNull("*")) + ); + } + + @Override + protected Settings nodeSettings() { + return Settings.builder() + .put(super.nodeSettings()) + .put(ResourceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500)) + .put(ResourceTrackerSettings.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500)) + .put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED) + .put(SEARCH_CPU_USAGE_LIMIT.getKey(), 0) + .put(INDEXING_CPU_USAGE_LIMIT.getKey(), 0) + .build(); + } + + public void testAdmissionControlRejectionEnforcedMode() throws Exception { + ensureGreen(); + assertBusy(() -> assertEquals(1, getInstanceFromNode(ResourceUsageCollectorService.class).getAllNodeStatistics().size())); + // Thread.sleep(700); + client().admin().indices().prepareCreate("index").execute().actionGet(); + BulkRequestBuilder bulk = client().prepareBulk(); + for (int i = 0; i < 3; i++) { + bulk.add(client().prepareIndex("index").setSource("foo", "bar " + i)); + } + // Verify that cluster state is updated + ActionFuture future2 = client().admin().cluster().state(new ClusterStateRequest()); + assertThat(future2.isDone(), is(true)); + + // verify bulk request hits 429 + BulkResponse res = client().bulk(bulk.request()).actionGet(); + assertEquals(429, res.getItems()[0].getFailure().getStatus().getStatus()); + AdmissionControlService admissionControlService = getInstanceFromNode(AdmissionControlService.class); + AdmissionControllerStats acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(1, (long) acStats.getRejectionCount().get(AdmissionControlActionType.INDEXING.getType())); + client().admin().indices().prepareRefresh("index").get(); + + // verify search request hits 429 + SearchRequest searchRequest = new SearchRequest("index"); + try { + client().search(searchRequest).actionGet(); + } catch (Exception e) { + assertTrue(((SearchPhaseExecutionException) e).getDetailedMessage().contains("OpenSearchRejectedExecutionException")); + } + acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(1, (long) acStats.getRejectionCount().get(AdmissionControlActionType.SEARCH.getType())); + } + + public void testAdmissionControlRejectionMonitorOnlyMode() throws Exception { + assertBusy(() -> assertEquals(1, getInstanceFromNode(ResourceUsageCollectorService.class).getAllNodeStatistics().size())); + // Verify that cluster state is updated + ActionFuture future2 = client().admin().cluster().state(new ClusterStateRequest()); + assertThat(future2.isDone(), is(true)); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.transientSettings( + Settings.builder() + .put(super.nodeSettings()) + .put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.MONITOR.getMode()) + ); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + BulkRequestBuilder bulk = client().prepareBulk(); + for (int i = 0; i < 3; i++) { + bulk.add(client().prepareIndex("index").setSource("foo", "bar " + i)); + } + // verify bulk request success but admission control having rejections stats + BulkResponse res = client().bulk(bulk.request()).actionGet(); + assertFalse(res.hasFailures()); + AdmissionControlService admissionControlService = getInstanceFromNode(AdmissionControlService.class); + AdmissionControllerStats acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(1, (long) acStats.getRejectionCount().get(AdmissionControlActionType.INDEXING.getType())); + client().admin().indices().prepareRefresh("index").get(); + + // verify search request success but admission control having rejections stats + SearchRequest searchRequest = new SearchRequest("index"); + SearchResponse searchResponse = client().search(searchRequest).actionGet(); + assertEquals(3, searchResponse.getHits().getHits().length); + acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(1, (long) acStats.getRejectionCount().get(AdmissionControlActionType.SEARCH.getType())); + } + + public void testAdmissionControlRejectionDisabledMode() throws Exception { + assertBusy(() -> assertEquals(1, getInstanceFromNode(ResourceUsageCollectorService.class).getAllNodeStatistics().size())); + // Verify that cluster state is updated + ActionFuture future2 = client().admin().cluster().state(new ClusterStateRequest()); + assertThat(future2.isDone(), is(true)); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.transientSettings( + Settings.builder().put(super.nodeSettings()).put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.DISABLED) + ); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + BulkRequestBuilder bulk = client().prepareBulk(); + for (int i = 0; i < 3; i++) { + bulk.add(client().prepareIndex("index").setSource("foo", "bar " + i)); + } + // verify bulk request success and no rejections + BulkResponse res = client().bulk(bulk.request()).actionGet(); + assertFalse(res.hasFailures()); + AdmissionControlService admissionControlService = getInstanceFromNode(AdmissionControlService.class); + AdmissionControllerStats acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(0, acStats.getRejectionCount().size()); + client().admin().indices().prepareRefresh("index").get(); + + // verify search request success and no rejections + SearchRequest searchRequest = new SearchRequest("index"); + SearchResponse searchResponse = client().search(searchRequest).actionGet(); + assertEquals(3, searchResponse.getHits().getHits().length); + acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(0, acStats.getRejectionCount().size()); + + } + + public void testAdmissionControlWithinLimits() throws Exception { + assertBusy(() -> assertEquals(1, getInstanceFromNode(ResourceUsageCollectorService.class).getAllNodeStatistics().size())); + // Verify that cluster state is updated + ActionFuture future2 = client().admin().cluster().state(new ClusterStateRequest()); + assertThat(future2.isDone(), is(true)); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.transientSettings( + Settings.builder() + .put(super.nodeSettings()) + .put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED) + .put(SEARCH_CPU_USAGE_LIMIT.getKey(), 101) + .put(INDEXING_CPU_USAGE_LIMIT.getKey(), 101) + ); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + BulkRequestBuilder bulk = client().prepareBulk(); + for (int i = 0; i < 3; i++) { + bulk.add(client().prepareIndex("index").setSource("foo", "bar " + i)); + } + // verify bulk request success and no rejections + BulkResponse res = client().bulk(bulk.request()).actionGet(); + assertFalse(res.hasFailures()); + AdmissionControlService admissionControlService = getInstanceFromNode(AdmissionControlService.class); + AdmissionControllerStats acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(0, acStats.getRejectionCount().size()); + client().admin().indices().prepareRefresh("index").get(); + + // verify search request success and no rejections + SearchRequest searchRequest = new SearchRequest("index"); + SearchResponse searchResponse = client().search(searchRequest).actionGet(); + assertEquals(3, searchResponse.getHits().getHits().length); + acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(0, acStats.getRejectionCount().size()); + } +} diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CpuBasedAdmissionControllerTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CpuBasedAdmissionControllerTests.java new file mode 100644 index 0000000000000..e72c0cd58ed64 --- /dev/null +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CpuBasedAdmissionControllerTests.java @@ -0,0 +1,143 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.controllers; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.node.ResourceUsageCollectorService; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; +import org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +import org.mockito.Mockito; + +public class CpuBasedAdmissionControllerTests extends OpenSearchTestCase { + private ClusterService clusterService; + private ThreadPool threadPool; + CpuBasedAdmissionController admissionController = null; + String action = "TEST_ACTION"; + + @Override + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool("admission_controller_settings_test"); + clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + + public void testCheckDefaultParameters() { + admissionController = new CpuBasedAdmissionController( + CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER, + null, + clusterService, + Settings.EMPTY + ); + assertEquals(admissionController.getName(), CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER); + assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); + assertEquals(admissionController.settings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.DISABLED); + assertFalse( + admissionController.isEnabledForTransportLayer(admissionController.settings.getTransportLayerAdmissionControllerMode()) + ); + } + + public void testCheckUpdateSettings() { + admissionController = new CpuBasedAdmissionController( + CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER, + null, + clusterService, + Settings.EMPTY + ); + Settings settings = Settings.builder() + .put( + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.ENFORCED.getMode() + ) + .build(); + clusterService.getClusterSettings().applySettings(settings); + + assertEquals(admissionController.getName(), CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER); + assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); + assertEquals(admissionController.settings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.ENFORCED); + assertTrue(admissionController.isEnabledForTransportLayer(admissionController.settings.getTransportLayerAdmissionControllerMode())); + } + + public void testApplyControllerWithDefaultSettings() { + ResourceUsageCollectorService rs = Mockito.mock(ResourceUsageCollectorService.class); + admissionController = new CpuBasedAdmissionController( + CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER, + rs, + clusterService, + Settings.EMPTY + ); + assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); + assertEquals(admissionController.settings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.DISABLED); + action = "indices:data/write/bulk[s][p]"; + admissionController.apply(action, AdmissionControlActionType.INDEXING); + assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); + } + + public void testApplyControllerWhenSettingsEnabled() throws Exception { + Settings settings = Settings.builder() + .put( + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.ENFORCED.getMode() + ) + .build(); + ResourceUsageCollectorService rs = Mockito.mock(ResourceUsageCollectorService.class); + admissionController = new CpuBasedAdmissionController( + CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER, + rs, + clusterService, + settings + ); + assertTrue(admissionController.isEnabledForTransportLayer(admissionController.settings.getTransportLayerAdmissionControllerMode())); + assertTrue( + admissionController.isAdmissionControllerEnforced(admissionController.settings.getTransportLayerAdmissionControllerMode()) + ); + assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); + // we can assert admission control and rejections as part of ITs + } + + public void testRejectionCount() { + Settings settings = Settings.builder() + .put( + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.ENFORCED.getMode() + ) + .build(); + ResourceUsageCollectorService rs = Mockito.mock(ResourceUsageCollectorService.class); + admissionController = new CpuBasedAdmissionController( + CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER, + rs, + clusterService, + settings + ); + admissionController.addRejectionCount(AdmissionControlActionType.SEARCH.getType(), 1); + admissionController.addRejectionCount(AdmissionControlActionType.INDEXING.getType(), 3); + assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.SEARCH.getType()), 1); + assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 3); + admissionController.addRejectionCount(AdmissionControlActionType.SEARCH.getType(), 1); + admissionController.addRejectionCount(AdmissionControlActionType.INDEXING.getType(), 2); + assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.SEARCH.getType()), 2); + assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 5); + } +} diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionTypeTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionTypeTests.java new file mode 100644 index 0000000000000..15a25e6cbca1c --- /dev/null +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionTypeTests.java @@ -0,0 +1,27 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.enums; + +import org.opensearch.test.OpenSearchTestCase; + +public class AdmissionControlActionTypeTests extends OpenSearchTestCase { + + public void testValidActionType() { + assertEquals(AdmissionControlActionType.SEARCH.getType(), "search"); + assertEquals(AdmissionControlActionType.INDEXING.getType(), "indexing"); + assertEquals(AdmissionControlActionType.fromName("search"), AdmissionControlActionType.SEARCH); + assertEquals(AdmissionControlActionType.fromName("indexing"), AdmissionControlActionType.INDEXING); + } + + public void testInValidActionType() { + String name = "test"; + IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> AdmissionControlActionType.fromName(name)); + assertEquals(ex.getMessage(), "Not Supported TransportAction Type: " + name); + } +} diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlModeTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlModeTests.java new file mode 100644 index 0000000000000..98c0f3c7cf24c --- /dev/null +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlModeTests.java @@ -0,0 +1,29 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.enums; + +import org.opensearch.test.OpenSearchTestCase; + +public class AdmissionControlModeTests extends OpenSearchTestCase { + + public void testValidActionType() { + assertEquals(AdmissionControlMode.DISABLED.getMode(), "disabled"); + assertEquals(AdmissionControlMode.ENFORCED.getMode(), "enforced"); + assertEquals(AdmissionControlMode.MONITOR.getMode(), "monitor_only"); + assertEquals(AdmissionControlMode.fromName("disabled"), AdmissionControlMode.DISABLED); + assertEquals(AdmissionControlMode.fromName("enforced"), AdmissionControlMode.ENFORCED); + assertEquals(AdmissionControlMode.fromName("monitor_only"), AdmissionControlMode.MONITOR); + } + + public void testInValidActionType() { + String name = "TEST"; + IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> AdmissionControlMode.fromName(name)); + assertEquals(ex.getMessage(), "Invalid AdmissionControlMode: " + name); + } +} diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControlSettingsTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControlSettingsTests.java new file mode 100644 index 0000000000000..11688e2f30d4b --- /dev/null +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControlSettingsTests.java @@ -0,0 +1,152 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.settings; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +import java.util.Arrays; +import java.util.Set; + +public class CPUBasedAdmissionControlSettingsTests extends OpenSearchTestCase { + private ClusterService clusterService; + private ThreadPool threadPool; + + @Override + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool("admission_controller_settings_test"); + clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + + public void testSettingsExists() { + Set> settings = ClusterSettings.BUILT_IN_CLUSTER_SETTINGS; + assertTrue( + "All the cpu based admission controller settings should be supported built in settings", + settings.containsAll( + Arrays.asList( + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, + CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT, + CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT + ) + ) + ); + } + + public void testDefaultSettings() { + CpuBasedAdmissionControllerSettings cpuBasedAdmissionControllerSettings = new CpuBasedAdmissionControllerSettings( + clusterService.getClusterSettings(), + Settings.EMPTY + ); + long percent = 95; + assertEquals(cpuBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.DISABLED); + assertEquals(cpuBasedAdmissionControllerSettings.getIndexingCPULimit().longValue(), percent); + assertEquals(cpuBasedAdmissionControllerSettings.getSearchCPULimit().longValue(), percent); + } + + public void testGetConfiguredSettings() { + long percent = 95; + long indexingPercent = 85; + Settings settings = Settings.builder() + .put( + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.ENFORCED.getMode() + ) + .put(CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT.getKey(), indexingPercent) + .build(); + + CpuBasedAdmissionControllerSettings cpuBasedAdmissionControllerSettings = new CpuBasedAdmissionControllerSettings( + clusterService.getClusterSettings(), + settings + ); + assertEquals(cpuBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.ENFORCED); + assertEquals(cpuBasedAdmissionControllerSettings.getSearchCPULimit().longValue(), percent); + assertEquals(cpuBasedAdmissionControllerSettings.getIndexingCPULimit().longValue(), indexingPercent); + } + + public void testUpdateAfterGetDefaultSettings() { + long percent = 95; + long searchPercent = 80; + CpuBasedAdmissionControllerSettings cpuBasedAdmissionControllerSettings = new CpuBasedAdmissionControllerSettings( + clusterService.getClusterSettings(), + Settings.EMPTY + ); + Settings settings = Settings.builder() + .put( + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.ENFORCED.getMode() + ) + .put(CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT.getKey(), searchPercent) + .build(); + clusterService.getClusterSettings().applySettings(settings); + assertEquals(cpuBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.ENFORCED); + assertEquals(cpuBasedAdmissionControllerSettings.getSearchCPULimit().longValue(), searchPercent); + assertEquals(cpuBasedAdmissionControllerSettings.getIndexingCPULimit().longValue(), percent); + } + + public void testUpdateAfterGetConfiguredSettings() { + long percent = 95; + long indexingPercent = 85; + long searchPercent = 80; + Settings settings = Settings.builder() + .put( + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.ENFORCED.getMode() + ) + .put(CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT.getKey(), searchPercent) + .build(); + + CpuBasedAdmissionControllerSettings cpuBasedAdmissionControllerSettings = new CpuBasedAdmissionControllerSettings( + clusterService.getClusterSettings(), + settings + ); + assertEquals(cpuBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.ENFORCED); + assertEquals(cpuBasedAdmissionControllerSettings.getSearchCPULimit().longValue(), searchPercent); + assertEquals(cpuBasedAdmissionControllerSettings.getIndexingCPULimit().longValue(), percent); + + Settings updatedSettings = Settings.builder() + .put( + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.MONITOR.getMode() + ) + .put(CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT.getKey(), indexingPercent) + .build(); + clusterService.getClusterSettings().applySettings(updatedSettings); + assertEquals(cpuBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.MONITOR); + assertEquals(cpuBasedAdmissionControllerSettings.getSearchCPULimit().longValue(), searchPercent); + assertEquals(cpuBasedAdmissionControllerSettings.getIndexingCPULimit().longValue(), indexingPercent); + + searchPercent = 70; + + updatedSettings = Settings.builder() + .put(updatedSettings) + .put(CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT.getKey(), searchPercent) + .build(); + clusterService.getClusterSettings().applySettings(updatedSettings); + + assertEquals(cpuBasedAdmissionControllerSettings.getSearchCPULimit().longValue(), searchPercent); + assertEquals(cpuBasedAdmissionControllerSettings.getIndexingCPULimit().longValue(), indexingPercent); + } +} diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStatsTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStatsTests.java new file mode 100644 index 0000000000000..7b4db5f787d6e --- /dev/null +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStatsTests.java @@ -0,0 +1,92 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.stats; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.node.ResourceUsageCollectorService; +import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController; +import org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; +import org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.mockito.Mockito.mock; + +public class AdmissionControlStatsTests extends OpenSearchTestCase { + AdmissionController admissionController; + AdmissionControllerStats admissionControllerStats; + AdmissionControlStats admissionControlStats; + private ThreadPool threadPool; + + @Override + public void setUp() throws Exception { + super.setUp(); + Settings settings = Settings.builder() + .put( + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.ENFORCED.getMode() + ) + .build(); + threadPool = new TestThreadPool("admission_controller_settings_test"); + ClusterService clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + admissionController = new CpuBasedAdmissionController( + CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER, + mock(ResourceUsageCollectorService.class), + clusterService, + settings + ); + admissionControllerStats = new AdmissionControllerStats(admissionController); + List admissionControllerStats = new ArrayList<>(); + admissionControlStats = new AdmissionControlStats(admissionControllerStats); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + + public void testDefaults() throws IOException { + assertEquals(admissionControlStats.getAdmissionControllerStatsList().size(), 0); + } + + public void testRejectionCount() throws IOException { + admissionController.addRejectionCount(AdmissionControlActionType.SEARCH.getType(), 11); + admissionController.addRejectionCount(AdmissionControlActionType.INDEXING.getType(), 1); + admissionControllerStats = new AdmissionControllerStats(admissionController); + admissionControlStats = new AdmissionControlStats(List.of(admissionControllerStats)); + XContentBuilder builder = JsonXContent.contentBuilder(); + builder.startObject(); + builder = admissionControlStats.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + String response = builder.toString(); + assertEquals( + response, + "{\"admission_control\":{\"global_cpu_usage\":{\"transport\":{\"rejection_count\":{\"search\":11,\"indexing\":1}}}}}" + ); + AdmissionControlStats admissionControlStats1 = admissionControlStats; + assertEquals(admissionControlStats.hashCode(), admissionControlStats1.hashCode()); + } +} diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStatsTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStatsTests.java new file mode 100644 index 0000000000000..fe0399e79a5f4 --- /dev/null +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStatsTests.java @@ -0,0 +1,82 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.stats; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.node.ResourceUsageCollectorService; +import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController; +import org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; +import org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +import java.io.IOException; + +import static org.mockito.Mockito.mock; + +public class AdmissionControllerStatsTests extends OpenSearchTestCase { + AdmissionController admissionController; + AdmissionControllerStats admissionControllerStats; + private ThreadPool threadPool; + + @Override + public void setUp() throws Exception { + super.setUp(); + Settings settings = Settings.builder() + .put( + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.ENFORCED.getMode() + ) + .build(); + threadPool = new TestThreadPool("admission_controller_settings_test"); + ClusterService clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + admissionController = new CpuBasedAdmissionController("TEST", mock(ResourceUsageCollectorService.class), clusterService, settings); + admissionControllerStats = new AdmissionControllerStats(admissionController); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + + public void testDefaults() throws IOException { + assertEquals(admissionControllerStats.getRejectionCount().size(), 0); + assertEquals(admissionControllerStats.getAdmissionControllerName(), "TEST"); + } + + public void testRejectionCount() throws IOException { + admissionController.addRejectionCount(AdmissionControlActionType.SEARCH.getType(), 11); + admissionController.addRejectionCount(AdmissionControlActionType.INDEXING.getType(), 1); + admissionControllerStats = new AdmissionControllerStats(admissionController); + long searchRejection = admissionControllerStats.getRejectionCount().getOrDefault(AdmissionControlActionType.SEARCH.getType(), 0L); + long indexingRejection = admissionControllerStats.getRejectionCount() + .getOrDefault(AdmissionControlActionType.INDEXING.getType(), 0L); + assertEquals(searchRejection, 11); + assertEquals(indexingRejection, 1); + XContentBuilder builder = JsonXContent.contentBuilder(); + builder = admissionControllerStats.toXContent(builder, ToXContent.EMPTY_PARAMS); + String response = builder.toString(); + assertEquals(response, "{\"transport\":{\"rejection_count\":{\"search\":11,\"indexing\":1}}}"); + AdmissionControllerStats admissionControllerStats1 = admissionControllerStats; + assertEquals(admissionControllerStats.hashCode(), admissionControllerStats1.hashCode()); + } +} diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandlerTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandlerTests.java new file mode 100644 index 0000000000000..0c95769e19489 --- /dev/null +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandlerTests.java @@ -0,0 +1,92 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.transport; + +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService; +import org.opensearch.tasks.Task; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.transport.TransportChannel; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportRequestHandler; + +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; + +public class AdmissionControlTransportHandlerTests extends OpenSearchTestCase { + AdmissionControlTransportHandler admissionControlTransportHandler; + + public void testHandlerInvoked() throws Exception { + String action = "TEST"; + InterceptingRequestHandler handler = new InterceptingRequestHandler<>(action); + admissionControlTransportHandler = new AdmissionControlTransportHandler( + action, + handler, + mock(AdmissionControlService.class), + false, + null + ); + admissionControlTransportHandler.messageReceived(mock(TransportRequest.class), mock(TransportChannel.class), mock(Task.class)); + assertEquals(1, handler.count); + } + + public void testHandlerInvokedRejectedException() throws Exception { + String action = "TEST"; + AdmissionControlService admissionControlService = mock(AdmissionControlService.class); + doThrow(new OpenSearchRejectedExecutionException()).when(admissionControlService).applyTransportAdmissionControl(action, null); + InterceptingRequestHandler handler = new InterceptingRequestHandler<>(action); + admissionControlTransportHandler = new AdmissionControlTransportHandler( + action, + handler, + admissionControlService, + false, + null + ); + admissionControlTransportHandler.messageReceived(mock(TransportRequest.class), mock(TransportChannel.class), mock(Task.class)); + assertEquals(0, handler.count); + handler.messageReceived(mock(TransportRequest.class), mock(TransportChannel.class), mock(Task.class)); + assertEquals(1, handler.count); + } + + public void testHandlerInvokedRandomException() throws Exception { + String action = "TEST"; + AdmissionControlService admissionControlService = mock(AdmissionControlService.class); + doThrow(new NullPointerException()).when(admissionControlService).applyTransportAdmissionControl(action, null); + InterceptingRequestHandler handler = new InterceptingRequestHandler<>(action); + admissionControlTransportHandler = new AdmissionControlTransportHandler( + action, + handler, + admissionControlService, + false, + null + ); + try { + admissionControlTransportHandler.messageReceived(mock(TransportRequest.class), mock(TransportChannel.class), mock(Task.class)); + } catch (Exception exception) { + assertEquals(0, handler.count); + handler.messageReceived(mock(TransportRequest.class), mock(TransportChannel.class), mock(Task.class)); + } + assertEquals(1, handler.count); + } + + private class InterceptingRequestHandler implements TransportRequestHandler { + private final String action; + public int count; + + public InterceptingRequestHandler(String action) { + this.action = action; + this.count = 0; + } + + @Override + public void messageReceived(T request, TransportChannel channel, Task task) throws Exception { + this.count = this.count + 1; + } + } +} diff --git a/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java b/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java index 2ba4de5e54a67..1ad6083074025 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java +++ b/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java @@ -123,7 +123,8 @@ List adjustNodesStats(List nodesStats) { nodeStats.getTaskCancellationStats(), nodeStats.getSearchPipelineStats(), nodeStats.getSegmentReplicationRejectionStats(), - nodeStats.getRepositoriesStats() + nodeStats.getRepositoriesStats(), + nodeStats.getAdmissionControlStats() ); }).collect(Collectors.toList()); } diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java index cd26c132b689b..abdcc3f918aa7 100644 --- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java @@ -2735,6 +2735,7 @@ public void ensureEstimatedStats() { false, false, false, + false, false ); assertThat(