-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
queue.go
147 lines (129 loc) · 3.88 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package asyncer
import (
"context"
"errors"
"runtime"
"time"
"github.com/hibiken/asynq"
"golang.org/x/sync/errgroup"
)
type (
// QueueServer is a wrapper for asynq.Server.
QueueServer struct {
asynq *asynq.Server
}
// QueueServerOption is a function that configures a QueueServer.
QueueServerOption func(*asynq.Config)
)
// NewQueueServer creates a new instance of QueueServer.
// It takes a redis connection option and optional queue server options.
// The function returns a pointer to the created QueueServer.
func NewQueueServer(redisConnOpt asynq.RedisConnOpt, opts ...QueueServerOption) *QueueServer {
// Get the number of available CPUs.
useProcs := runtime.GOMAXPROCS(0)
if useProcs == 0 {
useProcs = 1
} else if useProcs > 1 {
useProcs = useProcs / 2
}
// Default queue options
var (
workerConcurrency = useProcs // use half of the available CPUs
workerShutdownTimeout = time.Second * 10
workerLogLevel = "info"
queueName = "default"
)
cnf := asynq.Config{
Concurrency: workerConcurrency,
LogLevel: castToAsynqLogLevel(workerLogLevel),
ShutdownTimeout: workerShutdownTimeout,
Queues: map[string]int{
queueName: workerConcurrency,
},
}
// Apply options
for _, opt := range opts {
opt(&cnf)
}
return &QueueServer{asynq: asynq.NewServer(redisConnOpt, cnf)}
}
// Run starts the queue server and registers the provided task handlers.
// It returns a function that can be used to run server in a error group.
// E.g.:
//
// eg, ctx := errgroup.WithContext(context.Background())
// eg.Go(queueServer.Run(
// yourapp.NewTaskHandler1(),
// yourapp.NewTaskHandler2(),
// ))
//
// The function returns an error if the server fails to start.
func (srv *QueueServer) Run(handlers ...TaskHandler) func() error {
return func() error {
mux := asynq.NewServeMux()
// Register handlers
for _, h := range handlers {
handlerFunc := func(
fn func(ctx context.Context, payload []byte) error,
) func(ctx context.Context, t *asynq.Task) error {
return func(ctx context.Context, t *asynq.Task) error {
return fn(ctx, t.Payload())
}
}
mux.HandleFunc(h.TaskName(), handlerFunc(h.Handle))
}
// Run server
if err := srv.asynq.Run(mux); err != nil {
return errors.Join(ErrFailedToStartQueueServer, err)
}
return nil
}
}
// Shutdown gracefully shuts down the queue server by waiting for all
// in-flight tasks to finish processing before shutdown.
func (srv *QueueServer) Shutdown() {
srv.asynq.Stop()
srv.asynq.Shutdown()
}
// RunQueueServer starts the queue server and registers the provided task handlers.
// It returns a function that can be used to run server in a error group.
// E.g.:
//
// eg, _ := errgroup.WithContext(context.Background())
// eg.Go(asyncer.RunQueueServer(
// "redis://localhost:6379",
// logger,
// asyncer.HandlerFunc[PayloadStruct1]("task1", task1Handler),
// asyncer.HandlerFunc[PayloadStruct2]("task2", task2Handler),
// ))
//
// func task1Handler(ctx context.Context, payload PayloadStruct1) error {
// // ... handle task here ...
// }
//
// func task2Handler(ctx context.Context, payload PayloadStruct2) error {
// // ... handle task here ...
// }
//
// The function panics if the redis connection string is invalid.
// The function returns an error if the server fails to start.
func RunQueueServer(ctx context.Context, redisConnStr string, log asynq.Logger, handlers ...TaskHandler) func() error {
// Redis connect options for asynq client
redisConnOpt, err := asynq.ParseRedisURI(redisConnStr)
if err != nil {
panic(errors.Join(ErrFailedToRunQueueServer, err))
}
// Queue server options
var opts []QueueServerOption
if log != nil {
opts = append(opts, WithQueueLogger(log))
}
return func() error {
srv := NewQueueServer(redisConnOpt, opts...)
defer srv.Shutdown()
// Run server
eg, _ := errgroup.WithContext(ctx)
eg.Go(srv.Run(handlers...))
return eg.Wait()
}
}