Skip to content

Commit

Permalink
fix: Initialize worker to obtain cluster version (#1718)
Browse files Browse the repository at this point in the history
Co-authored-by: 曲源成 <[email protected]>
  • Loading branch information
quyuancheng and 曲源成 authored Jul 24, 2023
1 parent c1b43e5 commit 63f4f0d
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 71 deletions.
3 changes: 2 additions & 1 deletion cmd/worker/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ func Run(s *option.Worker) error {

//step 4: create component resource store
updateCh := channels.NewRingChannel(1024)
cachestore := store.NewStore(restConfig, clientset, rainbondClient, db.GetManager(), s.Config)
k8sVersion := k8sutil.GetKubeVersion()
cachestore := store.NewStore(restConfig, clientset, rainbondClient, db.GetManager(), s.Config, k8sVersion)
if err := cachestore.Start(); err != nil {
logrus.Error("start kube cache store error", err)
return err
Expand Down
36 changes: 15 additions & 21 deletions worker/appm/controller/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ package controller
import (
"context"
"fmt"
k8sutil "github.com/goodrain/rainbond/util/k8s"
utilversion "k8s.io/apimachinery/pkg/util/version"
"sync"
"time"

Expand Down Expand Up @@ -200,32 +198,28 @@ func (s *startController) startOne(app v1.AppService) error {
}

//step 6: create hpa
if k8sutil.GetKubeVersion().AtLeast(utilversion.MustParseSemantic("v1.23.0")) {
if hpas := app.GetHPAs(); len(hpas) != 0 {
for _, hpa := range hpas {
if len(hpa.ResourceVersion) == 0 {
_, err := s.manager.client.AutoscalingV2().HorizontalPodAutoscalers(hpa.GetNamespace()).Create(s.ctx, hpa, metav1.CreateOptions{})
if err != nil && !errors.IsAlreadyExists(err) {
logrus.Debugf("hpa: %#v", hpa)
return fmt.Errorf("create hpa: %v", err)
}
if hpas := app.GetHPAs(); len(hpas) != 0 {
for _, hpa := range hpas {
if len(hpa.ResourceVersion) == 0 {
_, err := s.manager.client.AutoscalingV2().HorizontalPodAutoscalers(hpa.GetNamespace()).Create(s.ctx, hpa, metav1.CreateOptions{})
if err != nil && !errors.IsAlreadyExists(err) {
logrus.Debugf("hpa: %#v", hpa)
return fmt.Errorf("create hpa: %v", err)
}
}
}
} else {
if hpas := app.GetHPABeta2s(); len(hpas) != 0 {
for _, hpa := range hpas {
if len(hpa.ResourceVersion) == 0 {
_, err := s.manager.client.AutoscalingV2beta2().HorizontalPodAutoscalers(hpa.GetNamespace()).Create(s.ctx, hpa, metav1.CreateOptions{})
if err != nil && !errors.IsAlreadyExists(err) {
logrus.Debugf("hpa: %#v", hpa)
return fmt.Errorf("create hpa: %v", err)
}
}
if hpas := app.GetHPABeta2s(); len(hpas) != 0 {
for _, hpa := range hpas {
if len(hpa.ResourceVersion) == 0 {
_, err := s.manager.client.AutoscalingV2beta2().HorizontalPodAutoscalers(hpa.GetNamespace()).Create(s.ctx, hpa, metav1.CreateOptions{})
if err != nil && !errors.IsAlreadyExists(err) {
logrus.Debugf("hpa: %#v", hpa)
return fmt.Errorf("create hpa: %v", err)
}
}
}
}

//step 7: create CR resource
if crd, _ := s.manager.store.GetCrd(store.ServiceMonitor); crd != nil {
if sms := app.GetServiceMonitors(true); len(sms) > 0 {
Expand Down
28 changes: 11 additions & 17 deletions worker/appm/controller/stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ import (
"context"
"fmt"
"github.com/goodrain/rainbond/db"
k8sutil "github.com/goodrain/rainbond/util/k8s"
"github.com/jinzhu/gorm"
utilversion "k8s.io/apimachinery/pkg/util/version"
"sync"
"time"

Expand Down Expand Up @@ -208,26 +206,22 @@ func (s *stopController) stopOne(app v1.AppService) error {
}
}
//step 7: deleta all hpa
if k8sutil.GetKubeVersion().AtLeast(utilversion.MustParseSemantic("v1.23.0")) {
if hpas := app.GetHPAs(); len(hpas) != 0 {
for _, hpa := range hpas {
err := s.manager.client.AutoscalingV2().HorizontalPodAutoscalers(hpa.GetNamespace()).Delete(s.ctx, hpa.GetName(), metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("delete hpa: %v", err)
}
if hpas := app.GetHPAs(); len(hpas) != 0 {
for _, hpa := range hpas {
err := s.manager.client.AutoscalingV2().HorizontalPodAutoscalers(hpa.GetNamespace()).Delete(s.ctx, hpa.GetName(), metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("delete hpa: %v", err)
}
}
} else {
if hpas := app.GetHPABeta2s(); len(hpas) != 0 {
for _, hpa := range hpas {
err := s.manager.client.AutoscalingV2beta2().HorizontalPodAutoscalers(hpa.GetNamespace()).Delete(s.ctx, hpa.GetName(), metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("delete hpa: %v", err)
}
}
if hpas := app.GetHPABeta2s(); len(hpas) != 0 {
for _, hpa := range hpas {
err := s.manager.client.AutoscalingV2beta2().HorizontalPodAutoscalers(hpa.GetNamespace()).Delete(s.ctx, hpa.GetName(), metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("delete hpa: %v", err)
}
}
}

//step 8: delete CR resource
if crd, _ := s.manager.store.GetCrd(store.ServiceMonitor); crd != nil {
if sms := app.GetServiceMonitors(true); len(sms) > 0 {
Expand Down
64 changes: 32 additions & 32 deletions worker/appm/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ type appRuntimeStore struct {
volumeTypeListeners map[string]chan<- *model.TenantServiceVolumeType
volumeTypeListenerLock sync.Mutex
resourceCache *ResourceCache
k8sVersion *utilversion.Version
}

//NewStore new app runtime store
Expand All @@ -167,7 +168,8 @@ func NewStore(
clientset kubernetes.Interface,
rainbondClient rainbondversioned.Interface,
dbmanager db.Manager,
conf option.Config) Storer {
conf option.Config,
k8sVersion *utilversion.Version) Storer {
ctx, cancel := context.WithCancel(context.Background())
store := &appRuntimeStore{
kubeconfig: kubeconfig,
Expand All @@ -184,6 +186,7 @@ func NewStore(
resourceCache: NewResourceCache(),
podUpdateListeners: make(map[string]chan<- *corev1.Pod, 1),
volumeTypeListeners: make(map[string]chan<- *model.TenantServiceVolumeType, 1),
k8sVersion: k8sVersion,
}
crdClient, err := internalclientset.NewForConfig(kubeconfig)
if err != nil {
Expand Down Expand Up @@ -244,7 +247,7 @@ func NewStore(

store.informers.Events = infFactory.Core().V1().Events().Informer()

if k8sutil.GetKubeVersion().AtLeast(utilversion.MustParseSemantic("v1.23.0")) {
if k8sVersion.AtLeast(utilversion.MustParseSemantic("v1.23.0")) {
store.informers.HorizontalPodAutoscaler = infFactory.Autoscaling().V2().HorizontalPodAutoscalers().Informer()
store.listers.HorizontalPodAutoscaler = infFactory.Autoscaling().V2().HorizontalPodAutoscalers().Lister()
} else {
Expand All @@ -264,7 +267,7 @@ func NewStore(
store.informers.ComponentDefinition.AddEventHandlerWithResyncPeriod(componentdefinition.GetComponentDefinitionBuilder(), time.Second*300)
store.informers.Job = infFactory.Batch().V1().Jobs().Informer()
store.listers.Job = infFactory.Batch().V1().Jobs().Lister()
if k8sutil.GetKubeVersion().AtLeast(utilversion.MustParseSemantic("v1.21.0")) {
if k8sVersion.AtLeast(utilversion.MustParseSemantic("v1.21.0")) {
store.informers.CronJob = infFactory.Batch().V1().CronJobs().Informer()
store.listers.CronJob = infFactory.Batch().V1().CronJobs().Lister()
} else {
Expand Down Expand Up @@ -677,37 +680,34 @@ func (a *appRuntimeStore) OnAdd(obj interface{}) {
}
}
}
if k8sutil.GetKubeVersion().AtLeast(utilversion.MustParseSemantic("v1.23.0")) {
if hpa, ok := obj.(*autoscalingv2.HorizontalPodAutoscaler); ok {
serviceID := hpa.Labels["service_id"]
version := hpa.Labels["version"]
createrID := hpa.Labels["creater_id"]
if serviceID != "" && version != "" && createrID != "" {
appservice, err := a.getAppService(serviceID, version, createrID, true)
if err == conversion.ErrServiceNotFound {
a.conf.KubeClient.AutoscalingV2().HorizontalPodAutoscalers(hpa.GetNamespace()).Delete(context.Background(), hpa.GetName(), metav1.DeleteOptions{})
}
if appservice != nil {
appservice.SetHPA(hpa)
}
return
if hpa, ok := obj.(*autoscalingv2.HorizontalPodAutoscaler); ok {
serviceID := hpa.Labels["service_id"]
version := hpa.Labels["version"]
createrID := hpa.Labels["creater_id"]
if serviceID != "" && version != "" && createrID != "" {
appservice, err := a.getAppService(serviceID, version, createrID, true)
if err == conversion.ErrServiceNotFound {
a.conf.KubeClient.AutoscalingV2().HorizontalPodAutoscalers(hpa.GetNamespace()).Delete(context.Background(), hpa.GetName(), metav1.DeleteOptions{})
}
if appservice != nil {
appservice.SetHPA(hpa)
}
return
}
} else {
if hpa, ok := obj.(*autoscalingv2beta2.HorizontalPodAutoscaler); ok {
serviceID := hpa.Labels["service_id"]
version := hpa.Labels["version"]
createrID := hpa.Labels["creater_id"]
if serviceID != "" && version != "" && createrID != "" {
appservice, err := a.getAppService(serviceID, version, createrID, true)
if err == conversion.ErrServiceNotFound {
a.conf.KubeClient.AutoscalingV2beta2().HorizontalPodAutoscalers(hpa.GetNamespace()).Delete(context.Background(), hpa.GetName(), metav1.DeleteOptions{})
}
if appservice != nil {
appservice.SetHPAbeta2(hpa)
}
return
}
if hpa, ok := obj.(*autoscalingv2beta2.HorizontalPodAutoscaler); ok {
serviceID := hpa.Labels["service_id"]
version := hpa.Labels["version"]
createrID := hpa.Labels["creater_id"]
if serviceID != "" && version != "" && createrID != "" {
appservice, err := a.getAppService(serviceID, version, createrID, true)
if err == conversion.ErrServiceNotFound {
a.conf.KubeClient.AutoscalingV2beta2().HorizontalPodAutoscalers(hpa.GetNamespace()).Delete(context.Background(), hpa.GetName(), metav1.DeleteOptions{})
}
if appservice != nil {
appservice.SetHPAbeta2(hpa)
}
return
}
}

Expand Down Expand Up @@ -1793,7 +1793,7 @@ func (a *appRuntimeStore) scalingRecordServiceAndRuleID(evt *corev1.Event) (stri
serviceID = statefulset.GetLabels()["service_id"]
ruleID = statefulset.GetLabels()["rule_id"]
case "HorizontalPodAutoscaler":
if k8sutil.GetKubeVersion().AtLeast(utilversion.MustParseSemantic("v1.23.0")) {
if a.k8sVersion.AtLeast(utilversion.MustParseSemantic("v1.23.0")) {
hpa, err := a.listers.HorizontalPodAutoscaler.HorizontalPodAutoscalers(evt.InvolvedObject.Namespace).Get(evt.InvolvedObject.Name)
if err != nil {
logrus.Warningf("retrieve statefulset: %v", err)
Expand Down

0 comments on commit 63f4f0d

Please sign in to comment.