Skip to content

Commit

Permalink
WIP: feat: Replacing promhttp with own httptrace and logging
Browse files Browse the repository at this point in the history
  • Loading branch information
djboris9 authored and clementnuss committed Jan 22, 2024
1 parent 1d45495 commit ff0e1b0
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 85 deletions.
140 changes: 73 additions & 67 deletions internal/servicecheck/httptrace.go
Original file line number Diff line number Diff line change
@@ -1,83 +1,89 @@
package servicecheck

import (
"crypto/tls"
"log"
"net/http"
"net/http/httptrace"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

func withRequestTracing(registry *prometheus.Registry, transport http.RoundTripper) http.RoundTripper {
counter := prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: metricsNamespace,
Name: "httpclient_requests_total",
Help: "A counter for requests from the kubenurse http client.",
},
[]string{"code", "method"},
)
// TODO:
// - RoundTripperCounter and RoundTripper duration useful? Was never officially documented and I don't see anything usable with it

latencyVec := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: metricsNamespace,
Name: "httpclient_trace_request_duration_seconds",
Help: "Latency histogram for requests from the kubenurse http client. Time in seconds since the start of the http request.",
Buckets: []float64{.0005, .005, .01, .025, .05, .1, .25, .5, 1},
},
[]string{"event"},
)
// unique type for context.Context to avoid collisions.
type kubenurseContextKey struct{}

// histVec has no labels, making it a zero-dimensional ObserverVec.
histVec := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: metricsNamespace,
Name: "httpclient_request_duration_seconds",
Help: "A latency histogram of request latencies from the kubenurse http client.",
Buckets: prometheus.DefBuckets,
},
[]string{},
)
//http.RoundTripper
// TODO: Easier method to get a round tripper?
type RoundTripperFunc func(req *http.Request) (*http.Response, error)

// Register all of the metrics in the standard registry.
registry.MustRegister(counter, latencyVec, histVec)
//
func (rt RoundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) {
return rt(r)
}

// Ensure RoundTripperFunc is a http.RoundTripper
var _ http.RoundTripper = (*RoundTripperFunc)(nil)

// TODO: Description
// This collects traces and logs errors. As promhttp.InstrumentRoundTripperTrace doesn't process
// errors, this is custom made and inspired by prometheus/client_golang's promhttp
func withHttptrace(registry *prometheus.Registry, next http.RoundTripper, latencyVec *prometheus.HistogramVec) http.RoundTripper {
collectMetric := func(traceType string, start time.Time, r *http.Request, err error) {
td := time.Since(start).Seconds()
kubenurseCheckLabel := r.Context().Value(kubenurseContextKey{}).(string)

// Define functions for the available httptrace.ClientTrace hook
// functions that we want to instrument.
trace := &promhttp.InstrumentTrace{
DNSStart: func(t float64) {
latencyVec.WithLabelValues("dns_start").Observe(t)
},
DNSDone: func(t float64) {
latencyVec.WithLabelValues("dns_done").Observe(t)
},
ConnectStart: func(t float64) {
latencyVec.WithLabelValues("connect_start").Observe(t)
},
ConnectDone: func(t float64) {
latencyVec.WithLabelValues("connect_done").Observe(t)
},
TLSHandshakeStart: func(t float64) {
latencyVec.WithLabelValues("tls_handshake_start").Observe(t)
},
TLSHandshakeDone: func(t float64) {
latencyVec.WithLabelValues("tls_handshake_done").Observe(t)
},
WroteRequest: func(t float64) {
latencyVec.WithLabelValues("wrote_request").Observe(t)
},
GotFirstResponseByte: func(t float64) {
latencyVec.WithLabelValues("got_first_resp_byte").Observe(t)
},
// If we got an error inside a trace, log it and do not collect metrics
if err != nil {
log.Printf("httptrace: failed %s for %s with %v", traceType, kubenurseCheckLabel, err)
return
}

latencyVec.WithLabelValues(traceType, kubenurseCheckLabel).Observe(td)
}

// Wrap the default RoundTripper with middleware.
roundTripper := promhttp.InstrumentRoundTripperCounter(counter,
promhttp.InstrumentRoundTripperTrace(trace,
promhttp.InstrumentRoundTripperDuration(histVec,
transport,
),
),
)
// Return a http.RoundTripper for tracing requests
return RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
// Capture request time
start := time.Now()

// Add tracing hooks
trace := &httptrace.ClientTrace{
GotConn: func(info httptrace.GotConnInfo) {
collectMetric("got_conn", start, r, nil)
},
DNSStart: func(info httptrace.DNSStartInfo) {
collectMetric("dns_start", start, r, nil)
},
DNSDone: func(info httptrace.DNSDoneInfo) {
collectMetric("dns_done", start, r, info.Err)
},
ConnectStart: func(_, _ string) {
collectMetric("connect_start", start, r, nil)
},
ConnectDone: func(_, _ string, err error) {
collectMetric("connect_done", start, r, err)
},
TLSHandshakeStart: func() {
collectMetric("tls_handshake_start", start, r, nil)
},
TLSHandshakeDone: func(_ tls.ConnectionState, err error) {
collectMetric("tls_handshake_done", start, r, nil)
},
WroteRequest: func(info httptrace.WroteRequestInfo) {
collectMetric("wrote_request", start, r, info.Err)
},
GotFirstResponseByte: func() {
collectMetric("got_first_resp_byte", start, r, nil)
},
}

// Do request with tracing enabled
r = r.WithContext(httptrace.WithClientTrace(r.Context(), trace))

return roundTripper
return next.RoundTrip(r)
})
}
44 changes: 30 additions & 14 deletions internal/servicecheck/servicecheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,19 @@ func New(_ context.Context, discovery *kubediscovery.Client, promRegistry *prome
[]string{"type"},
)

