From eaf54b77f54e22696abdfd40838a17f1ce757e6f Mon Sep 17 00:00:00 2001 From: jayjlu <107919366+jayjlu@users.noreply.github.com> Date: Mon, 9 Oct 2023 13:01:34 +0800 Subject: [PATCH] improving v1 connector socket close, and improving test cases for v3 coverage (#8) --- pom.xml | 6 +- .../cranker/connector/ConnectorSocket.java | 3 +- .../cranker/connector/BaseEndToEndTest.java | 98 ++++--- .../connector/ConcurrentUploadTest.java | 98 +++++++ .../cranker/connector/ConnectRetryTest.java | 46 ++-- .../CrankerConnectorBuilderTest.java | 123 +++++++++ .../connector/CrankerConnectorHttp2Test.java | 21 +- .../CrankerConnectorListenerTest.java | 129 ++++++++++ .../connector/CrankerConnectorStopTest.java | 239 +++++++++++++++--- .../connector/CrankerConnectorTest.java | 86 +++++-- .../cranker/connector/DualRouterTest.java | 143 ++++++++--- .../cranker/connector/RouterSupplierTest.java | 191 ++++++++++---- .../connector/ServerSentEventTest.java | 27 +- .../connector => manual}/ManualTest.java | 8 +- src/test/java/manual/RunLocalConnector.java | 55 ++++ 15 files changed, 1047 insertions(+), 226 deletions(-) create mode 100644 src/test/java/com/hsbc/cranker/connector/ConcurrentUploadTest.java create mode 100644 src/test/java/com/hsbc/cranker/connector/CrankerConnectorBuilderTest.java create mode 100644 src/test/java/com/hsbc/cranker/connector/CrankerConnectorListenerTest.java rename src/test/java/{com/hsbc/cranker/connector => manual}/ManualTest.java (96%) create mode 100644 src/test/java/manual/RunLocalConnector.java diff --git a/pom.xml b/pom.xml index e3d1594..488bf51 100644 --- a/pom.xml +++ b/pom.xml @@ -54,19 +54,19 @@ org.slf4j slf4j-simple - 2.0.7 + 2.0.9 test com.hsbc.cranker mu-cranker-router - 1.0.2 + 1.0.3 test io.muserver mu-server - 0.74.1 + 0.74.2 test diff --git a/src/main/java/com/hsbc/cranker/connector/ConnectorSocket.java b/src/main/java/com/hsbc/cranker/connector/ConnectorSocket.java index 65134a2..df5ad95 100644 --- a/src/main/java/com/hsbc/cranker/connector/ConnectorSocket.java +++ b/src/main/java/com/hsbc/cranker/connector/ConnectorSocket.java @@ -285,7 +285,8 @@ public void close(State newState, int statusCode, Throwable error) { if (webSocket != null && !webSocket.isOutputClosed()) { webSocket.sendClose(statusCode, error != null ? error.getMessage() : ""); } - if (responseFuture != null && !responseFuture.isDone() && !responseFuture.isCancelled()) { + if (responseFuture != null && !responseFuture.isDone() && !responseFuture.isCancelled() + && statusCode != WebSocket.NORMAL_CLOSURE) { responseFuture.cancel(true); } if (responseBodySubscription != null) { diff --git a/src/test/java/com/hsbc/cranker/connector/BaseEndToEndTest.java b/src/test/java/com/hsbc/cranker/connector/BaseEndToEndTest.java index 95f9a10..49cd021 100644 --- a/src/test/java/com/hsbc/cranker/connector/BaseEndToEndTest.java +++ b/src/test/java/com/hsbc/cranker/connector/BaseEndToEndTest.java @@ -1,27 +1,36 @@ package com.hsbc.cranker.connector; +import com.hsbc.cranker.mucranker.CrankerRouter; +import com.hsbc.cranker.mucranker.CrankerRouterBuilder; import io.muserver.MuServer; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.RepetitionInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.hsbc.cranker.mucranker.CrankerRouter; -import com.hsbc.cranker.mucranker.CrankerRouterBuilder; +import scaffolding.AssertUtils; import java.net.URI; import java.net.http.HttpClient; +import java.util.ArrayList; import java.util.List; import java.util.stream.Stream; +import static com.hsbc.cranker.connector.CrankerConnectorBuilder.CRANKER_PROTOCOL_1; +import static com.hsbc.cranker.connector.CrankerConnectorBuilder.CRANKER_PROTOCOL_3; import static io.muserver.MuServerBuilder.muServer; import static java.util.stream.Collectors.toList; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static scaffolding.AssertUtils.assertEventually; public class BaseEndToEndTest { - private static final Logger log = LoggerFactory.getLogger(BaseEndToEndTest.class); - protected final HttpClient testClient = HttpUtils.createHttpClientBuilder(true).build(); - protected CrankerRouter crankerRouter = CrankerRouterBuilder.crankerRouter().start(); + protected CrankerRouter crankerRouter = CrankerRouterBuilder + .crankerRouter() + .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0")) + .start(); protected MuServer registrationServer = startRegistrationServer(0); protected MuServer crankerServer = startCrankerServer(0); @@ -38,45 +47,64 @@ protected static URI registrationUri(URI routerUri) { } public static CrankerConnector startConnectorAndWaitForRegistration(CrankerRouter crankerRouter, - String targetServiceName, - MuServer target, - int slidingWindowSize, - MuServer... registrationRouters) { - CrankerConnector connector = startConnector(targetServiceName, target, slidingWindowSize, registrationRouters); - waitForRegistration(targetServiceName, 2, crankerRouter); + String targetServiceName, + MuServer target, + int slidingWindowSize, + MuServer... registrationRouters) { + return startConnectorAndWaitForRegistration(crankerRouter, targetServiceName, target, List.of(CRANKER_PROTOCOL_1), slidingWindowSize, registrationRouters); + } + + public static CrankerConnector startConnectorAndWaitForRegistration(CrankerRouter crankerRouter, + String targetServiceName, + MuServer target, + List preferredProtocols, + int slidingWindowSize, + MuServer... registrationRouters) { + CrankerConnector connector = CrankerConnectorBuilder.connector() + .withPreferredProtocols(preferredProtocols) + .withHttpClient(CrankerConnectorBuilder.createHttpClient(true).build()) + .withTarget(target.uri()) + .withRoute(targetServiceName) + .withRouterUris(RegistrationUriSuppliers.fixedUris(Stream.of(registrationRouters) + .map(s -> registrationUri(s.uri())) + .collect(toList()))) + .withSlidingWindowSize(slidingWindowSize) + .start(); + + waitForRegistration(targetServiceName, connector.connectorId(), 2, crankerRouter); + + assertEventually( + () -> new ArrayList<>(connector.routers().get(0).idleSockets()).get(0).version(), + equalTo(preferredProtocols.get(0))); + return connector; } - public static void waitForRegistration(String targetServiceName, int slidingWindow, CrankerRouter... crankerRouters) { - int attempts = 0; + public static void waitForRegistration(String targetServiceName, String connectorInstanceId, int slidingWindow, CrankerRouter... crankerRouters) { final String serviceKey = targetServiceName.isEmpty() ? "*" : targetServiceName; for (CrankerRouter crankerRouter : crankerRouters) { - while (crankerRouter.collectInfo().services() + AssertUtils.assertEventually(() -> crankerRouter.collectInfo().services() + .stream() + .anyMatch(service -> service.route().equals(serviceKey) + && !service.connectors().isEmpty() + && service.connectors() .stream() - .noneMatch(service -> service.route().equals(serviceKey) - && service.connectors().size() > 0 - && service.connectors().get(0).connections().size() >= slidingWindow)) { - try { - Thread.sleep(20); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - if (attempts++ == 100) throw new RuntimeException("Failed to register " + targetServiceName); - } + .anyMatch(connector -> connector.connectorInstanceID().equals(connectorInstanceId) + && connector.connections().size() >= slidingWindow) + ), is(true)); } } - public static CrankerConnector startConnector(String targetServiceName, MuServer target, int slidingWindowSize, MuServer... registrationRouters) { - List uris = Stream.of(registrationRouters) - .map(s -> registrationUri(s.uri())) - .collect(toList()); - return CrankerConnectorBuilder.connector() - .withHttpClient(CrankerConnectorBuilder.createHttpClient(true).build()) - .withTarget(target.uri()) - .withRoute(targetServiceName) - .withRouterUris(RegistrationUriSuppliers.fixedUris(uris)) - .withSlidingWindowSize(slidingWindowSize) - .start(); + public static List preferredProtocols(RepetitionInfo repetitionInfo) { + final int currentRepetition = repetitionInfo.getCurrentRepetition(); + switch (currentRepetition) { + case 1: + return List.of(CRANKER_PROTOCOL_1); + case 2: + return List.of(CRANKER_PROTOCOL_3); + default: + return List.of(CRANKER_PROTOCOL_3, CRANKER_PROTOCOL_1); + } } @AfterEach diff --git a/src/test/java/com/hsbc/cranker/connector/ConcurrentUploadTest.java b/src/test/java/com/hsbc/cranker/connector/ConcurrentUploadTest.java new file mode 100644 index 0000000..1e84353 --- /dev/null +++ b/src/test/java/com/hsbc/cranker/connector/ConcurrentUploadTest.java @@ -0,0 +1,98 @@ +package com.hsbc.cranker.connector; + +import io.muserver.MuHandler; +import io.muserver.MuServer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.RepetitionInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static io.muserver.MuServerBuilder.httpServer; +import static org.junit.jupiter.api.Assertions.*; + +public class ConcurrentUploadTest extends BaseEndToEndTest { + + private static final Logger log = LoggerFactory.getLogger(ConcurrentUploadTest.class); + + private volatile MuHandler handler = (request, response) -> false; + + protected MuServer targetServer = httpServer() + .addHandler((request, response) -> handler.handle(request, response)) + .start(); + + private CrankerConnector connector; + + @BeforeEach + void setUp(RepetitionInfo repetitionInfo) { + connector = CrankerConnectorBuilder.connector() + .withPreferredProtocols(preferredProtocols(repetitionInfo)) + .withHttpClient(CrankerConnectorBuilder.createHttpClient(true).build()) + .withRouterUris(RegistrationUriSuppliers.fixedUris(registrationUri(registrationServer.uri()))) + .withRoute("*") + .withTarget(targetServer.uri()) + .withProxyEventListener(new ProxyEventListener() { + @Override + public void onProxyError(HttpRequest request, Throwable error) { + log.warn("onProxyError, request=" + request, error); + } + }) + .withComponentName("cranker-connector-unit-test") + .start(); + + waitForRegistration("*", connector.connectorId(),2, crankerRouter); + } + + @AfterEach + public void stop() throws Exception { + if (connector != null) assertTrue(connector.stop(10, TimeUnit.SECONDS)); + if (targetServer != null) targetServer.stop(); + } + + @RepeatedTest(3) + public void postLargeBody() throws InterruptedException { + + handler = (request, response) -> { + response.status(200); + response.write(request.readBodyAsString()); + return true; + }; + + Queue> responses = new ConcurrentLinkedQueue<>(); + CountDownLatch countDownLatch = new CountDownLatch(10); + + final String body = "c".repeat(10 * 1000); + for(int i = 0; i < 10; i++) { + final int finalI = i; + new Thread(() -> { + try { + HttpResponse resp = testClient.send(HttpRequest.newBuilder() + .method("POST", HttpRequest.BodyPublishers.ofString(body)) + .uri(crankerServer.uri().resolve("/?task=" + finalI)) + .build(), HttpResponse.BodyHandlers.ofString()); + responses.add(resp); + countDownLatch.countDown(); + } catch (Exception e) { + log.error("Concurrent request error", e); + responses.add(null); + } + }).start(); + } + + assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); + assertEquals(10, responses.size()); + for (HttpResponse response: responses) { + assertNotNull(response); + assertEquals(200, response.statusCode()); + assertEquals(body, response.body()); + } + } +} diff --git a/src/test/java/com/hsbc/cranker/connector/ConnectRetryTest.java b/src/test/java/com/hsbc/cranker/connector/ConnectRetryTest.java index 6df1e75..03affbe 100644 --- a/src/test/java/com/hsbc/cranker/connector/ConnectRetryTest.java +++ b/src/test/java/com/hsbc/cranker/connector/ConnectRetryTest.java @@ -1,22 +1,24 @@ package com.hsbc.cranker.connector; +import com.hsbc.cranker.mucranker.CrankerRouter; import io.muserver.Http2ConfigBuilder; import io.muserver.MuServer; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Test; -import com.hsbc.cranker.mucranker.CrankerRouter; +import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.RepetitionInfo; import java.io.IOException; import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import static com.hsbc.cranker.connector.BaseEndToEndTest.startConnectorAndWaitForRegistration; -import static com.hsbc.cranker.connector.BaseEndToEndTest.waitForRegistration; +import static com.hsbc.cranker.connector.BaseEndToEndTest.*; +import static com.hsbc.cranker.mucranker.CrankerRouterBuilder.crankerRouter; import static io.muserver.MuServerBuilder.httpServer; import static io.muserver.MuServerBuilder.httpsServer; import static org.hamcrest.MatcherAssert.assertThat; @@ -26,7 +28,6 @@ import static scaffolding.Action.swallowException; import static scaffolding.AssertUtils.assertEventually; import static scaffolding.StringUtils.randomAsciiStringOfLength; -import static com.hsbc.cranker.mucranker.CrankerRouterBuilder.crankerRouter; public class ConnectRetryTest { @@ -43,8 +44,8 @@ public void after() { if (connector != null) swallowException(() -> connector.stop(10, TimeUnit.SECONDS)); } - @Test - public void ifTheRouterStopsThenTheConnectorWillReconnectWhenItStartsAgain() throws Exception { + @RepeatedTest(3) + public void ifTheRouterStopsThenTheConnectorWillReconnectWhenItStartsAgain(RepetitionInfo repetitionInfo) throws Exception { this.targetServer = httpServer() .addHandler((request, response) -> { @@ -54,7 +55,7 @@ public void ifTheRouterStopsThenTheConnectorWillReconnectWhenItStartsAgain() thr .start(); this.crankerRouter = crankerRouter() - .withConnectorMaxWaitInMillis(400) + .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0")) .start(); this.router = httpsServer() .addHandler(crankerRouter.createRegistrationHandler()) @@ -62,7 +63,7 @@ public void ifTheRouterStopsThenTheConnectorWillReconnectWhenItStartsAgain() thr .withHttp2Config(Http2ConfigBuilder.http2Config().enabled(false)) .start(); - this.connector = startConnectorAndWaitForRegistration(crankerRouter, "*", targetServer, 2, router); + this.connector = startConnectorAndWaitForRegistration(crankerRouter, "*", targetServer, preferredProtocols(repetitionInfo), 2, router); String body = randomAsciiStringOfLength(100); HttpResponse resp = testClient.send(HttpRequest.newBuilder() @@ -80,7 +81,9 @@ public void ifTheRouterStopsThenTheConnectorWillReconnectWhenItStartsAgain() thr assertThat(connector.routers().get(0).currentUnsuccessfulConnectionAttempts(), greaterThan(0)); - this.crankerRouter = crankerRouter().withConnectorMaxWaitInMillis(4000).start(); + this.crankerRouter = crankerRouter() + .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0")) + .start(); this.router = httpsServer() .withHttpsPort(originalPort) .addHandler(crankerRouter.createRegistrationHandler()) @@ -101,20 +104,22 @@ public void ifTheRouterStopsThenTheConnectorWillReconnectWhenItStartsAgain() thr } - @Test - void connectionErrorListenerIsTriggeredOnWssConnectionFailed() throws InterruptedException, IOException { + @RepeatedTest(3) + void connectionErrorListenerIsTriggeredOnWssConnectionFailed(RepetitionInfo repetitionInfo) throws InterruptedException, IOException { AtomicInteger exceptionCount = new AtomicInteger(0); int slidingWindow = 2; this.targetServer = httpServer() .addHandler((request, response) -> { - response.write(request.readBodyAsString()); + final String text = request.readBodyAsString(); + response.write(text); return true; }) .start(); this.crankerRouter = crankerRouter() - .withConnectorMaxWaitInMillis(4000).start(); + .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0")) + .start(); this.router = httpsServer() .withHttp2Config(Http2ConfigBuilder.http2Config().enabled(false)) @@ -123,6 +128,7 @@ void connectionErrorListenerIsTriggeredOnWssConnectionFailed() throws Interrupte .start(); this.connector = CrankerConnectorBuilder.connector() + .withPreferredProtocols(preferredProtocols(repetitionInfo)) .withHttpClient(CrankerConnectorBuilder.createHttpClient(true).build()) .withTarget(targetServer.uri()) .withRoute("*") @@ -137,11 +143,11 @@ public void onSocketConnectionError(RouterRegistration router, Throwable excepti }) .start(); - waitForRegistration("*", slidingWindow, crankerRouter); + waitForRegistration("*", connector.connectorId(), slidingWindow, crankerRouter); assertThat(exceptionCount.get(), equalTo(0)); - verifyHttpRequestWroking(); + verifyHttpRequestWorking(); int originalPort = router.uri().getPort(); crankerRouter.stop(); @@ -152,7 +158,7 @@ public void onSocketConnectionError(RouterRegistration router, Throwable excepti assertEventually(exceptionCount::get, greaterThan(0)); this.crankerRouter = crankerRouter() - .withConnectorMaxWaitInMillis(4000) + .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0")) .start(); this.router = httpsServer() .withHttpsPort(originalPort) @@ -161,12 +167,12 @@ public void onSocketConnectionError(RouterRegistration router, Throwable excepti .addHandler(crankerRouter.createHttpHandler()) .start(); - waitForRegistration("*", slidingWindow, crankerRouter); + waitForRegistration("*", connector.connectorId(), slidingWindow, crankerRouter); - verifyHttpRequestWroking(); + verifyHttpRequestWorking(); } - private void verifyHttpRequestWroking() throws IOException, InterruptedException { + private void verifyHttpRequestWorking() throws IOException, InterruptedException { String body = "hello"; HttpResponse resp = testClient.send(HttpRequest.newBuilder() .method("POST", HttpRequest.BodyPublishers.ofString(body)) diff --git a/src/test/java/com/hsbc/cranker/connector/CrankerConnectorBuilderTest.java b/src/test/java/com/hsbc/cranker/connector/CrankerConnectorBuilderTest.java new file mode 100644 index 0000000..c4f213b --- /dev/null +++ b/src/test/java/com/hsbc/cranker/connector/CrankerConnectorBuilderTest.java @@ -0,0 +1,123 @@ +package com.hsbc.cranker.connector; + +import com.hsbc.cranker.mucranker.CrankerRouter; +import io.muserver.ContentTypes; +import io.muserver.Http2ConfigBuilder; +import io.muserver.Method; +import io.muserver.MuServer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.RepetitionInfo; + +import java.io.IOException; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static com.hsbc.cranker.connector.BaseEndToEndTest.preferredProtocols; +import static com.hsbc.cranker.connector.BaseEndToEndTest.startConnectorAndWaitForRegistration; +import static com.hsbc.cranker.mucranker.CrankerRouterBuilder.crankerRouter; +import static io.muserver.MuServerBuilder.httpsServer; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static scaffolding.Action.swallowException; + +class CrankerConnectorBuilderTest { + + private final HttpClient http2Client = HttpUtils.createHttpClientBuilder(true) + .version(HttpClient.Version.HTTP_2) + .build(); + + private final HttpClient http1Client = HttpUtils.createHttpClientBuilder(true) + .version(HttpClient.Version.HTTP_1_1) + .build(); + + private CrankerRouter crankerRouter; + private MuServer targetServer; + private MuServer crankerServer; + private CrankerConnector connector; + + @AfterEach + public void after() { + if (connector != null) swallowException(() -> connector.stop(10, TimeUnit.SECONDS)); + if (targetServer != null) swallowException(targetServer::stop); + if (crankerServer != null) swallowException(crankerServer::stop); + if (crankerRouter != null) swallowException(crankerRouter::stop); + } + + @RepeatedTest(3) + void allowSlash() { + CrankerConnectorBuilder.connector().withTarget(URI.create("http://localhost:1234")).withRoute("hello").build(); + CrankerConnectorBuilder.connector().withTarget(URI.create("http://localhost:1234")).withRoute("hello_123").build(); + CrankerConnectorBuilder.connector().withTarget(URI.create("http://localhost:1234")).withRoute("hello-123").build(); + CrankerConnectorBuilder.connector().withTarget(URI.create("http://localhost:1234")).withRoute("hello/123").build(); + } + + @RepeatedTest(3) + void testMaxHeadersSize_normal(RepetitionInfo repetitionInfo) throws IOException, InterruptedException { + + setupServerForMaxHeaders(40000, preferredProtocols(repetitionInfo)); + + final String bigHeader = "b".repeat(18000); + + HttpResponse resp1 = http1Client.send(HttpRequest.newBuilder() + .uri(crankerServer.uri().resolve("/test")) + .header("big-header", bigHeader) + .build(), HttpResponse.BodyHandlers.ofString()); + assertEquals(200, resp1.statusCode()); + assertEquals(bigHeader, resp1.headers().firstValue("big-header").orElse("not exist")); + + HttpResponse resp2 = http2Client.send(HttpRequest.newBuilder() + .uri(crankerServer.uri().resolve("/test")) + .header("big-header", bigHeader) + .build(), HttpResponse.BodyHandlers.ofString()); + assertEquals(200, resp2.statusCode()); + assertEquals(bigHeader, resp2.headers().firstValue("big-header").orElse("not exist")); + } + + @RepeatedTest(3) + void testMaxHeadersSize_exception(RepetitionInfo repetitionInfo) throws IOException, InterruptedException { + + setupServerForMaxHeaders(40000, preferredProtocols(repetitionInfo)); + + final String bigHeader = "b".repeat(58000); // test header size from 2000 to 20000 + + HttpResponse resp1 = http1Client.send(HttpRequest.newBuilder() + .uri(crankerServer.uri().resolve("/test")) + .header("big-header", bigHeader) + .build(), HttpResponse.BodyHandlers.ofString()); + assertEquals(431, resp1.statusCode()); + + HttpResponse resp2 = http2Client.send(HttpRequest.newBuilder() + .uri(crankerServer.uri().resolve("/test")) + .header("big-header", bigHeader) + .build(), HttpResponse.BodyHandlers.ofString()); + assertEquals(431, resp2.statusCode()); + } + + private void setupServerForMaxHeaders(int maxHeadersSize, List preferredProtocols) { + this.targetServer = httpsServer() + .withHttp2Config(Http2ConfigBuilder.http2Config().enabled(true)) + .withMaxHeadersSize(maxHeadersSize) + .addHandler(Method.GET, "/test", (request, response, pathParams) -> { + response.contentType(ContentTypes.TEXT_PLAIN_UTF8); + response.headers().set("big-header", request.headers().get("big-header", "nothing")); + }) + .start(); + + this.crankerRouter = crankerRouter() + .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0")) + .start(); + + this.crankerServer = httpsServer() + .withHttp2Config(Http2ConfigBuilder.http2Config().enabled(false)) + .withMaxHeadersSize(maxHeadersSize) + .addHandler(crankerRouter.createRegistrationHandler()) + .addHandler(crankerRouter.createHttpHandler()) + .start(); + + this.connector = startConnectorAndWaitForRegistration(crankerRouter, "*", targetServer, preferredProtocols, 2, this.crankerServer); + } +} diff --git a/src/test/java/com/hsbc/cranker/connector/CrankerConnectorHttp2Test.java b/src/test/java/com/hsbc/cranker/connector/CrankerConnectorHttp2Test.java index fa52e98..58937ca 100644 --- a/src/test/java/com/hsbc/cranker/connector/CrankerConnectorHttp2Test.java +++ b/src/test/java/com/hsbc/cranker/connector/CrankerConnectorHttp2Test.java @@ -1,24 +1,27 @@ package com.hsbc.cranker.connector; +import com.hsbc.cranker.mucranker.CrankerRouter; import io.muserver.Http2ConfigBuilder; import io.muserver.Method; import io.muserver.MuServer; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Test; -import com.hsbc.cranker.mucranker.CrankerRouter; +import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.RepetitionInfo; import java.io.IOException; import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; +import java.util.List; import java.util.concurrent.TimeUnit; +import static com.hsbc.cranker.connector.BaseEndToEndTest.preferredProtocols; import static com.hsbc.cranker.connector.BaseEndToEndTest.startConnectorAndWaitForRegistration; +import static com.hsbc.cranker.mucranker.CrankerRouterBuilder.crankerRouter; import static io.muserver.MuServerBuilder.httpsServer; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static scaffolding.Action.swallowException; -import static com.hsbc.cranker.mucranker.CrankerRouterBuilder.crankerRouter; public class CrankerConnectorHttp2Test { @@ -38,20 +41,18 @@ public void after() { if (crankerRouter != null) swallowException(crankerRouter::stop); } - @Test - void canWorkWithHttp2MicroServiceAndHttp1Cranker() throws IOException, InterruptedException { + @RepeatedTest(3) + void canWorkWithHttp2MicroServiceAndHttp1Cranker(RepetitionInfo repetitionInfo) throws IOException, InterruptedException { System.getProperties().setProperty("jdk.internal.httpclient.disableHostnameVerification", Boolean.TRUE.toString()); this.targetServer = httpsServer() .withHttp2Config(Http2ConfigBuilder.http2Config().enabled(true)) - .addHandler(Method.GET, "/test", (request, response, pathParams) -> { - response.write("hello world"); - }) + .addHandler(Method.GET, "/test", (request, response, pathParams) -> response.write("hello world")) .start(); this.crankerRouter = crankerRouter() - .withConnectorMaxWaitInMillis(4000) + .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0")) .start(); this.routerServer = httpsServer() @@ -60,7 +61,7 @@ void canWorkWithHttp2MicroServiceAndHttp1Cranker() throws IOException, Interrupt .addHandler(crankerRouter.createHttpHandler()) .start(); - this.connector = startConnectorAndWaitForRegistration(crankerRouter, "*", targetServer, 2, this.routerServer); + this.connector = startConnectorAndWaitForRegistration(crankerRouter, "*", targetServer, preferredProtocols(repetitionInfo),2, this.routerServer); HttpResponse response = http2Client.send(HttpRequest.newBuilder() .uri(this.routerServer.uri().resolve("/test")) diff --git a/src/test/java/com/hsbc/cranker/connector/CrankerConnectorListenerTest.java b/src/test/java/com/hsbc/cranker/connector/CrankerConnectorListenerTest.java new file mode 100644 index 0000000..569f60d --- /dev/null +++ b/src/test/java/com/hsbc/cranker/connector/CrankerConnectorListenerTest.java @@ -0,0 +1,129 @@ +package com.hsbc.cranker.connector; + + +import com.hsbc.cranker.mucranker.CrankerRouter; +import io.muserver.Http2ConfigBuilder; +import io.muserver.Method; +import io.muserver.MuServer; +import io.muserver.RouteHandler; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.RepetitionInfo; + +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import static com.hsbc.cranker.connector.BaseEndToEndTest.preferredProtocols; +import static com.hsbc.cranker.mucranker.CrankerRouterBuilder.crankerRouter; +import static io.muserver.MuServerBuilder.httpsServer; +import static java.util.stream.Collectors.toList; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; +import static scaffolding.Action.swallowException; + +public class CrankerConnectorListenerTest { + + private final HttpClient httpClient = HttpUtils.createHttpClientBuilder(true) + .version(HttpClient.Version.HTTP_2) + .build(); + private CrankerRouter crankerRouter; + private MuServer targetServer; + private MuServer routerServer; + private CrankerConnector connector; + + @BeforeEach + public void before() { + + this.crankerRouter = crankerRouter() + .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0")) + .start(); + + this.routerServer = httpsServer() + .withHttp2Config(Http2ConfigBuilder.http2Config().enabled(false)) + .addHandler(crankerRouter.createRegistrationHandler()) + .addHandler(crankerRouter.createHttpHandler()) + .start(); + } + + @AfterEach + public void after() { + if (connector != null) swallowException(() -> connector.stop(10, TimeUnit.SECONDS)); + if (targetServer != null) swallowException(targetServer::stop); + if (routerServer != null) swallowException(routerServer::stop); + if (crankerRouter != null) swallowException(crankerRouter::stop); + } + + @RepeatedTest(3) + void testProxyEventListenerInvoked(RepetitionInfo repetitionInfo) throws Exception { + + final RouteHandler handler = (request, response, pathParams) -> { + StringBuilder bodyBuilder = new StringBuilder(); + for (Map.Entry header : request.headers()) { + bodyBuilder.append(header.getKey()).append(":").append(header.getValue()).append("\n"); + } + bodyBuilder.append("\n"); + bodyBuilder.append(request.readBodyAsString()); + response.status(200); + response.write(bodyBuilder.toString()); + }; + + this.targetServer = httpsServer() + .withHttp2Config(Http2ConfigBuilder.http2Config().enabled(true)) + .addHandler(Method.GET, "/test", handler) + .addHandler(Method.POST, "/test", handler) + .start(); + + this.connector = CrankerConnectorBuilder.connector() + .withPreferredProtocols(preferredProtocols(repetitionInfo)) + .withHttpClient(CrankerConnectorBuilder.createHttpClient(true).build()) + .withTarget(targetServer.uri()) + .withRoute("*") + .withRouterUris(RegistrationUriSuppliers.fixedUris(Stream.of(new MuServer[]{this.routerServer}) + .map(s -> BaseEndToEndTest.registrationUri(s.uri())) + .collect(toList()))) + .withSlidingWindowSize(2) + .withProxyEventListener(new ProxyEventListener() { + @Override + public HttpRequest beforeProxyToTarget(HttpRequest request, HttpRequest.Builder requestBuilder) { + // add extra header + final String clientHeader = request.headers().firstValue("x-client-header").orElseGet(() -> ""); + requestBuilder.header("x-connector-header", "connector-value_" + clientHeader); + return requestBuilder.build(); + } + }) + .start(); + + BaseEndToEndTest.waitForRegistration("*", connector.connectorId(), 2, crankerRouter); + + // GET + HttpResponse response = httpClient.send(HttpRequest.newBuilder() + .header("x-client-header", "client-value") + .uri(this.routerServer.uri().resolve("/test")) + .build(), HttpResponse.BodyHandlers.ofString()); + assertThat(response.statusCode(), is(200)); + final String body = response.body(); + assertThat(body, containsString("x-client-header:client-value\n")); + assertThat(body, containsString("x-connector-header:connector-value_client-value\n")); + + // POST + HttpResponse response_2 = httpClient.send(HttpRequest.newBuilder() + .header("x-client-header", "client-value") + .uri(this.routerServer.uri().resolve("/test")) + .method("POST", HttpRequest.BodyPublishers.ofString("this is request body string")) + .build(), HttpResponse.BodyHandlers.ofString()); + assertThat(response_2.statusCode(), is(200)); + final String body_2 = response_2.body(); + assertThat(body_2, containsString("x-client-header:client-value\n")); + assertThat(body_2, containsString("x-connector-header:connector-value_client-value\n")); + assertThat(body_2, containsString("this is request body string")); + + } + +} diff --git a/src/test/java/com/hsbc/cranker/connector/CrankerConnectorStopTest.java b/src/test/java/com/hsbc/cranker/connector/CrankerConnectorStopTest.java index 1e473a3..1aa9637 100644 --- a/src/test/java/com/hsbc/cranker/connector/CrankerConnectorStopTest.java +++ b/src/test/java/com/hsbc/cranker/connector/CrankerConnectorStopTest.java @@ -1,31 +1,37 @@ package com.hsbc.cranker.connector; +import com.hsbc.cranker.mucranker.CrankerRouter; import io.muserver.*; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.RepetitionInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.hsbc.cranker.mucranker.CrankerRouter; +import scaffolding.AssertUtils; -import java.io.PrintWriter; +import java.io.*; +import java.net.Socket; import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import static com.hsbc.cranker.connector.BaseEndToEndTest.registrationUri; -import static com.hsbc.cranker.connector.BaseEndToEndTest.startConnectorAndWaitForRegistration; +import static com.hsbc.cranker.connector.BaseEndToEndTest.*; +import static com.hsbc.cranker.mucranker.CrankerRouterBuilder.crankerRouter; +import static io.muserver.MuServerBuilder.httpServer; import static io.muserver.MuServerBuilder.httpsServer; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.*; import static org.junit.jupiter.api.Assertions.*; import static scaffolding.Action.swallowException; import static scaffolding.AssertUtils.assertEventually; -import static com.hsbc.cranker.mucranker.CrankerRouterBuilder.crankerRouter; public class CrankerConnectorStopTest { @@ -39,8 +45,8 @@ public class CrankerConnectorStopTest { private MuServer routerServer; private CrankerConnector connector; - private AtomicInteger counter = new AtomicInteger(0); - private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10, r -> new Thread(r, "test-pool" + counter.incrementAndGet())); + private final AtomicInteger counter = new AtomicInteger(0); + private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10, r -> new Thread(r, "test-pool" + counter.incrementAndGet())); @AfterEach public void after() { @@ -48,11 +54,11 @@ public void after() { if (targetServer != null) swallowException(targetServer::stop); if (routerServer != null) swallowException(routerServer::stop); if (crankerRouter != null) swallowException(crankerRouter::stop); - if (executorService != null) swallowException(executorService::shutdownNow); + swallowException(executorService::shutdownNow); } - @Test - void requestOnTheFlyShouldCompleteSuccessfullyWithinTimeout() throws ExecutionException, InterruptedException, TimeoutException { + @RepeatedTest(3) + void requestOnTheFlyShouldCompleteSuccessfullyWithinTimeout(RepetitionInfo repetitionInfo) { AtomicInteger serverCounter = new AtomicInteger(0); AtomicInteger clientCounter = new AtomicInteger(0); @@ -71,7 +77,7 @@ void requestOnTheFlyShouldCompleteSuccessfullyWithinTimeout() throws ExecutionEx .start(); this.crankerRouter = crankerRouter() - .withConnectorMaxWaitInMillis(4000) + .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0")) .start(); this.routerServer = httpsServer() @@ -80,7 +86,8 @@ void requestOnTheFlyShouldCompleteSuccessfullyWithinTimeout() throws ExecutionEx .addHandler(crankerRouter.createHttpHandler()) .start(); - this.connector = startConnectorAndWaitForRegistration(crankerRouter, "*", targetServer, 2, this.routerServer); + this.connector = startConnectorAndWaitForRegistration(crankerRouter, "*", targetServer, + preferredProtocols(repetitionInfo),2, this.routerServer); int requestCount = 3; for (int i = 0; i < requestCount; i++) { @@ -105,8 +112,8 @@ void requestOnTheFlyShouldCompleteSuccessfullyWithinTimeout() throws ExecutionEx assertEventually(clientCounter::get, equalTo(requestCount)); } - @Test - void requestOnTheFlyShouldCompleteSuccessfullyWithinTimeout_LongQuery() throws ExecutionException, InterruptedException, TimeoutException { + @RepeatedTest(3) + void requestOnTheFlyShouldCompleteSuccessfullyWithinTimeout_LongQuery(RepetitionInfo repetitionInfo) { AtomicInteger serverCounter = new AtomicInteger(0); AtomicInteger clientCounter = new AtomicInteger(0); @@ -120,14 +127,14 @@ void requestOnTheFlyShouldCompleteSuccessfullyWithinTimeout_LongQuery() throws E for (int i = 0; i < 12; i++) { writer.print(i + ","); writer.flush(); - Thread.sleep(1000L); + Thread.sleep(100L); } } }) .start(); this.crankerRouter = crankerRouter() - .withConnectorMaxWaitInMillis(4000) + .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0")) .start(); this.routerServer = httpsServer() @@ -136,8 +143,8 @@ void requestOnTheFlyShouldCompleteSuccessfullyWithinTimeout_LongQuery() throws E .addHandler(crankerRouter.createHttpHandler()) .start(); - this.connector = startConnectorAndWaitForRegistration(crankerRouter, "*", targetServer, 2, this.routerServer); - + this.connector = startConnectorAndWaitForRegistration(crankerRouter, "*", targetServer, + preferredProtocols(repetitionInfo),2, this.routerServer); executorService.submit(() -> { try { @@ -162,8 +169,8 @@ void requestOnTheFlyShouldCompleteSuccessfullyWithinTimeout_LongQuery() throws E } - @Test - void getTimeoutExceptionIfExceedTimeout() throws ExecutionException, InterruptedException, TimeoutException { + @RepeatedTest(3) + void getTimeoutExceptionIfExceedTimeout(RepetitionInfo repetitionInfo) { AtomicInteger serverCounter = new AtomicInteger(0); AtomicInteger serverExceptionCounter = new AtomicInteger(0); @@ -180,7 +187,6 @@ void getTimeoutExceptionIfExceedTimeout() throws ExecutionException, Interrupted asyncHandle.write(ByteBuffer.wrap("hello world".getBytes())); asyncHandle.complete(); } catch (Throwable throwable) { - System.out.println(throwable); serverExceptionCounter.incrementAndGet(); } }, 3, TimeUnit.SECONDS); @@ -188,7 +194,7 @@ void getTimeoutExceptionIfExceedTimeout() throws ExecutionException, Interrupted .start(); this.crankerRouter = crankerRouter() - .withConnectorMaxWaitInMillis(4000) + .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0")) .start(); this.routerServer = httpsServer() @@ -197,7 +203,8 @@ void getTimeoutExceptionIfExceedTimeout() throws ExecutionException, Interrupted .addHandler(crankerRouter.createHttpHandler()) .start(); - this.connector = startConnectorAndWaitForRegistration(crankerRouter, "*", targetServer, 2, this.routerServer); + this.connector = startConnectorAndWaitForRegistration(crankerRouter, "*", targetServer, + preferredProtocols(repetitionInfo),2, this.routerServer); int requestCount = 3; for (int i = 0; i < requestCount; i++) { @@ -221,18 +228,16 @@ void getTimeoutExceptionIfExceedTimeout() throws ExecutionException, Interrupted this.targetServer.stop(); } - @Test - public void returnFalseWhenCallingStopBeforeCallingStart() { + @RepeatedTest(3) + public void throwIllegalStateExceptionWhenCallingStopBeforeCallingStart(RepetitionInfo repetitionInfo) { this.targetServer = httpsServer() .withHttp2Config(Http2ConfigBuilder.http2Config().enabled(true)) - .addHandler(Method.GET, "/test", (request, response, pathParams) -> { - response.write("hello world"); - }) + .addHandler(Method.GET, "/test", (request, response, pathParams) -> response.write("hello world")) .start(); this.crankerRouter = crankerRouter() - .withConnectorMaxWaitInMillis(4000) + .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0")) .start(); this.routerServer = httpsServer() @@ -242,6 +247,7 @@ public void returnFalseWhenCallingStopBeforeCallingStart() { .start(); this.connector = CrankerConnectorBuilder.connector() + .withPreferredProtocols(preferredProtocols(repetitionInfo)) .withHttpClient(CrankerConnectorBuilder.createHttpClient(true).build()) .withRouterUris(RegistrationUriSuppliers.fixedUris(URI.create("wss://localhost:1234"))) .withRoute("*") @@ -249,23 +255,21 @@ public void returnFalseWhenCallingStopBeforeCallingStart() { .withComponentName("cranker-connector-unit-test") .build(); // not start - // call before start - assertFalse(() -> connector.stop(1, TimeUnit.SECONDS)); + + assertFalse(connector.stop(1, TimeUnit.SECONDS)); } - @Test - public void returnFalseWhenCallingStopMultipleTime() { + @RepeatedTest(3) + public void throwIllegalStateExceptionWhenCallingStopMultipleTime(RepetitionInfo repetitionInfo) { this.targetServer = httpsServer() .withHttp2Config(Http2ConfigBuilder.http2Config().enabled(true)) - .addHandler(Method.GET, "/test", (request, response, pathParams) -> { - response.write("hello world"); - }) + .addHandler(Method.GET, "/test", (request, response, pathParams) -> response.write("hello world")) .start(); this.crankerRouter = crankerRouter() - .withConnectorMaxWaitInMillis(4000) + .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0")) .start(); this.routerServer = httpsServer() @@ -275,6 +279,7 @@ public void returnFalseWhenCallingStopMultipleTime() { .start(); this.connector = CrankerConnectorBuilder.connector() + .withPreferredProtocols(preferredProtocols(repetitionInfo)) .withHttpClient(CrankerConnectorBuilder.createHttpClient(true).build()) .withRouterUris(RegistrationUriSuppliers.fixedUris(registrationUri(routerServer.uri()))) .withRoute("*") @@ -283,10 +288,162 @@ public void returnFalseWhenCallingStopMultipleTime() { .start(); // call stop the first time - connector.stop(10, TimeUnit.SECONDS); + assertTrue(connector.stop(5, TimeUnit.SECONDS)); + + // return false for the second time + assertFalse(connector.stop(1, TimeUnit.SECONDS)); + } + + + @RepeatedTest(3) + public void clientDropEarlyCanNotifyMicroservice(RepetitionInfo repetitionInfo) throws IOException { + + final ResponseInfo[] responseInfo = new ResponseInfo[1]; + final AtomicBoolean serverReceived = new AtomicBoolean(false); + + this.targetServer = httpServer() + .addHandler(Method.GET, "/test", (request, response, pathParams) -> { + // no response, just holding the tcp connection until client drop + final AsyncHandle asyncHandle = request.handleAsync(); + asyncHandle.addResponseCompleteHandler(info -> { + log.info("http server response complete, info={}", info); + responseInfo[0] = info; + }); + serverReceived.set(true); + asyncHandle.write(ByteBuffer.wrap("hello1".getBytes())); + }) + .start(); + + log.info("http server started at {}", this.targetServer.uri()); + + this.crankerRouter = crankerRouter() + .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0")) + .start(); + + this.routerServer = httpServer() + .withHttpPort(0) + .addHandler(crankerRouter.createRegistrationHandler()) + .addHandler(crankerRouter.createHttpHandler()) + .start(); + + this.connector = CrankerConnectorBuilder.connector() + .withPreferredProtocols(preferredProtocols(repetitionInfo)) + .withHttpClient(CrankerConnectorBuilder.createHttpClient(true).build()) + .withRouterUris(RegistrationUriSuppliers.fixedUris(registrationUri(routerServer.uri()))) + .withRoute("*") + .withTarget(targetServer.uri()) + .withComponentName("cranker-connector-unit-test") + .start(); + + waitForRegistration("*", connector.connectorId(), 2, crankerRouter); - // second time will throw exception - assertFalse(() -> connector.stop(1, TimeUnit.SECONDS)); + + String host = this.routerServer.uri().getHost(); + int port = this.routerServer.uri().getPort(); + String path = "/test"; + + try (Socket socket = new Socket(host, port); + OutputStreamWriter writer = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8)) { + writer.write("GET " + path + " HTTP/1.1\r\n"); + writer.write("Host: " + host + "\r\n"); + writer.write("User-Agent: Mozilla/5.0\r\n"); + writer.write("Connection: Close\r\n"); + writer.write("\r\n"); + writer.flush(); + + assertEventually(serverReceived::get, is(true)); + + // client sending request and close it early + writer.close(); + socket.close(); + + AssertUtils.assertEventually(() -> responseInfo[0] != null && !responseInfo[0].completedSuccessfully(), is(true), 10, 100); + } + } + + @RepeatedTest(3) + public void connectorStopCanNotifyServiceAndClient(RepetitionInfo repetitionInfo) { + + final ResponseInfo[] responseInfo = new ResponseInfo[1]; + final AtomicBoolean serverReceived = new AtomicBoolean(false); + + this.targetServer = httpServer() + .addHandler(Method.GET, "/test", (request, response, pathParams) -> { + // no response, just holding the tcp connection until client drop + final AsyncHandle asyncHandle = request.handleAsync(); + asyncHandle.addResponseCompleteHandler(info -> { + log.info("http server response complete, info={}", info); + responseInfo[0] = info; + }); + serverReceived.set(true); + asyncHandle.write(ByteBuffer.wrap("hello1".getBytes())); + }) + .start(); + + log.info("http server started at {}", this.targetServer.uri()); + + this.crankerRouter = crankerRouter() + .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0")) + .start(); + + this.routerServer = httpServer() + .withHttpPort(0) + .addHandler(crankerRouter.createRegistrationHandler()) + .addHandler(crankerRouter.createHttpHandler()) + .start(); + + this.connector = CrankerConnectorBuilder.connector() + .withPreferredProtocols(preferredProtocols(repetitionInfo)) + .withHttpClient(CrankerConnectorBuilder.createHttpClient(true).build()) + .withRouterUris(RegistrationUriSuppliers.fixedUris(registrationUri(routerServer.uri()))) + .withRoute("*") + .withTarget(targetServer.uri()) + .withComponentName("cranker-connector-unit-test") + .start(); + + waitForRegistration("*", connector.connectorId(), 2, crankerRouter); + + String host = this.routerServer.uri().getHost(); + int port = this.routerServer.uri().getPort(); + String path = "/test"; + + // start a client and wait for exception + AtomicBoolean isClientCompleted = new AtomicBoolean(false); + new Thread(() -> { + try (Socket socket = new Socket(host, port); + OutputStreamWriter writer = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8); + BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) { + writer.write("GET " + path + " HTTP/1.1\r\n"); + writer.write("Host: " + host + "\r\n"); + writer.write("User-Agent: Mozilla/5.0\r\n"); + writer.write("Connection: Close\r\n"); + writer.write("\r\n"); + writer.flush(); + + + String line; + while ((line = reader.readLine()) != null) { + log.info(line); + } + isClientCompleted.set(true); + } catch (Exception exception) { + isClientCompleted.set(true); + } + }).start(); + + // assert request already arrived server + assertEventually(serverReceived::get, is(true)); + + // close connector while request still inflight + log.info("connector stopping"); + final boolean isSuccess = connector.stop(1, TimeUnit.SECONDS); + log.info("connector stopped, isSuccess={}", isSuccess); + assertThat(isSuccess, is(false)); + + // microservice and client both aware + AssertUtils.assertEventually(() -> responseInfo[0] != null && !responseInfo[0].completedSuccessfully(), is(true), 10, 100); + AssertUtils.assertEventually(isClientCompleted::get, is(true)); } + } diff --git a/src/test/java/com/hsbc/cranker/connector/CrankerConnectorTest.java b/src/test/java/com/hsbc/cranker/connector/CrankerConnectorTest.java index 1c0c495..2f7e33e 100644 --- a/src/test/java/com/hsbc/cranker/connector/CrankerConnectorTest.java +++ b/src/test/java/com/hsbc/cranker/connector/CrankerConnectorTest.java @@ -7,18 +7,23 @@ import io.muserver.handlers.ResourceHandlerBuilder; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.RepetitionInfo; import scaffolding.ByteUtils; +import scaffolding.StringUtils; import java.net.http.HttpRequest; import java.net.http.HttpResponse; import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.concurrent.TimeUnit; import static io.muserver.MuServerBuilder.httpServer; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.*; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -33,26 +38,34 @@ public class CrankerConnectorTest extends BaseEndToEndTest { .addHandler((request, response) -> handler.handle(request, response)) .start(); - private final CrankerConnector connector = CrankerConnectorBuilder.connector() - .withHttpClient(CrankerConnectorBuilder.createHttpClient(true).build()) - .withRouterUris(RegistrationUriSuppliers.fixedUris(registrationUri(registrationServer.uri()))) - .withRoute("*") - .withTarget(targetServer.uri()) - .withComponentName("cranker-connector-unit-test") - .start(); + private CrankerConnector connector; @BeforeEach - public void waitForRegistration() { - waitForRegistration("*", 2, crankerRouter); + void setUp(RepetitionInfo repetitionInfo) { + final List preferredProtocols = preferredProtocols(repetitionInfo); + connector = CrankerConnectorBuilder.connector() + .withPreferredProtocols(preferredProtocols) + .withHttpClient(CrankerConnectorBuilder.createHttpClient(true).build()) + .withRouterUris(RegistrationUriSuppliers.fixedUris(registrationUri(registrationServer.uri()))) + .withRoute("*") + .withTarget(targetServer.uri()) + .withComponentName("cranker-connector-unit-test") + .start(); + + waitForRegistration("*", connector.connectorId(), 2, crankerRouter); + + assertEventually( + () -> new ArrayList<>(connector.routers().get(0).idleSockets()).get(0).version(), + equalTo(preferredProtocols.get(0))); } @AfterEach public void stop() throws Exception { - connector.stop(10, TimeUnit.SECONDS); + assertThat(connector.stop(10, TimeUnit.SECONDS), is(true)); targetServer.stop(); } - @Test + @RepeatedTest(3) public void postingBodiesWorks() throws Exception { final String[] contentLength = new String[1]; handler = (request, response) -> { @@ -74,8 +87,9 @@ public void postingBodiesWorks() throws Exception { assertEventually(() -> contentLength[0], equalTo("100000")); } - @Test + @RepeatedTest(3) public void getRequestsWork() throws Exception { + handler = (request, response) -> { response.contentType(ContentTypes.TEXT_PLAIN_UTF8); response.sendChunk("This "); @@ -94,7 +108,47 @@ public void getRequestsWork() throws Exception { } } - @Test + @RepeatedTest(3) + public void getRequestsWork_largeResponse() throws Exception { + + final String text = StringUtils.randomStringOfLength(100000); + handler = (request, response) -> { + String agent = request.headers().get("user-agent"); + response.contentType(ContentTypes.TEXT_PLAIN_UTF8); + response.write(text + agent); + return true; + }; + + for (int i = 0; i < 10; i++) { + HttpResponse resp = testClient.send(HttpRequest.newBuilder() + .uri(crankerServer.uri().resolve("/something")) + .header("user-agent", "cranker-connector-test-client-" + i) + .build(), HttpResponse.BodyHandlers.ofString()); + assertThat(resp.statusCode(), is(200)); + assertEquals(text + "cranker-connector-test-client-" + i, resp.body()); + } + } + + + @RepeatedTest(3) + public void forwardHeaderSetCorrectly() throws Exception { + + handler = (request, response) -> { + response.write(request.headers().get("forwarded")); + return true; + }; + + HttpResponse resp = testClient.send(HttpRequest.newBuilder() + .uri(crankerServer.uri().resolve("/something")) + .header("user-agent", "cranker-connector-test-client") + .build(), HttpResponse.BodyHandlers.ofString()); + assertThat(resp.body(), containsString("by=")); + assertThat(resp.body(), containsString("for=")); + assertThat(resp.body(), containsString("host=")); + assertThat(resp.body(), containsString("proto=")); + } + + @RepeatedTest(3) public void ifTheTargetReturnsGzippedContentThenItIsProxiedCompressed() throws Exception { String body = randomAsciiStringOfLength(100000); handler = (request, response) -> { @@ -111,7 +165,7 @@ public void ifTheTargetReturnsGzippedContentThenItIsProxiedCompressed() throws E assertEquals(body, decompressedBody); } - @Test + @RepeatedTest(3) public void largeResponsesWork() throws Exception { String body = Files.readString(Path.of("src/test/resources/web/large-file.txt")); handler = ResourceHandlerBuilder.classpathHandler("/web").build(); @@ -125,7 +179,7 @@ public void largeResponsesWork() throws Exception { assertEquals(body, actual); } - @Test + @RepeatedTest(3) public void toStringReturnsUsefulInfo() { assertThat(connector.toString(), startsWith("CrankerConnector (" + connector.connectorId() + ") registered to: [RouterRegistration")); } diff --git a/src/test/java/com/hsbc/cranker/connector/DualRouterTest.java b/src/test/java/com/hsbc/cranker/connector/DualRouterTest.java index e49bdc3..b7b261a 100644 --- a/src/test/java/com/hsbc/cranker/connector/DualRouterTest.java +++ b/src/test/java/com/hsbc/cranker/connector/DualRouterTest.java @@ -1,23 +1,33 @@ package com.hsbc.cranker.connector; +import com.hsbc.cranker.mucranker.CrankerRouter; import io.muserver.MuHandler; import io.muserver.MuServer; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.RepetitionInfo; import scaffolding.StringUtils; -import com.hsbc.cranker.mucranker.CrankerRouter; import java.net.URI; import java.net.http.HttpRequest; import java.net.http.HttpResponse; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; +import static com.hsbc.cranker.mucranker.CrankerRouterBuilder.crankerRouter; import static io.muserver.ContextHandlerBuilder.context; import static io.muserver.MuServerBuilder.httpServer; +import static java.util.stream.Collectors.toList; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static scaffolding.Action.swallowException; -import static com.hsbc.cranker.mucranker.CrankerRouterBuilder.crankerRouter; +import static scaffolding.AssertUtils.assertEventually; + + public class DualRouterTest extends BaseEndToEndTest { @@ -31,20 +41,97 @@ public class DualRouterTest extends BaseEndToEndTest { } }; - private static final MuServer targetA1 = httpServer().addHandler(context("a").addHandler(textHandler)).start(); - private static final MuServer targetA2 = httpServer().addHandler(context("a").addHandler(textHandler)).start(); - private static final MuServer targetB = httpServer().addHandler(context("b").addHandler(textHandler)).start(); - private static final MuServer targetCatchAll = httpServer().addHandler(textHandler).start(); - private static final CrankerRouter router1 = crankerRouter().start(); - private static final MuServer routerServer1 = httpServer().addHandler(router1.createRegistrationHandler()).addHandler(router1.createHttpHandler()).start(); - private static final CrankerRouter router2 = crankerRouter().start(); - private static final MuServer routerServer2 = httpServer().addHandler(router2.createRegistrationHandler()).addHandler(router2.createHttpHandler()).start(); - private static final CrankerConnector connector1 = BaseEndToEndTest.startConnectorAndWaitForRegistration(router1, "a", targetA1, 2, routerServer1, routerServer2); - private static final CrankerConnector connector2 = BaseEndToEndTest.startConnectorAndWaitForRegistration(router1, "b", targetB, 2, routerServer1, routerServer2); - private static final CrankerConnector connector3 = BaseEndToEndTest.startConnectorAndWaitForRegistration(router1, "a", targetA2, 2, routerServer1, routerServer2); - private static final CrankerConnector connector4 = BaseEndToEndTest.startConnectorAndWaitForRegistration(router1, "*", targetCatchAll, 2, routerServer1, routerServer2); - - @Test + private MuServer targetA1; + private MuServer targetA2; + private MuServer targetB; + private MuServer targetCatchAll; + private CrankerRouter router1; + private MuServer routerServer1; + private CrankerRouter router2; + private MuServer routerServer2; + private CrankerConnector connector1; + private CrankerConnector connector2; + private CrankerConnector connector3; + private CrankerConnector connector4; + + private static CrankerConnector startConnector(String targetServiceName, + MuServer target, + List preferredProtocols, + int slidingWindowSize, + MuServer... registrationRouters) { + CrankerConnector connector = CrankerConnectorBuilder.connector() + .withPreferredProtocols(preferredProtocols) + .withHttpClient(CrankerConnectorBuilder.createHttpClient(true).build()) + .withTarget(target.uri()) + .withRoute(targetServiceName) + .withRouterUris(RegistrationUriSuppliers.fixedUris(Stream.of(registrationRouters) + .map(s -> registrationUri(s.uri())) + .collect(toList()))) + .withSlidingWindowSize(slidingWindowSize) + .start(); + + assertEventually( + () -> new ArrayList<>(connector.routers().get(0).idleSockets()).get(0).version(), + equalTo(preferredProtocols.get(0))); + + return connector; + } + + + @BeforeEach + void setUp(RepetitionInfo repetitionInfo) { + final List preferredProtocols = preferredProtocols(repetitionInfo); + + targetA1 = httpServer().addHandler(context("a").addHandler(textHandler)).start(); + targetA2 = httpServer().addHandler(context("a").addHandler(textHandler)).start(); + targetB = httpServer().addHandler(context("b").addHandler(textHandler)).start(); + targetCatchAll = httpServer().addHandler(textHandler).start(); + + router1 = crankerRouter() + .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0")) + .start(); + routerServer1 = httpServer() + .addHandler(router1.createRegistrationHandler()) + .addHandler(router1.createHttpHandler()).start(); + + router2 = crankerRouter() + .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0")) + .start(); + routerServer2 = httpServer() + .addHandler(router2.createRegistrationHandler()) + .addHandler(router2.createHttpHandler()).start(); + + connector1 = startConnector("a", targetA1,preferredProtocols, 2, routerServer1, routerServer2); + waitForRegistration("a", connector1.connectorId(), 2, router1, router2); + + connector2 = startConnector("b", targetB, preferredProtocols,2, routerServer1, routerServer2); + waitForRegistration("b", connector2.connectorId(), 2, router1, router2); + + connector3 = startConnector("a", targetA2, preferredProtocols,2, routerServer1, routerServer2); + waitForRegistration("a", connector3.connectorId(), 2, router1, router2); + + connector4 = startConnector("*", targetCatchAll, preferredProtocols,2, routerServer1, routerServer2); + waitForRegistration("*", connector4.connectorId(), 2, router1, router2); + + } + + @AfterEach + public void stop() { + CrankerConnector[] connectors = {connector1, connector2, connector3, connector4}; + for (CrankerConnector connector : connectors) { + swallowException(() -> connector.stop(30, TimeUnit.SECONDS)); + } + swallowException(routerServer1::stop); + swallowException(routerServer2::stop); + swallowException(targetA1::stop); + swallowException(targetA2::stop); + swallowException(targetB::stop); + swallowException(targetCatchAll::stop); + swallowException(router1::stop); + swallowException(router2::stop); + } + + @RepeatedTest(3) public void canMakeGETRequestsToBothConnectorApp() throws Exception { assertGetLargeHtmlWorks(routerServer1, "/a"); assertGetLargeHtmlWorks(routerServer1, "/b"); @@ -54,9 +141,9 @@ public void canMakeGETRequestsToBothConnectorApp() throws Exception { assertGetLargeHtmlWorks(routerServer2, ""); } - @Test + @RepeatedTest(3) public void canMakeRequestWhenOneConnectorAppIsDown() throws Exception { - connector1.stop(1, TimeUnit.MINUTES); + assertThat(connector1.stop(1, TimeUnit.MINUTES), is(true)); assertGetLargeHtmlWorks(routerServer1, "/a"); assertGetLargeHtmlWorks(routerServer1, "/b"); assertGetLargeHtmlWorks(routerServer1, ""); @@ -79,20 +166,4 @@ private void assertGetLargeHtmlWorks(MuServer routerServer, String prefix) throw assertThat(resp.body(), is(TEXT)); } - @AfterAll - public static void stop() { - CrankerConnector[] connectors = {connector1, connector2, connector3, connector4}; - for (CrankerConnector connector : connectors) { - swallowException(() -> connector.stop(30, TimeUnit.SECONDS)); - } - swallowException(routerServer1::stop); - swallowException(routerServer2::stop); - swallowException(targetA1::stop); - swallowException(targetA2::stop); - swallowException(targetB::stop); - swallowException(targetCatchAll::stop); - swallowException(router1::stop); - swallowException(router2::stop); - } - } diff --git a/src/test/java/com/hsbc/cranker/connector/RouterSupplierTest.java b/src/test/java/com/hsbc/cranker/connector/RouterSupplierTest.java index 6140cf1..f24dfc2 100644 --- a/src/test/java/com/hsbc/cranker/connector/RouterSupplierTest.java +++ b/src/test/java/com/hsbc/cranker/connector/RouterSupplierTest.java @@ -1,24 +1,21 @@ package com.hsbc.cranker.connector; +import com.hsbc.cranker.mucranker.CrankerRouter; import io.muserver.Method; import io.muserver.MuServer; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; -import com.hsbc.cranker.mucranker.CrankerRouter; +import io.muserver.SsePublisher; +import org.junit.jupiter.api.*; +import scaffolding.SseTestClient; import java.net.URI; import java.net.http.HttpRequest; import java.net.http.HttpResponse; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; -import java.util.UUID; +import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import static com.hsbc.cranker.mucranker.CrankerRouterBuilder.crankerRouter; import static io.muserver.ContextHandlerBuilder.context; import static io.muserver.MuServerBuilder.httpServer; import static org.hamcrest.MatcherAssert.assertThat; @@ -26,27 +23,27 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static scaffolding.Action.swallowException; import static scaffolding.AssertUtils.assertEventually; -import static com.hsbc.cranker.mucranker.CrankerRouterBuilder.crankerRouter; public class RouterSupplierTest extends BaseEndToEndTest { private static final String route = "my-service"; - private static final MuServer target = httpServer().addHandler(context(route).addHandler(Method.GET, "/hello", (request, response, pathParams) -> response.write("Hello from target"))).start(); - private static final CrankerRouter router1 = crankerRouter().start(); - private static final MuServer routerServer1 = httpServer().addHandler(router1.createRegistrationHandler()).addHandler(router1.createHttpHandler()).start(); - private static final CrankerRouter router2 = crankerRouter().start(); - private static final MuServer routerServer2 = httpServer().addHandler(router2.createRegistrationHandler()).addHandler(router2.createHttpHandler()).start(); + private MuServer target; + private CrankerRouter router1; + private MuServer routerServer1; + private CrankerRouter router2; + private MuServer routerServer2; - @Test - public void dnsLookupCanWorkForLocalhost() throws Exception { + @RepeatedTest(3) + public void dnsLookupCanWorkForLocalhost(RepetitionInfo repetitionInfo) throws Exception { CrankerConnector connector = CrankerConnectorBuilder.connector() + .withPreferredProtocols(preferredProtocols(repetitionInfo)) .withTarget(target.uri()) .withRoute(route) .withRouterLookupByDNS(registrationUri(routerServer1), registrationUri(routerServer2)) .start(); - waitForRegistration(route, 2, router1); - waitForRegistration(route, 2, router2); + waitForRegistration(route, connector.connectorId(), 2, router1); + waitForRegistration(route, connector.connectorId(), 2, router2); for (URI routerUri : List.of(routerServer1.uri(), routerServer2.uri())) { for (int i = 0; i < 4; i++) { // when resolving localhost, it may get two URLs (an IP4 and IP6), so just do some extra calls to make sure all are hit at least once assertGetWorks(routerUri); @@ -55,67 +52,136 @@ public void dnsLookupCanWorkForLocalhost() throws Exception { assertStoppable(connector); } - RouterEventListener.ChangeData changeData; - @Test - public void routersCanBeDynamicallyAddedAndRemoved() throws Exception { + @RepeatedTest(3) + public void routersCanBeDynamicallyAddedAndRemoved(RepetitionInfo repetitionInfo) throws Exception { + final AtomicReference changeData = new AtomicReference<>(); URI reg1 = registrationUri(routerServer1); URI reg2 = registrationUri(routerServer2); List routerUris = new ArrayList<>(List.of(reg1, reg2)); + final List preferredProtocols = preferredProtocols(repetitionInfo); + final int expectErrorCode = preferredProtocols.get(0).startsWith("cranker_1.0") ? 503 : 404; CrankerConnectorImpl connector = (CrankerConnectorImpl) CrankerConnectorBuilder.connector() + .withPreferredProtocols(preferredProtocols) + .withSlidingWindowSize(2) .withTarget(target.uri()) .withRoute(route) .withRouterUris(() -> routerUris) .withRouterRegistrationListener(new RouterEventListener() { @Override public void onRegistrationChanged(ChangeData data) { - changeData = data; + changeData.set(data); } }) .start(); - waitForRegistration(route, 2, router1); - waitForRegistration(route, 2, router2); + waitForRegistration(route, connector.connectorId(), 2, router1); + waitForRegistration(route, connector.connectorId(), 2, router2); - assertThat(changeData.added(), hasSize(2)); - assertThat(changeData.removed(), hasSize(0)); - assertThat(changeData.unchanged(), hasSize(0)); + assertThat(changeData.get().added(), hasSize(2)); + assertThat(changeData.get().removed(), hasSize(0)); + assertThat(changeData.get().unchanged(), hasSize(0)); + + assertGetWorks(routerServer1.uri()); + assertGetWorks(routerServer2.uri()); + + final List registrations = connector.routers(); + assertThat(registrations.size(), is(2)); + assertThat(registrations.get(0).idleSocketSize(), is(2)); + assertThat(registrations.get(1).idleSocketSize(), is(2)); routerUris.remove(reg2); connector.updateRoutersAsync().get(); - assertThat(changeData.added(), hasSize(0)); - assertThat(changeData.removed(), hasSize(1)); - assertThat(changeData.removed().get(0).registrationUri().resolve("/"), equalTo(reg2.resolve("/"))); - assertThat(changeData.unchanged(), hasSize(1)); - assertThat(changeData.unchanged().get(0).registrationUri().resolve("/"), equalTo(reg1.resolve("/"))); + assertThat(changeData.get().added(), hasSize(0)); + assertThat(changeData.get().removed(), hasSize(1)); + assertThat(changeData.get().removed().get(0).registrationUri().resolve("/"), equalTo(reg2.resolve("/"))); + assertThat(changeData.get().unchanged(), hasSize(1)); + assertThat(changeData.get().unchanged().get(0).registrationUri().resolve("/"), equalTo(reg1.resolve("/"))); assertGetWorks(routerServer1.uri()); - assertGetDoesNotWork(routerServer2.uri()); + assertGetNotWorks(routerServer2.uri(), expectErrorCode); + + final List registrationsAfterRemove2 = connector.routers(); + assertThat(registrationsAfterRemove2.size(), is(1)); + assertThat(registrationsAfterRemove2.get(0).idleSocketSize(), is(2)); routerUris.remove(reg1); routerUris.add(reg2); connector.updateRoutersAsync().get(); - assertThat(changeData.added(), hasSize(1)); - assertThat(changeData.added().get(0).registrationUri().resolve("/"), equalTo(reg2.resolve("/"))); - assertThat(changeData.removed(), hasSize(1)); - assertThat(changeData.removed().get(0).registrationUri().resolve("/"), equalTo(reg1.resolve("/"))); - assertThat(changeData.unchanged(), hasSize(0)); + assertThat(changeData.get().added(), hasSize(1)); + assertThat(changeData.get().added().get(0).registrationUri().resolve("/"), equalTo(reg2.resolve("/"))); + assertThat(changeData.get().removed(), hasSize(1)); + assertThat(changeData.get().removed().get(0).registrationUri().resolve("/"), equalTo(reg1.resolve("/"))); + assertThat(changeData.get().unchanged(), hasSize(0)); - assertGetDoesNotWork(routerServer1.uri()); - waitForRegistration(route, 2, router2); + assertGetNotWorks(routerServer1.uri(), expectErrorCode); + waitForRegistration(route, connector.connectorId(), 2, router2); assertGetWorks(routerServer2.uri()); assertStoppable(connector); } - @Test + @RepeatedTest(3) + public void routersCanBeUpdatedWhenUpdateRouteTaskTimeout(RepetitionInfo repetitionInfo) throws Exception { + final AtomicReference changeData = new AtomicReference<>(); + URI reg1 = registrationUri(routerServer1); + URI reg2 = registrationUri(routerServer2); + List routerUris = new ArrayList<>(List.of(reg1, reg2)); + CrankerConnectorImpl connector = (CrankerConnectorImpl) CrankerConnectorBuilder.connector() + .withPreferredProtocols(preferredProtocols(repetitionInfo)) + .withTarget(target.uri()) + .withRoute(route) + .withRouterUris(() -> routerUris) + .withRouterRegistrationListener(new RouterEventListener() { + @Override + public void onRegistrationChanged(ChangeData data) { + changeData.set(data); + } + }) + .withRouterUpdateInterval(500, TimeUnit.MILLISECONDS) + .start(); + + waitForRegistration(route, connector.connectorId(),1, router1); + waitForRegistration(route, connector.connectorId(), 1, router2); + + assertThat(changeData.get().added(), hasSize(2)); + assertThat(changeData.get().removed(), hasSize(0)); + assertThat(changeData.get().unchanged(), hasSize(0)); + + SseTestClient client = SseTestClient.startSse(routerServer2.uri().resolve("/my-service/sse/counter")); + routerUris.remove(reg2); + Thread.sleep(1000); + + assertThat(changeData.get().added(), hasSize(0)); + assertThat(changeData.get().removed(), hasSize(1)); + assertThat(changeData.get().unchanged(), hasSize(1)); + + // clean it + changeData.set(new RouterEventListener.ChangeData(Collections.emptyList(), Collections.emptyList(), Collections.emptyList())); + routerUris.remove(reg1); + routerUris.add(reg2); + + // reach timeout 500ms + Thread.sleep(1000); + + // routers can be updated + assertThat(changeData.get().added(), hasSize(1)); + assertThat(changeData.get().removed(), hasSize(1)); + assertThat(changeData.get().unchanged(), hasSize(0)); + + assertStoppable(connector); + client.stop(); + } + + @RepeatedTest(3) @Disabled("Takes a minute to run so not normally run as part of build") - public void nonExistantDomainsReturnErrorsToListener() throws Exception { + public void nonExistentDomainsReturnErrorsToListener(RepetitionInfo repetitionInfo) throws Exception { AtomicReference received = new AtomicReference<>(); AtomicBoolean useNonExistantDomain = new AtomicBoolean(false); var connector = (CrankerConnectorImpl) CrankerConnectorBuilder.connector() + .withPreferredProtocols(preferredProtocols(repetitionInfo)) .withTarget(target.uri()) .withRoute(route) .withRouterUris(() -> { @@ -123,7 +189,7 @@ public void nonExistantDomainsReturnErrorsToListener() throws Exception { // can't have an exception on the first lookup because that will get bubbled from .start() return Set.of(URI.create("ws://localhost:9000")); } else { - return RegistrationUriSuppliers.dnsLookup(URI.create("ws://" + UUID.randomUUID() + ".example.org:9000")).get(); + return RegistrationUriSuppliers.dnsLookup(URI.create("ws://" + UUID.randomUUID() + ".example.hsbc:9000")).get(); } }) .withRouterRegistrationListener(new RouterEventListener() { @@ -142,7 +208,6 @@ public void onRouterDnsLookupError(Throwable error) { } - private void assertGetWorks(URI routerUri) throws java.io.IOException, InterruptedException { HttpResponse resp = testClient.send(HttpRequest.newBuilder() .uri(routerUri.resolve("/" + route + "/hello")) @@ -151,13 +216,13 @@ private void assertGetWorks(URI routerUri) throws java.io.IOException, Interrupt assertEquals("Hello from target", resp.body()); } - private void assertGetDoesNotWork(URI routerUri) { + private void assertGetNotWorks(URI routerUri, int expectedStatusCode) { assertEventually(() -> { HttpResponse resp = testClient.send(HttpRequest.newBuilder() .uri(routerUri.resolve("/" + route + "/hello")) .build(), HttpResponse.BodyHandlers.ofString()); return resp.statusCode(); - }, equalTo(503)); + }, equalTo(expectedStatusCode)); } @@ -165,8 +230,36 @@ private void assertStoppable(CrankerConnector connector) { Assertions.assertDoesNotThrow(() -> connector.stop(30, TimeUnit.SECONDS)); } - @AfterAll - public static void stop() { + @BeforeEach + public void start() { + this.target = httpServer() + .addHandler(context(route).addHandler(Method.GET, "/hello", (request, response, pathParams) -> response.write("Hello from target"))) + .addHandler(Method.GET, "/my-service/sse/counter", (request, response, pathParams) -> { + SsePublisher publisher = SsePublisher.start(request, response); + for (int i1 = 0; i1 < 1000; i1++) { + try { + publisher.send("Number " + i1); + Thread.sleep(1000); + } catch (Exception e) { + // The user has probably disconnected so stopping + break; + } + } + publisher.close(); + }) + .start(); + this.router1 = crankerRouter() + .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0")) + .start(); + this.routerServer1 = httpServer().addHandler(router1.createRegistrationHandler()).addHandler(router1.createHttpHandler()).start(); + this.router2 = crankerRouter() + .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0")) + .start(); + this.routerServer2 = httpServer().addHandler(router2.createRegistrationHandler()).addHandler(router2.createHttpHandler()).start(); + } + + @AfterEach + public void stop() { swallowException(routerServer1::stop); swallowException(routerServer2::stop); swallowException(target::stop); diff --git a/src/test/java/com/hsbc/cranker/connector/ServerSentEventTest.java b/src/test/java/com/hsbc/cranker/connector/ServerSentEventTest.java index dba75cb..822e2de 100644 --- a/src/test/java/com/hsbc/cranker/connector/ServerSentEventTest.java +++ b/src/test/java/com/hsbc/cranker/connector/ServerSentEventTest.java @@ -5,18 +5,21 @@ import io.muserver.MuServer; import io.muserver.SsePublisher; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.RepetitionInfo; import org.junit.jupiter.api.Test; import scaffolding.SseTestClient; import java.util.Arrays; +import java.util.List; import java.util.concurrent.TimeUnit; +import static com.hsbc.cranker.mucranker.CrankerRouterBuilder.crankerRouter; import static io.muserver.MuServerBuilder.httpServer; import static io.muserver.MuServerBuilder.httpsServer; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.*; import static scaffolding.Action.swallowException; -import static com.hsbc.cranker.mucranker.CrankerRouterBuilder.crankerRouter; public class ServerSentEventTest extends BaseEndToEndTest { @@ -33,8 +36,8 @@ public void after() { if (connector != null) swallowException(() -> connector.stop(10, TimeUnit.SECONDS)); } - @Test - public void MuServer_NormalSseTest() throws Exception { + @RepeatedTest(3) + public void MuServer_NormalSseTest(RepetitionInfo repetitionInfo) throws Exception { this.targetServer = httpServer() .addHandler(Method.GET, "/sse/counter", (request, response, pathParams) -> { @@ -47,7 +50,8 @@ public void MuServer_NormalSseTest() throws Exception { .start(); this.crankerRouter = crankerRouter() - .withConnectorMaxWaitInMillis(4000).start(); + .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0")) + .start(); this.router = httpsServer() .addHandler(crankerRouter.createRegistrationHandler()) @@ -55,7 +59,8 @@ public void MuServer_NormalSseTest() throws Exception { .withHttp2Config(Http2ConfigBuilder.http2Config().enabled(false)) .start(); - this.connector = startConnectorAndWaitForRegistration(crankerRouter, "*", targetServer, 2, router); + this.connector = startConnectorAndWaitForRegistration(crankerRouter, "*", targetServer, + preferredProtocols(repetitionInfo),2, router); this.client = SseTestClient.startSse(router.uri().resolve("/sse/counter")); this.client.waitUntilClose(5, TimeUnit.SECONDS); @@ -69,7 +74,7 @@ public void MuServer_NormalSseTest() throws Exception { ))); } - @Test + @Test public void MuServer_TargetServerDownInMiddleTest_ClientTalkToTargetServer() throws Exception { this.targetServer = httpServer() @@ -94,8 +99,8 @@ public void MuServer_TargetServerDownInMiddleTest_ClientTalkToTargetServer() thr ))); } - @Test - public void MuServer_TargetServerDownInMiddleTest_ClientTalkToRouter() throws Exception { + @RepeatedTest(3) + public void MuServer_TargetServerDownInMiddleTest_ClientTalkToRouter(RepetitionInfo repetitionInfo) throws Exception { this.targetServer = httpServer() .addHandler(Method.GET, "/sse/counter", (request, response, pathParams) -> { @@ -108,7 +113,8 @@ public void MuServer_TargetServerDownInMiddleTest_ClientTalkToRouter() throws Ex .start(); this.crankerRouter = crankerRouter() - .withConnectorMaxWaitInMillis(4000).start(); + .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0")) + .start(); this.router = httpsServer() .addHandler(crankerRouter.createRegistrationHandler()) @@ -116,7 +122,8 @@ public void MuServer_TargetServerDownInMiddleTest_ClientTalkToRouter() throws Ex .withHttp2Config(Http2ConfigBuilder.http2Config().enabled(false)) .start(); - this.connector = startConnectorAndWaitForRegistration(crankerRouter, "*", targetServer, 2, router); + this.connector = startConnectorAndWaitForRegistration(crankerRouter, "*", targetServer, + preferredProtocols(repetitionInfo), 2, router); this.client = SseTestClient.startSse(router.uri().resolve("/sse/counter")); this.client.waitUntilError(100, TimeUnit.SECONDS); diff --git a/src/test/java/com/hsbc/cranker/connector/ManualTest.java b/src/test/java/manual/ManualTest.java similarity index 96% rename from src/test/java/com/hsbc/cranker/connector/ManualTest.java rename to src/test/java/manual/ManualTest.java index 787c5c6..20df11e 100644 --- a/src/test/java/com/hsbc/cranker/connector/ManualTest.java +++ b/src/test/java/manual/ManualTest.java @@ -1,5 +1,6 @@ -package com.hsbc.cranker.connector; +package manual; +import com.hsbc.cranker.connector.*; import io.muserver.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,10 +29,7 @@ public static void main(String[] args) { MuServer targetServer = muServer() .withHttpsPort(12123) .withHttp2Config(Http2ConfigBuilder.http2Enabled()) - .addHandler(Method.GET, "/hi", (request, response, pathParams) -> { - System.out.println("received /hi"); - response.write("hi"); - }) + .addHandler(Method.GET, "/hi", (request, response, pathParams) -> response.write("hi")) .addHandler(Method.GET, "/health", (request, response, pathParams) -> { response.contentType("text/plain;charset=utf-8"); for (HttpConnection con : request.server().activeConnections()) { diff --git a/src/test/java/manual/RunLocalConnector.java b/src/test/java/manual/RunLocalConnector.java new file mode 100644 index 0000000..bc8973d --- /dev/null +++ b/src/test/java/manual/RunLocalConnector.java @@ -0,0 +1,55 @@ +package manual; + +import com.hsbc.cranker.connector.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URI; +import java.net.http.HttpRequest; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static com.hsbc.cranker.connector.CrankerConnectorBuilder.CRANKER_PROTOCOL_3; + +public class RunLocalConnector { + + private static final Logger log = LoggerFactory.getLogger(RunLocalConnector.class); + + static { + System.setProperty("jdk.internal.httpclient.disableHostnameVerification", "true"); + } + + public static void main(String[] args) { + + final CrankerConnector connector = CrankerConnectorBuilder.connector() + .withPreferredProtocols(List.of(CRANKER_PROTOCOL_3)) + .withDomain("127.0.0.1") + .withRouterUris(() -> List.of(URI.create("wss://localhost:12666"))) + .withHttpClient(CrankerConnectorBuilder.createHttpClient(true).build()) + .withComponentName("local-test") + .withRoute("*") + .withTarget(URI.create("http://localhost:14444")) + .withRouterRegistrationListener(new RouterEventListener() { + public void onRegistrationChanged(ChangeData data) { + log.info("Router registration changed: " + data); + } + public void onSocketConnectionError(RouterRegistration router1, Throwable exception) { + log.warn("Error connecting to " + router1, exception); + } + }) + .withProxyEventListener(new ProxyEventListener() { + @Override + public void onProxyError(HttpRequest request, Throwable error) { + log.warn("onProxyError, request="+ request, error); + } + }) + .start(); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + final boolean stop = connector.stop(5, TimeUnit.SECONDS); + log.info("connector stop, state={}", stop); + })); + + System.out.println("connector started"); + } +}