Skip to content

Commit

Permalink
incubator-kie-tools-2774: Make the SonataFlow Operator to configure t…
Browse files Browse the repository at this point in the history
…he Jobs related Knative Eventing Triggers to consume the events in order (#2775)
  • Loading branch information
wmedvede authored Nov 27, 2024
1 parent 886fd6e commit 0ca727d
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 39 deletions.
44 changes: 31 additions & 13 deletions packages/sonataflow-operator/internal/controller/knative/knative.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ package knative
import (
"context"
"fmt"
"strings"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"knative.dev/eventing/pkg/apis/eventing"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
clienteventingv1 "knative.dev/eventing/pkg/client/clientset/versioned/typed/eventing/v1"
Expand All @@ -51,14 +53,16 @@ type Availability struct {
}

const (
kSink = "K_SINK"
knativeBundleVolume = "kne-bundle-volume"
kCeOverRides = "K_CE_OVERRIDES"
knativeServingGroup = "serving.knative.dev"
knativeEventingGroup = "eventing.knative.dev"
knativeEventingAPIVersion = "eventing.knative.dev/v1"
knativeBrokerKind = "Broker"
knativeSinkProvided = "SinkProvided"
kSink = "K_SINK"
knativeBundleVolume = "kne-bundle-volume"
kCeOverRides = "K_CE_OVERRIDES"
knativeServingGroup = "serving.knative.dev"
knativeEventingGroup = "eventing.knative.dev"
knativeEventingAPIVersion = "eventing.knative.dev/v1"
knativeBrokerKind = "Broker"
knativeSinkProvided = "SinkProvided"
KafkaKnativeEventingDeliveryOrder = "kafka.eventing.knative.dev/delivery.order"
KafkaKnativeEventingDeliveryOrderOrdered = "ordered"
)

func GetKnativeServingClient(cfg *rest.Config) (clientservingv1.ServingV1Interface, error) {
Expand Down Expand Up @@ -132,19 +136,33 @@ func getDestinationWithNamespace(dest *duckv1.Destination, namespace string) *du
return dest
}

func ValidateBroker(name, namespace string) error {
func ValidateBroker(name, namespace string) (*eventingv1.Broker, error) {
broker := &eventingv1.Broker{}
if err := utils.GetClient().Get(context.TODO(), types.NamespacedName{Name: name, Namespace: namespace}, broker); err != nil {
if errors.IsNotFound(err) {
return fmt.Errorf("broker %s in namespace %s does not exist", name, namespace)
return nil, fmt.Errorf("broker %s in namespace %s does not exist", name, namespace)
}
return err
return nil, err
}
cond := broker.Status.GetCondition(apis.ConditionReady)
if cond != nil && cond.Status == corev1.ConditionTrue {
return nil
return broker, nil
}
return nil, fmt.Errorf("broker %s in namespace %s is not ready", name, namespace)
}

// GetBrokerClass returns the broker class for a Knative Eventing Broker.
func GetBrokerClass(broker *eventingv1.Broker) string {
if broker.Annotations == nil {
return ""
}
return fmt.Errorf("broker %s in namespace %s is not ready", name, namespace)
return broker.Annotations[eventing.BrokerClassKey]
}

// IsKafkaBroker returns true if the class for a Knative Eventing Broker corresponds to a Kafka broker.
func IsKafkaBroker(brokerClass string) bool {
// currently available kafka broker classes are "Kafka", and "KafkaNamespaced", for safety ask for the substring "Kafka".
return strings.Contains(brokerClass, "Kafka")
}

func GetWorkflowSink(workflow *operatorapi.SonataFlow, pl *operatorapi.SonataFlowPlatform) (*duckv1.Destination, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,12 +575,13 @@ func (d *DataIndexHandler) GetSourceBroker() *duckv1.Destination {
return GetPlatformBroker(d.platform)
}

func (d *DataIndexHandler) newTrigger(labels map[string]string, brokerName, namespace, serviceName, tag, eventType, path string, platform *operatorapi.SonataFlowPlatform) *eventingv1.Trigger {
func (d *DataIndexHandler) newTrigger(labels map[string]string, annotations map[string]string, brokerName, namespace, serviceName, tag, eventType, path string, platform *operatorapi.SonataFlowPlatform) *eventingv1.Trigger {
return &eventingv1.Trigger{
ObjectMeta: metav1.ObjectMeta{
Name: kmeta.ChildName(fmt.Sprintf("data-index-%s-", tag), string(platform.GetUID())),
Namespace: namespace,
Labels: labels,
Name: kmeta.ChildName(fmt.Sprintf("data-index-%s-", tag), string(platform.GetUID())),
Namespace: namespace,
Labels: labels,
Annotations: annotations,
},
Spec: eventingv1.TriggerSpec{
Broker: brokerName,
Expand Down Expand Up @@ -613,24 +614,28 @@ func (d *DataIndexHandler) GenerateKnativeResources(platform *operatorapi.Sonata
if len(namespace) == 0 {
namespace = platform.Namespace
}
if err := knative.ValidateBroker(brokerName, namespace); err != nil {
var brokerObject *eventingv1.Broker
var err error
if brokerObject, err = knative.ValidateBroker(brokerName, namespace); err != nil {
event := &corev1.Event{
Type: corev1.EventTypeWarning,
Reason: WaitingKnativeEventing,
Message: fmt.Sprintf("%s for service: %s", err.Error(), d.GetServiceName()),
}
return nil, event, err
}
annotations := make(map[string]string)
managedAnnotations := make(map[string]string)
addTriggerAnnotations(knative.GetBrokerClass(brokerObject), managedAnnotations)
serviceName := d.GetServiceName()
return []client.Object{
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-error", "ProcessInstanceErrorDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-node", "ProcessInstanceNodeDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-sla", "ProcessInstanceSLADataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-state", "ProcessInstanceStateDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-variable", "ProcessInstanceVariableDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-definition", "ProcessDefinitionEvent", constants.KogitoProcessDefinitionsEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-instance-multiple", "MultipleProcessInstanceDataEvent", constants.KogitoProcessInstancesMultiEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "jobs", "JobEvent", constants.KogitoJobsPath, platform)}, nil, nil
d.newTrigger(lbl, annotations, brokerName, namespace, serviceName, "process-error", "ProcessInstanceErrorDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, annotations, brokerName, namespace, serviceName, "process-node", "ProcessInstanceNodeDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, annotations, brokerName, namespace, serviceName, "process-state", "ProcessInstanceStateDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, annotations, brokerName, namespace, serviceName, "process-variable", "ProcessInstanceVariableDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, annotations, brokerName, namespace, serviceName, "process-definition", "ProcessDefinitionEvent", constants.KogitoProcessDefinitionsEventsPath, platform),
d.newTrigger(lbl, annotations, brokerName, namespace, serviceName, "process-instance-multiple", "MultipleProcessInstanceDataEvent", constants.KogitoProcessInstancesMultiEventsPath, platform),
d.newTrigger(lbl, managedAnnotations, brokerName, namespace, serviceName, "jobs", "JobEvent", constants.KogitoJobsPath, platform)}, nil, nil
}

func (d JobServiceHandler) GetSourceBroker() *duckv1.Destination {
Expand All @@ -640,6 +645,12 @@ func (d JobServiceHandler) GetSourceBroker() *duckv1.Destination {
return GetPlatformBroker(d.platform)
}

func addTriggerAnnotations(brokerClass string, annotations map[string]string) {
if knative.IsKafkaBroker(brokerClass) {
annotations[knative.KafkaKnativeEventingDeliveryOrder] = knative.KafkaKnativeEventingDeliveryOrderOrdered
}
}

func (d JobServiceHandler) GetSink() *duckv1.Destination {
if d.platform.Spec.Services.JobService.Sink != nil {
return d.platform.Spec.Services.JobService.Sink
Expand All @@ -658,19 +669,24 @@ func (j *JobServiceHandler) GenerateKnativeResources(platform *operatorapi.Sonat
if len(namespace) == 0 {
namespace = platform.Namespace
}
if err := knative.ValidateBroker(brokerName, namespace); err != nil {
var brokerObject *eventingv1.Broker
var err error
if brokerObject, err = knative.ValidateBroker(brokerName, namespace); err != nil {
event := &corev1.Event{
Type: corev1.EventTypeWarning,
Reason: WaitingKnativeEventing,
Message: fmt.Sprintf("%s for service: %s", err.Error(), j.GetServiceName()),
}
return nil, event, err
}
annotations := make(map[string]string)
addTriggerAnnotations(knative.GetBrokerClass(brokerObject), annotations)
jobCreateTrigger := &eventingv1.Trigger{
ObjectMeta: metav1.ObjectMeta{
Name: kmeta.ChildName("jobs-service-create-job-", string(platform.GetUID())),
Namespace: namespace,
Labels: lbl,
Name: kmeta.ChildName("jobs-service-create-job-", string(platform.GetUID())),
Namespace: namespace,
Labels: lbl,
Annotations: annotations,
},
Spec: eventingv1.TriggerSpec{
Broker: brokerName,
Expand All @@ -695,9 +711,10 @@ func (j *JobServiceHandler) GenerateKnativeResources(platform *operatorapi.Sonat
resultObjs = append(resultObjs, jobCreateTrigger)
jobDeleteTrigger := &eventingv1.Trigger{
ObjectMeta: metav1.ObjectMeta{
Name: kmeta.ChildName("jobs-service-delete-job-", string(platform.GetUID())),
Namespace: namespace,
Labels: lbl,
Name: kmeta.ChildName("jobs-service-delete-job-", string(platform.GetUID())),
Namespace: namespace,
Labels: lbl,
Annotations: annotations,
},
Spec: eventingv1.TriggerSpec{
Broker: brokerName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func TriggersCreator(workflow *operatorapi.SonataFlow, plf *operatorapi.SonataFl
// No broker configured for the eventType. Skip and will not create trigger for it.
continue
}
if err := knative.ValidateBroker(brokerRef.Name, brokerRef.Namespace); err != nil {
if _, err := knative.ValidateBroker(brokerRef.Name, brokerRef.Namespace); err != nil {
return nil, err
}
// construct eventingv1.Trigger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,6 @@ func TestSonataFlowPlatformController(t *testing.T) {
validateTrigger(t, cl, "data-index-process-definition-", ksp.Namespace, ksp, trigger)
validateTrigger(t, cl, "data-index-process-error-", ksp.Namespace, ksp, trigger)
validateTrigger(t, cl, "data-index-process-node-", ksp.Namespace, ksp, trigger)
validateTrigger(t, cl, "data-index-process-sla-", ksp.Namespace, ksp, trigger)
validateTrigger(t, cl, "data-index-process-state-", ksp.Namespace, ksp, trigger)
validateTrigger(t, cl, "data-index-process-variable-", ksp.Namespace, ksp, trigger)

Expand Down Expand Up @@ -1034,8 +1033,6 @@ func TestSonataFlowPlatformController(t *testing.T) {
assert.Equal(t, trigger.Spec.Broker, brokerNameDataIndexSource)
validateTrigger(t, cl, "data-index-process-node-", ksp.Namespace, ksp, trigger)
assert.Equal(t, trigger.Spec.Broker, brokerNameDataIndexSource)
validateTrigger(t, cl, "data-index-process-sla-", ksp.Namespace, ksp, trigger)
assert.Equal(t, trigger.Spec.Broker, brokerNameDataIndexSource)
validateTrigger(t, cl, "data-index-process-state-", ksp.Namespace, ksp, trigger)
assert.Equal(t, trigger.Spec.Broker, brokerNameDataIndexSource)
validateTrigger(t, cl, "data-index-process-variable-", ksp.Namespace, ksp, trigger)
Expand Down
2 changes: 0 additions & 2 deletions packages/sonataflow-operator/test/e2e/platform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,6 @@ var _ = Describe("Platform Use Cases :: ", Label("platform"), Ordered, func() {
Expect(err).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-error-", constants.KogitoProcessInstancesEventsPath, targetNamespace, "di-source")).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-node-", constants.KogitoProcessInstancesEventsPath, targetNamespace, "di-source")).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-sla-", constants.KogitoProcessInstancesEventsPath, targetNamespace, "di-source")).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-state-", constants.KogitoProcessInstancesEventsPath, targetNamespace, "di-source")).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-variable-", constants.KogitoProcessInstancesEventsPath, targetNamespace, "di-source")).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-definition-", constants.KogitoProcessDefinitionsEventsPath, targetNamespace, "di-source")).NotTo(HaveOccurred())
Expand Down Expand Up @@ -316,7 +315,6 @@ var _ = Describe("Platform Use Cases :: ", Label("platform"), Ordered, func() {
Expect(err).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-error-", constants.KogitoProcessInstancesEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-node-", constants.KogitoProcessInstancesEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-sla-", constants.KogitoProcessInstancesEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-state-", constants.KogitoProcessInstancesEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-variable-", constants.KogitoProcessInstancesEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-definition-", constants.KogitoProcessDefinitionsEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred())
Expand Down

0 comments on commit 0ca727d

Please sign in to comment.