Skip to content

Commit

Permalink
OCPBUGS-17157: *: filter informers when preconditions are met (#3021)
Browse files Browse the repository at this point in the history
* *: filter informers when preconditions are met

When we can detect at startup time that all of the objects we're about
to look at have the labels we're expecting, we can filter our informer
factories upfront.

Signed-off-by: Steve Kuznetsov <[email protected]>

* test/e2e: improvements

Signed-off-by: Steve Kuznetsov <[email protected]>

---------

Signed-off-by: Steve Kuznetsov <[email protected]>
  • Loading branch information
stevekuznetsov authored Sep 6, 2023
1 parent c55c24d commit c0c61fe
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 51 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.3
golang.org/x/net v0.10.0
golang.org/x/sync v0.2.0
golang.org/x/time v0.3.0
google.golang.org/grpc v1.54.0
gopkg.in/yaml.v2 v2.4.0
Expand Down Expand Up @@ -208,7 +209,6 @@ require (
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
golang.org/x/mod v0.10.0 // indirect
golang.org/x/oauth2 v0.5.0 // indirect
golang.org/x/sync v0.2.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/term v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
Expand Down
78 changes: 39 additions & 39 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"sync"
"time"

"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/internal/alongside"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/labeller"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/validatingroundtripper"
errorwrap "github.com/pkg/errors"
Expand Down Expand Up @@ -187,6 +186,11 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
return nil, err
}

canFilter, err := labeller.Validate(ctx, logger, metadataClient)
if err != nil {
return nil, err
}

// Allocate the new instance of an Operator.
op := &Operator{
Operator: queueOperator,
Expand Down Expand Up @@ -363,7 +367,14 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
}

// Wire k8s sharedIndexInformers
k8sInformerFactory := informers.NewSharedInformerFactoryWithOptions(op.opClient.KubernetesInterface(), resyncPeriod())
k8sInformerFactory := informers.NewSharedInformerFactoryWithOptions(op.opClient.KubernetesInterface(), resyncPeriod(), func() []informers.SharedInformerOption {
if !canFilter {
return nil
}
return []informers.SharedInformerOption{informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = labels.SelectorFromSet(labels.Set{install.OLMManagedLabelKey: install.OLMManagedLabelValue}).String()
})}
}()...)
sharedIndexInformers := []cache.SharedIndexInformer{}

// Wire Roles
Expand All @@ -372,6 +383,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
sharedIndexInformers = append(sharedIndexInformers, roleInformer.Informer())

labelObjects := func(gvr schema.GroupVersionResource, informer cache.SharedIndexInformer, sync queueinformer.LegacySyncHandler) error {
if canFilter {
return nil
}
op.k8sLabelQueueSets[gvr] = workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{
Name: gvr.String(),
})
Expand All @@ -392,8 +406,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
return nil
}

