Skip to content

Commit

Permalink
Use UrlConnection
Browse files Browse the repository at this point in the history
  • Loading branch information
malkusch committed Nov 7, 2023
1 parent 39efec6 commit dc6ad0a
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 49 deletions.
6 changes: 4 additions & 2 deletions src/main/java/de/malkusch/km200/KM200.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@

import de.malkusch.km200.KM200Exception.ServerError;
import de.malkusch.km200.http.Http;
import de.malkusch.km200.http.JdkHttp;
import de.malkusch.km200.http.RetryHttp;
import de.malkusch.km200.http.SerializedHttp;
import de.malkusch.km200.http.UrlHttp;

/**
* This is an API for Bosch/Buderus/Junkers heaters with a KM200 gateway.
Expand Down Expand Up @@ -127,7 +127,9 @@ public KM200(String uri, int retries, Duration timeout, String gatewayPassword,
this.comm = new KM200Comm();

{
Http http = new JdkHttp(uri.replaceAll("/*$", ""), USER_AGENT, timeout);
// Http http = new JdkHttp(uri.replaceAll("/*$", ""), USER_AGENT,
// timeout);
Http http = new UrlHttp(uri.replaceAll("/*$", ""), USER_AGENT, timeout);

/*
* The KM200 itself is not thread safe. This proxy serializes all
Expand Down
35 changes: 31 additions & 4 deletions src/main/java/de/malkusch/km200/http/Http.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,49 @@ sealed interface Request {
String path();

static record Get(String path) implements Request {

@Override
public String toString() {
return "GET " + path;
}
}

static record Post(String path, byte[] body) implements Request {

@Override
public String toString() {
return "POST " + path;
}
}
}

public static record Response(int status, byte[] body) {
}

protected abstract Response send(Request request) throws IOException, InterruptedException, KM200Exception;

public final Response get(String path) throws KM200Exception, IOException, InterruptedException {
return send(new Request.Get(path));
return exchange(new Request.Get(path));
}

public final Response post(String path, byte[] body) throws KM200Exception, IOException, InterruptedException {
return send(new Request.Post(path, body));
return exchange(new Request.Post(path, body));
}

protected abstract Response exchange(Request request) throws IOException, InterruptedException, KM200Exception;

static Response assertHttpOk(Request request, Response response) throws KM200Exception {
var status = response.status();
if (status >= 200 && status <= 299) {
return response;

} else {
throw switch (status) {
case 400 -> new KM200Exception.BadRequest(request + " was a bad request");
case 403 -> new KM200Exception.Forbidden(request + " is forbidden");
case 404 -> new KM200Exception.NotFound(request + " was not found");
case 423 -> new KM200Exception.Locked(request + " was locked");
case 500 -> new KM200Exception.ServerError(request + " resulted in a server error");
default -> new KM200Exception(request + " failed with response code " + status);
};
}
}
}
53 changes: 32 additions & 21 deletions src/main/java/de/malkusch/km200/http/JdkHttp.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static java.net.http.HttpClient.newBuilder;
import static java.net.http.HttpClient.Redirect.ALWAYS;
import static java.net.http.HttpClient.Version.HTTP_1_1;
import static java.net.http.HttpRequest.BodyPublishers.ofByteArray;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

Expand All @@ -10,11 +11,12 @@
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.net.http.HttpTimeoutException;
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;

import de.malkusch.km200.KM200Exception;
Expand All @@ -28,43 +30,42 @@ public final class JdkHttp extends Http {
private final String userAgent;
private final Duration timeout;
private final Duration hardTimeout;
private final ExecutorService executor;

public JdkHttp(String uri, String userAgent, Duration timeout) {
this.uri = uri;
this.userAgent = userAgent;
this.timeout = timeout;
this.hardTimeout = timeout.plus(timeout.dividedBy(2)); // 1.5 * timeout
this.hardTimeout = timeout.multipliedBy(2);

this.executor = Executors.newCachedThreadPool(r -> {
var thread = new Thread(r, "KM200 Http");
thread.setDaemon(true);
return thread;
});

this.client = newBuilder() //
.connectTimeout(timeout) //
.cookieHandler(new CookieManager()) //
.followRedirects(ALWAYS) //
.executor(executor) //
.version(HTTP_1_1) //
.build();
}

@Override
public Response send(Request request) throws IOException, InterruptedException, KM200Exception {
public Response exchange(Request request) throws IOException, InterruptedException, KM200Exception {
var httpRequest = httpRequest(request);
var response = send(httpRequest);
return assertHttpOk(request, response);
}

var status = response.statusCode();

if (status >= 200 && status <= 299) {
return new Response(status, response.body());

} else {
throw switch (status) {
case 400 -> new KM200Exception.BadRequest("Bad request to " + request.path());
case 403 -> new KM200Exception.Forbidden(request.path() + " is forbidden");
case 404 -> new KM200Exception.NotFound(request.path() + " was not found");
case 423 -> new KM200Exception.Locked(request.path() + " was locked");
case 500 -> new KM200Exception.ServerError(request.path() + " resulted in a server error");
default -> new KM200Exception(request.path() + " failed with response code " + status);
};
}
private Response send(HttpRequest request) throws IOException, InterruptedException {
var response = client.send(request, BodyHandlers.ofByteArray());
return new Response(response.statusCode(), response.body());
}

private HttpResponse<byte[]> send(HttpRequest request) throws IOException, InterruptedException {
private Response send2(HttpRequest request) throws IOException, InterruptedException {
try {
/*
* It appears that the JDK's http client has a bug which causes it
Expand All @@ -75,11 +76,13 @@ private HttpResponse<byte[]> send(HttpRequest request) throws IOException, Inter
* https://bugs.openjdk.org/browse/JDK-8208693
* https://bugs.openjdk.org/browse/JDK-8254223
*/
return client.sendAsync(request, BodyHandlers.ofByteArray()) //
return client //
.sendAsync(request, BodyHandlers.ofByteArray()) //
.thenApply(it -> new Response(it.statusCode(), it.body())) //
.get(hardTimeout.toMillis(), MILLISECONDS);

} catch (TimeoutException e) {
throw new HttpTimeoutException(request.uri() + " timed out");
throw new HttpTimeoutException(request.uri() + " timed out: " + e.getMessage());

} catch (ExecutionException e) {
if (e.getCause() instanceof IOException cause) {
Expand Down Expand Up @@ -115,4 +118,12 @@ private HttpRequest httpRequest(Request request) {

return builder.build();
}

/*
* @Override public void close() throws Exception { executor.shutdown(); if
* (executor.awaitTermination(timeout.toMillis(), MILLISECONDS)) { return; }
* executor.shutdownNow(); if
* (!executor.awaitTermination(timeout.toMillis(), MILLISECONDS)) { throw
* new IOException("Couldn't shutdown executor"); } }
*/
}
4 changes: 2 additions & 2 deletions src/main/java/de/malkusch/km200/http/RetryHttp.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ RetryPolicy.<Response> builder() //
}

