Skip to content

Commit

Permalink
Merge pull request #19 from confluentinc/performance-test-error-handling
Browse files Browse the repository at this point in the history
Check for errors during performance test by checking HTTP status.
  • Loading branch information
ewencp committed Jan 22, 2015
2 parents 5c275f7 + 8c07eb2 commit e36e7a2
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.confluent.kafkarest.Versions;
import io.confluent.kafkarest.entities.ConsumerInstanceConfig;
import io.confluent.kafkarest.entities.CreateConsumerInstanceResponse;
import io.confluent.rest.entities.ErrorMessage;

public class ConsumerPerformance extends AbstractPerformanceTest {

Expand All @@ -41,6 +42,8 @@ public class ConsumerPerformance extends AbstractPerformanceTest {
String deleteUrl;
long consumedRecords = 0;

private final ObjectMapper jsonDeserializer = new ObjectMapper();

public static void main(String[] args) throws Exception {
if (args.length < 4) {
System.out.println(
Expand Down Expand Up @@ -122,22 +125,27 @@ private <T> T request(String target, String method, byte[] entity, String entity
if (entity != null) {
connection.setRequestProperty("Content-Type", Versions.KAFKA_MOST_SPECIFIC_DEFAULT);
connection.setRequestProperty("Content-Length", entityLength);
connection.setDoInput(true);
}

connection.setDoInput(true);
connection.setUseCaches(false);
if (method != "DELETE") {
connection.setDoOutput(true);
}

if (entity != null) {
connection.setDoOutput(true);
OutputStream os = connection.getOutputStream();
os.write(entity);
os.flush();
os.close();
}

if (method != "DELETE") {
int responseStatus = connection.getResponseCode();
if (responseStatus >= 400) {
InputStream es = connection.getErrorStream();
ErrorMessage errorMessage = jsonDeserializer.readValue(es, ErrorMessage.class);
es.close();
throw new RuntimeException(
String.format("Unexpected HTTP error status %d for %s request to %s: %s",
responseStatus, method, target, errorMessage.getMessage()));
}
if (responseStatus != HttpURLConnection.HTTP_NO_CONTENT) {
InputStream is = connection.getInputStream();
T result = serializer.readValue(is, responseFormat);
is.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.confluent.kafkarest.Versions;
import io.confluent.kafkarest.entities.TopicProduceRecord;
import io.confluent.kafkarest.entities.TopicProduceRequest;
import io.confluent.rest.entities.ErrorMessage;

public class ProducerPerformance extends AbstractPerformanceTest {

Expand All @@ -41,6 +42,8 @@ public class ProducerPerformance extends AbstractPerformanceTest {
byte[] requestEntity;
byte[] buffer;

private final ObjectMapper jsonDeserializer = new ObjectMapper();

public static void main(String[] args) throws Exception {
if (args.length < 6) {
System.out.println(
Expand Down Expand Up @@ -105,6 +108,15 @@ protected void doIteration(PerformanceStats.Callback cb) {
os.flush();
os.close();

int responseStatus = connection.getResponseCode();
if (responseStatus >= 400) {
InputStream es = connection.getErrorStream();
ErrorMessage errorMessage = jsonDeserializer.readValue(es, ErrorMessage.class);
es.close();
throw new RuntimeException(
String.format("Unexpected HTTP error status %d: %s",
responseStatus, errorMessage.getMessage()));
}
InputStream is = connection.getInputStream();
while (is.read(buffer) > 0) {
// Ignore output, just make sure we actually receive it
Expand Down

0 comments on commit e36e7a2

Please sign in to comment.