Skip to content

Commit

Permalink
Rename streamContent/Separator to bulkContent/Separator (elastic#111716)
Browse files Browse the repository at this point in the history
Rename `xContent.streamSeparator()` and
`RestHandler.supportsStreamContent()` to `xContent.bulkSeparator()` and
`RestHandler.supportsBulkContent()`.

I want to reserve use of "supportsStreamContent" for current work in
HTTP layer to [support incremental content
handling](elastic#111438) besides
fully aggregated byte buffers. `supportsStreamContent` would indicate
that handler can parse chunks of http content as they arrive.
  • Loading branch information
mhl-b authored Aug 8, 2024
1 parent ea3379e commit 1163d2e
Show file tree
Hide file tree
Showing 29 changed files with 57 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ public XContentType type() {
}

@Override
public byte streamSeparator() {
throw new XContentParseException("cbor does not support stream parsing...");
public byte bulkSeparator() {
throw new XContentParseException("cbor does not support bulk parsing...");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public XContentType type() {
}

@Override
public byte streamSeparator() {
public byte bulkSeparator() {
return '\n';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public XContentType type() {
}

@Override
public byte streamSeparator() {
public byte bulkSeparator() {
return (byte) 0xFF;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ public XContentType type() {
}

@Override
public byte streamSeparator() {
throw new UnsupportedOperationException("yaml does not support stream parsing...");
public byte bulkSeparator() {
throw new UnsupportedOperationException("yaml does not support bulk parsing...");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public interface XContent {
*/
XContentType type();

byte streamSeparator();
byte bulkSeparator();

@Deprecated
boolean detectContent(byte[] bytes, int offset, int length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,12 @@ public static byte[] writeMultiLineFormat(MultiSearchTemplateRequest multiSearch
MultiSearchRequest.writeSearchRequestParams(searchRequest, xContentBuilder);
BytesReference.bytes(xContentBuilder).writeTo(output);
}
output.write(xContent.streamSeparator());
output.write(xContent.bulkSeparator());
try (XContentBuilder xContentBuilder = XContentBuilder.builder(xContent)) {
templateRequest.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS);
BytesReference.bytes(xContentBuilder).writeTo(output);
}
output.write(xContent.streamSeparator());
output.write(xContent.bulkSeparator());
}
return output.toByteArray();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public static MultiSearchTemplateRequest parseRequest(RestRequest restRequest, b
}

@Override
public boolean supportsContentStream() {
public boolean supportsBulkContent() {
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public void parse(
XContent xContent = xContentType.xContent();
int line = 0;
int from = 0;
byte marker = xContent.streamSeparator();
byte marker = xContent.bulkSeparator();
// Bulk requests can contain a lot of repeated strings for the index, pipeline and routing parameters. This map is used to
// deduplicate duplicate strings parsed for these parameters. While it does not prevent instantiating the duplicate strings, it
// reduces their lifetime to the lifetime of this parse call instead of the lifetime of the full bulk request.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public static void readMultiLineFormat(
TriFunction<String, Object, SearchRequest, Boolean> extraParamParser
) throws IOException {
int from = 0;
byte marker = xContent.streamSeparator();
byte marker = xContent.bulkSeparator();
while (true) {
int nextMarker = findNextMarker(marker, from, data);
if (nextMarker == -1) {
Expand Down Expand Up @@ -343,7 +343,7 @@ public static byte[] writeMultiLineFormat(MultiSearchRequest multiSearchRequest,
writeSearchRequestParams(request, xContentBuilder);
BytesReference.bytes(xContentBuilder).writeTo(output);
}
output.write(xContent.streamSeparator());
output.write(xContent.bulkSeparator());
try (XContentBuilder xContentBuilder = XContentBuilder.builder(xContent)) {
if (request.source() != null) {
request.source().toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS);
Expand All @@ -353,7 +353,7 @@ public static byte[] writeMultiLineFormat(MultiSearchRequest multiSearchRequest,
}
BytesReference.bytes(xContentBuilder).writeTo(output);
}
output.write(xContent.streamSeparator());
output.write(xContent.bulkSeparator());
}
return output.toByteArray();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ public boolean allowsUnsafeBuffers() {
}

@Override
public boolean supportsContentStream() {
return delegate.supportsContentStream();
public boolean supportsBulkContent() {
return delegate.supportsBulkContent();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ private void dispatchRequest(
}
final XContentType xContentType = request.getXContentType();
// TODO consider refactoring to handler.supportsContentStream(xContentType). It is only used with JSON and SMILE
if (handler.supportsContentStream()
if (handler.supportsBulkContent()
&& XContentType.JSON != xContentType.canonical()
&& XContentType.SMILE != xContentType.canonical()) {
channel.sendResponse(
Expand Down
8 changes: 4 additions & 4 deletions server/src/main/java/org/elasticsearch/rest/RestHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ default boolean canTripCircuitBreaker() {
}

/**
* Indicates if the RestHandler supports content as a stream. A stream would be multiple objects delineated by
* {@link XContent#streamSeparator()}. If a handler returns true this will affect the types of content that can be sent to
* this endpoint.
* Indicates if the RestHandler supports bulk content. A bulk request contains multiple objects
* delineated by {@link XContent#bulkSeparator()}. If a handler returns true this will affect
* the types of content that can be sent to this endpoint.
*/
default boolean supportsContentStream() {
default boolean supportsBulkContent() {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
}

@Override
public boolean supportsContentStream() {
public boolean supportsBulkContent() {
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public static void parseMultiLineRequest(
}

@Override
public boolean supportsContentStream() {
public boolean supportsBulkContent() {
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,13 +290,13 @@ public void testSmileIsSupported() throws IOException {
builder.endObject();
builder.endObject();
}
out.write(xContentType.xContent().streamSeparator());
out.write(xContentType.xContent().bulkSeparator());
try (XContentBuilder builder = XContentFactory.contentBuilder(xContentType, out)) {
builder.startObject();
builder.field("field", "value");
builder.endObject();
}
out.write(xContentType.xContent().streamSeparator());
out.write(xContentType.xContent().bulkSeparator());
data = out.bytes();
}

Expand Down Expand Up @@ -327,7 +327,7 @@ public void testToValidateUpsertRequestAndCASInBulkRequest() throws IOException
builder.endObject();
builder.endObject();
}
out.write(xContentType.xContent().streamSeparator());
out.write(xContentType.xContent().bulkSeparator());
try (XContentBuilder builder = XContentFactory.contentBuilder(xContentType, out)) {
builder.startObject();
builder.startObject("doc").endObject();
Expand All @@ -338,7 +338,7 @@ public void testToValidateUpsertRequestAndCASInBulkRequest() throws IOException
builder.field("upsert", values);
builder.endObject();
}
out.write(xContentType.xContent().streamSeparator());
out.write(xContentType.xContent().bulkSeparator());
data = out.bytes();
}
BulkRequest bulkRequest = new BulkRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,17 +155,17 @@ public void testInvalidHeaderValueEmpty() {
expectThrows(IllegalArgumentException.class, () -> DeprecationRestHandler.requireValidHeader(blank));
}

public void testSupportsContentStreamTrue() {
when(handler.supportsContentStream()).thenReturn(true);
public void testSupportsBulkContentTrue() {
when(handler.supportsBulkContent()).thenReturn(true);
assertTrue(
new DeprecationRestHandler(handler, METHOD, PATH, null, deprecationMessage, deprecationLogger, false).supportsContentStream()
new DeprecationRestHandler(handler, METHOD, PATH, null, deprecationMessage, deprecationLogger, false).supportsBulkContent()
);
}

public void testSupportsContentStreamFalse() {
when(handler.supportsContentStream()).thenReturn(false);
public void testSupportsBulkContentFalse() {
when(handler.supportsBulkContent()).thenReturn(false);
assertFalse(
new DeprecationRestHandler(handler, METHOD, PATH, null, deprecationMessage, deprecationLogger, false).supportsContentStream()
new DeprecationRestHandler(handler, METHOD, PATH, null, deprecationMessage, deprecationLogger, false).supportsBulkContent()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ public void handleRequest(RestRequest request, RestChannel channel, NodeClient c
}

@Override
public boolean supportsContentStream() {
public boolean supportsBulkContent() {
return true;
}
});
Expand Down Expand Up @@ -637,7 +637,7 @@ public void handleRequest(RestRequest request, RestChannel channel, NodeClient c
}

@Override
public boolean supportsContentStream() {
public boolean supportsBulkContent() {
return true;
}
});
Expand All @@ -662,7 +662,7 @@ public void handleRequest(RestRequest request, RestChannel channel, NodeClient c
}

@Override
public boolean supportsContentStream() {
public boolean supportsBulkContent() {
return true;
}
});
Expand All @@ -688,7 +688,7 @@ public void handleRequest(RestRequest request, RestChannel channel, NodeClient c
}

@Override
public boolean supportsContentStream() {
public boolean supportsBulkContent() {
return true;
}
});
Expand All @@ -713,7 +713,7 @@ public void handleRequest(RestRequest request, RestChannel channel, NodeClient c
}

@Override
public boolean supportsContentStream() {
public boolean supportsBulkContent() {
return true;
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ private HttpEntity createEntity(List<Map<String, Object>> bodies, Map<String, St
for (int i = bytesRef.offset; i < bytesRef.length; i++) {
bytes[position++] = bytesRef.bytes[i];
}
bytes[position++] = xContentType.xContent().streamSeparator();
bytes[position++] = xContentType.xContent().bulkSeparator();
}
return new ByteArrayEntity(bytes, ContentType.create(xContentType.mediaTypeWithoutParameters(), StandardCharsets.UTF_8));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private void writeJsonXContent(CategorizationAnalyzer categorizationAnalyzer, In

private void writeSmileXContent(CategorizationAnalyzer categorizationAnalyzer, InputStream inputStream) throws IOException {
while (true) {
byte[] nextObject = findNextObject(XContentType.SMILE.xContent().streamSeparator(), inputStream);
byte[] nextObject = findNextObject(XContentType.SMILE.xContent().bulkSeparator(), inputStream);
if (nextObject.length == 0) {
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void process() throws IOException {
}

private BytesReference parseResults(XContent xContent, BytesReference bytesRef) throws IOException {
byte marker = xContent.streamSeparator();
byte marker = xContent.bulkSeparator();
int from = 0;
while (true) {
int nextMarker = findNextMarker(marker, bytesRef, from);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ public String getErrors() {
}

private BytesReference parseMessages(XContent xContent, BytesReference bytesRef) {
byte marker = xContent.streamSeparator();
byte marker = xContent.bulkSeparator();
int from = 0;
while (true) {
int nextMarker = findNextMarker(marker, bytesRef, from);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
}

@Override
public boolean supportsContentStream() {
public boolean supportsBulkContent() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ public void testWrite_Smile() throws Exception {
xsonGen.writeStringField("value", "1.0");
xsonGen.writeEndObject();
xsonGen.close();
xsonOs.writeByte(XContentType.SMILE.xContent().streamSeparator());
xsonOs.writeByte(XContentType.SMILE.xContent().bulkSeparator());

xsonGen = XContentFactory.xContent(XContentType.SMILE).createGenerator(xsonOs);
xsonGen.writeStartObject();
Expand All @@ -382,7 +382,7 @@ public void testWrite_Smile() throws Exception {
xsonGen.writeStringField("value", "2.0");
xsonGen.writeEndObject();
xsonGen.flush();
xsonOs.writeByte(XContentType.SMILE.xContent().streamSeparator());
xsonOs.writeByte(XContentType.SMILE.xContent().bulkSeparator());

InputStream inputStream = new ByteArrayInputStream(BytesReference.toBytes(xsonOs.bytes()));
JsonDataToProcessWriter writer = createWriter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,15 @@ private void writeDocument(MonitoringDoc doc, OutputStream out) throws IOExcepti
}

// Adds action metadata line bulk separator
out.write(xContent.streamSeparator());
out.write(xContent.bulkSeparator());

// Adds the source of the monitoring document
try (XContentBuilder builder = new XContentBuilder(xContent, out)) {
doc.toXContent(builder, ToXContent.EMPTY_PARAMS);
}

// Adds final bulk separator
out.write(xContent.streamSeparator());
out.write(xContent.bulkSeparator());

logger.trace("http exporter [{}] - added index request [index={}, id={}, monitoring data type={}]", name, index, id, doc.getType());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client
}

@Override
public boolean supportsContentStream() {
public boolean supportsBulkContent() {
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,16 @@ public void testAddRequestContent() throws IOException {
builder.endObject();

builder.flush();
content.write(xContentType.xContent().streamSeparator());
content.write(xContentType.xContent().bulkSeparator());

sources[i] = RandomObjects.randomSource(random(), xContentType);
BytesRef bytes = sources[i].toBytesRef();
content.write(bytes.bytes, bytes.offset, bytes.length);

content.write(xContentType.xContent().streamSeparator());
content.write(xContentType.xContent().bulkSeparator());
}

content.write(xContentType.xContent().streamSeparator());
content.write(xContentType.xContent().bulkSeparator());
}

final MonitoredSystem system = randomFrom(MonitoredSystem.values());
Expand Down Expand Up @@ -146,7 +146,7 @@ public void testAddRequestContentWithEmptySource() throws IOException {
final int totalDocs = nbDocs + nbEmptyDocs;

final XContentType xContentType = XContentType.JSON;
final byte separator = xContentType.xContent().streamSeparator();
final byte separator = xContentType.xContent().bulkSeparator();

final BytesStreamOutput content = new BytesStreamOutput();
try (XContentBuilder builder = XContentFactory.contentBuilder(xContentType, content)) {
Expand Down Expand Up @@ -192,7 +192,7 @@ public void testAddRequestContentWithUnrecognizedIndexName() throws IOException
final String indexName = randomAlphaOfLength(10);

final XContentType xContentType = XContentType.JSON;
final byte separator = xContentType.xContent().streamSeparator();
final byte separator = xContentType.xContent().bulkSeparator();

final BytesStreamOutput content = new BytesStreamOutput();
try (XContentBuilder builder = XContentFactory.contentBuilder(xContentType, content)) {
Expand Down
Loading

0 comments on commit 1163d2e

Please sign in to comment.