diff --git a/README.md b/README.md index d8fe8a8..7334862 100644 --- a/README.md +++ b/README.md @@ -391,7 +391,7 @@ immediately, you can wait on the returned channel until all consumers are done: You can also stop consuming on all queues in your connection: ```go -finishedChan := connection.StopAllConsuming() +finishedChan := connection.Close() ``` Wait on the `finishedChan` to wait for all consumers on all queues to finish. diff --git a/cleaner_test.go b/cleaner_test.go index d0cc818..ecf7771 100644 --- a/cleaner_test.go +++ b/cleaner_test.go @@ -2,6 +2,7 @@ package rmq import ( "os" + "strconv" "testing" "time" @@ -21,6 +22,94 @@ func testRedis(t testing.TB) (addr string, closer func()) { return mr.Addr(), mr.Close } +func publishDeliveries(t *testing.T, queue Queue, ready, unAcked int64, deliveries ...string) (int64, int64) { + t.Helper() + checkReadyAndUnAcked(t, queue, ready, unAcked) + for _, delivery := range deliveries { + assert.NoError(t, queue.Publish(delivery)) + ready += 1 + checkReadyAndUnAcked(t, queue, ready, unAcked) + } + return ready, unAcked +} + +// consume cnt deliveries into chan buffer +func preConsume(t *testing.T, queue Queue, ready, unAcked, cnt int64) (int64, int64) { + t.Helper() + eventuallyUnacked(t, queue, unAcked) + assert.NoError(t, queue.StartConsuming(cnt, time.Millisecond)) + ready -= cnt + unAcked += cnt + checkReadyAndUnAcked(t, queue, ready, unAcked) + return ready, unAcked +} + +// ack finished deliveries +func consumerAck(t *testing.T, queue Queue, consumer *TestConsumer, ready, unAcked int64, expected ...string) (int64, int64) { + t.Helper() + for _, exp := range expected { + require.NotNil(t, consumer.Last()) + checkLast(t, queue, consumer, ready, unAcked, exp) + assert.NoError(t, consumer.Last().Ack()) + ready -= 1 + checkReadyAndUnAcked(t, queue, ready, unAcked) + } + return ready, unAcked +} + +// receive deliveries from chan buffer and finish them +func consumeWithoutAck(t *testing.T, queue Queue, consumer *TestConsumer, ready, unAcked int64, expected ...string) { + t.Helper() + for _, exp := range expected { + consumer.Finish() // unacked + time.Sleep(10 * time.Millisecond) + checkLast(t, queue, consumer, ready, unAcked, exp) + } +} + +// check last received deivery +func checkLast(t *testing.T, queue Queue, consumer *TestConsumer, ready, unAcked int64, expected string) { + t.Helper() + checkReadyAndUnAcked(t, queue, ready, unAcked) + assert.Equal(t, expected, consumer.Last().Payload()) +} + +func checkReadyAndUnAcked(t *testing.T, queue Queue, ready, unAcked int64) { + t.Helper() + eventuallyUnacked(t, queue, unAcked) + eventuallyReady(t, queue, ready) +} + +func genDeliveries(start, end int) []string { + res := make([]string, end-start+1) + for i := 0; i <= end-start; i++ { + res[i] = "del" + strconv.Itoa(i+start) + } + return res +} + +func assertQueueNum(t *testing.T, conn Connection, num int) { + t.Helper() + queues, err := conn.GetOpenQueues() + assert.NoError(t, err) + assert.Len(t, queues, num) +} + +func newManualConsumer(name string) *TestConsumer { + consumer := NewTestConsumer(name) + consumer.AutoFinish = false + consumer.AutoAck = false + return consumer +} + +func queueAddManualConsume(t *testing.T, queue Queue, name string) *TestConsumer { + consumer := newManualConsumer(name) + _, err := queue.AddConsumer(name, consumer) // take one 'del' from chan buffer and wait finish + assert.NoError(t, err) + time.Sleep(10 * time.Millisecond) + return consumer +} + func TestCleaner(t *testing.T) { redisAddr, closer := testRedis(t) defer closer() @@ -32,139 +121,96 @@ func TestCleaner(t *testing.T) { conn, err := OpenConnection("cleaner-conn1", "tcp", redisAddr, 1, nil) assert.NoError(t, err) - queues, err := conn.GetOpenQueues() - assert.NoError(t, err) - assert.Len(t, queues, 0) - queue, err := conn.OpenQueue("q1") - assert.NoError(t, err) - queues, err = conn.GetOpenQueues() - assert.NoError(t, err) - assert.Len(t, queues, 1) - _, err = conn.OpenQueue("q2") - assert.NoError(t, err) - queues, err = conn.GetOpenQueues() + assertQueueNum(t, conn, 0) + + queue1, err := conn.OpenQueue("q1") assert.NoError(t, err) - assert.Len(t, queues, 2) - - eventuallyReady(t, queue, 0) - assert.NoError(t, queue.Publish("del1")) - eventuallyReady(t, queue, 1) - assert.NoError(t, queue.Publish("del2")) - eventuallyReady(t, queue, 2) - assert.NoError(t, queue.Publish("del3")) - eventuallyReady(t, queue, 3) - assert.NoError(t, queue.Publish("del4")) - eventuallyReady(t, queue, 4) - assert.NoError(t, queue.Publish("del5")) - eventuallyReady(t, queue, 5) - assert.NoError(t, queue.Publish("del6")) - eventuallyReady(t, queue, 6) - - eventuallyUnacked(t, queue, 0) - assert.NoError(t, queue.StartConsuming(2, time.Millisecond)) - eventuallyUnacked(t, queue, 2) - eventuallyReady(t, queue, 4) - - consumer := NewTestConsumer("c-A") - consumer.AutoFinish = false - consumer.AutoAck = false + assertQueueNum(t, conn, 1) - _, err = queue.AddConsumer("consumer1", consumer) + queue2, err := conn.OpenQueue("q2") assert.NoError(t, err) - time.Sleep(10 * time.Millisecond) - eventuallyUnacked(t, queue, 2) - eventuallyReady(t, queue, 4) + assertQueueNum(t, conn, 2) - require.NotNil(t, consumer.Last()) - assert.Equal(t, "del1", consumer.Last().Payload()) - assert.NoError(t, consumer.Last().Ack()) - eventuallyUnacked(t, queue, 2) - eventuallyReady(t, queue, 3) + var ( + ready1 = int64(0) + unAcked1A = int64(0) + ready2 = int64(0) + unAcked2A = int64(0) + ) - consumer.Finish() + ready1, unAcked1A = publishDeliveries(t, queue1, ready1, unAcked1A, genDeliveries(1, 6)...) // pub 1...6 + ready1, unAcked1A = preConsume(t, queue1, ready1, unAcked1A, 2) // take 1, 2 into chan buffer + ready2, unAcked2A = publishDeliveries(t, queue2, ready2, unAcked2A, genDeliveries(1, 9)...) // pub 1...9 + ready2, unAcked2A = preConsume(t, queue2, ready2, unAcked2A, 5) // take 1...5 into chan buffer + + consumer1A := queueAddManualConsume(t, queue1, "C-1-A") // take 1 from chan buffer and wait finish + consumer2A := queueAddManualConsume(t, queue2, "C-2-A") // take 1 from chan buffer and wait finish + + ready1, unAcked1A = consumerAck(t, queue1, consumer1A, ready1, unAcked1A, genDeliveries(1, 1)...) // ack 1 -> take 3 + ready2, unAcked2A = consumerAck(t, queue2, consumer2A, ready2, unAcked2A, genDeliveries(1, 1)...) // ack 1 -> take 6 time.Sleep(10 * time.Millisecond) - eventuallyUnacked(t, queue, 2) - eventuallyReady(t, queue, 3) - assert.Equal(t, "del2", consumer.Last().Payload()) + consumer1A.FinishAll() + consumer2A.FinishAll() - queue.StopConsuming() - assert.NoError(t, conn.stopHeartbeat()) + <-conn.Close() time.Sleep(time.Millisecond) conn, err = OpenConnection("cleaner-conn1", "tcp", redisAddr, 1, nil) assert.NoError(t, err) - queue, err = conn.OpenQueue("q1") + unAcked1B := int64(0) + unAcked2B := int64(0) + queue1, err = conn.OpenQueue("q1") + assert.NoError(t, err) + queue2, err = conn.OpenQueue("q2") assert.NoError(t, err) - assert.NoError(t, queue.Publish("del7")) - eventuallyReady(t, queue, 4) - assert.NoError(t, queue.Publish("del8")) - eventuallyReady(t, queue, 5) - assert.NoError(t, queue.Publish("del9")) - eventuallyReady(t, queue, 6) - assert.NoError(t, queue.Publish("del10")) - eventuallyReady(t, queue, 7) - assert.NoError(t, queue.Publish("del11")) - eventuallyReady(t, queue, 8) - - eventuallyUnacked(t, queue, 0) - assert.NoError(t, queue.StartConsuming(2, time.Millisecond)) - eventuallyUnacked(t, queue, 2) - eventuallyReady(t, queue, 6) - - consumer = NewTestConsumer("c-B") - consumer.AutoFinish = false - consumer.AutoAck = false + ready1, unAcked1B = publishDeliveries(t, queue1, ready1, unAcked1B, genDeliveries(7, 11)...) // pub 7...11 - _, err = queue.AddConsumer("consumer2", consumer) - assert.NoError(t, err) - time.Sleep(10 * time.Millisecond) - eventuallyUnacked(t, queue, 2) - eventuallyReady(t, queue, 6) - assert.Equal(t, "del4", consumer.Last().Payload()) + ready1, unAcked1B = preConsume(t, queue1, ready1, unAcked1B, 2) // take 4, 5 into chan buffer + ready2, unAcked2B = preConsume(t, queue2, ready2, unAcked2B, 2) // take 7, 8 into chan buffer - consumer.Finish() // unacked - time.Sleep(10 * time.Millisecond) - eventuallyUnacked(t, queue, 2) - eventuallyReady(t, queue, 6) + consumer1B := queueAddManualConsume(t, queue1, "C-1-B") // take 4 from chan buffer and wait finish - assert.Equal(t, "del5", consumer.Last().Payload()) - assert.NoError(t, consumer.Last().Ack()) + consumeWithoutAck(t, queue1, consumer1B, ready1, unAcked1B, genDeliveries(5, 5)...) // finish 4 and take 5 from chan buffer time.Sleep(10 * time.Millisecond) - eventuallyUnacked(t, queue, 2) - eventuallyReady(t, queue, 5) + ready1, unAcked1B = consumerAck(t, queue1, consumer1B, ready1, unAcked1B, genDeliveries(5, 5)...) // ack 5 -> take 6 - queue.StopConsuming() - assert.NoError(t, conn.stopHeartbeat()) + consumer1B.FinishAll() + <-conn.Close() time.Sleep(time.Millisecond) cleanerConn, err := OpenConnection("cleaner-conn", "tcp", redisAddr, 1, nil) assert.NoError(t, err) + unAcked1C := int64(0) + unAcked2C := int64(0) cleaner := NewCleaner(cleanerConn) returned, err := cleaner.Clean() assert.NoError(t, err) - assert.Equal(t, int64(4), returned) - eventuallyReady(t, queue, 9) // 2 of 11 were acked above - queues, err = conn.GetOpenQueues() - assert.NoError(t, err) - assert.Len(t, queues, 2) + // queue1: 7..11 in ready. 2,3,4,6 in unacked. 1, 5 acked. + // queue2: 9 in ready, 2...8 in unacked, 1 acked. + assert.Equal(t, unAcked1A+unAcked1B+unAcked2A+unAcked2B, returned) + ready1 += unAcked1A + unAcked1B + ready2 += unAcked2A + unAcked2B + + checkReadyAndUnAcked(t, queue1, ready1, unAcked1C) + checkReadyAndUnAcked(t, queue2, ready2, unAcked2C) + assertQueueNum(t, conn, 2) conn, err = OpenConnection("cleaner-conn1", "tcp", redisAddr, 1, nil) assert.NoError(t, err) - queue, err = conn.OpenQueue("q1") + queue1, err = conn.OpenQueue("q1") assert.NoError(t, err) - assert.NoError(t, queue.StartConsuming(10, time.Millisecond)) - consumer = NewTestConsumer("c-C") + assert.NoError(t, queue1.StartConsuming(ready1+1, time.Millisecond)) + consumerC := NewTestConsumer("c-C") - _, err = queue.AddConsumer("consumer3", consumer) + _, err = queue1.AddConsumer("consumer3", consumerC) assert.NoError(t, err) time.Sleep(10 * time.Millisecond) assert.Eventually(t, func() bool { - return len(consumer.Deliveries()) == 9 + return len(consumerC.Deliveries()) == int(ready1) }, 10*time.Second, 2*time.Millisecond) - queue.StopConsuming() - assert.NoError(t, conn.stopHeartbeat()) + <-conn.Close() time.Sleep(time.Millisecond) returned, err = cleaner.Clean() diff --git a/connection.go b/connection.go index 34a571e..1f706ea 100644 --- a/connection.go +++ b/connection.go @@ -26,7 +26,7 @@ type Connection interface { OpenQueue(name string) (Queue, error) CollectStats(queueList []string) (Stats, error) GetOpenQueues() ([]string, error) - StopAllConsuming() <-chan struct{} + Close() <-chan struct{} // internals // used in cleaner @@ -122,6 +122,7 @@ func (connection *redisConnection) heartbeat(errChan chan<- error) { // continue below case c := <-connection.heartbeatStop: close(c) + connection.heartbeatStop = nil return } @@ -136,7 +137,8 @@ func (connection *redisConnection) heartbeat(errChan chan<- error) { if errorCount >= HeartbeatErrorLimit { // reached error limit - connection.StopAllConsuming() + connection.heartbeatStop = nil + connection.Close() // Clients reading from errChan need to see this error // This allows them to shut themselves down // Therefore we block adding it to errChan to ensure delivery @@ -185,11 +187,11 @@ func (connection *redisConnection) GetOpenQueues() ([]string, error) { return connection.redisClient.SMembers(queuesKey) } -// StopAllConsuming stops consuming on all queues opened in this connection. +// Close stops consuming on all queues opened in this connection. // It returns a channel which can be used to wait for all active consumers to // finish their current Consume() call. This is useful to implement graceful // shutdown. -func (connection *redisConnection) StopAllConsuming() <-chan struct{} { +func (connection *redisConnection) Close() <-chan struct{} { connection.lock.Lock() defer func() { // regardless of how we exit this method, the connection is always stopped when we return @@ -200,7 +202,7 @@ func (connection *redisConnection) StopAllConsuming() <-chan struct{} { finishedChan := make(chan struct{}) // If we are already stopped or there are no open queues, then there is nothing to do - if connection.stopped || len(connection.openQueues) == 0 { + if connection.stopped { close(finishedChan) return finishedChan } @@ -215,6 +217,7 @@ func (connection *redisConnection) StopAllConsuming() <-chan struct{} { for _, c := range chans { <-c } + connection.stopHeartbeat() close(finishedChan) // log.Printf("rmq connection stopped consuming %s", queue) }() @@ -295,7 +298,6 @@ func (connection *redisConnection) stopHeartbeat() error { heartbeatStopped := make(chan struct{}) connection.heartbeatStop <- heartbeatStopped <-heartbeatStopped - connection.heartbeatStop = nil // avoid stopping twice count, err := connection.redisClient.Del(connection.heartbeatKey) if err != nil { diff --git a/example/batch_consumer/main.go b/example/batch_consumer/main.go index 3e42c8d..c494809 100644 --- a/example/batch_consumer/main.go +++ b/example/batch_consumer/main.go @@ -55,7 +55,7 @@ func main() { os.Exit(1) }() - <-connection.StopAllConsuming() // wait for all Consume() calls to finish + <-connection.Close() // wait for all Consume() calls to finish } type BatchConsumer struct { diff --git a/example/consumer/main.go b/example/consumer/main.go index c8b6166..03fbfcb 100644 --- a/example/consumer/main.go +++ b/example/consumer/main.go @@ -56,7 +56,7 @@ func main() { os.Exit(1) }() - <-connection.StopAllConsuming() // wait for all Consume() calls to finish + <-connection.Close() // wait for all Consume() calls to finish } type Consumer struct { diff --git a/queue_test.go b/queue_test.go index 8abf993..7cffaf3 100644 --- a/queue_test.go +++ b/queue_test.go @@ -700,14 +700,14 @@ func TestStopConsuming_BatchConsumer(t *testing.T) { assert.NoError(t, connection.stopHeartbeat()) } -func TestConnection_StopAllConsuming_CantOpenQueue(t *testing.T) { +func TestConnection_Close_CantOpenQueue(t *testing.T) { redisAddr, closer := testRedis(t) defer closer() connection, err := OpenConnection("consume", "tcp", redisAddr, 1, nil) assert.NoError(t, err) - finishedChan := connection.StopAllConsuming() + finishedChan := connection.Close() require.NotNil(t, finishedChan) <-finishedChan // wait for stopping to finish @@ -716,7 +716,7 @@ func TestConnection_StopAllConsuming_CantOpenQueue(t *testing.T) { require.Equal(t, ErrorConsumingStopped, err) } -func TestConnection_StopAllConsuming_CantStartConsuming(t *testing.T) { +func TestConnection_Close_CantStartConsuming(t *testing.T) { redisAddr, closer := testRedis(t) defer closer() @@ -727,7 +727,7 @@ func TestConnection_StopAllConsuming_CantStartConsuming(t *testing.T) { _, err = queue.PurgeReady() assert.NoError(t, err) - finishedChan := connection.StopAllConsuming() + finishedChan := connection.Close() require.NotNil(t, finishedChan) <-finishedChan // wait for stopping to finish @@ -754,7 +754,7 @@ func TestQueue_StopConsuming_CantStartConsuming(t *testing.T) { require.Equal(t, ErrorConsumingStopped, err) } -func TestConnection_StopAllConsuming_CantAddConsumer(t *testing.T) { +func TestConnection_Close_CantAddConsumer(t *testing.T) { redisAddr, closer := testRedis(t) defer closer() @@ -767,7 +767,7 @@ func TestConnection_StopAllConsuming_CantAddConsumer(t *testing.T) { assert.NoError(t, queue.StartConsuming(20, time.Millisecond)) - finishedChan := connection.StopAllConsuming() + finishedChan := connection.Close() require.NotNil(t, finishedChan) <-finishedChan // wait for stopping to finish diff --git a/test_connection.go b/test_connection.go index 0dbd370..fdf343f 100644 --- a/test_connection.go +++ b/test_connection.go @@ -25,7 +25,7 @@ func (connection TestConnection) OpenQueue(name string) (Queue, error) { func (TestConnection) CollectStats([]string) (Stats, error) { panic(errorNotSupported) } func (TestConnection) GetOpenQueues() ([]string, error) { panic(errorNotSupported) } -func (TestConnection) StopAllConsuming() <-chan struct{} { panic(errorNotSupported) } +func (TestConnection) Close() <-chan struct{} { panic(errorNotSupported) } func (TestConnection) checkHeartbeat() error { panic(errorNotSupported) } func (TestConnection) getConnections() ([]string, error) { panic(errorNotSupported) } func (TestConnection) hijackConnection(string) Connection { panic(errorNotSupported) }