Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementing pagination for _cat/shards #14641

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,9 @@
import org.opensearch.rest.action.ingest.RestGetPipelineAction;
import org.opensearch.rest.action.ingest.RestPutPipelineAction;
import org.opensearch.rest.action.ingest.RestSimulatePipelineAction;
import org.opensearch.rest.action.list.AbstractListAction;
import org.opensearch.rest.action.list.RestListAction;
import org.opensearch.rest.action.list.RestShardsListAction;
import org.opensearch.rest.action.search.RestClearScrollAction;
import org.opensearch.rest.action.search.RestCountAction;
import org.opensearch.rest.action.search.RestCreatePitAction;
Expand Down Expand Up @@ -802,9 +805,14 @@ private ActionFilters setupActionFilters(List<ActionPlugin> actionPlugins) {

public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
List<AbstractCatAction> catActions = new ArrayList<>();
List<AbstractListAction> listActions = new ArrayList<>();
Consumer<RestHandler> registerHandler = handler -> {
if (handler instanceof AbstractCatAction) {
catActions.add((AbstractCatAction) handler);
if (handler instanceof AbstractListAction && ((AbstractListAction) handler).isActionPaginated()) {
listActions.add((AbstractListAction) handler);
} else {
catActions.add((AbstractCatAction) handler);
}
}
restController.registerHandler(handler);
};
Expand Down Expand Up @@ -980,6 +988,9 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
}
registerHandler.accept(new RestTemplatesAction());

// LIST API
registerHandler.accept(new RestShardsListAction());

// Point in time API
registerHandler.accept(new RestCreatePitAction());
registerHandler.accept(new RestDeletePitAction());
Expand Down Expand Up @@ -1011,6 +1022,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
}
}
registerHandler.accept(new RestCatAction(catActions));
registerHandler.accept(new RestListAction(listActions));
registerHandler.accept(new RestDecommissionAction());
registerHandler.accept(new RestGetDecommissionStateAction());
registerHandler.accept(new RestRemoteStoreStatsAction());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@

package org.opensearch.action.admin.cluster.shards;

import org.opensearch.Version;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.clustermanager.ClusterManagerNodeReadRequest;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.rest.action.admin.cluster.ClusterAdminTask;
import org.opensearch.rest.pagination.PageParams;

import java.io.IOException;
import java.util.Map;
Expand All @@ -27,11 +30,27 @@ public class CatShardsRequest extends ClusterManagerNodeReadRequest<CatShardsReq

private String[] indices;
private TimeValue cancelAfterTimeInterval;
private PageParams pageParams = null;

public CatShardsRequest() {}

public CatShardsRequest(StreamInput in) throws IOException {
gargharsh3134 marked this conversation as resolved.
Show resolved Hide resolved
super(in);
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
indices = in.readStringArray();
cancelAfterTimeInterval = in.readTimeValue();
pageParams = PageParams.readPageParams(in);
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeStringArray(indices);
out.writeTimeValue(cancelAfterTimeInterval);
pageParams.writePageParams(out);
gargharsh3134 marked this conversation as resolved.
Show resolved Hide resolved
}
}

@Override
Expand All @@ -55,6 +74,14 @@ public TimeValue getCancelAfterTimeInterval() {
return this.cancelAfterTimeInterval;
}

public void setPageParams(PageParams pageParams) {
this.pageParams = pageParams;
}

public PageParams getPageParams() {
return pageParams;
}

@Override
public ClusterAdminTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new ClusterAdminTask(id, type, action, parentTaskId, headers, this.cancelAfterTimeInterval);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,18 @@

package org.opensearch.action.admin.cluster.shards;

import org.opensearch.Version;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.rest.pagination.PageToken;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* A response of a cat shards request.
Expand All @@ -26,17 +31,29 @@ public class CatShardsResponse extends ActionResponse {
private ClusterStateResponse clusterStateResponse = null;

private IndicesStatsResponse indicesStatsResponse = null;
private List<ShardRouting> responseShards = new ArrayList<>();
private PageToken pageToken = null;

public CatShardsResponse() {}

public CatShardsResponse(StreamInput in) throws IOException {
super(in);
clusterStateResponse = new ClusterStateResponse(in);
indicesStatsResponse = new IndicesStatsResponse(in);
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
responseShards = in.readList(ShardRouting::new);
pageToken = PageToken.readPageToken(in);
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
clusterStateResponse.writeTo(out);
indicesStatsResponse.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeList(responseShards);
pageToken.writePageToken(out);
}
}

