Skip to content

Commit

Permalink
(fix) List catalogsource using client, instead of referring to cache
Browse files Browse the repository at this point in the history
Using the information in the resolver cache to list the available CatalogSources
leads to a very common and widely known problem with caches: invalid data
due to a stale cache.

This has shown up multiple times in production environments over the years, manifesting
itself as all the Subscriptions in a namespace being transitioned into an error
state when a CatalogSource that the cache claims exists has actually been deleted from the
cluster, but the cache was not updated.

The Subscriptions are transitioned to an error state because of the deleted CatalogSource,
with the following error message:

"message": "failed to populate resolver cache from source <deleted-catalogsource>: failed to list
bundles: rpc error: code = Unavailable desc = connection error: desc = \"transport:
Error while dialing dial tcp: lookup <deleted-catalogsource>.<ns>.svc on 172.....: no such host\"",
                "reason": "ErrorPreventedResolution",
                "status": "True",
                "type": "ResolutionFailed"

This PR switches the information lookup from the cache, to using a client to list the CatalogSources
present in the cluster.
  • Loading branch information
anik120 committed Jul 29, 2024
1 parent 8089266 commit b8da86f
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 13 deletions.
2 changes: 1 addition & 1 deletion pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
clientFactory: clients.NewFactory(validatingConfig),
}
op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState)
op.sourceInvalidator = resolver.SourceProviderFromRegistryClientProvider(op.sources, logger)
op.sourceInvalidator = resolver.SourceProviderFromRegistryClientProvider(op.sources, lister.OperatorsV1alpha1().CatalogSourceLister(), logger)
resolverSourceProvider := NewOperatorGroupToggleSourceProvider(op.sourceInvalidator, logger, op.lister.OperatorsV1().OperatorGroupLister())
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now, ssaClient, workloadUserID, opmImage, utilImage)
res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, resolverSourceProvider, logger)
Expand Down
58 changes: 46 additions & 12 deletions pkg/controller/registry/resolver/source_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (
"time"

"github.com/blang/semver/v4"
v1alpha1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache"
"github.com/operator-framework/operator-registry/pkg/api"
"github.com/operator-framework/operator-registry/pkg/client"
opregistry "github.com/operator-framework/operator-registry/pkg/registry"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/labels"
)

// todo: move to pkg/controller/operators/catalog
Expand Down Expand Up @@ -65,31 +67,63 @@ func (i *sourceInvalidator) GetValidChannel(key cache.SourceKey) <-chan struct{}
}

// RegistrySourceProvider produces resolver cache sources (see its Sources
// method) from registry clients and the CatalogSources reported by a lister.
//
// NOTE(review): this span is a rendered diff. The first three fields are the
// removed (pre-change) lines and the last four are the added (post-change)
// lines, so rcp, logger and invalidator each appear twice.
type RegistrySourceProvider struct {
rcp RegistryClientProvider
logger logrus.StdLogger
invalidator *sourceInvalidator
// rcp hands out the registry clients for a set of namespaces.
rcp RegistryClientProvider
// catsrcLister is used to enumerate the CatalogSources when building sources.
catsrcLister v1alpha1listers.CatalogSourceLister
// logger is passed on to every registrySource that is created.
logger logrus.StdLogger
// invalidator is shared by every registrySource that is created.
invalidator *sourceInvalidator
}

// SourceProviderFromRegistryClientProvider builds a RegistrySourceProvider from
// the given registry client provider, CatalogSource lister and logger. The
// provider's invalidator starts with an empty channel map and a 5 minute ttl.
//
// NOTE(review): this span is a rendered diff. The first signature line and the
// first rcp/logger pair are the removed lines; the second signature and the
// rcp/logger/catsrcLister lines are their replacements.
func SourceProviderFromRegistryClientProvider(rcp RegistryClientProvider, logger logrus.StdLogger) *RegistrySourceProvider {
func SourceProviderFromRegistryClientProvider(rcp RegistryClientProvider, catsrcLister v1alpha1listers.CatalogSourceLister, logger logrus.StdLogger) *RegistrySourceProvider {
return &RegistrySourceProvider{
rcp: rcp,
logger: logger,
rcp: rcp,
logger: logger,
catsrcLister: catsrcLister,
invalidator: &sourceInvalidator{
validChans: make(map[cache.SourceKey]chan struct{}),
ttl: 5 * time.Minute,
},
}
}

// errorSource is a source that carries only an error. Its Snapshot method
// returns that error instead of any snapshot data.
type errorSource struct {
// error is embedded, so an errorSource also satisfies the error interface.
error
}

// Snapshot never yields data: it returns a nil snapshot together with the
// error carried by s. The context argument is ignored.
func (s errorSource) Snapshot(_ context.Context) (*cache.Snapshot, error) {
	var none *cache.Snapshot
	return none, s.error
}

// Sources returns one cache.Source per CatalogSource reported by the lister,
// keyed by the catalog's name and namespace. A catalog with a registry client
// gets a registrySource; a catalog without one gets an errorSource. It returns
// nil when the resulting map is empty.
//
// NOTE(review): this span is a rendered diff. The range loop over
// a.rcp.ClientsForNamespaces directly below (through its invalidator field) is
// the removed pre-change body and is left unclosed by the diff rendering; the
// post-change body starts at the catsrcLister.List call.
func (a *RegistrySourceProvider) Sources(namespaces ...string) map[cache.SourceKey]cache.Source {
result := make(map[cache.SourceKey]cache.Source)
for key, client := range a.rcp.ClientsForNamespaces(namespaces...) {
result[cache.SourceKey(key)] = &registrySource{
key: cache.SourceKey(key),
client: client,
logger: a.logger,
invalidator: a.invalidator,

// NOTE(review): the selector is labels.Everything() and the call is not
// restricted to `namespaces` — confirm whether catalogs outside the requested
// namespaces should be skipped before the loop below.
cats, err := a.catsrcLister.List(labels.Everything())
if err != nil {
// Listing failed: report the failure once per requested namespace, using an
// empty Name in the key because no specific catalog is known.
for _, ns := range namespaces {
result[cache.SourceKey{Name: "", Namespace: ns}] = errorSource{
error: fmt.Errorf("failed to list catalogsources for namespace %q: %w", ns, err),
}
}
return result
}

clients := a.rcp.ClientsForNamespaces(namespaces...)
for _, cat := range cats {
key := cache.SourceKey{Name: cat.Name, Namespace: cat.Namespace}
if client, ok := clients[registry.CatalogKey{Name: cat.Name, Namespace: cat.Namespace}]; ok {
result[key] = &registrySource{
key: key,
client: client,
logger: a.logger,
invalidator: a.invalidator,
}
} else {
// A listed catalog with no entry in clients is surfaced as an error source.
// NOTE(review): presumably this also hits catalogs in namespaces that were
// not requested, since clients is built only for `namespaces` — verify.
result[key] = errorSource{
error: fmt.Errorf("no registry client established for catalogsource %s/%s", cat.Namespace, cat.Name),
}
}
}
// An empty result is returned as nil rather than as an empty map.
if len(result) == 0 {
return nil
}
return result
}
Expand Down

0 comments on commit b8da86f

Please sign in to comment.