Skip to content

Commit

Permalink
chore: applied feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
mojtaba-esk committed Oct 16, 2024
1 parent 38437cb commit 27e3ce6
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 34 deletions.
4 changes: 2 additions & 2 deletions e2e/basic/headless_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ func (s *Suite) TestHeadlessService() {
s.Require().NoError(err)

elapsed := time.Since(startTime)
s.T().Logf("i: %d, test took %d seconds, output: `%s`", i, int64(elapsed.Seconds()), output)
s.T().Logf("i: %d, test took %.2f seconds, output: `%s`", i, elapsed.Seconds(), output)

gotPacketloss, err := strconv.ParseFloat(output, 64)
s.Require().NoError(err, fmt.Sprintf("error parsing output: `%s`", output))
s.Require().NoErrorf(err, "failed to parse output: `%s`", output)

s.Assert().Zero(gotPacketloss)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/instance/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,4 +223,5 @@ var (
ErrGettingServiceEndpointNotAllowed = errors.New("GettingServiceEndpointNotAllowed", "getting service endpoint is only allowed in state 'Started'. Current state is '%s'")
ErrCannotCloneInstance = errors.New("CannotCloneInstance", "cannot clone instance '%s' in state '%s'")
ErrGettingIPNotAllowed = errors.New("GettingIPNotAllowed", "getting IP is allowed in state 'Started'. Current state is '%s'")
ErrPodIPNotReady = errors.New("PodIPNotReady", "pod IP is not ready for pod '%s'")
)
6 changes: 6 additions & 0 deletions pkg/instance/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ func (n *network) AddPortUDP(port int) error {

// GetIP returns the IP of the instance
// This function can only be called in the states 'Started'
// The IP is not persistent and can be changed when the pod is restarted
// If a persistent IP is needed, use HostName() instead
func (n *network) GetIP(ctx context.Context) (string, error) {
if !n.instance.IsInState(StateStarted) {
return "", ErrGettingIPNotAllowed.WithParams(n.instance.state.String())
Expand All @@ -130,6 +132,10 @@ func (n *network) GetIP(ctx context.Context) (string, error) {
return "", ErrGettingPodFromReplicaSet.WithParams(n.instance.name).Wrap(err)
}

if pod.Status.PodIP == "" {
return "", ErrPodIPNotReady.WithParams(pod.Name)
}

return pod.Status.PodIP, nil
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/k8s/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,7 @@ var (
ErrListingPods = errors.New("ListingPods", "failed to list pods")
ErrGetPodStatus = errors.New("GetPodStatus", "failed to get pod status for pod %s")
ErrUpdatingConfigmap = errors.New("UpdatingConfigmap", "failed to update configmap %s")
ErrGettingServiceIP = errors.New("GettingServiceIP", "failed to get service IP for service %s")
ErrGettingServiceNodePort = errors.New("GettingServiceNodePort", "failed to get service node port for service %s")
ErrHeadlessService = errors.New("HeadlessService", "headless service '%s' does not have a cluster IP, use DNS instead")
)
84 changes: 52 additions & 32 deletions pkg/k8s/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,48 @@ func (c *Client) DeleteService(ctx context.Context, name string) error {
}

func (c *Client) GetServiceIP(ctx context.Context, name string) (string, error) {
svc, err := c.GetService(ctx, name)
srv, err := c.GetService(ctx, name)
if err != nil {
return "", ErrGettingService.WithParams(name).Wrap(err)
}
return svc.Spec.ClusterIP, nil

if srv.Spec.Type == v1.ServiceTypeLoadBalancer {
// Use the LoadBalancer's external IP
if len(srv.Status.LoadBalancer.Ingress) > 0 {
return srv.Status.LoadBalancer.Ingress[0].IP, nil
}
return "", ErrLoadBalancerIPNotAvailable
}

if srv.Spec.Type != v1.ServiceTypeNodePort {
// Headless service does not have a cluster IP
if srv.Spec.ClusterIP == v1.ClusterIPNone {
return "", ErrHeadlessService.WithParams(name)
}
return srv.Spec.ClusterIP, nil
}

// Use the Node IP and NodePort
nodes, err := c.clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
return "", ErrGettingNodes.Wrap(err)
}
if len(nodes.Items) == 0 {
return "", ErrNoNodesFound
}

// Use the first node for simplicity, you might need to handle multiple nodes
var nodeIP string
for _, address := range nodes.Items[0].Status.Addresses {
if address.Type == v1.NodeExternalIP {
nodeIP = address.Address
break
}
}
return nodeIP, nil
}

// WaitForService() works only for the services with publicly accessible IP
func (c *Client) WaitForService(ctx context.Context, name string) error {
retryInterval := time.Duration(0)
for {
Expand All @@ -148,11 +183,13 @@ func (c *Client) WaitForService(ctx context.Context, name string) error {
}

// Check if service is reachable
// Since this function is called from the client,
// we cannot use headless service as it is not accessible from outside the cluster
// so we use the service IP and port to check connectivity
endpoint, err := c.GetServiceEndpoint(ctx, name)
if err != nil {
return ErrGettingServiceEndpoint.WithParams(name).Wrap(err)
}

if err := checkServiceConnectivity(endpoint); err != nil {
continue
}
Expand All @@ -166,43 +203,26 @@ func (c *Client) ServiceDNS(name string) string {
return fmt.Sprintf("%s.%s.svc.cluster.local", name, c.namespace)
}

// TODO: refactor this function to use the service IP directly
func (c *Client) GetServiceEndpoint(ctx context.Context, name string) (string, error) {
srv, err := c.clientset.CoreV1().Services(c.namespace).Get(ctx, name, metav1.GetOptions{})
ip, err := c.GetServiceIP(ctx, name)
if err != nil {
return "", ErrGettingService.WithParams(name).Wrap(err)
return "", ErrGettingServiceIP.WithParams(name).Wrap(err)
}

if srv.Spec.Type == v1.ServiceTypeLoadBalancer {
// Use the LoadBalancer's external IP
if len(srv.Status.LoadBalancer.Ingress) > 0 {
return fmt.Sprintf("%s:%d", srv.Status.LoadBalancer.Ingress[0].IP, srv.Spec.Ports[0].Port), nil
}
return "", ErrLoadBalancerIPNotAvailable
port, err := c.ServiceNodePort(ctx, name)
if err != nil {
return "", ErrGettingServiceNodePort.WithParams(name).Wrap(err)
}

if srv.Spec.Type == v1.ServiceTypeNodePort {
// Use the Node IP and NodePort
nodes, err := c.clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
return "", ErrGettingNodes.Wrap(err)
}
if len(nodes.Items) == 0 {
return "", ErrNoNodesFound
}
return fmt.Sprintf("%s:%d", ip, port), nil
}

// Use the first node for simplicity, you might need to handle multiple nodes
var nodeIP string
for _, address := range nodes.Items[0].Status.Addresses {
if address.Type == v1.NodeExternalIP {
nodeIP = address.Address
break
}
}
return fmt.Sprintf("%s:%d", nodeIP, srv.Spec.Ports[0].NodePort), nil
func (c *Client) ServiceNodePort(ctx context.Context, name string) (int32, error) {
svc, err := c.GetService(ctx, name)
if err != nil {
return 0, ErrGettingService.WithParams(name).Wrap(err)
}

return fmt.Sprintf("%s:%d", srv.Spec.ClusterIP, srv.Spec.Ports[0].Port), nil
return svc.Spec.Ports[0].NodePort, nil
}

func (c *Client) isServiceReady(ctx context.Context, name string) (bool, error) {
Expand Down
1 change: 1 addition & 0 deletions pkg/k8s/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type KubeManager interface {
GetServiceEndpoint(ctx context.Context, name string) (string, error)
GetServiceIP(ctx context.Context, name string) (string, error)
ServiceDNS(name string) string
ServiceNodePort(ctx context.Context, name string) (int32, error)
IsPodRunning(ctx context.Context, name string) (bool, error)
IsReplicaSetRunning(ctx context.Context, name string) (bool, error)
Namespace() string
Expand Down

0 comments on commit 27e3ce6

Please sign in to comment.