diff --git a/internal/controller/knative/knative.go b/internal/controller/knative/knative.go index 48aabd1ea..fab2dd6ae 100644 --- a/internal/controller/knative/knative.go +++ b/internal/controller/knative/knative.go @@ -22,6 +22,7 @@ package knative import ( "context" "fmt" + "strings" operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" "github.com/apache/incubator-kie-kogito-serverless-operator/utils" @@ -31,6 +32,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/rest" + "knative.dev/eventing/pkg/apis/eventing" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1" clienteventingv1 "knative.dev/eventing/pkg/client/clientset/versioned/typed/eventing/v1" @@ -50,14 +52,16 @@ type Availability struct { } const ( - kSink = "K_SINK" - knativeBundleVolume = "kne-bundle-volume" - kCeOverRides = "K_CE_OVERRIDES" - knativeServingGroup = "serving.knative.dev" - knativeEventingGroup = "eventing.knative.dev" - knativeEventingAPIVersion = "eventing.knative.dev/v1" - knativeBrokerKind = "Broker" - knativeSinkProvided = "SinkProvided" + kSink = "K_SINK" + knativeBundleVolume = "kne-bundle-volume" + kCeOverRides = "K_CE_OVERRIDES" + knativeServingGroup = "serving.knative.dev" + knativeEventingGroup = "eventing.knative.dev" + knativeEventingAPIVersion = "eventing.knative.dev/v1" + knativeBrokerKind = "Broker" + knativeSinkProvided = "SinkProvided" + KafkaKnativeEventingDeliveryOrder = "kafka.eventing.knative.dev/delivery.order" + KafkaKnativeEventingDeliveryOrderOrdered = "ordered" ) func GetKnativeServingClient(cfg *rest.Config) (clientservingv1.ServingV1Interface, error) { @@ -131,19 +135,33 @@ func getDestinationWithNamespace(dest *duckv1.Destination, namespace string) *du return dest } -func ValidateBroker(name, namespace string) error { +func ValidateBroker(name, namespace string) (*eventingv1.Broker, error) { broker := &eventingv1.Broker{} if err := utils.GetClient().Get(context.TODO(), types.NamespacedName{Name: name, Namespace: namespace}, broker); err != nil { if errors.IsNotFound(err) { - return fmt.Errorf("broker %s in namespace %s does not exist", name, namespace) + return nil, fmt.Errorf("broker %s in namespace %s does not exist", name, namespace) } - return err + return nil, err } cond := broker.Status.GetCondition(apis.ConditionReady) if cond != nil && cond.Status == corev1.ConditionTrue { - return nil + return broker, nil + } + return nil, fmt.Errorf("broker %s in namespace %s is not ready", name, namespace) +} + +// GetBrokerClass returns the broker class for a Knative Eventing Broker. +func GetBrokerClass(broker *eventingv1.Broker) string { + if broker.Annotations == nil { + return "" } - return fmt.Errorf("broker %s in namespace %s is not ready", name, namespace) + return broker.Annotations[eventing.BrokerClassKey] +} + +// IsKafkaBroker returns true if the class for a Knative Eventing Broker corresponds to a Kafka broker. +func IsKafkaBroker(brokerClass string) bool { + // currently available kafka broker classes are "Kafka", and "KafkaNamespaced", for safety ask for the substring "Kafka". + return strings.Contains(brokerClass, "Kafka") } func GetWorkflowSink(workflow *operatorapi.SonataFlow, pl *operatorapi.SonataFlowPlatform) (*duckv1.Destination, error) { diff --git a/internal/controller/platform/services/services.go b/internal/controller/platform/services/services.go index 6c9644add..2b26dab26 100644 --- a/internal/controller/platform/services/services.go +++ b/internal/controller/platform/services/services.go @@ -573,12 +573,13 @@ func (d *DataIndexHandler) GetSourceBroker() *duckv1.Destination { return GetPlatformBroker(d.platform) } -func (d *DataIndexHandler) newTrigger(labels map[string]string, brokerName, namespace, serviceName, tag, eventType, path string, platform *operatorapi.SonataFlowPlatform) *eventingv1.Trigger { +func (d *DataIndexHandler) newTrigger(labels map[string]string, annotations map[string]string, brokerName, namespace, serviceName, tag, eventType, path string, platform *operatorapi.SonataFlowPlatform) *eventingv1.Trigger { return &eventingv1.Trigger{ ObjectMeta: metav1.ObjectMeta{ - Name: kmeta.ChildName(fmt.Sprintf("data-index-%s-", tag), string(platform.GetUID())), - Namespace: namespace, - Labels: labels, + Name: kmeta.ChildName(fmt.Sprintf("data-index-%s-", tag), string(platform.GetUID())), + Namespace: namespace, + Labels: labels, + Annotations: annotations, }, Spec: eventingv1.TriggerSpec{ Broker: brokerName, @@ -611,7 +612,9 @@ func (d *DataIndexHandler) GenerateKnativeResources(platform *operatorapi.Sonata if len(namespace) == 0 { namespace = platform.Namespace } - if err := knative.ValidateBroker(brokerName, namespace); err != nil { + var brokerObject *eventingv1.Broker + var err error + if brokerObject, err = knative.ValidateBroker(brokerName, namespace); err != nil { event := &corev1.Event{ Type: corev1.EventTypeWarning, Reason: WaitingKnativeEventing, @@ -619,16 +622,18 @@ func (d *DataIndexHandler) GenerateKnativeResources(platform *operatorapi.Sonata } return nil, event, err } + annotations := make(map[string]string) + managedAnnotations := make(map[string]string) + addTriggerAnnotations(knative.GetBrokerClass(brokerObject), managedAnnotations) serviceName := d.GetServiceName() return []client.Object{ - d.newTrigger(lbl, brokerName, namespace, serviceName, "process-error", "ProcessInstanceErrorDataEvent", constants.KogitoProcessInstancesEventsPath, platform), - d.newTrigger(lbl, brokerName, namespace, serviceName, "process-node", "ProcessInstanceNodeDataEvent", constants.KogitoProcessInstancesEventsPath, platform), - d.newTrigger(lbl, brokerName, namespace, serviceName, "process-sla", "ProcessInstanceSLADataEvent", constants.KogitoProcessInstancesEventsPath, platform), - d.newTrigger(lbl, brokerName, namespace, serviceName, "process-state", "ProcessInstanceStateDataEvent", constants.KogitoProcessInstancesEventsPath, platform), - d.newTrigger(lbl, brokerName, namespace, serviceName, "process-variable", "ProcessInstanceVariableDataEvent", constants.KogitoProcessInstancesEventsPath, platform), - d.newTrigger(lbl, brokerName, namespace, serviceName, "process-definition", "ProcessDefinitionEvent", constants.KogitoProcessDefinitionsEventsPath, platform), - d.newTrigger(lbl, brokerName, namespace, serviceName, "process-instance-multiple", "MultipleProcessInstanceDataEvent", constants.KogitoProcessInstancesMultiEventsPath, platform), - d.newTrigger(lbl, brokerName, namespace, serviceName, "jobs", "JobEvent", constants.KogitoJobsPath, platform)}, nil, nil + d.newTrigger(lbl, annotations, brokerName, namespace, serviceName, "process-error", "ProcessInstanceErrorDataEvent", constants.KogitoProcessInstancesEventsPath, platform), + d.newTrigger(lbl, annotations, brokerName, namespace, serviceName, "process-node", "ProcessInstanceNodeDataEvent", constants.KogitoProcessInstancesEventsPath, platform), + d.newTrigger(lbl, annotations, brokerName, namespace, serviceName, "process-state", "ProcessInstanceStateDataEvent", constants.KogitoProcessInstancesEventsPath, platform), + d.newTrigger(lbl, annotations, brokerName, namespace, serviceName, "process-variable", "ProcessInstanceVariableDataEvent", constants.KogitoProcessInstancesEventsPath, platform), + d.newTrigger(lbl, annotations, brokerName, namespace, serviceName, "process-definition", "ProcessDefinitionEvent", constants.KogitoProcessDefinitionsEventsPath, platform), + d.newTrigger(lbl, annotations, brokerName, namespace, serviceName, "process-instance-multiple", "MultipleProcessInstanceDataEvent", constants.KogitoProcessInstancesMultiEventsPath, platform), + d.newTrigger(lbl, managedAnnotations, brokerName, namespace, serviceName, "jobs", "JobEvent", constants.KogitoJobsPath, platform)}, nil, nil } func (d JobServiceHandler) GetSourceBroker() *duckv1.Destination { @@ -638,6 +643,12 @@ func (d JobServiceHandler) GetSourceBroker() *duckv1.Destination { return GetPlatformBroker(d.platform) } +func addTriggerAnnotations(brokerClass string, annotations map[string]string) { + if knative.IsKafkaBroker(brokerClass) { + annotations[knative.KafkaKnativeEventingDeliveryOrder] = knative.KafkaKnativeEventingDeliveryOrderOrdered + } +} + func (d JobServiceHandler) GetSink() *duckv1.Destination { if d.platform.Spec.Services.JobService.Sink != nil { return d.platform.Spec.Services.JobService.Sink @@ -656,7 +667,9 @@ func (j *JobServiceHandler) GenerateKnativeResources(platform *operatorapi.Sonat if len(namespace) == 0 { namespace = platform.Namespace } - if err := knative.ValidateBroker(brokerName, namespace); err != nil { + var brokerObject *eventingv1.Broker + var err error + if brokerObject, err = knative.ValidateBroker(brokerName, namespace); err != nil { event := &corev1.Event{ Type: corev1.EventTypeWarning, Reason: WaitingKnativeEventing, @@ -664,11 +677,14 @@ func (j *JobServiceHandler) GenerateKnativeResources(platform *operatorapi.Sonat } return nil, event, err } + annotations := make(map[string]string) + addTriggerAnnotations(knative.GetBrokerClass(brokerObject), annotations) jobCreateTrigger := &eventingv1.Trigger{ ObjectMeta: metav1.ObjectMeta{ - Name: kmeta.ChildName("jobs-service-create-job-", string(platform.GetUID())), - Namespace: namespace, - Labels: lbl, + Name: kmeta.ChildName("jobs-service-create-job-", string(platform.GetUID())), + Namespace: namespace, + Labels: lbl, + Annotations: annotations, }, Spec: eventingv1.TriggerSpec{ Broker: brokerName, @@ -693,9 +709,10 @@ func (j *JobServiceHandler) GenerateKnativeResources(platform *operatorapi.Sonat resultObjs = append(resultObjs, jobCreateTrigger) jobDeleteTrigger := &eventingv1.Trigger{ ObjectMeta: metav1.ObjectMeta{ - Name: kmeta.ChildName("jobs-service-delete-job-", string(platform.GetUID())), - Namespace: namespace, - Labels: lbl, + Name: kmeta.ChildName("jobs-service-delete-job-", string(platform.GetUID())), + Namespace: namespace, + Labels: lbl, + Annotations: annotations, }, Spec: eventingv1.TriggerSpec{ Broker: brokerName, diff --git a/internal/controller/profiles/common/object_creators.go b/internal/controller/profiles/common/object_creators.go index 1a1bbec73..3c9d18014 100644 --- a/internal/controller/profiles/common/object_creators.go +++ b/internal/controller/profiles/common/object_creators.go @@ -384,7 +384,7 @@ func TriggersCreator(workflow *operatorapi.SonataFlow, plf *operatorapi.SonataFl // No broker configured for the eventType. Skip and will not create trigger for it. continue } - if err := knative.ValidateBroker(brokerRef.Name, brokerRef.Namespace); err != nil { + if _, err := knative.ValidateBroker(brokerRef.Name, brokerRef.Namespace); err != nil { return nil, err } // construct eventingv1.Trigger diff --git a/internal/controller/sonataflowplatform_controller_test.go b/internal/controller/sonataflowplatform_controller_test.go index af9280a3b..c61c633f8 100644 --- a/internal/controller/sonataflowplatform_controller_test.go +++ b/internal/controller/sonataflowplatform_controller_test.go @@ -919,7 +919,6 @@ func TestSonataFlowPlatformController(t *testing.T) { validateTrigger(t, cl, "data-index-process-definition-", ksp.Namespace, ksp, trigger) validateTrigger(t, cl, "data-index-process-error-", ksp.Namespace, ksp, trigger) validateTrigger(t, cl, "data-index-process-node-", ksp.Namespace, ksp, trigger) - validateTrigger(t, cl, "data-index-process-sla-", ksp.Namespace, ksp, trigger) validateTrigger(t, cl, "data-index-process-state-", ksp.Namespace, ksp, trigger) validateTrigger(t, cl, "data-index-process-variable-", ksp.Namespace, ksp, trigger) @@ -1033,8 +1032,6 @@ func TestSonataFlowPlatformController(t *testing.T) { assert.Equal(t, trigger.Spec.Broker, brokerNameDataIndexSource) validateTrigger(t, cl, "data-index-process-node-", ksp.Namespace, ksp, trigger) assert.Equal(t, trigger.Spec.Broker, brokerNameDataIndexSource) - validateTrigger(t, cl, "data-index-process-sla-", ksp.Namespace, ksp, trigger) - assert.Equal(t, trigger.Spec.Broker, brokerNameDataIndexSource) validateTrigger(t, cl, "data-index-process-state-", ksp.Namespace, ksp, trigger) assert.Equal(t, trigger.Spec.Broker, brokerNameDataIndexSource) validateTrigger(t, cl, "data-index-process-variable-", ksp.Namespace, ksp, trigger) diff --git a/test/e2e/platform_test.go b/test/e2e/platform_test.go index 74315f52b..0db407bee 100644 --- a/test/e2e/platform_test.go +++ b/test/e2e/platform_test.go @@ -242,7 +242,6 @@ var _ = Describe("Platform Use Cases :: ", Label("platform"), Ordered, func() { Expect(err).NotTo(HaveOccurred()) Expect(verifyTrigger(triggers, "data-index-process-error-", constants.KogitoProcessInstancesEventsPath, targetNamespace, "di-source")).NotTo(HaveOccurred()) Expect(verifyTrigger(triggers, "data-index-process-node-", constants.KogitoProcessInstancesEventsPath, targetNamespace, "di-source")).NotTo(HaveOccurred()) - Expect(verifyTrigger(triggers, "data-index-process-sla-", constants.KogitoProcessInstancesEventsPath, targetNamespace, "di-source")).NotTo(HaveOccurred()) Expect(verifyTrigger(triggers, "data-index-process-state-", constants.KogitoProcessInstancesEventsPath, targetNamespace, "di-source")).NotTo(HaveOccurred()) Expect(verifyTrigger(triggers, "data-index-process-variable-", constants.KogitoProcessInstancesEventsPath, targetNamespace, "di-source")).NotTo(HaveOccurred()) Expect(verifyTrigger(triggers, "data-index-process-definition-", constants.KogitoProcessDefinitionsEventsPath, targetNamespace, "di-source")).NotTo(HaveOccurred()) @@ -316,7 +315,6 @@ var _ = Describe("Platform Use Cases :: ", Label("platform"), Ordered, func() { Expect(err).NotTo(HaveOccurred()) Expect(verifyTrigger(triggers, "data-index-process-error-", constants.KogitoProcessInstancesEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred()) Expect(verifyTrigger(triggers, "data-index-process-node-", constants.KogitoProcessInstancesEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred()) - Expect(verifyTrigger(triggers, "data-index-process-sla-", constants.KogitoProcessInstancesEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred()) Expect(verifyTrigger(triggers, "data-index-process-state-", constants.KogitoProcessInstancesEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred()) Expect(verifyTrigger(triggers, "data-index-process-variable-", constants.KogitoProcessInstancesEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred()) Expect(verifyTrigger(triggers, "data-index-process-definition-", constants.KogitoProcessDefinitionsEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred())