From 28cc5b839b61047ffae0316c8a3f908c9af135e1 Mon Sep 17 00:00:00 2001 From: Blake Devcich Date: Wed, 30 Aug 2023 12:23:55 -0500 Subject: [PATCH] Containers: Added PreRunTimeoutSeconds + Error Exit Strategy - Adds a timeout to PreRun and errors out if the containers do not start - Moved timeout anonymous functions to be normal functions - Fixed some issues with the example profiles for testing + added example-mpi-fail - Improved error handling in PostRun Signed-off-by: Blake Devcich --- api/v1alpha1/nnfcontainerprofile_types.go | 14 +- ...nnf.cray.hpe.com_nnfcontainerprofiles.yaml | 15 +- .../nnf_v1alpha1_nnfcontainerprofiles.yaml | 180 +++++++++++------- .../nnf_workflow_controller_helpers.go | 179 ++++++++++++----- 4 files changed, 262 insertions(+), 126 deletions(-) diff --git a/api/v1alpha1/nnfcontainerprofile_types.go b/api/v1alpha1/nnfcontainerprofile_types.go index 969d66fc5..12d658505 100644 --- a/api/v1alpha1/nnfcontainerprofile_types.go +++ b/api/v1alpha1/nnfcontainerprofile_types.go @@ -40,13 +40,21 @@ type NnfContainerProfileData struct { // List of possible filesystems supported by this container profile Storages []NnfContainerProfileStorage `json:"storages,omitempty"` - // Stop any containers after X seconds once a workflow has transitioned to PostRun. Defaults to - // 0. A value of 0 disables this behavior. + // Containers are launched in the PreRun state. Allow this many seconds for the containers to + // start before declaring an error to the workflow. + // Defaults to 60. A value of 0 disables this behavior. + // +kubebuilder:default:=60 + // +kubebuilder:validation:Minimum:=0 + PreRunTimeoutSeconds int64 `json:"preRunTimeoutSeconds,omitempty"` + + // Containers are expected to complete in the PostRun state. Allow this many seconds for the + // containers to exit before declaring an error to the workflow. + // Defaults to 0. A value of 0 disables this behavior. // +kubebuilder:validation:Minimum:=0 PostRunTimeoutSeconds int64 `json:"postRunTimeoutSeconds,omitempty"` // Specifies the number of times a container will be retried upon a failure. A new pod is - // deployed on each retry. Defaults to 6 by kubernetes itself and must be set. A value of 0 + // deployed on each retry. Defaults to 6 by kubernetes itself and must be set. A value of 0 // disables retries. // +kubebuilder:validation:Minimum:=0 // +kubebuilder:default:=6 diff --git a/config/crd/bases/nnf.cray.hpe.com_nnfcontainerprofiles.yaml b/config/crd/bases/nnf.cray.hpe.com_nnfcontainerprofiles.yaml index 65ea0c77e..fb6d3b96a 100644 --- a/config/crd/bases/nnf.cray.hpe.com_nnfcontainerprofiles.yaml +++ b/config/crd/bases/nnf.cray.hpe.com_nnfcontainerprofiles.yaml @@ -8629,16 +8629,25 @@ spec: description: Pinned is true if this instance is an immutable copy type: boolean postRunTimeoutSeconds: - description: Stop any containers after X seconds once a workflow has - transitioned to PostRun. Defaults to 0. A value of 0 disables this + description: Containers are expected to complete in the PostRun state. + Allow this many seconds for the containers to exit before declaring + an error to the workflow. Defaults to 0. A value of 0 disables this behavior. format: int64 minimum: 0 type: integer + preRunTimeoutSeconds: + default: 60 + description: Containers are launched in the PreRun state. Allow this + many seconds for the containers to start before declaring an error + to the workflow. Defaults to 60. A value of 0 disables this behavior.
+ format: int64 + minimum: 0 + type: integer retryLimit: default: 6 description: Specifies the number of times a container will be retried - upon a failure. A new pod is deployed on each retry. Defaults to + upon a failure. A new pod is deployed on each retry. Defaults to 6 by kubernetes itself and must be set. A value of 0 disables retries. format: int32 minimum: 0 diff --git a/config/examples/nnf_v1alpha1_nnfcontainerprofiles.yaml b/config/examples/nnf_v1alpha1_nnfcontainerprofiles.yaml index 6b0840896..80d8c8035 100644 --- a/config/examples/nnf_v1alpha1_nnfcontainerprofiles.yaml +++ b/config/examples/nnf_v1alpha1_nnfcontainerprofiles.yaml @@ -5,18 +5,34 @@ metadata: data: retryLimit: 6 storages: - - name: DW_JOB_foo_local_storage - optional: false - - name: DW_PERSISTENT_foo_persistent_storage - optional: true + - name: DW_JOB_foo_local_storage + optional: false + - name: DW_PERSISTENT_foo_persistent_storage + optional: true + - name: DW_GLOBAL_foo_global_lustre + optional: true spec: containers: - - name: example-success - image: alpine:latest - command: - - /bin/sh - - -c - - "sleep 15 && exit 0" + - name: example-success + image: alpine:latest + command: + - /bin/sh + - -c + - "sleep 10 && exit 0" +--- +apiVersion: nnf.cray.hpe.com/v1alpha1 +kind: NnfContainerProfile +metadata: + name: example-fail +data: + spec: + containers: + - name: example-fail + image: alpine:latest + command: + - /bin/sh + - -c + - "sleep 10 && exit 1" --- apiVersion: nnf.cray.hpe.com/v1alpha1 kind: NnfContainerProfile @@ -25,23 +41,23 @@ metadata: data: retryLimit: 6 storages: - - name: DW_JOB_foo_local_storage - optional: false - - name: DW_PERSISTENT_foo_persistent_storage - optional: true + - name: DW_JOB_foo_local_storage + optional: false + - name: DW_PERSISTENT_foo_persistent_storage + optional: true spec: containers: - - name: example-randomly-fail - image: alpine:latest - command: - - /bin/sh - - -c - - | - echo "starting..." - sleep 30 - x=$(($RANDOM % 2)) - echo "exiting: $x" - exit $x + - name: example-randomly-fail + image: alpine:latest + command: + - /bin/sh + - -c + - | + echo "starting..." 
+ sleep 10 + x=$(($RANDOM % 2)) + echo "exiting: $x" + exit $x --- apiVersion: nnf.cray.hpe.com/v1alpha1 kind: NnfContainerProfile @@ -50,18 +66,18 @@ metadata: data: retryLimit: 6 storages: - - name: DW_JOB_foo_local_storage - optional: false - - name: DW_PERSISTENT_foo_persistent_storage - optional: true + - name: DW_JOB_foo_local_storage + optional: false + - name: DW_PERSISTENT_foo_persistent_storage + optional: true spec: containers: - - name: example-forever - image: alpine:latest - command: - - /bin/sh - - -c - - "while true; do date && sleep 5; done" + - name: example-forever + image: alpine:latest + command: + - /bin/sh + - -c + - "while true; do date && sleep 5; done" --- apiVersion: nnf.cray.hpe.com/v1alpha1 kind: NnfContainerProfile @@ -71,13 +87,13 @@ data: retryLimit: 6 numPorts: 1 storages: - - name: DW_JOB_foo_local_storage - optional: false - - name: DW_PERSISTENT_foo_persistent_storage - optional: true - - name: DW_GLOBAL_foo_global_lustre - optional: true - pvcMode: ReadWriteMany + - name: DW_JOB_foo_local_storage + optional: false + - name: DW_PERSISTENT_foo_persistent_storage + optional: true + - name: DW_GLOBAL_foo_global_lustre + optional: true + pvcMode: ReadWriteMany mpiSpec: runPolicy: cleanPodPolicy: Running @@ -86,36 +102,64 @@ data: template: spec: containers: - - name: example-mpi - image: nnf-mfu:latest - command: - - mpirun - - dcmp - - "$(DW_JOB_foo_local_storage)/0" - - "$(DW_JOB_foo_local_storage)/1" + - name: example-mpi + image: nnf-mfu:latest + command: + - mpirun + - dcmp + - "$(DW_JOB_foo_local_storage)/0" + - "$(DW_JOB_foo_local_storage)/1" Worker: template: spec: containers: - - name: example-mpi - image: nnf-mfu:latest + - name: example-mpi + image: nnf-mfu:latest --- apiVersion: nnf.cray.hpe.com/v1alpha1 kind: NnfContainerProfile +metadata: + name: example-mpi-fail +data: + numPorts: 1 + mpiSpec: + runPolicy: + cleanPodPolicy: Running + mpiReplicaSpecs: + Launcher: + template: + spec: + containers: + - name: example-mpi-fail + image: nnf-mfu:latest + command: + - mpirun + - /bin/sh + - -c + - "sleep 10 && exit 1" + Worker: + template: + spec: + containers: + - name: example-mpi-fail + image: nnf-mfu:latest +--- +apiVersion: nnf.cray.hpe.com/v1alpha1 +kind: NnfContainerProfile metadata: name: example-mpi-webserver data: retryLimit: 6 numPorts: 1 storages: - - name: DW_JOB_foo_local_storage - optional: false - - name: DW_PERSISTENT_foo_persistent_storage - optional: true - - name: DW_GLOBAL_foo_global_lustre - optional: true - pvcMode: ReadWriteMany + - name: DW_JOB_foo_local_storage + optional: false + - name: DW_PERSISTENT_foo_persistent_storage + optional: true + - name: DW_GLOBAL_foo_global_lustre + optional: true + pvcMode: ReadWriteMany mpiSpec: runPolicy: cleanPodPolicy: Running @@ -124,17 +168,17 @@ data: template: spec: containers: - - name: example-mpi-webserver - image: ghcr.io/nearnodeflash/nnf-container-example:latest - command: - - mpirun - - python3 - - -m - - http.server - - $(NNF_CONTAINER_PORTS) + - name: example-mpi-webserver + image: ghcr.io/nearnodeflash/nnf-container-example:latest + command: + - mpirun + - python3 + - -m + - http.server + - $(NNF_CONTAINER_PORTS) Worker: template: spec: containers: - - name: example-mpi-webserver - image: ghcr.io/nearnodeflash/nnf-container-example:latest + - name: example-mpi-webserver + image: ghcr.io/nearnodeflash/nnf-container-example:latest diff --git a/controllers/nnf_workflow_controller_helpers.go b/controllers/nnf_workflow_controller_helpers.go index 090c016d8..8f0c19042 100644 --- 
a/controllers/nnf_workflow_controller_helpers.go +++ b/controllers/nnf_workflow_controller_helpers.go @@ -1325,10 +1325,33 @@ func (r *NnfWorkflowReconciler) waitForContainersToStart(ctx context.Context, wo if err != nil { return nil, err } - - if profile.Data.MPISpec != nil { + isMPIJob := profile.Data.MPISpec != nil + + // Timeouts - If the containers don't start after PreRunTimeoutSeconds, we need to send an error + // up to the workflow in every one of our return cases. Each return path will check for + // timeoutElapsed and bubble up a fatal error. + // We must also set the Jobs' activeDeadline timeout so that the containers are stopped once the + // timeout is hit. This needs to be handled slightly differently depending on if the job is MPI + // or not. Once set, k8s will take care of stopping the pods for us. + timeoutElapsed := false + timeout := time.Duration(profile.Data.PreRunTimeoutSeconds) * time.Second + timeoutMessage := fmt.Sprintf("user container(s) failed to start after %d seconds", int(timeout.Seconds())) + + // Check if PreRunTimeoutSeconds has elapsed and set the flag. The logic will check once more to + // see if it started or not. If not, then the job(s) activeDeadline will be set to stop the + // jobs/pods. + if timeout > 0 && metav1.Now().Sub(workflow.Status.DesiredStateChange.Time) >= timeout { + timeoutElapsed = true + } + + if isMPIJob { mpiJob, result := r.getMPIJobConditions(ctx, workflow, index, 1) if result != nil { + // If timeout, don't allow requeue and return an error + if timeoutElapsed { + return nil, dwsv1alpha2.NewResourceError("could not retrieve MPIJobs to set timeout"). + WithUserMessage(timeoutMessage).WithFatal() + } return result, nil } @@ -1341,21 +1364,53 @@ func (r *NnfWorkflowReconciler) waitForContainersToStart(ctx context.Context, wo } } + // Jobs are not running. Check to see if timeout elapsed and have k8s stop the jobs for us. + // If no timeout, then just requeue. if !running { + if timeoutElapsed { + r.Log.Info("container prerun timeout occurred, attempting to set MPIJob activeDeadlineSeconds") + if err := r.setMPIJobTimeout(ctx, workflow, mpiJob, time.Duration(1*time.Millisecond)); err != nil { + return nil, dwsv1alpha2.NewResourceError("could not set timeout on MPIJobs"). + WithUserMessage(timeoutMessage).WithError(err).WithFatal() + } else { + return nil, dwsv1alpha2.NewResourceError("MPIJob timeout set").WithUserMessage(timeoutMessage).WithFatal() + } + } return Requeue(fmt.Sprintf("pending MPIJob start for workflow '%s', index: %d", workflow.Name, index)).after(2 * time.Second), nil } } else { jobList, err := r.getContainerJobs(ctx, workflow, index) if err != nil { + if timeoutElapsed { + return nil, dwsv1alpha2.NewResourceError("could not retrieve Jobs to set timeout"). + WithUserMessage(timeoutMessage).WithFatal().WithError(err) + } return nil, err } // Jobs may not be queryable yet, so requeue if len(jobList.Items) < 1 { + // If timeout, don't allow a requeue and return an error + if timeoutElapsed { + return nil, dwsv1alpha2.NewResourceError("no Jobs found in JobList to set timeout"). 
+ WithUserMessage(timeoutMessage).WithFatal() + } return Requeue(fmt.Sprintf("pending job creation for workflow '%s', index: %d", workflow.Name, index)).after(2 * time.Second), nil } for _, job := range jobList.Items { + + // Attempt to set the timeout on all the Jobs in the list + if timeoutElapsed { + r.Log.Info("container prerun timeout occurred, attempting to set Job activeDeadlineSeconds") + if err := r.setJobTimeout(ctx, job, time.Duration(1*time.Millisecond)); err != nil { + return nil, dwsv1alpha2.NewResourceError("could not set timeout on Jobs"). + WithUserMessage(timeoutMessage).WithError(err).WithFatal() + } else { + continue + } + } + // If we have any conditions, the job already finished if len(job.Status.Conditions) > 0 { continue } @@ -1366,6 +1421,11 @@ func (r *NnfWorkflowReconciler) waitForContainersToStart(ctx context.Context, wo return Requeue(fmt.Sprintf("pending container start for job '%s'", job.Name)).after(2 * time.Second), nil } } + + // Report the timeout error + if timeoutElapsed { + return nil, dwsv1alpha2.NewResourceError("job(s) timeout set").WithUserMessage(timeoutMessage).WithFatal() + } } return nil, nil @@ -1451,64 +1511,67 @@ func (r *NnfWorkflowReconciler) getMPIJobConditions(ctx context.Context, workflo return mpiJob, nil } -func (r *NnfWorkflowReconciler) waitForContainersToFinish(ctx context.Context, workflow *dwsv1alpha2.Workflow, index int) (*result, error) { - // Get profile to determine container job type (MPI or not) - profile, err := getContainerProfile(ctx, r.Client, workflow, index) - if err != nil { - return nil, err - } - timeout := time.Duration(profile.Data.PostRunTimeoutSeconds) * time.Second - - setTimeout := func(job batchv1.Job) error { - // If desired, set the ActiveDeadline on the job to kill pods. Use the job's creation - // timestamp to determine how long the job/pod has been running at this point. Then, add - // the desired timeout to that value. k8s Job's ActiveDeadLineSeconds will then - // terminate the pods once the deadline is hit. - if timeout > 0 && job.Spec.ActiveDeadlineSeconds == nil { - deadline := int64((metav1.Now().Sub(job.CreationTimestamp.Time) + timeout).Seconds()) - - // Update the job with the deadline - err := retry.RetryOnConflict(retry.DefaultRetry, func() error { - j := &batchv1.Job{ObjectMeta: metav1.ObjectMeta{Name: job.Name, Namespace: job.Namespace}} - if err := r.Get(ctx, client.ObjectKeyFromObject(j), j); err != nil { - return client.IgnoreNotFound(err) - } +func (r *NnfWorkflowReconciler) setJobTimeout(ctx context.Context, job batchv1.Job, timeout time.Duration) error { + // If desired, set the ActiveDeadline on the job to kill pods. Use the job's creation + // timestamp to determine how long the job/pod has been running at this point. Then, add + // the desired timeout to that value. k8s Job's ActiveDeadlineSeconds will then + // terminate the pods once the deadline is hit.
+ if timeout > 0 && job.Spec.ActiveDeadlineSeconds == nil { + var deadline int64 + deadline = int64((metav1.Now().Sub(job.CreationTimestamp.Time) + timeout).Seconds()) + + // Update the job with the deadline + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + j := &batchv1.Job{ObjectMeta: metav1.ObjectMeta{Name: job.Name, Namespace: job.Namespace}} + if err := r.Get(ctx, client.ObjectKeyFromObject(j), j); err != nil { + return client.IgnoreNotFound(err) + } - j.Spec.ActiveDeadlineSeconds = &deadline - return r.Update(ctx, j) - }) + j.Spec.ActiveDeadlineSeconds = &deadline + return r.Update(ctx, j) + }) - if err != nil { - return dwsv1alpha2.NewResourceError("error updating job '%s' activeDeadlineSeconds:", job.Name) - } + if err != nil { + return dwsv1alpha2.NewResourceError("error updating job '%s' activeDeadlineSeconds:", job.Name) } + } - return nil + return nil +} + +func (r *NnfWorkflowReconciler) setMPIJobTimeout(ctx context.Context, workflow *dwsv1alpha2.Workflow, mpiJob *mpiv2beta1.MPIJob, timeout time.Duration) error { + // Set the ActiveDeadLineSeconds on each of the k8s jobs created by MPIJob/mpi-operator. We + // need to retrieve the jobs in a different way than non-MPI jobs since the jobs are created + // by the MPIJob. + jobList, err := r.getMPIJobChildrenJobs(ctx, workflow, mpiJob) + if err != nil { + return dwsv1alpha2.NewResourceError("setMPIJobTimeout: no MPIJob JobList found for workflow '%s'", workflow.Name).WithMajor() } - setMPITimeout := func(mpiJob *mpiv2beta1.MPIJob) error { - // Set the ActiveDeadLineSeconds on each of the k8s jobs created by MPIJob/mpi-operator. We - // need to retrieve the jobs in a different way than non-MPI jobs since the jobs are created - // by the MPIJob. - jobList, err := r.getMPIJobChildrenJobs(ctx, workflow, mpiJob) - if err != nil { - return dwsv1alpha2.NewResourceError("waitForContainersToFinish: no MPIJob JobList found for workflow '%s', index: %d", workflow.Name, index).WithMajor() - } + if len(jobList.Items) < 1 { + return dwsv1alpha2.NewResourceError("setMPIJobTimeout: no MPIJob jobs found for workflow '%s'", workflow.Name).WithMajor() + } - if len(jobList.Items) < 1 { - return dwsv1alpha2.NewResourceError("waitForContainersToFinish: no MPIJob jobs found for workflow '%s', index: %d", workflow.Name, index).WithMajor() + for _, job := range jobList.Items { + if err := r.setJobTimeout(ctx, job, timeout); err != nil { + return err } + } - for _, job := range jobList.Items { - if err := setTimeout(job); err != nil { - return err - } - } + return nil +} - return nil +func (r *NnfWorkflowReconciler) waitForContainersToFinish(ctx context.Context, workflow *dwsv1alpha2.Workflow, index int) (*result, error) { + // Get profile to determine container job type (MPI or not) + profile, err := getContainerProfile(ctx, r.Client, workflow, index) + if err != nil { + return nil, err } + isMPIJob := profile.Data.MPISpec != nil + + timeout := time.Duration(profile.Data.PostRunTimeoutSeconds) * time.Second - if profile.Data.MPISpec != nil { + if isMPIJob { // We should expect at least 2 conditions: created and running mpiJob, result := r.getMPIJobConditions(ctx, workflow, index, 2) if result != nil { @@ -1525,7 +1588,7 @@ func (r *NnfWorkflowReconciler) waitForContainersToFinish(ctx context.Context, w } if !finished { - if err := setMPITimeout(mpiJob); err != nil { + if err := r.setMPIJobTimeout(ctx, workflow, mpiJob, timeout); err != nil { return nil, err } return Requeue(fmt.Sprintf("pending MPIJob completion for workflow '%s', index: 
%d", workflow.Name, index)).after(2 * time.Second), nil @@ -1545,7 +1608,7 @@ func (r *NnfWorkflowReconciler) waitForContainersToFinish(ctx context.Context, w for _, job := range jobList.Items { // Jobs will have conditions when finished if len(job.Status.Conditions) <= 0 { - if err := setTimeout(job); err != nil { + if err := r.setJobTimeout(ctx, job, timeout); err != nil { return nil, err } return Requeue("pending container finish").after(2 * time.Second).withObject(&job), nil @@ -1562,8 +1625,12 @@ func (r *NnfWorkflowReconciler) checkContainersResults(ctx context.Context, work if err != nil { return nil, err } + isMPIJob := profile.Data.MPISpec != nil - if profile.Data.MPISpec != nil { + timeout := time.Duration(profile.Data.PostRunTimeoutSeconds) * time.Second + timeoutMessage := fmt.Sprintf("user container(s) failed to complete after %d seconds", int(timeout.Seconds())) + + if isMPIJob { mpiJob, result := r.getMPIJobConditions(ctx, workflow, index, 2) if result != nil { return result, nil @@ -1571,7 +1638,12 @@ func (r *NnfWorkflowReconciler) checkContainersResults(ctx context.Context, work for _, c := range mpiJob.Status.Conditions { if c.Type == mpiv2beta1.JobFailed { - return nil, dwsv1alpha2.NewResourceError("container MPIJob %s (%s): %s", c.Type, c.Reason, c.Message).WithFatal() + if c.Reason == "DeadlineExceeded" { + return nil, dwsv1alpha2.NewResourceError("container MPIJob %s (%s): %s", c.Type, c.Reason, c.Message).WithFatal(). + WithUserMessage(timeoutMessage) + } + return nil, dwsv1alpha2.NewResourceError("container MPIJob %s (%s): %s", c.Type, c.Reason, c.Message).WithFatal(). + WithUserMessage("user container(s) failed to run successfully after %d attempts", profile.Data.RetryLimit+1) } } } else { @@ -1587,6 +1659,9 @@ func (r *NnfWorkflowReconciler) checkContainersResults(ctx context.Context, work for _, job := range jobList.Items { for _, condition := range job.Status.Conditions { if condition.Type != batchv1.JobComplete { + if condition.Reason == "DeadlineExceeded" { + return nil, dwsv1alpha2.NewResourceError("container job %s (%s): %s", condition.Type, condition.Reason, condition.Message).WithFatal().WithUserMessage(timeoutMessage) + } return nil, dwsv1alpha2.NewResourceError("container job %s (%s): %s", condition.Type, condition.Reason, condition.Message).WithFatal() } }