Skip to content

Commit

Permalink
Containers: Allow containers to open ports for communication with com… (
Browse files Browse the repository at this point in the history
#214)

This change allows for container workflows to open ports. These ports
are opened on the host nodes (i.e. NNF nodes) where the containers are
running. This enables traffic from outside of the network through the IP
address of the NNF node and the port. An application on the compute node
can contact the container with <NNF_NODE_IP>:<PORT>.

The port number(s) can be retrieved via the NNF_CONTAINER_PORTS
environment variable. This environment variable is available inside of
the containers. It is also provided to the Workflow so that Flux can
inform the application on the compute node of which port(s) to use. If
multiple ports are desired, the environment variable will provide a
comma separated list of port numbers.

Ports are requested via the NnfContainerProfile's `numPorts`. **A system
admin must enable the `Ports` port range in the `SystemConfiguration`
before ports can be requested**. If not, the NnfPortManager will not
allocate any ports.

More details:
- Enabled default NnfPortManager to manage port allocation
- Port allocation occurs in the Setup State
- Port de-allocation occurs in the Teardown State
- User Container Pods are now destroyed in the Teardown State prior to
  Port de-allocation
- Added `example-mpi-webserver` NnfContainerProfile to show use of
  envionrment variable with a simple webserver
- Added container teardown + port allocation to workflow deletion

Signed-off-by: Blake Devcich <[email protected]>
  • Loading branch information
bdevcich authored Aug 3, 2023
1 parent 7b17a96 commit b01d954
Show file tree
Hide file tree
Showing 15 changed files with 568 additions and 57 deletions.
6 changes: 6 additions & 0 deletions api/v1alpha1/nnfcontainerprofile_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ type NnfContainerProfileData struct {
// this profile.
GroupID *uint32 `json:"groupID,omitempty"`

// Number of ports to open for communication with the user container. These ports are opened on
// the targeted NNF nodes and can be accessed outside of the k8s cluster (e.g. compute nodes).
// The requested ports are made available as environment variables inside the container and in
// the DWS workflow (NNF_CONTAINER_PORTS).
NumPorts int32 `json:"numPorts,omitempty"`

// Spec to define the containers created from container profile. This is used for non-MPI
// containers.
// Either this or MPISpec must be provided, but not both.
Expand Down
8 changes: 8 additions & 0 deletions config/crd/bases/nnf.cray.hpe.com_nnfcontainerprofiles.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8616,6 +8616,14 @@ spec:
required:
- mpiReplicaSpecs
type: object
numPorts:
description: Number of ports to open for communication with the user
container. These ports are opened on the targeted NNF nodes and
can be accessed outside of the k8s cluster (e.g. compute nodes).
The requested ports are made available as environment variables
inside the container and in the DWS workflow (NNF_CONTAINER_PORTS).
format: int32
type: integer
pinned:
default: false
description: Pinned is true if this instance is an immutable copy
Expand Down
2 changes: 1 addition & 1 deletion config/dws/nnf-ruleset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ spec:
isRequired: true
isValueRequired: true
- command: "container"
watchStates: Proposal,PreRun,PostRun,Teardown
watchStates: Proposal,Setup,PreRun,PostRun,Teardown
ruleDefs:
- key: "^name$"
type: "string"
Expand Down
40 changes: 40 additions & 0 deletions config/examples/nnf_v1alpha1_nnfcontainerprofiles.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ metadata:
name: example-mpi
data:
retryLimit: 6
numPorts: 1
storages:
- name: DW_JOB_foo_local_storage
optional: false
Expand Down Expand Up @@ -98,3 +99,42 @@ data:
containers:
- name: example-mpi
image: nnf-mfu:latest

---
apiVersion: nnf.cray.hpe.com/v1alpha1
kind: NnfContainerProfile
metadata:
name: example-mpi-webserver
data:
retryLimit: 6
numPorts: 1
storages:
- name: DW_JOB_foo_local_storage
optional: false
- name: DW_PERSISTENT_foo_persistent_storage
optional: true
- name: DW_GLOBAL_foo_global_lustre
optional: true
pvcMode: ReadWriteMany
mpiSpec:
runPolicy:
cleanPodPolicy: Running
mpiReplicaSpecs:
Launcher:
template:
spec:
containers:
- name: example-mpi-webserver
image: ghcr.io/nearnodeflash/nnf-container-example:latest
command:
- mpirun
- python3
- -m
- http.server
- $(NNF_CONTAINER_PORTS)
Worker:
template:
spec:
containers:
- name: example-mpi-webserver
image: ghcr.io/nearnodeflash/nnf-container-example:latest
4 changes: 4 additions & 0 deletions config/manager/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ spec:
value: nnf-system
- name: NNF_CONTAINER_PROFILE_NAMESPACE
value: nnf-system
- name: NNF_PORT_MANAGER_NAME
value: nnf-port-manager
- name: NNF_PORT_MANAGER_NAMESPACE
value: nnf-system
ports:
- containerPort: 50057
name: nnf-ec
Expand Down
12 changes: 12 additions & 0 deletions config/ports/kustomization.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Adds namespace to all resources.
namespace: nnf-system

# Value of this field is prepended to the
# names of all resources, e.g. a deployment named
# "wordpress" becomes "alices-wordpress".
# Note that it should also match with the prefix (text before '-') of the namespace
# field above.
namePrefix: nnf-

resources:
- port_manager.yaml
File renamed without changes.
5 changes: 5 additions & 0 deletions config/samples/nnf_v1alpha1_nnfcontainerprofile.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ data:
# PostRun. Defaults to 0. A value of 0 disables this behavior.
postRunTimeoutSeconds: 0

# Request the number of ports to open on the targeted rabbits. These ports are accessible outside
# of the k8s cluster. The requested ports are made available as environment variables inside the
# container and in the DWS workflow (NNF_CONTAINER_PORTS).
numPorts: 0

# List of possible filesystems supported by this container profile. These
# storages are mounted inside of the container. Any non-optional storage must
# be supplied with the container directive as an argument and must reference
Expand Down
26 changes: 17 additions & 9 deletions controllers/nnf_port_manager_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,23 +159,31 @@ func (r *NnfPortManagerReconciler) cleanupUnusedAllocations(log logr.Logger, mgr
// Free unused allocations. This will check if the Status.Allocations exist in
// the list of desired allocations in the Spec field and mark any unused allocations
// as freed.
failedIndices := make([]int, 0)
allocsToRemove := make([]int, 0)
for idx := range mgr.Status.Allocations {
status := &mgr.Status.Allocations[idx]

if !r.isAllocationNeeded(mgr, status) {
log.Info("Allocation unused", "requester", status.Requester, "status", status.Status)
if status.Status == nnfv1alpha1.NnfPortManagerAllocationStatusInUse {
status.Requester = nil
status.Status = nnfv1alpha1.NnfPortManagerAllocationStatusFree
} else if status.Status != nnfv1alpha1.NnfPortManagerAllocationStatusFree {
failedIndices = append(failedIndices, idx)
}

// TODO: allow for cooldown
// if status.Status == nnfv1alpha1.NnfPortManagerAllocationStatusInUse {
// status.Requester = nil
// status.Status = nnfv1alpha1.NnfPortManagerAllocationStatusCooldown
// } else if status.Status == nnfv1alpha1.NnfPortManagerAllocationStatusCooldown {
// if now() - status.timeFreed > cooldownPeriod {
// allocsToRemove = append(allocsToRemove, idx)
// }
// } else if status.Status != nnfv1alpha1.NnfPortManagerAllocationStatusFree {
// allocsToRemove = append(allocsToRemove, idx)
// }

allocsToRemove = append(allocsToRemove, idx)
}
}

for idx := range failedIndices {
failedIdx := failedIndices[len(failedIndices)-1-idx] // remove in reverse order
for idx := range allocsToRemove {
failedIdx := allocsToRemove[len(allocsToRemove)-1-idx] // remove in reverse order
mgr.Status.Allocations = append(mgr.Status.Allocations[:failedIdx], mgr.Status.Allocations[failedIdx+1:]...)
}
}
Expand Down
65 changes: 62 additions & 3 deletions controllers/nnf_port_manager_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ var _ = Context("NNF Port Manager Controller Setup", Ordered, func() {
DeferCleanup(func() { Expect(k8sClient.Delete(ctx, mgr)).To(Succeed()) })
})

reservePorts := func(mgr *nnfv1alpha1.NnfPortManager, name string, count int) {
reservePorts := func(mgr *nnfv1alpha1.NnfPortManager, name string, count int) []uint16 {
By(fmt.Sprintf("Reserving %d ports for '%s'", count, name))

allocation := nnfv1alpha1.NnfPortManagerAllocationSpec{
Expand All @@ -110,6 +110,8 @@ var _ = Context("NNF Port Manager Controller Setup", Ordered, func() {
Expect(status).ToNot(BeNil())
Expect(status.Ports).To(HaveLen(allocation.Count))
Expect(status.Status).To(Equal(nnfv1alpha1.NnfPortManagerAllocationStatusInUse))

return status.Ports
}

releasePorts := func(mgr *nnfv1alpha1.NnfPortManager, name string) {
Expand All @@ -130,28 +132,85 @@ var _ = Context("NNF Port Manager Controller Setup", Ordered, func() {
}).Should(Succeed())
}

// Verify the number of allocations in the status allocation list
verifyNumAllocations := func(mgr *nnfv1alpha1.NnfPortManager, count int) {
By(fmt.Sprintf("Verifying there are %d allocations in the status allocation list", count))

Eventually(func() int {
Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(mgr), mgr)).To(Succeed())
return len(mgr.Status.Allocations)
}).Should(Equal(count))
}

It("Reserves & removes a single port", func() {
const name = "single"
reservePorts(mgr, name, 1)
ports := reservePorts(mgr, name, 1)
Expect(ports[0]).To(BeEquivalentTo(portStart))
verifyNumAllocations(mgr, 1)
releasePorts(mgr, name)
verifyNumAllocations(mgr, 0)
})

It("Reserves & removes a multiple ports, one after another", func() {
first := "first"
ports := reservePorts(mgr, first, 1)
Expect(ports[0]).To(BeEquivalentTo(portStart))
verifyNumAllocations(mgr, 1)

second := "second"
ports = reservePorts(mgr, second, 1)
Expect(ports[0]).To(BeEquivalentTo(portStart + 1))
verifyNumAllocations(mgr, 2)

releasePorts(mgr, first)
verifyNumAllocations(mgr, 1)

releasePorts(mgr, second)
verifyNumAllocations(mgr, 0)
})

It("Reserves & removes a multiple ports, one at a time", func() {
first := "first"
ports := reservePorts(mgr, first, 1)
firstPort := ports[0]
Expect(ports[0]).To(BeEquivalentTo(portStart))
verifyNumAllocations(mgr, 1)
releasePorts(mgr, first)
verifyNumAllocations(mgr, 0)

// Port should be reused since it was freed already
// This will fail once cooldowns are introduced
second := "second"
ports = reservePorts(mgr, second, 1)
Expect(ports[0]).To(BeEquivalentTo(firstPort))
verifyNumAllocations(mgr, 1)

releasePorts(mgr, second)
verifyNumAllocations(mgr, 0)
})

It("Reserves & removes all ports", func() {
const name = "all"
reservePorts(mgr, name, portEnd-portStart+1)
verifyNumAllocations(mgr, 1)
releasePorts(mgr, name)
verifyNumAllocations(mgr, 0)
})

It("Reserves from free list", func() {
const single = "single"
reservePorts(mgr, single, 1)

const remaining = "remaining"
reservePorts(mgr, remaining, portEnd-portStart)
count := portEnd - portStart
reservePorts(mgr, remaining, count)

releasePorts(mgr, single)
verifyNumAllocations(mgr, 1)

reservePorts(mgr, "free", 1)

verifyNumAllocations(mgr, 2)
})

It("Fails with insufficient resources", func() {
Expand Down
Loading

0 comments on commit b01d954

Please sign in to comment.