From f54e2d74cbd6d4af8c1052e8d7a1dbb6d84b17df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B0=A2=E5=89=91=E6=A1=A5?= Date: Fri, 23 Nov 2018 17:48:58 +0800 Subject: [PATCH] check whether comsumer subscript the topic or not when divide partitions between consumers --- consumergroup/consumer_group.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) mode change 100644 => 100755 consumergroup/consumer_group.go diff --git a/consumergroup/consumer_group.go b/consumergroup/consumer_group.go old mode 100644 new mode 100755 index a4e0d5c..dba378c --- a/consumergroup/consumer_group.go +++ b/consumergroup/consumer_group.go @@ -333,7 +333,18 @@ func (cg *ConsumerGroup) topicConsumer(topic string, messages chan<- *sarama.Con return } - dividedPartitions := dividePartitionsBetweenConsumers(cg.consumers, partitionLeaders) + topicConsumers := []*kazoo.ConsumergroupInstance{} + for _, consumer := range cg.consumers{ + reg, _ := consumer.Registration() + for sub, _ := range reg.Subscription{ + if sub == topic{ + topicConsumers = append(topicConsumers, consumer) + break + } + } + } + + dividedPartitions := dividePartitionsBetweenConsumers(topicConsumers, partitionLeaders) myPartitions := dividedPartitions[cg.instance.ID] cg.Logf("%s :: Claiming %d of %d partitions", topic, len(myPartitions), len(partitionLeaders))