Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NETOBSERV-1533: refine metrics for dashboard creation #281

Merged
merged 5 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 35 additions & 43 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (

"github.com/cilium/ebpf/ringbuf"
"github.com/gavv/monotime"
"github.com/prometheus/client_golang/prometheus"
kafkago "github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/compress"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -110,6 +109,8 @@ type Flows struct {
mapTracer *flow.MapTracer
rbTracer *flow.RingBufTracer
accounter *flow.Accounter
limiter *flow.CapacityLimiter
deduper node.MiddleFunc[[]*flow.Record, []*flow.Record]
exporter node.TerminalFunc[[]*flow.Record]

// elements used to decorate flows with extra information
Expand All @@ -125,7 +126,7 @@ type ebpfFlowFetcher interface {
io.Closer
Register(iface ifaces.Interface) error

LookupAndDeleteMap(c prometheus.Counter) map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics
LookupAndDeleteMap(*metrics.Metrics) map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics
DeleteMapsStaleEntries(timeOut time.Duration)
ReadRingBuf() (ringbuf.Record, error)
}
Expand All @@ -144,8 +145,18 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
}
alog.Debug("agent IP: " + agentIP.String())

// initialize metrics
metricsSettings := &metrics.Settings{
PromConnectionInfo: metrics.PromConnectionInfo{
Address: cfg.MetricsServerAddress,
Port: cfg.MetricsPort,
},
Prefix: cfg.MetricsPrefix,
}
m := metrics.NewMetrics(metricsSettings)

// configure selected exporter
exportFunc, err := buildFlowExporter(cfg)
exportFunc, err := buildFlowExporter(cfg, m)
if err != nil {
return nil, err
}
Expand All @@ -172,11 +183,11 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
return nil, err
}

return flowsAgent(cfg, informer, fetcher, exportFunc, agentIP)
return flowsAgent(cfg, m, informer, fetcher, exportFunc, agentIP)
}

