Skip to content

Commit

Permalink
Optimize the creation of virtual machines when insufficient storage is detected for the ELF cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
haijianyang committed Jan 24, 2024
1 parent 66ba3c9 commit a8f8d38
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 20 deletions.
4 changes: 4 additions & 0 deletions api/v1beta1/conditions_consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ const (
// waiting for ELF cluster with sufficient memory to create or power on VM.
WaitingForELFClusterWithSufficientMemoryReason = "WaitingForELFClusterWithSufficientMemory"

// WaitingForELFClusterWithSufficientStorageReason (Severity=Info) documents an ElfMachine
// waiting for ELF cluster with sufficient storage to create or power on VM.
WaitingForELFClusterWithSufficientStorageReason = "WaitingForELFClusterWithSufficientStorage"

// WaitingForAvailableHostRequiredByPlacementGroupReason (Severity=Info) documents an ElfMachine
// waiting for an available host required by placement group to create VM.
WaitingForAvailableHostRequiredByPlacementGroupReason = "WaitingForAvailableHostRequiredByPlacementGroup"
Expand Down
16 changes: 12 additions & 4 deletions controllers/elfmachine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,7 @@ func (r *ElfMachineReconciler) reconcileVMTask(ctx *context.MachineContext, vm *

if service.IsCloneVMTask(task) || service.IsPowerOnVMTask(task) {
releaseTicketForCreateVM(ctx.ElfMachine.Name)
recordElfClusterStorageInsufficient(ctx, false)
recordElfClusterMemoryInsufficient(ctx, false)

if err := recordPlacementGroupPolicyNotSatisfied(ctx, false); err != nil {
Expand Down Expand Up @@ -958,17 +959,24 @@ func (r *ElfMachineReconciler) reconcileVMFailedTask(ctx *context.MachineContext
case service.IsCloneVMTask(task):
releaseTicketForCreateVM(ctx.ElfMachine.Name)

if service.IsVMDuplicateError(errorMessage) {
setVMDuplicate(ctx.ElfMachine.Name)
}

if ctx.ElfMachine.RequiresGPUDevices() {
unlockGPUDevicesLockedByVM(ctx.ElfCluster.Spec.Cluster, ctx.ElfMachine.Name)
}
case service.IsPowerOnVMTask(task) || service.IsUpdateVMTask(task) || service.IsVMColdMigrationTask(task):
if ctx.ElfMachine.RequiresGPUDevices() {
unlockGPUDevicesLockedByVM(ctx.ElfCluster.Spec.Cluster, ctx.ElfMachine.Name)
}
}

switch {
case service.IsVMDuplicateError(errorMessage):
setVMDuplicate(ctx.ElfMachine.Name)
case service.IsStorageInsufficientError(errorMessage):
recordElfClusterStorageInsufficient(ctx, true)
message := fmt.Sprintf("Insufficient storage detected for the ELF cluster %s", ctx.ElfCluster.Spec.Cluster)
ctx.Logger.Info(message)

return errors.New(message)
case service.IsMemoryInsufficientError(errorMessage):
recordElfClusterMemoryInsufficient(ctx, true)
message := fmt.Sprintf("Insufficient memory detected for the ELF cluster %s", ctx.ElfCluster.Spec.Cluster)
Expand Down
45 changes: 38 additions & 7 deletions controllers/elfmachine_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ var _ = Describe("ElfMachineReconciler", func() {
resetMemoryCache()
vm := fake.NewTowerVM()
vm.Name = &elfMachine.Name
elfCluster.Spec.Cluster = clusterKey
elfCluster.Spec.Cluster = clusterInsufficientStorageKey
task := fake.NewTowerTask()
withTaskVM := fake.NewWithTaskVM(vm, task)
ctrlutil.AddFinalizer(elfMachine, infrav1.MachineFinalizer)
Expand All @@ -271,12 +271,26 @@ var _ = Describe("ElfMachineReconciler", func() {

machineContext := newMachineContext(ctrlContext, elfCluster, cluster, elfMachine, machine, mockVMService)
machineContext.VMService = mockVMService
recordIsUnmet(machineContext, clusterKey, true)
reconciler := &ElfMachineReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService}
recordIsUnmet(machineContext, clusterInsufficientStorageKey, true)
mockVMService.EXPECT().GetVMPlacementGroup(gomock.Any()).Return(placementGroup, nil)
elfMachineKey := capiutil.ObjectKey(elfMachine)
reconciler := &ElfMachineReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService}
result, err := reconciler.Reconcile(ctx, ctrl.Request{NamespacedName: elfMachineKey})
Expect(result.RequeueAfter).NotTo(BeZero())
Expect(err).NotTo(HaveOccurred())
Expect(logBuffer.String()).To(ContainSubstring("Insufficient storage detected for the ELF cluster"))
expireELFScheduleVMError(machineContext, clusterInsufficientStorageKey)

logBuffer.Reset()
elfCluster.Spec.Cluster = clusterInsufficientMemoryKey
ctrlContext = newCtrlContexts(elfCluster, cluster, elfMachine, machine, secret, md)
fake.InitOwnerReferences(ctrlContext, elfCluster, cluster, elfMachine, machine)
machineContext = newMachineContext(ctrlContext, elfCluster, cluster, elfMachine, machine, mockVMService)
recordIsUnmet(machineContext, clusterInsufficientMemoryKey, true)
reconciler = &ElfMachineReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService}
result, err = reconciler.Reconcile(ctx, ctrl.Request{NamespacedName: elfMachineKey})
Expect(result.RequeueAfter).NotTo(BeZero())
Expect(err).NotTo(HaveOccurred())
Expect(logBuffer.String()).To(ContainSubstring("Insufficient memory detected for the ELF cluster"))

logBuffer = new(bytes.Buffer)
Expand All @@ -285,7 +299,7 @@ var _ = Describe("ElfMachineReconciler", func() {
mockVMService.EXPECT().Get(*vm.ID).Return(vm, nil)
mockVMService.EXPECT().GetTask(*task.ID).Return(task, nil)
mockVMService.EXPECT().GetVMPlacementGroup(gomock.Any()).Return(placementGroup, nil)
expireELFScheduleVMError(machineContext, clusterKey)
expireELFScheduleVMError(machineContext, clusterInsufficientMemoryKey)

reconciler = &ElfMachineReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService}
result, err = reconciler.Reconcile(ctx, ctrl.Request{NamespacedName: elfMachineKey})
Expand Down Expand Up @@ -826,20 +840,20 @@ var _ = Describe("ElfMachineReconciler", func() {
vm := fake.NewTowerVM()
vm.Host = &models.NestedHost{ID: service.TowerString(fake.ID())}
elfMachine.Status.VMRef = *vm.LocalID
elfCluster.Spec.Cluster = clusterKey
elfCluster.Spec.Cluster = clusterInsufficientMemoryKey
ctrlContext := newCtrlContexts(elfCluster, cluster, elfMachine, machine, secret, md)
fake.InitOwnerReferences(ctrlContext, elfCluster, cluster, elfMachine, machine)
machineContext := newMachineContext(ctrlContext, elfCluster, cluster, elfMachine, machine, mockVMService)
machineContext.VMService = mockVMService
recordIsUnmet(machineContext, clusterKey, true)
recordIsUnmet(machineContext, clusterInsufficientMemoryKey, true)
reconciler := &ElfMachineReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService}
err := reconciler.powerOnVM(machineContext, vm)
Expect(err).NotTo(HaveOccurred())
Expect(logBuffer.String()).To(ContainSubstring("Insufficient memory detected for the ELF cluster"))

task := fake.NewTowerTask()
mockVMService.EXPECT().PowerOn(elfMachine.Status.VMRef, "").Return(task, nil)
expireELFScheduleVMError(machineContext, clusterKey)
expireELFScheduleVMError(machineContext, clusterInsufficientMemoryKey)
err = reconciler.powerOnVM(machineContext, vm)
Expect(err).NotTo(HaveOccurred())
Expect(logBuffer.String()).To(ContainSubstring("and the retry silence period passes, will try to power on the VM again"))
Expand Down Expand Up @@ -3185,11 +3199,28 @@ var _ = Describe("ElfMachineReconciler", func() {
Expect(err.Error()).To(ContainSubstring("The placement group policy can not be satisfied"))
Expect(logBuffer.String()).To(ContainSubstring("VM task failed"))

logBuffer.Reset()
ok, msg, err := isELFScheduleVMErrorRecorded(machineContext)
Expect(ok).To(BeTrue())
Expect(msg).To(ContainSubstring("Insufficient memory detected for the ELF cluster"))
Expect(err).ShouldNot(HaveOccurred())

resetMemoryCache()
logBuffer.Reset()
elfMachine.Status.TaskRef = *task.ID
task.ErrorMessage = service.TowerString(service.StorageInsufficientError)
ok, err = reconciler.reconcileVMTask(machineContext, nil)
Expect(ok).Should(BeTrue())
Expect(err.Error()).To(ContainSubstring("Insufficient storage detected for the ELF cluster"))
Expect(elfMachine.Status.TaskRef).To(Equal(""))
Expect(logBuffer.String()).To(ContainSubstring("VM task failed"))

logBuffer.Reset()
ok, msg, err = isELFScheduleVMErrorRecorded(machineContext)
Expect(ok).To(BeTrue())
Expect(msg).To(ContainSubstring("Insufficient storage detected for the ELF cluster"))
Expect(err).ShouldNot(HaveOccurred())

// Start VM
task.Status = models.NewTaskStatus(models.TaskStatusSUCCESSED)
task.Description = service.TowerString("Start VM")
Expand Down
25 changes: 23 additions & 2 deletions controllers/tower_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ type clusterResource struct {
//
// Includes these scenarios:
// 1. ELF cluster has insufficient memory.
// 2. Placement group not satisfy policy.
// 2. ELF cluster has insufficient storage.
// 3. Placement group does not satisfy the policy.
func isELFScheduleVMErrorRecorded(ctx *context.MachineContext) (bool, string, error) {
lock.Lock()
defer lock.Unlock()
Expand All @@ -59,6 +60,10 @@ func isELFScheduleVMErrorRecorded(ctx *context.MachineContext) (bool, string, er
conditions.MarkFalse(ctx.ElfMachine, infrav1.VMProvisionedCondition, infrav1.WaitingForELFClusterWithSufficientMemoryReason, clusterv1.ConditionSeverityInfo, "")

return true, fmt.Sprintf("Insufficient memory detected for the ELF cluster %s", ctx.ElfCluster.Spec.Cluster), nil
} else if resource := getClusterResource(getKeyForInsufficientStorageError(ctx.ElfCluster.Spec.Cluster)); resource != nil {
conditions.MarkFalse(ctx.ElfMachine, infrav1.VMProvisionedCondition, infrav1.WaitingForELFClusterWithSufficientStorageReason, clusterv1.ConditionSeverityInfo, "")

return true, fmt.Sprintf("Insufficient storage detected for the ELF cluster %s", ctx.ElfCluster.Spec.Cluster), nil
}

placementGroupName, err := towerresources.GetVMPlacementGroupName(ctx, ctx.Client, ctx.Machine, ctx.Cluster)
Expand All @@ -85,6 +90,16 @@ func recordElfClusterMemoryInsufficient(ctx *context.MachineContext, isInsuffici
}
}

// recordElfClusterStorageInsufficient records in the in-memory cache whether
// the ELF cluster has insufficient storage to create or power on a VM.
// Passing false clears a previously recorded error so VM operations can be
// retried. The caller is expected to hold the package-level lock, matching
// the sibling recordElfClusterMemoryInsufficient.
func recordElfClusterStorageInsufficient(ctx *context.MachineContext, isStorageInsufficient bool) {
	key := getKeyForInsufficientStorageError(ctx.ElfCluster.Spec.Cluster)
	if isStorageInsufficient {
		// Cache the error with a TTL so the retry silence period is enforced.
		inMemoryCache.Set(key, newClusterResource(), resourceDuration)
	} else {
		inMemoryCache.Delete(key)
	}
}

// recordPlacementGroupPolicyNotSatisfied records whether the placement group does not satisfy the policy.
func recordPlacementGroupPolicyNotSatisfied(ctx *context.MachineContext, isPGPolicyNotSatisfied bool) error {
placementGroupName, err := towerresources.GetVMPlacementGroupName(ctx, ctx.Client, ctx.Machine, ctx.Cluster)
Expand Down Expand Up @@ -116,7 +131,9 @@ func canRetryVMOperation(ctx *context.MachineContext) (bool, error) {
lock.Lock()
defer lock.Unlock()

if ok := canRetry(getKeyForInsufficientMemoryError(ctx.ElfCluster.Spec.Cluster)); ok {
if ok := canRetry(getKeyForInsufficientStorageError(ctx.ElfCluster.Spec.Cluster)); ok {
return true, nil
} else if ok := canRetry(getKeyForInsufficientMemoryError(ctx.ElfCluster.Spec.Cluster)); ok {
return true, nil
}

Expand Down Expand Up @@ -158,6 +175,10 @@ func getClusterResource(key string) *clusterResource {
return nil
}

// getKeyForInsufficientStorageError returns the in-memory cache key used to
// record an insufficient-storage error for the given ELF cluster ID.
func getKeyForInsufficientStorageError(clusterID string) string {
	// Plain concatenation is clearer and cheaper than fmt.Sprintf for
	// joining two strings (no interface boxing / reflection).
	return "insufficient:storage:" + clusterID
}

// getKeyForInsufficientMemoryError returns the in-memory cache key used to
// record an insufficient-memory error for the given ELF cluster ID.
func getKeyForInsufficientMemoryError(clusterID string) string {
	// Plain concatenation is clearer and cheaper than fmt.Sprintf for
	// joining two strings (no interface boxing / reflection).
	return "insufficient:memory:" + clusterID
}
Expand Down
31 changes: 24 additions & 7 deletions controllers/tower_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ import (
)

const (
clusterKey = "clusterID"
placementGroupKey = "getPlacementGroupName"
clusterKey = "clusterID"
clusterInsufficientMemoryKey = "clusterInsufficientMemory"
clusterInsufficientStorageKey = "clusterInsufficientStorage"
placementGroupKey = "getPlacementGroupName"
)

var _ = Describe("TowerCache", func() {
Expand All @@ -44,7 +46,7 @@ var _ = Describe("TowerCache", func() {
})

It("should set memoryInsufficient/policyNotSatisfied", func() {
for _, name := range []string{clusterKey, placementGroupKey} {
for _, name := range []string{clusterInsufficientMemoryKey, clusterInsufficientStorageKey, placementGroupKey} {
resetMemoryCache()
elfCluster, cluster, elfMachine, machine, secret := fake.NewClusterAndMachineObjects()
elfCluster.Spec.Cluster = name
Expand Down Expand Up @@ -98,7 +100,7 @@ var _ = Describe("TowerCache", func() {
})

It("should return whether need to detect", func() {
for _, name := range []string{clusterKey, placementGroupKey} {
for _, name := range []string{clusterInsufficientMemoryKey, clusterInsufficientStorageKey, placementGroupKey} {
resetMemoryCache()
elfCluster, cluster, elfMachine, machine, secret := fake.NewClusterAndMachineObjects()
elfCluster.Spec.Cluster = name
Expand Down Expand Up @@ -154,13 +156,23 @@ var _ = Describe("TowerCache", func() {
Expect(err).ShouldNot(HaveOccurred())
expectConditions(elfMachine, []conditionAssertion{})

recordIsUnmet(machineContext, clusterKey, true)
elfCluster.Spec.Cluster = clusterInsufficientMemoryKey
recordIsUnmet(machineContext, clusterInsufficientMemoryKey, true)
ok, msg, err = isELFScheduleVMErrorRecorded(machineContext)
Expect(ok).To(BeTrue())
Expect(msg).To(ContainSubstring("Insufficient memory detected for the ELF cluster"))
Expect(err).ShouldNot(HaveOccurred())
expectConditions(elfMachine, []conditionAssertion{{infrav1.VMProvisionedCondition, corev1.ConditionFalse, clusterv1.ConditionSeverityInfo, infrav1.WaitingForELFClusterWithSufficientMemoryReason}})

resetMemoryCache()
elfCluster.Spec.Cluster = clusterInsufficientStorageKey
recordIsUnmet(machineContext, clusterInsufficientStorageKey, true)
ok, msg, err = isELFScheduleVMErrorRecorded(machineContext)
Expect(ok).To(BeTrue())
Expect(msg).To(ContainSubstring("Insufficient storage detected for the ELF cluster clusterInsufficientStorage"))
Expect(err).ShouldNot(HaveOccurred())
expectConditions(elfMachine, []conditionAssertion{{infrav1.VMProvisionedCondition, corev1.ConditionFalse, clusterv1.ConditionSeverityInfo, infrav1.WaitingForELFClusterWithSufficientStorageReason}})

resetMemoryCache()
recordIsUnmet(machineContext, placementGroupKey, true)
ok, msg, err = isELFScheduleVMErrorRecorded(machineContext)
Expand Down Expand Up @@ -220,8 +232,10 @@ func removeGPUVMInfosCache(gpuIDs []string) {
}

func getKey(ctx *context.MachineContext, name string) string {
if name == clusterKey {
if name == clusterInsufficientMemoryKey {
return getKeyForInsufficientMemoryError(name)
} else if name == clusterInsufficientStorageKey {
return getKeyForInsufficientStorageError(name)
}

placementGroupName, err := towerresources.GetVMPlacementGroupName(ctx, ctx.Client, ctx.Machine, ctx.Cluster)
Expand All @@ -231,9 +245,12 @@ func getKey(ctx *context.MachineContext, name string) string {
}

func recordIsUnmet(ctx *context.MachineContext, key string, isUnmet bool) {
if strings.Contains(key, clusterKey) {
if strings.Contains(key, clusterInsufficientMemoryKey) {
recordElfClusterMemoryInsufficient(ctx, isUnmet)
return
} else if strings.Contains(key, clusterInsufficientStorageKey) {
recordElfClusterStorageInsufficient(ctx, isUnmet)
return
}

Expect(recordPlacementGroupPolicyNotSatisfied(ctx, isUnmet)).ShouldNot(HaveOccurred())
Expand Down
5 changes: 5 additions & 0 deletions pkg/service/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
LabelAddFailed = "LABEL_ADD_FAILED"
CloudInitError = "VM_CLOUD_INIT_CONFIG_ERROR"
MemoryInsufficientError = "HostAvailableMemoryFilter"
StorageInsufficientError = "EAllocSpace"
PlacementGroupError = "PlacementGroupFilter" // SMTX OS <= 5.0.4
PlacementGroupMustError = "PlacementGroupMustFilter"
PlacementGroupPriorError = "PlacementGroupPriorFilter"
Expand Down Expand Up @@ -111,6 +112,10 @@ func ParseGPUAssignFailed(message string) string {
return message[index:]
}

func IsStorageInsufficientError(message string) bool {
return strings.Contains(message, StorageInsufficientError)

Check warning on line 116 in pkg/service/errors.go

View check run for this annotation

Codecov / codecov/patch

pkg/service/errors.go#L115-L116

Added lines #L115 - L116 were not covered by tests
}

// IsMemoryInsufficientError reports whether the given Tower task error
// message indicates the ELF cluster has insufficient memory
// (the message contains the MemoryInsufficientError marker).
func IsMemoryInsufficientError(message string) bool {
	return strings.Index(message, MemoryInsufficientError) >= 0
}
Expand Down

0 comments on commit a8f8d38

Please sign in to comment.