Skip to content

Commit

Permalink
improving v1 connector socket close, and improving test cases for v3 …
Browse files Browse the repository at this point in the history
…coverage (#8)
  • Loading branch information
jayjlu authored Oct 9, 2023
1 parent 7133f03 commit eaf54b7
Show file tree
Hide file tree
Showing 15 changed files with 1,047 additions and 226 deletions.
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,19 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.7</version>
<version>2.0.9</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.hsbc.cranker</groupId>
<artifactId>mu-cranker-router</artifactId>
<version>1.0.2</version>
<version>1.0.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.muserver</groupId>
<artifactId>mu-server</artifactId>
<version>0.74.1</version>
<version>0.74.2</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,8 @@ public void close(State newState, int statusCode, Throwable error) {
if (webSocket != null && !webSocket.isOutputClosed()) {
webSocket.sendClose(statusCode, error != null ? error.getMessage() : "");
}
if (responseFuture != null && !responseFuture.isDone() && !responseFuture.isCancelled()) {
if (responseFuture != null && !responseFuture.isDone() && !responseFuture.isCancelled()
&& statusCode != WebSocket.NORMAL_CLOSURE) {
responseFuture.cancel(true);
}
if (responseBodySubscription != null) {
Expand Down
98 changes: 63 additions & 35 deletions src/test/java/com/hsbc/cranker/connector/BaseEndToEndTest.java
Original file line number Diff line number Diff line change
@@ -1,27 +1,36 @@
package com.hsbc.cranker.connector;

import com.hsbc.cranker.mucranker.CrankerRouter;
import com.hsbc.cranker.mucranker.CrankerRouterBuilder;
import io.muserver.MuServer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.RepetitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.hsbc.cranker.mucranker.CrankerRouter;
import com.hsbc.cranker.mucranker.CrankerRouterBuilder;
import scaffolding.AssertUtils;

import java.net.URI;
import java.net.http.HttpClient;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

import static com.hsbc.cranker.connector.CrankerConnectorBuilder.CRANKER_PROTOCOL_1;
import static com.hsbc.cranker.connector.CrankerConnectorBuilder.CRANKER_PROTOCOL_3;
import static io.muserver.MuServerBuilder.muServer;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static scaffolding.AssertUtils.assertEventually;

public class BaseEndToEndTest {

private static final Logger log = LoggerFactory.getLogger(BaseEndToEndTest.class);

protected final HttpClient testClient = HttpUtils.createHttpClientBuilder(true).build();

protected CrankerRouter crankerRouter = CrankerRouterBuilder.crankerRouter().start();
protected CrankerRouter crankerRouter = CrankerRouterBuilder
.crankerRouter()
.withSupportedCrankerProtocols(List.of("cranker_3.0", "cranker_1.0"))
.start();
protected MuServer registrationServer = startRegistrationServer(0);
protected MuServer crankerServer = startCrankerServer(0);

Expand All @@ -38,45 +47,64 @@ protected static URI registrationUri(URI routerUri) {
}

public static CrankerConnector startConnectorAndWaitForRegistration(CrankerRouter crankerRouter,
String targetServiceName,
MuServer target,
int slidingWindowSize,
MuServer... registrationRouters) {
CrankerConnector connector = startConnector(targetServiceName, target, slidingWindowSize, registrationRouters);
waitForRegistration(targetServiceName, 2, crankerRouter);
String targetServiceName,
MuServer target,
int slidingWindowSize,
MuServer... registrationRouters) {
return startConnectorAndWaitForRegistration(crankerRouter, targetServiceName, target, List.of(CRANKER_PROTOCOL_1), slidingWindowSize, registrationRouters);
}

public static CrankerConnector startConnectorAndWaitForRegistration(CrankerRouter crankerRouter,
String targetServiceName,
MuServer target,
List<String> preferredProtocols,
int slidingWindowSize,
MuServer... registrationRouters) {
CrankerConnector connector = CrankerConnectorBuilder.connector()
.withPreferredProtocols(preferredProtocols)
.withHttpClient(CrankerConnectorBuilder.createHttpClient(true).build())
.withTarget(target.uri())
.withRoute(targetServiceName)
.withRouterUris(RegistrationUriSuppliers.fixedUris(Stream.of(registrationRouters)
.map(s -> registrationUri(s.uri()))
.collect(toList())))
.withSlidingWindowSize(slidingWindowSize)
.start();

waitForRegistration(targetServiceName, connector.connectorId(), 2, crankerRouter);

assertEventually(
() -> new ArrayList<>(connector.routers().get(0).idleSockets()).get(0).version(),
equalTo(preferredProtocols.get(0)));

return connector;
}

public static void waitForRegistration(String targetServiceName, int slidingWindow, CrankerRouter... crankerRouters) {
int attempts = 0;
public static void waitForRegistration(String targetServiceName, String connectorInstanceId, int slidingWindow, CrankerRouter... crankerRouters) {
final String serviceKey = targetServiceName.isEmpty() ? "*" : targetServiceName;
for (CrankerRouter crankerRouter : crankerRouters) {
while (crankerRouter.collectInfo().services()
AssertUtils.assertEventually(() -> crankerRouter.collectInfo().services()
.stream()
.anyMatch(service -> service.route().equals(serviceKey)
&& !service.connectors().isEmpty()
&& service.connectors()
.stream()
.noneMatch(service -> service.route().equals(serviceKey)
&& service.connectors().size() > 0
&& service.connectors().get(0).connections().size() >= slidingWindow)) {
try {
Thread.sleep(20);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (attempts++ == 100) throw new RuntimeException("Failed to register " + targetServiceName);
}
.anyMatch(connector -> connector.connectorInstanceID().equals(connectorInstanceId)
&& connector.connections().size() >= slidingWindow)
), is(true));
}
}

public static CrankerConnector startConnector(String targetServiceName, MuServer target, int slidingWindowSize, MuServer... registrationRouters) {
List<URI> uris = Stream.of(registrationRouters)
.map(s -> registrationUri(s.uri()))
.collect(toList());
return CrankerConnectorBuilder.connector()
.withHttpClient(CrankerConnectorBuilder.createHttpClient(true).build())
.withTarget(target.uri())
.withRoute(targetServiceName)
.withRouterUris(RegistrationUriSuppliers.fixedUris(uris))
.withSlidingWindowSize(slidingWindowSize)
.start();
public static List<String> preferredProtocols(RepetitionInfo repetitionInfo) {
final int currentRepetition = repetitionInfo.getCurrentRepetition();
switch (currentRepetition) {
case 1:
return List.of(CRANKER_PROTOCOL_1);
case 2:
return List.of(CRANKER_PROTOCOL_3);
default:
return List.of(CRANKER_PROTOCOL_3, CRANKER_PROTOCOL_1);
}
}

@AfterEach
Expand Down
98 changes: 98 additions & 0 deletions src/test/java/com/hsbc/cranker/connector/ConcurrentUploadTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package com.hsbc.cranker.connector;

import io.muserver.MuHandler;
import io.muserver.MuServer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.RepetitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static io.muserver.MuServerBuilder.httpServer;
import static org.junit.jupiter.api.Assertions.*;

public class ConcurrentUploadTest extends BaseEndToEndTest {

private static final Logger log = LoggerFactory.getLogger(ConcurrentUploadTest.class);

private volatile MuHandler handler = (request, response) -> false;

protected MuServer targetServer = httpServer()
.addHandler((request, response) -> handler.handle(request, response))
.start();

private CrankerConnector connector;

@BeforeEach
void setUp(RepetitionInfo repetitionInfo) {
connector = CrankerConnectorBuilder.connector()
.withPreferredProtocols(preferredProtocols(repetitionInfo))
.withHttpClient(CrankerConnectorBuilder.createHttpClient(true).build())
.withRouterUris(RegistrationUriSuppliers.fixedUris(registrationUri(registrationServer.uri())))
.withRoute("*")
.withTarget(targetServer.uri())
.withProxyEventListener(new ProxyEventListener() {
@Override
public void onProxyError(HttpRequest request, Throwable error) {
log.warn("onProxyError, request=" + request, error);
}
})
.withComponentName("cranker-connector-unit-test")
.start();

waitForRegistration("*", connector.connectorId(),2, crankerRouter);
}

@AfterEach
public void stop() throws Exception {
if (connector != null) assertTrue(connector.stop(10, TimeUnit.SECONDS));
if (targetServer != null) targetServer.stop();
}

@RepeatedTest(3)
public void postLargeBody() throws InterruptedException {

handler = (request, response) -> {
response.status(200);
response.write(request.readBodyAsString());
return true;
};

Queue<HttpResponse<String>> responses = new ConcurrentLinkedQueue<>();
CountDownLatch countDownLatch = new CountDownLatch(10);

final String body = "c".repeat(10 * 1000);
for(int i = 0; i < 10; i++) {
final int finalI = i;
new Thread(() -> {
try {
HttpResponse<String> resp = testClient.send(HttpRequest.newBuilder()
.method("POST", HttpRequest.BodyPublishers.ofString(body))
.uri(crankerServer.uri().resolve("/?task=" + finalI))
.build(), HttpResponse.BodyHandlers.ofString());
responses.add(resp);
countDownLatch.countDown();
} catch (Exception e) {
log.error("Concurrent request error", e);
responses.add(null);
}
}).start();
}

assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
assertEquals(10, responses.size());
for (HttpResponse<String> response: responses) {
assertNotNull(response);
assertEquals(200, response.statusCode());
assertEquals(body, response.body());
}
}
}
Loading

0 comments on commit eaf54b7

Please sign in to comment.