Skip to content

Commit

Permalink
Handle type switching of LoadBalancer from Internal to Public and vic…
Browse files Browse the repository at this point in the history
…e versa (#272)

* handle LoadBalancer type switching

* fix wait function of load balancer controller

* delete existing load balancer if type is changed

* Address review comments
  • Loading branch information
kasabe28 authored Aug 28, 2023
1 parent 66773e2 commit 983d0a1
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 74 deletions.
111 changes: 78 additions & 33 deletions pkg/cloudprovider/onmetal/load_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ import (
"strings"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"

v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
cloudprovider "k8s.io/cloud-provider"
servicehelper "k8s.io/cloud-provider/service/helpers"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
Expand Down Expand Up @@ -63,10 +63,10 @@ func newOnmetalLoadBalancer(targetClient client.Client, onmetalClient client.Cli
}

func (o *onmetalLoadBalancer) GetLoadBalancer(ctx context.Context, clusterName string, service *v1.Service) (status *v1.LoadBalancerStatus, exists bool, err error) {
loadBalancerName := o.GetLoadBalancerName(ctx, clusterName, service)
klog.V(2).InfoS("Getting LoadBalancer %s", loadBalancerName)
klog.V(2).InfoS("GetLoadBalancer for Service", "Cluster", clusterName, "Service", client.ObjectKeyFromObject(service))

loadBalancer := &networkingv1alpha1.LoadBalancer{}
loadBalancerName := o.GetLoadBalancerName(ctx, clusterName, service)
if err = o.onmetalClient.Get(ctx, client.ObjectKey{Namespace: o.onmetalNamespace, Name: loadBalancerName}, loadBalancer); err != nil {
return nil, false, fmt.Errorf("failed to get LoadBalancer %s for Service %s: %w", loadBalancerName, client.ObjectKeyFromObject(service), err)
}
Expand All @@ -87,7 +87,28 @@ func (o *onmetalLoadBalancer) GetLoadBalancerName(ctx context.Context, clusterNa
func (o *onmetalLoadBalancer) EnsureLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
klog.V(2).InfoS("EnsureLoadBalancer for Service", "Cluster", clusterName, "Service", client.ObjectKeyFromObject(service))

// decide load balancer type based on service annotation for internal load balancer
var desiredLoadBalancerType networkingv1alpha1.LoadBalancerType
if value, ok := service.Annotations[InternalLoadBalancerAnnotation]; ok && value == "true" {
desiredLoadBalancerType = networkingv1alpha1.LoadBalancerTypeInternal
} else {
desiredLoadBalancerType = networkingv1alpha1.LoadBalancerTypePublic
}

loadBalancerName := getLoadBalancerNameForService(clusterName, service)

// get existing load balancer type
existingLoadBalancer := &networkingv1alpha1.LoadBalancer{}
var existingLoadBalancerType networkingv1alpha1.LoadBalancerType
if err := o.onmetalClient.Get(ctx, client.ObjectKey{Namespace: o.onmetalNamespace, Name: loadBalancerName}, existingLoadBalancer); err == nil {
existingLoadBalancerType = existingLoadBalancer.Spec.Type
if existingLoadBalancerType != desiredLoadBalancerType {
if err = o.EnsureLoadBalancerDeleted(ctx, clusterName, service); err != nil {
return nil, fmt.Errorf("failed deleting existing loadbalancer %s: %w", loadBalancerName, err)
}
}
}

klog.V(2).InfoS("Getting LoadBalancer ports from Service", "Service", client.ObjectKeyFromObject(service))
var lbPorts []networkingv1alpha1.LoadBalancerPort
for _, svcPort := range service.Spec.Ports {
Expand All @@ -113,7 +134,7 @@ func (o *onmetalLoadBalancer) EnsureLoadBalancer(ctx context.Context, clusterNam
},
},
Spec: networkingv1alpha1.LoadBalancerSpec{
Type: networkingv1alpha1.LoadBalancerTypePublic,
Type: desiredLoadBalancerType,
IPFamilies: service.Spec.IPFamilies,
NetworkRef: v1.LocalObjectReference{
Name: o.cloudConfig.NetworkName,
Expand All @@ -122,11 +143,11 @@ func (o *onmetalLoadBalancer) EnsureLoadBalancer(ctx context.Context, clusterNam
},
}

if value, ok := service.Annotations[InternalLoadBalancerAnnotation]; ok && value == "true" {
// if load balancer type is Internal then update IPSource with valid prefix template
if desiredLoadBalancerType == networkingv1alpha1.LoadBalancerTypeInternal {
if o.cloudConfig.PrefixName == "" {
return nil, fmt.Errorf("prefixName is not defined in config")
}
loadBalancer.Spec.Type = networkingv1alpha1.LoadBalancerTypeInternal
loadBalancer.Spec.IPs = []networkingv1alpha1.IPSource{
{
Ephemeral: &networkingv1alpha1.EphemeralPrefixSource{
Expand Down Expand Up @@ -156,50 +177,51 @@ func (o *onmetalLoadBalancer) EnsureLoadBalancer(ctx context.Context, clusterNam
}
klog.V(2).InfoS("Applied LoadBalancerRouting for LoadBalancer", "LoadBalancer", client.ObjectKeyFromObject(loadBalancer))

lbIPs := loadBalancer.Status.IPs
if len(lbIPs) == 0 {
if err := waitLoadBalancerActive(ctx, clusterName, service, o.onmetalClient, loadBalancer); err != nil {
return nil, err
}
}

lbIngress := []v1.LoadBalancerIngress{}
for _, ipAddr := range loadBalancer.Status.IPs {
lbIngress = append(lbIngress, v1.LoadBalancerIngress{IP: ipAddr.String()})
lbStatus, err := waitLoadBalancerActive(ctx, o.onmetalClient, existingLoadBalancerType, service, loadBalancer)
if err != nil {
return nil, err
}

return &v1.LoadBalancerStatus{Ingress: lbIngress}, nil
return &lbStatus, nil
}

func getLoadBalancerNameForService(clusterName string, service *v1.Service) string {
nameSuffix := strings.Split(string(service.UID), "-")[0]
return fmt.Sprintf("%s-%s-%s", clusterName, service.Name, nameSuffix)
}

func waitLoadBalancerActive(ctx context.Context, clusterName string, service *v1.Service, onmetalClient client.Client, loadBalancer *networkingv1alpha1.LoadBalancer) error {
klog.V(2).InfoS("Waiting for onmetal LoadBalancer to become ready", "LoadBalancer", client.ObjectKeyFromObject(loadBalancer))
func waitLoadBalancerActive(ctx context.Context, onmetalClient client.Client, existingLoadBalancerType networkingv1alpha1.LoadBalancerType,
service *v1.Service, loadBalancer *networkingv1alpha1.LoadBalancer) (v1.LoadBalancerStatus, error) {
klog.V(2).InfoS("Waiting for LoadBalancer instance to become ready", "LoadBalancer", client.ObjectKeyFromObject(loadBalancer))
backoff := wait.Backoff{
Duration: waitLoadbalancerInitDelay,
Factor: waitLoadbalancerFactor,
Steps: waitLoadbalancerActiveSteps,
}

err := wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (bool, error) {
err := onmetalClient.Get(ctx, client.ObjectKey{Namespace: service.Namespace, Name: loadBalancer.Name}, loadBalancer)
if err == nil {
if len(loadBalancer.Status.IPs) > 0 {
return true, nil
}
loadBalancerStatus := v1.LoadBalancerStatus{}
if err := wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (bool, error) {
if err := onmetalClient.Get(ctx, client.ObjectKey{Namespace: loadBalancer.Namespace, Name: loadBalancer.Name}, loadBalancer); err != nil {
return false, err
}
if len(loadBalancer.Status.IPs) == 0 {
return false, nil
}
return false, err
})
lbIngress := []v1.LoadBalancerIngress{}
for _, ipAddr := range loadBalancer.Status.IPs {
lbIngress = append(lbIngress, v1.LoadBalancerIngress{IP: ipAddr.String()})
}
loadBalancerStatus.Ingress = lbIngress

if wait.Interrupted(err) {
err = fmt.Errorf("timeout waiting for the onmetal LoadBalancer %s to become ready", client.ObjectKeyFromObject(loadBalancer))
if loadBalancer.Spec.Type != existingLoadBalancerType && servicehelper.LoadBalancerStatusEqual(&service.Status.LoadBalancer, &loadBalancerStatus) {
return false, nil
}
return true, nil
}); wait.Interrupted(err) {
return loadBalancerStatus, fmt.Errorf("timeout waiting for the LoadBalancer %s to become ready", client.ObjectKeyFromObject(loadBalancer))
}

klog.V(2).InfoS("onmetal LoadBalancer became ready", "LoadBalancer", client.ObjectKeyFromObject(loadBalancer))
return err
klog.V(2).InfoS("LoadBalancer became ready", "LoadBalancer", client.ObjectKeyFromObject(loadBalancer))
return loadBalancerStatus, nil
}

func (o *onmetalLoadBalancer) applyLoadBalancerRoutingForLoadBalancer(ctx context.Context, loadBalancer *networkingv1alpha1.LoadBalancer, nodes []*v1.Node) error {
Expand Down Expand Up @@ -332,6 +354,29 @@ func (o *onmetalLoadBalancer) EnsureLoadBalancerDeleted(ctx context.Context, clu
}
return fmt.Errorf("failed to delete loadbalancer %s: %w", client.ObjectKeyFromObject(loadBalancer), err)
}
if err := waitForDeletingLoadBalancer(ctx, service, o.onmetalClient, loadBalancer); err != nil {
return err
}
return nil
}

func waitForDeletingLoadBalancer(ctx context.Context, service *v1.Service, onmetalClient client.Client, loadBalancer *networkingv1alpha1.LoadBalancer) error {
klog.V(2).InfoS("Waiting for LoadBalancer instance to be deleted", "LoadBalancer", client.ObjectKeyFromObject(loadBalancer))
backoff := wait.Backoff{
Duration: waitLoadbalancerInitDelay,
Factor: waitLoadbalancerFactor,
Steps: waitLoadbalancerActiveSteps,
}

if err := wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (bool, error) {
if err := onmetalClient.Get(ctx, client.ObjectKey{Namespace: loadBalancer.Namespace, Name: loadBalancer.Name}, loadBalancer); !apierrors.IsNotFound(err) {
return false, err
}
return true, nil
}); wait.Interrupted(err) {
return fmt.Errorf("timeout waiting for the LoadBalancer %s to be deleted", client.ObjectKeyFromObject(loadBalancer))
}

klog.V(2).InfoS("Deleted LoadBalancer", "LoadBalancer", client.ObjectKeyFromObject(loadBalancer))
return nil
}
71 changes: 30 additions & 41 deletions pkg/cloudprovider/onmetal/load_balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@ import (
"fmt"
"time"

cloudprovider "k8s.io/cloud-provider"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
cloudprovider "k8s.io/cloud-provider"
. "sigs.k8s.io/controller-runtime/pkg/envtest/komega"

commonv1alpha1 "github.com/onmetal/onmetal-api/api/common/v1alpha1"
Expand Down Expand Up @@ -130,30 +129,26 @@ var _ = Describe("LoadBalancer", func() {
Expect(k8sClient.Create(ctx, service)).To(Succeed())
DeferCleanup(k8sClient.Delete, service)

By("failing if no public IP is present for load balancer")
By("ensuring load balancer for service")
lbCtx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
Expect(lbProvider.EnsureLoadBalancer(lbCtx, clusterName, service, []*corev1.Node{node})).Error().To(HaveOccurred())

By("ensuring the load balancer type is public")
By("patching public IP into load balancer status")
loadBalancer := &networkingv1alpha1.LoadBalancer{
ObjectMeta: metav1.ObjectMeta{
Namespace: ns.Name,
Name: lbProvider.GetLoadBalancerName(ctx, clusterName, service),
},
}
Eventually(Object(loadBalancer)).Should(SatisfyAll(
HaveField("Spec.Type", Equal(networkingv1alpha1.LoadBalancerTypePublic))))

By("patching public IP into load balancer status")
Eventually(UpdateStatus(loadBalancer, func() {
loadBalancer.Status.IPs = []commonv1alpha1.IP{commonv1alpha1.MustParseIP("10.0.0.1")}
})).Should(Succeed())

By("ensuring load balancer for service")
Expect(lbProvider.EnsureLoadBalancer(ctx, clusterName, service, []*corev1.Node{node})).To(Equal(&corev1.LoadBalancerStatus{
Ingress: []corev1.LoadBalancerIngress{{IP: "10.0.0.1"}},
}))
By("ensuring the load balancer type is public and load balancer status has public IP")
Eventually(Object(loadBalancer)).Should(SatisfyAll(
HaveField("Spec.Type", Equal(networkingv1alpha1.LoadBalancerTypePublic)),
HaveField("Status.IPs", Equal([]commonv1alpha1.IP{commonv1alpha1.MustParseIP("10.0.0.1")}))))

By("ensuring destinations of load balancer routing")
lbRouting := &networkingv1alpha1.LoadBalancerRouting{
Expand Down Expand Up @@ -272,45 +267,41 @@ var _ = Describe("LoadBalancer", func() {
defer cancel()
Expect(lbProvider.EnsureLoadBalancer(lbCtx, clusterName, service, []*corev1.Node{node})).Error().To(HaveOccurred())

By("ensuring the load balancer type is internal")
By("patching internal IP in load balancer status")
loadBalancer := &networkingv1alpha1.LoadBalancer{
ObjectMeta: metav1.ObjectMeta{
Namespace: ns.Name,
Name: lbProvider.GetLoadBalancerName(ctx, clusterName, service),
},
}
Eventually(Object(loadBalancer)).Should(SatisfyAll(
HaveField("Spec.Type", Equal(networkingv1alpha1.LoadBalancerTypeInternal))))

By("patching internal IP in load balancer status")
Eventually(UpdateStatus(loadBalancer, func() {
loadBalancer.Status.IPs = []commonv1alpha1.IP{commonv1alpha1.MustParseIP("100.0.0.10")}
})).Should(Succeed())

By("ensuring load balancer for service")
Expect(lbProvider.EnsureLoadBalancer(ctx, clusterName, service, []*corev1.Node{node})).
To(Equal(&corev1.LoadBalancerStatus{
Ingress: []corev1.LoadBalancerIngress{{IP: "100.0.0.10"}},
}))
By("ensuring the load balancer type is internal and load balancer status has internal IP")
Eventually(Object(loadBalancer)).Should(SatisfyAll(
HaveField("Spec.Type", Equal(networkingv1alpha1.LoadBalancerTypeInternal)),
HaveField("Status.IPs", Equal([]commonv1alpha1.IP{commonv1alpha1.MustParseIP("100.0.0.10")}))))

By("removing internal load balancer annotation from service")
Eventually(Update(service, func() {
service.Annotations = map[string]string{}
})).Should(Succeed())

By("ensuring load balancer for service")
lbCtx, cancel = context.WithTimeout(ctx, 5*time.Second)
defer cancel()
Expect(lbProvider.EnsureLoadBalancer(lbCtx, clusterName, service, []*corev1.Node{node})).Error().To(HaveOccurred())

By("patching public IP into LoadBalancer status")
Eventually(UpdateStatus(loadBalancer, func() {
loadBalancer.Status.IPs = []commonv1alpha1.IP{commonv1alpha1.MustParseIP("10.0.0.1")}
})).Should(Succeed())

By("ensuring load balancer for service")
Expect(lbProvider.EnsureLoadBalancer(ctx, clusterName, service, []*corev1.Node{node})).To(Equal(&corev1.LoadBalancerStatus{
Ingress: []corev1.LoadBalancerIngress{{IP: "10.0.0.1"}},
}))

By("ensuring that the load balancer is of type public")
By("ensuring the load balancer type is public and load balancer status has public IP")
Eventually(Object(loadBalancer)).Should(SatisfyAll(
HaveField("Spec.Type", Equal(networkingv1alpha1.LoadBalancerTypePublic))))
HaveField("Spec.Type", Equal(networkingv1alpha1.LoadBalancerTypePublic)),
HaveField("Status.IPs", Equal([]commonv1alpha1.IP{commonv1alpha1.MustParseIP("10.0.0.1")}))))

By("ensuring destinations of load balancer routing")
lbRouting := &networkingv1alpha1.LoadBalancerRouting{
Expand All @@ -334,6 +325,9 @@ var _ = Describe("LoadBalancer", func() {
})),
))

By("getting load balancer for service")
Expect(lbProvider.GetLoadBalancer(ctx, clusterName, service)).Error().NotTo(HaveOccurred())

By("deleting the load balancer")
Expect(lbProvider.EnsureLoadBalancerDeleted(ctx, clusterName, service)).To(Succeed())
})
Expand Down Expand Up @@ -449,31 +443,26 @@ var _ = Describe("LoadBalancer", func() {
Expect(k8sClient.Create(ctx, service)).To(Succeed())
DeferCleanup(k8sClient.Delete, service)

By("failing if no public IP is present for load balancer")
By("ensuring load balancer for service")
ensureCtx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
Expect(lbProvider.EnsureLoadBalancer(ensureCtx, clusterName, service, []*corev1.Node{node})).Error().To(HaveOccurred())

By("ensuring the load balancer type is internal")
By("patching public IP in load balancer status")
loadBalancer := &networkingv1alpha1.LoadBalancer{
ObjectMeta: metav1.ObjectMeta{
Namespace: ns.Name,
Name: lbProvider.GetLoadBalancerName(ctx, clusterName, service),
},
}
Eventually(Object(loadBalancer)).Should(SatisfyAll(
HaveField("Spec.Type", Equal(networkingv1alpha1.LoadBalancerTypePublic))))

By("patching internal IP in load balancer status")
Eventually(UpdateStatus(loadBalancer, func() {
loadBalancer.Status.IPs = []commonv1alpha1.IP{commonv1alpha1.MustParseIP("100.0.0.10")}
loadBalancer.Status.IPs = []commonv1alpha1.IP{commonv1alpha1.MustParseIP("10.0.0.1")}
})).Should(Succeed())

By("ensuring load balancer for service")
Expect(lbProvider.EnsureLoadBalancer(ctx, clusterName, service, []*corev1.Node{node})).
To(Equal(&corev1.LoadBalancerStatus{
Ingress: []corev1.LoadBalancerIngress{{IP: "100.0.0.10"}},
}))
By("ensuring the load balancer type is public and load balancer status has public IP")
Eventually(Object(loadBalancer)).Should(SatisfyAll(
HaveField("Spec.Type", Equal(networkingv1alpha1.LoadBalancerTypePublic)),
HaveField("Status.IPs", Equal([]commonv1alpha1.IP{commonv1alpha1.MustParseIP("10.0.0.1")}))))

By("creating a second machine object")
machine2 := &computev1alpha1.Machine{
Expand Down

0 comments on commit 983d0a1

Please sign in to comment.