From 23ca922641a338056e5a5d7d4b91aae35f1ad585 Mon Sep 17 00:00:00 2001 From: qliang Date: Mon, 1 Apr 2024 02:20:06 +0000 Subject: [PATCH] add etcd reconcile logic Signed-off-by: nasusoba add machine controller to reconcile node etcd removal Signed-off-by: nasusoba Add comment and small fix Signed-off-by: nasusoba slightly fixed the comment Signed-off-by: nasusoba --- controlplane/controllers/const.go | 2 + .../kthreescontrolplane_controller.go | 59 ++++++- .../controllers/machine_controller.go | 125 ++++++++++++++ controlplane/controllers/remediation.go | 29 ++-- controlplane/controllers/scale.go | 26 ++- controlplane/main.go | 12 ++ pkg/k3s/workload_cluster.go | 162 +++++++++++++++++- pkg/k3s/workload_cluster_etcd.go | 67 +++++++- test/e2e/node_scale_test.go | 41 +++-- 9 files changed, 459 insertions(+), 64 deletions(-) create mode 100644 controlplane/controllers/machine_controller.go diff --git a/controlplane/controllers/const.go b/controlplane/controllers/const.go index ed6deed8..d3b0cfd4 100644 --- a/controlplane/controllers/const.go +++ b/controlplane/controllers/const.go @@ -30,4 +30,6 @@ const ( // dependentCertRequeueAfter is how long to wait before checking again to see if // dependent certificates have been created. dependentCertRequeueAfter = 30 * time.Second + + k3sHookName = "k3s" ) diff --git a/controlplane/controllers/kthreescontrolplane_controller.go b/controlplane/controllers/kthreescontrolplane_controller.go index 51d2e340..2bf1a205 100644 --- a/controlplane/controllers/kthreescontrolplane_controller.go +++ b/controlplane/controllers/kthreescontrolplane_controller.go @@ -18,12 +18,12 @@ package controllers import ( "context" - "errors" "fmt" "strings" "time" "github.com/go-logr/logr" + "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -480,9 +480,9 @@ func (r *KThreesControlPlaneReconciler) reconcile(ctx context.Context, cluster * // Ensures the number of etcd members is in sync with the number of machines/nodes. // NOTE: This is usually required after a machine deletion. - // if result, err := r.reconcileEtcdMembers(ctx, controlPlane); err != nil || !result.IsZero() { - // return result, err - // } + if err := r.reconcileEtcdMembers(ctx, controlPlane); err != nil { + return reconcile.Result{}, err + } // Reconcile unhealthy machines by triggering deletion and requeue if it is considered safe to remediate, // otherwise continue with the other KCP operations. @@ -655,6 +655,57 @@ func (r *KThreesControlPlaneReconciler) reconcileControlPlaneConditions(ctx cont return nil } +// reconcileEtcdMembers ensures the number of etcd members is in sync with the number of machines/nodes. +// This is usually required after a machine deletion. +// +// NOTE: this func uses KCP conditions; reconcileControlPlaneConditions must be called before this. +func (r *KThreesControlPlaneReconciler) reconcileEtcdMembers(ctx context.Context, controlPlane *k3s.ControlPlane) error { + log := ctrl.LoggerFrom(ctx) + + // If etcd is not managed by KCP this is a no-op. + if !controlPlane.IsEtcdManaged() { + return nil + } + + // If there are no KCP-owned control-plane machines, then the control plane has not been initialized yet. + if controlPlane.Machines.Len() == 0 { + return nil + } + + // Collect all the node names.
+ nodeNames := []string{} + for _, machine := range controlPlane.Machines { + if machine.Status.NodeRef == nil { + // If there are provisioning machines (machines without a node yet), return. + return nil + } + nodeNames = append(nodeNames, machine.Status.NodeRef.Name) + } + + // Potential inconsistencies between the list of members and the list of machines/nodes are + // surfaced using the EtcdClusterHealthyCondition; if this condition is true, meaning no inconsistencies exist, return early. + if conditions.IsTrue(controlPlane.KCP, controlplanev1.EtcdClusterHealthyCondition) { + return nil + } + + workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(controlPlane.Cluster)) + if err != nil { + // Failing at connecting to the workload cluster can mean the workload cluster is unhealthy for a variety of reasons such as etcd quorum loss. + return errors.Wrap(err, "cannot get remote client to workload cluster") + } + + removedMembers, err := workloadCluster.ReconcileEtcdMembers(ctx, nodeNames) + if err != nil { + return errors.Wrap(err, "failed attempt to reconcile etcd members") + } + + if len(removedMembers) > 0 { + log.Info("Etcd members without nodes removed from the cluster", "members", removedMembers) + } + + return nil +} + func (r *KThreesControlPlaneReconciler) upgradeControlPlane( ctx context.Context, cluster *clusterv1.Cluster, diff --git a/controlplane/controllers/machine_controller.go b/controlplane/controllers/machine_controller.go new file mode 100644 index 00000000..9482679f --- /dev/null +++ b/controlplane/controllers/machine_controller.go @@ -0,0 +1,125 @@ +package controllers + +import ( + "context" + "time" + + "github.com/go-logr/logr" + "github.com/pkg/errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" + "sigs.k8s.io/cluster-api/util" + "sigs.k8s.io/cluster-api/util/annotations" + "sigs.k8s.io/cluster-api/util/conditions" + "sigs.k8s.io/cluster-api/util/patch" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + k3s "github.com/k3s-io/cluster-api-k3s/pkg/k3s" +) + +// MachineReconciler reconciles Machine objects, removing the etcd member of a control-plane machine before it is deleted. +type MachineReconciler struct { + client.Client + Log logr.Logger + Scheme *runtime.Scheme + + EtcdDialTimeout time.Duration + EtcdCallTimeout time.Duration + + managementCluster k3s.ManagementCluster + managementClusterUncached k3s.ManagementCluster +} + +func (r *MachineReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, log *logr.Logger) error { + _, err := ctrl.NewControllerManagedBy(mgr). + For(&clusterv1.Machine{}).
+ Build(r) + + if r.managementCluster == nil { + r.managementCluster = &k3s.Management{ + Client: r.Client, + EtcdDialTimeout: r.EtcdDialTimeout, + EtcdCallTimeout: r.EtcdCallTimeout, + } + } + + if r.managementClusterUncached == nil { + r.managementClusterUncached = &k3s.Management{ + Client: mgr.GetAPIReader(), + EtcdDialTimeout: r.EtcdDialTimeout, + EtcdCallTimeout: r.EtcdCallTimeout, + } + } + + return err +} + +// +kubebuilder:rbac:groups=cluster.x-k8s.io,resources=clusters;clusters/status,verbs=get;list;watch +// +kubebuilder:rbac:groups=cluster.x-k8s.io,resources=machines;machines/status,verbs=get;list;watch;create;update;patch;delete +func (r *MachineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := r.Log.WithValues("namespace", req.Namespace, "machine", req.Name) + + m := &clusterv1.Machine{} + if err := r.Client.Get(ctx, req.NamespacedName, m); err != nil { + if apierrors.IsNotFound(err) { + // Object not found, return. Created objects are automatically garbage collected. + // For additional cleanup logic use finalizers. + return ctrl.Result{}, nil + } + + // Error reading the object - requeue the request. + return ctrl.Result{}, err + } + + if m.DeletionTimestamp.IsZero() { + return ctrl.Result{}, nil + } + + // If the machine has registered the pre-terminate hook, wait for CAPI to finish draining the node and detaching volumes, then remove the etcd member. + if annotations.HasWithPrefix(clusterv1.PreTerminateDeleteHookAnnotationPrefix, m.ObjectMeta.Annotations) && + m.ObjectMeta.Annotations[clusterv1.PreTerminateDeleteHookAnnotationPrefix] == k3sHookName { + if !conditions.IsTrue(m, clusterv1.DrainingSucceededCondition) || !conditions.IsTrue(m, clusterv1.VolumeDetachSucceededCondition) { + logger.Info("waiting for machine drain and volume detach operations to complete") + return ctrl.Result{}, nil + } + + cluster, err := util.GetClusterFromMetadata(ctx, r.Client, m.ObjectMeta) + if err != nil { + logger.Info("unable to get cluster") + return ctrl.Result{}, errors.Wrapf(err, "unable to get cluster") + } + + workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster)) + if err != nil { + logger.Error(err, "failed to create client to workload cluster") + return ctrl.Result{}, errors.Wrapf(err, "failed to create client to workload cluster") + } + + etcdRemoved, err := workloadCluster.RemoveEtcdMemberForMachine(ctx, m) + if err != nil { + logger.Error(err, "failed to remove etcd member for machine") + return ctrl.Result{}, err + } + if !etcdRemoved { + logger.Info("waiting for k3s embedded etcd controller to remove the etcd member") + return ctrl.Result{Requeue: true}, err + } + logger.Info("etcd member removal succeeded", "node", m.Status.NodeRef.Name) + + patchHelper, err := patch.NewHelper(m, r.Client) + if err != nil { + return ctrl.Result{}, errors.Wrapf(err, "failed to create patch helper for machine") + } + + mAnnotations := m.GetAnnotations() + delete(mAnnotations, clusterv1.PreTerminateDeleteHookAnnotationPrefix) + m.SetAnnotations(mAnnotations) + if err := patchHelper.Patch(ctx, m); err != nil { + return ctrl.Result{}, errors.Wrapf(err, "failed to patch machine") + } + } + + return ctrl.Result{}, nil +} diff --git a/controlplane/controllers/remediation.go b/controlplane/controllers/remediation.go index d85e8a57..57bd9a80 100644 --- a/controlplane/controllers/remediation.go +++ b/controlplane/controllers/remediation.go @@ -28,6 +28,7 @@ import ( kerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/klog/v2" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" +
"sigs.k8s.io/cluster-api/util" "sigs.k8s.io/cluster-api/util/annotations" "sigs.k8s.io/cluster-api/util/conditions" "sigs.k8s.io/cluster-api/util/patch" @@ -167,13 +168,10 @@ func (r *KThreesControlPlaneReconciler) reconcileUnhealthyMachines(ctx context.C // Start remediating the unhealthy control plane machine by deleting it. // A new machine will come up completing the operation as part of the regular reconcile. - // TODO figure out etcd complexities // If the control plane is initialized, before deleting the machine: // - if the machine hosts the etcd leader, forward etcd leadership to another machine. // - delete the etcd member hosted on the machine being deleted. - // - remove the etcd member from the kubeadm config map (only for kubernetes version older than v1.22.0) - /** - workloadCluster, err := controlPlane.GetWorkloadCluster(ctx) + workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(controlPlane.Cluster)) if err != nil { log.Error(err, "Failed to create client to workload cluster") return ctrl.Result{}, errors.Wrapf(err, "failed to create client to workload cluster") @@ -193,23 +191,20 @@ func (r *KThreesControlPlaneReconciler) reconcileUnhealthyMachines(ctx context.C conditions.MarkFalse(machineToBeRemediated, clusterv1.MachineOwnerRemediatedCondition, clusterv1.RemediationFailedReason, clusterv1.ConditionSeverityError, err.Error()) return ctrl.Result{}, err } - if err := workloadCluster.RemoveEtcdMemberForMachine(ctx, machineToBeRemediated); err != nil { - log.Error(err, "Failed to remove etcd member for machine") - conditions.MarkFalse(machineToBeRemediated, clusterv1.MachineOwnerRemediatedCondition, clusterv1.RemediationFailedReason, clusterv1.ConditionSeverityError, err.Error()) - return ctrl.Result{}, err + + patchHelper, err := patch.NewHelper(machineToBeRemediated, r.Client) + if err != nil { + return ctrl.Result{}, errors.Wrapf(err, "failed to create patch helper for machine") } - } - parsedVersion, err := semver.ParseTolerant(controlPlane.KCP.Spec.Version) - if err != nil { - return ctrl.Result{}, errors.Wrapf(err, "failed to parse kubernetes version %q", controlPlane.KCP.Spec.Version) - } + mAnnotations := machineToBeRemediated.GetAnnotations() + mAnnotations[clusterv1.PreTerminateDeleteHookAnnotationPrefix] = k3sHookName + machineToBeRemediated.SetAnnotations(mAnnotations) - if err := workloadCluster.RemoveMachineFromKubeadmConfigMap(ctx, machineToBeRemediated, parsedVersion); err != nil { - log.Error(err, "Failed to remove machine from kubeadm ConfigMap") - return ctrl.Result{}, err + if err := patchHelper.Patch(ctx, machineToBeRemediated); err != nil { + return ctrl.Result{}, errors.Wrapf(err, "failed patch machine for adding preTerminate hook") + } } - **/ } // Delete the machine diff --git a/controlplane/controllers/scale.go b/controlplane/controllers/scale.go index d72f6879..4e64fdae 100644 --- a/controlplane/controllers/scale.go +++ b/controlplane/controllers/scale.go @@ -19,10 +19,10 @@ package controllers import ( "context" "encoding/json" - "errors" "fmt" "strings" + "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -33,6 +33,7 @@ import ( "sigs.k8s.io/cluster-api/controllers/external" "sigs.k8s.io/cluster-api/util" "sigs.k8s.io/cluster-api/util/conditions" + "sigs.k8s.io/cluster-api/util/patch" ctrl "sigs.k8s.io/controller-runtime" bootstrapv1 "github.com/k3s-io/cluster-api-k3s/bootstrap/api/v1beta1" @@ -132,12 +133,19 @@ func (r 
*KThreesControlPlaneReconciler) scaleDownControlPlane( logger.Error(err, "Failed to move leadership to candidate machine", "candidate", etcdLeaderCandidate.Name) return ctrl.Result{}, err } - logger.Info("etcd move etcd leader succeeded, node to delete %s", machineToDelete.Status.NodeRef.Name) - if err := workloadCluster.RemoveEtcdMemberForMachine(ctx, machineToDelete); err != nil { - logger.Error(err, "Failed to remove etcd member for machine") - return ctrl.Result{}, err + + patchHelper, err := patch.NewHelper(machineToDelete, r.Client) + if err != nil { + return ctrl.Result{}, errors.Wrapf(err, "failed to create patch helper for machine") + } + + mAnnotations := machineToDelete.GetAnnotations() + mAnnotations[clusterv1.PreTerminateDeleteHookAnnotationPrefix] = k3sHookName + machineToDelete.SetAnnotations(mAnnotations) + + if err := patchHelper.Patch(ctx, machineToDelete); err != nil { + return ctrl.Result{}, errors.Wrapf(err, "failed patch machine for adding preTerminate hook") } - logger.Info("etcd remove etcd member succeeded, node to delete %s", machineToDelete.Status.NodeRef.Name) } logger = logger.WithValues("machine", machineToDelete) @@ -177,6 +185,12 @@ func (r *KThreesControlPlaneReconciler) preflightChecks(_ context.Context, contr // Check machine health conditions; if there are conditions with False or Unknown, then wait. allMachineHealthConditions := []clusterv1.ConditionType{controlplanev1.MachineAgentHealthyCondition} + if controlPlane.IsEtcdManaged() { + allMachineHealthConditions = append(allMachineHealthConditions, + controlplanev1.MachineEtcdMemberHealthyCondition, + ) + } + machineErrors := []error{} loopmachines: diff --git a/controlplane/main.go b/controlplane/main.go index b97c6ef0..7ba1ae6c 100644 --- a/controlplane/main.go +++ b/controlplane/main.go @@ -110,6 +110,18 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "KThreesControlPlane") os.Exit(1) } + + ctrMachineLogger := ctrl.Log.WithName("controllers").WithName("Machine") + if err = (&controllers.MachineReconciler{ + Client: mgr.GetClient(), + Log: ctrMachineLogger, + Scheme: mgr.GetScheme(), + EtcdDialTimeout: etcdDialTimeout, + EtcdCallTimeout: etcdCallTimeout, + }).SetupWithManager(ctx, mgr, &ctrMachineLogger); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "Machine") + os.Exit(1) + } // +kubebuilder:scaffold:builder setupLog.Info("starting manager") diff --git a/pkg/k3s/workload_cluster.go b/pkg/k3s/workload_cluster.go index de97243b..ed6d917c 100644 --- a/pkg/k3s/workload_cluster.go +++ b/pkg/k3s/workload_cluster.go @@ -18,6 +18,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/rest" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" "sigs.k8s.io/cluster-api/util" "sigs.k8s.io/cluster-api/util/certs" @@ -25,6 +26,8 @@ import ( ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" controlplanev1 "github.com/k3s-io/cluster-api-k3s/controlplane/api/v1beta1" + "github.com/k3s-io/cluster-api-k3s/pkg/etcd" + etcdutil "github.com/k3s-io/cluster-api-k3s/pkg/etcd/util" ) const ( @@ -46,7 +49,7 @@ type WorkloadCluster interface { UpdateEtcdConditions(ctx context.Context, controlPlane *ControlPlane) // Etcd tasks - RemoveEtcdMemberForMachine(ctx context.Context, machine *clusterv1.Machine) error + RemoveEtcdMemberForMachine(ctx context.Context, machine *clusterv1.Machine) (bool, error) ForwardEtcdLeadership(ctx context.Context, machine 
*clusterv1.Machine, leaderCandidate *clusterv1.Machine) error ReconcileEtcdMembers(ctx context.Context, nodeNames []string) ([]string, error) @@ -57,7 +60,9 @@ type WorkloadCluster interface { type Workload struct { WorkloadCluster - Client ctrlclient.Client + Client ctrlclient.Client + ClientRestConfig *rest.Config + CoreDNSMigrator coreDNSMigrator etcdClientGenerator etcdClientFor } @@ -129,12 +134,6 @@ func (w *Workload) UpdateAgentConditions(ctx context.Context, controlPlane *Cont controlplanev1.MachineAgentHealthyCondition, } - /** TODO: figure out etcd - if controlPlane.IsEtcdManaged() { - allMachinePodConditions = append(allMachinePodConditions, controlplanev1.MachineEtcdPodHealthyCondition) - } - **/ - // NOTE: this fun uses control plane nodes from the workload cluster as a source of truth for the current state. controlPlaneNodes, err := w.getControlPlaneNodes(ctx) if err != nil { @@ -347,7 +346,18 @@ func (w *Workload) updateManagedEtcdConditions(ctx context.Context, controlPlane return } + // Update conditions for etcd members on the nodes. + var ( + // kcpErrors is used to store errors that can't be reported on any machine. + kcpErrors []string + // clusterID is used to store and compare the etcd's cluster id. + clusterID *uint64 + // members is used to store the list of etcd members and compare with all the other nodes in the cluster. + members []*etcd.Member + ) + for _, node := range controlPlaneNodes.Items { + // Search for the machine corresponding to the node. var machine *clusterv1.Machine for _, m := range controlPlane.Machines { if m.Status.NodeRef != nil && m.Status.NodeRef.Name == node.Name { @@ -361,6 +371,7 @@ func (w *Workload) updateManagedEtcdConditions(ctx context.Context, controlPlane if hasProvisioningMachine(controlPlane.Machines) { continue } + kcpErrors = append(kcpErrors, fmt.Sprintf("Control plane node %s does not have a corresponding machine", node.Name)) continue } @@ -370,8 +381,143 @@ func (w *Workload) updateManagedEtcdConditions(ctx context.Context, controlPlane continue } + currentMembers, err := w.getCurrentEtcdMembers(ctx, machine, node.Name) + if err != nil { + continue + } + + // Check if the list of members IDs reported is the same as all other members. + // NOTE: the first member reporting this information is the baseline for this information. + if members == nil { + members = currentMembers + } + if !etcdutil.MemberEqual(members, currentMembers) { + conditions.MarkFalse(machine, controlplanev1.MachineEtcdMemberHealthyCondition, controlplanev1.EtcdMemberUnhealthyReason, clusterv1.ConditionSeverityError, "etcd member reports the cluster is composed by members %s, but all previously seen etcd members are reporting %s", etcdutil.MemberNames(currentMembers), etcdutil.MemberNames(members)) + continue + } + + // Retrieve the member and check for alarms. + // NB. 
The member for this node always exists given forFirstAvailableNode(node) used above + member := etcdutil.MemberForName(currentMembers, node.Name) + if member == nil { + conditions.MarkFalse(machine, controlplanev1.MachineEtcdMemberHealthyCondition, controlplanev1.EtcdMemberUnhealthyReason, clusterv1.ConditionSeverityError, "etcd member reports the cluster is composed by members %s, but the member itself (%s) is not included", etcdutil.MemberNames(currentMembers), node.Name) + continue + } + if len(member.Alarms) > 0 { + alarmList := []string{} + for _, alarm := range member.Alarms { + switch alarm { + case etcd.AlarmOK: + continue + default: + alarmList = append(alarmList, etcd.AlarmTypeName[alarm]) + } + } + if len(alarmList) > 0 { + conditions.MarkFalse(machine, controlplanev1.MachineEtcdMemberHealthyCondition, controlplanev1.EtcdMemberUnhealthyReason, clusterv1.ConditionSeverityError, "Etcd member reports alarms: %s", strings.Join(alarmList, ", ")) + continue + } + } + + // Check if the member belongs to the same cluster as all other members. + // NOTE: the first member reporting this information is the baseline for this information. + if clusterID == nil { + clusterID = &member.ClusterID + } + if *clusterID != member.ClusterID { + conditions.MarkFalse(machine, controlplanev1.MachineEtcdMemberHealthyCondition, controlplanev1.EtcdMemberUnhealthyReason, clusterv1.ConditionSeverityError, "etcd member has cluster ID %d, but all previously seen etcd members have cluster ID %d", member.ClusterID, *clusterID) + continue + } + conditions.MarkTrue(machine, controlplanev1.MachineEtcdMemberHealthyCondition) } + + // Make sure that the list of etcd members and machines is consistent. + kcpErrors = compareMachinesAndMembers(controlPlane, members, kcpErrors) + + // Aggregate components error from machines at KCP level + aggregateFromMachinesToKCP(aggregateFromMachinesToKCPInput{ + controlPlane: controlPlane, + machineConditions: []clusterv1.ConditionType{controlplanev1.MachineEtcdMemberHealthyCondition}, + kcpErrors: kcpErrors, + condition: controlplanev1.EtcdClusterHealthyCondition, + unhealthyReason: controlplanev1.EtcdClusterUnhealthyReason, + unknownReason: controlplanev1.EtcdClusterUnknownReason, + note: "etcd member", + }) +} + +func (w *Workload) getCurrentEtcdMembers(ctx context.Context, machine *clusterv1.Machine, nodeName string) ([]*etcd.Member, error) { + // Create the etcd Client for the etcd Pod scheduled on the Node + etcdClient, err := w.etcdClientGenerator.forFirstAvailableNode(ctx, []string{nodeName}) + if err != nil { + conditions.MarkUnknown(machine, controlplanev1.MachineEtcdMemberHealthyCondition, controlplanev1.EtcdMemberInspectionFailedReason, "Failed to connect to the etcd pod on the %s node: %s", nodeName, err) + return nil, errors.Wrapf(err, "failed to get current etcd members: failed to connect to the etcd pod on the %s node", nodeName) + } + defer etcdClient.Close() + + // While creating a new client, forFirstAvailableNode retrieves the status for the endpoint; check if the endpoint has errors. + if len(etcdClient.Errors) > 0 { + conditions.MarkFalse(machine, controlplanev1.MachineEtcdMemberHealthyCondition, controlplanev1.EtcdMemberUnhealthyReason, clusterv1.ConditionSeverityError, "Etcd member status reports errors: %s", strings.Join(etcdClient.Errors, ", ")) + return nil, errors.Errorf("failed to get current etcd members: etcd member status reports errors: %s", strings.Join(etcdClient.Errors, ", ")) + } + + // Gets the list etcd members known by this member. 
+ currentMembers, err := etcdClient.Members(ctx) + if err != nil { + // NB. We should never be in here, given that we just received an answer to the etcd calls included in forFirstAvailableNode; + // however, we are considering the calls to Members a signal of etcd not being stable. + conditions.MarkFalse(machine, controlplanev1.MachineEtcdMemberHealthyCondition, controlplanev1.EtcdMemberUnhealthyReason, clusterv1.ConditionSeverityError, "Failed to get answer from the etcd member on the %s node", nodeName) + return nil, errors.Errorf("failed to get current etcd members: failed to get answer from the etcd member on the %s node", nodeName) + } + + return currentMembers, nil +} + +func compareMachinesAndMembers(controlPlane *ControlPlane, members []*etcd.Member, kcpErrors []string) []string { + // NOTE: We run this check only if we actually know the list of members, otherwise the first for loop + // could generate a false negative when reporting missing etcd members. + if members == nil { + return kcpErrors + } + + // Check Machine -> Etcd member. + for _, machine := range controlPlane.Machines { + if machine.Status.NodeRef == nil { + continue + } + found := false + for _, member := range members { + nodeNameFromMember := etcdutil.NodeNameFromMember(member) + if machine.Status.NodeRef.Name == nodeNameFromMember { + found = true + break + } + } + if !found { + conditions.MarkFalse(machine, controlplanev1.MachineEtcdMemberHealthyCondition, controlplanev1.EtcdMemberUnhealthyReason, clusterv1.ConditionSeverityError, "Missing etcd member") + } + } + + // Check Etcd member -> Machine. + for _, member := range members { + found := false + nodeNameFromMember := etcdutil.NodeNameFromMember(member) + for _, machine := range controlPlane.Machines { + if machine.Status.NodeRef != nil && machine.Status.NodeRef.Name == nodeNameFromMember { + found = true + break + } + } + if !found { + name := nodeNameFromMember + if name == "" { + name = fmt.Sprintf("%d (Name not yet assigned)", member.ID) + } + kcpErrors = append(kcpErrors, fmt.Sprintf("etcd member %s does not have a corresponding machine", name)) + } + } + return kcpErrors } func generateClientCert(caCertEncoded, caKeyEncoded []byte) (tls.Certificate, error) { diff --git a/pkg/k3s/workload_cluster_etcd.go b/pkg/k3s/workload_cluster_etcd.go index 30080035..b2920794 100644 --- a/pkg/k3s/workload_cluster_etcd.go +++ b/pkg/k3s/workload_cluster_etcd.go @@ -18,22 +18,31 @@ package k3s import ( "context" + "fmt" "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" kerrors "k8s.io/apimachinery/pkg/util/errors" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" + "sigs.k8s.io/cluster-api/util/patch" + ctrl "sigs.k8s.io/controller-runtime" "github.com/k3s-io/cluster-api-k3s/pkg/etcd" etcdutil "github.com/k3s-io/cluster-api-k3s/pkg/etcd/util" ) +const ( + EtcdRemoveAnnotation = "etcd.k3s.cattle.io/remove" + EtcdRemovedNodeAnnotation = "etcd.k3s.cattle.io/removed-node-name" +) + type etcdClientFor interface { forFirstAvailableNode(ctx context.Context, nodeNames []string) (*etcd.Client, error) forLeader(ctx context.Context, nodeNames []string) (*etcd.Client, error) } // ReconcileEtcdMembers iterates over all etcd members and finds members that do not have corresponding nodes. -// If there are any such members, it deletes them from etcd and removes their nodes from the kubeadm configmap so that kubeadm does not run etcd health checks on them. +// If there are any such members, it deletes them from etcd so that the k3s control plane does not run etcd health checks on them.
func (w *Workload) ReconcileEtcdMembers(ctx context.Context, nodeNames []string) ([]string, error) { allRemovedMembers := []string{} allErrs := []error{} @@ -47,6 +56,8 @@ func (w *Workload) ReconcileEtcdMembers(ctx context.Context, nodeNames []string) } func (w *Workload) reconcileEtcdMember(ctx context.Context, nodeNames []string, nodeName string) ([]string, []error) { + log := ctrl.LoggerFrom(ctx) + // Create the etcd Client for the etcd Pod scheduled on the Node etcdClient, err := w.etcdClientGenerator.forFirstAvailableNode(ctx, []string{nodeName}) if err != nil { @@ -66,7 +77,7 @@ func (w *Workload) reconcileEtcdMember(ctx context.Context, nodeNames []string, loopmembers: for _, member := range members { curNodeName := etcdutil.NodeNameFromMember(member) - // If this member is just added, it has a empty name until the etcd pod starts. Ignore it. + // If this member was just added, it has an empty name until etcd starts. Ignore it. if curNodeName == "" { continue } @@ -80,24 +91,25 @@ loopmembers: // If we're here, the node cannot be found. removedMembers = append(removedMembers, curNodeName) - if err := w.removeMemberForNode(ctx, curNodeName); err != nil { + log.Info("removing etcd member for non-existing node", "node", curNodeName) + if err := w.removeMemberForNonExistingNode(ctx, curNodeName); err != nil { errs = append(errs, err) } } return removedMembers, errs } -// RemoveEtcdMemberForMachine removes the etcd member from the target cluster's etcd cluster. +// RemoveEtcdMemberForMachine removes the etcd member from the target cluster's etcd cluster, and returns true if the member has been removed. // Removing the last remaining member of the cluster is not supported. -func (w *Workload) RemoveEtcdMemberForMachine(ctx context.Context, machine *clusterv1.Machine) error { +func (w *Workload) RemoveEtcdMemberForMachine(ctx context.Context, machine *clusterv1.Machine) (bool, error) { if machine == nil || machine.Status.NodeRef == nil { // Nothing to do, no node for Machine - return nil + return true, nil } return w.removeMemberForNode(ctx, machine.Status.NodeRef.Name) } -func (w *Workload) removeMemberForNode(ctx context.Context, name string) error { +func (w *Workload) removeMemberForNonExistingNode(ctx context.Context, name string) error { controlPlaneNodes, err := w.getControlPlaneNodes(ctx) if err != nil { return err @@ -106,13 +118,13 @@ func (w *Workload) removeMemberForNode(ctx context.Context, name string) error { return ErrControlPlaneMinNodes } - // Exclude node being removed from etcd client node list var remainingNodes []string for _, n := range controlPlaneNodes.Items { if n.Name != name { remainingNodes = append(remainingNodes, n.Name) } } + etcdClient, err := w.etcdClientGenerator.forFirstAvailableNode(ctx, remainingNodes) if err != nil { return errors.Wrap(err, "failed to create etcd client") @@ -138,6 +150,45 @@ func (w *Workload) removeMemberForNode(ctx context.Context, name string) error { return nil } +func (w *Workload) removeMemberForNode(ctx context.Context, name string) (bool, error) { + controlPlaneNodes, err := w.getControlPlaneNodes(ctx) + if err != nil { + return false, err + } + if len(controlPlaneNodes.Items) < 2 { + return false, ErrControlPlaneMinNodes + } + + var removingNode corev1.Node + for _, n := range controlPlaneNodes.Items { + if n.Name == name { + removingNode = n + } + } + + if removingNode.Name != name { + return false, errors.New(fmt.Sprintf("node %s not found", name)) + } + + annotations := removingNode.GetAnnotations() + if _, ok :=
annotations[EtcdRemovedNodeAnnotation]; ok { + return true, nil + } + + patchHelper, err := patch.NewHelper(&removingNode, w.Client) + if err != nil { + return false, errors.Wrapf(err, "failed to create patch helper for node") + } + + annotations[EtcdRemoveAnnotation] = "true" + removingNode.SetAnnotations(annotations) + if err := patchHelper.Patch(ctx, &removingNode); err != nil { + return false, errors.Wrapf(err, "failed patch node") + } + + return false, nil +} + // ForwardEtcdLeadership forwards etcd leadership to the first follower. func (w *Workload) ForwardEtcdLeadership(ctx context.Context, machine *clusterv1.Machine, leaderCandidate *clusterv1.Machine) error { if machine == nil || machine.Status.NodeRef == nil { diff --git a/test/e2e/node_scale_test.go b/test/e2e/node_scale_test.go index 5d55935d..3a02f77e 100644 --- a/test/e2e/node_scale_test.go +++ b/test/e2e/node_scale_test.go @@ -136,26 +136,25 @@ var _ = Describe("Workload cluster scaling", func() { WaitForMachineDeployments: e2eConfig.GetIntervals(specName, "wait-worker-nodes"), }, result) - // TODO: enable after scaling down control planes working - // By("Scaling down control planes to 1") - - // ApplyClusterTemplateAndWait(ctx, ApplyClusterTemplateAndWaitInput{ - // ClusterProxy: bootstrapClusterProxy, - // ConfigCluster: clusterctl.ConfigClusterInput{ - // LogFolder: clusterctlLogFolder, - // ClusterctlConfigPath: clusterctlConfigPath, - // KubeconfigPath: bootstrapClusterProxy.GetKubeconfigPath(), - // InfrastructureProvider: infrastructureProvider, - // Namespace: namespace.Name, - // ClusterName: clusterName, - // KubernetesVersion: e2eConfig.GetVariable(KubernetesVersion), - // ControlPlaneMachineCount: pointer.Int64Ptr(1), - // WorkerMachineCount: pointer.Int64Ptr(3), - // }, - // WaitForClusterIntervals: e2eConfig.GetIntervals(specName, "wait-cluster"), - // WaitForControlPlaneIntervals: e2eConfig.GetIntervals(specName, "wait-control-plane"), - // WaitForMachineDeployments: e2eConfig.GetIntervals(specName, "wait-worker-nodes"), - // }, result) + By("Scaling down control planes to 1") + + ApplyClusterTemplateAndWait(ctx, ApplyClusterTemplateAndWaitInput{ + ClusterProxy: bootstrapClusterProxy, + ConfigCluster: clusterctl.ConfigClusterInput{ + LogFolder: clusterctlLogFolder, + ClusterctlConfigPath: clusterctlConfigPath, + KubeconfigPath: bootstrapClusterProxy.GetKubeconfigPath(), + InfrastructureProvider: infrastructureProvider, + Namespace: namespace.Name, + ClusterName: clusterName, + KubernetesVersion: e2eConfig.GetVariable(KubernetesVersion), + ControlPlaneMachineCount: pointer.Int64Ptr(1), + WorkerMachineCount: pointer.Int64Ptr(3), + }, + WaitForClusterIntervals: e2eConfig.GetIntervals(specName, "wait-cluster"), + WaitForControlPlaneIntervals: e2eConfig.GetIntervals(specName, "wait-control-plane"), + WaitForMachineDeployments: e2eConfig.GetIntervals(specName, "wait-worker-nodes"), + }, result) By("Scaling down worker nodes to 1") @@ -169,7 +168,7 @@ var _ = Describe("Workload cluster scaling", func() { Namespace: namespace.Name, ClusterName: clusterName, KubernetesVersion: e2eConfig.GetVariable(KubernetesVersion), - ControlPlaneMachineCount: pointer.Int64Ptr(3), // TODO: change to 1 after scaling down control planes working + ControlPlaneMachineCount: pointer.Int64Ptr(1), WorkerMachineCount: pointer.Int64Ptr(1), }, WaitForClusterIntervals: e2eConfig.GetIntervals(specName, "wait-cluster"),