Skip to content

Commit

Permalink
implemented collision handling using ownerReferences
Browse files Browse the repository at this point in the history
Signed-off-by: Adem Baccara <[email protected]>
  • Loading branch information
Adembc committed Jun 29, 2024
1 parent 1591210 commit 207d950
Show file tree
Hide file tree
Showing 4 changed files with 258 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ metadata:
namespace: default
spec:
kind: "jupyter-lab"
paused: false
podTemplate:
options:
imageConfig: "jupyter_scipy_171"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,7 @@ spec:
podTemplate:
httpProxy:
removePathPrefix: false
requestHeaders:
- append: {}
remove:
- ""
set: {}
requestHeaders: {}
options:
imageConfig:
default: "jupyter_scipy_171"
Expand Down
194 changes: 162 additions & 32 deletions workspaces/controller/internal/controller/workspace_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,28 @@ import (
"context"
"fmt"
kubefloworgv1beta1 "github.com/kubeflow/notebooks/workspaces/controller/api/v1beta1"
"github.com/kubeflow/notebooks/workspaces/controller/internal/helper"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"time"
)

const DefaultContainerPort = 8888
const (
DefaultContainerPort = 8888
RetryInterval = 200 * time.Millisecond
RetryAttempts = 3
)

var (
workspaceOwnerKey = ".metadata.controller"
apiGVStr = kubefloworgv1beta1.GroupVersion.String()
)

