diff --git a/README.md b/README.md index 168cc65..183f0bf 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,10 @@ Kafdrop is a UI for monitoring Apache Kafka clusters. The tool displays informat * Kafka (0.8.1 or 0.8.2 is known to work) * Zookeeper (3.4.5 or later) +Optional, additional integration: + +* Schema Registry + ## Building After cloning the repository, building should just be a matter of running a standard Maven build: @@ -30,6 +34,17 @@ Then open a browser and navigate to http://localhost:9000. The port can be overr --server.port= ``` +Additionally, you can optionally configure a schema registry connection with: +``` + --schemaregistry.connect=http://localhost:8081 +``` + +Finally, a default message format (e.g. to deserialize Avro messages) can optionally be configured as follows: +``` + --message.format=AVRO +``` +Valid format values are "DEFAULT" and "AVRO". This setting can also be configured at the topic level via dropdown when viewing messages. + ## Running with Docker Note for Mac Users: You need to convert newline formatting of the kafdrop.sh file *before* running this command: diff --git a/pom.xml b/pom.xml index cc226b5..1e73b39 100644 --- a/pom.xml +++ b/pom.xml @@ -24,6 +24,17 @@ HEAD + + + central + http://repo1.maven.org/maven2 + + + confluent + http://packages.confluent.io/maven/ + + + @@ -73,6 +84,31 @@ spring-retry 1.1.3.RELEASE + + io.confluent + kafka-avro-serializer + 3.2.1 + + + org.apache.avro + avro + 1.8.1 + + + org.apache.kafka + kafka-clients + 0.10.2.2 + + + com.google.code.findbugs + jsr305 + 3.0.2 + + + com.google.code.gson + gson + 2.8.5 + @@ -152,13 +188,6 @@ 0.7.1 test - - - org.apache.kafka - kafka-clients - 0.8.2.2 - test - @@ -313,7 +342,7 @@ org.apache.kafka kafka-clients - 0.8.2.2 + 0.10.2.2 diff --git a/src/main/java/com/homeadvisor/kafdrop/config/MessageFormatConfiguration.java b/src/main/java/com/homeadvisor/kafdrop/config/MessageFormatConfiguration.java new file mode 100644 index 0000000..8bdc9a1 --- /dev/null +++ b/src/main/java/com/homeadvisor/kafdrop/config/MessageFormatConfiguration.java @@ -0,0 +1,42 @@ +package com.homeadvisor.kafdrop.config; + +import javax.annotation.PostConstruct; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; +import org.springframework.stereotype.Component; + +import com.homeadvisor.kafdrop.util.MessageFormat; + + +@Configuration +public class MessageFormatConfiguration { + + @Component + @ConfigurationProperties(prefix = "message") + public static class MessageFormatProperties + { + + private MessageFormat format; + + @PostConstruct + public void init() { + // Set a default message format if not configured. + if (format == null) { + format = MessageFormat.DEFAULT; + } + } + + public MessageFormat getFormat() + { + return format; + } + + public void setFormat(MessageFormat format) + { + this.format = format; + } + + } + +} diff --git a/src/main/java/com/homeadvisor/kafdrop/config/SchemaRegistryConfiguration.java b/src/main/java/com/homeadvisor/kafdrop/config/SchemaRegistryConfiguration.java new file mode 100644 index 0000000..78d7119 --- /dev/null +++ b/src/main/java/com/homeadvisor/kafdrop/config/SchemaRegistryConfiguration.java @@ -0,0 +1,44 @@ +package com.homeadvisor.kafdrop.config; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + + +@Configuration +public class SchemaRegistryConfiguration { + + @Component + @ConfigurationProperties(prefix = "schemaregistry") + public static class SchemaRegistryProperties + { + + public static final Pattern CONNECT_SEPARATOR = Pattern.compile("\\s*,\\s*"); + + private String connect; + + public String getConnect() + { + return connect; + } + + public void setConnect(String connect) + { + this.connect = connect; + } + + public List getConnectList() + { + return CONNECT_SEPARATOR.splitAsStream(this.connect) + .map(String::trim) + .filter(s -> s.length() > 0) + .collect(Collectors.toList()); + } + + } + +} diff --git a/src/main/java/com/homeadvisor/kafdrop/controller/MessageController.java b/src/main/java/com/homeadvisor/kafdrop/controller/MessageController.java index b08fb09..5325147 100644 --- a/src/main/java/com/homeadvisor/kafdrop/controller/MessageController.java +++ b/src/main/java/com/homeadvisor/kafdrop/controller/MessageController.java @@ -20,11 +20,18 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.homeadvisor.kafdrop.config.MessageFormatConfiguration; +import com.homeadvisor.kafdrop.config.SchemaRegistryConfiguration; import com.homeadvisor.kafdrop.model.MessageVO; import com.homeadvisor.kafdrop.model.TopicVO; import com.homeadvisor.kafdrop.service.KafkaMonitor; import com.homeadvisor.kafdrop.service.MessageInspector; import com.homeadvisor.kafdrop.service.TopicNotFoundException; +import com.homeadvisor.kafdrop.util.AvroMessageDeserializer; +import com.homeadvisor.kafdrop.util.DefaultMessageDeserializer; +import com.homeadvisor.kafdrop.util.MessageDeserializer; +import com.homeadvisor.kafdrop.util.MessageFormat; + import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; @@ -51,6 +58,12 @@ public class MessageController @Autowired private MessageInspector messageInspector; + @Autowired + private MessageFormatConfiguration.MessageFormatProperties messageFormatProperties; + + @Autowired + private SchemaRegistryConfiguration.SchemaRegistryProperties schemaRegistryProperties; + /** * Human friendly view of reading messages. * @param topicName Name of topic @@ -65,10 +78,17 @@ public String viewMessageForm(@PathVariable("name") String topicName, BindingResult errors, Model model) { + final MessageFormat defaultFormat = messageFormatProperties.getFormat(); + if (messageForm.isEmpty()) { final PartitionOffsetInfo defaultForm = new PartitionOffsetInfo(); - defaultForm.setCount(1l); + + defaultForm.setCount(10l); + defaultForm.setOffset(0l); + defaultForm.setPartition(0); + defaultForm.setFormat(defaultFormat); + model.addAttribute("messageForm", defaultForm); } @@ -76,13 +96,21 @@ public String viewMessageForm(@PathVariable("name") String topicName, .orElseThrow(() -> new TopicNotFoundException(topicName)); model.addAttribute("topic", topic); + model.addAttribute("defaultFormat", defaultFormat); + model.addAttribute("messageFormats", MessageFormat.values()); + if (!messageForm.isEmpty() && !errors.hasErrors()) { + final MessageDeserializer deserializer = getDeserializer( + topicName, messageForm.getFormat()); + model.addAttribute("messages", messageInspector.getMessages(topicName, messageForm.getPartition(), messageForm.getOffset(), - messageForm.getCount())); + messageForm.getCount(), + deserializer)); + } return "message-inspector"; @@ -120,12 +148,16 @@ public String viewMessageForm(@PathVariable("name") String topicName, } else { + // Currently, only default deserialization supported via JSON API. + final MessageDeserializer deserializer = new DefaultMessageDeserializer(); + List messages = new ArrayList<>(); List vos = messageInspector.getMessages( topicName, partition, offset, - count); + count, + deserializer); if(vos != null) { @@ -136,6 +168,19 @@ public String viewMessageForm(@PathVariable("name") String topicName, } } + private MessageDeserializer getDeserializer(String topicName, MessageFormat format) { + final MessageDeserializer deserializer; + + if (format == MessageFormat.AVRO) { + final String schemaRegistryUrl = schemaRegistryProperties.getConnect(); + deserializer = new AvroMessageDeserializer(topicName, schemaRegistryUrl); + } else { + deserializer = new DefaultMessageDeserializer(); + } + + return deserializer; + } + /** * Encapsulates offset data for a single partition. */ @@ -166,11 +211,19 @@ public static class PartitionOffsetInfo @JsonProperty("lastOffset") private Long count; - public PartitionOffsetInfo(int partition, long offset, long count) + private MessageFormat format; + + public PartitionOffsetInfo(int partition, long offset, long count, MessageFormat format) { this.partition = partition; this.offset = offset; this.count = count; + this.format = format; + } + + public PartitionOffsetInfo(int partition, long offset, long count) + { + this(partition, offset, count, MessageFormat.DEFAULT); } public PartitionOffsetInfo() @@ -213,5 +266,15 @@ public void setCount(Long count) { this.count = count; } + + public MessageFormat getFormat() + { + return format; + } + + public void setFormat(MessageFormat format) + { + this.format = format; + } } } diff --git a/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java b/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java index 5143bda..a2d19a2 100644 --- a/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java +++ b/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java @@ -22,6 +22,9 @@ import com.homeadvisor.kafdrop.model.TopicPartitionVO; import com.homeadvisor.kafdrop.model.TopicVO; import com.homeadvisor.kafdrop.util.BrokerChannel; +import com.homeadvisor.kafdrop.util.ByteUtils; +import com.homeadvisor.kafdrop.util.MessageDeserializer; + import kafka.api.FetchRequest; import kafka.api.FetchRequestBuilder; import kafka.javaapi.FetchResponse; @@ -29,6 +32,7 @@ import kafka.javaapi.message.ByteBufferMessageSet; import kafka.message.Message; import kafka.message.MessageAndOffset; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -42,6 +46,10 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; + + @Service public class MessageInspector { @@ -50,7 +58,12 @@ public class MessageInspector @Autowired private KafkaMonitor kafkaMonitor; - public List getMessages(String topicName, int partitionId, long offset, long count) + public List getMessages( + String topicName, + int partitionId, + long offset, + long count, + MessageDeserializer deserializer) { final TopicVO topic = kafkaMonitor.getTopic(topicName).orElseThrow(TopicNotFoundException::new); final TopicPartitionVO partition = topic.getPartition(partitionId).orElseThrow(PartitionNotFoundException::new); @@ -83,7 +96,7 @@ public List getMessages(String topicName, int partitionId, long offse StreamSupport.stream(messageSet.spliterator(), false) .limit(count - messages.size()) .map(MessageAndOffset::message) - .map(this::createMessage) + .map(m -> createMessage(m, deserializer)) .forEach(messages::add); currentOffset += messages.size() - oldSize; } @@ -92,16 +105,17 @@ public List getMessages(String topicName, int partitionId, long offse .orElseGet(Collections::emptyList); } - private MessageVO createMessage(Message message) + private MessageVO createMessage(Message message, MessageDeserializer deserializer) { MessageVO vo = new MessageVO(); if (message.hasKey()) { - vo.setKey(readString(message.key())); + vo.setKey(ByteUtils.readString(message.key())); } if (!message.isNull()) { - vo.setMessage(readString(message.payload())); + final String messageString = deserializer.deserializeMessage(message.payload()); + vo.setMessage(messageString); } vo.setValid(message.isValid()); @@ -112,37 +126,4 @@ private MessageVO createMessage(Message message) return vo; } - private String readString(ByteBuffer buffer) - { - try - { - return new String(readBytes(buffer), "UTF-8"); - } - catch (UnsupportedEncodingException e) - { - return ""; - } - } - - private byte[] readBytes(ByteBuffer buffer) - { - return readBytes(buffer, 0, buffer.limit()); - } - - private byte[] readBytes(ByteBuffer buffer, int offset, int size) - { - byte[] dest = new byte[size]; - if (buffer.hasArray()) - { - System.arraycopy(buffer.array(), buffer.arrayOffset() + offset, dest, 0, size); - } - else - { - buffer.mark(); - buffer.get(dest); - buffer.reset(); - } - return dest; - } - } diff --git a/src/main/java/com/homeadvisor/kafdrop/util/AvroMessageDeserializer.java b/src/main/java/com/homeadvisor/kafdrop/util/AvroMessageDeserializer.java new file mode 100644 index 0000000..2f50d32 --- /dev/null +++ b/src/main/java/com/homeadvisor/kafdrop/util/AvroMessageDeserializer.java @@ -0,0 +1,52 @@ +package com.homeadvisor.kafdrop.util; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + +import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; + + +public class AvroMessageDeserializer implements MessageDeserializer { + + private String topicName; + private String schemaRegistryUrl; + + public AvroMessageDeserializer(String topicName, String schemaRegistryUrl) { + this.topicName = topicName; + this.schemaRegistryUrl = schemaRegistryUrl; + } + + @Override + public String deserializeMessage(ByteBuffer buffer) { + KafkaAvroDeserializer deserializer = getDeserializer(); + + // Convert byte buffer to byte array + byte[] bytes = ByteUtils.convertToByteArray(buffer); + + return formatJsonMessage(deserializer.deserialize(topicName, bytes).toString()); + } + + private String formatJsonMessage(String jsonMessage) { + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + JsonParser parser = new JsonParser(); + JsonElement element = parser.parse(jsonMessage); + String formattedJsonMessage = gson.toJson(element); + return formattedJsonMessage; + } + + private KafkaAvroDeserializer getDeserializer() { + Map config = new HashMap<>(); + config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); + KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer(); + kafkaAvroDeserializer.configure(config, false); + return kafkaAvroDeserializer; + } + +} diff --git a/src/main/java/com/homeadvisor/kafdrop/util/ByteUtils.java b/src/main/java/com/homeadvisor/kafdrop/util/ByteUtils.java new file mode 100644 index 0000000..e6d5a2c --- /dev/null +++ b/src/main/java/com/homeadvisor/kafdrop/util/ByteUtils.java @@ -0,0 +1,48 @@ +package com.homeadvisor.kafdrop.util; + +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; + +public class ByteUtils { + + public static String readString(ByteBuffer buffer) + { + try + { + return new String(readBytes(buffer), "UTF-8"); + } + catch (UnsupportedEncodingException e) + { + return ""; + } + } + + private static byte[] readBytes(ByteBuffer buffer) + { + return readBytes(buffer, 0, buffer.limit()); + } + + private static byte[] readBytes(ByteBuffer buffer, int offset, int size) + { + byte[] dest = new byte[size]; + if (buffer.hasArray()) + { + System.arraycopy(buffer.array(), buffer.arrayOffset() + offset, dest, 0, size); + } + else + { + buffer.mark(); + buffer.get(dest); + buffer.reset(); + } + return dest; + } + + public static byte[] convertToByteArray(ByteBuffer buffer) + { + byte[] bytes = new byte[buffer.remaining()]; + buffer.get(bytes, 0, bytes.length); + return bytes; + } + +} diff --git a/src/main/java/com/homeadvisor/kafdrop/util/DefaultMessageDeserializer.java b/src/main/java/com/homeadvisor/kafdrop/util/DefaultMessageDeserializer.java new file mode 100644 index 0000000..09a4279 --- /dev/null +++ b/src/main/java/com/homeadvisor/kafdrop/util/DefaultMessageDeserializer.java @@ -0,0 +1,13 @@ +package com.homeadvisor.kafdrop.util; + +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; + +public class DefaultMessageDeserializer implements MessageDeserializer { + + @Override + public String deserializeMessage(ByteBuffer buffer) { + return ByteUtils.readString(buffer); + } + +} diff --git a/src/main/java/com/homeadvisor/kafdrop/util/MessageDeserializer.java b/src/main/java/com/homeadvisor/kafdrop/util/MessageDeserializer.java new file mode 100644 index 0000000..7dc012d --- /dev/null +++ b/src/main/java/com/homeadvisor/kafdrop/util/MessageDeserializer.java @@ -0,0 +1,10 @@ +package com.homeadvisor.kafdrop.util; + +import java.nio.ByteBuffer; + + +public interface MessageDeserializer { + + public String deserializeMessage(ByteBuffer buffer); + +} diff --git a/src/main/java/com/homeadvisor/kafdrop/util/MessageFormat.java b/src/main/java/com/homeadvisor/kafdrop/util/MessageFormat.java new file mode 100644 index 0000000..715a6d9 --- /dev/null +++ b/src/main/java/com/homeadvisor/kafdrop/util/MessageFormat.java @@ -0,0 +1,5 @@ +package com.homeadvisor.kafdrop.util; + +public enum MessageFormat { + DEFAULT, AVRO; +} diff --git a/src/main/resources/templates/message-inspector.ftl b/src/main/resources/templates/message-inspector.ftl index 0f7d855..ed96755 100644 --- a/src/main/resources/templates/message-inspector.ftl +++ b/src/main/resources/templates/message-inspector.ftl @@ -31,6 +31,7 @@

Topic Messages: ${topic.name}

<#assign selectedPartition=messageForm.partition!0?number> +<#assign selectedFormat=messageForm.format!defaultFormat>
<#assign curPartition=topic.getPartition(selectedPartition).get()> @@ -69,6 +70,15 @@
+
+ + +
+ @@ -91,7 +101,7 @@ <#elseif !(spring.status.error) && !(messageForm.empty)> - No messages found in partition ${messageForm.partition} at offset ${messageForm.offset} + No messages found in partition ${(messageForm.partition)!"PARTITION_NOT_SET"} at offset ${messageForm.offset}