Skip to content

Commit

Permalink
fix: only update preferredReadReplica if valid
Browse files Browse the repository at this point in the history
FetchResponse from a follower will _not_ contain a PreferredReadReplica.
It seems like the partitionConsumer would overwrite the previously
assigned value from the leader with -1 which would then trigger the
"reconnect to the current leader" changes from IBM#1936 causing a flip-flop
effect.

Contributes-to: IBM#2071

Signed-off-by: Dominic Evans <[email protected]>
  • Loading branch information
dnwe committed Dec 1, 2021
1 parent 556ddf7 commit cbcd734
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 11 deletions.
25 changes: 14 additions & 11 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,16 +132,17 @@ func (c *consumer) Partitions(topic string) ([]int32, error) {

func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) {
child := &partitionConsumer{
consumer: c,
conf: c.conf,
topic: topic,
partition: partition,
messages: make(chan *ConsumerMessage, c.conf.ChannelBufferSize),
errors: make(chan *ConsumerError, c.conf.ChannelBufferSize),
feeder: make(chan *FetchResponse, 1),
trigger: make(chan none, 1),
dying: make(chan none),
fetchSize: c.conf.Consumer.Fetch.Default,
consumer: c,
conf: c.conf,
topic: topic,
partition: partition,
messages: make(chan *ConsumerMessage, c.conf.ChannelBufferSize),
errors: make(chan *ConsumerError, c.conf.ChannelBufferSize),
feeder: make(chan *FetchResponse, 1),
preferredReadReplica: invalidPreferredReplicaID,
trigger: make(chan none, 1),
dying: make(chan none),
fetchSize: c.conf.Consumer.Fetch.Default,
}

if err := child.chooseStartingOffset(offset); err != nil {
Expand Down Expand Up @@ -605,7 +606,9 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu

consumerBatchSizeMetric.Update(int64(nRecs))

child.preferredReadReplica = block.PreferredReadReplica
if block.PreferredReadReplica != invalidPreferredReplicaID {
child.preferredReadReplica = block.PreferredReadReplica
}

if nRecs == 0 {
partialTrailingMessage, err := block.isPartial()
Expand Down
2 changes: 2 additions & 0 deletions fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"time"
)

const invalidPreferredReplicaID = -1

type AbortedTransaction struct {
ProducerID int64
FirstOffset int64
Expand Down

0 comments on commit cbcd734

Please sign in to comment.