Skip to content

Commit

Permalink
Workaround hanging KM200
Browse files Browse the repository at this point in the history
It appears that the JDK's http client has a bug which causes it to hang
infinetely. This commit adds a workaround by wrapping the call into
an executor with a hard timeout.

See also: https://bugs.openjdk.org/browse/JDK-8258397
          https://bugs.openjdk.org/browse/JDK-8208693
          https://bugs.openjdk.org/browse/JDK-8254223
  • Loading branch information
malkusch committed Nov 7, 2023
1 parent 697ff98 commit 540e677
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 27 deletions.
6 changes: 4 additions & 2 deletions src/main/java/de/malkusch/km200/KM200.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@

import de.malkusch.km200.KM200Exception.ServerError;
import de.malkusch.km200.http.Http;
import de.malkusch.km200.http.JdkHttp;
import de.malkusch.km200.http.RetryHttp;
import de.malkusch.km200.http.SerializedHttp;
import de.malkusch.km200.http.UrlHttp;

/**
* This is an API for Bosch/Buderus/Junkers heaters with a KM200 gateway.
Expand Down Expand Up @@ -127,7 +127,9 @@ public KM200(String uri, int retries, Duration timeout, String gatewayPassword,
this.comm = new KM200Comm();

{
Http http = new JdkHttp(uri.replaceAll("/*$", ""), USER_AGENT, timeout);
// Http http = new JdkHttp(uri.replaceAll("/*$", ""), USER_AGENT,
// timeout);
Http http = new UrlHttp(uri.replaceAll("/*$", ""), USER_AGENT, timeout);

/*
* The KM200 itself is not thread safe. This proxy serializes all
Expand Down
35 changes: 31 additions & 4 deletions src/main/java/de/malkusch/km200/http/Http.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,49 @@ sealed interface Request {
String path();

static record Get(String path) implements Request {

@Override
public String toString() {
return "GET " + path;
}
}

static record Post(String path, byte[] body) implements Request {

@Override
public String toString() {
return "POST " + path;
}
}
}

public static record Response(int status, byte[] body) {
}

protected abstract Response send(Request request) throws IOException, InterruptedException, KM200Exception;

public final Response get(String path) throws KM200Exception, IOException, InterruptedException {
return send(new Request.Get(path));
return exchange(new Request.Get(path));
}

public final Response post(String path, byte[] body) throws KM200Exception, IOException, InterruptedException {
return send(new Request.Post(path, body));
return exchange(new Request.Post(path, body));
}

protected abstract Response exchange(Request request) throws IOException, InterruptedException, KM200Exception;

static Response assertHttpOk(Request request, Response response) throws KM200Exception {
var status = response.status();
if (status >= 200 && status <= 299) {
return response;

} else {
throw switch (status) {
case 400 -> new KM200Exception.BadRequest(request + " was a bad request");
case 403 -> new KM200Exception.Forbidden(request + " is forbidden");
case 404 -> new KM200Exception.NotFound(request + " was not found");
case 423 -> new KM200Exception.Locked(request + " was locked");
case 500 -> new KM200Exception.ServerError(request + " resulted in a server error");
default -> new KM200Exception(request + " failed with response code " + status);
};
}
}
}
80 changes: 66 additions & 14 deletions src/main/java/de/malkusch/km200/http/JdkHttp.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,22 @@

import static java.net.http.HttpClient.newBuilder;
import static java.net.http.HttpClient.Redirect.ALWAYS;
import static java.net.http.HttpClient.Version.HTTP_1_1;
import static java.net.http.HttpRequest.BodyPublishers.ofByteArray;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.io.IOException;
import java.net.CookieManager;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse.BodyHandlers;
import java.net.http.HttpTimeoutException;
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;

import de.malkusch.km200.KM200Exception;
import de.malkusch.km200.http.Http.Request.Get;
Expand All @@ -22,37 +29,74 @@ public final class JdkHttp extends Http {
private final String uri;
private final String userAgent;
private final Duration timeout;
private final Duration hardTimeout;
private final ExecutorService executor;

public JdkHttp(String uri, String userAgent, Duration timeout) {
this.uri = uri;
this.userAgent = userAgent;
this.timeout = timeout;
this.hardTimeout = timeout.multipliedBy(2);

this.executor = Executors.newCachedThreadPool(r -> {
var thread = new Thread(r, "KM200 Http");
thread.setDaemon(true);
return thread;
});

this.client = newBuilder() //
.connectTimeout(timeout) //
.cookieHandler(new CookieManager()) //
.followRedirects(ALWAYS) //
.executor(executor) //
.version(HTTP_1_1) //
.build();
}

@Override
public Response send(Request request) throws IOException, InterruptedException, KM200Exception {
public Response exchange(Request request) throws IOException, InterruptedException, KM200Exception {
var httpRequest = httpRequest(request);
var response = client.send(httpRequest, BodyHandlers.ofByteArray());
var status = response.statusCode();
var response = send(httpRequest);
return assertHttpOk(request, response);
}

if (status >= 200 && status <= 299) {
return new Response(status, response.body());
private Response send(HttpRequest request) throws IOException, InterruptedException {
var response = client.send(request, BodyHandlers.ofByteArray());
return new Response(response.statusCode(), response.body());
}

} else {
throw switch (status) {
case 400 -> new KM200Exception.BadRequest("Bad request to " + request.path());
case 403 -> new KM200Exception.Forbidden(request.path() + " is forbidden");
case 404 -> new KM200Exception.NotFound(request.path() + " was not found");
case 423 -> new KM200Exception.Locked(request.path() + " was locked");
case 500 -> new KM200Exception.ServerError(request.path() + " resulted in a server error");
default -> new KM200Exception(request.path() + " failed with response code " + status);
};
private Response send2(HttpRequest request) throws IOException, InterruptedException {
try {
/*
* It appears that the JDK's http client has a bug which causes it
* to block infinitely. This is the async workaround with the hard
* timeout of CompletableFuture.get().
*
* https://bugs.openjdk.org/browse/JDK-8258397
* https://bugs.openjdk.org/browse/JDK-8208693
* https://bugs.openjdk.org/browse/JDK-8254223
*/
return client //
.sendAsync(request, BodyHandlers.ofByteArray()) //
.thenApply(it -> new Response(it.statusCode(), it.body())) //
.get(hardTimeout.toMillis(), MILLISECONDS);

} catch (TimeoutException e) {
throw new HttpTimeoutException(request.uri() + " timed out: " + e.getMessage());

} catch (ExecutionException e) {
if (e.getCause() instanceof IOException cause) {
throw cause;

} else if (e.getCause() instanceof InterruptedException cause) {
throw cause;

} else if (e.getCause() instanceof KM200Exception cause) {
throw cause;

} else {
throw new KM200Exception("Unexpected error for " + request.uri(), e);
}
}
}

