From 0d36c77b96d3273696774f88194cda99f7eac868 Mon Sep 17 00:00:00 2001 From: Jianrong Zhang Date: Fri, 8 Nov 2024 11:33:27 -0500 Subject: [PATCH] [issue-562] Serverless workflow pod gets restarted repeatedly after Knative K_SINK injection (#565) --- config/rbac/role.yaml | 8 ++ .../profiles/common/knative_eventing.go | 2 +- .../profiles/preview/states_preview.go | 86 ++++++++++++++++++- internal/controller/sonataflow_controller.go | 1 + operator.yaml | 8 ++ 5 files changed, 102 insertions(+), 3 deletions(-) diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index ecba276f6..bfeba82cc 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -32,6 +32,14 @@ rules: - list - update - watch +- apiGroups: + - serving.knative.dev + resources: + - revisions + verbs: + - delete + - list + - watch - apiGroups: - sonataflow.org resources: diff --git a/internal/controller/profiles/common/knative_eventing.go b/internal/controller/profiles/common/knative_eventing.go index 7838138dd..dcb1faad8 100644 --- a/internal/controller/profiles/common/knative_eventing.go +++ b/internal/controller/profiles/common/knative_eventing.go @@ -20,9 +20,9 @@ package common import ( "context" + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" "k8s.io/klog/v2" - operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/knative" "github.com/apache/incubator-kie-kogito-serverless-operator/log" "sigs.k8s.io/controller-runtime/pkg/client" diff --git a/internal/controller/profiles/preview/states_preview.go b/internal/controller/profiles/preview/states_preview.go index c108ee826..c5480d44d 100644 --- a/internal/controller/profiles/preview/states_preview.go +++ b/internal/controller/profiles/preview/states_preview.go @@ -22,21 +22,31 @@ package preview import ( "context" "fmt" + "sort" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" "k8s.io/klog/v2" + servingv1 "knative.dev/serving/pkg/apis/serving/v1" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/apache/incubator-kie-kogito-serverless-operator/api" operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/builder" + "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/knative" "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/platform" "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common" "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common/constants" "github.com/apache/incubator-kie-kogito-serverless-operator/log" kubeutil "github.com/apache/incubator-kie-kogito-serverless-operator/utils/kubernetes" + "github.com/apache/incubator-kie-kogito-serverless-operator/workflowproj" +) + +const ( + kSink = "K_SINK" + workflowContainer = "workflow" ) type newBuilderState struct { @@ -221,8 +231,8 @@ func (h *deployWithBuildWorkflowState) Do(ctx context.Context, workflow *operato } func (h *deployWithBuildWorkflowState) PostReconcile(ctx context.Context, workflow *operatorapi.SonataFlow) error { - //By default, we don't want to perform anything after the reconciliation, and so we will simply return no error - return nil + // Clean up the outdated Knative revisions, if any + return h.cleanupOutdatedRevisions(ctx, workflow) } // isWorkflowChanged marks the workflow status as unknown to require a new build reconciliation @@ -233,3 +243,75 @@ func (h *deployWithBuildWorkflowState) isWorkflowChanged(workflow *operatorapi.S } return false } + +func (h *deployWithBuildWorkflowState) cleanupOutdatedRevisions(ctx context.Context, workflow *operatorapi.SonataFlow) error { + if !workflow.IsKnativeDeployment() { + return nil + } + avail, err := knative.GetKnativeAvailability(h.Cfg) + if err != nil { + return err + } + if !avail.Serving || !avail.Eventing { + return nil + } + injected, err := knative.CheckKSinkInjected(workflow.Name, workflow.Namespace) + if err != nil { + return err + } + if !injected { + return fmt.Errorf("waiting for Sinkbinding K_SINK injection to complete") + } + opts := &client.ListOptions{ + LabelSelector: labels.SelectorFromSet( + map[string]string{ + workflowproj.LabelWorkflow: workflow.Name, + workflowproj.LabelWorkflowNamespace: workflow.Namespace, + }, + ), + Namespace: workflow.Namespace, + } + revisionList := &servingv1.RevisionList{} + if err := h.C.List(ctx, revisionList, opts); err != nil { + return err + } + // Sort the revisions based on creation timestamp + sortRevisions(revisionList.Items) + // Clean up previous revisions that do not have K_SINK injected + for i := 0; i < len(revisionList.Items)-1; i++ { + revision := &revisionList.Items[i] + if !containsKSink(revision) { + klog.V(log.I).InfoS("Revision %s does not have K_SINK injected and can be cleaned up.", revision.Name) + if err := h.C.Delete(ctx, revision, &client.DeleteOptions{}); err != nil { + return err + } + } + } + return nil +} + +func containsKSink(revision *servingv1.Revision) bool { + for _, container := range revision.Spec.PodSpec.Containers { + if container.Name == workflowContainer { + for _, env := range container.Env { + if env.Name == kSink { + return true + } + } + break + } + } + return false +} + +type CreationTimestamp []servingv1.Revision + +func (a CreationTimestamp) Len() int { return len(a) } +func (a CreationTimestamp) Less(i, j int) bool { + return a[i].CreationTimestamp.Before(&a[j].CreationTimestamp) +} +func (a CreationTimestamp) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + +func sortRevisions(revisions []servingv1.Revision) { + sort.Sort(CreationTimestamp(revisions)) +} diff --git a/internal/controller/sonataflow_controller.go b/internal/controller/sonataflow_controller.go index 7724a6378..2fbc8b335 100644 --- a/internal/controller/sonataflow_controller.go +++ b/internal/controller/sonataflow_controller.go @@ -71,6 +71,7 @@ type SonataFlowReconciler struct { //+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflows/status,verbs=get;update;patch //+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflows/finalizers,verbs=update //+kubebuilder:rbac:groups="monitoring.coreos.com",resources=servicemonitors,verbs=get;list;watch;create;update;delete +//+kubebuilder:rbac:groups="serving.knative.dev",resources=revisions,verbs=list;watch;delete // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. diff --git a/operator.yaml b/operator.yaml index 977f7904c..b45f2fe7e 100644 --- a/operator.yaml +++ b/operator.yaml @@ -27746,6 +27746,14 @@ rules: - list - update - watch +- apiGroups: + - serving.knative.dev + resources: + - revisions + verbs: + - delete + - list + - watch - apiGroups: - sonataflow.org resources: