From 932da95b730a460fea06776ae02798637db31b62 Mon Sep 17 00:00:00 2001 From: Luc Cary Date: Sun, 11 Nov 2018 13:04:27 -0500 Subject: [PATCH 01/14] Add avro message deserialization capability --- README.md | 30 +++++++++++ pom.xml | 35 ++++++++++--- .../kafdrop/controller/MessageController.java | 7 ++- .../kafdrop/service/MessageInspector.java | 50 +++++++++++++++++-- .../resources/templates/message-inspector.ftl | 2 +- 5 files changed, 111 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index 168cc65..9373a02 100644 --- a/README.md +++ b/README.md @@ -89,3 +89,33 @@ Starting in version 2.0.0, Kafdrop sets CORS headers for all endpoints. You can You can also disable CORS entirely with the following configuration: cors.enabled=false + +## Avro Consumer + +References: +* http://cloudurable.com/blog/kafka-avro-schema-registry/index.html +* https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html +* https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDeserializer.java +* https://docs.confluent.io/3.0.0/installation.html#maven-repository-for-jars + +Java Docs: +* https://www.javadoc.io/doc/org.apache.kafka/kafka_2.9.2/0.8.0 +* https://www.javadoc.io/doc/org.apache.kafka/kafka-clients/0.8.2.2 +* https://www.javadoc.io/doc/org.apache.kafka/kafka-clients/0.10.2.0 +* https://www.javadoc.io/doc/org.apache.kafka/kafka-clients/0.10.2.2 + +Sources: +* https://github.com/apache/kafka/tree/0.8.2/core/src/main/scala/kafka/javaapi + +## Development + +Project setup for Eclipse: +``` +mvn dependency:tree +mvn eclipse:clean +mvn eclipse:eclipse +``` + +For run configurations, add a new Maven run configuration called 'Default Build' +with the project workspace as the root, and then add `clean package` as the goals +and hit apply. diff --git a/pom.xml b/pom.xml index cc226b5..37c2905 100644 --- a/pom.xml +++ b/pom.xml @@ -24,6 +24,17 @@ HEAD + + + central + http://repo1.maven.org/maven2 + + + confluent + http://packages.confluent.io/maven/ + + + @@ -73,6 +84,21 @@ spring-retry 1.1.3.RELEASE + + io.confluent + kafka-avro-serializer + 3.2.1 + + + org.apache.avro + avro + 1.8.1 + + + org.apache.kafka + kafka-clients + 0.10.2.2 + @@ -152,13 +178,6 @@ 0.7.1 test - - - org.apache.kafka - kafka-clients - 0.8.2.2 - test - @@ -313,7 +332,7 @@ org.apache.kafka kafka-clients - 0.8.2.2 + 0.10.2.2 diff --git a/src/main/java/com/homeadvisor/kafdrop/controller/MessageController.java b/src/main/java/com/homeadvisor/kafdrop/controller/MessageController.java index b08fb09..e5a4c7c 100644 --- a/src/main/java/com/homeadvisor/kafdrop/controller/MessageController.java +++ b/src/main/java/com/homeadvisor/kafdrop/controller/MessageController.java @@ -68,7 +68,12 @@ public String viewMessageForm(@PathVariable("name") String topicName, if (messageForm.isEmpty()) { final PartitionOffsetInfo defaultForm = new PartitionOffsetInfo(); - defaultForm.setCount(1l); + + // Set Reasonable Defaults. Ideally last 10 messages. + defaultForm.setCount(10l); + defaultForm.setOffset(0l); + defaultForm.setPartition(0); + model.addAttribute("messageForm", defaultForm); } diff --git a/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java b/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java index 5143bda..10c264b 100644 --- a/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java +++ b/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java @@ -29,6 +29,8 @@ import kafka.javaapi.message.ByteBufferMessageSet; import kafka.message.Message; import kafka.message.MessageAndOffset; + +import org.apache.avro.generic.GenericRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -38,10 +40,16 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; + + @Service public class MessageInspector { @@ -83,7 +91,7 @@ public List getMessages(String topicName, int partitionId, long offse StreamSupport.stream(messageSet.spliterator(), false) .limit(count - messages.size()) .map(MessageAndOffset::message) - .map(this::createMessage) + .map(m -> createMessage(m, topicName)) .forEach(messages::add); currentOffset += messages.size() - oldSize; } @@ -92,7 +100,7 @@ public List getMessages(String topicName, int partitionId, long offse .orElseGet(Collections::emptyList); } - private MessageVO createMessage(Message message) + private MessageVO createMessage(Message message, String topicName) { MessageVO vo = new MessageVO(); if (message.hasKey()) @@ -101,7 +109,8 @@ private MessageVO createMessage(Message message) } if (!message.isNull()) { - vo.setMessage(readString(message.payload())); + final String messageString = deserializeMessage(message.payload(), topicName); + vo.setMessage(messageString); } vo.setValid(message.isValid()); @@ -112,6 +121,41 @@ private MessageVO createMessage(Message message) return vo; } + // https://www.programcreek.com/java-api-examples/index.php?api=io.confluent.kafka.serializers.KafkaAvroDeserializer + private String deserializeMessage(ByteBuffer buffer, String topicName) + { + KafkaAvroDeserializer deserializer = getDeserializer(); + + // Convert byte buffer to byte array + byte[] bytes = convertToBytes(buffer); + + // TODO: use an actual JSON formatter to render results: + String result = deserializer + .deserialize(topicName, bytes) + .toString() + .replaceAll(",", ",\n"); + + return result; + } + + private KafkaAvroDeserializer getDeserializer() { + Map config = new HashMap<>(); + // TODO: parameterize schema registry URL + config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, + "http://localhost:8081"); //<----- Run Schema Registry on 8081 + + KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer(); + kafkaAvroDeserializer.configure(config, false); + return kafkaAvroDeserializer; + } + + private byte[] convertToBytes(ByteBuffer buffer) + { + byte[] bytes = new byte[buffer.remaining()]; + buffer.get(bytes, 0, bytes.length); + return bytes; + } + private String readString(ByteBuffer buffer) { try diff --git a/src/main/resources/templates/message-inspector.ftl b/src/main/resources/templates/message-inspector.ftl index 0f7d855..a09b84d 100644 --- a/src/main/resources/templates/message-inspector.ftl +++ b/src/main/resources/templates/message-inspector.ftl @@ -91,7 +91,7 @@ <#elseif !(spring.status.error) && !(messageForm.empty)> - No messages found in partition ${messageForm.partition} at offset ${messageForm.offset} + No messages found in partition ${(messageForm.partition)!"PARTITION_NOT_SET"} at offset ${messageForm.offset} From a9626132ca0c0208478e25bdd045bc799a798269 Mon Sep 17 00:00:00 2001 From: Luc Cary Date: Wed, 14 Nov 2018 11:26:15 -0500 Subject: [PATCH 02/14] Add configuration setting for schema registry --- .../config/SchemaRegistryConfiguration.java | 44 +++++++++++++++++++ .../kafdrop/controller/MessageController.java | 14 +++++- .../kafdrop/service/MessageInspector.java | 18 ++++---- 3 files changed, 65 insertions(+), 11 deletions(-) create mode 100644 src/main/java/com/homeadvisor/kafdrop/config/SchemaRegistryConfiguration.java diff --git a/src/main/java/com/homeadvisor/kafdrop/config/SchemaRegistryConfiguration.java b/src/main/java/com/homeadvisor/kafdrop/config/SchemaRegistryConfiguration.java new file mode 100644 index 0000000..5f800e9 --- /dev/null +++ b/src/main/java/com/homeadvisor/kafdrop/config/SchemaRegistryConfiguration.java @@ -0,0 +1,44 @@ +package com.homeadvisor.kafdrop.config; + +import org.hibernate.validator.constraints.NotBlank; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +@Configuration +public class SchemaRegistryConfiguration { + + + @Component + @ConfigurationProperties(prefix = "schemaregistry") + public static class SchemaRegistryProperties + { + public static final Pattern CONNECT_SEPARATOR = Pattern.compile("\\s*,\\s*"); + @NotBlank + private String connect; + + public String getConnect() + { + return connect; + } + + public void setConnect(String connect) + { + this.connect = connect; + } + + public List getConnectList() + { + return CONNECT_SEPARATOR.splitAsStream(this.connect) + .map(String::trim) + .filter(s -> s.length() > 0) + .collect(Collectors.toList()); + } + + } + +} diff --git a/src/main/java/com/homeadvisor/kafdrop/controller/MessageController.java b/src/main/java/com/homeadvisor/kafdrop/controller/MessageController.java index e5a4c7c..922ec9a 100644 --- a/src/main/java/com/homeadvisor/kafdrop/controller/MessageController.java +++ b/src/main/java/com/homeadvisor/kafdrop/controller/MessageController.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.homeadvisor.kafdrop.config.SchemaRegistryConfiguration; import com.homeadvisor.kafdrop.model.MessageVO; import com.homeadvisor.kafdrop.model.TopicVO; import com.homeadvisor.kafdrop.service.KafkaMonitor; @@ -51,6 +52,9 @@ public class MessageController @Autowired private MessageInspector messageInspector; + @Autowired + private SchemaRegistryConfiguration.SchemaRegistryProperties schemaRegistryProperties; + /** * Human friendly view of reading messages. * @param topicName Name of topic @@ -81,13 +85,16 @@ public String viewMessageForm(@PathVariable("name") String topicName, .orElseThrow(() -> new TopicNotFoundException(topicName)); model.addAttribute("topic", topic); + final String schemaRegistryUrl = schemaRegistryProperties.getConnect(); + if (!messageForm.isEmpty() && !errors.hasErrors()) { model.addAttribute("messages", messageInspector.getMessages(topicName, messageForm.getPartition(), messageForm.getOffset(), - messageForm.getCount())); + messageForm.getCount(), + schemaRegistryUrl)); } return "message-inspector"; @@ -125,12 +132,15 @@ public String viewMessageForm(@PathVariable("name") String topicName, } else { + final String schemaRegistryUrl = schemaRegistryProperties.getConnect(); + List messages = new ArrayList<>(); List vos = messageInspector.getMessages( topicName, partition, offset, - count); + count, + schemaRegistryUrl); if(vos != null) { diff --git a/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java b/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java index 10c264b..63d10aa 100644 --- a/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java +++ b/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java @@ -58,7 +58,7 @@ public class MessageInspector @Autowired private KafkaMonitor kafkaMonitor; - public List getMessages(String topicName, int partitionId, long offset, long count) + public List getMessages(String topicName, int partitionId, long offset, long count, String schemaRegistryUrl) { final TopicVO topic = kafkaMonitor.getTopic(topicName).orElseThrow(TopicNotFoundException::new); final TopicPartitionVO partition = topic.getPartition(partitionId).orElseThrow(PartitionNotFoundException::new); @@ -91,7 +91,7 @@ public List getMessages(String topicName, int partitionId, long offse StreamSupport.stream(messageSet.spliterator(), false) .limit(count - messages.size()) .map(MessageAndOffset::message) - .map(m -> createMessage(m, topicName)) + .map(m -> createMessage(m, topicName, schemaRegistryUrl)) .forEach(messages::add); currentOffset += messages.size() - oldSize; } @@ -100,7 +100,7 @@ public List getMessages(String topicName, int partitionId, long offse .orElseGet(Collections::emptyList); } - private MessageVO createMessage(Message message, String topicName) + private MessageVO createMessage(Message message, String topicName, String schemaRegistryUrl) { MessageVO vo = new MessageVO(); if (message.hasKey()) @@ -109,7 +109,7 @@ private MessageVO createMessage(Message message, String topicName) } if (!message.isNull()) { - final String messageString = deserializeMessage(message.payload(), topicName); + final String messageString = deserializeMessage(message.payload(), topicName, schemaRegistryUrl); vo.setMessage(messageString); } @@ -122,9 +122,9 @@ private MessageVO createMessage(Message message, String topicName) } // https://www.programcreek.com/java-api-examples/index.php?api=io.confluent.kafka.serializers.KafkaAvroDeserializer - private String deserializeMessage(ByteBuffer buffer, String topicName) + private String deserializeMessage(ByteBuffer buffer, String topicName, String schemaRegistryUrl) { - KafkaAvroDeserializer deserializer = getDeserializer(); + KafkaAvroDeserializer deserializer = getDeserializer(schemaRegistryUrl); // Convert byte buffer to byte array byte[] bytes = convertToBytes(buffer); @@ -138,11 +138,11 @@ private String deserializeMessage(ByteBuffer buffer, String topicName) return result; } - private KafkaAvroDeserializer getDeserializer() { + private KafkaAvroDeserializer getDeserializer(String schemaRegistryUrl) { Map config = new HashMap<>(); // TODO: parameterize schema registry URL - config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, - "http://localhost:8081"); //<----- Run Schema Registry on 8081 + config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); + //schemaRegistryUrl"http://localhost:8081"); //<----- Run Schema Registry on 8081 KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer(); kafkaAvroDeserializer.configure(config, false); From 8f6697253fd71f6ea1188f4909c7ceef1997354f Mon Sep 17 00:00:00 2001 From: Luc Cary Date: Wed, 14 Nov 2018 11:26:27 -0500 Subject: [PATCH 03/14] Add usage instructions --- README.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/README.md b/README.md index 9373a02..ad036f7 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,11 @@ Then open a browser and navigate to http://localhost:9000. The port can be overr --server.port= ``` +Additionally, you can configure a schema registry connection with: +``` + --schemaregistry.connect=http://localhost:8081 +``` + ## Running with Docker Note for Mac Users: You need to convert newline formatting of the kafdrop.sh file *before* running this command: From 46dfcdbc53671c78f37a74fa23edf2471bd9405a Mon Sep 17 00:00:00 2001 From: Luc Cary Date: Wed, 14 Nov 2018 11:26:48 -0500 Subject: [PATCH 04/14] Remove list-parsing code from schema registry config parser --- .../kafdrop/config/SchemaRegistryConfiguration.java | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/src/main/java/com/homeadvisor/kafdrop/config/SchemaRegistryConfiguration.java b/src/main/java/com/homeadvisor/kafdrop/config/SchemaRegistryConfiguration.java index 5f800e9..1a44c36 100644 --- a/src/main/java/com/homeadvisor/kafdrop/config/SchemaRegistryConfiguration.java +++ b/src/main/java/com/homeadvisor/kafdrop/config/SchemaRegistryConfiguration.java @@ -5,9 +5,6 @@ import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; -import java.util.List; -import java.util.regex.Pattern; -import java.util.stream.Collectors; @Configuration public class SchemaRegistryConfiguration { @@ -17,7 +14,6 @@ public class SchemaRegistryConfiguration { @ConfigurationProperties(prefix = "schemaregistry") public static class SchemaRegistryProperties { - public static final Pattern CONNECT_SEPARATOR = Pattern.compile("\\s*,\\s*"); @NotBlank private String connect; @@ -31,14 +27,6 @@ public void setConnect(String connect) this.connect = connect; } - public List getConnectList() - { - return CONNECT_SEPARATOR.splitAsStream(this.connect) - .map(String::trim) - .filter(s -> s.length() > 0) - .collect(Collectors.toList()); - } - } } From a08b3ccf2467db11aed828b8eaa724b40861ff58 Mon Sep 17 00:00:00 2001 From: "Luc Cary (CU)" Date: Wed, 14 Nov 2018 11:32:49 -0500 Subject: [PATCH 05/14] make config setting optional --- .../kafdrop/config/SchemaRegistryConfiguration.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/main/java/com/homeadvisor/kafdrop/config/SchemaRegistryConfiguration.java b/src/main/java/com/homeadvisor/kafdrop/config/SchemaRegistryConfiguration.java index 1a44c36..4b61b1f 100644 --- a/src/main/java/com/homeadvisor/kafdrop/config/SchemaRegistryConfiguration.java +++ b/src/main/java/com/homeadvisor/kafdrop/config/SchemaRegistryConfiguration.java @@ -1,6 +1,5 @@ package com.homeadvisor.kafdrop.config; -import org.hibernate.validator.constraints.NotBlank; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; @@ -9,12 +8,10 @@ @Configuration public class SchemaRegistryConfiguration { - @Component @ConfigurationProperties(prefix = "schemaregistry") public static class SchemaRegistryProperties { - @NotBlank private String connect; public String getConnect() From 9439427a7dc765a426119dc27af193725e738cb7 Mon Sep 17 00:00:00 2001 From: "Luc Cary (CU)" Date: Wed, 14 Nov 2018 11:33:31 -0500 Subject: [PATCH 06/14] Revert "Remove list-parsing code from schema registry config parser" This reverts commit 46dfcdbc53671c78f37a74fa23edf2471bd9405a. --- .../config/SchemaRegistryConfiguration.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/main/java/com/homeadvisor/kafdrop/config/SchemaRegistryConfiguration.java b/src/main/java/com/homeadvisor/kafdrop/config/SchemaRegistryConfiguration.java index 4b61b1f..d44fefe 100644 --- a/src/main/java/com/homeadvisor/kafdrop/config/SchemaRegistryConfiguration.java +++ b/src/main/java/com/homeadvisor/kafdrop/config/SchemaRegistryConfiguration.java @@ -4,6 +4,9 @@ import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; +import java.util.List; +import java.util.regex.Pattern; +import java.util.stream.Collectors; @Configuration public class SchemaRegistryConfiguration { @@ -12,6 +15,9 @@ public class SchemaRegistryConfiguration { @ConfigurationProperties(prefix = "schemaregistry") public static class SchemaRegistryProperties { + + public static final Pattern CONNECT_SEPARATOR = Pattern.compile("\\s*,\\s*"); + private String connect; public String getConnect() @@ -24,6 +30,14 @@ public void setConnect(String connect) this.connect = connect; } + public List getConnectList() + { + return CONNECT_SEPARATOR.splitAsStream(this.connect) + .map(String::trim) + .filter(s -> s.length() > 0) + .collect(Collectors.toList()); + } + } } From 04c363a109f5f5493e9e7fb801998a061d4e9bbe Mon Sep 17 00:00:00 2001 From: "Luc Cary (CU)" Date: Wed, 14 Nov 2018 11:49:40 -0500 Subject: [PATCH 07/14] make schema registry and deserialization optional --- pom.xml | 5 ++++ .../kafdrop/service/MessageInspector.java | 23 ++++++++++++------- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/pom.xml b/pom.xml index 37c2905..9a693a1 100644 --- a/pom.xml +++ b/pom.xml @@ -99,6 +99,11 @@ kafka-clients 0.10.2.2 + + com.google.code.findbugs + jsr305 + 3.0.2 + diff --git a/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java b/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java index 63d10aa..9c228ee 100644 --- a/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java +++ b/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java @@ -31,6 +31,7 @@ import kafka.message.MessageAndOffset; import org.apache.avro.generic.GenericRecord; +import javax.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -38,11 +39,7 @@ import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -58,7 +55,12 @@ public class MessageInspector @Autowired private KafkaMonitor kafkaMonitor; - public List getMessages(String topicName, int partitionId, long offset, long count, String schemaRegistryUrl) + public List getMessages( + String topicName, + int partitionId, + long offset, + long count, + @Nullable String schemaRegistryUrl) { final TopicVO topic = kafkaMonitor.getTopic(topicName).orElseThrow(TopicNotFoundException::new); final TopicPartitionVO partition = topic.getPartition(partitionId).orElseThrow(PartitionNotFoundException::new); @@ -100,7 +102,7 @@ public List getMessages(String topicName, int partitionId, long offse .orElseGet(Collections::emptyList); } - private MessageVO createMessage(Message message, String topicName, String schemaRegistryUrl) + private MessageVO createMessage(Message message, String topicName, @Nullable String schemaRegistryUrl) { MessageVO vo = new MessageVO(); if (message.hasKey()) @@ -109,7 +111,12 @@ private MessageVO createMessage(Message message, String topicName, String schema } if (!message.isNull()) { - final String messageString = deserializeMessage(message.payload(), topicName, schemaRegistryUrl); + final String messageString; + if (schemaRegistryUrl == null) { + messageString = readString(message.payload()); + } else { + messageString = deserializeMessage(message.payload(), topicName, schemaRegistryUrl); + } vo.setMessage(messageString); } From d7e992f94707c6d03b06801fab86fec5b8e7b774 Mon Sep 17 00:00:00 2001 From: "Luc Cary (CU)" Date: Wed, 14 Nov 2018 12:04:57 -0500 Subject: [PATCH 08/14] Add GSON dependency to pretty print JSON results --- pom.xml | 5 ++++ .../kafdrop/service/MessageInspector.java | 24 +++++++++++-------- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/pom.xml b/pom.xml index 9a693a1..1e73b39 100644 --- a/pom.xml +++ b/pom.xml @@ -104,6 +104,11 @@ jsr305 3.0.2 + + com.google.code.gson + gson + 2.8.5 + diff --git a/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java b/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java index 9c228ee..6b2fec8 100644 --- a/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java +++ b/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java @@ -18,6 +18,10 @@ package com.homeadvisor.kafdrop.service; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; import com.homeadvisor.kafdrop.model.MessageVO; import com.homeadvisor.kafdrop.model.TopicPartitionVO; import com.homeadvisor.kafdrop.model.TopicVO; @@ -31,7 +35,6 @@ import kafka.message.MessageAndOffset; import org.apache.avro.generic.GenericRecord; -import javax.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -43,6 +46,8 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import javax.annotation.Nullable; + import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; @@ -136,21 +141,20 @@ private String deserializeMessage(ByteBuffer buffer, String topicName, String sc // Convert byte buffer to byte array byte[] bytes = convertToBytes(buffer); - // TODO: use an actual JSON formatter to render results: - String result = deserializer - .deserialize(topicName, bytes) - .toString() - .replaceAll(",", ",\n"); + return formatJsonMessage(deserializer.deserialize(topicName, bytes).toString()); + } - return result; + private String formatJsonMessage(String jsonMessage) { + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + JsonParser parser = new JsonParser(); + JsonElement element = parser.parse(jsonMessage); + String formattedJsonMessage = gson.toJson(element); + return formattedJsonMessage; } private KafkaAvroDeserializer getDeserializer(String schemaRegistryUrl) { Map config = new HashMap<>(); - // TODO: parameterize schema registry URL config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); - //schemaRegistryUrl"http://localhost:8081"); //<----- Run Schema Registry on 8081 - KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer(); kafkaAvroDeserializer.configure(config, false); return kafkaAvroDeserializer; From 3f7cee50e4d9e99555a17eab185aa4ba1329c546 Mon Sep 17 00:00:00 2001 From: "Luc Cary (CU)" Date: Wed, 14 Nov 2018 12:17:18 -0500 Subject: [PATCH 09/14] Remove local readme notes --- README.md | 30 ------------------------------ 1 file changed, 30 deletions(-) diff --git a/README.md b/README.md index ad036f7..fd75f29 100644 --- a/README.md +++ b/README.md @@ -94,33 +94,3 @@ Starting in version 2.0.0, Kafdrop sets CORS headers for all endpoints. You can You can also disable CORS entirely with the following configuration: cors.enabled=false - -## Avro Consumer - -References: -* http://cloudurable.com/blog/kafka-avro-schema-registry/index.html -* https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html -* https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDeserializer.java -* https://docs.confluent.io/3.0.0/installation.html#maven-repository-for-jars - -Java Docs: -* https://www.javadoc.io/doc/org.apache.kafka/kafka_2.9.2/0.8.0 -* https://www.javadoc.io/doc/org.apache.kafka/kafka-clients/0.8.2.2 -* https://www.javadoc.io/doc/org.apache.kafka/kafka-clients/0.10.2.0 -* https://www.javadoc.io/doc/org.apache.kafka/kafka-clients/0.10.2.2 - -Sources: -* https://github.com/apache/kafka/tree/0.8.2/core/src/main/scala/kafka/javaapi - -## Development - -Project setup for Eclipse: -``` -mvn dependency:tree -mvn eclipse:clean -mvn eclipse:eclipse -``` - -For run configurations, add a new Maven run configuration called 'Default Build' -with the project workspace as the root, and then add `clean package` as the goals -and hit apply. From 2f5621a6e39e5f96465492f132405bd7ea2ef380 Mon Sep 17 00:00:00 2001 From: "Luc Cary (CU)" Date: Wed, 14 Nov 2018 12:27:51 -0500 Subject: [PATCH 10/14] remove unnecessary comment and add note about requirements in readme --- README.md | 4 ++++ .../com/homeadvisor/kafdrop/controller/MessageController.java | 1 - 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index fd75f29..92374d5 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,10 @@ Kafdrop is a UI for monitoring Apache Kafka clusters. The tool displays informat * Kafka (0.8.1 or 0.8.2 is known to work) * Zookeeper (3.4.5 or later) +Optional, additional integration: + +* Schema Registry + ## Building After cloning the repository, building should just be a matter of running a standard Maven build: diff --git a/src/main/java/com/homeadvisor/kafdrop/controller/MessageController.java b/src/main/java/com/homeadvisor/kafdrop/controller/MessageController.java index 922ec9a..fb4e074 100644 --- a/src/main/java/com/homeadvisor/kafdrop/controller/MessageController.java +++ b/src/main/java/com/homeadvisor/kafdrop/controller/MessageController.java @@ -73,7 +73,6 @@ public String viewMessageForm(@PathVariable("name") String topicName, { final PartitionOffsetInfo defaultForm = new PartitionOffsetInfo(); - // Set Reasonable Defaults. Ideally last 10 messages. defaultForm.setCount(10l); defaultForm.setOffset(0l); defaultForm.setPartition(0); From d353fa1a7f29bd0a9a080eb89690d2d038d8b6a1 Mon Sep 17 00:00:00 2001 From: lcary Date: Wed, 14 Nov 2018 12:29:24 -0500 Subject: [PATCH 11/14] Add Avro message deserialization capability (#1) * Add avro message deserialization capability * Add GSON dependency to pretty print JSON results --- README.md | 9 +++ pom.xml | 45 +++++++++--- .../config/SchemaRegistryConfiguration.java | 43 ++++++++++++ .../kafdrop/controller/MessageController.java | 20 +++++- .../kafdrop/service/MessageInspector.java | 69 +++++++++++++++++-- .../resources/templates/message-inspector.ftl | 2 +- 6 files changed, 169 insertions(+), 19 deletions(-) create mode 100644 src/main/java/com/homeadvisor/kafdrop/config/SchemaRegistryConfiguration.java diff --git a/README.md b/README.md index 168cc65..92374d5 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,10 @@ Kafdrop is a UI for monitoring Apache Kafka clusters. The tool displays informat * Kafka (0.8.1 or 0.8.2 is known to work) * Zookeeper (3.4.5 or later) +Optional, additional integration: + +* Schema Registry + ## Building After cloning the repository, building should just be a matter of running a standard Maven build: @@ -30,6 +34,11 @@ Then open a browser and navigate to http://localhost:9000. The port can be overr --server.port= ``` +Additionally, you can configure a schema registry connection with: +``` + --schemaregistry.connect=http://localhost:8081 +``` + ## Running with Docker Note for Mac Users: You need to convert newline formatting of the kafdrop.sh file *before* running this command: diff --git a/pom.xml b/pom.xml index cc226b5..1e73b39 100644 --- a/pom.xml +++ b/pom.xml @@ -24,6 +24,17 @@ HEAD + + + central + http://repo1.maven.org/maven2 + + + confluent + http://packages.confluent.io/maven/ + + + @@ -73,6 +84,31 @@ spring-retry 1.1.3.RELEASE + + io.confluent + kafka-avro-serializer + 3.2.1 + + + org.apache.avro + avro + 1.8.1 + + + org.apache.kafka + kafka-clients + 0.10.2.2 + + + com.google.code.findbugs + jsr305 + 3.0.2 + + + com.google.code.gson + gson + 2.8.5 + @@ -152,13 +188,6 @@ 0.7.1 test - - - org.apache.kafka - kafka-clients - 0.8.2.2 - test - @@ -313,7 +342,7 @@ org.apache.kafka kafka-clients - 0.8.2.2 + 0.10.2.2 diff --git a/src/main/java/com/homeadvisor/kafdrop/config/SchemaRegistryConfiguration.java b/src/main/java/com/homeadvisor/kafdrop/config/SchemaRegistryConfiguration.java new file mode 100644 index 0000000..d44fefe --- /dev/null +++ b/src/main/java/com/homeadvisor/kafdrop/config/SchemaRegistryConfiguration.java @@ -0,0 +1,43 @@ +package com.homeadvisor.kafdrop.config; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +@Configuration +public class SchemaRegistryConfiguration { + + @Component + @ConfigurationProperties(prefix = "schemaregistry") + public static class SchemaRegistryProperties + { + + public static final Pattern CONNECT_SEPARATOR = Pattern.compile("\\s*,\\s*"); + + private String connect; + + public String getConnect() + { + return connect; + } + + public void setConnect(String connect) + { + this.connect = connect; + } + + public List getConnectList() + { + return CONNECT_SEPARATOR.splitAsStream(this.connect) + .map(String::trim) + .filter(s -> s.length() > 0) + .collect(Collectors.toList()); + } + + } + +} diff --git a/src/main/java/com/homeadvisor/kafdrop/controller/MessageController.java b/src/main/java/com/homeadvisor/kafdrop/controller/MessageController.java index b08fb09..fb4e074 100644 --- a/src/main/java/com/homeadvisor/kafdrop/controller/MessageController.java +++ b/src/main/java/com/homeadvisor/kafdrop/controller/MessageController.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.homeadvisor.kafdrop.config.SchemaRegistryConfiguration; import com.homeadvisor.kafdrop.model.MessageVO; import com.homeadvisor.kafdrop.model.TopicVO; import com.homeadvisor.kafdrop.service.KafkaMonitor; @@ -51,6 +52,9 @@ public class MessageController @Autowired private MessageInspector messageInspector; + @Autowired + private SchemaRegistryConfiguration.SchemaRegistryProperties schemaRegistryProperties; + /** * Human friendly view of reading messages. * @param topicName Name of topic @@ -68,7 +72,11 @@ public String viewMessageForm(@PathVariable("name") String topicName, if (messageForm.isEmpty()) { final PartitionOffsetInfo defaultForm = new PartitionOffsetInfo(); - defaultForm.setCount(1l); + + defaultForm.setCount(10l); + defaultForm.setOffset(0l); + defaultForm.setPartition(0); + model.addAttribute("messageForm", defaultForm); } @@ -76,13 +84,16 @@ public String viewMessageForm(@PathVariable("name") String topicName, .orElseThrow(() -> new TopicNotFoundException(topicName)); model.addAttribute("topic", topic); + final String schemaRegistryUrl = schemaRegistryProperties.getConnect(); + if (!messageForm.isEmpty() && !errors.hasErrors()) { model.addAttribute("messages", messageInspector.getMessages(topicName, messageForm.getPartition(), messageForm.getOffset(), - messageForm.getCount())); + messageForm.getCount(), + schemaRegistryUrl)); } return "message-inspector"; @@ -120,12 +131,15 @@ public String viewMessageForm(@PathVariable("name") String topicName, } else { + final String schemaRegistryUrl = schemaRegistryProperties.getConnect(); + List messages = new ArrayList<>(); List vos = messageInspector.getMessages( topicName, partition, offset, - count); + count, + schemaRegistryUrl); if(vos != null) { diff --git a/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java b/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java index 5143bda..6b2fec8 100644 --- a/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java +++ b/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java @@ -18,6 +18,10 @@ package com.homeadvisor.kafdrop.service; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; import com.homeadvisor.kafdrop.model.MessageVO; import com.homeadvisor.kafdrop.model.TopicPartitionVO; import com.homeadvisor.kafdrop.model.TopicVO; @@ -29,6 +33,8 @@ import kafka.javaapi.message.ByteBufferMessageSet; import kafka.message.Message; import kafka.message.MessageAndOffset; + +import org.apache.avro.generic.GenericRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -36,12 +42,16 @@ import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; +import java.util.*; import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import javax.annotation.Nullable; + +import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; + + @Service public class MessageInspector { @@ -50,7 +60,12 @@ public class MessageInspector @Autowired private KafkaMonitor kafkaMonitor; - public List getMessages(String topicName, int partitionId, long offset, long count) + public List getMessages( + String topicName, + int partitionId, + long offset, + long count, + @Nullable String schemaRegistryUrl) { final TopicVO topic = kafkaMonitor.getTopic(topicName).orElseThrow(TopicNotFoundException::new); final TopicPartitionVO partition = topic.getPartition(partitionId).orElseThrow(PartitionNotFoundException::new); @@ -83,7 +98,7 @@ public List getMessages(String topicName, int partitionId, long offse StreamSupport.stream(messageSet.spliterator(), false) .limit(count - messages.size()) .map(MessageAndOffset::message) - .map(this::createMessage) + .map(m -> createMessage(m, topicName, schemaRegistryUrl)) .forEach(messages::add); currentOffset += messages.size() - oldSize; } @@ -92,7 +107,7 @@ public List getMessages(String topicName, int partitionId, long offse .orElseGet(Collections::emptyList); } - private MessageVO createMessage(Message message) + private MessageVO createMessage(Message message, String topicName, @Nullable String schemaRegistryUrl) { MessageVO vo = new MessageVO(); if (message.hasKey()) @@ -101,7 +116,13 @@ private MessageVO createMessage(Message message) } if (!message.isNull()) { - vo.setMessage(readString(message.payload())); + final String messageString; + if (schemaRegistryUrl == null) { + messageString = readString(message.payload()); + } else { + messageString = deserializeMessage(message.payload(), topicName, schemaRegistryUrl); + } + vo.setMessage(messageString); } vo.setValid(message.isValid()); @@ -112,6 +133,40 @@ private MessageVO createMessage(Message message) return vo; } + // https://www.programcreek.com/java-api-examples/index.php?api=io.confluent.kafka.serializers.KafkaAvroDeserializer + private String deserializeMessage(ByteBuffer buffer, String topicName, String schemaRegistryUrl) + { + KafkaAvroDeserializer deserializer = getDeserializer(schemaRegistryUrl); + + // Convert byte buffer to byte array + byte[] bytes = convertToBytes(buffer); + + return formatJsonMessage(deserializer.deserialize(topicName, bytes).toString()); + } + + private String formatJsonMessage(String jsonMessage) { + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + JsonParser parser = new JsonParser(); + JsonElement element = parser.parse(jsonMessage); + String formattedJsonMessage = gson.toJson(element); + return formattedJsonMessage; + } + + private KafkaAvroDeserializer getDeserializer(String schemaRegistryUrl) { + Map config = new HashMap<>(); + config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); + KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer(); + kafkaAvroDeserializer.configure(config, false); + return kafkaAvroDeserializer; + } + + private byte[] convertToBytes(ByteBuffer buffer) + { + byte[] bytes = new byte[buffer.remaining()]; + buffer.get(bytes, 0, bytes.length); + return bytes; + } + private String readString(ByteBuffer buffer) { try diff --git a/src/main/resources/templates/message-inspector.ftl b/src/main/resources/templates/message-inspector.ftl index 0f7d855..a09b84d 100644 --- a/src/main/resources/templates/message-inspector.ftl +++ b/src/main/resources/templates/message-inspector.ftl @@ -91,7 +91,7 @@ <#elseif !(spring.status.error) && !(messageForm.empty)> - No messages found in partition ${messageForm.partition} at offset ${messageForm.offset} + No messages found in partition ${(messageForm.partition)!"PARTITION_NOT_SET"} at offset ${messageForm.offset} From 4c079edccbada1cb9cb34e458cb1726a5cd40d7c Mon Sep 17 00:00:00 2001 From: lcary Date: Wed, 12 Dec 2018 21:55:19 -0500 Subject: [PATCH 12/14] Move deserialization logic into MessageDeserializer classes (#2) * Move deserialization logic into MessageDeserializer classes * Use enums to select deserializer type in message inspector view * Clean up MessageController to remove comments and unnecessary decorators --- .../kafdrop/controller/MessageController.java | 54 ++++++++++- .../service/AvroMessageDeserializer.java | 53 +++++++++++ .../service/DefaultMessageDeserializer.java | 15 +++ .../kafdrop/service/MessageDeserializer.java | 10 ++ .../kafdrop/service/MessageInspector.java | 95 ++----------------- .../homeadvisor/kafdrop/util/ByteUtils.java | 48 ++++++++++ .../resources/templates/message-inspector.ftl | 10 ++ 7 files changed, 195 insertions(+), 90 deletions(-) create mode 100644 src/main/java/com/homeadvisor/kafdrop/service/AvroMessageDeserializer.java create mode 100644 src/main/java/com/homeadvisor/kafdrop/service/DefaultMessageDeserializer.java create mode 100644 src/main/java/com/homeadvisor/kafdrop/service/MessageDeserializer.java create mode 100644 src/main/java/com/homeadvisor/kafdrop/util/ByteUtils.java diff --git a/src/main/java/com/homeadvisor/kafdrop/controller/MessageController.java b/src/main/java/com/homeadvisor/kafdrop/controller/MessageController.java index fb4e074..62957a5 100644 --- a/src/main/java/com/homeadvisor/kafdrop/controller/MessageController.java +++ b/src/main/java/com/homeadvisor/kafdrop/controller/MessageController.java @@ -23,7 +23,10 @@ import com.homeadvisor.kafdrop.config.SchemaRegistryConfiguration; import com.homeadvisor.kafdrop.model.MessageVO; import com.homeadvisor.kafdrop.model.TopicVO; +import com.homeadvisor.kafdrop.service.AvroMessageDeserializer; +import com.homeadvisor.kafdrop.service.DefaultMessageDeserializer; import com.homeadvisor.kafdrop.service.KafkaMonitor; +import com.homeadvisor.kafdrop.service.MessageDeserializer; import com.homeadvisor.kafdrop.service.MessageInspector; import com.homeadvisor.kafdrop.service.TopicNotFoundException; import io.swagger.annotations.ApiOperation; @@ -55,6 +58,10 @@ public class MessageController @Autowired private SchemaRegistryConfiguration.SchemaRegistryProperties schemaRegistryProperties; + public static enum DeserializerType { + DEFAULT, AVRO; + } + /** * Human friendly view of reading messages. * @param topicName Name of topic @@ -76,6 +83,7 @@ public String viewMessageForm(@PathVariable("name") String topicName, defaultForm.setCount(10l); defaultForm.setOffset(0l); defaultForm.setPartition(0); + defaultForm.setDeserializer(DeserializerType.DEFAULT); model.addAttribute("messageForm", defaultForm); } @@ -84,16 +92,20 @@ public String viewMessageForm(@PathVariable("name") String topicName, .orElseThrow(() -> new TopicNotFoundException(topicName)); model.addAttribute("topic", topic); - final String schemaRegistryUrl = schemaRegistryProperties.getConnect(); + model.addAttribute("defaultDeserializerType", DeserializerType.DEFAULT); + model.addAttribute("deserializerTypes", DeserializerType.values()); if (!messageForm.isEmpty() && !errors.hasErrors()) { + final MessageDeserializer deserializer = getDeserializer( + topicName, messageForm.getDeserializer()); + model.addAttribute("messages", messageInspector.getMessages(topicName, messageForm.getPartition(), messageForm.getOffset(), messageForm.getCount(), - schemaRegistryUrl)); + deserializer)); } return "message-inspector"; @@ -131,7 +143,8 @@ public String viewMessageForm(@PathVariable("name") String topicName, } else { - final String schemaRegistryUrl = schemaRegistryProperties.getConnect(); + // Currently, only default deserialization supported via JSON API. + final MessageDeserializer deserializer = new DefaultMessageDeserializer(); List messages = new ArrayList<>(); List vos = messageInspector.getMessages( @@ -139,7 +152,7 @@ public String viewMessageForm(@PathVariable("name") String topicName, partition, offset, count, - schemaRegistryUrl); + deserializer); if(vos != null) { @@ -150,6 +163,19 @@ public String viewMessageForm(@PathVariable("name") String topicName, } } + private MessageDeserializer getDeserializer(String topicName, DeserializerType deserializerType) { + final MessageDeserializer deserializer; + + if (deserializerType == DeserializerType.AVRO) { + final String schemaRegistryUrl = schemaRegistryProperties.getConnect(); + deserializer = new AvroMessageDeserializer(topicName, schemaRegistryUrl); + } else { + deserializer = new DefaultMessageDeserializer(); + } + + return deserializer; + } + /** * Encapsulates offset data for a single partition. */ @@ -180,11 +206,19 @@ public static class PartitionOffsetInfo @JsonProperty("lastOffset") private Long count; - public PartitionOffsetInfo(int partition, long offset, long count) + private DeserializerType deserializer; + + public PartitionOffsetInfo(int partition, long offset, long count, DeserializerType deserializer) { this.partition = partition; this.offset = offset; this.count = count; + this.deserializer = deserializer; + } + + public PartitionOffsetInfo(int partition, long offset, long count) + { + this(partition, offset, count, DeserializerType.DEFAULT); } public PartitionOffsetInfo() @@ -227,5 +261,15 @@ public void setCount(Long count) { this.count = count; } + + public DeserializerType getDeserializer() + { + return deserializer; + } + + public void setDeserializer(DeserializerType deserializer) + { + this.deserializer = deserializer; + } } } diff --git a/src/main/java/com/homeadvisor/kafdrop/service/AvroMessageDeserializer.java b/src/main/java/com/homeadvisor/kafdrop/service/AvroMessageDeserializer.java new file mode 100644 index 0000000..fed67d9 --- /dev/null +++ b/src/main/java/com/homeadvisor/kafdrop/service/AvroMessageDeserializer.java @@ -0,0 +1,53 @@ +package com.homeadvisor.kafdrop.service; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; +import com.homeadvisor.kafdrop.util.ByteUtils; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + +import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; + + +public class AvroMessageDeserializer implements MessageDeserializer { + + private String topicName; + private String schemaRegistryUrl; + + public AvroMessageDeserializer(String topicName, String schemaRegistryUrl) { + this.topicName = topicName; + this.schemaRegistryUrl = schemaRegistryUrl; + } + + @Override + public String deserializeMessage(ByteBuffer buffer) { + KafkaAvroDeserializer deserializer = getDeserializer(); + + // Convert byte buffer to byte array + byte[] bytes = ByteUtils.convertToByteArray(buffer); + + return formatJsonMessage(deserializer.deserialize(topicName, bytes).toString()); + } + + private String formatJsonMessage(String jsonMessage) { + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + JsonParser parser = new JsonParser(); + JsonElement element = parser.parse(jsonMessage); + String formattedJsonMessage = gson.toJson(element); + return formattedJsonMessage; + } + + private KafkaAvroDeserializer getDeserializer() { + Map config = new HashMap<>(); + config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); + KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer(); + kafkaAvroDeserializer.configure(config, false); + return kafkaAvroDeserializer; + } + +} diff --git a/src/main/java/com/homeadvisor/kafdrop/service/DefaultMessageDeserializer.java b/src/main/java/com/homeadvisor/kafdrop/service/DefaultMessageDeserializer.java new file mode 100644 index 0000000..7fd5b7e --- /dev/null +++ b/src/main/java/com/homeadvisor/kafdrop/service/DefaultMessageDeserializer.java @@ -0,0 +1,15 @@ +package com.homeadvisor.kafdrop.service; + +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; + +import com.homeadvisor.kafdrop.util.ByteUtils; + +public class DefaultMessageDeserializer implements MessageDeserializer { + + @Override + public String deserializeMessage(ByteBuffer buffer) { + return ByteUtils.readString(buffer); + } + +} diff --git a/src/main/java/com/homeadvisor/kafdrop/service/MessageDeserializer.java b/src/main/java/com/homeadvisor/kafdrop/service/MessageDeserializer.java new file mode 100644 index 0000000..a7a9b0a --- /dev/null +++ b/src/main/java/com/homeadvisor/kafdrop/service/MessageDeserializer.java @@ -0,0 +1,10 @@ +package com.homeadvisor.kafdrop.service; + +import java.nio.ByteBuffer; + + +public interface MessageDeserializer { + + public String deserializeMessage(ByteBuffer buffer); + +} diff --git a/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java b/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java index 6b2fec8..9ef39f5 100644 --- a/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java +++ b/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java @@ -18,14 +18,12 @@ package com.homeadvisor.kafdrop.service; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonElement; -import com.google.gson.JsonParser; import com.homeadvisor.kafdrop.model.MessageVO; import com.homeadvisor.kafdrop.model.TopicPartitionVO; import com.homeadvisor.kafdrop.model.TopicVO; import com.homeadvisor.kafdrop.util.BrokerChannel; +import com.homeadvisor.kafdrop.util.ByteUtils; + import kafka.api.FetchRequest; import kafka.api.FetchRequestBuilder; import kafka.javaapi.FetchResponse; @@ -34,7 +32,6 @@ import kafka.message.Message; import kafka.message.MessageAndOffset; -import org.apache.avro.generic.GenericRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -42,12 +39,12 @@ import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.stream.Collectors; import java.util.stream.StreamSupport; -import javax.annotation.Nullable; - import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; @@ -65,7 +62,7 @@ public List getMessages( int partitionId, long offset, long count, - @Nullable String schemaRegistryUrl) + MessageDeserializer deserializer) { final TopicVO topic = kafkaMonitor.getTopic(topicName).orElseThrow(TopicNotFoundException::new); final TopicPartitionVO partition = topic.getPartition(partitionId).orElseThrow(PartitionNotFoundException::new); @@ -98,7 +95,7 @@ public List getMessages( StreamSupport.stream(messageSet.spliterator(), false) .limit(count - messages.size()) .map(MessageAndOffset::message) - .map(m -> createMessage(m, topicName, schemaRegistryUrl)) + .map(m -> createMessage(m, deserializer)) .forEach(messages::add); currentOffset += messages.size() - oldSize; } @@ -107,21 +104,16 @@ public List getMessages( .orElseGet(Collections::emptyList); } - private MessageVO createMessage(Message message, String topicName, @Nullable String schemaRegistryUrl) + private MessageVO createMessage(Message message, MessageDeserializer deserializer) { MessageVO vo = new MessageVO(); if (message.hasKey()) { - vo.setKey(readString(message.key())); + vo.setKey(ByteUtils.readString(message.key())); } if (!message.isNull()) { - final String messageString; - if (schemaRegistryUrl == null) { - messageString = readString(message.payload()); - } else { - messageString = deserializeMessage(message.payload(), topicName, schemaRegistryUrl); - } + final String messageString = deserializer.deserializeMessage(message.payload()); vo.setMessage(messageString); } @@ -133,71 +125,4 @@ private MessageVO createMessage(Message message, String topicName, @Nullable Str return vo; } - // https://www.programcreek.com/java-api-examples/index.php?api=io.confluent.kafka.serializers.KafkaAvroDeserializer - private String deserializeMessage(ByteBuffer buffer, String topicName, String schemaRegistryUrl) - { - KafkaAvroDeserializer deserializer = getDeserializer(schemaRegistryUrl); - - // Convert byte buffer to byte array - byte[] bytes = convertToBytes(buffer); - - return formatJsonMessage(deserializer.deserialize(topicName, bytes).toString()); - } - - private String formatJsonMessage(String jsonMessage) { - Gson gson = new GsonBuilder().setPrettyPrinting().create(); - JsonParser parser = new JsonParser(); - JsonElement element = parser.parse(jsonMessage); - String formattedJsonMessage = gson.toJson(element); - return formattedJsonMessage; - } - - private KafkaAvroDeserializer getDeserializer(String schemaRegistryUrl) { - Map config = new HashMap<>(); - config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); - KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer(); - kafkaAvroDeserializer.configure(config, false); - return kafkaAvroDeserializer; - } - - private byte[] convertToBytes(ByteBuffer buffer) - { - byte[] bytes = new byte[buffer.remaining()]; - buffer.get(bytes, 0, bytes.length); - return bytes; - } - - private String readString(ByteBuffer buffer) - { - try - { - return new String(readBytes(buffer), "UTF-8"); - } - catch (UnsupportedEncodingException e) - { - return ""; - } - } - - private byte[] readBytes(ByteBuffer buffer) - { - return readBytes(buffer, 0, buffer.limit()); - } - - private byte[] readBytes(ByteBuffer buffer, int offset, int size) - { - byte[] dest = new byte[size]; - if (buffer.hasArray()) - { - System.arraycopy(buffer.array(), buffer.arrayOffset() + offset, dest, 0, size); - } - else - { - buffer.mark(); - buffer.get(dest); - buffer.reset(); - } - return dest; - } - } diff --git a/src/main/java/com/homeadvisor/kafdrop/util/ByteUtils.java b/src/main/java/com/homeadvisor/kafdrop/util/ByteUtils.java new file mode 100644 index 0000000..e6d5a2c --- /dev/null +++ b/src/main/java/com/homeadvisor/kafdrop/util/ByteUtils.java @@ -0,0 +1,48 @@ +package com.homeadvisor.kafdrop.util; + +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; + +public class ByteUtils { + + public static String readString(ByteBuffer buffer) + { + try + { + return new String(readBytes(buffer), "UTF-8"); + } + catch (UnsupportedEncodingException e) + { + return ""; + } + } + + private static byte[] readBytes(ByteBuffer buffer) + { + return readBytes(buffer, 0, buffer.limit()); + } + + private static byte[] readBytes(ByteBuffer buffer, int offset, int size) + { + byte[] dest = new byte[size]; + if (buffer.hasArray()) + { + System.arraycopy(buffer.array(), buffer.arrayOffset() + offset, dest, 0, size); + } + else + { + buffer.mark(); + buffer.get(dest); + buffer.reset(); + } + return dest; + } + + public static byte[] convertToByteArray(ByteBuffer buffer) + { + byte[] bytes = new byte[buffer.remaining()]; + buffer.get(bytes, 0, bytes.length); + return bytes; + } + +} diff --git a/src/main/resources/templates/message-inspector.ftl b/src/main/resources/templates/message-inspector.ftl index a09b84d..67d835c 100644 --- a/src/main/resources/templates/message-inspector.ftl +++ b/src/main/resources/templates/message-inspector.ftl @@ -31,6 +31,7 @@

Topic Messages: ${topic.name}

<#assign selectedPartition=messageForm.partition!0?number> +<#assign selectedDeserializer=messageForm.deserializer!defaultDeserializerType>
<#assign curPartition=topic.getPartition(selectedPartition).get()> @@ -69,6 +70,15 @@
+
+ + +
+ From e738ca994db7a957de2f93ce687cb2f28fef6cdb Mon Sep 17 00:00:00 2001 From: lcary Date: Wed, 12 Dec 2018 23:00:14 -0500 Subject: [PATCH 13/14] Refactor deserializers to utils and add default message format (#3) * Refactor deserializers to utils directory and add default message format config * Update readme for new message format config setting --- README.md | 8 +++- .../config/MessageFormatConfiguration.java | 42 +++++++++++++++++ .../config/SchemaRegistryConfiguration.java | 45 +++++++++--------- .../kafdrop/controller/MessageController.java | 46 ++++++++++--------- .../kafdrop/service/MessageInspector.java | 1 + .../AvroMessageDeserializer.java | 3 +- .../DefaultMessageDeserializer.java | 4 +- .../MessageDeserializer.java | 2 +- .../kafdrop/util/MessageFormat.java | 5 ++ .../resources/templates/message-inspector.ftl | 10 ++-- 10 files changed, 111 insertions(+), 55 deletions(-) create mode 100644 src/main/java/com/homeadvisor/kafdrop/config/MessageFormatConfiguration.java rename src/main/java/com/homeadvisor/kafdrop/{service => util}/AvroMessageDeserializer.java (95%) rename src/main/java/com/homeadvisor/kafdrop/{service => util}/DefaultMessageDeserializer.java (75%) rename src/main/java/com/homeadvisor/kafdrop/{service => util}/MessageDeserializer.java (76%) create mode 100644 src/main/java/com/homeadvisor/kafdrop/util/MessageFormat.java diff --git a/README.md b/README.md index 92374d5..183f0bf 100644 --- a/README.md +++ b/README.md @@ -34,11 +34,17 @@ Then open a browser and navigate to http://localhost:9000. The port can be overr --server.port= ``` -Additionally, you can configure a schema registry connection with: +Additionally, you can optionally configure a schema registry connection with: ``` --schemaregistry.connect=http://localhost:8081 ``` +Finally, a default message format (e.g. to deserialize Avro messages) can optionally be configured as follows: +``` + --message.format=AVRO +``` +Valid format values are "DEFAULT" and "AVRO". This setting can also be configured at the topic level via dropdown when viewing messages. + ## Running with Docker Note for Mac Users: You need to convert newline formatting of the kafdrop.sh file *before* running this command: diff --git a/src/main/java/com/homeadvisor/kafdrop/config/MessageFormatConfiguration.java b/src/main/java/com/homeadvisor/kafdrop/config/MessageFormatConfiguration.java new file mode 100644 index 0000000..8bdc9a1 --- /dev/null +++ b/src/main/java/com/homeadvisor/kafdrop/config/MessageFormatConfiguration.java @@ -0,0 +1,42 @@ +package com.homeadvisor.kafdrop.config; + +import javax.annotation.PostConstruct; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; +import org.springframework.stereotype.Component; + +import com.homeadvisor.kafdrop.util.MessageFormat; + + +@Configuration +public class MessageFormatConfiguration { + + @Component + @ConfigurationProperties(prefix = "message") + public static class MessageFormatProperties + { + + private MessageFormat format; + + @PostConstruct + public void init() { + // Set a default message format if not configured. + if (format == null) { + format = MessageFormat.DEFAULT; + } + } + + public MessageFormat getFormat() + { + return format; + } + + public void setFormat(MessageFormat format) + { + this.format = format; + } + + } + +} diff --git a/src/main/java/com/homeadvisor/kafdrop/config/SchemaRegistryConfiguration.java b/src/main/java/com/homeadvisor/kafdrop/config/SchemaRegistryConfiguration.java index d44fefe..78d7119 100644 --- a/src/main/java/com/homeadvisor/kafdrop/config/SchemaRegistryConfiguration.java +++ b/src/main/java/com/homeadvisor/kafdrop/config/SchemaRegistryConfiguration.java @@ -8,36 +8,37 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; + @Configuration public class SchemaRegistryConfiguration { - @Component - @ConfigurationProperties(prefix = "schemaregistry") - public static class SchemaRegistryProperties - { + @Component + @ConfigurationProperties(prefix = "schemaregistry") + public static class SchemaRegistryProperties + { - public static final Pattern CONNECT_SEPARATOR = Pattern.compile("\\s*,\\s*"); + public static final Pattern CONNECT_SEPARATOR = Pattern.compile("\\s*,\\s*"); - private String connect; + private String connect; - public String getConnect() - { - return connect; - } + public String getConnect() + { + return connect; + } - public void setConnect(String connect) - { - this.connect = connect; - } + public void setConnect(String connect) + { + this.connect = connect; + } - public List getConnectList() - { - return CONNECT_SEPARATOR.splitAsStream(this.connect) - .map(String::trim) - .filter(s -> s.length() > 0) - .collect(Collectors.toList()); - } + public List getConnectList() + { + return CONNECT_SEPARATOR.splitAsStream(this.connect) + .map(String::trim) + .filter(s -> s.length() > 0) + .collect(Collectors.toList()); + } - } + } } diff --git a/src/main/java/com/homeadvisor/kafdrop/controller/MessageController.java b/src/main/java/com/homeadvisor/kafdrop/controller/MessageController.java index 62957a5..13210d5 100644 --- a/src/main/java/com/homeadvisor/kafdrop/controller/MessageController.java +++ b/src/main/java/com/homeadvisor/kafdrop/controller/MessageController.java @@ -20,15 +20,18 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.homeadvisor.kafdrop.config.MessageFormatConfiguration; import com.homeadvisor.kafdrop.config.SchemaRegistryConfiguration; import com.homeadvisor.kafdrop.model.MessageVO; import com.homeadvisor.kafdrop.model.TopicVO; -import com.homeadvisor.kafdrop.service.AvroMessageDeserializer; -import com.homeadvisor.kafdrop.service.DefaultMessageDeserializer; import com.homeadvisor.kafdrop.service.KafkaMonitor; -import com.homeadvisor.kafdrop.service.MessageDeserializer; import com.homeadvisor.kafdrop.service.MessageInspector; import com.homeadvisor.kafdrop.service.TopicNotFoundException; +import com.homeadvisor.kafdrop.util.AvroMessageDeserializer; +import com.homeadvisor.kafdrop.util.DefaultMessageDeserializer; +import com.homeadvisor.kafdrop.util.MessageDeserializer; +import com.homeadvisor.kafdrop.util.MessageFormat; + import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; @@ -56,11 +59,10 @@ public class MessageController private MessageInspector messageInspector; @Autowired - private SchemaRegistryConfiguration.SchemaRegistryProperties schemaRegistryProperties; + private MessageFormatConfiguration.MessageFormatProperties messageFormatProperties; - public static enum DeserializerType { - DEFAULT, AVRO; - } + @Autowired + private SchemaRegistryConfiguration.SchemaRegistryProperties schemaRegistryProperties; /** * Human friendly view of reading messages. @@ -76,6 +78,8 @@ public String viewMessageForm(@PathVariable("name") String topicName, BindingResult errors, Model model) { + final MessageFormat defaultFormat = messageFormatProperties.getFormat(); + if (messageForm.isEmpty()) { final PartitionOffsetInfo defaultForm = new PartitionOffsetInfo(); @@ -83,7 +87,7 @@ public String viewMessageForm(@PathVariable("name") String topicName, defaultForm.setCount(10l); defaultForm.setOffset(0l); defaultForm.setPartition(0); - defaultForm.setDeserializer(DeserializerType.DEFAULT); + defaultForm.setFormat(defaultFormat); model.addAttribute("messageForm", defaultForm); } @@ -92,13 +96,13 @@ public String viewMessageForm(@PathVariable("name") String topicName, .orElseThrow(() -> new TopicNotFoundException(topicName)); model.addAttribute("topic", topic); - model.addAttribute("defaultDeserializerType", DeserializerType.DEFAULT); - model.addAttribute("deserializerTypes", DeserializerType.values()); + model.addAttribute("defaultFormat", defaultFormat); + model.addAttribute("messageFormats", MessageFormat.values()); if (!messageForm.isEmpty() && !errors.hasErrors()) { final MessageDeserializer deserializer = getDeserializer( - topicName, messageForm.getDeserializer()); + topicName, messageForm.getFormat()); model.addAttribute("messages", messageInspector.getMessages(topicName, @@ -163,10 +167,10 @@ public String viewMessageForm(@PathVariable("name") String topicName, } } - private MessageDeserializer getDeserializer(String topicName, DeserializerType deserializerType) { + private MessageDeserializer getDeserializer(String topicName, MessageFormat format) { final MessageDeserializer deserializer; - if (deserializerType == DeserializerType.AVRO) { + if (format == MessageFormat.AVRO) { final String schemaRegistryUrl = schemaRegistryProperties.getConnect(); deserializer = new AvroMessageDeserializer(topicName, schemaRegistryUrl); } else { @@ -206,19 +210,19 @@ public static class PartitionOffsetInfo @JsonProperty("lastOffset") private Long count; - private DeserializerType deserializer; + private MessageFormat format; - public PartitionOffsetInfo(int partition, long offset, long count, DeserializerType deserializer) + public PartitionOffsetInfo(int partition, long offset, long count, MessageFormat format) { this.partition = partition; this.offset = offset; this.count = count; - this.deserializer = deserializer; + this.format = format; } public PartitionOffsetInfo(int partition, long offset, long count) { - this(partition, offset, count, DeserializerType.DEFAULT); + this(partition, offset, count, MessageFormat.DEFAULT); } public PartitionOffsetInfo() @@ -262,14 +266,14 @@ public void setCount(Long count) this.count = count; } - public DeserializerType getDeserializer() + public MessageFormat getFormat() { - return deserializer; + return format; } - public void setDeserializer(DeserializerType deserializer) + public void setFormat(MessageFormat format) { - this.deserializer = deserializer; + this.format = format; } } } diff --git a/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java b/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java index 9ef39f5..a2d19a2 100644 --- a/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java +++ b/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java @@ -23,6 +23,7 @@ import com.homeadvisor.kafdrop.model.TopicVO; import com.homeadvisor.kafdrop.util.BrokerChannel; import com.homeadvisor.kafdrop.util.ByteUtils; +import com.homeadvisor.kafdrop.util.MessageDeserializer; import kafka.api.FetchRequest; import kafka.api.FetchRequestBuilder; diff --git a/src/main/java/com/homeadvisor/kafdrop/service/AvroMessageDeserializer.java b/src/main/java/com/homeadvisor/kafdrop/util/AvroMessageDeserializer.java similarity index 95% rename from src/main/java/com/homeadvisor/kafdrop/service/AvroMessageDeserializer.java rename to src/main/java/com/homeadvisor/kafdrop/util/AvroMessageDeserializer.java index fed67d9..2f50d32 100644 --- a/src/main/java/com/homeadvisor/kafdrop/service/AvroMessageDeserializer.java +++ b/src/main/java/com/homeadvisor/kafdrop/util/AvroMessageDeserializer.java @@ -1,10 +1,9 @@ -package com.homeadvisor.kafdrop.service; +package com.homeadvisor.kafdrop.util; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonElement; import com.google.gson.JsonParser; -import com.homeadvisor.kafdrop.util.ByteUtils; import java.nio.ByteBuffer; import java.util.HashMap; diff --git a/src/main/java/com/homeadvisor/kafdrop/service/DefaultMessageDeserializer.java b/src/main/java/com/homeadvisor/kafdrop/util/DefaultMessageDeserializer.java similarity index 75% rename from src/main/java/com/homeadvisor/kafdrop/service/DefaultMessageDeserializer.java rename to src/main/java/com/homeadvisor/kafdrop/util/DefaultMessageDeserializer.java index 7fd5b7e..09a4279 100644 --- a/src/main/java/com/homeadvisor/kafdrop/service/DefaultMessageDeserializer.java +++ b/src/main/java/com/homeadvisor/kafdrop/util/DefaultMessageDeserializer.java @@ -1,10 +1,8 @@ -package com.homeadvisor.kafdrop.service; +package com.homeadvisor.kafdrop.util; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; -import com.homeadvisor.kafdrop.util.ByteUtils; - public class DefaultMessageDeserializer implements MessageDeserializer { @Override diff --git a/src/main/java/com/homeadvisor/kafdrop/service/MessageDeserializer.java b/src/main/java/com/homeadvisor/kafdrop/util/MessageDeserializer.java similarity index 76% rename from src/main/java/com/homeadvisor/kafdrop/service/MessageDeserializer.java rename to src/main/java/com/homeadvisor/kafdrop/util/MessageDeserializer.java index a7a9b0a..7dc012d 100644 --- a/src/main/java/com/homeadvisor/kafdrop/service/MessageDeserializer.java +++ b/src/main/java/com/homeadvisor/kafdrop/util/MessageDeserializer.java @@ -1,4 +1,4 @@ -package com.homeadvisor.kafdrop.service; +package com.homeadvisor.kafdrop.util; import java.nio.ByteBuffer; diff --git a/src/main/java/com/homeadvisor/kafdrop/util/MessageFormat.java b/src/main/java/com/homeadvisor/kafdrop/util/MessageFormat.java new file mode 100644 index 0000000..715a6d9 --- /dev/null +++ b/src/main/java/com/homeadvisor/kafdrop/util/MessageFormat.java @@ -0,0 +1,5 @@ +package com.homeadvisor.kafdrop.util; + +public enum MessageFormat { + DEFAULT, AVRO; +} diff --git a/src/main/resources/templates/message-inspector.ftl b/src/main/resources/templates/message-inspector.ftl index 67d835c..ed96755 100644 --- a/src/main/resources/templates/message-inspector.ftl +++ b/src/main/resources/templates/message-inspector.ftl @@ -31,7 +31,7 @@

Topic Messages: ${topic.name}

<#assign selectedPartition=messageForm.partition!0?number> -<#assign selectedDeserializer=messageForm.deserializer!defaultDeserializerType> +<#assign selectedFormat=messageForm.format!defaultFormat>
<#assign curPartition=topic.getPartition(selectedPartition).get()> @@ -71,10 +71,10 @@
- - + <#list messageFormats as f> +
From 2c935cba5c4057ee144dc28cfdbca22d5f15b943 Mon Sep 17 00:00:00 2001 From: Luc Cary Date: Wed, 12 Dec 2018 23:27:45 -0500 Subject: [PATCH 14/14] Remove unused imports from MessageInspector --- .../com/homeadvisor/kafdrop/service/MessageInspector.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java b/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java index 34b5411..a2d19a2 100644 --- a/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java +++ b/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java @@ -18,10 +18,6 @@ package com.homeadvisor.kafdrop.service; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonElement; -import com.google.gson.JsonParser; import com.homeadvisor.kafdrop.model.MessageVO; import com.homeadvisor.kafdrop.model.TopicPartitionVO; import com.homeadvisor.kafdrop.model.TopicVO; @@ -44,7 +40,9 @@ import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.stream.Collectors; import java.util.stream.StreamSupport;