From 4fcdec9708833a022018e4ea86e2d4bdc591c23a Mon Sep 17 00:00:00 2001 From: Nguyen Thanh Quang Date: Fri, 18 Aug 2023 18:19:19 +0700 Subject: [PATCH] add ability to remove task --- queue.go | 16 ++++++++++++++++ test_queue.go | 8 ++++++++ 2 files changed, 24 insertions(+) diff --git a/queue.go b/queue.go index c246ead..e53686d 100644 --- a/queue.go +++ b/queue.go @@ -18,6 +18,8 @@ type Queue interface { Publish(payload ...string) error PublishBytes(payload ...[]byte) error SetPushQueue(pushQueue Queue) + Remove(payload string, count int64, removeFromRejected bool) error + RemoveBytes(payload []byte, count int64, removeFromRejected bool) error StartConsuming(prefetchLimit int64, pollDuration time.Duration) error StopConsuming() <-chan struct{} AddConsumer(tag string, consumer Consumer) (string, error) @@ -119,6 +121,20 @@ func (queue *redisQueue) PublishBytes(payload ...[]byte) error { return queue.Publish(stringifiedBytes...) } +// Remove elements with specific value from the queue +func (queue *redisQueue) Remove(payload string, count int64, removeFromRejected bool) error { + _, err := queue.redisClient.LRem(queue.readyKey, count, payload) + if removeFromRejected { + queue.redisClient.LRem(queue.rejectedKey, count, payload) + } + return err +} + +// RemoveBytes casts bytes to string and calls Remove +func (queue *redisQueue) RemoveBytes(payload []byte, count int64, removeFromRejected bool) error { + return queue.Remove(string(payload), count, removeFromRejected) +} + // SetPushQueue sets a push queue. In the consumer function you can call // delivery.Push(). If a push queue is set the delivery then gets moved from // the original queue to the push queue. If no push queue is set it's diff --git a/test_queue.go b/test_queue.go index b4664c4..6dbfea8 100644 --- a/test_queue.go +++ b/test_queue.go @@ -30,6 +30,14 @@ func (queue *TestQueue) PublishBytes(payload ...[]byte) error { return queue.Publish(stringifiedBytes...) } +func (queue *TestQueue) Remove(payload string, count int64, removeFromRejected bool) error { + panic(errorNotSupported) +} + +func (queue *TestQueue) RemoveBytes(payload []byte, count int64, removeFromRejected bool) error { + return queue.Remove(string(payload), count, removeFromRejected) +} + func (*TestQueue) SetPushQueue(Queue) { panic(errorNotSupported) } func (*TestQueue) StartConsuming(int64, time.Duration) error { panic(errorNotSupported) } func (*TestQueue) StopConsuming() <-chan struct{} { panic(errorNotSupported) }