-
Notifications
You must be signed in to change notification settings - Fork 206
/
main.go
127 lines (108 loc) · 2.66 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/adjust/rmq/v5"
)
const (
prefetchLimit = 1000
pollDuration = 100 * time.Millisecond
numConsumers = 5
reportBatchSize = 10000
consumeDuration = time.Millisecond
shouldLog = false
)
func main() {
errChan := make(chan error, 10)
go logErrors(errChan)
connection, err := rmq.OpenConnection("consumer", "tcp", "localhost:6379", 2, errChan)
if err != nil {
panic(err)
}
queue, err := connection.OpenQueue("things")
if err != nil {
panic(err)
}
if err := queue.StartConsuming(prefetchLimit, pollDuration); err != nil {
panic(err)
}
for i := 0; i < numConsumers; i++ {
name := fmt.Sprintf("consumer %d", i)
if _, err := queue.AddConsumer(name, NewConsumer(i)); err != nil {
panic(err)
}
}
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT)
defer signal.Stop(signals)
<-signals // wait for signal
go func() {
<-signals // hard exit on second signal (in case shutdown gets stuck)
os.Exit(1)
}()
<-connection.StopAllConsuming() // wait for all Consume() calls to finish
}
type Consumer struct {
name string
count int
before time.Time
}
func NewConsumer(tag int) *Consumer {
return &Consumer{
name: fmt.Sprintf("consumer%d", tag),
count: 0,
before: time.Now(),
}
}
func (consumer *Consumer) Consume(delivery rmq.Delivery) {
payload := delivery.Payload()
debugf("start consume %s", payload)
time.Sleep(consumeDuration)
consumer.count++
if consumer.count%reportBatchSize == 0 {
duration := time.Now().Sub(consumer.before)
consumer.before = time.Now()
perSecond := time.Second / (duration / reportBatchSize)
log.Printf("%s consumed %d %s %d", consumer.name, consumer.count, payload, perSecond)
}
if consumer.count%reportBatchSize > 0 {
if err := delivery.Ack(); err != nil {
debugf("failed to ack %s: %s", payload, err)
} else {
debugf("acked %s", payload)
}
} else { // reject one per batch
if err := delivery.Reject(); err != nil {
debugf("failed to reject %s: %s", payload, err)
} else {
debugf("rejected %s", payload)
}
}
}
func logErrors(errChan <-chan error) {
for err := range errChan {
switch err := err.(type) {
case *rmq.HeartbeatError:
if err.Count == rmq.HeartbeatErrorLimit {
log.Print("heartbeat error (limit): ", err)
} else {
log.Print("heartbeat error: ", err)
}
case *rmq.ConsumeError:
log.Print("consume error: ", err)
case *rmq.DeliveryError:
log.Print("delivery error: ", err.Delivery, err)
default:
log.Print("other error: ", err)
}
}
}
func debugf(format string, args ...interface{}) {
if shouldLog {
log.Printf(format, args...)
}
}