From 1d85464b8e93ff3c207b3ec771f7a8c36515aade Mon Sep 17 00:00:00 2001 From: Joel Takvorian Date: Mon, 4 Mar 2024 15:59:32 +0100 Subject: [PATCH] NETOBSERV-1533: refine metrics for dashboard creation (#281) * NETOBSERV-1533: refine metrics for dashboard creation - Use just two shared metrics for eviction counters: one for eviction events and one for flows; shared across ringbuf/map implementations and labelled as such - Report more errors via metrics * Fix shared metrics - Use a single metrics object that holds shared metrics - Create shared metrics at startup * Add more metrics: - accounter and deduper buffer gauges - use eviction metrics for exporters - eviction from deduper - limiter drops counter * Use a different metric for ringbuf eviction counter * Add map size gauges --- pkg/agent/agent.go | 78 ++++++++---------- pkg/agent/agent_test.go | 2 + pkg/agent/packets_agent.go | 4 +- pkg/ebpf/tracer.go | 21 ++--- pkg/exporter/grpc_proto.go | 31 +++---- pkg/exporter/grpc_proto_test.go | 21 +---- pkg/exporter/kafka_proto.go | 17 ++-- pkg/exporter/kafka_proto_test.go | 15 +--- pkg/flow/account.go | 40 ++++----- pkg/flow/deduper.go | 7 +- pkg/flow/deduper_test.go | 7 +- pkg/flow/limiter.go | 7 ++ pkg/flow/limiter_test.go | 3 +- pkg/flow/tracer_map.go | 16 ++-- pkg/flow/tracer_ringbuf.go | 30 +++---- pkg/metrics/metrics.go | 136 ++++++++++++++++++------------- pkg/metrics/metrics_test.go | 4 +- pkg/test/tracer_fake.go | 4 +- 18 files changed, 225 insertions(+), 218 deletions(-) diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index cfd2c5e76..e8bf27517 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -19,7 +19,6 @@ import ( "github.com/cilium/ebpf/ringbuf" "github.com/gavv/monotime" - "github.com/prometheus/client_golang/prometheus" kafkago "github.com/segmentio/kafka-go" "github.com/segmentio/kafka-go/compress" "github.com/sirupsen/logrus" @@ -110,6 +109,8 @@ type Flows struct { mapTracer *flow.MapTracer rbTracer *flow.RingBufTracer accounter 
*flow.Accounter + limiter *flow.CapacityLimiter + deduper node.MiddleFunc[[]*flow.Record, []*flow.Record] exporter node.TerminalFunc[[]*flow.Record] // elements used to decorate flows with extra information @@ -125,7 +126,7 @@ type ebpfFlowFetcher interface { io.Closer Register(iface ifaces.Interface) error - LookupAndDeleteMap(c prometheus.Counter) map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics + LookupAndDeleteMap(*metrics.Metrics) map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics DeleteMapsStaleEntries(timeOut time.Duration) ReadRingBuf() (ringbuf.Record, error) } @@ -144,8 +145,18 @@ func FlowsAgent(cfg *Config) (*Flows, error) { } alog.Debug("agent IP: " + agentIP.String()) + // initialize metrics + metricsSettings := &metrics.Settings{ + PromConnectionInfo: metrics.PromConnectionInfo{ + Address: cfg.MetricsServerAddress, + Port: cfg.MetricsPort, + }, + Prefix: cfg.MetricsPrefix, + } + m := metrics.NewMetrics(metricsSettings) + // configure selected exporter - exportFunc, err := buildFlowExporter(cfg) + exportFunc, err := buildFlowExporter(cfg, m) if err != nil { return nil, err } @@ -172,11 +183,11 @@ func FlowsAgent(cfg *Config) (*Flows, error) { return nil, err } - return flowsAgent(cfg, informer, fetcher, exportFunc, agentIP) + return flowsAgent(cfg, m, informer, fetcher, exportFunc, agentIP) } // flowsAgent is a private constructor with injectable dependencies, usable for tests -func flowsAgent(cfg *Config, +func flowsAgent(cfg *Config, m *metrics.Metrics, informer ifaces.Informer, fetcher ebpfFlowFetcher, exporter node.TerminalFunc[[]*flow.Record], @@ -215,25 +226,21 @@ func flowsAgent(cfg *Config, return iface } var promoServer *http.Server - metricsSettings := &metrics.Settings{ - PromConnectionInfo: metrics.PromConnectionInfo{ - Address: cfg.MetricsServerAddress, - Port: cfg.MetricsPort, - }, - Prefix: cfg.MetricsPrefix, - } if cfg.MetricsEnable { - promoServer = promo.InitializePrometheus(metricsSettings) + promoServer = promo.InitializePrometheus(m.Settings) } - m 
:= metrics.NewMetrics(metricsSettings) samplingGauge := m.CreateSamplingRate() samplingGauge.Set(float64(cfg.Sampling)) mapTracer := flow.NewMapTracer(fetcher, cfg.CacheActiveTimeout, cfg.StaleEntriesEvictTimeout, m) rbTracer := flow.NewRingBufTracer(fetcher, mapTracer, cfg.CacheActiveTimeout, m) - accounter := flow.NewAccounter( - cfg.CacheMaxFlows, cfg.CacheActiveTimeout, time.Now, monotime.Now, m) + accounter := flow.NewAccounter(cfg.CacheMaxFlows, cfg.CacheActiveTimeout, time.Now, monotime.Now, m) + limiter := flow.NewCapacityLimiter(m) + var deduper node.MiddleFunc[[]*flow.Record, []*flow.Record] + if cfg.Deduper == DeduperFirstCome { + deduper = flow.Dedupe(cfg.DeduperFCExpiry, cfg.DeduperJustMark, cfg.DeduperMerge, interfaceNamer, m) + } return &Flows{ ebpf: fetcher, @@ -244,6 +251,8 @@ func flowsAgent(cfg *Config, mapTracer: mapTracer, rbTracer: rbTracer, accounter: accounter, + limiter: limiter, + deduper: deduper, agentIP: agentIP, interfaceNamer: interfaceNamer, promoServer: promoServer, @@ -264,12 +273,12 @@ func flowDirections(cfg *Config) (ingress, egress bool) { } } -func buildFlowExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) { +func buildFlowExporter(cfg *Config, m *metrics.Metrics) (node.TerminalFunc[[]*flow.Record], error) { switch cfg.Export { case "grpc": - return buildGRPCExporter(cfg) + return buildGRPCExporter(cfg, m) case "kafka": - return buildKafkaExporter(cfg) + return buildKafkaExporter(cfg, m) case "ipfix+udp": return buildIPFIXExporter(cfg, "udp") case "ipfix+tcp": @@ -281,19 +290,12 @@ func buildFlowExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) { } } -func buildGRPCExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) { +func buildGRPCExporter(cfg *Config, m *metrics.Metrics) (node.TerminalFunc[[]*flow.Record], error) { if cfg.TargetHost == "" || cfg.TargetPort == 0 { return nil, fmt.Errorf("missing target host or port: %s:%d", cfg.TargetHost, cfg.TargetPort) } - metrics := 
metrics.NewMetrics(&metrics.Settings{ - PromConnectionInfo: metrics.PromConnectionInfo{ - Address: cfg.MetricsServerAddress, - Port: cfg.MetricsPort, - }, - Prefix: cfg.MetricsPrefix, - }) - grpcExporter, err := exporter.StartGRPCProto(cfg.TargetHost, cfg.TargetPort, cfg.GRPCMessageMaxFlows, metrics) + grpcExporter, err := exporter.StartGRPCProto(cfg.TargetHost, cfg.TargetPort, cfg.GRPCMessageMaxFlows, m) if err != nil { return nil, err } @@ -308,7 +310,7 @@ func buildDirectFLPExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], err return flpExporter.ExportFlows, nil } -func buildKafkaExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) { +func buildKafkaExporter(cfg *Config, m *metrics.Metrics) (node.TerminalFunc[[]*flow.Record], error) { if len(cfg.KafkaBrokers) == 0 { return nil, errors.New("at least one Kafka broker is needed") } @@ -318,13 +320,6 @@ func buildKafkaExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) "none, gzip, snappy, lz4, zstd: %w", cfg.KafkaCompression, err) } transport := kafkago.Transport{} - m := metrics.NewMetrics(&metrics.Settings{ - PromConnectionInfo: metrics.PromConnectionInfo{ - Address: cfg.MetricsServerAddress, - Port: cfg.MetricsPort, - }, - Prefix: cfg.MetricsPrefix, - }) if cfg.KafkaEnableTLS { tlsConfig, err := buildTLSConfig(cfg) if err != nil { @@ -361,9 +356,7 @@ func buildKafkaExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) Transport: &transport, Balancer: &kafkago.Hash{}, }, - NumberOfRecordsExportedByKafka: m.CreateNumberOfRecordsExportedByKafka(), - ExportedRecordsBatchSize: m.CreateKafkaBatchSize(), - Errors: m.GetErrorsCounter(), + Metrics: m, }).ExportFlows, nil } @@ -450,7 +443,7 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]*fl accounter := node.AsMiddle(f.accounter.Account, node.ChannelBufferLen(f.cfg.BuffersLength)) - limiter := node.AsMiddle((&flow.CapacityLimiter{}).Limit, + limiter := node.AsMiddle(f.limiter.Limit, 
node.ChannelBufferLen(f.cfg.BuffersLength)) decorator := node.AsMiddle(flow.Decorate(f.agentIP, f.interfaceNamer), @@ -466,9 +459,8 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]*fl rbTracer.SendsTo(accounter) - if f.cfg.Deduper == DeduperFirstCome { - deduper := node.AsMiddle(flow.Dedupe(f.cfg.DeduperFCExpiry, f.cfg.DeduperJustMark, f.cfg.DeduperMerge, f.interfaceNamer), - node.ChannelBufferLen(f.cfg.BuffersLength)) + if f.deduper != nil { + deduper := node.AsMiddle(f.deduper, node.ChannelBufferLen(f.cfg.BuffersLength)) mapTracer.SendsTo(deduper) accounter.SendsTo(deduper) deduper.SendsTo(limiter) diff --git a/pkg/agent/agent_test.go b/pkg/agent/agent_test.go index 2710f3f6f..7c2172676 100644 --- a/pkg/agent/agent_test.go +++ b/pkg/agent/agent_test.go @@ -10,6 +10,7 @@ import ( test2 "github.com/mariomac/guara/pkg/test" "github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf" "github.com/netobserv/netobserv-ebpf-agent/pkg/flow" + "github.com/netobserv/netobserv-ebpf-agent/pkg/metrics" "github.com/netobserv/netobserv-ebpf-agent/pkg/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -204,6 +205,7 @@ func testAgent(t *testing.T, cfg *Config) *test.ExporterFake { ebpfTracer := test.NewTracerFake() export := test.NewExporterFake() agent, err := flowsAgent(cfg, + metrics.NewMetrics(&metrics.Settings{}), test.SliceInformerFake{ {Name: "foo", Index: 3}, {Name: "bar", Index: 4}, diff --git a/pkg/agent/packets_agent.go b/pkg/agent/packets_agent.go index 49371cea0..619fe33d3 100644 --- a/pkg/agent/packets_agent.go +++ b/pkg/agent/packets_agent.go @@ -11,9 +11,9 @@ import ( "github.com/netobserv/netobserv-ebpf-agent/pkg/exporter" "github.com/netobserv/netobserv-ebpf-agent/pkg/flow" "github.com/netobserv/netobserv-ebpf-agent/pkg/ifaces" + "github.com/netobserv/netobserv-ebpf-agent/pkg/metrics" "github.com/cilium/ebpf/perf" - "github.com/prometheus/client_golang/prometheus" ) // Packets reporting agent @@ -41,7 
+41,7 @@ type ebpfPacketFetcher interface { io.Closer Register(iface ifaces.Interface) error - LookupAndDeleteMap(c prometheus.Counter) map[int][]*byte + LookupAndDeleteMap(*metrics.Metrics) map[int][]*byte ReadPerf() (perf.Record, error) } diff --git a/pkg/ebpf/tracer.go b/pkg/ebpf/tracer.go index bd267d249..be7a1fb5e 100644 --- a/pkg/ebpf/tracer.go +++ b/pkg/ebpf/tracer.go @@ -11,6 +11,7 @@ import ( "time" "github.com/netobserv/netobserv-ebpf-agent/pkg/ifaces" + "github.com/netobserv/netobserv-ebpf-agent/pkg/metrics" "github.com/netobserv/netobserv-ebpf-agent/pkg/utils" "github.com/cilium/ebpf" @@ -20,7 +21,6 @@ import ( "github.com/cilium/ebpf/ringbuf" "github.com/cilium/ebpf/rlimit" "github.com/gavv/monotime" - "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" "github.com/vishvananda/netlink" "golang.org/x/sys/unix" @@ -163,6 +163,7 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) { if err != nil { return nil, fmt.Errorf("accessing to ringbuffer: %w", err) } + return &FlowFetcher{ objects: &objects, ringbufReader: flows, @@ -411,7 +412,7 @@ func (m *FlowFetcher) ReadRingBuf() (ringbuf.Record, error) { // TODO: detect whether BatchLookupAndDelete is supported (Kernel>=5.6) and use it selectively // Supported Lookup/Delete operations by kernel: https://github.com/iovisor/bcc/blob/master/docs/kernel-versions.md // Race conditions here causes that some flows are lost in high-load scenarios -func (m *FlowFetcher) LookupAndDeleteMap(c prometheus.Counter) map[BpfFlowId][]BpfFlowMetrics { +func (m *FlowFetcher) LookupAndDeleteMap(met *metrics.Metrics) map[BpfFlowId][]BpfFlowMetrics { flowMap := m.objects.AggregatedFlows iterator := flowMap.Iterate() @@ -419,19 +420,22 @@ func (m *FlowFetcher) LookupAndDeleteMap(c prometheus.Counter) map[BpfFlowId][]B var id BpfFlowId var metrics []BpfFlowMetrics + count := 0 // Changing Iterate+Delete by LookupAndDelete would prevent some possible race conditions // TODO: detect whether 
LookupAndDelete is supported (Kernel>=4.20) and use it selectively for iterator.Next(&id, &metrics) { + count++ if err := flowMap.Delete(id); err != nil { - log.WithError(err).WithField("flowId", id). - Warnf("couldn't delete flow entry") - c.Inc() + log.WithError(err).WithField("flowId", id).Warnf("couldn't delete flow entry") + met.Errors.WithErrorName("flow-fetcher", "CannotDeleteFlows").Inc() } // We observed that eBFP PerCPU map might insert multiple times the same key in the map // (probably due to race conditions) so we need to re-join metrics again at userspace // TODO: instrument how many times the keys are is repeated in the same eviction flows[id] = append(flows[id], metrics...) } + met.BufferSizeGauge.WithBufferName("hashmap-total").Set(float64(count)) + met.BufferSizeGauge.WithBufferName("hashmap-unique").Set(float64(len(flows))) return flows } @@ -594,9 +598,6 @@ func NewPacketFetcher( return nil, fmt.Errorf("loading and assigning BPF objects: %w", err) } - if err != nil { - return nil, err - } // read packets from igress+egress perf array packets, err := perf.NewReader(objects.PacketRecord, os.Getpagesize()) if err != nil { @@ -795,7 +796,7 @@ func (p *PacketFetcher) ReadPerf() (perf.Record, error) { return p.perfReader.Read() } -func (p *PacketFetcher) LookupAndDeleteMap(c prometheus.Counter) map[int][]*byte { +func (p *PacketFetcher) LookupAndDeleteMap(met *metrics.Metrics) map[int][]*byte { packetMap := p.objects.PacketRecord iterator := packetMap.Iterate() packets := make(map[int][]*byte, p.cacheMaxSize) @@ -806,7 +807,7 @@ func (p *PacketFetcher) LookupAndDeleteMap(c prometheus.Counter) map[int][]*byte if err := packetMap.Delete(id); err != nil { log.WithError(err).WithField("packetID ", id). Warnf("couldn't delete entry") - c.Inc() + met.Errors.WithErrorName("pkt-fetcher", "CannotDeleteFlows").Inc() } packets[id] = append(packets[id], packet...) 
} diff --git a/pkg/exporter/grpc_proto.go b/pkg/exporter/grpc_proto.go index ea6bee947..9161f0c11 100644 --- a/pkg/exporter/grpc_proto.go +++ b/pkg/exporter/grpc_proto.go @@ -14,6 +14,8 @@ import ( var glog = logrus.WithField("component", "exporter/GRPCProto") +const componentGRPC = "grpc" + // GRPCProto flow exporter. Its ExportFlows method accepts slices of *flow.Record // by its input channel, converts them to *pbflow.Records instances, and submits // them to the collector. @@ -24,10 +26,9 @@ type GRPCProto struct { // maxFlowsPerMessage limits the maximum number of flows per GRPC message. // If a message contains more flows than this number, the GRPC message will be split into // multiple messages. - maxFlowsPerMessage int - numberOfRecordsExportedByGRPC prometheus.Counter - exportedRecordsBatchSize prometheus.Counter - errors *metrics.ErrorCounter + maxFlowsPerMessage int + metrics *metrics.Metrics + batchCounter prometheus.Counter } func StartGRPCProto(hostIP string, hostPort int, maxFlowsPerMessage int, m *metrics.Metrics) (*GRPCProto, error) { @@ -36,13 +37,12 @@ func StartGRPCProto(hostIP string, hostPort int, maxFlowsPerMessage int, m *metr return nil, err } return &GRPCProto{ - hostIP: hostIP, - hostPort: hostPort, - clientConn: clientConn, - maxFlowsPerMessage: maxFlowsPerMessage, - numberOfRecordsExportedByGRPC: m.CreateNumberOfRecordsExportedByGRPC(), - exportedRecordsBatchSize: m.CreateGRPCBatchSize(), - errors: m.GetErrorsCounter(), + hostIP: hostIP, + hostPort: hostPort, + clientConn: clientConn, + maxFlowsPerMessage: maxFlowsPerMessage, + metrics: m, + batchCounter: m.CreateBatchCounter(componentGRPC), }, nil } @@ -52,18 +52,19 @@ func (g *GRPCProto) ExportFlows(input <-chan []*flow.Record) { socket := utils.GetSocket(g.hostIP, g.hostPort) log := glog.WithField("collector", socket) for inputRecords := range input { - g.exportedRecordsBatchSize.Inc() + g.metrics.EvictionCounter.WithSource(componentGRPC).Inc() for _, pbRecords := range 
flowsToPB(inputRecords, g.maxFlowsPerMessage) { log.Debugf("sending %d records", len(pbRecords.Entries)) if _, err := g.clientConn.Client().Send(context.TODO(), pbRecords); err != nil { - g.errors.WithValues("CantWriteMessage", "grpc").Inc() + g.metrics.Errors.WithErrorName(componentGRPC, "CannotWriteMessage").Inc() log.WithError(err).Error("couldn't send flow records to collector") } - g.numberOfRecordsExportedByGRPC.Add(float64(len(pbRecords.Entries))) + g.batchCounter.Inc() + g.metrics.EvictedFlowsCounter.WithSource(componentGRPC).Add(float64(len(pbRecords.Entries))) } } if err := g.clientConn.Close(); err != nil { log.WithError(err).Warn("couldn't close flow export client") - g.errors.WithValues("CantCloseClient", "grpc").Inc() + g.metrics.Errors.WithErrorName(componentGRPC, "CannotCloseClient").Inc() } } diff --git a/pkg/exporter/grpc_proto_test.go b/pkg/exporter/grpc_proto_test.go index 315cd0237..010e77d22 100644 --- a/pkg/exporter/grpc_proto_test.go +++ b/pkg/exporter/grpc_proto_test.go @@ -29,12 +29,7 @@ func TestIPv4GRPCProto_ExportFlows_AgentIP(t *testing.T) { defer coll.Close() // Start GRPCProto exporter stage - exporter, err := StartGRPCProto("127.0.0.1", port, 1000, - &metrics.Metrics{Settings: &metrics.Settings{ - PromConnectionInfo: metrics.PromConnectionInfo{}, - Prefix: "", - }, - }) + exporter, err := StartGRPCProto("127.0.0.1", port, 1000, metrics.NewMetrics(&metrics.Settings{})) require.NoError(t, err) // Send some flows to the input of the exporter stage @@ -76,12 +71,7 @@ func TestIPv6GRPCProto_ExportFlows_AgentIP(t *testing.T) { defer coll.Close() // Start GRPCProto exporter stage - exporter, err := StartGRPCProto("::1", port, 1000, - &metrics.Metrics{Settings: &metrics.Settings{ - PromConnectionInfo: metrics.PromConnectionInfo{}, - Prefix: "", - }, - }) + exporter, err := StartGRPCProto("::1", port, 1000, metrics.NewMetrics(&metrics.Settings{})) require.NoError(t, err) // Send some flows to the input of the exporter stage @@ -124,12 +114,7 
@@ func TestGRPCProto_SplitLargeMessages(t *testing.T) { const msgMaxLen = 10000 // Start GRPCProto exporter stage - exporter, err := StartGRPCProto("127.0.0.1", port, msgMaxLen, - &metrics.Metrics{Settings: &metrics.Settings{ - PromConnectionInfo: metrics.PromConnectionInfo{}, - Prefix: "", - }, - }) + exporter, err := StartGRPCProto("127.0.0.1", port, msgMaxLen, metrics.NewMetrics(&metrics.Settings{})) require.NoError(t, err) // Send a message much longer than the limit length diff --git a/pkg/exporter/kafka_proto.go b/pkg/exporter/kafka_proto.go index 97ef4a990..414b06b86 100644 --- a/pkg/exporter/kafka_proto.go +++ b/pkg/exporter/kafka_proto.go @@ -6,7 +6,6 @@ import ( "github.com/netobserv/netobserv-ebpf-agent/pkg/flow" "github.com/netobserv/netobserv-ebpf-agent/pkg/metrics" - "github.com/prometheus/client_golang/prometheus" kafkago "github.com/segmentio/kafka-go" "github.com/sirupsen/logrus" "google.golang.org/protobuf/proto" @@ -14,6 +13,8 @@ import ( var klog = logrus.WithField("component", "exporter/KafkaProto") +const componentKafka = "kafka" + type kafkaWriter interface { WriteMessages(ctx context.Context, msgs ...kafkago.Message) error } @@ -21,10 +22,8 @@ type kafkaWriter interface { // KafkaProto exports flows over Kafka, encoded as a protobuf that is understandable by the // Flowlogs-Pipeline collector type KafkaProto struct { - Writer kafkaWriter - NumberOfRecordsExportedByKafka prometheus.Counter - ExportedRecordsBatchSize prometheus.Counter - Errors *metrics.ErrorCounter + Writer kafkaWriter + Metrics *metrics.Metrics } func (kp *KafkaProto) ExportFlows(input <-chan []*flow.Record) { @@ -53,18 +52,18 @@ func (kp *KafkaProto) batchAndSubmit(records []*flow.Record) { pbBytes, err := proto.Marshal(flowToPB(record)) if err != nil { klog.WithError(err).Debug("can't encode protobuf message. 
Ignoring") - kp.Errors.WithValues("CantEncodeMessage", "kafka").Inc() + kp.Metrics.Errors.WithErrorName(componentKafka, "CannotEncodeMessage").Inc() continue } msgs = append(msgs, kafkago.Message{Value: pbBytes, Key: getFlowKey(record)}) - kp.ExportedRecordsBatchSize.Add(float64(len(pbBytes))) } if err := kp.Writer.WriteMessages(context.TODO(), msgs...); err != nil { klog.WithError(err).Error("can't write messages into Kafka") - kp.Errors.WithValues("CantWriteMessage", "kafka").Inc() + kp.Metrics.Errors.WithErrorName(componentKafka, "CannotWriteMessage").Inc() } - kp.NumberOfRecordsExportedByKafka.Add(float64(len(records))) + kp.Metrics.EvictionCounter.WithSource(componentKafka).Inc() + kp.Metrics.EvictedFlowsCounter.WithSource(componentKafka).Add(float64(len(records))) } type JSONRecord struct { diff --git a/pkg/exporter/kafka_proto_test.go b/pkg/exporter/kafka_proto_test.go index b53236c3b..6c5a8f58b 100644 --- a/pkg/exporter/kafka_proto_test.go +++ b/pkg/exporter/kafka_proto_test.go @@ -31,18 +31,9 @@ func ByteArrayFromNetIP(netIP net.IP) []uint8 { func TestProtoConversion(t *testing.T) { wc := writerCapturer{} - m := metrics.NewMetrics( - &metrics.Settings{ - PromConnectionInfo: metrics.PromConnectionInfo{}, - Prefix: "", - }) - - kj := KafkaProto{ - Writer: &wc, - NumberOfRecordsExportedByKafka: m.CreateNumberOfRecordsExportedByKafka(), - ExportedRecordsBatchSize: m.CreateKafkaBatchSize(), - Errors: m.GetErrorsCounter(), - } + m := metrics.NewMetrics(&metrics.Settings{}) + + kj := KafkaProto{Writer: &wc, Metrics: m} input := make(chan []*flow.Record, 11) record := flow.Record{} record.Id.EthProtocol = 3 diff --git a/pkg/flow/account.go b/pkg/flow/account.go index 91421ba7b..6d42b046f 100644 --- a/pkg/flow/account.go +++ b/pkg/flow/account.go @@ -6,7 +6,6 @@ import ( "github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf" "github.com/netobserv/netobserv-ebpf-agent/pkg/metrics" - "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) @@ 
-15,12 +14,12 @@ import ( // for the edge case where packets are submitted directly via ring-buffer because the kernel-side // accounting map is full. type Accounter struct { - maxEntries int - evictTimeout time.Duration - entries map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics - clock func() time.Time - monoClock func() time.Duration - userSpaceEvictionCounter prometheus.Counter + maxEntries int + evictTimeout time.Duration + entries map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics + clock func() time.Time + monoClock func() time.Duration + metrics *metrics.Metrics } var alog = logrus.WithField("component", "flow/Accounter") @@ -33,14 +32,15 @@ func NewAccounter( monoClock func() time.Duration, m *metrics.Metrics, ) *Accounter { - return &Accounter{ - maxEntries: maxEntries, - evictTimeout: evictTimeout, - entries: map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics{}, - clock: clock, - monoClock: monoClock, - userSpaceEvictionCounter: m.CreateUserSpaceEvictionCounter(), + acc := Accounter{ + maxEntries: maxEntries, + evictTimeout: evictTimeout, + entries: map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics{}, + clock: clock, + monoClock: monoClock, + metrics: m, } + return &acc } // Account runs in a new goroutine. It reads all the records from the input channel @@ -59,14 +59,14 @@ func (c *Accounter) Account(in <-chan *RawRecord, out chan<- []*Record) { c.entries = map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics{} logrus.WithField("flows", len(evictingEntries)). Debug("evicting flows from userspace accounter on timeout") - c.evict(evictingEntries, out) + c.evict(evictingEntries, out, "timeout") case record, ok := <-in: if !ok { alog.Debug("input channel closed. 
Evicting entries") // if the records channel is closed, we evict the entries in the // same goroutine to wait for all the entries to be sent before // closing the channel - c.evict(c.entries, out) + c.evict(c.entries, out, "closing") alog.Debug("exiting account routine") return } @@ -78,7 +78,7 @@ func (c *Accounter) Account(in <-chan *RawRecord, out chan<- []*Record) { c.entries = map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics{} logrus.WithField("flows", len(evictingEntries)). Debug("evicting flows from userspace accounter after reaching cache max length") - c.evict(evictingEntries, out) + c.evict(evictingEntries, out, "full") // Since we will evict flows because we reached to cacheMaxFlows then reset // evictTimer to avoid unnecessary another eviction when timer expires. evictTick.Reset(c.evictTimeout) @@ -86,17 +86,19 @@ func (c *Accounter) Account(in <-chan *RawRecord, out chan<- []*Record) { c.entries[record.Id] = &record.Metrics } } + c.metrics.BufferSizeGauge.WithBufferName("accounter-entries").Set(float64(len(c.entries))) } } -func (c *Accounter) evict(entries map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics, evictor chan<- []*Record) { +func (c *Accounter) evict(entries map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics, evictor chan<- []*Record, reason string) { now := c.clock() monotonicNow := uint64(c.monoClock()) records := make([]*Record, 0, len(entries)) for key, metrics := range entries { records = append(records, NewRecord(key, metrics, now, monotonicNow)) } + c.metrics.EvictionCounter.WithSourceAndReason("accounter", reason).Inc() + c.metrics.EvictedFlowsCounter.WithSourceAndReason("accounter", reason).Add(float64(len(records))) alog.WithField("numEntries", len(records)).Debug("records evicted from userspace accounter") - c.userSpaceEvictionCounter.Inc() evictor <- records } diff --git a/pkg/flow/deduper.go b/pkg/flow/deduper.go index d8cb8ea4b..13db64010 100644 --- a/pkg/flow/deduper.go +++ b/pkg/flow/deduper.go @@ -8,6 +8,7 @@ import ( "github.com/sirupsen/logrus" 
"github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf" + "github.com/netobserv/netobserv-ebpf-agent/pkg/metrics" ) var dlog = logrus.WithField("component", "flow/Deduper") @@ -39,7 +40,7 @@ type entry struct { // (no activity for it during the expiration time) // The justMark argument tells that the deduper should not drop the duplicate flows but // set their Duplicate field. -func Dedupe(expireTime time.Duration, justMark, mergeDup bool, ifaceNamer InterfaceNamer) func(in <-chan []*Record, out chan<- []*Record) { +func Dedupe(expireTime time.Duration, justMark, mergeDup bool, ifaceNamer InterfaceNamer, m *metrics.Metrics) func(in <-chan []*Record, out chan<- []*Record) { cache := &deduperCache{ expire: expireTime, entries: list.New(), @@ -54,7 +55,11 @@ func Dedupe(expireTime time.Duration, justMark, mergeDup bool, ifaceNamer Interf } if len(fwd) > 0 { out <- fwd + m.EvictionCounter.WithSource("deduper").Inc() + m.EvictedFlowsCounter.WithSource("deduper").Add(float64(len(fwd))) } + m.BufferSizeGauge.WithBufferName("deduper-list").Set(float64(cache.entries.Len())) + m.BufferSizeGauge.WithBufferName("deduper-map").Set(float64(len(cache.ifaces))) } } } diff --git a/pkg/flow/deduper_test.go b/pkg/flow/deduper_test.go index e054191b0..9089c80cd 100644 --- a/pkg/flow/deduper_test.go +++ b/pkg/flow/deduper_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf" + "github.com/netobserv/netobserv-ebpf-agent/pkg/metrics" ) var ( @@ -70,7 +71,7 @@ func TestDedupe(t *testing.T) { input := make(chan []*Record, 100) output := make(chan []*Record, 100) - go Dedupe(time.Minute, false, false, interfaceNamer)(input, output) + go Dedupe(time.Minute, false, false, interfaceNamer, metrics.NewMetrics(&metrics.Settings{}))(input, output) input <- []*Record{ oneIf2, // record 1 at interface 2: should be accepted @@ -108,7 +109,7 @@ func TestDedupe_EvictFlows(t *testing.T) { input := make(chan []*Record, 100) output := 
make(chan []*Record, 100) - go Dedupe(15*time.Second, false, false, interfaceNamer)(input, output) + go Dedupe(15*time.Second, false, false, interfaceNamer, metrics.NewMetrics(&metrics.Settings{}))(input, output) // Should only accept records 1 and 2, at interface 1 input <- []*Record{oneIf1, twoIf1, oneIf2} @@ -143,7 +144,7 @@ func TestDedupeMerge(t *testing.T) { input := make(chan []*Record, 100) output := make(chan []*Record, 100) - go Dedupe(time.Minute, false, true, interfaceNamer)(input, output) + go Dedupe(time.Minute, false, true, interfaceNamer, metrics.NewMetrics(&metrics.Settings{}))(input, output) input <- []*Record{ oneIf2, // record 1 at interface 2: should be accepted diff --git a/pkg/flow/limiter.go b/pkg/flow/limiter.go index 08ce64bab..64b801cf8 100644 --- a/pkg/flow/limiter.go +++ b/pkg/flow/limiter.go @@ -3,6 +3,7 @@ package flow import ( "time" + "github.com/netobserv/netobserv-ebpf-agent/pkg/metrics" "github.com/sirupsen/logrus" ) @@ -16,6 +17,11 @@ var cllog = logrus.WithField("component", "capacity.Limiter") // log a message about the number of lost flows. 
type CapacityLimiter struct { droppedFlows int + metrics *metrics.Metrics +} + +func NewCapacityLimiter(m *metrics.Metrics) *CapacityLimiter { + return &CapacityLimiter{metrics: m} } func (c *CapacityLimiter) Limit(in <-chan []*Record, out chan<- []*Record) { @@ -24,6 +30,7 @@ func (c *CapacityLimiter) Limit(in <-chan []*Record, out chan<- []*Record) { if len(out) < cap(out) || cap(out) == 0 { out <- i } else { + c.metrics.DroppedFlowsCounter.WithSourceAndReason("limiter", "full").Add(float64(len(i))) c.droppedFlows += len(i) } } diff --git a/pkg/flow/limiter_test.go b/pkg/flow/limiter_test.go index a2cb510d1..eebaeaede 100644 --- a/pkg/flow/limiter_test.go +++ b/pkg/flow/limiter_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/netobserv/gopipes/pkg/node" + "github.com/netobserv/netobserv-ebpf-agent/pkg/metrics" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -71,7 +72,7 @@ func capacityLimiterPipe() (in chan<- []*Record, out <-chan []*Record) { initOut <- i } }) - limiter := node.AsMiddle((&CapacityLimiter{}).Limit) + limiter := node.AsMiddle((NewCapacityLimiter(metrics.NewMetrics(&metrics.Settings{}))).Limit) term := node.AsTerminal(func(termIn <-chan []*Record) { for i := range termIn { outCh <- i diff --git a/pkg/flow/tracer_map.go b/pkg/flow/tracer_map.go index ebb9956ed..3e8b388d1 100644 --- a/pkg/flow/tracer_map.go +++ b/pkg/flow/tracer_map.go @@ -26,14 +26,12 @@ type MapTracer struct { // manages the access to the eviction routines, avoiding two evictions happening at the same time evictionCond *sync.Cond lastEvictionNs uint64 - hmapEvictionCounter prometheus.Counter - numberOfEvictedFlows prometheus.Counter + metrics *metrics.Metrics timeSpentinLookupAndDelete prometheus.Histogram - errors *metrics.ErrorCounter } type mapFetcher interface { - LookupAndDeleteMap(counter prometheus.Counter) map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics + LookupAndDeleteMap(*metrics.Metrics) map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics 
DeleteMapsStaleEntries(timeOut time.Duration) } @@ -44,10 +42,8 @@ func NewMapTracer(fetcher mapFetcher, evictionTimeout, staleEntriesEvictTimeout lastEvictionNs: uint64(monotime.Now()), evictionCond: sync.NewCond(&sync.Mutex{}), staleEntriesEvictTimeout: staleEntriesEvictTimeout, - hmapEvictionCounter: m.CreateHashMapCounter(), - numberOfEvictedFlows: m.CreateNumberOfEvictedFlows(), + metrics: m, timeSpentinLookupAndDelete: m.CreateTimeSpendInLookupAndDelete(), - errors: m.GetErrorsCounter(), } } @@ -105,7 +101,7 @@ func (m *MapTracer) evictFlows(ctx context.Context, forceGC bool, forwardFlows c var forwardingFlows []*Record laterFlowNs := uint64(0) - flows := m.mapFetcher.LookupAndDeleteMap(m.errors.WithValues("CannotDeleteFlows", "")) + flows := m.mapFetcher.LookupAndDeleteMap(m.metrics) elapsed := time.Since(currentTime) for flowKey, flowMetrics := range flows { aggregatedMetrics := m.aggregate(flowMetrics) @@ -136,8 +132,8 @@ func (m *MapTracer) evictFlows(ctx context.Context, forceGC bool, forwardFlows c if forceGC { runtime.GC() } - m.hmapEvictionCounter.Inc() - m.numberOfEvictedFlows.Add(float64(len(forwardingFlows))) + m.metrics.EvictionCounter.WithSource("hashmap").Inc() + m.metrics.EvictedFlowsCounter.WithSource("hashmap").Add(float64(len(forwardingFlows))) m.timeSpentinLookupAndDelete.Observe(elapsed.Seconds()) mtlog.Debugf("%d flows evicted", len(forwardingFlows)) } diff --git a/pkg/flow/tracer_ringbuf.go b/pkg/flow/tracer_ringbuf.go index 211e29601..f118a5315 100644 --- a/pkg/flow/tracer_ringbuf.go +++ b/pkg/flow/tracer_ringbuf.go @@ -13,7 +13,6 @@ import ( "github.com/cilium/ebpf/ringbuf" "github.com/netobserv/gopipes/pkg/node" - "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) @@ -23,11 +22,10 @@ var rtlog = logrus.WithField("component", "flow.RingBufTracer") // added in the eBPF kernel space due to the map being full or busy) and submits them to the // userspace Aggregator map type RingBufTracer struct { - mapFlusher 
mapFlusher - ringBuffer ringBufReader - stats stats - numberOfFlowsReceived prometheus.Counter - errCanNotReadRingBuffCounter prometheus.Counter + mapFlusher mapFlusher + ringBuffer ringBufReader + stats stats + metrics *metrics.Metrics } type ringBufReader interface { @@ -46,14 +44,12 @@ type mapFlusher interface { Flush() } -func NewRingBufTracer( - reader ringBufReader, flusher mapFlusher, logTimeout time.Duration, m *metrics.Metrics, -) *RingBufTracer { +func NewRingBufTracer(reader ringBufReader, flusher mapFlusher, logTimeout time.Duration, m *metrics.Metrics) *RingBufTracer { return &RingBufTracer{ - mapFlusher: flusher, - ringBuffer: reader, - stats: stats{loggingTimeout: logTimeout}, - numberOfFlowsReceived: m.CreateNumberOfFlowsReceivedByRingBuffer(), + mapFlusher: flusher, + ringBuffer: reader, + stats: stats{loggingTimeout: logTimeout}, + metrics: m, } } @@ -82,12 +78,13 @@ func (m *RingBufTracer) TraceLoop(ctx context.Context) node.StartFunc[*RawRecord func (m *RingBufTracer) listenAndForwardRingBuffer(debugging bool, forwardCh chan<- *RawRecord) error { event, err := m.ringBuffer.ReadRingBuf() if err != nil { + m.metrics.Errors.WithErrorName("ringbuffer", "CannotReadRingbuffer").Inc() return fmt.Errorf("reading from ring buffer: %w", err) } // Parses the ringbuf event entry into an Event structure. 
readFlow, err := ReadFrom(bytes.NewBuffer(event.RawSample)) if err != nil { - m.errCanNotReadRingBuffCounter.Inc() + m.metrics.Errors.WithErrorName("ringbuffer", "CannotParseRingbuffer").Inc() return fmt.Errorf("parsing data received from the ring buffer: %w", err) } mapFullError := readFlow.Metrics.Errno == uint8(syscall.E2BIG) @@ -96,10 +93,13 @@ func (m *RingBufTracer) listenAndForwardRingBuffer(debugging bool, forwardCh cha } // if the flow was received due to lack of space in the eBPF map // forces a flow's eviction to leave room for new flows in the ebpf cache + var reason string if mapFullError { m.mapFlusher.Flush() + reason = "mapfull" } - m.numberOfFlowsReceived.Inc() + // In ringbuffer, a "flow" is a 1-packet flow, it hasn't gone through aggregation yet. So we use the packet counter metric. + m.metrics.EvictedPacketsCounter.WithSourceAndReason("ringbuffer", reason).Inc() // Will need to send it to accounter anyway to account regardless of complete/ongoing flow forwardCh <- readFlow return nil diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index dfb4f71a7..57c494b28 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -46,54 +46,62 @@ func defineMetric(name, help string, t metricType, labels ...string) MetricDefin } var ( - hmapEvictionsTotal = defineMetric( - "hashmap_evictions_total", - "Number of hashmap evictions total", + evictionsTotal = defineMetric( + "evictions_total", + "Number of eviction events", TypeCounter, + "source", + "reason", ) - userspaceNumberOfEvictionsTotal = defineMetric( - "userspace_number_of_evictions_total", - "Number of userspace evictions total", + evictedFlowsTotal = defineMetric( + "evicted_flows_total", + "Number of evicted flows", TypeCounter, + "source", + "reason", ) - numberOfevictedFlowsTotal = defineMetric( - "number_of_evicted_flows_total", - "Number of evicted flows Total", - TypeCounter, - ) - numberofFlowsreceivedviaRingBufferTotal = defineMetric( - 
"number_of_flows_received_via_ring_buffer_total", - "Number of flows received via ring buffer total", + evictedPktTotal = defineMetric( + "evicted_packets_total", + "Number of evicted packets", TypeCounter, + "source", + "reason", ) lookupAndDeleteMapDurationSeconds = defineMetric( "lookup_and_delete_map_duration_seconds", - "Lookup and delete map duration seconds", + "Lookup and delete map duration in seconds", TypeHistogram, ) - numberOfWrittenRecordsTotal = defineMetric( - "number_of_written_records_total", - "Number of written records total", + droppedFlows = defineMetric( + "dropped_flows_total", + "Number of dropped flows", TypeCounter, - "exporter", + "source", + "reason", ) - exportedBatchSizeTotal = defineMetric( - "exported_batch_size_total", - "Exported batch size total", + bufferSize = defineMetric( + "buffer_size", + "Buffer size", + TypeGauge, + "name", + ) + exportedBatchCounterTotal = defineMetric( + "exported_batch_total", + "Exported batches", TypeCounter, "exporter", ) - samplingRateSeconds = defineMetric( - "sampling_rate_seconds", - "Sampling rate seconds", + samplingRate = defineMetric( + "sampling_rate", + "Sampling rate", TypeGauge, ) errorsCounter = defineMetric( "errors_total", "errors counter", TypeCounter, + "component", "error", - "exporter", ) ) @@ -116,10 +124,27 @@ func verifyMetricType(def *MetricDefinition, t metricType) { type Metrics struct { Settings *Settings + + // Shared metrics: + EvictionCounter *EvictionCounter + EvictedFlowsCounter *EvictionCounter + EvictedPacketsCounter *EvictionCounter + DroppedFlowsCounter *EvictionCounter + BufferSizeGauge *BufferSizeGauge + Errors *ErrorCounter } func NewMetrics(settings *Settings) *Metrics { - return &Metrics{Settings: settings} + m := &Metrics{ + Settings: settings, + } + m.EvictionCounter = &EvictionCounter{vec: m.NewCounterVec(&evictionsTotal)} + m.EvictedFlowsCounter = &EvictionCounter{vec: m.NewCounterVec(&evictedFlowsTotal)} + m.EvictedPacketsCounter = &EvictionCounter{vec: 
m.NewCounterVec(&evictedPktTotal)} + m.DroppedFlowsCounter = &EvictionCounter{vec: m.NewCounterVec(&droppedFlows)} + m.BufferSizeGauge = &BufferSizeGauge{vec: m.NewGaugeVec(&bufferSize)} + m.Errors = &ErrorCounter{vec: m.NewCounterVec(&errorsCounter)} + return m } // register will register against the default registry. May panic or not depending on settings @@ -169,6 +194,17 @@ func (m *Metrics) NewGauge(def *MetricDefinition, labels ...string) prometheus.G return c } +func (m *Metrics) NewGaugeVec(def *MetricDefinition) *prometheus.GaugeVec { + verifyMetricType(def, TypeGauge) + fullName := m.Settings.Prefix + def.Name + g := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: fullName, + Help: def.Help, + }, def.Labels) + m.register(g, fullName) + return g +} + func (m *Metrics) NewHistogram(def *MetricDefinition, buckets []float64, labels ...string) prometheus.Histogram { verifyMetricType(def, TypeHistogram) fullName := m.Settings.Prefix + def.Name @@ -182,56 +218,44 @@ func (m *Metrics) NewHistogram(def *MetricDefinition, buckets []float64, labels return c } -func (m *Metrics) CreateHashMapCounter() prometheus.Counter { - return m.NewCounter(&hmapEvictionsTotal) -} - -func (m *Metrics) CreateUserSpaceEvictionCounter() prometheus.Counter { - return m.NewCounter(&userspaceNumberOfEvictionsTotal) +// EvictionCounter provides syntactic sugar hidding prom's counter for eviction purpose +type EvictionCounter struct { + vec *prometheus.CounterVec } -func (m *Metrics) CreateNumberOfEvictedFlows() prometheus.Counter { - return m.NewCounter(&numberOfevictedFlowsTotal) +func (c *EvictionCounter) WithSourceAndReason(source, reason string) prometheus.Counter { + return c.vec.WithLabelValues(source, reason) } -func (m *Metrics) CreateNumberOfFlowsReceivedByRingBuffer() prometheus.Counter { - return m.NewCounter(&numberofFlowsreceivedviaRingBufferTotal) +func (c *EvictionCounter) WithSource(source string) prometheus.Counter { + return c.vec.WithLabelValues(source, "") } func 
(m *Metrics) CreateTimeSpendInLookupAndDelete() prometheus.Histogram { return m.NewHistogram(&lookupAndDeleteMapDurationSeconds, []float64{.001, .01, .1, 1, 10, 100, 1000, 10000}) } -func (m *Metrics) CreateNumberOfRecordsExportedByGRPC() prometheus.Counter { - return m.NewCounter(&numberOfWrittenRecordsTotal, "grpc") +// BufferSizeGauge provides syntactic sugar hidding prom's gauge tailored for buffer size +type BufferSizeGauge struct { + vec *prometheus.GaugeVec } -func (m *Metrics) CreateGRPCBatchSize() prometheus.Counter { - return m.NewCounter(&exportedBatchSizeTotal, "grpc") +func (g *BufferSizeGauge) WithBufferName(bufferName string) prometheus.Gauge { + return g.vec.WithLabelValues(bufferName) } -func (m *Metrics) CreateNumberOfRecordsExportedByKafka() prometheus.Counter { - return m.NewCounter(&numberOfWrittenRecordsTotal, "kafka") -} - -func (m *Metrics) CreateKafkaBatchSize() prometheus.Counter { - return m.NewCounter(&exportedBatchSizeTotal, "kafka") +func (m *Metrics) CreateBatchCounter(exporter string) prometheus.Counter { + return m.NewCounter(&exportedBatchCounterTotal, exporter) } func (m *Metrics) CreateSamplingRate() prometheus.Gauge { - return m.NewGauge(&samplingRateSeconds) -} - -func (m *Metrics) GetErrorsCounter() *ErrorCounter { - return &ErrorCounter{ - vec: m.NewCounterVec(&errorsCounter), - } + return m.NewGauge(&samplingRate) } type ErrorCounter struct { vec *prometheus.CounterVec } -func (c *ErrorCounter) WithValues(errName, exporter string) prometheus.Counter { - return c.vec.WithLabelValues(errName, exporter) +func (c *ErrorCounter) WithErrorName(component, errName string) prometheus.Counter { + return c.vec.WithLabelValues(component, errName) } diff --git a/pkg/metrics/metrics_test.go b/pkg/metrics/metrics_test.go index e924afc89..330101ca3 100644 --- a/pkg/metrics/metrics_test.go +++ b/pkg/metrics/metrics_test.go @@ -20,11 +20,11 @@ func TestMetricsCreation(t *testing.T) { metrics := NewMetrics(settings) // Test Counter creation - 
counter := metrics.CreateHashMapCounter() + counter := metrics.CreateBatchCounter("grpc") assert.NotNil(t, counter) // Test Gauge creation - gauge := metrics.CreateNumberOfEvictedFlows() + gauge := metrics.CreateSamplingRate() assert.NotNil(t, gauge) // Test Histogram creation diff --git a/pkg/test/tracer_fake.go b/pkg/test/tracer_fake.go index 6c9badb08..b8645a772 100644 --- a/pkg/test/tracer_fake.go +++ b/pkg/test/tracer_fake.go @@ -8,9 +8,9 @@ import ( "github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf" "github.com/netobserv/netobserv-ebpf-agent/pkg/flow" "github.com/netobserv/netobserv-ebpf-agent/pkg/ifaces" + "github.com/netobserv/netobserv-ebpf-agent/pkg/metrics" "github.com/cilium/ebpf/ringbuf" - "github.com/prometheus/client_golang/prometheus" ) // TracerFake fakes the kernel-side eBPF map structures for testing @@ -36,7 +36,7 @@ func (m *TracerFake) Register(iface ifaces.Interface) error { return nil } -func (m *TracerFake) LookupAndDeleteMap(_ prometheus.Counter) map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics { +func (m *TracerFake) LookupAndDeleteMap(_ *metrics.Metrics) map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics { select { case r := <-m.mapLookups: return r