Skip to content

Commit

Permalink
[release-1.9] Fixes issue where a CE response is truncated (#6784)
Browse files Browse the repository at this point in the history
```release-note
🐛 Fixes an issue where a Cloud Event in a response from a sink was truncated to 1024 bytes
```
  • Loading branch information
gab-satchi authored Feb 28, 2023
1 parent 4e6fff4 commit f634181
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 10 deletions.
18 changes: 8 additions & 10 deletions pkg/channel/message_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,30 +239,28 @@ func (d *MessageDispatcherImpl) executeRequest(ctx context.Context,
}
execInfo.Time = dispatchTime

body := make([]byte, attributes.KnativeErrorDataExtensionMaxLength)
body := new(bytes.Buffer)
_, readErr := body.ReadFrom(response.Body)

if isFailure(response.StatusCode) {
// Read response body into execInfo for failures
readLen, err := response.Body.Read(body)
if err != nil && err != io.EOF {
d.logger.Error("failed to read response body into DispatchExecutionInfo", zap.Error(err))
if readErr != nil && readErr != io.EOF {
d.logger.Error("failed to read response body", zap.Error(err))
execInfo.ResponseBody = []byte(fmt.Sprintf("dispatch error: %s", err.Error()))
} else {
execInfo.ResponseBody = body[:readLen]
execInfo.ResponseBody = body.Bytes()
}
_ = response.Body.Close()
// Reject non-successful responses.
return ctx, nil, nil, &execInfo, fmt.Errorf("unexpected HTTP response, expected 2xx, got %d", response.StatusCode)
}

var responseMessageBody []byte
// Read response body into responseMessage for message accepted
readLen, err := response.Body.Read(body)
if err != nil && err != io.EOF {
d.logger.Error("failed to read response body into cloudevents' Message", zap.Error(err))
if readErr != nil && readErr != io.EOF {
d.logger.Error("failed to read response body", zap.Error(err))
responseMessageBody = []byte(fmt.Sprintf("Failed to read response body: %s", err.Error()))
} else {
responseMessageBody = body[:readLen]
responseMessageBody = body.Bytes()
}
responseMessage := http.NewMessage(response.Header, io.NopCloser(bytes.NewReader(responseMessageBody)))

Expand Down
63 changes: 63 additions & 0 deletions pkg/channel/message_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,69 @@ func TestDispatchMessage(t *testing.T) {
},
lastReceiver: "deadLetter",
},
"no restriction on message response size": {
sendToDestination: true,
sendToReply: true,
hasDeadLetterSink: false,
header: map[string][]string{
// do-not-forward should not get forwarded.
"do-not-forward": {"header"},
"x-request-id": {"id123"},
"knative-1": {"knative-1-value"},
"knative-2": {"knative-2-value"},
},
body: "destination",
eventExtensions: map[string]string{
"abc": `"ce-abc-value"`,
},
expectedDestRequest: &requestValidation{
Headers: map[string][]string{
"x-request-id": {"id123"},
"knative-1": {"knative-1-value"},
"knative-2": {"knative-2-value"},
"prefer": {"reply"},
"traceparent": {"ignored-value-header"},
"ce-abc": {`"ce-abc-value"`},
"ce-id": {"ignored-value-header"},
"ce-time": {"2002-10-02T15:00:00Z"},
"ce-source": {testCeSource},
"ce-type": {testCeType},
"ce-specversion": {cloudevents.VersionV1},
},
Body: `"destination"`,
},
expectedReplyRequest: &requestValidation{
Headers: map[string][]string{
"x-request-id": {"id123"},
"knative-1": {"knative-1-value"},
"traceparent": {"ignored-value-header"},
"ce-abc": {`"ce-abc-value"`},
"ce-id": {"ignored-value-header"},
"ce-time": {"2002-10-02T15:00:00Z"},
"ce-source": {testCeSource},
"ce-type": {testCeType},
"ce-specversion": {cloudevents.VersionV1},
},
Body: strings.Repeat("a", 2000),
},
fakeResponse: &http.Response{
StatusCode: http.StatusAccepted,
Header: map[string][]string{
"do-not-passthrough": {"no"},
"x-request-id": {"id123"},
"knative-1": {"knative-1-value"},
"ce-abc": {`"ce-abc-value"`},
"ce-id": {"ignored-value-header"},
"ce-time": {"2002-10-02T15:00:00Z"},
"ce-source": {testCeSource},
"ce-type": {testCeType},
"ce-specversion": {cloudevents.VersionV1},
},
Body: io.NopCloser(bytes.NewBufferString(strings.Repeat("a", 2000))),
},

lastReceiver: "reply",
},
}
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
Expand Down
15 changes: 15 additions & 0 deletions test/rekt/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,18 @@ func TestBrokerDeliverLongMessage(t *testing.T) {

env.TestSet(ctx, t, broker.BrokerDeliverLongMessage())
}

func TestBrokerDeliverLongResponseMessage(t *testing.T) {
t.Parallel()

ctx, env := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
environment.Managed(t),
environment.WithPollTimings(5*time.Second, 4*time.Minute),
)

env.TestSet(ctx, t, broker.BrokerDeliverLongResponseMessage())
}
93 changes: 93 additions & 0 deletions test/rekt/features/broker/feature.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,3 +617,96 @@ func brokerSubscriberLongMessage() *feature.Feature {
)
return f
}

