diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index fbf90b97d1e8f..7778e2d6b98d4 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -425,6 +425,7 @@ import org.opensearch.rest.action.admin.indices.RestValidateQueryAction; import org.opensearch.rest.action.admin.indices.RestViewAction; import org.opensearch.rest.action.cat.AbstractCatAction; +import org.opensearch.rest.action.cat.RequestLimitSettings; import org.opensearch.rest.action.cat.RestAliasAction; import org.opensearch.rest.action.cat.RestAllocationAction; import org.opensearch.rest.action.cat.RestCatAction; @@ -528,6 +529,7 @@ public class ActionModule extends AbstractModule { private final RequestValidators indicesAliasesRequestRequestValidators; private final ThreadPool threadPool; private final ExtensionsManager extensionsManager; + private final RequestLimitSettings requestLimitSettings; public ActionModule( Settings settings, @@ -580,6 +582,7 @@ public ActionModule( ); restController = new RestController(headers, restWrapper, nodeClient, circuitBreakerService, usageService, identityService); + requestLimitSettings = new RequestLimitSettings(clusterSettings, settings); } public Map> getActions() { @@ -960,7 +963,7 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestClusterManagerAction()); registerHandler.accept(new RestNodesAction()); registerHandler.accept(new RestTasksAction(nodesInCluster)); - registerHandler.accept(new RestIndicesAction()); + registerHandler.accept(new RestIndicesAction(requestLimitSettings)); registerHandler.accept(new RestSegmentsAction()); // Fully qualified to prevent interference with rest.action.count.RestCountAction registerHandler.accept(new org.opensearch.rest.action.cat.RestCountAction()); @@ -1048,6 +1051,8 @@ protected void configure() { // register dynamic ActionType -> transportAction Map used by NodeClient bind(DynamicActionRegistry.class).toInstance(dynamicActionRegistry); + + bind(RequestLimitSettings.class).toInstance(requestLimitSettings); } public ActionFilters getActionFilters() { diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java index 224d3cbc5f10a..0d42ab6de457b 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java @@ -19,10 +19,15 @@ import org.opensearch.common.inject.Inject; import org.opensearch.core.action.ActionListener; import org.opensearch.core.action.NotifyOnceListener; +import org.opensearch.core.common.breaker.CircuitBreaker; +import org.opensearch.core.common.breaker.CircuitBreakingException; +import org.opensearch.rest.action.cat.RequestLimitSettings; import org.opensearch.tasks.CancellableTask; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; +import static org.opensearch.rest.action.cat.RequestLimitSettings.BlockAction.CAT_SHARDS; + /** * Perform cat shards action * @@ -31,11 +36,18 @@ public class TransportCatShardsAction extends HandledTransportAction { private final NodeClient client; + private final RequestLimitSettings requestLimitSettings; @Inject - public TransportCatShardsAction(NodeClient client, TransportService transportService, ActionFilters actionFilters) { + public TransportCatShardsAction( + NodeClient client, + TransportService transportService, + ActionFilters actionFilters, + RequestLimitSettings requestLimitSettings + ) { super(CatShardsAction.NAME, transportService, actionFilters, CatShardsRequest::new); this.client = client; + this.requestLimitSettings = requestLimitSettings; } @Override @@ -73,6 +85,9 @@ protected void innerOnFailure(Exception e) { client.admin().cluster().state(clusterStateRequest, new ActionListener() { @Override public void onResponse(ClusterStateResponse clusterStateResponse) { + if (requestLimitSettings.isCircuitBreakerLimitBreached(clusterStateResponse.getState(), CAT_SHARDS)) { + listener.onFailure(new CircuitBreakingException("Too many shards requested.", CircuitBreaker.Durability.TRANSIENT)); + } catShardsResponse.setClusterStateResponse(clusterStateResponse); IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest(); indicesStatsRequest.setShouldCancelOnTimeout(true); diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 09832e2b41b6d..9d49b6f8e2d0e 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -155,6 +155,7 @@ import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.fs.FsRepository; import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.action.cat.RequestLimitSettings; import org.opensearch.script.ScriptService; import org.opensearch.search.SearchService; import org.opensearch.search.aggregations.MultiBucketConsumerService; @@ -793,7 +794,11 @@ public void apply(Settings value, Settings current, Settings previous) { WorkloadManagementSettings.NODE_LEVEL_CPU_REJECTION_THRESHOLD, WorkloadManagementSettings.NODE_LEVEL_CPU_CANCELLATION_THRESHOLD, WorkloadManagementSettings.NODE_LEVEL_MEMORY_REJECTION_THRESHOLD, - WorkloadManagementSettings.NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD + WorkloadManagementSettings.NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD, + + RequestLimitSettings.CAT_INDICES_LIMIT_SETTING, + RequestLimitSettings.CAT_SHARDS_LIMIT_SETTING, + RequestLimitSettings.CAT_SEGMENTS_LIMIT_SETTING ) ) ); diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RequestLimitSettings.java b/server/src/main/java/org/opensearch/rest/action/cat/RequestLimitSettings.java new file mode 100644 index 0000000000000..f2c5b7cb97d94 --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/action/cat/RequestLimitSettings.java @@ -0,0 +1,137 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.cat; + +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; + +import java.util.Map; +import java.util.Objects; +import java.util.function.Supplier; + +/** + * Class to define dynamic settings for putting circuit breakers on the actions and functions to evaluate if block is required. + */ +public class RequestLimitSettings { + + /** + * Enum to represent action names against whom we need to perform limit checks. + */ + public enum BlockAction { + CAT_INDICES, + CAT_SHARDS, + CAT_SEGMENTS + } + + private volatile int catIndicesLimit; + private volatile int catShardsLimit; + private volatile int catSegmentsLimit; + + public static final Setting CAT_INDICES_LIMIT_SETTING = Setting.intSetting( + "cat.indices.limit", + -1, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + public static final Setting CAT_SHARDS_LIMIT_SETTING = Setting.intSetting( + "cat.shards.limit", + -1, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + public static final Setting CAT_SEGMENTS_LIMIT_SETTING = Setting.intSetting( + "cat.segments.limit", + -1, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + public RequestLimitSettings(ClusterSettings clusterSettings, Settings settings) { + setCatShardsLimitSetting(CAT_SHARDS_LIMIT_SETTING.get(settings)); + setCatIndicesLimitSetting(CAT_INDICES_LIMIT_SETTING.get(settings)); + setCatSegmentsLimitSetting(CAT_SEGMENTS_LIMIT_SETTING.get(settings)); + + clusterSettings.addSettingsUpdateConsumer(CAT_SHARDS_LIMIT_SETTING, this::setCatShardsLimitSetting); + clusterSettings.addSettingsUpdateConsumer(CAT_INDICES_LIMIT_SETTING, this::setCatIndicesLimitSetting); + clusterSettings.addSettingsUpdateConsumer(CAT_SEGMENTS_LIMIT_SETTING, this::setCatSegmentsLimitSetting); + } + + /** + * Method to check if the circuit breaker limit has reached for an action. + * The limits are controlled via dynamic settings. + * + * @param clusterState {@link ClusterState} + * @param actionToCheck {@link BlockAction} + * @return True/False + */ + public boolean isCircuitBreakerLimitBreached(final ClusterState clusterState, BlockAction actionToCheck) { + if (Objects.isNull(clusterState)) return false; + switch (actionToCheck) { + case CAT_INDICES: + if (catIndicesLimit <= 0) return false; + int indicesCount = getTotalIndices(clusterState); + if (indicesCount > catIndicesLimit) return true; + break; + case CAT_SHARDS: + if (catShardsLimit <= 0) return false; + int totalShards = getTotalShards(clusterState); + if (totalShards > catShardsLimit) return true; + break; + case CAT_SEGMENTS: + if (catSegmentsLimit <= 0) return false; + if (getTotalIndices(clusterState) > catSegmentsLimit) return true; + break; + } + return false; + } + + private void setCatShardsLimitSetting(final int catShardsLimit) { + this.catShardsLimit = catShardsLimit; + } + + private void setCatIndicesLimitSetting(final int catIndicesLimit) { + this.catIndicesLimit = catIndicesLimit; + } + + private void setCatSegmentsLimitSetting(final int catSegmentsLimit) { + this.catSegmentsLimit = catSegmentsLimit; + } + + private static int getTotalIndices(final ClusterState clusterState) { + return chainWalk(() -> clusterState.getMetadata().getIndices().size(), 0); + } + + private static int getTotalShards(final ClusterState clusterState) { + final RoutingTable routingTable = clusterState.getRoutingTable(); + final Map indexRoutingTableMap = routingTable.getIndicesRouting(); + int totalShards = 0; + for (final Map.Entry entry : indexRoutingTableMap.entrySet()) { + for (final Map.Entry indexShardRoutingTableEntry : entry.getValue().getShards().entrySet()) { + totalShards += indexShardRoutingTableEntry.getValue().getShards().size(); + } + } + return totalShards; + } + + // TODO: Evaluate if we can move this to common util. + private static T chainWalk(Supplier supplier, T defaultValue) { + try { + return supplier.get(); + } catch (NullPointerException e) { + return defaultValue; + } + } +} diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestIndicesAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestIndicesAction.java index 9dc711f804144..795be2203f513 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestIndicesAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestIndicesAction.java @@ -57,6 +57,8 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.action.ActionResponse; import org.opensearch.core.common.Strings; +import org.opensearch.core.common.breaker.CircuitBreaker; +import org.opensearch.core.common.breaker.CircuitBreakingException; import org.opensearch.index.IndexSettings; import org.opensearch.rest.RestRequest; import org.opensearch.rest.RestResponse; @@ -81,6 +83,7 @@ import static java.util.Collections.unmodifiableList; import static org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest.DEFAULT_CLUSTER_MANAGER_NODE_TIMEOUT; import static org.opensearch.rest.RestRequest.Method.GET; +import static org.opensearch.rest.action.cat.RequestLimitSettings.BlockAction.CAT_INDICES; /** * _cat API action to list indices @@ -96,6 +99,12 @@ public class RestIndicesAction extends AbstractCatAction { private static final String DUPLICATE_PARAMETER_ERROR_MESSAGE = "Please only use one of the request parameters [master_timeout, cluster_manager_timeout]."; + private final RequestLimitSettings requestLimitSettings; + + public RestIndicesAction(RequestLimitSettings requestLimitSettings) { + this.requestLimitSettings = requestLimitSettings; + } + @Override public List routes() { return unmodifiableList(asList(new Route(GET, "/_cat/indices"), new Route(GET, "/_cat/indices/{index}"))); @@ -151,48 +160,66 @@ public RestResponse buildResponse(final Table table) throws Exception { new ActionListener() { @Override public void onResponse(final GetSettingsResponse getSettingsResponse) { - final GroupedActionListener groupedListener = createGroupedListener(request, 4, listener); - groupedListener.onResponse(getSettingsResponse); - // The list of indices that will be returned is determined by the indices returned from the Get Settings call. // All the other requests just provide additional detail, and wildcards may be resolved differently depending on the // type of request in the presence of security plugins (looking at you, ClusterHealthRequest), so // force the IndicesOptions for all the sub-requests to be as inclusive as possible. final IndicesOptions subRequestIndicesOptions = IndicesOptions.lenientExpandHidden(); - // Indices that were successfully resolved during the get settings request might be deleted when the subsequent - // cluster - // state, cluster health and indices stats requests execute. We have to distinguish two cases: - // 1) the deleted index was explicitly passed as parameter to the /_cat/indices request. In this case we want the - // subsequent requests to fail. - // 2) the deleted index was resolved as part of a wildcard or _all. In this case, we want the subsequent requests - // not to - // fail on the deleted index (as we want to ignore wildcards that cannot be resolved). - // This behavior can be ensured by letting the cluster state, cluster health and indices stats requests re-resolve - // the - // index names with the same indices options that we used for the initial cluster state request (strictExpand). - sendIndicesStatsRequest( - indices, - subRequestIndicesOptions, - includeUnloadedSegments, - client, - ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure) - ); + // Indices that were successfully resolved during the get settings request might be deleted when the + // subsequent cluster state, cluster health and indices stats requests execute. We have to distinguish two cases: + // 1) the deleted index was explicitly passed as parameter to the /_cat/indices request. In this case we + // want the subsequent requests to fail. + // 2) the deleted index was resolved as part of a wildcard or _all. In this case, we want the subsequent + // requests not to fail on the deleted index (as we want to ignore wildcards that cannot be resolved). + // This behavior can be ensured by letting the cluster state, cluster health and indices stats requests + // re-resolve the index names with the same indices options that we used for the initial cluster state + // request (strictExpand). sendClusterStateRequest( indices, subRequestIndicesOptions, local, clusterManagerNodeTimeout, client, - ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure) - ); - sendClusterHealthRequest( - indices, - subRequestIndicesOptions, - local, - clusterManagerNodeTimeout, - client, - ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure) + new ActionListener() { + @Override + public void onResponse(ClusterStateResponse clusterStateResponse) { + if (requestLimitSettings.isCircuitBreakerLimitBreached(clusterStateResponse.getState(), CAT_INDICES)) { + listener.onFailure( + new CircuitBreakingException("Too many indices requested.", CircuitBreaker.Durability.TRANSIENT) + ); + } + final GroupedActionListener groupedListener = createGroupedListener( + request, + 4, + listener + ); + groupedListener.onResponse(getSettingsResponse); + groupedListener.onResponse(clusterStateResponse); + + sendIndicesStatsRequest( + indices, + subRequestIndicesOptions, + includeUnloadedSegments, + client, + ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure) + ); + + sendClusterHealthRequest( + indices, + subRequestIndicesOptions, + local, + clusterManagerNodeTimeout, + client, + ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure) + ); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + } ); } diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestSegmentsAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestSegmentsAction.java index b88af4ac3eeed..8d88e9828c9d4 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestSegmentsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestSegmentsAction.java @@ -96,6 +96,11 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener(channel) { @Override public void processResponse(final ClusterStateResponse clusterStateResponse) { + /* + if (requestLimitSettings.isRequestLimitBreached(clusterStateResponse, CAT_SEGMENTS)) { + listener.onFailure(new CircuitBreakingException("Segments from too many indices requested.", CircuitBreaker.Durability.TRANSIENT)); + } + */ final IndicesSegmentsRequest indicesSegmentsRequest = new IndicesSegmentsRequest(); indicesSegmentsRequest.indices(indices); client.admin().indices().segments(indicesSegmentsRequest, new RestResponseListener(channel) { diff --git a/server/src/test/java/org/opensearch/action/RenamedTimeoutRequestParameterTests.java b/server/src/test/java/org/opensearch/action/RenamedTimeoutRequestParameterTests.java index c7e1039686cc9..386f466caf122 100644 --- a/server/src/test/java/org/opensearch/action/RenamedTimeoutRequestParameterTests.java +++ b/server/src/test/java/org/opensearch/action/RenamedTimeoutRequestParameterTests.java @@ -12,6 +12,7 @@ import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest; import org.opensearch.client.node.NodeClient; import org.opensearch.common.logging.DeprecationLogger; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.SettingsFilter; import org.opensearch.core.common.bytes.BytesArray; @@ -65,6 +66,7 @@ import org.opensearch.rest.action.admin.indices.RestSimulateIndexTemplateAction; import org.opensearch.rest.action.admin.indices.RestSimulateTemplateAction; import org.opensearch.rest.action.admin.indices.RestUpdateSettingsAction; +import org.opensearch.rest.action.cat.RequestLimitSettings; import org.opensearch.rest.action.cat.RestAllocationAction; import org.opensearch.rest.action.cat.RestClusterManagerAction; import org.opensearch.rest.action.cat.RestIndicesAction; @@ -155,7 +157,10 @@ public void testCatAllocation() { } public void testCatIndices() { - RestIndicesAction action = new RestIndicesAction(); + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + Settings settings = Settings.builder().build(); + RequestLimitSettings requestLimitSettings = new RequestLimitSettings(clusterSettings, settings); + RestIndicesAction action = new RestIndicesAction(requestLimitSettings); Exception e = assertThrows(OpenSearchParseException.class, () -> action.doCatRequest(getRestRequestWithBothParams(), client)); assertThat(e.getMessage(), containsString(DUPLICATE_PARAMETER_ERROR_MESSAGE)); assertWarnings(MASTER_TIMEOUT_DEPRECATED_MESSAGE); diff --git a/server/src/test/java/org/opensearch/rest/action/cat/RestIndicesActionTests.java b/server/src/test/java/org/opensearch/rest/action/cat/RestIndicesActionTests.java index 96b1c75371697..466c577a38f22 100644 --- a/server/src/test/java/org/opensearch/rest/action/cat/RestIndicesActionTests.java +++ b/server/src/test/java/org/opensearch/rest/action/cat/RestIndicesActionTests.java @@ -43,6 +43,7 @@ import org.opensearch.cluster.routing.TestShardRouting; import org.opensearch.common.Table; import org.opensearch.common.UUIDs; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; @@ -137,7 +138,10 @@ public void testBuildTable() { } } - final RestIndicesAction action = new RestIndicesAction(); + final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + final Settings settings = Settings.builder().build(); + final RequestLimitSettings requestLimitSettings = new RequestLimitSettings(clusterSettings, settings); + final RestIndicesAction action = new RestIndicesAction(requestLimitSettings); final Table table = action.buildTable(new FakeRestRequest(), indicesSettings, indicesHealths, indicesStats, indicesMetadatas); // now, verify the table is correct