From 8123496042d56155cc2d519fce707579a2a66ac4 Mon Sep 17 00:00:00 2001
From: Poorunga <2744323@qq.com>
Date: Sat, 25 Nov 2023 20:38:12 +0800
Subject: [PATCH] fix gateway serviceMap numbers sync error

Signed-off-by: Poorunga <2744323@qq.com>
---
 go.mod                           |   2 +-
 pkg/loadbalancer/loadbalancer.go | 387 +++++++++++++++++++++++++------
 2 files changed, 311 insertions(+), 78 deletions(-)

diff --git a/go.mod b/go.mod
index 6bda5ea39..cd7aea457 100644
--- a/go.mod
+++ b/go.mod
@@ -32,6 +32,7 @@ require (
 	k8s.io/api v0.23.1
 	k8s.io/apimachinery v0.23.1
 	k8s.io/client-go v0.23.1
+	k8s.io/cloud-provider v0.23.0
 	k8s.io/component-base v0.23.0
 	k8s.io/klog/v2 v2.40.1
 	k8s.io/kubernetes v1.23.0
@@ -184,7 +185,6 @@ require (
 	gopkg.in/yaml.v3 v3.0.1 // indirect
 	istio.io/gogo-genproto v0.0.0-20210113155706-4daf5697332f // indirect
 	k8s.io/apiserver v0.23.0 // indirect
-	k8s.io/cloud-provider v0.23.0 // indirect
 	k8s.io/component-helpers v0.23.0 // indirect
 	k8s.io/klog v1.0.0 // indirect
 	k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65 // indirect
diff --git a/pkg/loadbalancer/loadbalancer.go b/pkg/loadbalancer/loadbalancer.go
index 6400e996a..723e8dbfb 100644
--- a/pkg/loadbalancer/loadbalancer.go
+++ b/pkg/loadbalancer/loadbalancer.go
@@ -4,8 +4,10 @@ import (
 	"fmt"
 	"net"
 	"net/http"
+	"reflect"
 	"strconv"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	istioapi "istio.io/client-go/pkg/apis/networking/v1alpha3"
@@ -18,10 +20,12 @@ import (
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/cache"
+	servicehelper "k8s.io/cloud-provider/service/helpers"
 	"k8s.io/klog/v2"
 	"k8s.io/kubernetes/pkg/proxy"
 	"k8s.io/kubernetes/pkg/proxy/userspace"
 	utilproxy "k8s.io/kubernetes/pkg/proxy/util"
+	"k8s.io/kubernetes/pkg/util/async"
 	netutils "k8s.io/utils/net"
 	stringslices "k8s.io/utils/strings/slices"
 
@@ -31,14 +35,59 @@ import (
 	netutil "github.com/kubeedge/edgemesh/pkg/util/net"
 )
 
-type ServiceObject struct {
-	ip                  net.IP
-	port                int
+type portal struct {
+	ip         net.IP
+	port       int
+	isExternal bool
+}
+
+// ServiceInfo contains information and state for a particular proxied service
+type ServiceInfo struct {
+	isAliveAtomic       int32 // Only access this with atomic ops
+	portal              portal
 	protocol            v1.Protocol
+	proxyPort           int
 	nodePort            int
 	loadBalancerStatus  v1.LoadBalancerStatus
-	serviceAffinityType v1.ServiceAffinity
+	sessionAffinityType v1.ServiceAffinity
 	stickyMaxAgeSeconds int
+	// Deprecated, but required for back-compat (including e2e)
+	externalIPs []string
+
+	// isStartedAtomic is set to non-zero when the service's socket begins
+	// accepting requests. Used in testcases. Only access this with atomic ops.
+	isStartedAtomic int32
+	// isFinishedAtomic is set to non-zero when the service's socket shuts
+	// down. Used in testcases. Only access this with atomic ops.
+	isFinishedAtomic int32
+}
+
+func (info *ServiceInfo) setStarted() {
+	atomic.StoreInt32(&info.isStartedAtomic, 1)
+}
+
+func (info *ServiceInfo) IsStarted() bool {
+	return atomic.LoadInt32(&info.isStartedAtomic) != 0
+}
+
+func (info *ServiceInfo) setFinished() {
+	atomic.StoreInt32(&info.isFinishedAtomic, 1)
+}
+
+func (info *ServiceInfo) IsFinished() bool {
+	return atomic.LoadInt32(&info.isFinishedAtomic) != 0
+}
+
+func (info *ServiceInfo) setAlive(b bool) {
+	var i int32
+	if b {
+		i = 1
+	}
+	atomic.StoreInt32(&info.isAliveAtomic, i)
+}
+
+func (info *ServiceInfo) IsAlive() bool {
+	return atomic.LoadInt32(&info.isAliveAtomic) != 0
 }
 
 type affinityState struct {
@@ -67,6 +116,19 @@ type balancerState struct {
 	affinity affinityPolicy
 }
 
+const numBurstSyncs int = 2
+
+type serviceChange struct {
+	current  *v1.Service
+	previous *v1.Service
+}
+
+// Interface for async runner; abstracted for testing
+type asyncRunnerInterface interface {
+	Run()
+	Loop(<-chan struct{})
+}
+
 // assert LoadBalancer is an userspace.LoadBalancer
 var _ userspace.LoadBalancer = &LoadBalancer{}
 
@@ -76,25 +138,71 @@ type LoadBalancer struct {
 	istioClient istio.Interface
 	syncPeriod  time.Duration
 	mu          sync.Mutex // protects serviceMap
-	serviceMap  map[proxy.ServicePortName]*ServiceObject
+	serviceMap  map[proxy.ServicePortName]*ServiceInfo
 	lock        sync.RWMutex // protects services
 	services    map[proxy.ServicePortName]*balancerState
 	policyMutex sync.Mutex // protects policyMap
 	policyMap   map[proxy.ServicePortName]Policy
 	stopCh      chan struct{}
+	// endpointsSynced and servicesSynced are set to 1 when the corresponding
+	// objects are synced after startup. This is used to avoid updating iptables
+	// with some partial data after kube-proxy restart.
+	endpointsSynced int32
+	servicesSynced  int32
+	initialized     int32
+	// protects serviceChanges
+	serviceChangesLock sync.Mutex
+	serviceChanges     map[types.NamespacedName]*serviceChange // map of service changes
+	syncRunner         asyncRunnerInterface                    // governs calls to syncProxyRules
 }
 
 func New(config *v1alpha1.LoadBalancer, kubeClient kubernetes.Interface, istioClient istio.Interface, syncPeriod time.Duration) *LoadBalancer {
-	return &LoadBalancer{
-		Config:      config,
-		kubeClient:  kubeClient,
-		istioClient: istioClient,
-		syncPeriod:  syncPeriod,
-		serviceMap:  make(map[proxy.ServicePortName]*ServiceObject),
-		services:    make(map[proxy.ServicePortName]*balancerState),
-		policyMap:   make(map[proxy.ServicePortName]Policy),
-		stopCh:      make(chan struct{}),
+	lb := &LoadBalancer{
+		Config:         config,
+		kubeClient:     kubeClient,
+		istioClient:    istioClient,
+		syncPeriod:     syncPeriod,
+		serviceMap:     make(map[proxy.ServicePortName]*ServiceInfo),
+		serviceChanges: make(map[types.NamespacedName]*serviceChange),
+		services:       make(map[proxy.ServicePortName]*balancerState),
+		policyMap:      make(map[proxy.ServicePortName]Policy),
+		stopCh:         make(chan struct{}),
 	}
+	lb.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", lb.syncServices, time.Minute, syncPeriod, numBurstSyncs)
+	return lb
+}
+
+func (lb *LoadBalancer) isInitialized() bool {
+	return atomic.LoadInt32(&lb.initialized) > 0
+}
+
+func (lb *LoadBalancer) syncServices() {
+	start := time.Now()
+	defer func() {
+		klog.V(4).InfoS("syncServices complete", "elapsed", time.Since(start))
+	}()
+
+	// don't sync rules till we've received services and endpoints
+	if !lb.isInitialized() {
+		klog.V(2).InfoS("Not syncing userspace proxy until Services and Endpoints have been received from master")
+		return
+	}
+
+	lb.serviceChangesLock.Lock()
+	changes := lb.serviceChanges
+	lb.serviceChanges = make(map[types.NamespacedName]*serviceChange)
+	lb.serviceChangesLock.Unlock()
+
+	lb.mu.Lock()
+	defer lb.mu.Unlock()
+
+	klog.V(4).InfoS("userspace proxy: processing service events", "count", len(changes))
+	for _, change := range changes {
+		existingPorts := lb.mergeService(change.current)
+		lb.unmergeService(change.previous, existingPorts)
+	}
+
+	lb.cleanupStaleStickySessions()
 }
 
 func (lb *LoadBalancer) Run() error {
@@ -121,6 +229,7 @@ func (lb *LoadBalancer) Run() error {
 		)
 		go lb.runEndpoints(endpointsInformer.Informer().HasSynced, lb.stopCh)
 		kubeInformerFactory.Start(lb.stopCh)
+		go lb.syncRunner.Loop(lb.stopCh)
 	}
 
 	istioInformerFactory := istioinformers.NewSharedInformerFactory(lb.istioClient, lb.syncPeriod)
@@ -283,107 +392,219 @@ func (lb *LoadBalancer) handleDeleteDestinationRule(obj interface{}) {
 	lb.OnDestinationRuleDelete(dr)
 }
 
-func (lb *LoadBalancer) newServiceObject(service *v1.Service) {
-	if service == nil {
+// clean up any stale sticky session records in the hash map.
+func (lb *LoadBalancer) cleanupStaleStickySessions() {
+	for name := range lb.serviceMap {
+		lb.CleanupStaleStickySessions(name)
+	}
+}
+
+// Loop through the valid endpoints and then the endpoints associated with the Load Balancer.
+// Then remove any session affinity records that are not in both lists.
+// This assumes the lb.lock is held.
+func (lb *LoadBalancer) removeStaleAffinity(svcPort proxy.ServicePortName, newEndpoints []string) {
+	newEndpointsSet := sets.NewString()
+	for _, newEndpoint := range newEndpoints {
+		newEndpointsSet.Insert(newEndpoint)
+	}
+
+	state, exists := lb.services[svcPort]
+	if !exists {
 		return
 	}
+	for _, existingEndpoint := range state.endpoints {
+		if !newEndpointsSet.Has(existingEndpoint) {
+			klog.V(2).InfoS("Delete endpoint for service", "endpoint", existingEndpoint, "servicePortName", svcPort)
+			removeSessionAffinityByEndpoint(state, svcPort, existingEndpoint)
+		}
+	}
+}
+
+func sameConfig(info *ServiceInfo, service *v1.Service, port *v1.ServicePort) bool {
+	if info.protocol != port.Protocol || info.portal.port != int(port.Port) || info.nodePort != int(port.NodePort) {
+		return false
+	}
+	if !info.portal.ip.Equal(netutils.ParseIPSloppy(service.Spec.ClusterIP)) {
+		return false
+	}
+	if !ipsEqual(info.externalIPs, service.Spec.ExternalIPs) {
+		return false
+	}
+	if !servicehelper.LoadBalancerStatusEqual(&info.loadBalancerStatus, &service.Status.LoadBalancer) {
+		return false
+	}
+	if info.sessionAffinityType != service.Spec.SessionAffinity {
+		return false
+	}
+	return true
+}
+
+func ipsEqual(lhs, rhs []string) bool {
+	if len(lhs) != len(rhs) {
+		return false
+	}
+	for i := range lhs {
+		if lhs[i] != rhs[i] {
+			return false
+		}
+	}
+	return true
+}
+
+func (lb *LoadBalancer) mergeService(service *v1.Service) sets.String {
+	if service == nil {
+		return nil
+	}
 	if utilproxy.ShouldSkipService(service) {
-		return
+		return nil
 	}
+	existingPorts := sets.NewString()
 	svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
 	for i := range service.Spec.Ports {
 		servicePort := &service.Spec.Ports[i]
 		serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
+		existingPorts.Insert(servicePort.Name)
+		info, exists := lb.serviceMap[serviceName]
+		// TODO: check health of the socket? What if ProxyLoop exited?
+		if exists && sameConfig(info, service, servicePort) {
+			// Nothing changed.
+			continue
+		}
+		if exists {
+			klog.V(4).InfoS("Something changed for service: stopping it", "serviceName", serviceName)
+			delete(lb.serviceMap, serviceName)
+			info.setFinished()
+		}
+		serviceIP := netutils.ParseIPSloppy(service.Spec.ClusterIP)
+		klog.V(1).InfoS("Adding new service", "serviceName", serviceName, "addr", net.JoinHostPort(serviceIP.String(), strconv.Itoa(int(servicePort.Port))), "protocol", servicePort.Protocol)
+		info = &ServiceInfo{
+			isAliveAtomic:       1,
+			protocol:            servicePort.Protocol,
+			sessionAffinityType: v1.ServiceAffinityNone,
+			portal: portal{
+				ip:   serviceIP,
+				port: int(servicePort.Port),
+			},
+			externalIPs: service.Spec.ExternalIPs,
+		}
+		// Deep-copy in case the service instance changes
+		info.loadBalancerStatus = *service.Status.LoadBalancer.DeepCopy()
+		info.nodePort = int(servicePort.NodePort)
+		info.sessionAffinityType = service.Spec.SessionAffinity
+		lb.serviceMap[serviceName] = info
 		// Kube-apiserver side guarantees SessionAffinityConfig won't be nil when session affinity type is ClientIP
-		var stickyMaxAgeSeconds int
 		if service.Spec.SessionAffinity == v1.ServiceAffinityClientIP {
-			stickyMaxAgeSeconds = int(*service.Spec.SessionAffinityConfig.ClientIP.TimeoutSeconds)
-		}
-		so := &ServiceObject{
-			ip:                  serviceIP,
-			port:                int(servicePort.Port),
-			protocol:            servicePort.Protocol,
-			nodePort:            int(servicePort.NodePort),
-			loadBalancerStatus:  *service.Status.LoadBalancer.DeepCopy(),
-			serviceAffinityType: v1.ServiceAffinityNone,
-			stickyMaxAgeSeconds: stickyMaxAgeSeconds,
+			info.stickyMaxAgeSeconds = int(*service.Spec.SessionAffinityConfig.ClientIP.TimeoutSeconds)
 		}
-		lb.serviceMap[serviceName] = so
-		err := lb.NewService(serviceName, so.serviceAffinityType, so.stickyMaxAgeSeconds)
-		if err != nil {
+		klog.V(4).InfoS("Record serviceInfo", "serviceInfo", info)
+
+		if err := lb.NewService(serviceName, info.sessionAffinityType, info.stickyMaxAgeSeconds); err != nil {
 			klog.ErrorS(err, "Failed to new service", "serviceName", serviceName)
 		}
-	}
-}
-
-func (lb *LoadBalancer) OnServiceAdd(service *v1.Service) {
-	lb.mu.Lock()
-	defer lb.mu.Unlock()
-	lb.newServiceObject(service)
-	lb.cleanupStaleStickySessions()
-}
-
-func (lb *LoadBalancer) OnServiceUpdate(_, service *v1.Service) {
-	lb.mu.Lock()
-	defer lb.mu.Unlock()
+		info.setStarted()
+	}
 
-	lb.newServiceObject(service)
-	lb.cleanupStaleStickySessions()
+	return existingPorts
 }
 
-func (lb *LoadBalancer) OnServiceDelete(service *v1.Service) {
-	lb.mu.Lock()
-	defer lb.mu.Unlock()
-
+func (lb *LoadBalancer) unmergeService(service *v1.Service, existingPorts sets.String) {
 	if service == nil {
 		return
 	}
+
 	if utilproxy.ShouldSkipService(service) {
 		return
 	}
+
+	staleUDPServices := sets.NewString()
 	svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
 	for i := range service.Spec.Ports {
 		servicePort := &service.Spec.Ports[i]
+		if existingPorts.Has(servicePort.Name) {
+			continue
+		}
 		serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
+
+		klog.V(1).InfoS("Stopping service", "serviceName", serviceName)
+		info, exists := lb.serviceMap[serviceName]
+		if !exists {
+			klog.ErrorS(nil, "Service is being removed but doesn't exist", "serviceName", serviceName)
+			continue
+		}
+
+		if lb.serviceMap[serviceName].protocol == v1.ProtocolUDP {
+			staleUDPServices.Insert(lb.serviceMap[serviceName].portal.ip.String())
+		}
+
 		delete(lb.serviceMap, serviceName)
 		lb.DeleteService(serviceName)
+		info.setFinished()
 	}
-	lb.cleanupStaleStickySessions()
 }
 
-func (lb *LoadBalancer) OnServiceSynced() {
+func (lb *LoadBalancer) serviceChange(previous, current *v1.Service, detail string) {
+	var svcName types.NamespacedName
+	if current != nil {
+		svcName = types.NamespacedName{Namespace: current.Namespace, Name: current.Name}
+	} else {
+		svcName = types.NamespacedName{Namespace: previous.Namespace, Name: previous.Name}
+	}
+	klog.V(4).InfoS("Record service change", "action", detail, "svcName", svcName)
 
-}
+	lb.serviceChangesLock.Lock()
+	defer lb.serviceChangesLock.Unlock()
 
-// clean up any stale sticky session records in the hash map.
-func (lb *LoadBalancer) cleanupStaleStickySessions() {
-	for name := range lb.serviceMap {
-		lb.CleanupStaleStickySessions(name)
+	change, exists := lb.serviceChanges[svcName]
+	if !exists {
+		// change.previous is only set for new changes. We must keep
+		// the oldest service info (or nil) because correct unmerging
+		// depends on the next update/del after a merge, not subsequent
+		// updates.
+		change = &serviceChange{previous: previous}
+		lb.serviceChanges[svcName] = change
 	}
-}
 
-// Loop through the valid endpoints and then the endpoints associated with the Load Balancer.
-// Then remove any session affinity records that are not in both lists.
-// This assumes the lb.lock is held.
-func (lb *LoadBalancer) removeStaleAffinity(svcPort proxy.ServicePortName, newEndpoints []string) {
-	newEndpointsSet := sets.NewString()
-	for _, newEndpoint := range newEndpoints {
-		newEndpointsSet.Insert(newEndpoint)
-	}
+	// Always use the most current service (or nil) as change.current
+	change.current = current
 
-	state, exists := lb.services[svcPort]
-	if !exists {
-		return
+	if reflect.DeepEqual(change.previous, change.current) {
+		// collapsed change had no effect
+		delete(lb.serviceChanges, svcName)
+	} else if lb.isInitialized() {
+		// change will have an effect, ask the proxy to sync
+		lb.syncRunner.Run()
 	}
-	for _, existingEndpoint := range state.endpoints {
-		if !newEndpointsSet.Has(existingEndpoint) {
-			klog.V(2).InfoS("Delete endpoint for service", "endpoint", existingEndpoint, "servicePortName", svcPort)
-			removeSessionAffinityByEndpoint(state, svcPort, existingEndpoint)
-		}
+}
+
+func (lb *LoadBalancer) OnServiceAdd(service *v1.Service) {
+	lb.serviceChange(nil, service, "OnServiceAdd")
+}
+
+func (lb *LoadBalancer) OnServiceUpdate(oldService, service *v1.Service) {
+	lb.serviceChange(oldService, service, "OnServiceUpdate")
+	klog.Info("")
+}
+
+func (lb *LoadBalancer) OnServiceDelete(service *v1.Service) {
+	lb.serviceChange(service, nil, "OnServiceDelete")
+}
+
+func (lb *LoadBalancer) OnServiceSynced() {
+	klog.V(2).InfoS("LoadBalancer OnServiceSynced")
+
+	// Mark services as initialized and (if endpoints are already
+	// initialized) the entire proxy as initialized
+	atomic.StoreInt32(&lb.servicesSynced, 1)
+	if atomic.LoadInt32(&lb.endpointsSynced) > 0 {
+		atomic.StoreInt32(&lb.initialized, 1)
 	}
+
+	// Must sync from a goroutine to avoid blocking the
+	// service event handler on startup with large numbers
+	// of initial objects
+	go lb.syncServices()
 }
 
 func (lb *LoadBalancer) OnEndpointsAdd(endpoints *v1.Endpoints) {
@@ -509,6 +730,19 @@ func (lb *LoadBalancer) OnEndpointsDelete(endpoints *v1.Endpoints) {
 }
 
 func (lb *LoadBalancer) OnEndpointsSynced() {
+	klog.V(2).InfoS(" OnEndpointsSynced")
+
+	// Mark endpoints as initialized and (if services are already
+	// initialized) the entire proxy as initialized
+	atomic.StoreInt32(&lb.endpointsSynced, 1)
+	if atomic.LoadInt32(&lb.servicesSynced) > 0 {
+		atomic.StoreInt32(&lb.initialized, 1)
+	}
+
+	// Must sync from a goroutine to avoid blocking the
+	// service event handler on startup with large numbers
+	// of initial objects
+	go lb.syncServices()
 }
 
 func (lb *LoadBalancer) OnDestinationRuleAdd(dr *istioapi.DestinationRule) {
@@ -735,7 +969,6 @@ func (lb *LoadBalancer) dialEndpoint(protocol, endpoint string) (net.Conn, error
 }
 
 func (lb *LoadBalancer) NextEndpoint(svcPort proxy.ServicePortName, srcAddr net.Addr, sessionAffinityReset bool) (string, error) {
-	klog.V(4).InfoS("", svcPort, srcAddr, sessionAffinityReset)
 	return "", nil
 }
 
@@ -798,8 +1031,8 @@ func (lb *LoadBalancer) GetServicePortName(namespacedName types.NamespacedName,
 	lb.mu.Lock()
 	defer lb.mu.Unlock()
 
-	for svcPort, so := range lb.serviceMap {
-		if svcPort.NamespacedName == namespacedName && so.port == port {
+	for svcPort, info := range lb.serviceMap {
+		if svcPort.NamespacedName == namespacedName && info.portal.port == port {
 			return svcPort, true
 		}
 	}