-
Notifications
You must be signed in to change notification settings - Fork 1
/
peer.go
291 lines (252 loc) · 6.71 KB
/
peer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
package p2p
import (
"crypto/sha1"
"fmt"
"net"
"sync"
"time"
log "github.com/sirupsen/logrus"
)
var peerLogger = packageLogger.WithField("subpack", "peer")
// Peer is an active connection to an endpoint in the network.
// Represents one lifetime of a connection and should not be restarted
type Peer struct {
net *Network
conn net.Conn
metrics ReadWriteCollector
prot Protocol
resend *PeerHashCache
// current state, read only "constants" after the handshake
IsIncoming bool
Endpoint Endpoint
Hash string // This is more of a connection ID than hash right now.
stopper sync.Once
stop chan bool
lastPeerRequest time.Time
lastPeerSend time.Time
// communication channels
send ParcelChannel // parcels from Send() are added here
// Metrics
metricsMtx sync.RWMutex
connected time.Time
lastReceive time.Time // Keep track of how long ago we talked to the peer.
lastSend time.Time // Keep track of how long ago we talked to the peer.
totalParcelsSent uint64
totalParcelsReceived uint64
totalBytesSent uint64
totalBytesReceived uint64
bpsDown, bpsUp float64
mpsDown, mpsUp float64
dropped uint64
// logging
logger *log.Entry
}
func newPeer(net *Network, id uint32, ep Endpoint, conn net.Conn, protocol Protocol, metrics ReadWriteCollector, incoming bool) *Peer {
p := new(Peer)
p.net = net
p.prot = protocol
p.Endpoint = ep
p.metrics = metrics
p.conn = conn
p.stop = make(chan bool, 1)
p.Hash = fmt.Sprintf("%s:%s %08x", ep.IP, ep.Port, id)
p.logger = peerLogger.WithFields(log.Fields{
"hash": p.Hash,
"address": p.Endpoint.IP,
"Port": p.Endpoint.Port,
"Version": p.prot.Version(),
"node": p.net.conf.NodeName,
})
// initialize channels
p.send = newParcelChannel(p.net.conf.ChannelCapacity)
p.IsIncoming = incoming
p.connected = time.Now()
if net.conf.PeerResendFilter {
p.resend = NewPeerHashCache(net.conf.PeerResendBuckets, net.conf.PeerResendInterval)
}
go p.sendLoop()
go p.readLoop()
go p.statLoop()
return p
}
// Stop disconnects the peer from its active connection
func (p *Peer) Stop() {
p.stopper.Do(func() {
p.logger.Debug("Stopping peer")
if p.resend != nil {
p.resend.Stop()
}
close(p.stop) // stops sendLoop and readLoop and statLoop
p.conn.Close()
// sendLoop closes p.send in defer
select {
case p.net.controller.peerStatus <- peerStatus{peer: p, online: false}:
case <-p.net.stopper:
}
})
}
func (p *Peer) String() string {
return p.Hash
}
func (p *Peer) Send(parcel *Parcel) {
select {
case <-p.stop:
// don't send when stopped
default:
_, dropped := p.send.Send(parcel)
p.metricsMtx.Lock()
p.dropped += uint64(dropped)
p.metricsMtx.Unlock()
}
}
func (p *Peer) statLoop() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
p.metricsMtx.Lock()
mw, mr, bw, br := p.metrics.Collect()
p.bpsDown = float64(br)
p.bpsUp = float64(bw)
p.totalBytesReceived += br
p.totalBytesSent += bw
p.mpsDown = float64(mr)
p.mpsUp = float64(mw)
p.totalParcelsReceived += mr
p.totalParcelsSent += mw
p.metricsMtx.Unlock()
case <-p.stop:
return
}
}
}
func (p *Peer) readLoop() {
if p.net.prom != nil {
p.net.prom.ReceiveRoutines.Inc()
defer p.net.prom.ReceiveRoutines.Dec()
}
defer p.Stop() // close connection on fatal error
for {
p.conn.SetReadDeadline(time.Now().Add(p.net.conf.ReadDeadline))
msg, err := p.prot.Receive()
if err != nil {
p.logger.WithError(err).Debug("connection error (readLoop)")
return
}
if err := msg.Valid(); err != nil {
p.logger.WithError(err).Warnf("received invalid msg, disconnecting peer")
if p.net.prom != nil {
p.net.prom.Invalid.Inc()
}
return
}
// metrics
p.metricsMtx.Lock()
p.lastReceive = time.Now()
p.metricsMtx.Unlock()
// stats
if p.net.prom != nil {
p.net.prom.ParcelsReceived.Inc()
p.net.prom.ParcelSize.Observe(float64(len(msg.Payload)) / 1024)
if msg.IsApplicationMessage() {
p.net.prom.AppReceived.Inc()
}
}
if p.resend != nil && msg.IsApplicationMessage() {
p.resend.Add(sha1.Sum(msg.Payload))
}
msg.Address = p.Hash // always set sender = peer
if !p.deliver(msg) { // blocking unless peer is already stopped
return
}
}
}
// deliver is a blocking delivery of this peer's messages to the peer manager.
func (p *Peer) deliver(parcel *Parcel) bool {
select {
case <-p.stop:
return false
case p.net.controller.peerData <- peerParcel{peer: p, parcel: parcel}:
}
return true
}
// sendLoop listens to the Outgoing channel, pushing all data from there
// to the tcp connection
func (p *Peer) sendLoop() {
if p.net.prom != nil {
p.net.prom.SendRoutines.Inc()
defer p.net.prom.SendRoutines.Dec()
}
defer close(p.send)
defer p.Stop() // close connection on fatal error
for {
select {
case <-p.net.stopper:
return
case <-p.stop:
return
case parcel := <-p.send:
if parcel == nil {
p.logger.Error("Received <nil> pointer from application")
continue
}
p.conn.SetWriteDeadline(time.Now().Add(p.net.conf.WriteDeadline))
err := p.prot.Send(parcel)
if err != nil { // no error is recoverable
p.logger.WithError(err).Debug("connection error (sendLoop)")
return // stops in defer
}
// metrics
p.metricsMtx.Lock()
p.lastSend = time.Now()
p.metricsMtx.Unlock()
// stats
if p.net.prom != nil {
p.net.prom.ParcelsSent.Inc()
p.net.prom.ParcelSize.Observe(float64(len(parcel.Payload)+32) / 1024)
if parcel.IsApplicationMessage() {
p.net.prom.AppSent.Inc()
}
}
}
}
}
func (p *Peer) LastSendAge() time.Duration {
p.metricsMtx.RLock()
defer p.metricsMtx.RUnlock()
return time.Since(p.lastSend)
}
// GetMetrics returns live metrics for this connection
func (p *Peer) GetMetrics() PeerMetrics {
p.metricsMtx.RLock()
defer p.metricsMtx.RUnlock()
pt := "regular"
if p.net.controller.isSpecial(p.Endpoint) {
pt = "special_config"
}
return PeerMetrics{
Hash: p.Hash,
PeerAddress: p.Endpoint.IP,
MomentConnected: p.connected,
LastReceive: p.lastReceive,
LastSend: p.lastSend,
BytesReceived: p.totalBytesReceived,
BytesSent: p.totalBytesSent,
MessagesReceived: p.totalParcelsReceived,
MessagesSent: p.totalParcelsSent,
Incoming: p.IsIncoming,
PeerType: pt,
MPSDown: p.mpsDown,
MPSUp: p.mpsUp,
BPSDown: p.bpsDown,
BPSUp: p.bpsUp,
ConnectionState: fmt.Sprintf("v%s", p.prot),
SendFillRatio: p.SendFillRatio(),
Dropped: p.dropped,
}
}
// SendFillRatio is a wrapper for the send channel's FillRatio
func (p *Peer) SendFillRatio() float64 {
return p.send.FillRatio()
}