Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add changes to block calls in cat shards, indices and segments based on dynamic limit settings #15986

Merged
merged 12 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for msearch API to pass search pipeline name - ([#15923](https://github.com/opensearch-project/OpenSearch/pull/15923))
- Add _list/indices API as paginated alternate to _cat/indices ([#14718](https://github.com/opensearch-project/OpenSearch/pull/14718))
- Add success and failure metrics for async shard fetch ([#15976](https://github.com/opensearch-project/OpenSearch/pull/15976))
- Add changes to block calls in cat shards, indices and segments based on dynamic limit settings ([#15986](https://github.com/opensearch-project/OpenSearch/pull/15986))

### Dependencies
- Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import static org.opensearch.Version.V_2_10_0;
import static org.opensearch.Version.V_2_13_0;
import static org.opensearch.Version.V_2_17_0;
import static org.opensearch.Version.V_2_18_0;
import static org.opensearch.Version.V_2_1_0;
import static org.opensearch.Version.V_2_4_0;
import static org.opensearch.Version.V_2_5_0;
Expand Down Expand Up @@ -1210,6 +1211,14 @@ public static void registerExceptions() {
V_2_17_0
)
);
registerExceptionHandle(
new OpenSearchExceptionHandle(
org.opensearch.common.breaker.ResponseLimitBreachedException.class,
org.opensearch.common.breaker.ResponseLimitBreachedException::new,
175,
V_2_18_0
)
);
registerExceptionHandle(
new OpenSearchExceptionHandle(
org.opensearch.cluster.block.IndexCreateBlockException.class,
Expand Down
11 changes: 8 additions & 3 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.NamedRegistry;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.breaker.ResponseLimitSettings;
import org.opensearch.common.inject.AbstractModule;
import org.opensearch.common.inject.TypeLiteral;
import org.opensearch.common.inject.multibindings.MapBinder;
Expand Down Expand Up @@ -531,6 +532,7 @@ public class ActionModule extends AbstractModule {
private final RequestValidators<IndicesAliasesRequest> indicesAliasesRequestRequestValidators;
private final ThreadPool threadPool;
private final ExtensionsManager extensionsManager;
private final ResponseLimitSettings responseLimitSettings;

public ActionModule(
Settings settings,
Expand Down Expand Up @@ -583,6 +585,7 @@ public ActionModule(
);

restController = new RestController(headers, restWrapper, nodeClient, circuitBreakerService, usageService);
responseLimitSettings = new ResponseLimitSettings(clusterSettings, settings);
}

public Map<String, ActionHandler<?, ?>> getActions() {
Expand Down Expand Up @@ -968,8 +971,8 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestClusterManagerAction());
registerHandler.accept(new RestNodesAction());
registerHandler.accept(new RestTasksAction(nodesInCluster));
registerHandler.accept(new RestIndicesAction());
registerHandler.accept(new RestSegmentsAction());
registerHandler.accept(new RestIndicesAction(responseLimitSettings));
registerHandler.accept(new RestSegmentsAction(responseLimitSettings));
// Fully qualified to prevent interference with rest.action.count.RestCountAction
registerHandler.accept(new org.opensearch.rest.action.cat.RestCountAction());
// Fully qualified to prevent interference with rest.action.indices.RestRecoveryAction
Expand All @@ -989,7 +992,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestTemplatesAction());

// LIST API
registerHandler.accept(new RestIndicesListAction());
registerHandler.accept(new RestIndicesListAction(responseLimitSettings));

// Point in time API
registerHandler.accept(new RestCreatePitAction());
Expand Down Expand Up @@ -1060,6 +1063,8 @@ protected void configure() {

// register dynamic ActionType -> transportAction Map used by NodeClient
bind(DynamicActionRegistry.class).toInstance(dynamicActionRegistry);

bind(ResponseLimitSettings.class).toInstance(responseLimitSettings);
}

public ActionFilters getActionFilters() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ public class CatShardsRequest extends ClusterManagerNodeReadRequest<CatShardsReq

private String[] indices;
private TimeValue cancelAfterTimeInterval;
private boolean requestLimitCheckSupported;

public CatShardsRequest() {}

public CatShardsRequest(StreamInput in) throws IOException {
super(in);
this.requestLimitCheckSupported = false;
sumitasr marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
Expand All @@ -55,6 +57,14 @@ public TimeValue getCancelAfterTimeInterval() {
return this.cancelAfterTimeInterval;
}

public void setRequestLimitCheckSupported(final boolean requestLimitCheckSupported) {
this.requestLimitCheckSupported = requestLimitCheckSupported;
}

public boolean isRequestLimitCheckSupported() {
return this.requestLimitCheckSupported;
}

@Override
public ClusterAdminTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new ClusterAdminTask(id, type, action, parentTaskId, headers, this.cancelAfterTimeInterval);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,19 @@
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.TimeoutTaskCancellationUtility;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.breaker.ResponseLimitBreachedException;
import org.opensearch.common.breaker.ResponseLimitSettings;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.NotifyOnceListener;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

import java.util.Objects;

import static org.opensearch.common.breaker.ResponseLimitSettings.LimitEntity.SHARDS;

/**
* Perform cat shards action
*
Expand All @@ -31,11 +37,18 @@
public class TransportCatShardsAction extends HandledTransportAction<CatShardsRequest, CatShardsResponse> {

private final NodeClient client;
private final ResponseLimitSettings responseLimitSettings;

@Inject
public TransportCatShardsAction(NodeClient client, TransportService transportService, ActionFilters actionFilters) {
public TransportCatShardsAction(
NodeClient client,
TransportService transportService,
ActionFilters actionFilters,
ResponseLimitSettings responseLimitSettings
) {
super(CatShardsAction.NAME, transportService, actionFilters, CatShardsRequest::new);
this.client = client;
this.responseLimitSettings = responseLimitSettings;
}

@Override
Expand Down Expand Up @@ -73,6 +86,7 @@ protected void innerOnFailure(Exception e) {
client.admin().cluster().state(clusterStateRequest, new ActionListener<ClusterStateResponse>() {
@Override
public void onResponse(ClusterStateResponse clusterStateResponse) {
validateRequestLimit(shardsRequest, clusterStateResponse, cancellableListener);
catShardsResponse.setClusterStateResponse(clusterStateResponse);
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.setShouldCancelOnTimeout(true);
Expand Down Expand Up @@ -107,4 +121,19 @@ public void onFailure(Exception e) {
}

}

private void validateRequestLimit(
final CatShardsRequest shardsRequest,
final ClusterStateResponse clusterStateResponse,
final ActionListener<CatShardsResponse> listener
) {
if (shardsRequest.isRequestLimitCheckSupported()
&& Objects.nonNull(clusterStateResponse)
&& Objects.nonNull(clusterStateResponse.getState())) {
int limit = responseLimitSettings.getCatShardsResponseLimit();
if (ResponseLimitSettings.isResponseLimitBreached(clusterStateResponse.getState().getRoutingTable(), SHARDS, limit)) {
listener.onFailure(new ResponseLimitBreachedException("Too many shards requested.", limit, SHARDS));
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.breaker;

import org.opensearch.OpenSearchException;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;

/**
* Thrown when api response breaches threshold limit.
*
* @opensearch.internal
*/
public class ResponseLimitBreachedException extends OpenSearchException {

private final int responseLimit;
private final ResponseLimitSettings.LimitEntity limitEntity;

public ResponseLimitBreachedException(StreamInput in) throws IOException {
super(in);
responseLimit = in.readVInt();
limitEntity = in.readEnum(ResponseLimitSettings.LimitEntity.class);
}

public ResponseLimitBreachedException(String msg, int responseLimit, ResponseLimitSettings.LimitEntity limitEntity) {
super(msg);
this.responseLimit = responseLimit;
this.limitEntity = limitEntity;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(responseLimit);
out.writeEnum(limitEntity);
}

public int getResponseLimit() {
return responseLimit;
}

public ResponseLimitSettings.LimitEntity getLimitEntity() {
return limitEntity;
}

@Override
public RestStatus status() {
return RestStatus.TOO_MANY_REQUESTS;
}

@Override
protected void metadataToXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("response_limit", responseLimit);
builder.field("limit_entity", limitEntity);
}
}
Loading
Loading