diff --git a/NOTICE.md b/NOTICE.md index 96ee6bc..c38914f 100644 --- a/NOTICE.md +++ b/NOTICE.md @@ -9,6 +9,10 @@ file in the Carbyne Stack > **NOTE**: Please keep the following list of contributors sorted. +### Resolve.tech + +- Adrián Vaca Humanes [adrian.humanes@vml.com](mailto:adrian.humanes@vml.com) + ### Robert Bosch GmbH - Becker Sebastian diff --git a/klyshko-operator/controllers/controller_test.go b/klyshko-operator/controllers/controller_test.go index e129513..a5f0464 100644 --- a/klyshko-operator/controllers/controller_test.go +++ b/klyshko-operator/controllers/controller_test.go @@ -1,5 +1,5 @@ /* -Copyright (c) 2022-2023 - for information on the respective copyright owner +Copyright (c) 2022-2024 - for information on the respective copyright owner see the NOTICE file and/or the repository https://github.com/carbynestack/klyshko. SPDX-License-Identifier: Apache-2.0 @@ -10,6 +10,13 @@ package controllers import ( "context" "fmt" + "io" + "math" + "path/filepath" + "strconv" + "strings" + "time" + klyshkov1alpha1 "github.com/carbynestack/klyshko/api/v1alpha1" "github.com/carbynestack/klyshko/castor" "github.com/google/uuid" @@ -17,7 +24,6 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" clientv3 "go.etcd.io/etcd/client/v3" - "io" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" @@ -26,16 +32,11 @@ import ( "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "k8s.io/utils/pointer" - "math" - "path/filepath" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/envtest" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/manager" - "strconv" - "strings" - "time" ) const ( @@ -343,6 +344,7 @@ var _ = Describe("Generating tuples", func() { jobs []klyshkov1alpha1.TupleGenerationJob localTasksByVCP []klyshkov1alpha1.TupleGenerationTask generatorPodsByVCP []v1.Pod + taskPVCsByVCP []v1.PersistentVolumeClaim ) BeforeEach(func() { @@ -374,6 +376,7 @@ var _ = Describe("Generating tuples", func() { localTasksByVCP = ensureTasksCreatedOnEachVcp(ctx, vc, scheduler, jobs, klyshkov1alpha1.TaskGenerating) generatorPodsByVCP = ensureGeneratorPodsCreatedOnEachVcp(ctx, vc, localTasksByVCP) + taskPVCsByVCP = ensureTaskPVCsCreatedOnEachVcp(ctx, vc, localTasksByVCP) ensureJobState(ctx, vc, scheduler, uuid.MustParse(jobs[0].Spec.ID), klyshkov1alpha1.JobRunning) }) @@ -392,6 +395,17 @@ var _ = Describe("Generating tuples", func() { Expect(vc.vcps[i].k8sClient.Status().Update(ctx, &pod)).Should(Succeed()) } ensureJobState(ctx, vc, scheduler, uuid.MustParse(jobs[0].Spec.ID), klyshkov1alpha1.JobFailed) + + for i := 0; i < NumberOfVCPs; i++ { + key := client.ObjectKey{ + Namespace: taskPVCsByVCP[i].GetNamespace(), + Name: taskPVCsByVCP[i].GetName(), + } + Eventually(func() bool { + return apierrors.IsNotFound(vc.vcps[i].k8sClient.Get(ctx, key, &jobs[i])) + }, Timeout, PollingInterval).Should(BeTrue()) + } + }) }) @@ -411,6 +425,16 @@ var _ = Describe("Generating tuples", func() { Expect(vc.vcps[i].k8sClient.Status().Update(ctx, &pod)).Should(Succeed()) } ensureJobState(ctx, vc, scheduler, uuid.MustParse(jobs[0].Spec.ID), klyshkov1alpha1.JobFailed) + + for i := 0; i < NumberOfVCPs; i++ { + key := client.ObjectKey{ + Namespace: taskPVCsByVCP[i].GetNamespace(), + Name: taskPVCsByVCP[i].GetName(), + } + Eventually(func() bool { + return apierrors.IsNotFound(vc.vcps[i].k8sClient.Get(ctx, key, &jobs[i])) + }, Timeout, PollingInterval).Should(BeTrue()) + } }) }) @@ -519,6 +543,29 @@ func ensurePodsCreatedOnEachVcp(ctx context.Context, vc *vc, name func(int) type return pods } +// Ensures the PVCs associated to a task have been created, with the main purpose of checking for their +// non-existence after cleanup +func ensureTaskPVCsCreatedOnEachVcp(ctx context.Context, vc *vc, localTasks []klyshkov1alpha1.TupleGenerationTask) []v1.PersistentVolumeClaim { + pvcs := make([]v1.PersistentVolumeClaim, NumberOfVCPs) + for i := 0; i < NumberOfVCPs; i++ { + taskKey, _ := taskKeyFromName(localTasks[i].Namespace, localTasks[i].Name) + pvc := &v1.PersistentVolumeClaim{} + name := types.NamespacedName{ + Name: pvcName(*taskKey), + Namespace: taskKey.Namespace, + } + Eventually(func() bool { + err := vc.vcps[i].k8sClient.Get(ctx, name, pvc) + if err != nil { + return false + } + return true + }, Timeout, PollingInterval).Should(BeTrue()) + pvcs[i] = *pvc + } + return pvcs +} + // Ensures that provisioner pods associated with the respective tasks eventually become available for the given job in // each VCP of the given VC. In addition, it is checked that the pod is owned by the respective task. func ensureProvisionerPodsCreatedOnEachVcp(ctx context.Context, vc *vc, jobs []klyshkov1alpha1.TupleGenerationJob, localTasks []klyshkov1alpha1.TupleGenerationTask) []v1.Pod { diff --git a/klyshko-operator/controllers/tuplegenerationtask_controller.go b/klyshko-operator/controllers/tuplegenerationtask_controller.go index b419215..32b12c5 100644 --- a/klyshko-operator/controllers/tuplegenerationtask_controller.go +++ b/klyshko-operator/controllers/tuplegenerationtask_controller.go @@ -1,5 +1,5 @@ /* -Copyright (c) 2022-2023 - for information on the respective copyright owner +Copyright (c) 2022-2024 - for information on the respective copyright owner see the NOTICE file and/or the repository https://github.com/carbynestack/klyshko. SPDX-License-Identifier: Apache-2.0 @@ -11,6 +11,9 @@ import ( "context" "encoding/json" "fmt" + "strconv" + "strings" + "github.com/carbynestack/klyshko/logging" clientv3 "go.etcd.io/etcd/client/v3" v1 "k8s.io/api/core/v1" @@ -19,8 +22,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" - "strconv" - "strings" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" @@ -59,7 +60,7 @@ func (r *TupleGenerationTaskReconciler) Reconcile(ctx context.Context, req ctrl. logger := log.FromContext(ctx).WithValues("Task.Name", req.Name) logger.V(logging.DEBUG).Info("Reconciling tuple generation task") - taskKey, err := r.taskKeyFromName(req.Namespace, req.Name) + taskKey, err := taskKeyFromName(req.Namespace, req.Name) if err != nil { return ctrl.Result{}, fmt.Errorf("failed to get key for task %v: %w", req.Name, err) } @@ -233,9 +234,12 @@ func (r *TupleGenerationTaskReconciler) Reconcile(ctx context.Context, req ctrl. Requeue: true, }, r.setState(ctx, *taskKey, status, klyshkov1alpha1.TaskFailed) } + case klyshkov1alpha1.TaskFailed, klyshkov1alpha1.TaskCompleted: + logger.V(logging.DEBUG).Info("Task reached a terminal state") + return ctrl.Result{}, r.deletePVC(ctx, taskKey) + default: + return ctrl.Result{}, fmt.Errorf("unexpected state for Task %v, PVC not reclaimed", req.Name) } - - logger.V(logging.DEBUG).Info("Desired state reached") return ctrl.Result{}, nil } @@ -250,7 +254,7 @@ func (r *TupleGenerationTaskReconciler) SetupWithManager(mgr ctrl.Manager) error // taskKeyFromName creates a RosterEntryKey from the given name and namespace. Expects that the zero-based VCP // identifier is appended with a hyphen to the name. -func (r *TupleGenerationTaskReconciler) taskKeyFromName(namespace string, name string) (*RosterEntryKey, error) { +func taskKeyFromName(namespace string, name string) (*RosterEntryKey, error) { parts := strings.Split(name, "-") vcpID := parts[len(parts)-1] jobName := strings.Join(parts[:len(parts)-1], "-") @@ -366,6 +370,28 @@ func (r *TupleGenerationTaskReconciler) getOrCreatePVC(ctx context.Context, key return pvc, nil } +// deletePVC deletes a PVC associated to a given task +func (r *TupleGenerationTaskReconciler) deletePVC(ctx context.Context, key *RosterEntryKey) error { + logger := log.FromContext(ctx).WithValues("Task.Key", key) + name := types.NamespacedName{ + Name: pvcName(*key), + Namespace: key.Namespace, + } + found := &v1.PersistentVolumeClaim{} + err := r.Get(ctx, name, found) + if err != nil { + return fmt.Errorf("to be deleted persistent volume claim not found for task %v: %w", key, err) + } + + err = r.Delete(ctx, found) + if err != nil { + return fmt.Errorf("persistent volume claim deletion failed for task %v: %w", key, err) + } + + logger.V(logging.DEBUG).Info("Deleted Persistent Volume Claim for task") + return nil +} + // provisionerPodName returns the name for the provisioner pod used for the task with the given key. func (r *TupleGenerationTaskReconciler) provisionerPodName(key RosterEntryKey) string { return key.Name + "-provisioner"