From d7d96940dafefd466f702ed1839f88ff8709edb5 Mon Sep 17 00:00:00 2001 From: Harsh Garg Date: Mon, 2 Sep 2024 10:33:03 +0530 Subject: [PATCH] Adding _list/shards API Signed-off-by: Harsh Garg --- .../org/opensearch/action/ActionModule.java | 10 + .../java/org/opensearch/common/Table.java | 53 ++ .../rest/action/cat/RestShardsAction.java | 663 ++++++++++-------- .../opensearch/rest/action/cat/RestTable.java | 17 +- .../rest/action/list/AbstractListAction.java | 77 ++ .../rest/action/list/RestListAction.java | 58 ++ .../action/list/RestShardsListAction.java | 122 ++++ .../rest/action/list/package-info.java | 12 + .../IndexBasedPaginationStrategy.java | 193 +++++ .../opensearch/rest/pagination/PageToken.java | 41 ++ .../rest/pagination/PaginationStrategy.java | 64 ++ .../ShardBasedPaginationStrategy.java | 250 +++++++ .../rest/pagination/package-info.java | 12 + 13 files changed, 1265 insertions(+), 307 deletions(-) create mode 100644 server/src/main/java/org/opensearch/rest/action/list/AbstractListAction.java create mode 100644 server/src/main/java/org/opensearch/rest/action/list/RestListAction.java create mode 100644 server/src/main/java/org/opensearch/rest/action/list/RestShardsListAction.java create mode 100644 server/src/main/java/org/opensearch/rest/action/list/package-info.java create mode 100644 server/src/main/java/org/opensearch/rest/pagination/IndexBasedPaginationStrategy.java create mode 100644 server/src/main/java/org/opensearch/rest/pagination/PageToken.java create mode 100644 server/src/main/java/org/opensearch/rest/pagination/PaginationStrategy.java create mode 100644 server/src/main/java/org/opensearch/rest/pagination/ShardBasedPaginationStrategy.java create mode 100644 server/src/main/java/org/opensearch/rest/pagination/package-info.java diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 16c15f553951c..daea41789e2b0 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -456,6 +456,9 @@ import org.opensearch.rest.action.ingest.RestGetPipelineAction; import org.opensearch.rest.action.ingest.RestPutPipelineAction; import org.opensearch.rest.action.ingest.RestSimulatePipelineAction; +import org.opensearch.rest.action.list.AbstractListAction; +import org.opensearch.rest.action.list.RestListAction; +import org.opensearch.rest.action.list.RestShardsListAction; import org.opensearch.rest.action.search.RestClearScrollAction; import org.opensearch.rest.action.search.RestCountAction; import org.opensearch.rest.action.search.RestCreatePitAction; @@ -793,9 +796,12 @@ private ActionFilters setupActionFilters(List actionPlugins) { public void initRestHandlers(Supplier nodesInCluster) { List catActions = new ArrayList<>(); + List listActions = new ArrayList<>(); Consumer registerHandler = handler -> { if (handler instanceof AbstractCatAction) { catActions.add((AbstractCatAction) handler); + } else if (handler instanceof AbstractListAction) { + listActions.add((AbstractListAction) handler); } restController.registerHandler(handler); }; @@ -968,6 +974,9 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestSnapshotAction()); registerHandler.accept(new RestTemplatesAction()); + // LIST API + registerHandler.accept(new RestShardsListAction()); + // Point in time API registerHandler.accept(new RestCreatePitAction()); registerHandler.accept(new RestDeletePitAction()); @@ -999,6 +1008,7 @@ public void initRestHandlers(Supplier nodesInCluster) { } } registerHandler.accept(new RestCatAction(catActions)); + registerHandler.accept(new RestListAction(listActions)); registerHandler.accept(new RestDecommissionAction()); registerHandler.accept(new RestGetDecommissionStateAction()); registerHandler.accept(new RestRemoteStoreStatsAction()); diff --git a/server/src/main/java/org/opensearch/common/Table.java b/server/src/main/java/org/opensearch/common/Table.java index da14f628efa0f..a2d0e7990dac2 100644 --- a/server/src/main/java/org/opensearch/common/Table.java +++ b/server/src/main/java/org/opensearch/common/Table.java @@ -43,6 +43,8 @@ import java.util.Map; import java.util.concurrent.TimeUnit; +import reactor.util.annotation.NonNull; + import static java.util.Collections.emptyMap; /** @@ -59,9 +61,18 @@ public class Table { private List currentCells; private boolean inHeaders = false; private boolean withTime = false; + private PaginationMetadata paginationMetadata = new PaginationMetadata(false, null, null); public static final String EPOCH = "epoch"; public static final String TIMESTAMP = "timestamp"; + public Table() {} + + public Table(@Nullable PaginationMetadata paginationMetadata) { + if (paginationMetadata != null) { + this.paginationMetadata = paginationMetadata; + } + } + public Table startHeaders() { inHeaders = true; currentCells = new ArrayList<>(); @@ -230,6 +241,18 @@ public Map getAliasMap() { return headerAliasMap; } + public boolean isPaginated() { + return paginationMetadata.isResponsePaginated; + } + + public String getPaginatedElement() { + return paginationMetadata.paginatedElement; + } + + public String getNextToken() { + return paginationMetadata.nextToken; + } + /** * Cell in a table * @@ -254,4 +277,34 @@ public Cell(Object value, Map attr) { this.attr = attr; } } + + /** + * Pagination metadata for a table. + * + * @opensearch.internal + */ + public static class PaginationMetadata { + + /** + * boolean denoting whether the table is paginated or not. + */ + public final boolean isResponsePaginated; + + /** + * String denoting the element which is being paginated (for e.g. shards, indices..). + */ + public final String paginatedElement; + + /** + * String denoting the next_token of paginated response, which will be used to fetch next page (if any). + */ + public final String nextToken; + + public PaginationMetadata(@NonNull boolean isResponsePaginated, @Nullable String paginatedElement, @Nullable String nextToken) { + this.isResponsePaginated = isResponsePaginated; + assert !isResponsePaginated || paginatedElement != null : "paginatedElement must be specified for a table which is paginated"; + this.paginatedElement = paginatedElement; + this.nextToken = nextToken; + } + } } diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java index 4413c8eb370be..d68d15390566f 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java @@ -132,328 +132,379 @@ public RestResponse buildResponse(IndicesStatsResponse indicesStatsResponse) thr @Override protected Table getTableWithHeader(final RestRequest request) { - Table table = new Table(); - table.startHeaders() - .addCell("index", "default:true;alias:i,idx;desc:index name") - .addCell("shard", "default:true;alias:s,sh;desc:shard name") - .addCell("prirep", "alias:p,pr,primaryOrReplica;default:true;desc:primary or replica") - .addCell("state", "default:true;alias:st;desc:shard state") - .addCell("docs", "alias:d,dc;text-align:right;desc:number of docs in shard") - .addCell("store", "alias:sto;text-align:right;desc:store size of shard (how much disk it uses)") - .addCell("ip", "default:true;desc:ip of node where it lives") - .addCell("id", "default:false;desc:unique id of node where it lives") - .addCell("node", "default:true;alias:n;desc:name of node where it lives"); - - table.addCell("sync_id", "alias:sync_id;default:false;desc:sync id"); - - table.addCell("unassigned.reason", "alias:ur;default:false;desc:reason shard is unassigned"); - table.addCell("unassigned.at", "alias:ua;default:false;desc:time shard became unassigned (UTC)"); - table.addCell("unassigned.for", "alias:uf;default:false;text-align:right;desc:time has been unassigned"); - table.addCell("unassigned.details", "alias:ud;default:false;desc:additional details as to why the shard became unassigned"); - - table.addCell("recoverysource.type", "alias:rs;default:false;desc:recovery source type"); - - table.addCell("completion.size", "alias:cs,completionSize;default:false;text-align:right;desc:size of completion"); - - table.addCell("fielddata.memory_size", "alias:fm,fielddataMemory;default:false;text-align:right;desc:used fielddata cache"); - table.addCell("fielddata.evictions", "alias:fe,fielddataEvictions;default:false;text-align:right;desc:fielddata evictions"); - - table.addCell("query_cache.memory_size", "alias:qcm,queryCacheMemory;default:false;text-align:right;desc:used query cache"); - table.addCell("query_cache.evictions", "alias:qce,queryCacheEvictions;default:false;text-align:right;desc:query cache evictions"); - - table.addCell("flush.total", "alias:ft,flushTotal;default:false;text-align:right;desc:number of flushes"); - table.addCell("flush.total_time", "alias:ftt,flushTotalTime;default:false;text-align:right;desc:time spent in flush"); - - table.addCell("get.current", "alias:gc,getCurrent;default:false;text-align:right;desc:number of current get ops"); - table.addCell("get.time", "alias:gti,getTime;default:false;text-align:right;desc:time spent in get"); - table.addCell("get.total", "alias:gto,getTotal;default:false;text-align:right;desc:number of get ops"); - table.addCell("get.exists_time", "alias:geti,getExistsTime;default:false;text-align:right;desc:time spent in successful gets"); - table.addCell("get.exists_total", "alias:geto,getExistsTotal;default:false;text-align:right;desc:number of successful gets"); - table.addCell("get.missing_time", "alias:gmti,getMissingTime;default:false;text-align:right;desc:time spent in failed gets"); - table.addCell("get.missing_total", "alias:gmto,getMissingTotal;default:false;text-align:right;desc:number of failed gets"); - - table.addCell( - "indexing.delete_current", - "alias:idc,indexingDeleteCurrent;default:false;text-align:right;desc:number of current deletions" - ); - table.addCell("indexing.delete_time", "alias:idti,indexingDeleteTime;default:false;text-align:right;desc:time spent in deletions"); - table.addCell("indexing.delete_total", "alias:idto,indexingDeleteTotal;default:false;text-align:right;desc:number of delete ops"); - table.addCell( - "indexing.index_current", - "alias:iic,indexingIndexCurrent;default:false;text-align:right;desc:number of current indexing ops" - ); - table.addCell("indexing.index_time", "alias:iiti,indexingIndexTime;default:false;text-align:right;desc:time spent in indexing"); - table.addCell("indexing.index_total", "alias:iito,indexingIndexTotal;default:false;text-align:right;desc:number of indexing ops"); - table.addCell( - "indexing.index_failed", - "alias:iif,indexingIndexFailed;default:false;text-align:right;desc:number of failed indexing ops" - ); - - table.addCell("merges.current", "alias:mc,mergesCurrent;default:false;text-align:right;desc:number of current merges"); - table.addCell( - "merges.current_docs", - "alias:mcd,mergesCurrentDocs;default:false;text-align:right;desc:number of current merging docs" - ); - table.addCell("merges.current_size", "alias:mcs,mergesCurrentSize;default:false;text-align:right;desc:size of current merges"); - table.addCell("merges.total", "alias:mt,mergesTotal;default:false;text-align:right;desc:number of completed merge ops"); - table.addCell("merges.total_docs", "alias:mtd,mergesTotalDocs;default:false;text-align:right;desc:docs merged"); - table.addCell("merges.total_size", "alias:mts,mergesTotalSize;default:false;text-align:right;desc:size merged"); - table.addCell("merges.total_time", "alias:mtt,mergesTotalTime;default:false;text-align:right;desc:time spent in merges"); - - table.addCell("refresh.total", "alias:rto,refreshTotal;default:false;text-align:right;desc:total refreshes"); - table.addCell("refresh.time", "alias:rti,refreshTime;default:false;text-align:right;desc:time spent in refreshes"); - table.addCell("refresh.external_total", "alias:rto,refreshTotal;default:false;text-align:right;desc:total external refreshes"); - table.addCell( - "refresh.external_time", - "alias:rti,refreshTime;default:false;text-align:right;desc:time spent in external refreshes" - ); - table.addCell( - "refresh.listeners", - "alias:rli,refreshListeners;default:false;text-align:right;desc:number of pending refresh listeners" - ); - - table.addCell("search.fetch_current", "alias:sfc,searchFetchCurrent;default:false;text-align:right;desc:current fetch phase ops"); - table.addCell("search.fetch_time", "alias:sfti,searchFetchTime;default:false;text-align:right;desc:time spent in fetch phase"); - table.addCell("search.fetch_total", "alias:sfto,searchFetchTotal;default:false;text-align:right;desc:total fetch ops"); - table.addCell("search.open_contexts", "alias:so,searchOpenContexts;default:false;text-align:right;desc:open search contexts"); - table.addCell("search.query_current", "alias:sqc,searchQueryCurrent;default:false;text-align:right;desc:current query phase ops"); - table.addCell("search.query_time", "alias:sqti,searchQueryTime;default:false;text-align:right;desc:time spent in query phase"); - table.addCell("search.query_total", "alias:sqto,searchQueryTotal;default:false;text-align:right;desc:total query phase ops"); - table.addCell( - "search.concurrent_query_current", - "alias:scqc,searchConcurrentQueryCurrent;default:false;text-align:right;desc:current concurrent query phase ops" - ); - table.addCell( - "search.concurrent_query_time", - "alias:scqti,searchConcurrentQueryTime;default:false;text-align:right;desc:time spent in concurrent query phase" - ); - table.addCell( - "search.concurrent_query_total", - "alias:scqto,searchConcurrentQueryTotal;default:false;text-align:right;desc:total concurrent query phase ops" - ); - table.addCell( - "search.concurrent_avg_slice_count", - "alias:casc,searchConcurrentAvgSliceCount;default:false;text-align:right;desc:average query concurrency" - ); - table.addCell("search.scroll_current", "alias:scc,searchScrollCurrent;default:false;text-align:right;desc:open scroll contexts"); - table.addCell( - "search.scroll_time", - "alias:scti,searchScrollTime;default:false;text-align:right;desc:time scroll contexts held open" - ); - table.addCell("search.scroll_total", "alias:scto,searchScrollTotal;default:false;text-align:right;desc:completed scroll contexts"); - table.addCell( - "search.point_in_time_current", - "alias:spc,searchPointInTimeCurrent;default:false;text-align:right;desc:open point in time contexts" - ); - table.addCell( - "search.point_in_time_time", - "alias:spti,searchPointInTimeTime;default:false;text-align:right;desc:time point in time contexts held open" - ); - table.addCell( - "search.point_in_time_total", - "alias:spto,searchPointInTimeTotal;default:false;text-align:right;desc:completed point in time contexts" - ); - table.addCell( - "search.search_idle_reactivate_count_total", - "alias:ssirct,searchSearchIdleReactivateCountTotal;default:false;text-align:right;desc:number of times a shard reactivated" - ); - - table.addCell("segments.count", "alias:sc,segmentsCount;default:false;text-align:right;desc:number of segments"); - table.addCell("segments.memory", "alias:sm,segmentsMemory;default:false;text-align:right;desc:memory used by segments"); - table.addCell( - "segments.index_writer_memory", - "alias:siwm,segmentsIndexWriterMemory;default:false;text-align:right;desc:memory used by index writer" - ); - table.addCell( - "segments.version_map_memory", - "alias:svmm,segmentsVersionMapMemory;default:false;text-align:right;desc:memory used by version map" - ); - table.addCell( - "segments.fixed_bitset_memory", - "alias:sfbm,fixedBitsetMemory;default:false;text-align:right;desc:memory used by fixed bit sets for nested object" - + " field types and type filters for types referred in _parent fields" - ); + return RestShardsActionCommonUtils.getTableWithHeader(request); + } - table.addCell("seq_no.max", "alias:sqm,maxSeqNo;default:false;text-align:right;desc:max sequence number"); - table.addCell("seq_no.local_checkpoint", "alias:sql,localCheckpoint;default:false;text-align:right;desc:local checkpoint"); - table.addCell("seq_no.global_checkpoint", "alias:sqg,globalCheckpoint;default:false;text-align:right;desc:global checkpoint"); + // package private for testing + Table buildTable(RestRequest request, ClusterStateResponse state, IndicesStatsResponse stats) { + return RestShardsActionCommonUtils.buildTable(request, state, stats); + } - table.addCell("warmer.current", "alias:wc,warmerCurrent;default:false;text-align:right;desc:current warmer ops"); - table.addCell("warmer.total", "alias:wto,warmerTotal;default:false;text-align:right;desc:total warmer ops"); - table.addCell("warmer.total_time", "alias:wtt,warmerTotalTime;default:false;text-align:right;desc:time spent in warmers"); + public static class RestShardsActionCommonUtils { - table.addCell("path.data", "alias:pd,dataPath;default:false;text-align:right;desc:shard data path"); - table.addCell("path.state", "alias:ps,statsPath;default:false;text-align:right;desc:shard state path"); - table.addCell("docs.deleted", "alias:dd,docsDeleted;default:false;text-align:right;desc:number of deleted docs in shard"); + public static Table getTableWithHeader(final RestRequest request) { + return getTableWithHeader(request, null); + } - table.endHeaders(); - return table; - } + public static Table getTableWithHeader(final RestRequest request, Table.PaginationMetadata paginationMetadata) { + Table table = new Table(paginationMetadata); + table.startHeaders() + .addCell("index", "default:true;alias:i,idx;desc:index name") + .addCell("shard", "default:true;alias:s,sh;desc:shard name") + .addCell("prirep", "alias:p,pr,primaryOrReplica;default:true;desc:primary or replica") + .addCell("state", "default:true;alias:st;desc:shard state") + .addCell("docs", "alias:d,dc;text-align:right;desc:number of docs in shard") + .addCell("store", "alias:sto;text-align:right;desc:store size of shard (how much disk it uses)") + .addCell("ip", "default:true;desc:ip of node where it lives") + .addCell("id", "default:false;desc:unique id of node where it lives") + .addCell("node", "default:true;alias:n;desc:name of node where it lives"); + + table.addCell("sync_id", "alias:sync_id;default:false;desc:sync id"); + + table.addCell("unassigned.reason", "alias:ur;default:false;desc:reason shard is unassigned"); + table.addCell("unassigned.at", "alias:ua;default:false;desc:time shard became unassigned (UTC)"); + table.addCell("unassigned.for", "alias:uf;default:false;text-align:right;desc:time has been unassigned"); + table.addCell("unassigned.details", "alias:ud;default:false;desc:additional details as to why the shard became unassigned"); + + table.addCell("recoverysource.type", "alias:rs;default:false;desc:recovery source type"); + + table.addCell("completion.size", "alias:cs,completionSize;default:false;text-align:right;desc:size of completion"); + + table.addCell("fielddata.memory_size", "alias:fm,fielddataMemory;default:false;text-align:right;desc:used fielddata cache"); + table.addCell("fielddata.evictions", "alias:fe,fielddataEvictions;default:false;text-align:right;desc:fielddata evictions"); + + table.addCell("query_cache.memory_size", "alias:qcm,queryCacheMemory;default:false;text-align:right;desc:used query cache"); + table.addCell( + "query_cache.evictions", + "alias:qce,queryCacheEvictions;default:false;text-align:right;desc:query cache evictions" + ); + + table.addCell("flush.total", "alias:ft,flushTotal;default:false;text-align:right;desc:number of flushes"); + table.addCell("flush.total_time", "alias:ftt,flushTotalTime;default:false;text-align:right;desc:time spent in flush"); + + table.addCell("get.current", "alias:gc,getCurrent;default:false;text-align:right;desc:number of current get ops"); + table.addCell("get.time", "alias:gti,getTime;default:false;text-align:right;desc:time spent in get"); + table.addCell("get.total", "alias:gto,getTotal;default:false;text-align:right;desc:number of get ops"); + table.addCell("get.exists_time", "alias:geti,getExistsTime;default:false;text-align:right;desc:time spent in successful gets"); + table.addCell("get.exists_total", "alias:geto,getExistsTotal;default:false;text-align:right;desc:number of successful gets"); + table.addCell("get.missing_time", "alias:gmti,getMissingTime;default:false;text-align:right;desc:time spent in failed gets"); + table.addCell("get.missing_total", "alias:gmto,getMissingTotal;default:false;text-align:right;desc:number of failed gets"); + + table.addCell( + "indexing.delete_current", + "alias:idc,indexingDeleteCurrent;default:false;text-align:right;desc:number of current deletions" + ); + table.addCell( + "indexing.delete_time", + "alias:idti,indexingDeleteTime;default:false;text-align:right;desc:time spent in deletions" + ); + table.addCell( + "indexing.delete_total", + "alias:idto,indexingDeleteTotal;default:false;text-align:right;desc:number of delete ops" + ); + table.addCell( + "indexing.index_current", + "alias:iic,indexingIndexCurrent;default:false;text-align:right;desc:number of current indexing ops" + ); + table.addCell("indexing.index_time", "alias:iiti,indexingIndexTime;default:false;text-align:right;desc:time spent in indexing"); + table.addCell( + "indexing.index_total", + "alias:iito,indexingIndexTotal;default:false;text-align:right;desc:number of indexing ops" + ); + table.addCell( + "indexing.index_failed", + "alias:iif,indexingIndexFailed;default:false;text-align:right;desc:number of failed indexing ops" + ); + + table.addCell("merges.current", "alias:mc,mergesCurrent;default:false;text-align:right;desc:number of current merges"); + table.addCell( + "merges.current_docs", + "alias:mcd,mergesCurrentDocs;default:false;text-align:right;desc:number of current merging docs" + ); + table.addCell("merges.current_size", "alias:mcs,mergesCurrentSize;default:false;text-align:right;desc:size of current merges"); + table.addCell("merges.total", "alias:mt,mergesTotal;default:false;text-align:right;desc:number of completed merge ops"); + table.addCell("merges.total_docs", "alias:mtd,mergesTotalDocs;default:false;text-align:right;desc:docs merged"); + table.addCell("merges.total_size", "alias:mts,mergesTotalSize;default:false;text-align:right;desc:size merged"); + table.addCell("merges.total_time", "alias:mtt,mergesTotalTime;default:false;text-align:right;desc:time spent in merges"); + + table.addCell("refresh.total", "alias:rto,refreshTotal;default:false;text-align:right;desc:total refreshes"); + table.addCell("refresh.time", "alias:rti,refreshTime;default:false;text-align:right;desc:time spent in refreshes"); + table.addCell("refresh.external_total", "alias:rto,refreshTotal;default:false;text-align:right;desc:total external refreshes"); + table.addCell( + "refresh.external_time", + "alias:rti,refreshTime;default:false;text-align:right;desc:time spent in external refreshes" + ); + table.addCell( + "refresh.listeners", + "alias:rli,refreshListeners;default:false;text-align:right;desc:number of pending refresh listeners" + ); + + table.addCell( + "search.fetch_current", + "alias:sfc,searchFetchCurrent;default:false;text-align:right;desc:current fetch phase ops" + ); + table.addCell("search.fetch_time", "alias:sfti,searchFetchTime;default:false;text-align:right;desc:time spent in fetch phase"); + table.addCell("search.fetch_total", "alias:sfto,searchFetchTotal;default:false;text-align:right;desc:total fetch ops"); + table.addCell("search.open_contexts", "alias:so,searchOpenContexts;default:false;text-align:right;desc:open search contexts"); + table.addCell( + "search.query_current", + "alias:sqc,searchQueryCurrent;default:false;text-align:right;desc:current query phase ops" + ); + table.addCell("search.query_time", "alias:sqti,searchQueryTime;default:false;text-align:right;desc:time spent in query phase"); + table.addCell("search.query_total", "alias:sqto,searchQueryTotal;default:false;text-align:right;desc:total query phase ops"); + table.addCell( + "search.concurrent_query_current", + "alias:scqc,searchConcurrentQueryCurrent;default:false;text-align:right;desc:current concurrent query phase ops" + ); + table.addCell( + "search.concurrent_query_time", + "alias:scqti,searchConcurrentQueryTime;default:false;text-align:right;desc:time spent in concurrent query phase" + ); + table.addCell( + "search.concurrent_query_total", + "alias:scqto,searchConcurrentQueryTotal;default:false;text-align:right;desc:total concurrent query phase ops" + ); + table.addCell( + "search.concurrent_avg_slice_count", + "alias:casc,searchConcurrentAvgSliceCount;default:false;text-align:right;desc:average query concurrency" + ); + table.addCell( + "search.scroll_current", + "alias:scc,searchScrollCurrent;default:false;text-align:right;desc:open scroll contexts" + ); + table.addCell( + "search.scroll_time", + "alias:scti,searchScrollTime;default:false;text-align:right;desc:time scroll contexts held open" + ); + table.addCell( + "search.scroll_total", + "alias:scto,searchScrollTotal;default:false;text-align:right;desc:completed scroll contexts" + ); + table.addCell( + "search.point_in_time_current", + "alias:spc,searchPointInTimeCurrent;default:false;text-align:right;desc:open point in time contexts" + ); + table.addCell( + "search.point_in_time_time", + "alias:spti,searchPointInTimeTime;default:false;text-align:right;desc:time point in time contexts held open" + ); + table.addCell( + "search.point_in_time_total", + "alias:spto,searchPointInTimeTotal;default:false;text-align:right;desc:completed point in time contexts" + ); + table.addCell( + "search.search_idle_reactivate_count_total", + "alias:ssirct,searchSearchIdleReactivateCountTotal;default:false;text-align:right;desc:number of times a shard reactivated" + ); + + table.addCell("segments.count", "alias:sc,segmentsCount;default:false;text-align:right;desc:number of segments"); + table.addCell("segments.memory", "alias:sm,segmentsMemory;default:false;text-align:right;desc:memory used by segments"); + table.addCell( + "segments.index_writer_memory", + "alias:siwm,segmentsIndexWriterMemory;default:false;text-align:right;desc:memory used by index writer" + ); + table.addCell( + "segments.version_map_memory", + "alias:svmm,segmentsVersionMapMemory;default:false;text-align:right;desc:memory used by version map" + ); + table.addCell( + "segments.fixed_bitset_memory", + "alias:sfbm,fixedBitsetMemory;default:false;text-align:right;desc:memory used by fixed bit sets for nested object" + + " field types and type filters for types referred in _parent fields" + ); + + table.addCell("seq_no.max", "alias:sqm,maxSeqNo;default:false;text-align:right;desc:max sequence number"); + table.addCell("seq_no.local_checkpoint", "alias:sql,localCheckpoint;default:false;text-align:right;desc:local checkpoint"); + table.addCell("seq_no.global_checkpoint", "alias:sqg,globalCheckpoint;default:false;text-align:right;desc:global checkpoint"); + + table.addCell("warmer.current", "alias:wc,warmerCurrent;default:false;text-align:right;desc:current warmer ops"); + table.addCell("warmer.total", "alias:wto,warmerTotal;default:false;text-align:right;desc:total warmer ops"); + table.addCell("warmer.total_time", "alias:wtt,warmerTotalTime;default:false;text-align:right;desc:time spent in warmers"); + + table.addCell("path.data", "alias:pd,dataPath;default:false;text-align:right;desc:shard data path"); + table.addCell("path.state", "alias:ps,statsPath;default:false;text-align:right;desc:shard state path"); + table.addCell("docs.deleted", "alias:dd,docsDeleted;default:false;text-align:right;desc:number of deleted docs in shard"); + + table.endHeaders(); + return table; + } - private static Object getOrNull(S stats, Function accessor, Function func) { - if (stats != null) { - T t = accessor.apply(stats); - if (t != null) { - return func.apply(t); - } + public static Table buildTable(RestRequest request, ClusterStateResponse state, IndicesStatsResponse stats) { + return buildTable(request, state, stats, state.getState().routingTable().allShards(), null); } - return null; - } - // package private for testing - Table buildTable(RestRequest request, ClusterStateResponse state, IndicesStatsResponse stats) { - Table table = getTableWithHeader(request); - - for (ShardRouting shard : state.getState().routingTable().allShards()) { - ShardStats shardStats = stats.asMap().get(shard); - CommonStats commonStats = null; - CommitStats commitStats = null; - if (shardStats != null) { - commonStats = shardStats.getStats(); - commitStats = shardStats.getCommitStats(); - } + public static Table buildTable( + RestRequest request, + ClusterStateResponse state, + IndicesStatsResponse stats, + List shardRoutingList, + Table.PaginationMetadata paginationMetadata + ) { + Table table = getTableWithHeader(request, paginationMetadata); + for (ShardRouting shard : shardRoutingList) { + ShardStats shardStats = stats.asMap().get(shard); + CommonStats commonStats = null; + CommitStats commitStats = null; + if (shardStats != null) { + commonStats = shardStats.getStats(); + commitStats = shardStats.getCommitStats(); + } - table.startRow(); + table.startRow(); - table.addCell(shard.getIndexName()); - table.addCell(shard.id()); + table.addCell(shard.getIndexName()); + table.addCell(shard.id()); - if (shard.primary()) { - table.addCell("p"); - } else { - table.addCell("r"); - } - table.addCell(shard.state()); - table.addCell(getOrNull(commonStats, CommonStats::getDocs, DocsStats::getCount)); - table.addCell(getOrNull(commonStats, CommonStats::getStore, StoreStats::getSize)); - if (shard.assignedToNode()) { - String ip = state.getState().nodes().get(shard.currentNodeId()).getHostAddress(); - String nodeId = shard.currentNodeId(); - StringBuilder name = new StringBuilder(); - name.append(state.getState().nodes().get(shard.currentNodeId()).getName()); - if (shard.relocating()) { - String reloIp = state.getState().nodes().get(shard.relocatingNodeId()).getHostAddress(); - String reloNme = state.getState().nodes().get(shard.relocatingNodeId()).getName(); - String reloNodeId = shard.relocatingNodeId(); - name.append(" -> "); - name.append(reloIp); - name.append(" "); - name.append(reloNodeId); - name.append(" "); - name.append(reloNme); + if (shard.primary()) { + table.addCell("p"); + } else { + table.addCell("r"); + } + table.addCell(shard.state()); + table.addCell(getOrNull(commonStats, CommonStats::getDocs, DocsStats::getCount)); + table.addCell(getOrNull(commonStats, CommonStats::getStore, StoreStats::getSize)); + if (shard.assignedToNode()) { + String ip = state.getState().nodes().get(shard.currentNodeId()).getHostAddress(); + String nodeId = shard.currentNodeId(); + StringBuilder name = new StringBuilder(); + name.append(state.getState().nodes().get(shard.currentNodeId()).getName()); + if (shard.relocating()) { + String reloIp = state.getState().nodes().get(shard.relocatingNodeId()).getHostAddress(); + String reloNme = state.getState().nodes().get(shard.relocatingNodeId()).getName(); + String reloNodeId = shard.relocatingNodeId(); + name.append(" -> "); + name.append(reloIp); + name.append(" "); + name.append(reloNodeId); + name.append(" "); + name.append(reloNme); + } + table.addCell(ip); + table.addCell(nodeId); + table.addCell(name); + } else { + table.addCell(null); + table.addCell(null); + table.addCell(null); } - table.addCell(ip); - table.addCell(nodeId); - table.addCell(name); - } else { - table.addCell(null); - table.addCell(null); - table.addCell(null); - } - table.addCell(commitStats == null ? null : commitStats.getUserData().get(Engine.SYNC_COMMIT_ID)); - - if (shard.unassignedInfo() != null) { - table.addCell(shard.unassignedInfo().getReason()); - Instant unassignedTime = Instant.ofEpochMilli(shard.unassignedInfo().getUnassignedTimeInMillis()); - table.addCell(UnassignedInfo.DATE_TIME_FORMATTER.format(unassignedTime)); - table.addCell(TimeValue.timeValueMillis(System.currentTimeMillis() - shard.unassignedInfo().getUnassignedTimeInMillis())); - table.addCell(shard.unassignedInfo().getDetails()); - } else { - table.addCell(null); - table.addCell(null); - table.addCell(null); - table.addCell(null); - } + table.addCell(commitStats == null ? null : commitStats.getUserData().get(Engine.SYNC_COMMIT_ID)); + + if (shard.unassignedInfo() != null) { + table.addCell(shard.unassignedInfo().getReason()); + Instant unassignedTime = Instant.ofEpochMilli(shard.unassignedInfo().getUnassignedTimeInMillis()); + table.addCell(UnassignedInfo.DATE_TIME_FORMATTER.format(unassignedTime)); + table.addCell( + TimeValue.timeValueMillis(System.currentTimeMillis() - shard.unassignedInfo().getUnassignedTimeInMillis()) + ); + table.addCell(shard.unassignedInfo().getDetails()); + } else { + table.addCell(null); + table.addCell(null); + table.addCell(null); + table.addCell(null); + } - if (shard.recoverySource() != null) { - table.addCell(shard.recoverySource().getType().toString().toLowerCase(Locale.ROOT)); - } else { - table.addCell(null); + if (shard.recoverySource() != null) { + table.addCell(shard.recoverySource().getType().toString().toLowerCase(Locale.ROOT)); + } else { + table.addCell(null); + } + + table.addCell(getOrNull(commonStats, CommonStats::getCompletion, CompletionStats::getSize)); + + table.addCell(getOrNull(commonStats, CommonStats::getFieldData, FieldDataStats::getMemorySize)); + table.addCell(getOrNull(commonStats, CommonStats::getFieldData, FieldDataStats::getEvictions)); + + table.addCell(getOrNull(commonStats, CommonStats::getQueryCache, QueryCacheStats::getMemorySize)); + table.addCell(getOrNull(commonStats, CommonStats::getQueryCache, QueryCacheStats::getEvictions)); + + table.addCell(getOrNull(commonStats, CommonStats::getFlush, FlushStats::getTotal)); + table.addCell(getOrNull(commonStats, CommonStats::getFlush, FlushStats::getTotalTime)); + + table.addCell(getOrNull(commonStats, CommonStats::getGet, GetStats::current)); + table.addCell(getOrNull(commonStats, CommonStats::getGet, GetStats::getTime)); + table.addCell(getOrNull(commonStats, CommonStats::getGet, GetStats::getCount)); + table.addCell(getOrNull(commonStats, CommonStats::getGet, GetStats::getExistsTime)); + table.addCell(getOrNull(commonStats, CommonStats::getGet, GetStats::getExistsCount)); + table.addCell(getOrNull(commonStats, CommonStats::getGet, GetStats::getMissingTime)); + table.addCell(getOrNull(commonStats, CommonStats::getGet, GetStats::getMissingCount)); + + table.addCell(getOrNull(commonStats, CommonStats::getIndexing, i -> i.getTotal().getDeleteCurrent())); + table.addCell(getOrNull(commonStats, CommonStats::getIndexing, i -> i.getTotal().getDeleteTime())); + table.addCell(getOrNull(commonStats, CommonStats::getIndexing, i -> i.getTotal().getDeleteCount())); + table.addCell(getOrNull(commonStats, CommonStats::getIndexing, i -> i.getTotal().getIndexCurrent())); + table.addCell(getOrNull(commonStats, CommonStats::getIndexing, i -> i.getTotal().getIndexTime())); + table.addCell(getOrNull(commonStats, CommonStats::getIndexing, i -> i.getTotal().getIndexCount())); + table.addCell(getOrNull(commonStats, CommonStats::getIndexing, i -> i.getTotal().getIndexFailedCount())); + + table.addCell(getOrNull(commonStats, CommonStats::getMerge, MergeStats::getCurrent)); + table.addCell(getOrNull(commonStats, CommonStats::getMerge, MergeStats::getCurrentNumDocs)); + table.addCell(getOrNull(commonStats, CommonStats::getMerge, MergeStats::getCurrentSize)); + table.addCell(getOrNull(commonStats, CommonStats::getMerge, MergeStats::getTotal)); + table.addCell(getOrNull(commonStats, CommonStats::getMerge, MergeStats::getTotalNumDocs)); + table.addCell(getOrNull(commonStats, CommonStats::getMerge, MergeStats::getTotalSize)); + table.addCell(getOrNull(commonStats, CommonStats::getMerge, MergeStats::getTotalTime)); + + table.addCell(getOrNull(commonStats, CommonStats::getRefresh, RefreshStats::getTotal)); + table.addCell(getOrNull(commonStats, CommonStats::getRefresh, RefreshStats::getTotalTime)); + table.addCell(getOrNull(commonStats, CommonStats::getRefresh, RefreshStats::getExternalTotal)); + table.addCell(getOrNull(commonStats, CommonStats::getRefresh, RefreshStats::getExternalTotalTime)); + table.addCell(getOrNull(commonStats, CommonStats::getRefresh, RefreshStats::getListeners)); + + table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getFetchCurrent())); + table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getFetchTime())); + table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getFetchCount())); + table.addCell(getOrNull(commonStats, CommonStats::getSearch, SearchStats::getOpenContexts)); + table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getQueryCurrent())); + table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getQueryTime())); + table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getQueryCount())); + table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getConcurrentQueryCurrent())); + table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getConcurrentQueryTime())); + table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getConcurrentQueryCount())); + table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getConcurrentAvgSliceCount())); + + table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getScrollCurrent())); + table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getScrollTime())); + table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getScrollCount())); + table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getPitCurrent())); + table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getPitTime())); + table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getPitCount())); + table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getSearchIdleReactivateCount())); + + table.addCell(getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getCount)); + table.addCell(getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getZeroMemory)); + table.addCell(getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getIndexWriterMemory)); + table.addCell(getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getVersionMapMemory)); + table.addCell(getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getBitsetMemory)); + + table.addCell(getOrNull(shardStats, ShardStats::getSeqNoStats, SeqNoStats::getMaxSeqNo)); + table.addCell(getOrNull(shardStats, ShardStats::getSeqNoStats, SeqNoStats::getLocalCheckpoint)); + table.addCell(getOrNull(shardStats, ShardStats::getSeqNoStats, SeqNoStats::getGlobalCheckpoint)); + + table.addCell(getOrNull(commonStats, CommonStats::getWarmer, WarmerStats::current)); + table.addCell(getOrNull(commonStats, CommonStats::getWarmer, WarmerStats::total)); + table.addCell(getOrNull(commonStats, CommonStats::getWarmer, WarmerStats::totalTime)); + + table.addCell(getOrNull(shardStats, ShardStats::getDataPath, s -> s)); + table.addCell(getOrNull(shardStats, ShardStats::getStatePath, s -> s)); + table.addCell(getOrNull(commonStats, CommonStats::getDocs, DocsStats::getDeleted)); + + table.endRow(); } - table.addCell(getOrNull(commonStats, CommonStats::getCompletion, CompletionStats::getSize)); - - table.addCell(getOrNull(commonStats, CommonStats::getFieldData, FieldDataStats::getMemorySize)); - table.addCell(getOrNull(commonStats, CommonStats::getFieldData, FieldDataStats::getEvictions)); - - table.addCell(getOrNull(commonStats, CommonStats::getQueryCache, QueryCacheStats::getMemorySize)); - table.addCell(getOrNull(commonStats, CommonStats::getQueryCache, QueryCacheStats::getEvictions)); - - table.addCell(getOrNull(commonStats, CommonStats::getFlush, FlushStats::getTotal)); - table.addCell(getOrNull(commonStats, CommonStats::getFlush, FlushStats::getTotalTime)); - - table.addCell(getOrNull(commonStats, CommonStats::getGet, GetStats::current)); - table.addCell(getOrNull(commonStats, CommonStats::getGet, GetStats::getTime)); - table.addCell(getOrNull(commonStats, CommonStats::getGet, GetStats::getCount)); - table.addCell(getOrNull(commonStats, CommonStats::getGet, GetStats::getExistsTime)); - table.addCell(getOrNull(commonStats, CommonStats::getGet, GetStats::getExistsCount)); - table.addCell(getOrNull(commonStats, CommonStats::getGet, GetStats::getMissingTime)); - table.addCell(getOrNull(commonStats, CommonStats::getGet, GetStats::getMissingCount)); - - table.addCell(getOrNull(commonStats, CommonStats::getIndexing, i -> i.getTotal().getDeleteCurrent())); - table.addCell(getOrNull(commonStats, CommonStats::getIndexing, i -> i.getTotal().getDeleteTime())); - table.addCell(getOrNull(commonStats, CommonStats::getIndexing, i -> i.getTotal().getDeleteCount())); - table.addCell(getOrNull(commonStats, CommonStats::getIndexing, i -> i.getTotal().getIndexCurrent())); - table.addCell(getOrNull(commonStats, CommonStats::getIndexing, i -> i.getTotal().getIndexTime())); - table.addCell(getOrNull(commonStats, CommonStats::getIndexing, i -> i.getTotal().getIndexCount())); - table.addCell(getOrNull(commonStats, CommonStats::getIndexing, i -> i.getTotal().getIndexFailedCount())); - - table.addCell(getOrNull(commonStats, CommonStats::getMerge, MergeStats::getCurrent)); - table.addCell(getOrNull(commonStats, CommonStats::getMerge, MergeStats::getCurrentNumDocs)); - table.addCell(getOrNull(commonStats, CommonStats::getMerge, MergeStats::getCurrentSize)); - table.addCell(getOrNull(commonStats, CommonStats::getMerge, MergeStats::getTotal)); - table.addCell(getOrNull(commonStats, CommonStats::getMerge, MergeStats::getTotalNumDocs)); - table.addCell(getOrNull(commonStats, CommonStats::getMerge, MergeStats::getTotalSize)); - table.addCell(getOrNull(commonStats, CommonStats::getMerge, MergeStats::getTotalTime)); - - table.addCell(getOrNull(commonStats, CommonStats::getRefresh, RefreshStats::getTotal)); - table.addCell(getOrNull(commonStats, CommonStats::getRefresh, RefreshStats::getTotalTime)); - table.addCell(getOrNull(commonStats, CommonStats::getRefresh, RefreshStats::getExternalTotal)); - table.addCell(getOrNull(commonStats, CommonStats::getRefresh, RefreshStats::getExternalTotalTime)); - table.addCell(getOrNull(commonStats, CommonStats::getRefresh, RefreshStats::getListeners)); - - table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getFetchCurrent())); - table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getFetchTime())); - table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getFetchCount())); - table.addCell(getOrNull(commonStats, CommonStats::getSearch, SearchStats::getOpenContexts)); - table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getQueryCurrent())); - table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getQueryTime())); - table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getQueryCount())); - table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getConcurrentQueryCurrent())); - table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getConcurrentQueryTime())); - table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getConcurrentQueryCount())); - table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getConcurrentAvgSliceCount())); - - table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getScrollCurrent())); - table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getScrollTime())); - table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getScrollCount())); - table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getPitCurrent())); - table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getPitTime())); - table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getPitCount())); - table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getSearchIdleReactivateCount())); - - table.addCell(getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getCount)); - table.addCell(getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getZeroMemory)); - table.addCell(getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getIndexWriterMemory)); - table.addCell(getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getVersionMapMemory)); - table.addCell(getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getBitsetMemory)); - - table.addCell(getOrNull(shardStats, ShardStats::getSeqNoStats, SeqNoStats::getMaxSeqNo)); - table.addCell(getOrNull(shardStats, ShardStats::getSeqNoStats, SeqNoStats::getLocalCheckpoint)); - table.addCell(getOrNull(shardStats, ShardStats::getSeqNoStats, SeqNoStats::getGlobalCheckpoint)); - - table.addCell(getOrNull(commonStats, CommonStats::getWarmer, WarmerStats::current)); - table.addCell(getOrNull(commonStats, CommonStats::getWarmer, WarmerStats::total)); - table.addCell(getOrNull(commonStats, CommonStats::getWarmer, WarmerStats::totalTime)); - - table.addCell(getOrNull(shardStats, ShardStats::getDataPath, s -> s)); - table.addCell(getOrNull(shardStats, ShardStats::getStatePath, s -> s)); - table.addCell(getOrNull(commonStats, CommonStats::getDocs, DocsStats::getDeleted)); - - table.endRow(); + return table; } - return table; + private static Object getOrNull(S stats, Function accessor, Function func) { + if (stats != null) { + T t = accessor.apply(stats); + if (t != null) { + return func.apply(t); + } + } + return null; + } } + } diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestTable.java b/server/src/main/java/org/opensearch/rest/action/cat/RestTable.java index 4f1090b163ee6..1eecc70b7e213 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestTable.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestTable.java @@ -88,7 +88,14 @@ public static RestResponse buildXContentBuilder(Table table, RestChannel channel XContentBuilder builder = channel.newBuilder(); List displayHeaders = buildDisplayHeaders(table, request); - builder.startArray(); + if (table.isPaginated()) { + assert table.getPaginatedElement() != null : "Paginated element is required in-case nextToken is not null"; + builder.startObject(); + builder.field("next_token", table.getNextToken()); + builder.startArray(table.getPaginatedElement()); + } else { + builder.startArray(); + } List rowOrder = getRowOrder(table, request); for (Integer row : rowOrder) { builder.startObject(); @@ -98,6 +105,9 @@ public static RestResponse buildXContentBuilder(Table table, RestChannel channel builder.endObject(); } builder.endArray(); + if (table.isPaginated()) { + builder.endObject(); + } return new BytesRestResponse(RestStatus.OK, builder); } @@ -136,6 +146,11 @@ public static RestResponse buildTextPlainResponse(Table table, RestChannel chann } out.append("\n"); } + // Adding a new row for next_token, in the response if the table is paginated. + if (table.isPaginated()) { + out.append("next_token" + " " + table.getNextToken()); + out.append("\n"); + } out.close(); return new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, bytesOut.bytes()); } diff --git a/server/src/main/java/org/opensearch/rest/action/list/AbstractListAction.java b/server/src/main/java/org/opensearch/rest/action/list/AbstractListAction.java new file mode 100644 index 0000000000000..ff32e3c49434f --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/action/list/AbstractListAction.java @@ -0,0 +1,77 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.list; + +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.Table; +import org.opensearch.common.io.Streams; +import org.opensearch.common.io.UTF8StreamWriter; +import org.opensearch.core.common.io.stream.BytesStream; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestRequest; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import static org.opensearch.rest.action.cat.RestTable.buildHelpWidths; +import static org.opensearch.rest.action.cat.RestTable.pad; + +/** + * Base Transport action class for _list APIs + * + * @opensearch.api + */ +public abstract class AbstractListAction extends BaseRestHandler { + protected abstract RestChannelConsumer doListRequest(RestRequest request, NodeClient client); + + protected abstract void documentation(StringBuilder sb); + + protected abstract Table getTableWithHeader(RestRequest request); + + @Override + public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { + boolean helpWanted = request.paramAsBoolean("help", false); + if (helpWanted) { + return channel -> { + Table table = getTableWithHeader(request); + int[] width = buildHelpWidths(table, request); + BytesStream bytesOutput = Streams.flushOnCloseStream(channel.bytesOutput()); + UTF8StreamWriter out = new UTF8StreamWriter().setOutput(bytesOutput); + for (Table.Cell cell : table.getHeaders()) { + // need to do left-align always, so create new cells + pad(new Table.Cell(cell.value), width[0], request, out); + out.append(" | "); + pad(new Table.Cell(cell.attr.containsKey("alias") ? cell.attr.get("alias") : ""), width[1], request, out); + out.append(" | "); + pad(new Table.Cell(cell.attr.containsKey("desc") ? cell.attr.get("desc") : "not available"), width[2], request, out); + out.append("\n"); + } + out.close(); + channel.sendResponse(new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, bytesOutput.bytes())); + }; + } else { + return doListRequest(request, client); + } + } + + static Set RESPONSE_PARAMS = Collections.unmodifiableSet( + new HashSet<>(Arrays.asList("format", "h", "v", "ts", "pri", "bytes", "size", "time", "s", "timeout")) + ); + + @Override + protected Set responseParams() { + return RESPONSE_PARAMS; + } + +} diff --git a/server/src/main/java/org/opensearch/rest/action/list/RestListAction.java b/server/src/main/java/org/opensearch/rest/action/list/RestListAction.java new file mode 100644 index 0000000000000..4b8551ea7e14a --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/action/list/RestListAction.java @@ -0,0 +1,58 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.list; + +import org.opensearch.client.node.NodeClient; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestRequest; + +import java.io.IOException; +import java.util.List; + +import static java.util.Collections.singletonList; +import static org.opensearch.rest.RestRequest.Method.GET; + +/** + * Base _list API endpoint + * + * @opensearch.api + */ +public class RestListAction extends BaseRestHandler { + + private static final String LIST = ":‑|"; + private static final String LIST_NL = LIST + "\n"; + private final String HELP; + + public RestListAction(List listActions) { + StringBuilder sb = new StringBuilder(); + sb.append(LIST_NL); + for (AbstractListAction listAction : listActions) { + listAction.documentation(sb); + } + HELP = sb.toString(); + } + + @Override + public List routes() { + return singletonList(new Route(GET, "/_list")); + } + + @Override + public String getName() { + return "list_action"; + } + + @Override + public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { + return channel -> channel.sendResponse(new BytesRestResponse(RestStatus.OK, HELP)); + } + +} diff --git a/server/src/main/java/org/opensearch/rest/action/list/RestShardsListAction.java b/server/src/main/java/org/opensearch/rest/action/list/RestShardsListAction.java new file mode 100644 index 0000000000000..8ed1cc01892cc --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/action/list/RestShardsListAction.java @@ -0,0 +1,122 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.list; + +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.Table; +import org.opensearch.core.common.Strings; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestResponse; +import org.opensearch.rest.action.RestActionListener; +import org.opensearch.rest.action.RestResponseListener; +import org.opensearch.rest.action.cat.RestShardsAction; +import org.opensearch.rest.action.cat.RestTable; +import org.opensearch.rest.pagination.ShardBasedPaginationStrategy; + +import java.util.List; + +import static java.util.Arrays.asList; +import static java.util.Collections.unmodifiableList; +import static org.opensearch.rest.RestRequest.Method.GET; + +/** + * _list API action to output shards in pages. + * + * @opensearch.api + */ +public class RestShardsListAction extends AbstractListAction { + + private static final int DEFAULT_LIST_SHARDS_PAGE_SIZE_STRING = 5000; + private static final String PAGINATED_ELEMENT_KEY = "shards"; + + @Override + public List routes() { + return unmodifiableList(asList(new RestHandler.Route(GET, "/_list/shards"), new RestHandler.Route(GET, "/_list/shards/{index}"))); + } + + @Override + public String getName() { + return "list_shards_action"; + } + + @Override + public boolean allowSystemIndexAccessByDefault() { + return true; + } + + @Override + protected void documentation(StringBuilder sb) { + sb.append("/_list/shards\n"); + sb.append("/_list/shards/{index}\n"); + } + + @Override + public BaseRestHandler.RestChannelConsumer doListRequest(final RestRequest request, final NodeClient client) { + final String[] indices = Strings.splitStringByCommaToArray(request.param("index")); + final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.local(request.paramAsBoolean("local", clusterStateRequest.local())); + clusterStateRequest.clusterManagerNodeTimeout( + request.paramAsTime("cluster_manager_timeout", clusterStateRequest.clusterManagerNodeTimeout()) + ); + clusterStateRequest.clear().nodes(true).routingTable(true).indices(indices).metadata(true); + final String requestedToken = request.param("next_token"); + final int pageSize = request.paramAsInt("size", DEFAULT_LIST_SHARDS_PAGE_SIZE_STRING); + if (pageSize < DEFAULT_LIST_SHARDS_PAGE_SIZE_STRING) { + throw new IllegalArgumentException("size should be greater than or equal to [" + DEFAULT_LIST_SHARDS_PAGE_SIZE_STRING + "]"); + } + final String requestedSortOrder = request.param("sort", "ascending"); + return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener(channel) { + @Override + public void processResponse(final ClusterStateResponse clusterStateResponse) { + ShardBasedPaginationStrategy paginationStrategy = new ShardBasedPaginationStrategy( + requestedToken == null ? null : new ShardBasedPaginationStrategy.ShardStrategyPageToken(requestedToken), + pageSize, + requestedSortOrder, + clusterStateResponse.getState() + ); + + IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest(); + indicesStatsRequest.all(); + indicesStatsRequest.indices(paginationStrategy.getIndicesFromRequestedToken().toArray(new String[0])); + client.admin().indices().stats(indicesStatsRequest, new RestResponseListener(channel) { + @Override + public RestResponse buildResponse(IndicesStatsResponse indicesStatsResponse) throws Exception { + return RestTable.buildResponse( + RestShardsAction.RestShardsActionCommonUtils.buildTable( + request, + clusterStateResponse, + indicesStatsResponse, + paginationStrategy.getElementsFromRequestedToken(), + new Table.PaginationMetadata( + true, + PAGINATED_ELEMENT_KEY, + paginationStrategy.getNextToken() == null + ? null + : paginationStrategy.getNextToken().generateEncryptedToken() + ) + ), + channel + ); + } + }); + } + }); + } + + @Override + protected Table getTableWithHeader(final RestRequest request) { + return RestShardsAction.RestShardsActionCommonUtils.getTableWithHeader(request, null); + } +} diff --git a/server/src/main/java/org/opensearch/rest/action/list/package-info.java b/server/src/main/java/org/opensearch/rest/action/list/package-info.java new file mode 100644 index 0000000000000..8d6563ff9b344 --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/action/list/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * {@link org.opensearch.rest.RestHandler}s for actions that list out results in chunks of pages. + */ +package org.opensearch.rest.action.list; diff --git a/server/src/main/java/org/opensearch/rest/pagination/IndexBasedPaginationStrategy.java b/server/src/main/java/org/opensearch/rest/pagination/IndexBasedPaginationStrategy.java new file mode 100644 index 0000000000000..25dd50a5ebb39 --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/pagination/IndexBasedPaginationStrategy.java @@ -0,0 +1,193 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.pagination; + +import org.opensearch.OpenSearchParseException; +import org.opensearch.cluster.ClusterState; +import org.opensearch.common.Nullable; + +import java.util.Base64; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +import static java.nio.charset.StandardCharsets.UTF_8; + +/** + * This strategy can be used by the Rest APIs wanting to paginate the responses based on Indices. + * The strategy considers create timestamps of indices as the keys to iterate over pages. + * + * @opensearch.internal + */ +public class IndexBasedPaginationStrategy implements PaginationStrategy { + + private static final String DESCENDING_SORT_PARAM_VALUE = "descending"; + private final IndexStrategyPageToken nextToken; + private final List indicesFromRequestedToken; + + private static final String INCORRECT_TAINTED_NEXT_TOKEN_ERROR_MESSAGE = + "Parameter [next_token] has been tainted and is incorrect. Please provide a valid [next_token]."; + + public IndexBasedPaginationStrategy( + @Nullable IndexStrategyPageToken requestedToken, + int maxPageSize, + String sortOrder, + ClusterState clusterState + ) { + // Get sorted list of indices from metadata and filter out the required number of indices + List sortedIndicesList = getListOfIndicesSortedByCreateTime(clusterState, sortOrder, requestedToken); + final int newPageStartIndexNumber = getNewPageIndexStartNumber(requestedToken, sortedIndicesList, clusterState); // inclusive + int newPageEndIndexNumber = Math.min(newPageStartIndexNumber + maxPageSize, sortedIndicesList.size()); // exclusive + this.indicesFromRequestedToken = sortedIndicesList.subList(newPageStartIndexNumber, newPageEndIndexNumber); + long queryStartTime = requestedToken == null + ? clusterState.metadata().indices().get(sortedIndicesList.get(sortedIndicesList.size() - 1)).getCreationDate() + : requestedToken.queryStartTime; + IndexStrategyPageToken nextPageToken = new IndexStrategyPageToken( + newPageEndIndexNumber, + clusterState.metadata().indices().get(sortedIndicesList.get(newPageEndIndexNumber - 1)).getCreationDate(), + queryStartTime, + sortedIndicesList.get(newPageEndIndexNumber - 1) + ); + this.nextToken = newPageEndIndexNumber >= sortedIndicesList.size() ? null : nextPageToken; + } + + @Override + @Nullable + public PageToken getNextToken() { + return nextToken; + } + + @Override + @Nullable + public List getElementsFromRequestedToken() { + return indicesFromRequestedToken; + } + + private List getListOfIndicesSortedByCreateTime( + final ClusterState clusterState, + String sortOrder, + IndexStrategyPageToken requestedPageToken + ) { + long latestValidIndexCreateTime = requestedPageToken == null ? Long.MAX_VALUE : requestedPageToken.queryStartTime; + // Filter out the indices which have been created after the latest index which was present when paginated query started. + // Also, sort the indices list based on their creation timestamps + return clusterState.getRoutingTable() + .getIndicesRouting() + .keySet() + .stream() + .filter(index -> (latestValidIndexCreateTime - clusterState.metadata().indices().get(index).getCreationDate()) >= 0) + .sorted((index1, index2) -> { + Long index1CreationTimeStamp = clusterState.metadata().indices().get(index1).getCreationDate(); + Long index2CreationTimeStamp = clusterState.metadata().indices().get(index2).getCreationDate(); + if (index1CreationTimeStamp.equals(index2CreationTimeStamp)) { + return DESCENDING_SORT_PARAM_VALUE.equals(sortOrder) ? index2.compareTo(index1) : index1.compareTo(index2); + } + return DESCENDING_SORT_PARAM_VALUE.equals(sortOrder) + ? Long.compare(index2CreationTimeStamp, index1CreationTimeStamp) + : Long.compare(index1CreationTimeStamp, index2CreationTimeStamp); + }) + .collect(Collectors.toList()); + } + + private int getNewPageIndexStartNumber( + final IndexStrategyPageToken requestedPageToken, + final List sortedIndicesList, + final ClusterState clusterState + ) { + if (Objects.isNull(requestedPageToken)) { + return 0; + } + int newPageStartIndexNumber = Math.min(requestedPageToken.posToStartPage, sortedIndicesList.size() - 1); + if (newPageStartIndexNumber > 0 + && !Objects.equals(sortedIndicesList.get(newPageStartIndexNumber - 1), requestedPageToken.nameOfLastRespondedIndex)) { + // case denoting an already responded index has been deleted while the paginated queries are being executed + // find the index whose creation time is just after the index which was last responded + newPageStartIndexNumber--; + while (newPageStartIndexNumber > 0) { + if (clusterState.metadata() + .indices() + .get(sortedIndicesList.get(newPageStartIndexNumber - 1)) + .getCreationDate() < requestedPageToken.creationTimeOfLastRespondedIndex) { + break; + } + newPageStartIndexNumber--; + } + } + return newPageStartIndexNumber; + } + + /** + * Token to be used by {@link IndexBasedPaginationStrategy}. + * Token would like: IndexNumberToStartTheNextPageFrom + $ + CreationTimeOfLastRespondedIndex + $ + + * QueryStartTime + $ + NameOfLastRespondedIndex + */ + public static class IndexStrategyPageToken implements PageToken { + + private final int posToStartPage; + private final long creationTimeOfLastRespondedIndex; + private final long queryStartTime; + private final String nameOfLastRespondedIndex; + + /** + * Will perform simple validations on token received in the request and initialize the data members. + * The token should be base64 encoded, and should contain the expected number of elements separated by "$". + * The timestamps should also be a valid long. + * + * @param requestedTokenString string denoting the encoded next token requested by the user + */ + public IndexStrategyPageToken(String requestedTokenString) { + Objects.requireNonNull(requestedTokenString, "requestedTokenString can not be null"); + try { + requestedTokenString = new String(Base64.getDecoder().decode(requestedTokenString), UTF_8); + } catch (IllegalArgumentException exception) { + throw new OpenSearchParseException(INCORRECT_TAINTED_NEXT_TOKEN_ERROR_MESSAGE); + } + + final String[] requestedTokenElements = requestedTokenString.split("\\$"); + if (requestedTokenElements.length != 4) { + throw new OpenSearchParseException(INCORRECT_TAINTED_NEXT_TOKEN_ERROR_MESSAGE); + } + + try { + this.posToStartPage = Integer.parseInt(requestedTokenElements[0]); + this.creationTimeOfLastRespondedIndex = Long.parseLong(requestedTokenElements[1]); + this.queryStartTime = Long.parseLong(requestedTokenElements[2]); + this.nameOfLastRespondedIndex = requestedTokenElements[3]; + if (posToStartPage < 0 || creationTimeOfLastRespondedIndex < 0 || queryStartTime < 0) { + throw new OpenSearchParseException(INCORRECT_TAINTED_NEXT_TOKEN_ERROR_MESSAGE); + } + } catch (NumberFormatException exception) { + throw new OpenSearchParseException(INCORRECT_TAINTED_NEXT_TOKEN_ERROR_MESSAGE); + } + } + + public IndexStrategyPageToken( + int indexNumberToStartPageFrom, + long creationTimeOfLastRespondedIndex, + long queryStartTime, + String nameOfLastRespondedIndex + ) { + Objects.requireNonNull(nameOfLastRespondedIndex, "index name should be provided"); + this.posToStartPage = indexNumberToStartPageFrom; + this.creationTimeOfLastRespondedIndex = creationTimeOfLastRespondedIndex; + this.queryStartTime = queryStartTime; + this.nameOfLastRespondedIndex = nameOfLastRespondedIndex; + } + + @Override + public String generateEncryptedToken() { + return Base64.getEncoder() + .encodeToString( + (posToStartPage + "$" + creationTimeOfLastRespondedIndex + "$" + queryStartTime + "$" + nameOfLastRespondedIndex) + .getBytes(UTF_8) + ); + } + } + +} diff --git a/server/src/main/java/org/opensearch/rest/pagination/PageToken.java b/server/src/main/java/org/opensearch/rest/pagination/PageToken.java new file mode 100644 index 0000000000000..cca105694ad3a --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/pagination/PageToken.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.pagination; + +import org.opensearch.OpenSearchParseException; + +import java.util.Base64; +import java.util.Objects; + +import static java.nio.charset.StandardCharsets.UTF_8; + +/** + * To be implemented by tokens getting returned/generated by {@link PaginationStrategy}. + * + * @opensearch.internal + */ +public interface PageToken { + String INCORRECT_TAINTED_NEXT_TOKEN_ERROR_MESSAGE = + "Parameter [next_token] has been tainted and is incorrect. Please provide a valid [next_token]."; + + String generateEncryptedToken(); + + static String encryptStringToken(String tokenString) { + return Base64.getEncoder().encodeToString(tokenString.getBytes(UTF_8)); + } + + static String decryptStringToken(String encTokenString) { + Objects.requireNonNull(encTokenString, "encTokenString can not be null"); + try { + return new String(Base64.getDecoder().decode(encTokenString), UTF_8); + } catch (IllegalArgumentException exception) { + throw new OpenSearchParseException(INCORRECT_TAINTED_NEXT_TOKEN_ERROR_MESSAGE); + } + } +} diff --git a/server/src/main/java/org/opensearch/rest/pagination/PaginationStrategy.java b/server/src/main/java/org/opensearch/rest/pagination/PaginationStrategy.java new file mode 100644 index 0000000000000..a34faa951dce6 --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/pagination/PaginationStrategy.java @@ -0,0 +1,64 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.pagination; + +import org.opensearch.cluster.ClusterState; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * Interface to be implemented by any strategy getting used for paginating rest responses. + * + * @opensearch.internal + */ +public interface PaginationStrategy { + String DESCENDING_SORT_PARAM_VALUE = "descending"; + + /** + * + * @return Base64 encoded string, which can be used to fetch next page of response. + */ + PageToken getNextToken(); + + /** + * + * @return List of elements fetched corresponding to the store and token received by the strategy. + */ + List getElementsFromRequestedToken(); + + /** + * + * Utility method to get list of indices sorted by their creation time with latestValidIndexCreateTime being used to filter out the indices created after it. + */ + static List getListOfIndicesSortedByCreateTime( + final ClusterState clusterState, + String sortOrder, + final long latestValidIndexCreateTime + ) { + // Filter out the indices which have been created after the latest index which was present when paginated query started. + // Also, sort the indices list based on their creation timestamps + return clusterState.getRoutingTable() + .getIndicesRouting() + .keySet() + .stream() + .filter(index -> (latestValidIndexCreateTime - clusterState.metadata().indices().get(index).getCreationDate()) >= 0) + .sorted((index1, index2) -> { + Long index1CreationTimeStamp = clusterState.metadata().indices().get(index1).getCreationDate(); + Long index2CreationTimeStamp = clusterState.metadata().indices().get(index2).getCreationDate(); + if (index1CreationTimeStamp.equals(index2CreationTimeStamp)) { + return DESCENDING_SORT_PARAM_VALUE.equals(sortOrder) ? index2.compareTo(index1) : index1.compareTo(index2); + } + return DESCENDING_SORT_PARAM_VALUE.equals(sortOrder) + ? Long.compare(index2CreationTimeStamp, index1CreationTimeStamp) + : Long.compare(index1CreationTimeStamp, index2CreationTimeStamp); + }) + .collect(Collectors.toList()); + } +} diff --git a/server/src/main/java/org/opensearch/rest/pagination/ShardBasedPaginationStrategy.java b/server/src/main/java/org/opensearch/rest/pagination/ShardBasedPaginationStrategy.java new file mode 100644 index 0000000000000..fc7dedc09c4b4 --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/pagination/ShardBasedPaginationStrategy.java @@ -0,0 +1,250 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.pagination; + +import org.opensearch.OpenSearchParseException; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.common.Nullable; +import org.opensearch.common.collect.Tuple; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * This strategy can be used by the Rest APIs wanting to paginate the responses based on Indices. + * The strategy considers create timestamps of indices as the keys to iterate over pages. + * + * @opensearch.internal + */ +public class ShardBasedPaginationStrategy implements PaginationStrategy { + + private final ShardStrategyPageToken nextToken; + private final List indicesFromRequestedToken; + private final List shardRoutingsFromRequestedToken; + + public ShardBasedPaginationStrategy( + @Nullable ShardStrategyPageToken requestedToken, + int maxPageSize, + String sortOrder, + ClusterState clusterState + ) { + // Get sorted list of indices from metadata and filter out the required number of indices + List sortedIndicesList = PaginationStrategy.getListOfIndicesSortedByCreateTime( + clusterState, + sortOrder, + requestedToken == null ? Long.MAX_VALUE : requestedToken.queryStartTime + ); + Tuple>, List> detailsFromRequestedToken = getDetailsFromRequestedToken( + requestedToken, + sortedIndicesList, + clusterState, + maxPageSize, + sortOrder + ); + int newPageEndIndexNumber = detailsFromRequestedToken.v1().v1(); + this.indicesFromRequestedToken = detailsFromRequestedToken.v1().v2(); + this.shardRoutingsFromRequestedToken = detailsFromRequestedToken.v2(); + long queryStartTime = requestedToken == null + ? DESCENDING_SORT_PARAM_VALUE.equals(sortOrder) + ? clusterState.metadata().indices().get(sortedIndicesList.get(0)).getCreationDate() + : clusterState.metadata().indices().get(sortedIndicesList.get(sortedIndicesList.size() - 1)).getCreationDate() + : requestedToken.queryStartTime; + + if (newPageEndIndexNumber >= sortedIndicesList.size() || newPageEndIndexNumber < 0) { + this.nextToken = null; + } else { + this.nextToken = new ShardStrategyPageToken( + shardRoutingsFromRequestedToken.get(shardRoutingsFromRequestedToken.size() - 1).id(), + newPageEndIndexNumber, + clusterState.metadata().indices().get(sortedIndicesList.get(newPageEndIndexNumber - 1)).getCreationDate(), + queryStartTime, + sortedIndicesList.get(newPageEndIndexNumber - 1) + ); + } + } + + /** + * + * Used to extract out lastProcessedIndexNumber, indicesToBeQueried, shardRoutingResponseList + */ + private Tuple>, List> getDetailsFromRequestedToken( + final ShardStrategyPageToken requestedPageToken, + final List sortedIndicesList, + final ClusterState clusterState, + final int maxPageSize, + final String sortOrder + ) { + final List shardRoutingResponseList = new ArrayList<>(); + final List indicesToBeQueried = new ArrayList<>(); + + // Since all the shards corresponding to the last processed index might not have been included in the last page, + // start iterating from the last index number itself + int newPageStartIndexNumber = requestedPageToken == null ? 0 : requestedPageToken.indexPosToStartPage; + // Since all the shards for last ID would have already been sent in the last response, + // start iterating from the next shard for current page + int newPageStartShardID = requestedPageToken == null ? 0 : requestedPageToken.shardNumToStartPage + 1; + if (newPageStartIndexNumber >= sortedIndicesList.size() + || (newPageStartIndexNumber > 0 + && !Objects.equals(sortedIndicesList.get(newPageStartIndexNumber), requestedPageToken.nameOfLastRespondedIndex))) { + // case denoting an already responded index has been deleted while the paginated queries are being executed + // find the index whose creation time is just after/before the index which was last responded + newPageStartIndexNumber = Math.min(newPageStartIndexNumber, sortedIndicesList.size() - 1); + while (newPageStartIndexNumber > -1) { + if (DESCENDING_SORT_PARAM_VALUE.equals(sortOrder)) { + if (newPageStartIndexNumber > 0 + && clusterState.metadata() + .indices() + .get(sortedIndicesList.get(newPageStartIndexNumber)) + .getCreationDate() > requestedPageToken.creationTimeOfLastRespondedIndex) { + break; + } + } else { + if (newPageStartIndexNumber > 0 + && clusterState.metadata() + .indices() + .get(sortedIndicesList.get(newPageStartIndexNumber)) + .getCreationDate() < requestedPageToken.creationTimeOfLastRespondedIndex) { + break; + } + } + newPageStartIndexNumber--; + } + + if (newPageStartIndexNumber < 0) { + return new Tuple<>(new Tuple<>(newPageStartIndexNumber, indicesToBeQueried), shardRoutingResponseList); + } + + newPageStartShardID = 0; + } + + // Get the number of shards upto the maxPageSize + long shardCountSoFar = 0L; + int indexNumberInSortedList = newPageStartIndexNumber; + for (; indexNumberInSortedList < sortedIndicesList.size(); indexNumberInSortedList++) { + String index = sortedIndicesList.get(indexNumberInSortedList); + Map indexShards = clusterState.getRoutingTable().getIndicesRouting().get(index).getShards(); + // If all the shards corresponding to the last index were already processed, move to the next Index + if (indexNumberInSortedList == newPageStartIndexNumber && (newPageStartShardID > indexShards.size() - 1)) { + newPageStartShardID = 0; + continue; + } + int lastProcessedShardNumberForCurrentIndex = -1; + int shardID = (indexNumberInSortedList == newPageStartIndexNumber) ? newPageStartShardID : 0; + for (; shardID < indexShards.size(); shardID++) { + shardCountSoFar += indexShards.get(shardID).shards().size(); + if (shardCountSoFar > maxPageSize) { + break; + } + shardRoutingResponseList.addAll(indexShards.get(shardID).shards()); + lastProcessedShardNumberForCurrentIndex = shardID; + } + + if (shardCountSoFar > maxPageSize) { + if (lastProcessedShardNumberForCurrentIndex != -1) { + indicesToBeQueried.add(index); + } + break; + } + indicesToBeQueried.add(index); + } + return new Tuple<>(new Tuple<>(indexNumberInSortedList, indicesToBeQueried), shardRoutingResponseList); + } + + @Override + @Nullable + public PageToken getNextToken() { + return nextToken; + } + + @Override + @Nullable + public List getElementsFromRequestedToken() { + return shardRoutingsFromRequestedToken; + } + + public List getIndicesFromRequestedToken() { + return indicesFromRequestedToken; + } + + /** + * Token to be used by {@link IndexBasedPaginationStrategy}. + * Token would like: IndexNumberToStartTheNextPageFrom + $ + CreationTimeOfLastRespondedIndex + $ + + * QueryStartTime + $ + NameOfLastRespondedIndex + */ + public static class ShardStrategyPageToken implements PageToken { + + private final int shardNumToStartPage; + private final int indexPosToStartPage; + private final long creationTimeOfLastRespondedIndex; + private final long queryStartTime; + private final String nameOfLastRespondedIndex; + + /** + * Will perform simple validations on token received in the request and initialize the data members. + * The token should be base64 encoded, and should contain the expected number of elements separated by "$". + * The timestamps should also be a valid long. + * + * @param requestedTokenString string denoting the encoded next token requested by the user + */ + public ShardStrategyPageToken(String requestedTokenString) { + String decryptedToken = PageToken.decryptStringToken(requestedTokenString); + final String[] decryptedTokenElements = decryptedToken.split("\\$"); + if (decryptedTokenElements.length != 5) { + throw new OpenSearchParseException(INCORRECT_TAINTED_NEXT_TOKEN_ERROR_MESSAGE); + } + try { + this.shardNumToStartPage = Integer.parseInt(decryptedTokenElements[0]); + this.indexPosToStartPage = Integer.parseInt(decryptedTokenElements[1]); + this.creationTimeOfLastRespondedIndex = Long.parseLong(decryptedTokenElements[2]); + this.queryStartTime = Long.parseLong(decryptedTokenElements[3]); + this.nameOfLastRespondedIndex = decryptedTokenElements[4]; + if (shardNumToStartPage < 0 || indexPosToStartPage < 0 || creationTimeOfLastRespondedIndex < 0 || queryStartTime < 0) { + throw new OpenSearchParseException(INCORRECT_TAINTED_NEXT_TOKEN_ERROR_MESSAGE); + } + } catch (NumberFormatException exception) { + throw new OpenSearchParseException(INCORRECT_TAINTED_NEXT_TOKEN_ERROR_MESSAGE); + } + } + + public ShardStrategyPageToken( + int shardNumToStartPage, + int indexPositionToStartPageFrom, + long creationTimeOfLastRespondedIndex, + long queryStartTime, + String nameOfLastRespondedIndex + ) { + Objects.requireNonNull(nameOfLastRespondedIndex, "index name should be provided"); + this.shardNumToStartPage = shardNumToStartPage; + this.indexPosToStartPage = indexPositionToStartPageFrom; + this.creationTimeOfLastRespondedIndex = creationTimeOfLastRespondedIndex; + this.queryStartTime = queryStartTime; + this.nameOfLastRespondedIndex = nameOfLastRespondedIndex; + } + + @Override + public String generateEncryptedToken() { + return PageToken.encryptStringToken( + shardNumToStartPage + + "$" + + indexPosToStartPage + + "$" + + creationTimeOfLastRespondedIndex + + "$" + + queryStartTime + + "$" + + nameOfLastRespondedIndex + ); + } + } + +} diff --git a/server/src/main/java/org/opensearch/rest/pagination/package-info.java b/server/src/main/java/org/opensearch/rest/pagination/package-info.java new file mode 100644 index 0000000000000..324b8a6c46f88 --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/pagination/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Exposes utilities for Rest actions to paginate responses. + */ +package org.opensearch.rest.pagination;