Skip to content

Commit

Permalink
Merge pull request #856 from akutz/fix/do-not-reconcile-in-progress-i…
Browse files Browse the repository at this point in the history
…tems
  • Loading branch information
akutz authored Jan 13, 2025
2 parents 92bec24 + 469de67 commit d895ca6
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Re
}()

if !vm.DeletionTimestamp.IsZero() {
err = r.ReconcileDelete(vmCtx)
return ctrl.Result{}, err
return ctrl.Result{}, r.ReconcileDelete(vmCtx)
}

if err = r.ReconcileNormal(vmCtx); err != nil && !ignoredCreateErr(err) {
Expand Down Expand Up @@ -574,7 +573,7 @@ func getIsDefaultVMClassController(ctx context.Context) bool {
// ignoredCreateErr is written this way in order to illustrate coverage more
// accurately.
func ignoredCreateErr(err error) bool {
if errors.Is(err, providers.ErrDuplicateCreate) {
if errors.Is(err, providers.ErrReconcileInProgress) {
return true
}
if errors.Is(err, providers.ErrTooManyCreates) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func intgTestsReconcile() {
intgFakeVMProvider,
func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
atomic.AddInt32(&createAttempts, 1)
return providers.ErrDuplicateCreate
return providers.ErrReconcileInProgress
},
)
})
Expand Down
8 changes: 4 additions & 4 deletions pkg/providers/vm_provider_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ var (
// threads/goroutines have reached the allowed limit.
ErrTooManyCreates = errors.New("too many creates")

// ErrDuplicateCreate is returned from the CreateOrUpdateVirtualMachineAsync
// function if it is called for a VM while a create goroutine for that VM is
// already executing.
ErrDuplicateCreate = errors.New("duplicate create")
// ErrReconcileInProgress is returned from the
// CreateOrUpdateVirtualMachine and DeleteVirtualMachine functions when
// the VM is still being reconciled in a background thread.
ErrReconcileInProgress = errors.New("reconcile already in progress")
)

// VirtualMachineProviderInterface is a pluggable interface for VM Providers.
Expand Down
30 changes: 22 additions & 8 deletions pkg/providers/vsphere/vmprovider_vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ var (
createCountLock sync.Mutex
concurrentCreateCount int

// currentlyCreating tracks the VMs currently being created in a
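// currentlyReconciling tracks the VMs currently being created in a
// non-blocking goroutine. While a VM is tracked here, create/update and
// delete calls for it return providers.ErrReconcileInProgress.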
currentlyCreating sync.Map
currentlyReconciling sync.Map

// SkipVMImageCLProviderCheck skips the checks that a VM Image has a Content Library item provider
// since a VirtualMachineImage created for a VM template won't have either. This has been broken for
Expand Down Expand Up @@ -112,6 +112,14 @@ func (vs *vSphereVMProvider) createOrUpdateVirtualMachine(
vm *vmopv1.VirtualMachine,
async bool) (chan error, error) {

vmNamespacedName := vm.NamespacedName()

if _, ok := currentlyReconciling.Load(vmNamespacedName); ok {
// Do not process the VM again if it is already being reconciled in a
// goroutine.
return nil, providers.ErrReconcileInProgress
}

vmCtx := pkgctx.VirtualMachineContext{
Context: context.WithValue(
ctx,
Expand Down Expand Up @@ -192,24 +200,22 @@ func (vs *vSphereVMProvider) createOrUpdateVirtualMachine(
createArgs)
}

vmNamespacedName := vm.NamespacedName()

if _, ok := currentlyCreating.LoadOrStore(vmNamespacedName, struct{}{}); ok {
if _, ok := currentlyReconciling.LoadOrStore(vmNamespacedName, struct{}{}); ok {
// If the VM is already being created in a goroutine, then there is no
// need to create it again.
//
// However, we need to make sure we decrement the number of concurrent
// creates before returning.
cleanupFn()
return nil, providers.ErrDuplicateCreate
return nil, providers.ErrReconcileInProgress
}

vmCtx.Logger.V(4).Info("Doing a non-blocking create")

// Update the cleanup function to include indicating a concurrent create is
// no longer occurring.
cleanupFn = func() {
currentlyCreating.Delete(vmNamespacedName)
currentlyReconciling.Delete(vmNamespacedName)
decrementConcurrentCreatesFn()
}

Expand Down Expand Up @@ -245,9 +251,17 @@ func (vs *vSphereVMProvider) DeleteVirtualMachine(
ctx context.Context,
vm *vmopv1.VirtualMachine) error {

vmNamespacedName := vm.NamespacedName()

if _, ok := currentlyReconciling.Load(vmNamespacedName); ok {
// If the VM is already being reconciled in a goroutine then it cannot
// be deleted yet.
return providers.ErrReconcileInProgress
}

vmCtx := pkgctx.VirtualMachineContext{
Context: context.WithValue(ctx, vimtypes.ID{}, vs.getOpID(vm, "deleteVM")),
Logger: log.WithValues("vmName", vm.NamespacedName()),
Logger: log.WithValues("vmName", vmNamespacedName),
VM: vm,
}

Expand Down
114 changes: 109 additions & 5 deletions pkg/providers/vsphere/vmprovider_vm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
"sync"
Expand Down Expand Up @@ -1361,6 +1362,109 @@ func vmTests() {
// TODO: More assertions!
})

When("using async create", func() {
BeforeEach(func() {
pkgcfg.SetContext(parentCtx, func(config *pkgcfg.Config) {
config.AsyncCreateDisabled = false
config.AsyncSignalDisabled = false
config.Features.WorkloadDomainIsolation = true
})
})
JustBeforeEach(func() {
pkgcfg.SetContext(ctx, func(config *pkgcfg.Config) {
config.MaxDeployThreadsOnProvider = 16
})
})

It("should succeed", func() {
Expect(createOrUpdateVM(ctx, vmProvider, vm)).To(Succeed())
Expect(vm.Status.UniqueID).ToNot(BeEmpty())
})

When("there is an error getting the pre-reqs", func() {
It("should not prevent a subsequent create attempt from going through", func() {
imgName := vm.Spec.Image.Name
vm.Spec.Image.Name = "does-not-exist"
err := createOrUpdateVM(ctx, vmProvider, vm)
Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(
"clustervirtualmachineimages.vmoperator.vmware.com \"does-not-exist\" not found: " +
"clustervirtualmachineimages.vmoperator.vmware.com \"does-not-exist\" not found"))
vm.Spec.Image.Name = imgName
Expect(createOrUpdateVM(ctx, vmProvider, vm)).To(Succeed())
Expect(vm.Status.UniqueID).ToNot(BeEmpty())
})
})

// Please note this test uses FlakeAttempts(5) due to the
// validation of some predictable-over-time behavior.
When("there is a reconcile in progress", FlakeAttempts(5), func() {
When("there is a duplicate create", func() {
It("should return ErrReconcileInProgress", func() {
var (
errs []error
errsMu sync.Mutex
done sync.WaitGroup
start = make(chan struct{})
)

// Set up five goroutines that race to
// create the VM first.
for i := 0; i < 5; i++ {
done.Add(1)
go func(copyOfVM *vmopv1.VirtualMachine) {
defer done.Done()
<-start
err := createOrUpdateVM(ctx, vmProvider, copyOfVM)
if err != nil {
errsMu.Lock()
errs = append(errs, err)
errsMu.Unlock()
} else {
vm = copyOfVM
}
}(vm.DeepCopy())
}

close(start)

done.Wait()

Expect(errs).To(HaveLen(4))

Expect(errs).Should(ConsistOf(
providers.ErrReconcileInProgress,
providers.ErrReconcileInProgress,
providers.ErrReconcileInProgress,
providers.ErrReconcileInProgress,
))

Expect(vm.Status.UniqueID).ToNot(BeEmpty())
})
})

When("there is a delete during async create", func() {
	It("should return ErrReconcileInProgress", func() {
		// Kick off the async create and immediately attempt a delete
		// while the create may still be running in the background.
		chanCreateErrs, createErr := vmProvider.CreateOrUpdateVirtualMachineAsync(ctx, vm)
		deleteErr := vmProvider.DeleteVirtualMachine(ctx, vm)

		Expect(createErr).ToNot(HaveOccurred())
		// Expect(...) without a matcher asserts nothing, so the result
		// of errors.Is must be checked explicitly.
		Expect(errors.Is(deleteErr, providers.ErrReconcileInProgress)).To(BeTrue())

		// Drain the channel until it is closed, keeping only non-nil
		// errors reported by the create.
		var createErrs []error
		for e := range chanCreateErrs {
			if e != nil {
				createErrs = append(createErrs, e)
			}
		}
		Expect(createErrs).Should(BeEmpty())

		// The channel has been drained, so the delete is now expected
		// to go through.
		Expect(vmProvider.DeleteVirtualMachine(ctx, vm)).To(Succeed())
	})
})
})
})

It("TKG VM", func() {
if vm.Labels == nil {
vm.Labels = make(map[string]string)
Expand Down Expand Up @@ -1528,7 +1632,7 @@ func vmTests() {
config.MaxDeployThreadsOnProvider = 16
})
})
It("should return ErrDuplicateCreate", func() {
It("should return ErrReconcileInProgress", func() {
var (
errs []error
errsMu sync.Mutex
Expand Down Expand Up @@ -1561,10 +1665,10 @@ func vmTests() {
Expect(errs).To(HaveLen(4))

Expect(errs).Should(ConsistOf(
providers.ErrDuplicateCreate,
providers.ErrDuplicateCreate,
providers.ErrDuplicateCreate,
providers.ErrDuplicateCreate,
providers.ErrReconcileInProgress,
providers.ErrReconcileInProgress,
providers.ErrReconcileInProgress,
providers.ErrReconcileInProgress,
))

Expect(vm.Status.Crypto).ToNot(BeNil())
Expand Down

0 comments on commit d895ca6

Please sign in to comment.