Skip to content

Commit

Permalink
Added paginated query request and response classes and removed page t…
Browse files Browse the repository at this point in the history
…oken interface

Signed-off-by: Harsh Garg <[email protected]>
  • Loading branch information
Harsh Garg committed Sep 4, 2024
1 parent 2bc80f0 commit f1b5016
Show file tree
Hide file tree
Showing 12 changed files with 574 additions and 312 deletions.
56 changes: 9 additions & 47 deletions server/src/main/java/org/opensearch/common/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.opensearch.common.time.DateFormatter;
import org.opensearch.core.common.Strings;
import org.opensearch.rest.pagination.PaginatedQueryResponse;

import java.time.Instant;
import java.time.ZoneOffset;
Expand All @@ -43,8 +44,6 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;

import reactor.util.annotation.NonNull;

import static java.util.Collections.emptyMap;

/**
Expand All @@ -61,16 +60,17 @@ public class Table {
private List<Cell> currentCells;
private boolean inHeaders = false;
private boolean withTime = false;
private PaginationMetadata paginationMetadata = new PaginationMetadata(false, null, null);
/**
* paginatedQueryResponse if null will imply the Table response is not paginated.
*/
private PaginatedQueryResponse paginatedQueryResponse;
public static final String EPOCH = "epoch";
public static final String TIMESTAMP = "timestamp";

public Table() {}

