From 63f4f0db6d37b7cbc371a9ef440f17a519c2f456 Mon Sep 17 00:00:00 2001 From: Quyc <94251049@qq.com> Date: Mon, 24 Jul 2023 22:51:15 +0800 Subject: [PATCH] fix: Initialize worker to obtain cluster version (#1718) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 曲源成 --- cmd/worker/server/server.go | 3 +- worker/appm/controller/start.go | 36 ++++++++----------- worker/appm/controller/stop.go | 28 ++++++--------- worker/appm/store/store.go | 64 ++++++++++++++++----------------- 4 files changed, 60 insertions(+), 71 deletions(-) diff --git a/cmd/worker/server/server.go b/cmd/worker/server/server.go index fafb2c9f2..5affa1341 100644 --- a/cmd/worker/server/server.go +++ b/cmd/worker/server/server.go @@ -105,7 +105,8 @@ func Run(s *option.Worker) error { //step 4: create component resource store updateCh := channels.NewRingChannel(1024) - cachestore := store.NewStore(restConfig, clientset, rainbondClient, db.GetManager(), s.Config) + k8sVersion := k8sutil.GetKubeVersion() + cachestore := store.NewStore(restConfig, clientset, rainbondClient, db.GetManager(), s.Config, k8sVersion) if err := cachestore.Start(); err != nil { logrus.Error("start kube cache store error", err) return err diff --git a/worker/appm/controller/start.go b/worker/appm/controller/start.go index a35a20961..0f9e71a14 100644 --- a/worker/appm/controller/start.go +++ b/worker/appm/controller/start.go @@ -21,8 +21,6 @@ package controller import ( "context" "fmt" - k8sutil "github.com/goodrain/rainbond/util/k8s" - utilversion "k8s.io/apimachinery/pkg/util/version" "sync" "time" @@ -200,32 +198,28 @@ func (s *startController) startOne(app v1.AppService) error { } //step 6: create hpa - if k8sutil.GetKubeVersion().AtLeast(utilversion.MustParseSemantic("v1.23.0")) { - if hpas := app.GetHPAs(); len(hpas) != 0 { - for _, hpa := range hpas { - if len(hpa.ResourceVersion) == 0 { - _, err := s.manager.client.AutoscalingV2().HorizontalPodAutoscalers(hpa.GetNamespace()).Create(s.ctx, hpa, metav1.CreateOptions{}) - if err != nil && !errors.IsAlreadyExists(err) { - logrus.Debugf("hpa: %#v", hpa) - return fmt.Errorf("create hpa: %v", err) - } + if hpas := app.GetHPAs(); len(hpas) != 0 { + for _, hpa := range hpas { + if len(hpa.ResourceVersion) == 0 { + _, err := s.manager.client.AutoscalingV2().HorizontalPodAutoscalers(hpa.GetNamespace()).Create(s.ctx, hpa, metav1.CreateOptions{}) + if err != nil && !errors.IsAlreadyExists(err) { + logrus.Debugf("hpa: %#v", hpa) + return fmt.Errorf("create hpa: %v", err) } } } - } else { - if hpas := app.GetHPABeta2s(); len(hpas) != 0 { - for _, hpa := range hpas { - if len(hpa.ResourceVersion) == 0 { - _, err := s.manager.client.AutoscalingV2beta2().HorizontalPodAutoscalers(hpa.GetNamespace()).Create(s.ctx, hpa, metav1.CreateOptions{}) - if err != nil && !errors.IsAlreadyExists(err) { - logrus.Debugf("hpa: %#v", hpa) - return fmt.Errorf("create hpa: %v", err) - } + } + if hpas := app.GetHPABeta2s(); len(hpas) != 0 { + for _, hpa := range hpas { + if len(hpa.ResourceVersion) == 0 { + _, err := s.manager.client.AutoscalingV2beta2().HorizontalPodAutoscalers(hpa.GetNamespace()).Create(s.ctx, hpa, metav1.CreateOptions{}) + if err != nil && !errors.IsAlreadyExists(err) { + logrus.Debugf("hpa: %#v", hpa) + return fmt.Errorf("create hpa: %v", err) } } } } - //step 7: create CR resource if crd, _ := s.manager.store.GetCrd(store.ServiceMonitor); crd != nil { if sms := app.GetServiceMonitors(true); len(sms) > 0 { diff --git a/worker/appm/controller/stop.go b/worker/appm/controller/stop.go index 4f7c73cc8..40891b894 100644 --- a/worker/appm/controller/stop.go +++ b/worker/appm/controller/stop.go @@ -22,9 +22,7 @@ import ( "context" "fmt" "github.com/goodrain/rainbond/db" - k8sutil "github.com/goodrain/rainbond/util/k8s" "github.com/jinzhu/gorm" - utilversion "k8s.io/apimachinery/pkg/util/version" "sync" "time" @@ -208,26 +206,22 @@ func (s *stopController) stopOne(app v1.AppService) error { } } //step 7: deleta all hpa - if k8sutil.GetKubeVersion().AtLeast(utilversion.MustParseSemantic("v1.23.0")) { - if hpas := app.GetHPAs(); len(hpas) != 0 { - for _, hpa := range hpas { - err := s.manager.client.AutoscalingV2().HorizontalPodAutoscalers(hpa.GetNamespace()).Delete(s.ctx, hpa.GetName(), metav1.DeleteOptions{}) - if err != nil && !errors.IsNotFound(err) { - return fmt.Errorf("delete hpa: %v", err) - } + if hpas := app.GetHPAs(); len(hpas) != 0 { + for _, hpa := range hpas { + err := s.manager.client.AutoscalingV2().HorizontalPodAutoscalers(hpa.GetNamespace()).Delete(s.ctx, hpa.GetName(), metav1.DeleteOptions{}) + if err != nil && !errors.IsNotFound(err) { + return fmt.Errorf("delete hpa: %v", err) } } - } else { - if hpas := app.GetHPABeta2s(); len(hpas) != 0 { - for _, hpa := range hpas { - err := s.manager.client.AutoscalingV2beta2().HorizontalPodAutoscalers(hpa.GetNamespace()).Delete(s.ctx, hpa.GetName(), metav1.DeleteOptions{}) - if err != nil && !errors.IsNotFound(err) { - return fmt.Errorf("delete hpa: %v", err) - } + } + if hpas := app.GetHPABeta2s(); len(hpas) != 0 { + for _, hpa := range hpas { + err := s.manager.client.AutoscalingV2beta2().HorizontalPodAutoscalers(hpa.GetNamespace()).Delete(s.ctx, hpa.GetName(), metav1.DeleteOptions{}) + if err != nil && !errors.IsNotFound(err) { + return fmt.Errorf("delete hpa: %v", err) } } } - //step 8: delete CR resource if crd, _ := s.manager.store.GetCrd(store.ServiceMonitor); crd != nil { if sms := app.GetServiceMonitors(true); len(sms) > 0 { diff --git a/worker/appm/store/store.go b/worker/appm/store/store.go index 5b3dd75f1..43666b5cd 100644 --- a/worker/appm/store/store.go +++ b/worker/appm/store/store.go @@ -159,6 +159,7 @@ type appRuntimeStore struct { volumeTypeListeners map[string]chan<- *model.TenantServiceVolumeType volumeTypeListenerLock sync.Mutex resourceCache *ResourceCache + k8sVersion *utilversion.Version } //NewStore new app runtime store @@ -167,7 +168,8 @@ func NewStore( clientset kubernetes.Interface, rainbondClient rainbondversioned.Interface, dbmanager db.Manager, - conf option.Config) Storer { + conf option.Config, + k8sVersion *utilversion.Version) Storer { ctx, cancel := context.WithCancel(context.Background()) store := &appRuntimeStore{ kubeconfig: kubeconfig, @@ -184,6 +186,7 @@ func NewStore( resourceCache: NewResourceCache(), podUpdateListeners: make(map[string]chan<- *corev1.Pod, 1), volumeTypeListeners: make(map[string]chan<- *model.TenantServiceVolumeType, 1), + k8sVersion: k8sVersion, } crdClient, err := internalclientset.NewForConfig(kubeconfig) if err != nil { @@ -244,7 +247,7 @@ func NewStore( store.informers.Events = infFactory.Core().V1().Events().Informer() - if k8sutil.GetKubeVersion().AtLeast(utilversion.MustParseSemantic("v1.23.0")) { + if k8sVersion.AtLeast(utilversion.MustParseSemantic("v1.23.0")) { store.informers.HorizontalPodAutoscaler = infFactory.Autoscaling().V2().HorizontalPodAutoscalers().Informer() store.listers.HorizontalPodAutoscaler = infFactory.Autoscaling().V2().HorizontalPodAutoscalers().Lister() } else { @@ -264,7 +267,7 @@ func NewStore( store.informers.ComponentDefinition.AddEventHandlerWithResyncPeriod(componentdefinition.GetComponentDefinitionBuilder(), time.Second*300) store.informers.Job = infFactory.Batch().V1().Jobs().Informer() store.listers.Job = infFactory.Batch().V1().Jobs().Lister() - if k8sutil.GetKubeVersion().AtLeast(utilversion.MustParseSemantic("v1.21.0")) { + if k8sVersion.AtLeast(utilversion.MustParseSemantic("v1.21.0")) { store.informers.CronJob = infFactory.Batch().V1().CronJobs().Informer() store.listers.CronJob = infFactory.Batch().V1().CronJobs().Lister() } else { @@ -677,37 +680,34 @@ func (a *appRuntimeStore) OnAdd(obj interface{}) { } } } - if k8sutil.GetKubeVersion().AtLeast(utilversion.MustParseSemantic("v1.23.0")) { - if hpa, ok := obj.(*autoscalingv2.HorizontalPodAutoscaler); ok { - serviceID := hpa.Labels["service_id"] - version := hpa.Labels["version"] - createrID := hpa.Labels["creater_id"] - if serviceID != "" && version != "" && createrID != "" { - appservice, err := a.getAppService(serviceID, version, createrID, true) - if err == conversion.ErrServiceNotFound { - a.conf.KubeClient.AutoscalingV2().HorizontalPodAutoscalers(hpa.GetNamespace()).Delete(context.Background(), hpa.GetName(), metav1.DeleteOptions{}) - } - if appservice != nil { - appservice.SetHPA(hpa) - } - return + if hpa, ok := obj.(*autoscalingv2.HorizontalPodAutoscaler); ok { + serviceID := hpa.Labels["service_id"] + version := hpa.Labels["version"] + createrID := hpa.Labels["creater_id"] + if serviceID != "" && version != "" && createrID != "" { + appservice, err := a.getAppService(serviceID, version, createrID, true) + if err == conversion.ErrServiceNotFound { + a.conf.KubeClient.AutoscalingV2().HorizontalPodAutoscalers(hpa.GetNamespace()).Delete(context.Background(), hpa.GetName(), metav1.DeleteOptions{}) + } + if appservice != nil { + appservice.SetHPA(hpa) } + return } - } else { - if hpa, ok := obj.(*autoscalingv2beta2.HorizontalPodAutoscaler); ok { - serviceID := hpa.Labels["service_id"] - version := hpa.Labels["version"] - createrID := hpa.Labels["creater_id"] - if serviceID != "" && version != "" && createrID != "" { - appservice, err := a.getAppService(serviceID, version, createrID, true) - if err == conversion.ErrServiceNotFound { - a.conf.KubeClient.AutoscalingV2beta2().HorizontalPodAutoscalers(hpa.GetNamespace()).Delete(context.Background(), hpa.GetName(), metav1.DeleteOptions{}) - } - if appservice != nil { - appservice.SetHPAbeta2(hpa) - } - return + } + if hpa, ok := obj.(*autoscalingv2beta2.HorizontalPodAutoscaler); ok { + serviceID := hpa.Labels["service_id"] + version := hpa.Labels["version"] + createrID := hpa.Labels["creater_id"] + if serviceID != "" && version != "" && createrID != "" { + appservice, err := a.getAppService(serviceID, version, createrID, true) + if err == conversion.ErrServiceNotFound { + a.conf.KubeClient.AutoscalingV2beta2().HorizontalPodAutoscalers(hpa.GetNamespace()).Delete(context.Background(), hpa.GetName(), metav1.DeleteOptions{}) + } + if appservice != nil { + appservice.SetHPAbeta2(hpa) } + return } } @@ -1793,7 +1793,7 @@ func (a *appRuntimeStore) scalingRecordServiceAndRuleID(evt *corev1.Event) (stri serviceID = statefulset.GetLabels()["service_id"] ruleID = statefulset.GetLabels()["rule_id"] case "HorizontalPodAutoscaler": - if k8sutil.GetKubeVersion().AtLeast(utilversion.MustParseSemantic("v1.23.0")) { + if a.k8sVersion.AtLeast(utilversion.MustParseSemantic("v1.23.0")) { hpa, err := a.listers.HorizontalPodAutoscaler.HorizontalPodAutoscalers(evt.InvolvedObject.Namespace).Get(evt.InvolvedObject.Name) if err != nil { logrus.Warningf("retrieve statefulset: %v", err)