Skip to content

Commit

Permalink
Set dead letter sink URI in the Channel status (#6256) (#6264)
Browse files Browse the repository at this point in the history
* Generic Channel dead letter sink URI

We were dropping the dead letter sink URI in the status of the
`Channel`.

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Add Example_withTemplate

Signed-off-by: Pierangelo Di Pilato <[email protected]>

Co-authored-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
knative-prow-robot and pierDipi authored Mar 14, 2022
1 parent 836d70f commit 61f9bc7
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 14 deletions.
3 changes: 3 additions & 0 deletions config/core/resources/channel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,9 @@ spec:
namespace:
description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/ This is optional field, it gets defaulted to the object holding it if left out.'
type: string
deadLetterSinkUri:
description: DeadLetterSinkURI is the resolved URI of the dead letter sink that will be used as a fallback when not specified by Triggers.
type: string
observedGeneration:
description: ObservedGeneration is the 'Generation' of the Service that was last processed by the controller.
type: integer
Expand Down
5 changes: 5 additions & 0 deletions docs/eventing-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -3768,6 +3768,11 @@ in verbatim to the Channel CRD as Spec section.</p>
</tr>
</tbody>
</table>
<h3 id="messaging.knative.dev/v1.ChannelTemplateSpecOption">ChannelTemplateSpecOption
</h3>
<p>
<p>ChannelTemplateSpecOption is an optional function for ChannelTemplateSpec.</p>
</p>
<h3 id="messaging.knative.dev/v1.InMemoryChannelSpec">InMemoryChannelSpec
</h3>
<p>
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/messaging/v1/channel_template_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,6 @@ type ChannelTemplateSpec struct {
// +optional
Spec *runtime.RawExtension `json:"spec,omitempty"`
}

// ChannelTemplateSpecOption is an optional function for ChannelTemplateSpec.
type ChannelTemplateSpecOption func(*ChannelTemplateSpec) error
30 changes: 26 additions & 4 deletions test/rekt/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,18 @@ import (
"testing"

"github.com/cloudevents/sdk-go/v2/binding"
"knative.dev/eventing/test/rekt/features/channel"
ch "knative.dev/eventing/test/rekt/resources/channel"
chimpl "knative.dev/eventing/test/rekt/resources/channel_impl"
"knative.dev/eventing/test/rekt/resources/subscription"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/system"
_ "knative.dev/pkg/system/testing"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/k8s"
"knative.dev/reconciler-test/pkg/knative"
"knative.dev/reconciler-test/pkg/manifest"

"knative.dev/eventing/test/rekt/features/channel"
ch "knative.dev/eventing/test/rekt/resources/channel"
chimpl "knative.dev/eventing/test/rekt/resources/channel_impl"
"knative.dev/eventing/test/rekt/resources/subscription"
)

// TestChannelConformance
Expand Down Expand Up @@ -182,6 +185,25 @@ func TestChannelDeadLetterSink(t *testing.T) {
env.Test(ctx, t, channel.DeadLetterSink())
}

// TestGenericChannelDeadLetterSink tests if the events that cannot be delivered end up in
// the dead letter sink.
func TestGenericChannelDeadLetterSink(t *testing.T) {
t.Parallel()

ctx, env := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
environment.Managed(t),
)

createSubscriberFn := func(ref *duckv1.KReference, uri string) manifest.CfgFn {
return subscription.WithSubscriber(ref, uri)
}
env.Test(ctx, t, channel.DeadLetterSinkGenericChannel(createSubscriberFn))
}

