From 3f8f298c7a765754512e736c6d9caed24f0d69d4 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Mon, 9 Sep 2024 09:18:30 -0400 Subject: [PATCH] Fix flaky org.opensearch.rest.ReactorNetty4StreamingStressIT.testCloseClientStreamingRequest test case Signed-off-by: Andriy Redko --- .../rest/ReactorNetty4StreamingStressIT.java | 55 +++++++------------ 1 file changed, 21 insertions(+), 34 deletions(-) diff --git a/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingStressIT.java b/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingStressIT.java index a978af1b11db4..9da456f618ffc 100644 --- a/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingStressIT.java +++ b/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingStressIT.java @@ -16,24 +16,20 @@ import org.opensearch.test.rest.OpenSearchRestTestCase; import org.junit.After; +import java.io.IOException; import java.io.InterruptedIOException; +import java.io.UncheckedIOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; import reactor.core.publisher.Flux; -import reactor.test.subscriber.TestSubscriber; +import reactor.test.StepVerifier; +import reactor.test.scheduler.VirtualTimeScheduler; -import static org.hamcrest.CoreMatchers.anyOf; import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.CoreMatchers.not; -import static org.hamcrest.collection.IsEmptyCollection.empty; public class ReactorNetty4StreamingStressIT extends OpenSearchRestTestCase { @After @@ -49,6 +45,8 @@ public void tearDown() throws Exception { } public void testCloseClientStreamingRequest() throws Exception { + final VirtualTimeScheduler scheduler = VirtualTimeScheduler.create(true); + final AtomicInteger id = new AtomicInteger(0); final Stream stream = Stream.generate( () -> "{ \"index\": { \"_index\": \"test-stress-streaming\", \"_id\": \"" @@ -57,39 +55,28 @@ public void testCloseClientStreamingRequest() throws Exception { + "{ \"name\": \"josh\" }\n" ); + final Duration delay = Duration.ofMillis(1); final StreamingRequest streamingRequest = new StreamingRequest<>( "POST", "/_bulk/stream", - Flux.fromStream(stream).delayElements(Duration.ofMillis(500)).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8))) + Flux.fromStream(stream).delayElements(delay, scheduler).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8))) ); streamingRequest.addParameter("refresh", "true"); final StreamingResponse streamingResponse = client().streamRequest(streamingRequest); - TestSubscriber subscriber = TestSubscriber.create(); - streamingResponse.getBody().subscribe(subscriber); - - final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - try { - // Await for subscriber to receive at least one chunk - assertBusy(() -> assertThat(subscriber.getReceivedOnNext(), not(empty()))); - - // Close client forceably - executor.schedule(() -> { - client().close(); - return null; - }, 2, TimeUnit.SECONDS); + scheduler.advanceTimeBy(delay); /* emit first element */ - // Await for subscriber to terminate - subscriber.block(Duration.ofSeconds(10)); - assertThat( - subscriber.expectTerminalError(), - anyOf(instanceOf(InterruptedIOException.class), instanceOf(ConnectionClosedException.class)) - ); - } finally { - executor.shutdown(); - if (executor.awaitTermination(1, TimeUnit.SECONDS) == false) { - executor.shutdownNow(); - } - } + StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8))) + .expectNextMatches(s -> s.contains("\"result\":\"created\"") && s.contains("\"_id\":\"1\"")) + .then(() -> { + try { + client().close(); + } catch (final IOException ex) { + throw new UncheckedIOException(ex); + } + }) + .then(() -> scheduler.advanceTimeBy(delay)) + .expectErrorMatches(t -> t instanceof InterruptedIOException || t instanceof ConnectionClosedException) + .verify(); } }