Extract operations into internal pkg

This reduces the number of global variables and groups all the
operations in a single place. This will allow further
refactoring to represent all the k8s operations kured needs
on a single node.

Signed-off-by: Jean-Philippe Evrard <[email protected]>
evrardjp committed Nov 11, 2024
1 parent fab030e commit b6d3ba5
Showing 5 changed files with 241 additions and 208 deletions.
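
For orientation, here is a minimal sketch (not part of the commit) of how cmd/kured/main.go consumes the extracted internal/k8soperations package after this change. The call shapes are taken from the diff below; the parameter types and the taint, annotation, and message values used here are placeholders inferred from main.go's flags and may not match the repository exactly.

```go
package main

import (
	"time"

	"github.com/kubereboot/kured/internal/k8soperations"
	"github.com/kubereboot/kured/internal/notifications"
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
)

// rebootCycleSketch mirrors the call sites visible in the diff below.
// Values passed here are illustrative placeholders, not kured's real defaults.
func rebootCycleSketch(client *kubernetes.Clientset, node *v1.Node, nodeID string, notifier notifications.Notifier) error {
	// The taint helper now comes from internal/k8soperations instead of internal/taints.
	taint := k8soperations.NewTaint(client, nodeID, "kured.dev/kured-node", v1.TaintEffectPreferNoSchedule)
	_ = taint

	// Reboot bookkeeping annotations are also handled by the internal package.
	now := time.Now().Format(time.RFC3339)
	if err := k8soperations.AddNodeAnnotations(client, nodeID, map[string]string{
		"kured.dev/kured-reboot-in-progress": now,
	}); err != nil {
		return err
	}

	// Cordon and drain the node, applying pre-reboot labels (nil here) first.
	if err := k8soperations.Drain(client, node, nil, 5*time.Minute, -1, 0, "", 0,
		"Draining node %s", notifier); err != nil {
		return err
	}

	// After the reboot: revert the cordon, apply post-reboot labels, clean up the annotation.
	if err := k8soperations.Uncordon(client, node, notifier, nil, "Node %s uncordoned"); err != nil {
		return err
	}
	return k8soperations.DeleteNodeAnnotation(client, nodeID, "kured.dev/kured-reboot-in-progress")
}
```
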
214 changes: 9 additions & 205 deletions cmd/kured/main.go
@@ -2,11 +2,10 @@ package main

import (
"context"
"encoding/json"
"fmt"
"github.com/kubereboot/kured/internal/daemonsetlock"
"github.com/kubereboot/kured/internal/k8soperations"
"github.com/kubereboot/kured/internal/notifications"
"github.com/kubereboot/kured/internal/taints"
"github.com/kubereboot/kured/internal/timewindow"
"github.com/kubereboot/kured/pkg/blockers"
"github.com/kubereboot/kured/pkg/checkers"
@@ -17,10 +16,8 @@ import (
flag "github.com/spf13/pflag"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
kubectldrain "k8s.io/kubectl/pkg/drain"
"log"
"log/slog"
"net/http"
@@ -92,8 +89,7 @@ var (

const (
// KuredNodeLockAnnotation is the canonical string value for the kured node-lock annotation
KuredNodeLockAnnotation string = "kured.dev/kured-node-lock"
KuredNodeWasUnschedulableBeforeDrainAnnotation string = "kured.dev/node-unschedulable-before-drain"
KuredNodeLockAnnotation string = "kured.dev/kured-node-lock"
// KuredRebootInProgressAnnotation is the canonical string value for the kured reboot-in-progress annotation
KuredRebootInProgressAnnotation string = "kured.dev/kured-reboot-in-progress"
// KuredMostRecentRebootNeededAnnotation is the canonical string value for the kured most-recent-reboot-needed annotation
@@ -339,202 +335,9 @@ func LoadFromEnv() {

}

type slogWriter struct {
stream string
message string
}

func (sw slogWriter) Write(p []byte) (n int, err error) {
output := string(p)
switch sw.stream {
case "stdout":
slog.Info(sw.message, "node", nodeID, "stdout", output)
case "stderr":
slog.Info(sw.message, "node", nodeID, "stderr", output)
}
return len(p), nil
}

func drain(client *kubernetes.Clientset, node *v1.Node, notifier notifications.Notifier) error {
nodename := node.GetName()

if preRebootNodeLabels != nil {
err := updateNodeLabels(client, node, preRebootNodeLabels)
if err != nil {
return fmt.Errorf("stopping drain due to problem with node labels %v", err)
}
}

if drainDelay > 0 {
slog.Debug("Delaying drain", "delay", drainDelay, "node", nodename)
time.Sleep(drainDelay)
}

slog.Info("Starting drain", "node", nodename)

notifier.Send(fmt.Sprintf(messageTemplateDrain, nodename), "Starting drain")

kubectlStdOutLogger := &slogWriter{message: "draining: results", stream: "stdout"}
kubectlStdErrLogger := &slogWriter{message: "draining: results", stream: "stderr"}

drainer := &kubectldrain.Helper{
Client: client,
Ctx: context.Background(),
GracePeriodSeconds: drainGracePeriod,
PodSelector: drainPodSelector,
SkipWaitForDeleteTimeoutSeconds: skipWaitForDeleteTimeoutSeconds,
Force: true,
DeleteEmptyDirData: true,
IgnoreAllDaemonSets: true,
ErrOut: kubectlStdErrLogger,
Out: kubectlStdOutLogger,
Timeout: drainTimeout,
}

// Add previous state of the node Spec.Unschedulable into an annotation
// If an annotation was present, it means that either the cordon or drain failed,
// hence it does not need to reapply: It might override what the user has set
// (for example if the cordon succeeded but the drain failed)
if _, ok := node.Annotations[KuredNodeWasUnschedulableBeforeDrainAnnotation]; !ok {
// Store State of the node before cordon changes it
annotations := map[string]string{KuredNodeWasUnschedulableBeforeDrainAnnotation: strconv.FormatBool(node.Spec.Unschedulable)}
// & annotate this node with a timestamp so that other node maintenance tools know how long it's been since this node has been marked for reboot
err := addNodeAnnotations(client, nodeID, annotations)
if err != nil {
return fmt.Errorf("error saving state of the node %s, %v", nodename, err)
}
}

if err := kubectldrain.RunCordonOrUncordon(drainer, node, true); err != nil {
return fmt.Errorf("error cordonning node %s, %v", nodename, err)
}

if err := kubectldrain.RunNodeDrain(drainer, nodename); err != nil {
return fmt.Errorf("error draining node %s: %v", nodename, err)
}
return nil
}

func uncordon(client *kubernetes.Clientset, node *v1.Node, notifier notifications.Notifier) error {
// Revert cordon spec change with the help of node annotation
annotationContent, ok := node.Annotations[KuredNodeWasUnschedulableBeforeDrainAnnotation]
if !ok {
// If no node annotations, uncordon will not act.
// Do not uncordon if you do not know previous state, it could bring nodes under maintenance online!
return nil
}

wasUnschedulable, err := strconv.ParseBool(annotationContent)
if err != nil {
return fmt.Errorf("annotation was edited and cannot be converted back to bool %v, cannot uncordon (unrecoverable)", err)
}

if wasUnschedulable {
// Just delete the annotation, keep Cordonned
err := deleteNodeAnnotation(client, nodeID, KuredNodeWasUnschedulableBeforeDrainAnnotation)
if err != nil {
return fmt.Errorf("error removing the WasUnschedulable annotation, keeping the node stuck in cordonned state forever %v", err)
}
return nil
}

nodeName := node.GetName()
kubectlStdOutLogger := &slogWriter{message: "uncordon: results", stream: "stdout"}
kubectlStdErrLogger := &slogWriter{message: "uncordon: results", stream: "stderr"}

drainer := &kubectldrain.Helper{
Client: client,
ErrOut: kubectlStdErrLogger,
Out: kubectlStdOutLogger,
Ctx: context.Background(),
}
if err := kubectldrain.RunCordonOrUncordon(drainer, node, false); err != nil {
return fmt.Errorf("error uncordonning node %s: %v", nodeName, err)
} else if postRebootNodeLabels != nil {
err := updateNodeLabels(client, node, postRebootNodeLabels)
return fmt.Errorf("error updating node (%s) labels, needs manual intervention %v", nodeName, err)
}

err = deleteNodeAnnotation(client, nodeID, KuredNodeWasUnschedulableBeforeDrainAnnotation)
if err != nil {
return fmt.Errorf("error removing the WasUnschedulable annotation, keeping the node stuck in current state forever %v", err)
}
notifier.Send(fmt.Sprintf(messageTemplateUncordon, nodeID), "Node uncordonned successfully")
return nil
}

func addNodeAnnotations(client *kubernetes.Clientset, nodeID string, annotations map[string]string) error {
node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeID, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("error retrieving node object via k8s API: %v", err)
}
for k, v := range annotations {
node.Annotations[k] = v
slog.Debug(fmt.Sprintf("adding node annotation: %s=%s", k, v), "node", node.GetName())
}

bytes, err := json.Marshal(node)
if err != nil {
return fmt.Errorf("error marshalling node object into JSON: %v", err)
}

_, err = client.CoreV1().Nodes().Patch(context.TODO(), node.GetName(), types.StrategicMergePatchType, bytes, metav1.PatchOptions{})
if err != nil {
var annotationsErr string
for k, v := range annotations {
annotationsErr += fmt.Sprintf("%s=%s ", k, v)
}
return fmt.Errorf("error adding node annotations %s via k8s API: %v", annotationsErr, err)
}
return nil
}

func deleteNodeAnnotation(client *kubernetes.Clientset, nodeID, key string) error {
// JSON Patch takes as path input a JSON Pointer, defined in RFC6901
// So we replace all instances of "/" with "~1" as per:
// https://tools.ietf.org/html/rfc6901#section-3
patch := []byte(fmt.Sprintf("[{\"op\":\"remove\",\"path\":\"/metadata/annotations/%s\"}]", strings.ReplaceAll(key, "/", "~1")))
_, err := client.CoreV1().Nodes().Patch(context.TODO(), nodeID, types.JSONPatchType, patch, metav1.PatchOptions{})
if err != nil {
return fmt.Errorf("error deleting node annotation %s via k8s API: %v", key, err)
}
return nil
}

func updateNodeLabels(client *kubernetes.Clientset, node *v1.Node, labels []string) error {
labelsMap := make(map[string]string)
for _, label := range labels {
k := strings.Split(label, "=")[0]
v := strings.Split(label, "=")[1]
labelsMap[k] = v
slog.Debug(fmt.Sprintf("Updating node %s label: %s=%s", node.GetName(), k, v), "node", node.GetName())
}

bytes, err := json.Marshal(map[string]interface{}{
"metadata": map[string]interface{}{
"labels": labelsMap,
},
})
if err != nil {
return fmt.Errorf("error marshalling node object into JSON: %v", err)
}

_, err = client.CoreV1().Nodes().Patch(context.TODO(), node.GetName(), types.StrategicMergePatchType, bytes, metav1.PatchOptions{})
if err != nil {
var labelsErr string
for _, label := range labels {
k := strings.Split(label, "=")[0]
v := strings.Split(label, "=")[1]
labelsErr += fmt.Sprintf("%s=%s ", k, v)
}
return fmt.Errorf("error updating node labels %s via k8s API: %v", labelsErr, err)
}
return nil
}

func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.Checker, blockCheckers []blockers.RebootBlocker, window *timewindow.TimeWindow, lock daemonsetlock.Lock, client *kubernetes.Clientset, period time.Duration, notifier notifications.Notifier) {

preferNoScheduleTaint := taints.New(client, nodeID, preferNoScheduleTaintName, v1.TaintEffectPreferNoSchedule)
preferNoScheduleTaint := k8soperations.NewTaint(client, nodeID, preferNoScheduleTaintName, v1.TaintEffectPreferNoSchedule)

// No reason to delay the first ticks.
// On top of it, we used to leak a goroutine, which was never garbage collected.
@@ -559,7 +362,7 @@ func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.
continue
}

err = uncordon(client, node, notifier)
err = k8soperations.Uncordon(client, node, notifier, postRebootNodeLabels, messageTemplateUncordon)
if err != nil {
// Might be a transient API issue or a real problem. Inform the admin
slog.Info("unable to uncordon needs investigation", "node", nodeID, "error", err)
@@ -579,7 +382,7 @@ func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.
if _, ok := node.Annotations[KuredRebootInProgressAnnotation]; ok {
// Who reads this? I hope nobody bothers outside real debug cases
slog.Debug(fmt.Sprintf("Deleting node %s annotation %s", nodeID, KuredRebootInProgressAnnotation), "node", nodeID)
err := deleteNodeAnnotation(client, nodeID, KuredRebootInProgressAnnotation)
err := k8soperations.DeleteNodeAnnotation(client, nodeID, KuredRebootInProgressAnnotation)
if err != nil {
continue
}
@@ -627,7 +430,7 @@ func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.
annotations := map[string]string{KuredRebootInProgressAnnotation: timeNowString}
// & annotate this node with a timestamp so that other node maintenance tools know how long it's been since this node has been marked for reboot
annotations[KuredMostRecentRebootNeededAnnotation] = timeNowString
err := addNodeAnnotations(client, nodeID, annotations)
err := k8soperations.AddNodeAnnotations(client, nodeID, annotations)
if err != nil {
continue
}
@@ -664,7 +467,8 @@ func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.
// }
//}

err = drain(client, node, notifier)
err = k8soperations.Drain(client, node, preRebootNodeLabels, drainTimeout, drainGracePeriod, skipWaitForDeleteTimeoutSeconds, drainPodSelector, drainDelay, messageTemplateDrain, notifier)

if err != nil {
if !forceReboot {
slog.Debug(fmt.Sprintf("Unable to cordon or drain %s: %v, will force-reboot by releasing lock and uncordon until next success", node.GetName(), err), "node", nodeID, "error", err)
@@ -677,7 +481,7 @@ func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.
// If shown, it is helping understand the uncordonning. If the admin seems the node as cordonned
// with this, it needs to take action (for example if the node was previously cordonned!)
slog.Info("Performing a best-effort uncordon after failed cordon and drain", "node", nodeID)
err := uncordon(client, node, notifier)
err := k8soperations.Uncordon(client, node, notifier, postRebootNodeLabels, messageTemplateUncordon)
if err != nil {
slog.Info("Uncordon failed", "error", err)
}
86 changes: 86 additions & 0 deletions internal/k8soperations/annotations.go
@@ -0,0 +1,86 @@
package k8soperations

import (
"context"
"encoding/json"
"fmt"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"log/slog"
"strings"
)

const (
KuredNodeWasUnschedulableBeforeDrainAnnotation string = "kured.dev/node-unschedulable-before-drain"
)

func AddNodeAnnotations(client *kubernetes.Clientset, nodeID string, annotations map[string]string) error {
node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeID, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("error retrieving node object via k8s API: %v", err)
}
for k, v := range annotations {
node.Annotations[k] = v
slog.Debug(fmt.Sprintf("adding node annotation: %s=%s", k, v), "node", node.GetName())
}

bytes, err := json.Marshal(node)
if err != nil {
return fmt.Errorf("error marshalling node object into JSON: %v", err)
}

_, err = client.CoreV1().Nodes().Patch(context.TODO(), node.GetName(), types.StrategicMergePatchType, bytes, metav1.PatchOptions{})
if err != nil {
var annotationsErr string
for k, v := range annotations {
annotationsErr += fmt.Sprintf("%s=%s ", k, v)
}
return fmt.Errorf("error adding node annotations %s via k8s API: %v", annotationsErr, err)
}
return nil
}

func DeleteNodeAnnotation(client *kubernetes.Clientset, nodeID, key string) error {
// JSON Patch takes as path input a JSON Pointer, defined in RFC6901
// So we replace all instances of "/" with "~1" as per:
// https://tools.ietf.org/html/rfc6901#section-3
patch := []byte(fmt.Sprintf("[{\"op\":\"remove\",\"path\":\"/metadata/annotations/%s\"}]", strings.ReplaceAll(key, "/", "~1")))
_, err := client.CoreV1().Nodes().Patch(context.TODO(), nodeID, types.JSONPatchType, patch, metav1.PatchOptions{})
if err != nil {
return fmt.Errorf("error deleting node annotation %s via k8s API: %v", key, err)
}
return nil
}

func updateNodeLabels(client *kubernetes.Clientset, node *v1.Node, labels []string) error {
labelsMap := make(map[string]string)
for _, label := range labels {
k := strings.Split(label, "=")[0]
v := strings.Split(label, "=")[1]
labelsMap[k] = v
slog.Debug(fmt.Sprintf("Updating node %s label: %s=%s", node.GetName(), k, v), "node", node.GetName())
}

bytes, err := json.Marshal(map[string]interface{}{
"metadata": map[string]interface{}{
"labels": labelsMap,
},
})
if err != nil {
return fmt.Errorf("error marshalling node object into JSON: %v", err)
}

_, err = client.CoreV1().Nodes().Patch(context.TODO(), node.GetName(), types.StrategicMergePatchType, bytes, metav1.PatchOptions{})
if err != nil {
var labelsErr string
for _, label := range labels {
k := strings.Split(label, "=")[0]
v := strings.Split(label, "=")[1]
labelsErr += fmt.Sprintf("%s=%s ", k, v)
}
return fmt.Errorf("error updating node labels %s via k8s API: %v", labelsErr, err)
}
return nil
}
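
As a side note on DeleteNodeAnnotation above: annotation keys contain a "/", which must be escaped as "~1" inside a JSON Pointer path per RFC 6901. The following standalone snippet (not part of the commit) prints the JSON Patch body that this escaping produces for a typical kured annotation key.

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Same escaping as DeleteNodeAnnotation: "/" inside the key becomes "~1"
	// so that the annotation name is a single JSON Pointer segment.
	key := "kured.dev/kured-reboot-in-progress"
	patch := fmt.Sprintf(`[{"op":"remove","path":"/metadata/annotations/%s"}]`,
		strings.ReplaceAll(key, "/", "~1"))
	fmt.Println(patch)
	// Prints:
	// [{"op":"remove","path":"/metadata/annotations/kured.dev~1kured-reboot-in-progress"}]
}
```
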