Skip to content

Commit

Permalink
Forward topic from console consumer to deserializer (apache#5704)
Browse files Browse the repository at this point in the history
Some deserializer needs the topic name to be able to correctly deserialize the payload of the message.
Console consumer works great with Deserializer<String> however it calls deserializer with topic set as null.
This breaks the API and the topic information is available in the ConsumerRecord.

Reviewers: Manikumar Reddy <[email protected]>, Chia-Ping Tsai <[email protected]>, Gardner Vickers <[email protected]>, Jun Rao <[email protected]>
  • Loading branch information
mchataigner authored and junrao committed Nov 29, 2018
1 parent 2c305dc commit fb9f2d8
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 4 deletions.
8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/tools/ConsoleConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -509,9 +509,9 @@ class DefaultMessageFormatter extends MessageFormatter {
output.write(lineSeparator)
}

def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte]) {
def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], topic: String) {
val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes(StandardCharsets.UTF_8))
val convertedBytes = deserializer.map(_.deserialize(null, nonNullBytes).toString.
val convertedBytes = deserializer.map(_.deserialize(topic, nonNullBytes).toString.
getBytes(StandardCharsets.UTF_8)).getOrElse(nonNullBytes)
output.write(convertedBytes)
}
Expand All @@ -527,12 +527,12 @@ class DefaultMessageFormatter extends MessageFormatter {
}

if (printKey) {
write(keyDeserializer, key)
write(keyDeserializer, key, topic)
writeSeparator(printValue)
}

if (printValue) {
write(valueDeserializer, value)
write(valueDeserializer, value, topic)
output.write(lineSeparator)
}
}
Expand Down
53 changes: 53 additions & 0 deletions core/src/test/scala/kafka/tools/CustomDeserializerTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.tools

import java.io.PrintStream

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.Deserializer
import org.hamcrest.CoreMatchers
import org.junit.Test
import org.junit.Assert.assertThat
import org.scalatest.mockito.MockitoSugar

class CustomDeserializer extends Deserializer[String] {
override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = {
}

override def deserialize(topic: String, data: Array[Byte]): String = {
assertThat("topic must not be null", topic, CoreMatchers.notNullValue())
new String(data)
}

override def close(): Unit = {
}
}

class CustomDeserializerTest extends MockitoSugar {

@Test
def checkDeserializerTopicIsNotNull(): Unit = {
val formatter = new DefaultMessageFormatter()
formatter.keyDeserializer = Some(new CustomDeserializer)

formatter.writeTo(new ConsumerRecord("topic_test", 1, 1l, "key".getBytes, "value".getBytes), mock[PrintStream])

formatter.close()
}
}

0 comments on commit fb9f2d8

Please sign in to comment.