if err := labelObjects(rbacv1.SchemeGroupVersion.WithResource("roles"), roleInformer.Informer(), labeller.ObjectLabeler[*rbacv1.Role, *rbacv1applyconfigurations.RoleApplyConfiguration](
ctx, op.logger, labeller.HasOLMOwnerRef,
rolesgvk := rbacv1.SchemeGroupVersion.WithResource("roles")
if err := labelObjects(rolesgvk, roleInformer.Informer(), labeller.ObjectLabeler[*rbacv1.Role, *rbacv1applyconfigurations.RoleApplyConfiguration](
ctx, op.logger, labeller.Filter(rolesgvk),
rbacv1applyconfigurations.Role,
func(namespace string, ctx context.Context, cfg *rbacv1applyconfigurations.RoleApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.Role, error) {
return op.opClient.KubernetesInterface().RbacV1().Roles(namespace).Apply(ctx, cfg, opts)
Expand All @@ -407,8 +422,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
op.lister.RbacV1().RegisterRoleBindingLister(metav1.NamespaceAll, roleBindingInformer.Lister())
sharedIndexInformers = append(sharedIndexInformers, roleBindingInformer.Informer())

if err := labelObjects(rbacv1.SchemeGroupVersion.WithResource("rolebindings"), roleBindingInformer.Informer(), labeller.ObjectLabeler[*rbacv1.RoleBinding, *rbacv1applyconfigurations.RoleBindingApplyConfiguration](
ctx, op.logger, labeller.HasOLMOwnerRef,
rolebindingsgvk := rbacv1.SchemeGroupVersion.WithResource("rolebindings")
if err := labelObjects(rolebindingsgvk, roleBindingInformer.Informer(), labeller.ObjectLabeler[*rbacv1.RoleBinding, *rbacv1applyconfigurations.RoleBindingApplyConfiguration](
ctx, op.logger, labeller.Filter(rolebindingsgvk),
rbacv1applyconfigurations.RoleBinding,
func(namespace string, ctx context.Context, cfg *rbacv1applyconfigurations.RoleBindingApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.RoleBinding, error) {
return op.opClient.KubernetesInterface().RbacV1().RoleBindings(namespace).Apply(ctx, cfg, opts)
Expand All @@ -422,10 +438,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
op.lister.CoreV1().RegisterServiceAccountLister(metav1.NamespaceAll, serviceAccountInformer.Lister())
sharedIndexInformers = append(sharedIndexInformers, serviceAccountInformer.Informer())

if err := labelObjects(corev1.SchemeGroupVersion.WithResource("serviceaccounts"), serviceAccountInformer.Informer(), labeller.ObjectLabeler[*corev1.ServiceAccount, *corev1applyconfigurations.ServiceAccountApplyConfiguration](
ctx, op.logger, func(object metav1.Object) bool {
return labeller.HasOLMOwnerRef(object) || labeller.HasOLMLabel(object)
},
serviceaccountsgvk := corev1.SchemeGroupVersion.WithResource("serviceaccounts")
if err := labelObjects(serviceaccountsgvk, serviceAccountInformer.Informer(), labeller.ObjectLabeler[*corev1.ServiceAccount, *corev1applyconfigurations.ServiceAccountApplyConfiguration](
ctx, op.logger, labeller.Filter(serviceaccountsgvk),
corev1applyconfigurations.ServiceAccount,
func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.ServiceAccountApplyConfiguration, opts metav1.ApplyOptions) (*corev1.ServiceAccount, error) {
return op.opClient.KubernetesInterface().CoreV1().ServiceAccounts(namespace).Apply(ctx, cfg, opts)
Expand All @@ -439,8 +454,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
op.lister.CoreV1().RegisterServiceLister(metav1.NamespaceAll, serviceInformer.Lister())
sharedIndexInformers = append(sharedIndexInformers, serviceInformer.Informer())

if err := labelObjects(corev1.SchemeGroupVersion.WithResource("services"), serviceInformer.Informer(), labeller.ObjectLabeler[*corev1.Service, *corev1applyconfigurations.ServiceApplyConfiguration](
ctx, op.logger, labeller.HasOLMOwnerRef,
servicesgvk := corev1.SchemeGroupVersion.WithResource("services")
if err := labelObjects(servicesgvk, serviceInformer.Informer(), labeller.ObjectLabeler[*corev1.Service, *corev1applyconfigurations.ServiceApplyConfiguration](
ctx, op.logger, labeller.Filter(servicesgvk),
corev1applyconfigurations.Service,
func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.ServiceApplyConfiguration, opts metav1.ApplyOptions) (*corev1.Service, error) {
return op.opClient.KubernetesInterface().CoreV1().Services(namespace).Apply(ctx, cfg, opts)
Expand All @@ -463,11 +479,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
op.lister.CoreV1().RegisterPodLister(metav1.NamespaceAll, csPodInformer.Lister())
sharedIndexInformers = append(sharedIndexInformers, csPodInformer.Informer())

if err := labelObjects(corev1.SchemeGroupVersion.WithResource("pods"), csPodInformer.Informer(), labeller.ObjectLabeler[*corev1.Pod, *corev1applyconfigurations.PodApplyConfiguration](
ctx, op.logger, func(object metav1.Object) bool {
_, ok := object.GetLabels()[reconciler.CatalogSourceLabelKey]
return ok
},
podsgvk := corev1.SchemeGroupVersion.WithResource("pods")
if err := labelObjects(podsgvk, csPodInformer.Informer(), labeller.ObjectLabeler[*corev1.Pod, *corev1applyconfigurations.PodApplyConfiguration](
ctx, op.logger, labeller.Filter(podsgvk),
corev1applyconfigurations.Pod,
func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.PodApplyConfiguration, opts metav1.ApplyOptions) (*corev1.Pod, error) {
return op.opClient.KubernetesInterface().CoreV1().Pods(namespace).Apply(ctx, cfg, opts)
Expand Down Expand Up @@ -500,19 +514,11 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
jobInformer := k8sInformerFactory.Batch().V1().Jobs()
sharedIndexInformers = append(sharedIndexInformers, jobInformer.Informer())

if err := labelObjects(batchv1.SchemeGroupVersion.WithResource("jobs"), jobInformer.Informer(), labeller.ObjectLabeler[*batchv1.Job, *batchv1applyconfigurations.JobApplyConfiguration](
ctx, op.logger, func(object metav1.Object) bool {
for _, ownerRef := range object.GetOwnerReferences() {
if ownerRef.APIVersion == corev1.SchemeGroupVersion.String() && ownerRef.Kind == "ConfigMap" {
cm, err := configMapInformer.Lister().ConfigMaps(object.GetNamespace()).Get(ownerRef.Name)
if err != nil {
return false
}
return labeller.HasOLMOwnerRef(cm)
}
}
return false
},
jobsgvk := batchv1.SchemeGroupVersion.WithResource("jobs")
if err := labelObjects(jobsgvk, jobInformer.Informer(), labeller.ObjectLabeler[*batchv1.Job, *batchv1applyconfigurations.JobApplyConfiguration](
ctx, op.logger, labeller.JobFilter(func(namespace, name string) (metav1.Object, error) {
return configMapInformer.Lister().ConfigMaps(namespace).Get(name)
}),
batchv1applyconfigurations.Job,
func(namespace string, ctx context.Context, cfg *batchv1applyconfigurations.JobApplyConfiguration, opts metav1.ApplyOptions) (*batchv1.Job, error) {
return op.opClient.KubernetesInterface().BatchV1().Jobs(namespace).Apply(ctx, cfg, opts)
Expand Down Expand Up @@ -585,15 +591,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
return nil, err
}

if err := labelObjects(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"), crdInformer, labeller.ObjectPatchLabeler(
ctx, op.logger, func(object metav1.Object) bool {
for key := range object.GetAnnotations() {
if strings.HasPrefix(key, alongside.AnnotationPrefix) {
return true
}
}
return false
},
customresourcedefinitionsgvk := apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions")
if err := labelObjects(customresourcedefinitionsgvk, crdInformer, labeller.ObjectPatchLabeler(
ctx, op.logger, labeller.Filter(customresourcedefinitionsgvk),
op.opClient.ApiextensionsInterface().ApiextensionsV1().CustomResourceDefinitions().Patch,
)); err != nil {
return nil, err
Expand Down
112 changes: 112 additions & 0 deletions pkg/controller/operators/labeller/filters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package labeller

import (
"context"
"fmt"
"strings"
"sync"

"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/reconciler"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/metadata"

"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/internal/alongside"
)

func Filter(gvr schema.GroupVersionResource) func(metav1.Object) bool {
if f, ok := filters[gvr]; ok {
return f
}
return func(object metav1.Object) bool {
return false
}
}

func JobFilter(getConfigMap func(namespace, name string) (metav1.Object, error)) func(object metav1.Object) bool {
return func(object metav1.Object) bool {
for _, ownerRef := range object.GetOwnerReferences() {
if ownerRef.APIVersion == corev1.SchemeGroupVersion.String() && ownerRef.Kind == "ConfigMap" {
cm, err := getConfigMap(object.GetNamespace(), ownerRef.Name)
if err != nil {
return false
}
return HasOLMOwnerRef(cm)
}
}
return false
}
}

var filters = map[schema.GroupVersionResource]func(metav1.Object) bool{
corev1.SchemeGroupVersion.WithResource("services"): HasOLMOwnerRef,
corev1.SchemeGroupVersion.WithResource("pods"): func(object metav1.Object) bool {
_, ok := object.GetLabels()[reconciler.CatalogSourceLabelKey]
return ok
},
corev1.SchemeGroupVersion.WithResource("serviceaccounts"): func(object metav1.Object) bool {
return HasOLMOwnerRef(object) || HasOLMLabel(object)
},
appsv1.SchemeGroupVersion.WithResource("deployments"): HasOLMOwnerRef,
rbacv1.SchemeGroupVersion.WithResource("roles"): HasOLMOwnerRef,
rbacv1.SchemeGroupVersion.WithResource("rolebindings"): HasOLMOwnerRef,
rbacv1.SchemeGroupVersion.WithResource("clusterroles"): HasOLMOwnerRef,
rbacv1.SchemeGroupVersion.WithResource("clusterrolebindings"): HasOLMOwnerRef,
apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"): func(object metav1.Object) bool {
for key := range object.GetAnnotations() {
if strings.HasPrefix(key, alongside.AnnotationPrefix) {
return true
}
}
return false
},
}

func Validate(ctx context.Context, logger *logrus.Logger, metadataClient metadata.Interface) (bool, error) {
okLock := sync.Mutex{}
var ok bool
g, ctx := errgroup.WithContext(ctx)
allFilters := map[schema.GroupVersionResource]func(metav1.Object) bool{}
for gvr, filter := range filters {
allFilters[gvr] = filter
}
allFilters[batchv1.SchemeGroupVersion.WithResource("jobs")] = JobFilter(func(namespace, name string) (metav1.Object, error) {
return metadataClient.Resource(corev1.SchemeGroupVersion.WithResource("configmaps")).Namespace(namespace).Get(ctx, name, metav1.GetOptions{})
})
for gvr, filter := range allFilters {
gvr, filter := gvr, filter
g.Go(func() error {
list, err := metadataClient.Resource(gvr).List(ctx, metav1.ListOptions{})
if err != nil {
return fmt.Errorf("failed to list %s: %w", gvr.String(), err)
}
var count int
for _, item := range list.Items {
if filter(&item) && !hasLabel(&item) {
count++
}
}
if count > 0 {
logger.WithFields(logrus.Fields{
"gvr": gvr.String(),
"nonconforming": count,
}).Info("found nonconforming items")
}
okLock.Lock()
ok = ok && count == 0
okLock.Unlock()
return nil
})
}
if err := g.Wait(); err != nil {
return false, err
}
return ok, nil
}
5 changes: 2 additions & 3 deletions pkg/controller/operators/labeller/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,8 @@ import (
"strings"

jsonpatch "github.com/evanphx/json-patch"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/ownerutil"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer"
"github.com/sirupsen/logrus"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -22,6 +19,8 @@ import (

"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/install"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/decorators"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/ownerutil"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer"
)

type ApplyConfig[T any] interface {
Expand Down
Loading

0 comments on commit c0c61fe

Please sign in to comment.