Skip to content

Commit

Permalink
add ability to remove task
Browse files Browse the repository at this point in the history
  • Loading branch information
AlphaNecron committed Aug 18, 2023
1 parent 93f1717 commit 4fcdec9
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 0 deletions.
16 changes: 16 additions & 0 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type Queue interface {
Publish(payload ...string) error
PublishBytes(payload ...[]byte) error
SetPushQueue(pushQueue Queue)
Remove(payload string, count int64, removeFromRejected bool) error
RemoveBytes(payload []byte, count int64, removeFromRejected bool) error
StartConsuming(prefetchLimit int64, pollDuration time.Duration) error
StopConsuming() <-chan struct{}
AddConsumer(tag string, consumer Consumer) (string, error)
Expand Down Expand Up @@ -119,6 +121,20 @@ func (queue *redisQueue) PublishBytes(payload ...[]byte) error {
return queue.Publish(stringifiedBytes...)
}

// Remove elements with specific value from the queue
func (queue *redisQueue) Remove(payload string, count int64, removeFromRejected bool) error {
_, err := queue.redisClient.LRem(queue.readyKey, count, payload)
if removeFromRejected {
queue.redisClient.LRem(queue.rejectedKey, count, payload)
}
return err
}

// RemoveBytes casts bytes to string and calls Remove
func (queue *redisQueue) RemoveBytes(payload []byte, count int64, removeFromRejected bool) error {
return queue.Remove(string(payload), count, removeFromRejected)
}

// SetPushQueue sets a push queue. In the consumer function you can call
// delivery.Push(). If a push queue is set the delivery then gets moved from
// the original queue to the push queue. If no push queue is set it's
Expand Down
8 changes: 8 additions & 0 deletions test_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ func (queue *TestQueue) PublishBytes(payload ...[]byte) error {
return queue.Publish(stringifiedBytes...)
}

func (queue *TestQueue) Remove(payload string, count int64, removeFromRejected bool) error {
panic(errorNotSupported)
}

func (queue *TestQueue) RemoveBytes(payload []byte, count int64, removeFromRejected bool) error {
return queue.Remove(string(payload), count, removeFromRejected)
}

func (*TestQueue) SetPushQueue(Queue) { panic(errorNotSupported) }
func (*TestQueue) StartConsuming(int64, time.Duration) error { panic(errorNotSupported) }
func (*TestQueue) StopConsuming() <-chan struct{} { panic(errorNotSupported) }
Expand Down

0 comments on commit 4fcdec9

Please sign in to comment.