From 454e2dd92a18d2887e1dfcbe7e1fac7ee4d2173a Mon Sep 17 00:00:00 2001 From: Sumit Bansal Date: Thu, 19 Sep 2024 03:36:30 +0530 Subject: [PATCH] Add changes to block calls in cat shards, indices and segments based on dynamic limit settings Signed-off-by: Sumit Bansal --- CHANGELOG.md | 1 + .../org/opensearch/action/ActionModule.java | 9 +- .../cluster/shards/CatShardsRequest.java | 10 ++ .../shards/TransportCatShardsAction.java | 18 +- .../common/settings/ClusterSettings.java | 7 +- .../opensearch/rest/RequestLimitSettings.java | 149 +++++++++++++++ .../rest/action/cat/AbstractCatAction.java | 9 + .../rest/action/cat/RestIndicesAction.java | 94 ++++++---- .../rest/action/cat/RestSegmentsAction.java | 19 ++ .../rest/action/cat/RestShardsAction.java | 6 + .../RenamedTimeoutRequestParameterTests.java | 12 +- .../rest/RequestLimitSettingsTests.java | 169 ++++++++++++++++++ .../action/cat/RestIndicesActionTests.java | 7 +- 13 files changed, 473 insertions(+), 37 deletions(-) create mode 100644 server/src/main/java/org/opensearch/rest/RequestLimitSettings.java create mode 100644 server/src/test/java/org/opensearch/rest/RequestLimitSettingsTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ed85b630bbd4..788319e0fa519 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Workload Management] QueryGroup resource cancellation framework changes ([#15651](https://github.com/opensearch-project/OpenSearch/pull/15651)) - Fallback to Remote cluster-state on Term-Version check mismatch - ([#15424](https://github.com/opensearch-project/OpenSearch/pull/15424)) - Implement WithFieldName interface in ValuesSourceAggregationBuilder & FieldSortBuilder ([#15916](https://github.com/opensearch-project/OpenSearch/pull/15916)) +- Add changes to block calls in cat shards, indices and segments based on dynamic limit settings ([#15986](https://github.com/opensearch-project/OpenSearch/pull/15986)) ### Dependencies - Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578)) diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index fbf90b97d1e8f..8b44e52e221f6 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -325,6 +325,7 @@ import org.opensearch.plugins.ActionPlugin; import org.opensearch.plugins.ActionPlugin.ActionHandler; import org.opensearch.rest.NamedRoute; +import org.opensearch.rest.RequestLimitSettings; import org.opensearch.rest.RestController; import org.opensearch.rest.RestHandler; import org.opensearch.rest.RestHeaderDefinition; @@ -528,6 +529,7 @@ public class ActionModule extends AbstractModule { private final RequestValidators indicesAliasesRequestRequestValidators; private final ThreadPool threadPool; private final ExtensionsManager extensionsManager; + private final RequestLimitSettings requestLimitSettings; public ActionModule( Settings settings, @@ -580,6 +582,7 @@ public ActionModule( ); restController = new RestController(headers, restWrapper, nodeClient, circuitBreakerService, usageService, identityService); + requestLimitSettings = new RequestLimitSettings(clusterSettings, settings); } public Map> getActions() { @@ -960,8 +963,8 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestClusterManagerAction()); registerHandler.accept(new RestNodesAction()); registerHandler.accept(new RestTasksAction(nodesInCluster)); - registerHandler.accept(new RestIndicesAction()); - registerHandler.accept(new RestSegmentsAction()); + registerHandler.accept(new RestIndicesAction(requestLimitSettings)); + registerHandler.accept(new RestSegmentsAction(requestLimitSettings)); // Fully qualified to prevent interference with rest.action.count.RestCountAction registerHandler.accept(new org.opensearch.rest.action.cat.RestCountAction()); // Fully qualified to prevent interference with rest.action.indices.RestRecoveryAction @@ -1048,6 +1051,8 @@ protected void configure() { // register dynamic ActionType -> transportAction Map used by NodeClient bind(DynamicActionRegistry.class).toInstance(dynamicActionRegistry); + + bind(RequestLimitSettings.class).toInstance(requestLimitSettings); } public ActionFilters getActionFilters() { diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsRequest.java index 49299777db8ae..76aa25b9a96b5 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsRequest.java @@ -27,11 +27,13 @@ public class CatShardsRequest extends ClusterManagerNodeReadRequest headers) { return new ClusterAdminTask(id, type, action, parentTaskId, headers, this.cancelAfterTimeInterval); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java index 224d3cbc5f10a..7e21195e019e0 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java @@ -19,10 +19,15 @@ import org.opensearch.common.inject.Inject; import org.opensearch.core.action.ActionListener; import org.opensearch.core.action.NotifyOnceListener; +import org.opensearch.core.common.breaker.CircuitBreaker; +import org.opensearch.core.common.breaker.CircuitBreakingException; +import org.opensearch.rest.RequestLimitSettings; import org.opensearch.tasks.CancellableTask; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; +import static org.opensearch.rest.RequestLimitSettings.BlockAction.CAT_SHARDS; + /** * Perform cat shards action * @@ -31,11 +36,18 @@ public class TransportCatShardsAction extends HandledTransportAction { private final NodeClient client; + private final RequestLimitSettings requestLimitSettings; @Inject - public TransportCatShardsAction(NodeClient client, TransportService transportService, ActionFilters actionFilters) { + public TransportCatShardsAction( + NodeClient client, + TransportService transportService, + ActionFilters actionFilters, + RequestLimitSettings requestLimitSettings + ) { super(CatShardsAction.NAME, transportService, actionFilters, CatShardsRequest::new); this.client = client; + this.requestLimitSettings = requestLimitSettings; } @Override @@ -73,6 +85,10 @@ protected void innerOnFailure(Exception e) { client.admin().cluster().state(clusterStateRequest, new ActionListener() { @Override public void onResponse(ClusterStateResponse clusterStateResponse) { + if (shardsRequest.isRequestLimitCheckSupported() + && requestLimitSettings.isCircuitLimitBreached(clusterStateResponse.getState(), CAT_SHARDS)) { + listener.onFailure(new CircuitBreakingException("Too many shards requested.", CircuitBreaker.Durability.TRANSIENT)); + } catShardsResponse.setClusterStateResponse(clusterStateResponse); IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest(); indicesStatsRequest.setShouldCancelOnTimeout(true); diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 09832e2b41b6d..8ab4252ea109c 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -155,6 +155,7 @@ import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.fs.FsRepository; import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RequestLimitSettings; import org.opensearch.script.ScriptService; import org.opensearch.search.SearchService; import org.opensearch.search.aggregations.MultiBucketConsumerService; @@ -793,7 +794,11 @@ public void apply(Settings value, Settings current, Settings previous) { WorkloadManagementSettings.NODE_LEVEL_CPU_REJECTION_THRESHOLD, WorkloadManagementSettings.NODE_LEVEL_CPU_CANCELLATION_THRESHOLD, WorkloadManagementSettings.NODE_LEVEL_MEMORY_REJECTION_THRESHOLD, - WorkloadManagementSettings.NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD + WorkloadManagementSettings.NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD, + + RequestLimitSettings.CAT_INDICES_LIMIT_SETTING, + RequestLimitSettings.CAT_SHARDS_LIMIT_SETTING, + RequestLimitSettings.CAT_SEGMENTS_LIMIT_SETTING ) ) ); diff --git a/server/src/main/java/org/opensearch/rest/RequestLimitSettings.java b/server/src/main/java/org/opensearch/rest/RequestLimitSettings.java new file mode 100644 index 0000000000000..727113def43a6 --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/RequestLimitSettings.java @@ -0,0 +1,149 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest; + +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.rest.action.cat.RestIndicesAction; +import org.opensearch.rest.action.cat.RestSegmentsAction; +import org.opensearch.rest.action.cat.RestShardsAction; + +import java.util.Map; +import java.util.Objects; +import java.util.function.Supplier; + +/** + * Class to define dynamic settings for putting circuit breakers on the actions and functions to evaluate if block is required. + */ +public class RequestLimitSettings { + + /** + * Enum to represent action names against whom we need to perform limit checks. + */ + public enum BlockAction { + CAT_INDICES, + CAT_SHARDS, + CAT_SEGMENTS + } + + private volatile int catIndicesLimit; + private volatile int catShardsLimit; + private volatile int catSegmentsLimit; + + /** + * Setting to enable circuit breaker on {@link RestIndicesAction}. The limit will be applied on number of indices. + */ + public static final Setting CAT_INDICES_LIMIT_SETTING = Setting.intSetting( + "cat.indices.limit", + -1, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + /** + * Setting to enable circuit breaker on {@link RestShardsAction}. The limit will be applied on number of shards. + */ + public static final Setting CAT_SHARDS_LIMIT_SETTING = Setting.intSetting( + "cat.shards.limit", + -1, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + /** + * Setting to enable circuit breaker on {@link RestSegmentsAction}. The limit will be applied on number of indices. + */ + public static final Setting CAT_SEGMENTS_LIMIT_SETTING = Setting.intSetting( + "cat.segments.limit", + -1, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + public RequestLimitSettings(ClusterSettings clusterSettings, Settings settings) { + setCatShardsLimitSetting(CAT_SHARDS_LIMIT_SETTING.get(settings)); + setCatIndicesLimitSetting(CAT_INDICES_LIMIT_SETTING.get(settings)); + setCatSegmentsLimitSetting(CAT_SEGMENTS_LIMIT_SETTING.get(settings)); + + clusterSettings.addSettingsUpdateConsumer(CAT_SHARDS_LIMIT_SETTING, this::setCatShardsLimitSetting); + clusterSettings.addSettingsUpdateConsumer(CAT_INDICES_LIMIT_SETTING, this::setCatIndicesLimitSetting); + clusterSettings.addSettingsUpdateConsumer(CAT_SEGMENTS_LIMIT_SETTING, this::setCatSegmentsLimitSetting); + } + + /** + * Method to check if the circuit breaker limit has reached for an action. + * The limits are controlled via dynamic settings. + * + * @param clusterState {@link ClusterState} + * @param actionToCheck {@link BlockAction} + * @return True/False + */ + public boolean isCircuitLimitBreached(final ClusterState clusterState, final BlockAction actionToCheck) { + if (Objects.isNull(clusterState)) return false; + switch (actionToCheck) { + case CAT_INDICES: + if (catIndicesLimit <= 0) return false; + int indicesCount = getTotalIndices(clusterState); + if (indicesCount > catIndicesLimit) return true; + break; + case CAT_SHARDS: + if (catShardsLimit <= 0) return false; + int totalShards = getTotalShards(clusterState); + if (totalShards > catShardsLimit) return true; + break; + case CAT_SEGMENTS: + if (catSegmentsLimit <= 0) return false; + if (getTotalIndices(clusterState) > catSegmentsLimit) return true; + break; + } + return false; + } + + private void setCatShardsLimitSetting(final int catShardsLimit) { + this.catShardsLimit = catShardsLimit; + } + + private void setCatIndicesLimitSetting(final int catIndicesLimit) { + this.catIndicesLimit = catIndicesLimit; + } + + private void setCatSegmentsLimitSetting(final int catSegmentsLimit) { + this.catSegmentsLimit = catSegmentsLimit; + } + + private static int getTotalIndices(final ClusterState clusterState) { + return chainWalk(() -> clusterState.getMetadata().getIndices().size(), 0); + } + + private static int getTotalShards(final ClusterState clusterState) { + final RoutingTable routingTable = clusterState.getRoutingTable(); + final Map indexRoutingTableMap = routingTable.getIndicesRouting(); + int totalShards = 0; + for (final Map.Entry entry : indexRoutingTableMap.entrySet()) { + for (final Map.Entry indexShardRoutingTableEntry : entry.getValue().getShards().entrySet()) { + totalShards += indexShardRoutingTableEntry.getValue().getShards().size(); + } + } + return totalShards; + } + + // TODO: Evaluate if we can move this to common util. + private static T chainWalk(Supplier supplier, T defaultValue) { + try { + return supplier.get(); + } catch (NullPointerException e) { + return defaultValue; + } + } +} diff --git a/server/src/main/java/org/opensearch/rest/action/cat/AbstractCatAction.java b/server/src/main/java/org/opensearch/rest/action/cat/AbstractCatAction.java index 6f4e060363bfb..e502b5a110b5e 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/AbstractCatAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/AbstractCatAction.java @@ -39,6 +39,7 @@ import org.opensearch.core.rest.RestStatus; import org.opensearch.rest.BaseRestHandler; import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RequestLimitSettings; import org.opensearch.rest.RestRequest; import java.io.IOException; @@ -98,4 +99,12 @@ protected Set responseParams() { return RESPONSE_PARAMS; } + /** + * Method to check if limits defined in {@link RequestLimitSettings} are applicable to an action. + * + * @return True / False status + */ + protected boolean isRequestLimitCheckSupported() { + return false; + } } diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestIndicesAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestIndicesAction.java index 9dc711f804144..19e3470c1b449 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestIndicesAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestIndicesAction.java @@ -57,7 +57,10 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.action.ActionResponse; import org.opensearch.core.common.Strings; +import org.opensearch.core.common.breaker.CircuitBreaker; +import org.opensearch.core.common.breaker.CircuitBreakingException; import org.opensearch.index.IndexSettings; +import org.opensearch.rest.RequestLimitSettings; import org.opensearch.rest.RestRequest; import org.opensearch.rest.RestResponse; import org.opensearch.rest.action.RestResponseListener; @@ -80,6 +83,7 @@ import static java.util.Arrays.asList; import static java.util.Collections.unmodifiableList; import static org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest.DEFAULT_CLUSTER_MANAGER_NODE_TIMEOUT; +import static org.opensearch.rest.RequestLimitSettings.BlockAction.CAT_INDICES; import static org.opensearch.rest.RestRequest.Method.GET; /** @@ -96,6 +100,12 @@ public class RestIndicesAction extends AbstractCatAction { private static final String DUPLICATE_PARAMETER_ERROR_MESSAGE = "Please only use one of the request parameters [master_timeout, cluster_manager_timeout]."; + private final RequestLimitSettings requestLimitSettings; + + public RestIndicesAction(RequestLimitSettings requestLimitSettings) { + this.requestLimitSettings = requestLimitSettings; + } + @Override public List routes() { return unmodifiableList(asList(new Route(GET, "/_cat/indices"), new Route(GET, "/_cat/indices/{index}"))); @@ -117,6 +127,11 @@ protected void documentation(StringBuilder sb) { sb.append("/_cat/indices/{index}\n"); } + @Override + public boolean isRequestLimitCheckSupported() { + return true; + } + @Override public RestChannelConsumer doCatRequest(final RestRequest request, final NodeClient client) { final String[] indices = Strings.splitStringByCommaToArray(request.param("index")); @@ -151,48 +166,67 @@ public RestResponse buildResponse(final Table table) throws Exception { new ActionListener() { @Override public void onResponse(final GetSettingsResponse getSettingsResponse) { - final GroupedActionListener groupedListener = createGroupedListener(request, 4, listener); - groupedListener.onResponse(getSettingsResponse); - // The list of indices that will be returned is determined by the indices returned from the Get Settings call. // All the other requests just provide additional detail, and wildcards may be resolved differently depending on the // type of request in the presence of security plugins (looking at you, ClusterHealthRequest), so // force the IndicesOptions for all the sub-requests to be as inclusive as possible. final IndicesOptions subRequestIndicesOptions = IndicesOptions.lenientExpandHidden(); - // Indices that were successfully resolved during the get settings request might be deleted when the subsequent - // cluster - // state, cluster health and indices stats requests execute. We have to distinguish two cases: - // 1) the deleted index was explicitly passed as parameter to the /_cat/indices request. In this case we want the - // subsequent requests to fail. - // 2) the deleted index was resolved as part of a wildcard or _all. In this case, we want the subsequent requests - // not to - // fail on the deleted index (as we want to ignore wildcards that cannot be resolved). - // This behavior can be ensured by letting the cluster state, cluster health and indices stats requests re-resolve - // the - // index names with the same indices options that we used for the initial cluster state request (strictExpand). - sendIndicesStatsRequest( - indices, - subRequestIndicesOptions, - includeUnloadedSegments, - client, - ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure) - ); + // Indices that were successfully resolved during the get settings request might be deleted when the + // subsequent cluster state, cluster health and indices stats requests execute. We have to distinguish two cases: + // 1) the deleted index was explicitly passed as parameter to the /_cat/indices request. In this case we + // want the subsequent requests to fail. + // 2) the deleted index was resolved as part of a wildcard or _all. In this case, we want the subsequent + // requests not to fail on the deleted index (as we want to ignore wildcards that cannot be resolved). + // This behavior can be ensured by letting the cluster state, cluster health and indices stats requests + // re-resolve the index names with the same indices options that we used for the initial cluster state + // request (strictExpand). sendClusterStateRequest( indices, subRequestIndicesOptions, local, clusterManagerNodeTimeout, client, - ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure) - ); - sendClusterHealthRequest( - indices, - subRequestIndicesOptions, - local, - clusterManagerNodeTimeout, - client, - ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure) + new ActionListener() { + @Override + public void onResponse(ClusterStateResponse clusterStateResponse) { + if (isRequestLimitCheckSupported() + && requestLimitSettings.isCircuitLimitBreached(clusterStateResponse.getState(), CAT_INDICES)) { + listener.onFailure( + new CircuitBreakingException("Too many indices requested.", CircuitBreaker.Durability.TRANSIENT) + ); + } + final GroupedActionListener groupedListener = createGroupedListener( + request, + 4, + listener + ); + groupedListener.onResponse(getSettingsResponse); + groupedListener.onResponse(clusterStateResponse); + + sendIndicesStatsRequest( + indices, + subRequestIndicesOptions, + includeUnloadedSegments, + client, + ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure) + ); + + sendClusterHealthRequest( + indices, + subRequestIndicesOptions, + local, + clusterManagerNodeTimeout, + client, + ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure) + ); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + } ); } diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestSegmentsAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestSegmentsAction.java index b88af4ac3eeed..d608c87cc2b85 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestSegmentsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestSegmentsAction.java @@ -44,7 +44,10 @@ import org.opensearch.common.Table; import org.opensearch.common.logging.DeprecationLogger; import org.opensearch.core.common.Strings; +import org.opensearch.core.common.breaker.CircuitBreaker; +import org.opensearch.core.common.breaker.CircuitBreakingException; import org.opensearch.index.engine.Segment; +import org.opensearch.rest.RequestLimitSettings; import org.opensearch.rest.RestRequest; import org.opensearch.rest.RestResponse; import org.opensearch.rest.action.RestActionListener; @@ -55,6 +58,7 @@ import static java.util.Arrays.asList; import static java.util.Collections.unmodifiableList; +import static org.opensearch.rest.RequestLimitSettings.BlockAction.CAT_SEGMENTS; import static org.opensearch.rest.RestRequest.Method.GET; /** @@ -66,6 +70,12 @@ public class RestSegmentsAction extends AbstractCatAction { private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(RestSegmentsAction.class); + private final RequestLimitSettings requestLimitSettings; + + public RestSegmentsAction(RequestLimitSettings requestLimitSettings) { + this.requestLimitSettings = requestLimitSettings; + } + @Override public List routes() { return unmodifiableList(asList(new Route(GET, "/_cat/segments"), new Route(GET, "/_cat/segments/{index}"))); @@ -81,6 +91,11 @@ public boolean allowSystemIndexAccessByDefault() { return true; } + @Override + public boolean isRequestLimitCheckSupported() { + return true; + } + @Override public RestChannelConsumer doCatRequest(final RestRequest request, final NodeClient client) { final String[] indices = Strings.splitStringByCommaToArray(request.param("index")); @@ -96,6 +111,10 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener(channel) { @Override public void processResponse(final ClusterStateResponse clusterStateResponse) { + if (isRequestLimitCheckSupported() + && requestLimitSettings.isCircuitLimitBreached(clusterStateResponse.getState(), CAT_SEGMENTS)) { + throw new CircuitBreakingException("Segments from too many indices requested.", CircuitBreaker.Durability.TRANSIENT); + } final IndicesSegmentsRequest indicesSegmentsRequest = new IndicesSegmentsRequest(); indicesSegmentsRequest.indices(indices); client.admin().indices().segments(indicesSegmentsRequest, new RestResponseListener(channel) { diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java index a7ad5fe6c14a3..c8b4e7472927e 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java @@ -105,6 +105,11 @@ protected void documentation(StringBuilder sb) { sb.append("/_cat/shards/{index}\n"); } + @Override + public boolean isRequestLimitCheckSupported() { + return true; + } + @Override public RestChannelConsumer doCatRequest(final RestRequest request, final NodeClient client) { final String[] indices = Strings.splitStringByCommaToArray(request.param("index")); @@ -113,6 +118,7 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli shardsRequest.clusterManagerNodeTimeout(request.paramAsTime("cluster_manager_timeout", shardsRequest.clusterManagerNodeTimeout())); shardsRequest.setCancelAfterTimeInterval(request.paramAsTime("cancel_after_time_interval", NO_TIMEOUT)); shardsRequest.setIndices(indices); + shardsRequest.setRequestLimitCheckSupported(isRequestLimitCheckSupported()); parseDeprecatedMasterTimeoutParameter(shardsRequest, request, deprecationLogger, getName()); return channel -> client.execute(CatShardsAction.INSTANCE, shardsRequest, new RestResponseListener(channel) { @Override diff --git a/server/src/test/java/org/opensearch/action/RenamedTimeoutRequestParameterTests.java b/server/src/test/java/org/opensearch/action/RenamedTimeoutRequestParameterTests.java index c7e1039686cc9..088b4d2824f1e 100644 --- a/server/src/test/java/org/opensearch/action/RenamedTimeoutRequestParameterTests.java +++ b/server/src/test/java/org/opensearch/action/RenamedTimeoutRequestParameterTests.java @@ -12,12 +12,14 @@ import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest; import org.opensearch.client.node.NodeClient; import org.opensearch.common.logging.DeprecationLogger; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.SettingsFilter; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RequestLimitSettings; import org.opensearch.rest.action.admin.cluster.RestCleanupRepositoryAction; import org.opensearch.rest.action.admin.cluster.RestCloneSnapshotAction; import org.opensearch.rest.action.admin.cluster.RestClusterGetSettingsAction; @@ -155,7 +157,10 @@ public void testCatAllocation() { } public void testCatIndices() { - RestIndicesAction action = new RestIndicesAction(); + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + Settings settings = Settings.builder().build(); + RequestLimitSettings requestLimitSettings = new RequestLimitSettings(clusterSettings, settings); + RestIndicesAction action = new RestIndicesAction(requestLimitSettings); Exception e = assertThrows(OpenSearchParseException.class, () -> action.doCatRequest(getRestRequestWithBothParams(), client)); assertThat(e.getMessage(), containsString(DUPLICATE_PARAMETER_ERROR_MESSAGE)); assertWarnings(MASTER_TIMEOUT_DEPRECATED_MESSAGE); @@ -239,7 +244,10 @@ public void testCatThreadPool() { } public void testCatSegments() { - RestSegmentsAction action = new RestSegmentsAction(); + final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + final Settings settings = Settings.builder().build(); + final RequestLimitSettings requestLimitSettings = new RequestLimitSettings(clusterSettings, settings); + RestSegmentsAction action = new RestSegmentsAction(requestLimitSettings); Exception e = assertThrows(OpenSearchParseException.class, () -> action.doCatRequest(getRestRequestWithBothParams(), client)); assertThat(e.getMessage(), containsString(DUPLICATE_PARAMETER_ERROR_MESSAGE)); assertWarnings(MASTER_TIMEOUT_DEPRECATED_MESSAGE); diff --git a/server/src/test/java/org/opensearch/rest/RequestLimitSettingsTests.java b/server/src/test/java/org/opensearch/rest/RequestLimitSettingsTests.java new file mode 100644 index 0000000000000..c7ef256592eb1 --- /dev/null +++ b/server/src/test/java/org/opensearch/rest/RequestLimitSettingsTests.java @@ -0,0 +1,169 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest; + +import org.opensearch.Version; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.TestShardRouting; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.HashMap; +import java.util.Map; + +import static org.opensearch.rest.RequestLimitSettings.CAT_INDICES_LIMIT_SETTING; +import static org.opensearch.rest.RequestLimitSettings.CAT_SEGMENTS_LIMIT_SETTING; +import static org.opensearch.rest.RequestLimitSettings.CAT_SHARDS_LIMIT_SETTING; + +public class RequestLimitSettingsTests extends OpenSearchTestCase { + + public void testIsCircuitLimitBreached_forNullClusterState_expectNotBreached() { + final Settings settings = Settings.builder().build(); + final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + final RequestLimitSettings requestLimitSettings = new RequestLimitSettings(clusterSettings, settings); + final boolean breached = requestLimitSettings.isCircuitLimitBreached(null, RequestLimitSettings.BlockAction.CAT_INDICES); + assertFalse(breached); + } + + public void testIsCircuitLimitBreached_forCatIndicesWithSettingDisabled_expectNotBreached() { + // Don't enable limit + final Settings settings = Settings.builder().put(CAT_INDICES_LIMIT_SETTING.getKey(), -1).build(); + final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + final RequestLimitSettings requestLimitSettings = new RequestLimitSettings(clusterSettings, settings); + final ClusterState clusterState = buildClusterState("test-index-1", "test-index-2", "test-index-3"); + final boolean breached = requestLimitSettings.isCircuitLimitBreached(clusterState, RequestLimitSettings.BlockAction.CAT_INDICES); + assertFalse(breached); + } + + public void testIsCircuitLimitBreached_forCatIndicesWithSettingEnabled_expectBreached() { + // Set limit of 1 index + final Settings settings = Settings.builder().put(CAT_INDICES_LIMIT_SETTING.getKey(), 1).build(); + final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + final RequestLimitSettings requestLimitSettings = new RequestLimitSettings(clusterSettings, settings); + // Pass cluster state with 3 indices + final ClusterState clusterState = buildClusterState("test-index-1", "test-index-2", "test-index-3"); + final boolean breached = requestLimitSettings.isCircuitLimitBreached(clusterState, RequestLimitSettings.BlockAction.CAT_INDICES); + assertTrue(breached); + } + + public void testIsCircuitLimitBreached_forCatIndicesWithSettingEnabled_expectNotBreached() { + // Set limit of 5 indices + final Settings settings = Settings.builder().put(CAT_INDICES_LIMIT_SETTING.getKey(), 5).build(); + final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + final RequestLimitSettings requestLimitSettings = new RequestLimitSettings(clusterSettings, settings); + // Pass cluster state with 3 indices + final ClusterState clusterState = buildClusterState("test-index-1", "test-index-2", "test-index-3"); + final boolean breached = requestLimitSettings.isCircuitLimitBreached(clusterState, RequestLimitSettings.BlockAction.CAT_INDICES); + assertFalse(breached); + } + + public void testIsCircuitLimitBreached_forCatShardsWithSettingDisabled_expectNotBreached() { + // Don't enable limit + final Settings settings = Settings.builder().put(CAT_SHARDS_LIMIT_SETTING.getKey(), -1).build(); + final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + final RequestLimitSettings requestLimitSettings = new RequestLimitSettings(clusterSettings, settings); + // Build cluster state with 3 shards + final ClusterState clusterState = buildClusterState("test-index-1", "test-index-2", "test-index-3"); + final boolean breached = requestLimitSettings.isCircuitLimitBreached(clusterState, RequestLimitSettings.BlockAction.CAT_SHARDS); + assertFalse(breached); + } + + public void testIsCircuitLimitBreached_forCatShardsWithSettingEnabled_expectBreached() { + // Set limit of 2 shards + final Settings settings = Settings.builder().put(CAT_SHARDS_LIMIT_SETTING.getKey(), 2).build(); + final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + final RequestLimitSettings requestLimitSettings = new RequestLimitSettings(clusterSettings, settings); + // Build cluster state with 3 shards + final ClusterState clusterState = buildClusterState("test-index-1", "test-index-2", "test-index-3"); + final boolean breached = requestLimitSettings.isCircuitLimitBreached(clusterState, RequestLimitSettings.BlockAction.CAT_SHARDS); + assertTrue(breached); + } + + public void testIsCircuitLimitBreached_forCatShardsWithSettingEnabled_expectNotBreached() { + // Set limit of 3 shards + final Settings settings = Settings.builder().put(CAT_SHARDS_LIMIT_SETTING.getKey(), 3).build(); + final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + final RequestLimitSettings requestLimitSettings = new RequestLimitSettings(clusterSettings, settings); + // Build cluster state with 3 shards + final ClusterState clusterState = buildClusterState("test-index-1", "test-index-2", "test-index-3"); + final boolean breached = requestLimitSettings.isCircuitLimitBreached(clusterState, RequestLimitSettings.BlockAction.CAT_SHARDS); + assertFalse(breached); + } + + public void testIsCircuitLimitBreached_forCatSegmentsWithSettingDisabled_expectNotBreached() { + // Don't enable limit + final Settings settings = Settings.builder().put(CAT_SEGMENTS_LIMIT_SETTING.getKey(), -1).build(); + final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + final RequestLimitSettings requestLimitSettings = new RequestLimitSettings(clusterSettings, settings); + // Build cluster state with 3 indices + final ClusterState clusterState = buildClusterState("test-index-1", "test-index-2", "test-index-3"); + final boolean breached = requestLimitSettings.isCircuitLimitBreached(clusterState, RequestLimitSettings.BlockAction.CAT_SEGMENTS); + assertFalse(breached); + } + + public void testIsCircuitLimitBreached_forCatSegmentsWithSettingEnabled_expectBreached() { + // Set limit of 1 index + final Settings settings = Settings.builder().put(CAT_SEGMENTS_LIMIT_SETTING.getKey(), 1).build(); + final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + final RequestLimitSettings requestLimitSettings = new RequestLimitSettings(clusterSettings, settings); + // Build cluster state with 3 indices + final ClusterState clusterState = buildClusterState("test-index-1", "test-index-2", "test-index-3"); + final boolean breached = requestLimitSettings.isCircuitLimitBreached(clusterState, RequestLimitSettings.BlockAction.CAT_SEGMENTS); + assertTrue(breached); + } + + public void testIsCircuitLimitBreached_forCatSegmentsWithSettingEnabled_expectNotBreached() { + // Set limit of 3 indices + final Settings settings = Settings.builder().put(CAT_SEGMENTS_LIMIT_SETTING.getKey(), 5).build(); + final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + final RequestLimitSettings requestLimitSettings = new RequestLimitSettings(clusterSettings, settings); + // Build cluster state with 3 indices + final ClusterState clusterState = buildClusterState("test-index-1", "test-index-2", "test-index-3"); + final boolean breached = requestLimitSettings.isCircuitLimitBreached(clusterState, RequestLimitSettings.BlockAction.CAT_SEGMENTS); + assertFalse(breached); + } + + private static ClusterState buildClusterState(String... indices) { + final Metadata.Builder metadata = Metadata.builder(); + for (String index : indices) { + metadata.put(IndexMetadata.builder(index).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0)); + } + final Map indexRoutingTableMap = new HashMap<>(); + for (String s : indices) { + final Index index = new Index(s, "uuid"); + final ShardId shardId = new ShardId(index, 0); + final ShardRouting primaryShardRouting = createShardRouting(shardId, true); + final IndexShardRoutingTable.Builder indexShardRoutingTableBuilder = new IndexShardRoutingTable.Builder(shardId); + indexShardRoutingTableBuilder.addShard(primaryShardRouting); + final IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index) + .addShard(primaryShardRouting) + .addIndexShard(indexShardRoutingTableBuilder.build()); + indexRoutingTableMap.put(index.getName(), indexRoutingTable.build()); + } + final RoutingTable routingTable = new RoutingTable(1, indexRoutingTableMap); + return ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .build(); + } + + private static ShardRouting createShardRouting(ShardId shardId, boolean isPrimary) { + return TestShardRouting.newShardRouting(shardId, randomAlphaOfLength(4), isPrimary, ShardRoutingState.STARTED); + } +} diff --git a/server/src/test/java/org/opensearch/rest/action/cat/RestIndicesActionTests.java b/server/src/test/java/org/opensearch/rest/action/cat/RestIndicesActionTests.java index 96b1c75371697..b6666136e2379 100644 --- a/server/src/test/java/org/opensearch/rest/action/cat/RestIndicesActionTests.java +++ b/server/src/test/java/org/opensearch/rest/action/cat/RestIndicesActionTests.java @@ -43,10 +43,12 @@ import org.opensearch.cluster.routing.TestShardRouting; import org.opensearch.common.Table; import org.opensearch.common.UUIDs; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.IndexSettings; +import org.opensearch.rest.RequestLimitSettings; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.rest.FakeRestRequest; @@ -137,7 +139,10 @@ public void testBuildTable() { } } - final RestIndicesAction action = new RestIndicesAction(); + final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + final Settings settings = Settings.builder().build(); + final RequestLimitSettings requestLimitSettings = new RequestLimitSettings(clusterSettings, settings); + final RestIndicesAction action = new RestIndicesAction(requestLimitSettings); final Table table = action.buildTable(new FakeRestRequest(), indicesSettings, indicesHealths, indicesStats, indicesMetadatas); // now, verify the table is correct