diff --git a/README.md b/README.md index d56a9d4..573a05d 100644 --- a/README.md +++ b/README.md @@ -124,6 +124,7 @@ The following command can be used to install kubenurse with Helm: `helm upgrade | neighbour_limit | Sets `KUBENURSE_NEIGHBOUR_LIMIT` environment variable | `10` | | histogram_buckets | Sets `KUBENURSE_HISTOGRAM_BUCKETS` environment variable | | | extra_ca | Sets `KUBENURSE_EXTRA_CA` environment variable | | +| extra_checks | Sets `KUBENURSE_EXTRA_CHECKS` environment variable | | | check_api_server_direct | Sets `KUBENURSE_CHECK_API_SERVER_DIRECT` environment variable | `true` | | check_api_server_dns | Sets `KUBENURSE_CHECK_API_SERVER_DNS` environment variable | `true` | | check_me_ingress | Sets `KUBENURSE_CHECK_ME_INGRESS` environment variable | `true` | @@ -146,6 +147,7 @@ The following command can be used to install kubenurse with Helm: `helm upgrade - `KUBENURSE_SERVICE_URL`: An URL to the kubenurse in order to check the Kubernetes service - `KUBENURSE_INSECURE`: If "true", TLS connections will not validate the certificate - `KUBENURSE_EXTRA_CA`: Additional CA cert path for TLS connections +- `KUBENURSE_EXTRA_CHECKS`: Additional checks, specified as a list (separated by a vertical bar `|`) where each entry of the list has the format: `:`. For example `google:https://www.google.ch/|cloudflare:https://www.cloudflare.com/` - `KUBENURSE_NAMESPACE`: Namespace in which to look for the neighbour kubenurses - `KUBENURSE_NEIGHBOUR_FILTER`: A Kubernetes label selector (eg. `app=kubenurse`) to filter neighbour kubenurses - `KUBENURSE_NEIGHBOUR_LIMIT`: The maximum number of neighbours each kubenurse will query @@ -166,6 +168,7 @@ Following variables are injected to the Pod by Kubernetes and should not be defi - `KUBERNETES_SERVICE_HOST`: Host to communicate to the kube-apiserver - `KUBERNETES_SERVICE_PORT`: Port to communicate to the kube-apiserver + ## HTTP Endpoints diff --git a/go.mod b/go.mod index 141d903..971f536 100644 --- a/go.mod +++ b/go.mod @@ -7,10 +7,10 @@ toolchain go1.22.1 require ( github.com/prometheus/client_golang v1.19.1 github.com/stretchr/testify v1.9.0 - k8s.io/api v0.30.1 - k8s.io/apimachinery v0.30.1 - k8s.io/client-go v0.30.1 - sigs.k8s.io/controller-runtime v0.18.3 + k8s.io/api v0.30.3 + k8s.io/apimachinery v0.30.3 + k8s.io/client-go v0.30.3 + sigs.k8s.io/controller-runtime v0.18.4 ) require ( diff --git a/go.sum b/go.sum index c67423d..34786d4 100644 --- a/go.sum +++ b/go.sum @@ -166,22 +166,22 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -k8s.io/api v0.30.1 h1:kCm/6mADMdbAxmIh0LBjS54nQBE+U4KmbCfIkF5CpJY= -k8s.io/api v0.30.1/go.mod h1:ddbN2C0+0DIiPntan/bye3SW3PdwLa11/0yqwvuRrJM= +k8s.io/api v0.30.3 h1:ImHwK9DCsPA9uoU3rVh4QHAHHK5dTSv1nxJUapx8hoQ= +k8s.io/api v0.30.3/go.mod h1:GPc8jlzoe5JG3pb0KJCSLX5oAFIW3/qNJITlDj8BH04= k8s.io/apiextensions-apiserver v0.30.1 h1:4fAJZ9985BmpJG6PkoxVRpXv9vmPUOVzl614xarePws= k8s.io/apiextensions-apiserver v0.30.1/go.mod h1:R4GuSrlhgq43oRY9sF2IToFh7PVlF1JjfWdoG3pixk4= -k8s.io/apimachinery v0.30.1 h1:ZQStsEfo4n65yAdlGTfP/uSHMQSoYzU/oeEbkmF7P2U= -k8s.io/apimachinery v0.30.1/go.mod h1:iexa2somDaxdnj7bha06bhb43Zpa6eWH8N8dbqVjTUc= -k8s.io/client-go v0.30.1 h1:uC/Ir6A3R46wdkgCV3vbLyNOYyCJ8oZnjtJGKfytl/Q= -k8s.io/client-go v0.30.1/go.mod h1:wrAqLNs2trwiCH/wxxmT/x3hKVH9PuV0GGW0oDoHVqc= +k8s.io/apimachinery v0.30.3 h1:q1laaWCmrszyQuSQCfNB8cFgCuDAoPszKY4ucAjDwHc= +k8s.io/apimachinery v0.30.3/go.mod h1:iexa2somDaxdnj7bha06bhb43Zpa6eWH8N8dbqVjTUc= +k8s.io/client-go v0.30.3 h1:bHrJu3xQZNXIi8/MoxYtZBBWQQXwy16zqJwloXXfD3k= +k8s.io/client-go v0.30.3/go.mod h1:8d4pf8vYu665/kUbsxWAQ/JDBNWqfFeZnvFiVdmx89U= k8s.io/klog/v2 v2.120.1 h1:QXU6cPEOIslTGvZaXvFWiP9VKyeet3sawzTOvdXb4Vw= k8s.io/klog/v2 v2.120.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 h1:BZqlfIlq5YbRMFko6/PM7FjZpUb45WallggurYhKGag= k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340/go.mod h1:yD4MZYeKMBwQKVht279WycxKyM84kkAx2DPrTXaeb98= k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSnlTLKgpAAttJvpI= k8s.io/utils v0.0.0-20230726121419-3b25d923346b/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= -sigs.k8s.io/controller-runtime v0.18.3 h1:B5Wmmo8WMWK7izei+2LlXLVDGzMwAHBNLX68lwtlSR4= -sigs.k8s.io/controller-runtime v0.18.3/go.mod h1:TVoGrfdpbA9VRFaRnKgk9P5/atA0pMwq+f+msb9M8Sg= +sigs.k8s.io/controller-runtime v0.18.4 h1:87+guW1zhvuPLh1PHybKdYFLU0YJp4FhJRmiHvm5BZw= +sigs.k8s.io/controller-runtime v0.18.4/go.mod h1:TVoGrfdpbA9VRFaRnKgk9P5/atA0pMwq+f+msb9M8Sg= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4= diff --git a/internal/kubenurse/handler.go b/internal/kubenurse/handler.go index e9013f1..831354f 100644 --- a/internal/kubenurse/handler.go +++ b/internal/kubenurse/handler.go @@ -10,13 +10,10 @@ import ( func (s *Server) readyHandler() func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, _ *http.Request) { - s.mu.Lock() - defer s.mu.Unlock() - - if s.ready { + if s.ready.Load() { w.WriteHeader(http.StatusOK) } else { - w.WriteHeader(http.StatusInternalServerError) + w.WriteHeader(http.StatusServiceUnavailable) } } } @@ -29,13 +26,7 @@ func (s *Server) aliveHandler() func(w http.ResponseWriter, r *http.Request) { UserAgent string `json:"user_agent"` RequestURI string `json:"request_uri"` RemoteAddr string `json:"remote_addr"` - - // checker.Result - servicecheck.Result - - // kubediscovery - NeighbourhoodState string `json:"neighbourhood_state"` - Neighbourhood []*servicecheck.Neighbour `json:"neighbourhood"` + Result map[string]any `json:"last_check_result"` } res := s.checker.LastCheckResult @@ -46,13 +37,11 @@ func (s *Server) aliveHandler() func(w http.ResponseWriter, r *http.Request) { // Add additional data out := Output{ - Result: *res, - Headers: r.Header, - UserAgent: r.UserAgent(), - RequestURI: r.RequestURI, - RemoteAddr: r.RemoteAddr, - Neighbourhood: res.Neighbourhood, - NeighbourhoodState: res.NeighbourhoodState, + Result: res, + Headers: r.Header, + UserAgent: r.UserAgent(), + RequestURI: r.RequestURI, + RemoteAddr: r.RemoteAddr, } out.Hostname, _ = os.Hostname() diff --git a/internal/kubenurse/handler_test.go b/internal/kubenurse/handler_test.go index c459b0c..2f58605 100644 --- a/internal/kubenurse/handler_test.go +++ b/internal/kubenurse/handler_test.go @@ -1,7 +1,6 @@ package kubenurse import ( - "context" "net/http" "net/http/httptest" "testing" @@ -14,7 +13,7 @@ func TestServerHandler(t *testing.T) { r := require.New(t) fakeClient := fake.NewFakeClient() - kubenurse, err := New(context.Background(), fakeClient) + kubenurse, err := New(fakeClient) r.NoError(err) r.NotNil(kubenurse) diff --git a/internal/kubenurse/server.go b/internal/kubenurse/server.go index 1bc4fb8..b9c3f59 100644 --- a/internal/kubenurse/server.go +++ b/internal/kubenurse/server.go @@ -10,6 +10,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/postfinance/kubenurse/internal/servicecheck" @@ -34,9 +35,7 @@ type Server struct { // If we want to consider kubenurses on unschedulable nodes allowUnschedulable bool - // Mutex to protect ready flag - mu *sync.Mutex - ready bool + ready atomic.Bool // Neighbourhood incoming checks neighbouringIncomingChecks prometheus.Gauge @@ -60,7 +59,7 @@ type Server struct { // * KUBENURSE_CHECK_ME_SERVICE // * KUBENURSE_CHECK_NEIGHBOURHOOD // * KUBENURSE_CHECK_INTERVAL -func New(ctx context.Context, c client.Client) (*Server, error) { //nolint:funlen // TODO: use a flag parsing library (e.g. ff) to reduce complexity +func New(c client.Client) (*Server, error) { //nolint:funlen // TODO: use a flag parsing library (e.g. ff) to reduce complexity mux := http.NewServeMux() checkInterval := defaultCheckInterval @@ -94,10 +93,10 @@ func New(ctx context.Context, c client.Client) (*Server, error) { //nolint:funle useTLS: os.Getenv("KUBENURSE_USE_TLS") == "true", allowUnschedulable: os.Getenv("KUBENURSE_ALLOW_UNSCHEDULABLE") == "true", checkInterval: checkInterval, - mu: new(sync.Mutex), - ready: true, + ready: atomic.Bool{}, } + server.ready.Store(true) server.neighboursTTLCache.Init(60 * time.Second) promRegistry := prometheus.NewRegistry() @@ -135,7 +134,7 @@ func New(ctx context.Context, c client.Client) (*Server, error) { //nolint:funle } // setup checker - chk, err := servicecheck.New(ctx, c, promRegistry, server.allowUnschedulable, 1*time.Second, histogramBuckets) + chk, err := servicecheck.New(c, promRegistry, server.allowUnschedulable, 1*time.Second, histogramBuckets) if err != nil { return nil, err } @@ -177,6 +176,19 @@ func New(ctx context.Context, c client.Client) (*Server, error) { //nolint:funle chk.UseTLS = server.useTLS + // Extra checks parsing + if extraChecks := os.Getenv("KUBENURSE_EXTRA_CHECKS"); extraChecks != "" { + for _, extraCheck := range strings.Split(extraChecks, "|") { + requestType, url, fnd := strings.Cut(extraCheck, ":") + if !fnd { + slog.Error("couldn't parse one of extraChecks", "extraCheck", extraCheck) + return nil, fmt.Errorf("extra checks parsing - missing colon ':' between metric name and url") + } + + chk.ExtraChecks[requestType] = url + } + } + server.checker = chk // setup http routes @@ -190,7 +202,7 @@ func New(ctx context.Context, c client.Client) (*Server, error) { //nolint:funle } // Run starts the periodic checker and the http/https server(s) and blocks until Shutdown was called. -func (s *Server) Run() error { +func (s *Server) Run(ctx context.Context) error { var ( wg sync.WaitGroup errc = make(chan error, 2) // max two errors can happen @@ -212,7 +224,17 @@ func (s *Server) Run() error { go func() { defer wg.Done() - s.checker.RunScheduled(s.checkInterval) + ticker := time.NewTicker(s.checkInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + s.checker.Run(ctx) + case <-ctx.Done(): + return + } + } }() wg.Add(1) @@ -260,10 +282,8 @@ func (s *Server) Run() error { } // Shutdown disables the readiness probe and then gracefully halts the kubenurse http/https server(s). -func (s *Server) Shutdown(ctx context.Context) error { - s.mu.Lock() - s.ready = false - s.mu.Unlock() +func (s *Server) Shutdown() error { + s.ready.Store(false) // wait before actually shutting down the http/s server, as the updated // endpoints for the kubenurse service might not have propagated everywhere @@ -271,8 +291,9 @@ func (s *Server) Shutdown(ctx context.Context) error { // me_ingress or path errors in other pods time.Sleep(s.checker.ShutdownDuration) - // stop the scheduled checker - s.checker.StopScheduled() + // background ctx since, the "root" context is already canceled + ctx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer shutdownCancel() if err := s.http.Shutdown(ctx); err != nil { return fmt.Errorf("stop http server: %w", err) diff --git a/internal/kubenurse/server_test.go b/internal/kubenurse/server_test.go index 5e38c1a..9034d4b 100644 --- a/internal/kubenurse/server_test.go +++ b/internal/kubenurse/server_test.go @@ -2,7 +2,9 @@ package kubenurse import ( "context" + "os" "testing" + "time" "github.com/stretchr/testify/require" "sigs.k8s.io/controller-runtime/pkg/client/fake" @@ -11,25 +13,33 @@ import ( func TestCombined(t *testing.T) { r := require.New(t) + os.Setenv("KUBENURSE_EXTRA_CHECKS", "cloudy_endpoint:http://cloudy.enpdoint:1234/test|ep_number_two:http://interesting.endpoint:8080/abcd") fakeClient := fake.NewFakeClient() - kubenurse, err := New(context.Background(), fakeClient) + kubenurse, err := New(fakeClient) r.NoError(err) r.NotNil(kubenurse) + r.Equal(map[string]string{ + "ep_number_two": "http://interesting.endpoint:8080/abcd", + "cloudy_endpoint": "http://cloudy.enpdoint:1234/test", + }, kubenurse.checker.ExtraChecks) + t.Run("start/stop", func(t *testing.T) { r := require.New(t) errc := make(chan error, 1) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) go func() { // blocks until shutdown is called - err := kubenurse.Run() + err := kubenurse.Run(ctx) errc <- err close(errc) + cancel() }() // Shutdown, Run() should stop after function completes - err := kubenurse.Shutdown(context.Background()) + err := kubenurse.Shutdown() r.NoError(err) err = <-errc // blocks until kubenurse.Run() finishes and eventually returns an error diff --git a/internal/servicecheck/httptrace.go b/internal/servicecheck/httptrace.go index 636ca02..fc66e5c 100644 --- a/internal/servicecheck/httptrace.go +++ b/internal/servicecheck/httptrace.go @@ -34,7 +34,7 @@ func withHttptrace(registry *prometheus.Registry, next http.RoundTripper, durHis Name: "httpclient_requests_total", Help: "A counter for requests from the kubenurse http client.", }, - []string{"code", "method", "type"}, + []string{"code", "type"}, ) httpclientReqDuration := prometheus.NewHistogramVec( @@ -144,6 +144,11 @@ func withHttptrace(registry *prometheus.Registry, next http.RoundTripper, durHis } } else { eventType := "round_trip_error" + labels := map[string]string{ + "code": eventType, // we reuse round_trip_error as status code to prevent introducing a new label + "type": kubenurseRequestType, + } + httpclientReqTotal.With(labels).Inc() // also increment the total counter, as InstrumentRoundTripperCounter only instruments successful requests // errorCounter.WithLabelValues(eventType, kubenurseRequestType).Inc() // normally, errors are already accounted for in the ClientTrace section. // we still log the error, so in the future we can compare the log entries and see if somehow diff --git a/internal/servicecheck/neighbours.go b/internal/servicecheck/neighbours.go index b8fedee..e696b1a 100644 --- a/internal/servicecheck/neighbours.go +++ b/internal/servicecheck/neighbours.go @@ -33,8 +33,8 @@ type Neighbour struct { NodeHash uint64 } -// GetNeighbours returns a slice of neighbour kubenurses for the given namespace and labelSelector. -func (c *Checker) GetNeighbours(ctx context.Context, namespace, labelSelector string) ([]*Neighbour, error) { +// getNeighbours returns a slice of neighbour kubenurses for the given namespace and labelSelector. +func (c *Checker) getNeighbours(ctx context.Context, namespace, labelSelector string) ([]*Neighbour, error) { // Get all pods pods := v1.PodList{} selector, _ := labels.Parse(labelSelector) @@ -87,26 +87,6 @@ func (c *Checker) GetNeighbours(ctx context.Context, namespace, labelSelector st return neighbours, nil } -// checkNeighbours checks the /alwayshappy endpoint from every discovered kubenurse neighbour. Neighbour pods on nodes -// which are not schedulable are excluded from this check to avoid possible false errors. -func (c *Checker) checkNeighbours(nh []*Neighbour) { - if c.NeighbourLimit > 0 && len(nh) > c.NeighbourLimit { - nh = c.filterNeighbours(nh) - } - - for _, neighbour := range nh { - check := func(ctx context.Context) (string, error) { - if c.UseTLS { - return c.doRequest(ctx, "https://"+neighbour.PodIP+":8443/alwayshappy", true) - } - - return c.doRequest(ctx, "http://"+neighbour.PodIP+":8080/alwayshappy", true) - } - - c.measure(check, "path_"+neighbour.NodeName) - } -} - func (c *Checker) filterNeighbours(nh []*Neighbour) []*Neighbour { m := make(map[uint64]*Neighbour, c.NeighbourLimit+1) diff --git a/internal/servicecheck/servicecheck.go b/internal/servicecheck/servicecheck.go index 85e55bf..5197176 100644 --- a/internal/servicecheck/servicecheck.go +++ b/internal/servicecheck/servicecheck.go @@ -9,6 +9,7 @@ import ( "net" "net/http" "os" + "sync" "time" "github.com/prometheus/client_golang/prometheus" @@ -24,7 +25,7 @@ const ( // New configures the checker with a httpClient and a cache timeout for check // results. Other parameters of the Checker struct need to be configured separately. -func New(_ context.Context, cl client.Client, promRegistry *prometheus.Registry, +func New(cl client.Client, promRegistry *prometheus.Registry, allowUnschedulable bool, cacheTTL time.Duration, durationHistogramBuckets []float64) (*Checker, error) { // setup http transport tlsConfig, err := generateTLSConfig(os.Getenv("KUBENURSE_EXTRA_CA")) @@ -62,67 +63,82 @@ func New(_ context.Context, cl client.Client, promRegistry *prometheus.Registry, client: cl, httpClient: httpClient, cacheTTL: cacheTTL, - stop: make(chan struct{}), + ExtraChecks: make(map[string]string), }, nil } // Run runs all servicechecks and returns the result togeter with a boolean which indicates success. The cache // is respected. -func (c *Checker) Run() { +func (c *Checker) Run(ctx context.Context) { // Run Checks - res := Result{} + result := sync.Map{} - res.APIServerDirect = c.measure(c.APIServerDirect, "api_server_direct") - res.APIServerDNS = c.measure(c.APIServerDNS, "api_server_dns") - res.MeIngress = c.measure(c.MeIngress, "me_ingress") - res.MeService = c.measure(c.MeService, "me_service") + wg := sync.WaitGroup{} + + // Cache result (used for /alive handler) + defer func() { + res := make(map[string]any) + + result.Range(func(key, value any) bool { + k, _ := key.(string) + res[k] = value + + return true + }) + + c.LastCheckResult = res + }() + + wg.Add(4) + + go c.measure(ctx, &wg, &result, c.APIServerDirect, APIServerDirect) + go c.measure(ctx, &wg, &result, c.APIServerDNS, APIServerDNS) + go c.measure(ctx, &wg, &result, c.MeIngress, meIngress) + go c.measure(ctx, &wg, &result, c.MeService, meService) + + wg.Add(len(c.ExtraChecks)) + + for metricName, url := range c.ExtraChecks { + go c.measure(ctx, &wg, &result, + func(ctx context.Context) string { return c.doRequest(ctx, url, false) }, + metricName) + } if c.SkipCheckNeighbourhood { - res.NeighbourhoodState = skippedStr - } else { - var err error - res.Neighbourhood, err = c.GetNeighbours(context.Background(), c.KubenurseNamespace, c.NeighbourFilter) - - // Neighbourhood special error treating - if err != nil { - res.NeighbourhoodState = err.Error() - } else { - res.NeighbourhoodState = okStr - - // Check all neighbours if the neighbourhood was discovered - c.checkNeighbours(res.Neighbourhood) - } + result.Store(NeighbourhoodState, skippedStr) + return } - // Cache result (used for /alive handler) - c.LastCheckResult = &res -} + neighbours, err := c.getNeighbours(ctx, c.KubenurseNamespace, c.NeighbourFilter) + if err != nil { + result.Store(NeighbourhoodState, err.Error()) + return + } + + result.Store(NeighbourhoodState, okStr) + result.Store(Neighbourhood, neighbours) + + if c.NeighbourLimit > 0 && len(neighbours) > c.NeighbourLimit { + neighbours = c.filterNeighbours(neighbours) + } -// RunScheduled runs the checks in the specified interval which can be used to keep the metrics up-to-date. This -// function does not return until StopScheduled is called. -func (c *Checker) RunScheduled(d time.Duration) { - ticker := time.NewTicker(d) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - c.Run() - case <-c.stop: - return + wg.Add((len(neighbours))) + + for _, neighbour := range neighbours { + check := func(ctx context.Context) string { + return c.doRequest(ctx, podIPtoURL(neighbour.PodIP, c.UseTLS), true) } + + go c.measure(ctx, &wg, &result, check, "path_"+neighbour.NodeName) } -} -// StopScheduled is used to stop the scheduled run of checks. -func (c *Checker) StopScheduled() { - close(c.stop) + wg.Wait() } // APIServerDirect checks the /version endpoint of the Kubernetes API Server through the direct link -func (c *Checker) APIServerDirect(ctx context.Context) (string, error) { +func (c *Checker) APIServerDirect(ctx context.Context) string { if c.SkipCheckAPIServerDirect { - return skippedStr, nil + return skippedStr } apiurl := fmt.Sprintf("https://%s:%s/version", c.KubernetesServiceHost, c.KubernetesServicePort) @@ -131,9 +147,9 @@ func (c *Checker) APIServerDirect(ctx context.Context) (string, error) { } // APIServerDNS checks the /version endpoint of the Kubernetes API Server through the Cluster DNS URL -func (c *Checker) APIServerDNS(ctx context.Context) (string, error) { +func (c *Checker) APIServerDNS(ctx context.Context) string { if c.SkipCheckAPIServerDNS { - return skippedStr, nil + return skippedStr } apiurl := fmt.Sprintf("https://kubernetes.default.svc.cluster.local:%s/version", c.KubernetesServicePort) @@ -142,31 +158,37 @@ func (c *Checker) APIServerDNS(ctx context.Context) (string, error) { } // MeIngress checks if the kubenurse is reachable at the /alwayshappy endpoint behind the ingress -func (c *Checker) MeIngress(ctx context.Context) (string, error) { +func (c *Checker) MeIngress(ctx context.Context) string { if c.SkipCheckMeIngress { - return skippedStr, nil + return skippedStr } return c.doRequest(ctx, c.KubenurseIngressURL+"/alwayshappy", false) //nolint:goconst // readability } // MeService checks if the kubenurse is reachable at the /alwayshappy endpoint through the kubernetes service -func (c *Checker) MeService(ctx context.Context) (string, error) { +func (c *Checker) MeService(ctx context.Context) string { if c.SkipCheckMeService { - return skippedStr, nil + return skippedStr } return c.doRequest(ctx, c.KubenurseServiceURL+"/alwayshappy", false) } // measure implements metric collections for the check -func (c *Checker) measure(check Check, requestType string) string { +func (c *Checker) measure(ctx context.Context, wg *sync.WaitGroup, res *sync.Map, check Check, requestType string) { // Add our label (check type) to the context so our http tracer can annotate // metrics and errors based with the label - ctx := context.WithValue(context.Background(), kubenurseTypeKey{}, requestType) + defer wg.Done() - // Execute check - res, _ := check(ctx) // this error is ignored as it is already logged in httptrace + ctx = context.WithValue(ctx, kubenurseTypeKey{}, requestType) + res.Store(requestType, check(ctx)) +} + +func podIPtoURL(podIP string, useTLS bool) string { + if useTLS { + return "https://" + podIP + ":8443/alwayshappy" + } - return res + return "http://" + podIP + ":8080/alwayshappy" } diff --git a/internal/servicecheck/servicecheck_test.go b/internal/servicecheck/servicecheck_test.go index 0e2ec1f..c7a6efd 100644 --- a/internal/servicecheck/servicecheck_test.go +++ b/internal/servicecheck/servicecheck_test.go @@ -36,29 +36,19 @@ func TestCombined(t *testing.T) { // fake client, with a dummy neighbour pod fakeClient := fake.NewFakeClient(&fakeNeighbourPod) - checker, err := New(context.Background(), fakeClient, prometheus.NewRegistry(), false, 3*time.Second, prometheus.DefBuckets) + checker, err := New(fakeClient, prometheus.NewRegistry(), false, 3*time.Second, prometheus.DefBuckets) + checker.ExtraChecks = map[string]string{ + "check_number_two": "http://interesting.endpoint:8080/abcd", + "cloudy_check": "http://cloudy.enpdoint:1234/test", + } r.NoError(err) r.NotNil(checker) t.Run("run", func(t *testing.T) { r := require.New(t) - checker.Run() + checker.Run(context.Background()) - r.Len(checker.LastCheckResult.Neighbourhood, 1) - }) - - t.Run("scheduled", func(t *testing.T) { - stopped := make(chan struct{}) - - go func() { - // blocks until StopScheduled() - checker.RunScheduled(time.Second * 5) - - close(stopped) - }() - - checker.StopScheduled() - - <-stopped + r.Equal(okStr, checker.LastCheckResult[NeighbourhoodState]) + r.Equal(errStr, checker.LastCheckResult["cloudy_check"]) // test extra endpoint functionality }) } diff --git a/internal/servicecheck/transport.go b/internal/servicecheck/transport.go index 06c445b..996f3da 100644 --- a/internal/servicecheck/transport.go +++ b/internal/servicecheck/transport.go @@ -6,6 +6,7 @@ import ( "crypto/x509" "errors" "fmt" + "log/slog" "net/http" "os" "strings" @@ -18,11 +19,12 @@ const ( ) // doRequest does an http request only to get the http status code -func (c *Checker) doRequest(ctx context.Context, url string, addOriginHeader bool) (string, error) { +func (c *Checker) doRequest(ctx context.Context, url string, addOriginHeader bool) string { // Read Bearer Token file from ServiceAccount token, err := os.ReadFile(K8sTokenFile) if err != nil { - return errStr, fmt.Errorf("load kubernetes serviceaccount token from %s: %w", K8sTokenFile, err) + slog.Error("error in doRequest while reading k8sTokenFile", "err", err) + return errStr } req, _ := http.NewRequestWithContext(ctx, "GET", url, http.NoBody) @@ -39,17 +41,17 @@ func (c *Checker) doRequest(ctx context.Context, url string, addOriginHeader boo resp, err := c.httpClient.Do(req) if err != nil { - return err.Error(), err + return err.Error() } // Body is non-nil if err is nil, so close it _ = resp.Body.Close() if resp.StatusCode == http.StatusOK { - return okStr, nil + return okStr } - return resp.Status, errors.New(resp.Status) + return resp.Status } // generateTLSConfig returns a TLSConfig including K8s CA and the user-defined extraCA diff --git a/internal/servicecheck/types.go b/internal/servicecheck/types.go index 09bea73..9a6f22f 100644 --- a/internal/servicecheck/types.go +++ b/internal/servicecheck/types.go @@ -8,6 +8,15 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) +const ( + NeighbourhoodState = "neighbourhood_state" + Neighbourhood = "neighbourhood" + meService = "me_service" + meIngress = "me_ingress" + APIServerDirect = "api_server_direct" + APIServerDNS = "api_server_dns" +) + // Checker implements the kubenurse checker type Checker struct { // Ingress and service config @@ -16,7 +25,7 @@ type Checker struct { SkipCheckMeIngress bool SkipCheckMeService bool - // shutdownDuration defines the time during which kubenurse will wait before stopping + // shutdownDuration defines the time during which kubenurse will accept https requests during shutdown ShutdownDuration time.Duration // Kubernetes API @@ -32,6 +41,9 @@ type Checker struct { allowUnschedulable bool SkipCheckNeighbourhood bool + // Additional endpoints + ExtraChecks map[string]string + // TLS UseTLS bool @@ -42,24 +54,11 @@ type Checker struct { httpClient *http.Client // LastCheckResult represents a cached check result - LastCheckResult *Result + LastCheckResult map[string]any // cacheTTL defines the TTL of how long a cached result is valid cacheTTL time.Duration - - // stop is used to cancel RunScheduled - stop chan struct{} -} - -// Result contains the result of a performed check run -type Result struct { - APIServerDirect string `json:"api_server_direct"` - APIServerDNS string `json:"api_server_dns"` - MeIngress string `json:"me_ingress"` - MeService string `json:"me_service"` - NeighbourhoodState string `json:"neighbourhood_state"` - Neighbourhood []*Neighbour `json:"neighbourhood"` } // Check is the signature used by all checks that the checker can execute. -type Check func(ctx context.Context) (string, error) +type Check func(ctx context.Context) string diff --git a/main.go b/main.go index 2fe7c87..3829ded 100644 --- a/main.go +++ b/main.go @@ -8,7 +8,6 @@ import ( "os" "os/signal" "syscall" - "time" "github.com/postfinance/kubenurse/internal/kubenurse" corev1 "k8s.io/api/core/v1" @@ -62,7 +61,7 @@ func main() { return } - server, err := kubenurse.New(ctx, c) + server, err := kubenurse.New(c) if err != nil { slog.Error("error in kubenurse.New call", "err", err) return @@ -73,17 +72,13 @@ func main() { slog.Info("shutting down, received signal to stop") - // background ctx since, the "root" context is already canceled - shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second) - defer shutdownCancel() - - if err := server.Shutdown(shutdownCtx); err != nil { + if err := server.Shutdown(); err != nil { slog.Error("error during graceful shutdown", "err", err) } }() // blocks, until the server is stopped by calling Shutdown() - if err := server.Run(); err != nil { + if err := server.Run(ctx); err != nil { slog.Error("error while running kubenurse", "err", err) } }