Skip to content

Commit

Permalink
Merge pull request #1 from purposeinplay/fix_queue
Browse files Browse the repository at this point in the history
fix(queue): apply job queue to task config
  • Loading branch information
AlexHashgraph authored Nov 8, 2024
2 parents f5c3624 + 74ff8b4 commit ba96acf
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 3 deletions.
21 changes: 18 additions & 3 deletions asynq/asynq.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,12 @@ func (w Worker) Perform(job worker.Job) (*worker.JobInfo, error) {

task := asynq.NewTask(job.Handler, payload)

enqueue, err := w.client.Enqueue(task)
enqueueOpts := []asynq.Option{}
if job.Queue != "" {
enqueueOpts = append(enqueueOpts, asynq.Queue(job.Queue))
}

enqueue, err := w.client.Enqueue(task, enqueueOpts...)
if err != nil {
w.logger.Error(
"error enqueuing job",
Expand Down Expand Up @@ -154,7 +159,12 @@ func (w Worker) PerformAt(

task := asynq.NewTask(job.Handler, opts)

enqueue, err := w.client.Enqueue(task, asynq.ProcessAt(jobTime))
enqueueOpts := []asynq.Option{asynq.ProcessAt(jobTime)}
if job.Queue != "" {
enqueueOpts = append(enqueueOpts, asynq.Queue(job.Queue))
}

enqueue, err := w.client.Enqueue(task, enqueueOpts...)
if err != nil {
w.logger.Error(
"error enqueuing job",
Expand Down Expand Up @@ -199,7 +209,12 @@ func (w Worker) PerformIn(

task := asynq.NewTask(job.Handler, payload)

enqueue, err := w.client.Enqueue(task, asynq.ProcessIn(jobTime))
enqueueOpts := []asynq.Option{asynq.ProcessIn(jobTime)}
if job.Queue != "" {
enqueueOpts = append(enqueueOpts, asynq.Queue(job.Queue))
}

enqueue, err := w.client.Enqueue(task, enqueueOpts...)
if err != nil {
w.logger.Error(
"error enqueuing job",
Expand Down
12 changes: 12 additions & 0 deletions asynq/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,15 @@ type cfgOption struct {
func (o cfgOption) apply(opts *Options) {
opts.cfg = o.Cfg
}

func WithQueues(queues map[string]int) Option {

Check warning on line 46 in asynq/options.go

View workflow job for this annotation

GitHub Actions / Lint

exported: exported function WithQueues should have comment or be unexported (revive)
return optionFunc(func(opts *Options) {
opts.cfg.Queues = queues
})
}

type optionFunc func(*Options)

func (f optionFunc) apply(opts *Options) {
f(opts)
}

0 comments on commit ba96acf

Please sign in to comment.