Skip to content

Commit

Permalink
Merge pull request #1 from bio-routing/feature/nexthop
Browse files Browse the repository at this point in the history
Add nexthop+next_asn fields
  • Loading branch information
taktv6 authored Jan 15, 2021
2 parents a81acf4 + 4b5e4e7 commit 31eb0fc
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 17 deletions.
Binary file removed cmd/flowhouse/flowhouse
Binary file not shown.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.14

require (
github.com/ClickHouse/clickhouse-go v1.4.1
github.com/bio-routing/bio-rd v0.0.3-pre4
github.com/bio-routing/bio-rd v0.0.3-pre5
github.com/bio-routing/tflow2 v0.0.0-20200122091514-89924193643e
github.com/gogo/protobuf v1.3.1
github.com/pkg/errors v0.9.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ github.com/bio-routing/bio-rd v0.0.3-pre2 h1:Ho7ytp/sVp3Bh9SsFd1wn/IMwygwAkTRNkO
github.com/bio-routing/bio-rd v0.0.3-pre2/go.mod h1:Dm2pV+USySIWrQ13pjU0+KxXwiKPGdiigDv2fM+RcDs=
github.com/bio-routing/bio-rd v0.0.3-pre4 h1:Q3zvIBhT2V63p3/KTFpB1hShDD/9Ej0iWSC6xY9+6ks=
github.com/bio-routing/bio-rd v0.0.3-pre4/go.mod h1:Dm2pV+USySIWrQ13pjU0+KxXwiKPGdiigDv2fM+RcDs=
github.com/bio-routing/bio-rd v0.0.3-pre5 h1:DT9DmIf+tpU0++Y5/hz8Nx9Ly0X2NdaMlpjpLek/cDI=
github.com/bio-routing/bio-rd v0.0.3-pre5/go.mod h1:Dm2pV+USySIWrQ13pjU0+KxXwiKPGdiigDv2fM+RcDs=
github.com/bio-routing/tflow2 v0.0.0-20181230153523-2e308a4a3c3a/go.mod h1:tjzJ5IykdbWNs1FjmiJWsH6SRBl+aWgxO5I44DAegIw=
github.com/bio-routing/tflow2 v0.0.0-20200122091514-89924193643e h1:Zh5s5mFKBG1dwDLJU1fsPoFxTmixabOhqEuKrOkrKLM=
github.com/bio-routing/tflow2 v0.0.0-20200122091514-89924193643e/go.mod h1:4E2F/ExVEOHe9VF0fqQP60HTCWCMOWV4PyB8R/HndPU=
Expand Down
27 changes: 26 additions & 1 deletion pkg/clickhousegw/clickhousegw.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ func (c *ClickHouseGateway) createFlowsSchemaIfNotExists() error {
src_ip_pfx_len UInt8,
dst_ip_pfx_addr IPv6,
dst_ip_pfx_len UInt8,
nexthop IPv6,
next_asn UInt32,
src_asn UInt32,
dst_asn UInt32,
ip_protocol UInt8,
Expand Down Expand Up @@ -101,7 +103,28 @@ func (c *ClickHouseGateway) InsertFlows(flows []*flow.Flow) error {
return errors.Wrap(err, "Begin failed")
}

stmt, err := tx.Prepare("INSERT INTO flows (agent, int_in, int_out, src_ip_addr, dst_ip_addr, src_ip_pfx_addr, src_ip_pfx_len, dst_ip_pfx_addr, dst_ip_pfx_len, src_asn, dst_asn, ip_protocol, src_port, dst_port, timestamp, size, packets, samplerate) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
stmt, err := tx.Prepare(`INSERT INTO flows (
agent,
int_in,
int_out,
src_ip_addr,
dst_ip_addr,
src_ip_pfx_addr,
src_ip_pfx_len,
dst_ip_pfx_addr,
dst_ip_pfx_len,
nexthop,
next_asn,
src_asn,
dst_asn,
ip_protocol,
src_port,
dst_port,
timestamp,
size,
packets,
samplerate
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? , ?, ?, ?)`)
if err != nil {
return errors.Wrap(err, "Prepare failed")
}
Expand All @@ -119,6 +142,8 @@ func (c *ClickHouseGateway) InsertFlows(flows []*flow.Flow) error {
fl.SrcPfx.Pfxlen(),
addrToNetIP(fl.DstPfx.Addr()),
fl.DstPfx.Pfxlen(),
fl.NextHop.ToNetIP(),
fl.NextAs,
fl.SrcAs,
fl.DstAs,
fl.Protocol,
Expand Down
69 changes: 59 additions & 10 deletions pkg/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,16 @@ func init() {
Label: "Destination IP Prefix",
ShortLabel: "Dst.IP.Pfx",
},
{
Name: "nexthop",
Label: "Nexthop",
ShortLabel: "Nexthop",
},
{
Name: "next_asn",
Label: "Next ASN",
ShortLabel: "Next ASN",
},
{
Name: "src_asn",
Label: "Source ASN",
Expand Down Expand Up @@ -335,6 +345,7 @@ func (fe *Frontend) fieldsToQuery(fields url.Values) (string, error) {
conditions := make([]string, 0)
conditions = append(conditions, fmt.Sprintf("t BETWEEN toDateTime(%d) AND toDateTime(%d)", start, end))
for fieldName := range fields {
fmt.Printf("fieldName: %s\n", fieldName)
if fieldName == "breakdown" || fieldName == "time_start" || fieldName == "time_end" || strings.HasPrefix(fieldName, "filter_field") {
continue
}
Expand All @@ -346,16 +357,7 @@ func (fe *Frontend) fieldsToQuery(fields url.Values) (string, error) {
continue
}

if len(fields[fieldName]) == 1 {
conditions = append(conditions, fmt.Sprintf("%s = '%s'", statement, fields[fieldName][0]))
} else {
values := make([]string, 0)
for _, v := range fields[fieldName] {
values = append(values, fmt.Sprintf("'%s'", v))
}

conditions = append(conditions, fmt.Sprintf("%s IN (%s)", statement, strings.Join(values, ", ")))
}
conditions = append(conditions, formatCondition(statement, fields, fieldName))
}

groupBy := make([]string, 0)
Expand All @@ -371,6 +373,53 @@ func (fe *Frontend) fieldsToQuery(fields url.Values) (string, error) {
return fmt.Sprintf(q, strings.Join(selectFieldList, ", "), fe.chgw.GetDatabaseName(), strings.Join(conditions, " AND "), strings.Join(groupBy, ", ")), nil
}

func formatCondition(statement string, fields url.Values, fieldName string) string {
// TODO: Add support for filtering by Prefix (Dst/Src)

if len(fields[fieldName]) == 1 {
return formatConditionSingleValue(statement, fields, fieldName)
}

return formatConditionMultiValues(statement, fields, fieldName)
}

func formatConditionSingleValue(statement string, fields url.Values, fieldName string) string {
v := fields[fieldName][0]
if isIPField(fieldName) {
v = formatIPCondition(v)
} else {
v = fmt.Sprintf("'%s'", v)
}

return fmt.Sprintf("%s = %s", statement, v)
}

func formatConditionMultiValues(statement string, fields url.Values, fieldName string) string {
values := make([]string, 0)
for _, v := range fields[fieldName] {
if isIPField(fieldName) {
values = append(values, formatIPCondition(v))
continue
}

values = append(values, fmt.Sprintf("'%s'", v))
}

return fmt.Sprintf("%s IN (%s)", statement, strings.Join(values, ", "))
}

func isIPField(fieldName string) bool {
return fieldName == "nexthop" || fieldName == "src_ip_addr" || fieldName == "dst_ip_addr" || fieldName == "agent"
}

func formatIPCondition(addr string) string {
if strings.Contains(addr, ".") {
return fmt.Sprintf("IPv4ToIPv6(IPv4StringToNum('%s'))", addr)
}

return fmt.Sprintf("IPv6StringToNum('%s')", addr)
}

func resolveVirtualField(f string) string {
if f == "src_ip_pfx" {
return "concat(IPv6NumToString(src_ip_pfx_addr), '/', toString(src_ip_pfx_len))"
Expand Down
15 changes: 12 additions & 3 deletions pkg/ipannotator/ipannotator.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,21 @@ func (ipa *IPAnnotator) Annotate(fl *flow.Flow) error {
}

fl.DstPfx = *drt.Prefix()
dstFirstASPathSeg := drt.BestPath().BGPPath.ASPath.GetLastSequenceSegment()
if dstFirstASPathSeg != nil {
dstASN := dstFirstASPathSeg.GetLastASN()
dstLastASPathSeg := drt.BestPath().BGPPath.ASPath.GetLastSequenceSegment()
if dstLastASPathSeg != nil {
dstASN := dstLastASPathSeg.GetLastASN()
if dstASN != nil {
fl.DstAs = *dstASN
}
}

dstFirstASPathSeg := drt.BestPath().BGPPath.ASPath.GetFirstSequenceSegment()
if dstFirstASPathSeg != nil {
nextASN := dstFirstASPathSeg.GetFirstASN()
if nextASN != nil {
fl.NextAs = *nextASN
}
}

return nil
}
1 change: 0 additions & 1 deletion pkg/routemirror/route_mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,5 @@ func (r *RouteMirror) LPM(rtrAddr string, vrfRD uint64, addr bnet.IP) (*route.Ro
return nil, nil
}

// TODO: Check if PathSelection() in bio is wrong. Best route is apparently last element. Should be first...
return routes[len(routes)-1], nil
}
1 change: 0 additions & 1 deletion pkg/servers/sflow/sfserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,6 @@ func (sfs *SflowServer) processPacket(agent bnet.IP, buffer []byte) {
}

if fl.IntIn == "" {

fl.IntIn += fmt.Sprintf("%d", fs.FlowSampleHeader.InputIf)
}

Expand Down

0 comments on commit 31eb0fc

Please sign in to comment.