This repository has been archived by the owner on May 13, 2019. It is now read-only.

Bug? Consuming stops or hangs at certain time. #110

Open
windless0530 opened this issue Dec 1, 2016 · 0 comments

windless0530 commented Dec 1, 2016

I have run into a problem:

Code written using wvanbergen/kafka consumes messages successfully at first, but after receiving 872 messages the consuming loop stopped receiving anything. I am sure all 872 messages were processed successfully by another goroutine.

I then restarted my program, but it still could not consume any messages.

Meanwhile, Kafka Manager shows that the consumer offset has stopped advancing, while the log size and total lag keep growing.
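
For readers unfamiliar with these numbers: the lag for a partition is the log size (log end offset) minus the consumer group's committed offset. The sketch below only illustrates that arithmetic. The function name `partitionLag` and all offsets are hypothetical and are not taken from the cluster in this report.

```go
package main

import "fmt"

// partitionLag returns how many messages a consumer group is behind on one
// partition: the log end offset (log size) minus the committed offset.
// It returns 0 if the committed offset is ahead of the log end offset.
func partitionLag(logEndOffset, committedOffset int64) int64 {
	if committedOffset > logEndOffset {
		return 0
	}
	return logEndOffset - committedOffset
}

func main() {
	// Hypothetical values: the committed offset is stuck while producers keep writing.
	committed := int64(1000)
	for _, logEnd := range []int64{1200, 1500, 4000} {
		fmt.Printf("log size %d, consumer offset %d, lag %d\n",
			logEnd, committed, partitionLag(logEnd, committed))
	}
}
```

A committed offset that stays fixed while the log size rises produces exactly the pattern described above: a lag that only grows.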

If I use Kafka's test script kafka-console-consumer.sh with the same ZooKeeper address and consumer group, all messages are consumed successfully.

So I can only conclude that this is a bug...

The same thing also happened last week, and I stopped my program at that time. Today I ran it again; everything worked at first, but then consumption stopped again.

My code is shown below. The "received %d msg" log line was printed at first but no longer appears, and the "Message terminated." log line has never been printed, not even once:

```go
consumer, consumerErr := consumergroup.JoinConsumerGroup(consumerGroupName, kafkaTopics, p.zooServer, config)
if consumerErr != nil {
	log.Fatalln(consumerErr)
}

go func() {
	for err := range consumer.Errors() {
		log.Printf("receive consumer error: %s\n", err)
		if consumer.Closed() {
			consumer, _ = consumergroup.JoinConsumerGroup(consumerGroupName, kafkaTopics, p.zooServer, config)
		}
	}
}()

eventCount := 0
offsets := make(map[string]map[int32]int64)
go func() {
	log.Infof("Consumer group: close = %t, %s", consumer.Closed(), consumer)
	for message := range consumer.Messages() {
		if offsets[message.Topic] == nil {
			offsets[message.Topic] = make(map[int32]int64)
		}
		eventCount += 1
		if offsets[message.Topic][message.Partition] != 0 && offsets[message.Topic][message.Partition] != message.Offset-1 {
			log.Printf("unexpected offset on %s:%d. Expected %d, found %d, diff %d.\n", message.Topic, message.Partition, offsets[message.Topic][message.Partition]+1, message.Offset, message.Offset-offsets[message.Topic][message.Partition]+1)
		}

		log.Infof("received %d msg", eventCount)
		p.recordRecv()
		p.msgChan <- message.Value

		offsets[message.Topic][message.Partition] = message.Offset
		consumer.CommitUpto(message)
	}
	log.Infoln("Message terminated.")
}()
```