diff --git a/.github/workflows/ci-helm-deploy-nginx.yml b/.github/workflows/ci-helm-deploy-nginx.yml index 91acc07f..ba969606 100644 --- a/.github/workflows/ci-helm-deploy-nginx.yml +++ b/.github/workflows/ci-helm-deploy-nginx.yml @@ -12,7 +12,7 @@ jobs: - name: Setup Go uses: actions/setup-go@v5 with: - go-version: '1.21' + go-version: '1.22' - name: GoReleaser uses: goreleaser/goreleaser-action@v4 with: diff --git a/.github/workflows/ci-helm-deploy-traefik.yml b/.github/workflows/ci-helm-deploy-traefik.yml index 568fe43b..6108e334 100644 --- a/.github/workflows/ci-helm-deploy-traefik.yml +++ b/.github/workflows/ci-helm-deploy-traefik.yml @@ -17,7 +17,7 @@ jobs: - name: Setup Go uses: actions/setup-go@v5 with: - go-version: '1.21' + go-version: '1.22' - name: GoReleaser uses: goreleaser/goreleaser-action@v4 with: diff --git a/.github/workflows/ci-kustomize-deploy.yml b/.github/workflows/ci-kustomize-deploy.yml index 99be8735..dcd9f025 100644 --- a/.github/workflows/ci-kustomize-deploy.yml +++ b/.github/workflows/ci-kustomize-deploy.yml @@ -12,7 +12,7 @@ jobs: - name: Setup Go uses: actions/setup-go@v5 with: - go-version: '1.21' + go-version: '1.22' - name: GoReleaser uses: goreleaser/goreleaser-action@v4 with: diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 20187c13..570f9994 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -9,7 +9,7 @@ jobs: steps: - uses: actions/setup-go@v5 with: - go-version: '1.21' + go-version: '1.22' - uses: actions/checkout@v4 - uses: golangci/golangci-lint-action@v4 with: @@ -29,7 +29,7 @@ jobs: - uses: actions/checkout@v4 - uses: actions/setup-go@v5 with: - go-version: '1.21' + go-version: '1.22' - name: Run unit tests run: go test -race -covermode atomic -coverprofile=profile.cov ./... 
- name: Send coverage report diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 7e2557c8..13f82e58 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -13,7 +13,7 @@ jobs: - name: Setup Go uses: actions/setup-go@v5 with: - go-version: '1.21' + go-version: '1.22' - name: Login to DockerHub uses: docker/login-action@v3 with: diff --git a/README.md b/README.md index b6dc7691..cb67bac5 100644 --- a/README.md +++ b/README.md @@ -3,10 +3,12 @@ ![GitHub release (latest SemVer)](https://img.shields.io/github/v/release/postfinance/kubenurse) # Kubenurse + kubenurse is a little service that monitors all network connections in a Kubernetes cluster. Kubenurse measures request durations, records errors and exports those metrics in Prometheus format. ## Deployment + You can get the Docker image from [Docker Hub](https://hub.docker.com/r/postfinance/kubenurse/). The [examples](https://github.com/postfinance/kubenurse/tree/master/examples) directory contains manifests which can be used to deploy kubenurse to the kube-system namespace of your cluster. 
@@ -45,6 +47,7 @@ The following command can be used to install kubenurse with Helm: `helm upgrade | insecure | Set `KUBENURSE_INSECURE` environment variable | `true` | | allow_unschedulable | Sets `KUBENURSE_ALLOW_UNSCHEDULABLE` environment variable | `false` | | neighbour_filter | Sets `KUBENURSE_NEIGHBOUR_FILTER` environment variable | `app.kubernetes.io/name=kubenurse` | +| neighbour_limit | Sets `KUBENURSE_NEIGHBOUR_LIMIT` environment variable | `10` | | extra_ca | Sets `KUBENURSE_EXTRA_CA` environment variable | | | check_api_server_direct | Sets `KUBENURSE_CHECK_API_SERVER_DIRECT` environment variable | `true` | | check_api_server_dns | Sets `KUBENURSE_CHECK_API_SERVER_DNS` environment variable | `true` | @@ -74,7 +77,6 @@ dashboards [as this example](./doc/grafana-kubenurse.json) that show network lat ![Grafana ingress view](doc/grafana_ingress.png "Grafana ingress view") ![Grafana path view](doc/grafana_path.png "Grafana path view") - ## Configuration kubenurse is configured with environment variables: @@ -85,12 +87,13 @@ kubenurse is configured with environment variables: - `KUBENURSE_EXTRA_CA`: Additional CA cert path for TLS connections - `KUBENURSE_NAMESPACE`: Namespace in which to look for the neighbour kubenurses - `KUBENURSE_NEIGHBOUR_FILTER`: A Kubernetes label selector (eg. `app=kubenurse`) to filter neighbour kubenurses +- `KUBENURSE_NEIGHBOUR_LIMIT`: The maximum number of neighbours each kubenurse will query - `KUBENURSE_ALLOW_UNSCHEDULABLE`: If this is `"true"`, path checks to neighbouring kubenurses are made even if they are running on unschedulable nodes. - `KUBENURSE_CHECK_API_SERVER_DIRECT`: If this is `"true"` kubenurse will perform the check [API Server Direct](#API Server Direct). default is "true" - `KUBENURSE_CHECK_API_SERVER_DNS`: If this is `"true"`, kubenurse will perform the check [API Server DNS](#API Server DNS). 
default is "true" - `KUBENURSE_CHECK_ME_INGRESS`: If this is `"true"`, kubenurse will perform the check [Me Ingress](#Me Ingress). default is "true" - `KUBENURSE_CHECK_ME_SERVICE`: If this is `"true"`, kubenurse will perform the check [Me Service](#Me Service). default is "true" -- `KUBENURSE_CHECK_NEIGHBOURHOOD`: If this is `"true"`, kubenurse will perform the check [Neighbourhood](#Neighbourhood). default is "true" +- `KUBENURSE_CHECK_NEIGHBOURHOOD`: If this is `"true"`, kubenurse will perform the check [Neighbourhood](#neighbourhood). default is "true" - `KUBENURSE_CHECK_INTERVAL`: the frequency to perform kubenurse checks. the string should be formatted for [time.ParseDuration](https://pkg.go.dev/time#ParseDuration). defaults to `5s` - `KUBENURSE_REUSE_CONNECTIONS`: whether to reuse connections or not for all checks. default is "false" - `KUBENURSE_HISTOGRAM_BUCKETS`: optional comma-separated list of float64, used in place of the [default prometheus histogram buckets](https://pkg.go.dev/github.com/prometheus/client_golang@v1.16.0/prometheus#DefBuckets) @@ -152,8 +155,8 @@ The `/alive` endpoint returns a JSON like this with status code 200 if everythin } ``` - ## Health Checks + Every five seconds and on every access of `/alive`, the checks described below are run. Check results are cached for 3 seconds in order to prevent excessive network traffic. @@ -162,12 +165,14 @@ A little illustration of what communication occurs, is here: ![Communication](doc/Communication.png "Communication") ### API Server Direct + Checks the `/version` endpoint of the Kubernetes API Server through the direct link (`KUBERNETES_SERVICE_HOST`, `KUBERNETES_SERVICE_PORT`). Metric type: `api_server_direct` ### API Server DNS + Checks the `/version` endpoint of the Kubernetes API Server through the Cluster DNS URL `https://kubernetes.default.svc:$KUBERNETES_SERVICE_PORT`. This also verifies a working `kube-dns` deployment. 
@@ -175,6 +180,7 @@ This also verifies a working `kube-dns` deployment. Metric type: `api_server_dns` ### Me Ingress + Checks if the kubenurse is reachable at the `/alwayshappy` endpoint behind the ingress. This address is provided by the environment variable `KUBENURSE_INGRESS_URL` that could look like `https://kubenurse.example.com`. @@ -183,6 +189,7 @@ This also verifies a correct upstream DNS resolution. Metric type: `me_ingress` ### Me Service + Checks if the kubenurse is reachable at the `/alwayshappy` endpoint through the Kubernetes service. The address is provided by the environment variable `KUBENURSE_SERVICE_URL` that could look like `http://kubenurse.mynamespace.default.svc:8080`. @@ -191,6 +198,7 @@ This also verifies a working `kube-proxy` setup. Metric type: `me_service` ### Neighbourhood + Checks if every neighbour kubenurse is reachable at the `/alwayshappy` endpoint. Neighbours are discovered by querying the kube-apiserver for every Pod in the `KUBENURSE_NAMESPACE` with label `KUBENURSE_NEIGHBOUR_FILTER`. @@ -201,7 +209,44 @@ this can be changed by setting `KUBENURSE_ALLOW_UNSCHEDULABLE="true"`. Metric type: `path_$KUBELET_HOSTNAME` +#### Neighbourhood filtering + +The number of checks for the neighbourhood used to grow as $O(N^2)$, which +rendered `kubenurse` impractical on large clusters, as documented in issue +[#55](https://github.com/postfinance/kubenurse/issues/55). +To combat this, a node filtering feature was implemented, which works as follows: + +- kubenurse computes the `sha256` checksums for all neighbours' node names +- it sorts those checksums (this is actually implemented with a max-heap) +- it computes its own node name checksum, and queries the next 10 (by default) + nodes in the sorted checksums list + +Thanks to this, each kubenurse always queries the same set of 10 nodes, unless one +of those nodes disappears, in which case kubenurse will pick the next node in +the sorted checksums list. 
This comes with several advantages: + +- because of the way we first hash the node names, the checks are + randomly distributed, independent of the node names. If we only picked the 10 + next nodes in a sorted list of the node names, then we might have biased the + results in environments where node names are sequential +- metrics-wise, a `kubenurse` pod should typically only have entries for ca. 10 + other neighbouring nodes worth of checks, which greatly reduces the load on + your monitoring infrastructure +- because we use a deterministic algorithm to choose which nodes to query, the + metrics churn rate stays minimal. (that is, if we randomly picked 10 nodes + for every check, then in the end there would be one prometheus bucket for + every node on the cluster, which would put useless load on the monitoring + infrastructure) + +By default, the neighbourhood filtering is set to 10 nodes, which means that +on clusters with more than 10 nodes, each kubenurse will query 10 nodes, as +described above. + +To bypass the node filtering feature, you simply need to set the +`KUBENURSE_NEIGHBOUR_LIMIT` environment variable to 0. 
+ ## Metrics + All performed checks expose metrics which can be used to monitor/alert: - SDN network latencies and errors @@ -214,5 +259,6 @@ All performed checks expose metrics which can be used to monitor/alert: - External DNS resolution errors (ingress URL resolution) At `/metrics` you will find these: + - `kubenurse_errors_total`: Kubenurse error counter partitioned by error type - `kubenurse_request_duration`: a histogram for Kubenurse request duration partitioned by error type diff --git a/go.mod b/go.mod index e3a3ac83..58542b97 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,6 @@ module github.com/postfinance/kubenurse -go 1.21 - -toolchain go1.21.5 +go 1.22 require ( github.com/prometheus/client_golang v1.19.0 diff --git a/helm/kubenurse/templates/daemonset.yaml b/helm/kubenurse/templates/daemonset.yaml index 1f56a3ce..fa5d0118 100644 --- a/helm/kubenurse/templates/daemonset.yaml +++ b/helm/kubenurse/templates/daemonset.yaml @@ -56,6 +56,8 @@ spec: value: {{ .Release.Namespace }} - name: KUBENURSE_NEIGHBOUR_FILTER value: {{ .Values.neighbour_filter }} + - name: KUBENURSE_NEIGHBOUR_LIMIT + value: {{ .Values.neighbour_limit | quote }} {{- if .Values.extra_ca }} - name: KUBENURSE_EXTRA_CA value: {{ .Values.extra_ca }} diff --git a/helm/kubenurse/values.yaml b/helm/kubenurse/values.yaml index ad0561d5..2b7312a2 100644 --- a/helm/kubenurse/values.yaml +++ b/helm/kubenurse/values.yaml @@ -35,6 +35,8 @@ service_url: "" allow_unschedulable: false # KUBENURSE_NEIGHBOUR_FILTER neighbour_filter: app.kubernetes.io/name=kubenurse +# KUBENURSE_NEIGHBOUR_LIMIT +neighbour_limit: 10 # KUBENURSE_EXTRA_CA extra_ca: "" # KUBENURSE_CHECK_API_SERVER_DIRECT diff --git a/internal/kubenurse/handler.go b/internal/kubenurse/handler.go index e71dcb97..c71a9b85 100644 --- a/internal/kubenurse/handler.go +++ b/internal/kubenurse/handler.go @@ -9,7 +9,7 @@ import ( ) func (s *Server) readyHandler() func(w http.ResponseWriter, r *http.Request) { - return func(w http.ResponseWriter, r 
*http.Request) { + return func(w http.ResponseWriter, _ *http.Request) { s.mu.Lock() defer s.mu.Unlock() @@ -34,8 +34,8 @@ func (s *Server) aliveHandler() func(w http.ResponseWriter, r *http.Request) { servicecheck.Result // kubediscovery - NeighbourhoodState string `json:"neighbourhood_state"` - Neighbourhood []servicecheck.Neighbour `json:"neighbourhood"` + NeighbourhoodState string `json:"neighbourhood_state"` + Neighbourhood []*servicecheck.Neighbour `json:"neighbourhood"` } res := s.checker.LastCheckResult diff --git a/internal/kubenurse/server.go b/internal/kubenurse/server.go index 1059ab05..cc794fe8 100644 --- a/internal/kubenurse/server.go +++ b/internal/kubenurse/server.go @@ -48,6 +48,7 @@ type Server struct { // * KUBERNETES_SERVICE_PORT // * KUBENURSE_NAMESPACE // * KUBENURSE_NEIGHBOUR_FILTER +// * KUBENURSE_NEIGHBOUR_LIMIT // * KUBENURSE_SHUTDOWN_DURATION // * KUBENURSE_CHECK_API_SERVER_DIRECT // * KUBENURSE_CHECK_API_SERVER_DNS @@ -126,7 +127,6 @@ func New(ctx context.Context, c client.Client) (*Server, error) { //nolint:funle shutdownDuration := 5 * time.Second if v, ok := os.LookupEnv("KUBENURSE_SHUTDOWN_DURATION"); ok { - var err error shutdownDuration, err = time.ParseDuration(v) if err != nil { @@ -134,13 +134,23 @@ func New(ctx context.Context, c client.Client) (*Server, error) { //nolint:funle } } + chk.ShutdownDuration = shutdownDuration chk.KubenurseIngressURL = os.Getenv("KUBENURSE_INGRESS_URL") chk.KubenurseServiceURL = os.Getenv("KUBENURSE_SERVICE_URL") chk.KubernetesServiceHost = os.Getenv("KUBERNETES_SERVICE_HOST") chk.KubernetesServicePort = os.Getenv("KUBERNETES_SERVICE_PORT") chk.KubenurseNamespace = os.Getenv("KUBENURSE_NAMESPACE") chk.NeighbourFilter = os.Getenv("KUBENURSE_NEIGHBOUR_FILTER") - chk.ShutdownDuration = shutdownDuration + neighLimit := os.Getenv("KUBENURSE_NEIGHBOUR_LIMIT") + + if neighLimit != "" { + chk.NeighbourLimit, err = strconv.Atoi(neighLimit) + if err != nil { + return nil, err + } + } else { + 
chk.NeighbourLimit = 10 + } //nolint:goconst // No need to make "false" a constant in my opinion, readability is better like this. chk.SkipCheckAPIServerDirect = os.Getenv("KUBENURSE_CHECK_API_SERVER_DIRECT") == "false" diff --git a/internal/servicecheck/httptrace.go b/internal/servicecheck/httptrace.go index 20d9786b..942aca93 100644 --- a/internal/servicecheck/httptrace.go +++ b/internal/servicecheck/httptrace.go @@ -1,6 +1,7 @@ package servicecheck import ( + "context" "crypto/tls" "log" "net/http" @@ -30,8 +31,7 @@ func withHttptrace(registry *prometheus.Registry, next http.RoundTripper, durati Name: "httpclient_requests_total", Help: "A counter for requests from the kubenurse http client.", }, - // []string{"code", "method", "type"}, // TODO - []string{"code", "method"}, + []string{"code", "method", "type"}, ) httpclientReqDuration := prometheus.NewHistogramVec( @@ -41,8 +41,7 @@ func withHttptrace(registry *prometheus.Registry, next http.RoundTripper, durati Help: "A latency histogram of request latencies from the kubenurse http client.", Buckets: durationHistogram, }, - // []string{"type"}, // TODO - []string{}, + []string{"type"}, ) httpclientTraceReqDuration := prometheus.NewHistogramVec( @@ -52,8 +51,7 @@ func withHttptrace(registry *prometheus.Registry, next http.RoundTripper, durati Help: "Latency histogram for requests from the kubenurse http client. 
Time in seconds since the start of the http request.", Buckets: durationHistogram, }, - []string{"event"}, - // []string{"event", "type"}, // TODO + []string{"event", "type"}, ) registry.MustRegister(httpclientReqTotal, httpclientReqDuration, httpclientTraceReqDuration) @@ -68,7 +66,7 @@ func withHttptrace(registry *prometheus.Registry, next http.RoundTripper, durati return } - httpclientTraceReqDuration.WithLabelValues(traceEventType).Observe(td) // TODO: add back kubenurseTypeKey + httpclientTraceReqDuration.WithLabelValues(traceEventType, kubenurseTypeLabel).Observe(td) } // Return a http.RoundTripper for tracing requests @@ -78,10 +76,10 @@ func withHttptrace(registry *prometheus.Registry, next http.RoundTripper, durati // Add tracing hooks trace := &httptrace.ClientTrace{ - GotConn: func(info httptrace.GotConnInfo) { + GotConn: func(_ httptrace.GotConnInfo) { collectMetric("got_conn", start, r, nil) }, - DNSStart: func(info httptrace.DNSStartInfo) { + DNSStart: func(_ httptrace.DNSStartInfo) { collectMetric("dns_start", start, r, nil) }, DNSDone: func(info httptrace.DNSDoneInfo) { @@ -96,7 +94,7 @@ func withHttptrace(registry *prometheus.Registry, next http.RoundTripper, durati TLSHandshakeStart: func() { collectMetric("tls_handshake_start", start, r, nil) }, - TLSHandshakeDone: func(_ tls.ConnectionState, err error) { + TLSHandshakeDone: func(_ tls.ConnectionState, _ error) { collectMetric("tls_handshake_done", start, r, nil) }, WroteRequest: func(info httptrace.WroteRequestInfo) { @@ -110,14 +108,14 @@ func withHttptrace(registry *prometheus.Registry, next http.RoundTripper, durati // Do request with tracing enabled r = r.WithContext(httptrace.WithClientTrace(r.Context(), trace)) - // // TODO: uncomment when issue #55 is solved (N^2 request will increase cardinality of path_ metrics too much otherwise) - // typeFromCtxFn := promhttp.WithLabelFromCtx("type", func(ctx context.Context) string { - // return ctx.Value(kubenurseTypeKey{}).(string) - // }) + 
typeFromCtxFn := promhttp.WithLabelFromCtx("type", func(ctx context.Context) string { + return ctx.Value(kubenurseTypeKey{}).(string) + }) rt := next // variable pinning :) essential, to prevent always re-instrumenting the original variable - rt = promhttp.InstrumentRoundTripperCounter(httpclientReqTotal, rt) - rt = promhttp.InstrumentRoundTripperDuration(httpclientReqDuration, rt) + rt = promhttp.InstrumentRoundTripperCounter(httpclientReqTotal, rt, typeFromCtxFn) + rt = promhttp.InstrumentRoundTripperDuration(httpclientReqDuration, rt, typeFromCtxFn) + return rt.RoundTrip(r) }) } diff --git a/internal/servicecheck/neighbours.go b/internal/servicecheck/neighbours.go index c09c75c3..0f8145ec 100644 --- a/internal/servicecheck/neighbours.go +++ b/internal/servicecheck/neighbours.go @@ -1,7 +1,10 @@ package servicecheck import ( + "container/heap" "context" + "crypto/sha256" + "encoding/binary" "fmt" "os" @@ -11,16 +14,23 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) +//nolint:gochecknoglobals // used during testing +var ( + osHostname = os.Hostname + currentNode string +) + // Neighbour represents a kubenurse which should be reachable type Neighbour struct { PodName string PodIP string HostIP string NodeName string + NodeHash uint64 } // GetNeighbours returns a slice of neighbour kubenurses for the given namespace and labelSelector. 
-func (c *Checker) GetNeighbours(ctx context.Context, namespace, labelSelector string) ([]Neighbour, error) { +func (c *Checker) GetNeighbours(ctx context.Context, namespace, labelSelector string) ([]*Neighbour, error) { // Get all pods pods := v1.PodList{} selector, _ := labels.Parse(labelSelector) @@ -33,9 +43,9 @@ func (c *Checker) GetNeighbours(ctx context.Context, namespace, labelSelector st return nil, fmt.Errorf("list pods: %w", err) } - var neighbours = make([]Neighbour, 0, len(pods.Items)) + var neighbours = make([]*Neighbour, 0, len(pods.Items)) - var hostname, _ = os.Hostname() + var hostname, _ = osHostname() // process pods for idx := range pods.Items { @@ -55,7 +65,8 @@ func (c *Checker) GetNeighbours(ctx context.Context, namespace, labelSelector st continue } - if pod.Name == hostname { // only quey other pods, not the currently running pod + if pod.Name == hostname { // only query other pods, not the currently running pod + currentNode = pod.Spec.NodeName continue } @@ -64,8 +75,9 @@ func (c *Checker) GetNeighbours(ctx context.Context, namespace, labelSelector st PodIP: pod.Status.PodIP, HostIP: pod.Status.HostIP, NodeName: pod.Spec.NodeName, + NodeHash: sha256Uint64(pod.Spec.NodeName), } - neighbours = append(neighbours, n) + neighbours = append(neighbours, &n) } return neighbours, nil @@ -73,10 +85,12 @@ func (c *Checker) GetNeighbours(ctx context.Context, namespace, labelSelector st // checkNeighbours checks the /alwayshappy endpoint from every discovered kubenurse neighbour. Neighbour pods on nodes // which are not schedulable are excluded from this check to avoid possible false errors. 
-func (c *Checker) checkNeighbours(nh []Neighbour) { - for _, neighbour := range nh { - neighbour := neighbour // pin +func (c *Checker) checkNeighbours(nh []*Neighbour) { + if c.NeighbourLimit > 0 && len(nh) > c.NeighbourLimit { + nh = c.filterNeighbours(nh) + } + for _, neighbour := range nh { check := func(ctx context.Context) (string, error) { if c.UseTLS { return c.doRequest(ctx, "https://"+neighbour.PodIP+":8443/alwayshappy") @@ -88,3 +102,56 @@ func (c *Checker) checkNeighbours(nh []Neighbour) { _, _ = c.measure(check, "path_"+neighbour.NodeName) } } + +func (c *Checker) filterNeighbours(nh []*Neighbour) []*Neighbour { + m := make(map[uint64]*Neighbour, c.NeighbourLimit+1) + + sl := make(Uint64Heap, 0, c.NeighbourLimit+1) + h := &sl + currentNodeHash := sha256Uint64(currentNode) + + heap.Init(h) + + for _, n := range nh { + adjHash := n.NodeHash - currentNodeHash + m[adjHash] = n + + heap.Push(h, adjHash) + + if len(*h) > c.NeighbourLimit { + p := heap.Pop(h).(uint64) + delete(m, p) + } + } + + filteredNeighbours := make([]*Neighbour, 0, c.NeighbourLimit) + + for _, n := range m { + filteredNeighbours = append(filteredNeighbours, n) + } + + return filteredNeighbours +} + +func sha256Uint64(s string) uint64 { + h := sha256.Sum256([]byte(s)) + return binary.BigEndian.Uint64(h[:8]) +} + +type Uint64Heap []uint64 + +func (h Uint64Heap) Len() int { return len(h) } +func (h Uint64Heap) Less(i, j int) bool { return h[i] > h[j] } // we want a max-heap, therefore the inversed condition +func (h Uint64Heap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } + +func (h *Uint64Heap) Push(x any) { + *h = append(*h, x.(uint64)) +} + +func (h *Uint64Heap) Pop() any { + n := len(*h) + x := (*h)[n-1] + *h = (*h)[0 : n-1] + + return x +} diff --git a/internal/servicecheck/neighbours_test.go b/internal/servicecheck/neighbours_test.go new file mode 100644 index 00000000..dd17007b --- /dev/null +++ b/internal/servicecheck/neighbours_test.go @@ -0,0 +1,74 @@ +package servicecheck + 
+import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" +) + +func generateNeighbours(n int) (nh []*Neighbour) { + + nh = make([]*Neighbour, 0, n) + + for i := range n { + nodeName := fmt.Sprintf("a1-k8s-abcd%03d.domain.tld", i) + neigh := Neighbour{ + NodeName: nodeName, + NodeHash: sha256Uint64(nodeName), + } + nh = append(nh, &neigh) + } + + return +} + +func BenchmarkNodeFiltering(b *testing.B) { + n := 10_000 + neighbourLimit := 10 + nh := generateNeighbours(n) + require.NotNil(b, nh) + + checker := Checker{ + NeighbourLimit: neighbourLimit, + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + currentNode = nh[i%len(nh)].NodeName + b.StartTimer() + checker.filterNeighbours(nh) + b.StopTimer() + } +} + +func TestNodeFiltering(t *testing.T) { + + n := 1_000 + neighbourLimit := 10 + nh := generateNeighbours(n) + require.NotNil(t, nh) + + checker := Checker{ + NeighbourLimit: neighbourLimit, + } + + t.Run("all nodes should get NEIGHBOUR_LIMIT checks", func(t *testing.T) { + counter := make(map[string]int, n) + + for i := range n { + currentNode = nh[i].NodeName + filtered := checker.filterNeighbours(nh) + require.Equal(t, neighbourLimit, len(filtered)) + + for _, neigh := range filtered { + counter[neigh.NodeName]++ + } + } + + for _, count := range counter { + require.Equal(t, neighbourLimit, count, "one node didn't receive exactly NEIGHBOUR_LIMIT checks") + } + + }) +} diff --git a/internal/servicecheck/servicecheck.go b/internal/servicecheck/servicecheck.go index eee65e37..dfb176f9 100644 --- a/internal/servicecheck/servicecheck.go +++ b/internal/servicecheck/servicecheck.go @@ -114,7 +114,7 @@ func (c *Checker) Run() (Result, bool) { if c.SkipCheckNeighbourhood { res.NeighbourhoodState = skippedStr } else { - res.Neighbourhood, err = c.GetNeighbours(context.TODO(), c.KubenurseNamespace, c.NeighbourFilter) + res.Neighbourhood, err = c.GetNeighbours(context.Background(), c.KubenurseNamespace, c.NeighbourFilter) haserr = haserr || (err != nil) // 
Neighbourhood special error treating diff --git a/internal/servicecheck/types.go b/internal/servicecheck/types.go index fede2c3c..98151e98 100644 --- a/internal/servicecheck/types.go +++ b/internal/servicecheck/types.go @@ -29,6 +29,7 @@ type Checker struct { // Neighbourhood KubenurseNamespace string NeighbourFilter string + NeighbourLimit int allowUnschedulable bool SkipCheckNeighbourhood bool @@ -57,12 +58,12 @@ type Checker struct { // Result contains the result of a performed check run type Result struct { - APIServerDirect string `json:"api_server_direct"` - APIServerDNS string `json:"api_server_dns"` - MeIngress string `json:"me_ingress"` - MeService string `json:"me_service"` - NeighbourhoodState string `json:"neighbourhood_state"` - Neighbourhood []Neighbour `json:"neighbourhood"` + APIServerDirect string `json:"api_server_direct"` + APIServerDNS string `json:"api_server_dns"` + MeIngress string `json:"me_ingress"` + MeService string `json:"me_service"` + NeighbourhoodState string `json:"neighbourhood_state"` + Neighbourhood []*Neighbour `json:"neighbourhood"` } // Check is the signature used by all checks that the checker can execute.