Skip to content

Commit

Permalink
chore: change services to be headless
Browse files Browse the repository at this point in the history
  • Loading branch information
mojtaba-esk committed Oct 15, 2024
1 parent 339f749 commit a7f3913
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 3 deletions.
68 changes: 68 additions & 0 deletions e2e/basic/headless_service_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package basic

import (
"context"
"fmt"
"strconv"
"time"
)

const gopingImage = "ghcr.io/celestiaorg/goping:4803195"

func (s *Suite) TestHeadlessService() {
const (
namePrefix = "headless-srv-test"
numOfPingPackets = 100
numOfTests = 10
packetTimeout = 1 * time.Second
gopingPort = 8001
)
ctx := context.Background()

mother, err := s.Knuu.NewInstance(namePrefix + "mother")
s.Require().NoError(err)

err = mother.Build().SetImage(ctx, gopingImage)
s.Require().NoError(err)

s.Require().NoError(mother.Network().AddPortTCP(gopingPort))
s.Require().NoError(mother.Build().Commit(ctx))

err = mother.Build().SetEnvironmentVariable("SERVE_ADDR", fmt.Sprintf("0.0.0.0:%d", gopingPort))
s.Require().NoError(err)

target, err := mother.CloneWithName(namePrefix + "target")
s.Require().NoError(err)

executor, err := mother.CloneWithName(namePrefix + "executor")
s.Require().NoError(err)

// Prepare ping executor & target

s.Require().NoError(target.Execution().Start(ctx))
s.Require().NoError(executor.Execution().Start(ctx))

targetEndpoint, err := target.Network().GetServiceEndpoint(gopingPort)
s.Require().NoError(err)
s.T().Logf("targetEndpoint: %v", targetEndpoint)

s.T().Log("Starting ping test. It takes a while.")
for i := 0; i < numOfTests; i++ {
startTime := time.Now()

output, err := executor.Execution().ExecuteCommand(ctx, "goping", "ping", "-q",
"-c", fmt.Sprint(numOfPingPackets),
"-t", packetTimeout.String(),
"-m", "packetloss",
targetEndpoint)
s.Require().NoError(err)

elapsed := time.Since(startTime)
s.T().Logf("i: %d, test took %d seconds, output: `%s`", i, int64(elapsed.Seconds()), output)

gotPacketloss, err := strconv.ParseFloat(output, 64)
s.Require().NoError(err, fmt.Sprintf("error parsing output: `%s`", output))

s.Assert().Zero(gotPacketloss)
}
}
1 change: 1 addition & 0 deletions pkg/instance/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,4 +220,5 @@ var (
ErrAddingHostToProxyNotAllowed = errors.New("AddingHostToProxyNotAllowed", "adding host to proxy is only allowed in state 'Started' and 'Preparing'. Current state is '%s'")
ErrInstanceNameAlreadyExists = errors.New("InstanceNameAlreadyExists", "instance name '%s' already exists")
ErrSettingSidecarName = errors.New("SettingSidecarName", "error setting sidecar name with prefix '%s' for instance '%s'")
ErrGettingServiceEndpointNotAllowed = errors.New("GettingServiceEndpointNotAllowed", "getting service endpoint is only allowed in state 'Started'. Current state is '%s'")
)
14 changes: 14 additions & 0 deletions pkg/instance/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package instance

import (
"context"
"fmt"
"net"
"time"

Expand Down Expand Up @@ -149,6 +150,19 @@ func (n *network) GetIP(ctx context.Context) (string, error) {
return ip, nil
}

// GetServiceEndpoint returns the endpoint of the service for the instance
// This function can only be called in the state 'Started'
func (n *network) GetServiceEndpoint(port int) (string, error) {
if !n.instance.IsInState(StateStarted) {
return "", ErrGettingServiceEndpointNotAllowed.WithParams(n.instance.state.String())
}
dns := n.instance.K8sClient.GetServiceDNS(n.instance.name)
if port == 0 {
return dns, nil
}
return fmt.Sprintf("%s:%d", dns, port), nil
}

// deployService deploys the service for the instance
func (n *network) deployService(ctx context.Context, portsTCP, portsUDP []int) error {
// a sidecar instance should use the parent instance's service
Expand Down
12 changes: 9 additions & 3 deletions pkg/k8s/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ func (c *Client) WaitForService(ctx context.Context, name string) error {
}
}

func (c *Client) GetServiceDNS(name string) string {
return fmt.Sprintf("%s.%s.svc.cluster.local", name, c.namespace)
}

// TODO: refactor this function to use the service IP directly
func (c *Client) GetServiceEndpoint(ctx context.Context, name string) (string, error) {
srv, err := c.clientset.CoreV1().Services(c.namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
Expand Down Expand Up @@ -276,9 +281,10 @@ func prepareService(
Labels: labels,
},
Spec: v1.ServiceSpec{
Ports: servicePorts,
Selector: selectorMap,
Type: v1.ServiceTypeClusterIP,
Ports: servicePorts,
Selector: selectorMap,
Type: v1.ServiceTypeClusterIP,
ClusterIP: v1.ClusterIPNone, // Headless service
},
}
return svc, nil
Expand Down
1 change: 1 addition & 0 deletions pkg/k8s/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type KubeManager interface {
GetService(ctx context.Context, name string) (*corev1.Service, error)
GetServiceEndpoint(ctx context.Context, name string) (string, error)
GetServiceIP(ctx context.Context, name string) (string, error)
GetServiceDNS(name string) string
IsPodRunning(ctx context.Context, name string) (bool, error)
IsReplicaSetRunning(ctx context.Context, name string) (bool, error)
Namespace() string
Expand Down

0 comments on commit a7f3913

Please sign in to comment.