From cbcd734bfac766a08731318bf82aa8ed19e4c20e Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Wed, 1 Dec 2021 19:06:40 +0000 Subject: [PATCH] fix: only update preferredReadReplica if valid FetchResponse from a follower will _not_ contain a PreferredReadReplica. It seems like the partitionConsumer would overwrite the previously assigned value from the leader with -1 which would then trigger the "reconnect to the current leader" changes from #1936 causing a flip-flop effect. Contributes-to: #2071 Signed-off-by: Dominic Evans --- consumer.go | 25 ++++++++++++++----------- fetch_response.go | 2 ++ 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/consumer.go b/consumer.go index f68eb1d2b..1cb910deb 100644 --- a/consumer.go +++ b/consumer.go @@ -132,16 +132,17 @@ func (c *consumer) Partitions(topic string) ([]int32, error) { func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) { child := &partitionConsumer{ - consumer: c, - conf: c.conf, - topic: topic, - partition: partition, - messages: make(chan *ConsumerMessage, c.conf.ChannelBufferSize), - errors: make(chan *ConsumerError, c.conf.ChannelBufferSize), - feeder: make(chan *FetchResponse, 1), - trigger: make(chan none, 1), - dying: make(chan none), - fetchSize: c.conf.Consumer.Fetch.Default, + consumer: c, + conf: c.conf, + topic: topic, + partition: partition, + messages: make(chan *ConsumerMessage, c.conf.ChannelBufferSize), + errors: make(chan *ConsumerError, c.conf.ChannelBufferSize), + feeder: make(chan *FetchResponse, 1), + preferredReadReplica: invalidPreferredReplicaID, + trigger: make(chan none, 1), + dying: make(chan none), + fetchSize: c.conf.Consumer.Fetch.Default, } if err := child.chooseStartingOffset(offset); err != nil { @@ -605,7 +606,9 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu consumerBatchSizeMetric.Update(int64(nRecs)) - child.preferredReadReplica = block.PreferredReadReplica + if block.PreferredReadReplica != invalidPreferredReplicaID { + child.preferredReadReplica = block.PreferredReadReplica + } if nRecs == 0 { partialTrailingMessage, err := block.isPartial() diff --git a/fetch_response.go b/fetch_response.go index 19040c827..1b02b0070 100644 --- a/fetch_response.go +++ b/fetch_response.go @@ -5,6 +5,8 @@ import ( "time" ) +const invalidPreferredReplicaID = -1 + type AbortedTransaction struct { ProducerID int64 FirstOffset int64