Skip to content

Commit

Permalink
Implementing pagination for _cat/indices
Browse files Browse the repository at this point in the history
Signed-off-by: Harsh Garg <[email protected]>
  • Loading branch information
Harsh Garg committed Aug 5, 2024
1 parent f14b5c8 commit 0cfb0e9
Show file tree
Hide file tree
Showing 7 changed files with 445 additions and 8 deletions.
68 changes: 68 additions & 0 deletions server/src/main/java/org/opensearch/common/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;

import reactor.util.annotation.NonNull;

import static java.util.Collections.emptyMap;

/**
Expand All @@ -59,9 +61,18 @@ public class Table {
private List<Cell> currentCells;
private boolean inHeaders = false;
private boolean withTime = false;
private PaginationMetadata paginationMetadata = new PaginationMetadata(false, null, null, null);
public static final String EPOCH = "epoch";
public static final String TIMESTAMP = "timestamp";

public Table() {}

public Table(@Nullable PaginationMetadata paginationMetadata) {
if (paginationMetadata != null) {
this.paginationMetadata = paginationMetadata;
}
}

public Table startHeaders() {
inHeaders = true;
currentCells = new ArrayList<>();
Expand Down Expand Up @@ -230,6 +241,22 @@ public Map<String, String> getAliasMap() {
return headerAliasMap;
}

public boolean isPaginated() {
return paginationMetadata.isResponsePaginated;
}

public String getPaginatedElement() {
return paginationMetadata.paginatedElement;
}

public String getNextToken() {
return paginationMetadata.nextToken;
}

public String getPreviousToken() {
return paginationMetadata.previousToken;
}

/**
* Cell in a table
*
Expand All @@ -254,4 +281,45 @@ public Cell(Object value, Map<String, String> attr) {
this.attr = attr;
}
}

/**
* Pagination metadata for a table.
*
* @opensearch.internal
*/
public static class PaginationMetadata {

/**
* boolean denoting whether the table is paginated or not.
*/
public final boolean isResponsePaginated;

/**
* String denoting the element which is being paginated (for e.g. shards, indices..).
*/
public final String paginatedElement;

/**
* String denoting the next_token of paginated response, which will be used to fetch next page (if any).
*/
public final String nextToken;

/**
* String denoting the previous_token of paginated response, which will be used to fetch previous page (if any).
*/
public final String previousToken;

public PaginationMetadata(
@NonNull boolean isResponsePaginated,
@Nullable String paginatedElement,
@Nullable String nextToken,
@Nullable String previousToken
) {
this.isResponsePaginated = isResponsePaginated;
assert !isResponsePaginated || paginatedElement != null : "paginatedElement must be specified for a table which is paginated";
this.paginatedElement = paginatedElement;
this.nextToken = nextToken;
this.previousToken = previousToken;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.RestResponse;
import org.opensearch.rest.action.RestResponseListener;
import org.opensearch.rest.pagination.IndexBasedPaginationStrategy;

import java.time.Instant;
import java.time.ZoneOffset;
Expand Down Expand Up @@ -95,10 +96,18 @@ public class RestIndicesAction extends AbstractCatAction {
"Parameter [master_timeout] is deprecated and will be removed in 3.0. To support inclusive language, please use [cluster_manager_timeout] instead.";
private static final String DUPLICATE_PARAMETER_ERROR_MESSAGE =
"Please only use one of the request parameters [master_timeout, cluster_manager_timeout].";
private static final String DEFAULT_CAT_INDICES_PAGE_SIZE_STRING = "1000";

@Override
public List<Route> routes() {
return unmodifiableList(asList(new Route(GET, "/_cat/indices"), new Route(GET, "/_cat/indices/{index}")));
return unmodifiableList(
asList(
new Route(GET, "/_cat/indices"),
new Route(GET, "/_cat/indices/{index}"),
new Route(GET, "/_cat/V2/indices"),
new Route(GET, "/_cat/V2/indices/{index}")
)
);
}

@Override
Expand Down Expand Up @@ -131,9 +140,13 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli
}
clusterManagerTimeout = request.paramAsTime("master_timeout", DEFAULT_CLUSTER_MANAGER_NODE_TIMEOUT);
}
// Check for a paginated query.
if (request.path().contains("/V2/indices")) {
return doPaginatedCatRequest(request, client, clusterManagerTimeout, indices);
}

final TimeValue clusterManagerNodeTimeout = clusterManagerTimeout;
final boolean includeUnloadedSegments = request.paramAsBoolean("include_unloaded_segments", false);

return channel -> {
final ActionListener<Table> listener = ActionListener.notifyOnce(new RestResponseListener<Table>(channel) {
@Override
Expand Down Expand Up @@ -205,6 +218,91 @@ public void onFailure(final Exception e) {
};
}

public RestChannelConsumer doPaginatedCatRequest(
final RestRequest request,
final NodeClient client,
final TimeValue clusterManagerNodeTimeout,
final String[] indices
) {
final boolean local = request.paramAsBoolean("local", false);
final boolean includeUnloadedSegments = request.paramAsBoolean("include_unloaded_segments", false);
final String requestedToken = request.param("next_token");
IndexBasedPaginationStrategy.validateRequestedRequest(requestedToken);
final int pageSize = Integer.parseInt(request.param("max_page_size", DEFAULT_CAT_INDICES_PAGE_SIZE_STRING));
final boolean latestIndicesFirst = request.paramAsBoolean("latest_indices_first", false);

return channel -> {
final ActionListener<Table> listener = ActionListener.notifyOnce(new RestResponseListener<Table>(channel) {
@Override
public RestResponse buildResponse(final Table table) throws Exception {
return RestTable.buildResponse(table, channel);
}
});

// Fetch all the indices from clusterStateRequest for a paginated query.
sendClusterStateRequest(
indices,
IndicesOptions.lenientExpandHidden(),
local,
clusterManagerNodeTimeout,
client,
new ActionListener<ClusterStateResponse>() {
@Override
public void onResponse(final ClusterStateResponse clusterStateResponse) {
IndexBasedPaginationStrategy paginationStrategy = new IndexBasedPaginationStrategy(
requestedToken,
pageSize,
latestIndicesFirst,
clusterStateResponse.getState()
);

final GroupedActionListener<ActionResponse> groupedListener = createGroupedListener(
request,
4,
listener,
new Table.PaginationMetadata(
true,
"indices",
paginationStrategy.getNextToken(),
paginationStrategy.getPreviousToken()
)
);
groupedListener.onResponse(clusterStateResponse);
final String[] indicesToBeQueried = paginationStrategy.getPageElements().toArray(new String[0]);
sendGetSettingsRequest(
indicesToBeQueried,
IndicesOptions.fromRequest(request, IndicesOptions.strictExpand()),
local,
clusterManagerNodeTimeout,
client,
ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure)
);
sendIndicesStatsRequest(
indicesToBeQueried,
IndicesOptions.lenientExpandHidden(),
includeUnloadedSegments,
client,
ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure)
);
sendClusterHealthRequest(
indicesToBeQueried,
IndicesOptions.lenientExpandHidden(),
local,
clusterManagerNodeTimeout,
client,
ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure)
);
}

@Override
public void onFailure(final Exception e) {
listener.onFailure(e);
}
}
);
};
}

