Skip to content

Commit

Permalink
Add support for expand disk
Browse files Browse the repository at this point in the history
  • Loading branch information
haijianyang committed Feb 22, 2024
1 parent bfe3225 commit df8ea6f
Show file tree
Hide file tree
Showing 19 changed files with 495 additions and 87 deletions.
3 changes: 0 additions & 3 deletions api/v1beta1/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ const (
// 1. ${Tower username}@${Tower auth_config_id}, e.g. caas.smartx@7e98ecbb-779e-43f6-8330-1bc1d29fffc7.
// 2. ${Tower username}, e.g. root. If auth_config_id is not set, it means it is a LOCAL user.
CreatedByAnnotation = "cape.infrastructure.cluster.x-k8s.io/created-by"

// HostAgentJobNameAnnotation is the annotation identifying the name of HostOperationJob.
HostAgentJobNameAnnotation = "cape.infrastructure.cluster.x-k8s.io/host-agent-job-name"
)

// Labels.
Expand Down
9 changes: 9 additions & 0 deletions api/v1beta1/elfmachine_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ type ElfMachineStatus struct {
// +optional
GPUDevices []GPUStatus `json:"gpuDevices,omitempty"`

// Resources records the resources allocated for the machine.
// +optional
Resources ResourcesStatus `json:"resources,omitempty"`

// FailureReason will be set in the event that there is a terminal problem
// reconciling the Machine and will contain a succinct value suitable
// for machine interpretation.
Expand Down Expand Up @@ -241,6 +245,11 @@ func (m *ElfMachine) IsFailed() bool {
return m.Status.FailureReason != nil || m.Status.FailureMessage != nil
}

// IsResourcesUpToDate reports whether the resources recorded in the machine's
// status match the ones requested in its spec. Only the disk size is compared:
// Spec.DiskGiB against Status.Resources.Disk.
func (m *ElfMachine) IsResourcesUpToDate() bool {
	expectedDisk := m.Spec.DiskGiB
	currentDisk := m.Status.Resources.Disk

	return expectedDisk == currentDisk
}

func (m *ElfMachine) SetVMDisconnectionTimestamp(timestamp *metav1.Time) {
if m.Annotations == nil {
m.Annotations = make(map[string]string)
Expand Down
5 changes: 5 additions & 0 deletions api/v1beta1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ type GPUStatus struct {
Name string `json:"name,omitempty"`
}

// ResourcesStatus records the resources allocated to the virtual machine.
type ResourcesStatus struct {
// Disk is the disk size of the virtual machine, in GiB.
Disk int32 `json:"disk,omitempty"`
}

//+kubebuilder:object:generate=false

// PatchStringValue is for patching resources.
Expand Down
16 changes: 16 additions & 0 deletions api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,13 @@ spec:
ready:
description: Ready is true when the provider resource is ready.
type: boolean
resources:
description: Resources records the resources allocated for the machine.
properties:
disk:
format: int32
type: integer
type: object
taskRef:
description: TaskRef is a managed object reference to a Task related
to the machine. This value is set automatically at runtime and should
Expand Down
20 changes: 20 additions & 0 deletions config/webhook/manifests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,26 @@ kind: ValidatingWebhookConfiguration
metadata:
name: validating-webhook-configuration
webhooks:
- admissionReviewVersions:
- v1
clientConfig:
service:
name: webhook-service
namespace: system
path: /validate-infrastructure-cluster-x-k8s-io-v1beta1-elfmachine
failurePolicy: Fail
name: validation.elfmachine.infrastructure.x-k8s.io
rules:
- apiGroups:
- infrastructure.cluster.x-k8s.io
apiVersions:
- v1beta1
operations:
- CREATE
- UPDATE
resources:
- elfmachines
sideEffects: None
- admissionReviewVersions:
- v1
clientConfig:
Expand Down
12 changes: 10 additions & 2 deletions controllers/elfmachine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -980,23 +980,31 @@ func (r *ElfMachineReconciler) reconcileVMFailedTask(ctx *context.MachineContext
if ctx.ElfMachine.RequiresGPUDevices() {
unlockGPUDevicesLockedByVM(ctx.ElfCluster.Spec.Cluster, ctx.ElfMachine.Name)
}
case service.IsUpdateVMDiskTask(task, ctx.ElfMachine.Name):
reason := conditions.GetReason(ctx.ElfMachine, infrav1.ResourcesHotUpdatedCondition)
if reason == infrav1.ExpandingVMDiskReason || reason == infrav1.ExpandingVMDiskFailedReason {
conditions.MarkFalse(ctx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingVMDiskFailedReason, clusterv1.ConditionSeverityInfo, errorMessage)
}
case service.IsPowerOnVMTask(task) || service.IsUpdateVMTask(task) || service.IsVMColdMigrationTask(task):
if ctx.ElfMachine.RequiresGPUDevices() {
unlockGPUDevicesLockedByVM(ctx.ElfCluster.Spec.Cluster, ctx.ElfMachine.Name)
}
}

switch {
case service.IsVMDuplicateError(errorMessage):
setVMDuplicate(ctx.ElfMachine.Name)
case service.IsMemoryInsufficientError(errorMessage):
recordElfClusterMemoryInsufficient(ctx, true)
message := fmt.Sprintf("Insufficient memory detected for the ELF cluster %s", ctx.ElfCluster.Spec.Cluster)
ctx.Logger.Info(message)

return errors.New(message)
case service.IsPlacementGroupError(errorMessage):
if err := recordPlacementGroupPolicyNotSatisfied(ctx, true); err != nil {
return err
}
message := "The placement group policy can not be satisfied"
ctx.Logger.Info(message)

return errors.New(message)
}

Expand Down
165 changes: 102 additions & 63 deletions controllers/elfmachine_controller_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package controllers

import (
"fmt"
"time"

"github.com/pkg/errors"
"github.com/smartxworks/cloudtower-go-sdk/v2/models"
Expand All @@ -29,77 +30,36 @@ import (
"github.com/smartxworks/cluster-api-provider-elf/pkg/context"
"github.com/smartxworks/cluster-api-provider-elf/pkg/hostagent"
"github.com/smartxworks/cluster-api-provider-elf/pkg/service"
annotationsutil "github.com/smartxworks/cluster-api-provider-elf/pkg/util/annotations"
machineutil "github.com/smartxworks/cluster-api-provider-elf/pkg/util/machine"
)

func (r *ElfMachineReconciler) reconcileVMResources(ctx *context.MachineContext, vm *models.VM) (bool, error) {
if !machineutil.IsUpdatingElfMachineResources(ctx.ElfMachine) {
return true, nil
}

if ok, err := r.reconcieVMVolume(ctx, vm, infrav1.ResourcesHotUpdatedCondition); err != nil || !ok {
return ok, err
}

// Agent needs to wait for the node to exist before it can run and execute commands.
if ctx.Machine.Status.Phase != string(clusterv1.MachinePhaseRunning) {
ctx.Logger.Info("Waiting for node exists for host agent running", "phase", ctx.Machine.Status.Phase)
if machineutil.IsUpdatingElfMachineResources(ctx.ElfMachine) &&
ctx.Machine.Status.NodeInfo == nil {
ctx.Logger.Info("Waiting for node exists for host agent expand vm root partition")

return false, nil
}

kubeClient, err := capiremote.NewClusterClient(ctx, "", ctx.Client, client.ObjectKey{Namespace: ctx.Cluster.Namespace, Name: ctx.Cluster.Name})
if err != nil {
return false, err
}

var agentJob *agentv1.HostOperationJob
agentJobName := annotationsutil.HostAgentJobName(ctx.ElfMachine)
if agentJobName != "" {
agentJob, err = hostagent.GetHostJob(ctx, kubeClient, ctx.ElfMachine.Namespace, agentJobName)
if err != nil && !apierrors.IsNotFound(err) {
return false, err
}
}
if agentJob == nil {
agentJob, err = hostagent.AddNewDiskCapacityToRoot(ctx, kubeClient, ctx.ElfMachine)
if err != nil {
conditions.MarkFalse(ctx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingRootPartitionFailedReason, clusterv1.ConditionSeverityInfo, err.Error())

return false, err
}

annotationsutil.AddAnnotations(ctx.ElfMachine, map[string]string{infrav1.HostAgentJobNameAnnotation: agentJob.Name})

conditions.MarkFalse(ctx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingRootPartitionReason, clusterv1.ConditionSeverityInfo, "")

ctx.Logger.Info("Waiting for disk to be added new disk capacity to root", "hostAgentJob", agentJob.Name)

return false, nil
if ok, err := r.expandVMRootPartition(ctx); err != nil || !ok {
return ok, err
}

switch agentJob.Status.Phase {
case agentv1.PhaseSucceeded:
annotationsutil.RemoveAnnotation(ctx.ElfMachine, infrav1.HostAgentJobNameAnnotation)
if machineutil.IsUpdatingElfMachineResources(ctx.ElfMachine) {
conditions.MarkTrue(ctx.ElfMachine, infrav1.ResourcesHotUpdatedCondition)
ctx.Logger.Info("Add new disk capacity to root succeeded", "hostAgentJob", agentJob.Name)
case agentv1.PhaseFailed:
annotationsutil.RemoveAnnotation(ctx.ElfMachine, infrav1.HostAgentJobNameAnnotation)
conditions.MarkFalse(ctx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingRootPartitionFailedReason, clusterv1.ConditionSeverityWarning, agentJob.Status.FailureMessage)
ctx.Logger.Info("Add new disk capacity to root failed, will try again", "hostAgentJob", agentJob.Name)

return false, nil
default:
ctx.Logger.Info("Waiting for adding new disk capacity to root job done", "jobStatus", agentJob.Status.Phase)

return false, nil
}

return true, nil
}

// reconcieVMVolume ensures that the vm disk size is as expected.
//
// The conditionType param: VMProvisionedCondition/ResourcesHotUpdatedCondition.
func (r *ElfMachineReconciler) reconcieVMVolume(ctx *context.MachineContext, vm *models.VM, conditionType clusterv1.ConditionType) (bool, error) {
vmDiskIDs := make([]string, len(vm.VMDisks))
for i := 0; i < len(vm.VMDisks); i++ {
Expand All @@ -108,6 +68,8 @@ func (r *ElfMachineReconciler) reconcieVMVolume(ctx *context.MachineContext, vm

vmDisks, err := ctx.VMService.GetVMDisks(vmDiskIDs)
if err != nil {
return false, errors.Wrapf(err, "failed to get disks for vm %s/%s", *vm.ID, *vm.Name)
} else if len(vmDisks) == 0 {
return false, errors.Errorf("no disks found for vm %s/%s", *vm.ID, *vm.Name)
}

Expand All @@ -116,37 +78,114 @@ func (r *ElfMachineReconciler) reconcieVMVolume(ctx *context.MachineContext, vm
return false, err
}

diskSize := service.TowerDisk(ctx.ElfMachine.Spec.DiskGiB)
if *diskSize > *vmVolume.Size {
if service.IsTowerResourcePerformingAnOperation(vmVolume.EntityAsyncStatus) {
ctx.Logger.Info("Waiting for vm volume task done", "volume", fmt.Sprintf("%s/%s", *vmVolume.ID, *vmVolume.Name))
diskSize := service.ByteToGiB(*vmVolume.Size)
ctx.ElfMachine.Status.Resources.Disk = diskSize

return false, nil
}

return false, r.resizeVMVolume(ctx, vmVolume, *diskSize, conditionType)
} else if *diskSize < *vmVolume.Size {
conditions.MarkTrue(ctx.ElfMachine, infrav1.ResourcesHotUpdatedCondition)
ctx.Logger.Info(fmt.Sprintf("Current disk capacity is larger than expected, skipping expand vm volume %s/%s", *vmVolume.ID, *vmVolume.Name), "currentSize", *vmVolume.Size, "expectedSize", *diskSize)
if ctx.ElfMachine.Spec.DiskGiB < diskSize {
ctx.Logger.V(3).Info(fmt.Sprintf("Current disk capacity is larger than expected, skipping expand vm volume %s/%s", *vmVolume.ID, *vmVolume.Name), "currentSize", diskSize, "expectedSize", ctx.ElfMachine.Spec.DiskGiB)
} else if ctx.ElfMachine.Spec.DiskGiB > diskSize {
return false, r.resizeVMVolume(ctx, vmVolume, *service.TowerDisk(ctx.ElfMachine.Spec.DiskGiB), conditionType)
}

return true, nil
}

// resizeVMVolume sets the volume to the specified size.
//
// The work is split over several calls:
//  1. If conditionType does not yet carry ExpandingVMDiskReason or
//     ExpandingVMDiskFailedReason, only the condition is marked and nil is
//     returned without resizing.
//  2. If the volume is still performing another operation, nil is returned
//     without resizing.
//  3. Otherwise the resize is requested through VMService.ResizeVMVolume and
//     the returned task ID is stored on the ElfMachine.
//
// A non-nil error is returned only when requesting the resize fails.
func (r *ElfMachineReconciler) resizeVMVolume(ctx *context.MachineContext, vmVolume *models.VMVolume, diskSize int64, conditionType clusterv1.ConditionType) error {
reason := conditions.GetReason(ctx.ElfMachine, conditionType)
if reason == "" ||
(reason != infrav1.ExpandingVMDiskReason && reason != infrav1.ExpandingVMDiskFailedReason) {
conditions.MarkFalse(ctx.ElfMachine, conditionType, infrav1.ExpandingVMDiskReason, clusterv1.ConditionSeverityInfo, "")

// Save the conditionType first, and then expand the disk capacity.
// This prevents the disk expansion from succeeding but failing to save the
// conditionType, causing ElfMachine to not record the conditionType.
return nil
}

// Do not request a resize while the volume is busy with another operation.
if service.IsTowerResourcePerformingAnOperation(vmVolume.EntityAsyncStatus) {
ctx.Logger.Info("Waiting for vm volume task done", "volume", fmt.Sprintf("%s/%s", *vmVolume.ID, *vmVolume.Name))

return nil
}

withTaskVMVolume, err := ctx.VMService.ResizeVMVolume(*vmVolume.ID, diskSize)
if err != nil {
// NOTE(review): the two MarkFalse calls below target the same condition, so
// the second presumably replaces what the first set — confirm both are intended.
conditions.MarkFalse(ctx.ElfMachine, conditionType, infrav1.ExpandingVMDiskReason, clusterv1.ConditionSeverityWarning, err.Error())
conditions.MarkFalse(ctx.ElfMachine, conditionType, infrav1.ExpandingVMDiskFailedReason, clusterv1.ConditionSeverityWarning, err.Error())

return errors.Wrapf(err, "failed to trigger expand size from %d to %d for vm volume %s/%s", *vmVolume.Size, diskSize, *vmVolume.ID, *vmVolume.Name)
}

// NOTE(review): this records ExpandingVMDiskFailedReason although the resize
// request succeeded — confirm this is intended.
conditions.MarkFalse(ctx.ElfMachine, conditionType, infrav1.ExpandingVMDiskFailedReason, clusterv1.ConditionSeverityInfo, "")

// Remember the task so that its result can be looked up later.
ctx.ElfMachine.SetTask(*withTaskVMVolume.TaskID)

ctx.Logger.Info(fmt.Sprintf("Waiting for the vm volume %s/%s to be expanded", *vmVolume.ID, *vmVolume.Name), "taskRef", ctx.ElfMachine.Status.TaskRef, "oldSize", *vmVolume.Size, "newSize", diskSize)

return nil
}

// expandVMRootPartition adds new disk capacity to root partition by running a
// host agent job through a client for the cluster referenced by ctx.Cluster.
//
// Return values:
//   - (true, nil): nothing is left to do. Either the reason of
//     ResourcesHotUpdatedCondition is not one of the disk / root partition
//     expansion reasons (this includes an empty reason), or the job succeeded.
//   - (false, nil): the job was just created, is still running, or has failed
//     and is waiting to be retried.
//   - (false, err): building the cluster client, looking up, creating or
//     deleting the job failed.
func (r *ElfMachineReconciler) expandVMRootPartition(ctx *context.MachineContext) (bool, error) {
	// A failed job is deleted and recreated once this interval has passed.
	const retryInterval = 2 * time.Minute

	switch conditions.GetReason(ctx.ElfMachine, infrav1.ResourcesHotUpdatedCondition) {
	case infrav1.ExpandingRootPartitionFailedReason:
		// Keep the failure reason visible until the retry makes progress.
	case infrav1.ExpandingVMDiskReason,
		infrav1.ExpandingVMDiskFailedReason,
		infrav1.ExpandingRootPartitionReason:
		conditions.MarkFalse(ctx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingRootPartitionReason, clusterv1.ConditionSeverityInfo, "")
	default:
		// No expansion is in progress, so there is nothing to do.
		return true, nil
	}

	kubeClient, err := capiremote.NewClusterClient(ctx, "", ctx.Client, client.ObjectKey{Namespace: ctx.Cluster.Namespace, Name: ctx.Cluster.Name})
	if err != nil {
		return false, err
	}

	jobName := hostagent.GetExpandRootPartitionJobName(ctx.ElfMachine)
	agentJob, err := hostagent.GetHostJob(ctx, kubeClient, ctx.ElfMachine.Namespace, jobName)
	if err != nil {
		if !apierrors.IsNotFound(err) {
			return false, err
		}

		// The job does not exist. Decide that from the error itself rather than
		// from whatever value was returned alongside it.
		agentJob = nil
	}

	if agentJob == nil {
		agentJob, err = hostagent.ExpandRootPartition(ctx, kubeClient, ctx.ElfMachine)
		if err != nil {
			// NOTE(review): this failure is recorded with Info severity while the
			// failed-job branch below uses Warning — confirm which is intended.
			conditions.MarkFalse(ctx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingRootPartitionFailedReason, clusterv1.ConditionSeverityInfo, err.Error())

			return false, err
		}

		ctx.Logger.Info("Waiting for expanding root partition", "hostAgentJob", agentJob.Name)

		return false, nil
	}

	switch agentJob.Status.Phase {
	case agentv1.PhaseSucceeded:
		ctx.Logger.Info("Expand root partition to root succeeded", "hostAgentJob", agentJob.Name)
	case agentv1.PhaseFailed:
		conditions.MarkFalse(ctx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingRootPartitionFailedReason, clusterv1.ConditionSeverityWarning, agentJob.Status.FailureMessage)
		ctx.Logger.Info("Expand root partition failed, will try again after two minutes", "hostAgentJob", agentJob.Name, "failureMessage", agentJob.Status.FailureMessage)

		// Fall back to the creation time when the job never recorded an execution.
		lastExecutionTime := agentJob.Status.LastExecutionTime
		if lastExecutionTime == nil {
			lastExecutionTime = &agentJob.CreationTimestamp
		}
		// Two minutes after the job fails, delete the job and try again.
		// A job that is already gone is exactly what the retry needs, so a
		// NotFound error from the delete is not treated as a failure.
		if time.Now().After(lastExecutionTime.Add(retryInterval)) {
			if err := kubeClient.Delete(ctx, agentJob); err != nil && !apierrors.IsNotFound(err) {
				return false, errors.Wrapf(err, "failed to delete expand root partition job %s/%s for retry", agentJob.Namespace, agentJob.Name)
			}
		}

		return false, nil
	default:
		ctx.Logger.Info("Waiting for expanding root partition job done", "hostAgentJob", agentJob.Name, "jobStatus", agentJob.Status.Phase)

		return false, nil
	}

	return true, nil
}
6 changes: 6 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,12 @@ func main() {
return err
}

if err := (&webhooks.ElfMachineValidator{
Client: mgr.GetClient(),
}).SetupWebhookWithManager(mgr); err != nil {
return err
}

if err := (&webhooks.ElfMachineMutation{
Client: mgr.GetClient(),
Logger: mgr.GetLogger().WithName("ElfMachineMutation"),
Expand Down
Loading

0 comments on commit df8ea6f

Please sign in to comment.