Skip to content

Commit

Permalink
Enable eventing when a Trigger is created for first time (#2034)
Browse files Browse the repository at this point in the history
* Enable eventing when a Trigger is created for first time

* Enable eventing when a Trigger is created for first time

* Code change

* code change

* code change

* code change after review

* code change after review

* code change after review

* code change after review
  • Loading branch information
grac3gao-zz authored and knative-prow-robot committed Oct 29, 2019
1 parent cb91d30 commit 5fe2b0a
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 8 deletions.
3 changes: 3 additions & 0 deletions pkg/apis/eventing/v1alpha1/trigger_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ const (
// DependencyAnnotation is the annotation key used to mark the sources that the Trigger depends on.
// This will be used when the kn client creates an importer and trigger pair for the user such that the trigger only receives events produced by the paired importer.
DependencyAnnotation = "knative.dev/dependency"
// InjectionAnnotation is the annotation key used to enable knative eventing injection for a namespace and automatically create a default broker.
// This will be used when the client creates a trigger paired with default broker and the default broker doesn't exist in the namespace
InjectionAnnotation = "knative-eventing-injection"
)

// +genclient
Expand Down
31 changes: 26 additions & 5 deletions pkg/apis/eventing/v1alpha1/trigger_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,8 @@ var (
// Validate the Trigger.
func (t *Trigger) Validate(ctx context.Context) *apis.FieldError {
errs := t.Spec.Validate(ctx).ViaField("spec")
dependencyAnnotation, ok := t.GetAnnotations()[DependencyAnnotation]
if ok {
dependencyAnnotationPrefix := fmt.Sprintf("metadata.annotations[%s]", DependencyAnnotation)
errs = errs.Also(t.validateDependencyAnnotation(dependencyAnnotation).ViaField(dependencyAnnotationPrefix))
}
errs = t.validateAnnotation(errs, DependencyAnnotation, t.validateDependencyAnnotation)
errs = t.validateAnnotation(errs, InjectionAnnotation, t.validateInjectionAnnotation)
return errs
}

Expand Down Expand Up @@ -137,6 +134,14 @@ func GetObjRefFromDependencyAnnotation(dependencyAnnotation string) (corev1.Obje
return objectRef, nil
}

func (t *Trigger) validateAnnotation(errs *apis.FieldError, annotation string, function func(string) *apis.FieldError) *apis.FieldError {
if annotationValue, ok := t.GetAnnotations()[annotation]; ok {
annotationPrefix := fmt.Sprintf("metadata.annotations[%s]", annotation)
errs = errs.Also(function(annotationValue).ViaField(annotationPrefix))
}
return errs
}

func (t *Trigger) validateDependencyAnnotation(dependencyAnnotation string) *apis.FieldError {
depObjRef, err := GetObjRefFromDependencyAnnotation(dependencyAnnotation)
if err != nil {
Expand Down Expand Up @@ -168,3 +173,19 @@ func (t *Trigger) validateDependencyAnnotation(dependencyAnnotation string) *api
}
return errs
}

func (t *Trigger) validateInjectionAnnotation(injectionAnnotation string) *apis.FieldError {
if injectionAnnotation != "enabled" {
return &apis.FieldError{
Message: fmt.Sprintf(`The provided injection annotation value can only be "enabled", not %q`, injectionAnnotation),
Paths: []string{""},
}
}
if t.Spec.Broker != "default" {
return &apis.FieldError{
Message: fmt.Sprintf("The provided injection annotation is only used for default broker, but non-default broker specified here: %q", t.Spec.Broker),
Paths: []string{""},
}
}
return nil
}
41 changes: 41 additions & 0 deletions pkg/apis/eventing/v1alpha1/trigger_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,14 @@ var (
DeprecatedName: "subscriber_test",
DeprecatedNamespace: "test_ns",
}
// Dependency annotation
validDependencyAnnotation = "{\"kind\":\"CronJobSource\",\"name\":\"test-cronjob-source\",\"apiVersion\":\"sources.eventing.knative.dev/v1alpha1\"}"
invalidDependencyAnnotation = "invalid dependency annotation"
dependencyAnnotationPath = fmt.Sprintf("metadata.annotations[%s]", DependencyAnnotation)
// Create default broker annotation
validInjectionAnnotation = "enabled"
invalidInjectionAnnotation = "disabled"
injectionAnnotationPath = fmt.Sprintf("metadata.annotations[%s]", InjectionAnnotation)
)

func TestTriggerValidation(t *testing.T) {
Expand Down Expand Up @@ -203,6 +208,42 @@ func TestTriggerValidation(t *testing.T) {
Message: "missing field(s)",
},
},
{
name: "invalid injection annotation value",
t: &Trigger{
ObjectMeta: v1.ObjectMeta{
Namespace: "test-ns",
Annotations: map[string]string{
InjectionAnnotation: invalidInjectionAnnotation,
}},
Spec: TriggerSpec{
Broker: "default",
Filter: validEmptyFilter,
Subscriber: validSubscriber,
}},
want: &apis.FieldError{
Paths: []string{injectionAnnotationPath},
Message: "The provided injection annotation value can only be \"enabled\", not \"disabled\"",
},
},
{
name: "valid injection annotation value, non-default broker specified",
t: &Trigger{
ObjectMeta: v1.ObjectMeta{
Namespace: "test-ns",
Annotations: map[string]string{
InjectionAnnotation: validInjectionAnnotation,
}},
Spec: TriggerSpec{
Broker: "test-broker",
Filter: validEmptyFilter,
Subscriber: validSubscriber,
}},
want: &apis.FieldError{
Paths: []string{injectionAnnotationPath},
Message: "The provided injection annotation is only used for default broker, but non-default broker specified here: \"test-broker\"",
},
},
}

