From ff0e1b06721328c47d1dc7edbcc3ac022c17386b Mon Sep 17 00:00:00 2001 From: Boris Djurdjevic Date: Tue, 26 Jul 2022 17:48:37 +0200 Subject: [PATCH] WIP: feat: Replacing promhttp with own httptrace and logging --- internal/servicecheck/httptrace.go | 140 ++++++++++++++------------ internal/servicecheck/servicecheck.go | 44 +++++--- internal/servicecheck/transport.go | 5 +- internal/servicecheck/types.go | 5 +- 4 files changed, 109 insertions(+), 85 deletions(-) diff --git a/internal/servicecheck/httptrace.go b/internal/servicecheck/httptrace.go index 388a9cdd..4991d0cd 100644 --- a/internal/servicecheck/httptrace.go +++ b/internal/servicecheck/httptrace.go @@ -1,83 +1,89 @@ package servicecheck import ( + "crypto/tls" + "log" "net/http" + "net/http/httptrace" + "time" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" ) -func withRequestTracing(registry *prometheus.Registry, transport http.RoundTripper) http.RoundTripper { - counter := prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: metricsNamespace, - Name: "httpclient_requests_total", - Help: "A counter for requests from the kubenurse http client.", - }, - []string{"code", "method"}, - ) +// TODO: +// - RoundTripperCounter and RoundTripper duration useful? Was never officially documented and I don't see anything usable with it - latencyVec := prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: metricsNamespace, - Name: "httpclient_trace_request_duration_seconds", - Help: "Latency histogram for requests from the kubenurse http client. Time in seconds since the start of the http request.", - Buckets: []float64{.0005, .005, .01, .025, .05, .1, .25, .5, 1}, - }, - []string{"event"}, - ) +// unique type for context.Context to avoid collisions. +type kubenurseContextKey struct{} - // histVec has no labels, making it a zero-dimensional ObserverVec. - histVec := prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: metricsNamespace, - Name: "httpclient_request_duration_seconds", - Help: "A latency histogram of request latencies from the kubenurse http client.", - Buckets: prometheus.DefBuckets, - }, - []string{}, - ) +//http.RoundTripper +// TODO: Easier method to get a round tripper? +type RoundTripperFunc func(req *http.Request) (*http.Response, error) - // Register all of the metrics in the standard registry. - registry.MustRegister(counter, latencyVec, histVec) +// +func (rt RoundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { + return rt(r) +} + +// Ensure RoundTripperFunc is a http.RoundTripper +var _ http.RoundTripper = (*RoundTripperFunc)(nil) + +// TODO: Description +// This collects traces and logs errors. As promhttp.InstrumentRoundTripperTrace doesn't process +// errors, this is custom made and inspired by prometheus/client_golang's promhttp +func withHttptrace(registry *prometheus.Registry, next http.RoundTripper, latencyVec *prometheus.HistogramVec) http.RoundTripper { + collectMetric := func(traceType string, start time.Time, r *http.Request, err error) { + td := time.Since(start).Seconds() + kubenurseCheckLabel := r.Context().Value(kubenurseContextKey{}).(string) - // Define functions for the available httptrace.ClientTrace hook - // functions that we want to instrument. - trace := &promhttp.InstrumentTrace{ - DNSStart: func(t float64) { - latencyVec.WithLabelValues("dns_start").Observe(t) - }, - DNSDone: func(t float64) { - latencyVec.WithLabelValues("dns_done").Observe(t) - }, - ConnectStart: func(t float64) { - latencyVec.WithLabelValues("connect_start").Observe(t) - }, - ConnectDone: func(t float64) { - latencyVec.WithLabelValues("connect_done").Observe(t) - }, - TLSHandshakeStart: func(t float64) { - latencyVec.WithLabelValues("tls_handshake_start").Observe(t) - }, - TLSHandshakeDone: func(t float64) { - latencyVec.WithLabelValues("tls_handshake_done").Observe(t) - }, - WroteRequest: func(t float64) { - latencyVec.WithLabelValues("wrote_request").Observe(t) - }, - GotFirstResponseByte: func(t float64) { - latencyVec.WithLabelValues("got_first_resp_byte").Observe(t) - }, + // If we got an error inside a trace, log it and do not collect metrics + if err != nil { + log.Printf("httptrace: failed %s for %s with %v", traceType, kubenurseCheckLabel, err) + return + } + + latencyVec.WithLabelValues(traceType, kubenurseCheckLabel).Observe(td) } - // Wrap the default RoundTripper with middleware. - roundTripper := promhttp.InstrumentRoundTripperCounter(counter, - promhttp.InstrumentRoundTripperTrace(trace, - promhttp.InstrumentRoundTripperDuration(histVec, - transport, - ), - ), - ) + // Return a http.RoundTripper for tracing requests + return RoundTripperFunc(func(r *http.Request) (*http.Response, error) { + // Capture request time + start := time.Now() + + // Add tracing hooks + trace := &httptrace.ClientTrace{ + GotConn: func(info httptrace.GotConnInfo) { + collectMetric("got_conn", start, r, nil) + }, + DNSStart: func(info httptrace.DNSStartInfo) { + collectMetric("dns_start", start, r, nil) + }, + DNSDone: func(info httptrace.DNSDoneInfo) { + collectMetric("dns_done", start, r, info.Err) + }, + ConnectStart: func(_, _ string) { + collectMetric("connect_start", start, r, nil) + }, + ConnectDone: func(_, _ string, err error) { + collectMetric("connect_done", start, r, err) + }, + TLSHandshakeStart: func() { + collectMetric("tls_handshake_start", start, r, nil) + }, + TLSHandshakeDone: func(_ tls.ConnectionState, err error) { + collectMetric("tls_handshake_done", start, r, nil) + }, + WroteRequest: func(info httptrace.WroteRequestInfo) { + collectMetric("wrote_request", start, r, info.Err) + }, + GotFirstResponseByte: func() { + collectMetric("got_first_resp_byte", start, r, nil) + }, + } + + // Do request with tracing enabled + r = r.WithContext(httptrace.WithClientTrace(r.Context(), trace)) - return roundTripper + return next.RoundTrip(r) + }) } diff --git a/internal/servicecheck/servicecheck.go b/internal/servicecheck/servicecheck.go index 8517fad4..6d9d7185 100644 --- a/internal/servicecheck/servicecheck.go +++ b/internal/servicecheck/servicecheck.go @@ -46,7 +46,19 @@ func New(_ context.Context, discovery *kubediscovery.Client, promRegistry *prome []string{"type"}, ) - promRegistry.MustRegister(errorCounter, durationHistogram) + // TODO: Add label for which request it was as this is not helpful in this current state + // TODO: Do we want to have it also as summary? + latencyVec := prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: metricsNamespace, + Name: "httpclient_trace_request_duration_seconds", + Help: "Latency histogram for requests from the kubenurse http client. Time in seconds since the start of the http request.", + Buckets: []float64{.0005, .005, .01, .025, .05, .1, .25, .5, 1}, // TODO: Which buckets are really needed? + }, + []string{"event", "type"}, + ) + + promRegistry.MustRegister(errorCounter, durationHistogram, latencyVec) // setup http transport tlsConfig, err := generateTLSConfig(os.Getenv("KUBENURSE_EXTRA_CA")) @@ -75,7 +87,7 @@ func New(_ context.Context, discovery *kubediscovery.Client, promRegistry *prome httpClient := &http.Client{ Timeout: 5 * time.Second, - Transport: withRequestTracing(promRegistry, transport), + Transport: withHttptrace(promRegistry, transport, latencyVec), } return &Checker{ @@ -163,43 +175,43 @@ func (c *Checker) StopScheduled() { } // APIServerDirect checks the /version endpoint of the Kubernetes API Server through the direct link -func (c *Checker) APIServerDirect() (string, error) { +func (c *Checker) APIServerDirect(ctx context.Context) (string, error) { if c.SkipCheckAPIServerDirect { return skippedStr, nil } apiurl := fmt.Sprintf("https://%s:%s/version", c.KubernetesServiceHost, c.KubernetesServicePort) - return c.doRequest(apiurl) + return c.doRequest(ctx, apiurl) } // APIServerDNS checks the /version endpoint of the Kubernetes API Server through the Cluster DNS URL -func (c *Checker) APIServerDNS() (string, error) { +func (c *Checker) APIServerDNS(ctx context.Context) (string, error) { if c.SkipCheckAPIServerDNS { return skippedStr, nil } apiurl := fmt.Sprintf("https://kubernetes.default.svc.cluster.local:%s/version", c.KubernetesServicePort) - return c.doRequest(apiurl) + return c.doRequest(ctx, apiurl) } // MeIngress checks if the kubenurse is reachable at the /alwayshappy endpoint behind the ingress -func (c *Checker) MeIngress() (string, error) { +func (c *Checker) MeIngress(ctx context.Context) (string, error) { if c.SkipCheckMeIngress { return skippedStr, nil } - return c.doRequest(c.KubenurseIngressURL + "/alwayshappy") //nolint:goconst // readability + return c.doRequest(ctx, c.KubenurseIngressURL+"/alwayshappy") //nolint:goconst // readability } // MeService checks if the kubenurse is reachable at the /alwayshappy endpoint through the kubernetes service -func (c *Checker) MeService() (string, error) { +func (c *Checker) MeService(ctx context.Context) (string, error) { if c.SkipCheckMeService { return skippedStr, nil } - return c.doRequest(c.KubenurseServiceURL + "/alwayshappy") + return c.doRequest(ctx, c.KubenurseServiceURL+"/alwayshappy") } // checkNeighbours checks the /alwayshappy endpoint from every discovered kubenurse neighbour. Neighbour pods on nodes @@ -210,12 +222,12 @@ func (c *Checker) checkNeighbours(nh []kubediscovery.Neighbour) { if neighbour.Phase == v1.PodRunning && // only query running pods (excludes pending ones) !neighbour.Terminating && // exclude terminating pods (c.allowUnschedulable || neighbour.NodeSchedulable == kubediscovery.NodeSchedulable) { - check := func() (string, error) { + check := func(ctx context.Context) (string, error) { if c.UseTLS { - return c.doRequest("https://" + neighbour.PodIP + ":8443/alwayshappy") + return c.doRequest(ctx, "https://"+neighbour.PodIP+":8443/alwayshappy") } - return c.doRequest("http://" + neighbour.PodIP + ":8080/alwayshappy") + return c.doRequest(ctx, "http://"+neighbour.PodIP+":8080/alwayshappy") } _, _ = c.measure(check, "path_"+neighbour.NodeName) @@ -227,8 +239,12 @@ func (c *Checker) checkNeighbours(nh []kubediscovery.Neighbour) { func (c *Checker) measure(check Check, label string) (string, error) { start := time.Now() + // Add our label (check type) to the context so our http tracer can annotate + // metrics and errors based with the label + ctx := context.WithValue(context.Background(), kubenurseContextKey{}, label) + // Execute check - res, err := check() + res, err := check(ctx) // Process metrics c.durationHistogram.WithLabelValues(label).Observe(time.Since(start).Seconds()) diff --git a/internal/servicecheck/transport.go b/internal/servicecheck/transport.go index 1e176881..def8c2c9 100644 --- a/internal/servicecheck/transport.go +++ b/internal/servicecheck/transport.go @@ -1,6 +1,7 @@ package servicecheck import ( + "context" "crypto/tls" "crypto/x509" "errors" @@ -17,14 +18,14 @@ const ( ) // doRequest does an http request only to get the http status code -func (c *Checker) doRequest(url string) (string, error) { +func (c *Checker) doRequest(ctx context.Context, url string) (string, error) { // Read Bearer Token file from ServiceAccount token, err := os.ReadFile(K8sTokenFile) if err != nil { return errStr, fmt.Errorf("load kubernetes serviceaccount token from %s: %w", K8sTokenFile, err) } - req, _ := http.NewRequest("GET", url, http.NoBody) + req, _ := http.NewRequestWithContext(ctx, "GET", url, http.NoBody) // Only add the Bearer for API Server Requests if strings.HasSuffix(url, "/version") { diff --git a/internal/servicecheck/types.go b/internal/servicecheck/types.go index d08b1e14..47c9e758 100644 --- a/internal/servicecheck/types.go +++ b/internal/servicecheck/types.go @@ -1,6 +1,7 @@ package servicecheck import ( + "context" "net/http" "time" @@ -63,8 +64,8 @@ type Result struct { Neighbourhood []kubediscovery.Neighbour `json:"neighbourhood"` } -// Check is the signature used by all checks that the checker can execute -type Check func() (string, error) +// Check is the signature used by all checks that the checker can execute. +type Check func(ctx context.Context) (string, error) // CachedResult represents a cached check result that is valid until the expiration. type CachedResult struct {