Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: replace glog in metrics & telemetry packages #6587

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions cmd/nginx-ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,7 @@ func createManagerAndControllerCollectors(ctx context.Context, constLabels map[s
registry = prometheus.NewRegistry()
mc = collectors.NewLocalManagerMetricsCollector(constLabels)
cc = collectors.NewControllerMetricsCollector(*enableCustomResources, constLabels)
processCollector := collectors.NewNginxProcessesMetricsCollector(constLabels)
processCollector := collectors.NewNginxProcessesMetricsCollector(ctx, constLabels)
workQueueCollector := collectors.NewWorkQueueMetricsCollector(constLabels)

err = mc.Register(registry)
Expand Down Expand Up @@ -781,18 +781,18 @@ func createPlusAndLatencyCollectors(
logger := kitlog.NewLogfmtLogger(os.Stdout)
logger = level.NewFilter(logger, level.AllowError())
plusCollector = nginxCollector.NewNginxPlusCollector(plusClient, "nginx_ingress_nginxplus", variableLabelNames, constLabels, logger)
go metrics.RunPrometheusListenerForNginxPlus(*prometheusMetricsListenPort, plusCollector, registry, prometheusSecret)
go metrics.RunPrometheusListenerForNginxPlus(ctx, *prometheusMetricsListenPort, plusCollector, registry, prometheusSecret)
} else {
httpClient := getSocketClient("/var/lib/nginx/nginx-status.sock")
client := metrics.NewNginxMetricsClient(httpClient)
go metrics.RunPrometheusListenerForNginx(*prometheusMetricsListenPort, client, registry, constLabels, prometheusSecret)
go metrics.RunPrometheusListenerForNginx(ctx, *prometheusMetricsListenPort, client, registry, constLabels, prometheusSecret)
}
if *enableLatencyMetrics {
lc = collectors.NewLatencyMetricsCollector(constLabels, upstreamServerVariableLabels, upstreamServerPeerVariableLabelNames)
lc = collectors.NewLatencyMetricsCollector(ctx, constLabels, upstreamServerVariableLabels, upstreamServerPeerVariableLabelNames)
if err := lc.Register(registry); err != nil {
nl.Errorf(l, "Error registering Latency Prometheus metrics: %v", err)
}
syslogListener = metrics.NewLatencyMetricsListener("/var/lib/nginx/nginx-syslog.sock", lc)
syslogListener = metrics.NewLatencyMetricsListener(ctx, "/var/lib/nginx/nginx-syslog.sock", lc)
go syslogListener.Run()
}
}
Expand Down
1 change: 1 addition & 0 deletions internal/k8s/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ func NewLoadBalancerController(input NewLoadBalancerControllerInput) *LoadBalanc
nl.Fatalf(lbc.Logger, "failed to initialize telemetry exporter: %v", err)
}
collectorConfig := telemetry.CollectorConfig{
Context: input.LoggerContext,
Period: 24 * time.Hour,
K8sClientReader: input.KubeClient,
Version: input.NICVersion,
Expand Down
15 changes: 10 additions & 5 deletions internal/metrics/collectors/latency.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package collectors

import (
"context"
"encoding/json"
"fmt"
"log/slog"
"strconv"
"strings"
"sync"

"github.com/golang/glog"
nl "github.com/nginxinc/kubernetes-ingress/internal/logger"
"github.com/prometheus/client_golang/prometheus"
)

Expand Down Expand Up @@ -73,10 +75,12 @@ type LatencyMetricsCollector struct {
metricsPublishedMap metricsPublishedMap
metricsPublishedMutex sync.Mutex
variableLabelsMutex sync.RWMutex
logger *slog.Logger
}

// NewLatencyMetricsCollector creates a new LatencyMetricsCollector
func NewLatencyMetricsCollector(
ctx context.Context,
constLabels map[string]string,
upstreamServerLabelNames []string,
upstreamServerPeerLabelNames []string,
Expand All @@ -96,6 +100,7 @@ func NewLatencyMetricsCollector(
metricsPublishedMap: make(metricsPublishedMap),
upstreamServerLabelNames: upstreamServerLabelNames,
upstreamServerPeerLabelNames: upstreamServerPeerLabelNames,
logger: nl.LoggerFromContext(ctx),
}
}

Expand Down Expand Up @@ -141,7 +146,7 @@ func (l *LatencyMetricsCollector) DeleteMetrics(upstreamServerPeerNames []string
for _, labelValues := range l.listAndDeleteMetricsPublished(name) {
success := l.httpLatency.DeleteLabelValues(labelValues...)
if !success {
glog.Warningf("could not delete metric for upstream server peer: %s with values: %v", name, labelValues)
nl.Warnf(l.logger, "could not delete metric for upstream server peer: %s with values: %v", name, labelValues)
}
}
}
Expand Down Expand Up @@ -178,7 +183,7 @@ func (l *LatencyMetricsCollector) Collect(ch chan<- prometheus.Metric) {
func (l *LatencyMetricsCollector) RecordLatency(syslogMsg string) {
lm, err := parseMessage(syslogMsg)
if err != nil {
glog.V(3).Infof("could not parse syslog message: %v", err)
nl.Debugf(l.logger, "could not parse syslog message: %v", err)
return
}

Expand All @@ -188,13 +193,13 @@ func (l *LatencyMetricsCollector) RecordLatency(syslogMsg string) {
// https://github.com/nginxinc/kubernetes-ingress/issues/5010
// https://github.com/nginxinc/kubernetes-ingress/issues/6124
if lm.Upstream == "-" {
glog.V(3).Infof("latency metrics for gRPC upstreams: %v", lm)
nl.Debugf(l.logger, "latency metrics for gRPC upstreams: %v", lm)
return
}

labelValues, err := l.createLatencyLabelValues(lm)
if err != nil {
glog.Errorf("cannot record latency for upstream %s and server %s: %v", lm.Upstream, lm.Server, err)
nl.Errorf(l.logger, "cannot record latency for upstream %s and server %s: %v", lm.Upstream, lm.Server, err)
return
}
l.httpLatency.WithLabelValues(labelValues...).Observe(lm.Latency * 1000)
Expand Down
10 changes: 7 additions & 3 deletions internal/metrics/collectors/processes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,26 @@ package collectors

import (
"bytes"
"context"
"fmt"
"log/slog"
"os"
"path/filepath"
"strconv"
"strings"

"github.com/golang/glog"
nl "github.com/nginxinc/kubernetes-ingress/internal/logger"
"github.com/prometheus/client_golang/prometheus"
)

// NginxProcessesMetricsCollector implements prometheus.Collector interface
type NginxProcessesMetricsCollector struct {
workerProcessTotal *prometheus.GaugeVec
logger *slog.Logger
}

// NewNginxProcessesMetricsCollector creates a new NginxProcessMetricsCollector
func NewNginxProcessesMetricsCollector(constLabels map[string]string) *NginxProcessesMetricsCollector {
func NewNginxProcessesMetricsCollector(ctx context.Context, constLabels map[string]string) *NginxProcessesMetricsCollector {
return &NginxProcessesMetricsCollector{
workerProcessTotal: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Expand All @@ -29,14 +32,15 @@ func NewNginxProcessesMetricsCollector(constLabels map[string]string) *NginxProc
},
[]string{"generation"},
),
logger: nl.LoggerFromContext(ctx),
}
}

// updateWorkerProcessCount sets the number of NGINX worker processes
func (pc *NginxProcessesMetricsCollector) updateWorkerProcessCount() {
currWorkerProcesses, prevWorkerProcesses, err := getWorkerProcesses()
if err != nil {
glog.Errorf("unable to collect process metrics : %v", err)
nl.Errorf(pc.logger, "unable to collect process metrics : %v", err)
return
}
pc.workerProcessTotal.WithLabelValues("current").Set(float64(currWorkerProcesses))
Expand Down
29 changes: 17 additions & 12 deletions internal/metrics/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/tls"
"errors"
"fmt"
"log/slog"
"net/http"
"os"
"strconv"
Expand All @@ -13,12 +14,13 @@ import (
"github.com/go-chi/chi/v5"
kitlog "github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/golang/glog"
prometheusClient "github.com/nginxinc/nginx-prometheus-exporter/client"
nginxCollector "github.com/nginxinc/nginx-prometheus-exporter/collector"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
v1 "k8s.io/api/core/v1"

nl "github.com/nginxinc/kubernetes-ingress/internal/logger"
)

// NewNginxMetricsClient creates an NginxClient to fetch stats from NGINX over an unix socket
Expand All @@ -27,41 +29,43 @@ func NewNginxMetricsClient(httpClient *http.Client) *prometheusClient.NginxClien
}

// RunPrometheusListenerForNginx runs an http server to expose Prometheus metrics for NGINX
func RunPrometheusListenerForNginx(port int, client *prometheusClient.NginxClient, registry *prometheus.Registry, constLabels map[string]string, prometheusSecret *v1.Secret) {
func RunPrometheusListenerForNginx(ctx context.Context, port int, client *prometheusClient.NginxClient, registry *prometheus.Registry, constLabels map[string]string, prometheusSecret *v1.Secret) {
logger := kitlog.NewLogfmtLogger(os.Stdout)
logger = level.NewFilter(logger, level.AllowError())
registry.MustRegister(nginxCollector.NewNginxCollector(client, "nginx_ingress_nginx", constLabels, logger))
runServer(strconv.Itoa(port), registry, prometheusSecret)
runServer(ctx, strconv.Itoa(port), registry, prometheusSecret)
}

// RunPrometheusListenerForNginxPlus runs an http server to expose Prometheus metrics for NGINX Plus
func RunPrometheusListenerForNginxPlus(port int, nginxPlusCollector prometheus.Collector, registry *prometheus.Registry, prometheusSecret *v1.Secret) {
func RunPrometheusListenerForNginxPlus(ctx context.Context, port int, nginxPlusCollector prometheus.Collector, registry *prometheus.Registry, prometheusSecret *v1.Secret) {
registry.MustRegister(nginxPlusCollector)
runServer(strconv.Itoa(port), registry, prometheusSecret)
runServer(ctx, strconv.Itoa(port), registry, prometheusSecret)
}

// runServer starts the metrics server.
func runServer(port string, registry prometheus.Gatherer, prometheusSecret *v1.Secret) {
func runServer(ctx context.Context, port string, registry prometheus.Gatherer, prometheusSecret *v1.Secret) {
addr := fmt.Sprintf(":%s", port)
s, err := NewServer(addr, registry, prometheusSecret)
l := nl.LoggerFromContext(ctx)
s, err := newServer(ctx, addr, registry, prometheusSecret)
if err != nil {
glog.Fatal(err)
nl.Fatal(l, err)
}
glog.Infof("Starting prometheus listener on: %s/metrics", addr)
glog.Fatal(s.ListenAndServe())
nl.Infof(l, "Starting prometheus listener on: %s/metrics", addr)
nl.Fatal(l, s.ListenAndServe())
}

// Server holds information about NIC metrics server.
type Server struct {
Server *http.Server
URL string
Registry prometheus.Gatherer
logger *slog.Logger
}

// NewServer creates HTTP server for serving NIC metrics for Prometheus.
//
// Metrics are exposed on the `/metrics` endpoint.
func NewServer(addr string, registry prometheus.Gatherer, secret *v1.Secret) (*Server, error) {
func newServer(ctx context.Context, addr string, registry prometheus.Gatherer, secret *v1.Secret) (*Server, error) {
s := Server{
Server: &http.Server{
Addr: addr,
Expand All @@ -70,6 +74,7 @@ func NewServer(addr string, registry prometheus.Gatherer, secret *v1.Secret) (*S
},
URL: fmt.Sprintf("http://%s/", addr),
Registry: registry,
logger: nl.LoggerFromContext(ctx),
}
// Secrets are read from K8s API. If the secret for Prometheus is present
// we configure Metrics Server with the key and cert.
Expand Down Expand Up @@ -99,7 +104,7 @@ func (s *Server) Home(w http.ResponseWriter, r *http.Request) { //nolint:revive
</body>
</html>`))
if err != nil {
glog.Errorf("error while sending a response for the '/' path: %v", err)
nl.Errorf(s.logger, "error while sending a response for the '/' path: %v", err)
http.Error(w, "internal error", http.StatusInternalServerError)
return
}
Expand Down
19 changes: 11 additions & 8 deletions internal/metrics/syslog_listener.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package metrics

import (
"context"
"errors"
"log/slog"
"net"

"github.com/golang/glog"

nl "github.com/nginxinc/kubernetes-ingress/internal/logger"
"github.com/nginxinc/kubernetes-ingress/internal/metrics/collectors"
)

Expand All @@ -21,21 +22,23 @@ type LatencyMetricsListener struct {
conn *net.UnixConn
addr string
collector collectors.LatencyCollector
logger *slog.Logger
}

// NewLatencyMetricsListener returns a LatencyMetricsListener that listens over a unix socket
// for syslog messages from nginx.
func NewLatencyMetricsListener(sockPath string, c collectors.LatencyCollector) SyslogListener {
glog.Infof("Starting latency metrics server listening on: %s", sockPath)
func NewLatencyMetricsListener(ctx context.Context, sockPath string, c collectors.LatencyCollector) SyslogListener {
l := nl.LoggerFromContext(ctx)
nl.Infof(l, "Starting latency metrics server listening on: %s", sockPath)
conn, err := net.ListenUnixgram("unixgram", &net.UnixAddr{
Name: sockPath,
Net: "unixgram",
})
if err != nil {
glog.Errorf("Failed to create latency metrics listener: %v. Latency metrics will not be collected.", err)
nl.Errorf(l, "Failed to create latency metrics listener: %v. Latency metrics will not be collected.", err)
return NewSyslogFakeServer()
}
return &LatencyMetricsListener{conn: conn, addr: sockPath, collector: c}
return &LatencyMetricsListener{conn: conn, addr: sockPath, collector: c, logger: l}
}

// Run reads from the unix connection until an unrecoverable error occurs or the connection is closed.
Expand All @@ -45,7 +48,7 @@ func (l LatencyMetricsListener) Run() {
n, err := l.conn.Read(buffer)
if err != nil {
if !isErrorRecoverable(err) {
glog.Info("Stopping latency metrics listener")
nl.Info(l.logger, "Stopping latency metrics listener")
return
}
}
Expand All @@ -57,7 +60,7 @@ func (l LatencyMetricsListener) Run() {
func (l LatencyMetricsListener) Stop() {
err := l.conn.Close()
if err != nil {
glog.Errorf("error closing latency metrics unix connection: %v", err)
nl.Errorf(l.logger, "error closing latency metrics unix connection: %v", err)
}
}

Expand Down
Loading
Loading