Skip to content

Commit

Permalink
Background reconcile awareness support
Browse files Browse the repository at this point in the history
This patch adds support to the VirtualMachine controller for
background reconciliation awareness. Rather, the controller is
now aware if a VM is already being reconciled in a background
thread due to an async operation and will not allow the VM to be
reconciled again until the async operation is complete.
  • Loading branch information
akutz committed Jan 13, 2025
1 parent 0936429 commit 492155c
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Re
}()

if !vm.DeletionTimestamp.IsZero() {
err = r.ReconcileDelete(vmCtx)
return ctrl.Result{}, err
return ctrl.Result{}, r.ReconcileDelete(vmCtx)
}

if err = r.ReconcileNormal(vmCtx); err != nil && !ignoredCreateErr(err) {
Expand Down Expand Up @@ -574,7 +573,7 @@ func getIsDefaultVMClassController(ctx context.Context) bool {
// ignoredCreateErr is written this way in order to illustrate coverage more
// accurately.
func ignoredCreateErr(err error) bool {
if errors.Is(err, providers.ErrDuplicateCreate) {
if errors.Is(err, providers.ErrReconcileInProgress) {
return true
}
if errors.Is(err, providers.ErrTooManyCreates) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func intgTestsReconcile() {
intgFakeVMProvider,
func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
atomic.AddInt32(&createAttempts, 1)
return providers.ErrDuplicateCreate
return providers.ErrReconcileInProgress
},
)
})
Expand Down
8 changes: 4 additions & 4 deletions pkg/providers/vm_provider_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ var (
// threads/goroutines have reached the allowed limit.
ErrTooManyCreates = errors.New("too many creates")

// ErrDuplicateCreate is returned from the CreateOrUpdateVirtualMachineAsync
// function if it is called for a VM while a create goroutine for that VM is
// already executing.
ErrDuplicateCreate = errors.New("duplicate create")
// ErrReconcileInProgress is returned from the
// CreateOrUpdateVirtualMachine and DeleteVirtualMachine functions when
// the VM is still being reconciled in a background thread.
ErrReconcileInProgress = errors.New("reconcile already in progress")
)

// VirtualMachineProviderInterface is a pluggable interface for VM Providers.
Expand Down
30 changes: 22 additions & 8 deletions pkg/providers/vsphere/vmprovider_vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ var (
createCountLock sync.Mutex
concurrentCreateCount int

// currentlyCreating tracks the VMs currently being created in a
// currentlyReconciling tracks the VMs currently being created in a
// non-blocking goroutine.
currentlyCreating sync.Map
currentlyReconciling sync.Map

// SkipVMImageCLProviderCheck skips the checks that a VM Image has a Content Library item provider
// since a VirtualMachineImage created for a VM template won't have either. This has been broken for
Expand Down Expand Up @@ -112,6 +112,14 @@ func (vs *vSphereVMProvider) createOrUpdateVirtualMachine(
vm *vmopv1.VirtualMachine,
async bool) (chan error, error) {

vmNamespacedName := vm.NamespacedName()

if _, ok := currentlyReconciling.Load(vmNamespacedName); ok {
// Do not process the VM again if it is already being reconciled in a
// goroutine.
return nil, providers.ErrReconcileInProgress
}

vmCtx := pkgctx.VirtualMachineContext{
Context: context.WithValue(
ctx,
Expand Down Expand Up @@ -192,24 +200,22 @@ func (vs *vSphereVMProvider) createOrUpdateVirtualMachine(
createArgs)
}

vmNamespacedName := vm.NamespacedName()

if _, ok := currentlyCreating.LoadOrStore(vmNamespacedName, struct{}{}); ok {
if _, ok := currentlyReconciling.LoadOrStore(vmNamespacedName, struct{}{}); ok {
// If the VM is already being created in a goroutine, then there is no
// need to create it again.
//
// However, we need to make sure we decrement the number of concurrent
// creates before returning.
cleanupFn()
return nil, providers.ErrDuplicateCreate
return nil, providers.ErrReconcileInProgress
}

vmCtx.Logger.V(4).Info("Doing a non-blocking create")

// Update the cleanup function to include indicating a concurrent create is
// no longer occurring.
cleanupFn = func() {
currentlyCreating.Delete(vmNamespacedName)
currentlyReconciling.Delete(vmNamespacedName)
decrementConcurrentCreatesFn()
}

Expand Down Expand Up @@ -245,9 +251,17 @@ func (vs *vSphereVMProvider) DeleteVirtualMachine(
ctx context.Context,
vm *vmopv1.VirtualMachine) error {

vmNamespacedName := vm.NamespacedName()

if _, ok := currentlyReconciling.Load(vmNamespacedName); ok {
// If the VM is already being reconciled in a goroutine then it cannot
// be deleted yet.
return providers.ErrReconcileInProgress
}

vmCtx := pkgctx.VirtualMachineContext{
Context: context.WithValue(ctx, vimtypes.ID{}, vs.getOpID(vm, "deleteVM")),
Logger: log.WithValues("vmName", vm.NamespacedName()),
Logger: log.WithValues("vmName", vmNamespacedName),
VM: vm,
}

Expand Down
125 changes: 120 additions & 5 deletions pkg/providers/vsphere/vmprovider_vm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
"sync"
Expand Down Expand Up @@ -1361,6 +1362,120 @@ func vmTests() {
// TODO: More assertions!
})

When("using async create", func() {
BeforeEach(func() {
pkgcfg.SetContext(parentCtx, func(config *pkgcfg.Config) {
config.AsyncCreateDisabled = false
config.AsyncSignalDisabled = false
config.Features.WorkloadDomainIsolation = true
})
})
It("should succeed", func() {
Expect(createOrUpdateVM(ctx, vmProvider, vm)).To(Succeed())
Expect(vm.Status.UniqueID).ToNot(BeEmpty())
})

// Please note this test uses FlakeAttempts(5) due to the
// validation of some predictable-over-time behavior.
When("there is a duplicate create", FlakeAttempts(5), func() {
JustBeforeEach(func() {
pkgcfg.SetContext(ctx, func(config *pkgcfg.Config) {
config.MaxDeployThreadsOnProvider = 16
})
})
It("should return ErrReconcileInProgress", func() {
var (
errs []error
errsMu sync.Mutex
done sync.WaitGroup
start = make(chan struct{})
)

// Set up five goroutines that race to
// create the VM first.
for i := 0; i < 5; i++ {
done.Add(1)
go func(copyOfVM *vmopv1.VirtualMachine) {
defer done.Done()
<-start
err := createOrUpdateVM(ctx, vmProvider, copyOfVM)
if err != nil {
errsMu.Lock()
errs = append(errs, err)
errsMu.Unlock()
} else {
vm = copyOfVM
}
}(vm.DeepCopy())
}

close(start)

done.Wait()

Expect(errs).To(HaveLen(4))

Expect(errs).Should(ConsistOf(
providers.ErrReconcileInProgress,
providers.ErrReconcileInProgress,
providers.ErrReconcileInProgress,
providers.ErrReconcileInProgress,
))

Expect(vm.Status.UniqueID).ToNot(BeEmpty())
})
})

// Please note this test uses FlakeAttempts(5) due to the
// validation of some predictable-over-time behavior.
When("there is a delete during async create", FlakeAttempts(5), func() {
JustBeforeEach(func() {
pkgcfg.SetContext(ctx, func(config *pkgcfg.Config) {
config.MaxDeployThreadsOnProvider = 16
})

})
It("should return ErrReconcileInProgress", func() {
chanCreateErrs, createErr := vmProvider.CreateOrUpdateVirtualMachineAsync(ctx, vm)
deleteErr := vmProvider.DeleteVirtualMachine(ctx, vm)

Expect(createErr).ToNot(HaveOccurred())
Expect(errors.Is(deleteErr, providers.ErrReconcileInProgress))

var createErrs []error
for e := range chanCreateErrs {
if e != nil {
createErrs = append(createErrs, e)
}
}
Expect(createErrs).Should(BeEmpty())

Expect(vmProvider.DeleteVirtualMachine(ctx, vm)).To(Succeed())
})
})

When("there is an error getting the pre-reqs", func() {
JustBeforeEach(func() {
pkgcfg.SetContext(ctx, func(config *pkgcfg.Config) {
config.MaxDeployThreadsOnProvider = 16
})

})
It("should not prevent a subsequent create attempt from going through", func() {
imgName := vm.Spec.Image.Name
vm.Spec.Image.Name = "does-not-exist"
err := createOrUpdateVM(ctx, vmProvider, vm)
Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(
"clustervirtualmachineimages.vmoperator.vmware.com \"does-not-exist\" not found: " +
"clustervirtualmachineimages.vmoperator.vmware.com \"does-not-exist\" not found"))
vm.Spec.Image.Name = imgName
Expect(createOrUpdateVM(ctx, vmProvider, vm)).To(Succeed())
Expect(vm.Status.UniqueID).ToNot(BeEmpty())
})
})
})

It("TKG VM", func() {
if vm.Labels == nil {
vm.Labels = make(map[string]string)
Expand Down Expand Up @@ -1528,7 +1643,7 @@ func vmTests() {
config.MaxDeployThreadsOnProvider = 16
})
})
It("should return ErrDuplicateCreate", func() {
It("should return ErrReconcileInProgress", func() {
var (
errs []error
errsMu sync.Mutex
Expand Down Expand Up @@ -1561,10 +1676,10 @@ func vmTests() {
Expect(errs).To(HaveLen(4))

Expect(errs).Should(ConsistOf(
providers.ErrDuplicateCreate,
providers.ErrDuplicateCreate,
providers.ErrDuplicateCreate,
providers.ErrDuplicateCreate,
providers.ErrReconcileInProgress,
providers.ErrReconcileInProgress,
providers.ErrReconcileInProgress,
providers.ErrReconcileInProgress,
))

Expect(vm.Status.Crypto).ToNot(BeNil())
Expand Down

0 comments on commit 492155c

Please sign in to comment.