Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: prevent Tasks in terminal state from leaking PVCs #93

Merged
merged 5 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions NOTICE.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ file in the Carbyne Stack

> **NOTE**: Please keep the following list of contributors sorted.

### Resolve.tech

- Adrián Vaca Humanes [[email protected]](mailto:[email protected])

### Robert Bosch GmbH

- Becker Sebastian
Expand Down
61 changes: 54 additions & 7 deletions klyshko-operator/controllers/controller_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2022-2023 - for information on the respective copyright owner
Copyright (c) 2022-2024 - for information on the respective copyright owner
see the NOTICE file and/or the repository https://github.com/carbynestack/klyshko.

SPDX-License-Identifier: Apache-2.0
Expand All @@ -10,14 +10,20 @@ package controllers
import (
"context"
"fmt"
"io"
"math"
"path/filepath"
"strconv"
"strings"
"time"

klyshkov1alpha1 "github.com/carbynestack/klyshko/api/v1alpha1"
"github.com/carbynestack/klyshko/castor"
"github.com/google/uuid"
"github.com/jarcoal/httpmock"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
clientv3 "go.etcd.io/etcd/client/v3"
"io"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
Expand All @@ -26,16 +32,11 @@ import (
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/utils/pointer"
"math"
"path/filepath"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"strconv"
"strings"
"time"
)

const (
Expand Down Expand Up @@ -343,6 +344,7 @@ var _ = Describe("Generating tuples", func() {
jobs []klyshkov1alpha1.TupleGenerationJob
localTasksByVCP []klyshkov1alpha1.TupleGenerationTask
generatorPodsByVCP []v1.Pod
taskPVCsByVCP []v1.PersistentVolumeClaim
)

BeforeEach(func() {
Expand Down Expand Up @@ -374,6 +376,7 @@ var _ = Describe("Generating tuples", func() {
localTasksByVCP = ensureTasksCreatedOnEachVcp(ctx, vc, scheduler, jobs, klyshkov1alpha1.TaskGenerating)

generatorPodsByVCP = ensureGeneratorPodsCreatedOnEachVcp(ctx, vc, localTasksByVCP)
taskPVCsByVCP = ensureTaskPVCsCreatedOnEachVcp(ctx, vc, localTasksByVCP)
ensureJobState(ctx, vc, scheduler, uuid.MustParse(jobs[0].Spec.ID), klyshkov1alpha1.JobRunning)
})

Expand All @@ -392,6 +395,17 @@ var _ = Describe("Generating tuples", func() {
Expect(vc.vcps[i].k8sClient.Status().Update(ctx, &pod)).Should(Succeed())
}
ensureJobState(ctx, vc, scheduler, uuid.MustParse(jobs[0].Spec.ID), klyshkov1alpha1.JobFailed)

for i := 0; i < NumberOfVCPs; i++ {
key := client.ObjectKey{
Namespace: taskPVCsByVCP[i].GetNamespace(),
Name: taskPVCsByVCP[i].GetName(),
}
Eventually(func() bool {
return apierrors.IsNotFound(vc.vcps[i].k8sClient.Get(ctx, key, &jobs[i]))
}, Timeout, PollingInterval).Should(BeTrue())
}

})
})

Expand All @@ -411,6 +425,16 @@ var _ = Describe("Generating tuples", func() {
Expect(vc.vcps[i].k8sClient.Status().Update(ctx, &pod)).Should(Succeed())
}
ensureJobState(ctx, vc, scheduler, uuid.MustParse(jobs[0].Spec.ID), klyshkov1alpha1.JobFailed)

for i := 0; i < NumberOfVCPs; i++ {
key := client.ObjectKey{
Namespace: taskPVCsByVCP[i].GetNamespace(),
Name: taskPVCsByVCP[i].GetName(),
}
Eventually(func() bool {
return apierrors.IsNotFound(vc.vcps[i].k8sClient.Get(ctx, key, &jobs[i]))
}, Timeout, PollingInterval).Should(BeTrue())
}
})
})

Expand Down Expand Up @@ -519,6 +543,29 @@ func ensurePodsCreatedOnEachVcp(ctx context.Context, vc *vc, name func(int) type
return pods
}

// Ensures the PVCs associated to a task have been created, with the main purpose of checking for their
// non-existence after cleanup
func ensureTaskPVCsCreatedOnEachVcp(ctx context.Context, vc *vc, localTasks []klyshkov1alpha1.TupleGenerationTask) []v1.PersistentVolumeClaim {
pvcs := make([]v1.PersistentVolumeClaim, NumberOfVCPs)
for i := 0; i < NumberOfVCPs; i++ {
taskKey, _ := taskKeyFromName(localTasks[i].Namespace, localTasks[i].Name)
pvc := &v1.PersistentVolumeClaim{}
name := types.NamespacedName{
Name: pvcName(*taskKey),
Namespace: taskKey.Namespace,
}
Eventually(func() bool {
err := vc.vcps[i].k8sClient.Get(ctx, name, pvc)
if err != nil {
return false
}
return true
}, Timeout, PollingInterval).Should(BeTrue())
pvcs[i] = *pvc
}
return pvcs
}

