diff --git a/cmd/flowhouse/flowhouse b/cmd/flowhouse/flowhouse deleted file mode 100755 index fa22bc7..0000000 Binary files a/cmd/flowhouse/flowhouse and /dev/null differ diff --git a/go.mod b/go.mod index 4afb68c..ab6d5ce 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.14 require ( github.com/ClickHouse/clickhouse-go v1.4.1 - github.com/bio-routing/bio-rd v0.0.3-pre4 + github.com/bio-routing/bio-rd v0.0.3-pre5 github.com/bio-routing/tflow2 v0.0.0-20200122091514-89924193643e github.com/gogo/protobuf v1.3.1 github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index e3f0063..253fa00 100644 --- a/go.sum +++ b/go.sum @@ -39,6 +39,8 @@ github.com/bio-routing/bio-rd v0.0.3-pre2 h1:Ho7ytp/sVp3Bh9SsFd1wn/IMwygwAkTRNkO github.com/bio-routing/bio-rd v0.0.3-pre2/go.mod h1:Dm2pV+USySIWrQ13pjU0+KxXwiKPGdiigDv2fM+RcDs= github.com/bio-routing/bio-rd v0.0.3-pre4 h1:Q3zvIBhT2V63p3/KTFpB1hShDD/9Ej0iWSC6xY9+6ks= github.com/bio-routing/bio-rd v0.0.3-pre4/go.mod h1:Dm2pV+USySIWrQ13pjU0+KxXwiKPGdiigDv2fM+RcDs= +github.com/bio-routing/bio-rd v0.0.3-pre5 h1:DT9DmIf+tpU0++Y5/hz8Nx9Ly0X2NdaMlpjpLek/cDI= +github.com/bio-routing/bio-rd v0.0.3-pre5/go.mod h1:Dm2pV+USySIWrQ13pjU0+KxXwiKPGdiigDv2fM+RcDs= github.com/bio-routing/tflow2 v0.0.0-20181230153523-2e308a4a3c3a/go.mod h1:tjzJ5IykdbWNs1FjmiJWsH6SRBl+aWgxO5I44DAegIw= github.com/bio-routing/tflow2 v0.0.0-20200122091514-89924193643e h1:Zh5s5mFKBG1dwDLJU1fsPoFxTmixabOhqEuKrOkrKLM= github.com/bio-routing/tflow2 v0.0.0-20200122091514-89924193643e/go.mod h1:4E2F/ExVEOHe9VF0fqQP60HTCWCMOWV4PyB8R/HndPU= diff --git a/pkg/clickhousegw/clickhousegw.go b/pkg/clickhousegw/clickhousegw.go index ce50a54..ca63501 100644 --- a/pkg/clickhousegw/clickhousegw.go +++ b/pkg/clickhousegw/clickhousegw.go @@ -71,6 +71,8 @@ func (c *ClickHouseGateway) createFlowsSchemaIfNotExists() error { src_ip_pfx_len UInt8, dst_ip_pfx_addr IPv6, dst_ip_pfx_len UInt8, + nexthop IPv6, + next_asn UInt32, src_asn UInt32, dst_asn UInt32, ip_protocol UInt8, @@ -101,7 +103,28 @@ func (c *ClickHouseGateway) InsertFlows(flows []*flow.Flow) error { return errors.Wrap(err, "Begin failed") } - stmt, err := tx.Prepare("INSERT INTO flows (agent, int_in, int_out, src_ip_addr, dst_ip_addr, src_ip_pfx_addr, src_ip_pfx_len, dst_ip_pfx_addr, dst_ip_pfx_len, src_asn, dst_asn, ip_protocol, src_port, dst_port, timestamp, size, packets, samplerate) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") + stmt, err := tx.Prepare(`INSERT INTO flows ( + agent, + int_in, + int_out, + src_ip_addr, + dst_ip_addr, + src_ip_pfx_addr, + src_ip_pfx_len, + dst_ip_pfx_addr, + dst_ip_pfx_len, + nexthop, + next_asn, + src_asn, + dst_asn, + ip_protocol, + src_port, + dst_port, + timestamp, + size, + packets, + samplerate + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? , ?, ?, ?)`) if err != nil { return errors.Wrap(err, "Prepare failed") } @@ -119,6 +142,8 @@ func (c *ClickHouseGateway) InsertFlows(flows []*flow.Flow) error { fl.SrcPfx.Pfxlen(), addrToNetIP(fl.DstPfx.Addr()), fl.DstPfx.Pfxlen(), + fl.NextHop.ToNetIP(), + fl.NextAs, fl.SrcAs, fl.DstAs, fl.Protocol, diff --git a/pkg/frontend/frontend.go b/pkg/frontend/frontend.go index fd339cf..f8aedd8 100644 --- a/pkg/frontend/frontend.go +++ b/pkg/frontend/frontend.go @@ -67,6 +67,16 @@ func init() { Label: "Destination IP Prefix", ShortLabel: "Dst.IP.Pfx", }, + { + Name: "nexthop", + Label: "Nexthop", + ShortLabel: "Nexthop", + }, + { + Name: "next_asn", + Label: "Next ASN", + ShortLabel: "Next ASN", + }, { Name: "src_asn", Label: "Source ASN", @@ -335,6 +345,7 @@ func (fe *Frontend) fieldsToQuery(fields url.Values) (string, error) { conditions := make([]string, 0) conditions = append(conditions, fmt.Sprintf("t BETWEEN toDateTime(%d) AND toDateTime(%d)", start, end)) for fieldName := range fields { + fmt.Printf("fieldName: %s\n", fieldName) if fieldName == "breakdown" || fieldName == "time_start" || fieldName == "time_end" || strings.HasPrefix(fieldName, "filter_field") { continue } @@ -346,16 +357,7 @@ func (fe *Frontend) fieldsToQuery(fields url.Values) (string, error) { continue } - if len(fields[fieldName]) == 1 { - conditions = append(conditions, fmt.Sprintf("%s = '%s'", statement, fields[fieldName][0])) - } else { - values := make([]string, 0) - for _, v := range fields[fieldName] { - values = append(values, fmt.Sprintf("'%s'", v)) - } - - conditions = append(conditions, fmt.Sprintf("%s IN (%s)", statement, strings.Join(values, ", "))) - } + conditions = append(conditions, formatCondition(statement, fields, fieldName)) } groupBy := make([]string, 0) @@ -371,6 +373,53 @@ func (fe *Frontend) fieldsToQuery(fields url.Values) (string, error) { return fmt.Sprintf(q, strings.Join(selectFieldList, ", "), fe.chgw.GetDatabaseName(), strings.Join(conditions, " AND "), strings.Join(groupBy, ", ")), nil } +func formatCondition(statement string, fields url.Values, fieldName string) string { + // TODO: Add support for filtering by Prefix (Dst/Src) + + if len(fields[fieldName]) == 1 { + return formatConditionSingleValue(statement, fields, fieldName) + } + + return formatConditionMultiValues(statement, fields, fieldName) +} + +func formatConditionSingleValue(statement string, fields url.Values, fieldName string) string { + v := fields[fieldName][0] + if isIPField(fieldName) { + v = formatIPCondition(v) + } else { + v = fmt.Sprintf("'%s'", v) + } + + return fmt.Sprintf("%s = %s", statement, v) +} + +func formatConditionMultiValues(statement string, fields url.Values, fieldName string) string { + values := make([]string, 0) + for _, v := range fields[fieldName] { + if isIPField(fieldName) { + values = append(values, formatIPCondition(v)) + continue + } + + values = append(values, fmt.Sprintf("'%s'", v)) + } + + return fmt.Sprintf("%s IN (%s)", statement, strings.Join(values, ", ")) +} + +func isIPField(fieldName string) bool { + return fieldName == "nexthop" || fieldName == "src_ip_addr" || fieldName == "dst_ip_addr" || fieldName == "agent" +} + +func formatIPCondition(addr string) string { + if strings.Contains(addr, ".") { + return fmt.Sprintf("IPv4ToIPv6(IPv4StringToNum('%s'))", addr) + } + + return fmt.Sprintf("IPv6StringToNum('%s')", addr) +} + func resolveVirtualField(f string) string { if f == "src_ip_pfx" { return "concat(IPv6NumToString(src_ip_pfx_addr), '/', toString(src_ip_pfx_len))" diff --git a/pkg/ipannotator/ipannotator.go b/pkg/ipannotator/ipannotator.go index 64b2776..ad08edb 100644 --- a/pkg/ipannotator/ipannotator.go +++ b/pkg/ipannotator/ipannotator.go @@ -47,12 +47,21 @@ func (ipa *IPAnnotator) Annotate(fl *flow.Flow) error { } fl.DstPfx = *drt.Prefix() - dstFirstASPathSeg := drt.BestPath().BGPPath.ASPath.GetLastSequenceSegment() - if dstFirstASPathSeg != nil { - dstASN := dstFirstASPathSeg.GetLastASN() + dstLastASPathSeg := drt.BestPath().BGPPath.ASPath.GetLastSequenceSegment() + if dstLastASPathSeg != nil { + dstASN := dstLastASPathSeg.GetLastASN() if dstASN != nil { fl.DstAs = *dstASN } } + + dstFirstASPathSeg := drt.BestPath().BGPPath.ASPath.GetFirstSequenceSegment() + if dstFirstASPathSeg != nil { + nextASN := dstFirstASPathSeg.GetFirstASN() + if nextASN != nil { + fl.NextAs = *nextASN + } + } + return nil } diff --git a/pkg/routemirror/route_mirror.go b/pkg/routemirror/route_mirror.go index 206d415..953835a 100644 --- a/pkg/routemirror/route_mirror.go +++ b/pkg/routemirror/route_mirror.go @@ -92,6 +92,5 @@ func (r *RouteMirror) LPM(rtrAddr string, vrfRD uint64, addr bnet.IP) (*route.Ro return nil, nil } - // TODO: Check if PathSelection() in bio is wrong. Best route is apparently last element. Should be first... return routes[len(routes)-1], nil } diff --git a/pkg/servers/sflow/sfserver.go b/pkg/servers/sflow/sfserver.go index d8e692f..c9c3dac 100644 --- a/pkg/servers/sflow/sfserver.go +++ b/pkg/servers/sflow/sfserver.go @@ -262,7 +262,6 @@ func (sfs *SflowServer) processPacket(agent bnet.IP, buffer []byte) { } if fl.IntIn == "" { - fl.IntIn += fmt.Sprintf("%d", fs.FlowSampleHeader.InputIf) }