From ac605b019581da0d6e5a17b947bb0933e588df3a Mon Sep 17 00:00:00 2001 From: Maru Newby Date: Mon, 22 Apr 2019 15:47:34 -0700 Subject: [PATCH] Factor unmanaged dispatcher out of the managed dispatcher This will enable reuse for deletion handling. The primary difference between the 2 contexts is that events cannot be recorded against a federated resource if it is marked for deletion, so logging and error handling need to be used instead. --- pkg/controller/sync/dispatch/managed.go | 53 ++------- pkg/controller/sync/dispatch/operation.go | 9 +- pkg/controller/sync/dispatch/unmanaged.go | 129 ++++++++++++++++++++++ 3 files changed, 147 insertions(+), 44 deletions(-) create mode 100644 pkg/controller/sync/dispatch/unmanaged.go diff --git a/pkg/controller/sync/dispatch/managed.go b/pkg/controller/sync/dispatch/managed.go index 1b9904bb25..de3a3671f5 100644 --- a/pkg/controller/sync/dispatch/managed.go +++ b/pkg/controller/sync/dispatch/managed.go @@ -45,11 +45,9 @@ type FederatedResourceForDispatch interface { // ManagedDispatcher dispatches operations to member clusters for resources // managed by a federated resource. type ManagedDispatcher interface { - OperationDispatcher + UnmanagedDispatcher Create(clusterName string) - Delete(clusterName string) - RemoveManagedLabel(clusterName string, clusterObj *unstructured.Unstructured) Update(clusterName string, clusterObj *unstructured.Unstructured) VersionMap() map[string]string } @@ -57,9 +55,10 @@ type ManagedDispatcher interface { type managedDispatcherImpl struct { sync.RWMutex - dispatcher *operationDispatcherImpl - fedResource FederatedResourceForDispatch - versionMap map[string]string + dispatcher *operationDispatcherImpl + unmanagedDispatcher *unmanagedDispatcherImpl + fedResource FederatedResourceForDispatch + versionMap map[string]string } func NewManagedDispatcher(clientAccessor clientAccessorFunc, fedResource FederatedResourceForDispatch) ManagedDispatcher { @@ -68,6 +67,7 @@ func NewManagedDispatcher(clientAccessor clientAccessorFunc, fedResource Federat versionMap: make(map[string]string), } d.dispatcher = newOperationDispatcher(clientAccessor, d) + d.unmanagedDispatcher = newUnmanagedDispatcher(d.dispatcher, d, fedResource.TargetKind(), fedResource.TargetName()) return d } @@ -157,56 +157,23 @@ func (d *managedDispatcherImpl) Update(clusterName string, clusterObj *unstructu } func (d *managedDispatcherImpl) Delete(clusterName string) { - d.dispatcher.incrementOperationsInitiated() - const op = "delete" - const opContinuous = "Deleting" - go d.dispatcher.clusterOperation(clusterName, op, func(client util.ResourceClient) util.ReconciliationStatus { - d.RecordEvent(clusterName, op, opContinuous) - - qualifiedName := d.fedResource.TargetName() - err := client.Resources(qualifiedName.Namespace).Delete(qualifiedName.Name, &metav1.DeleteOptions{}) - if apierrors.IsNotFound(err) { - err = nil - } - if err != nil { - d.RecordError(clusterName, op, err) - return util.StatusError - } - return util.StatusAllOK - }) + d.unmanagedDispatcher.Delete(clusterName) } func (d *managedDispatcherImpl) RemoveManagedLabel(clusterName string, clusterObj *unstructured.Unstructured) { - d.dispatcher.incrementOperationsInitiated() - const op = "remove managed label from" - const opContinuous = "Removing managed label from" - go d.dispatcher.clusterOperation(clusterName, op, func(client util.ResourceClient) util.ReconciliationStatus { - d.RecordEvent(clusterName, op, opContinuous) - - // Avoid mutating the resource in the informer cache - updateObj := clusterObj.DeepCopy() - - util.RemoveManagedLabel(updateObj) - - _, err := client.Resources(updateObj.GetNamespace()).Update(updateObj, metav1.UpdateOptions{}) - if err != nil { - d.RecordError(clusterName, op, err) - return util.StatusError - } - return util.StatusAllOK - }) + d.unmanagedDispatcher.RemoveManagedLabel(clusterName, clusterObj) } func (d *managedDispatcherImpl) RecordError(clusterName, operation string, err error) { args := []interface{}{operation, d.fedResource.TargetKind(), d.fedResource.TargetName(), clusterName} eventType := fmt.Sprintf("%sInClusterFailed", strings.Replace(strings.Title(operation), " ", "", -1)) - d.fedResource.RecordError(eventType, errors.Wrapf(err, "Failed to %s %s %q in cluster %q", args...)) + d.fedResource.RecordError(eventType, errors.Wrapf(err, "Failed to "+eventTemplate, args...)) } func (d *managedDispatcherImpl) RecordEvent(clusterName, operation, operationContinuous string) { args := []interface{}{operationContinuous, d.fedResource.TargetKind(), d.fedResource.TargetName(), clusterName} eventType := fmt.Sprintf("%sInCluster", strings.Replace(strings.Title(operation), " ", "", -1)) - d.fedResource.RecordEvent(eventType, "%s %s %q in cluster %q", args...) + d.fedResource.RecordEvent(eventType, eventTemplate, args...) } func (d *managedDispatcherImpl) VersionMap() map[string]string { diff --git a/pkg/controller/sync/dispatch/operation.go b/pkg/controller/sync/dispatch/operation.go index a29ab78e94..0147e9e4f9 100644 --- a/pkg/controller/sync/dispatch/operation.go +++ b/pkg/controller/sync/dispatch/operation.go @@ -22,6 +22,8 @@ import ( "github.com/pkg/errors" + "k8s.io/apimachinery/pkg/util/runtime" + "github.com/kubernetes-sigs/federation-v2/pkg/controller/util" ) @@ -29,6 +31,7 @@ type clientAccessorFunc func(clusterName string) (util.ResourceClient, error) type DispatchRecorder interface { RecordError(clusterName, operation string, err error) + RecordEvent(clusterName, operation, operationContinuous string) } // OperationDispatcher provides an interface to wait for operations @@ -93,7 +96,11 @@ func (d *operationDispatcherImpl) clusterOperation(clusterName, op string, opFun client, err := d.clientAccessor(clusterName) if err != nil { wrappedErr := errors.Wrapf(err, "Error retrieving client for cluster") - d.recorder.RecordError(clusterName, op, wrappedErr) + if d.recorder == nil { + runtime.HandleError(wrappedErr) + } else { + d.recorder.RecordError(clusterName, op, wrappedErr) + } d.resultChan <- util.StatusError return } diff --git a/pkg/controller/sync/dispatch/unmanaged.go b/pkg/controller/sync/dispatch/unmanaged.go new file mode 100644 index 0000000000..fb3631b945 --- /dev/null +++ b/pkg/controller/sync/dispatch/unmanaged.go @@ -0,0 +1,129 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dispatch + +import ( + "github.com/golang/glog" + "github.com/pkg/errors" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/util/runtime" + + "github.com/kubernetes-sigs/federation-v2/pkg/controller/util" +) + +const eventTemplate = "%s %s %q in cluster %q" + +// UnmanagedDispatcher dispatches operations to member clusters for +// resources that are no longer managed by a federated resource. +type UnmanagedDispatcher interface { + OperationDispatcher + + Delete(clusterName string) + RemoveManagedLabel(clusterName string, clusterObj *unstructured.Unstructured) +} + +type unmanagedDispatcherImpl struct { + dispatcher *operationDispatcherImpl + + targetName util.QualifiedName + targetKind string + + recorder DispatchRecorder +} + +func NewUnmanagedDispatcher(clientAccessor clientAccessorFunc, targetKind string, targetName util.QualifiedName) UnmanagedDispatcher { + dispatcher := newOperationDispatcher(clientAccessor, nil) + return newUnmanagedDispatcher(dispatcher, nil, targetKind, targetName) +} + +func newUnmanagedDispatcher(dispatcher *operationDispatcherImpl, recorder DispatchRecorder, targetKind string, targetName util.QualifiedName) *unmanagedDispatcherImpl { + return &unmanagedDispatcherImpl{ + dispatcher: dispatcher, + targetName: targetName, + targetKind: targetKind, + recorder: recorder, + } +} + +func (d *unmanagedDispatcherImpl) Wait() (bool, error) { + return d.dispatcher.Wait() +} + +func (d *unmanagedDispatcherImpl) Delete(clusterName string) { + d.dispatcher.incrementOperationsInitiated() + const op = "delete" + const opContinuous = "Deleting" + go d.dispatcher.clusterOperation(clusterName, op, func(client util.ResourceClient) util.ReconciliationStatus { + if d.recorder == nil { + glog.V(2).Infof(eventTemplate, opContinuous, d.targetKind, d.targetName, clusterName) + } else { + d.recorder.RecordEvent(clusterName, op, opContinuous) + } + + err := client.Resources(d.targetName.Namespace).Delete(d.targetName.Name, &metav1.DeleteOptions{}) + if apierrors.IsNotFound(err) { + err = nil + } + if err != nil { + if d.recorder == nil { + wrappedErr := d.wrapOperationError(err, clusterName, op) + runtime.HandleError(wrappedErr) + } else { + d.recorder.RecordError(clusterName, op, err) + } + return util.StatusError + } + return util.StatusAllOK + }) +} + +func (d *unmanagedDispatcherImpl) RemoveManagedLabel(clusterName string, clusterObj *unstructured.Unstructured) { + d.dispatcher.incrementOperationsInitiated() + const op = "remove managed label from" + const opContinuous = "Removing managed label from" + go d.dispatcher.clusterOperation(clusterName, op, func(client util.ResourceClient) util.ReconciliationStatus { + if d.recorder == nil { + glog.V(2).Infof(eventTemplate, opContinuous, d.targetKind, d.targetName, clusterName) + } else { + d.recorder.RecordEvent(clusterName, op, opContinuous) + } + + // Avoid mutating the resource in the informer cache + updateObj := clusterObj.DeepCopy() + + util.RemoveManagedLabel(updateObj) + + _, err := client.Resources(updateObj.GetNamespace()).Update(updateObj, metav1.UpdateOptions{}) + if err != nil { + if d.recorder == nil { + wrappedErr := d.wrapOperationError(err, clusterName, op) + runtime.HandleError(wrappedErr) + } else { + d.recorder.RecordError(clusterName, op, err) + } + return util.StatusError + } + return util.StatusAllOK + }) +} + +func (d *unmanagedDispatcherImpl) wrapOperationError(err error, clusterName, operation string) error { + return errors.Wrapf(err, "Failed to "+eventTemplate, operation, d.targetKind, d.targetName, clusterName) +}