diff --git a/README.md b/README.md index cb67bac5..85300576 100644 --- a/README.md +++ b/README.md @@ -221,6 +221,10 @@ To combat this, a node filtering feature was implemented, which works as follows - it computes its own node name checksum, and queries the next 10 (per default) nodes in the sorted checksums list +Here's an example with 6 nodes, where each node queries the next 3 nodes: + +![node filtering drawing](./doc/node_filtering.png) + Thanks to this, every node is making queries to the same 10 nodes, unless one of those nodes disappears, in which case kubenurse will pick the next node in the sorted checksums list. This comes with several advantages: @@ -242,6 +246,14 @@ Per default, the neighbourhood filtering is set to 10 nodes, which means that on cluster with more than 10 nodes, each kubenurse will query 10 nodes, as described above. +##### Neighbourhood incoming checks metric + +It is possible to check that each node receives the proper number of +neighbourhood queries with the `kubenurse_neighbourhood_incoming_checks` +metric. If you have the neighbourhood limit set to e.g. 10, then this +metric should be equal to 10 on all nodes, with some variations during a +rollout restart. + To bypass the node filtering feature, you simply need to set the `KUBENURSE_NEIGHBOUR_LIMIT` environment variable to 0. diff --git a/doc/node_filtering.png b/doc/node_filtering.png new file mode 100644 index 00000000..5252d81f Binary files /dev/null and b/doc/node_filtering.png differ diff --git a/internal/kubenurse/handler.go b/internal/kubenurse/handler.go index c71a9b85..e9013f14 100644 --- a/internal/kubenurse/handler.go +++ b/internal/kubenurse/handler.go @@ -62,3 +62,12 @@ func (s *Server) aliveHandler() func(w http.ResponseWriter, r *http.Request) { _ = enc.Encode(out) } } + +func (s *Server) alwaysHappyHandler() func(w http.ResponseWriter, r *http.Request) { + return func(_ http.ResponseWriter, r *http.Request) { + origin := r.Header.Get(servicecheck.NeighbourOriginHeader) + if origin != "" { + s.neighboursTTLCache.Insert(origin) + } + } +} diff --git a/internal/kubenurse/server.go b/internal/kubenurse/server.go index cc794fe8..c43e66c0 100644 --- a/internal/kubenurse/server.go +++ b/internal/kubenurse/server.go @@ -37,6 +37,10 @@ type Server struct { // Mutex to protect ready flag mu *sync.Mutex ready bool + + // Neighbourhood incoming checks + neighbouringIncomingChecks prometheus.Gauge + neighboursTTLCache TTLCache[string] } // New creates a new kubenurse server. 
The server can be configured with the following environment variables: @@ -94,12 +98,23 @@ func New(ctx context.Context, c client.Client) (*Server, error) { //nolint:funle ready: true, } + server.neighboursTTLCache.Init(60 * time.Second) + promRegistry := prometheus.NewRegistry() promRegistry.MustRegister( collectors.NewGoCollector(), collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), ) + server.neighbouringIncomingChecks = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: servicecheck.MetricsNamespace, + Name: "neighbourhood_incoming_checks", + Help: "Number of unique source nodes checks in the last minute for the neighbourhood checks", + }, + ) + promRegistry.MustRegister(server.neighbouringIncomingChecks) + var histogramBuckets []float64 if bucketsString := os.Getenv("KUBENURSE_HISTOGRAM_BUCKETS"); bucketsString != "" { @@ -166,7 +181,7 @@ func New(ctx context.Context, c client.Client) (*Server, error) { //nolint:funle // setup http routes mux.HandleFunc("/ready", server.readyHandler()) mux.HandleFunc("/alive", server.aliveHandler()) - mux.HandleFunc("/alwayshappy", func(http.ResponseWriter, *http.Request) {}) + mux.HandleFunc("/alwayshappy", server.alwaysHappyHandler()) mux.Handle("/metrics", promhttp.HandlerFor(promRegistry, promhttp.HandlerOpts{})) mux.Handle("/", http.RedirectHandler("/alive", http.StatusMovedPermanently)) @@ -180,6 +195,17 @@ func (s *Server) Run() error { errc = make(chan error, 2) // max two errors can happen ) + go func() { // update the incoming neighbouring check gauge every second + t := time.NewTicker(1 * time.Second) + defer t.Stop() + + for range t.C { + s.neighbouringIncomingChecks.Set( + float64(s.neighboursTTLCache.ActiveEntries()), + ) + } + }() + wg.Add(1) go func() { diff --git a/internal/kubenurse/ttl_cache.go b/internal/kubenurse/ttl_cache.go new file mode 100644 index 00000000..833d6321 --- /dev/null +++ b/internal/kubenurse/ttl_cache.go @@ -0,0 +1,56 @@ +package kubenurse + +import ( + "sync" + "time" +) + +// This TTLCache is a suboptimal, first-shot implementation of TTL cache, +// that is, entries expire after a certain duration. +// It should ideally be implemented with a Red-Black Binary tree to keep track +// of the entries to expire. 
+ +type TTLCache[K comparable] struct { + m map[K]*CacheEntry[K] + TTL time.Duration + mu sync.Mutex +} + +type CacheEntry[K comparable] struct { + val K + lastInsert time.Time +} + +func (c *TTLCache[K]) Init(ttl time.Duration) { + c.m = make(map[K]*CacheEntry[K]) + c.mu = sync.Mutex{} + c.TTL = ttl +} + +func (c *TTLCache[K]) Insert(k K) { + c.mu.Lock() + defer c.mu.Unlock() + + if entry, ok := c.m[k]; ok { + entry.lastInsert = time.Now() + } else { + entry := CacheEntry[K]{val: k, lastInsert: time.Now()} + c.m[k] = &entry + } +} + +func (c *TTLCache[K]) ActiveEntries() int { + c.cleanupExpired() + return len(c.m) +} + +func (c *TTLCache[K]) cleanupExpired() { + c.mu.Lock() + defer c.mu.Unlock() + + for k, entry := range c.m { + if time.Since(entry.lastInsert) > c.TTL { + delete(c.m, k) + } + } +} diff --git a/internal/kubenurse/ttl_cache_test.go b/internal/kubenurse/ttl_cache_test.go new file mode 100644 index 00000000..f6d8832f --- /dev/null +++ b/internal/kubenurse/ttl_cache_test.go @@ -0,0 +1,27 @@ +package kubenurse + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestTTLCache(t *testing.T) { + + c := TTLCache[string]{} + c.Init(10 * time.Millisecond) + + c.Insert("node-a") + time.Sleep(5 * time.Millisecond) + + require.Equal(t, 1, c.ActiveEntries(), "after 5ms and with a 10ms TTL, there should have been 1 entry in the cache") + + c.Insert("node-a") // refresh ttl + time.Sleep(5 * time.Millisecond) + require.Equal(t, 1, c.ActiveEntries(), "after 5ms and with a 10ms TTL, there should have been 1 entry in the cache") + + time.Sleep(5 * time.Millisecond) + require.Equal(t, 0, c.ActiveEntries(), "after 10ms and with a 10ms TTL, there should be no entry in the cache") + +} diff --git a/internal/servicecheck/httptrace.go b/internal/servicecheck/httptrace.go index 942aca93..baa14d23 100644 --- a/internal/servicecheck/httptrace.go +++ b/internal/servicecheck/httptrace.go @@ -27,7 +27,7 @@ func (rt RoundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { func withHttptrace(registry *prometheus.Registry, next http.RoundTripper, durationHistogram []float64) http.RoundTripper { httpclientReqTotal := prometheus.NewCounterVec( prometheus.CounterOpts{ - Namespace: metricsNamespace, + Namespace: MetricsNamespace, Name: "httpclient_requests_total", Help: "A counter for requests from the kubenurse http client.", }, @@ -36,7 +36,7 @@ func withHttptrace(registry *prometheus.Registry, next http.RoundTripper, durati httpclientReqDuration := prometheus.NewHistogramVec( prometheus.HistogramOpts{ - Namespace: metricsNamespace, + Namespace: MetricsNamespace, Name: "httpclient_request_duration_seconds", Help: "A latency histogram of request latencies from the kubenurse http client.", Buckets: durationHistogram, @@ -46,7 +46,7 @@ func withHttptrace(registry *prometheus.Registry, next http.RoundTripper, durati httpclientTraceReqDuration := prometheus.NewHistogramVec( prometheus.HistogramOpts{ - Namespace: metricsNamespace, + Namespace: MetricsNamespace, Name: "httpclient_trace_request_duration_seconds", Help: "Latency histogram for requests from the kubenurse http client. 
Time in seconds since the start of the http request.", Buckets: durationHistogram, diff --git a/internal/servicecheck/neighbours.go b/internal/servicecheck/neighbours.go index 0f8145ec..3cf413a6 100644 --- a/internal/servicecheck/neighbours.go +++ b/internal/servicecheck/neighbours.go @@ -20,6 +20,10 @@ var ( currentNode string ) +const ( + NeighbourOriginHeader = "KUBENURSE-NEIGHBOUR-ORIGIN" +) + // Neighbour represents a kubenurse which should be reachable type Neighbour struct { PodName string @@ -93,10 +97,10 @@ func (c *Checker) checkNeighbours(nh []*Neighbour) { for _, neighbour := range nh { check := func(ctx context.Context) (string, error) { if c.UseTLS { - return c.doRequest(ctx, "https://"+neighbour.PodIP+":8443/alwayshappy") + return c.doRequest(ctx, "https://"+neighbour.PodIP+":8443/alwayshappy", true) } - return c.doRequest(ctx, "http://"+neighbour.PodIP+":8080/alwayshappy") + return c.doRequest(ctx, "http://"+neighbour.PodIP+":8080/alwayshappy", true) } _, _ = c.measure(check, "path_"+neighbour.NodeName) diff --git a/internal/servicecheck/servicecheck.go b/internal/servicecheck/servicecheck.go index dfb176f9..3924db86 100644 --- a/internal/servicecheck/servicecheck.go +++ b/internal/servicecheck/servicecheck.go @@ -19,7 +19,7 @@ const ( okStr = "ok" errStr = "error" skippedStr = "skipped" - metricsNamespace = "kubenurse" + MetricsNamespace = "kubenurse" ) // New configures the checker with a httpClient and a cache timeout for check @@ -28,7 +28,7 @@ func New(_ context.Context, cl client.Client, promRegistry *prometheus.Registry, allowUnschedulable bool, cacheTTL time.Duration, durationHistogramBuckets []float64) (*Checker, error) { errorCounter := prometheus.NewCounterVec( prometheus.CounterOpts{ - Namespace: metricsNamespace, + Namespace: MetricsNamespace, Name: "errors_total", Help: "Kubenurse error counter partitioned by error type", }, @@ -37,7 +37,7 @@ func New(_ context.Context, cl client.Client, promRegistry *prometheus.Registry, durationHistogram := prometheus.NewHistogramVec( prometheus.HistogramOpts{ - Namespace: metricsNamespace, + Namespace: MetricsNamespace, Name: "request_duration", Help: "Kubenurse request duration partitioned by target path", Buckets: durationHistogramBuckets, @@ -163,7 +163,7 @@ func (c *Checker) APIServerDirect(ctx context.Context) (string, error) { apiurl := fmt.Sprintf("https://%s:%s/version", c.KubernetesServiceHost, c.KubernetesServicePort) - return c.doRequest(ctx, apiurl) + return c.doRequest(ctx, apiurl, false) } // APIServerDNS checks the /version endpoint of the Kubernetes API Server through the Cluster DNS URL @@ -174,7 +174,7 @@ func (c *Checker) APIServerDNS(ctx context.Context) (string, error) { apiurl := fmt.Sprintf("https://kubernetes.default.svc.cluster.local:%s/version", c.KubernetesServicePort) - return c.doRequest(ctx, apiurl) + return c.doRequest(ctx, apiurl, false) } // MeIngress checks if the kubenurse is reachable at the /alwayshappy endpoint behind the ingress @@ -183,7 +183,7 @@ func (c *Checker) MeIngress(ctx context.Context) (string, error) { return skippedStr, nil } - return c.doRequest(ctx, c.KubenurseIngressURL+"/alwayshappy") //nolint:goconst // readability + return c.doRequest(ctx, c.KubenurseIngressURL+"/alwayshappy", false) //nolint:goconst // readability } // MeService checks if the kubenurse is reachable at the /alwayshappy endpoint through the kubernetes service @@ -192,7 +192,7 @@ func (c *Checker) MeService(ctx context.Context) (string, error) { return skippedStr, nil } - return c.doRequest(ctx, 
c.KubenurseServiceURL+"/alwayshappy") + return c.doRequest(ctx, c.KubenurseServiceURL+"/alwayshappy", false) } // measure implements metric collections for the check diff --git a/internal/servicecheck/transport.go b/internal/servicecheck/transport.go index def8c2c9..06c445b1 100644 --- a/internal/servicecheck/transport.go +++ b/internal/servicecheck/transport.go @@ -18,7 +18,7 @@ const ( ) // doRequest does an http request only to get the http status code -func (c *Checker) doRequest(ctx context.Context, url string) (string, error) { +func (c *Checker) doRequest(ctx context.Context, url string, addOriginHeader bool) (string, error) { // Read Bearer Token file from ServiceAccount token, err := os.ReadFile(K8sTokenFile) if err != nil { @@ -32,6 +32,11 @@ func (c *Checker) doRequest(ctx context.Context, url string) (string, error) { req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token)) } + if addOriginHeader { + hostname, _ := os.Hostname() + req.Header.Add(NeighbourOriginHeader, hostname) + } + resp, err := c.httpClient.Do(req) if err != nil { return err.Error(), err
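
The pieces of this change are spread across several files, so here is a minimal, self-contained Go sketch (not code from this PR) of how they fit together: the client attaches an origin header to neighbour requests (as `doRequest` now does when `addOriginHeader` is true), the `/alwayshappy` handler records that origin in a TTL cache, and the count of non-expired entries is what the new `kubenurse_neighbourhood_incoming_checks` gauge reports. The `ttlCache` type, the `httptest` server, and the `main` function below are illustrative simplifications, not the kubenurse implementation.

```go
package main

// Hedged sketch of the mechanism introduced by this PR. Only the header name
// is taken from the diff; everything else (ttlCache, port-less httptest
// server, main) is a simplified stand-in.

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"os"
	"sync"
	"time"
)

const originHeader = "KUBENURSE-NEIGHBOUR-ORIGIN" // same header name as NeighbourOriginHeader in the diff

// ttlCache is a simplified version of the TTLCache added in this PR:
// an entry expires a fixed duration after its last insert.
type ttlCache struct {
	mu  sync.Mutex
	ttl time.Duration
	m   map[string]time.Time
}

func newTTLCache(ttl time.Duration) *ttlCache {
	return &ttlCache{ttl: ttl, m: make(map[string]time.Time)}
}

// insert records k and refreshes its expiry time.
func (c *ttlCache) insert(k string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.m[k] = time.Now()
}

// activeEntries drops expired keys and returns how many remain.
func (c *ttlCache) activeEntries() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	for k, last := range c.m {
		if time.Since(last) > c.ttl {
			delete(c.m, k)
		}
	}
	return len(c.m)
}

func main() {
	cache := newTTLCache(60 * time.Second)

	// Server side: what /alwayshappy does after this PR — record the origin
	// node, if the header is present, and always answer 200.
	srv := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, r *http.Request) {
		if origin := r.Header.Get(originHeader); origin != "" {
			cache.insert(origin)
		}
	}))
	defer srv.Close()

	// Client side: what doRequest does when addOriginHeader is true — attach
	// the local hostname so the neighbour can attribute the incoming check.
	hostname, _ := os.Hostname()
	req, _ := http.NewRequest(http.MethodGet, srv.URL+"/alwayshappy", nil)
	req.Header.Set(originHeader, hostname)

	if _, err := http.DefaultClient.Do(req); err != nil {
		fmt.Println("request failed:", err)
		return
	}

	// In server.Run() a ticker copies this value into the Prometheus gauge
	// once per second; here we just print it.
	fmt.Println("unique origins in the last minute:", cache.activeEntries())
}
```

The 60-second TTL mirrors the `server.neighboursTTLCache.Init(60 * time.Second)` call in the diff, which is why the metric help text describes "the last minute": a node only drops out of the gauge once it has stopped sending neighbour checks for a full minute.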