Skip to content

Commit

Permalink
Check for errors during performance test by checking HTTP status.
Browse files Browse the repository at this point in the history
Previously ProducerPerformance read, but ignored the response entity and
ConsumerPerformance didn't read the response entity when the HTTP method was
DELETE, so both had conditions where they could miss errors. Although this
doesn't check for specific, expected statuses, checking for any >= 400 should
catch any important errors.
  • Loading branch information
ewencp committed Jan 14, 2015
1 parent eb95474 commit 8c07eb2
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.confluent.kafkarest.Versions;
import io.confluent.kafkarest.entities.ConsumerInstanceConfig;
import io.confluent.kafkarest.entities.CreateConsumerInstanceResponse;
import io.confluent.rest.entities.ErrorMessage;

public class ConsumerPerformance extends AbstractPerformanceTest {

Expand All @@ -39,6 +40,8 @@ public class ConsumerPerformance extends AbstractPerformanceTest {
String deleteUrl;
long consumedRecords = 0;

private final ObjectMapper jsonDeserializer = new ObjectMapper();

public static void main(String[] args) throws Exception {
if (args.length < 4) {
System.out.println(
Expand Down Expand Up @@ -120,22 +123,27 @@ private <T> T request(String target, String method, byte[] entity, String entity
if (entity != null) {
connection.setRequestProperty("Content-Type", Versions.KAFKA_MOST_SPECIFIC_DEFAULT);
connection.setRequestProperty("Content-Length", entityLength);
connection.setDoInput(true);
}

connection.setDoInput(true);
connection.setUseCaches(false);
if (method != "DELETE") {
connection.setDoOutput(true);
}

if (entity != null) {
connection.setDoOutput(true);
OutputStream os = connection.getOutputStream();
os.write(entity);
os.flush();
os.close();
}

if (method != "DELETE") {
int responseStatus = connection.getResponseCode();
if (responseStatus >= 400) {
InputStream es = connection.getErrorStream();
ErrorMessage errorMessage = jsonDeserializer.readValue(es, ErrorMessage.class);
es.close();
throw new RuntimeException(
String.format("Unexpected HTTP error status %d for %s request to %s: %s",
responseStatus, method, target, errorMessage.getMessage()));
}
if (responseStatus != HttpURLConnection.HTTP_NO_CONTENT) {
InputStream is = connection.getInputStream();
T result = serializer.readValue(is, responseFormat);
is.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.confluent.kafkarest.Versions;
import io.confluent.kafkarest.entities.TopicProduceRecord;
import io.confluent.kafkarest.entities.TopicProduceRequest;
import io.confluent.rest.entities.ErrorMessage;

public class ProducerPerformance extends AbstractPerformanceTest {

Expand All @@ -39,6 +40,8 @@ public class ProducerPerformance extends AbstractPerformanceTest {
byte[] requestEntity;
byte[] buffer;

private final ObjectMapper jsonDeserializer = new ObjectMapper();

public static void main(String[] args) throws Exception {
if (args.length < 6) {
System.out.println(
Expand Down Expand Up @@ -103,6 +106,15 @@ protected void doIteration(PerformanceStats.Callback cb) {
os.flush();
os.close();

int responseStatus = connection.getResponseCode();
if (responseStatus >= 400) {
InputStream es = connection.getErrorStream();
ErrorMessage errorMessage = jsonDeserializer.readValue(es, ErrorMessage.class);
es.close();
throw new RuntimeException(
String.format("Unexpected HTTP error status %d: %s",
responseStatus, errorMessage.getMessage()));
}
InputStream is = connection.getInputStream();
while (is.read(buffer) > 0) {
// Ignore output, just make sure we actually receive it
Expand Down

0 comments on commit 8c07eb2

Please sign in to comment.