From 95c663629c3d68a2a26bc335fc6b67b477674976 Mon Sep 17 00:00:00 2001 From: Gab Satchi Date: Tue, 28 Feb 2023 02:17:53 -0500 Subject: [PATCH] [release-1.8] Fixes issue where a CE response is truncated #6782 (#6783) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit cherry-pick of #6758 ```release-note 🐛 Fixes an issue where a Cloud Event in a response from a sink was truncated to 1024 bytes ``` --- pkg/channel/message_dispatcher.go | 18 +++-- pkg/channel/message_dispatcher_test.go | 63 +++++++++++++++++ test/rekt/broker_test.go | 15 +++++ test/rekt/features/broker/feature.go | 93 ++++++++++++++++++++++++++ 4 files changed, 179 insertions(+), 10 deletions(-) diff --git a/pkg/channel/message_dispatcher.go b/pkg/channel/message_dispatcher.go index 51d0f8c37c5..e7ef377084a 100644 --- a/pkg/channel/message_dispatcher.go +++ b/pkg/channel/message_dispatcher.go @@ -234,16 +234,16 @@ func (d *MessageDispatcherImpl) executeRequest(ctx context.Context, } execInfo.Time = dispatchTime - body := make([]byte, attributes.KnativeErrorDataExtensionMaxLength) + body := new(bytes.Buffer) + _, readErr := body.ReadFrom(response.Body) if isFailure(response.StatusCode) { // Read response body into execInfo for failures - readLen, err := response.Body.Read(body) - if err != nil && err != io.EOF { - d.logger.Error("failed to read response body into DispatchExecutionInfo", zap.Error(err)) + if readErr != nil && readErr != io.EOF { + d.logger.Error("failed to read response body", zap.Error(err)) execInfo.ResponseBody = []byte(fmt.Sprintf("dispatch error: %s", err.Error())) } else { - execInfo.ResponseBody = body[:readLen] + execInfo.ResponseBody = body.Bytes() } _ = response.Body.Close() // Reject non-successful responses. @@ -251,13 +251,11 @@ func (d *MessageDispatcherImpl) executeRequest(ctx context.Context, } var responseMessageBody []byte - // Read response body into responseMessage for message accepted - readLen, err := response.Body.Read(body) - if err != nil && err != io.EOF { - d.logger.Error("failed to read response body into cloudevents' Message", zap.Error(err)) + if readErr != nil && readErr != io.EOF { + d.logger.Error("failed to read response body", zap.Error(err)) responseMessageBody = []byte(fmt.Sprintf("Failed to read response body: %s", err.Error())) } else { - responseMessageBody = body[:readLen] + responseMessageBody = body.Bytes() } responseMessage := http.NewMessage(response.Header, io.NopCloser(bytes.NewReader(responseMessageBody))) diff --git a/pkg/channel/message_dispatcher_test.go b/pkg/channel/message_dispatcher_test.go index d52f1d69971..74229ae0c14 100644 --- a/pkg/channel/message_dispatcher_test.go +++ b/pkg/channel/message_dispatcher_test.go @@ -722,6 +722,69 @@ func TestDispatchMessage(t *testing.T) { }, lastReceiver: "deadLetter", }, + "no restriction on message response size": { + sendToDestination: true, + sendToReply: true, + hasDeadLetterSink: false, + header: map[string][]string{ + // do-not-forward should not get forwarded. + "do-not-forward": {"header"}, + "x-request-id": {"id123"}, + "knative-1": {"knative-1-value"}, + "knative-2": {"knative-2-value"}, + }, + body: "destination", + eventExtensions: map[string]string{ + "abc": `"ce-abc-value"`, + }, + expectedDestRequest: &requestValidation{ + Headers: map[string][]string{ + "x-request-id": {"id123"}, + "knative-1": {"knative-1-value"}, + "knative-2": {"knative-2-value"}, + "prefer": {"reply"}, + "traceparent": {"ignored-value-header"}, + "ce-abc": {`"ce-abc-value"`}, + "ce-id": {"ignored-value-header"}, + "ce-time": {"2002-10-02T15:00:00Z"}, + "ce-source": {testCeSource}, + "ce-type": {testCeType}, + "ce-specversion": {cloudevents.VersionV1}, + }, + Body: `"destination"`, + }, + expectedReplyRequest: &requestValidation{ + Headers: map[string][]string{ + "x-request-id": {"id123"}, + "knative-1": {"knative-1-value"}, + "traceparent": {"ignored-value-header"}, + "ce-abc": {`"ce-abc-value"`}, + "ce-id": {"ignored-value-header"}, + "ce-time": {"2002-10-02T15:00:00Z"}, + "ce-source": {testCeSource}, + "ce-type": {testCeType}, + "ce-specversion": {cloudevents.VersionV1}, + }, + Body: strings.Repeat("a", 2000), + }, + fakeResponse: &http.Response{ + StatusCode: http.StatusAccepted, + Header: map[string][]string{ + "do-not-passthrough": {"no"}, + "x-request-id": {"id123"}, + "knative-1": {"knative-1-value"}, + "ce-abc": {`"ce-abc-value"`}, + "ce-id": {"ignored-value-header"}, + "ce-time": {"2002-10-02T15:00:00Z"}, + "ce-source": {testCeSource}, + "ce-type": {testCeType}, + "ce-specversion": {cloudevents.VersionV1}, + }, + Body: io.NopCloser(bytes.NewBufferString(strings.Repeat("a", 2000))), + }, + + lastReceiver: "reply", + }, } for n, tc := range testCases { t.Run(n, func(t *testing.T) { diff --git a/test/rekt/broker_test.go b/test/rekt/broker_test.go index 264714ccf6d..0a60aa6c840 100644 --- a/test/rekt/broker_test.go +++ b/test/rekt/broker_test.go @@ -166,3 +166,18 @@ func TestBrokerDeliverLongMessage(t *testing.T) { env.TestSet(ctx, t, broker.BrokerDeliverLongMessage()) } + +func TestBrokerDeliverLongResponseMessage(t *testing.T) { + t.Parallel() + + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.Managed(t), + environment.WithPollTimings(5*time.Second, 4*time.Minute), + ) + + env.TestSet(ctx, t, broker.BrokerDeliverLongResponseMessage()) +} diff --git a/test/rekt/features/broker/feature.go b/test/rekt/features/broker/feature.go index 85031f48abb..349cd728e48 100644 --- a/test/rekt/features/broker/feature.go +++ b/test/rekt/features/broker/feature.go @@ -259,3 +259,96 @@ func brokerSubscriberLongMessage() *feature.Feature { ) return f } + +/* +Following test sends an event to the first sink, Sink1, which will send a long response destined to Sink2. +The test will assert that the long response is received by Sink2 + +EventSource ---> Broker ---> Trigger1 ---> Sink1(Transformation) ---> Trigger2 --> Sink2 +*/ + +func BrokerDeliverLongResponseMessage() *feature.FeatureSet { + fs := &feature.FeatureSet{ + Name: "Knative Broker - Long Response Message", + + Features: []*feature.Feature{ + brokerSubscriberLongResponseMessage(), + }, + } + return fs +} + +func brokerSubscriberLongResponseMessage() *feature.Feature { + f := feature.NewFeatureNamed("Broker, chain of Triggers, long response message from first subscriber") + + source := feature.MakeRandomK8sName("source") + sink1 := feature.MakeRandomK8sName("sink1") + sink2 := feature.MakeRandomK8sName("sink2") + trigger1 := feature.MakeRandomK8sName("trigger1") + trigger2 := feature.MakeRandomK8sName("trigger2") + + eventSource1 := "source1" + eventSource2 := "source2" + eventType1 := "type1" + eventType2 := "type2" + eventBody := `{"msg":"eventBody"}` + transformedEventBody := `{"msg":"` + strings.Repeat("X", 36000) + `"}` + event := cloudevents.NewEvent() + event.SetID(uuid.New().String()) + event.SetType(eventType1) + event.SetSource(eventSource1) + event.SetData(cloudevents.TextPlain, []byte(eventBody)) + + //Install the broker + brokerName := feature.MakeRandomK8sName("broker") + f.Setup("install broker", broker.Install(brokerName, broker.WithEnvConfig()...)) + f.Requirement("broker is ready", broker.IsReady(brokerName)) + f.Requirement("broker is addressable", broker.IsAddressable(brokerName)) + + // Sink1 will transform the event so it can be filtered by Trigger2 + f.Setup("install sink1", eventshub.Install( + sink1, + eventshub.ReplyWithTransformedEvent(eventType2, eventSource2, transformedEventBody), + eventshub.StartReceiver, + )) + + f.Setup("install sink2", eventshub.Install(sink2, eventshub.StartReceiver)) + + // Install the Triggers with appropriate Sinks and filters + f.Setup("install trigger1", trigger.Install( + trigger1, + brokerName, + trigger.WithSubscriber(svc.AsKReference(sink1), ""), + trigger.WithFilter(map[string]string{"type": eventType1, "source": eventSource1}), + )) + f.Setup("trigger1 goes ready", trigger.IsReady(trigger1)) + + f.Setup("install trigger2", trigger.Install( + trigger2, + brokerName, + trigger.WithSubscriber(svc.AsKReference(sink2), ""), + trigger.WithFilter(map[string]string{"type": eventType2, "source": eventSource2}), + )) + f.Setup("trigger2 goes ready", trigger.IsReady(trigger2)) + + // Install the Source + f.Requirement("install source", eventshub.Install( + source, + eventshub.StartSenderToResource(broker.GVR(), brokerName), + eventshub.InputEvent(event), + )) + + f.Assert("receive long event on sink1 exactly once", + eventasssert.OnStore(sink1). + MatchEvent(test.HasData([]byte(eventBody))). + Exact(1), + ) + + f.Assert("receive long event on sink2 exactly once", + eventasssert.OnStore(sink2). + MatchEvent(test.HasData([]byte(transformedEventBody))). + Exact(1), + ) + + return f +}