From 7fa4fa02ea78de62b32f39dc77645690a1f45119 Mon Sep 17 00:00:00 2001 From: Mohamed Mahmoud Date: Wed, 24 Jan 2024 11:29:05 -0500 Subject: [PATCH 1/2] NETOBSERV-559: Using batchAPIs to help with CPU and memory resources Signed-off-by: Mohamed Mahmoud --- go.mod | 6 +- pkg/ebpf/tracer.go | 102 +++++++++++++++------------ pkg/ebpf/tracer_batchapis.go | 83 ++++++++++++++++++++++ pkg/ebpf/tracer_test.go | 132 +++++++++++++++++++++++++++++++++++ 4 files changed, 275 insertions(+), 48 deletions(-) create mode 100644 pkg/ebpf/tracer_batchapis.go create mode 100644 pkg/ebpf/tracer_test.go diff --git a/go.mod b/go.mod index eb6642163..8d5ab57d8 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,6 @@ module github.com/netobserv/netobserv-ebpf-agent -go 1.21.0 - -toolchain go1.22.1 +go 1.22.0 require ( github.com/caarlos0/env/v6 v6.10.1 @@ -15,6 +13,7 @@ require ( github.com/netobserv/flowlogs-pipeline v0.1.12-0.20240325100124-fd783b283c7c github.com/netobserv/gopipes v0.3.0 github.com/paulbellamy/ratecounter v0.2.0 + github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.19.0 github.com/segmentio/kafka-go v0.4.47 github.com/sirupsen/logrus v1.9.3 @@ -88,7 +87,6 @@ require ( github.com/pion/logging v0.2.2 // indirect github.com/pion/transport/v2 v2.0.0 // indirect github.com/pion/udp v0.1.4 // indirect - github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.48.0 // indirect diff --git a/pkg/ebpf/tracer.go b/pkg/ebpf/tracer.go index 25853bb08..226e90c41 100644 --- a/pkg/ebpf/tracer.go +++ b/pkg/ebpf/tracer.go @@ -53,18 +53,19 @@ var plog = logrus.WithField("component", "ebpf.PacketFetcher") // and to flows that are forwarded by the kernel via ringbuffer because could not be aggregated // in the map type FlowFetcher struct { - objects *BpfObjects - qdiscs map[ifaces.Interface]*netlink.GenericQdisc - egressFilters map[ifaces.Interface]*netlink.BpfFilter - ingressFilters map[ifaces.Interface]*netlink.BpfFilter - ringbufReader *ringbuf.Reader - cacheMaxSize int - enableIngress bool - enableEgress bool - pktDropsTracePoint link.Link - rttFentryLink link.Link - rttKprobeLink link.Link - lookupAndDeleteSupported bool + objects *BpfObjects + qdiscs map[ifaces.Interface]*netlink.GenericQdisc + egressFilters map[ifaces.Interface]*netlink.BpfFilter + ingressFilters map[ifaces.Interface]*netlink.BpfFilter + ringbufReader *ringbuf.Reader + cacheMaxSize int + enableIngress bool + enableEgress bool + pktDropsTracePoint link.Link + rttFentryLink link.Link + rttKprobeLink link.Link + lookupAndDeleteSupported bool + batchLookupAndDeleteSupported bool } type FlowFetcherConfig struct { @@ -169,18 +170,19 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) { } return &FlowFetcher{ - objects: &objects, - ringbufReader: flows, - egressFilters: map[ifaces.Interface]*netlink.BpfFilter{}, - ingressFilters: map[ifaces.Interface]*netlink.BpfFilter{}, - qdiscs: map[ifaces.Interface]*netlink.GenericQdisc{}, - cacheMaxSize: cfg.CacheMaxSize, - enableIngress: cfg.EnableIngress, - enableEgress: cfg.EnableEgress, - pktDropsTracePoint: pktDropsLink, - rttFentryLink: rttFentryLink, - rttKprobeLink: rttKprobeLink, - lookupAndDeleteSupported: true, // this will be turned off later if found to be not supported + objects: &objects, + ringbufReader: flows, + egressFilters: map[ifaces.Interface]*netlink.BpfFilter{}, + ingressFilters: map[ifaces.Interface]*netlink.BpfFilter{}, + qdiscs: map[ifaces.Interface]*netlink.GenericQdisc{}, + cacheMaxSize: cfg.CacheMaxSize, + enableIngress: cfg.EnableIngress, + enableEgress: cfg.EnableEgress, + pktDropsTracePoint: pktDropsLink, + rttFentryLink: rttFentryLink, + rttKprobeLink: rttKprobeLink, + lookupAndDeleteSupported: true, // this will be turned off later if found to be not supported + batchLookupAndDeleteSupported: !oldKernel, }, nil } @@ -409,13 +411,15 @@ func (m *FlowFetcher) ReadRingBuf() (ringbuf.Record, error) { } // LookupAndDeleteMap reads all the entries from the eBPF map and removes them from it. -// TODO: detect whether BatchLookupAndDelete is supported (Kernel>=5.6) and use it selectively -// Supported Lookup/Delete operations by kernel: https://github.com/iovisor/bcc/blob/master/docs/kernel-versions.md func (m *FlowFetcher) LookupAndDeleteMap(met *metrics.Metrics) map[BpfFlowId][]BpfFlowMetrics { if !m.lookupAndDeleteSupported { return m.legacyLookupAndDeleteMap(met) } + if m.batchLookupAndDeleteSupported { + return m.batchLookupAndDeleteMap(met) + } + flowMap := m.objects.AggregatedFlows iterator := flowMap.Iterate() @@ -545,15 +549,16 @@ func kernelSpecificLoadAndAssign(oldKernel bool, spec *ebpf.CollectionSpec) (Bpf // It provides access to packets from the kernel space (via PerfCPU hashmap) type PacketFetcher struct { - objects *BpfObjects - qdiscs map[ifaces.Interface]*netlink.GenericQdisc - egressFilters map[ifaces.Interface]*netlink.BpfFilter - ingressFilters map[ifaces.Interface]*netlink.BpfFilter - perfReader *perf.Reader - cacheMaxSize int - enableIngress bool - enableEgress bool - lookupAndDeleteSupported bool + objects *BpfObjects + qdiscs map[ifaces.Interface]*netlink.GenericQdisc + egressFilters map[ifaces.Interface]*netlink.BpfFilter + ingressFilters map[ifaces.Interface]*netlink.BpfFilter + perfReader *perf.Reader + cacheMaxSize int + enableIngress bool + enableEgress bool + lookupAndDeleteSupported bool + batchLookupAndDeleteSupported bool } func NewPacketFetcher( @@ -603,6 +608,10 @@ func NewPacketFetcher( }); err != nil { return nil, fmt.Errorf("rewriting BPF constants definition: %w", err) } + oldKernel := utils.IsKernelOlderThan("5.14.0") + if oldKernel { + log.Infof("kernel older than 5.14.0 detected: not all hooks are supported") + } plog.Infof("PCA Filter- Protocol: %d, Port: %d", pcaProto, pcaPort) if err := spec.LoadAndAssign(&objects, nil); err != nil { @@ -622,15 +631,16 @@ func NewPacketFetcher( } return &PacketFetcher{ - objects: &objects, - perfReader: packets, - egressFilters: map[ifaces.Interface]*netlink.BpfFilter{}, - ingressFilters: map[ifaces.Interface]*netlink.BpfFilter{}, - qdiscs: map[ifaces.Interface]*netlink.GenericQdisc{}, - cacheMaxSize: cacheMaxSize, - enableIngress: ingress, - enableEgress: egress, - lookupAndDeleteSupported: true, // this will be turned off later if found to be not supported + objects: &objects, + perfReader: packets, + egressFilters: map[ifaces.Interface]*netlink.BpfFilter{}, + ingressFilters: map[ifaces.Interface]*netlink.BpfFilter{}, + qdiscs: map[ifaces.Interface]*netlink.GenericQdisc{}, + cacheMaxSize: cacheMaxSize, + enableIngress: ingress, + enableEgress: egress, + lookupAndDeleteSupported: true, // this will be turned off later if found to be not supported + batchLookupAndDeleteSupported: !oldKernel, }, nil } @@ -819,6 +829,10 @@ func (p *PacketFetcher) LookupAndDeleteMap(met *metrics.Metrics) map[int][]*byte return p.legacyLookupAndDeleteMap(met) } + if p.batchLookupAndDeleteSupported { + return p.batchLookupAndDeleteMap(met) + } + packetMap := p.objects.PacketRecord iterator := packetMap.Iterate() packets := make(map[int][]*byte, p.cacheMaxSize) diff --git a/pkg/ebpf/tracer_batchapis.go b/pkg/ebpf/tracer_batchapis.go new file mode 100644 index 000000000..6fbeb51d5 --- /dev/null +++ b/pkg/ebpf/tracer_batchapis.go @@ -0,0 +1,83 @@ +package ebpf + +import ( + "github.com/netobserv/netobserv-ebpf-agent/pkg/metrics" + + "github.com/cilium/ebpf" + "github.com/pkg/errors" +) + +// batchLookupAndDeleteMap reads all the entries from the eBPF map and removes them from it. +func (m *FlowFetcher) batchLookupAndDeleteMap(met *metrics.Metrics) map[BpfFlowId][]BpfFlowMetrics { + flowMap := m.objects.AggregatedFlows + + var flows = make(map[BpfFlowId][]BpfFlowMetrics, m.cacheMaxSize) + var metrics = make([]BpfFlowMetrics, m.cacheMaxSize*ebpf.MustPossibleCPU()) + var id BpfFlowId + var ids = make([]BpfFlowId, m.cacheMaxSize) + var cursor = ebpf.MapBatchCursor{} + + count := 0 + for { + count, err := flowMap.BatchLookupAndDelete(&cursor, ids, metrics, nil) + if err == nil || errors.Is(err, ebpf.ErrKeyNotExist) { + for i, id := range ids[:count] { + for j := 0; j < ebpf.MustPossibleCPU(); j++ { + flows[id] = append(flows[id], metrics[i*ebpf.MustPossibleCPU()+j]) + } + } + + break + } + if err != nil { + if errors.Is(err, ebpf.ErrNotSupported) { + log.WithError(err).Warnf("switching to legacy mode") + m.batchLookupAndDeleteSupported = false + return m.LookupAndDeleteMap(met) + } + log.WithError(err).WithField("flowId", id).Warnf("couldn't delete flow entry") + met.Errors.WithErrorName("flow-fetcher", "CannotDeleteFlows").Inc() + continue + } + } + + met.BufferSizeGauge.WithBufferName("hashmap-total").Set(float64(count)) + met.BufferSizeGauge.WithBufferName("hashmap-unique").Set(float64(len(flows))) + + return flows +} + +// batchLookupAndDeleteMap reads all the entries from the eBPF map and removes them from it. +func (p *PacketFetcher) batchLookupAndDeleteMap(met *metrics.Metrics) map[int][]*byte { + packetMap := p.objects.PacketRecord + packets := make(map[int][]*byte, p.cacheMaxSize) + streams := make([]*byte, p.cacheMaxSize*ebpf.MustPossibleCPU()) + var id int + var ids = make([]int, p.cacheMaxSize) + var cursor = ebpf.MapBatchCursor{} + + for { + count, err := packetMap.BatchLookupAndDelete(&cursor, ids, streams, nil) + if err == nil || errors.Is(err, ebpf.ErrKeyNotExist) { + for i, id := range ids[:count] { + for j := 0; j < ebpf.MustPossibleCPU(); j++ { + packets[id] = append(packets[id], streams[i*ebpf.MustPossibleCPU()+j]) + } + } + + break + } + if err != nil { + if errors.Is(err, ebpf.ErrNotSupported) { + log.WithError(err).Warnf("switching to legacy mode") + p.batchLookupAndDeleteSupported = false + return p.LookupAndDeleteMap(met) + } + log.WithError(err).WithField("flowId", id).Warnf("couldn't delete flow entry") + met.Errors.WithErrorName("flow-fetcher", "CannotDeleteFlows").Inc() + continue + } + } + + return packets +} diff --git a/pkg/ebpf/tracer_test.go b/pkg/ebpf/tracer_test.go new file mode 100644 index 000000000..23d6e33ba --- /dev/null +++ b/pkg/ebpf/tracer_test.go @@ -0,0 +1,132 @@ +package ebpf + +import ( + "errors" + "testing" + + "github.com/cilium/ebpf" +) + +func BenchmarkFlowFetcher_LookupAndDeleteMap(b *testing.B) { + var flowFetcherConfig = FlowFetcherConfig{ + EnableIngress: true, + EnableEgress: true, + Debug: false, + Sampling: 1, + CacheMaxSize: 100, + } + + b.Run("BatchLookupAndDelete", func(b *testing.B) { + m, err := NewFlowFetcher(&flowFetcherConfig) + if err != nil { + b.Fatal(err) + } + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + b.StopTimer() + err = m.testBatchUpdateMap() + if err != nil { + b.Fatal(err) + } + b.StartTimer() + m.testBatchLookupAndDeleteMap() + } + }) + + b.Run("IterateLookupAndDelete", func(b *testing.B) { + m, err := NewFlowFetcher(&flowFetcherConfig) + if err != nil { + b.Fatal(err) + } + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + b.StopTimer() + err = m.testBatchUpdateMap() + if err != nil { + b.Fatal(err) + } + b.StartTimer() + m.testIterateLookupAndDeleteMap() + } + }) +} + +func (m *FlowFetcher) testBatchUpdateMap() error { + flowMap := m.objects.AggregatedFlows + var ids = make([]BpfFlowId, m.cacheMaxSize) + var metrics = make([]BpfFlowMetrics, m.cacheMaxSize*ebpf.MustPossibleCPU()) + + for i := 0; i < m.cacheMaxSize; i++ { + ids[i] = BpfFlowId{ + IfIndex: uint32(i), + } + for j := 0; j < ebpf.MustPossibleCPU(); j++ { + metrics[i*ebpf.MustPossibleCPU()+j] = BpfFlowMetrics{ + Bytes: uint64(10 * (i + j)), + Packets: uint32(i + j), + } + } + } + + _, err := flowMap.BatchUpdate(ids, metrics, nil) + return err + +} + +func (m *FlowFetcher) testBatchLookupAndDeleteMap() map[BpfFlowId][]BpfFlowMetrics { + flowMap := m.objects.AggregatedFlows + + var flows = make(map[BpfFlowId][]BpfFlowMetrics, m.cacheMaxSize) + var metrics = make([]BpfFlowMetrics, m.cacheMaxSize*ebpf.MustPossibleCPU()) + var ids = make([]BpfFlowId, m.cacheMaxSize) + var cursor = ebpf.MapBatchCursor{} + + for { + count, err := flowMap.BatchLookupAndDelete(&cursor, ids, metrics, nil) + if err == nil || errors.Is(err, ebpf.ErrKeyNotExist) { + for i, id := range ids[:count] { + for j := 0; j < ebpf.MustPossibleCPU(); j++ { + flows[id] = append(flows[id], metrics[i*ebpf.MustPossibleCPU()+j]) + } + } + + break + } + if err != nil || count == 0 { + log.Debugf("failed to use BatchLookupAndDelete api: %v fall back to use iterate and delete api", err) + break + } + } + + return flows +} + +func (m *FlowFetcher) testIterateLookupAndDeleteMap() map[BpfFlowId][]BpfFlowMetrics { + flowMap := m.objects.AggregatedFlows + + var flows = make(map[BpfFlowId][]BpfFlowMetrics, m.cacheMaxSize) + var metrics = make([]BpfFlowMetrics, m.cacheMaxSize) + var id BpfFlowId + var ids []BpfFlowId + iterator := flowMap.Iterate() + // First, get all ids and don't care about metrics (we need lookup+delete to be atomic) + for iterator.Next(&id, &metrics) { + ids = append(ids, id) + } + // Changing Iterate+Delete by LookupAndDelete would prevent some possible race conditions + // TODO: detect whether LookupAndDelete is supported (Kernel>=4.20) and use it selectively + for _, id := range ids { + if err := flowMap.Delete(id); err != nil { + log.WithError(err).WithField("flowId", id). + Warnf("couldn't delete flow entry") + } + // We observed that eBFP PerCPU map might insert multiple times the same key in the map + // (probably due to race conditions) so we need to re-join metrics again at userspace + // TODO: instrument how many times the keys are is repeated in the same eviction + flows[id] = append(flows[id], metrics...) + } + + return flows +} From 5781630b4ab3ac3ed1a2132613f6075ca80359bf Mon Sep 17 00:00:00 2001 From: Mohamed Mahmoud Date: Mon, 25 Mar 2024 10:40:41 -0400 Subject: [PATCH 2/2] update workflow to go1.22 as its required Signed-off-by: Mohamed Mahmoud --- .github/workflows/pull_request.yml | 2 +- .github/workflows/pull_request_e2e.yml | 2 +- .github/workflows/push_image.yml | 2 +- .github/workflows/push_image_pr.yml | 2 +- .github/workflows/release.yml | 2 +- Dockerfile | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index 698cea5d8..29ea9d025 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -12,7 +12,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - go: ['1.21','1.22'] + go: ['1.22'] steps: - uses: actions/checkout@v3 - name: Set up Go diff --git a/.github/workflows/pull_request_e2e.yml b/.github/workflows/pull_request_e2e.yml index 8dc5c3157..e2e3a9b60 100644 --- a/.github/workflows/pull_request_e2e.yml +++ b/.github/workflows/pull_request_e2e.yml @@ -17,7 +17,7 @@ jobs: - name: set up go 1.x uses: actions/setup-go@v3 with: - go-version: '1.21' + go-version: '1.22' - name: checkout uses: actions/checkout@v3 - name: run end-to-end tests diff --git a/.github/workflows/push_image.yml b/.github/workflows/push_image.yml index 67860d2dc..3e7c5f5b1 100644 --- a/.github/workflows/push_image.yml +++ b/.github/workflows/push_image.yml @@ -15,7 +15,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - go: ['1.21'] + go: ['1.22'] steps: - name: install make run: sudo apt-get install make diff --git a/.github/workflows/push_image_pr.yml b/.github/workflows/push_image_pr.yml index 7ea166883..111539d50 100644 --- a/.github/workflows/push_image_pr.yml +++ b/.github/workflows/push_image_pr.yml @@ -16,7 +16,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - go: ['1.21'] + go: ['1.22'] steps: - name: install make run: sudo apt-get install make diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 129b55dbb..261030762 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -14,7 +14,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - go: ['1.21'] + go: ['1.22'] steps: - name: checkout uses: actions/checkout@v3 diff --git a/Dockerfile b/Dockerfile index ae8f3cbc7..eadc959ff 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,7 +4,7 @@ ARG BUILDPLATFORM=linux/amd64 ARG TARGETARCH=amd64 # Build the manager binary -FROM --platform=$BUILDPLATFORM docker.io/library/golang:1.21 as builder +FROM --platform=$BUILDPLATFORM docker.io/library/golang:1.22 as builder ARG TARGETARCH ARG TARGETPLATFORM