diff --git a/asynq/asynq.go b/asynq/asynq.go index 226d8c0..ccf1750 100644 --- a/asynq/asynq.go +++ b/asynq/asynq.go @@ -107,7 +107,12 @@ func (w Worker) Perform(job worker.Job) (*worker.JobInfo, error) { task := asynq.NewTask(job.Handler, payload) - enqueue, err := w.client.Enqueue(task) + enqueueOpts := []asynq.Option{} + if job.Queue != "" { + enqueueOpts = append(enqueueOpts, asynq.Queue(job.Queue)) + } + + enqueue, err := w.client.Enqueue(task, enqueueOpts...) if err != nil { w.logger.Error( "error enqueuing job", @@ -154,7 +159,12 @@ func (w Worker) PerformAt( task := asynq.NewTask(job.Handler, opts) - enqueue, err := w.client.Enqueue(task, asynq.ProcessAt(jobTime)) + enqueueOpts := []asynq.Option{asynq.ProcessAt(jobTime)} + if job.Queue != "" { + enqueueOpts = append(enqueueOpts, asynq.Queue(job.Queue)) + } + + enqueue, err := w.client.Enqueue(task, enqueueOpts...) if err != nil { w.logger.Error( "error enqueuing job", @@ -199,7 +209,12 @@ func (w Worker) PerformIn( task := asynq.NewTask(job.Handler, payload) - enqueue, err := w.client.Enqueue(task, asynq.ProcessIn(jobTime)) + enqueueOpts := []asynq.Option{asynq.ProcessIn(jobTime)} + if job.Queue != "" { + enqueueOpts = append(enqueueOpts, asynq.Queue(job.Queue)) + } + + enqueue, err := w.client.Enqueue(task, enqueueOpts...) if err != nil { w.logger.Error( "error enqueuing job", diff --git a/asynq/options.go b/asynq/options.go index be37b4b..f047c92 100644 --- a/asynq/options.go +++ b/asynq/options.go @@ -42,3 +42,15 @@ type cfgOption struct { func (o cfgOption) apply(opts *Options) { opts.cfg = o.Cfg } + +func WithQueues(queues map[string]int) Option { + return optionFunc(func(opts *Options) { + opts.cfg.Queues = queues + }) +} + +type optionFunc func(*Options) + +func (f optionFunc) apply(opts *Options) { + f(opts) +}