-
Notifications
You must be signed in to change notification settings - Fork 8
/
eventstream.go
40 lines (32 loc) · 1.09 KB
/
eventstream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package workflow
import (
"context"
"time"
)
// EventStreamer implementations should all be tested with adaptertest.TestEventStreamer
type EventStreamer interface {
NewProducer(ctx context.Context, topic string) (Producer, error)
NewConsumer(ctx context.Context, topic string, name string, opts ...ConsumerOption) (Consumer, error)
}
type Producer interface {
Send(ctx context.Context, foreignID string, statusType int, headers map[Header]string) error
Close() error
}
type Consumer interface {
Recv(ctx context.Context) (*Event, Ack, error)
Close() error
}
// Ack is used for the event streamer to safeUpdate its cursor of what messages have
// been consumed. If Ack is not called then the event streamer, depending on implementation,
// will likely not keep track of which records / events have been consumed.
type Ack func() error
type ConsumerOptions struct {
PollFrequency time.Duration
Lag time.Duration
}
type ConsumerOption func(*ConsumerOptions)
func WithConsumerPollFrequency(d time.Duration) ConsumerOption {
return func(opt *ConsumerOptions) {
opt.PollFrequency = d
}
}