Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify node removal logic with removal annotation [Etcd proxy] #103

Merged
merged 1 commit into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions controlplane/controllers/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@ const (
// dependentCertRequeueAfter is how long to wait before checking again to see if
// dependent certificates have been created.
dependentCertRequeueAfter = 30 * time.Second

k3sHookName = "k3s"
)
59 changes: 55 additions & 4 deletions controlplane/controllers/kthreescontrolplane_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ package controllers

import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/go-logr/logr"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -480,9 +480,9 @@ func (r *KThreesControlPlaneReconciler) reconcile(ctx context.Context, cluster *

// Ensures the number of etcd members is in sync with the number of machines/nodes.
// NOTE: This is usually required after a machine deletion.
// if result, err := r.reconcileEtcdMembers(ctx, controlPlane); err != nil || !result.IsZero() {
// return result, err
// }
if err := r.reconcileEtcdMembers(ctx, controlPlane); err != nil {
return reconcile.Result{}, err
}

// Reconcile unhealthy machines by triggering deletion and requeue if it is considered safe to remediate,
// otherwise continue with the other KCP operations.
Expand Down Expand Up @@ -655,6 +655,57 @@ func (r *KThreesControlPlaneReconciler) reconcileControlPlaneConditions(ctx cont
return nil
}

// reconcileEtcdMembers ensures the number of etcd members is in sync with the
// number of machines/nodes. This is usually required after a machine deletion.
//
// NOTE: this func uses KCP conditions, it is required to call
// reconcileControlPlaneConditions before this.
func (r *KThreesControlPlaneReconciler) reconcileEtcdMembers(ctx context.Context, controlPlane *k3s.ControlPlane) error {
	log := ctrl.LoggerFrom(ctx)

	// If etcd is not managed by KCP this is a no-op.
	if !controlPlane.IsEtcdManaged() {
		return nil
	}

	// If there are no KCP-owned control-plane machines, the control plane has
	// not been initialized yet; nothing to reconcile.
	if controlPlane.Machines.Len() == 0 {
		return nil
	}

	// Collect all the node names. If any machine is still provisioning
	// (i.e. has no node yet), return early: the member list would be
	// incomplete and reconciling against it could remove healthy members.
	nodeNames := make([]string, 0, controlPlane.Machines.Len())
	for _, machine := range controlPlane.Machines {
		if machine.Status.NodeRef == nil {
			return nil
		}
		nodeNames = append(nodeNames, machine.Status.NodeRef.Name)
	}

	// Potential inconsistencies between the list of members and the list of
	// machines/nodes are surfaced using the EtcdClusterHealthyCondition; if
	// this condition is true, meaning no inconsistencies exist, return early.
	if conditions.IsTrue(controlPlane.KCP, controlplanev1.EtcdClusterHealthyCondition) {
		return nil
	}

	workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(controlPlane.Cluster))
	if err != nil {
		// Failing to connect to the workload cluster can mean it is unhealthy
		// for a variety of reasons, such as etcd quorum loss.
		return errors.Wrap(err, "cannot get remote client to workload cluster")
	}

	removedMembers, err := workloadCluster.ReconcileEtcdMembers(ctx, nodeNames)
	if err != nil {
		return errors.Wrap(err, "failed attempt to reconcile etcd members")
	}

	if len(removedMembers) > 0 {
		log.Info("Etcd members without nodes removed from the cluster", "members", removedMembers)
	}

	return nil
}

