Skip to content

Commit

Permalink
Merge pull request kubernetes#7661 from omerap12/fix-vpa-stopchannel-…
Browse files Browse the repository at this point in the history
…propagation

fix: Propagate stop channel to prevent resource leaks in VPA components
  • Loading branch information
k8s-ci-robot authored Jan 8, 2025
2 parents 415a60f + 93d1e33 commit 72206c1
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 6 deletions.
10 changes: 6 additions & 4 deletions vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func watchEvictionEvents(evictedEventChan <-chan watch.Event, observer oom.Obser
}

// Creates clients watching pods: PodLister (listing only not terminated pods).
func newPodClients(kubeClient kube_client.Interface, resourceEventHandler cache.ResourceEventHandler, namespace string) v1lister.PodLister {
func newPodClients(kubeClient kube_client.Interface, resourceEventHandler cache.ResourceEventHandler, namespace string, stopCh <-chan struct{}) v1lister.PodLister {
// We are interested in pods which are Running or Unknown (in case the pod is
// running but there are some transient errors we don't want to delete it from
// our model).
Expand All @@ -170,15 +170,17 @@ func newPodClients(kubeClient kube_client.Interface, resourceEventHandler cache.
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
podLister := v1lister.NewPodLister(indexer)
stopCh := make(chan struct{})
go controller.Run(stopCh)
if !cache.WaitForCacheSync(stopCh, controller.HasSynced) {
klog.ErrorS(nil, "Failed to sync Pod cache during initialization")
}
return podLister
}

// NewPodListerAndOOMObserver creates pair of pod lister and OOM observer.
func NewPodListerAndOOMObserver(kubeClient kube_client.Interface, namespace string) (v1lister.PodLister, oom.Observer) {
func NewPodListerAndOOMObserver(kubeClient kube_client.Interface, namespace string, stopCh <-chan struct{}) (v1lister.PodLister, oom.Observer) {
oomObserver := oom.NewObserver()
podLister := newPodClients(kubeClient, oomObserver, namespace)
podLister := newPodClients(kubeClient, oomObserver, namespace, stopCh)
WatchEvictionEventsWithRetries(kubeClient, oomObserver, namespace)
return podLister, oomObserver
}
Expand Down
5 changes: 4 additions & 1 deletion vertical-pod-autoscaler/pkg/recommender/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,15 @@ func defaultLeaderElectionConfiguration() componentbaseconfig.LeaderElectionConf
}

func run(healthCheck *metrics.HealthCheck, commonFlag *common.CommonFlags) {
// Create a stop channel that will be used to signal shutdown
stopCh := make(chan struct{})
defer close(stopCh)
config := common.CreateKubeConfigOrDie(commonFlag.KubeConfig, float32(commonFlag.KubeApiQps), int(commonFlag.KubeApiBurst))
kubeClient := kube_client.NewForConfigOrDie(config)
clusterState := model.NewClusterState(aggregateContainerStateGCInterval)
factory := informers.NewSharedInformerFactoryWithOptions(kubeClient, defaultResyncPeriod, informers.WithNamespace(commonFlag.IgnoredVpaObjectNamespaces))
controllerFetcher := controllerfetcher.NewControllerFetcher(config, kubeClient, factory, scaleCacheEntryFreshnessTime, scaleCacheEntryLifetime, scaleCacheEntryJitterFactor)
podLister, oomObserver := input.NewPodListerAndOOMObserver(kubeClient, commonFlag.IgnoredVpaObjectNamespaces)
podLister, oomObserver := input.NewPodListerAndOOMObserver(kubeClient, commonFlag.IgnoredVpaObjectNamespaces, stopCh)

model.InitializeAggregationsConfig(model.NewAggregationsConfig(*memoryAggregationInterval, *memoryAggregationIntervalCount, *memoryHistogramDecayHalfLife, *cpuHistogramDecayHalfLife, *oomBumpUpRatio, *oomMinBumpUp))

Expand Down
2 changes: 1 addition & 1 deletion vertical-pod-autoscaler/pkg/utils/vpa/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func NewVpasLister(vpaClient *vpa_clientset.Clientset, stopChannel <-chan struct
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
vpaLister := vpa_lister.NewVerticalPodAutoscalerLister(indexer)
go controller.Run(stopChannel)
if !cache.WaitForCacheSync(make(chan struct{}), controller.HasSynced) {
if !cache.WaitForCacheSync(stopChannel, controller.HasSynced) {
klog.ErrorS(nil, "Failed to sync VPA cache during initialization")
os.Exit(255)
} else {
Expand Down

0 comments on commit 72206c1

Please sign in to comment.