-
Notifications
You must be signed in to change notification settings - Fork 0
/
subscriptionManager.go
71 lines (57 loc) · 1.84 KB
/
subscriptionManager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package eventuate
import (
"github.com/eventuate-clients/eventuate-client-golang/future"
)
// subscriptionManager creates dispatching subscriptions on top of a
// StompClient. It has no state of its own: everything it reads in this file
// (for example the lg logger and typeHints) is reached through the embedded
// client.
type subscriptionManager struct {
// Embedded client; its fields and methods are promoted onto the manager.
*StompClient
}
// newSubscriptionManager returns a subscriptionManager that embeds the given
// client. The client pointer is stored as-is; it is neither copied nor
// validated here.
func newSubscriptionManager(client *StompClient) *subscriptionManager {
	manager := new(subscriptionManager)
	manager.StompClient = client
	return manager
}
// Subscribe registers eventHandlers under subscriberId and returns the
// resulting DispatchingSubscription.
//
// useSwimlane selects the dispatcher factory: NewEventTypeSwimlaneDispatcher
// when true, NewEventDispatcher otherwise. All remaining work is delegated to
// subscribeForStrategy, whose result and error are returned unchanged.
//
// Note that this method takes precedence over the Subscribe method of the
// embedded StompClient; the client's own method is still reachable as
// mgr.StompClient.Subscribe.
func (mgr *subscriptionManager) Subscribe(
	subscriberId string,
	eventHandlers *EventResultHandlerMap,
	subscriberOptions *SubscriberOptions,
	useSwimlane bool) (*DispatchingSubscription, error) {

	var makeDispatcher DispatcherMaker
	if useSwimlane {
		makeDispatcher = DispatcherMaker(NewEventTypeSwimlaneDispatcher)
	} else {
		makeDispatcher = DispatcherMaker(NewEventDispatcher)
	}

	return mgr.subscribeForStrategy(subscriberId, eventHandlers, subscriberOptions, makeDispatcher)
}
// subscribeForStrategy opens a subscription on the embedded StompClient and
// wraps it in a DispatchingSubscription.
//
// Steps, in order:
//  1. Build a dispatcher from eventHandlers using dispatcherMaker; its error,
//     if any, is returned as-is.
//  2. Wrap dispatcher.Dispatch in an EventResultHandler that logs before the
//     call and again once Dispatch has returned.
//  3. Subscribe on the STOMP client with the entity/event groups derived from
//     eventHandlers; a subscription error is returned as-is.
//  4. Start a goroutine that forwards every item read from the
//     subscription's incomingEvent to the DispatchingSubscription.
//
// On any error the returned subscription is nil.
func (mgr *subscriptionManager) subscribeForStrategy(
	subscriberId string,
	eventHandlers *EventResultHandlerMap,
	subscriberOptions *SubscriberOptions,
	dispatcherMaker DispatcherMaker) (*DispatchingSubscription, error) {

	dsp, dspErr := dispatcherMaker(eventHandlers)
	if dspErr != nil {
		return nil, dspErr
	}

	loggingHandler := EventResultHandler(
		func(data interface{}, meta *EventMetadata) future.Settler {
			mgr.lg.Printf("SUBS_MANAGER: handling event: %v, (%#v)\n", meta.Id, meta)
			// The deferred call's arguments are evaluated here, so the id that
			// gets logged is the one seen before Dispatch runs. The message is
			// emitted when Dispatch returns its Settler.
			defer mgr.lg.Printf("SUBS_MANAGER: handling event (finished): %v\n", meta.Id)
			return dsp.Dispatch(data, meta)
		})

	groups := eventHandlers.transformToEntityEventGroups()
	stompSub, stompErr := mgr.StompClient.Subscribe(subscriberId, groups, subscriberOptions, &loggingHandler)
	if stompErr != nil {
		return nil, stompErr
	}

	dispatching := &DispatchingSubscription{
		Subscription:  stompSub,
		eventHandlers: eventHandlers,
		typeHints:     mgr.typeHints,
	}

	// Forward incoming events for as long as the range over incomingEvent
	// keeps yielding (presumably until that channel is closed — confirm
	// against the Subscription type).
	go func(source *Subscription) {
		for event := range source.incomingEvent {
			dispatching.lg.Printf("Received event via STOMP chan: %v (before dispatching)\n", event.String())
			dispatching.dispatchEvent(event)
		}
	}(stompSub)

	return dispatching, nil
}