-
Notifications
You must be signed in to change notification settings - Fork 0
/
async_consumer.go
79 lines (61 loc) · 2.1 KB
/
async_consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package postq
import (
"container/ring"
"fmt"
)
// AsyncEventHandlerFunc processes multiple events and returns the failed ones
type AsyncEventHandlerFunc func(Context, Events) Events
type AsyncEventConsumer struct {
eventLog *ring.Ring
// Name of the events in the push queue to watch for.
WatchEvents []string
// Number of events to be fetched and processed at a time.
BatchSize int
// An async event handler that consumes events.
Consumer AsyncEventHandlerFunc
// ConsumerOption is the configuration for the PGConsumer.
ConsumerOption *ConsumerOption
// EventFetcherOption contains configuration on how the events should be fetched.
EventFetcherOption *EventFetcherOption
}
// RecordEvents will record all the events fetched by the consumer in a ring buffer.
func (t *AsyncEventConsumer) RecordEvents(size int) {
t.eventLog = ring.New(size)
}
func (t AsyncEventConsumer) GetRecords() ([]Event, error) {
if t.eventLog == nil {
return nil, fmt.Errorf("event log is not initialized")
}
return getRecords(t.eventLog), nil
}
func (t *AsyncEventConsumer) Handle(ctx Context) (int, error) {
tx, err := ctx.Pool().Begin(ctx)
if err != nil {
return 0, fmt.Errorf("error initiating db tx: %w", err)
}
defer tx.Rollback(ctx) //nolint:errcheck
events, err := fetchEvents(ctx, tx, t.WatchEvents, t.BatchSize, t.EventFetcherOption)
if err != nil {
return 0, fmt.Errorf("error fetching events: %w", err)
}
if t.eventLog != nil {
for _, event := range events {
t.eventLog.Value = event
t.eventLog = t.eventLog.Next()
}
}
failedEvents := t.Consumer(ctx, events)
if err := failedEvents.Recreate(ctx, tx.Conn()); err != nil {
ctx.Debugf("error saving event attempt updates to event_queue: %v\n", err)
}
return len(events), tx.Commit(ctx)
}
func (t AsyncEventConsumer) EventConsumer() (*PGConsumer, error) {
return NewPGConsumer(t.Handle, t.ConsumerOption)
}
// AsyncHandler converts the given user defined handler into a async event handler.
func AsyncHandler[T Context](fn func(ctx T, e Events) Events) AsyncEventHandlerFunc {
return func(ctx Context, e Events) Events {
return fn(ctx.(T), e)
}
}