Feat/july improvements #152

Merged 10 commits on Jul 23, 2024
3 changes: 3 additions & 0 deletions README.md
@@ -124,6 +124,7 @@ The following command can be used to install kubenurse with Helm: `helm upgrade
| neighbour_limit | Sets `KUBENURSE_NEIGHBOUR_LIMIT` environment variable | `10` |
| histogram_buckets | Sets `KUBENURSE_HISTOGRAM_BUCKETS` environment variable | |
| extra_ca | Sets `KUBENURSE_EXTRA_CA` environment variable | |
| extra_checks | Sets `KUBENURSE_EXTRA_CHECKS` environment variable | |
| check_api_server_direct | Sets `KUBENURSE_CHECK_API_SERVER_DIRECT` environment variable | `true` |
| check_api_server_dns | Sets `KUBENURSE_CHECK_API_SERVER_DNS` environment variable | `true` |
| check_me_ingress | Sets `KUBENURSE_CHECK_ME_INGRESS` environment variable | `true` |
@@ -146,6 +147,7 @@ The following command can be used to install kubenurse with Helm: `helm upgrade
- `KUBENURSE_SERVICE_URL`: A URL to the kubenurse in order to check the Kubernetes service
- `KUBENURSE_INSECURE`: If "true", TLS connections will not validate the certificate
- `KUBENURSE_EXTRA_CA`: Additional CA cert path for TLS connections
- `KUBENURSE_EXTRA_CHECKS`: Additional checks, specified as a list (separated by a vertical bar `|`) where each entry of the list has the format: `<metric_name>:<url_to_check>`. For example `google:https://www.google.ch/|cloudflare:https://www.cloudflare.com/`
- `KUBENURSE_NAMESPACE`: Namespace in which to look for the neighbour kubenurses
- `KUBENURSE_NEIGHBOUR_FILTER`: A Kubernetes label selector (eg. `app=kubenurse`) to filter neighbour kubenurses
- `KUBENURSE_NEIGHBOUR_LIMIT`: The maximum number of neighbours each kubenurse will query
@@ -166,6 +168,7 @@ Following variables are injected to the Pod by Kubernetes and should not be defi

- `KUBERNETES_SERVICE_HOST`: Host to communicate to the kube-apiserver
- `KUBERNETES_SERVICE_PORT`: Port to communicate to the kube-apiserver

</details>

## HTTP Endpoints
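The `KUBENURSE_EXTRA_CHECKS` format documented above (entries separated by `|`, each entry `<metric_name>:<url_to_check>`) can be sketched as a small standalone parser. This is only an illustration under stated assumptions: `parseExtraChecks` is a hypothetical helper name, not a function in kubenurse, and unlike the PR's code it also rejects entries with an empty name or URL.

```go
package main

import (
	"fmt"
	"strings"
)

// parseExtraChecks is a hypothetical helper (not part of kubenurse) that turns
// a KUBENURSE_EXTRA_CHECKS-style value into a metric-name -> URL map.
func parseExtraChecks(raw string) (map[string]string, error) {
	checks := make(map[string]string)
	if raw == "" {
		return checks, nil
	}

	for _, entry := range strings.Split(raw, "|") {
		// Cut at the first colon only, so the "://" inside the URL is preserved.
		name, url, found := strings.Cut(entry, ":")
		if !found || name == "" || url == "" {
			// Assumption: empty names/URLs are treated as errors in this sketch.
			return nil, fmt.Errorf("invalid extra check %q: want <metric_name>:<url_to_check>", entry)
		}

		checks[name] = url
	}

	return checks, nil
}

func main() {
	checks, err := parseExtraChecks("google:https://www.google.ch/|cloudflare:https://www.cloudflare.com/")
	if err != nil {
		fmt.Println("error:", err)
		return
	}

	fmt.Println(len(checks), checks["google"])
}
```

Because `strings.Cut` splits at the first separator only, metric names cannot contain a colon, while URLs may contain as many as they need (scheme, port).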
8 changes: 4 additions & 4 deletions go.mod
@@ -7,10 +7,10 @@ toolchain go1.22.1
require (
github.com/prometheus/client_golang v1.19.1
github.com/stretchr/testify v1.9.0
k8s.io/api v0.30.1
k8s.io/apimachinery v0.30.1
k8s.io/client-go v0.30.1
sigs.k8s.io/controller-runtime v0.18.3
k8s.io/api v0.30.3
k8s.io/apimachinery v0.30.3
k8s.io/client-go v0.30.3
sigs.k8s.io/controller-runtime v0.18.4
)

require (
16 changes: 8 additions & 8 deletions go.sum
@@ -166,22 +166,22 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
k8s.io/api v0.30.1 h1:kCm/6mADMdbAxmIh0LBjS54nQBE+U4KmbCfIkF5CpJY=
k8s.io/api v0.30.1/go.mod h1:ddbN2C0+0DIiPntan/bye3SW3PdwLa11/0yqwvuRrJM=
k8s.io/api v0.30.3 h1:ImHwK9DCsPA9uoU3rVh4QHAHHK5dTSv1nxJUapx8hoQ=
k8s.io/api v0.30.3/go.mod h1:GPc8jlzoe5JG3pb0KJCSLX5oAFIW3/qNJITlDj8BH04=
k8s.io/apiextensions-apiserver v0.30.1 h1:4fAJZ9985BmpJG6PkoxVRpXv9vmPUOVzl614xarePws=
k8s.io/apiextensions-apiserver v0.30.1/go.mod h1:R4GuSrlhgq43oRY9sF2IToFh7PVlF1JjfWdoG3pixk4=
k8s.io/apimachinery v0.30.1 h1:ZQStsEfo4n65yAdlGTfP/uSHMQSoYzU/oeEbkmF7P2U=
k8s.io/apimachinery v0.30.1/go.mod h1:iexa2somDaxdnj7bha06bhb43Zpa6eWH8N8dbqVjTUc=
k8s.io/client-go v0.30.1 h1:uC/Ir6A3R46wdkgCV3vbLyNOYyCJ8oZnjtJGKfytl/Q=
k8s.io/client-go v0.30.1/go.mod h1:wrAqLNs2trwiCH/wxxmT/x3hKVH9PuV0GGW0oDoHVqc=
k8s.io/apimachinery v0.30.3 h1:q1laaWCmrszyQuSQCfNB8cFgCuDAoPszKY4ucAjDwHc=
k8s.io/apimachinery v0.30.3/go.mod h1:iexa2somDaxdnj7bha06bhb43Zpa6eWH8N8dbqVjTUc=
k8s.io/client-go v0.30.3 h1:bHrJu3xQZNXIi8/MoxYtZBBWQQXwy16zqJwloXXfD3k=
k8s.io/client-go v0.30.3/go.mod h1:8d4pf8vYu665/kUbsxWAQ/JDBNWqfFeZnvFiVdmx89U=
k8s.io/klog/v2 v2.120.1 h1:QXU6cPEOIslTGvZaXvFWiP9VKyeet3sawzTOvdXb4Vw=
k8s.io/klog/v2 v2.120.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 h1:BZqlfIlq5YbRMFko6/PM7FjZpUb45WallggurYhKGag=
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340/go.mod h1:yD4MZYeKMBwQKVht279WycxKyM84kkAx2DPrTXaeb98=
k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSnlTLKgpAAttJvpI=
k8s.io/utils v0.0.0-20230726121419-3b25d923346b/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
sigs.k8s.io/controller-runtime v0.18.3 h1:B5Wmmo8WMWK7izei+2LlXLVDGzMwAHBNLX68lwtlSR4=
sigs.k8s.io/controller-runtime v0.18.3/go.mod h1:TVoGrfdpbA9VRFaRnKgk9P5/atA0pMwq+f+msb9M8Sg=
sigs.k8s.io/controller-runtime v0.18.4 h1:87+guW1zhvuPLh1PHybKdYFLU0YJp4FhJRmiHvm5BZw=
sigs.k8s.io/controller-runtime v0.18.4/go.mod h1:TVoGrfdpbA9VRFaRnKgk9P5/atA0pMwq+f+msb9M8Sg=
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo=
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0=
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4=
27 changes: 8 additions & 19 deletions internal/kubenurse/handler.go
@@ -10,13 +10,10 @@ import (

func (s *Server) readyHandler() func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, _ *http.Request) {
s.mu.Lock()
defer s.mu.Unlock()

if s.ready {
if s.ready.Load() {
w.WriteHeader(http.StatusOK)
} else {
w.WriteHeader(http.StatusInternalServerError)
w.WriteHeader(http.StatusServiceUnavailable)
}
}
}
@@ -29,13 +26,7 @@ func (s *Server) aliveHandler() func(w http.ResponseWriter, r *http.Request) {
UserAgent string `json:"user_agent"`
RequestURI string `json:"request_uri"`
RemoteAddr string `json:"remote_addr"`

// checker.Result
servicecheck.Result

// kubediscovery
NeighbourhoodState string `json:"neighbourhood_state"`
Neighbourhood []*servicecheck.Neighbour `json:"neighbourhood"`
Result map[string]any `json:"last_check_result"`
}

res := s.checker.LastCheckResult
@@ -46,13 +37,11 @@ func (s *Server) aliveHandler() func(w http.ResponseWriter, r *http.Request) {

// Add additional data
out := Output{
Result: *res,
Headers: r.Header,
UserAgent: r.UserAgent(),
RequestURI: r.RequestURI,
RemoteAddr: r.RemoteAddr,
Neighbourhood: res.Neighbourhood,
NeighbourhoodState: res.NeighbourhoodState,
Result: res,
Headers: r.Header,
UserAgent: r.UserAgent(),
RequestURI: r.RequestURI,
RemoteAddr: r.RemoteAddr,
}
out.Hostname, _ = os.Hostname()

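The readiness change in `handler.go` above (a mutex-guarded `bool` replaced by `atomic.Bool`, and `503 Service Unavailable` instead of `500` when not ready) can be tried in isolation with the sketch below. The `server` type, the `probe` helper, and the `/ready` path are assumptions made for this example and are not taken from the kubenurse code.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync/atomic"
)

// server is a hypothetical stand-in for the kubenurse Server type.
type server struct {
	// ready can be read and written concurrently without a mutex.
	// Its zero value is false.
	ready atomic.Bool
}

// readyHandler answers 200 while the server is ready and 503 otherwise.
func (s *server) readyHandler() http.HandlerFunc {
	return func(w http.ResponseWriter, _ *http.Request) {
		if s.ready.Load() {
			w.WriteHeader(http.StatusOK)
			return
		}

		w.WriteHeader(http.StatusServiceUnavailable)
	}
}

// probe sends one in-memory request to the handler and returns the status code.
// The "/ready" path is an assumption for this example.
func probe(h http.Handler) int {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/ready", nil))

	return rec.Code
}

func main() {
	s := &server{}

	s.ready.Store(true)
	fmt.Println("ready:", probe(s.readyHandler()))

	s.ready.Store(false) // what a Shutdown-style method would do first
	fmt.Println("not ready:", probe(s.readyHandler()))
}
```

`atomic.Bool` requires Go 1.19 or later, which is satisfied by the `go1.22.1` toolchain named in `go.mod`.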
3 changes: 1 addition & 2 deletions internal/kubenurse/handler_test.go
@@ -1,7 +1,6 @@
package kubenurse

import (
"context"
"net/http"
"net/http/httptest"
"testing"
@@ -14,7 +13,7 @@ func TestServerHandler(t *testing.T) {
r := require.New(t)

fakeClient := fake.NewFakeClient()
kubenurse, err := New(context.Background(), fakeClient)
kubenurse, err := New(fakeClient)

r.NoError(err)
r.NotNil(kubenurse)
51 changes: 36 additions & 15 deletions internal/kubenurse/server.go
@@ -10,6 +10,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/postfinance/kubenurse/internal/servicecheck"
@@ -34,9 +35,7 @@ type Server struct {
// If we want to consider kubenurses on unschedulable nodes
allowUnschedulable bool

// Mutex to protect ready flag
mu *sync.Mutex
ready bool
ready atomic.Bool

// Neighbourhood incoming checks
neighbouringIncomingChecks prometheus.Gauge
@@ -60,7 +59,7 @@ type Server struct {
// * KUBENURSE_CHECK_ME_SERVICE
// * KUBENURSE_CHECK_NEIGHBOURHOOD
// * KUBENURSE_CHECK_INTERVAL
func New(ctx context.Context, c client.Client) (*Server, error) { //nolint:funlen // TODO: use a flag parsing library (e.g. ff) to reduce complexity
func New(c client.Client) (*Server, error) { //nolint:funlen // TODO: use a flag parsing library (e.g. ff) to reduce complexity
mux := http.NewServeMux()

checkInterval := defaultCheckInterval
@@ -94,10 +93,10 @@ func New(ctx context.Context, c client.Client) (*Server, error) { //nolint:funle
useTLS: os.Getenv("KUBENURSE_USE_TLS") == "true",
allowUnschedulable: os.Getenv("KUBENURSE_ALLOW_UNSCHEDULABLE") == "true",
checkInterval: checkInterval,
mu: new(sync.Mutex),
ready: true,
ready: atomic.Bool{},
}

server.ready.Store(true)
server.neighboursTTLCache.Init(60 * time.Second)

promRegistry := prometheus.NewRegistry()
@@ -135,7 +134,7 @@ func New(ctx context.Context, c client.Client) (*Server, error) { //nolint:funle
}

// setup checker
chk, err := servicecheck.New(ctx, c, promRegistry, server.allowUnschedulable, 1*time.Second, histogramBuckets)
chk, err := servicecheck.New(c, promRegistry, server.allowUnschedulable, 1*time.Second, histogramBuckets)
if err != nil {
return nil, err
}
@@ -177,6 +176,19 @@ func New(ctx context.Context, c client.Client) (*Server, error) { //nolint:funle

chk.UseTLS = server.useTLS

// Extra checks parsing
if extraChecks := os.Getenv("KUBENURSE_EXTRA_CHECKS"); extraChecks != "" {
for _, extraCheck := range strings.Split(extraChecks, "|") {
requestType, url, fnd := strings.Cut(extraCheck, ":")
if !fnd {
slog.Error("couldn't parse one of extraChecks", "extraCheck", extraCheck)
return nil, fmt.Errorf("extra checks parsing - missing colon ':' between metric name and url")
}

chk.ExtraChecks[requestType] = url
}
}

server.checker = chk

// setup http routes
@@ -190,7 +202,7 @@ func New(ctx context.Context, c client.Client) (*Server, error) { //nolint:funle
}

// Run starts the periodic checker and the http/https server(s) and blocks until Shutdown was called.
func (s *Server) Run() error {
func (s *Server) Run(ctx context.Context) error {
var (
wg sync.WaitGroup
errc = make(chan error, 2) // max two errors can happen
@@ -212,7 +224,17 @@ func (s *Server) Run() error {
go func() {
defer wg.Done()

s.checker.RunScheduled(s.checkInterval)
ticker := time.NewTicker(s.checkInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
s.checker.Run(ctx)
case <-ctx.Done():
return
}
}
}()

wg.Add(1)
@@ -260,19 +282,18 @@ func (s *Server) Run() error {
}

// Shutdown disables the readiness probe and then gracefully halts the kubenurse http/https server(s).
func (s *Server) Shutdown(ctx context.Context) error {
s.mu.Lock()
s.ready = false
s.mu.Unlock()
func (s *Server) Shutdown() error {
s.ready.Store(false)

// wait before actually shutting down the http/s server, as the updated
// endpoints for the kubenurse service might not have propagated everywhere
// (other kubenurse/ingress controller) yet, which will lead to
// me_ingress or path errors in other pods
time.Sleep(s.checker.ShutdownDuration)

// stop the scheduled checker
s.checker.StopScheduled()
// use a background ctx, since the "root" context is already canceled
ctx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownCancel()

if err := s.http.Shutdown(ctx); err != nil {
return fmt.Errorf("stop http server: %w", err)
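The scheduling change in `Run` above replaces `RunScheduled`/`StopScheduled` with a ticker loop that stops when the context is canceled. A minimal standalone sketch of that pattern follows; `runEvery` and `countRuns` are hypothetical names introduced here, and the check is reduced to a callback instead of the real `s.checker.Run(ctx)`.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runEvery calls check on every tick and returns once ctx is canceled.
// It mirrors the select loop in the diff; the name is hypothetical.
func runEvery(ctx context.Context, interval time.Duration, check func(context.Context)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop() // release the ticker's resources on exit

	for {
		select {
		case <-ticker.C:
			check(ctx)
		case <-ctx.Done():
			return
		}
	}
}

// countRuns is a demo helper: it runs the loop for roughly `total` and
// reports how many times the check callback was invoked.
func countRuns(interval, total time.Duration) int {
	ctx, cancel := context.WithTimeout(context.Background(), total)
	defer cancel()

	runs := 0
	// runEvery is synchronous here, so a plain int counter is safe.
	runEvery(ctx, interval, func(context.Context) { runs++ })

	return runs
}

func main() {
	fmt.Println("checks executed:", countRuns(10*time.Millisecond, 105*time.Millisecond))
}
```

With this shape, cancelling the context passed to `Run` is enough to stop the periodic checks, which is why `Shutdown` no longer needs a separate stop call for the checker and creates its own timeout context for the HTTP server shutdown.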
16 changes: 13 additions & 3 deletions internal/kubenurse/server_test.go
@@ -2,7 +2,9 @@ package kubenurse

import (
"context"
"os"
"testing"
"time"

"github.com/stretchr/testify/require"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
@@ -11,25 +13,33 @@ import (
func TestCombined(t *testing.T) {
r := require.New(t)

os.Setenv("KUBENURSE_EXTRA_CHECKS", "cloudy_endpoint:http://cloudy.enpdoint:1234/test|ep_number_two:http://interesting.endpoint:8080/abcd")
fakeClient := fake.NewFakeClient()
kubenurse, err := New(context.Background(), fakeClient)
kubenurse, err := New(fakeClient)
r.NoError(err)
r.NotNil(kubenurse)

r.Equal(map[string]string{
"ep_number_two": "http://interesting.endpoint:8080/abcd",
"cloudy_endpoint": "http://cloudy.enpdoint:1234/test",
}, kubenurse.checker.ExtraChecks)

t.Run("start/stop", func(t *testing.T) {
r := require.New(t)
errc := make(chan error, 1)

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
go func() {
// blocks until shutdown is called
err := kubenurse.Run()
err := kubenurse.Run(ctx)

errc <- err
close(errc)
cancel()
}()

// Shutdown, Run() should stop after function completes
err := kubenurse.Shutdown(context.Background())
err := kubenurse.Shutdown()
r.NoError(err)

err = <-errc // blocks until kubenurse.Run() finishes and eventually returns an error
7 changes: 6 additions & 1 deletion internal/servicecheck/httptrace.go
@@ -34,7 +34,7 @@ func withHttptrace(registry *prometheus.Registry, next http.RoundTripper, durHis
Name: "httpclient_requests_total",
Help: "A counter for requests from the kubenurse http client.",
},
[]string{"code", "method", "type"},
[]string{"code", "type"},
)

httpclientReqDuration := prometheus.NewHistogramVec(
@@ -144,6 +144,11 @@ func withHttptrace(registry *prometheus.Registry, next http.RoundTripper, durHis
}
} else {
eventType := "round_trip_error"
labels := map[string]string{
"code": eventType, // we reuse round_trip_error as status code to prevent introducing a new label
"type": kubenurseRequestType,
}
httpclientReqTotal.With(labels).Inc() // also increment the total counter, as InstrumentRoundTripperCounter only instruments successful requests
// errorCounter.WithLabelValues(eventType, kubenurseRequestType).Inc()
// normally, errors are already accounted for in the ClientTrace section.
// we still log the error, so in the future we can compare the log entries and see if somehow
24 changes: 2 additions & 22 deletions internal/servicecheck/neighbours.go
@@ -33,8 +33,8 @@ type Neighbour struct {
NodeHash uint64
}

// GetNeighbours returns a slice of neighbour kubenurses for the given namespace and labelSelector.
func (c *Checker) GetNeighbours(ctx context.Context, namespace, labelSelector string) ([]*Neighbour, error) {
// getNeighbours returns a slice of neighbour kubenurses for the given namespace and labelSelector.
func (c *Checker) getNeighbours(ctx context.Context, namespace, labelSelector string) ([]*Neighbour, error) {
// Get all pods
pods := v1.PodList{}
selector, _ := labels.Parse(labelSelector)
@@ -87,26 +87,6 @@ func (c *Checker) GetNeighbours(ctx context.Context, namespace, labelSelector st
return neighbours, nil
}

// checkNeighbours checks the /alwayshappy endpoint from every discovered kubenurse neighbour. Neighbour pods on nodes
// which are not schedulable are excluded from this check to avoid possible false errors.
func (c *Checker) checkNeighbours(nh []*Neighbour) {
if c.NeighbourLimit > 0 && len(nh) > c.NeighbourLimit {
nh = c.filterNeighbours(nh)
}

for _, neighbour := range nh {
check := func(ctx context.Context) (string, error) {
if c.UseTLS {
return c.doRequest(ctx, "https://"+neighbour.PodIP+":8443/alwayshappy", true)
}

return c.doRequest(ctx, "http://"+neighbour.PodIP+":8080/alwayshappy", true)
}

c.measure(check, "path_"+neighbour.NodeName)
}
}

func (c *Checker) filterNeighbours(nh []*Neighbour) []*Neighbour {
m := make(map[uint64]*Neighbour, c.NeighbourLimit+1)
