diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponse.java index c0a7db8460433..5377a5af883fb 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponse.java @@ -13,15 +13,13 @@ import org.apache.lucene.search.SortedNumericSortField; import org.apache.lucene.search.SortedSetSortField; import org.elasticsearch.action.support.DefaultShardOperationFailedException; -import org.elasticsearch.action.support.broadcast.BaseBroadcastResponse; +import org.elasticsearch.action.support.broadcast.ChunkedBroadcastResponse; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.xcontent.ChunkedToXContent; import org.elasticsearch.core.RestApiVersion; import org.elasticsearch.index.engine.Segment; -import org.elasticsearch.rest.action.RestActions; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; @@ -33,7 +31,7 @@ import java.util.Locale; import java.util.Map; -public class IndicesSegmentResponse extends BaseBroadcastResponse implements ChunkedToXContent { +public class IndicesSegmentResponse extends ChunkedBroadcastResponse { private final ShardSegments[] shards; @@ -79,72 +77,72 @@ public void writeTo(StreamOutput out) throws IOException { } @Override - public Iterator toXContentChunked(ToXContent.Params outerParams) { - return Iterators.concat(Iterators.single(((builder, params) -> { - builder.startObject(); - RestActions.buildBroadcastShardsHeader(builder, params, this); - return builder.startObject(Fields.INDICES); - })), getIndices().values().stream().map(indexSegments -> (ToXContent) (builder, params) -> { - builder.startObject(indexSegments.getIndex()); - - builder.startObject(Fields.SHARDS); - for (IndexShardSegments indexSegment : indexSegments) { - builder.startArray(Integer.toString(indexSegment.shardId().id())); - for (ShardSegments shardSegments : indexSegment) { - builder.startObject(); - - builder.startObject(Fields.ROUTING); - builder.field(Fields.STATE, shardSegments.getShardRouting().state()); - builder.field(Fields.PRIMARY, shardSegments.getShardRouting().primary()); - builder.field(Fields.NODE, shardSegments.getShardRouting().currentNodeId()); - if (shardSegments.getShardRouting().relocatingNodeId() != null) { - builder.field(Fields.RELOCATING_NODE, shardSegments.getShardRouting().relocatingNodeId()); - } - builder.endObject(); - - builder.field(Fields.NUM_COMMITTED_SEGMENTS, shardSegments.getNumberOfCommitted()); - builder.field(Fields.NUM_SEARCH_SEGMENTS, shardSegments.getNumberOfSearch()); - - builder.startObject(Fields.SEGMENTS); - for (Segment segment : shardSegments) { - builder.startObject(segment.getName()); - builder.field(Fields.GENERATION, segment.getGeneration()); - builder.field(Fields.NUM_DOCS, segment.getNumDocs()); - builder.field(Fields.DELETED_DOCS, segment.getDeletedDocs()); - builder.humanReadableField(Fields.SIZE_IN_BYTES, Fields.SIZE, segment.getSize()); - if (builder.getRestApiVersion() == RestApiVersion.V_7) { - builder.humanReadableField(Fields.MEMORY_IN_BYTES, Fields.MEMORY, ByteSizeValue.ZERO); - } - builder.field(Fields.COMMITTED, segment.isCommitted()); - builder.field(Fields.SEARCH, segment.isSearch()); - if (segment.getVersion() != null) { - builder.field(Fields.VERSION, segment.getVersion()); - } - if (segment.isCompound() != null) { - builder.field(Fields.COMPOUND, segment.isCompound()); + protected Iterator customXContentChunks(ToXContent.Params params) { + return Iterators.concat( + Iterators.single((builder, p) -> builder.startObject(Fields.INDICES)), + getIndices().values().stream().map(indexSegments -> (ToXContent) (builder, p) -> { + builder.startObject(indexSegments.getIndex()); + + builder.startObject(Fields.SHARDS); + for (IndexShardSegments indexSegment : indexSegments) { + builder.startArray(Integer.toString(indexSegment.shardId().id())); + for (ShardSegments shardSegments : indexSegment) { + builder.startObject(); + + builder.startObject(Fields.ROUTING); + builder.field(Fields.STATE, shardSegments.getShardRouting().state()); + builder.field(Fields.PRIMARY, shardSegments.getShardRouting().primary()); + builder.field(Fields.NODE, shardSegments.getShardRouting().currentNodeId()); + if (shardSegments.getShardRouting().relocatingNodeId() != null) { + builder.field(Fields.RELOCATING_NODE, shardSegments.getShardRouting().relocatingNodeId()); } - if (segment.getMergeId() != null) { - builder.field(Fields.MERGE_ID, segment.getMergeId()); - } - if (segment.getSegmentSort() != null) { - toXContent(builder, segment.getSegmentSort()); - } - if (segment.attributes != null && segment.attributes.isEmpty() == false) { - builder.field("attributes", segment.attributes); + builder.endObject(); + + builder.field(Fields.NUM_COMMITTED_SEGMENTS, shardSegments.getNumberOfCommitted()); + builder.field(Fields.NUM_SEARCH_SEGMENTS, shardSegments.getNumberOfSearch()); + + builder.startObject(Fields.SEGMENTS); + for (Segment segment : shardSegments) { + builder.startObject(segment.getName()); + builder.field(Fields.GENERATION, segment.getGeneration()); + builder.field(Fields.NUM_DOCS, segment.getNumDocs()); + builder.field(Fields.DELETED_DOCS, segment.getDeletedDocs()); + builder.humanReadableField(Fields.SIZE_IN_BYTES, Fields.SIZE, segment.getSize()); + if (builder.getRestApiVersion() == RestApiVersion.V_7) { + builder.humanReadableField(Fields.MEMORY_IN_BYTES, Fields.MEMORY, ByteSizeValue.ZERO); + } + builder.field(Fields.COMMITTED, segment.isCommitted()); + builder.field(Fields.SEARCH, segment.isSearch()); + if (segment.getVersion() != null) { + builder.field(Fields.VERSION, segment.getVersion()); + } + if (segment.isCompound() != null) { + builder.field(Fields.COMPOUND, segment.isCompound()); + } + if (segment.getMergeId() != null) { + builder.field(Fields.MERGE_ID, segment.getMergeId()); + } + if (segment.getSegmentSort() != null) { + toXContent(builder, segment.getSegmentSort()); + } + if (segment.attributes != null && segment.attributes.isEmpty() == false) { + builder.field("attributes", segment.attributes); + } + builder.endObject(); } builder.endObject(); - } - builder.endObject(); - builder.endObject(); + builder.endObject(); + } + builder.endArray(); } - builder.endArray(); - } - builder.endObject(); + builder.endObject(); - builder.endObject(); - return builder; - }).iterator(), Iterators.single((builder, params) -> builder.endObject().endObject())); + builder.endObject(); + return builder; + }).iterator(), + Iterators.single((builder, p) -> builder.endObject()) + ); } private static void toXContent(XContentBuilder builder, Sort sort) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java index 25c804a340a72..85c28d57820bc 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java @@ -11,20 +11,23 @@ import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.stats.IndexStats.IndexStatsBuilder; import org.elasticsearch.action.support.DefaultShardOperationFailedException; -import org.elasticsearch.action.support.broadcast.BroadcastResponse; +import org.elasticsearch.action.support.broadcast.ChunkedBroadcastResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.health.ClusterIndexHealth; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.Index; +import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; @@ -33,7 +36,7 @@ import static java.util.Collections.unmodifiableMap; -public class IndicesStatsResponse extends BroadcastResponse { +public class IndicesStatsResponse extends ChunkedBroadcastResponse { private final Map indexHealthMap; @@ -171,7 +174,7 @@ public void writeTo(StreamOutput out) throws IOException { } @Override - protected void addCustomXContentFields(XContentBuilder builder, Params params) throws IOException { + protected Iterator customXContentChunks(ToXContent.Params params) { final String level = params.param("level", "indices"); final boolean isLevelValid = "cluster".equalsIgnoreCase(level) || "indices".equalsIgnoreCase(level) @@ -179,22 +182,11 @@ protected void addCustomXContentFields(XContentBuilder builder, Params params) t if (isLevelValid == false) { throw new IllegalArgumentException("level parameter must be one of [cluster] or [indices] or [shards] but was [" + level + "]"); } - - builder.startObject("_all"); - - builder.startObject("primaries"); - getPrimaries().toXContent(builder, params); - builder.endObject(); - - builder.startObject("total"); - getTotal().toXContent(builder, params); - builder.endObject(); - - builder.endObject(); - if ("indices".equalsIgnoreCase(level) || "shards".equalsIgnoreCase(level)) { - builder.startObject(Fields.INDICES); - for (IndexStats indexStats : getIndices().values()) { + return Iterators.concat(Iterators.single(((builder, p) -> { + commonStats(builder, p); + return builder.startObject(Fields.INDICES); + })), getIndices().values().stream().map(indexStats -> (builder, p) -> { builder.startObject(indexStats.getIndex()); builder.field("uuid", indexStats.getUuid()); if (indexStats.getHealth() != null) { @@ -204,11 +196,11 @@ protected void addCustomXContentFields(XContentBuilder builder, Params params) t builder.field("status", indexStats.getState().toString().toLowerCase(Locale.ROOT)); } builder.startObject("primaries"); - indexStats.getPrimaries().toXContent(builder, params); + indexStats.getPrimaries().toXContent(builder, p); builder.endObject(); builder.startObject("total"); - indexStats.getTotal().toXContent(builder, params); + indexStats.getTotal().toXContent(builder, p); builder.endObject(); if ("shards".equalsIgnoreCase(level)) { @@ -217,17 +209,34 @@ protected void addCustomXContentFields(XContentBuilder builder, Params params) t builder.startArray(Integer.toString(indexShardStats.getShardId().id())); for (ShardStats shardStats : indexShardStats) { builder.startObject(); - shardStats.toXContent(builder, params); + shardStats.toXContent(builder, p); builder.endObject(); } builder.endArray(); } builder.endObject(); } - builder.endObject(); - } - builder.endObject(); + return builder.endObject(); + }).iterator(), Iterators.single((b, p) -> b.endObject())); } + return Iterators.single((b, p) -> { + commonStats(b, p); + return b; + }); + } + + private void commonStats(XContentBuilder builder, ToXContent.Params p) throws IOException { + builder.startObject("_all"); + + builder.startObject("primaries"); + getPrimaries().toXContent(builder, p); + builder.endObject(); + + builder.startObject("total"); + getTotal().toXContent(builder, p); + builder.endObject(); + + builder.endObject(); } static final class Fields { diff --git a/server/src/main/java/org/elasticsearch/action/support/broadcast/ChunkedBroadcastResponse.java b/server/src/main/java/org/elasticsearch/action/support/broadcast/ChunkedBroadcastResponse.java new file mode 100644 index 0000000000000..d65879578b995 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/support/broadcast/ChunkedBroadcastResponse.java @@ -0,0 +1,45 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +package org.elasticsearch.action.support.broadcast; + +import org.elasticsearch.action.support.DefaultShardOperationFailedException; +import org.elasticsearch.common.collect.Iterators; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.xcontent.ChunkedToXContent; +import org.elasticsearch.rest.action.RestActions; +import org.elasticsearch.xcontent.ToXContent; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +public abstract class ChunkedBroadcastResponse extends BaseBroadcastResponse implements ChunkedToXContent { + public ChunkedBroadcastResponse(StreamInput in) throws IOException { + super(in); + } + + public ChunkedBroadcastResponse( + int totalShards, + int successfulShards, + int failedShards, + List shardFailures + ) { + super(totalShards, successfulShards, failedShards, shardFailures); + } + + @Override + public final Iterator toXContentChunked(ToXContent.Params params) { + return Iterators.concat(Iterators.single((b, p) -> { + b.startObject(); + RestActions.buildBroadcastShardsHeader(b, p, this); + return b; + }), customXContentChunks(params), Iterators.single((builder, p) -> builder.endObject())); + } + + protected abstract Iterator customXContentChunks(ToXContent.Params params); +} diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestIndicesStatsAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestIndicesStatsAction.java index 737c1ccc4cba6..74924e4bfaf7f 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestIndicesStatsAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestIndicesStatsAction.java @@ -19,7 +19,7 @@ import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.RestCancellableNodeClient; -import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.rest.action.RestChunkedToXContentListener; import org.elasticsearch.rest.action.document.RestMultiTermVectorsAction; import java.io.IOException; @@ -140,7 +140,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).admin() .indices() - .stats(indicesStatsRequest, new RestToXContentListener<>(channel)); + .stats(indicesStatsRequest, new RestChunkedToXContentListener<>(channel)); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponseTests.java index 2c211a66c7b28..012b24330ed3b 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponseTests.java @@ -74,6 +74,6 @@ public void testSerializesOneChunkPerIndex() { iterator.next(); chunks++; } - assertEquals(indices + 2, chunks); + assertEquals(indices + 4, chunks); } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponseTests.java index e1873b7714bcb..cec44f8fd0636 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponseTests.java @@ -13,13 +13,17 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.io.Streams; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xcontent.json.JsonXContent; +import java.io.IOException; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; @@ -41,7 +45,7 @@ public void testInvalidLevel() { final ToXContent.Params params = new ToXContent.MapParams(Collections.singletonMap("level", level)); final IllegalArgumentException e = expectThrows( IllegalArgumentException.class, - () -> response.toXContent(JsonXContent.contentBuilder(), params) + () -> response.toXContentChunked(params).next().toXContent(JsonXContent.contentBuilder(), params) ); assertThat( e, @@ -64,7 +68,7 @@ public void testGetIndices() { ShardId shId = new ShardId(index, shardId); Path path = createTempDir().resolve("indices").resolve(index.getUUID()).resolve(String.valueOf(shardId)); ShardPath shardPath = new ShardPath(false, path, path, shId); - ShardRouting routing = createShardRouting(index, shId, (shardId == 0)); + ShardRouting routing = createShardRouting(shId, (shardId == 0)); shards.add(new ShardStats(routing, shardPath, null, null, null, null)); AtomicLong primaryShardsCounter = expectedIndexToPrimaryShardsCount.computeIfAbsent( index.getName(), @@ -105,7 +109,45 @@ public void testGetIndices() { } } - private ShardRouting createShardRouting(Index index, ShardId shardId, boolean isPrimary) { + public void testChunkedEncodingPerIndex() throws IOException { + final int shards = randomIntBetween(1, 10); + final List stats = new ArrayList<>(shards); + for (int i = 0; i < shards; i++) { + ShardId shId = new ShardId(createIndex("index-" + i), randomIntBetween(0, 1)); + Path path = createTempDir().resolve("indices").resolve(shId.getIndex().getUUID()).resolve(String.valueOf(shId.id())); + ShardPath shardPath = new ShardPath(false, path, path, shId); + ShardRouting routing = createShardRouting(shId, (shId.id() == 0)); + stats.add(new ShardStats(routing, shardPath, new CommonStats(), null, null, null)); + } + final IndicesStatsResponse indicesStatsResponse = new IndicesStatsResponse( + stats.toArray(new ShardStats[0]), + shards, + shards, + 0, + null, + ClusterState.EMPTY_STATE + ); + final ToXContent.Params paramsClusterLevel = new ToXContent.MapParams(Map.of("level", "cluster")); + final var iteratorClusterLevel = indicesStatsResponse.toXContentChunked(paramsClusterLevel); + int chunksSeenClusterLevel = 0; + final XContentBuilder builder = new XContentBuilder(XContentType.JSON.xContent(), Streams.NULL_OUTPUT_STREAM); + while (iteratorClusterLevel.hasNext()) { + iteratorClusterLevel.next().toXContent(builder, paramsClusterLevel); + chunksSeenClusterLevel++; + } + assertEquals(3, chunksSeenClusterLevel); + + final ToXContent.Params paramsIndexLevel = new ToXContent.MapParams(Map.of("level", "indices")); + final var iteratorIndexLevel = indicesStatsResponse.toXContentChunked(paramsIndexLevel); + int chunksSeenIndexLevel = 0; + while (iteratorIndexLevel.hasNext()) { + iteratorIndexLevel.next().toXContent(builder, paramsIndexLevel); + chunksSeenIndexLevel++; + } + assertEquals(4 + shards, chunksSeenIndexLevel); + } + + private ShardRouting createShardRouting(ShardId shardId, boolean isPrimary) { return TestShardRouting.newShardRouting(shardId, randomAlphaOfLength(4), isPrimary, ShardRoutingState.STARTED); }