promRegistry.MustRegister(errorCounter, durationHistogram)
// TODO: Add label for which request it was as this is not helpful in this current state
// TODO: Do we want to have it also as summary?
latencyVec := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: metricsNamespace,
Name: "httpclient_trace_request_duration_seconds",
Help: "Latency histogram for requests from the kubenurse http client. Time in seconds since the start of the http request.",
Buckets: []float64{.0005, .005, .01, .025, .05, .1, .25, .5, 1}, // TODO: Which buckets are really needed?
},
[]string{"event", "type"},
)

promRegistry.MustRegister(errorCounter, durationHistogram, latencyVec)

// setup http transport
tlsConfig, err := generateTLSConfig(os.Getenv("KUBENURSE_EXTRA_CA"))
Expand Down Expand Up @@ -75,7 +87,7 @@ func New(_ context.Context, discovery *kubediscovery.Client, promRegistry *prome

httpClient := &http.Client{
Timeout: 5 * time.Second,
Transport: withRequestTracing(promRegistry, transport),
Transport: withHttptrace(promRegistry, transport, latencyVec),
}

return &Checker{
Expand Down Expand Up @@ -163,43 +175,43 @@ func (c *Checker) StopScheduled() {
}

// APIServerDirect checks the /version endpoint of the Kubernetes API Server through the direct link
func (c *Checker) APIServerDirect() (string, error) {
func (c *Checker) APIServerDirect(ctx context.Context) (string, error) {
if c.SkipCheckAPIServerDirect {
return skippedStr, nil
}

apiurl := fmt.Sprintf("https://%s:%s/version", c.KubernetesServiceHost, c.KubernetesServicePort)

return c.doRequest(apiurl)
return c.doRequest(ctx, apiurl)
}

// APIServerDNS checks the /version endpoint of the Kubernetes API Server through the Cluster DNS URL
func (c *Checker) APIServerDNS() (string, error) {
func (c *Checker) APIServerDNS(ctx context.Context) (string, error) {
if c.SkipCheckAPIServerDNS {
return skippedStr, nil
}

apiurl := fmt.Sprintf("https://kubernetes.default.svc.cluster.local:%s/version", c.KubernetesServicePort)

return c.doRequest(apiurl)
return c.doRequest(ctx, apiurl)
}

// MeIngress checks if the kubenurse is reachable at the /alwayshappy endpoint behind the ingress
func (c *Checker) MeIngress() (string, error) {
func (c *Checker) MeIngress(ctx context.Context) (string, error) {
if c.SkipCheckMeIngress {
return skippedStr, nil
}

return c.doRequest(c.KubenurseIngressURL + "/alwayshappy") //nolint:goconst // readability
return c.doRequest(ctx, c.KubenurseIngressURL+"/alwayshappy") //nolint:goconst // readability
}

// MeService checks if the kubenurse is reachable at the /alwayshappy endpoint through the kubernetes service
func (c *Checker) MeService() (string, error) {
func (c *Checker) MeService(ctx context.Context) (string, error) {
if c.SkipCheckMeService {
return skippedStr, nil
}

return c.doRequest(c.KubenurseServiceURL + "/alwayshappy")
return c.doRequest(ctx, c.KubenurseServiceURL+"/alwayshappy")
}

// checkNeighbours checks the /alwayshappy endpoint from every discovered kubenurse neighbour. Neighbour pods on nodes
Expand All @@ -210,12 +222,12 @@ func (c *Checker) checkNeighbours(nh []kubediscovery.Neighbour) {
if neighbour.Phase == v1.PodRunning && // only query running pods (excludes pending ones)
!neighbour.Terminating && // exclude terminating pods
(c.allowUnschedulable || neighbour.NodeSchedulable == kubediscovery.NodeSchedulable) {
check := func() (string, error) {
check := func(ctx context.Context) (string, error) {
if c.UseTLS {
return c.doRequest("https://" + neighbour.PodIP + ":8443/alwayshappy")
return c.doRequest(ctx, "https://"+neighbour.PodIP+":8443/alwayshappy")
}

return c.doRequest("http://" + neighbour.PodIP + ":8080/alwayshappy")
return c.doRequest(ctx, "http://"+neighbour.PodIP+":8080/alwayshappy")
}

_, _ = c.measure(check, "path_"+neighbour.NodeName)
Expand All @@ -227,8 +239,12 @@ func (c *Checker) checkNeighbours(nh []kubediscovery.Neighbour) {
func (c *Checker) measure(check Check, label string) (string, error) {
start := time.Now()

// Add our label (check type) to the context so our http tracer can annotate
// metrics and errors based with the label
ctx := context.WithValue(context.Background(), kubenurseContextKey{}, label)

// Execute check
res, err := check()
res, err := check(ctx)

// Process metrics
c.durationHistogram.WithLabelValues(label).Observe(time.Since(start).Seconds())
Expand Down
5 changes: 3 additions & 2 deletions internal/servicecheck/transport.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package servicecheck

import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
Expand All @@ -17,14 +18,14 @@ const (
)

// doRequest does an http request only to get the http status code
func (c *Checker) doRequest(url string) (string, error) {
func (c *Checker) doRequest(ctx context.Context, url string) (string, error) {
// Read Bearer Token file from ServiceAccount
token, err := os.ReadFile(K8sTokenFile)
if err != nil {
return errStr, fmt.Errorf("load kubernetes serviceaccount token from %s: %w", K8sTokenFile, err)
}

req, _ := http.NewRequest("GET", url, http.NoBody)
req, _ := http.NewRequestWithContext(ctx, "GET", url, http.NoBody)

// Only add the Bearer for API Server Requests
if strings.HasSuffix(url, "/version") {
Expand Down
5 changes: 3 additions & 2 deletions internal/servicecheck/types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package servicecheck

import (
"context"
"net/http"
"time"

Expand Down Expand Up @@ -63,8 +64,8 @@ type Result struct {
Neighbourhood []kubediscovery.Neighbour `json:"neighbourhood"`
}

// Check is the signature used by all checks that the checker can execute
type Check func() (string, error)
// Check is the signature used by all checks that the checker can execute.
type Check func(ctx context.Context) (string, error)

// CachedResult represents a cached check result that is valid until the expiration.
type CachedResult struct {
Expand Down

0 comments on commit ff0e1b0

Please sign in to comment.