Skip to content

Commit

Permalink
improve proxy listener handling (#1)
Browse files Browse the repository at this point in the history
improve proxy listener handling, bug fix for v3 when response has no body, and make test stable.
  • Loading branch information
jayjlu authored Sep 14, 2023
1 parent a82556f commit 6bfefa2
Show file tree
Hide file tree
Showing 11 changed files with 362 additions and 372 deletions.
12 changes: 6 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,24 +55,24 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.7</version>
<version>2.0.9</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.7</version>
<version>2.0.9</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.muserver</groupId>
<artifactId>mu-server</artifactId>
<version>0.73.6</version>
<version>0.74.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20230227</version>
<version>20230618</version>
<scope>test</scope>
</dependency>
<dependency>
Expand All @@ -90,7 +90,7 @@
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.9.3</version>
<version>5.10.0</version>
<scope>test</scope>
</dependency>

Expand All @@ -109,7 +109,7 @@
<dependency>
<groupId>com.hsbc.cranker</groupId>
<artifactId>cranker-connector</artifactId>
<version>1.1.0</version>
<version>1.2.1</version>
<scope>test</scope>
</dependency>

Expand Down
97 changes: 50 additions & 47 deletions src/main/java/com/hsbc/cranker/mucranker/CrankerMuHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ class CrankerMuHandler implements MuHandler {
private static final String ANY_DOMAIN = "*";

static final Set<String> HOP_BY_HOP = new HashSet<>(asList(
"keep-alive", "transfer-encoding",
"te", "connection", "trailer", "upgrade",
"proxy-authorization",
"proxy-authenticate"
"keep-alive", "transfer-encoding",
"te", "connection", "trailer", "upgrade",
"proxy-authorization",
"proxy-authenticate"
));
static final Set<String> REPRESSED;
static final String MU_ID = "muid";
Expand All @@ -43,11 +43,11 @@ class CrankerMuHandler implements MuHandler {
List<String> doNotForwardToTarget = new ArrayList<>();
doNotForwardToTarget.addAll(HOP_BY_HOP);
doNotForwardToTarget.addAll(asList(
// expect is already handled by mu server, so if it's forwarded it will break stuff
"expect",
// expect is already handled by mu server, so if it's forwarded it will break stuff
"expect",

// Headers that mucranker will overwrite
"forwarded", "x-forwarded-by", "x-forwarded-for", "x-forwarded-host", "x-forwarded-proto", "x-forwarded-port", "x-forwarded-server", "via"
// Headers that mucranker will overwrite
"forwarded", "x-forwarded-by", "x-forwarded-for", "x-forwarded-host", "x-forwarded-proto", "x-forwarded-port", "x-forwarded-server", "via"
));
REPRESSED = new HashSet<>(doNotForwardToTarget);
}
Expand Down Expand Up @@ -130,43 +130,43 @@ private boolean distributeTraffic(MuRequest clientRequest, MuResponse clientResp

private boolean dispatchV1(MuRequest clientRequest, MuResponse clientResponse, String target, boolean useCatchAll, AsyncHandle asyncHandle) {
webSocketFarm.acquireSocket(target, useCatchAll, clientRequest, clientResponse,
(crankedSocket, waitTimeInMillis) -> sendRequestOverWebSocket(clientRequest, clientResponse, asyncHandle, crankedSocket, waitTimeInMillis),
(statusCode, waitTimeInMillis, header, body) -> {
sendSimpleResponse(clientResponse, asyncHandle, statusCode, header, body);
if (!proxyListeners.isEmpty()) {
ProxyInfo proxyInfo = new ErrorProxyInfo(target, clientRequest, clientResponse, waitTimeInMillis);
for (ProxyListener proxyListener : proxyListeners) {
proxyListener.onFailureToAcquireProxySocket(proxyInfo);
(crankedSocket, waitTimeInMillis) -> sendRequestOverWebSocket(clientRequest, clientResponse, asyncHandle, crankedSocket, waitTimeInMillis),
(statusCode, waitTimeInMillis, header, body) -> {
sendSimpleResponse(clientResponse, asyncHandle, statusCode, header, body);
if (!proxyListeners.isEmpty()) {
ProxyInfo proxyInfo = new ErrorProxyInfo(target, clientRequest, clientResponse, waitTimeInMillis);
for (ProxyListener proxyListener : proxyListeners) {
proxyListener.onFailureToAcquireProxySocket(proxyInfo);
}
}
}
});
});
return true;
}

private boolean dispatchV3(MuRequest clientRequest, MuResponse clientResponse, String domain, String target, boolean useCatchAll, AsyncHandle asyncHandle) {
final WebSocketFarmV3 webSocketFarmV3 = webSocketFarmV3Holder.getWebSocketFarmV3(domain);
if (webSocketFarmV3 == null) {
sendSimpleResponse(clientResponse, asyncHandle, 503,
"503 Service Unavailable",
"V3 connector not available for domain");
"503 Service Unavailable",
"V3 connector not available for domain");
return true;
}
webSocketFarmV3.getWebSocket(target, useCatchAll)
.whenComplete((routerSocketV3, throwable) -> {
if (routerSocketV3 == null || throwable != null) {
sendSimpleResponse(clientResponse, asyncHandle, 503,
"503 Service Unavailable",
"V3 connector not available");
if (!proxyListeners.isEmpty()) {
ProxyInfo proxyInfo = new ErrorProxyInfo(target, clientRequest, clientResponse, 0);
for (ProxyListener proxyListener : proxyListeners) {
proxyListener.onFailureToAcquireProxySocket(proxyInfo);
.whenComplete((routerSocketV3, throwable) -> {
if (routerSocketV3 == null || throwable != null) {
sendSimpleResponse(clientResponse, asyncHandle, 503,
"503 Service Unavailable",
"V3 connector not available");
if (!proxyListeners.isEmpty()) {
ProxyInfo proxyInfo = new ErrorProxyInfo(target, clientRequest, clientResponse, 0);
for (ProxyListener proxyListener : proxyListeners) {
proxyListener.onFailureToAcquireProxySocket(proxyInfo);
}
}
return;
}
return;
}
routerSocketV3.sendRequestOverWebSocketV3(clientRequest, clientResponse);
});
routerSocketV3.sendRequestOverWebSocketV3(clientRequest, clientResponse);
});
return true;
}

Expand Down Expand Up @@ -209,13 +209,16 @@ private void sendRequestOverWebSocket(MuRequest clientRequest, MuResponse client
@Override
public void onDataReceived(ByteBuffer buffer, DoneCallback callback) {
try {
crankedSocket.sendData(buffer, callback);

if (!proxyListeners.isEmpty()) {
for (ProxyListener proxyListener : proxyListeners) {
proxyListener.onRequestBodyChunkSentToTarget(crankedSocket, buffer);
final int position = buffer.position();
final DoneCallback doneWrapper = error -> {
if (error == null && !proxyListeners.isEmpty()) {
for (ProxyListener proxyListener : proxyListeners) {
proxyListener.onRequestBodyChunkSentToTarget(crankedSocket, buffer.position(position));
}
}
}
callback.onComplete(error);
};
crankedSocket.sendData(buffer, doneWrapper);
} catch (Exception e) {
onError(e);
}
Expand Down Expand Up @@ -269,7 +272,7 @@ public void onError(Throwable t) {
crankedSocket.socketSessionClose();
} catch (Throwable ei) {
log.error("Fail to close crankedSocket, routerName=" + crankedSocket.route + ", routerSocketID=" + crankedSocket.routerSocketID
+ " \n" + ei.getMessage());
+ " \n" + ei.getMessage());
} finally {
asyncHandle.complete();
}
Expand All @@ -279,7 +282,7 @@ public void onError(Throwable t) {
static void handleException(MuRequest clientRequest, MuResponse clientResponse, AsyncHandle asyncHandle, Throwable e) {
final Object muId = clientRequest.attribute(MU_ID);
log.error(String.format("Error setting up. ErrorID=%s, request.uri=%s, request.startTime=%s, response.hasStartedSendingData=%s, response.status=%s, response.state=%s",
muId, clientRequest.uri(), clientRequest.startTime(), clientResponse.hasStartedSendingData(), clientResponse.status(), clientResponse.responseState()), e);
muId, clientRequest.uri(), clientRequest.startTime(), clientResponse.hasStartedSendingData(), clientResponse.status(), clientResponse.responseState()), e);
try {
if (!clientResponse.hasStartedSendingData()) {
clientResponse.status(500);
Expand All @@ -306,15 +309,15 @@ static void sendSimpleResponse(MuResponse response, AsyncHandle asyncHandle, int
response.headers().remove("content-length");
response.contentType(ContentTypes.TEXT_HTML_UTF8);
String html = "<html><head><title>" + Mutils.htmlEncode(header) + "</title><body>"
+ "<h1>" + Mutils.htmlEncode(header) + "</h1><p>"
+ htmlBody + "</p></body></html>";
+ "<h1>" + Mutils.htmlEncode(header) + "</h1><p>"
+ htmlBody + "</p></body></html>";
asyncHandle.write(Mutils.toByteBuffer(html), throwable -> {
if (throwable == null) {
asyncHandle.complete();
} else {
asyncHandle.complete(throwable);
if (throwable == null) {
asyncHandle.complete();
} else {
asyncHandle.complete(throwable);
}
}
}
);
}
}
Expand Down
6 changes: 2 additions & 4 deletions src/main/java/com/hsbc/cranker/mucranker/ProxyListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ public interface ProxyListener {
* This will be called many times if the body has been fragmented
*
* @param info Info about the request and response.
* @param chunk Request body data. This chunk of ByteBuffer has already consumed before call.
* So please use {@link ByteBuffer#asReadOnlyBuffer()} to Creates a new, read-only byte buffer that shares this buffer's content.
* @param chunk Request body data which already been sent to target successfully.
*
*/
default void onRequestBodyChunkSentToTarget(ProxyInfo info, ByteBuffer chunk) {};
Expand All @@ -115,8 +114,7 @@ public interface ProxyListener {
* This will be called many times if the body has been fragmented
*
* @param info Info about the request and response.
* @param chunk Response body data. This chunk of ByteBuffer has already consumed before call.
* So please use {@link ByteBuffer#asReadOnlyBuffer()} to Creates a new, read-only byte buffer that shares this buffer's content.
* @param chunk Response body data received from the target.
*
*/
default void onResponseBodyChunkReceivedFromTarget(ProxyInfo info, ByteBuffer chunk) {};
Expand Down
32 changes: 20 additions & 12 deletions src/main/java/com/hsbc/cranker/mucranker/RouterSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void onClientClosed(int statusCode, String reason) throws Exception {
if (statusCode == 1000) {
asyncHandle.complete();
} else {
log.info("Closing client request early due to cranker wss connection close with status code {}", statusCode);
log.info("Closing client request early due to cranker wss connection close with status code {} {}", statusCode, reason);
asyncHandle.complete(new RuntimeException("Upstream Server Error"));
}
} catch (IllegalStateException e) {
Expand Down Expand Up @@ -219,20 +219,28 @@ public void onBinary(ByteBuffer byteBuffer, boolean isLast, DoneCallback doneCal
log.debug("routerName=" + route + ", routerSocketID=" + routerSocketID +
", sending " + len + " bytes to client");
}
final int position = byteBuffer.position();
asyncHandle.write(byteBuffer, errorIfAny -> {
if (errorIfAny == null) {
bytesSent.addAndGet(len);
} else {
log.info("routerName=" + route + ", routerSocketID=" + routerSocketID +
", could not write to client response (maybe the user closed their browser)" +
" so will cancel the request. Error message: " + errorIfAny.getMessage());
}
doneCallback.onComplete(errorIfAny); // if error not null, then onError will be called
try {
if (errorIfAny == null) {
bytesSent.addAndGet(len);
} else {
log.info("routerName=" + route + ", routerSocketID=" + routerSocketID +
", could not write to client response (maybe the user closed their browser)" +
" so will cancel the request. Error message: " + errorIfAny.getMessage());
}

if (!proxyListeners.isEmpty()) {
for (ProxyListener proxyListener : proxyListeners) {
proxyListener.onResponseBodyChunkReceivedFromTarget(this, byteBuffer);
if (!proxyListeners.isEmpty()) {
for (ProxyListener proxyListener : proxyListeners) {
proxyListener.onResponseBodyChunkReceivedFromTarget(this, byteBuffer.position(position));
}
}
} catch (Throwable throwable) {
log.warn("something wrong after sending bytes to cranker", throwable);
} finally {
// if error not null, then onError will be called
// this call will release ByteBuffer and pull new ByteBuffer
doneCallback.onComplete(errorIfAny);
}
});
}
Expand Down
Loading

0 comments on commit 6bfefa2

Please sign in to comment.