Expand All @@ -74,4 +118,12 @@ private HttpRequest httpRequest(Request request) {

return builder.build();
}

/*
* @Override public void close() throws Exception { executor.shutdown(); if
* (executor.awaitTermination(timeout.toMillis(), MILLISECONDS)) { return; }
* executor.shutdownNow(); if
* (!executor.awaitTermination(timeout.toMillis(), MILLISECONDS)) { throw
* new IOException("Couldn't shutdown executor"); } }
*/
}
4 changes: 2 additions & 2 deletions src/main/java/de/malkusch/km200/http/RetryHttp.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ RetryPolicy.<Response> builder() //
}

@Override
public Response send(Request request) throws IOException, InterruptedException, KM200Exception {
public Response exchange(Request request) throws IOException, InterruptedException, KM200Exception {
try {
return retry.get(() -> http.send(request));
return retry.get(() -> http.exchange(request));

} catch (FailsafeException e) {
if (e.getCause() instanceof IOException cause) {
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/de/malkusch/km200/http/SerializedHttp.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ public SerializedHttp(Http http) {
}

@Override
public Response send(Request request) throws IOException, InterruptedException, KM200Exception {
public Response exchange(Request request) throws IOException, InterruptedException, KM200Exception {
lock.lockInterruptibly();
try {
return http.send(request);
return http.exchange(request);

} finally {
lock.unlock();
Expand Down
76 changes: 76 additions & 0 deletions src/main/java/de/malkusch/km200/http/UrlHttp.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package de.malkusch.km200.http;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.SocketTimeoutException;
import java.net.URL;
import java.net.http.HttpClient;
import java.net.http.HttpTimeoutException;
import java.time.Duration;

import de.malkusch.km200.KM200Exception;
import de.malkusch.km200.http.Http.Request.Post;

public final class UrlHttp extends Http {

private final String uri;
private final String userAgent;
private final int timeoutMillis;

/**
* Avoid undesired POST retries from UrlConnection
*
* {@link HttpClient}
*/
static {
System.setProperty("sun.net.http.retryPost", "false");
}

public UrlHttp(String uri, String userAgent, Duration timeout) {
this.uri = uri;
this.userAgent = userAgent;
this.timeoutMillis = (int) timeout.toMillis();
}

@Override
protected Response exchange(Request request) throws IOException, InterruptedException, KM200Exception {
var connection = (HttpURLConnection) new URL(uri + request.path()).openConnection();
connection.setConnectTimeout(timeoutMillis);
connection.setReadTimeout(timeoutMillis);
connection.setRequestProperty("User-Agent", userAgent);

if (request instanceof Post) {
connection.setDoOutput(true);
connection.setRequestProperty("Accept", "application/json");
}

try {
connection.connect();

if (request instanceof Post post) {
try (var output = connection.getOutputStream()) {
output.write(post.body());
}
}

var status = connection.getResponseCode();
if (status == -1) {
throw new IOException(request + " received invalid HTTP response");
}

try (var input = connection.getErrorStream() != null ? connection.getErrorStream()
: connection.getInputStream()) {

var body = input.readAllBytes();
var response = new Response(status, body);
return assertHttpOk(request, response);
}

} catch (SocketTimeoutException e) {
throw new HttpTimeoutException(request + " timed out");

} finally {
connection.disconnect();
}
}
}
8 changes: 5 additions & 3 deletions src/test/java/de/malkusch/km200/KM200Test.java
Original file line number Diff line number Diff line change
Expand Up @@ -319,15 +319,17 @@ public void updateShouldTimeoutResponseBody() throws Exception {
@Test
public void queryShouldRetryOnTimeout() throws Exception {
stubFor(get("/retry").inScenario("retry").whenScenarioStateIs(STARTED)
.willReturn(notFound().withFixedDelay(100)).willSetStateTo("retry"));
stubFor(get("/retry").inScenario("retry").whenScenarioStateIs("retry")
.willReturn(ok(loadBody("gateway.DateTime")).withFixedDelay(100)).willSetStateTo("retry2"));
stubFor(get("/retry").inScenario("retry").whenScenarioStateIs("retry2").willReturn(serverError())
.willReturn(ok(loadBody("gateway.DateTime")).withChunkedDribbleDelay(2, 200)).willSetStateTo("ok"));
stubFor(get("/retry").inScenario("retry").whenScenarioStateIs("ok")
.willReturn(ok(loadBody("gateway.DateTime"))));
var km200 = new KM200(URI, Duration.ofMillis(50), GATEWAY_PASSWORD, PRIVATE_PASSWORD, SALT);

var dateTime = km200.queryString("/retry");

assertEquals("2021-09-21T10:49:25", dateTime);
verify(2, getRequestedFor(urlEqualTo("/retry")));
verify(3, getRequestedFor(urlEqualTo("/retry")));
}

@Test
Expand Down

0 comments on commit 540e677

Please sign in to comment.