diff --git a/pom.xml b/pom.xml
index e3d1594..488bf51 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,19 +54,19 @@
org.slf4j
slf4j-simple
- 2.0.7
+ 2.0.9
test
com.hsbc.cranker
mu-cranker-router
- 1.0.2
+ 1.0.3
test
io.muserver
mu-server
- 0.74.1
+ 0.74.2
test
diff --git a/src/main/java/com/hsbc/cranker/connector/ConnectorSocket.java b/src/main/java/com/hsbc/cranker/connector/ConnectorSocket.java
index 65134a2..df5ad95 100644
--- a/src/main/java/com/hsbc/cranker/connector/ConnectorSocket.java
+++ b/src/main/java/com/hsbc/cranker/connector/ConnectorSocket.java
@@ -285,7 +285,8 @@ public void close(State newState, int statusCode, Throwable error) {
if (webSocket != null && !webSocket.isOutputClosed()) {
webSocket.sendClose(statusCode, error != null ? error.getMessage() : "");
}
- if (responseFuture != null && !responseFuture.isDone() && !responseFuture.isCancelled()) {
+ if (responseFuture != null && !responseFuture.isDone() && !responseFuture.isCancelled()
+ && statusCode != WebSocket.NORMAL_CLOSURE) {
responseFuture.cancel(true);
}
if (responseBodySubscription != null) {
diff --git a/src/test/java/com/hsbc/cranker/connector/BaseEndToEndTest.java b/src/test/java/com/hsbc/cranker/connector/BaseEndToEndTest.java
index 95f9a10..49cd021 100644
--- a/src/test/java/com/hsbc/cranker/connector/BaseEndToEndTest.java
+++ b/src/test/java/com/hsbc/cranker/connector/BaseEndToEndTest.java
@@ -1,27 +1,36 @@
package com.hsbc.cranker.connector;
+import com.hsbc.cranker.mucranker.CrankerRouter;
+import com.hsbc.cranker.mucranker.CrankerRouterBuilder;
import io.muserver.MuServer;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.RepetitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.hsbc.cranker.mucranker.CrankerRouter;
-import com.hsbc.cranker.mucranker.CrankerRouterBuilder;
+import scaffolding.AssertUtils;
import java.net.URI;
import java.net.http.HttpClient;
+import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;
+import static com.hsbc.cranker.connector.CrankerConnectorBuilder.CRANKER_PROTOCOL_1;
+import static com.hsbc.cranker.connector.CrankerConnectorBuilder.CRANKER_PROTOCOL_3;
import static io.muserver.MuServerBuilder.muServer;
import static java.util.stream.Collectors.toList;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static scaffolding.AssertUtils.assertEventually;
public class BaseEndToEndTest {
- private static final Logger log = LoggerFactory.getLogger(BaseEndToEndTest.class);
-
protected final HttpClient testClient = HttpUtils.createHttpClientBuilder(true).build();
- protected CrankerRouter crankerRouter = CrankerRouterBuilder.crankerRouter().start();
+ protected CrankerRouter crankerRouter = CrankerRouterBuilder
+ .crankerRouter()
+ .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0"))
+ .start();
protected MuServer registrationServer = startRegistrationServer(0);
protected MuServer crankerServer = startCrankerServer(0);
@@ -38,45 +47,64 @@ protected static URI registrationUri(URI routerUri) {
}
public static CrankerConnector startConnectorAndWaitForRegistration(CrankerRouter crankerRouter,
- String targetServiceName,
- MuServer target,
- int slidingWindowSize,
- MuServer... registrationRouters) {
- CrankerConnector connector = startConnector(targetServiceName, target, slidingWindowSize, registrationRouters);
- waitForRegistration(targetServiceName, 2, crankerRouter);
+ String targetServiceName,
+ MuServer target,
+ int slidingWindowSize,
+ MuServer... registrationRouters) {
+ return startConnectorAndWaitForRegistration(crankerRouter, targetServiceName, target, List.of(CRANKER_PROTOCOL_1), slidingWindowSize, registrationRouters);
+ }
+
+ public static CrankerConnector startConnectorAndWaitForRegistration(CrankerRouter crankerRouter,
+ String targetServiceName,
+ MuServer target,
+ List preferredProtocols,
+ int slidingWindowSize,
+ MuServer... registrationRouters) {
+ CrankerConnector connector = CrankerConnectorBuilder.connector()
+ .withPreferredProtocols(preferredProtocols)
+ .withHttpClient(CrankerConnectorBuilder.createHttpClient(true).build())
+ .withTarget(target.uri())
+ .withRoute(targetServiceName)
+ .withRouterUris(RegistrationUriSuppliers.fixedUris(Stream.of(registrationRouters)
+ .map(s -> registrationUri(s.uri()))
+ .collect(toList())))
+ .withSlidingWindowSize(slidingWindowSize)
+ .start();
+
+ waitForRegistration(targetServiceName, connector.connectorId(), 2, crankerRouter);
+
+ assertEventually(
+ () -> new ArrayList<>(connector.routers().get(0).idleSockets()).get(0).version(),
+ equalTo(preferredProtocols.get(0)));
+
return connector;
}
- public static void waitForRegistration(String targetServiceName, int slidingWindow, CrankerRouter... crankerRouters) {
- int attempts = 0;
+ public static void waitForRegistration(String targetServiceName, String connectorInstanceId, int slidingWindow, CrankerRouter... crankerRouters) {
final String serviceKey = targetServiceName.isEmpty() ? "*" : targetServiceName;
for (CrankerRouter crankerRouter : crankerRouters) {
- while (crankerRouter.collectInfo().services()
+ AssertUtils.assertEventually(() -> crankerRouter.collectInfo().services()
+ .stream()
+ .anyMatch(service -> service.route().equals(serviceKey)
+ && !service.connectors().isEmpty()
+ && service.connectors()
.stream()
- .noneMatch(service -> service.route().equals(serviceKey)
- && service.connectors().size() > 0
- && service.connectors().get(0).connections().size() >= slidingWindow)) {
- try {
- Thread.sleep(20);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- if (attempts++ == 100) throw new RuntimeException("Failed to register " + targetServiceName);
- }
+ .anyMatch(connector -> connector.connectorInstanceID().equals(connectorInstanceId)
+ && connector.connections().size() >= slidingWindow)
+ ), is(true));
}
}
- public static CrankerConnector startConnector(String targetServiceName, MuServer target, int slidingWindowSize, MuServer... registrationRouters) {
- List uris = Stream.of(registrationRouters)
- .map(s -> registrationUri(s.uri()))
- .collect(toList());
- return CrankerConnectorBuilder.connector()
- .withHttpClient(CrankerConnectorBuilder.createHttpClient(true).build())
- .withTarget(target.uri())
- .withRoute(targetServiceName)
- .withRouterUris(RegistrationUriSuppliers.fixedUris(uris))
- .withSlidingWindowSize(slidingWindowSize)
- .start();
+ public static List preferredProtocols(RepetitionInfo repetitionInfo) {
+ final int currentRepetition = repetitionInfo.getCurrentRepetition();
+ switch (currentRepetition) {
+ case 1:
+ return List.of(CRANKER_PROTOCOL_1);
+ case 2:
+ return List.of(CRANKER_PROTOCOL_3);
+ default:
+ return List.of(CRANKER_PROTOCOL_3, CRANKER_PROTOCOL_1);
+ }
}
@AfterEach
diff --git a/src/test/java/com/hsbc/cranker/connector/ConcurrentUploadTest.java b/src/test/java/com/hsbc/cranker/connector/ConcurrentUploadTest.java
new file mode 100644
index 0000000..1e84353
--- /dev/null
+++ b/src/test/java/com/hsbc/cranker/connector/ConcurrentUploadTest.java
@@ -0,0 +1,98 @@
+package com.hsbc.cranker.connector;
+
+import io.muserver.MuHandler;
+import io.muserver.MuServer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.RepetitionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static io.muserver.MuServerBuilder.httpServer;
+import static org.junit.jupiter.api.Assertions.*;
+
+public class ConcurrentUploadTest extends BaseEndToEndTest {
+
+ private static final Logger log = LoggerFactory.getLogger(ConcurrentUploadTest.class);
+
+ private volatile MuHandler handler = (request, response) -> false;
+
+ protected MuServer targetServer = httpServer()
+ .addHandler((request, response) -> handler.handle(request, response))
+ .start();
+
+ private CrankerConnector connector;
+
+ @BeforeEach
+ void setUp(RepetitionInfo repetitionInfo) {
+ connector = CrankerConnectorBuilder.connector()
+ .withPreferredProtocols(preferredProtocols(repetitionInfo))
+ .withHttpClient(CrankerConnectorBuilder.createHttpClient(true).build())
+ .withRouterUris(RegistrationUriSuppliers.fixedUris(registrationUri(registrationServer.uri())))
+ .withRoute("*")
+ .withTarget(targetServer.uri())
+ .withProxyEventListener(new ProxyEventListener() {
+ @Override
+ public void onProxyError(HttpRequest request, Throwable error) {
+ log.warn("onProxyError, request=" + request, error);
+ }
+ })
+ .withComponentName("cranker-connector-unit-test")
+ .start();
+
+ waitForRegistration("*", connector.connectorId(),2, crankerRouter);
+ }
+
+ @AfterEach
+ public void stop() throws Exception {
+ if (connector != null) assertTrue(connector.stop(10, TimeUnit.SECONDS));
+ if (targetServer != null) targetServer.stop();
+ }
+
+ @RepeatedTest(3)
+ public void postLargeBody() throws InterruptedException {
+
+ handler = (request, response) -> {
+ response.status(200);
+ response.write(request.readBodyAsString());
+ return true;
+ };
+
+ Queue> responses = new ConcurrentLinkedQueue<>();
+ CountDownLatch countDownLatch = new CountDownLatch(10);
+
+ final String body = "c".repeat(10 * 1000);
+ for(int i = 0; i < 10; i++) {
+ final int finalI = i;
+ new Thread(() -> {
+ try {
+ HttpResponse resp = testClient.send(HttpRequest.newBuilder()
+ .method("POST", HttpRequest.BodyPublishers.ofString(body))
+ .uri(crankerServer.uri().resolve("/?task=" + finalI))
+ .build(), HttpResponse.BodyHandlers.ofString());
+ responses.add(resp);
+ countDownLatch.countDown();
+ } catch (Exception e) {
+ log.error("Concurrent request error", e);
+ responses.add(null);
+ }
+ }).start();
+ }
+
+ assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
+ assertEquals(10, responses.size());
+ for (HttpResponse response: responses) {
+ assertNotNull(response);
+ assertEquals(200, response.statusCode());
+ assertEquals(body, response.body());
+ }
+ }
+}
diff --git a/src/test/java/com/hsbc/cranker/connector/ConnectRetryTest.java b/src/test/java/com/hsbc/cranker/connector/ConnectRetryTest.java
index 6df1e75..03affbe 100644
--- a/src/test/java/com/hsbc/cranker/connector/ConnectRetryTest.java
+++ b/src/test/java/com/hsbc/cranker/connector/ConnectRetryTest.java
@@ -1,22 +1,24 @@
package com.hsbc.cranker.connector;
+import com.hsbc.cranker.mucranker.CrankerRouter;
import io.muserver.Http2ConfigBuilder;
import io.muserver.MuServer;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Test;
-import com.hsbc.cranker.mucranker.CrankerRouter;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.RepetitionInfo;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import static com.hsbc.cranker.connector.BaseEndToEndTest.startConnectorAndWaitForRegistration;
-import static com.hsbc.cranker.connector.BaseEndToEndTest.waitForRegistration;
+import static com.hsbc.cranker.connector.BaseEndToEndTest.*;
+import static com.hsbc.cranker.mucranker.CrankerRouterBuilder.crankerRouter;
import static io.muserver.MuServerBuilder.httpServer;
import static io.muserver.MuServerBuilder.httpsServer;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -26,7 +28,6 @@
import static scaffolding.Action.swallowException;
import static scaffolding.AssertUtils.assertEventually;
import static scaffolding.StringUtils.randomAsciiStringOfLength;
-import static com.hsbc.cranker.mucranker.CrankerRouterBuilder.crankerRouter;
public class ConnectRetryTest {
@@ -43,8 +44,8 @@ public void after() {
if (connector != null) swallowException(() -> connector.stop(10, TimeUnit.SECONDS));
}
- @Test
- public void ifTheRouterStopsThenTheConnectorWillReconnectWhenItStartsAgain() throws Exception {
+ @RepeatedTest(3)
+ public void ifTheRouterStopsThenTheConnectorWillReconnectWhenItStartsAgain(RepetitionInfo repetitionInfo) throws Exception {
this.targetServer = httpServer()
.addHandler((request, response) -> {
@@ -54,7 +55,7 @@ public void ifTheRouterStopsThenTheConnectorWillReconnectWhenItStartsAgain() thr
.start();
this.crankerRouter = crankerRouter()
- .withConnectorMaxWaitInMillis(400)
+ .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0"))
.start();
this.router = httpsServer()
.addHandler(crankerRouter.createRegistrationHandler())
@@ -62,7 +63,7 @@ public void ifTheRouterStopsThenTheConnectorWillReconnectWhenItStartsAgain() thr
.withHttp2Config(Http2ConfigBuilder.http2Config().enabled(false))
.start();
- this.connector = startConnectorAndWaitForRegistration(crankerRouter, "*", targetServer, 2, router);
+ this.connector = startConnectorAndWaitForRegistration(crankerRouter, "*", targetServer, preferredProtocols(repetitionInfo), 2, router);
String body = randomAsciiStringOfLength(100);
HttpResponse resp = testClient.send(HttpRequest.newBuilder()
@@ -80,7 +81,9 @@ public void ifTheRouterStopsThenTheConnectorWillReconnectWhenItStartsAgain() thr
assertThat(connector.routers().get(0).currentUnsuccessfulConnectionAttempts(), greaterThan(0));
- this.crankerRouter = crankerRouter().withConnectorMaxWaitInMillis(4000).start();
+ this.crankerRouter = crankerRouter()
+ .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0"))
+ .start();
this.router = httpsServer()
.withHttpsPort(originalPort)
.addHandler(crankerRouter.createRegistrationHandler())
@@ -101,20 +104,22 @@ public void ifTheRouterStopsThenTheConnectorWillReconnectWhenItStartsAgain() thr
}
- @Test
- void connectionErrorListenerIsTriggeredOnWssConnectionFailed() throws InterruptedException, IOException {
+ @RepeatedTest(3)
+ void connectionErrorListenerIsTriggeredOnWssConnectionFailed(RepetitionInfo repetitionInfo) throws InterruptedException, IOException {
AtomicInteger exceptionCount = new AtomicInteger(0);
int slidingWindow = 2;
this.targetServer = httpServer()
.addHandler((request, response) -> {
- response.write(request.readBodyAsString());
+ final String text = request.readBodyAsString();
+ response.write(text);
return true;
})
.start();
this.crankerRouter = crankerRouter()
- .withConnectorMaxWaitInMillis(4000).start();
+ .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0"))
+ .start();
this.router = httpsServer()
.withHttp2Config(Http2ConfigBuilder.http2Config().enabled(false))
@@ -123,6 +128,7 @@ void connectionErrorListenerIsTriggeredOnWssConnectionFailed() throws Interrupte
.start();
this.connector = CrankerConnectorBuilder.connector()
+ .withPreferredProtocols(preferredProtocols(repetitionInfo))
.withHttpClient(CrankerConnectorBuilder.createHttpClient(true).build())
.withTarget(targetServer.uri())
.withRoute("*")
@@ -137,11 +143,11 @@ public void onSocketConnectionError(RouterRegistration router, Throwable excepti
})
.start();
- waitForRegistration("*", slidingWindow, crankerRouter);
+ waitForRegistration("*", connector.connectorId(), slidingWindow, crankerRouter);
assertThat(exceptionCount.get(), equalTo(0));
- verifyHttpRequestWroking();
+ verifyHttpRequestWorking();
int originalPort = router.uri().getPort();
crankerRouter.stop();
@@ -152,7 +158,7 @@ public void onSocketConnectionError(RouterRegistration router, Throwable excepti
assertEventually(exceptionCount::get, greaterThan(0));
this.crankerRouter = crankerRouter()
- .withConnectorMaxWaitInMillis(4000)
+ .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0"))
.start();
this.router = httpsServer()
.withHttpsPort(originalPort)
@@ -161,12 +167,12 @@ public void onSocketConnectionError(RouterRegistration router, Throwable excepti
.addHandler(crankerRouter.createHttpHandler())
.start();
- waitForRegistration("*", slidingWindow, crankerRouter);
+ waitForRegistration("*", connector.connectorId(), slidingWindow, crankerRouter);
- verifyHttpRequestWroking();
+ verifyHttpRequestWorking();
}
- private void verifyHttpRequestWroking() throws IOException, InterruptedException {
+ private void verifyHttpRequestWorking() throws IOException, InterruptedException {
String body = "hello";
HttpResponse resp = testClient.send(HttpRequest.newBuilder()
.method("POST", HttpRequest.BodyPublishers.ofString(body))
diff --git a/src/test/java/com/hsbc/cranker/connector/CrankerConnectorBuilderTest.java b/src/test/java/com/hsbc/cranker/connector/CrankerConnectorBuilderTest.java
new file mode 100644
index 0000000..c4f213b
--- /dev/null
+++ b/src/test/java/com/hsbc/cranker/connector/CrankerConnectorBuilderTest.java
@@ -0,0 +1,123 @@
+package com.hsbc.cranker.connector;
+
+import com.hsbc.cranker.mucranker.CrankerRouter;
+import io.muserver.ContentTypes;
+import io.muserver.Http2ConfigBuilder;
+import io.muserver.Method;
+import io.muserver.MuServer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.RepetitionInfo;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static com.hsbc.cranker.connector.BaseEndToEndTest.preferredProtocols;
+import static com.hsbc.cranker.connector.BaseEndToEndTest.startConnectorAndWaitForRegistration;
+import static com.hsbc.cranker.mucranker.CrankerRouterBuilder.crankerRouter;
+import static io.muserver.MuServerBuilder.httpsServer;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static scaffolding.Action.swallowException;
+
+class CrankerConnectorBuilderTest {
+
+ private final HttpClient http2Client = HttpUtils.createHttpClientBuilder(true)
+ .version(HttpClient.Version.HTTP_2)
+ .build();
+
+ private final HttpClient http1Client = HttpUtils.createHttpClientBuilder(true)
+ .version(HttpClient.Version.HTTP_1_1)
+ .build();
+
+ private CrankerRouter crankerRouter;
+ private MuServer targetServer;
+ private MuServer crankerServer;
+ private CrankerConnector connector;
+
+ @AfterEach
+ public void after() {
+ if (connector != null) swallowException(() -> connector.stop(10, TimeUnit.SECONDS));
+ if (targetServer != null) swallowException(targetServer::stop);
+ if (crankerServer != null) swallowException(crankerServer::stop);
+ if (crankerRouter != null) swallowException(crankerRouter::stop);
+ }
+
+ @RepeatedTest(3)
+ void allowSlash() {
+ CrankerConnectorBuilder.connector().withTarget(URI.create("http://localhost:1234")).withRoute("hello").build();
+ CrankerConnectorBuilder.connector().withTarget(URI.create("http://localhost:1234")).withRoute("hello_123").build();
+ CrankerConnectorBuilder.connector().withTarget(URI.create("http://localhost:1234")).withRoute("hello-123").build();
+ CrankerConnectorBuilder.connector().withTarget(URI.create("http://localhost:1234")).withRoute("hello/123").build();
+ }
+
+ @RepeatedTest(3)
+ void testMaxHeadersSize_normal(RepetitionInfo repetitionInfo) throws IOException, InterruptedException {
+
+ setupServerForMaxHeaders(40000, preferredProtocols(repetitionInfo));
+
+ final String bigHeader = "b".repeat(18000);
+
+ HttpResponse resp1 = http1Client.send(HttpRequest.newBuilder()
+ .uri(crankerServer.uri().resolve("/test"))
+ .header("big-header", bigHeader)
+ .build(), HttpResponse.BodyHandlers.ofString());
+ assertEquals(200, resp1.statusCode());
+ assertEquals(bigHeader, resp1.headers().firstValue("big-header").orElse("not exist"));
+
+ HttpResponse resp2 = http2Client.send(HttpRequest.newBuilder()
+ .uri(crankerServer.uri().resolve("/test"))
+ .header("big-header", bigHeader)
+ .build(), HttpResponse.BodyHandlers.ofString());
+ assertEquals(200, resp2.statusCode());
+ assertEquals(bigHeader, resp2.headers().firstValue("big-header").orElse("not exist"));
+ }
+
+ @RepeatedTest(3)
+ void testMaxHeadersSize_exception(RepetitionInfo repetitionInfo) throws IOException, InterruptedException {
+
+ setupServerForMaxHeaders(40000, preferredProtocols(repetitionInfo));
+
+ final String bigHeader = "b".repeat(58000); // test header size from 2000 to 20000
+
+ HttpResponse resp1 = http1Client.send(HttpRequest.newBuilder()
+ .uri(crankerServer.uri().resolve("/test"))
+ .header("big-header", bigHeader)
+ .build(), HttpResponse.BodyHandlers.ofString());
+ assertEquals(431, resp1.statusCode());
+
+ HttpResponse resp2 = http2Client.send(HttpRequest.newBuilder()
+ .uri(crankerServer.uri().resolve("/test"))
+ .header("big-header", bigHeader)
+ .build(), HttpResponse.BodyHandlers.ofString());
+ assertEquals(431, resp2.statusCode());
+ }
+
+ private void setupServerForMaxHeaders(int maxHeadersSize, List preferredProtocols) {
+ this.targetServer = httpsServer()
+ .withHttp2Config(Http2ConfigBuilder.http2Config().enabled(true))
+ .withMaxHeadersSize(maxHeadersSize)
+ .addHandler(Method.GET, "/test", (request, response, pathParams) -> {
+ response.contentType(ContentTypes.TEXT_PLAIN_UTF8);
+ response.headers().set("big-header", request.headers().get("big-header", "nothing"));
+ })
+ .start();
+
+ this.crankerRouter = crankerRouter()
+ .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0"))
+ .start();
+
+ this.crankerServer = httpsServer()
+ .withHttp2Config(Http2ConfigBuilder.http2Config().enabled(false))
+ .withMaxHeadersSize(maxHeadersSize)
+ .addHandler(crankerRouter.createRegistrationHandler())
+ .addHandler(crankerRouter.createHttpHandler())
+ .start();
+
+ this.connector = startConnectorAndWaitForRegistration(crankerRouter, "*", targetServer, preferredProtocols, 2, this.crankerServer);
+ }
+}
diff --git a/src/test/java/com/hsbc/cranker/connector/CrankerConnectorHttp2Test.java b/src/test/java/com/hsbc/cranker/connector/CrankerConnectorHttp2Test.java
index fa52e98..58937ca 100644
--- a/src/test/java/com/hsbc/cranker/connector/CrankerConnectorHttp2Test.java
+++ b/src/test/java/com/hsbc/cranker/connector/CrankerConnectorHttp2Test.java
@@ -1,24 +1,27 @@
package com.hsbc.cranker.connector;
+import com.hsbc.cranker.mucranker.CrankerRouter;
import io.muserver.Http2ConfigBuilder;
import io.muserver.Method;
import io.muserver.MuServer;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Test;
-import com.hsbc.cranker.mucranker.CrankerRouter;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.RepetitionInfo;
import java.io.IOException;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
+import java.util.List;
import java.util.concurrent.TimeUnit;
+import static com.hsbc.cranker.connector.BaseEndToEndTest.preferredProtocols;
import static com.hsbc.cranker.connector.BaseEndToEndTest.startConnectorAndWaitForRegistration;
+import static com.hsbc.cranker.mucranker.CrankerRouterBuilder.crankerRouter;
import static io.muserver.MuServerBuilder.httpsServer;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static scaffolding.Action.swallowException;
-import static com.hsbc.cranker.mucranker.CrankerRouterBuilder.crankerRouter;
public class CrankerConnectorHttp2Test {
@@ -38,20 +41,18 @@ public void after() {
if (crankerRouter != null) swallowException(crankerRouter::stop);
}
- @Test
- void canWorkWithHttp2MicroServiceAndHttp1Cranker() throws IOException, InterruptedException {
+ @RepeatedTest(3)
+ void canWorkWithHttp2MicroServiceAndHttp1Cranker(RepetitionInfo repetitionInfo) throws IOException, InterruptedException {
System.getProperties().setProperty("jdk.internal.httpclient.disableHostnameVerification", Boolean.TRUE.toString());
this.targetServer = httpsServer()
.withHttp2Config(Http2ConfigBuilder.http2Config().enabled(true))
- .addHandler(Method.GET, "/test", (request, response, pathParams) -> {
- response.write("hello world");
- })
+ .addHandler(Method.GET, "/test", (request, response, pathParams) -> response.write("hello world"))
.start();
this.crankerRouter = crankerRouter()
- .withConnectorMaxWaitInMillis(4000)
+ .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0"))
.start();
this.routerServer = httpsServer()
@@ -60,7 +61,7 @@ void canWorkWithHttp2MicroServiceAndHttp1Cranker() throws IOException, Interrupt
.addHandler(crankerRouter.createHttpHandler())
.start();
- this.connector = startConnectorAndWaitForRegistration(crankerRouter, "*", targetServer, 2, this.routerServer);
+ this.connector = startConnectorAndWaitForRegistration(crankerRouter, "*", targetServer, preferredProtocols(repetitionInfo),2, this.routerServer);
HttpResponse response = http2Client.send(HttpRequest.newBuilder()
.uri(this.routerServer.uri().resolve("/test"))
diff --git a/src/test/java/com/hsbc/cranker/connector/CrankerConnectorListenerTest.java b/src/test/java/com/hsbc/cranker/connector/CrankerConnectorListenerTest.java
new file mode 100644
index 0000000..569f60d
--- /dev/null
+++ b/src/test/java/com/hsbc/cranker/connector/CrankerConnectorListenerTest.java
@@ -0,0 +1,129 @@
+package com.hsbc.cranker.connector;
+
+
+import com.hsbc.cranker.mucranker.CrankerRouter;
+import io.muserver.Http2ConfigBuilder;
+import io.muserver.Method;
+import io.muserver.MuServer;
+import io.muserver.RouteHandler;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.RepetitionInfo;
+
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static com.hsbc.cranker.connector.BaseEndToEndTest.preferredProtocols;
+import static com.hsbc.cranker.mucranker.CrankerRouterBuilder.crankerRouter;
+import static io.muserver.MuServerBuilder.httpsServer;
+import static java.util.stream.Collectors.toList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+import static scaffolding.Action.swallowException;
+
+public class CrankerConnectorListenerTest {
+
+ private final HttpClient httpClient = HttpUtils.createHttpClientBuilder(true)
+ .version(HttpClient.Version.HTTP_2)
+ .build();
+ private CrankerRouter crankerRouter;
+ private MuServer targetServer;
+ private MuServer routerServer;
+ private CrankerConnector connector;
+
+ @BeforeEach
+ public void before() {
+
+ this.crankerRouter = crankerRouter()
+ .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0"))
+ .start();
+
+ this.routerServer = httpsServer()
+ .withHttp2Config(Http2ConfigBuilder.http2Config().enabled(false))
+ .addHandler(crankerRouter.createRegistrationHandler())
+ .addHandler(crankerRouter.createHttpHandler())
+ .start();
+ }
+
+ @AfterEach
+ public void after() {
+ if (connector != null) swallowException(() -> connector.stop(10, TimeUnit.SECONDS));
+ if (targetServer != null) swallowException(targetServer::stop);
+ if (routerServer != null) swallowException(routerServer::stop);
+ if (crankerRouter != null) swallowException(crankerRouter::stop);
+ }
+
+ @RepeatedTest(3)
+ void testProxyEventListenerInvoked(RepetitionInfo repetitionInfo) throws Exception {
+
+ final RouteHandler handler = (request, response, pathParams) -> {
+ StringBuilder bodyBuilder = new StringBuilder();
+ for (Map.Entry header : request.headers()) {
+ bodyBuilder.append(header.getKey()).append(":").append(header.getValue()).append("\n");
+ }
+ bodyBuilder.append("\n");
+ bodyBuilder.append(request.readBodyAsString());
+ response.status(200);
+ response.write(bodyBuilder.toString());
+ };
+
+ this.targetServer = httpsServer()
+ .withHttp2Config(Http2ConfigBuilder.http2Config().enabled(true))
+ .addHandler(Method.GET, "/test", handler)
+ .addHandler(Method.POST, "/test", handler)
+ .start();
+
+ this.connector = CrankerConnectorBuilder.connector()
+ .withPreferredProtocols(preferredProtocols(repetitionInfo))
+ .withHttpClient(CrankerConnectorBuilder.createHttpClient(true).build())
+ .withTarget(targetServer.uri())
+ .withRoute("*")
+ .withRouterUris(RegistrationUriSuppliers.fixedUris(Stream.of(new MuServer[]{this.routerServer})
+ .map(s -> BaseEndToEndTest.registrationUri(s.uri()))
+ .collect(toList())))
+ .withSlidingWindowSize(2)
+ .withProxyEventListener(new ProxyEventListener() {
+ @Override
+ public HttpRequest beforeProxyToTarget(HttpRequest request, HttpRequest.Builder requestBuilder) {
+ // add extra header
+ final String clientHeader = request.headers().firstValue("x-client-header").orElseGet(() -> "");
+ requestBuilder.header("x-connector-header", "connector-value_" + clientHeader);
+ return requestBuilder.build();
+ }
+ })
+ .start();
+
+ BaseEndToEndTest.waitForRegistration("*", connector.connectorId(), 2, crankerRouter);
+
+ // GET
+ HttpResponse response = httpClient.send(HttpRequest.newBuilder()
+ .header("x-client-header", "client-value")
+ .uri(this.routerServer.uri().resolve("/test"))
+ .build(), HttpResponse.BodyHandlers.ofString());
+ assertThat(response.statusCode(), is(200));
+ final String body = response.body();
+ assertThat(body, containsString("x-client-header:client-value\n"));
+ assertThat(body, containsString("x-connector-header:connector-value_client-value\n"));
+
+ // POST
+ HttpResponse response_2 = httpClient.send(HttpRequest.newBuilder()
+ .header("x-client-header", "client-value")
+ .uri(this.routerServer.uri().resolve("/test"))
+ .method("POST", HttpRequest.BodyPublishers.ofString("this is request body string"))
+ .build(), HttpResponse.BodyHandlers.ofString());
+ assertThat(response_2.statusCode(), is(200));
+ final String body_2 = response_2.body();
+ assertThat(body_2, containsString("x-client-header:client-value\n"));
+ assertThat(body_2, containsString("x-connector-header:connector-value_client-value\n"));
+ assertThat(body_2, containsString("this is request body string"));
+
+ }
+
+}
diff --git a/src/test/java/com/hsbc/cranker/connector/CrankerConnectorStopTest.java b/src/test/java/com/hsbc/cranker/connector/CrankerConnectorStopTest.java
index 1e473a3..1aa9637 100644
--- a/src/test/java/com/hsbc/cranker/connector/CrankerConnectorStopTest.java
+++ b/src/test/java/com/hsbc/cranker/connector/CrankerConnectorStopTest.java
@@ -1,31 +1,37 @@
package com.hsbc.cranker.connector;
+import com.hsbc.cranker.mucranker.CrankerRouter;
import io.muserver.*;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.RepetitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.hsbc.cranker.mucranker.CrankerRouter;
+import scaffolding.AssertUtils;
-import java.io.PrintWriter;
+import java.io.*;
+import java.net.Socket;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import static com.hsbc.cranker.connector.BaseEndToEndTest.registrationUri;
-import static com.hsbc.cranker.connector.BaseEndToEndTest.startConnectorAndWaitForRegistration;
+import static com.hsbc.cranker.connector.BaseEndToEndTest.*;
+import static com.hsbc.cranker.mucranker.CrankerRouterBuilder.crankerRouter;
+import static io.muserver.MuServerBuilder.httpServer;
import static io.muserver.MuServerBuilder.httpsServer;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.junit.jupiter.api.Assertions.*;
import static scaffolding.Action.swallowException;
import static scaffolding.AssertUtils.assertEventually;
-import static com.hsbc.cranker.mucranker.CrankerRouterBuilder.crankerRouter;
public class CrankerConnectorStopTest {
@@ -39,8 +45,8 @@ public class CrankerConnectorStopTest {
private MuServer routerServer;
private CrankerConnector connector;
- private AtomicInteger counter = new AtomicInteger(0);
- private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10, r -> new Thread(r, "test-pool" + counter.incrementAndGet()));
+ private final AtomicInteger counter = new AtomicInteger(0);
+ private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10, r -> new Thread(r, "test-pool" + counter.incrementAndGet()));
@AfterEach
public void after() {
@@ -48,11 +54,11 @@ public void after() {
if (targetServer != null) swallowException(targetServer::stop);
if (routerServer != null) swallowException(routerServer::stop);
if (crankerRouter != null) swallowException(crankerRouter::stop);
- if (executorService != null) swallowException(executorService::shutdownNow);
+ swallowException(executorService::shutdownNow);
}
- @Test
- void requestOnTheFlyShouldCompleteSuccessfullyWithinTimeout() throws ExecutionException, InterruptedException, TimeoutException {
+ @RepeatedTest(3)
+ void requestOnTheFlyShouldCompleteSuccessfullyWithinTimeout(RepetitionInfo repetitionInfo) {
AtomicInteger serverCounter = new AtomicInteger(0);
AtomicInteger clientCounter = new AtomicInteger(0);
@@ -71,7 +77,7 @@ void requestOnTheFlyShouldCompleteSuccessfullyWithinTimeout() throws ExecutionEx
.start();
this.crankerRouter = crankerRouter()
- .withConnectorMaxWaitInMillis(4000)
+ .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0"))
.start();
this.routerServer = httpsServer()
@@ -80,7 +86,8 @@ void requestOnTheFlyShouldCompleteSuccessfullyWithinTimeout() throws ExecutionEx
.addHandler(crankerRouter.createHttpHandler())
.start();
- this.connector = startConnectorAndWaitForRegistration(crankerRouter, "*", targetServer, 2, this.routerServer);
+ this.connector = startConnectorAndWaitForRegistration(crankerRouter, "*", targetServer,
+ preferredProtocols(repetitionInfo),2, this.routerServer);
int requestCount = 3;
for (int i = 0; i < requestCount; i++) {
@@ -105,8 +112,8 @@ void requestOnTheFlyShouldCompleteSuccessfullyWithinTimeout() throws ExecutionEx
assertEventually(clientCounter::get, equalTo(requestCount));
}
- @Test
- void requestOnTheFlyShouldCompleteSuccessfullyWithinTimeout_LongQuery() throws ExecutionException, InterruptedException, TimeoutException {
+ @RepeatedTest(3)
+ void requestOnTheFlyShouldCompleteSuccessfullyWithinTimeout_LongQuery(RepetitionInfo repetitionInfo) {
AtomicInteger serverCounter = new AtomicInteger(0);
AtomicInteger clientCounter = new AtomicInteger(0);
@@ -120,14 +127,14 @@ void requestOnTheFlyShouldCompleteSuccessfullyWithinTimeout_LongQuery() throws E
for (int i = 0; i < 12; i++) {
writer.print(i + ",");
writer.flush();
- Thread.sleep(1000L);
+ Thread.sleep(100L);
}
}
})
.start();
this.crankerRouter = crankerRouter()
- .withConnectorMaxWaitInMillis(4000)
+ .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0"))
.start();
this.routerServer = httpsServer()
@@ -136,8 +143,8 @@ void requestOnTheFlyShouldCompleteSuccessfullyWithinTimeout_LongQuery() throws E
.addHandler(crankerRouter.createHttpHandler())
.start();
- this.connector = startConnectorAndWaitForRegistration(crankerRouter, "*", targetServer, 2, this.routerServer);
-
+ this.connector = startConnectorAndWaitForRegistration(crankerRouter, "*", targetServer,
+ preferredProtocols(repetitionInfo),2, this.routerServer);
executorService.submit(() -> {
try {
@@ -162,8 +169,8 @@ void requestOnTheFlyShouldCompleteSuccessfullyWithinTimeout_LongQuery() throws E
}
- @Test
- void getTimeoutExceptionIfExceedTimeout() throws ExecutionException, InterruptedException, TimeoutException {
+ @RepeatedTest(3)
+ void getTimeoutExceptionIfExceedTimeout(RepetitionInfo repetitionInfo) {
AtomicInteger serverCounter = new AtomicInteger(0);
AtomicInteger serverExceptionCounter = new AtomicInteger(0);
@@ -180,7 +187,6 @@ void getTimeoutExceptionIfExceedTimeout() throws ExecutionException, Interrupted
asyncHandle.write(ByteBuffer.wrap("hello world".getBytes()));
asyncHandle.complete();
} catch (Throwable throwable) {
- System.out.println(throwable);
serverExceptionCounter.incrementAndGet();
}
}, 3, TimeUnit.SECONDS);
@@ -188,7 +194,7 @@ void getTimeoutExceptionIfExceedTimeout() throws ExecutionException, Interrupted
.start();
this.crankerRouter = crankerRouter()
- .withConnectorMaxWaitInMillis(4000)
+ .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0"))
.start();
this.routerServer = httpsServer()
@@ -197,7 +203,8 @@ void getTimeoutExceptionIfExceedTimeout() throws ExecutionException, Interrupted
.addHandler(crankerRouter.createHttpHandler())
.start();
- this.connector = startConnectorAndWaitForRegistration(crankerRouter, "*", targetServer, 2, this.routerServer);
+ this.connector = startConnectorAndWaitForRegistration(crankerRouter, "*", targetServer,
+ preferredProtocols(repetitionInfo),2, this.routerServer);
int requestCount = 3;
for (int i = 0; i < requestCount; i++) {
@@ -221,18 +228,16 @@ void getTimeoutExceptionIfExceedTimeout() throws ExecutionException, Interrupted
this.targetServer.stop();
}
- @Test
- public void returnFalseWhenCallingStopBeforeCallingStart() {
+ @RepeatedTest(3)
+ public void throwIllegalStateExceptionWhenCallingStopBeforeCallingStart(RepetitionInfo repetitionInfo) {
this.targetServer = httpsServer()
.withHttp2Config(Http2ConfigBuilder.http2Config().enabled(true))
- .addHandler(Method.GET, "/test", (request, response, pathParams) -> {
- response.write("hello world");
- })
+ .addHandler(Method.GET, "/test", (request, response, pathParams) -> response.write("hello world"))
.start();
this.crankerRouter = crankerRouter()
- .withConnectorMaxWaitInMillis(4000)
+ .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0"))
.start();
this.routerServer = httpsServer()
@@ -242,6 +247,7 @@ public void returnFalseWhenCallingStopBeforeCallingStart() {
.start();
this.connector = CrankerConnectorBuilder.connector()
+ .withPreferredProtocols(preferredProtocols(repetitionInfo))
.withHttpClient(CrankerConnectorBuilder.createHttpClient(true).build())
.withRouterUris(RegistrationUriSuppliers.fixedUris(URI.create("wss://localhost:1234")))
.withRoute("*")
@@ -249,23 +255,21 @@ public void returnFalseWhenCallingStopBeforeCallingStart() {
.withComponentName("cranker-connector-unit-test")
.build(); // not start
- // call before start
- assertFalse(() -> connector.stop(1, TimeUnit.SECONDS));
+
+ assertFalse(connector.stop(1, TimeUnit.SECONDS));
}
- @Test
- public void returnFalseWhenCallingStopMultipleTime() {
+ @RepeatedTest(3)
+ public void throwIllegalStateExceptionWhenCallingStopMultipleTime(RepetitionInfo repetitionInfo) {
this.targetServer = httpsServer()
.withHttp2Config(Http2ConfigBuilder.http2Config().enabled(true))
- .addHandler(Method.GET, "/test", (request, response, pathParams) -> {
- response.write("hello world");
- })
+ .addHandler(Method.GET, "/test", (request, response, pathParams) -> response.write("hello world"))
.start();
this.crankerRouter = crankerRouter()
- .withConnectorMaxWaitInMillis(4000)
+ .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0"))
.start();
this.routerServer = httpsServer()
@@ -275,6 +279,7 @@ public void returnFalseWhenCallingStopMultipleTime() {
.start();
this.connector = CrankerConnectorBuilder.connector()
+ .withPreferredProtocols(preferredProtocols(repetitionInfo))
.withHttpClient(CrankerConnectorBuilder.createHttpClient(true).build())
.withRouterUris(RegistrationUriSuppliers.fixedUris(registrationUri(routerServer.uri())))
.withRoute("*")
@@ -283,10 +288,162 @@ public void returnFalseWhenCallingStopMultipleTime() {
.start();
// call stop the first time
- connector.stop(10, TimeUnit.SECONDS);
+ assertTrue(connector.stop(5, TimeUnit.SECONDS));
+
+ // return false for the second time
+ assertFalse(connector.stop(1, TimeUnit.SECONDS));
+ }
+
+
+ @RepeatedTest(3)
+ public void clientDropEarlyCanNotifyMicroservice(RepetitionInfo repetitionInfo) throws IOException {
+
+ final ResponseInfo[] responseInfo = new ResponseInfo[1];
+ final AtomicBoolean serverReceived = new AtomicBoolean(false);
+
+ this.targetServer = httpServer()
+ .addHandler(Method.GET, "/test", (request, response, pathParams) -> {
+ // no response, just holding the tcp connection until client drop
+ final AsyncHandle asyncHandle = request.handleAsync();
+ asyncHandle.addResponseCompleteHandler(info -> {
+ log.info("http server response complete, info={}", info);
+ responseInfo[0] = info;
+ });
+ serverReceived.set(true);
+ asyncHandle.write(ByteBuffer.wrap("hello1".getBytes()));
+ })
+ .start();
+
+ log.info("http server started at {}", this.targetServer.uri());
+
+ this.crankerRouter = crankerRouter()
+ .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0"))
+ .start();
+
+ this.routerServer = httpServer()
+ .withHttpPort(0)
+ .addHandler(crankerRouter.createRegistrationHandler())
+ .addHandler(crankerRouter.createHttpHandler())
+ .start();
+
+ this.connector = CrankerConnectorBuilder.connector()
+ .withPreferredProtocols(preferredProtocols(repetitionInfo))
+ .withHttpClient(CrankerConnectorBuilder.createHttpClient(true).build())
+ .withRouterUris(RegistrationUriSuppliers.fixedUris(registrationUri(routerServer.uri())))
+ .withRoute("*")
+ .withTarget(targetServer.uri())
+ .withComponentName("cranker-connector-unit-test")
+ .start();
+
+ waitForRegistration("*", connector.connectorId(), 2, crankerRouter);
- // second time will throw exception
- assertFalse(() -> connector.stop(1, TimeUnit.SECONDS));
+
+ String host = this.routerServer.uri().getHost();
+ int port = this.routerServer.uri().getPort();
+ String path = "/test";
+
+ try (Socket socket = new Socket(host, port);
+ OutputStreamWriter writer = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8)) {
+ writer.write("GET " + path + " HTTP/1.1\r\n");
+ writer.write("Host: " + host + "\r\n");
+ writer.write("User-Agent: Mozilla/5.0\r\n");
+ writer.write("Connection: Close\r\n");
+ writer.write("\r\n");
+ writer.flush();
+
+ assertEventually(serverReceived::get, is(true));
+
+ // client sending request and close it early
+ writer.close();
+ socket.close();
+
+ AssertUtils.assertEventually(() -> responseInfo[0] != null && !responseInfo[0].completedSuccessfully(), is(true), 10, 100);
+ }
+ }
+
+ @RepeatedTest(3)
+ public void connectorStopCanNotifyServiceAndClient(RepetitionInfo repetitionInfo) {
+
+ final ResponseInfo[] responseInfo = new ResponseInfo[1];
+ final AtomicBoolean serverReceived = new AtomicBoolean(false);
+
+ this.targetServer = httpServer()
+ .addHandler(Method.GET, "/test", (request, response, pathParams) -> {
+ // no response, just holding the tcp connection until client drop
+ final AsyncHandle asyncHandle = request.handleAsync();
+ asyncHandle.addResponseCompleteHandler(info -> {
+ log.info("http server response complete, info={}", info);
+ responseInfo[0] = info;
+ });
+ serverReceived.set(true);
+ asyncHandle.write(ByteBuffer.wrap("hello1".getBytes()));
+ })
+ .start();
+
+ log.info("http server started at {}", this.targetServer.uri());
+
+ this.crankerRouter = crankerRouter()
+ .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0"))
+ .start();
+
+ this.routerServer = httpServer()
+ .withHttpPort(0)
+ .addHandler(crankerRouter.createRegistrationHandler())
+ .addHandler(crankerRouter.createHttpHandler())
+ .start();
+
+ this.connector = CrankerConnectorBuilder.connector()
+ .withPreferredProtocols(preferredProtocols(repetitionInfo))
+ .withHttpClient(CrankerConnectorBuilder.createHttpClient(true).build())
+ .withRouterUris(RegistrationUriSuppliers.fixedUris(registrationUri(routerServer.uri())))
+ .withRoute("*")
+ .withTarget(targetServer.uri())
+ .withComponentName("cranker-connector-unit-test")
+ .start();
+
+ waitForRegistration("*", connector.connectorId(), 2, crankerRouter);
+
+ String host = this.routerServer.uri().getHost();
+ int port = this.routerServer.uri().getPort();
+ String path = "/test";
+
+ // start a client and wait for exception
+ AtomicBoolean isClientCompleted = new AtomicBoolean(false);
+ new Thread(() -> {
+ try (Socket socket = new Socket(host, port);
+ OutputStreamWriter writer = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8);
+ BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
+ writer.write("GET " + path + " HTTP/1.1\r\n");
+ writer.write("Host: " + host + "\r\n");
+ writer.write("User-Agent: Mozilla/5.0\r\n");
+ writer.write("Connection: Close\r\n");
+ writer.write("\r\n");
+ writer.flush();
+
+
+ String line;
+ while ((line = reader.readLine()) != null) {
+ log.info(line);
+ }
+ isClientCompleted.set(true);
+ } catch (Exception exception) {
+ isClientCompleted.set(true);
+ }
+ }).start();
+
+ // assert request already arrived server
+ assertEventually(serverReceived::get, is(true));
+
+ // close connector while request still inflight
+ log.info("connector stopping");
+ final boolean isSuccess = connector.stop(1, TimeUnit.SECONDS);
+ log.info("connector stopped, isSuccess={}", isSuccess);
+ assertThat(isSuccess, is(false));
+
+ // microservice and client both aware
+ AssertUtils.assertEventually(() -> responseInfo[0] != null && !responseInfo[0].completedSuccessfully(), is(true), 10, 100);
+ AssertUtils.assertEventually(isClientCompleted::get, is(true));
}
+
}
diff --git a/src/test/java/com/hsbc/cranker/connector/CrankerConnectorTest.java b/src/test/java/com/hsbc/cranker/connector/CrankerConnectorTest.java
index 1c0c495..2f7e33e 100644
--- a/src/test/java/com/hsbc/cranker/connector/CrankerConnectorTest.java
+++ b/src/test/java/com/hsbc/cranker/connector/CrankerConnectorTest.java
@@ -7,18 +7,23 @@
import io.muserver.handlers.ResourceHandlerBuilder;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.RepetitionInfo;
import scaffolding.ByteUtils;
+import scaffolding.StringUtils;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import static io.muserver.MuServerBuilder.httpServer;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -33,26 +38,34 @@ public class CrankerConnectorTest extends BaseEndToEndTest {
.addHandler((request, response) -> handler.handle(request, response))
.start();
- private final CrankerConnector connector = CrankerConnectorBuilder.connector()
- .withHttpClient(CrankerConnectorBuilder.createHttpClient(true).build())
- .withRouterUris(RegistrationUriSuppliers.fixedUris(registrationUri(registrationServer.uri())))
- .withRoute("*")
- .withTarget(targetServer.uri())
- .withComponentName("cranker-connector-unit-test")
- .start();
+ private CrankerConnector connector;
@BeforeEach
- public void waitForRegistration() {
- waitForRegistration("*", 2, crankerRouter);
+ void setUp(RepetitionInfo repetitionInfo) {
+ final List preferredProtocols = preferredProtocols(repetitionInfo);
+ connector = CrankerConnectorBuilder.connector()
+ .withPreferredProtocols(preferredProtocols)
+ .withHttpClient(CrankerConnectorBuilder.createHttpClient(true).build())
+ .withRouterUris(RegistrationUriSuppliers.fixedUris(registrationUri(registrationServer.uri())))
+ .withRoute("*")
+ .withTarget(targetServer.uri())
+ .withComponentName("cranker-connector-unit-test")
+ .start();
+
+ waitForRegistration("*", connector.connectorId(), 2, crankerRouter);
+
+ assertEventually(
+ () -> new ArrayList<>(connector.routers().get(0).idleSockets()).get(0).version(),
+ equalTo(preferredProtocols.get(0)));
}
@AfterEach
public void stop() throws Exception {
- connector.stop(10, TimeUnit.SECONDS);
+ assertThat(connector.stop(10, TimeUnit.SECONDS), is(true));
targetServer.stop();
}
- @Test
+ @RepeatedTest(3)
public void postingBodiesWorks() throws Exception {
final String[] contentLength = new String[1];
handler = (request, response) -> {
@@ -74,8 +87,9 @@ public void postingBodiesWorks() throws Exception {
assertEventually(() -> contentLength[0], equalTo("100000"));
}
- @Test
+ @RepeatedTest(3)
public void getRequestsWork() throws Exception {
+
handler = (request, response) -> {
response.contentType(ContentTypes.TEXT_PLAIN_UTF8);
response.sendChunk("This ");
@@ -94,7 +108,47 @@ public void getRequestsWork() throws Exception {
}
}
- @Test
+ @RepeatedTest(3)
+ public void getRequestsWork_largeResponse() throws Exception {
+
+ final String text = StringUtils.randomStringOfLength(100000);
+ handler = (request, response) -> {
+ String agent = request.headers().get("user-agent");
+ response.contentType(ContentTypes.TEXT_PLAIN_UTF8);
+ response.write(text + agent);
+ return true;
+ };
+
+ for (int i = 0; i < 10; i++) {
+ HttpResponse resp = testClient.send(HttpRequest.newBuilder()
+ .uri(crankerServer.uri().resolve("/something"))
+ .header("user-agent", "cranker-connector-test-client-" + i)
+ .build(), HttpResponse.BodyHandlers.ofString());
+ assertThat(resp.statusCode(), is(200));
+ assertEquals(text + "cranker-connector-test-client-" + i, resp.body());
+ }
+ }
+
+
+ @RepeatedTest(3)
+ public void forwardHeaderSetCorrectly() throws Exception {
+
+ handler = (request, response) -> {
+ response.write(request.headers().get("forwarded"));
+ return true;
+ };
+
+ HttpResponse resp = testClient.send(HttpRequest.newBuilder()
+ .uri(crankerServer.uri().resolve("/something"))
+ .header("user-agent", "cranker-connector-test-client")
+ .build(), HttpResponse.BodyHandlers.ofString());
+ assertThat(resp.body(), containsString("by="));
+ assertThat(resp.body(), containsString("for="));
+ assertThat(resp.body(), containsString("host="));
+ assertThat(resp.body(), containsString("proto="));
+ }
+
+ @RepeatedTest(3)
public void ifTheTargetReturnsGzippedContentThenItIsProxiedCompressed() throws Exception {
String body = randomAsciiStringOfLength(100000);
handler = (request, response) -> {
@@ -111,7 +165,7 @@ public void ifTheTargetReturnsGzippedContentThenItIsProxiedCompressed() throws E
assertEquals(body, decompressedBody);
}
- @Test
+ @RepeatedTest(3)
public void largeResponsesWork() throws Exception {
String body = Files.readString(Path.of("src/test/resources/web/large-file.txt"));
handler = ResourceHandlerBuilder.classpathHandler("/web").build();
@@ -125,7 +179,7 @@ public void largeResponsesWork() throws Exception {
assertEquals(body, actual);
}
- @Test
+ @RepeatedTest(3)
public void toStringReturnsUsefulInfo() {
assertThat(connector.toString(), startsWith("CrankerConnector (" + connector.connectorId() + ") registered to: [RouterRegistration"));
}
diff --git a/src/test/java/com/hsbc/cranker/connector/DualRouterTest.java b/src/test/java/com/hsbc/cranker/connector/DualRouterTest.java
index e49bdc3..b7b261a 100644
--- a/src/test/java/com/hsbc/cranker/connector/DualRouterTest.java
+++ b/src/test/java/com/hsbc/cranker/connector/DualRouterTest.java
@@ -1,23 +1,33 @@
package com.hsbc.cranker.connector;
+import com.hsbc.cranker.mucranker.CrankerRouter;
import io.muserver.MuHandler;
import io.muserver.MuServer;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.RepetitionInfo;
import scaffolding.StringUtils;
-import com.hsbc.cranker.mucranker.CrankerRouter;
import java.net.URI;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import static com.hsbc.cranker.mucranker.CrankerRouterBuilder.crankerRouter;
import static io.muserver.ContextHandlerBuilder.context;
import static io.muserver.MuServerBuilder.httpServer;
+import static java.util.stream.Collectors.toList;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
import static scaffolding.Action.swallowException;
-import static com.hsbc.cranker.mucranker.CrankerRouterBuilder.crankerRouter;
+import static scaffolding.AssertUtils.assertEventually;
+
+
public class DualRouterTest extends BaseEndToEndTest {
@@ -31,20 +41,97 @@ public class DualRouterTest extends BaseEndToEndTest {
}
};
- private static final MuServer targetA1 = httpServer().addHandler(context("a").addHandler(textHandler)).start();
- private static final MuServer targetA2 = httpServer().addHandler(context("a").addHandler(textHandler)).start();
- private static final MuServer targetB = httpServer().addHandler(context("b").addHandler(textHandler)).start();
- private static final MuServer targetCatchAll = httpServer().addHandler(textHandler).start();
- private static final CrankerRouter router1 = crankerRouter().start();
- private static final MuServer routerServer1 = httpServer().addHandler(router1.createRegistrationHandler()).addHandler(router1.createHttpHandler()).start();
- private static final CrankerRouter router2 = crankerRouter().start();
- private static final MuServer routerServer2 = httpServer().addHandler(router2.createRegistrationHandler()).addHandler(router2.createHttpHandler()).start();
- private static final CrankerConnector connector1 = BaseEndToEndTest.startConnectorAndWaitForRegistration(router1, "a", targetA1, 2, routerServer1, routerServer2);
- private static final CrankerConnector connector2 = BaseEndToEndTest.startConnectorAndWaitForRegistration(router1, "b", targetB, 2, routerServer1, routerServer2);
- private static final CrankerConnector connector3 = BaseEndToEndTest.startConnectorAndWaitForRegistration(router1, "a", targetA2, 2, routerServer1, routerServer2);
- private static final CrankerConnector connector4 = BaseEndToEndTest.startConnectorAndWaitForRegistration(router1, "*", targetCatchAll, 2, routerServer1, routerServer2);
-
- @Test
+ private MuServer targetA1;
+ private MuServer targetA2;
+ private MuServer targetB;
+ private MuServer targetCatchAll;
+ private CrankerRouter router1;
+ private MuServer routerServer1;
+ private CrankerRouter router2;
+ private MuServer routerServer2;
+ private CrankerConnector connector1;
+ private CrankerConnector connector2;
+ private CrankerConnector connector3;
+ private CrankerConnector connector4;
+
+ private static CrankerConnector startConnector(String targetServiceName,
+ MuServer target,
+ List preferredProtocols,
+ int slidingWindowSize,
+ MuServer... registrationRouters) {
+ CrankerConnector connector = CrankerConnectorBuilder.connector()
+ .withPreferredProtocols(preferredProtocols)
+ .withHttpClient(CrankerConnectorBuilder.createHttpClient(true).build())
+ .withTarget(target.uri())
+ .withRoute(targetServiceName)
+ .withRouterUris(RegistrationUriSuppliers.fixedUris(Stream.of(registrationRouters)
+ .map(s -> registrationUri(s.uri()))
+ .collect(toList())))
+ .withSlidingWindowSize(slidingWindowSize)
+ .start();
+
+ assertEventually(
+ () -> new ArrayList<>(connector.routers().get(0).idleSockets()).get(0).version(),
+ equalTo(preferredProtocols.get(0)));
+
+ return connector;
+ }
+
+
+ @BeforeEach
+ void setUp(RepetitionInfo repetitionInfo) {
+ final List preferredProtocols = preferredProtocols(repetitionInfo);
+
+ targetA1 = httpServer().addHandler(context("a").addHandler(textHandler)).start();
+ targetA2 = httpServer().addHandler(context("a").addHandler(textHandler)).start();
+ targetB = httpServer().addHandler(context("b").addHandler(textHandler)).start();
+ targetCatchAll = httpServer().addHandler(textHandler).start();
+
+ router1 = crankerRouter()
+ .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0"))
+ .start();
+ routerServer1 = httpServer()
+ .addHandler(router1.createRegistrationHandler())
+ .addHandler(router1.createHttpHandler()).start();
+
+ router2 = crankerRouter()
+ .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0"))
+ .start();
+ routerServer2 = httpServer()
+ .addHandler(router2.createRegistrationHandler())
+ .addHandler(router2.createHttpHandler()).start();
+
+ connector1 = startConnector("a", targetA1,preferredProtocols, 2, routerServer1, routerServer2);
+ waitForRegistration("a", connector1.connectorId(), 2, router1, router2);
+
+ connector2 = startConnector("b", targetB, preferredProtocols,2, routerServer1, routerServer2);
+ waitForRegistration("b", connector2.connectorId(), 2, router1, router2);
+
+ connector3 = startConnector("a", targetA2, preferredProtocols,2, routerServer1, routerServer2);
+ waitForRegistration("a", connector3.connectorId(), 2, router1, router2);
+
+ connector4 = startConnector("*", targetCatchAll, preferredProtocols,2, routerServer1, routerServer2);
+ waitForRegistration("*", connector4.connectorId(), 2, router1, router2);
+
+ }
+
+ @AfterEach
+ public void stop() {
+ CrankerConnector[] connectors = {connector1, connector2, connector3, connector4};
+ for (CrankerConnector connector : connectors) {
+ swallowException(() -> connector.stop(30, TimeUnit.SECONDS));
+ }
+ swallowException(routerServer1::stop);
+ swallowException(routerServer2::stop);
+ swallowException(targetA1::stop);
+ swallowException(targetA2::stop);
+ swallowException(targetB::stop);
+ swallowException(targetCatchAll::stop);
+ swallowException(router1::stop);
+ swallowException(router2::stop);
+ }
+
+ @RepeatedTest(3)
public void canMakeGETRequestsToBothConnectorApp() throws Exception {
assertGetLargeHtmlWorks(routerServer1, "/a");
assertGetLargeHtmlWorks(routerServer1, "/b");
@@ -54,9 +141,9 @@ public void canMakeGETRequestsToBothConnectorApp() throws Exception {
assertGetLargeHtmlWorks(routerServer2, "");
}
- @Test
+ @RepeatedTest(3)
public void canMakeRequestWhenOneConnectorAppIsDown() throws Exception {
- connector1.stop(1, TimeUnit.MINUTES);
+ assertThat(connector1.stop(1, TimeUnit.MINUTES), is(true));
assertGetLargeHtmlWorks(routerServer1, "/a");
assertGetLargeHtmlWorks(routerServer1, "/b");
assertGetLargeHtmlWorks(routerServer1, "");
@@ -79,20 +166,4 @@ private void assertGetLargeHtmlWorks(MuServer routerServer, String prefix) throw
assertThat(resp.body(), is(TEXT));
}
- @AfterAll
- public static void stop() {
- CrankerConnector[] connectors = {connector1, connector2, connector3, connector4};
- for (CrankerConnector connector : connectors) {
- swallowException(() -> connector.stop(30, TimeUnit.SECONDS));
- }
- swallowException(routerServer1::stop);
- swallowException(routerServer2::stop);
- swallowException(targetA1::stop);
- swallowException(targetA2::stop);
- swallowException(targetB::stop);
- swallowException(targetCatchAll::stop);
- swallowException(router1::stop);
- swallowException(router2::stop);
- }
-
}
diff --git a/src/test/java/com/hsbc/cranker/connector/RouterSupplierTest.java b/src/test/java/com/hsbc/cranker/connector/RouterSupplierTest.java
index 6140cf1..f24dfc2 100644
--- a/src/test/java/com/hsbc/cranker/connector/RouterSupplierTest.java
+++ b/src/test/java/com/hsbc/cranker/connector/RouterSupplierTest.java
@@ -1,24 +1,21 @@
package com.hsbc.cranker.connector;
+import com.hsbc.cranker.mucranker.CrankerRouter;
import io.muserver.Method;
import io.muserver.MuServer;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-import com.hsbc.cranker.mucranker.CrankerRouter;
+import io.muserver.SsePublisher;
+import org.junit.jupiter.api.*;
+import scaffolding.SseTestClient;
import java.net.URI;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import static com.hsbc.cranker.mucranker.CrankerRouterBuilder.crankerRouter;
import static io.muserver.ContextHandlerBuilder.context;
import static io.muserver.MuServerBuilder.httpServer;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -26,27 +23,27 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static scaffolding.Action.swallowException;
import static scaffolding.AssertUtils.assertEventually;
-import static com.hsbc.cranker.mucranker.CrankerRouterBuilder.crankerRouter;
public class RouterSupplierTest extends BaseEndToEndTest {
private static final String route = "my-service";
- private static final MuServer target = httpServer().addHandler(context(route).addHandler(Method.GET, "/hello", (request, response, pathParams) -> response.write("Hello from target"))).start();
- private static final CrankerRouter router1 = crankerRouter().start();
- private static final MuServer routerServer1 = httpServer().addHandler(router1.createRegistrationHandler()).addHandler(router1.createHttpHandler()).start();
- private static final CrankerRouter router2 = crankerRouter().start();
- private static final MuServer routerServer2 = httpServer().addHandler(router2.createRegistrationHandler()).addHandler(router2.createHttpHandler()).start();
+ private MuServer target;
+ private CrankerRouter router1;
+ private MuServer routerServer1;
+ private CrankerRouter router2;
+ private MuServer routerServer2;
- @Test
- public void dnsLookupCanWorkForLocalhost() throws Exception {
+ @RepeatedTest(3)
+ public void dnsLookupCanWorkForLocalhost(RepetitionInfo repetitionInfo) throws Exception {
CrankerConnector connector = CrankerConnectorBuilder.connector()
+ .withPreferredProtocols(preferredProtocols(repetitionInfo))
.withTarget(target.uri())
.withRoute(route)
.withRouterLookupByDNS(registrationUri(routerServer1), registrationUri(routerServer2))
.start();
- waitForRegistration(route, 2, router1);
- waitForRegistration(route, 2, router2);
+ waitForRegistration(route, connector.connectorId(), 2, router1);
+ waitForRegistration(route, connector.connectorId(), 2, router2);
for (URI routerUri : List.of(routerServer1.uri(), routerServer2.uri())) {
for (int i = 0; i < 4; i++) { // when resolving localhost, it may get two URLs (an IP4 and IP6), so just do some extra calls to make sure all are hit at least once
assertGetWorks(routerUri);
@@ -55,67 +52,136 @@ public void dnsLookupCanWorkForLocalhost() throws Exception {
assertStoppable(connector);
}
- RouterEventListener.ChangeData changeData;
- @Test
- public void routersCanBeDynamicallyAddedAndRemoved() throws Exception {
+ @RepeatedTest(3)
+ public void routersCanBeDynamicallyAddedAndRemoved(RepetitionInfo repetitionInfo) throws Exception {
+ final AtomicReference changeData = new AtomicReference<>();
URI reg1 = registrationUri(routerServer1);
URI reg2 = registrationUri(routerServer2);
List routerUris = new ArrayList<>(List.of(reg1, reg2));
+ final List preferredProtocols = preferredProtocols(repetitionInfo);
+ final int expectErrorCode = preferredProtocols.get(0).startsWith("cranker_1.0") ? 503 : 404;
CrankerConnectorImpl connector = (CrankerConnectorImpl) CrankerConnectorBuilder.connector()
+ .withPreferredProtocols(preferredProtocols)
+ .withSlidingWindowSize(2)
.withTarget(target.uri())
.withRoute(route)
.withRouterUris(() -> routerUris)
.withRouterRegistrationListener(new RouterEventListener() {
@Override
public void onRegistrationChanged(ChangeData data) {
- changeData = data;
+ changeData.set(data);
}
})
.start();
- waitForRegistration(route, 2, router1);
- waitForRegistration(route, 2, router2);
+ waitForRegistration(route, connector.connectorId(), 2, router1);
+ waitForRegistration(route, connector.connectorId(), 2, router2);
- assertThat(changeData.added(), hasSize(2));
- assertThat(changeData.removed(), hasSize(0));
- assertThat(changeData.unchanged(), hasSize(0));
+ assertThat(changeData.get().added(), hasSize(2));
+ assertThat(changeData.get().removed(), hasSize(0));
+ assertThat(changeData.get().unchanged(), hasSize(0));
+
+ assertGetWorks(routerServer1.uri());
+ assertGetWorks(routerServer2.uri());
+
+ final List registrations = connector.routers();
+ assertThat(registrations.size(), is(2));
+ assertThat(registrations.get(0).idleSocketSize(), is(2));
+ assertThat(registrations.get(1).idleSocketSize(), is(2));
routerUris.remove(reg2);
connector.updateRoutersAsync().get();
- assertThat(changeData.added(), hasSize(0));
- assertThat(changeData.removed(), hasSize(1));
- assertThat(changeData.removed().get(0).registrationUri().resolve("/"), equalTo(reg2.resolve("/")));
- assertThat(changeData.unchanged(), hasSize(1));
- assertThat(changeData.unchanged().get(0).registrationUri().resolve("/"), equalTo(reg1.resolve("/")));
+ assertThat(changeData.get().added(), hasSize(0));
+ assertThat(changeData.get().removed(), hasSize(1));
+ assertThat(changeData.get().removed().get(0).registrationUri().resolve("/"), equalTo(reg2.resolve("/")));
+ assertThat(changeData.get().unchanged(), hasSize(1));
+ assertThat(changeData.get().unchanged().get(0).registrationUri().resolve("/"), equalTo(reg1.resolve("/")));
assertGetWorks(routerServer1.uri());
- assertGetDoesNotWork(routerServer2.uri());
+ assertGetNotWorks(routerServer2.uri(), expectErrorCode);
+
+ final List registrationsAfterRemove2 = connector.routers();
+ assertThat(registrationsAfterRemove2.size(), is(1));
+ assertThat(registrationsAfterRemove2.get(0).idleSocketSize(), is(2));
routerUris.remove(reg1);
routerUris.add(reg2);
connector.updateRoutersAsync().get();
- assertThat(changeData.added(), hasSize(1));
- assertThat(changeData.added().get(0).registrationUri().resolve("/"), equalTo(reg2.resolve("/")));
- assertThat(changeData.removed(), hasSize(1));
- assertThat(changeData.removed().get(0).registrationUri().resolve("/"), equalTo(reg1.resolve("/")));
- assertThat(changeData.unchanged(), hasSize(0));
+ assertThat(changeData.get().added(), hasSize(1));
+ assertThat(changeData.get().added().get(0).registrationUri().resolve("/"), equalTo(reg2.resolve("/")));
+ assertThat(changeData.get().removed(), hasSize(1));
+ assertThat(changeData.get().removed().get(0).registrationUri().resolve("/"), equalTo(reg1.resolve("/")));
+ assertThat(changeData.get().unchanged(), hasSize(0));
- assertGetDoesNotWork(routerServer1.uri());
- waitForRegistration(route, 2, router2);
+ assertGetNotWorks(routerServer1.uri(), expectErrorCode);
+ waitForRegistration(route, connector.connectorId(), 2, router2);
assertGetWorks(routerServer2.uri());
assertStoppable(connector);
}
- @Test
+ @RepeatedTest(3)
+ public void routersCanBeUpdatedWhenUpdateRouteTaskTimeout(RepetitionInfo repetitionInfo) throws Exception {
+ final AtomicReference changeData = new AtomicReference<>();
+ URI reg1 = registrationUri(routerServer1);
+ URI reg2 = registrationUri(routerServer2);
+ List routerUris = new ArrayList<>(List.of(reg1, reg2));
+ CrankerConnectorImpl connector = (CrankerConnectorImpl) CrankerConnectorBuilder.connector()
+ .withPreferredProtocols(preferredProtocols(repetitionInfo))
+ .withTarget(target.uri())
+ .withRoute(route)
+ .withRouterUris(() -> routerUris)
+ .withRouterRegistrationListener(new RouterEventListener() {
+ @Override
+ public void onRegistrationChanged(ChangeData data) {
+ changeData.set(data);
+ }
+ })
+ .withRouterUpdateInterval(500, TimeUnit.MILLISECONDS)
+ .start();
+
+ waitForRegistration(route, connector.connectorId(),1, router1);
+ waitForRegistration(route, connector.connectorId(), 1, router2);
+
+ assertThat(changeData.get().added(), hasSize(2));
+ assertThat(changeData.get().removed(), hasSize(0));
+ assertThat(changeData.get().unchanged(), hasSize(0));
+
+ SseTestClient client = SseTestClient.startSse(routerServer2.uri().resolve("/my-service/sse/counter"));
+ routerUris.remove(reg2);
+ Thread.sleep(1000);
+
+ assertThat(changeData.get().added(), hasSize(0));
+ assertThat(changeData.get().removed(), hasSize(1));
+ assertThat(changeData.get().unchanged(), hasSize(1));
+
+ // clean it
+ changeData.set(new RouterEventListener.ChangeData(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
+ routerUris.remove(reg1);
+ routerUris.add(reg2);
+
+ // reach timeout 500ms
+ Thread.sleep(1000);
+
+ // routers can be updated
+ assertThat(changeData.get().added(), hasSize(1));
+ assertThat(changeData.get().removed(), hasSize(1));
+ assertThat(changeData.get().unchanged(), hasSize(0));
+
+ assertStoppable(connector);
+ client.stop();
+ }
+
+ @RepeatedTest(3)
@Disabled("Takes a minute to run so not normally run as part of build")
- public void nonExistantDomainsReturnErrorsToListener() throws Exception {
+ public void nonExistentDomainsReturnErrorsToListener(RepetitionInfo repetitionInfo) throws Exception {
AtomicReference received = new AtomicReference<>();
AtomicBoolean useNonExistantDomain = new AtomicBoolean(false);
var connector = (CrankerConnectorImpl) CrankerConnectorBuilder.connector()
+ .withPreferredProtocols(preferredProtocols(repetitionInfo))
.withTarget(target.uri())
.withRoute(route)
.withRouterUris(() -> {
@@ -123,7 +189,7 @@ public void nonExistantDomainsReturnErrorsToListener() throws Exception {
// can't have an exception on the first lookup because that will get bubbled from .start()
return Set.of(URI.create("ws://localhost:9000"));
} else {
- return RegistrationUriSuppliers.dnsLookup(URI.create("ws://" + UUID.randomUUID() + ".example.org:9000")).get();
+ return RegistrationUriSuppliers.dnsLookup(URI.create("ws://" + UUID.randomUUID() + ".example.hsbc:9000")).get();
}
})
.withRouterRegistrationListener(new RouterEventListener() {
@@ -142,7 +208,6 @@ public void onRouterDnsLookupError(Throwable error) {
}
-
private void assertGetWorks(URI routerUri) throws java.io.IOException, InterruptedException {
HttpResponse resp = testClient.send(HttpRequest.newBuilder()
.uri(routerUri.resolve("/" + route + "/hello"))
@@ -151,13 +216,13 @@ private void assertGetWorks(URI routerUri) throws java.io.IOException, Interrupt
assertEquals("Hello from target", resp.body());
}
- private void assertGetDoesNotWork(URI routerUri) {
+ private void assertGetNotWorks(URI routerUri, int expectedStatusCode) {
assertEventually(() -> {
HttpResponse resp = testClient.send(HttpRequest.newBuilder()
.uri(routerUri.resolve("/" + route + "/hello"))
.build(), HttpResponse.BodyHandlers.ofString());
return resp.statusCode();
- }, equalTo(503));
+ }, equalTo(expectedStatusCode));
}
@@ -165,8 +230,36 @@ private void assertStoppable(CrankerConnector connector) {
Assertions.assertDoesNotThrow(() -> connector.stop(30, TimeUnit.SECONDS));
}
- @AfterAll
- public static void stop() {
+ @BeforeEach
+ public void start() {
+ this.target = httpServer()
+ .addHandler(context(route).addHandler(Method.GET, "/hello", (request, response, pathParams) -> response.write("Hello from target")))
+ .addHandler(Method.GET, "/my-service/sse/counter", (request, response, pathParams) -> {
+ SsePublisher publisher = SsePublisher.start(request, response);
+ for (int i1 = 0; i1 < 1000; i1++) {
+ try {
+ publisher.send("Number " + i1);
+ Thread.sleep(1000);
+ } catch (Exception e) {
+ // The user has probably disconnected so stopping
+ break;
+ }
+ }
+ publisher.close();
+ })
+ .start();
+ this.router1 = crankerRouter()
+ .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0"))
+ .start();
+ this.routerServer1 = httpServer().addHandler(router1.createRegistrationHandler()).addHandler(router1.createHttpHandler()).start();
+ this.router2 = crankerRouter()
+ .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0"))
+ .start();
+ this.routerServer2 = httpServer().addHandler(router2.createRegistrationHandler()).addHandler(router2.createHttpHandler()).start();
+ }
+
+ @AfterEach
+ public void stop() {
swallowException(routerServer1::stop);
swallowException(routerServer2::stop);
swallowException(target::stop);
diff --git a/src/test/java/com/hsbc/cranker/connector/ServerSentEventTest.java b/src/test/java/com/hsbc/cranker/connector/ServerSentEventTest.java
index dba75cb..822e2de 100644
--- a/src/test/java/com/hsbc/cranker/connector/ServerSentEventTest.java
+++ b/src/test/java/com/hsbc/cranker/connector/ServerSentEventTest.java
@@ -5,18 +5,21 @@
import io.muserver.MuServer;
import io.muserver.SsePublisher;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.RepetitionInfo;
import org.junit.jupiter.api.Test;
import scaffolding.SseTestClient;
import java.util.Arrays;
+import java.util.List;
import java.util.concurrent.TimeUnit;
+import static com.hsbc.cranker.mucranker.CrankerRouterBuilder.crankerRouter;
import static io.muserver.MuServerBuilder.httpServer;
import static io.muserver.MuServerBuilder.httpsServer;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static scaffolding.Action.swallowException;
-import static com.hsbc.cranker.mucranker.CrankerRouterBuilder.crankerRouter;
public class ServerSentEventTest extends BaseEndToEndTest {
@@ -33,8 +36,8 @@ public void after() {
if (connector != null) swallowException(() -> connector.stop(10, TimeUnit.SECONDS));
}
- @Test
- public void MuServer_NormalSseTest() throws Exception {
+ @RepeatedTest(3)
+ public void MuServer_NormalSseTest(RepetitionInfo repetitionInfo) throws Exception {
this.targetServer = httpServer()
.addHandler(Method.GET, "/sse/counter", (request, response, pathParams) -> {
@@ -47,7 +50,8 @@ public void MuServer_NormalSseTest() throws Exception {
.start();
this.crankerRouter = crankerRouter()
- .withConnectorMaxWaitInMillis(4000).start();
+ .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0"))
+ .start();
this.router = httpsServer()
.addHandler(crankerRouter.createRegistrationHandler())
@@ -55,7 +59,8 @@ public void MuServer_NormalSseTest() throws Exception {
.withHttp2Config(Http2ConfigBuilder.http2Config().enabled(false))
.start();
- this.connector = startConnectorAndWaitForRegistration(crankerRouter, "*", targetServer, 2, router);
+ this.connector = startConnectorAndWaitForRegistration(crankerRouter, "*", targetServer,
+ preferredProtocols(repetitionInfo),2, router);
this.client = SseTestClient.startSse(router.uri().resolve("/sse/counter"));
this.client.waitUntilClose(5, TimeUnit.SECONDS);
@@ -69,7 +74,7 @@ public void MuServer_NormalSseTest() throws Exception {
)));
}
- @Test
+ @Test
public void MuServer_TargetServerDownInMiddleTest_ClientTalkToTargetServer() throws Exception {
this.targetServer = httpServer()
@@ -94,8 +99,8 @@ public void MuServer_TargetServerDownInMiddleTest_ClientTalkToTargetServer() thr
)));
}
- @Test
- public void MuServer_TargetServerDownInMiddleTest_ClientTalkToRouter() throws Exception {
+ @RepeatedTest(3)
+ public void MuServer_TargetServerDownInMiddleTest_ClientTalkToRouter(RepetitionInfo repetitionInfo) throws Exception {
this.targetServer = httpServer()
.addHandler(Method.GET, "/sse/counter", (request, response, pathParams) -> {
@@ -108,7 +113,8 @@ public void MuServer_TargetServerDownInMiddleTest_ClientTalkToRouter() throws Ex
.start();
this.crankerRouter = crankerRouter()
- .withConnectorMaxWaitInMillis(4000).start();
+ .withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0"))
+ .start();
this.router = httpsServer()
.addHandler(crankerRouter.createRegistrationHandler())
@@ -116,7 +122,8 @@ public void MuServer_TargetServerDownInMiddleTest_ClientTalkToRouter() throws Ex
.withHttp2Config(Http2ConfigBuilder.http2Config().enabled(false))
.start();
- this.connector = startConnectorAndWaitForRegistration(crankerRouter, "*", targetServer, 2, router);
+ this.connector = startConnectorAndWaitForRegistration(crankerRouter, "*", targetServer,
+ preferredProtocols(repetitionInfo), 2, router);
this.client = SseTestClient.startSse(router.uri().resolve("/sse/counter"));
this.client.waitUntilError(100, TimeUnit.SECONDS);
diff --git a/src/test/java/com/hsbc/cranker/connector/ManualTest.java b/src/test/java/manual/ManualTest.java
similarity index 96%
rename from src/test/java/com/hsbc/cranker/connector/ManualTest.java
rename to src/test/java/manual/ManualTest.java
index 787c5c6..20df11e 100644
--- a/src/test/java/com/hsbc/cranker/connector/ManualTest.java
+++ b/src/test/java/manual/ManualTest.java
@@ -1,5 +1,6 @@
-package com.hsbc.cranker.connector;
+package manual;
+import com.hsbc.cranker.connector.*;
import io.muserver.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,10 +29,7 @@ public static void main(String[] args) {
MuServer targetServer = muServer()
.withHttpsPort(12123)
.withHttp2Config(Http2ConfigBuilder.http2Enabled())
- .addHandler(Method.GET, "/hi", (request, response, pathParams) -> {
- System.out.println("received /hi");
- response.write("hi");
- })
+ .addHandler(Method.GET, "/hi", (request, response, pathParams) -> response.write("hi"))
.addHandler(Method.GET, "/health", (request, response, pathParams) -> {
response.contentType("text/plain;charset=utf-8");
for (HttpConnection con : request.server().activeConnections()) {
diff --git a/src/test/java/manual/RunLocalConnector.java b/src/test/java/manual/RunLocalConnector.java
new file mode 100644
index 0000000..bc8973d
--- /dev/null
+++ b/src/test/java/manual/RunLocalConnector.java
@@ -0,0 +1,55 @@
+package manual;
+
+import com.hsbc.cranker.connector.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.net.http.HttpRequest;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static com.hsbc.cranker.connector.CrankerConnectorBuilder.CRANKER_PROTOCOL_3;
+
+public class RunLocalConnector {
+
+ private static final Logger log = LoggerFactory.getLogger(RunLocalConnector.class);
+
+ static {
+ System.setProperty("jdk.internal.httpclient.disableHostnameVerification", "true");
+ }
+
+ public static void main(String[] args) {
+
+ final CrankerConnector connector = CrankerConnectorBuilder.connector()
+ .withPreferredProtocols(List.of(CRANKER_PROTOCOL_3))
+ .withDomain("127.0.0.1")
+ .withRouterUris(() -> List.of(URI.create("wss://localhost:12666")))
+ .withHttpClient(CrankerConnectorBuilder.createHttpClient(true).build())
+ .withComponentName("local-test")
+ .withRoute("*")
+ .withTarget(URI.create("http://localhost:14444"))
+ .withRouterRegistrationListener(new RouterEventListener() {
+ public void onRegistrationChanged(ChangeData data) {
+ log.info("Router registration changed: " + data);
+ }
+ public void onSocketConnectionError(RouterRegistration router1, Throwable exception) {
+ log.warn("Error connecting to " + router1, exception);
+ }
+ })
+ .withProxyEventListener(new ProxyEventListener() {
+ @Override
+ public void onProxyError(HttpRequest request, Throwable error) {
+ log.warn("onProxyError, request="+ request, error);
+ }
+ })
+ .start();
+
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ final boolean stop = connector.stop(5, TimeUnit.SECONDS);
+ log.info("connector stop, state={}", stop);
+ }));
+
+ System.out.println("connector started");
+ }
+}