-
Notifications
You must be signed in to change notification settings - Fork 8
/
await.go
106 lines (89 loc) · 2.3 KB
/
await.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package workflow
import (
"context"
"errors"
"strconv"
"time"
)
func (w *Workflow[Type, Status]) Await(ctx context.Context, foreignID, runID string, status Status, opts ...AwaitOption) (*Run[Type, Status], error) {
var opt awaitOpts
for _, option := range opts {
option(&opt)
}
pollFrequency := w.defaultOpts.pollingFrequency
if opt.pollFrequency > 0 {
pollFrequency = opt.pollFrequency
}
role := makeRole("await", w.Name, strconv.FormatInt(int64(status), 10), foreignID)
return awaitWorkflowStatusByForeignID[Type, Status](ctx, w, status, foreignID, runID, role, pollFrequency)
}
func awaitWorkflowStatusByForeignID[Type any, Status StatusType](ctx context.Context, w *Workflow[Type, Status], status Status, foreignID, runID string, role string, pollFrequency time.Duration) (*Run[Type, Status], error) {
topic := Topic(w.Name, int(status))
// Terminal statuses result in the RunState changing to Completed and are stored in the RunStateChangeTopic
// as it is a key event in the Workflow Run's lifecycle.
if w.statusGraph.IsTerminal(int(status)) {
topic = RunStateChangeTopic(w.Name)
}
stream, err := w.eventStreamer.NewConsumer(
ctx,
topic,
role,
WithConsumerPollFrequency(pollFrequency),
)
if err != nil {
return nil, err
}
defer stream.Close()
for {
if ctx.Err() != nil {
return nil, ctx.Err()
}
e, ack, err := stream.Recv(ctx)
if err != nil {
return nil, err
}
shouldFilter := FilterUsing(e,
filterByForeignID(foreignID),
filterByRunID(runID),
)
if shouldFilter {
err = ack()
if err != nil {
return nil, err
}
continue
}
r, err := w.recordStore.Lookup(ctx, e.ForeignID)
if errors.Is(err, ErrRecordNotFound) {
err = ack()
if err != nil {
return nil, err
}
continue
} else if err != nil {
return nil, err
}
var t Type
err = Unmarshal(r.Object, &t)
if err != nil {
return nil, err
}
return &Run[Type, Status]{
TypedRecord: TypedRecord[Type, Status]{
Record: *r,
Status: Status(r.Status),
Object: &t,
},
controller: NewRunStateController(w.recordStore.Store, r),
}, ack()
}
}
type awaitOpts struct {
pollFrequency time.Duration
}
type AwaitOption func(o *awaitOpts)
func WithAwaitPollingFrequency(d time.Duration) AwaitOption {
return func(o *awaitOpts) {
o.pollFrequency = d
}
}