// WorkspaceReconciler reconciles a Workspace object
type WorkspaceReconciler struct {
Expand Down Expand Up @@ -65,63 +77,181 @@ func (r *WorkspaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
workspaceKindName := workspace.Spec.Kind
workspaceKind := &kubefloworgv1beta1.WorkspaceKind{}
if err := r.Get(ctx, client.ObjectKey{Name: workspaceKindName}, workspaceKind); err != nil {

logger.Error(err, "unable to fetch Workspace Kind")
if client.IgnoreNotFound(err) == nil {
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}
if err := ctrl.SetControllerReference(workspaceKind, workspace, r.Scheme); err != nil {
logger.Error(err, "unable to set controller reference to workspace")
return ctrl.Result{}, err
}

ss := generateStatefulSet(workspace, workspaceKind)
if err := ctrl.SetControllerReference(workspace, ss, r.Scheme); err != nil {
logger.Error(err, "unable to set controller reference to StatefulSet")
return ctrl.Result{}, err
}

var statefulSets appsv1.StatefulSetList
if err := r.List(ctx, &statefulSets, client.InNamespace(req.Namespace), client.MatchingFields{workspaceOwnerKey: req.Name}); err != nil {
logger.Error(err, "unable to list child StatefulSets")
return ctrl.Result{}, err
}

foundStatefulSet := &appsv1.StatefulSet{}
err := r.Get(ctx, client.ObjectKey{Name: workspace.Name, Namespace: workspace.Namespace}, foundStatefulSet)
if err != nil {
if client.IgnoreNotFound(err) == nil {
logger.Info("Creating StatefulSet")
err := r.Client.Create(ctx, ss)
if err != nil {
logger.Error(err, "unable to create StatefulSet")
return ctrl.Result{}, err
justCreated := false
if len(statefulSets.Items) > 1 {
logger.Info("Found multiple StatefulSets")
workspace.Status.State = kubefloworgv1beta1.WorkspaceStateError
if err := r.Status().Update(ctx, workspace); err != nil {
logger.Error(err, "unable to update Workspace status")
return ctrl.Result{}, err
}
logger.Info("Workspace status updated", "state", workspace.Status.State)
return ctrl.Result{}, nil
}
if len(statefulSets.Items) == 1 {
foundStatefulSet = &statefulSets.Items[0]
} else {
logger.Info("Creating StatefulSet")
err := r.Client.Create(ctx, ss)
if err != nil {
if errors.IsAlreadyExists(err) {
logger.Error(err, "statefulset already exists with this name , retrying")
err := helper.Retry(RetryAttempts, RetryInterval, func() error {
err := r.Client.Create(ctx, ss)
if err != nil {
if errors.IsAlreadyExists(err) {
logger.Error(err, "statefulset already exists with this name , retrying ...")
return err
}
return &helper.StopRetry{Err: err}
}
return nil
})
if err != nil {
logger.Error(err, "unable to create StatefulSet after retrying")
return ctrl.Result{}, err
}
}
} else {
logger.Error(err, "unable to fetch StatefulSet")
logger.Error(err, "unable to create StatefulSet")
return ctrl.Result{}, err
}
justCreated = true
logger.Info("StatefulSet created")
}

if !justCreated && helper.CopyStatefulSetFields(ss, foundStatefulSet) {
logger.Info("Updating StatefulSet")
err := r.Client.Update(ctx, foundStatefulSet)
if err != nil {
logger.Error(err, "unable to update StatefulSet")
return ctrl.Result{}, err
}
logger.Info("StatefulSet updated")
}
//TODO: sync foundStatefulSet with ss

svc := generateService(workspace)
if err := ctrl.SetControllerReference(workspace, svc, r.Scheme); err != nil {
logger.Error(err, "unable to set controller reference to Service")
logger.Error(err, "unable to set controller reference to service")
return ctrl.Result{}, err
}

var services corev1.ServiceList
if err := r.List(ctx, &services, client.InNamespace(req.Namespace), client.MatchingFields{workspaceOwnerKey: req.Name}); err != nil {
logger.Error(err, "unable to list child Services")
return ctrl.Result{}, err
}
justCreated = false
foundService := &corev1.Service{}
err = r.Get(ctx, client.ObjectKey{Name: workspace.Name, Namespace: workspace.Namespace}, foundService)
if err != nil {
if client.IgnoreNotFound(err) == nil {
logger.Info("Creating Service")
err := r.Client.Create(ctx, svc)
if err != nil {
logger.Error(err, "unable to create service")
return ctrl.Result{}, err
if len(services.Items) > 1 {
logger.Info("Found multiple Services")
workspace.Status.State = kubefloworgv1beta1.WorkspaceStateError
if err := r.Status().Update(ctx, workspace); err != nil {
logger.Error(err, "unable to update Workspace status")
return ctrl.Result{}, err
}
logger.Info("Workspace status updated", "state", workspace.Status.State)
return ctrl.Result{}, nil
}
if len(services.Items) == 1 {
foundService = &services.Items[0]
} else {
logger.Info("Creating Service")
err := r.Client.Create(ctx, svc)
if err != nil {
if errors.IsAlreadyExists(err) {
logger.Error(err, "service already exists with this name , retrying ...")
err := helper.Retry(RetryAttempts, RetryInterval, func() error {
err := r.Client.Create(ctx, svc)
if err != nil {
if errors.IsAlreadyExists(err) {
logger.Error(err, "service already exists with this name , retrying ...")
return err
}
return &helper.StopRetry{Err: err}
}
return nil
})
if err != nil {
logger.Error(err, "unable to create service after retrying")
return ctrl.Result{}, err
}
}
} else {
logger.Error(err, "unable to fetch Service")
logger.Error(err, "unable to create service")
return ctrl.Result{}, err
}
justCreated = true
logger.Info("Service created")
}
//TODO: sync foundService with svc
if !justCreated && helper.CopyServiceFields(svc, foundService) {
logger.Info("Updating Service")
err := r.Client.Update(ctx, foundService)
if err != nil {
logger.Error(err, "unable to update Service")
return ctrl.Result{}, err
}
logger.Info("Service updated")
}

logger.Info("Finish Reconciling Workspace")
return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *WorkspaceReconciler) SetupWithManager(mgr ctrl.Manager) error {
// Index StatefulSet by owner
if err := mgr.GetFieldIndexer().IndexField(context.Background(), &appsv1.StatefulSet{}, workspaceOwnerKey, func(rawObj client.Object) []string {
statefulSet := rawObj.(*appsv1.StatefulSet)
owner := metav1.GetControllerOf(statefulSet)
if owner == nil {
return nil
}
if owner.APIVersion != apiGVStr || owner.Kind != "Workspace" {
return nil
}
return []string{owner.Name}
}); err != nil {
return err
}
// Index Service by owner
if err := mgr.GetFieldIndexer().IndexField(context.Background(), &corev1.Service{}, workspaceOwnerKey, func(rawObj client.Object) []string {
service := rawObj.(*corev1.Service)
owner := metav1.GetControllerOf(service)
if owner == nil {
return nil
}
if owner.APIVersion != apiGVStr || owner.Kind != "Workspace" {
return nil
}
return []string{owner.Name}
}); err != nil {
return err
}
return ctrl.NewControllerManagedBy(mgr).
For(&kubefloworgv1beta1.Workspace{}).
For(&kubefloworgv1beta1.Workspace{}).Owns(&appsv1.StatefulSet{}).Owns(&corev1.Service{}).
Complete(r)
}

Expand All @@ -138,10 +268,10 @@ func generateStatefulSet(workspace *kubefloworgv1beta1.Workspace, workspaceKind
replicas := int32(1)
return &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: workspace.Name,
Namespace: workspace.Namespace,
Labels: workspaceKind.Spec.PodTemplate.PodMetadata.Labels,
Annotations: workspaceKind.Spec.PodTemplate.PodMetadata.Annotations,
GenerateName: fmt.Sprintf("ws-%s-", workspace.Name),
Namespace: workspace.Namespace,
Labels: workspaceKind.Spec.PodTemplate.PodMetadata.Labels,
Annotations: workspaceKind.Spec.PodTemplate.PodMetadata.Annotations,
},
Spec: appsv1.StatefulSetSpec{
Replicas: &replicas,
Expand Down Expand Up @@ -180,8 +310,8 @@ func generateStatefulSet(workspace *kubefloworgv1beta1.Workspace, workspaceKind
func generateService(workspace *kubefloworgv1beta1.Workspace) *corev1.Service {
return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: workspace.Name,
Namespace: workspace.Namespace,
GenerateName: fmt.Sprintf("ws-%s-", workspace.Name),
Namespace: workspace.Namespace,
},
Spec: corev1.ServiceSpec{
Selector: map[string]string{"statefulset": workspace.Name},
Expand Down
95 changes: 95 additions & 0 deletions workspaces/controller/internal/helper/helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package helper

import (
"errors"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"reflect"
"time"
)

func CopyStatefulSetFields(from, to *appsv1.StatefulSet) bool {
requireUpdate := false
for k, v := range to.Labels {
if from.Labels[k] != v {
requireUpdate = true
}
}
to.Labels = from.Labels

for k, v := range to.Annotations {
if from.Annotations[k] != v {
requireUpdate = true
}
}
to.Annotations = from.Annotations

if *from.Spec.Replicas != *to.Spec.Replicas {
*to.Spec.Replicas = *from.Spec.Replicas
requireUpdate = true
}

if !reflect.DeepEqual(to.Spec.Template.Spec, from.Spec.Template.Spec) {
requireUpdate = true
}
to.Spec.Template.Spec = from.Spec.Template.Spec

return requireUpdate
}

// CopyServiceFields copies the owned fields from one Service to another
func CopyServiceFields(from, to *corev1.Service) bool {
requireUpdate := false
for k, v := range to.Labels {
if from.Labels[k] != v {
requireUpdate = true
}
}
to.Labels = from.Labels

for k, v := range to.Annotations {
if from.Annotations[k] != v {
requireUpdate = true
}
}
to.Annotations = from.Annotations

// Don't copy the entire Spec, because we can't overwrite the clusterIp field

if !reflect.DeepEqual(to.Spec.Selector, from.Spec.Selector) {
requireUpdate = true
}
to.Spec.Selector = from.Spec.Selector

if !reflect.DeepEqual(to.Spec.Ports, from.Spec.Ports) {
requireUpdate = true
}
to.Spec.Ports = from.Spec.Ports

return requireUpdate
}

// StopRetry is returned to tell the Retry function to stop retrying.
type StopRetry struct{ Err error }

// Error implements the error interface
func (s *StopRetry) Error() string { return s.Err.Error() }

// Retry will retry the given function until either the maximum attempts is reached or
// a stop error is returned.
func Retry(attempts int, sleep time.Duration, f func() error) error {
if err := f(); err != nil {
var stop *StopRetry
if errors.As(err, &stop) {
return stop.Err
}
// user can pass -1 to retry indefinitely
if attempts--; attempts != 0 {
time.Sleep(sleep)
return Retry(attempts, 2*sleep, f)
}
return err
}

return nil
}

0 comments on commit 207d950

Please sign in to comment.