func (r *KThreesControlPlaneReconciler) upgradeControlPlane(
ctx context.Context,
cluster *clusterv1.Cluster,
Expand Down
125 changes: 125 additions & 0 deletions controlplane/controllers/machine_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package controllers

import (
"context"
"time"

"github.com/go-logr/logr"
"github.com/pkg/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
"sigs.k8s.io/cluster-api/util/conditions"
"sigs.k8s.io/cluster-api/util/patch"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

k3s "github.com/k3s-io/cluster-api-k3s/pkg/k3s"
)

// MachineReconciler watches Machine objects in order to complete the k3s
// pre-terminate delete hook: once draining and volume detach have succeeded,
// it removes the machine's etcd member and clears the hook annotation so the
// deletion can proceed (see Reconcile).
type MachineReconciler struct {
	client.Client
	// Log is the base logger used by Reconcile.
	Log    logr.Logger
	Scheme *runtime.Scheme

	// Timeouts passed to the management-cluster accessors for etcd
	// dial/call operations against the workload cluster.
	EtcdDialTimeout time.Duration
	EtcdCallTimeout time.Duration

	// managementCluster and managementClusterUncached provide access to
	// workload clusters; both are defaulted in SetupWithManager when not
	// injected (e.g. by tests).
	managementCluster         k3s.ManagementCluster
	managementClusterUncached k3s.ManagementCluster
}

// SetupWithManager registers the MachineReconciler with the manager and
// defaults the management-cluster accessors when they were not injected.
//
// NOTE(review): the ctx and log parameters are currently unused; the
// signature is kept as-is so existing callers keep working.
func (r *MachineReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, log *logr.Logger) error {
	// Fail fast: do not bother defaulting the accessors when the controller
	// could not be registered (the original deferred this error to the end).
	if _, err := ctrl.NewControllerManagedBy(mgr).
		For(&clusterv1.Machine{}).
		Build(r); err != nil {
		return err
	}

	if r.managementCluster == nil {
		r.managementCluster = &k3s.Management{
			Client:          r.Client,
			EtcdDialTimeout: r.EtcdDialTimeout,
			EtcdCallTimeout: r.EtcdCallTimeout,
		}
	}

	if r.managementClusterUncached == nil {
		// The uncached variant reads straight from the API server,
		// bypassing the manager's cache.
		r.managementClusterUncached = &k3s.Management{
			Client:          mgr.GetAPIReader(),
			EtcdDialTimeout: r.EtcdDialTimeout,
			EtcdCallTimeout: r.EtcdCallTimeout,
		}
	}

	return nil
}

// +kubebuilder:rbac:groups=cluster.x-k8s.io,resources=clusters;clusters/status,verbs=get;list;watch
// +kubebuilder:rbac:groups=cluster.x-k8s.io,resources=machines;machines/status,verbs=get;list;watch;create;update;patch;delete

// Reconcile completes the k3s pre-terminate delete hook for Machines being
// deleted: after CAPI reports drain and volume detach succeeded, it removes
// the machine's etcd member from the workload cluster and then removes the
// hook annotation so the deletion can proceed.
func (r *MachineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := r.Log.WithValues("namespace", req.Namespace, "machine", req.Name)

	m := &clusterv1.Machine{}
	if err := r.Client.Get(ctx, req.NamespacedName, m); err != nil {
		if apierrors.IsNotFound(err) {
			// Object not found, return. Created objects are automatically garbage collected.
			// For additional cleanup logic use finalizers.
			return ctrl.Result{}, nil
		}

		// Error reading the object - requeue the request.
		return ctrl.Result{}, err
	}

	// Only machines that are being deleted are of interest here.
	if m.DeletionTimestamp.IsZero() {
		return ctrl.Result{}, nil
	}

	// If the machine registered the k3s PreTerminate hook, wait for CAPI to
	// drain the node and detach volumes, then remove the etcd member.
	if annotations.HasWithPrefix(clusterv1.PreTerminateDeleteHookAnnotationPrefix, m.ObjectMeta.Annotations) &&
		m.ObjectMeta.Annotations[clusterv1.PreTerminateDeleteHookAnnotationPrefix] == k3sHookName {
		if !conditions.IsTrue(m, clusterv1.DrainingSucceededCondition) || !conditions.IsTrue(m, clusterv1.VolumeDetachSucceededCondition) {
			// Typo fix: "detech" -> "detach".
			logger.Info("wait for machine drain and detach volume operation complete.")
			return ctrl.Result{}, nil
		}

		cluster, err := util.GetClusterFromMetadata(ctx, r.Client, m.ObjectMeta)
		if err != nil {
			logger.Info("unable to get cluster.")
			return ctrl.Result{}, errors.Wrapf(err, "unable to get cluster")
		}

		workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster))
		if err != nil {
			logger.Error(err, "failed to create client to workload cluster")
			return ctrl.Result{}, errors.Wrapf(err, "failed to create client to workload cluster")
		}

		etcdRemoved, err := workloadCluster.RemoveEtcdMemberForMachine(ctx, m)
		if err != nil {
			logger.Error(err, "failed to remove etcd member for machine")
			return ctrl.Result{}, err
		}
		if !etcdRemoved {
			// The embedded etcd controller has not removed the member yet;
			// requeue and check again. err is always nil here, return nil
			// explicitly (the original returned the stale err variable).
			logger.Info("wait k3s embedded etcd controller to remove etcd")
			return ctrl.Result{Requeue: true}, nil
		}
		// Guard the log access: NodeRef could in principle be nil and would
		// panic here even though the member removal itself succeeded.
		if m.Status.NodeRef != nil {
			logger.Info("etcd remove etcd member succeeded", "node", m.Status.NodeRef.Name)
		}

		patchHelper, err := patch.NewHelper(m, r.Client)
		if err != nil {
			return ctrl.Result{}, errors.Wrapf(err, "failed to create patch helper for machine")
		}

		// Remove the hook annotation so CAPI can proceed with the deletion.
		mAnnotations := m.GetAnnotations()
		delete(mAnnotations, clusterv1.PreTerminateDeleteHookAnnotationPrefix)
		m.SetAnnotations(mAnnotations)
		if err := patchHelper.Patch(ctx, m); err != nil {
			return ctrl.Result{}, errors.Wrapf(err, "failed patch machine")
		}
	}

	return ctrl.Result{}, nil
}
29 changes: 12 additions & 17 deletions controlplane/controllers/remediation.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/klog/v2"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
"sigs.k8s.io/cluster-api/util/conditions"
"sigs.k8s.io/cluster-api/util/patch"
Expand Down Expand Up @@ -167,13 +168,10 @@ func (r *KThreesControlPlaneReconciler) reconcileUnhealthyMachines(ctx context.C
// Start remediating the unhealthy control plane machine by deleting it.
// A new machine will come up completing the operation as part of the regular reconcile.

// TODO figure out etcd complexities
// If the control plane is initialized, before deleting the machine:
// - if the machine hosts the etcd leader, forward etcd leadership to another machine.
// - delete the etcd member hosted on the machine being deleted.
// - remove the etcd member from the kubeadm config map (only for kubernetes version older than v1.22.0)
/**
workloadCluster, err := controlPlane.GetWorkloadCluster(ctx)
workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(controlPlane.Cluster))
if err != nil {
log.Error(err, "Failed to create client to workload cluster")
return ctrl.Result{}, errors.Wrapf(err, "failed to create client to workload cluster")
Expand All @@ -193,23 +191,20 @@ func (r *KThreesControlPlaneReconciler) reconcileUnhealthyMachines(ctx context.C
conditions.MarkFalse(machineToBeRemediated, clusterv1.MachineOwnerRemediatedCondition, clusterv1.RemediationFailedReason, clusterv1.ConditionSeverityError, err.Error())
return ctrl.Result{}, err
}
if err := workloadCluster.RemoveEtcdMemberForMachine(ctx, machineToBeRemediated); err != nil {
log.Error(err, "Failed to remove etcd member for machine")
conditions.MarkFalse(machineToBeRemediated, clusterv1.MachineOwnerRemediatedCondition, clusterv1.RemediationFailedReason, clusterv1.ConditionSeverityError, err.Error())
return ctrl.Result{}, err

patchHelper, err := patch.NewHelper(machineToBeRemediated, r.Client)
if err != nil {
return ctrl.Result{}, errors.Wrapf(err, "failed to create patch helper for machine")
}
}

parsedVersion, err := semver.ParseTolerant(controlPlane.KCP.Spec.Version)
if err != nil {
return ctrl.Result{}, errors.Wrapf(err, "failed to parse kubernetes version %q", controlPlane.KCP.Spec.Version)
}
mAnnotations := machineToBeRemediated.GetAnnotations()
mAnnotations[clusterv1.PreTerminateDeleteHookAnnotationPrefix] = k3sHookName
machineToBeRemediated.SetAnnotations(mAnnotations)

if err := workloadCluster.RemoveMachineFromKubeadmConfigMap(ctx, machineToBeRemediated, parsedVersion); err != nil {
log.Error(err, "Failed to remove machine from kubeadm ConfigMap")
return ctrl.Result{}, err
if err := patchHelper.Patch(ctx, machineToBeRemediated); err != nil {
return ctrl.Result{}, errors.Wrapf(err, "failed patch machine for adding preTerminate hook")
}
}
**/
}

