From 588080fb3722d8db5b0c862311be0f013f44ec09 Mon Sep 17 00:00:00 2001 From: Jianrong Zhang Date: Thu, 31 Oct 2024 13:53:25 -0400 Subject: [PATCH] [issue-547] Allow workflow deployment if Knative broker is not defined in the SonataFlow or SonataFlowPlatform CRs (#553) --- .../profiles/common/object_creators.go | 17 +--- .../profiles/common/object_creators_test.go | 6 +- .../profiles/preview/deployment_handler.go | 33 ------- test/e2e/clusterplatform_test.go | 3 +- .../02-sonataflow_platform.yaml | 31 +++++++ .../kustomization.yaml | 27 ++++++ ...3-sonataflow_callbackstatetimeouts.sw.yaml | 88 +++++++++++++++++++ .../kustomization.yaml | 1 - 8 files changed, 153 insertions(+), 53 deletions(-) create mode 100644 test/e2e/testdata/platform/services/gitops/ephemeral-with-workflow-no-broker/02-sonataflow_platform.yaml create mode 100644 test/e2e/testdata/platform/services/gitops/ephemeral-with-workflow-no-broker/kustomization.yaml create mode 100644 test/e2e/testdata/platform/services/gitops/ephemeral-with-workflow-no-broker/sonataflow/03-sonataflow_callbackstatetimeouts.sw.yaml diff --git a/internal/controller/profiles/common/object_creators.go b/internal/controller/profiles/common/object_creators.go index 240c8aefd..1a1bbec73 100644 --- a/internal/controller/profiles/common/object_creators.go +++ b/internal/controller/profiles/common/object_creators.go @@ -26,7 +26,6 @@ import ( "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles" - "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/workflowdef" servingv1 "knative.dev/serving/pkg/apis/serving/v1" cncfmodel "github.com/serverlessworkflow/sdk-go/v2/model" @@ -47,7 +46,6 @@ import ( operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/knative" - "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/platform/services" "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common/constants" "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common/persistence" "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common/properties" @@ -285,14 +283,7 @@ func SinkBindingCreator(workflow *operatorapi.SonataFlow, plf *operatorapi.Sonat if err != nil { return nil, err } - dataIndexEnabled := services.IsDataIndexEnabled(plf) - jobServiceEnabled := services.IsJobServiceEnabled(plf) - - // skip if no produced event is found and there is no DataIndex/JobService enabled if sink == nil { - if dataIndexEnabled || jobServiceEnabled || workflowdef.ContainsEventKind(workflow, cncfmodel.EventKindProduced) { - return nil, fmt.Errorf("a sink in the SonataFlow %s or broker in the SonataFlowPlatform %s should be configured when DataIndex or JobService is enabled", workflow.Name, plf.Name) - } return nil, nil /*nothing to do*/ } @@ -389,11 +380,9 @@ func TriggersCreator(workflow *operatorapi.SonataFlow, plf *operatorapi.SonataFl if err != nil { return nil, err } - if brokerRef == nil { - return nil, fmt.Errorf("no broker configured for eventType %s in SonataFlow %s", event.Type, workflow.Name) - } - if !knative.IsKnativeBroker(brokerRef) { - return nil, fmt.Errorf("no valid broker configured for eventType %s in SonataFlow %s", event.Type, workflow.Name) + if brokerRef == nil || !knative.IsKnativeBroker(brokerRef) { + // No broker configured for the eventType. Skip and will not create trigger for it. + continue } if err := knative.ValidateBroker(brokerRef.Name, brokerRef.Namespace); err != nil { return nil, err diff --git a/internal/controller/profiles/common/object_creators_test.go b/internal/controller/profiles/common/object_creators_test.go index 9e2b5274d..e2d86d7d0 100644 --- a/internal/controller/profiles/common/object_creators_test.go +++ b/internal/controller/profiles/common/object_creators_test.go @@ -246,8 +246,7 @@ func TestEnsureWorkflowSinkBindingWithoutBrokerAreNotCreated(t *testing.T) { plf := test.GetBasePlatformWithBroker() plf.Spec.Eventing = nil // No broker configured in the platform, but data index and jobs service are enabled sinkBinding, err := SinkBindingCreator(workflow, plf) - assert.Error(t, err) - assert.Contains(t, err.Error(), "a sink in the SonataFlow vet or broker in the SonataFlowPlatform sonataflow-platform should be configured when DataIndex or JobService is enabled") + assert.NoError(t, err) assert.Nil(t, sinkBinding) } @@ -372,8 +371,7 @@ func TestEnsureWorkflowTriggersWithoutBrokerAreNotCreated(t *testing.T) { plf := test.GetBasePlatform() triggers, err := TriggersCreator(workflow, plf) - assert.Error(t, err) - assert.Contains(t, err.Error(), "no broker configured for eventType events.vet.appointments in SonataFlow vet") + assert.NoError(t, err) assert.Nil(t, triggers) } diff --git a/internal/controller/profiles/preview/deployment_handler.go b/internal/controller/profiles/preview/deployment_handler.go index 17cc49ed0..c6a8e2864 100644 --- a/internal/controller/profiles/preview/deployment_handler.go +++ b/internal/controller/profiles/preview/deployment_handler.go @@ -19,7 +19,6 @@ package preview import ( "context" - "fmt" "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/knative" v1 "k8s.io/api/core/v1" @@ -31,7 +30,6 @@ import ( operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/monitoring" "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/platform" - "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/platform/services" "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common" "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common/constants" "github.com/apache/incubator-kie-kogito-serverless-operator/utils" @@ -59,11 +57,6 @@ func (d *DeploymentReconciler) reconcileWithImage(ctx context.Context, workflow return reconcile.Result{Requeue: false}, nil, err } - // Checks if the workflow has sink configured. - if requires, err := d.ensureKnativeSinkConfigured(workflow); requires || err != nil { - return reconcile.Result{Requeue: false}, nil, err - } - // Ensure objects result, objs, err := d.ensureObjects(ctx, workflow, image) if err != nil || result.Requeue { @@ -100,32 +93,6 @@ func (d *DeploymentReconciler) ensureKnativeServingRequired(workflow *operatorap return false, nil } -// if Knative Eventing is available, the workflow should have a sink configured, or the platform should have a broker defined -func (d *DeploymentReconciler) ensureKnativeSinkConfigured(workflow *operatorapi.SonataFlow) (bool, error) { - avail, err := knative.GetKnativeAvailability(d.Cfg) - if err != nil { - return true, err - } - if !avail.Eventing { - return false, nil - } - platform, err := platform.GetActivePlatform(context.TODO(), d.C, workflow.Namespace) - if err != nil { - return true, err - } - sink, err := knative.GetWorkflowSink(workflow, platform) - if err != nil { - return true, err - } - if sink == nil && (services.IsDataIndexEnabled(platform) || services.IsJobServiceEnabled(platform)) { - d.Recorder.Eventf(workflow, v1.EventTypeWarning, - "KnativeSinkNotConfigured", - "Failed to deploy workflow. No sink configured in the workflow or the platform when Job Service or Data Index Service is enabled.") - return true, fmt.Errorf("no sink configured in the workflow or the platform when Job Service or Data Index Service is enabled") - } - return false, nil -} - func (d *DeploymentReconciler) ensureObjects(ctx context.Context, workflow *operatorapi.SonataFlow, image string) (reconcile.Result, []client.Object, error) { pl, _ := platform.GetActivePlatform(ctx, d.C, workflow.Namespace) userPropsCM, _, err := d.ensurers.userPropsConfigMap.Ensure(ctx, workflow) diff --git a/test/e2e/clusterplatform_test.go b/test/e2e/clusterplatform_test.go index 9f8fa1696..c6e4f5584 100644 --- a/test/e2e/clusterplatform_test.go +++ b/test/e2e/clusterplatform_test.go @@ -230,7 +230,8 @@ var _ = Describe("Cluster Platform Use Cases :: ", Label("cluster"), Ordered, fu Expect(err).NotTo(HaveOccurred()) }, Entry("without services configured", test.GetPathFromE2EDirectory("platform", "noservices"), metadata.GitOpsProfile.String(), ephemeral, false), - Entry("with services configured", test.GetPathFromE2EDirectory("platform", "services"), metadata.GitOpsProfile.String(), "ephemeral-with-workflow", true), + Entry("with services configured and platform broker", test.GetPathFromE2EDirectory("platform", "services"), metadata.GitOpsProfile.String(), "ephemeral-with-workflow", true), + Entry("with services configured and no broker", test.GetPathFromE2EDirectory("platform", "services"), metadata.GitOpsProfile.String(), "ephemeral-with-workflow-no-broker", true), ) DescribeTable("against a platform in a separate namespace", func(testcaseDir string, profile string, persistenceType string) { diff --git a/test/e2e/testdata/platform/services/gitops/ephemeral-with-workflow-no-broker/02-sonataflow_platform.yaml b/test/e2e/testdata/platform/services/gitops/ephemeral-with-workflow-no-broker/02-sonataflow_platform.yaml new file mode 100644 index 000000000..d6af1ae90 --- /dev/null +++ b/test/e2e/testdata/platform/services/gitops/ephemeral-with-workflow-no-broker/02-sonataflow_platform.yaml @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +apiVersion: sonataflow.org/v1alpha08 +kind: SonataFlowPlatform +metadata: + name: sonataflow-platform +spec: + build: + config: + strategyOptions: + KanikoBuildCacheEnabled: "true" + services: + dataIndex: + enabled: true + jobService: + enabled: true diff --git a/test/e2e/testdata/platform/services/gitops/ephemeral-with-workflow-no-broker/kustomization.yaml b/test/e2e/testdata/platform/services/gitops/ephemeral-with-workflow-no-broker/kustomization.yaml new file mode 100644 index 000000000..a95dc70d7 --- /dev/null +++ b/test/e2e/testdata/platform/services/gitops/ephemeral-with-workflow-no-broker/kustomization.yaml @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization + +resources: +- 02-sonataflow_platform.yaml +- sonataflow/03-sonataflow_callbackstatetimeouts.sw.yaml + +sortOptions: + order: fifo + diff --git a/test/e2e/testdata/platform/services/gitops/ephemeral-with-workflow-no-broker/sonataflow/03-sonataflow_callbackstatetimeouts.sw.yaml b/test/e2e/testdata/platform/services/gitops/ephemeral-with-workflow-no-broker/sonataflow/03-sonataflow_callbackstatetimeouts.sw.yaml new file mode 100644 index 000000000..a11207c5f --- /dev/null +++ b/test/e2e/testdata/platform/services/gitops/ephemeral-with-workflow-no-broker/sonataflow/03-sonataflow_callbackstatetimeouts.sw.yaml @@ -0,0 +1,88 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +apiVersion: sonataflow.org/v1alpha08 +kind: SonataFlow +metadata: + name: callbackstatetimeouts + annotations: + sonataflow.org/description: Callback State Timeouts Example k8s + sonataflow.org/version: 0.0.1 + sonataflow.org/profile: gitops +spec: + podTemplate: + replicas: 0 + container: + image: replaceme + flow: + start: PrintStartMessage + events: + - name: callbackEvent + source: '' + type: callback_event_type + functions: + - name: systemOut + type: custom + operation: sysout + states: + - name: PrintStartMessage + type: operation + actions: + - name: printSystemOut + functionRef: + refName: systemOut + arguments: + message: "${\"callback-state-timeouts: \" + $WORKFLOW.instanceId + \" has started.\"}" + transition: CallbackState + - name: CallbackState + type: callback + action: + name: callbackAction + functionRef: + refName: systemOut + arguments: + message: "${\"callback-state-timeouts: \" + $WORKFLOW.instanceId + \" has executed the callbackFunction.\"}" + eventRef: callbackEvent + transition: CheckEventArrival + timeouts: + eventTimeout: PT30S + - name: CheckEventArrival + type: switch + dataConditions: + - condition: "${ .eventData != null }" + transition: EventArrived + defaultCondition: + transition: EventNotArrived + - name: EventArrived + type: inject + data: + exitMessage: "The callback event has arrived." + transition: PrintExitMessage + - name: EventNotArrived + type: inject + data: + exitMessage: "The callback event has not arrived, and the timeout has overdue." + transition: PrintExitMessage + - name: PrintExitMessage + type: operation + actions: + - name: printSystemOut + functionRef: + refName: systemOut + arguments: + message: "${\"callback-state-timeouts: \" + $WORKFLOW.instanceId + \" has finalized. \" + .exitMessage + \" eventData: \" + .eventData}" + end: true diff --git a/test/e2e/testdata/platform/services/gitops/ephemeral-with-workflow/kustomization.yaml b/test/e2e/testdata/platform/services/gitops/ephemeral-with-workflow/kustomization.yaml index bf215bdd6..a95dc70d7 100644 --- a/test/e2e/testdata/platform/services/gitops/ephemeral-with-workflow/kustomization.yaml +++ b/test/e2e/testdata/platform/services/gitops/ephemeral-with-workflow/kustomization.yaml @@ -19,7 +19,6 @@ apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization resources: -- 00-broker.yaml - 02-sonataflow_platform.yaml - sonataflow/03-sonataflow_callbackstatetimeouts.sw.yaml