/*
Following test sends an event to the first sink, Sink1, which will send a long response destined to Sink2.
The test will assert that the long response is received by Sink2
EventSource ---> Broker ---> Trigger1 ---> Sink1(Transformation) ---> Trigger2 --> Sink2
*/

func BrokerDeliverLongResponseMessage() *feature.FeatureSet {
fs := &feature.FeatureSet{
Name: "Knative Broker - Long Response Message",

Features: []*feature.Feature{
brokerSubscriberLongResponseMessage(),
},
}
return fs
}

func brokerSubscriberLongResponseMessage() *feature.Feature {
f := feature.NewFeatureNamed("Broker, chain of Triggers, long response message from first subscriber")

source := feature.MakeRandomK8sName("source")
sink1 := feature.MakeRandomK8sName("sink1")
sink2 := feature.MakeRandomK8sName("sink2")
trigger1 := feature.MakeRandomK8sName("trigger1")
trigger2 := feature.MakeRandomK8sName("trigger2")

eventSource1 := "source1"
eventSource2 := "source2"
eventType1 := "type1"
eventType2 := "type2"
eventBody := `{"msg":"eventBody"}`
transformedEventBody := `{"msg":"` + strings.Repeat("X", 36000) + `"}`
event := cloudevents.NewEvent()
event.SetID(uuid.New().String())
event.SetType(eventType1)
event.SetSource(eventSource1)
event.SetData(cloudevents.TextPlain, []byte(eventBody))

//Install the broker
brokerName := feature.MakeRandomK8sName("broker")
f.Setup("install broker", broker.Install(brokerName, broker.WithEnvConfig()...))
f.Requirement("broker is ready", broker.IsReady(brokerName))
f.Requirement("broker is addressable", broker.IsAddressable(brokerName))

// Sink1 will transform the event so it can be filtered by Trigger2
f.Setup("install sink1", eventshub.Install(
sink1,
eventshub.ReplyWithTransformedEvent(eventType2, eventSource2, transformedEventBody),
eventshub.StartReceiver,
))

f.Setup("install sink2", eventshub.Install(sink2, eventshub.StartReceiver))

// Install the Triggers with appropriate Sinks and filters
f.Setup("install trigger1", trigger.Install(
trigger1,
brokerName,
trigger.WithSubscriber(svc.AsKReference(sink1), ""),
trigger.WithFilter(map[string]string{"type": eventType1, "source": eventSource1}),
))
f.Setup("trigger1 goes ready", trigger.IsReady(trigger1))

f.Setup("install trigger2", trigger.Install(
trigger2,
brokerName,
trigger.WithSubscriber(svc.AsKReference(sink2), ""),
trigger.WithFilter(map[string]string{"type": eventType2, "source": eventSource2}),
))
f.Setup("trigger2 goes ready", trigger.IsReady(trigger2))

// Install the Source
f.Requirement("install source", eventshub.Install(
source,
eventshub.StartSenderToResource(broker.GVR(), brokerName),
eventshub.InputEvent(event),
))

f.Assert("receive long event on sink1 exactly once",
eventasssert.OnStore(sink1).
MatchEvent(test.HasData([]byte(eventBody))).
Exact(1),
)

f.Assert("receive long event on sink2 exactly once",
eventasssert.OnStore(sink2).
MatchEvent(test.HasData([]byte(transformedEventBody))).
Exact(1),
)

return f
}

0 comments on commit f634181

Please sign in to comment.