public void setClusterStateResponse(ClusterStateResponse clusterStateResponse) {
Expand All @@ -54,4 +71,20 @@ public void setIndicesStatsResponse(IndicesStatsResponse indicesStatsResponse) {
public IndicesStatsResponse getIndicesStatsResponse() {
return this.indicesStatsResponse;
}

public void setResponseShards(List<ShardRouting> responseShards) {
this.responseShards = responseShards;
}

public List<ShardRouting> getResponseShards() {
return this.responseShards;
}

public void setPageToken(PageToken pageToken) {
this.pageToken = pageToken;
}

public PageToken getPageToken() {
return this.pageToken;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.NotifyOnceListener;
import org.opensearch.rest.pagination.PageParams;
import org.opensearch.rest.pagination.ShardPaginationStrategy;
gargharsh3134 marked this conversation as resolved.
Show resolved Hide resolved
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

import java.util.Objects;

/**
* Perform cat shards action
*
Expand All @@ -44,7 +48,11 @@ public void doExecute(Task parentTask, CatShardsRequest shardsRequest, ActionLis
clusterStateRequest.setShouldCancelOnTimeout(true);
clusterStateRequest.local(shardsRequest.local());
clusterStateRequest.clusterManagerNodeTimeout(shardsRequest.clusterManagerNodeTimeout());
clusterStateRequest.clear().nodes(true).routingTable(true).indices(shardsRequest.getIndices());
if (Objects.nonNull(shardsRequest.getPageParams())) {
clusterStateRequest.clear().nodes(true).routingTable(true).indices(shardsRequest.getIndices());
} else {
clusterStateRequest.clear().nodes(true).routingTable(true).indices(shardsRequest.getIndices()).metadata(true);
}
assert parentTask instanceof CancellableTask;
clusterStateRequest.setParentTask(client.getLocalNodeId(), parentTask.getId());

Expand Down Expand Up @@ -73,11 +81,21 @@ protected void innerOnFailure(Exception e) {
client.admin().cluster().state(clusterStateRequest, new ActionListener<ClusterStateResponse>() {
@Override
public void onResponse(ClusterStateResponse clusterStateResponse) {
ShardPaginationStrategy paginationStrategy = getPaginationStrategy(shardsRequest.getPageParams(), clusterStateResponse);
String[] indices = Objects.isNull(paginationStrategy)
? shardsRequest.getIndices()
: paginationStrategy.getRequestedIndices().toArray(new String[0]);
catShardsResponse.setClusterStateResponse(clusterStateResponse);
catShardsResponse.setResponseShards(
Objects.isNull(paginationStrategy)
? clusterStateResponse.getState().routingTable().allShards()
: paginationStrategy.getRequestedEntities()
);
catShardsResponse.setPageToken(Objects.isNull(paginationStrategy) ? null : paginationStrategy.getResponseToken());
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.setShouldCancelOnTimeout(true);
indicesStatsRequest.all();
indicesStatsRequest.indices(shardsRequest.getIndices());
indicesStatsRequest.indices(indices);
indicesStatsRequest.setParentTask(client.getLocalNodeId(), parentTask.getId());
try {
client.admin().indices().stats(indicesStatsRequest, new ActionListener<IndicesStatsResponse>() {
Expand Down Expand Up @@ -107,4 +125,8 @@ public void onFailure(Exception e) {
}

}

private ShardPaginationStrategy getPaginationStrategy(PageParams pageParams, ClusterStateResponse clusterStateResponse) {
return Objects.isNull(pageParams) ? null : new ShardPaginationStrategy(pageParams, clusterStateResponse.getState());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class IndicesStatsResponse extends BroadcastResponse {

private Map<ShardRouting, ShardStats> shardStatsMap;

IndicesStatsResponse(StreamInput in) throws IOException {
public IndicesStatsResponse(StreamInput in) throws IOException {
super(in);
shards = in.readArray(ShardStats::new, (size) -> new ShardStats[size]);
}
Expand Down
15 changes: 15 additions & 0 deletions server/src/main/java/org/opensearch/common/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.opensearch.common.time.DateFormatter;
import org.opensearch.core.common.Strings;
import org.opensearch.rest.pagination.PageToken;

import java.time.Instant;
import java.time.ZoneOffset;
Expand All @@ -59,9 +60,19 @@ public class Table {
private List<Cell> currentCells;
private boolean inHeaders = false;
private boolean withTime = false;
/**
* paginatedQueryResponse if null will imply the Table response is not paginated.
*/
private PageToken pageToken;
public static final String EPOCH = "epoch";
public static final String TIMESTAMP = "timestamp";

public Table() {}

public Table(@Nullable PageToken pageToken) {
this.pageToken = pageToken;
}

public Table startHeaders() {
inHeaders = true;
currentCells = new ArrayList<>();
Expand Down Expand Up @@ -230,6 +241,10 @@ public Map<String, String> getAliasMap() {
return headerAliasMap;
}

public PageToken getPageToken() {
return pageToken;
}

/**
* Cell in a table
*
Expand Down
7 changes: 7 additions & 0 deletions server/src/main/java/org/opensearch/rest/RestHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,13 @@ default boolean allowSystemIndexAccessByDefault() {
return false;
}

/**
* Denotes whether the RestHandler will output paginated responses or not.
*/
default boolean isActionPaginated() {
return false;
}

static RestHandler wrapper(RestHandler delegate) {
return new Wrapper(delegate);
}
Expand Down
8 changes: 8 additions & 0 deletions server/src/main/java/org/opensearch/rest/RestRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.http.HttpChannel;
import org.opensearch.http.HttpRequest;
import org.opensearch.rest.pagination.PageParams;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -67,6 +68,9 @@

import static org.opensearch.common.unit.TimeValue.parseTimeValue;
import static org.opensearch.core.common.unit.ByteSizeValue.parseBytesSizeValue;
import static org.opensearch.rest.pagination.PageParams.PARAM_NEXT_TOKEN;
import static org.opensearch.rest.pagination.PageParams.PARAM_SIZE;
import static org.opensearch.rest.pagination.PageParams.PARAM_SORT;

/**
* REST Request
Expand Down Expand Up @@ -591,6 +595,10 @@ public static MediaType parseContentType(List<String> header) {
throw new IllegalArgumentException("empty Content-Type header");
}

public PageParams parsePaginatedQueryParams(String defaultSortOrder, int defaultPageSize) {
return new PageParams(param(PARAM_NEXT_TOKEN), param(PARAM_SORT, defaultSortOrder), paramAsInt(PARAM_SIZE, defaultPageSize));
}

/**
* Thrown if there is an error in the content type header.
*
Expand Down
Loading
Loading