From 5fe2b0a21aead60f84ecddec224dbbb3a6d62d07 Mon Sep 17 00:00:00 2001 From: grac3gao <52978759+grac3gao@users.noreply.github.com> Date: Tue, 29 Oct 2019 11:35:11 -0700 Subject: [PATCH] Enable eventing when a Trigger is created for first time (#2034) * Enable eventing when a Trigger is created for first time * Enable eventing when a Trigger is created for first time * Code change * code change * code change * code change after review * code change after review * code change after review * code change after review --- pkg/apis/eventing/v1alpha1/trigger_types.go | 3 + .../eventing/v1alpha1/trigger_validation.go | 31 +++++++-- .../v1alpha1/trigger_validation_test.go | 41 ++++++++++++ pkg/reconciler/testing/trigger.go | 9 +++ pkg/reconciler/trigger/controller.go | 3 + pkg/reconciler/trigger/controller_test.go | 4 +- pkg/reconciler/trigger/trigger.go | 26 ++++++++ pkg/reconciler/trigger/trigger_test.go | 66 ++++++++++++++++++- 8 files changed, 175 insertions(+), 8 deletions(-) diff --git a/pkg/apis/eventing/v1alpha1/trigger_types.go b/pkg/apis/eventing/v1alpha1/trigger_types.go index 806a1aca407..97669f601ec 100644 --- a/pkg/apis/eventing/v1alpha1/trigger_types.go +++ b/pkg/apis/eventing/v1alpha1/trigger_types.go @@ -31,6 +31,9 @@ const ( // DependencyAnnotation is the annotation key used to mark the sources that the Trigger depends on. // This will be used when the kn client creates an importer and trigger pair for the user such that the trigger only receives events produced by the paired importer. DependencyAnnotation = "knative.dev/dependency" + // InjectionAnnotation is the annotation key used to enable knative eventing injection for a namespace and automatically create a default broker. + // This will be used when the client creates a trigger paired with default broker and the default broker doesn't exist in the namespace + InjectionAnnotation = "knative-eventing-injection" ) // +genclient diff --git a/pkg/apis/eventing/v1alpha1/trigger_validation.go b/pkg/apis/eventing/v1alpha1/trigger_validation.go index 73096cd3eaa..38ec5433409 100644 --- a/pkg/apis/eventing/v1alpha1/trigger_validation.go +++ b/pkg/apis/eventing/v1alpha1/trigger_validation.go @@ -36,11 +36,8 @@ var ( // Validate the Trigger. func (t *Trigger) Validate(ctx context.Context) *apis.FieldError { errs := t.Spec.Validate(ctx).ViaField("spec") - dependencyAnnotation, ok := t.GetAnnotations()[DependencyAnnotation] - if ok { - dependencyAnnotationPrefix := fmt.Sprintf("metadata.annotations[%s]", DependencyAnnotation) - errs = errs.Also(t.validateDependencyAnnotation(dependencyAnnotation).ViaField(dependencyAnnotationPrefix)) - } + errs = t.validateAnnotation(errs, DependencyAnnotation, t.validateDependencyAnnotation) + errs = t.validateAnnotation(errs, InjectionAnnotation, t.validateInjectionAnnotation) return errs } @@ -137,6 +134,14 @@ func GetObjRefFromDependencyAnnotation(dependencyAnnotation string) (corev1.Obje return objectRef, nil } +func (t *Trigger) validateAnnotation(errs *apis.FieldError, annotation string, function func(string) *apis.FieldError) *apis.FieldError { + if annotationValue, ok := t.GetAnnotations()[annotation]; ok { + annotationPrefix := fmt.Sprintf("metadata.annotations[%s]", annotation) + errs = errs.Also(function(annotationValue).ViaField(annotationPrefix)) + } + return errs +} + func (t *Trigger) validateDependencyAnnotation(dependencyAnnotation string) *apis.FieldError { depObjRef, err := GetObjRefFromDependencyAnnotation(dependencyAnnotation) if err != nil { @@ -168,3 +173,19 @@ func (t *Trigger) validateDependencyAnnotation(dependencyAnnotation string) *api } return errs } + +func (t *Trigger) validateInjectionAnnotation(injectionAnnotation string) *apis.FieldError { + if injectionAnnotation != "enabled" { + return &apis.FieldError{ + Message: fmt.Sprintf(`The provided injection annotation value can only be "enabled", not %q`, injectionAnnotation), + Paths: []string{""}, + } + } + if t.Spec.Broker != "default" { + return &apis.FieldError{ + Message: fmt.Sprintf("The provided injection annotation is only used for default broker, but non-default broker specified here: %q", t.Spec.Broker), + Paths: []string{""}, + } + } + return nil +} diff --git a/pkg/apis/eventing/v1alpha1/trigger_validation_test.go b/pkg/apis/eventing/v1alpha1/trigger_validation_test.go index 6cb01bbc885..a99d4df7755 100644 --- a/pkg/apis/eventing/v1alpha1/trigger_validation_test.go +++ b/pkg/apis/eventing/v1alpha1/trigger_validation_test.go @@ -61,9 +61,14 @@ var ( DeprecatedName: "subscriber_test", DeprecatedNamespace: "test_ns", } + // Dependency annotation validDependencyAnnotation = "{\"kind\":\"CronJobSource\",\"name\":\"test-cronjob-source\",\"apiVersion\":\"sources.eventing.knative.dev/v1alpha1\"}" invalidDependencyAnnotation = "invalid dependency annotation" dependencyAnnotationPath = fmt.Sprintf("metadata.annotations[%s]", DependencyAnnotation) + // Create default broker annotation + validInjectionAnnotation = "enabled" + invalidInjectionAnnotation = "disabled" + injectionAnnotationPath = fmt.Sprintf("metadata.annotations[%s]", InjectionAnnotation) ) func TestTriggerValidation(t *testing.T) { @@ -203,6 +208,42 @@ func TestTriggerValidation(t *testing.T) { Message: "missing field(s)", }, }, + { + name: "invalid injection annotation value", + t: &Trigger{ + ObjectMeta: v1.ObjectMeta{ + Namespace: "test-ns", + Annotations: map[string]string{ + InjectionAnnotation: invalidInjectionAnnotation, + }}, + Spec: TriggerSpec{ + Broker: "default", + Filter: validEmptyFilter, + Subscriber: validSubscriber, + }}, + want: &apis.FieldError{ + Paths: []string{injectionAnnotationPath}, + Message: "The provided injection annotation value can only be \"enabled\", not \"disabled\"", + }, + }, + { + name: "valid injection annotation value, non-default broker specified", + t: &Trigger{ + ObjectMeta: v1.ObjectMeta{ + Namespace: "test-ns", + Annotations: map[string]string{ + InjectionAnnotation: validInjectionAnnotation, + }}, + Spec: TriggerSpec{ + Broker: "test-broker", + Filter: validEmptyFilter, + Subscriber: validSubscriber, + }}, + want: &apis.FieldError{ + Paths: []string{injectionAnnotationPath}, + Message: "The provided injection annotation is only used for default broker, but non-default broker specified here: \"test-broker\"", + }, + }, } for _, test := range tests { diff --git a/pkg/reconciler/testing/trigger.go b/pkg/reconciler/testing/trigger.go index 5e0e56b49d7..8b130ae689a 100644 --- a/pkg/reconciler/testing/trigger.go +++ b/pkg/reconciler/testing/trigger.go @@ -137,6 +137,15 @@ func WithUnmarshalFailedDependencyAnnotation() TriggerOption { } } +func WithInjectionAnnotation(injectionAnnotation string) TriggerOption { + return func(t *v1alpha1.Trigger) { + if t.Annotations == nil { + t.Annotations = make(map[string]string) + } + t.Annotations[v1alpha1.InjectionAnnotation] = injectionAnnotation + } +} + func WithDependencyAnnotation(dependencyAnnotation string) TriggerOption { return func(t *v1alpha1.Trigger) { if t.Annotations == nil { diff --git a/pkg/reconciler/trigger/controller.go b/pkg/reconciler/trigger/controller.go index 11d466b15d8..e8f7307e96f 100644 --- a/pkg/reconciler/trigger/controller.go +++ b/pkg/reconciler/trigger/controller.go @@ -20,6 +20,7 @@ import ( "context" "k8s.io/client-go/tools/cache" + "knative.dev/pkg/client/injection/kube/informers/core/v1/namespace" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" "knative.dev/pkg/resolver" @@ -56,6 +57,7 @@ func NewController( subscriptionInformer := subscription.Get(ctx) brokerInformer := broker.Get(ctx) serviceInformer := service.Get(ctx) + namespaceInformer := namespace.Get(ctx) r := &Reconciler{ Base: reconciler.NewBase(ctx, controllerAgentName, cmw), @@ -63,6 +65,7 @@ func NewController( subscriptionLister: subscriptionInformer.Lister(), brokerLister: brokerInformer.Lister(), serviceLister: serviceInformer.Lister(), + namespaceLister: namespaceInformer.Lister(), } impl := controller.NewImpl(r, r.Logger, ReconcilerName) diff --git a/pkg/reconciler/trigger/controller_test.go b/pkg/reconciler/trigger/controller_test.go index 0a50a52cae4..39b8c3cc58b 100644 --- a/pkg/reconciler/trigger/controller_test.go +++ b/pkg/reconciler/trigger/controller_test.go @@ -22,12 +22,12 @@ import ( "knative.dev/pkg/configmap" . "knative.dev/pkg/reconciler/testing" - _ "knative.dev/pkg/client/injection/kube/informers/core/v1/service/fake" - // Fake injection informers _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/broker/fake" _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/trigger/fake" _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1alpha1/subscription/fake" + _ "knative.dev/pkg/client/injection/kube/informers/core/v1/namespace/fake" + _ "knative.dev/pkg/client/injection/kube/informers/core/v1/service/fake" ) func TestNew(t *testing.T) { diff --git a/pkg/reconciler/trigger/trigger.go b/pkg/reconciler/trigger/trigger.go index 8813dfd0a1d..04ee9296ed6 100644 --- a/pkg/reconciler/trigger/trigger.go +++ b/pkg/reconciler/trigger/trigger.go @@ -69,6 +69,7 @@ type Reconciler struct { subscriptionLister messaginglisters.SubscriptionLister brokerLister listers.BrokerLister serviceLister corev1listers.ServiceLister + namespaceLister corev1listers.NamespaceLister // Regular tracker to track static resources. In particular, it tracks Broker's changes. tracker tracker.Interface // Dynamic tracker to track KResources. In particular, it tracks the dependency between Triggers and Sources. @@ -164,6 +165,12 @@ func (r *Reconciler) reconcile(ctx context.Context, t *v1alpha1.Trigger) error { logging.FromContext(ctx).Error("Unable to get the Broker", zap.Error(err)) if apierrs.IsNotFound(err) { t.Status.MarkBrokerFailed("DoesNotExist", "Broker does not exist") + _, needDefaultBroker := t.GetAnnotations()[v1alpha1.InjectionAnnotation] + if t.Spec.Broker == "default" && needDefaultBroker { + if e := r.labelNamespace(ctx, t); e != nil { + logging.FromContext(ctx).Error("Unable to label the namespace", zap.Error(e)) + } + } } else { t.Status.MarkBrokerFailed("BrokerGetFailed", "Failed to get broker") } @@ -312,6 +319,25 @@ func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.Trigger return trig, err } +// labelNamespace will label namespace with knative-eventing-injection=enabled +func (r *Reconciler) labelNamespace(ctx context.Context, t *v1alpha1.Trigger) error { + current, err := r.namespaceLister.Get(t.Namespace) + if err != nil { + t.Status.MarkBrokerFailed("NamespaceGetFailed", "Failed to get namespace resource to enable knative-eventing-injection") + return err + } + current = current.DeepCopy() + if current.Labels == nil { + current.Labels = map[string]string{} + } + current.Labels["knative-eventing-injection"] = "enabled" + if _, err = r.KubeClientSet.CoreV1().Namespaces().Update(current); err != nil { + t.Status.MarkBrokerFailed("NamespaceUpdateFailed", "Failed to label the namespace resource with knative-eventing-injection") + return err + } + return nil +} + // getBrokerFilterService returns the K8s service for trigger 't' if exists, // otherwise it returns an error. func (r *Reconciler) getBrokerFilterService(ctx context.Context, b *v1alpha1.Broker) (*corev1.Service, error) { diff --git a/pkg/reconciler/trigger/trigger_test.go b/pkg/reconciler/trigger/trigger_test.go index a3f175fb6f1..82084b52b22 100644 --- a/pkg/reconciler/trigger/trigger_test.go +++ b/pkg/reconciler/trigger/trigger_test.go @@ -112,6 +112,8 @@ const ( testData = "data" sinkName = "testsink" + injectionAnnotation = "enabled" + currentGeneration = 1 outdatedGeneration = 0 ) @@ -151,7 +153,7 @@ func TestAllCases(t *testing.T) { // Eventf(corev1.EventTypeWarning, "ChannelReferenceFetchFailed", "Failed to validate spec.channel exists: s \"\" not found"), // }, }, { - Name: "Broker not found", + Name: "Non-default broker not found", Key: triggerKey, Objects: []runtime.Object{ reconciletesting.NewTrigger(triggerName, testNS, brokerName, @@ -171,6 +173,61 @@ func TestAllCases(t *testing.T) { reconciletesting.WithTriggerBrokerFailed("DoesNotExist", "Broker does not exist"), ), }}, + }, { + Name: "Default broker not found, with injection annotation enabled", + Key: triggerKey, + Objects: []runtime.Object{ + reconciletesting.NewTrigger(triggerName, testNS, "default", + reconciletesting.WithTriggerUID(triggerUID), + reconciletesting.WithTriggerSubscriberURI(subscriberURI), + reconciletesting.WithInitTriggerConditions, + reconciletesting.WithInjectionAnnotation(injectionAnnotation)), + reconciletesting.NewNamespace(testNS, + reconciletesting.WithNamespaceLabeled(map[string]string{})), + }, + WantErr: true, + WantEvents: []string{ + Eventf(corev1.EventTypeWarning, "TriggerReconcileFailed", "Trigger reconciliation failed: broker.eventing.knative.dev \"default\" not found"), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconciletesting.NewTrigger(triggerName, testNS, "default", + reconciletesting.WithTriggerUID(triggerUID), + reconciletesting.WithTriggerSubscriberURI(subscriberURI), + reconciletesting.WithInitTriggerConditions, + reconciletesting.WithInjectionAnnotation(injectionAnnotation), + reconciletesting.WithTriggerBrokerFailed("DoesNotExist", "Broker does not exist"), + ), + }}, + WantUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconciletesting.NewNamespace(testNS, + reconciletesting.WithNamespaceLabeled(map[string]string{v1alpha1.InjectionAnnotation: injectionAnnotation})), + }}, + }, { + Name: "Default broker found, with injection annotation enabled", + Key: triggerKey, + Objects: []runtime.Object{ + makeReadyDefaultBroker(), + reconciletesting.NewTrigger(triggerName, testNS, "default", + reconciletesting.WithTriggerUID(triggerUID), + reconciletesting.WithTriggerSubscriberURI(subscriberURI), + reconciletesting.WithInitTriggerConditions, + reconciletesting.WithInjectionAnnotation(injectionAnnotation)), + }, + WantErr: true, + WantEvents: []string{ + // Only check if default broker is ready (not check other resources), so failed at the next step, check for filter service + Eventf(corev1.EventTypeWarning, "TriggerServiceFailed", "Broker's Filter service not found"), + Eventf(corev1.EventTypeWarning, "TriggerReconcileFailed", "Trigger reconciliation failed: failed to find Broker's Filter service"), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconciletesting.NewTrigger(triggerName, testNS, "default", + reconciletesting.WithTriggerUID(triggerUID), + reconciletesting.WithTriggerSubscriberURI(subscriberURI), + reconciletesting.WithInitTriggerConditions, + reconciletesting.WithTriggerBrokerReady(), + reconciletesting.WithInjectionAnnotation(injectionAnnotation), + ), + }}, }, { Name: "Broker get failure", Key: triggerKey, @@ -797,6 +854,7 @@ func TestAllCases(t *testing.T) { subscriptionLister: listers.GetSubscriptionLister(), brokerLister: listers.GetBrokerLister(), serviceLister: listers.GetK8sServiceLister(), + namespaceLister: listers.GetNamespaceLister(), tracker: tracker.New(func(types.NamespacedName) {}, 0), addressableTracker: duck.NewListableTracker(ctx, &duckv1alpha1.AddressableType{}, func(types.NamespacedName) {}, 0), kresourceTracker: duck.NewListableTracker(ctx, &duckv1alpha1.KResource{}, func(types.NamespacedName) {}, 0), @@ -873,6 +931,12 @@ func makeReadyBroker() *v1alpha1.Broker { return b } +func makeReadyDefaultBroker() *v1alpha1.Broker { + b := makeReadyBroker() + b.Name = "default" + return b +} + func makeTriggerChannelRef() *corev1.ObjectReference { return &corev1.ObjectReference{ APIVersion: "eventing.knative.dev/v1alpha1",