// flowsAgent is a private constructor with injectable dependencies, usable for tests
func flowsAgent(cfg *Config,
func flowsAgent(cfg *Config, m *metrics.Metrics,
informer ifaces.Informer,
fetcher ebpfFlowFetcher,
exporter node.TerminalFunc[[]*flow.Record],
Expand Down Expand Up @@ -215,25 +226,21 @@ func flowsAgent(cfg *Config,
return iface
}
var promoServer *http.Server
metricsSettings := &metrics.Settings{
PromConnectionInfo: metrics.PromConnectionInfo{
Address: cfg.MetricsServerAddress,
Port: cfg.MetricsPort,
},
Prefix: cfg.MetricsPrefix,
}
if cfg.MetricsEnable {
promoServer = promo.InitializePrometheus(metricsSettings)
promoServer = promo.InitializePrometheus(m.Settings)
}

m := metrics.NewMetrics(metricsSettings)
samplingGauge := m.CreateSamplingRate()
samplingGauge.Set(float64(cfg.Sampling))

mapTracer := flow.NewMapTracer(fetcher, cfg.CacheActiveTimeout, cfg.StaleEntriesEvictTimeout, m)
rbTracer := flow.NewRingBufTracer(fetcher, mapTracer, cfg.CacheActiveTimeout, m)
accounter := flow.NewAccounter(
cfg.CacheMaxFlows, cfg.CacheActiveTimeout, time.Now, monotime.Now, m)
accounter := flow.NewAccounter(cfg.CacheMaxFlows, cfg.CacheActiveTimeout, time.Now, monotime.Now, m)
limiter := flow.NewCapacityLimiter(m)
var deduper node.MiddleFunc[[]*flow.Record, []*flow.Record]
if cfg.Deduper == DeduperFirstCome {
deduper = flow.Dedupe(cfg.DeduperFCExpiry, cfg.DeduperJustMark, cfg.DeduperMerge, interfaceNamer, m)
}

return &Flows{
ebpf: fetcher,
Expand All @@ -244,6 +251,8 @@ func flowsAgent(cfg *Config,
mapTracer: mapTracer,
rbTracer: rbTracer,
accounter: accounter,
limiter: limiter,
deduper: deduper,
agentIP: agentIP,
interfaceNamer: interfaceNamer,
promoServer: promoServer,
Expand All @@ -264,12 +273,12 @@ func flowDirections(cfg *Config) (ingress, egress bool) {
}
}

func buildFlowExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) {
func buildFlowExporter(cfg *Config, m *metrics.Metrics) (node.TerminalFunc[[]*flow.Record], error) {
switch cfg.Export {
case "grpc":
return buildGRPCExporter(cfg)
return buildGRPCExporter(cfg, m)
case "kafka":
return buildKafkaExporter(cfg)
return buildKafkaExporter(cfg, m)
case "ipfix+udp":
return buildIPFIXExporter(cfg, "udp")
case "ipfix+tcp":
Expand All @@ -281,19 +290,12 @@ func buildFlowExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) {
}
}

func buildGRPCExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) {
func buildGRPCExporter(cfg *Config, m *metrics.Metrics) (node.TerminalFunc[[]*flow.Record], error) {
if cfg.TargetHost == "" || cfg.TargetPort == 0 {
return nil, fmt.Errorf("missing target host or port: %s:%d",
cfg.TargetHost, cfg.TargetPort)
}
metrics := metrics.NewMetrics(&metrics.Settings{
PromConnectionInfo: metrics.PromConnectionInfo{
Address: cfg.MetricsServerAddress,
Port: cfg.MetricsPort,
},
Prefix: cfg.MetricsPrefix,
})
grpcExporter, err := exporter.StartGRPCProto(cfg.TargetHost, cfg.TargetPort, cfg.GRPCMessageMaxFlows, metrics)
grpcExporter, err := exporter.StartGRPCProto(cfg.TargetHost, cfg.TargetPort, cfg.GRPCMessageMaxFlows, m)
if err != nil {
return nil, err
}
Expand All @@ -308,7 +310,7 @@ func buildDirectFLPExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], err
return flpExporter.ExportFlows, nil
}

func buildKafkaExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) {
func buildKafkaExporter(cfg *Config, m *metrics.Metrics) (node.TerminalFunc[[]*flow.Record], error) {
if len(cfg.KafkaBrokers) == 0 {
return nil, errors.New("at least one Kafka broker is needed")
}
Expand All @@ -318,13 +320,6 @@ func buildKafkaExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error)
"none, gzip, snappy, lz4, zstd: %w", cfg.KafkaCompression, err)
}
transport := kafkago.Transport{}
m := metrics.NewMetrics(&metrics.Settings{
PromConnectionInfo: metrics.PromConnectionInfo{
Address: cfg.MetricsServerAddress,
Port: cfg.MetricsPort,
},
Prefix: cfg.MetricsPrefix,
})
if cfg.KafkaEnableTLS {
tlsConfig, err := buildTLSConfig(cfg)
if err != nil {
Expand Down Expand Up @@ -361,9 +356,7 @@ func buildKafkaExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error)
Transport: &transport,
Balancer: &kafkago.Hash{},
},
NumberOfRecordsExportedByKafka: m.CreateNumberOfRecordsExportedByKafka(),
ExportedRecordsBatchSize: m.CreateKafkaBatchSize(),
Errors: m.GetErrorsCounter(),
Metrics: m,
}).ExportFlows, nil
}

Expand Down Expand Up @@ -450,7 +443,7 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]*fl
accounter := node.AsMiddle(f.accounter.Account,
node.ChannelBufferLen(f.cfg.BuffersLength))

