diff --git a/src/main/java/de/malkusch/km200/KM200.java b/src/main/java/de/malkusch/km200/KM200.java index 345747d..22de02b 100644 --- a/src/main/java/de/malkusch/km200/KM200.java +++ b/src/main/java/de/malkusch/km200/KM200.java @@ -15,9 +15,9 @@ import de.malkusch.km200.KM200Exception.ServerError; import de.malkusch.km200.http.Http; -import de.malkusch.km200.http.JdkHttp; import de.malkusch.km200.http.RetryHttp; import de.malkusch.km200.http.SerializedHttp; +import de.malkusch.km200.http.UrlHttp; /** * This is an API for Bosch/Buderus/Junkers heaters with a KM200 gateway. @@ -127,7 +127,9 @@ public KM200(String uri, int retries, Duration timeout, String gatewayPassword, this.comm = new KM200Comm(); { - Http http = new JdkHttp(uri.replaceAll("/*$", ""), USER_AGENT, timeout); + // Http http = new JdkHttp(uri.replaceAll("/*$", ""), USER_AGENT, + // timeout); + Http http = new UrlHttp(uri.replaceAll("/*$", ""), USER_AGENT, timeout); /* * The KM200 itself is not thread safe. This proxy serializes all diff --git a/src/main/java/de/malkusch/km200/http/Http.java b/src/main/java/de/malkusch/km200/http/Http.java index 3379d28..6659b4f 100644 --- a/src/main/java/de/malkusch/km200/http/Http.java +++ b/src/main/java/de/malkusch/km200/http/Http.java @@ -11,22 +11,49 @@ sealed interface Request { String path(); static record Get(String path) implements Request { + + @Override + public String toString() { + return "GET " + path; + } } static record Post(String path, byte[] body) implements Request { + + @Override + public String toString() { + return "POST " + path; + } } } public static record Response(int status, byte[] body) { } - protected abstract Response send(Request request) throws IOException, InterruptedException, KM200Exception; - public final Response get(String path) throws KM200Exception, IOException, InterruptedException { - return send(new Request.Get(path)); + return exchange(new Request.Get(path)); } public final Response post(String path, byte[] body) throws KM200Exception, IOException, InterruptedException { - return send(new Request.Post(path, body)); + return exchange(new Request.Post(path, body)); + } + + protected abstract Response exchange(Request request) throws IOException, InterruptedException, KM200Exception; + + static Response assertHttpOk(Request request, Response response) throws KM200Exception { + var status = response.status(); + if (status >= 200 && status <= 299) { + return response; + + } else { + throw switch (status) { + case 400 -> new KM200Exception.BadRequest(request + " was a bad request"); + case 403 -> new KM200Exception.Forbidden(request + " is forbidden"); + case 404 -> new KM200Exception.NotFound(request + " was not found"); + case 423 -> new KM200Exception.Locked(request + " was locked"); + case 500 -> new KM200Exception.ServerError(request + " resulted in a server error"); + default -> new KM200Exception(request + " failed with response code " + status); + }; + } } } diff --git a/src/main/java/de/malkusch/km200/http/JdkHttp.java b/src/main/java/de/malkusch/km200/http/JdkHttp.java index 0078d4e..b252e8f 100644 --- a/src/main/java/de/malkusch/km200/http/JdkHttp.java +++ b/src/main/java/de/malkusch/km200/http/JdkHttp.java @@ -2,6 +2,7 @@ import static java.net.http.HttpClient.newBuilder; import static java.net.http.HttpClient.Redirect.ALWAYS; +import static java.net.http.HttpClient.Version.HTTP_1_1; import static java.net.http.HttpRequest.BodyPublishers.ofByteArray; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -10,11 +11,12 @@ import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpRequest; -import java.net.http.HttpResponse; import java.net.http.HttpResponse.BodyHandlers; import java.net.http.HttpTimeoutException; import java.time.Duration; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeoutException; import de.malkusch.km200.KM200Exception; @@ -28,43 +30,42 @@ public final class JdkHttp extends Http { private final String userAgent; private final Duration timeout; private final Duration hardTimeout; + private final ExecutorService executor; public JdkHttp(String uri, String userAgent, Duration timeout) { this.uri = uri; this.userAgent = userAgent; this.timeout = timeout; - this.hardTimeout = timeout.plus(timeout.dividedBy(2)); // 1.5 * timeout + this.hardTimeout = timeout.multipliedBy(2); + + this.executor = Executors.newCachedThreadPool(r -> { + var thread = new Thread(r, "KM200 Http"); + thread.setDaemon(true); + return thread; + }); this.client = newBuilder() // .connectTimeout(timeout) // .cookieHandler(new CookieManager()) // .followRedirects(ALWAYS) // + .executor(executor) // + .version(HTTP_1_1) // .build(); } @Override - public Response send(Request request) throws IOException, InterruptedException, KM200Exception { + public Response exchange(Request request) throws IOException, InterruptedException, KM200Exception { var httpRequest = httpRequest(request); var response = send(httpRequest); + return assertHttpOk(request, response); + } - var status = response.statusCode(); - - if (status >= 200 && status <= 299) { - return new Response(status, response.body()); - - } else { - throw switch (status) { - case 400 -> new KM200Exception.BadRequest("Bad request to " + request.path()); - case 403 -> new KM200Exception.Forbidden(request.path() + " is forbidden"); - case 404 -> new KM200Exception.NotFound(request.path() + " was not found"); - case 423 -> new KM200Exception.Locked(request.path() + " was locked"); - case 500 -> new KM200Exception.ServerError(request.path() + " resulted in a server error"); - default -> new KM200Exception(request.path() + " failed with response code " + status); - }; - } + private Response send(HttpRequest request) throws IOException, InterruptedException { + var response = client.send(request, BodyHandlers.ofByteArray()); + return new Response(response.statusCode(), response.body()); } - private HttpResponse send(HttpRequest request) throws IOException, InterruptedException { + private Response send2(HttpRequest request) throws IOException, InterruptedException { try { /* * It appears that the JDK's http client has a bug which causes it @@ -75,11 +76,13 @@ private HttpResponse send(HttpRequest request) throws IOException, Inter * https://bugs.openjdk.org/browse/JDK-8208693 * https://bugs.openjdk.org/browse/JDK-8254223 */ - return client.sendAsync(request, BodyHandlers.ofByteArray()) // + return client // + .sendAsync(request, BodyHandlers.ofByteArray()) // + .thenApply(it -> new Response(it.statusCode(), it.body())) // .get(hardTimeout.toMillis(), MILLISECONDS); } catch (TimeoutException e) { - throw new HttpTimeoutException(request.uri() + " timed out"); + throw new HttpTimeoutException(request.uri() + " timed out: " + e.getMessage()); } catch (ExecutionException e) { if (e.getCause() instanceof IOException cause) { @@ -115,4 +118,12 @@ private HttpRequest httpRequest(Request request) { return builder.build(); } + + /* + * @Override public void close() throws Exception { executor.shutdown(); if + * (executor.awaitTermination(timeout.toMillis(), MILLISECONDS)) { return; } + * executor.shutdownNow(); if + * (!executor.awaitTermination(timeout.toMillis(), MILLISECONDS)) { throw + * new IOException("Couldn't shutdown executor"); } } + */ } diff --git a/src/main/java/de/malkusch/km200/http/RetryHttp.java b/src/main/java/de/malkusch/km200/http/RetryHttp.java index 657648f..9c0a042 100644 --- a/src/main/java/de/malkusch/km200/http/RetryHttp.java +++ b/src/main/java/de/malkusch/km200/http/RetryHttp.java @@ -29,9 +29,9 @@ RetryPolicy. builder() // } @Override - public Response send(Request request) throws IOException, InterruptedException, KM200Exception { + public Response exchange(Request request) throws IOException, InterruptedException, KM200Exception { try { - return retry.get(() -> http.send(request)); + return retry.get(() -> http.exchange(request)); } catch (FailsafeException e) { if (e.getCause() instanceof IOException cause) { diff --git a/src/main/java/de/malkusch/km200/http/SerializedHttp.java b/src/main/java/de/malkusch/km200/http/SerializedHttp.java index 0e26b48..08e1ea5 100644 --- a/src/main/java/de/malkusch/km200/http/SerializedHttp.java +++ b/src/main/java/de/malkusch/km200/http/SerializedHttp.java @@ -15,10 +15,10 @@ public SerializedHttp(Http http) { } @Override - public Response send(Request request) throws IOException, InterruptedException, KM200Exception { + public Response exchange(Request request) throws IOException, InterruptedException, KM200Exception { lock.lockInterruptibly(); try { - return http.send(request); + return http.exchange(request); } finally { lock.unlock(); diff --git a/src/main/java/de/malkusch/km200/http/UrlHttp.java b/src/main/java/de/malkusch/km200/http/UrlHttp.java new file mode 100644 index 0000000..d6fc9b8 --- /dev/null +++ b/src/main/java/de/malkusch/km200/http/UrlHttp.java @@ -0,0 +1,66 @@ +package de.malkusch.km200.http; + +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.SocketTimeoutException; +import java.net.URL; +import java.net.http.HttpTimeoutException; +import java.time.Duration; + +import de.malkusch.km200.KM200Exception; +import de.malkusch.km200.http.Http.Request.Post; + +public final class UrlHttp extends Http { + + private final String uri; + private final String userAgent; + private final int timeoutMillis; + + public UrlHttp(String uri, String userAgent, Duration timeout) { + this.uri = uri; + this.userAgent = userAgent; + this.timeoutMillis = (int) timeout.toMillis(); + } + + @Override + protected Response exchange(Request request) throws IOException, InterruptedException, KM200Exception { + var connection = (HttpURLConnection) new URL(uri + request.path()).openConnection(); + connection.setConnectTimeout(timeoutMillis); + connection.setReadTimeout(timeoutMillis); + connection.setRequestProperty("User-Agent", userAgent); + + if (request instanceof Post) { + connection.setDoOutput(true); + connection.setRequestProperty("Accept", "application/json"); + } + + try { + connection.connect(); + + if (request instanceof Post post) { + try (var output = connection.getOutputStream()) { + output.write(post.body()); + } + } + + var status = connection.getResponseCode(); + if (status == -1) { + throw new IOException(request + " received invalid HTTP response"); + } + + try (var input = connection.getErrorStream() != null ? connection.getErrorStream() + : connection.getInputStream()) { + + var body = input.readAllBytes(); + var response = new Response(status, body); + return assertHttpOk(request, response); + } + + } catch (SocketTimeoutException e) { + throw new HttpTimeoutException(request + " timed out"); + + } finally { + connection.disconnect(); + } + } +} diff --git a/src/test/java/de/malkusch/km200/KM200Test.java b/src/test/java/de/malkusch/km200/KM200Test.java index f601f12..3ae04ed 100644 --- a/src/test/java/de/malkusch/km200/KM200Test.java +++ b/src/test/java/de/malkusch/km200/KM200Test.java @@ -5,6 +5,7 @@ import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; import static com.github.tomakehurst.wiremock.client.WireMock.get; import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor; +import static com.github.tomakehurst.wiremock.client.WireMock.lessThanOrExactly; import static com.github.tomakehurst.wiremock.client.WireMock.notFound; import static com.github.tomakehurst.wiremock.client.WireMock.ok; import static com.github.tomakehurst.wiremock.client.WireMock.post; @@ -316,29 +317,12 @@ public void updateShouldTimeoutResponseBody() throws Exception { assertThrows(HttpTimeoutException.class, () -> km200.update("/update-timeout-body", 42)); } - @Test - public void queryShouldHardTimeout() throws Exception { - stubFor(get("/hard-timeout").willReturn(ok(loadBody("gateway.DateTime")).withChunkedDribbleDelay(10, 200))); - var km200 = new KM200(URI, Duration.ofMillis(50), GATEWAY_PASSWORD, PRIVATE_PASSWORD, SALT); - - assertThrows(HttpTimeoutException.class, () -> km200.query("/hard-timeout")); - } - - @Test - public void updateShouldHardTimeout() throws Exception { - stubFor(post("/hard-timeout-update") - .willReturn(ok(loadBody("gateway.DateTime")).withChunkedDribbleDelay(10, 200))); - var km200 = new KM200(URI, Duration.ofMillis(50), GATEWAY_PASSWORD, PRIVATE_PASSWORD, SALT); - - assertThrows(HttpTimeoutException.class, () -> km200.update("/hard-timeout-update", 42)); - } - @Test public void queryShouldRetryOnTimeout() throws Exception { stubFor(get("/retry").inScenario("retry").whenScenarioStateIs(STARTED) .willReturn(ok(loadBody("gateway.DateTime")).withFixedDelay(100)).willSetStateTo("retry2")); stubFor(get("/retry").inScenario("retry").whenScenarioStateIs("retry2").willReturn(serverError()) - .willReturn(ok(loadBody("gateway.DateTime")).withChunkedDribbleDelay(10, 200)).willSetStateTo("ok")); + .willReturn(ok(loadBody("gateway.DateTime")).withChunkedDribbleDelay(2, 200)).willSetStateTo("ok")); stubFor(get("/retry").inScenario("retry").whenScenarioStateIs("ok") .willReturn(ok(loadBody("gateway.DateTime")))); var km200 = new KM200(URI, Duration.ofMillis(50), GATEWAY_PASSWORD, PRIVATE_PASSWORD, SALT); @@ -420,6 +404,7 @@ public void updateShouldNotRetryOnBadResponse(Fault fault) throws Exception { var km200 = new KM200(URI, TIMEOUT, GATEWAY_PASSWORD, PRIVATE_PASSWORD, SALT); assertThrows(IOException.class, () -> km200.update("/update-bad-response-retry", 42)); + // verify(lessThanOrExactly(2), postRequestedFor(urlEqualTo("/update-bad-response-retry"))); verify(1, postRequestedFor(urlEqualTo("/update-bad-response-retry"))); }