diff --git a/packages/sonataflow-operator/internal/controller/knative/knative.go b/packages/sonataflow-operator/internal/controller/knative/knative.go index d215a66398f..49f2540b2d4 100644 --- a/packages/sonataflow-operator/internal/controller/knative/knative.go +++ b/packages/sonataflow-operator/internal/controller/knative/knative.go @@ -22,11 +22,13 @@ package knative import ( "context" "fmt" + "strings" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/rest" + "knative.dev/eventing/pkg/apis/eventing" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1" clienteventingv1 "knative.dev/eventing/pkg/client/clientset/versioned/typed/eventing/v1" @@ -51,14 +53,16 @@ type Availability struct { } const ( - kSink = "K_SINK" - knativeBundleVolume = "kne-bundle-volume" - kCeOverRides = "K_CE_OVERRIDES" - knativeServingGroup = "serving.knative.dev" - knativeEventingGroup = "eventing.knative.dev" - knativeEventingAPIVersion = "eventing.knative.dev/v1" - knativeBrokerKind = "Broker" - knativeSinkProvided = "SinkProvided" + kSink = "K_SINK" + knativeBundleVolume = "kne-bundle-volume" + kCeOverRides = "K_CE_OVERRIDES" + knativeServingGroup = "serving.knative.dev" + knativeEventingGroup = "eventing.knative.dev" + knativeEventingAPIVersion = "eventing.knative.dev/v1" + knativeBrokerKind = "Broker" + knativeSinkProvided = "SinkProvided" + KafkaKnativeEventingDeliveryOrder = "kafka.eventing.knative.dev/delivery.order" + KafkaKnativeEventingDeliveryOrderOrdered = "ordered" ) func GetKnativeServingClient(cfg *rest.Config) (clientservingv1.ServingV1Interface, error) { @@ -132,19 +136,33 @@ func getDestinationWithNamespace(dest *duckv1.Destination, namespace string) *du return dest } -func ValidateBroker(name, namespace string) error { +func ValidateBroker(name, namespace string) (*eventingv1.Broker, error) { broker := &eventingv1.Broker{} if err := utils.GetClient().Get(context.TODO(), types.NamespacedName{Name: name, Namespace: namespace}, broker); err != nil { if errors.IsNotFound(err) { - return fmt.Errorf("broker %s in namespace %s does not exist", name, namespace) + return nil, fmt.Errorf("broker %s in namespace %s does not exist", name, namespace) } - return err + return nil, err } cond := broker.Status.GetCondition(apis.ConditionReady) if cond != nil && cond.Status == corev1.ConditionTrue { - return nil + return broker, nil + } + return nil, fmt.Errorf("broker %s in namespace %s is not ready", name, namespace) +} + +// GetBrokerClass returns the broker class for a Knative Eventing Broker. +func GetBrokerClass(broker *eventingv1.Broker) string { + if broker.Annotations == nil { + return "" } - return fmt.Errorf("broker %s in namespace %s is not ready", name, namespace) + return broker.Annotations[eventing.BrokerClassKey] +} + +// IsKafkaBroker returns true if the class for a Knative Eventing Broker corresponds to a Kafka broker. +func IsKafkaBroker(brokerClass string) bool { + // currently available kafka broker classes are "Kafka", and "KafkaNamespaced", for safety ask for the substring "Kafka". + return strings.Contains(brokerClass, "Kafka") } func GetWorkflowSink(workflow *operatorapi.SonataFlow, pl *operatorapi.SonataFlowPlatform) (*duckv1.Destination, error) { diff --git a/packages/sonataflow-operator/internal/controller/platform/services/services.go b/packages/sonataflow-operator/internal/controller/platform/services/services.go index 97114d3e777..03427c548f6 100644 --- a/packages/sonataflow-operator/internal/controller/platform/services/services.go +++ b/packages/sonataflow-operator/internal/controller/platform/services/services.go @@ -575,12 +575,13 @@ func (d *DataIndexHandler) GetSourceBroker() *duckv1.Destination { return GetPlatformBroker(d.platform) } -func (d *DataIndexHandler) newTrigger(labels map[string]string, brokerName, namespace, serviceName, tag, eventType, path string, platform *operatorapi.SonataFlowPlatform) *eventingv1.Trigger { +func (d *DataIndexHandler) newTrigger(labels map[string]string, annotations map[string]string, brokerName, namespace, serviceName, tag, eventType, path string, platform *operatorapi.SonataFlowPlatform) *eventingv1.Trigger { return &eventingv1.Trigger{ ObjectMeta: metav1.ObjectMeta{ - Name: kmeta.ChildName(fmt.Sprintf("data-index-%s-", tag), string(platform.GetUID())), - Namespace: namespace, - Labels: labels, + Name: kmeta.ChildName(fmt.Sprintf("data-index-%s-", tag), string(platform.GetUID())), + Namespace: namespace, + Labels: labels, + Annotations: annotations, }, Spec: eventingv1.TriggerSpec{ Broker: brokerName, @@ -613,7 +614,9 @@ func (d *DataIndexHandler) GenerateKnativeResources(platform *operatorapi.Sonata if len(namespace) == 0 { namespace = platform.Namespace } - if err := knative.ValidateBroker(brokerName, namespace); err != nil { + var brokerObject *eventingv1.Broker + var err error + if brokerObject, err = knative.ValidateBroker(brokerName, namespace); err != nil { event := &corev1.Event{ Type: corev1.EventTypeWarning, Reason: WaitingKnativeEventing, @@ -621,16 +624,18 @@ func (d *DataIndexHandler) GenerateKnativeResources(platform *operatorapi.Sonata } return nil, event, err } + annotations := make(map[string]string) + managedAnnotations := make(map[string]string) + addTriggerAnnotations(knative.GetBrokerClass(brokerObject), managedAnnotations) serviceName := d.GetServiceName() return []client.Object{ - d.newTrigger(lbl, brokerName, namespace, serviceName, "process-error", "ProcessInstanceErrorDataEvent", constants.KogitoProcessInstancesEventsPath, platform), - d.newTrigger(lbl, brokerName, namespace, serviceName, "process-node", "ProcessInstanceNodeDataEvent", constants.KogitoProcessInstancesEventsPath, platform), - d.newTrigger(lbl, brokerName, namespace, serviceName, "process-sla", "ProcessInstanceSLADataEvent", constants.KogitoProcessInstancesEventsPath, platform), - d.newTrigger(lbl, brokerName, namespace, serviceName, "process-state", "ProcessInstanceStateDataEvent", constants.KogitoProcessInstancesEventsPath, platform), - d.newTrigger(lbl, brokerName, namespace, serviceName, "process-variable", "ProcessInstanceVariableDataEvent", constants.KogitoProcessInstancesEventsPath, platform), - d.newTrigger(lbl, brokerName, namespace, serviceName, "process-definition", "ProcessDefinitionEvent", constants.KogitoProcessDefinitionsEventsPath, platform), - d.newTrigger(lbl, brokerName, namespace, serviceName, "process-instance-multiple", "MultipleProcessInstanceDataEvent", constants.KogitoProcessInstancesMultiEventsPath, platform), - d.newTrigger(lbl, brokerName, namespace, serviceName, "jobs", "JobEvent", constants.KogitoJobsPath, platform)}, nil, nil + d.newTrigger(lbl, annotations, brokerName, namespace, serviceName, "process-error", "ProcessInstanceErrorDataEvent", constants.KogitoProcessInstancesEventsPath, platform), + d.newTrigger(lbl, annotations, brokerName, namespace, serviceName, "process-node", "ProcessInstanceNodeDataEvent", constants.KogitoProcessInstancesEventsPath, platform), + d.newTrigger(lbl, annotations, brokerName, namespace, serviceName, "process-state", "ProcessInstanceStateDataEvent", constants.KogitoProcessInstancesEventsPath, platform), + d.newTrigger(lbl, annotations, brokerName, namespace, serviceName, "process-variable", "ProcessInstanceVariableDataEvent", constants.KogitoProcessInstancesEventsPath, platform), + d.newTrigger(lbl, annotations, brokerName, namespace, serviceName, "process-definition", "ProcessDefinitionEvent", constants.KogitoProcessDefinitionsEventsPath, platform), + d.newTrigger(lbl, annotations, brokerName, namespace, serviceName, "process-instance-multiple", "MultipleProcessInstanceDataEvent", constants.KogitoProcessInstancesMultiEventsPath, platform), + d.newTrigger(lbl, managedAnnotations, brokerName, namespace, serviceName, "jobs", "JobEvent", constants.KogitoJobsPath, platform)}, nil, nil } func (d JobServiceHandler) GetSourceBroker() *duckv1.Destination { @@ -640,6 +645,12 @@ func (d JobServiceHandler) GetSourceBroker() *duckv1.Destination { return GetPlatformBroker(d.platform) } +func addTriggerAnnotations(brokerClass string, annotations map[string]string) { + if knative.IsKafkaBroker(brokerClass) { + annotations[knative.KafkaKnativeEventingDeliveryOrder] = knative.KafkaKnativeEventingDeliveryOrderOrdered + } +} + func (d JobServiceHandler) GetSink() *duckv1.Destination { if d.platform.Spec.Services.JobService.Sink != nil { return d.platform.Spec.Services.JobService.Sink @@ -658,7 +669,9 @@ func (j *JobServiceHandler) GenerateKnativeResources(platform *operatorapi.Sonat if len(namespace) == 0 { namespace = platform.Namespace } - if err := knative.ValidateBroker(brokerName, namespace); err != nil { + var brokerObject *eventingv1.Broker + var err error + if brokerObject, err = knative.ValidateBroker(brokerName, namespace); err != nil { event := &corev1.Event{ Type: corev1.EventTypeWarning, Reason: WaitingKnativeEventing, @@ -666,11 +679,14 @@ func (j *JobServiceHandler) GenerateKnativeResources(platform *operatorapi.Sonat } return nil, event, err } + annotations := make(map[string]string) + addTriggerAnnotations(knative.GetBrokerClass(brokerObject), annotations) jobCreateTrigger := &eventingv1.Trigger{ ObjectMeta: metav1.ObjectMeta{ - Name: kmeta.ChildName("jobs-service-create-job-", string(platform.GetUID())), - Namespace: namespace, - Labels: lbl, + Name: kmeta.ChildName("jobs-service-create-job-", string(platform.GetUID())), + Namespace: namespace, + Labels: lbl, + Annotations: annotations, }, Spec: eventingv1.TriggerSpec{ Broker: brokerName, @@ -695,9 +711,10 @@ func (j *JobServiceHandler) GenerateKnativeResources(platform *operatorapi.Sonat resultObjs = append(resultObjs, jobCreateTrigger) jobDeleteTrigger := &eventingv1.Trigger{ ObjectMeta: metav1.ObjectMeta{ - Name: kmeta.ChildName("jobs-service-delete-job-", string(platform.GetUID())), - Namespace: namespace, - Labels: lbl, + Name: kmeta.ChildName("jobs-service-delete-job-", string(platform.GetUID())), + Namespace: namespace, + Labels: lbl, + Annotations: annotations, }, Spec: eventingv1.TriggerSpec{ Broker: brokerName, diff --git a/packages/sonataflow-operator/internal/controller/profiles/common/object_creators.go b/packages/sonataflow-operator/internal/controller/profiles/common/object_creators.go index e61881ebf62..396c270fa68 100644 --- a/packages/sonataflow-operator/internal/controller/profiles/common/object_creators.go +++ b/packages/sonataflow-operator/internal/controller/profiles/common/object_creators.go @@ -384,7 +384,7 @@ func TriggersCreator(workflow *operatorapi.SonataFlow, plf *operatorapi.SonataFl // No broker configured for the eventType. Skip and will not create trigger for it. continue } - if err := knative.ValidateBroker(brokerRef.Name, brokerRef.Namespace); err != nil { + if _, err := knative.ValidateBroker(brokerRef.Name, brokerRef.Namespace); err != nil { return nil, err } // construct eventingv1.Trigger diff --git a/packages/sonataflow-operator/internal/controller/sonataflowplatform_controller_test.go b/packages/sonataflow-operator/internal/controller/sonataflowplatform_controller_test.go index 925d84a98f4..cb983e82f51 100644 --- a/packages/sonataflow-operator/internal/controller/sonataflowplatform_controller_test.go +++ b/packages/sonataflow-operator/internal/controller/sonataflowplatform_controller_test.go @@ -920,7 +920,6 @@ func TestSonataFlowPlatformController(t *testing.T) { validateTrigger(t, cl, "data-index-process-definition-", ksp.Namespace, ksp, trigger) validateTrigger(t, cl, "data-index-process-error-", ksp.Namespace, ksp, trigger) validateTrigger(t, cl, "data-index-process-node-", ksp.Namespace, ksp, trigger) - validateTrigger(t, cl, "data-index-process-sla-", ksp.Namespace, ksp, trigger) validateTrigger(t, cl, "data-index-process-state-", ksp.Namespace, ksp, trigger) validateTrigger(t, cl, "data-index-process-variable-", ksp.Namespace, ksp, trigger) @@ -1034,8 +1033,6 @@ func TestSonataFlowPlatformController(t *testing.T) { assert.Equal(t, trigger.Spec.Broker, brokerNameDataIndexSource) validateTrigger(t, cl, "data-index-process-node-", ksp.Namespace, ksp, trigger) assert.Equal(t, trigger.Spec.Broker, brokerNameDataIndexSource) - validateTrigger(t, cl, "data-index-process-sla-", ksp.Namespace, ksp, trigger) - assert.Equal(t, trigger.Spec.Broker, brokerNameDataIndexSource) validateTrigger(t, cl, "data-index-process-state-", ksp.Namespace, ksp, trigger) assert.Equal(t, trigger.Spec.Broker, brokerNameDataIndexSource) validateTrigger(t, cl, "data-index-process-variable-", ksp.Namespace, ksp, trigger) diff --git a/packages/sonataflow-operator/test/e2e/platform_test.go b/packages/sonataflow-operator/test/e2e/platform_test.go index d2a9ce433ca..c7074176d85 100644 --- a/packages/sonataflow-operator/test/e2e/platform_test.go +++ b/packages/sonataflow-operator/test/e2e/platform_test.go @@ -242,7 +242,6 @@ var _ = Describe("Platform Use Cases :: ", Label("platform"), Ordered, func() { Expect(err).NotTo(HaveOccurred()) Expect(verifyTrigger(triggers, "data-index-process-error-", constants.KogitoProcessInstancesEventsPath, targetNamespace, "di-source")).NotTo(HaveOccurred()) Expect(verifyTrigger(triggers, "data-index-process-node-", constants.KogitoProcessInstancesEventsPath, targetNamespace, "di-source")).NotTo(HaveOccurred()) - Expect(verifyTrigger(triggers, "data-index-process-sla-", constants.KogitoProcessInstancesEventsPath, targetNamespace, "di-source")).NotTo(HaveOccurred()) Expect(verifyTrigger(triggers, "data-index-process-state-", constants.KogitoProcessInstancesEventsPath, targetNamespace, "di-source")).NotTo(HaveOccurred()) Expect(verifyTrigger(triggers, "data-index-process-variable-", constants.KogitoProcessInstancesEventsPath, targetNamespace, "di-source")).NotTo(HaveOccurred()) Expect(verifyTrigger(triggers, "data-index-process-definition-", constants.KogitoProcessDefinitionsEventsPath, targetNamespace, "di-source")).NotTo(HaveOccurred()) @@ -316,7 +315,6 @@ var _ = Describe("Platform Use Cases :: ", Label("platform"), Ordered, func() { Expect(err).NotTo(HaveOccurred()) Expect(verifyTrigger(triggers, "data-index-process-error-", constants.KogitoProcessInstancesEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred()) Expect(verifyTrigger(triggers, "data-index-process-node-", constants.KogitoProcessInstancesEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred()) - Expect(verifyTrigger(triggers, "data-index-process-sla-", constants.KogitoProcessInstancesEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred()) Expect(verifyTrigger(triggers, "data-index-process-state-", constants.KogitoProcessInstancesEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred()) Expect(verifyTrigger(triggers, "data-index-process-variable-", constants.KogitoProcessInstancesEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred()) Expect(verifyTrigger(triggers, "data-index-process-definition-", constants.KogitoProcessDefinitionsEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred())