forked from r3labs/sse
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstream.go
111 lines (97 loc) · 2.49 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package sse
// Stream is a single event stream. It holds the subscribers attached to the
// stream together with the channels its run loop selects on to add
// subscribers, remove them, publish events and shut down.
type Stream struct {
	// AutoReplay enables replaying of the Eventlog to newly added subscribers.
	// While it is set, every published event is also added to the Eventlog.
	AutoReplay bool
	// Eventlog is the log that is replayed to new subscribers when AutoReplay
	// is set.
	Eventlog EventLog

	// stats is left nil by newStream and is not used by the run loop.
	// NOTE(review): presumably carries subscriber-count requests — confirm
	// against the rest of the package.
	stats chan chan int

	// subscribers are the subscribers currently attached to the stream. The
	// run loop appends to it on register and prunes it on deregister/quit.
	subscribers []*Subscriber

	// register delivers newly created subscribers to the run loop.
	register chan *Subscriber
	// deregister delivers subscribers that should be removed to the run loop.
	deregister chan *Subscriber
	// event carries events waiting to be published to every subscriber.
	event chan *Event
	// quit tells the run loop to drop all subscribers and exit.
	quit chan bool
}
// StreamRegistration pairs an id with a Stream.
// NOTE(review): nothing in this part of the file constructs or reads it, so
// the exact meaning of id (presumably the stream's identifier) should be
// confirmed against its users.
type StreamRegistration struct {
	// id is the identifier associated with stream.
	id string
	// stream is the Stream this registration refers to.
	stream *Stream
}
// newStream returns a new stream.
//
// bufsize is the capacity of the stream's event channel and replay becomes the
// stream's AutoReplay flag. The register, deregister and quit channels are
// unbuffered, and the subscriber list and event log start out empty (non-nil).
// No goroutine is started here; the event loop only runs once run is called.
func newStream(bufsize int, replay bool) *Stream {
	str := new(Stream)

	str.AutoReplay = replay
	str.Eventlog = make(EventLog, 0)
	str.subscribers = []*Subscriber{}

	str.register = make(chan *Subscriber)
	str.deregister = make(chan *Subscriber)
	str.event = make(chan *Event, bufsize)
	str.quit = make(chan bool)

	return str
}
// run starts the stream's event loop in a new goroutine and returns
// immediately. The loop owns str.subscribers and, per iteration, handles
// whichever of the following becomes ready:
//
//   - register:   attach a subscriber (replaying the event log first when
//     AutoReplay is set);
//   - deregister: detach a subscriber, if it is attached;
//   - event:      record the event when AutoReplay is set, then deliver it to
//     every attached subscriber;
//   - quit:       detach all subscribers and end the goroutine.
func (str *Stream) run() {
	go func() {
		for {
			select {
			case sub := <-str.register:
				// Attach the new subscriber before replaying history to it.
				str.subscribers = append(str.subscribers, sub)
				if str.AutoReplay {
					str.Eventlog.Replay(sub)
				}

			case sub := <-str.deregister:
				// Subscribers that are not (or no longer) attached are ignored.
				if idx := str.getSubIndex(sub); idx != -1 {
					str.removeSubscriber(idx)
				}

			case ev := <-str.event:
				if str.AutoReplay {
					str.Eventlog.Add(ev)
				}
				// Each send blocks until that subscriber's connection channel
				// accepts the event, so delivery is in subscriber order.
				for _, sub := range str.subscribers {
					sub.connection <- ev
				}

			case <-str.quit:
				// Shutdown requested: drop every connection and stop looping.
				str.removeAllSubscribers()
				return
			}
		}
	}()
}
// close asks the stream's run loop to shut down by sending on the quit
// channel. The send blocks until it is accepted, so on a stream created by
// newStream (unbuffered quit channel) close returns only after the run loop
// has picked the signal up.
func (str *Stream) close() {
	const shutdown = true
	str.quit <- shutdown
}
// getSubIndex returns the position of sub within str.subscribers, matching by
// pointer identity. It returns -1 when sub is not attached to the stream.
func (str *Stream) getSubIndex(sub *Subscriber) int {
	for idx, candidate := range str.subscribers {
		if candidate == sub {
			return idx
		}
	}
	return -1
}
// addSubscriber will create a new subscriber on a stream.
//
// The subscriber records eventid, uses the stream's deregister channel as its
// quit channel, and gets a connection channel buffered for 64 events. It is
// then sent on str.register; that send blocks until it is received, after
// which the subscriber is returned to the caller.
func (str *Stream) addSubscriber(eventid string) *Subscriber {
	sub := new(Subscriber)
	sub.eventid = eventid
	sub.quit = str.deregister
	sub.connection = make(chan *Event, 64)

	str.register <- sub
	return sub
}
// removeSubscriber closes the connection channel of the subscriber at index i
// and removes that subscriber from str.subscribers, preserving the order of
// the remaining subscribers.
//
// i must be a valid index into str.subscribers; an out-of-range index panics.
func (str *Stream) removeSubscriber(i int) {
	close(str.subscribers[i].connection)

	// Shift the tail down by one, then nil the vacated last slot. Without
	// this the backing array keeps a stale *Subscriber beyond the new length,
	// which keeps the removed subscriber and its channel buffer reachable.
	last := len(str.subscribers) - 1
	copy(str.subscribers[i:], str.subscribers[i+1:])
	str.subscribers[last] = nil
	str.subscribers = str.subscribers[:last]
}
// removeAllSubscribers closes the connection channel of every subscriber and
// empties str.subscribers. The slice keeps its capacity (it is truncated, not
// replaced), but every slot is set to nil so the backing array no longer
// keeps the closed subscribers and their channel buffers reachable.
func (str *Stream) removeAllSubscribers() {
	for i, sub := range str.subscribers {
		close(sub.connection)
		str.subscribers[i] = nil
	}
	str.subscribers = str.subscribers[:0]
}