diff --git a/CHANGELOG.md b/CHANGELOG.md index 5bf90a75d0148..58521e1778230 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -82,6 +82,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix warnings from SLF4J on startup when repository-s3 is installed ([#16194](https://github.com/opensearch-project/OpenSearch/pull/16194)) - Fix protobuf-java leak through client library dependencies ([#16254](https://github.com/opensearch-project/OpenSearch/pull/16254)) - Fix multi-search with template doesn't return status code ([#16265](https://github.com/opensearch-project/OpenSearch/pull/16265)) +- [Streaming Indexing] Fix intermittent 'The bulk request must be terminated by a newline [\n]' failures [#16337](https://github.com/opensearch-project/OpenSearch/pull/16337)) ### Security diff --git a/buildSrc/version.properties b/buildSrc/version.properties index 4bb6f3af7fbe4..5740c124910b9 100644 --- a/buildSrc/version.properties +++ b/buildSrc/version.properties @@ -34,7 +34,7 @@ netty = 4.1.114.Final joda = 2.12.7 # project reactor -reactor_netty = 1.1.22 +reactor_netty = 1.1.23 reactor = 3.5.20 # client dependencies diff --git a/plugins/repository-azure/licenses/reactor-netty-core-1.1.22.jar.sha1 b/plugins/repository-azure/licenses/reactor-netty-core-1.1.22.jar.sha1 deleted file mode 100644 index cc894568c5760..0000000000000 --- a/plugins/repository-azure/licenses/reactor-netty-core-1.1.22.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -08356b59b29f86e7142c9daca0434653a64ae64b \ No newline at end of file diff --git a/plugins/repository-azure/licenses/reactor-netty-core-1.1.23.jar.sha1 b/plugins/repository-azure/licenses/reactor-netty-core-1.1.23.jar.sha1 new file mode 100644 index 0000000000000..8f56bb5165fa3 --- /dev/null +++ b/plugins/repository-azure/licenses/reactor-netty-core-1.1.23.jar.sha1 @@ -0,0 +1 @@ +a7059b0c18ab7aa0fa9e08b48cb6a20b15c11478 \ No newline at end of file diff --git a/plugins/repository-azure/licenses/reactor-netty-http-1.1.22.jar.sha1 b/plugins/repository-azure/licenses/reactor-netty-http-1.1.22.jar.sha1 deleted file mode 100644 index 2402813f831ce..0000000000000 --- a/plugins/repository-azure/licenses/reactor-netty-http-1.1.22.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -2faf64b3822b0512f15d72a325e2826eb8564413 \ No newline at end of file diff --git a/plugins/repository-azure/licenses/reactor-netty-http-1.1.23.jar.sha1 b/plugins/repository-azure/licenses/reactor-netty-http-1.1.23.jar.sha1 new file mode 100644 index 0000000000000..5bb3136f99e93 --- /dev/null +++ b/plugins/repository-azure/licenses/reactor-netty-http-1.1.23.jar.sha1 @@ -0,0 +1 @@ +94b294fa90aee2e88ad4337251e278aaac21362c \ No newline at end of file diff --git a/plugins/transport-reactor-netty4/licenses/reactor-netty-core-1.1.22.jar.sha1 b/plugins/transport-reactor-netty4/licenses/reactor-netty-core-1.1.22.jar.sha1 deleted file mode 100644 index cc894568c5760..0000000000000 --- a/plugins/transport-reactor-netty4/licenses/reactor-netty-core-1.1.22.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -08356b59b29f86e7142c9daca0434653a64ae64b \ No newline at end of file diff --git a/plugins/transport-reactor-netty4/licenses/reactor-netty-core-1.1.23.jar.sha1 b/plugins/transport-reactor-netty4/licenses/reactor-netty-core-1.1.23.jar.sha1 new file mode 100644 index 0000000000000..8f56bb5165fa3 --- /dev/null +++ b/plugins/transport-reactor-netty4/licenses/reactor-netty-core-1.1.23.jar.sha1 @@ -0,0 +1 @@ +a7059b0c18ab7aa0fa9e08b48cb6a20b15c11478 \ No newline at end of file diff --git a/plugins/transport-reactor-netty4/licenses/reactor-netty-http-1.1.22.jar.sha1 b/plugins/transport-reactor-netty4/licenses/reactor-netty-http-1.1.22.jar.sha1 deleted file mode 100644 index 2402813f831ce..0000000000000 --- a/plugins/transport-reactor-netty4/licenses/reactor-netty-http-1.1.22.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -2faf64b3822b0512f15d72a325e2826eb8564413 \ No newline at end of file diff --git a/plugins/transport-reactor-netty4/licenses/reactor-netty-http-1.1.23.jar.sha1 b/plugins/transport-reactor-netty4/licenses/reactor-netty-http-1.1.23.jar.sha1 new file mode 100644 index 0000000000000..5bb3136f99e93 --- /dev/null +++ b/plugins/transport-reactor-netty4/licenses/reactor-netty-http-1.1.23.jar.sha1 @@ -0,0 +1 @@ +94b294fa90aee2e88ad4337251e278aaac21362c \ No newline at end of file diff --git a/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java b/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java index 1b60023da0329..35515a8d95d67 100644 --- a/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java +++ b/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java @@ -304,7 +304,34 @@ public void testStreamingLargeDocument() throws IOException { String.format( Locale.getDefault(), "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"1\" } }\n{ \"name\": \"%s\" }\n", - randomAlphaOfLength(5000) + randomAlphaOfLength(7000) + ) + ); + + final Duration delay = Duration.ofMillis(1); + final StreamingRequest streamingRequest = new StreamingRequest<>( + "POST", + "/_bulk/stream", + Flux.fromStream(stream).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8))) + ); + + final StreamingResponse streamingResponse = client().streamRequest(streamingRequest); + + StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8))) + .expectNextMatches(s -> s.contains("\"result\":\"created\"") && s.contains("\"_id\":\"1\"")) + .expectComplete() + .verify(); + + assertThat(streamingResponse.getStatusLine().getStatusCode(), equalTo(200)); + assertThat(streamingResponse.getWarnings(), empty()); + } + + public void testStreamingLargeDocumentThatExceedsChunkSize() throws IOException { + final Stream stream = Stream.of( + String.format( + Locale.getDefault(), + "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"1\" } }\n{ \"name\": \"%s\" }\n", + randomAlphaOfLength(9000) /* the default chunk size limit is set 8k */ ) ); diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java index 7f4a8f6cdef02..3dcee4e8ec045 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java @@ -230,6 +230,7 @@ protected HttpServerChannel bind(InetSocketAddress socketAddress) throws Excepti spec -> spec.maxChunkSize(maxChunkSize.bytesAsInt()) .maxHeaderSize(maxHeaderSize.bytesAsInt()) .maxInitialLineLength(maxInitialLineLength.bytesAsInt()) + .allowPartialChunks(false) ) .handle((req, res) -> incomingRequest(req, res)) );