/*
TestEventTransformationForSubscription tests the following scenario:
Expand Down
57 changes: 50 additions & 7 deletions test/rekt/features/channel/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,23 @@ import (
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/test"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/reconciler-test/pkg/eventshub"

"knative.dev/reconciler-test/pkg/feature"
"knative.dev/reconciler-test/pkg/manifest"
"knative.dev/reconciler-test/resources/svc"

"knative.dev/reconciler-test/pkg/eventshub/assert"

"knative.dev/eventing/test/rekt/resources/channel"
"knative.dev/eventing/test/rekt/resources/channel_impl"
"knative.dev/eventing/test/rekt/resources/containersource"
"knative.dev/eventing/test/rekt/resources/delivery"
"knative.dev/eventing/test/rekt/resources/eventlibrary"
"knative.dev/eventing/test/rekt/resources/pingsource"
"knative.dev/eventing/test/rekt/resources/source"
"knative.dev/eventing/test/rekt/resources/subscription"
"knative.dev/reconciler-test/pkg/eventshub"
"knative.dev/reconciler-test/pkg/eventshub/assert"
"knative.dev/reconciler-test/pkg/feature"
"knative.dev/reconciler-test/resources/svc"
)

func ChannelChain(length int) *feature.Feature {
Expand Down Expand Up @@ -92,10 +98,47 @@ func DeadLetterSink() *feature.Feature {
subscription.WithReply(svc.AsKReference(failer), ""),
))

f.Requirement("channel is ready", channel_impl.IsReady(name))
f.Requirement("containersource is ready", containersource.IsReady(cs))
f.Setup("channel is ready", channel_impl.IsReady(name))
f.Setup("containersource is ready", containersource.IsReady(cs))

f.Requirement("Channel has dead letter sink uri", channel_impl.HasDeadLetterSinkURI(name, channel_impl.GVR()))

f.Assert("dls receives events", assert.OnStore(sink).
MatchEvent(test.HasType("dev.knative.eventing.samples.heartbeat")).
AtLeast(1),
)

return f
}

func DeadLetterSinkGenericChannel(createSubscriberFn func(ref *duckv1.KReference, uri string) manifest.CfgFn) *feature.Feature {
f := feature.NewFeature()
sink := feature.MakeRandomK8sName("sink")
failer := feature.MakeK8sNamePrefix("failer")
cs := feature.MakeRandomK8sName("containersource")
name := feature.MakeRandomK8sName("channel")

f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver))
f.Setup("install failing receiver", eventshub.Install(failer, eventshub.StartReceiver, eventshub.DropFirstN(1)))
f.Setup("install channel", channel.Install(name,
channel.WithTemplate(),
delivery.WithDeadLetterSink(svc.AsKReference(sink), "")),
)
f.Setup("install containersource", containersource.Install(cs, source.WithSink(channel.AsRef(name), "")))
f.Setup("install subscription", subscription.Install(feature.MakeRandomK8sName("subscription"),
subscription.WithChannel(channel.AsRef(name)),
createSubscriberFn(svc.AsKReference(failer), ""),
))

f.Setup("channel is ready", channel.IsReady(name))
f.Setup("containersource is ready", containersource.IsReady(cs))

f.Requirement("Channel has dead letter sink uri", channel_impl.HasDeadLetterSinkURI(name, channel.GVR()))

f.Assert("dls receives events", assert.OnStore(sink).MatchEvent(test.HasType("dev.knative.eventing.samples.heartbeat")).AtLeast(1))
f.Assert("dls receives events", assert.OnStore(sink).
MatchEvent(test.HasType("dev.knative.eventing.samples.heartbeat")).
AtLeast(1),
)

return f
}
Expand Down
48 changes: 47 additions & 1 deletion test/rekt/resources/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,21 @@ package channel
import (
"context"
"embed"
"encoding/json"
"fmt"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"knative.dev/eventing/test/rekt/resources/addressable"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/reconciler-test/pkg/feature"
"knative.dev/reconciler-test/pkg/k8s"
"knative.dev/reconciler-test/pkg/manifest"

messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/eventing/test/rekt/resources/addressable"
"knative.dev/eventing/test/rekt/resources/channel_impl"
)

//go:embed *.yaml
Expand Down Expand Up @@ -81,3 +87,43 @@ func AsRef(name string) *duckv1.KReference {
Name: name,
}
}

// WithTemplate adds channelTemplate to the Channel's config after apply the provided
// options.
func WithTemplate(options ...messagingv1.ChannelTemplateSpecOption) manifest.CfgFn {
return func(m map[string]interface{}) {
t := withTemplate(options...)
channelTemplate := map[string]interface{}{
"apiVersion": t.APIVersion,
"kind": t.Kind,
}
m["channelTemplate"] = channelTemplate
if t.Spec != nil {
s := map[string]string{}
bytes, err := t.Spec.MarshalJSON()
if err != nil {
panic(fmt.Errorf("failed to marshal spec: %w", err))
}
if err := json.Unmarshal(bytes, &s); err != nil {
panic(fmt.Errorf("failed to unmarshal spec '%s': %v", bytes, err))
}
channelTemplate["spec"] = s
}
}
}

func withTemplate(options ...messagingv1.ChannelTemplateSpecOption) *messagingv1.ChannelTemplateSpec {
gvk := channel_impl.GVK()
t := &messagingv1.ChannelTemplateSpec{
TypeMeta: metav1.TypeMeta{
Kind: gvk.Kind,
APIVersion: gvk.GroupVersion().String(),
},
}
for _, opt := range options {
if err := opt(t); err != nil {
panic(err)
}
}
return t
}
56 changes: 56 additions & 0 deletions test/rekt/resources/channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,15 @@ limitations under the License.
package channel_test

import (
"embed"
"encoding/json"
"os"

"k8s.io/apimachinery/pkg/runtime"
"knative.dev/reconciler-test/pkg/manifest"

messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/eventing/test/rekt/resources/channel"
)

// The following examples validate the processing of the With* helper methods
Expand Down Expand Up @@ -106,3 +112,53 @@ func Example_full() {
// backoffPolicy: exponential
// backoffDelay: "2007-03-01T13:00:00Z/P1Y2M10DT2H30M"
}

//go:embed *.yaml
var yaml embed.FS

func Example_withTemplate() {

spec := map[string]string{
"thing1": "value1",
"thing2": "value2",
}
bytesSpec, err := json.Marshal(spec)
if err != nil {
panic(err)
}

re := &runtime.RawExtension{
Raw: bytesSpec,
}

images := map[string]string{}
cfg := map[string]interface{}{
"name": "foo",
"namespace": "bar",
}
withTemplate := channel.WithTemplate(func(spec *messagingv1.ChannelTemplateSpec) error {
spec.Spec = re
return nil
})
withTemplate(cfg)

files, err := manifest.ExecuteYAML(yaml, images, cfg)
if err != nil {
panic(err)
}

manifest.OutputYAML(os.Stdout, files)
// Output:
// apiVersion: messaging.knative.dev/v1
// kind: Channel
// metadata:
// name: foo
// namespace: bar
// spec:
// channelTemplate:
// apiVersion: messaging.knative.dev/v1
// kind: InMemoryChannel
// spec:
// thing1: value1
// thing2: value2
}
34 changes: 32 additions & 2 deletions test/rekt/resources/channel_impl/channel_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,20 @@ import (

"github.com/kelseyhightower/envconfig"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"knative.dev/eventing/test/rekt/resources/addressable"
"knative.dev/eventing/test/rekt/resources/delivery"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/injection/clients/dynamicclient"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/feature"
"knative.dev/reconciler-test/pkg/k8s"
"knative.dev/reconciler-test/pkg/manifest"

eventingduck "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/test/rekt/resources/addressable"
"knative.dev/eventing/test/rekt/resources/delivery"
)

//go:embed *.yaml
Expand Down Expand Up @@ -89,6 +95,30 @@ func IsAddressable(name string, timing ...time.Duration) feature.StepFn {
return k8s.IsAddressable(GVR(), name, timing...)
}

// HasDeadLetterSinkURI asserts that the Channel has the resolved dead letter sink URI
// in the status.
func HasDeadLetterSinkURI(name string, gvr schema.GroupVersionResource) feature.StepFn {
return func(ctx context.Context, t feature.T) {
ns := environment.FromContext(ctx).Namespace()
ch, err := dynamicclient.Get(ctx).
Resource(gvr).
Namespace(ns).
Get(ctx, name, metav1.GetOptions{})
if err != nil {
t.Fatalf("failed to get %s/%s channel: %v", ns, name, err)
}

channelable := &eventingduck.Channelable{}
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(ch.UnstructuredContent(), channelable); err != nil {
t.Fatal(err)
}

if channelable.Status.DeadLetterSinkURI.String() == "" {
t.Fatalf("channel %s/%s has no dead letter sink uri in the status", ns, name)
}
}
}

// Address returns a Channel's address.
func Address(ctx context.Context, name string, timings ...time.Duration) (*apis.URL, error) {
return addressable.Address(ctx, GVR(), name, timings...)
Expand Down

0 comments on commit 61f9bc7

Please sign in to comment.