// Ensures that provisioner pods associated with the respective tasks eventually become available for the given job in
// each VCP of the given VC. In addition, it is checked that the pod is owned by the respective task.
func ensureProvisionerPodsCreatedOnEachVcp(ctx context.Context, vc *vc, jobs []klyshkov1alpha1.TupleGenerationJob, localTasks []klyshkov1alpha1.TupleGenerationTask) []v1.Pod {
Expand Down
40 changes: 33 additions & 7 deletions klyshko-operator/controllers/tuplegenerationtask_controller.go
strieflin marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2022-2023 - for information on the respective copyright owner
Copyright (c) 2022-2024 - for information on the respective copyright owner
see the NOTICE file and/or the repository https://github.com/carbynestack/klyshko.

SPDX-License-Identifier: Apache-2.0
Expand All @@ -11,6 +11,9 @@ import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"

"github.com/carbynestack/klyshko/logging"
clientv3 "go.etcd.io/etcd/client/v3"
v1 "k8s.io/api/core/v1"
Expand All @@ -19,8 +22,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"strconv"
"strings"

"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -59,7 +60,7 @@ func (r *TupleGenerationTaskReconciler) Reconcile(ctx context.Context, req ctrl.
logger := log.FromContext(ctx).WithValues("Task.Name", req.Name)
logger.V(logging.DEBUG).Info("Reconciling tuple generation task")

taskKey, err := r.taskKeyFromName(req.Namespace, req.Name)
taskKey, err := taskKeyFromName(req.Namespace, req.Name)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to get key for task %v: %w", req.Name, err)
}
Expand Down Expand Up @@ -233,9 +234,12 @@ func (r *TupleGenerationTaskReconciler) Reconcile(ctx context.Context, req ctrl.
Requeue: true,
}, r.setState(ctx, *taskKey, status, klyshkov1alpha1.TaskFailed)
}
case klyshkov1alpha1.TaskFailed, klyshkov1alpha1.TaskCompleted:
sbckr marked this conversation as resolved.
Show resolved Hide resolved
logger.V(logging.DEBUG).Info("Task reached a terminal state")
return ctrl.Result{}, r.deletePVC(ctx, taskKey)
default:
return ctrl.Result{}, fmt.Errorf("unexpected state for Task %v, PVC not reclaimed", req.Name)
}

logger.V(logging.DEBUG).Info("Desired state reached")
return ctrl.Result{}, nil
}

Expand All @@ -250,7 +254,7 @@ func (r *TupleGenerationTaskReconciler) SetupWithManager(mgr ctrl.Manager) error

// taskKeyFromName creates a RosterEntryKey from the given name and namespace. Expects that the zero-based VCP
// identifier is appended with a hyphen to the name.
func (r *TupleGenerationTaskReconciler) taskKeyFromName(namespace string, name string) (*RosterEntryKey, error) {
func taskKeyFromName(namespace string, name string) (*RosterEntryKey, error) {
strieflin marked this conversation as resolved.
Show resolved Hide resolved
parts := strings.Split(name, "-")
vcpID := parts[len(parts)-1]
jobName := strings.Join(parts[:len(parts)-1], "-")
Expand Down Expand Up @@ -366,6 +370,28 @@ func (r *TupleGenerationTaskReconciler) getOrCreatePVC(ctx context.Context, key
return pvc, nil
}

// deletePVC deletes a PVC associated to a given task
func (r *TupleGenerationTaskReconciler) deletePVC(ctx context.Context, key *RosterEntryKey) error {
logger := log.FromContext(ctx).WithValues("Task.Key", key)
name := types.NamespacedName{
Name: pvcName(*key),
Namespace: key.Namespace,
}
found := &v1.PersistentVolumeClaim{}
err := r.Get(ctx, name, found)
if err != nil {
return fmt.Errorf("to be deleted persistent volume claim not found for task %v: %w", key, err)
}

err = r.Delete(ctx, found)
if err != nil {
return fmt.Errorf("persistent volume claim deletion failed for task %v: %w", key, err)
}

logger.V(logging.DEBUG).Info("Deleted Persistent Volume Claim for task")
return nil
}

// provisionerPodName returns the name for the provisioner pod used for the task with the given key.
func (r *TupleGenerationTaskReconciler) provisionerPodName(key RosterEntryKey) string {
return key.Name + "-provisioner"
Expand Down
Loading