Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix heartbeat goroutine leak #149

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ immediately, you can wait on the returned channel until all consumers are done:
You can also stop consuming on all queues in your connection:

```go
finishedChan := connection.StopAllConsuming()
finishedChan := connection.Close()
```

Receive from `finishedChan` to block until all consumers on all queues have finished.
Expand Down
242 changes: 144 additions & 98 deletions cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rmq

import (
"os"
"strconv"
"testing"
"time"

Expand All @@ -21,6 +22,94 @@ func testRedis(t testing.TB) (addr string, closer func()) {
return mr.Addr(), mr.Close
}

// publishDeliveries publishes every payload in deliveries to queue, one at a
// time. Before publishing it checks that the queue currently reports the given
// ready and unAcked counts; after each publish it checks that ready has grown
// by one while unAcked is unchanged. It returns the resulting expected
// (ready, unAcked) counts so callers can thread them through later steps.
func publishDeliveries(t *testing.T, queue Queue, ready, unAcked int64, deliveries ...string) (int64, int64) {
	t.Helper()
	checkReadyAndUnAcked(t, queue, ready, unAcked)
	for i := range deliveries {
		err := queue.Publish(deliveries[i])
		assert.NoError(t, err)
		ready++
		checkReadyAndUnAcked(t, queue, ready, unAcked)
	}
	return ready, unAcked
}

// preConsume starts consuming on queue with a prefetch limit of cnt and a
// one-millisecond poll duration. It first checks that the queue reports the
// given unAcked count, then expects cnt deliveries to move from ready to
// unacked (into the consuming channel buffer, per the original note) and
// checks the queue against those new counts. It returns the updated expected
// (ready, unAcked) counts.
func preConsume(t *testing.T, queue Queue, ready, unAcked, cnt int64) (int64, int64) {
	t.Helper()
	eventuallyUnacked(t, queue, unAcked)
	assert.NoError(t, queue.StartConsuming(cnt, time.Millisecond))
	newReady, newUnAcked := ready-cnt, unAcked+cnt
	checkReadyAndUnAcked(t, queue, newReady, newUnAcked)
	return newReady, newUnAcked
}

// consumerAck acks the consumer's most recent delivery once per entry in
// expected. Before each ack it requires that a last delivery exists and checks
// that its payload equals the expected entry and that the queue reports the
// current ready/unAcked counts. After each ack the expected ready count drops
// by one while unAcked stays the same (presumably because the freed prefetch
// slot is refilled from the ready list — confirm against the queue
// implementation). It returns the updated expected (ready, unAcked) counts.
func consumerAck(t *testing.T, queue Queue, consumer *TestConsumer, ready, unAcked int64, expected ...string) (int64, int64) {
	t.Helper()
	for i := 0; i < len(expected); i++ {
		require.NotNil(t, consumer.Last())
		checkLast(t, queue, consumer, ready, unAcked, expected[i])
		ackErr := consumer.Last().Ack()
		assert.NoError(t, ackErr)
		ready--
		checkReadyAndUnAcked(t, queue, ready, unAcked)
	}
	return ready, unAcked
}

// consumeWithoutAck finishes the consumer's current delivery without acking
// it, once per entry in expected. After each Finish it sleeps 10ms to give the
// consumer time to receive the next delivery, then checks that the queue
// still reports the given ready/unAcked counts and that the consumer's last
// delivery carries the expected payload.
func consumeWithoutAck(t *testing.T, queue Queue, consumer *TestConsumer, ready, unAcked int64, expected ...string) {
	t.Helper()
	const settle = 10 * time.Millisecond
	for i := range expected {
		consumer.Finish() // delivery stays unacked
		time.Sleep(settle)
		checkLast(t, queue, consumer, ready, unAcked, expected[i])
	}
}

// checkLast checks the last received delivery: it verifies that queue reports
// the given ready and unAcked counts and that the consumer's most recent
// delivery carries the expected payload.
//
// The test is aborted with a clear failure if the consumer has not received
// any delivery yet, instead of panicking on a nil delivery.
func checkLast(t *testing.T, queue Queue, consumer *TestConsumer, ready, unAcked int64, expected string) {
	t.Helper()
	checkReadyAndUnAcked(t, queue, ready, unAcked)
	// Fetch once so the nil check and the payload read see the same delivery.
	last := consumer.Last()
	require.NotNil(t, last)
	assert.Equal(t, expected, last.Payload())
}

// checkReadyAndUnAcked checks the queue against both expected counters,
// unacked first and then ready, by delegating to eventuallyUnacked and
// eventuallyReady.
func checkReadyAndUnAcked(t *testing.T, queue Queue, ready, unAcked int64) {
	t.Helper()

	eventuallyUnacked(t, queue, unAcked)
	eventuallyReady(t, queue, ready)
}

// genDeliveries returns the payloads "del<start>" through "del<end>"
// (inclusive) in ascending order, e.g. genDeliveries(1, 3) yields
// ["del1", "del2", "del3"].
//
// An empty range (end < start) yields nil rather than panicking on a negative
// slice length.
func genDeliveries(start, end int) []string {
	n := end - start + 1
	if n <= 0 {
		return nil
	}
	res := make([]string, n)
	for i := range res {
		res[i] = "del" + strconv.Itoa(start+i)
	}
	return res
}

// assertQueueNum asserts that conn.GetOpenQueues succeeds and returns exactly
// num queues.
func assertQueueNum(t *testing.T, conn Connection, num int) {
	t.Helper()

	openQueues, openErr := conn.GetOpenQueues()
	assert.NoError(t, openErr)
	assert.Len(t, openQueues, num)
}

// newManualConsumer builds a TestConsumer with the given name and turns off
// both AutoFinish and AutoAck, so the test decides when each delivery is
// finished and acked.
func newManualConsumer(name string) *TestConsumer {
	c := NewTestConsumer(name)
	c.AutoFinish, c.AutoAck = false, false
	return c
}

// queueAddManualConsume creates a manual consumer (AutoFinish and AutoAck
// disabled) and registers it on queue, using name as both the consumer's name
// and its tag. It asserts that registration succeeds, then sleeps 10ms to give
// the consumer time to take its first delivery from the channel buffer, where
// it waits to be finished by the test. It returns the consumer.
func queueAddManualConsume(t *testing.T, queue Queue, name string) *TestConsumer {
	// Mark as helper so assertion failures are attributed to the caller,
	// consistent with the other helpers in this file.
	t.Helper()
	consumer := newManualConsumer(name)
	_, err := queue.AddConsumer(name, consumer)
	assert.NoError(t, err)
	time.Sleep(10 * time.Millisecond)
	return consumer
}

func TestCleaner(t *testing.T) {
redisAddr, closer := testRedis(t)
defer closer()
Expand All @@ -32,139 +121,96 @@ func TestCleaner(t *testing.T) {

conn, err := OpenConnection("cleaner-conn1", "tcp", redisAddr, 1, nil)
assert.NoError(t, err)
queues, err := conn.GetOpenQueues()
assert.NoError(t, err)
assert.Len(t, queues, 0)
queue, err := conn.OpenQueue("q1")
assert.NoError(t, err)
queues, err = conn.GetOpenQueues()
assert.NoError(t, err)
assert.Len(t, queues, 1)
_, err = conn.OpenQueue("q2")
assert.NoError(t, err)
queues, err = conn.GetOpenQueues()
assertQueueNum(t, conn, 0)

queue1, err := conn.OpenQueue("q1")
assert.NoError(t, err)
assert.Len(t, queues, 2)

eventuallyReady(t, queue, 0)
assert.NoError(t, queue.Publish("del1"))
eventuallyReady(t, queue, 1)
assert.NoError(t, queue.Publish("del2"))
eventuallyReady(t, queue, 2)
assert.NoError(t, queue.Publish("del3"))
eventuallyReady(t, queue, 3)
assert.NoError(t, queue.Publish("del4"))
eventuallyReady(t, queue, 4)
assert.NoError(t, queue.Publish("del5"))
eventuallyReady(t, queue, 5)
assert.NoError(t, queue.Publish("del6"))
eventuallyReady(t, queue, 6)

eventuallyUnacked(t, queue, 0)
assert.NoError(t, queue.StartConsuming(2, time.Millisecond))
eventuallyUnacked(t, queue, 2)
eventuallyReady(t, queue, 4)

consumer := NewTestConsumer("c-A")
consumer.AutoFinish = false
consumer.AutoAck = false
assertQueueNum(t, conn, 1)

_, err = queue.AddConsumer("consumer1", consumer)
queue2, err := conn.OpenQueue("q2")
assert.NoError(t, err)
time.Sleep(10 * time.Millisecond)
eventuallyUnacked(t, queue, 2)
eventuallyReady(t, queue, 4)
assertQueueNum(t, conn, 2)

require.NotNil(t, consumer.Last())
assert.Equal(t, "del1", consumer.Last().Payload())
assert.NoError(t, consumer.Last().Ack())
eventuallyUnacked(t, queue, 2)
eventuallyReady(t, queue, 3)
var (
ready1 = int64(0)
unAcked1A = int64(0)
ready2 = int64(0)
unAcked2A = int64(0)
)

consumer.Finish()
ready1, unAcked1A = publishDeliveries(t, queue1, ready1, unAcked1A, genDeliveries(1, 6)...) // pub 1...6
ready1, unAcked1A = preConsume(t, queue1, ready1, unAcked1A, 2) // take 1, 2 into chan buffer
ready2, unAcked2A = publishDeliveries(t, queue2, ready2, unAcked2A, genDeliveries(1, 9)...) // pub 1...9
ready2, unAcked2A = preConsume(t, queue2, ready2, unAcked2A, 5) // take 1...5 into chan buffer

consumer1A := queueAddManualConsume(t, queue1, "C-1-A") // take 1 from chan buffer and wait finish
consumer2A := queueAddManualConsume(t, queue2, "C-2-A") // take 1 from chan buffer and wait finish

ready1, unAcked1A = consumerAck(t, queue1, consumer1A, ready1, unAcked1A, genDeliveries(1, 1)...) // ack 1 -> take 3
ready2, unAcked2A = consumerAck(t, queue2, consumer2A, ready2, unAcked2A, genDeliveries(1, 1)...) // ack 1 -> take 6
time.Sleep(10 * time.Millisecond)
eventuallyUnacked(t, queue, 2)
eventuallyReady(t, queue, 3)
assert.Equal(t, "del2", consumer.Last().Payload())
consumer1A.FinishAll()
consumer2A.FinishAll()

queue.StopConsuming()
assert.NoError(t, conn.stopHeartbeat())
<-conn.Close()
time.Sleep(time.Millisecond)

conn, err = OpenConnection("cleaner-conn1", "tcp", redisAddr, 1, nil)
assert.NoError(t, err)
queue, err = conn.OpenQueue("q1")
unAcked1B := int64(0)
unAcked2B := int64(0)
queue1, err = conn.OpenQueue("q1")
assert.NoError(t, err)
queue2, err = conn.OpenQueue("q2")
assert.NoError(t, err)

assert.NoError(t, queue.Publish("del7"))
eventuallyReady(t, queue, 4)
assert.NoError(t, queue.Publish("del8"))
eventuallyReady(t, queue, 5)
assert.NoError(t, queue.Publish("del9"))
eventuallyReady(t, queue, 6)
assert.NoError(t, queue.Publish("del10"))
eventuallyReady(t, queue, 7)
assert.NoError(t, queue.Publish("del11"))
eventuallyReady(t, queue, 8)

eventuallyUnacked(t, queue, 0)
assert.NoError(t, queue.StartConsuming(2, time.Millisecond))
eventuallyUnacked(t, queue, 2)
eventuallyReady(t, queue, 6)

consumer = NewTestConsumer("c-B")
consumer.AutoFinish = false
consumer.AutoAck = false
ready1, unAcked1B = publishDeliveries(t, queue1, ready1, unAcked1B, genDeliveries(7, 11)...) // pub 7...11

_, err = queue.AddConsumer("consumer2", consumer)
assert.NoError(t, err)
time.Sleep(10 * time.Millisecond)
eventuallyUnacked(t, queue, 2)
eventuallyReady(t, queue, 6)
assert.Equal(t, "del4", consumer.Last().Payload())
ready1, unAcked1B = preConsume(t, queue1, ready1, unAcked1B, 2) // take 4, 5 into chan buffer
ready2, unAcked2B = preConsume(t, queue2, ready2, unAcked2B, 2) // take 7, 8 into chan buffer

consumer.Finish() // unacked
time.Sleep(10 * time.Millisecond)
eventuallyUnacked(t, queue, 2)
eventuallyReady(t, queue, 6)
consumer1B := queueAddManualConsume(t, queue1, "C-1-B") // take 4 from chan buffer and wait finish

assert.Equal(t, "del5", consumer.Last().Payload())
assert.NoError(t, consumer.Last().Ack())
consumeWithoutAck(t, queue1, consumer1B, ready1, unAcked1B, genDeliveries(5, 5)...) // finish 4 and take 5 from chan buffer
time.Sleep(10 * time.Millisecond)
eventuallyUnacked(t, queue, 2)
eventuallyReady(t, queue, 5)
ready1, unAcked1B = consumerAck(t, queue1, consumer1B, ready1, unAcked1B, genDeliveries(5, 5)...) // ack 5 -> take 6

queue.StopConsuming()
assert.NoError(t, conn.stopHeartbeat())
consumer1B.FinishAll()
<-conn.Close()
time.Sleep(time.Millisecond)

cleanerConn, err := OpenConnection("cleaner-conn", "tcp", redisAddr, 1, nil)
assert.NoError(t, err)
unAcked1C := int64(0)
unAcked2C := int64(0)
cleaner := NewCleaner(cleanerConn)
returned, err := cleaner.Clean()
assert.NoError(t, err)
assert.Equal(t, int64(4), returned)
eventuallyReady(t, queue, 9) // 2 of 11 were acked above
queues, err = conn.GetOpenQueues()
assert.NoError(t, err)
assert.Len(t, queues, 2)
// queue1: 7..11 in ready. 2,3,4,6 in unacked. 1, 5 acked.
// queue2: 9 in ready, 2...8 in unacked, 1 acked.
assert.Equal(t, unAcked1A+unAcked1B+unAcked2A+unAcked2B, returned)
ready1 += unAcked1A + unAcked1B
ready2 += unAcked2A + unAcked2B

checkReadyAndUnAcked(t, queue1, ready1, unAcked1C)
checkReadyAndUnAcked(t, queue2, ready2, unAcked2C)
assertQueueNum(t, conn, 2)

conn, err = OpenConnection("cleaner-conn1", "tcp", redisAddr, 1, nil)
assert.NoError(t, err)
queue, err = conn.OpenQueue("q1")
queue1, err = conn.OpenQueue("q1")
assert.NoError(t, err)
assert.NoError(t, queue.StartConsuming(10, time.Millisecond))
consumer = NewTestConsumer("c-C")
assert.NoError(t, queue1.StartConsuming(ready1+1, time.Millisecond))
consumerC := NewTestConsumer("c-C")

_, err = queue.AddConsumer("consumer3", consumer)
_, err = queue1.AddConsumer("consumer3", consumerC)
assert.NoError(t, err)
time.Sleep(10 * time.Millisecond)
assert.Eventually(t, func() bool {
return len(consumer.Deliveries()) == 9
return len(consumerC.Deliveries()) == int(ready1)
}, 10*time.Second, 2*time.Millisecond)

queue.StopConsuming()
assert.NoError(t, conn.stopHeartbeat())
<-conn.Close()
time.Sleep(time.Millisecond)

returned, err = cleaner.Clean()
Expand Down
14 changes: 8 additions & 6 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Connection interface {
OpenQueue(name string) (Queue, error)
CollectStats(queueList []string) (Stats, error)
GetOpenQueues() ([]string, error)
StopAllConsuming() <-chan struct{}
Close() <-chan struct{}

// internals
// used in cleaner
Expand Down Expand Up @@ -122,6 +122,7 @@ func (connection *redisConnection) heartbeat(errChan chan<- error) {
// continue below
case c := <-connection.heartbeatStop:
close(c)
connection.heartbeatStop = nil
return
}

Expand All @@ -136,7 +137,8 @@ func (connection *redisConnection) heartbeat(errChan chan<- error) {

if errorCount >= HeartbeatErrorLimit {
// reached error limit
connection.StopAllConsuming()
connection.heartbeatStop = nil
connection.Close()
// Clients reading from errChan need to see this error
// This allows them to shut themselves down
// Therefore we block adding it to errChan to ensure delivery
Expand Down Expand Up @@ -185,11 +187,11 @@ func (connection *redisConnection) GetOpenQueues() ([]string, error) {
return connection.redisClient.SMembers(queuesKey)
}

// StopAllConsuming stops consuming on all queues opened in this connection.
// Close stops consuming on all queues opened in this connection and then stops the connection's heartbeat.
// It returns a channel which can be used to wait for all active consumers to
// finish their current Consume() call. This is useful to implement graceful
// shutdown.
func (connection *redisConnection) StopAllConsuming() <-chan struct{} {
func (connection *redisConnection) Close() <-chan struct{} {
connection.lock.Lock()
defer func() {
// regardless of how we exit this method, the connection is always stopped when we return
Expand All @@ -200,7 +202,7 @@ func (connection *redisConnection) StopAllConsuming() <-chan struct{} {
finishedChan := make(chan struct{})

// If we are already stopped or there are no open queues, then there is nothing to do
if connection.stopped || len(connection.openQueues) == 0 {
if connection.stopped {
close(finishedChan)
return finishedChan
}
Expand All @@ -215,6 +217,7 @@ func (connection *redisConnection) StopAllConsuming() <-chan struct{} {
for _, c := range chans {
<-c
}
connection.stopHeartbeat()
close(finishedChan)
// log.Printf("rmq connection stopped consuming %s", queue)
}()
Expand Down Expand Up @@ -295,7 +298,6 @@ func (connection *redisConnection) stopHeartbeat() error {
heartbeatStopped := make(chan struct{})
connection.heartbeatStop <- heartbeatStopped
<-heartbeatStopped
connection.heartbeatStop = nil // avoid stopping twice

count, err := connection.redisClient.Del(connection.heartbeatKey)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion example/batch_consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func main() {
os.Exit(1)
}()

<-connection.StopAllConsuming() // wait for all Consume() calls to finish
<-connection.Close() // wait for all Consume() calls to finish
}

type BatchConsumer struct {
Expand Down
2 changes: 1 addition & 1 deletion example/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func main() {
os.Exit(1)
}()

<-connection.StopAllConsuming() // wait for all Consume() calls to finish
<-connection.Close() // wait for all Consume() calls to finish
}

type Consumer struct {
Expand Down
Loading