diff --git a/controllers/virtualmachine/virtualmachine/virtualmachine_controller.go b/controllers/virtualmachine/virtualmachine/virtualmachine_controller.go index f22e8df01..9bf74a352 100644 --- a/controllers/virtualmachine/virtualmachine/virtualmachine_controller.go +++ b/controllers/virtualmachine/virtualmachine/virtualmachine_controller.go @@ -334,8 +334,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Re }() if !vm.DeletionTimestamp.IsZero() { - err = r.ReconcileDelete(vmCtx) - return ctrl.Result{}, err + return ctrl.Result{}, r.ReconcileDelete(vmCtx) } if err = r.ReconcileNormal(vmCtx); err != nil && !ignoredCreateErr(err) { @@ -574,7 +573,7 @@ func getIsDefaultVMClassController(ctx context.Context) bool { // ignoredCreateErr is written this way in order to illustrate coverage more // accurately. func ignoredCreateErr(err error) bool { - if errors.Is(err, providers.ErrDuplicateCreate) { + if errors.Is(err, providers.ErrReconcileInProgress) { return true } if errors.Is(err, providers.ErrTooManyCreates) { diff --git a/controllers/virtualmachine/virtualmachine/virtualmachine_controller_intg_test.go b/controllers/virtualmachine/virtualmachine/virtualmachine_controller_intg_test.go index 46a7db8e6..c5ba056eb 100644 --- a/controllers/virtualmachine/virtualmachine/virtualmachine_controller_intg_test.go +++ b/controllers/virtualmachine/virtualmachine/virtualmachine_controller_intg_test.go @@ -168,7 +168,7 @@ func intgTestsReconcile() { intgFakeVMProvider, func(ctx context.Context, vm *vmopv1.VirtualMachine) error { atomic.AddInt32(&createAttempts, 1) - return providers.ErrDuplicateCreate + return providers.ErrReconcileInProgress }, ) }) diff --git a/pkg/providers/vm_provider_interface.go b/pkg/providers/vm_provider_interface.go index cbd5cffa2..6e08ba4d3 100644 --- a/pkg/providers/vm_provider_interface.go +++ b/pkg/providers/vm_provider_interface.go @@ -24,10 +24,10 @@ var ( // threads/goroutines have reached the allowed limit. ErrTooManyCreates = errors.New("too many creates") - // ErrDuplicateCreate is returned from the CreateOrUpdateVirtualMachineAsync - // function if it is called for a VM while a create goroutine for that VM is - // already executing. - ErrDuplicateCreate = errors.New("duplicate create") + // ErrReconcileInProgress is returned from the + // CreateOrUpdateVirtualMachine and DeleteVirtualMachine functions when + // the VM is still being reconciled in a background thread. + ErrReconcileInProgress = errors.New("reconcile already in progress") ) // VirtualMachineProviderInterface is a pluggable interface for VM Providers. diff --git a/pkg/providers/vsphere/vmprovider_vm.go b/pkg/providers/vsphere/vmprovider_vm.go index c46e0c80c..63e1e8287 100644 --- a/pkg/providers/vsphere/vmprovider_vm.go +++ b/pkg/providers/vsphere/vmprovider_vm.go @@ -82,9 +82,9 @@ var ( createCountLock sync.Mutex concurrentCreateCount int - // currentlyCreating tracks the VMs currently being created in a + // currentlyReconciling tracks the VMs currently being created in a // non-blocking goroutine. - currentlyCreating sync.Map + currentlyReconciling sync.Map // SkipVMImageCLProviderCheck skips the checks that a VM Image has a Content Library item provider // since a VirtualMachineImage created for a VM template won't have either. This has been broken for @@ -112,6 +112,14 @@ func (vs *vSphereVMProvider) createOrUpdateVirtualMachine( vm *vmopv1.VirtualMachine, async bool) (chan error, error) { + vmNamespacedName := vm.NamespacedName() + + if _, ok := currentlyReconciling.Load(vmNamespacedName); ok { + // Do not process the VM again if it is already being reconciled in a + // goroutine. + return nil, providers.ErrReconcileInProgress + } + vmCtx := pkgctx.VirtualMachineContext{ Context: context.WithValue( ctx, @@ -192,16 +200,14 @@ func (vs *vSphereVMProvider) createOrUpdateVirtualMachine( createArgs) } - vmNamespacedName := vm.NamespacedName() - - if _, ok := currentlyCreating.LoadOrStore(vmNamespacedName, struct{}{}); ok { + if _, ok := currentlyReconciling.LoadOrStore(vmNamespacedName, struct{}{}); ok { // If the VM is already being created in a goroutine, then there is no // need to create it again. // // However, we need to make sure we decrement the number of concurrent // creates before returning. cleanupFn() - return nil, providers.ErrDuplicateCreate + return nil, providers.ErrReconcileInProgress } vmCtx.Logger.V(4).Info("Doing a non-blocking create") @@ -209,7 +215,7 @@ func (vs *vSphereVMProvider) createOrUpdateVirtualMachine( // Update the cleanup function to include indicating a concurrent create is // no longer occurring. cleanupFn = func() { - currentlyCreating.Delete(vmNamespacedName) + currentlyReconciling.Delete(vmNamespacedName) decrementConcurrentCreatesFn() } @@ -245,9 +251,17 @@ func (vs *vSphereVMProvider) DeleteVirtualMachine( ctx context.Context, vm *vmopv1.VirtualMachine) error { + vmNamespacedName := vm.NamespacedName() + + if _, ok := currentlyReconciling.Load(vmNamespacedName); ok { + // If the VM is already being reconciled in a goroutine then it cannot + // be deleted yet. + return providers.ErrReconcileInProgress + } + vmCtx := pkgctx.VirtualMachineContext{ Context: context.WithValue(ctx, vimtypes.ID{}, vs.getOpID(vm, "deleteVM")), - Logger: log.WithValues("vmName", vm.NamespacedName()), + Logger: log.WithValues("vmName", vmNamespacedName), VM: vm, } diff --git a/pkg/providers/vsphere/vmprovider_vm_test.go b/pkg/providers/vsphere/vmprovider_vm_test.go index b967671ad..66b060134 100644 --- a/pkg/providers/vsphere/vmprovider_vm_test.go +++ b/pkg/providers/vsphere/vmprovider_vm_test.go @@ -8,6 +8,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "math/rand" "sync" @@ -1361,6 +1362,109 @@ func vmTests() { // TODO: More assertions! }) + When("using async create", func() { + BeforeEach(func() { + pkgcfg.SetContext(parentCtx, func(config *pkgcfg.Config) { + config.AsyncCreateDisabled = false + config.AsyncSignalDisabled = false + config.Features.WorkloadDomainIsolation = true + }) + }) + JustBeforeEach(func() { + pkgcfg.SetContext(ctx, func(config *pkgcfg.Config) { + config.MaxDeployThreadsOnProvider = 16 + }) + }) + + It("should succeed", func() { + Expect(createOrUpdateVM(ctx, vmProvider, vm)).To(Succeed()) + Expect(vm.Status.UniqueID).ToNot(BeEmpty()) + }) + + When("there is an error getting the pre-reqs", func() { + It("should not prevent a subsequent create attempt from going through", func() { + imgName := vm.Spec.Image.Name + vm.Spec.Image.Name = "does-not-exist" + err := createOrUpdateVM(ctx, vmProvider, vm) + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError( + "clustervirtualmachineimages.vmoperator.vmware.com \"does-not-exist\" not found: " + + "clustervirtualmachineimages.vmoperator.vmware.com \"does-not-exist\" not found")) + vm.Spec.Image.Name = imgName + Expect(createOrUpdateVM(ctx, vmProvider, vm)).To(Succeed()) + Expect(vm.Status.UniqueID).ToNot(BeEmpty()) + }) + }) + + // Please note this test uses FlakeAttempts(5) due to the + // validation of some predictable-over-time behavior. + When("there is a reconcile in progress", FlakeAttempts(5), func() { + When("there is a duplicate create", func() { + It("should return ErrReconcileInProgress", func() { + var ( + errs []error + errsMu sync.Mutex + done sync.WaitGroup + start = make(chan struct{}) + ) + + // Set up five goroutines that race to + // create the VM first. + for i := 0; i < 5; i++ { + done.Add(1) + go func(copyOfVM *vmopv1.VirtualMachine) { + defer done.Done() + <-start + err := createOrUpdateVM(ctx, vmProvider, copyOfVM) + if err != nil { + errsMu.Lock() + errs = append(errs, err) + errsMu.Unlock() + } else { + vm = copyOfVM + } + }(vm.DeepCopy()) + } + + close(start) + + done.Wait() + + Expect(errs).To(HaveLen(4)) + + Expect(errs).Should(ConsistOf( + providers.ErrReconcileInProgress, + providers.ErrReconcileInProgress, + providers.ErrReconcileInProgress, + providers.ErrReconcileInProgress, + )) + + Expect(vm.Status.UniqueID).ToNot(BeEmpty()) + }) + }) + + When("there is a delete during async create", func() { + It("should return ErrReconcileInProgress", func() { + chanCreateErrs, createErr := vmProvider.CreateOrUpdateVirtualMachineAsync(ctx, vm) + deleteErr := vmProvider.DeleteVirtualMachine(ctx, vm) + + Expect(createErr).ToNot(HaveOccurred()) + Expect(errors.Is(deleteErr, providers.ErrReconcileInProgress)) + + var createErrs []error + for e := range chanCreateErrs { + if e != nil { + createErrs = append(createErrs, e) + } + } + Expect(createErrs).Should(BeEmpty()) + + Expect(vmProvider.DeleteVirtualMachine(ctx, vm)).To(Succeed()) + }) + }) + }) + }) + It("TKG VM", func() { if vm.Labels == nil { vm.Labels = make(map[string]string) @@ -1528,7 +1632,7 @@ func vmTests() { config.MaxDeployThreadsOnProvider = 16 }) }) - It("should return ErrDuplicateCreate", func() { + It("should return ErrReconcileInProgress", func() { var ( errs []error errsMu sync.Mutex @@ -1561,10 +1665,10 @@ func vmTests() { Expect(errs).To(HaveLen(4)) Expect(errs).Should(ConsistOf( - providers.ErrDuplicateCreate, - providers.ErrDuplicateCreate, - providers.ErrDuplicateCreate, - providers.ErrDuplicateCreate, + providers.ErrReconcileInProgress, + providers.ErrReconcileInProgress, + providers.ErrReconcileInProgress, + providers.ErrReconcileInProgress, )) Expect(vm.Status.Crypto).ToNot(BeNil())