NETOBSERV-1533: refine metrics for dashboard creation (#281)
* NETOBSERV-1533: refine metrics for dashboard creation

- Use just two shared metrics for eviction counters: one for eviction
  events and one for flows; shared across ringbuf/map implementations
  and labelled as such
- Report more errors via metrics

* Fix shared metrics

- Use a single metrics object that holds shared metrics
- Create shared metrics at startup

* Add more metrics:

- accounter and deduper buffer gauges
- use eviction metrics for exporters
- eviction from deduper
- limiter drops counter

* Use a different metric for ringbuf eviction counter

* Add map size gauges
jotak authored Mar 4, 2024
1 parent d4d25a4 commit 1d85464
Showing 18 changed files with 225 additions and 218 deletions.
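For orientation before the per-file diffs: the commit creates one shared metrics.Metrics object at startup and threads it through tracers and exporters, replacing the ad-hoc per-component counters. The sketch below reconstructs the pkg/metrics surface that the call sites in these diffs rely on. It is inferred from usage only — the pkg/metrics changes themselves are not among the files shown here — so the label keys, field types, metric names, and constructor body are assumptions.

// Hypothetical reconstruction of the pkg/metrics surface used in this commit.
// Only the identifiers (Settings, NewMetrics, EvictionCounter, WithSource,
// WithErrorName, WithBufferName, ...) appear in the diffs below; the
// internals here are assumed for illustration.
package metrics

import "github.com/prometheus/client_golang/prometheus"

type PromConnectionInfo struct {
	Address string
	Port    int
}

type Settings struct {
	PromConnectionInfo
	Prefix string // prepended to every exported metric name
}

// Metrics holds the metrics shared across components: the ringbuf/map tracers
// and all exporters update the same vectors, distinguished by labels rather
// than by per-component metric names.
type Metrics struct {
	Settings *Settings

	EvictionCounter     *EvictionCounter // eviction events, by source
	EvictedFlowsCounter *EvictionCounter // evicted flows, by source
	BufferSizeGauge     *BufferSizeGauge // buffer/map sizes, by buffer name
	Errors              *ErrorCounter    // errors, by component and error name
}

// NewMetrics builds (and, in the real package, presumably registers) the
// shared vectors; the metric names below are placeholders.
func NewMetrics(s *Settings) *Metrics {
	return &Metrics{
		Settings: s,
		EvictionCounter: &EvictionCounter{vec: prometheus.NewCounterVec(
			prometheus.CounterOpts{Name: s.Prefix + "evictions_total"},
			[]string{"source"})},
		EvictedFlowsCounter: &EvictionCounter{vec: prometheus.NewCounterVec(
			prometheus.CounterOpts{Name: s.Prefix + "evicted_flows_total"},
			[]string{"source"})},
		BufferSizeGauge: &BufferSizeGauge{vec: prometheus.NewGaugeVec(
			prometheus.GaugeOpts{Name: s.Prefix + "buffer_size"},
			[]string{"name"})},
		Errors: &ErrorCounter{vec: prometheus.NewCounterVec(
			prometheus.CounterOpts{Name: s.Prefix + "errors_total"},
			[]string{"component", "error"})},
	}
}

type EvictionCounter struct{ vec *prometheus.CounterVec }

// WithSource returns the counter child for one eviction source,
// e.g. EvictionCounter.WithSource("grpc").Inc().
func (c *EvictionCounter) WithSource(source string) prometheus.Counter {
	return c.vec.WithLabelValues(source)
}

type ErrorCounter struct{ vec *prometheus.CounterVec }

// WithErrorName returns the counter child for one (component, error) pair,
// e.g. Errors.WithErrorName("flow-fetcher", "CannotDeleteFlows").Inc().
func (c *ErrorCounter) WithErrorName(component, errName string) prometheus.Counter {
	return c.vec.WithLabelValues(component, errName)
}

type BufferSizeGauge struct{ vec *prometheus.GaugeVec }

// WithBufferName returns the gauge child for one buffer or map,
// e.g. BufferSizeGauge.WithBufferName("hashmap-total").Set(float64(count)).
func (g *BufferSizeGauge) WithBufferName(name string) prometheus.Gauge {
	return g.vec.WithLabelValues(name)
}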
78 changes: 35 additions & 43 deletions pkg/agent/agent.go
@@ -19,7 +19,6 @@ import (

"github.com/cilium/ebpf/ringbuf"
"github.com/gavv/monotime"
"github.com/prometheus/client_golang/prometheus"
kafkago "github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/compress"
"github.com/sirupsen/logrus"
@@ -110,6 +109,8 @@ type Flows struct {
mapTracer *flow.MapTracer
rbTracer *flow.RingBufTracer
accounter *flow.Accounter
limiter *flow.CapacityLimiter
deduper node.MiddleFunc[[]*flow.Record, []*flow.Record]
exporter node.TerminalFunc[[]*flow.Record]

// elements used to decorate flows with extra information
@@ -125,7 +126,7 @@ type ebpfFlowFetcher interface {
io.Closer
Register(iface ifaces.Interface) error

LookupAndDeleteMap(c prometheus.Counter) map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics
LookupAndDeleteMap(*metrics.Metrics) map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics
DeleteMapsStaleEntries(timeOut time.Duration)
ReadRingBuf() (ringbuf.Record, error)
}
@@ -144,8 +145,18 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
}
alog.Debug("agent IP: " + agentIP.String())

// initialize metrics
metricsSettings := &metrics.Settings{
PromConnectionInfo: metrics.PromConnectionInfo{
Address: cfg.MetricsServerAddress,
Port: cfg.MetricsPort,
},
Prefix: cfg.MetricsPrefix,
}
m := metrics.NewMetrics(metricsSettings)

// configure selected exporter
exportFunc, err := buildFlowExporter(cfg)
exportFunc, err := buildFlowExporter(cfg, m)
if err != nil {
return nil, err
}
@@ -172,11 +183,11 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
return nil, err
}

return flowsAgent(cfg, informer, fetcher, exportFunc, agentIP)
return flowsAgent(cfg, m, informer, fetcher, exportFunc, agentIP)
}

// flowsAgent is a private constructor with injectable dependencies, usable for tests
func flowsAgent(cfg *Config,
func flowsAgent(cfg *Config, m *metrics.Metrics,
informer ifaces.Informer,
fetcher ebpfFlowFetcher,
exporter node.TerminalFunc[[]*flow.Record],
@@ -215,25 +226,21 @@ func flowsAgent(cfg *Config,
return iface
}
var promoServer *http.Server
metricsSettings := &metrics.Settings{
PromConnectionInfo: metrics.PromConnectionInfo{
Address: cfg.MetricsServerAddress,
Port: cfg.MetricsPort,
},
Prefix: cfg.MetricsPrefix,
}
if cfg.MetricsEnable {
promoServer = promo.InitializePrometheus(metricsSettings)
promoServer = promo.InitializePrometheus(m.Settings)
}

m := metrics.NewMetrics(metricsSettings)
samplingGauge := m.CreateSamplingRate()
samplingGauge.Set(float64(cfg.Sampling))

mapTracer := flow.NewMapTracer(fetcher, cfg.CacheActiveTimeout, cfg.StaleEntriesEvictTimeout, m)
rbTracer := flow.NewRingBufTracer(fetcher, mapTracer, cfg.CacheActiveTimeout, m)
accounter := flow.NewAccounter(
cfg.CacheMaxFlows, cfg.CacheActiveTimeout, time.Now, monotime.Now, m)
accounter := flow.NewAccounter(cfg.CacheMaxFlows, cfg.CacheActiveTimeout, time.Now, monotime.Now, m)
limiter := flow.NewCapacityLimiter(m)
var deduper node.MiddleFunc[[]*flow.Record, []*flow.Record]
if cfg.Deduper == DeduperFirstCome {
deduper = flow.Dedupe(cfg.DeduperFCExpiry, cfg.DeduperJustMark, cfg.DeduperMerge, interfaceNamer, m)
}

return &Flows{
ebpf: fetcher,
@@ -244,6 +251,8 @@ func flowsAgent(cfg *Config,
mapTracer: mapTracer,
rbTracer: rbTracer,
accounter: accounter,
limiter: limiter,
deduper: deduper,
agentIP: agentIP,
interfaceNamer: interfaceNamer,
promoServer: promoServer,
@@ -264,12 +273,12 @@ func flowDirections(cfg *Config) (ingress, egress bool) {
}
}

func buildFlowExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) {
func buildFlowExporter(cfg *Config, m *metrics.Metrics) (node.TerminalFunc[[]*flow.Record], error) {
switch cfg.Export {
case "grpc":
return buildGRPCExporter(cfg)
return buildGRPCExporter(cfg, m)
case "kafka":
return buildKafkaExporter(cfg)
return buildKafkaExporter(cfg, m)
case "ipfix+udp":
return buildIPFIXExporter(cfg, "udp")
case "ipfix+tcp":
@@ -281,19 +290,12 @@ func buildFlowExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) {
}
}

func buildGRPCExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) {
func buildGRPCExporter(cfg *Config, m *metrics.Metrics) (node.TerminalFunc[[]*flow.Record], error) {
if cfg.TargetHost == "" || cfg.TargetPort == 0 {
return nil, fmt.Errorf("missing target host or port: %s:%d",
cfg.TargetHost, cfg.TargetPort)
}
metrics := metrics.NewMetrics(&metrics.Settings{
PromConnectionInfo: metrics.PromConnectionInfo{
Address: cfg.MetricsServerAddress,
Port: cfg.MetricsPort,
},
Prefix: cfg.MetricsPrefix,
})
grpcExporter, err := exporter.StartGRPCProto(cfg.TargetHost, cfg.TargetPort, cfg.GRPCMessageMaxFlows, metrics)
grpcExporter, err := exporter.StartGRPCProto(cfg.TargetHost, cfg.TargetPort, cfg.GRPCMessageMaxFlows, m)
if err != nil {
return nil, err
}
@@ -308,7 +310,7 @@ func buildDirectFLPExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], err
return flpExporter.ExportFlows, nil
}

func buildKafkaExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) {
func buildKafkaExporter(cfg *Config, m *metrics.Metrics) (node.TerminalFunc[[]*flow.Record], error) {
if len(cfg.KafkaBrokers) == 0 {
return nil, errors.New("at least one Kafka broker is needed")
}
@@ -318,13 +320,6 @@ func buildKafkaExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error)
"none, gzip, snappy, lz4, zstd: %w", cfg.KafkaCompression, err)
}
transport := kafkago.Transport{}
m := metrics.NewMetrics(&metrics.Settings{
PromConnectionInfo: metrics.PromConnectionInfo{
Address: cfg.MetricsServerAddress,
Port: cfg.MetricsPort,
},
Prefix: cfg.MetricsPrefix,
})
if cfg.KafkaEnableTLS {
tlsConfig, err := buildTLSConfig(cfg)
if err != nil {
@@ -361,9 +356,7 @@ func buildKafkaExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error)
Transport: &transport,
Balancer: &kafkago.Hash{},
},
NumberOfRecordsExportedByKafka: m.CreateNumberOfRecordsExportedByKafka(),
ExportedRecordsBatchSize: m.CreateKafkaBatchSize(),
Errors: m.GetErrorsCounter(),
Metrics: m,
}).ExportFlows, nil
}

@@ -450,7 +443,7 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]*fl
accounter := node.AsMiddle(f.accounter.Account,
node.ChannelBufferLen(f.cfg.BuffersLength))

limiter := node.AsMiddle((&flow.CapacityLimiter{}).Limit,
limiter := node.AsMiddle(f.limiter.Limit,
node.ChannelBufferLen(f.cfg.BuffersLength))

decorator := node.AsMiddle(flow.Decorate(f.agentIP, f.interfaceNamer),
@@ -466,9 +459,8 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]*fl

rbTracer.SendsTo(accounter)

if f.cfg.Deduper == DeduperFirstCome {
deduper := node.AsMiddle(flow.Dedupe(f.cfg.DeduperFCExpiry, f.cfg.DeduperJustMark, f.cfg.DeduperMerge, f.interfaceNamer),
node.ChannelBufferLen(f.cfg.BuffersLength))
if f.deduper != nil {
deduper := node.AsMiddle(f.deduper, node.ChannelBufferLen(f.cfg.BuffersLength))
mapTracer.SendsTo(deduper)
accounter.SendsTo(deduper)
deduper.SendsTo(limiter)
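With the limiter and the optional deduper now owned by the Flows struct, the pipeline that buildAndStartPipeline assembles looks roughly as follows. This is a sketch: the limiter → decorator → exporter tail is inferred from the surrounding, partially truncated hunk, not shown verbatim in this diff.

// Approximate pipeline topology after this change (the deduper node exists
// only when cfg.Deduper == DeduperFirstCome):
//
//   rbTracer ──> accounter ──┐
//                            ├──> [deduper] ──> limiter ──> decorator ──> exporter
//   mapTracer ───────────────┘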
2 changes: 2 additions & 0 deletions pkg/agent/agent_test.go
@@ -10,6 +10,7 @@ import (
test2 "github.com/mariomac/guara/pkg/test"
"github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
"github.com/netobserv/netobserv-ebpf-agent/pkg/flow"
"github.com/netobserv/netobserv-ebpf-agent/pkg/metrics"
"github.com/netobserv/netobserv-ebpf-agent/pkg/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -204,6 +205,7 @@ func testAgent(t *testing.T, cfg *Config) *test.ExporterFake {
ebpfTracer := test.NewTracerFake()
export := test.NewExporterFake()
agent, err := flowsAgent(cfg,
metrics.NewMetrics(&metrics.Settings{}),
test.SliceInformerFake{
{Name: "foo", Index: 3},
{Name: "bar", Index: 4},
4 changes: 2 additions & 2 deletions pkg/agent/packets_agent.go
@@ -11,9 +11,9 @@ import (
"github.com/netobserv/netobserv-ebpf-agent/pkg/exporter"
"github.com/netobserv/netobserv-ebpf-agent/pkg/flow"
"github.com/netobserv/netobserv-ebpf-agent/pkg/ifaces"
"github.com/netobserv/netobserv-ebpf-agent/pkg/metrics"

"github.com/cilium/ebpf/perf"
"github.com/prometheus/client_golang/prometheus"
)

// Packets reporting agent
@@ -41,7 +41,7 @@ type ebpfPacketFetcher interface {
io.Closer
Register(iface ifaces.Interface) error

LookupAndDeleteMap(c prometheus.Counter) map[int][]*byte
LookupAndDeleteMap(*metrics.Metrics) map[int][]*byte
ReadPerf() (perf.Record, error)
}

21 changes: 11 additions & 10 deletions pkg/ebpf/tracer.go
@@ -11,6 +11,7 @@ import (
"time"

"github.com/netobserv/netobserv-ebpf-agent/pkg/ifaces"
"github.com/netobserv/netobserv-ebpf-agent/pkg/metrics"
"github.com/netobserv/netobserv-ebpf-agent/pkg/utils"

"github.com/cilium/ebpf"
@@ -20,7 +21,6 @@ import (
"github.com/cilium/ebpf/ringbuf"
"github.com/cilium/ebpf/rlimit"
"github.com/gavv/monotime"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"github.com/vishvananda/netlink"
"golang.org/x/sys/unix"
@@ -163,6 +163,7 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
if err != nil {
return nil, fmt.Errorf("accessing to ringbuffer: %w", err)
}

return &FlowFetcher{
objects: &objects,
ringbufReader: flows,
@@ -411,27 +412,30 @@ func (m *FlowFetcher) ReadRingBuf() (ringbuf.Record, error) {
// TODO: detect whether BatchLookupAndDelete is supported (Kernel>=5.6) and use it selectively
// Supported Lookup/Delete operations by kernel: https://github.com/iovisor/bcc/blob/master/docs/kernel-versions.md
// Race conditions here cause some flows to be lost in high-load scenarios
func (m *FlowFetcher) LookupAndDeleteMap(c prometheus.Counter) map[BpfFlowId][]BpfFlowMetrics {
func (m *FlowFetcher) LookupAndDeleteMap(met *metrics.Metrics) map[BpfFlowId][]BpfFlowMetrics {
flowMap := m.objects.AggregatedFlows

iterator := flowMap.Iterate()
var flows = make(map[BpfFlowId][]BpfFlowMetrics, m.cacheMaxSize)
var id BpfFlowId
var metrics []BpfFlowMetrics

count := 0
// Replacing Iterate+Delete with LookupAndDelete would prevent some possible race conditions
// TODO: detect whether LookupAndDelete is supported (Kernel>=4.20) and use it selectively
for iterator.Next(&id, &metrics) {
count++
if err := flowMap.Delete(id); err != nil {
log.WithError(err).WithField("flowId", id).
Warnf("couldn't delete flow entry")
c.Inc()
log.WithError(err).WithField("flowId", id).Warnf("couldn't delete flow entry")
met.Errors.WithErrorName("flow-fetcher", "CannotDeleteFlows").Inc()
}
// We observed that eBPF PerCPU map might insert multiple times the same key in the map
// (probably due to race conditions) so we need to re-join metrics again at userspace
// TODO: instrument how many times the same key is repeated in the same eviction
flows[id] = append(flows[id], metrics...)
}
met.BufferSizeGauge.WithBufferName("hashmap-total").Set(float64(count))
met.BufferSizeGauge.WithBufferName("hashmap-unique").Set(float64(len(flows)))

return flows
}
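The two gauges set at the end of the loop quantify the key duplication that the comment above describes. A hedged illustration of the derived signal (this snippet is not part of the commit, and the variable name is mine; it assumes the count/flows variables in scope above):

// Derived signal: entries iterated minus distinct flow IDs gives the number
// of duplicated PerCPU keys observed in this eviction.
duplicatedKeys := count - len(flows)
log.WithField("duplicatedKeys", duplicatedKeys).Debug("hashmap eviction stats")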
@@ -594,9 +598,6 @@ func NewPacketFetcher(
return nil, fmt.Errorf("loading and assigning BPF objects: %w", err)
}

if err != nil {
return nil, err
}
// read packets from ingress+egress perf array
packets, err := perf.NewReader(objects.PacketRecord, os.Getpagesize())
if err != nil {
@@ -795,7 +796,7 @@ func (p *PacketFetcher) ReadPerf() (perf.Record, error) {
return p.perfReader.Read()
}

func (p *PacketFetcher) LookupAndDeleteMap(c prometheus.Counter) map[int][]*byte {
func (p *PacketFetcher) LookupAndDeleteMap(met *metrics.Metrics) map[int][]*byte {
packetMap := p.objects.PacketRecord
iterator := packetMap.Iterate()
packets := make(map[int][]*byte, p.cacheMaxSize)
Expand All @@ -806,7 +807,7 @@ func (p *PacketFetcher) LookupAndDeleteMap(c prometheus.Counter) map[int][]*byte
if err := packetMap.Delete(id); err != nil {
log.WithError(err).WithField("packetID ", id).
Warnf("couldn't delete entry")
c.Inc()
met.Errors.WithErrorName("pkt-fetcher", "CannotDeleteFlows").Inc()
}
packets[id] = append(packets[id], packet...)
}
31 changes: 16 additions & 15 deletions pkg/exporter/grpc_proto.go
@@ -14,6 +14,8 @@ import (

var glog = logrus.WithField("component", "exporter/GRPCProto")

const componentGRPC = "grpc"

// GRPCProto flow exporter. Its ExportFlows method accepts slices of *flow.Record
// by its input channel, converts them to *pbflow.Records instances, and submits
// them to the collector.
@@ -24,10 +26,9 @@ type GRPCProto struct {
// maxFlowsPerMessage limits the maximum number of flows per GRPC message.
// If a message contains more flows than this number, the GRPC message will be split into
// multiple messages.
maxFlowsPerMessage int
numberOfRecordsExportedByGRPC prometheus.Counter
exportedRecordsBatchSize prometheus.Counter
errors *metrics.ErrorCounter
maxFlowsPerMessage int
metrics *metrics.Metrics
batchCounter prometheus.Counter
}

func StartGRPCProto(hostIP string, hostPort int, maxFlowsPerMessage int, m *metrics.Metrics) (*GRPCProto, error) {
Expand All @@ -36,13 +37,12 @@ func StartGRPCProto(hostIP string, hostPort int, maxFlowsPerMessage int, m *metr
return nil, err
}
return &GRPCProto{
hostIP: hostIP,
hostPort: hostPort,
clientConn: clientConn,
maxFlowsPerMessage: maxFlowsPerMessage,
numberOfRecordsExportedByGRPC: m.CreateNumberOfRecordsExportedByGRPC(),
exportedRecordsBatchSize: m.CreateGRPCBatchSize(),
errors: m.GetErrorsCounter(),
hostIP: hostIP,
hostPort: hostPort,
clientConn: clientConn,
maxFlowsPerMessage: maxFlowsPerMessage,
metrics: m,
batchCounter: m.CreateBatchCounter(componentGRPC),
}, nil
}

@@ -52,18 +52,19 @@ func (g *GRPCProto) ExportFlows(input <-chan []*flow.Record) {
socket := utils.GetSocket(g.hostIP, g.hostPort)
log := glog.WithField("collector", socket)
for inputRecords := range input {
g.exportedRecordsBatchSize.Inc()
g.metrics.EvictionCounter.WithSource(componentGRPC).Inc()
for _, pbRecords := range flowsToPB(inputRecords, g.maxFlowsPerMessage) {
log.Debugf("sending %d records", len(pbRecords.Entries))
if _, err := g.clientConn.Client().Send(context.TODO(), pbRecords); err != nil {
g.errors.WithValues("CantWriteMessage", "grpc").Inc()
g.metrics.Errors.WithErrorName(componentGRPC, "CannotWriteMessage").Inc()
log.WithError(err).Error("couldn't send flow records to collector")
}
g.numberOfRecordsExportedByGRPC.Add(float64(len(pbRecords.Entries)))
g.batchCounter.Inc()
g.metrics.EvictedFlowsCounter.WithSource(componentGRPC).Add(float64(len(pbRecords.Entries)))
}
}
if err := g.clientConn.Close(); err != nil {
log.WithError(err).Warn("couldn't close flow export client")
g.errors.WithValues("CantCloseClient", "grpc").Inc()
g.metrics.Errors.WithErrorName(componentGRPC, "CannotCloseClient").Inc()
}
}
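Since every exporter now feeds the same pair of shared counters, a dashboard can aggregate evictions across exporters or break them down by the source label. A minimal usage illustration — this helper is not part of the commit, and the "kafka" label value is assumed to mirror the componentGRPC pattern above:

// Illustrative only: one code path serves every exporter; only the
// "source" label value differs per caller.
func recordEviction(m *metrics.Metrics, source string, flows int) {
	m.EvictionCounter.WithSource(source).Inc()                    // one eviction event
	m.EvictedFlowsCounter.WithSource(source).Add(float64(flows))  // flows in that eviction
}

// e.g. recordEviction(m, "grpc", len(pbRecords.Entries)) in the GRPC exporter,
// and recordEviction(m, "kafka", batchLen) in the Kafka exporter.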
(diffs for the remaining changed files not shown)
