forked from boogie-byte/sflow-patcher
-
Notifications
You must be signed in to change notification settings - Fork 0
/
sflow.go
234 lines (197 loc) · 6.54 KB
/
sflow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
package main
import (
"encoding/binary"
"fmt"
log "github.com/sirupsen/logrus"
)
const (
vxlanPort = 4789
ipProtoUDP = 0x11
etherType8021Q = 0x8100
etherTypeIPv4 = 0x0800
etherTypeIPv6 = 0x86DD
)
// sFlow struct diagrams could be found here:
// https://sflow.org/developers/diagrams/sFlowV5Datagram.pdf
// https://sflow.org/developers/diagrams/sFlowV5Sample.pdf
// https://sflow.org/developers/diagrams/sFlowV5FlowData.pdf
func processDatagram(c *copier) uint32 {
// Do not process unsupported datagram versions
if dv := c.copyUint32(); dv != 5 {
panic(fmt.Sprintf("Unsupported datagram version %d", dv))
}
sequenceNumber := 0
switch at := c.copyUint32(); at {
case 1:
// Copy IPv4 address (4 bytes), agentID (4 bytes),
// sequenceNumber (4 bytes), agentUptime (4 bytes)
if flagDebugEnabled {
sequenceNumber = getSequenceNumber(c.src[c.srcOff:], 8, 8+4)
}
c.copyBytes(16)
case 2:
// Copy IPv6 address (16 bytes), agentID (4 bytes),
// sequenceNumber (4 bytes), agentUptime (4 bytes)
if flagDebugEnabled {
sequenceNumber = getSequenceNumber(c.src[c.srcOff:], 20, 20+4)
}
c.copyBytes(28)
default:
panic(fmt.Sprintf("Unsupported agent address type %d", at))
}
log.Debugf("sequenceNumber: %d", sequenceNumber)
sampleCount := c.copyUint32()
sampleCountPosition := c.dstOff
wrongSampleCount := 0
for i := uint32(0); i < sampleCount; i++ {
toProcess := false
if getSampleFormat(c.src[c.srcOff:]) == 1 {
if vlan := getVlan(c.src[c.srcOff:]); vlanMapLookup(vlan) {
log.Debugf("pass sflow sample with VID %d", vlan)
toProcess = true
} else {
log.Debugf("drop sflow sample with VID %d", vlan)
}
} else {
toProcess = true
}
if toProcess {
processSample(c)
} else {
wrongSampleCount += 1
c.srcOff += getSampleLength(c.src[c.srcOff:]) + 8
}
//if vlan := getVlan(c.src[c.srcOff:]); vlanMapLookup(vlan) || getSampleFormat(c.src[c.srcOff:]) != 1 {
// log.Debugf("pass sflow sample with VID %d", vlan)
// processSample(c)
//} else {
// log.Debugf("drop sflow sample with VID %d", vlan)
// wrongSampleCount += 1
// c.srcOff += getSampleLength(c.src[c.srcOff:]) + 8
//}
}
newSampleCount := uint32(int(sampleCount) - wrongSampleCount)
c.writeUint32At(newSampleCount, sampleCountPosition-4)
return newSampleCount
}
func getSampleFormat(src []byte) uint32 {
df := binary.BigEndian.Uint32(src[:8])
dataType := uint32(0xFFF) & uint32(df)
log.Debugf("format: %d", dataType)
return dataType
}
func getVlan(src []byte) uint32 {
return binary.BigEndian.Uint32(src[48:52])
}
func getSequenceNumber(src []byte, x int, y int) int {
return int(binary.BigEndian.Uint32(src[x:y]))
}
func getSampleLength(src []byte) int {
length := int(binary.BigEndian.Uint32(src[4:8]))
//log.Debugf("lenght %d", length)
return length
}
func processSample(c *copier) {
enterpriseID, format := c.copyDataFormat()
oldSampleLength := int(c.copyUint32())
srcSampleStart := c.srcOffset()
dstSampleStart := c.dstOffset()
if enterpriseID != 0 {
log.Debugf("Skipping unsupported sample enterpriseID %d", enterpriseID)
c.copyBytesAt(oldSampleLength, srcSampleStart, dstSampleStart)
return
} else if format != 1 {
log.Debugf("Skipping unsupported sample type %d", format)
c.copyBytesAt(oldSampleLength, srcSampleStart, dstSampleStart)
return
}
// Copy sampleSequenceNumber (4 bytes),
// sampleDataSource (4 bytes), samplingRate (4 bytes),
// samplePool (4 bytes), dropped (4 bytes), inputInterface (4 bytes),
// outputInterface (4 bytes)
c.copyBytes(28)
recordCount := c.copyUint32()
for i := uint32(0); i < recordCount; i++ {
processRecord(c)
}
// Update the sample lenght field
newSampleLength := uint32(c.dstOffset() - dstSampleStart)
c.writeUint32At(newSampleLength, dstSampleStart-4)
}
func processRecord(c *copier) {
enterpriseID, format := c.copyDataFormat()
oldRecordLength := int(c.copyUint32())
srcRecordStart := c.srcOffset()
dstRecordStart := c.dstOffset()
if enterpriseID != 0 {
log.Debugf("Skipping unsupported record enterpriseID %d", enterpriseID)
c.copyBytesAt(oldRecordLength, srcRecordStart, dstRecordStart)
return
} else if format != 1 {
log.Debugf("Skipping unsupported record type %d", format)
c.copyBytesAt(oldRecordLength, srcRecordStart, dstRecordStart)
return
}
// Parse headerProtocol
if hp := c.copyUint32(); hp != 1 {
log.Debugf("Skipping unsupported frame type %d", hp)
c.copyBytesAt(oldRecordLength, srcRecordStart, dstRecordStart)
return
}
frameLength := c.copyUint32()
// Copy payloadRemoved (4 bytes)
c.copyBytes(4)
oldHeaderLength := c.copyUint32()
// Skip dstMAC (6 bytes), srcMAC (6 bytes)
c.skip(12)
var ipProto uint8
PARSE_FRAME:
switch etherType := c.readUint16(); etherType {
case etherType8021Q:
c.skip(2) // Skip VLANID
goto PARSE_FRAME // Re-parse the frame
case etherTypeIPv4:
ihl := int(c.readUint8() & 0x0F) // IP header length in 32-bit words
c.skip(8) // Skip several IPv4 header fields
ipProto = c.readUint8()
c.skip(ihl*4 - 10) // Skip all IPv4 fields left
case etherTypeIPv6:
c.skip(6) // Skip several IPv6 header fields
ipProto = c.readUint8()
c.skip(33) // Skip all IPv6 fields left
default:
log.Debugf("Skipping unsupported ethertype %d", etherType)
c.copyBytesAt(oldRecordLength, srcRecordStart, dstRecordStart)
return
}
if ipProto != ipProtoUDP {
log.Debug("Skipping non-UDP packet")
c.copyBytesAt(oldRecordLength, srcRecordStart, dstRecordStart)
return
}
c.skip(2) // skip UDP src port (2 bytes)
if dstUDPPort := c.readUint16(); dstUDPPort != vxlanPort {
log.Debug("Skipping non-VXLAN packet")
c.copyBytesAt(oldRecordLength, srcRecordStart, dstRecordStart)
return
}
c.skip(12) // Skip UDP packet length (2 bytes), UDP checksum (2 bytes), VXLAN header (8 bytes)
// Copy the rest of the frame
dstHeaderStart := c.dstOffset()
c.copyBytes(oldRecordLength - (c.srcOffset() - srcRecordStart))
// XDR format requies 4-byte alignment, and the record headers
// is the only variable-length record field
headerLength := c.dstOffset() - dstHeaderStart
if mod := headerLength % 4; mod != 0 {
c.pad(4 - mod)
headerLength += 4 - mod
}
// Update the record lenght field
newRecordLength := uint32(c.dstOffset() - dstRecordStart)
c.writeUint32At(newRecordLength, dstRecordStart-4)
// Update the frameLength to reflect the absence of the stripped headers
frameLength -= oldHeaderLength - uint32(headerLength)
c.writeUint32At(frameLength, dstRecordStart+4)
// Update the headerLength to reflect the absence of the stripped headers
c.writeUint32At(uint32(headerLength), dstRecordStart+12)
}