Skip to content

Commit

Permalink
refactor(worker): minor improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
saig0 committed Aug 20, 2021
1 parent a00eb81 commit c7afad3
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 58 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ Example BPMN with service task:
* `url` - the url to invoke
* optional custom headers:
* `method` - the HTTP method to use (default: `GET`, allowed: `post` | `get` | `put` | `delete` | `patch`)
* `contentType` - the type of the request body (default: `application/json`, allowed: any valid HTTP content type)
* `accept` - the type of the response body that is accepted (default: `application/json`, allowed: `application/json`, `text/plain`)
* `statusCodeCompletion` - Status codes that lead to completion of the service task (default: `1xx,2xx`, allowed: comma separated list of codes including 1xx, 2xx, 3xx, 4xx and 5xx)
* `statusCodeFailure` - Status codes that lead to the job failing (default: `3xx,4xx,5xx`, allowed: comma separated list of codes including 1xx, 2xx, 3xx, 4xx and 5xx)
* `errorCodePath` - path expression (dot notation) to extract the error code of a failed response body (e.g. `error.code`). If the error code is present then a BPMN error is thrown with this code instead of failing the job. Otherwise, that leads to the job failing.
Expand Down
81 changes: 33 additions & 48 deletions src/main/java/io/zeebe/http/HttpJobHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,54 +127,48 @@ private HttpRequest buildRequest(ConfigurationMaps configurationMaps) {
final String method = getMethod(configurationMaps);
final HttpRequest.BodyPublisher bodyPublisher = getBodyPublisher(configurationMaps);

final var contentType = getContentType(configurationMaps).orElse("application/json");
final var accept = getAccept(configurationMaps).orElse("application/json");

final HttpRequest.Builder builder =
HttpRequest.newBuilder()
.uri(URI.create(url))
.timeout(CONNECTION_TIMEOUT)
.header("Content-Type", contentType)
.header("Accept", accept)
.method(method, bodyPublisher);

getAuthorization(configurationMaps).ifPresent(auth -> builder.header("Authorization", auth));

// if no accept or content type header then default to json
getContentType(configurationMaps)
.ifPresentOrElse(contentType -> builder.header("Content-Type", contentType),
() -> builder.header("Content-Type", "application/json"));
getAccept(configurationMaps)
.ifPresentOrElse(accept -> builder.header("Accept", accept),
() -> builder.header("Accept", "application/json"));

return builder.build();
}

private String getUrl(ConfigurationMaps configMaps) {
private Optional<String> getConfig(final ConfigurationMaps configMaps,
final String parameterUrl) {
return configMaps
.getString(PARAMETER_URL)
.map(url -> placeholderProcessor.process(url, configMaps.getConfig()))
.getString(parameterUrl)
.map(url -> placeholderProcessor.process(url, configMaps.getConfig()));
}

private String getUrl(ConfigurationMaps configMaps) {
return getConfig(configMaps, PARAMETER_URL)
.orElseThrow(() -> new RuntimeException("Missing required parameter: " + PARAMETER_URL));
}

private Optional<String> getAuthorization(ConfigurationMaps configMaps) {
return configMaps
.getString(PARAMETER_AUTHORIZATION)
.map(auth -> placeholderProcessor.process(auth, configMaps.getConfig()));
return getConfig(configMaps, PARAMETER_AUTHORIZATION);
}

private Optional<String> getContentType(ConfigurationMaps configMaps) {
return configMaps
.getString(PARAMETER_CONTENT_TYPE)
.map(contentType -> placeholderProcessor.process(contentType, configMaps.getConfig()));
return getConfig(configMaps, PARAMETER_CONTENT_TYPE);
}

private Optional<String> getAccept(ConfigurationMaps configMaps) {
return configMaps
.getString(PARAMETER_ACCEPT)
.map(accept -> placeholderProcessor.process(accept, configMaps.getConfig()));
return getConfig(configMaps, PARAMETER_ACCEPT);
}

private String getMethod(ConfigurationMaps configMaps) {
return configMaps
.getString(PARAMETER_METHOD)
.map(method -> placeholderProcessor.process(method, configMaps.getConfig()))
return getConfig(configMaps, PARAMETER_METHOD)
.map(String::toUpperCase)
.orElse("GET");
}
Expand Down Expand Up @@ -222,36 +216,27 @@ private boolean checkIfCodeMatches(String statusCode, String matchCodePattern) {
|| (statusCode.startsWith("5") && matchCodePattern.contains("5xx"));
}

private Map<String, Object> processResponse(ActivatedJob job,
HttpResponse<String> response, HttpRequest request) {
private Map<String, Object> processResponse(ActivatedJob job,
HttpResponse<String> response, HttpRequest request) {

final Map<String, Object> result = new java.util.HashMap<>();
int statusCode = response.statusCode();
result.put("statusCode", statusCode);
Optional<String> respBody = Optional.ofNullable(response.body())
.filter(body -> !body.isEmpty());
String acceptValue = request.headers().firstValue("Accept").orElse(null);
// If accepting plain text
if (hasContentTypeHeader(response.headers(), "text/plain") &&
("text/plain".equals(acceptValue))) {
respBody.ifPresent(body -> result.put("body", body));
} else {
// Assuming json by default
respBody.map(this::bodyToObject)

Optional.ofNullable(response.body())
.filter(body -> !body.isEmpty())
.map(body -> {
if (request.headers().allValues("Accept").contains("text/plain") &&
response.headers().allValues("Content-Type").contains("text/plain")
) {
return body;
} else {
return bodyToObject(body);
}
})
.ifPresent(body -> result.put("body", body));
}
return result;
}

private boolean hasContentTypeHeader(HttpHeaders headers, String contentTypeHeader) {
boolean hasContentTypeHeader = false;
try {
hasContentTypeHeader = Optional.ofNullable(headers.firstValue("Content-Type"))
.filter(contentType -> contentType.get().contains(contentTypeHeader))
.isPresent();
}catch(Exception e) {
System.out.println(e.toString());
}
return hasContentTypeHeader;
return result;
}

private Object bodyToObject(String body) {
Expand Down
22 changes: 12 additions & 10 deletions src/test/java/io/zeebe/http/ProcessIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,26 +131,28 @@ public void testGetAcceptPlainTextResponse() {
}

@Test
public void failsIfDoesNotAcceptResponseType() {
public void testPostContentTypePlainText() {
stubFor(
post(urlEqualTo("/api"))
.willReturn(aResponse().withHeader("Content-Type", "text/plain")
.withBody("This is text")));
.willReturn(aResponse().withStatus(200)));

final var processInstance =
createInstance(
serviceTask ->
serviceTask
.zeebeTaskHeader("url", WIRE_MOCK_RULE.baseUrl() + "/api")
.zeebeTaskHeader("method", "POST"));
.zeebeTaskHeader("method", "POST")
.zeebeTaskHeader("contentType", "text/plain"),
Map.of("body", "This is text"));

final var recorderJob =
RecordingExporter.jobRecords(JobIntent.FAILED)
.withProcessInstanceKey(processInstance.getProcessInstanceKey())
.getFirst();
ZeebeTestRule.assertThat(processInstance)
.isEnded()
.hasVariable("statusCode", 200);

assertThat(recorderJob.getValue().getErrorMessage()).isNotNull()
.contains("Failed to deserialize response body from JSON");
WIRE_MOCK_RULE.verify(
postRequestedFor(urlEqualTo("/api"))
.withHeader("Content-Type", equalTo("text/plain"))
.withRequestBody(equalTo("This is text")));
}

@Test
Expand Down

0 comments on commit c7afad3

Please sign in to comment.