limiter := node.AsMiddle((&flow.CapacityLimiter{}).Limit,
limiter := node.AsMiddle(f.limiter.Limit,
node.ChannelBufferLen(f.cfg.BuffersLength))

decorator := node.AsMiddle(flow.Decorate(f.agentIP, f.interfaceNamer),
Expand All @@ -466,9 +459,8 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]*fl

rbTracer.SendsTo(accounter)

if f.cfg.Deduper == DeduperFirstCome {
deduper := node.AsMiddle(flow.Dedupe(f.cfg.DeduperFCExpiry, f.cfg.DeduperJustMark, f.cfg.DeduperMerge, f.interfaceNamer),
node.ChannelBufferLen(f.cfg.BuffersLength))
if f.deduper != nil {
deduper := node.AsMiddle(f.deduper, node.ChannelBufferLen(f.cfg.BuffersLength))
mapTracer.SendsTo(deduper)
accounter.SendsTo(deduper)
deduper.SendsTo(limiter)
Expand Down
2 changes: 2 additions & 0 deletions pkg/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
test2 "github.com/mariomac/guara/pkg/test"
"github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
"github.com/netobserv/netobserv-ebpf-agent/pkg/flow"
"github.com/netobserv/netobserv-ebpf-agent/pkg/metrics"
"github.com/netobserv/netobserv-ebpf-agent/pkg/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -204,6 +205,7 @@ func testAgent(t *testing.T, cfg *Config) *test.ExporterFake {
ebpfTracer := test.NewTracerFake()
export := test.NewExporterFake()
agent, err := flowsAgent(cfg,
metrics.NewMetrics(&metrics.Settings{}),
test.SliceInformerFake{
{Name: "foo", Index: 3},
{Name: "bar", Index: 4},
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/packets_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
"github.com/netobserv/netobserv-ebpf-agent/pkg/exporter"
"github.com/netobserv/netobserv-ebpf-agent/pkg/flow"
"github.com/netobserv/netobserv-ebpf-agent/pkg/ifaces"
"github.com/netobserv/netobserv-ebpf-agent/pkg/metrics"

"github.com/cilium/ebpf/perf"
"github.com/prometheus/client_golang/prometheus"
)

// Packets reporting agent
Expand Down Expand Up @@ -41,7 +41,7 @@ type ebpfPacketFetcher interface {
io.Closer
Register(iface ifaces.Interface) error

LookupAndDeleteMap(c prometheus.Counter) map[int][]*byte
LookupAndDeleteMap(*metrics.Metrics) map[int][]*byte
ReadPerf() (perf.Record, error)
}

Expand Down
21 changes: 11 additions & 10 deletions pkg/ebpf/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/netobserv/netobserv-ebpf-agent/pkg/ifaces"
"github.com/netobserv/netobserv-ebpf-agent/pkg/metrics"
"github.com/netobserv/netobserv-ebpf-agent/pkg/utils"

"github.com/cilium/ebpf"
Expand All @@ -20,7 +21,6 @@ import (
"github.com/cilium/ebpf/ringbuf"
"github.com/cilium/ebpf/rlimit"
"github.com/gavv/monotime"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"github.com/vishvananda/netlink"
"golang.org/x/sys/unix"
Expand Down Expand Up @@ -163,6 +163,7 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
if err != nil {
return nil, fmt.Errorf("accessing to ringbuffer: %w", err)
}

return &FlowFetcher{
objects: &objects,
ringbufReader: flows,
Expand Down Expand Up @@ -411,27 +412,30 @@ func (m *FlowFetcher) ReadRingBuf() (ringbuf.Record, error) {
// TODO: detect whether BatchLookupAndDelete is supported (Kernel>=5.6) and use it selectively
// Supported Lookup/Delete operations by kernel: https://github.com/iovisor/bcc/blob/master/docs/kernel-versions.md
// Race conditions here causes that some flows are lost in high-load scenarios
func (m *FlowFetcher) LookupAndDeleteMap(c prometheus.Counter) map[BpfFlowId][]BpfFlowMetrics {
func (m *FlowFetcher) LookupAndDeleteMap(met *metrics.Metrics) map[BpfFlowId][]BpfFlowMetrics {
flowMap := m.objects.AggregatedFlows

iterator := flowMap.Iterate()
var flows = make(map[BpfFlowId][]BpfFlowMetrics, m.cacheMaxSize)
var id BpfFlowId
var metrics []BpfFlowMetrics

count := 0
// Changing Iterate+Delete by LookupAndDelete would prevent some possible race conditions
// TODO: detect whether LookupAndDelete is supported (Kernel>=4.20) and use it selectively
for iterator.Next(&id, &metrics) {
count++
if err := flowMap.Delete(id); err != nil {
log.WithError(err).WithField("flowId", id).
Warnf("couldn't delete flow entry")
c.Inc()
log.WithError(err).WithField("flowId", id).Warnf("couldn't delete flow entry")
met.Errors.WithErrorName("flow-fetcher", "CannotDeleteFlows").Inc()
}
// We observed that eBFP PerCPU map might insert multiple times the same key in the map
// (probably due to race conditions) so we need to re-join metrics again at userspace
// TODO: instrument how many times the keys are is repeated in the same eviction
flows[id] = append(flows[id], metrics...)
}
met.BufferSizeGauge.WithBufferName("hashmap-total").Set(float64(count))
met.BufferSizeGauge.WithBufferName("hashmap-unique").Set(float64(len(flows)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aren't those counters duplicates of what we already have in tracer_map?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's different: the counters are used to compute rates, like the number of flows evicted per second, whereas this allows us to get the size of the maps. In fact (to answer your next question) they have been incredibly useful to me, as it's thanks to these gauges that I understood exactly what was going wrong here with LookupAndDelete: the "hashmap-total" gauge was growing to 100K elements, i.e. the full map size, even when the map was not supposed to be full, while the "hashmap-unique" gauge was much smaller, something like 2K. That's what led me to find the issue about deleting within the iteration.


return flows
}
Expand Down Expand Up @@ -594,9 +598,6 @@ func NewPacketFetcher(
return nil, fmt.Errorf("loading and assigning BPF objects: %w", err)
}

if err != nil {
return nil, err
}
// read packets from igress+egress perf array
packets, err := perf.NewReader(objects.PacketRecord, os.Getpagesize())
if err != nil {
Expand Down Expand Up @@ -795,7 +796,7 @@ func (p *PacketFetcher) ReadPerf() (perf.Record, error) {
return p.perfReader.Read()
}

func (p *PacketFetcher) LookupAndDeleteMap(c prometheus.Counter) map[int][]*byte {
func (p *PacketFetcher) LookupAndDeleteMap(met *metrics.Metrics) map[int][]*byte {
packetMap := p.objects.PacketRecord
iterator := packetMap.Iterate()
packets := make(map[int][]*byte, p.cacheMaxSize)
Expand All @@ -806,7 +807,7 @@ func (p *PacketFetcher) LookupAndDeleteMap(c prometheus.Counter) map[int][]*byte
if err := packetMap.Delete(id); err != nil {
log.WithError(err).WithField("packetID ", id).
Warnf("couldn't delete entry")
c.Inc()
met.Errors.WithErrorName("pkt-fetcher", "CannotDeleteFlows").Inc()
}
packets[id] = append(packets[id], packet...)
}
Expand Down
31 changes: 16 additions & 15 deletions pkg/exporter/grpc_proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (

var glog = logrus.WithField("component", "exporter/GRPCProto")

const componentGRPC = "grpc"

// GRPCProto flow exporter. Its ExportFlows method accepts slices of *flow.Record
// by its input channel, converts them to *pbflow.Records instances, and submits
// them to the collector.
Expand All @@ -24,10 +26,9 @@ type GRPCProto struct {
// maxFlowsPerMessage limits the maximum number of flows per GRPC message.
// If a message contains more flows than this number, the GRPC message will be split into
// multiple messages.
maxFlowsPerMessage int
numberOfRecordsExportedByGRPC prometheus.Counter
exportedRecordsBatchSize prometheus.Counter
errors *metrics.ErrorCounter
maxFlowsPerMessage int
metrics *metrics.Metrics
batchCounter prometheus.Counter
}

func StartGRPCProto(hostIP string, hostPort int, maxFlowsPerMessage int, m *metrics.Metrics) (*GRPCProto, error) {
Expand All @@ -36,13 +37,12 @@ func StartGRPCProto(hostIP string, hostPort int, maxFlowsPerMessage int, m *metr
return nil, err
}
return &GRPCProto{
hostIP: hostIP,
hostPort: hostPort,
clientConn: clientConn,
maxFlowsPerMessage: maxFlowsPerMessage,
numberOfRecordsExportedByGRPC: m.CreateNumberOfRecordsExportedByGRPC(),
exportedRecordsBatchSize: m.CreateGRPCBatchSize(),
errors: m.GetErrorsCounter(),
hostIP: hostIP,
hostPort: hostPort,
clientConn: clientConn,
maxFlowsPerMessage: maxFlowsPerMessage,
metrics: m,
batchCounter: m.CreateBatchCounter(componentGRPC),
}, nil
}

Expand All @@ -52,18 +52,19 @@ func (g *GRPCProto) ExportFlows(input <-chan []*flow.Record) {
socket := utils.GetSocket(g.hostIP, g.hostPort)
log := glog.WithField("collector", socket)
for inputRecords := range input {
g.exportedRecordsBatchSize.Inc()
g.metrics.EvictionCounter.WithSource(componentGRPC).Inc()
for _, pbRecords := range flowsToPB(inputRecords, g.maxFlowsPerMessage) {
log.Debugf("sending %d records", len(pbRecords.Entries))
if _, err := g.clientConn.Client().Send(context.TODO(), pbRecords); err != nil {
g.errors.WithValues("CantWriteMessage", "grpc").Inc()
g.metrics.Errors.WithErrorName(componentGRPC, "CannotWriteMessage").Inc()
log.WithError(err).Error("couldn't send flow records to collector")
}
g.numberOfRecordsExportedByGRPC.Add(float64(len(pbRecords.Entries)))
g.batchCounter.Inc()
g.metrics.EvictedFlowsCounter.WithSource(componentGRPC).Add(float64(len(pbRecords.Entries)))
}
}
if err := g.clientConn.Close(); err != nil {
log.WithError(err).Warn("couldn't close flow export client")
g.errors.WithValues("CantCloseClient", "grpc").Inc()
g.metrics.Errors.WithErrorName(componentGRPC, "CannotCloseClient").Inc()
}
}
Loading
Loading