/**
* We're using the Get Settings API here to resolve the authorized indices for the user.
* This is because the Cluster State and Cluster Health APIs do not filter output based
Expand Down Expand Up @@ -288,6 +386,15 @@ private GroupedActionListener<ActionResponse> createGroupedListener(
final RestRequest request,
final int size,
final ActionListener<Table> listener
) {
return createGroupedListener(request, size, listener, null);
}

private GroupedActionListener<ActionResponse> createGroupedListener(
final RestRequest request,
final int size,
final ActionListener<Table> listener,
final Table.PaginationMetadata paginationMetadata
) {
return new GroupedActionListener<>(new ActionListener<Collection<ActionResponse>>() {
@Override
Expand All @@ -311,7 +418,14 @@ public void onResponse(final Collection<ActionResponse> responses) {
IndicesStatsResponse statsResponse = extractResponse(responses, IndicesStatsResponse.class);
Map<String, IndexStats> indicesStats = statsResponse.getIndices();

Table responseTable = buildTable(request, indicesSettings, indicesHealths, indicesStats, indicesStates);
Table responseTable = buildTable(
request,
indicesSettings,
indicesHealths,
indicesStats,
indicesStates,
paginationMetadata
);
listener.onResponse(responseTable);
} catch (Exception e) {
onFailure(e);
Expand Down Expand Up @@ -340,7 +454,11 @@ protected Set<String> responseParams() {

@Override
protected Table getTableWithHeader(final RestRequest request) {
Table table = new Table();
return getTableWithHeader(request, null);
}

protected Table getTableWithHeader(final RestRequest request, final Table.PaginationMetadata paginationMetadata) {
Table table = new Table(paginationMetadata);
table.startHeaders();
table.addCell("health", "alias:h;desc:current health status");
table.addCell("status", "alias:s;desc:open/close status");
Expand Down Expand Up @@ -709,11 +827,12 @@ Table buildTable(
final Map<String, Settings> indicesSettings,
final Map<String, ClusterIndexHealth> indicesHealths,
final Map<String, IndexStats> indicesStats,
final Map<String, IndexMetadata> indicesMetadatas
final Map<String, IndexMetadata> indicesMetadatas,
final Table.PaginationMetadata paginationMetadata
) {

final String healthParam = request.param("health");
final Table table = getTableWithHeader(request);
final Table table = getTableWithHeader(request, paginationMetadata);

indicesSettings.forEach((indexName, settings) -> {
if (indicesMetadatas.containsKey(indexName) == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,15 @@ public static RestResponse buildXContentBuilder(Table table, RestChannel channel
XContentBuilder builder = channel.newBuilder();
List<DisplayHeader> displayHeaders = buildDisplayHeaders(table, request);

builder.startArray();
if (table.isPaginated()) {
assert table.getPaginatedElement() != null : "Paginated element is required in-case nextToken is not null";
builder.startObject();
builder.field("previous_token", table.getPreviousToken());
builder.field("next_token", table.getNextToken());
builder.startArray(table.getPaginatedElement());
} else {
builder.startArray();
}
List<Integer> rowOrder = getRowOrder(table, request);
for (Integer row : rowOrder) {
builder.startObject();
Expand All @@ -98,6 +106,9 @@ public static RestResponse buildXContentBuilder(Table table, RestChannel channel
builder.endObject();
}
builder.endArray();
if (table.isPaginated()) {
builder.endObject();
}
return new BytesRestResponse(RestStatus.OK, builder);
}

Expand Down Expand Up @@ -136,6 +147,13 @@ public static RestResponse buildTextPlainResponse(Table table, RestChannel chann
}
out.append("\n");
}
// Adding a nextToken row, post an empty line, in the response if the table is paginated.
if (table.isPaginated()) {
out.append("previous_token" + " " + table.getPreviousToken());
out.append("\n");
out.append("next_token" + " " + table.getNextToken());
out.append("\n");
}
out.close();
return new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, bytesOut.bytes());
}
Expand Down
Loading

0 comments on commit 0cfb0e9

Please sign in to comment.