Skip to content

Commit

Permalink
Merge pull request #4 from bio-routing/feature/ipfix
Browse files Browse the repository at this point in the history
Add IPFIX support
  • Loading branch information
taktv6 authored May 21, 2021
2 parents 56ae52c + 0058b02 commit 79f5503
Show file tree
Hide file tree
Showing 13 changed files with 524 additions and 18 deletions.
1 change: 1 addition & 0 deletions cmd/flowhouse/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Config struct {
DefaultVRF string `yaml:"default_vrf"`
defaultVRF uint64
ListenSFlow string `yaml:"listen_sflow"`
ListenIPFIX string `yaml:"listen_ipfix"`
ListenHTTP string `yaml:"listen_http"`
Dicts frontend.Dicts `yaml:"dicts"`
Clickhouse *clickhousegw.ClickhouseConfig `yaml:"clickhouse"`
Expand Down
1 change: 1 addition & 0 deletions cmd/flowhouse/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func main() {
SNMPCommunity: cfg.SNMPCommunity,
RISTimeout: time.Duration(cfg.RISTimeout) * time.Second,
ListenSflow: cfg.ListenSFlow,
ListenIPFIX: cfg.ListenIPFIX,
ListenHTTP: cfg.ListenHTTP,
DefaultVRF: cfg.GetDefaultVRF(),
Dicts: cfg.Dicts,
Expand Down
11 changes: 9 additions & 2 deletions pkg/flowhouse/flowhouse.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package flowhouse

import (
"fmt"
"net/http"
"runtime"
"time"
Expand All @@ -13,6 +12,7 @@ import (
"github.com/bio-routing/flowhouse/pkg/ipannotator"
"github.com/bio-routing/flowhouse/pkg/models/flow"
"github.com/bio-routing/flowhouse/pkg/routemirror"
"github.com/bio-routing/flowhouse/pkg/servers/ipfix"
"github.com/bio-routing/flowhouse/pkg/servers/sflow"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand All @@ -31,6 +31,7 @@ type Flowhouse struct {
grpcClientManager *clientmanager.ClientManager
ipa *ipannotator.IPAnnotator
sfs *sflow.SflowServer
ifxs *ipfix.IPFIXServer
chgw *clickhousegw.ClickHouseGateway
fe *frontend.Frontend
flowsRX chan []*flow.Flow
Expand All @@ -42,6 +43,7 @@ type Config struct {
SNMPCommunity string
RISTimeout time.Duration
ListenSflow string
ListenIPFIX string
ListenHTTP string
DefaultVRF uint64
Dicts frontend.Dicts
Expand Down Expand Up @@ -74,6 +76,12 @@ func New(cfg *Config) (*Flowhouse, error) {
}
fh.sfs = sfs

ifxs, err := ipfix.New(fh.cfg.ListenIPFIX, runtime.NumCPU(), fh.flowsRX, fh.ifMapper)
if err != nil {
return nil, errors.Wrap(err, "Unable to start IPFIX server")
}
fh.ifxs = ifxs

chgw, err := clickhousegw.New(fh.cfg.ChCfg)
if err != nil {
return nil, errors.Wrap(err, "Unable to create clickhouse wrapper")
Expand All @@ -100,7 +108,6 @@ func (f *Flowhouse) AddAgent(name string, addr bnet.IP, risAddrs []string, vrfs
}

for _, v := range vrfs {
fmt.Printf("Adding Target %s %s %v %d\n", name, addr.String(), rtSource, v)
f.routeMirror.AddTarget(name, addr, rtSource, v)
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ipannotator/ipannotator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func New(rm *routemirror.RouteMirror) *IPAnnotator {
func (ipa *IPAnnotator) Annotate(fl *flow.Flow) error {
srt, err := ipa.rm.LPM(fl.Agent.String(), fl.VRFIn, fl.SrcAddr)
if err != nil {
return errors.Wrap(err, "Unable to get route for source address")
return errors.Wrapf(err, "Unable to get route for source address %s", fl.SrcAddr.String())
}

if srt == nil {
Expand All @@ -39,7 +39,7 @@ func (ipa *IPAnnotator) Annotate(fl *flow.Flow) error {

drt, err := ipa.rm.LPM(fl.Agent.String(), fl.VRFOut, fl.DstAddr)
if err != nil {
return errors.Wrap(err, "Unable to get route for source address")
return errors.Wrapf(err, "Unable to get route for destination address %s", fl.DstAddr.String())
}

if drt == nil {
Expand Down
19 changes: 19 additions & 0 deletions pkg/models/flow/flow.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package flow

import (
"fmt"

bnet "github.com/bio-routing/bio-rd/net"
)

Expand Down Expand Up @@ -34,3 +36,20 @@ func (fl *Flow) Add(a *Flow) {
fl.Size += a.Size
fl.Packets += a.Packets
}

// Dump dumps the flow
func (fl *Flow) Dump() {
fmt.Printf("--------------------------------\n")
fmt.Printf("Flow dump:\n")
fmt.Printf("Router: %s\n", fl.Agent.String())
fmt.Printf("Family: %d\n", fl.Family)
fmt.Printf("SrcAddr: %s\n", fl.SrcAddr.String())
fmt.Printf("DstAddr: %s\n", fl.DstAddr.String())
fmt.Printf("Protocol: %d\n", fl.Protocol)
fmt.Printf("NextHop: %s\n", fl.NextHop.String())
fmt.Printf("IntIn: %s\n", fl.IntIn)
fmt.Printf("IntOut: %s\n", fl.IntOut)
fmt.Printf("Packets: %d\n", fl.Packets)
fmt.Printf("Bytes: %d\n", fl.Size)
fmt.Printf("--------------------------------\n")
}
20 changes: 14 additions & 6 deletions pkg/packet/ipfix/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package ipfix

import (
"fmt"
"net"
"unsafe"

"github.com/bio-routing/tflow2/convert"
Expand All @@ -37,7 +36,7 @@ func errorIncompatibleVersion(version uint16) error {
}

// Decode is the main function of this package. It converts raw packet bytes to Packet struct.
func Decode(raw []byte, remote net.IP) (*Packet, error) {
func Decode(raw []byte) (*Packet, error) {
data := convert.Reverse(raw) //TODO: Make it endian aware. This assumes a little endian machine

pSize := len(data)
Expand Down Expand Up @@ -77,7 +76,10 @@ func Decode(raw []byte, remote net.IP) (*Packet, error) {

if fls.Header.SetID == TemplateSetID {
// Template
decodeTemplate(&packet, ptr, uintptr(fls.Header.Length)-sizeOfSetHeader, remote)
err := decodeTemplate(&packet, ptr, uintptr(fls.Header.Length)-sizeOfSetHeader)
if err != nil {
return nil, errors.Wrap(err, "Unable to decode template")
}
} else if fls.Header.SetID > SetIDTemplateMax {
// Actual data packet
decodeData(&packet, ptr, uintptr(fls.Header.Length)-sizeOfSetHeader)
Expand All @@ -103,7 +105,7 @@ func decodeData(packet *Packet, headerPtr unsafe.Pointer, size uintptr) {
}

// decodeTemplate decodes a template from `packet`
func decodeTemplate(packet *Packet, end unsafe.Pointer, size uintptr, remote net.IP) {
func decodeTemplate(packet *Packet, end unsafe.Pointer, size uintptr) error {
min := uintptr(end) - size
for uintptr(end) > min {
headerPtr := unsafe.Pointer(uintptr(end) - sizeOfTemplateRecordHeader)
Expand All @@ -114,16 +116,22 @@ func decodeTemplate(packet *Packet, end unsafe.Pointer, size uintptr, remote net
tmplRecs.Records = make([]*TemplateRecord, 0, numPreAllocRecs)

ptr := unsafe.Pointer(uintptr(headerPtr) - sizeOfTemplateRecordHeader)
var i uint16
for i = 0; i < tmplRecs.Header.FieldCount; i++ {
for i := uint16(0); i < tmplRecs.Header.FieldCount; i++ {
rec := (*TemplateRecord)(unsafe.Pointer(ptr))

if rec.isEnterprise() {
return fmt.Errorf("Enterprise TLV currently not supported")
}

tmplRecs.Records = append(tmplRecs.Records, rec)
ptr = unsafe.Pointer(uintptr(ptr) - sizeOfTemplateRecord)
}

packet.Templates = append(packet.Templates, tmplRecs)
end = unsafe.Pointer(uintptr(end) - uintptr(tmplRecs.Header.FieldCount)*sizeOfTemplateRecord - sizeOfTemplateRecordHeader)
}

return nil
}

// PrintHeader prints the header of `packet`
Expand Down
30 changes: 30 additions & 0 deletions pkg/packet/ipfix/decode_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package ipfix

import (
"testing"
"unsafe"

"github.com/stretchr/testify/assert"
)

func TestDecodeTemplate(t *testing.T) {
tests := []struct {
name string
pkt *Packet
end unsafe.Pointer
size uintptr
expected *Packet
}{
{
name: "Test #1",
pkt: &Packet{
Templates: make([]*TemplateRecords, 0),
},
},
}

for _, test := range tests {
decodeTemplate(test.pkt, test.end, test.size)
assert.Equal(t, test.expected, test.pkt, test.name)
}
}
48 changes: 48 additions & 0 deletions pkg/packet/ipfix/template_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package ipfix

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestIsEnterprise(t *testing.T) {
tests := []struct {
name string
tmpl *TemplateRecord
expected bool
}{
{
name: "test #1",
tmpl: &TemplateRecord{
Type: 0,
},
expected: false,
},
{
name: "test #2",
tmpl: &TemplateRecord{
Type: 65535,
},
expected: true,
},
{
name: "test #3",
tmpl: &TemplateRecord{
Type: 32768,
},
expected: true,
},
{
name: "test #4",
tmpl: &TemplateRecord{
Type: 32767,
},
expected: false,
},
}

for _, test := range tests {
assert.Equal(t, test.expected, test.tmpl.isEnterprise(), test.name)
}
}
20 changes: 15 additions & 5 deletions pkg/packet/ipfix/templates.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@

package ipfix

import "unsafe"
import (
"unsafe"
)

const (
// numPreAllocFlowDataRecs is number of elements to pre allocate in DataRecs slice
Expand Down Expand Up @@ -57,6 +59,10 @@ type TemplateRecord struct {
Type uint16
}

func (tmpl *TemplateRecord) isEnterprise() bool {
return tmpl.Type&0x8000 == 0x8000
}

// FlowDataRecord is actual NetFlow data. This structure does not contain any
// information about the actual data meaning. It must be combined with
// corresponding TemplateRecord to be decoded to a single NetFlow data row.
Expand Down Expand Up @@ -90,25 +96,29 @@ func (dtpl *TemplateRecords) DecodeFlowSet(set Set) (list []FlowDataRecord) {
if record.Values == nil {
return
}

list = append(list, record)
n = n - count
}

return
return list
}

// parseFieldValues reads actual fields values from a Data Record utilizing a template
func parseFieldValues(flows []byte, fields []*TemplateRecord) ([][]byte, int) {
func parseFieldValues(data []byte, fields []*TemplateRecord) ([][]byte, int) {
count := 0
n := len(flows)
n := len(data)
values := make([][]byte, len(fields))

for i, f := range fields {
if n < int(f.Length) {
return nil, 0
}
values[i] = flows[n-int(f.Length) : n]

values[i] = data[n-int(f.Length) : n]
count += int(f.Length)
n -= int(f.Length)
}

return values, count
}
4 changes: 2 additions & 2 deletions pkg/routemirror/route_mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (r *RouteMirror) getRouter(needle string) *router {
func (r *RouteMirror) LPM(rtrAddr string, vrfRD uint64, addr bnet.IP) (*route.Route, error) {
rtr := r.getRouter(rtrAddr)
if rtr == nil {
return nil, fmt.Errorf("Router not found")
return nil, fmt.Errorf("Router %s not found", rtrAddr)
}

afi := uint8(6)
Expand All @@ -82,7 +82,7 @@ func (r *RouteMirror) LPM(rtrAddr string, vrfRD uint64, addr bnet.IP) (*route.Ro

v := rtr.getVRF(vrfRD)
if v == nil {
return nil, fmt.Errorf("Invalid VRF %d pn %q", vrfRD, rtrAddr)
return nil, fmt.Errorf("Invalid VRF %d pn %s", vrfRD, rtrAddr)
}

rib := v.getLocRIB(afi)
Expand Down
Loading

0 comments on commit 79f5503

Please sign in to comment.