From c9a5354f24613a8edc8cd532d140213c55a99015 Mon Sep 17 00:00:00 2001
From: Neil Buesing <neil.buesing@objectpartners.com>
Date: Tue, 25 Dec 2018 15:32:13 -0600
Subject: [PATCH] Apply PR https://github.com/HomeAdvisor/Kafdrop/pull/34 to
 upgrade to Kafka 0.11.0.3
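
Upgrade the Kafka dependency from kafka_2.9.2 0.8.2.2 to kafka_2.11
0.11.0.3 and add a matching kafka-clients 0.11.0.3 dependency. Also bump
Spring Boot (1.3.6 -> 1.5.10.RELEASE), spring-retry (1.1.3 ->
1.2.2.RELEASE), ZooKeeper (3.4.8 -> 3.4.13), and Avro (1.8.1 -> 1.8.2),
add spring-boot-starter-tomcat, and repoint the Maven groupId and scm
URLs at the nbuesing fork.

CuratorKafkaMonitor is adapted to the renamed 0.9+ scala-client APIs:
ConsumerMetadataRequest/Response become GroupCoordinatorRequest/Response,
kafka.cluster.Broker becomes BrokerEndPoint in partition metadata,
ZkUtils.TopicConfigPath() becomes
ZkUtils.getEntityConfigPath(ConfigType.Topic()),
channel.receive().buffer() becomes channel.receive().payload(), and the
OffsetFetchResponse error check now reads error().code(). The remaining
hunks are whitespace-only reindentation.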

---
 pom.xml                                       |  43 +-
 .../kafdrop/service/CuratorKafkaMonitor.java  | 431 +++++++++---------
 2 files changed, 245 insertions(+), 229 deletions(-)
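
Reviewer notes (kept below the "---" tear line, where git am should drop
them from the commit message):

The behavioral changes are confined to the scala-client calls; the bulk
of the CuratorKafkaMonitor.java hunks are reindentation. A minimal sketch
of the new request/response round trip, assuming the kafka_2.11:0.11.0.3
artifact added in pom.xml below -- the class name, host, port, timeout,
and client id are illustrative and not part of this patch:

   import java.util.Collections;

   import kafka.api.PartitionOffsetRequestInfo;
   import kafka.common.TopicAndPartition;
   import kafka.javaapi.OffsetRequest;
   import kafka.javaapi.OffsetResponse;
   import kafka.network.BlockingChannel;

   public final class LatestOffsetSketch
   {
      public static long latestOffset(String host, int port, String topic, int partition)
      {
         // Same blocking socket wrapper the monitor drives via BrokerChannel;
         // default buffer sizes and a 10s read timeout (illustrative values).
         final BlockingChannel channel = new BlockingChannel(
            host, port,
            BlockingChannel.UseDefaultBufferSize(),
            BlockingChannel.UseDefaultBufferSize(),
            10000);
         channel.connect();
         try
         {
            // Ask the partition leader for its single latest offset.
            final OffsetRequest request = new OffsetRequest(
               Collections.singletonMap(
                  new TopicAndPartition(topic, partition),
                  new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1)),
               (short) 0, "offset-sketch");
            channel.send(request.underlying());
            // In 0.11.x, receive() returns a NetworkReceive that exposes the
            // response body as payload(); the buffer() accessor this patch
            // replaces no longer exists.
            final OffsetResponse response = new OffsetResponse(
               kafka.api.OffsetResponse.readFrom(channel.receive().payload()));
            final long[] offsets = response.offsets(topic, partition);
            return offsets.length > 0 ? offsets[0] : -1L;
         }
         finally
         {
            channel.disconnect();
         }
      }
   }

The ZooKeeper topic-config path helper moved as well; a one-liner sketch
under the same assumptions (it should print "/config/topics" on 0.11.x):

   import kafka.server.ConfigType;
   import kafka.utils.ZkUtils;

   public final class ConfigPathSketch
   {
      public static void main(String[] args)
      {
         // ZkUtils.TopicConfigPath() is gone; the generic entity helper takes
         // the entity type ("topics") and yields the same ZooKeeper path.
         System.out.println(ZkUtils.getEntityConfigPath(ConfigType.Topic()));
      }
   }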

diff --git a/pom.xml b/pom.xml
index 1e73b39..973b4aa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2,14 +2,14 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
 
-   <groupId>com.homeadvisor.kafka</groupId>
+   <groupId>com.neilbuesing.kafka</groupId>
    <artifactId>kafdrop</artifactId>
    <version>2.0.6</version>
 
    <description>For when you have a Kaf(ka) cluster to monitor</description>
 
    <properties>
-      <spring.boot.version>1.3.6.RELEASE</spring.boot.version>
+      <spring.boot.version>1.5.10.RELEASE</spring.boot.version>
       <additionalparam>-Xdoclint:none</additionalparam>
       <curator.version>2.10.0</curator.version>
 
@@ -19,8 +19,8 @@
    </properties>
 
    <scm>
-       <connection>scm:git:git@github.com:HomeAdvisor/Kafdrop.git</connection>
-       <developerConnection>scm:git:git@github.com:HomeAdvisor/Kafdrop.git</developerConnection>
+       <connection>scm:git:git@github.com:nbuesing/Kafdrop.git</connection>
+       <developerConnection>scm:git:git@github.com:nbuesing/Kafdrop.git</developerConnection>
        <tag>HEAD</tag>
    </scm>
 
@@ -67,13 +67,29 @@
       <dependency>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
-         <version>3.4.8</version>
+         <version>3.4.13</version>
       </dependency>
+      <!--<dependency>-->
+         <!--<groupId>org.apache.kafka</groupId>-->
+         <!--<artifactId>kafka_2.9.2</artifactId>-->
+         <!--<version>0.8.2.2</version>-->
+      <!--</dependency>-->
       <dependency>
          <groupId>org.apache.kafka</groupId>
-         <artifactId>kafka_2.9.2</artifactId>
-         <version>0.8.2.2</version>
+         <artifactId>kafka_2.11</artifactId>
+         <version>0.11.0.3</version>
       </dependency>
+      <dependency>
+         <groupId>org.apache.kafka</groupId>
+         <artifactId>kafka-clients</artifactId>
+         <version>0.11.0.3</version>
+      </dependency>
+      <!--<dependency>-->
+         <!--<groupId>org.apache.kafka</groupId>-->
+         <!--<artifactId>kafka_2.12</artifactId>-->
+         <!--<version>0.10.2.2</version>-->
+      <!--</dependency>-->
+
       <dependency>
          <groupId>org.freemarker</groupId>
          <artifactId>freemarker</artifactId>
@@ -82,7 +98,7 @@
       <dependency>
          <groupId>org.springframework.retry</groupId>
          <artifactId>spring-retry</artifactId>
-         <version>1.1.3.RELEASE</version>
+         <version>1.2.2.RELEASE</version>
       </dependency>
       <dependency>
          <groupId>io.confluent</groupId>
@@ -92,12 +108,7 @@
       <dependency>
          <groupId>org.apache.avro</groupId>
          <artifactId>avro</artifactId>
-         <version>1.8.1</version>
-      </dependency>
-      <dependency>
-         <groupId>org.apache.kafka</groupId>
-         <artifactId>kafka-clients</artifactId>
-         <version>0.10.2.2</version>
+         <version>1.8.2</version>
       </dependency>
       <dependency>
          <groupId>com.google.code.findbugs</groupId>
@@ -121,6 +132,10 @@
             </exclusion>
          </exclusions>
       </dependency>
+      <dependency>
+         <groupId>org.springframework.boot</groupId>
+         <artifactId>spring-boot-starter-tomcat</artifactId>
+      </dependency>
       <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-freemarker</artifactId>
diff --git a/src/main/java/com/homeadvisor/kafdrop/service/CuratorKafkaMonitor.java b/src/main/java/com/homeadvisor/kafdrop/service/CuratorKafkaMonitor.java
index 84df01b..9ff5b8f 100644
--- a/src/main/java/com/homeadvisor/kafdrop/service/CuratorKafkaMonitor.java
+++ b/src/main/java/com/homeadvisor/kafdrop/service/CuratorKafkaMonitor.java
@@ -24,13 +24,14 @@
 import com.homeadvisor.kafdrop.model.*;
 import com.homeadvisor.kafdrop.util.BrokerChannel;
 import com.homeadvisor.kafdrop.util.Version;
-import kafka.api.ConsumerMetadataRequest;
+import kafka.api.GroupCoordinatorRequest;
 import kafka.api.PartitionOffsetRequestInfo;
-import kafka.cluster.Broker;
+import kafka.cluster.BrokerEndPoint;
 import kafka.common.ErrorMapping;
 import kafka.common.TopicAndPartition;
 import kafka.javaapi.*;
 import kafka.network.BlockingChannel;
+import kafka.server.ConfigType;
 import kafka.utils.ZKGroupDirs;
 import kafka.utils.ZKGroupTopicDirs;
 import kafka.utils.ZkUtils;
@@ -107,9 +108,9 @@ public void start() throws Exception
       backOffPolicy.setBackOffPeriod(properties.getRetry().getBackoffMillis());
 
       final SimpleRetryPolicy retryPolicy =
-         new SimpleRetryPolicy(properties.getRetry().getMaxAttempts(),
-                               ImmutableMap.of(InterruptedException.class, false,
-                                               Exception.class, true));
+        new SimpleRetryPolicy(properties.getRetry().getMaxAttempts(),
+          ImmutableMap.of(InterruptedException.class, false,
+            Exception.class, true));
 
       retryTemplate = new RetryTemplate();
       retryTemplate.setBackOffPolicy(backOffPolicy);
@@ -128,7 +129,7 @@ public void start() throws Exception
       });
       brokerPathCache.start(StartMode.POST_INITIALIZED_EVENT);
 
-      topicConfigPathCache = new PathChildrenCache(curatorFramework, ZkUtils.TopicConfigPath(), true);
+      topicConfigPathCache = new PathChildrenCache(curatorFramework, ZkUtils.getEntityConfigPath(ConfigType.Topic()), true);
       topicConfigPathCache.getListenable().addListener((f, e) -> {
          if (e.getType() == PathChildrenCacheEvent.Type.INITIALIZED)
          {
@@ -172,25 +173,25 @@ private String clientId()
    private void updateController()
    {
       Optional.ofNullable(controllerNodeCache.getCurrentData())
-         .map(data -> {
-            try
-            {
-               Map controllerData = objectMapper.reader(Map.class).readValue(data.getData());
-               return (Integer) controllerData.get("brokerid");
-            }
-            catch (IOException e)
-            {
-               LOG.error("Unable to read controller data", e);
-               return null;
-            }
-         })
-         .ifPresent(this::updateController);
+        .map(data -> {
+           try
+           {
+              Map controllerData = objectMapper.reader(Map.class).readValue(data.getData());
+              return (Integer) controllerData.get("brokerid");
+           }
+           catch (IOException e)
+           {
+              LOG.error("Unable to read controller data", e);
+              return null;
+           }
+        })
+        .ifPresent(this::updateController);
    }
 
    private void updateController(int brokerId)
    {
       brokerCache.values()
-         .forEach(broker -> broker.setController(broker.getId() == brokerId));
+        .forEach(broker -> broker.setController(broker.getId() == brokerId));
    }
 
    private void validateInitialized()
@@ -257,7 +258,7 @@ private BrokerChannel brokerChannel(Integer brokerId)
 
       Integer finalBrokerId = brokerId;
       BrokerVO broker = getBroker(brokerId)
-         .orElseThrow(() -> new BrokerNotFoundException("Broker " + finalBrokerId + " is not available"));
+                          .orElseThrow(() -> new BrokerNotFoundException("Broker " + finalBrokerId + " is not available"));
 
       return BrokerChannel.forBroker(broker.getHost(), broker.getPort());
    }
@@ -284,33 +285,33 @@ public ClusterSummaryVO getClusterSummary()
    @Override
    public ClusterSummaryVO getClusterSummary(Collection<TopicVO> topics) {
       final ClusterSummaryVO topicSummary = topics.stream()
-              .map(topic -> {
-                 ClusterSummaryVO summary = new ClusterSummaryVO();
-                 summary.setPartitionCount(topic.getPartitions().size());
-                 summary.setUnderReplicatedCount(topic.getUnderReplicatedPartitions().size());
-                 summary.setPreferredReplicaPercent(topic.getPreferredReplicaPercent());
-                 topic.getPartitions()
-                         .forEach(partition -> {
-                            if (partition.getLeader() != null) {
-                               summary.addBrokerLeaderPartition(partition.getLeader().getId());
-                            }
-                            if (partition.getPreferredLeader() != null) {
-                               summary.addBrokerPreferredLeaderPartition(partition.getPreferredLeader().getId());
-                            }
-                            partition.getReplicas()
-                                    .forEach(replica -> summary.addExpectedBrokerId(replica.getId()));
-                         });
-                 return summary;
-              })
-              .reduce((s1, s2) -> {
-                 s1.setPartitionCount(s1.getPartitionCount() + s2.getPartitionCount());
-                 s1.setUnderReplicatedCount(s1.getUnderReplicatedCount() + s2.getUnderReplicatedCount());
-                 s1.setPreferredReplicaPercent(s1.getPreferredReplicaPercent() + s2.getPreferredReplicaPercent());
-                 s2.getBrokerLeaderPartitionCount().forEach(s1::addBrokerLeaderPartition);
-                 s2.getBrokerPreferredLeaderPartitionCount().forEach(s1::addBrokerPreferredLeaderPartition);
-                 return s1;
-              })
-              .orElseGet(ClusterSummaryVO::new);
+                                              .map(topic -> {
+                                                 ClusterSummaryVO summary = new ClusterSummaryVO();
+                                                 summary.setPartitionCount(topic.getPartitions().size());
+                                                 summary.setUnderReplicatedCount(topic.getUnderReplicatedPartitions().size());
+                                                 summary.setPreferredReplicaPercent(topic.getPreferredReplicaPercent());
+                                                 topic.getPartitions()
+                                                   .forEach(partition -> {
+                                                      if (partition.getLeader() != null) {
+                                                         summary.addBrokerLeaderPartition(partition.getLeader().getId());
+                                                      }
+                                                      if (partition.getPreferredLeader() != null) {
+                                                         summary.addBrokerPreferredLeaderPartition(partition.getPreferredLeader().getId());
+                                                      }
+                                                      partition.getReplicas()
+                                                        .forEach(replica -> summary.addExpectedBrokerId(replica.getId()));
+                                                   });
+                                                 return summary;
+                                              })
+                                              .reduce((s1, s2) -> {
+                                                 s1.setPartitionCount(s1.getPartitionCount() + s2.getPartitionCount());
+                                                 s1.setUnderReplicatedCount(s1.getUnderReplicatedCount() + s2.getUnderReplicatedCount());
+                                                 s1.setPreferredReplicaPercent(s1.getPreferredReplicaPercent() + s2.getPreferredReplicaPercent());
+                                                 s2.getBrokerLeaderPartitionCount().forEach(s1::addBrokerLeaderPartition);
+                                                 s2.getBrokerPreferredLeaderPartitionCount().forEach(s1::addBrokerPreferredLeaderPartition);
+                                                 return s1;
+                                              })
+                                              .orElseGet(ClusterSummaryVO::new);
       topicSummary.setTopicCount(topics.size());
       topicSummary.setPreferredReplicaPercent(topicSummary.getPreferredReplicaPercent() / topics.size());
       return topicSummary;
@@ -321,8 +322,8 @@ public List<TopicVO> getTopics()
    {
       validateInitialized();
       return getTopicMetadata().values().stream()
-         .sorted(Comparator.comparing(TopicVO::getName))
-         .collect(Collectors.toList());
+               .sorted(Comparator.comparing(TopicVO::getName))
+               .collect(Collectors.toList());
    }
 
    @Override
@@ -331,14 +332,14 @@ public Optional<TopicVO> getTopic(String topic)
       validateInitialized();
       final Optional<TopicVO> topicVO = Optional.ofNullable(getTopicMetadata(topic).get(topic));
       topicVO.ifPresent(
-         vo -> {
-            getTopicPartitionSizes(vo, kafka.api.OffsetRequest.LatestTime())
-               .entrySet()
-               .forEach(entry -> vo.getPartition(entry.getKey()).ifPresent(p -> p.setSize(entry.getValue())));
-            getTopicPartitionSizes(vo, kafka.api.OffsetRequest.EarliestTime())
-               .entrySet()
-               .forEach(entry -> vo.getPartition(entry.getKey()).ifPresent(p -> p.setFirstOffset(entry.getValue())));
-         }
+        vo -> {
+           getTopicPartitionSizes(vo, kafka.api.OffsetRequest.LatestTime())
+             .entrySet()
+             .forEach(entry -> vo.getPartition(entry.getKey()).ifPresent(p -> p.setSize(entry.getValue())));
+           getTopicPartitionSizes(vo, kafka.api.OffsetRequest.EarliestTime())
+             .entrySet()
+             .forEach(entry -> vo.getPartition(entry.getKey()).ifPresent(p -> p.setFirstOffset(entry.getValue())));
+        }
       );
       return topicVO;
    }
@@ -348,8 +349,8 @@ private Map<String, TopicVO> getTopicMetadata(String... topics)
       if (kafkaVersion.compareTo(new Version(0, 9, 0)) >= 0)
       {
          return retryTemplate.execute(
-            context -> brokerChannel(null)
-               .execute(channel -> getTopicMetadata(channel, topics)));
+           context -> brokerChannel(null)
+                        .execute(channel -> getTopicMetadata(channel, topics)));
       }
       else
       {
@@ -357,11 +358,11 @@ private Map<String, TopicVO> getTopicMetadata(String... topics)
          if (topics == null || topics.length == 0)
          {
             topicStream =
-               Optional.ofNullable(
-                  topicTreeCache.getCurrentChildren(ZkUtils.BrokerTopicsPath()))
-                  .map(Map::keySet)
-                  .map(Collection::stream)
-                  .orElse(Stream.empty());
+              Optional.ofNullable(
+                topicTreeCache.getCurrentChildren(ZkUtils.BrokerTopicsPath()))
+                .map(Map::keySet)
+                .map(Collection::stream)
+                .orElse(Stream.empty());
          }
          else
          {
@@ -369,17 +370,17 @@ private Map<String, TopicVO> getTopicMetadata(String... topics)
          }
 
          return topicStream
-            .map(this::getTopicZkData)
-            .filter(Objects::nonNull)
-            .collect(Collectors.toMap(TopicVO::getName, topic -> topic));
+                  .map(this::getTopicZkData)
+                  .filter(Objects::nonNull)
+                  .collect(Collectors.toMap(TopicVO::getName, topic -> topic));
       }
    }
 
    private TopicVO getTopicZkData(String topic)
    {
       return Optional.ofNullable(topicTreeCache.getCurrentData(ZkUtils.getTopicPath(topic)))
-         .map(this::parseZkTopic)
-         .orElse(null);
+               .map(this::parseZkTopic)
+               .orElse(null);
    }
 
    public TopicVO parseZkTopic(ChildData input)
@@ -389,12 +390,12 @@ public TopicVO parseZkTopic(ChildData input)
          final TopicVO topic = new TopicVO(StringUtils.substringAfterLast(input.getPath(), "/"));
 
          final TopicRegistrationVO topicRegistration =
-            objectMapper.reader(TopicRegistrationVO.class).readValue(input.getData());
+           objectMapper.reader(TopicRegistrationVO.class).readValue(input.getData());
 
          topic.setConfig(
-            Optional.ofNullable(topicConfigPathCache.getCurrentData(ZkUtils.TopicConfigPath() + "/" + topic.getName()))
-               .map(this::readTopicConfig)
-               .orElse(Collections.emptyMap()));
+           Optional.ofNullable(topicConfigPathCache.getCurrentData(ZkUtils.getEntityConfigPath(ConfigType.Topic()) + "/" + topic.getName()))
+             .map(this::readTopicConfig)
+             .orElse(Collections.emptyMap()));
 
          for (Map.Entry<Integer, List<Integer>> entry : topicRegistration.getReplicas().entrySet())
          {
@@ -406,14 +407,14 @@ public TopicVO parseZkTopic(ChildData input)
             final Optional<TopicPartitionStateVO> partitionState = partitionState(topic.getName(), partition.getId());
 
             partitionBrokerIds.stream()
-               .map(brokerId -> {
-                  TopicPartitionVO.PartitionReplica replica = new TopicPartitionVO.PartitionReplica();
-                  replica.setId(brokerId);
-                  replica.setInService(partitionState.map(ps -> ps.getIsr().contains(brokerId)).orElse(false));
-                  replica.setLeader(partitionState.map(ps -> brokerId == ps.getLeader()).orElse(false));
-                  return replica;
-               })
-               .forEach(partition::addReplica);
+              .map(brokerId -> {
+                 TopicPartitionVO.PartitionReplica replica = new TopicPartitionVO.PartitionReplica();
+                 replica.setId(brokerId);
+                 replica.setInService(partitionState.map(ps -> ps.getIsr().contains(brokerId)).orElse(false));
+                 replica.setLeader(partitionState.map(ps -> brokerId == ps.getLeader()).orElse(false));
+                 return replica;
+              })
+              .forEach(partition::addReplica);
 
             topic.addPartition(partition);
          }
@@ -431,21 +432,21 @@ public TopicVO parseZkTopic(ChildData input)
    private Map<String, TopicVO> getTopicMetadata(BlockingChannel channel, String... topics)
    {
       final TopicMetadataRequest request =
-         new TopicMetadataRequest((short) 0, 0, clientId(), Arrays.asList(topics));
+        new TopicMetadataRequest((short) 0, 0, clientId(), Arrays.asList(topics));
 
       LOG.debug("Sending topic metadata request: {}", request);
 
       channel.send(request);
       final kafka.api.TopicMetadataResponse underlyingResponse =
-         kafka.api.TopicMetadataResponse.readFrom(channel.receive().buffer());
+        kafka.api.TopicMetadataResponse.readFrom(channel.receive().payload());
 
       LOG.debug("Received topic metadata response: {}", underlyingResponse);
 
       TopicMetadataResponse response = new TopicMetadataResponse(underlyingResponse);
       return response.topicsMetadata().stream()
-         .filter(tmd -> tmd.errorCode() == ErrorMapping.NoError())
-         .map(this::processTopicMetadata)
-         .collect(Collectors.toMap(TopicVO::getName, t -> t));
+               .filter(tmd -> tmd.errorCode() == ErrorMapping.NoError())
+               .map(this::processTopicMetadata)
+               .collect(Collectors.toMap(TopicVO::getName, t -> t));
    }
 
    private TopicVO processTopicMetadata(TopicMetadata tmd)
@@ -453,14 +454,14 @@ private TopicVO processTopicMetadata(TopicMetadata tmd)
       TopicVO topic = new TopicVO(tmd.topic());
 
       topic.setConfig(
-         Optional.ofNullable(topicConfigPathCache.getCurrentData(ZkUtils.TopicConfigPath() + "/" + topic.getName()))
-            .map(this::readTopicConfig)
-            .orElse(Collections.emptyMap()));
+        Optional.ofNullable(topicConfigPathCache.getCurrentData(ZkUtils.getEntityConfigPath(ConfigType.Topic()) + "/" + topic.getName()))
+          .map(this::readTopicConfig)
+          .orElse(Collections.emptyMap()));
 
       topic.setPartitions(
-         tmd.partitionsMetadata().stream()
-            .map((pmd) -> parsePartitionMetadata(tmd.topic(), pmd))
-            .collect(Collectors.toMap(TopicPartitionVO::getId, p -> p))
+        tmd.partitionsMetadata().stream()
+          .map((pmd) -> parsePartitionMetadata(tmd.topic(), pmd))
+          .collect(Collectors.toMap(TopicPartitionVO::getId, p -> p))
       );
       return topic;
    }
@@ -475,14 +476,14 @@ private TopicPartitionVO parsePartitionMetadata(String topic, PartitionMetadata
 
       final List<Integer> isr = getIsr(topic, pmd);
       pmd.replicas().stream()
-         .map(replica -> new TopicPartitionVO.PartitionReplica(replica.id(), isr.contains(replica.id()), false))
-         .forEach(partition::addReplica);
+        .map(replica -> new TopicPartitionVO.PartitionReplica(replica.id(), isr.contains(replica.id()), false))
+        .forEach(partition::addReplica);
       return partition;
    }
 
    private List<Integer> getIsr(String topic, PartitionMetadata pmd)
    {
-      return pmd.isr().stream().map(Broker::id).collect(Collectors.toList());
+      return pmd.isr().stream().map(BrokerEndPoint::id).collect(Collectors.toList());
    }
 
    private Map<String, Object> readTopicConfig(ChildData d)
@@ -500,11 +501,11 @@ private Map<String, Object> readTopicConfig(ChildData d)
 
 
    private Optional<TopicPartitionStateVO> partitionState(String topicName, int partitionId)
-      throws IOException
+     throws IOException
    {
       final Optional<byte[]> partitionData = Optional.ofNullable(topicTreeCache.getCurrentData(
-              ZkUtils.getTopicPartitionLeaderAndIsrPath(topicName, partitionId)))
-              .map(ChildData::getData);
+        ZkUtils.getTopicPartitionLeaderAndIsrPath(topicName, partitionId)))
+                                               .map(ChildData::getData);
       if (partitionData.isPresent())
       {
          return Optional.ofNullable(objectMapper.reader(TopicPartitionStateVO.class).readValue(partitionData.get()));
@@ -527,8 +528,8 @@ public List<ConsumerVO> getConsumers(final TopicVO topic)
    {
       validateInitialized();
       return getConsumerStream(topic)
-         .filter(consumer -> consumer.getTopic(topic.getName()) != null)
-         .collect(Collectors.toList());
+               .filter(consumer -> consumer.getTopic(topic.getName()) != null)
+               .collect(Collectors.toList());
    }
 
    @Override
@@ -540,10 +541,10 @@ public List<ConsumerVO> getConsumers(final String topic)
    private Stream<ConsumerVO> getConsumerStream(TopicVO topic)
    {
       return consumerTreeCache.getCurrentChildren(ZkUtils.ConsumersPath()).keySet().stream()
-         .map(g -> getConsumerByTopic(g, topic))
-         .filter(Optional::isPresent)
-         .map(Optional::get)
-         .sorted(Comparator.comparing(ConsumerVO::getGroupId));
+               .map(g -> getConsumerByTopic(g, topic))
+               .filter(Optional::isPresent)
+               .map(Optional::get)
+               .sorted(Comparator.comparing(ConsumerVO::getGroupId));
    }
 
    @Override
@@ -569,11 +570,11 @@ public Optional<ConsumerVO> getConsumerByTopic(String groupId, TopicVO topic)
 
       // todo: get number of threads in each instance (subscription -> topic -> # threads)
       Optional.ofNullable(consumerTreeCache.getCurrentChildren(groupDirs.consumerRegistryDir()))
-         .ifPresent(
-            children ->
-               children.keySet().stream()
-                  .map(id -> readConsumerRegistration(groupDirs, id))
-                  .forEach(consumer::addActiveInstance));
+        .ifPresent(
+          children ->
+            children.keySet().stream()
+              .map(id -> readConsumerRegistration(groupDirs, id))
+              .forEach(consumer::addActiveInstance));
 
       Stream<String> topicStream = null;
 
@@ -591,19 +592,19 @@ public Optional<ConsumerVO> getConsumerByTopic(String groupId, TopicVO topic)
       else
       {
          topicStream = Optional.ofNullable(
-            consumerTreeCache.getCurrentChildren(groupDirs.consumerGroupDir() + "/owners"))
-            .map(Map::keySet)
-            .map(Collection::stream)
-            .orElse(Stream.empty());
+           consumerTreeCache.getCurrentChildren(groupDirs.consumerGroupDir() + "/owners"))
+                         .map(Map::keySet)
+                         .map(Collection::stream)
+                         .orElse(Stream.empty());
       }
 
       topicStream
-         .map(ConsumerTopicVO::new)
-         .forEach(consumerTopic -> {
-            getConsumerPartitionStream(groupId, consumerTopic.getTopic(), topic)
-               .forEach(consumerTopic::addOffset);
-            consumer.addTopic(consumerTopic);
-         });
+        .map(ConsumerTopicVO::new)
+        .forEach(consumerTopic -> {
+           getConsumerPartitionStream(groupId, consumerTopic.getTopic(), topic)
+             .forEach(consumerTopic::addOffset);
+           consumer.addTopic(consumerTopic);
+        });
 
       return Optional.of(consumer);
    }
@@ -644,24 +645,24 @@ private Stream<ConsumerPartitionVO> getConsumerPartitionStream(String groupId,
          Map<Integer, Long> consumerOffsets = getConsumerOffsets(groupId, topic);
 
          return topic.getPartitions().stream()
-            .map(partition -> {
-               int partitionId = partition.getId();
+                  .map(partition -> {
+                     int partitionId = partition.getId();
 
-               final ConsumerPartitionVO consumerPartition = new ConsumerPartitionVO(groupId, topicName, partitionId);
-               consumerPartition.setOwner(
-                  Optional.ofNullable(
-                     consumerTreeCache.getCurrentData(groupTopicDirs.consumerOwnerDir() + "/" + partitionId))
-                     .map(data -> new String(data.getData()))
-                     .orElse(null));
+                     final ConsumerPartitionVO consumerPartition = new ConsumerPartitionVO(groupId, topicName, partitionId);
+                     consumerPartition.setOwner(
+                       Optional.ofNullable(
+                         consumerTreeCache.getCurrentData(groupTopicDirs.consumerOwnerDir() + "/" + partitionId))
+                         .map(data -> new String(data.getData()))
+                         .orElse(null));
 
-               consumerPartition.setOffset(consumerOffsets.getOrDefault(partitionId, -1L));
+                     consumerPartition.setOffset(consumerOffsets.getOrDefault(partitionId, -1L));
 
-               final Optional<TopicPartitionVO> topicPartition = topic.getPartition(partitionId);
-               consumerPartition.setSize(topicPartition.map(TopicPartitionVO::getSize).orElse(-1L));
-               consumerPartition.setFirstOffset(topicPartition.map(TopicPartitionVO::getFirstOffset).orElse(-1L));
+                     final Optional<TopicPartitionVO> topicPartition = topic.getPartition(partitionId);
+                     consumerPartition.setSize(topicPartition.map(TopicPartitionVO::getSize).orElse(-1L));
+                     consumerPartition.setFirstOffset(topicPartition.map(TopicPartitionVO::getFirstOffset).orElse(-1L));
 
-               return consumerPartition;
-            });
+                     return consumerPartition;
+                  });
       }
       else
       {
@@ -678,15 +679,15 @@ private Map<Integer, Long> getConsumerOffsets(String groupId, TopicVO topic)
          // for both and assume that the largest offset is the correct one.
 
          ForkJoinTask<Map<Integer, Long>> kafkaTask =
-            threadPool.submit(() -> getConsumerOffsets(groupId, topic, false));
+           threadPool.submit(() -> getConsumerOffsets(groupId, topic, false));
 
          ForkJoinTask<Map<Integer, Long>> zookeeperTask =
-            threadPool.submit(() -> getConsumerOffsets(groupId, topic, true));
+           threadPool.submit(() -> getConsumerOffsets(groupId, topic, true));
 
          Map<Integer, Long> zookeeperOffsets = zookeeperTask.get();
          Map<Integer, Long> kafkaOffsets = kafkaTask.get();
          zookeeperOffsets.entrySet()
-            .forEach(entry -> kafkaOffsets.merge(entry.getKey(), entry.getValue(), Math::max));
+           .forEach(entry -> kafkaOffsets.merge(entry.getKey(), entry.getValue(), Math::max));
          return kafkaOffsets;
       }
       catch (InterruptedException ex)
@@ -705,8 +706,8 @@ private Map<Integer, Long> getConsumerOffsets(String groupId,
                                                  boolean zookeeperOffsets)
    {
       return retryTemplate.execute(
-         context -> brokerChannel(zookeeperOffsets ? null : offsetManagerBroker(groupId))
-            .execute(channel -> getConsumerOffsets(channel, groupId, topic, zookeeperOffsets)));
+        context -> brokerChannel(zookeeperOffsets ? null : offsetManagerBroker(groupId))
+                     .execute(channel -> getConsumerOffsets(channel, groupId, topic, zookeeperOffsets)));
    }
 
    /**
@@ -729,27 +730,27 @@ private Map<Integer, Long> getConsumerOffsets(BlockingChannel channel,
    {
 
       final OffsetFetchRequest request = new OffsetFetchRequest(
-         groupId,
-         topic.getPartitions().stream()
-            .map(p -> new TopicAndPartition(topic.getName(), p.getId()))
-            .collect(Collectors.toList()),
-         (short) (zookeeperOffsets ? 0 : 1), 0, // version 0 = zookeeper offsets, 1 = kafka offsets
-         clientId());
+        groupId,
+        topic.getPartitions().stream()
+          .map(p -> new TopicAndPartition(topic.getName(), p.getId()))
+          .collect(Collectors.toList()),
+        (short) (zookeeperOffsets ? 0 : 1), 0, // version 0 = zookeeper offsets, 1 = kafka offsets
+        clientId());
 
       LOG.debug("Sending consumer offset request: {}", request);
 
       channel.send(request.underlying());
 
       final kafka.api.OffsetFetchResponse underlyingResponse =
-         kafka.api.OffsetFetchResponse.readFrom(channel.receive().buffer());
+        kafka.api.OffsetFetchResponse.readFrom(channel.receive().payload());
 
       LOG.debug("Received consumer offset response: {}", underlyingResponse);
 
       OffsetFetchResponse response = new OffsetFetchResponse(underlyingResponse);
 
       return response.offsets().entrySet().stream()
-         .filter(entry -> entry.getValue().error() == ErrorMapping.NoError())
-         .collect(Collectors.toMap(entry -> entry.getKey().partition(), entry -> entry.getValue().offset()));
+               .filter(entry -> entry.getValue().error().code() == ErrorMapping.NoError())
+               .collect(Collectors.toMap(entry -> entry.getKey().partition(), entry -> entry.getValue().offset()));
    }
 
    /**
@@ -758,22 +759,22 @@ private Map<Integer, Long> getConsumerOffsets(BlockingChannel channel,
    private Integer offsetManagerBroker(String groupId)
    {
       return retryTemplate.execute(
-         context ->
-            brokerChannel(null)
-               .execute(channel -> offsetManagerBroker(channel, groupId))
+        context ->
+          brokerChannel(null)
+            .execute(channel -> offsetManagerBroker(channel, groupId))
       );
    }
 
    private Integer offsetManagerBroker(BlockingChannel channel, String groupId)
    {
-      final ConsumerMetadataRequest request =
-         new ConsumerMetadataRequest(groupId, (short) 0, 0, clientId());
+      final GroupCoordinatorRequest request =
+        new GroupCoordinatorRequest(groupId, (short) 0, 0, clientId());
 
       LOG.debug("Sending consumer metadata request: {}", request);
 
       channel.send(request);
-      ConsumerMetadataResponse response =
-         ConsumerMetadataResponse.readFrom(channel.receive().buffer());
+      GroupCoordinatorResponse response =
+        GroupCoordinatorResponse.readFrom(channel.receive().payload());
 
       LOG.debug("Received consumer metadata response: {}", response);
 
@@ -792,48 +793,48 @@ private Map<Integer, Long> getTopicPartitionSizes(TopicVO topic, long time)
          PartitionOffsetRequestInfo requestInfo = new PartitionOffsetRequestInfo(time, 1);
 
          return threadPool.submit(() ->
-               topic.getPartitions().parallelStream()
-                  .filter(p -> p.getLeader() != null)
-                  .collect(Collectors.groupingBy(p -> p.getLeader().getId())) // Group partitions by leader broker id
-                  .entrySet().parallelStream()
-                  .map(entry -> {
-                     final Integer brokerId = entry.getKey();
-                     final List<TopicPartitionVO> brokerPartitions = entry.getValue();
-                     try
-                     {
-                        // Get the size of the partitions for a topic from the leader.
-                        final OffsetResponse offsetResponse =
-                           sendOffsetRequest(brokerId, topic, requestInfo, brokerPartitions);
-
-
-                        // Build a map of partitionId -> topic size from the response
-                        return brokerPartitions.stream()
-                           .collect(Collectors.toMap(TopicPartitionVO::getId,
-                                                     partition -> Optional.ofNullable(
-                                                        offsetResponse.offsets(topic.getName(), partition.getId()))
-                                                        .map(Arrays::stream)
-                                                        .orElse(LongStream.empty())
-                                                        .findFirst()
-                                                        .orElse(-1L)));
-                     }
-                     catch (Exception ex)
-                     {
-                        LOG.error("Unable to get partition log size for topic {} partitions ({})",
-                                  topic.getName(),
-                                  brokerPartitions.stream()
-                                     .map(TopicPartitionVO::getId)
-                                     .map(String::valueOf)
-                                     .collect(Collectors.joining(",")),
-                                  ex);
-
-                        // Map each partition to -1, indicating we got an error
-                        return brokerPartitions.stream().collect(Collectors.toMap(TopicPartitionVO::getId, tp -> -1L));
-                     }
-                  })
-                  .map(Map::entrySet)
-                  .flatMap(Collection::stream)
-                  .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
-            .get();
+                                    topic.getPartitions().parallelStream()
+                                      .filter(p -> p.getLeader() != null)
+                                      .collect(Collectors.groupingBy(p -> p.getLeader().getId())) // Group partitions by leader broker id
+                                      .entrySet().parallelStream()
+                                      .map(entry -> {
+                                         final Integer brokerId = entry.getKey();
+                                         final List<TopicPartitionVO> brokerPartitions = entry.getValue();
+                                         try
+                                         {
+                                            // Get the size of the partitions for a topic from the leader.
+                                            final OffsetResponse offsetResponse =
+                                              sendOffsetRequest(brokerId, topic, requestInfo, brokerPartitions);
+
+
+                                            // Build a map of partitionId -> topic size from the response
+                                            return brokerPartitions.stream()
+                                                     .collect(Collectors.toMap(TopicPartitionVO::getId,
+                                                       partition -> Optional.ofNullable(
+                                                         offsetResponse.offsets(topic.getName(), partition.getId()))
+                                                                      .map(Arrays::stream)
+                                                                      .orElse(LongStream.empty())
+                                                                      .findFirst()
+                                                                      .orElse(-1L)));
+                                         }
+                                         catch (Exception ex)
+                                         {
+                                            LOG.error("Unable to get partition log size for topic {} partitions ({})",
+                                              topic.getName(),
+                                              brokerPartitions.stream()
+                                                .map(TopicPartitionVO::getId)
+                                                .map(String::valueOf)
+                                                .collect(Collectors.joining(",")),
+                                              ex);
+
+                                            // Map each partition to -1, indicating we got an error
+                                            return brokerPartitions.stream().collect(Collectors.toMap(TopicPartitionVO::getId, tp -> -1L));
+                                         }
+                                      })
+                                      .map(Map::entrySet)
+                                      .flatMap(Collection::stream)
+                                      .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
+                  .get();
       }
       catch (InterruptedException e)
       {
@@ -851,27 +852,27 @@ private OffsetResponse sendOffsetRequest(Integer brokerId, TopicVO topic,
                                             List<TopicPartitionVO> brokerPartitions)
    {
       final OffsetRequest offsetRequest = new OffsetRequest(
-         brokerPartitions.stream()
-            .collect(Collectors.toMap(
-               partition -> new TopicAndPartition(topic.getName(), partition.getId()),
-               partition -> requestInfo)),
-         (short) 0, clientId());
+        brokerPartitions.stream()
+          .collect(Collectors.toMap(
+            partition -> new TopicAndPartition(topic.getName(), partition.getId()),
+            partition -> requestInfo)),
+        (short) 0, clientId());
 
       LOG.debug("Sending offset request: {}", offsetRequest);
 
       return retryTemplate.execute(
-         context ->
-            brokerChannel(brokerId)
-               .execute(channel ->
-                        {
-                           channel.send(offsetRequest.underlying());
-                           final kafka.api.OffsetResponse underlyingResponse =
-                              kafka.api.OffsetResponse.readFrom(channel.receive().buffer());
+        context ->
+          brokerChannel(brokerId)
+            .execute(channel ->
+            {
+               channel.send(offsetRequest.underlying());
+               final kafka.api.OffsetResponse underlyingResponse =
+                 kafka.api.OffsetResponse.readFrom(channel.receive().payload());
 
-                           LOG.debug("Received offset response: {}", underlyingResponse);
+               LOG.debug("Received offset response: {}", underlyingResponse);
 
-                           return new OffsetResponse(underlyingResponse);
-                        }));
+               return new OffsetResponse(underlyingResponse);
+            }));
    }
 
    private class BrokerListener implements PathChildrenCacheListener
@@ -897,8 +898,8 @@ public void childEvent(CuratorFramework framework, PathChildrenCacheEvent event)
             case INITIALIZED:
             {
                brokerPathCache.getCurrentData().stream()
-                  .map(BrokerListener.this::parseBroker)
-                  .forEach(CuratorKafkaMonitor.this::addBroker);
+                 .map(BrokerListener.this::parseBroker)
+                 .forEach(CuratorKafkaMonitor.this::addBroker);
                break;
             }
          }
@@ -926,4 +927,4 @@ private BrokerVO parseBroker(ChildData input)
       }
    }
 
-}
+}
\ No newline at end of file