Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[9.102.x-prod] SRVLOGIC-472: Job service job status change events are not sent in order #112

Merged
merged 1 commit into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 31 additions & 13 deletions internal/controller/knative/knative.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package knative
import (
"context"
"fmt"
"strings"

operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/apache/incubator-kie-kogito-serverless-operator/utils"
Expand All @@ -31,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"knative.dev/eventing/pkg/apis/eventing"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
clienteventingv1 "knative.dev/eventing/pkg/client/clientset/versioned/typed/eventing/v1"
Expand All @@ -50,14 +52,16 @@ type Availability struct {
}

const (
kSink = "K_SINK"
knativeBundleVolume = "kne-bundle-volume"
kCeOverRides = "K_CE_OVERRIDES"
knativeServingGroup = "serving.knative.dev"
knativeEventingGroup = "eventing.knative.dev"
knativeEventingAPIVersion = "eventing.knative.dev/v1"
knativeBrokerKind = "Broker"
knativeSinkProvided = "SinkProvided"
kSink = "K_SINK"
knativeBundleVolume = "kne-bundle-volume"
kCeOverRides = "K_CE_OVERRIDES"
knativeServingGroup = "serving.knative.dev"
knativeEventingGroup = "eventing.knative.dev"
knativeEventingAPIVersion = "eventing.knative.dev/v1"
knativeBrokerKind = "Broker"
knativeSinkProvided = "SinkProvided"
KafkaKnativeEventingDeliveryOrder = "kafka.eventing.knative.dev/delivery.order"
KafkaKnativeEventingDeliveryOrderOrdered = "ordered"
)

func GetKnativeServingClient(cfg *rest.Config) (clientservingv1.ServingV1Interface, error) {
Expand Down Expand Up @@ -131,19 +135,33 @@ func getDestinationWithNamespace(dest *duckv1.Destination, namespace string) *du
return dest
}

func ValidateBroker(name, namespace string) error {
func ValidateBroker(name, namespace string) (*eventingv1.Broker, error) {
broker := &eventingv1.Broker{}
if err := utils.GetClient().Get(context.TODO(), types.NamespacedName{Name: name, Namespace: namespace}, broker); err != nil {
if errors.IsNotFound(err) {
return fmt.Errorf("broker %s in namespace %s does not exist", name, namespace)
return nil, fmt.Errorf("broker %s in namespace %s does not exist", name, namespace)
}
return err
return nil, err
}
cond := broker.Status.GetCondition(apis.ConditionReady)
if cond != nil && cond.Status == corev1.ConditionTrue {
return nil
return broker, nil
}
return nil, fmt.Errorf("broker %s in namespace %s is not ready", name, namespace)
}

// GetBrokerClass returns the broker class for a Knative Eventing Broker.
func GetBrokerClass(broker *eventingv1.Broker) string {
if broker.Annotations == nil {
return ""
}
return fmt.Errorf("broker %s in namespace %s is not ready", name, namespace)
return broker.Annotations[eventing.BrokerClassKey]
}

// IsKafkaBroker returns true if the class for a Knative Eventing Broker corresponds to a Kafka broker.
func IsKafkaBroker(brokerClass string) bool {
// currently available kafka broker classes are "Kafka", and "KafkaNamespaced", for safety ask for the substring "Kafka".
return strings.Contains(brokerClass, "Kafka")
}

func GetWorkflowSink(workflow *operatorapi.SonataFlow, pl *operatorapi.SonataFlowPlatform) (*duckv1.Destination, error) {
Expand Down
57 changes: 37 additions & 20 deletions internal/controller/platform/services/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,12 +573,13 @@ func (d *DataIndexHandler) GetSourceBroker() *duckv1.Destination {
return GetPlatformBroker(d.platform)
}

func (d *DataIndexHandler) newTrigger(labels map[string]string, brokerName, namespace, serviceName, tag, eventType, path string, platform *operatorapi.SonataFlowPlatform) *eventingv1.Trigger {
func (d *DataIndexHandler) newTrigger(labels map[string]string, annotations map[string]string, brokerName, namespace, serviceName, tag, eventType, path string, platform *operatorapi.SonataFlowPlatform) *eventingv1.Trigger {
return &eventingv1.Trigger{
ObjectMeta: metav1.ObjectMeta{
Name: kmeta.ChildName(fmt.Sprintf("data-index-%s-", tag), string(platform.GetUID())),
Namespace: namespace,
Labels: labels,
Name: kmeta.ChildName(fmt.Sprintf("data-index-%s-", tag), string(platform.GetUID())),
Namespace: namespace,
Labels: labels,
Annotations: annotations,
},
Spec: eventingv1.TriggerSpec{
Broker: brokerName,
Expand Down Expand Up @@ -611,24 +612,28 @@ func (d *DataIndexHandler) GenerateKnativeResources(platform *operatorapi.Sonata
if len(namespace) == 0 {
namespace = platform.Namespace
}
if err := knative.ValidateBroker(brokerName, namespace); err != nil {
var brokerObject *eventingv1.Broker
var err error
if brokerObject, err = knative.ValidateBroker(brokerName, namespace); err != nil {
event := &corev1.Event{
Type: corev1.EventTypeWarning,
Reason: WaitingKnativeEventing,
Message: fmt.Sprintf("%s for service: %s", err.Error(), d.GetServiceName()),
}
return nil, event, err
}
annotations := make(map[string]string)
managedAnnotations := make(map[string]string)
addTriggerAnnotations(knative.GetBrokerClass(brokerObject), managedAnnotations)
serviceName := d.GetServiceName()
return []client.Object{
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-error", "ProcessInstanceErrorDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-node", "ProcessInstanceNodeDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-sla", "ProcessInstanceSLADataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-state", "ProcessInstanceStateDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-variable", "ProcessInstanceVariableDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-definition", "ProcessDefinitionEvent", constants.KogitoProcessDefinitionsEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-instance-multiple", "MultipleProcessInstanceDataEvent", constants.KogitoProcessInstancesMultiEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "jobs", "JobEvent", constants.KogitoJobsPath, platform)}, nil, nil
d.newTrigger(lbl, annotations, brokerName, namespace, serviceName, "process-error", "ProcessInstanceErrorDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, annotations, brokerName, namespace, serviceName, "process-node", "ProcessInstanceNodeDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, annotations, brokerName, namespace, serviceName, "process-state", "ProcessInstanceStateDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, annotations, brokerName, namespace, serviceName, "process-variable", "ProcessInstanceVariableDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, annotations, brokerName, namespace, serviceName, "process-definition", "ProcessDefinitionEvent", constants.KogitoProcessDefinitionsEventsPath, platform),
d.newTrigger(lbl, annotations, brokerName, namespace, serviceName, "process-instance-multiple", "MultipleProcessInstanceDataEvent", constants.KogitoProcessInstancesMultiEventsPath, platform),
d.newTrigger(lbl, managedAnnotations, brokerName, namespace, serviceName, "jobs", "JobEvent", constants.KogitoJobsPath, platform)}, nil, nil
}

func (d JobServiceHandler) GetSourceBroker() *duckv1.Destination {
Expand All @@ -638,6 +643,12 @@ func (d JobServiceHandler) GetSourceBroker() *duckv1.Destination {
return GetPlatformBroker(d.platform)
}

func addTriggerAnnotations(brokerClass string, annotations map[string]string) {
if knative.IsKafkaBroker(brokerClass) {
annotations[knative.KafkaKnativeEventingDeliveryOrder] = knative.KafkaKnativeEventingDeliveryOrderOrdered
}
}

func (d JobServiceHandler) GetSink() *duckv1.Destination {
if d.platform.Spec.Services.JobService.Sink != nil {
return d.platform.Spec.Services.JobService.Sink
Expand All @@ -656,19 +667,24 @@ func (j *JobServiceHandler) GenerateKnativeResources(platform *operatorapi.Sonat
if len(namespace) == 0 {
namespace = platform.Namespace
}
if err := knative.ValidateBroker(brokerName, namespace); err != nil {
var brokerObject *eventingv1.Broker
var err error
if brokerObject, err = knative.ValidateBroker(brokerName, namespace); err != nil {
event := &corev1.Event{
Type: corev1.EventTypeWarning,
Reason: WaitingKnativeEventing,
Message: fmt.Sprintf("%s for service: %s", err.Error(), j.GetServiceName()),
}
return nil, event, err
}
annotations := make(map[string]string)
addTriggerAnnotations(knative.GetBrokerClass(brokerObject), annotations)
jobCreateTrigger := &eventingv1.Trigger{
ObjectMeta: metav1.ObjectMeta{
Name: kmeta.ChildName("jobs-service-create-job-", string(platform.GetUID())),
Namespace: namespace,
Labels: lbl,
Name: kmeta.ChildName("jobs-service-create-job-", string(platform.GetUID())),
Namespace: namespace,
Labels: lbl,
Annotations: annotations,
},
Spec: eventingv1.TriggerSpec{
Broker: brokerName,
Expand All @@ -693,9 +709,10 @@ func (j *JobServiceHandler) GenerateKnativeResources(platform *operatorapi.Sonat
resultObjs = append(resultObjs, jobCreateTrigger)
jobDeleteTrigger := &eventingv1.Trigger{
ObjectMeta: metav1.ObjectMeta{
Name: kmeta.ChildName("jobs-service-delete-job-", string(platform.GetUID())),
Namespace: namespace,
Labels: lbl,
Name: kmeta.ChildName("jobs-service-delete-job-", string(platform.GetUID())),
Namespace: namespace,
Labels: lbl,
Annotations: annotations,
},
Spec: eventingv1.TriggerSpec{
Broker: brokerName,
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/profiles/common/object_creators.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func TriggersCreator(workflow *operatorapi.SonataFlow, plf *operatorapi.SonataFl
// No broker configured for the eventType. Skip and will not create trigger for it.
continue
}
if err := knative.ValidateBroker(brokerRef.Name, brokerRef.Namespace); err != nil {
if _, err := knative.ValidateBroker(brokerRef.Name, brokerRef.Namespace); err != nil {
return nil, err
}
// construct eventingv1.Trigger
Expand Down
3 changes: 0 additions & 3 deletions internal/controller/sonataflowplatform_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -919,7 +919,6 @@ func TestSonataFlowPlatformController(t *testing.T) {
validateTrigger(t, cl, "data-index-process-definition-", ksp.Namespace, ksp, trigger)
validateTrigger(t, cl, "data-index-process-error-", ksp.Namespace, ksp, trigger)
validateTrigger(t, cl, "data-index-process-node-", ksp.Namespace, ksp, trigger)
validateTrigger(t, cl, "data-index-process-sla-", ksp.Namespace, ksp, trigger)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ricardozanini Btw, the "data-index-process-sla-" was basically removed because the workflows don't produce this event, so we have basically one lees k8s object to create/maintain, etc.

validateTrigger(t, cl, "data-index-process-state-", ksp.Namespace, ksp, trigger)
validateTrigger(t, cl, "data-index-process-variable-", ksp.Namespace, ksp, trigger)

Expand Down Expand Up @@ -1033,8 +1032,6 @@ func TestSonataFlowPlatformController(t *testing.T) {
assert.Equal(t, trigger.Spec.Broker, brokerNameDataIndexSource)
validateTrigger(t, cl, "data-index-process-node-", ksp.Namespace, ksp, trigger)
assert.Equal(t, trigger.Spec.Broker, brokerNameDataIndexSource)
validateTrigger(t, cl, "data-index-process-sla-", ksp.Namespace, ksp, trigger)
assert.Equal(t, trigger.Spec.Broker, brokerNameDataIndexSource)
validateTrigger(t, cl, "data-index-process-state-", ksp.Namespace, ksp, trigger)
assert.Equal(t, trigger.Spec.Broker, brokerNameDataIndexSource)
validateTrigger(t, cl, "data-index-process-variable-", ksp.Namespace, ksp, trigger)
Expand Down
2 changes: 0 additions & 2 deletions test/e2e/platform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,6 @@ var _ = Describe("Platform Use Cases :: ", Label("platform"), Ordered, func() {
Expect(err).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-error-", constants.KogitoProcessInstancesEventsPath, targetNamespace, "di-source")).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-node-", constants.KogitoProcessInstancesEventsPath, targetNamespace, "di-source")).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-sla-", constants.KogitoProcessInstancesEventsPath, targetNamespace, "di-source")).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-state-", constants.KogitoProcessInstancesEventsPath, targetNamespace, "di-source")).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-variable-", constants.KogitoProcessInstancesEventsPath, targetNamespace, "di-source")).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-definition-", constants.KogitoProcessDefinitionsEventsPath, targetNamespace, "di-source")).NotTo(HaveOccurred())
Expand Down Expand Up @@ -316,7 +315,6 @@ var _ = Describe("Platform Use Cases :: ", Label("platform"), Ordered, func() {
Expect(err).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-error-", constants.KogitoProcessInstancesEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-node-", constants.KogitoProcessInstancesEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-sla-", constants.KogitoProcessInstancesEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-state-", constants.KogitoProcessInstancesEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-variable-", constants.KogitoProcessInstancesEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-definition-", constants.KogitoProcessDefinitionsEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred())
Expand Down