-
Notifications
You must be signed in to change notification settings - Fork 0
/
event.go
165 lines (138 loc) · 3.61 KB
/
event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package postq
import (
"fmt"
"strings"
"time"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
)
// Event represents the event queue table.
// The table must have the following fields.
type Event struct {
ID uuid.UUID `json:"id"`
Name string `json:"name"`
Error *string `json:"error"`
Attempts int `json:"attempts"`
LastAttempt *time.Time `json:"last_attempt"`
Properties map[string]string `json:"properties"`
CreatedAt time.Time `json:"created_at"`
Priority int `json:"priority"`
}
func (t Event) TableName() string {
return "event_queue"
}
func (t *Event) SetError(err string) {
t.Error = &err
}
// Scan scans pgx rows into Event
func (t *Event) Scan(rows pgx.Row) error {
err := rows.Scan(
&t.ID,
&t.Name,
&t.CreatedAt,
&t.Properties,
&t.Error,
&t.LastAttempt,
&t.Attempts,
&t.Priority,
)
if err != nil {
return err
}
return nil
}
type Events []Event
// Recreate creates the given failed events in batches after updating the
// attempts count.
func (events Events) Recreate(ctx Context, tx *pgx.Conn) error {
if len(events) == 0 {
return nil
}
var batch pgx.Batch
for _, event := range events {
attempts := event.Attempts + 1
query := `INSERT INTO event_queue
(name, properties, error, last_attempt, attempts, priority)
VALUES($1, $2, $3, NOW(), $4, $5)`
batch.Queue(query, event.Name, event.Properties, event.Error, attempts, event.Priority)
}
br := tx.SendBatch(ctx, &batch)
defer br.Close()
for {
rows, err := br.Query()
rows.Close()
if err != nil {
break
}
}
return nil
}
type EventFetcherOption struct {
// MaxAttempts is the number of times an event is attempted to process
// default: 3
MaxAttempts int
// BaseDelay is the base delay between retries
// default: 60 seconds
BaseDelay int
// Exponent is the exponent of the base delay
// default: 5 (along with baseDelay = 60, the retries are 1, 6, 31, 156 (in minutes))
Exponent int
}
// fetchEvents fetches given watch events from the `event_queue` table.
func fetchEvents(ctx Context, tx pgx.Tx, watchEvents []string, batchSize int, opts *EventFetcherOption) ([]Event, error) {
if batchSize == 0 {
batchSize = 1
}
const selectEventsQuery = `
DELETE FROM event_queue
WHERE id IN (
SELECT id FROM event_queue
WHERE
attempts <= @maxAttempts AND
name = ANY(@events) AND
(last_attempt IS NULL OR last_attempt <= NOW() - INTERVAL '1 SECOND' * @baseDelay * POWER(attempts, @exponent))
ORDER BY priority DESC, created_at ASC
FOR UPDATE SKIP LOCKED
LIMIT @batchSize
)
RETURNING id, name, created_at, properties, error, last_attempt, attempts, priority
`
args := pgx.NamedArgs{
"events": watchEvents,
"batchSize": batchSize,
"maxAttempts": 3,
"baseDelay": 60,
"exponent": 5,
}
if opts != nil {
if opts.MaxAttempts > 0 {
args["maxAttempts"] = opts.MaxAttempts
}
if opts.BaseDelay > 0 {
args["baseDelay"] = opts.BaseDelay
}
if opts.Exponent > 0 {
args["exponent"] = opts.Exponent
}
}
rows, err := tx.Query(ctx, selectEventsQuery, args)
if err != nil {
return nil, fmt.Errorf("error selecting events: %w", err)
}
defer rows.Close()
var events []Event
for rows.Next() {
var e Event
if err := e.Scan(rows); err != nil {
return nil, fmt.Errorf("error scanning row: %w", err)
}
events = append(events, e)
}
if rows.Err() != nil {
return nil, fmt.Errorf("error iterating rows: %w", rows.Err())
}
if len(events) > 0 {
ctx.Tracef("%s %d events fetched", strings.Join(watchEvents, ","), len(events))
}
return events, nil
}