-
Notifications
You must be signed in to change notification settings - Fork 0
/
loop.go
100 lines (96 loc) · 2.13 KB
/
loop.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package sechat
import (
"encoding/json"
"time"
"github.com/sirupsen/logrus"
)
// wsRoom represents data from a specific chat room.
type wsRoom struct {
Events []*Event `json:"e"`
}
// run is the main loop. It continually connects to the chat server, sleeping,
// and reconnecting upon error. It runs continually until stopped.
func (c *Conn) run(ch chan<- *Event) {
defer close(c.closedCh)
defer close(c.connectedCh)
defer close(ch)
defer c.log.Info("closing event channel")
for {
// Use the stored credentials to authenticate
if err := c.auth(); err != nil {
c.log.Error(err)
goto retry
}
// Connect to to the websocket server
if err := c.connectWebSocket(); err != nil {
c.log.Error(err)
goto retry
}
c.log.WithFields(logrus.Fields{
"connected": true,
}).Info("connected to WebSocket")
select {
case c.connectedCh <- true:
default:
}
// Event receiving loop
loop:
for {
_, r, err := c.conn.NextReader()
if err != nil {
// Check to see if the error was caused by a shutdown or if it
// is an actual error (in which case, leave the loop)
select {
case <-c.closeCh:
return
default:
c.log.Error(err)
break loop
}
}
// Partially decode the message
msg := map[string]json.RawMessage{}
if err := json.NewDecoder(r).Decode(&msg); err != nil {
continue
}
// Use a "set" to prevent duplicate events from being sent
msgIDs := map[int]struct{}{}
for _, v := range msg {
room := &wsRoom{}
if err := json.Unmarshal(v, &room); err != nil {
continue
}
for _, e := range room.Events {
if _, exists := msgIDs[e.ID]; !exists {
e.precompute()
// Use non-blocking send
select {
case ch <- e:
default:
}
msgIDs[e.ID] = struct{}{}
}
}
}
}
retry:
c.log.WithFields(logrus.Fields{
"connected": false,
}).Info("disconnected from WebSocket")
select {
case c.connectedCh <- false:
default:
}
select {
case <-c.closeCh:
return
default:
}
c.log.Info("reconnecting in 30 seconds")
select {
case <-time.After(30 * time.Second):
case <-c.closeCh:
return
}
}
}