WIP NETOBSERV-1550: Using batchAPIs to help with CPU and memory resources #256

Closed
wants to merge 2 commits into from
2 changes: 1 addition & 1 deletion .github/workflows/pull_request.yml
@@ -12,7 +12,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
go: ['1.21','1.22']
go: ['1.22']
steps:
- uses: actions/checkout@v3
- name: Set up Go
2 changes: 1 addition & 1 deletion .github/workflows/pull_request_e2e.yml
@@ -17,7 +17,7 @@ jobs:
- name: set up go 1.x
uses: actions/setup-go@v3
with:
go-version: '1.21'
go-version: '1.22'
- name: checkout
uses: actions/checkout@v3
- name: run end-to-end tests
2 changes: 1 addition & 1 deletion .github/workflows/push_image.yml
@@ -15,7 +15,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
go: ['1.21']
go: ['1.22']
steps:
- name: install make
run: sudo apt-get install make
2 changes: 1 addition & 1 deletion .github/workflows/push_image_pr.yml
@@ -16,7 +16,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
go: ['1.21']
go: ['1.22']
steps:
- name: install make
run: sudo apt-get install make
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
@@ -14,7 +14,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
go: ['1.21']
go: ['1.22']
steps:
- name: checkout
uses: actions/checkout@v3
2 changes: 1 addition & 1 deletion Dockerfile
@@ -4,7 +4,7 @@ ARG BUILDPLATFORM=linux/amd64
ARG TARGETARCH=amd64

# Build the manager binary
FROM --platform=$BUILDPLATFORM docker.io/library/golang:1.21 as builder
FROM --platform=$BUILDPLATFORM docker.io/library/golang:1.22 as builder

ARG TARGETARCH
ARG TARGETPLATFORM
6 changes: 2 additions & 4 deletions go.mod
@@ -1,8 +1,6 @@
module github.com/netobserv/netobserv-ebpf-agent

go 1.21.0

toolchain go1.22.1
go 1.22.0

require (
github.com/caarlos0/env/v6 v6.10.1
@@ -15,6 +13,7 @@ require (
github.com/netobserv/flowlogs-pipeline v0.1.12-0.20240325100124-fd783b283c7c
github.com/netobserv/gopipes v0.3.0
github.com/paulbellamy/ratecounter v0.2.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.19.0
github.com/segmentio/kafka-go v0.4.47
github.com/sirupsen/logrus v1.9.3
@@ -88,7 +87,6 @@ require (
github.com/pion/logging v0.2.2 // indirect
github.com/pion/transport/v2 v2.0.0 // indirect
github.com/pion/udp v0.1.4 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.48.0 // indirect
102 changes: 58 additions & 44 deletions pkg/ebpf/tracer.go
@@ -53,18 +53,19 @@ var plog = logrus.WithField("component", "ebpf.PacketFetcher")
// and to flows that are forwarded by the kernel via ringbuffer because could not be aggregated
// in the map
type FlowFetcher struct {
objects *BpfObjects
qdiscs map[ifaces.Interface]*netlink.GenericQdisc
egressFilters map[ifaces.Interface]*netlink.BpfFilter
ingressFilters map[ifaces.Interface]*netlink.BpfFilter
ringbufReader *ringbuf.Reader
cacheMaxSize int
enableIngress bool
enableEgress bool
pktDropsTracePoint link.Link
rttFentryLink link.Link
rttKprobeLink link.Link
lookupAndDeleteSupported bool
objects *BpfObjects
qdiscs map[ifaces.Interface]*netlink.GenericQdisc
egressFilters map[ifaces.Interface]*netlink.BpfFilter
ingressFilters map[ifaces.Interface]*netlink.BpfFilter
ringbufReader *ringbuf.Reader
cacheMaxSize int
enableIngress bool
enableEgress bool
pktDropsTracePoint link.Link
rttFentryLink link.Link
rttKprobeLink link.Link
lookupAndDeleteSupported bool
batchLookupAndDeleteSupported bool
}

type FlowFetcherConfig struct {
@@ -169,18 +170,19 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
}

return &FlowFetcher{
objects: &objects,
ringbufReader: flows,
egressFilters: map[ifaces.Interface]*netlink.BpfFilter{},
ingressFilters: map[ifaces.Interface]*netlink.BpfFilter{},
qdiscs: map[ifaces.Interface]*netlink.GenericQdisc{},
cacheMaxSize: cfg.CacheMaxSize,
enableIngress: cfg.EnableIngress,
enableEgress: cfg.EnableEgress,
pktDropsTracePoint: pktDropsLink,
rttFentryLink: rttFentryLink,
rttKprobeLink: rttKprobeLink,
lookupAndDeleteSupported: true, // this will be turned off later if found to be not supported
objects: &objects,
ringbufReader: flows,
egressFilters: map[ifaces.Interface]*netlink.BpfFilter{},
ingressFilters: map[ifaces.Interface]*netlink.BpfFilter{},
qdiscs: map[ifaces.Interface]*netlink.GenericQdisc{},
cacheMaxSize: cfg.CacheMaxSize,
enableIngress: cfg.EnableIngress,
enableEgress: cfg.EnableEgress,
pktDropsTracePoint: pktDropsLink,
rttFentryLink: rttFentryLink,
rttKprobeLink: rttKprobeLink,
lookupAndDeleteSupported: true, // this will be turned off later if found to be not supported
batchLookupAndDeleteSupported: !oldKernel,
}, nil
}

@@ -409,13 +411,15 @@ func (m *FlowFetcher) ReadRingBuf() (ringbuf.Record, error) {
}

// LookupAndDeleteMap reads all the entries from the eBPF map and removes them from it.
// TODO: detect whether BatchLookupAndDelete is supported (Kernel>=5.6) and use it selectively
// Supported Lookup/Delete operations by kernel: https://github.com/iovisor/bcc/blob/master/docs/kernel-versions.md
func (m *FlowFetcher) LookupAndDeleteMap(met *metrics.Metrics) map[BpfFlowId][]BpfFlowMetrics {
if !m.lookupAndDeleteSupported {
return m.legacyLookupAndDeleteMap(met)
}

if m.batchLookupAndDeleteSupported {
return m.batchLookupAndDeleteMap(met)
}

flowMap := m.objects.AggregatedFlows

iterator := flowMap.Iterate()
@@ -545,15 +549,16 @@ func kernelSpecificLoadAndAssign(oldKernel bool, spec *ebpf.CollectionSpec) (Bpf

// It provides access to packets from the kernel space (via PerfCPU hashmap)
type PacketFetcher struct {
objects *BpfObjects
qdiscs map[ifaces.Interface]*netlink.GenericQdisc
egressFilters map[ifaces.Interface]*netlink.BpfFilter
ingressFilters map[ifaces.Interface]*netlink.BpfFilter
perfReader *perf.Reader
cacheMaxSize int
enableIngress bool
enableEgress bool
lookupAndDeleteSupported bool
objects *BpfObjects
qdiscs map[ifaces.Interface]*netlink.GenericQdisc
egressFilters map[ifaces.Interface]*netlink.BpfFilter
ingressFilters map[ifaces.Interface]*netlink.BpfFilter
perfReader *perf.Reader
cacheMaxSize int
enableIngress bool
enableEgress bool
lookupAndDeleteSupported bool
batchLookupAndDeleteSupported bool
}

func NewPacketFetcher(
@@ -603,6 +608,10 @@ func NewPacketFetcher(
}); err != nil {
return nil, fmt.Errorf("rewriting BPF constants definition: %w", err)
}
oldKernel := utils.IsKernelOlderThan("5.14.0")
if oldKernel {
log.Infof("kernel older than 5.14.0 detected: not all hooks are supported")
}
plog.Infof("PCA Filter- Protocol: %d, Port: %d", pcaProto, pcaPort)

if err := spec.LoadAndAssign(&objects, nil); err != nil {
@@ -622,15 +631,16 @@ func NewPacketFetcher(
}

return &PacketFetcher{
objects: &objects,
perfReader: packets,
egressFilters: map[ifaces.Interface]*netlink.BpfFilter{},
ingressFilters: map[ifaces.Interface]*netlink.BpfFilter{},
qdiscs: map[ifaces.Interface]*netlink.GenericQdisc{},
cacheMaxSize: cacheMaxSize,
enableIngress: ingress,
enableEgress: egress,
lookupAndDeleteSupported: true, // this will be turned off later if found to be not supported
objects: &objects,
perfReader: packets,
egressFilters: map[ifaces.Interface]*netlink.BpfFilter{},
ingressFilters: map[ifaces.Interface]*netlink.BpfFilter{},
qdiscs: map[ifaces.Interface]*netlink.GenericQdisc{},
cacheMaxSize: cacheMaxSize,
enableIngress: ingress,
enableEgress: egress,
lookupAndDeleteSupported: true, // this will be turned off later if found to be not supported
batchLookupAndDeleteSupported: !oldKernel,
}, nil
}

@@ -819,6 +829,10 @@ func (p *PacketFetcher) LookupAndDeleteMap(met *metrics.Metrics) map[int][]*byte
return p.legacyLookupAndDeleteMap(met)
}

if p.batchLookupAndDeleteSupported {
return p.batchLookupAndDeleteMap(met)
}

packetMap := p.objects.PacketRecord
iterator := packetMap.Iterate()
packets := make(map[int][]*byte, p.cacheMaxSize)
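The batch path added above is gated on the same kernel check the agent already uses for its other hooks: `oldKernel := utils.IsKernelOlderThan("5.14.0")`, with `batchLookupAndDeleteSupported: !oldKernel` set in both fetchers, and the flag can still be cleared at runtime if the syscall turns out to be unsupported. For context, here is a generic sketch of such a kernel-version gate; it is not the repository's `utils.IsKernelOlderThan` implementation, and the helper name, parsing, and 5.14 threshold are only illustrative:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"

	"golang.org/x/sys/unix"
)

// kernelVersion returns the running kernel's major and minor version,
// e.g. 5 and 14 for a release string like "5.14.0-362.el9.x86_64".
func kernelVersion() (major, minor int, err error) {
	var uts unix.Utsname
	if err := unix.Uname(&uts); err != nil {
		return 0, 0, err
	}
	release := unix.ByteSliceToString(uts.Release[:])
	parts := strings.SplitN(release, ".", 3)
	if len(parts) < 2 {
		return 0, 0, fmt.Errorf("unexpected kernel release %q", release)
	}
	if major, err = strconv.Atoi(parts[0]); err != nil {
		return 0, 0, err
	}
	if minor, err = strconv.Atoi(parts[1]); err != nil {
		return 0, 0, err
	}
	return major, minor, nil
}

func main() {
	major, minor, err := kernelVersion()
	if err != nil {
		panic(err)
	}
	// Illustrative gate mirroring the PR: treat kernels older than 5.14 as "old"
	// and skip the batch map APIs there.
	oldKernel := major < 5 || (major == 5 && minor < 14)
	fmt.Printf("kernel %d.%d, batch APIs enabled: %v\n", major, minor, !oldKernel)
}
```

Hash-map batch operations themselves date back to kernel 5.6 (see the "Supported Lookup/Delete operations by kernel" note in LookupAndDeleteMap), so the 5.14 gate appears to reuse the threshold the agent already applies elsewhere.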
83 changes: 83 additions & 0 deletions pkg/ebpf/tracer_batchapis.go
@@ -0,0 +1,83 @@
package ebpf

import (
	"github.com/netobserv/netobserv-ebpf-agent/pkg/metrics"

	"github.com/cilium/ebpf"
	"github.com/pkg/errors"
)

// batchLookupAndDeleteMap reads all the entries from the eBPF map and removes them from it.
func (m *FlowFetcher) batchLookupAndDeleteMap(met *metrics.Metrics) map[BpfFlowId][]BpfFlowMetrics {
	flowMap := m.objects.AggregatedFlows

	var flows = make(map[BpfFlowId][]BpfFlowMetrics, m.cacheMaxSize)
	var metrics = make([]BpfFlowMetrics, m.cacheMaxSize*ebpf.MustPossibleCPU())
	var id BpfFlowId
	var ids = make([]BpfFlowId, m.cacheMaxSize)
	var cursor = ebpf.MapBatchCursor{}

	count := 0
	for {
		count, err := flowMap.BatchLookupAndDelete(&cursor, ids, metrics, nil)
		if err == nil || errors.Is(err, ebpf.ErrKeyNotExist) {
			for i, id := range ids[:count] {
				for j := 0; j < ebpf.MustPossibleCPU(); j++ {
					flows[id] = append(flows[id], metrics[i*ebpf.MustPossibleCPU()+j])
				}
			}

			break
		}
		if err != nil {
			if errors.Is(err, ebpf.ErrNotSupported) {
				log.WithError(err).Warnf("switching to legacy mode")
				m.batchLookupAndDeleteSupported = false
				return m.LookupAndDeleteMap(met)
			}
			log.WithError(err).WithField("flowId", id).Warnf("couldn't delete flow entry")
			met.Errors.WithErrorName("flow-fetcher", "CannotDeleteFlows").Inc()
			continue
		}
	}

	met.BufferSizeGauge.WithBufferName("hashmap-total").Set(float64(count))
	met.BufferSizeGauge.WithBufferName("hashmap-unique").Set(float64(len(flows)))

	return flows
}

// batchLookupAndDeleteMap reads all the entries from the eBPF map and removes them from it.
func (p *PacketFetcher) batchLookupAndDeleteMap(met *metrics.Metrics) map[int][]*byte {
	packetMap := p.objects.PacketRecord
	packets := make(map[int][]*byte, p.cacheMaxSize)
	streams := make([]*byte, p.cacheMaxSize*ebpf.MustPossibleCPU())
	var id int
	var ids = make([]int, p.cacheMaxSize)
	var cursor = ebpf.MapBatchCursor{}

	for {
		count, err := packetMap.BatchLookupAndDelete(&cursor, ids, streams, nil)
		if err == nil || errors.Is(err, ebpf.ErrKeyNotExist) {
			for i, id := range ids[:count] {
				for j := 0; j < ebpf.MustPossibleCPU(); j++ {
					packets[id] = append(packets[id], streams[i*ebpf.MustPossibleCPU()+j])
				}
			}

			break
		}
		if err != nil {
			if errors.Is(err, ebpf.ErrNotSupported) {
				log.WithError(err).Warnf("switching to legacy mode")
				p.batchLookupAndDeleteSupported = false
				return p.LookupAndDeleteMap(met)
			}
			log.WithError(err).WithField("flowId", id).Warnf("couldn't delete flow entry")
			met.Errors.WithErrorName("flow-fetcher", "CannotDeleteFlows").Inc()
			continue
		}
	}

	return packets
}
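Both new functions drain a per-CPU hash map with cilium/ebpf's `BatchLookupAndDelete`: the values buffer must hold `len(keys) * ebpf.MustPossibleCPU()` elements, and the per-CPU values of the i-th returned key sit at `values[i*cpus : (i+1)*cpus]`, which is what the `metrics[i*ebpf.MustPossibleCPU()+j]` indexing relies on. A minimal, self-contained sketch of that pattern follows; it is not part of this PR, the key/value types and sizes are arbitrary, and it assumes a cilium/ebpf release and a kernel (>= 5.6 for hash maps) that support batch operations on per-CPU hash maps:

```go
package main

import (
	"errors"
	"fmt"
	"log"

	"github.com/cilium/ebpf"
	"github.com/cilium/ebpf/rlimit"
)

func main() {
	// Needs root/CAP_BPF; lifts the memlock rlimit for older kernels.
	if err := rlimit.RemoveMemlock(); err != nil {
		log.Fatal(err)
	}

	cpus := ebpf.MustPossibleCPU()

	// A per-CPU hash map: every key holds one uint64 value per possible CPU.
	m, err := ebpf.NewMap(&ebpf.MapSpec{
		Type:       ebpf.PerCPUHash,
		KeySize:    4,
		ValueSize:  8,
		MaxEntries: 128,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer m.Close()

	// Seed a few entries; per-CPU writes take a slice with one value per CPU.
	for key := uint32(0); key < 10; key++ {
		vals := make([]uint64, cpus)
		for i := range vals {
			vals[i] = uint64(key)
		}
		if err := m.Put(key, vals); err != nil {
			log.Fatal(err)
		}
	}

	// Drain the map in batches. For per-CPU maps the flat values buffer holds
	// len(keys)*cpus elements; entry i occupies vals[i*cpus : (i+1)*cpus].
	keys := make([]uint32, 16)
	vals := make([]uint64, 16*cpus)
	var cursor ebpf.MapBatchCursor
	total := 0
	for {
		n, err := m.BatchLookupAndDelete(&cursor, keys, vals, nil)
		total += n
		for i := 0; i < n; i++ {
			fmt.Printf("key=%d per-CPU values=%v\n", keys[i], vals[i*cpus:(i+1)*cpus])
		}
		if errors.Is(err, ebpf.ErrKeyNotExist) {
			break // the cursor wrapped: the map has been fully drained
		}
		if errors.Is(err, ebpf.ErrNotSupported) {
			log.Println("batch ops unsupported here; fall back to Iterate + per-key delete")
			break
		}
		if err != nil {
			log.Fatal(err)
		}
	}
	fmt.Println("drained entries:", total)
}
```

`BatchLookupAndDelete` keeps returning `nil` while more batches remain and signals completion with `ebpf.ErrKeyNotExist`, so callers typically accumulate results across iterations until that sentinel, falling back to the per-key path only when `ebpf.ErrNotSupported` is reported.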