public Table(@Nullable PaginationMetadata paginationMetadata) {
if (paginationMetadata != null) {
this.paginationMetadata = paginationMetadata;
}
public Table(@Nullable PaginatedQueryResponse paginatedQueryResponse) {
this.paginatedQueryResponse = paginatedQueryResponse;
}

public Table startHeaders() {
Expand Down Expand Up @@ -241,16 +241,8 @@ public Map<String, String> getAliasMap() {
return headerAliasMap;
}

public boolean isPaginated() {
return paginationMetadata.isResponsePaginated;
}

public String getPaginatedElement() {
return paginationMetadata.paginatedElement;
}

public String getNextToken() {
return paginationMetadata.nextToken;
public PaginatedQueryResponse getPaginatedQueryResponse() {
return paginatedQueryResponse;
}

/**
Expand All @@ -277,34 +269,4 @@ public Cell(Object value, Map<String, String> attr) {
this.attr = attr;
}
}

/**
* Pagination metadata for a table.
*
* @opensearch.internal
*/
public static class PaginationMetadata {

/**
* boolean denoting whether the table is paginated or not.
*/
public final boolean isResponsePaginated;

/**
* String denoting the element which is being paginated (for e.g. shards, indices..).
*/
public final String paginatedElement;

/**
* String denoting the next_token of paginated response, which will be used to fetch next page (if any).
*/
public final String nextToken;

public PaginationMetadata(@NonNull boolean isResponsePaginated, @Nullable String paginatedElement, @Nullable String nextToken) {
this.isResponsePaginated = isResponsePaginated;
assert !isResponsePaginated || paginatedElement != null : "paginatedElement must be specified for a table which is paginated";
this.paginatedElement = paginatedElement;
this.nextToken = nextToken;
}
}
}
12 changes: 12 additions & 0 deletions server/src/main/java/org/opensearch/rest/RestRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.http.HttpChannel;
import org.opensearch.http.HttpRequest;
import org.opensearch.rest.pagination.PaginatedQueryRequest;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -67,6 +68,9 @@

import static org.opensearch.common.unit.TimeValue.parseTimeValue;
import static org.opensearch.core.common.unit.ByteSizeValue.parseBytesSizeValue;
import static org.opensearch.rest.pagination.PaginatedQueryRequest.PAGINATED_QUERY_PARAM_NEXT_TOKEN_KEY;
import static org.opensearch.rest.pagination.PaginatedQueryRequest.PAGINATED_QUERY_PARAM_SIZE_KEY;
import static org.opensearch.rest.pagination.PaginatedQueryRequest.PAGINATED_QUERY_PARAM_SORT_KEY;

/**
* REST Request
Expand Down Expand Up @@ -591,6 +595,14 @@ public static MediaType parseContentType(List<String> header) {
throw new IllegalArgumentException("empty Content-Type header");
}

public PaginatedQueryRequest parsePaginatedQueryParams(String defaultSortOrder, int defaultPageSize) {
return new PaginatedQueryRequest(
param(PAGINATED_QUERY_PARAM_NEXT_TOKEN_KEY),
param(PAGINATED_QUERY_PARAM_SORT_KEY, defaultSortOrder),
paramAsInt(PAGINATED_QUERY_PARAM_SIZE_KEY, defaultPageSize)
);
}

/**
* Thrown if there is an error in the content type header.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,12 @@
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.pagination.PageToken;
import org.opensearch.rest.pagination.PaginatedQueryRequest;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

Expand All @@ -60,7 +59,7 @@
*/
public abstract class AbstractCatAction extends BaseRestHandler {

protected PaginationQueryMetadata paginationQueryMetadata;
protected PaginatedQueryRequest paginatedQueryRequest;

protected abstract RestChannelConsumer doCatRequest(RestRequest request, NodeClient client);

Expand Down Expand Up @@ -91,8 +90,8 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
};
} else {
if (isActionPaginated()) {
this.paginationQueryMetadata = validateAndGetPaginationMetadata(request);
assert Objects.nonNull(paginationQueryMetadata) : "paginationQueryMetadata can not be null for paginated queries";
this.paginatedQueryRequest = validateAndGetPaginationMetadata(request);
assert Objects.nonNull(paginatedQueryRequest) : "paginatedQueryRequest can not be null for paginated queries";
}
return doCatRequest(request, client);
}
Expand Down Expand Up @@ -120,34 +119,10 @@ public boolean isActionPaginated() {
*
* @return Metadata that can be extracted out from the rest request. Each paginated action to override and provide
* its own implementation. Query params supported by the action specific to pagination along with the respective validations,
* should be added here. The actions would also use the {@param restRequest} to initialise a {@link PageToken}.
* should be added here.
*/
protected PaginationQueryMetadata validateAndGetPaginationMetadata(RestRequest restRequest) {
protected PaginatedQueryRequest validateAndGetPaginationMetadata(RestRequest restRequest) {
return null;
}

/**
* A pagination helper class which would contain requested page token and
* a map of query params required by a paginated API.
*
* @opensearch.internal
*/
public static class PaginationQueryMetadata {
private final Map<String, Object> paginationQueryParams;
private final PageToken requestedPageToken;

public PaginationQueryMetadata(final Map<String, Object> paginationQueryParams, PageToken requestedPageToken) {
this.paginationQueryParams = paginationQueryParams;
this.requestedPageToken = requestedPageToken;
}

public Map<String, Object> getPaginationQueryParams() {
return paginationQueryParams;
}

public PageToken getRequestedPageToken() {
return requestedPageToken;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.opensearch.rest.RestResponse;
import org.opensearch.rest.action.RestResponseListener;
import org.opensearch.rest.pagination.IndexBasedPaginationStrategy;
import org.opensearch.rest.pagination.PaginatedQueryResponse;

import java.time.Instant;
import java.time.ZoneOffset;
Expand Down Expand Up @@ -152,17 +153,14 @@ public RestResponse buildResponse(final Table table) throws Exception {
@Override
public void onResponse(final ClusterStateResponse clusterStateResponse) {
IndexBasedPaginationStrategy paginationStrategy = getPaginationStrategy(clusterStateResponse);
Table.PaginationMetadata paginationMetadata = getTablePaginationMetadata(paginationStrategy);
final String[] indicesToBeQueried = isActionPaginated()
? paginationStrategy.getElementsFromRequestedToken().toArray(new String[0])
: indices;
final String[] indicesToBeQueried = getIndicesToBeQueried(indices, paginationStrategy);

final GroupedActionListener<ActionResponse> groupedListener = createGroupedListener(
request,
4,
listener,
indicesToBeQueried,
paginationMetadata
getPaginatedQueryResponse(paginationStrategy)
);
groupedListener.onResponse(clusterStateResponse);

Expand All @@ -172,20 +170,19 @@ public void onResponse(final ClusterStateResponse clusterStateResponse) {
// force the IndicesOptions for all the sub-requests to be as inclusive as possible.
final IndicesOptions subRequestIndicesOptions = IndicesOptions.lenientExpandHidden();

// Indices that were successfully resolved during the get settings request might be deleted when the subsequent
// cluster
// state, cluster health and indices stats requests execute. We have to distinguish two cases:
// Indices that were successfully resolved during the cluster state request might be deleted when the subsequent
// get settings, cluster health and indices stats requests execute. We have to distinguish two cases:
// 1) the deleted index was explicitly passed as parameter to the /_cat/indices request. In this case we want the
// subsequent requests to fail.
// 2) the deleted index was resolved as part of a wildcard or _all. In this case, we want the subsequent requests
// not to
// fail on the deleted index (as we want to ignore wildcards that cannot be resolved).
// This behavior can be ensured by letting the cluster state, cluster health and indices stats requests re-resolve
// This behavior can be ensured by letting the get settings, cluster health and indices stats requests re-resolve
// the
// index names with the same indices options that we used for the initial cluster state request (strictExpand).
sendGetSettingsRequest(
indicesToBeQueried,
subRequestIndicesOptions,
indicesOptions,
local,
clusterManagerNodeTimeout,
client,
Expand Down Expand Up @@ -301,7 +298,7 @@ private GroupedActionListener<ActionResponse> createGroupedListener(
final int size,
final ActionListener<Table> listener,
final String[] indicesToBeQueried,
final Table.PaginationMetadata paginationMetadata
final PaginatedQueryResponse paginatedQueryResponse
) {
return new GroupedActionListener<>(new ActionListener<Collection<ActionResponse>>() {
@Override
Expand Down Expand Up @@ -332,7 +329,7 @@ public void onResponse(final Collection<ActionResponse> responses) {
indicesStats,
indicesStates,
indicesToBeQueried,
paginationMetadata
paginatedQueryResponse
);
listener.onResponse(responseTable);
} catch (Exception e) {
Expand Down Expand Up @@ -365,8 +362,8 @@ protected Table getTableWithHeader(final RestRequest request) {
return getTableWithHeader(request, null);
}

protected Table getTableWithHeader(final RestRequest request, final Table.PaginationMetadata paginationMetadata) {
Table table = new Table(paginationMetadata);
protected Table getTableWithHeader(final RestRequest request, final PaginatedQueryResponse paginatedQueryResponse) {
Table table = new Table(paginatedQueryResponse);
table.startHeaders();
table.addCell("health", "alias:h;desc:current health status");
table.addCell("status", "alias:s;desc:open/close status");
Expand Down Expand Up @@ -737,11 +734,11 @@ Table buildTable(
final Map<String, IndexStats> indicesStats,
final Map<String, IndexMetadata> indicesMetadatas,
final String[] indicesToBeQueried,
final Table.PaginationMetadata paginationMetadata
final PaginatedQueryResponse paginatedQueryResponse
) {

final String healthParam = request.param("health");
final Table table = getTableWithHeader(request, paginationMetadata);
final Table table = getTableWithHeader(request, paginatedQueryResponse);

if (isActionPaginated() && indicesToBeQueried.length == 0) {
// to handle cases where paginationStrategy couldn't find any indices that should be queried
Expand Down Expand Up @@ -1030,8 +1027,12 @@ protected IndexBasedPaginationStrategy getPaginationStrategy(ClusterStateRespons
return null;
}

protected Table.PaginationMetadata getTablePaginationMetadata(IndexBasedPaginationStrategy paginationStrategy) {
protected PaginatedQueryResponse getPaginatedQueryResponse(IndexBasedPaginationStrategy paginationStrategy) {
return null;
}

protected String[] getIndicesToBeQueried(String[] indices, IndexBasedPaginationStrategy paginationStrategy) {
return indices;
}

}
18 changes: 11 additions & 7 deletions server/src/main/java/org/opensearch/rest/action/cat/RestTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,11 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import static org.opensearch.rest.pagination.PaginatedQueryResponse.PAGINATED_RESPONSE_NEXT_TOKEN_KEY;

/**
* a REST table
*
Expand Down Expand Up @@ -88,11 +91,12 @@ public static RestResponse buildXContentBuilder(Table table, RestChannel channel
XContentBuilder builder = channel.newBuilder();
List<DisplayHeader> displayHeaders = buildDisplayHeaders(table, request);

if (table.isPaginated()) {
assert table.getPaginatedElement() != null : "Paginated element is required in-case nextToken is not null";
if (Objects.nonNull(table.getPaginatedQueryResponse())) {
assert Objects.nonNull(table.getPaginatedQueryResponse().getPaginatedElement())
: "Paginated element is required in-case of paginated responses";
builder.startObject();
builder.field("next_token", table.getNextToken());
builder.startArray(table.getPaginatedElement());
builder.field(PAGINATED_RESPONSE_NEXT_TOKEN_KEY, table.getPaginatedQueryResponse().getNextToken());
builder.startArray(table.getPaginatedQueryResponse().getPaginatedElement());
} else {
builder.startArray();
}
Expand All @@ -105,7 +109,7 @@ public static RestResponse buildXContentBuilder(Table table, RestChannel channel
builder.endObject();
}
builder.endArray();
if (table.isPaginated()) {
if (Objects.nonNull(table.getPaginatedQueryResponse())) {
builder.endObject();
}
return new BytesRestResponse(RestStatus.OK, builder);
Expand Down Expand Up @@ -147,8 +151,8 @@ public static RestResponse buildTextPlainResponse(Table table, RestChannel chann
out.append("\n");
}
// Adding a new row for next_token, in the response if the table is paginated.
if (table.isPaginated()) {
out.append("next_token" + " " + table.getNextToken());
if (Objects.nonNull(table.getPaginatedQueryResponse())) {
out.append("next_token" + " " + table.getPaginatedQueryResponse().getNextToken());
out.append("\n");
}
out.close();
Expand Down
Loading

0 comments on commit f1b5016

Please sign in to comment.