Skip to content

Commit

Permalink
Containers: Fix mpirun issue where it cannot contact the workers
Browse files Browse the repository at this point in the history
There is an intermittent issue where mpirun cannot contact the workers
even though nslookup can successfully resolve their DNS hostnames in the
Init Container. This is seen somewhat infrequently, but has happened
enough. The end result causes the user containers to restart (if
restartLimit > 0), and it always seems to work on the second try.

This seems to solve the issue by using the Init Continer to use
mpirun to contact the workers and just get their hostnames. This
replaces the use of nslookup and ensures that mpirun can be successful
on the launcher. To support this, the Init Container must run as the
given UID/GID rather than root.

It also speeds up container start times as we only need to run 1 Init
Container for all of the workers rather than an Init Container for each
worker.

I have not been able to reproduce the original error using int-test,
which would (in)frequently catch this.

Signed-off-by: Blake Devcich <[email protected]>
  • Loading branch information
bdevcich committed Sep 6, 2023
1 parent d8b7b85 commit be4c12c
Showing 1 changed file with 41 additions and 25 deletions.
66 changes: 41 additions & 25 deletions controllers/nnf_workflow_controller_container_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/go-logr/logr"
mpicommonv1 "github.com/kubeflow/common/pkg/apis/common/v1"
mpiv2beta1 "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
"go.openly.dev/pointy"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -105,11 +106,6 @@ func (c *nnfUserContainer) createMPIJob() error {
// Run the launcher on the first NNF node
launcherSpec.NodeSelector = map[string]string{"kubernetes.io/hostname": c.nnfNodes[0]}

// Use initContainers to ensure the workers are up and discoverable before running the launcher command
for i := range c.nnfNodes {
c.addInitContainerWorkerWait(launcherSpec, i)
}

// Target all the NNF nodes for the workers
replicas := int32(len(c.nnfNodes))
worker.Replicas = &replicas
Expand Down Expand Up @@ -152,6 +148,11 @@ func (c *nnfUserContainer) createMPIJob() error {
c.applyPermissions(launcherSpec, &mpiJob.Spec, false)
c.applyPermissions(workerSpec, &mpiJob.Spec, true)

// Use an Init Container to test the waters for mpi - ensure it can contact the workers before
// the launcher tries it. Since this runs as the UID/GID, this needs to happen after the
// passwd Init Container.
c.addInitContainerWorkerWait(launcherSpec, len(c.nnfNodes))

// Get the ports from the port manager
ports, err := c.getHostPorts()
if err != nil {
Expand Down Expand Up @@ -303,37 +304,55 @@ exit 0
})
}

func (c *nnfUserContainer) addInitContainerWorkerWait(spec *corev1.PodSpec, worker int) {
// Add an initContainer to ensure that a worker pod is up and discoverable via dns. This
// assumes nslookup is available in the container. The nnf-mfu image provides this.
script := `# use nslookup to contact workers
echo "contacting $HOST..."
func (c *nnfUserContainer) addInitContainerWorkerWait(spec *corev1.PodSpec, numWorkers int) {
// Add an initContainer to ensure that the worker pods are up and discoverable via mpirun.
script := `# use mpirun to contact workers
echo "contacting $HOSTS..."
for i in $(seq 1 100); do
sleep 1
echo "attempt $i of 100..."
nslookup $HOST
echo "mpirun -H $HOSTS hostname"
mpirun -H $HOSTS hostname
if [ $? -eq 0 ]; then
echo "successfully contacted $HOST; done"
echo "successfully contacted $HOSTS; done"
exit 0
fi
done
echo "failed to contact $HOST"
echo "failed to contact $HOSTS"
exit 1
`
// Build the worker's hostname.domain (e.g. nnf-container-example-worker-0.nnf-container-example-worker.default.svc)
// This name comes from mpi-operator.
host := strings.ToLower(fmt.Sprintf(
"%s-worker-%d.%s-worker.%s.svc", c.workflow.Name, worker, c.workflow.Name, c.workflow.Namespace))
script = strings.ReplaceAll(script, "$HOST", host)
// Build a slice of the workers' hostname.domain (e.g. nnf-container-example-worker-0.nnf-container-example-worker.default.svc)
// This hostname comes from mpi-operator.
workers := []string{}
for i := 0; i < numWorkers; i++ {
host := strings.ToLower(fmt.Sprintf(
"%s-worker-%d.%s-worker.%s.svc", c.workflow.Name, i, c.workflow.Name, c.workflow.Namespace))
workers = append(workers, host)
}
// mpirun takes a comma separated list of hosts (-H)
script = strings.ReplaceAll(script, "$HOSTS", strings.Join(workers, ","))

spec.InitContainers = append(spec.InitContainers, corev1.Container{
Name: fmt.Sprintf("mpi-wait-for-worker-%d", worker),
Name: fmt.Sprintf("mpi-wait-for-worker-%d", numWorkers),
Image: spec.Containers[0].Image,
Command: []string{
"/bin/sh",
"-c",
script,
},
// mpirun needs this environment variable to use DNS hostnames
Env: []corev1.EnvVar{{Name: "OMPI_MCA_orte_keep_fqdn_hostnames", Value: "true"}},
// Run this initContainer as the same UID/GID as the launcher
SecurityContext: &corev1.SecurityContext{
RunAsUser: &c.uid,
RunAsGroup: &c.gid,
RunAsNonRoot: pointy.Bool(true),
},
// And use the necessary volumes to support the UID/GID
VolumeMounts: []corev1.VolumeMount{
{MountPath: "/etc/passwd", Name: "passwd", SubPath: "passwd"},
{MountPath: "/home/mpiuser/.ssh", Name: "ssh-auth"},
},
})
}

Expand Down Expand Up @@ -389,16 +408,13 @@ func (c *nnfUserContainer) applyPermissions(spec *corev1.PodSpec, mpiJobSpec *mp
if !worker {
container.SecurityContext.RunAsUser = &c.uid
container.SecurityContext.RunAsGroup = &c.gid
nonRoot := true
container.SecurityContext.RunAsNonRoot = &nonRoot
su := false
container.SecurityContext.AllowPrivilegeEscalation = &su
container.SecurityContext.RunAsNonRoot = pointy.Bool(true)
container.SecurityContext.AllowPrivilegeEscalation = pointy.Bool(false)
} else {
// For the worker nodes, we need to ensure we have the appropriate linux capabilities to
// allow for ssh access for mpirun. Drop all capabilities and only add what is
// necessary. Only do this if the Capabilities have not been set by the user.
su := true
container.SecurityContext.AllowPrivilegeEscalation = &su
container.SecurityContext.AllowPrivilegeEscalation = pointy.Bool(true)
if container.SecurityContext.Capabilities == nil {
container.SecurityContext.Capabilities = &corev1.Capabilities{
Drop: []corev1.Capability{"ALL"},
Expand Down

0 comments on commit be4c12c

Please sign in to comment.