Skip to content

Commit

Permalink
fix remediation
Browse files Browse the repository at this point in the history
Signed-off-by: nasusoba <[email protected]>

fix lint

Signed-off-by: nasusoba <[email protected]>

fix comment
  • Loading branch information
nasusoba committed May 14, 2024
1 parent 0527d86 commit 8b823ce
Show file tree
Hide file tree
Showing 13 changed files with 645 additions and 70 deletions.
29 changes: 2 additions & 27 deletions bootstrap/controllers/kthreesconfig_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import (
"github.com/k3s-io/cluster-api-k3s/pkg/cloudinit"
"github.com/k3s-io/cluster-api-k3s/pkg/etcd"
"github.com/k3s-io/cluster-api-k3s/pkg/k3s"
"github.com/k3s-io/cluster-api-k3s/pkg/kubeconfig"
"github.com/k3s-io/cluster-api-k3s/pkg/locking"
"github.com/k3s-io/cluster-api-k3s/pkg/secret"
"github.com/k3s-io/cluster-api-k3s/pkg/token"
Expand Down Expand Up @@ -177,9 +176,6 @@ func (r *KThreesConfigReconciler) Reconcile(ctx context.Context, req ctrl.Reques
log.Info("Cluster infrastructure is not ready, waiting")
conditions.MarkFalse(config, bootstrapv1.DataSecretAvailableCondition, bootstrapv1.WaitingForClusterInfrastructureReason, clusterv1.ConditionSeverityInfo, "")
return ctrl.Result{RequeueAfter: 1 * time.Minute}, nil
// Migrate plaintext data to secret.
case config.Status.BootstrapData != nil && config.Status.DataSecretName == nil:
return ctrl.Result{}, r.storeBootstrapData(ctx, scope, config.Status.BootstrapData)
// Reconcile status for machines that already have a secret reference, but our status isn't up to date.
// This case solves the pivoting scenario (or a backup restore) which doesn't preserve the status subresource on objects.
case configOwner.DataSecretName() != nil && (!config.Status.Ready || config.Status.DataSecretName == nil):
Expand Down Expand Up @@ -500,13 +496,12 @@ func (r *KThreesConfigReconciler) handleClusterNotInitialized(ctx context.Contex
return ctrl.Result{}, err
}

// TODO: move to controlplane provider
return r.reconcileKubeconfig(ctx, scope)
return ctrl.Result{}, nil
}

func (r *KThreesConfigReconciler) SetupWithManager(mgr ctrl.Manager) error {
if r.KThreesInitLock == nil {
r.KThreesInitLock = locking.NewControlPlaneInitMutex(ctrl.Log.WithName("init-locker"), mgr.GetClient())
r.KThreesInitLock = locking.NewControlPlaneInitMutex(mgr.GetClient())
}

return ctrl.NewControllerManagedBy(mgr).
Expand Down Expand Up @@ -558,26 +553,6 @@ func (r *KThreesConfigReconciler) storeBootstrapData(ctx context.Context, scope
return nil
}

// reconcileKubeconfig ensures a kubeconfig Secret exists for the cluster,
// creating one when it is absent. When the dependent cluster CA certificate
// secret is not yet available, reconciliation is requeued instead of failing.
func (r *KThreesConfigReconciler) reconcileKubeconfig(ctx context.Context, scope *Scope) (ctrl.Result, error) {
	logger := r.Log.WithValues("cluster", scope.Cluster.Name, "namespace", scope.Cluster.Namespace)

	_, lookupErr := secret.Get(ctx, r.Client, util.ObjectKey(scope.Cluster), secret.Kubeconfig)
	if apierrors.IsNotFound(lookupErr) {
		// The kubeconfig secret does not exist yet; try to create it.
		if createErr := kubeconfig.CreateSecret(ctx, r.Client, scope.Cluster); createErr != nil {
			if errors.Is(createErr, kubeconfig.ErrDependentCertificateNotFound) {
				// The CA material is not ready yet; retry later.
				logger.Info("could not find secret for cluster, requeuing", "secret", secret.ClusterCA)
				return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
			}
			return ctrl.Result{}, createErr
		}
	} else if lookupErr != nil {
		return ctrl.Result{}, fmt.Errorf("failed to retrieve Kubeconfig Secret for Cluster %q in namespace %q: %w", scope.Cluster.Name, scope.Cluster.Namespace, lookupErr)
	}

	return ctrl.Result{}, nil
}

func (r *KThreesConfigReconciler) reconcileTopLevelObjectSettings(_ *clusterv1.Cluster, machine *clusterv1.Machine, config *bootstrapv1.KThreesConfig) {
log := r.Log.WithValues("kthreesconfig", fmt.Sprintf("%s/%s", config.Namespace, config.Name))

Expand Down
33 changes: 30 additions & 3 deletions controlplane/controllers/kthreescontrolplane_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,7 @@ func (r *KThreesControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.
// because the main defer may take too much time to get cluster status
// Patch ObservedGeneration only if the reconciliation completed successfully
patchOpts := []patch.Option{}
if err == nil {
patchOpts = append(patchOpts, patch.WithStatusObservedGeneration{})
}
patchOpts = append(patchOpts, patch.WithStatusObservedGeneration{})
if err := patchHelper.Patch(ctx, kcp, patchOpts...); err != nil {
logger.Error(err, "Failed to patch KThreesControlPlane to add finalizer")
return reconcile.Result{}, err
Expand Down Expand Up @@ -403,6 +401,35 @@ func (r *KThreesControlPlaneReconciler) updateStatus(ctx context.Context, kcp *c
conditions.MarkTrue(kcp, controlplanev1.AvailableCondition)
}

// Surface lastRemediation data in status.
// LastRemediation is the remediation currently in progress, if any, or the
// most recent of the remediations we are keeping track of on machines.
var lastRemediation *RemediationData

if v, ok := controlPlane.KCP.Annotations[controlplanev1.RemediationInProgressAnnotation]; ok {
remediationData, err := RemediationDataFromAnnotation(v)
if err != nil {
return err
}
lastRemediation = remediationData
} else {
for _, m := range controlPlane.Machines.UnsortedList() {
if v, ok := m.Annotations[controlplanev1.RemediationForAnnotation]; ok {
remediationData, err := RemediationDataFromAnnotation(v)
if err != nil {
return err
}
if lastRemediation == nil || lastRemediation.Timestamp.Time.Before(remediationData.Timestamp.Time) {
lastRemediation = remediationData
}
}
}
}

if lastRemediation != nil {
controlPlane.KCP.Status.LastRemediation = lastRemediation.ToStatus()
}

return nil
}

Expand Down
8 changes: 5 additions & 3 deletions controlplane/controllers/machine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ func (r *MachineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
return ctrl.Result{}, nil
}

// if machine registered PreTerminate hook, wait for capi to drain and deattach volume, then remove etcd member
// if machine registered PreTerminate hook, wait for capi asks to resolve PreTerminateDeleteHook
if annotations.HasWithPrefix(clusterv1.PreTerminateDeleteHookAnnotationPrefix, m.ObjectMeta.Annotations) &&
m.ObjectMeta.Annotations[clusterv1.PreTerminateDeleteHookAnnotationPrefix] == k3sHookName {
if !conditions.IsTrue(m, clusterv1.DrainingSucceededCondition) || !conditions.IsTrue(m, clusterv1.VolumeDetachSucceededCondition) {
if !conditions.IsFalse(m, clusterv1.PreTerminateDeleteHookSucceededCondition) {
logger.Info("wait for machine drain and detech volume operation complete.")
return ctrl.Result{}, nil
}
Expand All @@ -106,7 +106,9 @@ func (r *MachineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
logger.Info("wait k3s embedded etcd controller to remove etcd")
return ctrl.Result{Requeue: true}, err
}
logger.Info("etcd remove etcd member succeeded", "node", m.Status.NodeRef.Name)

// It is possible that the machine has no machine ref yet, will record the machine name in log
logger.Info("etcd remove etcd member succeeded", "machine name", m.Name)

patchHelper, err := patch.NewHelper(m, r.Client)
if err != nil {
Expand Down
80 changes: 68 additions & 12 deletions controlplane/controllers/remediation.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
"sigs.k8s.io/cluster-api/util/collections"
"sigs.k8s.io/cluster-api/util/conditions"
"sigs.k8s.io/cluster-api/util/patch"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -41,7 +42,7 @@ import (
// reconcileUnhealthyMachines tries to remediate KThreesControlPlane unhealthy machines
// based on the process described in https://github.com/kubernetes-sigs/cluster-api/blob/main/docs/proposals/20191017-kubeadm-based-control-plane.md#remediation-using-delete-and-recreate
// taken from the kubeadm codebase and adapted for the k3s provider.
func (r *KThreesControlPlaneReconciler) reconcileUnhealthyMachines(ctx context.Context, controlPlane *k3s.ControlPlane) (ret ctrl.Result, retErr error) {
func (r *KThreesControlPlaneReconciler) reconcileUnhealthyMachines(ctx context.Context, controlPlane *k3s.ControlPlane) (ret ctrl.Result, retErr error) { //nolint:gocyclo
log := ctrl.LoggerFrom(ctx)
reconciliationTime := time.Now().UTC()

Expand Down Expand Up @@ -81,12 +82,13 @@ func (r *KThreesControlPlaneReconciler) reconcileUnhealthyMachines(ctx context.C
return ctrl.Result{}, nil
}

// Select the machine to be remediated, which is the oldest machine marked as unhealthy.
// Select the machine to be remediated, which is the oldest machine marked as unhealthy not yet provisioned (if any)
// or the oldest machine marked as unhealthy.
//
// NOTE: The current solution is considered acceptable for the most frequent use case (only one unhealthy machine),
// however, in the future this could potentially be improved for the scenario where more than one unhealthy machine exists
// by considering which machine has lower impact on etcd quorum.
machineToBeRemediated := unhealthyMachines.Oldest()
machineToBeRemediated := getMachineToBeRemediated(unhealthyMachines)

// Returns if the machine is in the process of being deleted.
if !machineToBeRemediated.ObjectMeta.DeletionTimestamp.IsZero() {
Expand Down Expand Up @@ -147,6 +149,13 @@ func (r *KThreesControlPlaneReconciler) reconcileUnhealthyMachines(ctx context.C
return ctrl.Result{}, nil
}

// The cluster MUST NOT have healthy machines still being provisioned. This rule prevents KCP taking actions while the cluster is in a transitional state.
if controlPlane.HasHealthyMachineStillProvisioning() {
log.Info("A control plane machine needs remediation, but there are other control-plane machines being provisioned. Skipping remediation")
conditions.MarkFalse(machineToBeRemediated, clusterv1.MachineOwnerRemediatedCondition, clusterv1.WaitingForRemediationReason, clusterv1.ConditionSeverityWarning, "KCP waiting for control plane machine provisioning to complete before triggering remediation")
return ctrl.Result{}, nil
}

// The cluster MUST have no machines with a deletion timestamp. This rule prevents KCP taking actions while the cluster is in a transitional state.
if controlPlane.HasDeletingMachine() {
log.Info("A control plane machine needs remediation, but there are other control-plane machines being deleted. Skipping remediation")
Expand All @@ -157,7 +166,11 @@ func (r *KThreesControlPlaneReconciler) reconcileUnhealthyMachines(ctx context.C
// Remediation MUST preserve etcd quorum. This rule ensures that KCP will not remove a member that would result in etcd
// losing a majority of members and thus become unable to field new requests.
if controlPlane.IsEtcdManaged() {
canSafelyRemediate := r.canSafelyRemoveEtcdMember(ctx, controlPlane, machineToBeRemediated)
canSafelyRemediate, err := r.canSafelyRemoveEtcdMember(ctx, controlPlane, machineToBeRemediated)
if err != nil {
conditions.MarkFalse(machineToBeRemediated, clusterv1.MachineOwnerRemediatedCondition, clusterv1.RemediationFailedReason, clusterv1.ConditionSeverityError, err.Error())
return ctrl.Result{}, err
}
if !canSafelyRemediate {
log.Info("A control plane machine needs remediation, but removing this machine could result in etcd quorum loss. Skipping remediation")
conditions.MarkFalse(machineToBeRemediated, clusterv1.MachineOwnerRemediatedCondition, clusterv1.WaitingForRemediationReason, clusterv1.ConditionSeverityWarning, "KCP can't remediate this machine because this could result in etcd loosing quorum")
Expand Down Expand Up @@ -232,6 +245,16 @@ func (r *KThreesControlPlaneReconciler) reconcileUnhealthyMachines(ctx context.C
return ctrl.Result{Requeue: true}, nil
}

// getMachineToBeRemediated selects the unhealthy machine to remediate:
// preference goes to the oldest unhealthy machine that has not yet been
// provisioned (no node assigned, if any exists), falling back to the oldest
// unhealthy machine overall.
func getMachineToBeRemediated(unhealthyMachines collections.Machines) *clusterv1.Machine {
	if notYetProvisioned := unhealthyMachines.Filter(collections.Not(collections.HasNode())).Oldest(); notYetProvisioned != nil {
		return notYetProvisioned
	}
	return unhealthyMachines.Oldest()
}

// checkRetryLimits checks if KCP is allowed to remediate considering retry limits:
// - Remediation cannot happen because retryPeriod is not yet expired.
// - KCP already reached the maximum number of retries for a machine.
Expand Down Expand Up @@ -346,10 +369,23 @@ func max(x, y time.Duration) time.Duration {
// as well as reconcileControlPlaneConditions before this.
//
// adapted from kubeadm controller and makes the assumption that the set of controplane nodes equals the set of etcd nodes.
func (r *KThreesControlPlaneReconciler) canSafelyRemoveEtcdMember(ctx context.Context, controlPlane *k3s.ControlPlane, machineToBeRemediated *clusterv1.Machine) bool {
func (r *KThreesControlPlaneReconciler) canSafelyRemoveEtcdMember(ctx context.Context, controlPlane *k3s.ControlPlane, machineToBeRemediated *clusterv1.Machine) (bool, error) {
log := ctrl.LoggerFrom(ctx)

currentTotalMembers := len(controlPlane.Machines)
workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(controlPlane.Cluster))
if err != nil {
return false, errors.Wrapf(err, "failed to get client for workload cluster %s", controlPlane.Cluster.Name)
}

// Gets the etcd status

// This makes it possible to have a set of etcd members status different from the MHC unhealthy/unhealthy conditions.
etcdMembers, err := workloadCluster.EtcdMembers(ctx)
if err != nil {
return false, errors.Wrapf(err, "failed to get etcdStatus for workload cluster %s", controlPlane.Cluster.Name)
}

currentTotalMembers := len(etcdMembers)

log.Info("etcd cluster before remediation",
"currentTotalMembers", currentTotalMembers)
Expand All @@ -360,23 +396,43 @@ func (r *KThreesControlPlaneReconciler) canSafelyRemoveEtcdMember(ctx context.Co

healthyMembers := []string{}
unhealthyMembers := []string{}
for _, m := range controlPlane.Machines {
for _, etcdMember := range etcdMembers {
// Skip the machine to be deleted because it won't be part of the target etcd cluster.
if machineToBeRemediated.Status.NodeRef != nil && machineToBeRemediated.Status.NodeRef.Name == m.Status.NodeRef.Name {
if machineToBeRemediated.Status.NodeRef != nil && machineToBeRemediated.Status.NodeRef.Name == etcdMember {
continue
}

// Include the member in the target etcd cluster.
targetTotalMembers++

// Search for the machine corresponding to the etcd member.
var machine *clusterv1.Machine
for _, m := range controlPlane.Machines {
if m.Status.NodeRef != nil && m.Status.NodeRef.Name == etcdMember {
machine = m
break
}
}

// If an etcd member does not have a corresponding machine it is not possible to retrieve etcd member health,
// so KCP is assuming the worst scenario and considering the member unhealthy.
//
// NOTE: This should not happen given that KCP is running reconcileEtcdMembers before calling this method.
if machine == nil {
log.Info("An etcd member does not have a corresponding machine, assuming this member is unhealthy", "MemberName", etcdMember)
targetUnhealthyMembers++
unhealthyMembers = append(unhealthyMembers, fmt.Sprintf("%s (no machine)", etcdMember))
continue
}

// Check member health as reported by machine's health conditions
if !conditions.IsTrue(m, controlplanev1.MachineEtcdMemberHealthyCondition) {
if !conditions.IsTrue(machine, controlplanev1.MachineEtcdMemberHealthyCondition) {
targetUnhealthyMembers++
unhealthyMembers = append(unhealthyMembers, fmt.Sprintf("%s (%s)", m.Status.NodeRef.Name, m.Name))
unhealthyMembers = append(unhealthyMembers, fmt.Sprintf("%s (%s)", etcdMember, machine.Name))
continue
}

healthyMembers = append(healthyMembers, fmt.Sprintf("%s (%s)", m.Status.NodeRef.Name, m.Name))
healthyMembers = append(healthyMembers, fmt.Sprintf("%s (%s)", etcdMember, machine.Name))
}

// See https://etcd.io/docs/v3.3/faq/#what-is-failure-tolerance for fault tolerance formula explanation.
Expand All @@ -391,7 +447,7 @@ func (r *KThreesControlPlaneReconciler) canSafelyRemoveEtcdMember(ctx context.Co
"targetUnhealthyMembers", targetUnhealthyMembers,
"canSafelyRemediate", canSafelyRemediate)

return canSafelyRemediate
return canSafelyRemediate, nil
}

// RemediationData struct is used to keep track of information stored in the RemediationInProgressAnnotation in KCP
Expand Down
17 changes: 16 additions & 1 deletion controlplane/controllers/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,16 +377,31 @@ func (r *KThreesControlPlaneReconciler) generateMachine(ctx context.Context, kcp
},
}

annotations := map[string]string{}

// Machine's bootstrap config may be missing ClusterConfiguration if it is not the first machine in the control plane.
// We store ClusterConfiguration as annotation here to detect any changes in KCP ClusterConfiguration and rollout the machine if any.
serverConfig, err := json.Marshal(kcp.Spec.KThreesConfigSpec.ServerConfig)
if err != nil {
return fmt.Errorf("failed to marshal cluster configuration: %w", err)
}
machine.SetAnnotations(map[string]string{controlplanev1.KThreesServerConfigurationAnnotation: string(serverConfig)})
annotations[controlplanev1.KThreesServerConfigurationAnnotation] = string(serverConfig)

// In case this machine is being created as a consequence of a remediation, then add an annotation
// tracking remediating data.
// NOTE: This is required in order to track remediation retries.
if remediationData, ok := kcp.Annotations[controlplanev1.RemediationInProgressAnnotation]; ok {
annotations[controlplanev1.RemediationForAnnotation] = remediationData
}

machine.SetAnnotations(annotations)

if err := r.Client.Create(ctx, machine); err != nil {
return fmt.Errorf("failed to create machine: %w", err)
}

// Remove the annotation tracking that a remediation is in progress (the remediation completed when
// the replacement machine has been created above).
delete(kcp.Annotations, controlplanev1.RemediationInProgressAnnotation)
return nil
}
5 changes: 5 additions & 0 deletions pkg/k3s/control_plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,11 @@ func (c *ControlPlane) NeedsReplacementNode() bool {
return len(c.Machines)+1 == int(*c.KCP.Spec.Replicas)
}

// HasHealthyMachineStillProvisioning reports whether at least one healthy
// control plane machine has not yet been assigned a node, i.e. is still
// in the process of being provisioned.
func (c *ControlPlane) HasHealthyMachineStillProvisioning() bool {
	stillProvisioning := c.HealthyMachines().Filter(collections.Not(collections.HasNode()))
	return len(stillProvisioning) > 0
}

// HasDeletingMachine returns true if any machine in the control plane is in the process of being deleted.
func (c *ControlPlane) HasDeletingMachine() bool {
return len(c.Machines.Filter(collections.HasDeletionTimestamp)) > 0
Expand Down
39 changes: 37 additions & 2 deletions pkg/k3s/workload_cluster_etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package k3s

import (
"context"
"fmt"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -167,7 +166,8 @@ func (w *Workload) removeMemberForNode(ctx context.Context, name string) (bool,
}

if removingNode.Name != name {
return false, errors.New(fmt.Sprintf("node %s not found", name))
// If the node is already removed, treat it as the etcd member is removed.
return true, nil
}

annotations := removingNode.GetAnnotations()
Expand Down Expand Up @@ -236,3 +236,38 @@ func (w *Workload) ForwardEtcdLeadership(ctx context.Context, machine *clusterv1
}
return nil
}

// EtcdMembers returns the current set of members in an etcd cluster.
// It converts the etcd members to a list of node names and returns that list.
//
// NOTE: This method uses control plane machines/nodes only to get in contact with etcd,
// but then it relies on etcd as the ultimate source of truth for the list of members.
// This is intended to allow informed decisions on actions impacting etcd quorum.
func (w *Workload) EtcdMembers(ctx context.Context) ([]string, error) {
	nodes, err := w.getControlPlaneNodes(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "failed to list control plane nodes")
	}
	nodeNames := make([]string, 0, len(nodes.Items))
	for _, node := range nodes.Items {
		nodeNames = append(nodeNames, node.Name)
	}
	// Talk to the etcd leader so the membership view is authoritative.
	etcdClient, err := w.etcdClientGenerator.forLeader(ctx, nodeNames)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create etcd client")
	}
	defer etcdClient.Close()

	members, err := etcdClient.Members(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "failed to list etcd members using etcd client")
	}

	// Pre-size the result: exactly one node name per etcd member.
	names := make([]string, 0, len(members))
	for _, member := range members {
		// Convert etcd member to node name
		names = append(names, etcdutil.NodeNameFromMember(member))
	}
	return names, nil
}
Loading

0 comments on commit 8b823ce

Please sign in to comment.