for _, test := range tests {
Expand Down
9 changes: 9 additions & 0 deletions pkg/reconciler/testing/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,15 @@ func WithUnmarshalFailedDependencyAnnotation() TriggerOption {
}
}

func WithInjectionAnnotation(injectionAnnotation string) TriggerOption {
return func(t *v1alpha1.Trigger) {
if t.Annotations == nil {
t.Annotations = make(map[string]string)
}
t.Annotations[v1alpha1.InjectionAnnotation] = injectionAnnotation
}
}

func WithDependencyAnnotation(dependencyAnnotation string) TriggerOption {
return func(t *v1alpha1.Trigger) {
if t.Annotations == nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/reconciler/trigger/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"

"k8s.io/client-go/tools/cache"
"knative.dev/pkg/client/injection/kube/informers/core/v1/namespace"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/resolver"
Expand Down Expand Up @@ -56,13 +57,15 @@ func NewController(
subscriptionInformer := subscription.Get(ctx)
brokerInformer := broker.Get(ctx)
serviceInformer := service.Get(ctx)
namespaceInformer := namespace.Get(ctx)

r := &Reconciler{
Base: reconciler.NewBase(ctx, controllerAgentName, cmw),
triggerLister: triggerInformer.Lister(),
subscriptionLister: subscriptionInformer.Lister(),
brokerLister: brokerInformer.Lister(),
serviceLister: serviceInformer.Lister(),
namespaceLister: namespaceInformer.Lister(),
}
impl := controller.NewImpl(r, r.Logger, ReconcilerName)

Expand Down
4 changes: 2 additions & 2 deletions pkg/reconciler/trigger/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ import (
"knative.dev/pkg/configmap"
. "knative.dev/pkg/reconciler/testing"

_ "knative.dev/pkg/client/injection/kube/informers/core/v1/service/fake"

// Fake injection informers
_ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/broker/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/trigger/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1alpha1/subscription/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/namespace/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/service/fake"
)

func TestNew(t *testing.T) {
Expand Down
26 changes: 26 additions & 0 deletions pkg/reconciler/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type Reconciler struct {
subscriptionLister messaginglisters.SubscriptionLister
brokerLister listers.BrokerLister
serviceLister corev1listers.ServiceLister
namespaceLister corev1listers.NamespaceLister
// Regular tracker to track static resources. In particular, it tracks Broker's changes.
tracker tracker.Interface
// Dynamic tracker to track KResources. In particular, it tracks the dependency between Triggers and Sources.
Expand Down Expand Up @@ -164,6 +165,12 @@ func (r *Reconciler) reconcile(ctx context.Context, t *v1alpha1.Trigger) error {
logging.FromContext(ctx).Error("Unable to get the Broker", zap.Error(err))
if apierrs.IsNotFound(err) {
t.Status.MarkBrokerFailed("DoesNotExist", "Broker does not exist")
_, needDefaultBroker := t.GetAnnotations()[v1alpha1.InjectionAnnotation]
if t.Spec.Broker == "default" && needDefaultBroker {
if e := r.labelNamespace(ctx, t); e != nil {
logging.FromContext(ctx).Error("Unable to label the namespace", zap.Error(e))
}
}
} else {
t.Status.MarkBrokerFailed("BrokerGetFailed", "Failed to get broker")
}
Expand Down Expand Up @@ -312,6 +319,25 @@ func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.Trigger
return trig, err
}

// labelNamespace will label namespace with knative-eventing-injection=enabled
func (r *Reconciler) labelNamespace(ctx context.Context, t *v1alpha1.Trigger) error {
current, err := r.namespaceLister.Get(t.Namespace)
if err != nil {
t.Status.MarkBrokerFailed("NamespaceGetFailed", "Failed to get namespace resource to enable knative-eventing-injection")
return err
}
current = current.DeepCopy()
if current.Labels == nil {
current.Labels = map[string]string{}
}
current.Labels["knative-eventing-injection"] = "enabled"
if _, err = r.KubeClientSet.CoreV1().Namespaces().Update(current); err != nil {
t.Status.MarkBrokerFailed("NamespaceUpdateFailed", "Failed to label the namespace resource with knative-eventing-injection")
return err
}
return nil
}

// getBrokerFilterService returns the K8s service for trigger 't' if exists,
// otherwise it returns an error.
func (r *Reconciler) getBrokerFilterService(ctx context.Context, b *v1alpha1.Broker) (*corev1.Service, error) {
Expand Down
66 changes: 65 additions & 1 deletion pkg/reconciler/trigger/trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ const (
testData = "data"
sinkName = "testsink"

injectionAnnotation = "enabled"

currentGeneration = 1
outdatedGeneration = 0
)
Expand Down Expand Up @@ -151,7 +153,7 @@ func TestAllCases(t *testing.T) {
// Eventf(corev1.EventTypeWarning, "ChannelReferenceFetchFailed", "Failed to validate spec.channel exists: s \"\" not found"),
// },
}, {
Name: "Broker not found",
Name: "Non-default broker not found",
Key: triggerKey,
Objects: []runtime.Object{
reconciletesting.NewTrigger(triggerName, testNS, brokerName,
Expand All @@ -171,6 +173,61 @@ func TestAllCases(t *testing.T) {
reconciletesting.WithTriggerBrokerFailed("DoesNotExist", "Broker does not exist"),
),
}},
}, {
Name: "Default broker not found, with injection annotation enabled",
Key: triggerKey,
Objects: []runtime.Object{
reconciletesting.NewTrigger(triggerName, testNS, "default",
reconciletesting.WithTriggerUID(triggerUID),
reconciletesting.WithTriggerSubscriberURI(subscriberURI),
reconciletesting.WithInitTriggerConditions,
reconciletesting.WithInjectionAnnotation(injectionAnnotation)),
reconciletesting.NewNamespace(testNS,
reconciletesting.WithNamespaceLabeled(map[string]string{})),
},
WantErr: true,
WantEvents: []string{
Eventf(corev1.EventTypeWarning, "TriggerReconcileFailed", "Trigger reconciliation failed: broker.eventing.knative.dev \"default\" not found"),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: reconciletesting.NewTrigger(triggerName, testNS, "default",
reconciletesting.WithTriggerUID(triggerUID),
reconciletesting.WithTriggerSubscriberURI(subscriberURI),
reconciletesting.WithInitTriggerConditions,
reconciletesting.WithInjectionAnnotation(injectionAnnotation),
reconciletesting.WithTriggerBrokerFailed("DoesNotExist", "Broker does not exist"),
),
}},
WantUpdates: []clientgotesting.UpdateActionImpl{{
Object: reconciletesting.NewNamespace(testNS,
reconciletesting.WithNamespaceLabeled(map[string]string{v1alpha1.InjectionAnnotation: injectionAnnotation})),
}},
}, {
Name: "Default broker found, with injection annotation enabled",
Key: triggerKey,
Objects: []runtime.Object{
makeReadyDefaultBroker(),
reconciletesting.NewTrigger(triggerName, testNS, "default",
reconciletesting.WithTriggerUID(triggerUID),
reconciletesting.WithTriggerSubscriberURI(subscriberURI),
reconciletesting.WithInitTriggerConditions,
reconciletesting.WithInjectionAnnotation(injectionAnnotation)),
},
WantErr: true,
WantEvents: []string{
// Only check if default broker is ready (not check other resources), so failed at the next step, check for filter service
Eventf(corev1.EventTypeWarning, "TriggerServiceFailed", "Broker's Filter service not found"),
Eventf(corev1.EventTypeWarning, "TriggerReconcileFailed", "Trigger reconciliation failed: failed to find Broker's Filter service"),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: reconciletesting.NewTrigger(triggerName, testNS, "default",
reconciletesting.WithTriggerUID(triggerUID),
reconciletesting.WithTriggerSubscriberURI(subscriberURI),
reconciletesting.WithInitTriggerConditions,
reconciletesting.WithTriggerBrokerReady(),
reconciletesting.WithInjectionAnnotation(injectionAnnotation),
),
}},
}, {
Name: "Broker get failure",
Key: triggerKey,
Expand Down Expand Up @@ -797,6 +854,7 @@ func TestAllCases(t *testing.T) {
subscriptionLister: listers.GetSubscriptionLister(),
brokerLister: listers.GetBrokerLister(),
serviceLister: listers.GetK8sServiceLister(),
namespaceLister: listers.GetNamespaceLister(),
tracker: tracker.New(func(types.NamespacedName) {}, 0),
addressableTracker: duck.NewListableTracker(ctx, &duckv1alpha1.AddressableType{}, func(types.NamespacedName) {}, 0),
kresourceTracker: duck.NewListableTracker(ctx, &duckv1alpha1.KResource{}, func(types.NamespacedName) {}, 0),
Expand Down Expand Up @@ -873,6 +931,12 @@ func makeReadyBroker() *v1alpha1.Broker {
return b
}

func makeReadyDefaultBroker() *v1alpha1.Broker {
b := makeReadyBroker()
b.Name = "default"
return b
}

func makeTriggerChannelRef() *corev1.ObjectReference {
return &corev1.ObjectReference{
APIVersion: "eventing.knative.dev/v1alpha1",
Expand Down

0 comments on commit 5fe2b0a

Please sign in to comment.