Merge pull request #124 from postfinance/feat/recent-neighbours-checks
feat/recent neighbours checks
clementnuss authored Apr 5, 2024
2 parents 6c48a5a + f8b17eb commit 52767fb
Showing 10 changed files with 153 additions and 14 deletions.
12 changes: 12 additions & 0 deletions README.md
@@ -221,6 +221,10 @@ To combat this, a node filtering feature was implemented, which works as follows
- it computes its own node name checksum, and queries the next 10 (by default)
nodes in the sorted checksums list

Here's an example with 6 nodes, where each node queries the next 3 nodes:

![node filtering drawing](./doc/node_filtering.png)

Thanks to this, every node queries the same 10 nodes, unless one of those
nodes disappears, in which case kubenurse will pick the next node in the
sorted checksums list. This comes with several advantages:
@@ -242,6 +246,14 @@ By default, the neighbourhood filtering is set to 10 nodes, which means that
on clusters with more than 10 nodes, each kubenurse will query 10 nodes, as
described above.
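
For illustration, the selection logic described above can be sketched as follows (a minimal sketch assuming a SHA-256 checksum of the node name; the actual implementation may differ in its details):

```go
package main

import (
	"crypto/sha256"
	"fmt"
	"sort"
)

// nextNeighbours returns the `limit` nodes that follow `self` in the ring of
// nodes ordered by the checksum of their names, wrapping around at the end.
func nextNeighbours(self string, nodes []string, limit int) []string {
	sum := func(s string) string { return fmt.Sprintf("%x", sha256.Sum256([]byte(s))) }

	sort.Slice(nodes, func(i, j int) bool { return sum(nodes[i]) < sum(nodes[j]) })

	// locate our own position in the sorted checksums list
	idx := sort.Search(len(nodes), func(i int) bool { return sum(nodes[i]) >= sum(self) })

	neighbours := make([]string, 0, limit)
	for i := 1; i <= limit && i < len(nodes); i++ {
		neighbours = append(neighbours, nodes[(idx+i)%len(nodes)])
	}

	return neighbours
}

func main() {
	nodes := []string{"node-a", "node-b", "node-c", "node-d", "node-e", "node-f"}
	fmt.Println(nextNeighbours("node-a", nodes, 3)) // each node queries the next 3 nodes
}
```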

##### Neighbourhood incoming checks metric

You can verify that each node receives the expected number of
neighbourhood queries with the `kubenurse_neighbourhood_incoming_checks`
metric. With the neighbourhood limit set to e.g. 10, this metric should be
equal to 10 on all nodes, with some variation during a rollout restart.
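
For illustration, on a node in a cluster larger than the neighbourhood limit, the gauge exposed on `/metrics` should then look roughly like this (hypothetical sample output):

```
# TYPE kubenurse_neighbourhood_incoming_checks gauge
kubenurse_neighbourhood_incoming_checks 10
```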

To bypass the node filtering feature, set the `KUBENURSE_NEIGHBOUR_LIMIT`
environment variable to 0.

Binary file added doc/node_filtering.png
9 changes: 9 additions & 0 deletions internal/kubenurse/handler.go
@@ -62,3 +62,12 @@ func (s *Server) aliveHandler() func(w http.ResponseWriter, r *http.Request) {
_ = enc.Encode(out)
}
}

func (s *Server) alwaysHappyHandler() func(w http.ResponseWriter, r *http.Request) {
return func(_ http.ResponseWriter, r *http.Request) {
origin := r.Header.Get(servicecheck.NeighbourOriginHeader)
if origin != "" {
s.neighboursTTLCache.Insert(origin)
}
}
}
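
For illustration, here is a hypothetical test (not part of this change; module path and field names assumed from the diff) showing how a neighbour check that carries the `KUBENURSE-NEIGHBOUR-ORIGIN` header, which `doRequest` adds when `addOriginHeader` is true, ends up in the TTL cache that feeds the incoming-checks gauge:

```go
package kubenurse

import (
	"net/http/httptest"
	"testing"
	"time"

	"github.com/postfinance/kubenurse/internal/servicecheck"
	"github.com/stretchr/testify/require"
)

func TestAlwaysHappyHandlerRegistersOrigin(t *testing.T) {
	s := &Server{}
	s.neighboursTTLCache.Init(60 * time.Second)

	// simulate an incoming neighbour check carrying the origin header
	req := httptest.NewRequest("GET", "/alwayshappy", nil)
	req.Header.Set(servicecheck.NeighbourOriginHeader, "node-a")

	s.alwaysHappyHandler()(httptest.NewRecorder(), req)

	require.Equal(t, 1, s.neighboursTTLCache.ActiveEntries(),
		"a request with the origin header should register one active neighbour")
}
```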
28 changes: 27 additions & 1 deletion internal/kubenurse/server.go
@@ -37,6 +37,10 @@ type Server struct {
// Mutex to protect ready flag
mu *sync.Mutex
ready bool

// Neighbourhood incoming checks
neighbouringIncomingChecks prometheus.Gauge
neighboursTTLCache TTLCache[string]
}

// New creates a new kubenurse server. The server can be configured with the following environment variables:
@@ -94,12 +98,23 @@ func New(ctx context.Context, c client.Client) (*Server, error) { //nolint:funle
ready: true,
}

server.neighboursTTLCache.Init(60 * time.Second)

promRegistry := prometheus.NewRegistry()
promRegistry.MustRegister(
collectors.NewGoCollector(),
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
)

server.neighbouringIncomingChecks = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: servicecheck.MetricsNamespace,
Name: "neighbourhood_incoming_checks",
Help: "Number of unique source nodes checks in the last minute for the neighbourhood checks",
},
)
promRegistry.MustRegister(server.neighbouringIncomingChecks)

var histogramBuckets []float64

if bucketsString := os.Getenv("KUBENURSE_HISTOGRAM_BUCKETS"); bucketsString != "" {
Expand Down Expand Up @@ -166,7 +181,7 @@ func New(ctx context.Context, c client.Client) (*Server, error) { //nolint:funle
// setup http routes
mux.HandleFunc("/ready", server.readyHandler())
mux.HandleFunc("/alive", server.aliveHandler())
mux.HandleFunc("/alwayshappy", func(http.ResponseWriter, *http.Request) {})
mux.HandleFunc("/alwayshappy", server.alwaysHappyHandler())
mux.Handle("/metrics", promhttp.HandlerFor(promRegistry, promhttp.HandlerOpts{}))
mux.Handle("/", http.RedirectHandler("/alive", http.StatusMovedPermanently))

@@ -180,6 +195,17 @@ func (s *Server) Run() error {
errc = make(chan error, 2) // max two errors can happen
)

go func() { // update the incoming neighbouring check gauge every second
t := time.NewTicker(1 * time.Second)
defer t.Stop()

for range t.C {
s.neighbouringIncomingChecks.Set(
float64(s.neighboursTTLCache.ActiveEntries()),
)
}
}()

wg.Add(1)

go func() {
56 changes: 56 additions & 0 deletions internal/kubenurse/ttl_cache.go
@@ -0,0 +1,56 @@
package kubenurse

import (
"sync"
"time"
)

// This TTLCache is a suboptimal, first-shot implementation of a TTL cache,
// i.e. entries expire after a certain duration.
// It should ideally be implemented with a red-black tree to keep track
// of the entries to expire.

type TTLCache[K comparable] struct {
m map[K]*CacheEntry[K]
TTL time.Duration
mu sync.Mutex
}

type CacheEntry[K comparable] struct {
val K
lastInsert time.Time
}

func (c *TTLCache[K]) Init(ttl time.Duration) {
c.m = make(map[K]*CacheEntry[K])
c.mu = sync.Mutex{}
c.TTL = ttl
}

func (c *TTLCache[K]) Insert(k K) {
c.mu.Lock()
defer c.mu.Unlock()

if entry, ok := c.m[k]; ok {
entry.lastInsert = time.Now()
} else {
entry := CacheEntry[K]{val: k, lastInsert: time.Now()}
c.m[k] = &entry
}
}

func (c *TTLCache[K]) ActiveEntries() int {
c.cleanupExpired()
return len(c.m)
}

func (c *TTLCache[K]) cleanupExpired() {
c.mu.Lock()
defer c.mu.Unlock()

for k, entry := range c.m {
if time.Since(entry.lastInsert) > c.TTL {
delete(c.m, k)
}
}
}
27 changes: 27 additions & 0 deletions internal/kubenurse/ttl_cache_test.go
@@ -0,0 +1,27 @@
package kubenurse

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestTTLCache(t *testing.T) {

c := TTLCache[string]{}
c.Init(10 * time.Millisecond)

c.Insert("node-a")
time.Sleep(5 * time.Millisecond)

require.Equal(t, 1, c.ActiveEntries(), "after 5ms and with a 10ms TTL, there should have been 1 entry in the cache")

c.Insert("node-a") // refresh ttl
time.Sleep(5 * time.Millisecond)
require.Equal(t, 1, c.ActiveEntries(), "after 5ms and with a 10ms TTL, there should have been 1 entry in the cache")

time.Sleep(5 * time.Millisecond)
require.Equal(t, 0, c.ActiveEntries(), "after 10ms and with a 10ms TTL, there should be no entry in the cache")

}
6 changes: 3 additions & 3 deletions internal/servicecheck/httptrace.go
@@ -27,7 +27,7 @@ func (rt RoundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) {
func withHttptrace(registry *prometheus.Registry, next http.RoundTripper, durationHistogram []float64) http.RoundTripper {
httpclientReqTotal := prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: metricsNamespace,
Namespace: MetricsNamespace,
Name: "httpclient_requests_total",
Help: "A counter for requests from the kubenurse http client.",
},
@@ -36,7 +36,7 @@ func withHttptrace(registry *prometheus.Registry, next http.RoundTripper, durati

httpclientReqDuration := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: metricsNamespace,
Namespace: MetricsNamespace,
Name: "httpclient_request_duration_seconds",
Help: "A latency histogram of request latencies from the kubenurse http client.",
Buckets: durationHistogram,
@@ -46,7 +46,7 @@ func withHttptrace(registry *prometheus.Registry, next http.RoundTripper, durati

httpclientTraceReqDuration := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: metricsNamespace,
Namespace: MetricsNamespace,
Name: "httpclient_trace_request_duration_seconds",
Help: "Latency histogram for requests from the kubenurse http client. Time in seconds since the start of the http request.",
Buckets: durationHistogram,
8 changes: 6 additions & 2 deletions internal/servicecheck/neighbours.go
@@ -20,6 +20,10 @@
currentNode string
)

const (
NeighbourOriginHeader = "KUBENURSE-NEIGHBOUR-ORIGIN"
)

// Neighbour represents a kubenurse which should be reachable
type Neighbour struct {
PodName string
@@ -93,10 +97,10 @@ func (c *Checker) checkNeighbours(nh []*Neighbour) {
for _, neighbour := range nh {
check := func(ctx context.Context) (string, error) {
if c.UseTLS {
return c.doRequest(ctx, "https://"+neighbour.PodIP+":8443/alwayshappy")
return c.doRequest(ctx, "https://"+neighbour.PodIP+":8443/alwayshappy", true)
}

return c.doRequest(ctx, "http://"+neighbour.PodIP+":8080/alwayshappy")
return c.doRequest(ctx, "http://"+neighbour.PodIP+":8080/alwayshappy", true)
}

_, _ = c.measure(check, "path_"+neighbour.NodeName)
14 changes: 7 additions & 7 deletions internal/servicecheck/servicecheck.go
@@ -19,7 +19,7 @@
okStr = "ok"
errStr = "error"
skippedStr = "skipped"
metricsNamespace = "kubenurse"
MetricsNamespace = "kubenurse"
)

// New configures the checker with a httpClient and a cache timeout for check
@@ -28,7 +28,7 @@ func New(_ context.Context, cl client.Client, promRegistry *prometheus.Registry,
allowUnschedulable bool, cacheTTL time.Duration, durationHistogramBuckets []float64) (*Checker, error) {
errorCounter := prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: metricsNamespace,
Namespace: MetricsNamespace,
Name: "errors_total",
Help: "Kubenurse error counter partitioned by error type",
},
@@ -37,7 +37,7 @@

durationHistogram := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: metricsNamespace,
Namespace: MetricsNamespace,
Name: "request_duration",
Help: "Kubenurse request duration partitioned by target path",
Buckets: durationHistogramBuckets,
@@ -163,7 +163,7 @@ func (c *Checker) APIServerDirect(ctx context.Context) (string, error) {

apiurl := fmt.Sprintf("https://%s:%s/version", c.KubernetesServiceHost, c.KubernetesServicePort)

return c.doRequest(ctx, apiurl)
return c.doRequest(ctx, apiurl, false)
}

// APIServerDNS checks the /version endpoint of the Kubernetes API Server through the Cluster DNS URL
@@ -174,7 +174,7 @@ func (c *Checker) APIServerDNS(ctx context.Context) (string, error) {

apiurl := fmt.Sprintf("https://kubernetes.default.svc.cluster.local:%s/version", c.KubernetesServicePort)

return c.doRequest(ctx, apiurl)
return c.doRequest(ctx, apiurl, false)
}

// MeIngress checks if the kubenurse is reachable at the /alwayshappy endpoint behind the ingress
@@ -183,7 +183,7 @@ func (c *Checker) MeIngress(ctx context.Context) (string, error) {
return skippedStr, nil
}

return c.doRequest(ctx, c.KubenurseIngressURL+"/alwayshappy") //nolint:goconst // readability
return c.doRequest(ctx, c.KubenurseIngressURL+"/alwayshappy", false) //nolint:goconst // readability
}

// MeService checks if the kubenurse is reachable at the /alwayshappy endpoint through the kubernetes service
@@ -192,7 +192,7 @@ func (c *Checker) MeService(ctx context.Context) (string, error) {
return skippedStr, nil
}

return c.doRequest(ctx, c.KubenurseServiceURL+"/alwayshappy")
return c.doRequest(ctx, c.KubenurseServiceURL+"/alwayshappy", false)
}

// measure implements metric collections for the check
7 changes: 6 additions & 1 deletion internal/servicecheck/transport.go
@@ -18,7 +18,7 @@
)

// doRequest does an http request only to get the http status code
func (c *Checker) doRequest(ctx context.Context, url string) (string, error) {
func (c *Checker) doRequest(ctx context.Context, url string, addOriginHeader bool) (string, error) {
// Read Bearer Token file from ServiceAccount
token, err := os.ReadFile(K8sTokenFile)
if err != nil {
@@ -32,6 +32,11 @@
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))
}

if addOriginHeader {
hostname, _ := os.Hostname()
req.Header.Add(NeighbourOriginHeader, hostname)
}

resp, err := c.httpClient.Do(req)
if err != nil {
return err.Error(), err
