forked from Graphmasters/occamy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
290 lines (232 loc) · 8.05 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
package main
import (
"context"
"flag"
"fmt"
"log"
"math/rand"
"runtime"
"time"
"github.com/streadway/amqp"
"github.com/Graphmasters/occamy"
)
const (
AMQPURL string = "amqp://localhost:5672" // AMQPURL is the address of the AMQP server.
RequestExchange string = "service_name-request" // RequestExchange is the exchange for request messages.
ControlExchange string = "service_name-control" // ControlExchange is the exchange for control mesagges.
HeaderKeyTaskID string = "task-id" // HeaderKeyTaskID is the key whose value in the header corresponds to the task ID.
)
// main starts an occamy server handling AMQP messages. The reader is expected
// to already be familiar with AMQP.
//
// To communicate to this application messages should be set to either the
// request or control exchange. It is recommended against sending messages
// directly to the queues.
//
// This application is designed to run forever and fail if one thing goes wrong.
// In practice one would implement graceful handling of errors and graceful
// shutdown logic.
//
// *IMPORTANT* The AMQP library used here does NOT implement any reconnection
// logic. Implementing reconnection logic is highly recommended.
func main() {
slots := flag.Int("slots", runtime.NumCPU(), "the number of slots/maximum number of concurrent tasks")
flag.Parse()
server := NewAMQPServer(occamy.ServerConfig{
Slots: *slots,
KillTimeout: time.Millisecond,
HeaderKeyTaskID: HeaderKeyTaskID,
Handler: handle,
Monitors: occamy.Monitors{},
})
connection := createConnection(AMQPURL)
declareExchanges(connection, RequestExchange, ControlExchange)
go consumeControlMessages(server, connection)
consumeRequestMessages(server, connection, *slots)
}
// region AMQP Message Wrapper
// AMQPMessage is a wrapper for amqp.Delivery which implements the
// occamy.Message interface.
type AMQPMessage struct {
msg amqp.Delivery
}
// ConvertToAMQPMessage creates an AMQPMessage from an amqp.Delivery.
func ConvertToAMQPMessage(delivery amqp.Delivery) *AMQPMessage {
return &AMQPMessage{msg: delivery}
}
func (m *AMQPMessage) Body() []byte {
return m.msg.Body
}
func (m *AMQPMessage) Headers() occamy.Headers {
headers := make(occamy.Headers)
if len(m.msg.Headers) != 0 {
for k, v := range m.msg.Headers {
headers[k] = v
}
}
return headers
}
func (m *AMQPMessage) Ack() error {
return m.msg.Ack(false)
}
func (m *AMQPMessage) Reject(requeue bool) error {
return m.msg.Reject(requeue)
}
// endregion
// region AMQP Server Wrapper
// AMQPServer is a wrapper for the occamy.Server which includes methods for
// and handling amqp.Delivery messages.
type AMQPServer struct {
occamy *occamy.Server
}
func NewAMQPServer(config occamy.ServerConfig) *AMQPServer {
return &AMQPServer{
occamy: occamy.NewServer(config),
}
}
func (server *AMQPServer) HandleControlMsg(msg amqp.Delivery) {
server.occamy.HandleControlMsg(ConvertToAMQPMessage(msg))
}
func (server *AMQPServer) HandleRequestMsg(msg amqp.Delivery) {
server.occamy.HandleRequestMsg(ConvertToAMQPMessage(msg))
}
func (server *AMQPServer) Shutdown(ctx context.Context) {
server.occamy.Shutdown(ctx)
}
// endregion
// region Connection
// createConnection creates an AMQP connection.
func createConnection(url string) *amqp.Connection {
connection, err := amqp.Dial(url)
if err != nil {
log.Fatalf("unable to start connection: %v", err)
}
log.Println("connection establish")
return connection
}
// endregion
// region Consume Control Messages
// consumeControlMessages declares a queue for control messages. A consumer is
// connected to this queue and messages are passed to the occamy server.
func consumeControlMessages(server *AMQPServer, connection *amqp.Connection) {
channel, err := connection.Channel()
if err != nil {
log.Fatalf("failed to create channel to control consumer: %v", err)
}
err = channel.Qos(100, 0, false)
if err != nil {
log.Fatalf("failed to set prefetch to control consumer: %v", err)
}
// *IMPORTANT*: It is important that every message goes to every running
// instance of this application. This means each application
// should have its own EXCLUSIVE queue for control messages and that these
// can be removed once the application stops. It is for this reason the
// queue name is random (and hopefully unique) for each application.
name := fmt.Sprintf("%s_%05d", ControlExchange, rand.Intn(10000))
const (
autoDelete = true
durable = false
exclusive = true
)
queue, err := channel.QueueDeclare(name, durable, autoDelete, exclusive, false, nil)
if err != nil {
log.Fatalf("failed to declare queue for control consumer: %v", err)
}
err = channel.QueueBind(queue.Name, "#", ControlExchange, false, nil)
if err != nil {
log.Fatalf("failed to bind queue for control consumer: %v", err)
}
msgs, err := channel.Consume(queue.Name, "", false, exclusive, false, false, nil)
if err != nil {
log.Fatalf("failed to start consuming queue for control consumer: %v", err)
}
log.Println("consuming control messages")
for {
msg, ok := <-msgs
if !ok {
log.Fatal("no more messages in control consumer - this was expected to go forever")
}
server.HandleControlMsg(msg)
}
}
// endregion
// region Consume Request Messages
// consumeRequestMessages declares a queue for request messages. A consumer is
// connected to this queue and messages are passed to the occamy server.
func consumeRequestMessages(server *AMQPServer, connection *amqp.Connection, slots int) {
channel, err := connection.Channel()
if err != nil {
log.Fatalf("failed to create channel to request consumer: %v", err)
}
// The prefetch should be equal to the number of slots. If the prefetch is
// higher the message will be instantly rejected and requeued, while if it
// is lower the messages will be unnecessarily waiting.
err = channel.Qos(slots, 0, false)
if err != nil {
log.Fatalf("failed to set prefetch to request consumer: %v", err)
}
// *IMPORTANT*: It is important that every messages goes to precisely one
// running instance of this application (unless it gets requeued). To share
// a queue the queue names just need to match. The queue must not be
// exclusive.
name := RequestExchange
const (
autoDelete = false
durable = true
exclusive = false
)
queue, err := channel.QueueDeclare(name, durable, autoDelete, exclusive, false, nil)
if err != nil {
log.Fatalf("failed to declare queue for request consumer: %v", err)
}
err = channel.QueueBind(queue.Name, "#", RequestExchange, false, nil)
if err != nil {
log.Fatalf("failed to bind queue for control consumer: %v", err)
}
msgs, err := channel.Consume(queue.Name, "", false, exclusive, false, false, nil)
if err != nil {
log.Fatalf("failed to start consuming queue for request consumer: %v", err)
}
log.Println("consuming request messages")
for {
msg, ok := <-msgs
if !ok {
log.Fatal("no more messages in control consumer - this was expected to go forever")
}
server.HandleRequestMsg(msg)
}
}
// endregion
// region Exchange Declare
// declareExchanges declares the exchanges necessary for communication to the
// application.
func declareExchanges(connection *amqp.Connection, exchanges ...string) {
channel, err := connection.Channel()
if err != nil {
log.Fatalf("unable to create channel to declare exchanges: %v", err)
}
// The queues exchanges should persist, so they can be used after restarts
// of the applications.
const (
durable = true
autoDelete = false
internal = false
noWait = false
)
for _, exchange := range exchanges {
err = channel.ExchangeDeclare(exchange, amqp.ExchangeTopic, durable, autoDelete, internal, noWait, nil)
if err != nil {
log.Fatalf("unable to declare exchange %s: %v", exchange, err)
}
}
log.Println("exchanges declared")
}
// endregion
// region Handle
// handle is a stand in for your own handler.
func handle(_ occamy.Headers, body []byte) (occamy.Task, error) {
// Implement your own task and handle logic!
log.Printf("unable to handle message with body: %s\n", string(body))
return nil, fmt.Errorf("handle function not implemented")
}
// endregion