-
Notifications
You must be signed in to change notification settings - Fork 0
/
eventTypeSwimlaneDispatcher.go
86 lines (73 loc) · 2.34 KB
/
eventTypeSwimlaneDispatcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package eventuate
import (
"github.com/eventuate-clients/eventuate-client-golang/future"
loglib "github.com/eventuate-clients/eventuate-client-golang/logger"
)
// eventPack is the unit of work placed on an eventLane: one event together
// with the future that reports the outcome of handling it.
type eventPack struct {
// eventData is the event payload passed on to EventDispatcher.Dispatch.
eventData interface{}
// eventMeta is the metadata that accompanies eventData.
eventMeta *EventMetadata
// fr is settled with the handler's value and error once the event has been processed.
fr *future.Result
}
// eventLane is the channel through which eventPacks reach the goroutine that
// serves a single swimlane.
type eventLane chan eventPack
// EventTypeSwimlaneDispatcher wraps an EventDispatcher and routes every event
// through a per-lane queue before handing it to the embedded dispatcher.
type EventTypeSwimlaneDispatcher struct {
// EventDispatcher performs the actual handler invocation.
EventDispatcher
// queues holds the active lanes, keyed first by EventMetadata.EntityType and
// then by EventMetadata.SwimLane.
queues map[string]map[int]eventLane
// ll is the log level applied to every future.Result created by Dispatch.
ll loglib.LogLevelEnum
// lg receives the dispatcher's own diagnostic output.
lg loglib.Logger
}
// NewEventTypeSwimlaneDispatcher builds a swimlane dispatcher that delegates
// handler invocation to an embedded EventDispatcher configured with
// eventHandlers. The dispatcher starts with no lanes and with the logger
// returned by loglib.NewNilLogger. The returned error is always nil.
func NewEventTypeSwimlaneDispatcher(eventHandlers *EventResultHandlerMap) (Dispatcher, error) {
	dsp := new(EventTypeSwimlaneDispatcher)
	dsp.EventDispatcher.handlers = eventHandlers
	dsp.queues = map[string]map[int]eventLane{}
	dsp.lg = loglib.NewNilLogger()
	return dsp, nil
}
// SetLogLevel records level for the futures created by Dispatch and swaps the
// dispatcher's logger for one built with loglib.NewLogger(level). It returns
// the receiver so calls can be chained.
func (dsp *EventTypeSwimlaneDispatcher) SetLogLevel(level loglib.LogLevelEnum) *EventTypeSwimlaneDispatcher {
	dsp.ll, dsp.lg = level, loglib.NewLogger(level)
	return dsp
}
// Dispatch queues an event on the lane identified by evt.EntityType and
// evt.SwimLane and returns a future that is settled once the embedded
// EventDispatcher has handled the event.
//
// The first event seen for a given (EntityType, SwimLane) pair creates the
// lane: a channel buffered to 16 packs plus one worker goroutine draining it.
// The worker waits for each handler result before taking the next pack, so
// events sharing a lane are handled one at a time in arrival order.
//
// evt must be non-nil. Dispatch blocks while the lane's buffer is full.
//
// NOTE(review): dsp.queues is read and written here and in the worker's
// deferred cleanup without any locking, and the worker closes a channel that
// Dispatch sends on. This looks unsafe if Dispatch is called from several
// goroutines or if a lane is ever stopped while in use — confirm how callers
// serialize access.
func (dsp *EventTypeSwimlaneDispatcher) Dispatch(data interface{}, evt *EventMetadata) future.Settler {
	// Fix the lane key up front so the worker never has to reach back into
	// the metadata of the event that happened to create the lane.
	entityType, swimLane := evt.EntityType, evt.SwimLane

	lanes, haveLanes := dsp.queues[entityType]
	if !haveLanes {
		lanes = make(map[int]eventLane)
		dsp.queues[entityType] = lanes
	}

	q, haveQ := lanes[swimLane]
	if !haveQ {
		q = make(eventLane, 16)
		lanes[swimLane] = q
		go func() {
			defer func() {
				close(q)
				delete(dsp.queues[entityType], swimLane)
			}()
			for {
				pack := <-q
				fr := pack.fr
				meta := pack.eventMeta
				payload := pack.eventData
				// A pack lacking a future or metadata stops the lane. The
				// check is on the dequeued pack's own metadata, not on the
				// event that created the lane.
				if fr == nil || meta == nil {
					return
				}
				dsp.lg.Println("go EventTypeSwimlaneDispatcher.Dispatch (go). Before calling a handler for EntityType-EventType", meta.EntityType, meta.EventType)
				rslt := dsp.EventDispatcher.Dispatch(payload, meta)
				dsp.lg.Println("go EventTypeSwimlaneDispatcher.Dispatch (go). FR await ", rslt, " => ", *fr)
				// Waiting here is what keeps a lane strictly sequential.
				value, err := rslt.GetValue()
				dsp.lg.Println("go EventTypeSwimlaneDispatcher.Dispatch (go). Result ", rslt, " received ")
				fr.Settle(value, err)
				dsp.lg.Println("go EventTypeSwimlaneDispatcher.Dispatch (go). resuming loop")
			}
		}()
	}

	result := future.NewResult()
	result.SetLogLevel(dsp.ll)
	dsp.lg.Println("EventTypeSwimlaneDispatcher.Dispatch. New: ", result)
	q <- eventPack{
		eventData: data,
		eventMeta: evt,
		fr:        result,
	}
	return future.Settler(result)
}