diff --git a/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go b/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go index 5fef7df5deb..d41ac0db4a7 100644 --- a/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go +++ b/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go @@ -152,7 +152,7 @@ func watchEvictionEvents(evictedEventChan <-chan watch.Event, observer oom.Obser } // Creates clients watching pods: PodLister (listing only not terminated pods). -func newPodClients(kubeClient kube_client.Interface, resourceEventHandler cache.ResourceEventHandler, namespace string) v1lister.PodLister { +func newPodClients(kubeClient kube_client.Interface, resourceEventHandler cache.ResourceEventHandler, namespace string, stopCh <-chan struct{}) v1lister.PodLister { // We are interested in pods which are Running or Unknown (in case the pod is // running but there are some transient errors we don't want to delete it from // our model). @@ -170,15 +170,17 @@ func newPodClients(kubeClient kube_client.Interface, resourceEventHandler cache. cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) podLister := v1lister.NewPodLister(indexer) - stopCh := make(chan struct{}) go controller.Run(stopCh) + if !cache.WaitForCacheSync(stopCh, controller.HasSynced) { + klog.ErrorS(nil, "Failed to sync Pod cache during initialization") + } return podLister } // NewPodListerAndOOMObserver creates pair of pod lister and OOM observer. 
-func NewPodListerAndOOMObserver(kubeClient kube_client.Interface, namespace string) (v1lister.PodLister, oom.Observer) { +func NewPodListerAndOOMObserver(kubeClient kube_client.Interface, namespace string, stopCh <-chan struct{}) (v1lister.PodLister, oom.Observer) { oomObserver := oom.NewObserver() - podLister := newPodClients(kubeClient, oomObserver, namespace) + podLister := newPodClients(kubeClient, oomObserver, namespace, stopCh) WatchEvictionEventsWithRetries(kubeClient, oomObserver, namespace) return podLister, oomObserver } diff --git a/vertical-pod-autoscaler/pkg/recommender/main.go b/vertical-pod-autoscaler/pkg/recommender/main.go index d59685fd3ae..badca2fc634 100644 --- a/vertical-pod-autoscaler/pkg/recommender/main.go +++ b/vertical-pod-autoscaler/pkg/recommender/main.go @@ -202,12 +202,15 @@ func defaultLeaderElectionConfiguration() componentbaseconfig.LeaderElectionConf } func run(healthCheck *metrics.HealthCheck, commonFlag *common.CommonFlags) { + // stopCh tells the pod informer (and other watchers started below) to stop; it is closed when run returns. + stopCh := make(chan struct{}) + defer close(stopCh) config := common.CreateKubeConfigOrDie(commonFlag.KubeConfig, float32(commonFlag.KubeApiQps), int(commonFlag.KubeApiBurst)) kubeClient := kube_client.NewForConfigOrDie(config) clusterState := model.NewClusterState(aggregateContainerStateGCInterval) factory := informers.NewSharedInformerFactoryWithOptions(kubeClient, defaultResyncPeriod, informers.WithNamespace(commonFlag.IgnoredVpaObjectNamespaces)) controllerFetcher := controllerfetcher.NewControllerFetcher(config, kubeClient, factory, scaleCacheEntryFreshnessTime, scaleCacheEntryLifetime, scaleCacheEntryJitterFactor) - podLister, oomObserver := input.NewPodListerAndOOMObserver(kubeClient, commonFlag.IgnoredVpaObjectNamespaces) + podLister, oomObserver := input.NewPodListerAndOOMObserver(kubeClient, commonFlag.IgnoredVpaObjectNamespaces, stopCh) model.InitializeAggregationsConfig(model.NewAggregationsConfig(*memoryAggregationInterval, 
*memoryAggregationIntervalCount, *memoryHistogramDecayHalfLife, *cpuHistogramDecayHalfLife, *oomBumpUpRatio, *oomMinBumpUp)) diff --git a/vertical-pod-autoscaler/pkg/utils/vpa/api.go b/vertical-pod-autoscaler/pkg/utils/vpa/api.go index 949cdc67402..e6e91080104 100644 --- a/vertical-pod-autoscaler/pkg/utils/vpa/api.go +++ b/vertical-pod-autoscaler/pkg/utils/vpa/api.go @@ -90,7 +90,7 @@ func NewVpasLister(vpaClient *vpa_clientset.Clientset, stopChannel <-chan struct cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) vpaLister := vpa_lister.NewVerticalPodAutoscalerLister(indexer) go controller.Run(stopChannel) - if !cache.WaitForCacheSync(make(chan struct{}), controller.HasSynced) { + if !cache.WaitForCacheSync(stopChannel, controller.HasSynced) { klog.ErrorS(nil, "Failed to sync VPA cache during initialization") os.Exit(255) } else {