VM Watcher Service improvements
When building the initial container MoID list, include the IDs so that
we can match the Add() and Remove() calls for the same ID. We had a bug
here: if a Zone was added to a namespace at startup time, any later
removal of that Zone from one namespace would remove the watch of that
Zone across all namespaces. This also makes the watcher Add() and
Remove() idempotent, which the infra Zone controller changes described
later require.

Tweak how the infra Zone controller handles the finalizer. The VC
watcher itself is ephemeral since it goes away when the VC client is
closed, but we need the Zone finalizer so that we can remove the Zone's
folder from the watcher when the Zone goes away. Therefore, call Add()
or Remove() for the Zone regardless of whether the finalizer was
already present.

Better handle a Zone with a stale FolderMoID: if a folder does not
exist on VC, the watcher will fail to start. We now filter out invalid
container MoIDs before passing them to the watcher; the Zone controller
will keep retrying to add that FolderMoID to the watcher.
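
A hedged sketch of that kind of pre-filtering, using govmomi's property
collector; the validRefs helper below is illustrative, not the actual
code in the VM watcher service:

package example

import (
	"context"

	"github.com/vmware/govmomi/property"
	"github.com/vmware/govmomi/vim25"
	"github.com/vmware/govmomi/vim25/mo"
	vimtypes "github.com/vmware/govmomi/vim25/types"
)

// validRefs is a hypothetical helper that drops container MoIDs that no
// longer exist on VC so that a stale FolderMoID cannot keep the watcher
// from starting; the Zone controller retries the dropped refs later.
func validRefs(
	ctx context.Context,
	c *vim25.Client,
	in map[vimtypes.ManagedObjectReference][]string,
) map[vimtypes.ManagedObjectReference][]string {

	out := make(map[vimtypes.ManagedObjectReference][]string, len(in))
	pc := property.DefaultCollector(c)
	for ref, ids := range in {
		var folder mo.Folder
		// Retrieving a single cheap property fails if the object is gone.
		if err := pc.RetrieveOne(ctx, ref, []string{"name"}, &folder); err != nil {
			continue // Stale MoID: skip it; the Zone controller will retry.
		}
		out[ref] = ids
	}
	return out
}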

Much of this change is fallout from 14c1ac3, when we changed the VM
watcher service to not os.Exit() whenever the watcher could not be
started.
Bryan Venteicher committed Jan 14, 2025
1 parent 5bd4b7a commit 90c3aed
Showing 6 changed files with 270 additions and 84 deletions.
62 changes: 32 additions & 30 deletions controllers/infra/zone/zone_controller.go
@@ -123,48 +123,50 @@ func (r *Reconciler) ReconcileDelete(
ctx context.Context,
obj *topologyv1.Zone) (ctrl.Result, error) {

if controllerutil.ContainsFinalizer(obj, Finalizer) {
if val := obj.Spec.ManagedVMs.FolderMoID; val != "" {
if err := watcher.Remove(
ctx,
vimtypes.ManagedObjectReference{
Type: "Folder",
Value: val,
},
fmt.Sprintf("%s/%s", obj.Namespace, obj.Name)); err != nil {

if !errors.Is(err, watcher.ErrAsyncSignalDisabled) {
return ctrl.Result{}, err
}
if val := obj.Spec.ManagedVMs.FolderMoID; val != "" {
if err := watcher.Remove(
ctx,
vimtypes.ManagedObjectReference{
Type: "Folder",
Value: val,
},
fmt.Sprintf("%s/%s", obj.Namespace, obj.Name)); err != nil {

if !errors.Is(err, watcher.ErrAsyncSignalDisabled) {
// We don't ignore watcher.ErrNoWatcher here to interlock with the vm watcher
// service that is in the process of restarting the watcher.
return ctrl.Result{}, err
}
}
controllerutil.RemoveFinalizer(obj, Finalizer)
}
controllerutil.RemoveFinalizer(obj, Finalizer)

return ctrl.Result{}, nil
}

func (r *Reconciler) ReconcileNormal(
ctx context.Context,
obj *topologyv1.Zone) (ctrl.Result, error) {

if !controllerutil.ContainsFinalizer(obj, Finalizer) {
// The finalizer is not added until we are able to successfully
// add a watch to the zone's VM Service folder.
if val := obj.Spec.ManagedVMs.FolderMoID; val != "" {
if err := watcher.Add(
ctx,
vimtypes.ManagedObjectReference{
Type: "Folder",
Value: val,
},
fmt.Sprintf("%s/%s", obj.Namespace, obj.Name)); err != nil {

if !errors.Is(err, watcher.ErrAsyncSignalDisabled) {
return ctrl.Result{}, err
}
if controllerutil.AddFinalizer(obj, Finalizer) {
// Ensure the finalizer is present before watching this zone.
return ctrl.Result{}, nil
}

if val := obj.Spec.ManagedVMs.FolderMoID; val != "" {
if err := watcher.Add(
ctx,
vimtypes.ManagedObjectReference{
Type: "Folder",
Value: val,
},
fmt.Sprintf("%s/%s", obj.Namespace, obj.Name)); err != nil {

if !errors.Is(err, watcher.ErrAsyncSignalDisabled) {
return ctrl.Result{}, err
}
controllerutil.AddFinalizer(obj, Finalizer)
}
}

return ctrl.Result{}, nil
}
3 changes: 1 addition & 2 deletions controllers/infra/zone/zone_controller_test.go
@@ -122,8 +122,7 @@ var _ = Describe(
When("no vms exist in the zone's vm service folder", func() {

const (
vmName = "my-vm-1"
fakeString = "fake"
vmName = "my-vm-1"
)

var (
50 changes: 28 additions & 22 deletions pkg/util/vsphere/watcher/watcher.go
@@ -118,10 +118,12 @@ type Watcher struct {
lv *view.ListView
cv map[moRef]*view.ContainerView

// cvr is used to count the number of times a container has been added to
// the list view. If the Remove function is called on a container with a ref
// count of one, then the container will be removed from the list view and
// cvr is used to keep track of what opaque IDs are using the list view of
// each container. If the Remove function is called on a container with the
// last ID, then the container will be removed from the list view and
// destroyed.
// cvr will have a container moref only if cv does, so cv should be checked
// first.
cvr map[moRef]map[string]struct{}

ignoredExtraConfigKeys map[string]struct{}
Expand Down Expand Up @@ -158,7 +160,7 @@ func newWatcher(
watchedPropertyPaths []string,
additionalIgnoredExtraConfigKeys []string,
lookupNamespacedName lookupNamespacedNameFn,
containerRefs ...moRef) (*Watcher, error) {
containerRefsWithIDs map[moRef][]string) (*Watcher, error) {

if watchedPropertyPaths == nil {
watchedPropertyPaths = DefaultWatchedPropertyPaths()
@@ -172,7 +174,7 @@

// For each container reference, create a container view and add it to
// the list view's initial list of members.
cvs, cvr, err := toContainerViewMap(ctx, vm, containerRefs...)
cvs, cvr, err := toContainerViewMap(ctx, vm, containerRefsWithIDs)
if err != nil {
return nil, err
}
@@ -227,15 +229,15 @@ func (w *Watcher) close() {

// Start begins watching a vSphere server for updates to VM Service managed VMs.
// If watchedPropertyPaths is nil, DefaultWatchedPropertyPaths will be used.
// The containerRefs parameter may be used to start the watcher with an initial
// list of entities to watch.
// The containerRefsWithIDs parameter may be used to start the watcher with an
// initial list of entities to watch.
func Start(
ctx context.Context,
client *vim25.Client,
watchedPropertyPaths []string,
additionalIgnoredExtraConfigKeys []string,
lookupNamespacedName lookupNamespacedNameFn,
containerRefs ...moRef) (*Watcher, error) {
containerRefsWithIDs map[moRef][]string) (*Watcher, error) {

logger := logr.FromContextOrDiscard(ctx).WithName("vSphereWatcher")

@@ -247,7 +249,7 @@ func Start(
watchedPropertyPaths,
additionalIgnoredExtraConfigKeys,
lookupNamespacedName,
containerRefs...)
containerRefsWithIDs)
if err != nil {
return nil, err
}
@@ -512,9 +514,9 @@ func (w *Watcher) remove(_ context.Context, ref moRef, id string) error {
return nil
}

// Only remove the container from the list view if it has a ref count of
// one.
if len(w.cvr[ref]) > 1 {
// Only remove the container from the list view if this ref is the
// last user.
if _, ok := w.cvr[ref][id]; !ok || len(w.cvr[ref]) > 1 {
delete(w.cvr[ref], id)
return nil
}
@@ -530,41 +532,45 @@

delete(w.cv, ref)
delete(w.cvr[ref], id)
if len(w.cvr[ref]) == 0 {
delete(w.cvr, ref)
}

return nil
}

func toContainerViewMap(
ctx context.Context,
vm *view.Manager,
containerRefs ...moRef) (map[moRef]*view.ContainerView, map[moRef]map[string]struct{}, error) {
containerRefsWithIDs map[moRef][]string) (map[moRef]*view.ContainerView, map[moRef]map[string]struct{}, error) {

var (
cvMap = map[moRef]*view.ContainerView{}
cvRefsMap = map[moRef]map[string]struct{}{}
)

if len(containerRefs) == 0 {
if len(containerRefsWithIDs) == 0 {
return cvMap, cvRefsMap, nil
}

var resultErr error
for i := range containerRefs {
if _, ok := cvMap[containerRefs[i]]; ok {
// Ignore duplicates.
continue
}
for moref, ids := range containerRefsWithIDs {
cv, err := vm.CreateContainerView(
ctx,
containerRefs[i],
moref,
[]string{virtualMachineType},
true)
if err != nil {
resultErr = err
break
}
cvMap[containerRefs[i]] = cv
cvRefsMap[containerRefs[i]] = map[string]struct{}{}
cvMap[moref] = cv

idSet := make(map[string]struct{}, len(ids))
for _, id := range ids {
idSet[id] = struct{}{}
}
cvRefsMap[moref] = idSet
}

if resultErr != nil {
73 changes: 57 additions & 16 deletions pkg/util/vsphere/watcher/watcher_test.go
@@ -11,6 +11,7 @@ import (
"net"
"net/url"
"os"
"strconv"
"sync"
"time"

@@ -33,6 +34,10 @@ const (
fakeStr = "fake"
)

func idStr(i int) string {
return "id-" + strconv.Itoa(i)
}

var _ = Describe("Start", func() {
var (
ctx context.Context
@@ -173,7 +178,10 @@
}
return watcher.LookupNamespacedNameResult{}
},
cluster1.Reference())
map[vimtypes.ManagedObjectReference][]string{
cluster1.Reference(): {idStr(0)},
},
)
Expect(err).ToNot(HaveOccurred())
Expect(w).ToNot(BeNil())
})
@@ -278,6 +286,16 @@
})
})

When("removal of container that does not exist", func() {
Specify("no error or results", func() {
moRef := cluster1vm1.Reference()
moRef.Value = "bogus"
Expect(watcher.Remove(ctx, moRef, idStr(0))).To(Succeed())
assertNoError()
assertNoResult()
})
})

When("one vm has namespace/name information", func() {
BeforeEach(func() {
addNamespaceName(cluster1vm1, "my-namespace-1", "my-name-1")
@@ -410,7 +428,7 @@
assertNoResult()

// Remove the cluster from the scope of the watcher.
Expect(watcher.Remove(ctx, cluster1.Reference(), fakeStr)).To(Succeed())
Expect(watcher.Remove(ctx, cluster1.Reference(), idStr(0))).To(Succeed())

// Add a non-ignored ExtraConfig key.
t, err := cluster1vm1.Reconfigure(
@@ -436,14 +454,16 @@

// Assert no more results are signaled.
assertNoResult()

// Duplicate remove should noop.
Expect(watcher.Remove(ctx, cluster1.Reference(), idStr(0))).To(Succeed())
})
})

When("the container has multiple adds", func() {
JustBeforeEach(func() {
Expect(watcher.Add(ctx, cluster1.Reference(), fakeStr+"1")).To(Succeed())
Expect(watcher.Add(ctx, cluster1.Reference(), fakeStr+"2")).To(Succeed())
Expect(watcher.Add(ctx, cluster1.Reference(), fakeStr+"3")).To(Succeed())
Expect(watcher.Add(ctx, cluster1.Reference(), idStr(1))).To(Succeed())
Expect(watcher.Add(ctx, cluster1.Reference(), idStr(2))).To(Succeed())
})
Specify("it should need an equal number of removes before it stops being watched", func() {
// Assert that a result is signaled due to the VM entering the
@@ -454,16 +474,16 @@
assertNoResult()

// Remove the cluster from the scope of the watcher.
Expect(watcher.Remove(ctx, cluster1.Reference(), fakeStr+"1")).To(Succeed())
Expect(watcher.Remove(ctx, cluster1.Reference(), idStr(0))).To(Succeed())

// Add a non-ignored ExtraConfig key.
t, err := cluster1vm1.Reconfigure(
ctx,
vimtypes.VirtualMachineConfigSpec{
ExtraConfig: []vimtypes.BaseOptionValue{
&vimtypes.OptionValue{
Key: "guestinfo.1",
Value: "1",
Key: "guestinfo.0",
Value: "0",
},
},
},
@@ -482,15 +502,15 @@
assertNoResult()

//
// Do this again.
// Do this again with next ID.
//
Expect(watcher.Remove(ctx, cluster1.Reference(), fakeStr+"2")).To(Succeed())
Expect(watcher.Remove(ctx, cluster1.Reference(), idStr(1))).To(Succeed())
t, err = cluster1vm1.Reconfigure(
ctx,
vimtypes.VirtualMachineConfigSpec{
ExtraConfig: []vimtypes.BaseOptionValue{
&vimtypes.OptionValue{
Key: "guestinfo.2",
Key: "guestinfo.1",
Value: "1",
},
},
Expand All @@ -503,16 +523,37 @@ var _ = Describe("Start", func() {
assertNoResult()

//
// Remove the container a third time, and it should finally be
// be removed from the watch.
// Do it again with the same ID. This should be a noop so we still expect events.
//
Expect(watcher.Remove(ctx, cluster1.Reference(), fakeStr+"3")).To(Succeed())
Expect(watcher.Remove(ctx, cluster1.Reference(), idStr(1))).To(Succeed())
t, err = cluster1vm1.Reconfigure(
ctx,
vimtypes.VirtualMachineConfigSpec{
ExtraConfig: []vimtypes.BaseOptionValue{
&vimtypes.OptionValue{
Key: "guestinfo.3",
Key: "guestinfo.1",
Value: "2",
},
},
},
)
ExpectWithOffset(1, err).ShouldNot(HaveOccurred())
ExpectWithOffset(1, t.WaitEx(ctx)).To(Succeed())
assertNoError()
assertResult(cluster1vm1, "my-namespace-1", "my-name-1")
assertNoResult()

//
// Remove the container with the final ID, and it should finally be
// removed from the watch.
//
Expect(watcher.Remove(ctx, cluster1.Reference(), idStr(2))).To(Succeed())
t, err = cluster1vm1.Reconfigure(
ctx,
vimtypes.VirtualMachineConfigSpec{
ExtraConfig: []vimtypes.BaseOptionValue{
&vimtypes.OptionValue{
Key: "guestinfo.2",
Value: "1",
},
},
@@ -597,7 +638,7 @@

When("the container is added to the watcher", func() {
JustBeforeEach(func() {
Expect(watcher.Add(ctx, cluster2.Reference(), fakeStr)).To(Succeed())
Expect(watcher.Add(ctx, cluster2.Reference(), idStr(0))).To(Succeed())
})
When("the lookup function returns verified=false", func() {
Specify("the result channel should receive a result", func() {