// Delete the machine
Expand Down
26 changes: 20 additions & 6 deletions controlplane/controllers/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ package controllers
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -33,6 +33,7 @@ import (
"sigs.k8s.io/cluster-api/controllers/external"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/conditions"
"sigs.k8s.io/cluster-api/util/patch"
ctrl "sigs.k8s.io/controller-runtime"

bootstrapv1 "github.com/k3s-io/cluster-api-k3s/bootstrap/api/v1beta1"
Expand Down Expand Up @@ -132,12 +133,19 @@ func (r *KThreesControlPlaneReconciler) scaleDownControlPlane(
logger.Error(err, "Failed to move leadership to candidate machine", "candidate", etcdLeaderCandidate.Name)
return ctrl.Result{}, err
}
logger.Info("etcd move etcd leader succeeded, node to delete %s", machineToDelete.Status.NodeRef.Name)
if err := workloadCluster.RemoveEtcdMemberForMachine(ctx, machineToDelete); err != nil {
logger.Error(err, "Failed to remove etcd member for machine")
return ctrl.Result{}, err

patchHelper, err := patch.NewHelper(machineToDelete, r.Client)
if err != nil {
return ctrl.Result{}, errors.Wrapf(err, "failed to create patch helper for machine")
}

mAnnotations := machineToDelete.GetAnnotations()
mAnnotations[clusterv1.PreTerminateDeleteHookAnnotationPrefix] = k3sHookName
machineToDelete.SetAnnotations(mAnnotations)

if err := patchHelper.Patch(ctx, machineToDelete); err != nil {
return ctrl.Result{}, errors.Wrapf(err, "failed patch machine for adding preTerminate hook")
}
logger.Info("etcd remove etcd member succeeded, node to delete %s", machineToDelete.Status.NodeRef.Name)
}

logger = logger.WithValues("machine", machineToDelete)
Expand Down Expand Up @@ -177,6 +185,12 @@ func (r *KThreesControlPlaneReconciler) preflightChecks(_ context.Context, contr

// Check machine health conditions; if there are conditions with False or Unknown, then wait.
allMachineHealthConditions := []clusterv1.ConditionType{controlplanev1.MachineAgentHealthyCondition}
if controlPlane.IsEtcdManaged() {
allMachineHealthConditions = append(allMachineHealthConditions,
controlplanev1.MachineEtcdMemberHealthyCondition,
)
}

machineErrors := []error{}

loopmachines:
Expand Down
12 changes: 12 additions & 0 deletions controlplane/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,18 @@ func main() {
setupLog.Error(err, "unable to create controller", "controller", "KThreesControlPlane")
os.Exit(1)
}

ctrMachineLogger := ctrl.Log.WithName("controllers").WithName("Machine")
if err = (&controllers.MachineReconciler{
Client: mgr.GetClient(),
Log: ctrMachineLogger,
Scheme: mgr.GetScheme(),
EtcdDialTimeout: etcdDialTimeout,
EtcdCallTimeout: etcdCallTimeout,
}).SetupWithManager(ctx, mgr, &ctrMachineLogger); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Machine")
os.Exit(1)
}
// +kubebuilder:scaffold:builder

setupLog.Info("starting manager")
Expand Down
Loading
Loading