From b8da86fd50189fb68671fa8c9c342be74a0ef74c Mon Sep 17 00:00:00 2001 From: Anik Bhattacharjee Date: Fri, 26 Jul 2024 16:16:06 -0400 Subject: [PATCH] (fix) List catalogsource using client, instead of referring to cache Using the information in the resolver cache to list the available catalogsources leads to the very common and widely known problem of using caches: invalid data due to a stale cache. This has shown up multiple times in production environments over the years, manifesting itself in the form of all subscriptions in a namespace being transitioned into an error state when a CatalogSource that the cache claims exists has actually been deleted from the cluster, but the cache was not updated. The Subscriptions are transitioned to an error state because of the deleted catalogsource with the following error message: "message": "failed to populate resolver cache from source : failed to list bundles: rpc error: code = Unavailable desc = connection error: desc = \"transport: Error while dialing dial tcp: lookup ..svc on 172.....: no such host\"", "reason": "ErrorPreventedResolution", "status": "True", "type": "ResolutionFailed" This PR switches the information lookup from the cache to using a client to list the CatalogSources present in the cluster. 
--- pkg/controller/operators/catalog/operator.go | 2 +- .../registry/resolver/source_registry.go | 58 +++++++++++++++---- 2 files changed, 47 insertions(+), 13 deletions(-) diff --git a/pkg/controller/operators/catalog/operator.go b/pkg/controller/operators/catalog/operator.go index 7f8f2c4809..1f637cbe37 100644 --- a/pkg/controller/operators/catalog/operator.go +++ b/pkg/controller/operators/catalog/operator.go @@ -214,7 +214,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo clientFactory: clients.NewFactory(validatingConfig), } op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState) - op.sourceInvalidator = resolver.SourceProviderFromRegistryClientProvider(op.sources, logger) + op.sourceInvalidator = resolver.SourceProviderFromRegistryClientProvider(op.sources, lister.OperatorsV1alpha1().CatalogSourceLister(), logger) resolverSourceProvider := NewOperatorGroupToggleSourceProvider(op.sourceInvalidator, logger, op.lister.OperatorsV1().OperatorGroupLister()) op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now, ssaClient, workloadUserID, opmImage, utilImage) res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, resolverSourceProvider, logger) diff --git a/pkg/controller/registry/resolver/source_registry.go b/pkg/controller/registry/resolver/source_registry.go index bbfef6c542..ec97972287 100644 --- a/pkg/controller/registry/resolver/source_registry.go +++ b/pkg/controller/registry/resolver/source_registry.go @@ -8,12 +8,14 @@ import ( "time" "github.com/blang/semver/v4" + v1alpha1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache" "github.com/operator-framework/operator-registry/pkg/api" 
"github.com/operator-framework/operator-registry/pkg/client" opregistry "github.com/operator-framework/operator-registry/pkg/registry" "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/labels" ) // todo: move to pkg/controller/operators/catalog @@ -65,15 +67,17 @@ func (i *sourceInvalidator) GetValidChannel(key cache.SourceKey) <-chan struct{} } type RegistrySourceProvider struct { - rcp RegistryClientProvider - logger logrus.StdLogger - invalidator *sourceInvalidator + rcp RegistryClientProvider + catsrcLister v1alpha1listers.CatalogSourceLister + logger logrus.StdLogger + invalidator *sourceInvalidator } -func SourceProviderFromRegistryClientProvider(rcp RegistryClientProvider, logger logrus.StdLogger) *RegistrySourceProvider { +func SourceProviderFromRegistryClientProvider(rcp RegistryClientProvider, catsrcLister v1alpha1listers.CatalogSourceLister, logger logrus.StdLogger) *RegistrySourceProvider { return &RegistrySourceProvider{ - rcp: rcp, - logger: logger, + rcp: rcp, + logger: logger, + catsrcLister: catsrcLister, invalidator: &sourceInvalidator{ validChans: make(map[cache.SourceKey]chan struct{}), ttl: 5 * time.Minute, @@ -81,15 +85,45 @@ func SourceProviderFromRegistryClientProvider(rcp RegistryClientProvider, logger } } +type errorSource struct { + error +} + +func (s errorSource) Snapshot(_ context.Context) (*cache.Snapshot, error) { + return nil, s.error +} + func (a *RegistrySourceProvider) Sources(namespaces ...string) map[cache.SourceKey]cache.Source { result := make(map[cache.SourceKey]cache.Source) - for key, client := range a.rcp.ClientsForNamespaces(namespaces...) 
{ - result[cache.SourceKey(key)] = &registrySource{ - key: cache.SourceKey(key), - client: client, - logger: a.logger, - invalidator: a.invalidator, + + cats, err := a.catsrcLister.List(labels.Everything()) + if err != nil { + for _, ns := range namespaces { + result[cache.SourceKey{Name: "", Namespace: ns}] = errorSource{ + error: fmt.Errorf("failed to list catalogsources for namespace %q: %w", ns, err), + } } + return result + } + + clients := a.rcp.ClientsForNamespaces(namespaces...) + for _, cat := range cats { + key := cache.SourceKey{Name: cat.Name, Namespace: cat.Namespace} + if client, ok := clients[registry.CatalogKey{Name: cat.Name, Namespace: cat.Namespace}]; ok { + result[key] = &registrySource{ + key: key, + client: client, + logger: a.logger, + invalidator: a.invalidator, + } + } else { + result[key] = errorSource{ + error: fmt.Errorf("no registry client established for catalogsource %s/%s", cat.Namespace, cat.Name), + } + } + } + if len(result) == 0 { + return nil } return result }