@Override
public Response send(Request request) throws IOException, InterruptedException, KM200Exception {
public Response exchange(Request request) throws IOException, InterruptedException, KM200Exception {
try {
return retry.get(() -> http.send(request));
return retry.get(() -> http.exchange(request));

} catch (FailsafeException e) {
if (e.getCause() instanceof IOException cause) {
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/de/malkusch/km200/http/SerializedHttp.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ public SerializedHttp(Http http) {
}

@Override
public Response send(Request request) throws IOException, InterruptedException, KM200Exception {
public Response exchange(Request request) throws IOException, InterruptedException, KM200Exception {
lock.lockInterruptibly();
try {
return http.send(request);
return http.exchange(request);

} finally {
lock.unlock();
Expand Down
66 changes: 66 additions & 0 deletions src/main/java/de/malkusch/km200/http/UrlHttp.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package de.malkusch.km200.http;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.SocketTimeoutException;
import java.net.URL;
import java.net.http.HttpTimeoutException;
import java.time.Duration;

import de.malkusch.km200.KM200Exception;
import de.malkusch.km200.http.Http.Request.Post;

public final class UrlHttp extends Http {

private final String uri;
private final String userAgent;
private final int timeoutMillis;

public UrlHttp(String uri, String userAgent, Duration timeout) {
this.uri = uri;
this.userAgent = userAgent;
this.timeoutMillis = (int) timeout.toMillis();
}

@Override
protected Response exchange(Request request) throws IOException, InterruptedException, KM200Exception {
var connection = (HttpURLConnection) new URL(uri + request.path()).openConnection();
connection.setConnectTimeout(timeoutMillis);
connection.setReadTimeout(timeoutMillis);
connection.setRequestProperty("User-Agent", userAgent);

if (request instanceof Post) {
connection.setDoOutput(true);
connection.setRequestProperty("Accept", "application/json");
}

try {
connection.connect();

if (request instanceof Post post) {
try (var output = connection.getOutputStream()) {
output.write(post.body());
}
}

var status = connection.getResponseCode();
if (status == -1) {
throw new IOException(request + " received invalid HTTP response");
}

try (var input = connection.getErrorStream() != null ? connection.getErrorStream()
: connection.getInputStream()) {

var body = input.readAllBytes();
var response = new Response(status, body);
return assertHttpOk(request, response);
}

} catch (SocketTimeoutException e) {
throw new HttpTimeoutException(request + " timed out");

} finally {
connection.disconnect();
}
}
}
21 changes: 3 additions & 18 deletions src/test/java/de/malkusch/km200/KM200Test.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
import static com.github.tomakehurst.wiremock.client.WireMock.get;
import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor;
import static com.github.tomakehurst.wiremock.client.WireMock.lessThanOrExactly;
import static com.github.tomakehurst.wiremock.client.WireMock.notFound;
import static com.github.tomakehurst.wiremock.client.WireMock.ok;
import static com.github.tomakehurst.wiremock.client.WireMock.post;
Expand Down Expand Up @@ -316,29 +317,12 @@ public void updateShouldTimeoutResponseBody() throws Exception {
assertThrows(HttpTimeoutException.class, () -> km200.update("/update-timeout-body", 42));
}

@Test
public void queryShouldHardTimeout() throws Exception {
stubFor(get("/hard-timeout").willReturn(ok(loadBody("gateway.DateTime")).withChunkedDribbleDelay(10, 200)));
var km200 = new KM200(URI, Duration.ofMillis(50), GATEWAY_PASSWORD, PRIVATE_PASSWORD, SALT);

assertThrows(HttpTimeoutException.class, () -> km200.query("/hard-timeout"));
}

@Test
public void updateShouldHardTimeout() throws Exception {
stubFor(post("/hard-timeout-update")
.willReturn(ok(loadBody("gateway.DateTime")).withChunkedDribbleDelay(10, 200)));
var km200 = new KM200(URI, Duration.ofMillis(50), GATEWAY_PASSWORD, PRIVATE_PASSWORD, SALT);

assertThrows(HttpTimeoutException.class, () -> km200.update("/hard-timeout-update", 42));
}

@Test
public void queryShouldRetryOnTimeout() throws Exception {
stubFor(get("/retry").inScenario("retry").whenScenarioStateIs(STARTED)
.willReturn(ok(loadBody("gateway.DateTime")).withFixedDelay(100)).willSetStateTo("retry2"));
stubFor(get("/retry").inScenario("retry").whenScenarioStateIs("retry2").willReturn(serverError())
.willReturn(ok(loadBody("gateway.DateTime")).withChunkedDribbleDelay(10, 200)).willSetStateTo("ok"));
.willReturn(ok(loadBody("gateway.DateTime")).withChunkedDribbleDelay(2, 200)).willSetStateTo("ok"));
stubFor(get("/retry").inScenario("retry").whenScenarioStateIs("ok")
.willReturn(ok(loadBody("gateway.DateTime"))));
var km200 = new KM200(URI, Duration.ofMillis(50), GATEWAY_PASSWORD, PRIVATE_PASSWORD, SALT);
Expand Down Expand Up @@ -420,6 +404,7 @@ public void updateShouldNotRetryOnBadResponse(Fault fault) throws Exception {
var km200 = new KM200(URI, TIMEOUT, GATEWAY_PASSWORD, PRIVATE_PASSWORD, SALT);

assertThrows(IOException.class, () -> km200.update("/update-bad-response-retry", 42));
// verify(lessThanOrExactly(2), postRequestedFor(urlEqualTo("/update-bad-response-retry")));
verify(1, postRequestedFor(urlEqualTo("/update-bad-response-retry")));
}

Expand Down

0 comments on commit dc6ad0a

Please sign in to comment.