Skip to content

Commit

Permalink
Ttl fix (#2244) (#2249)
Browse files Browse the repository at this point in the history
* Fixed issue #2214 where CE 1.0 events get dropped when replying to the broker

* Fix comment

* NO returning context from SetTTL

* Changed DeleteTTL method signature to match with SetTTL
  • Loading branch information
grantr authored and knative-prow-robot committed Dec 3, 2019
1 parent e937685 commit 468eab4
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 35 deletions.
12 changes: 7 additions & 5 deletions pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ func (r *Handler) serveHTTP(ctx context.Context, event cloudevents.Event, resp *
}

// Remove the TTL attribute that is used by the Broker.
ttl, ttlKey := broker.GetTTL(event.Context.AsV03())
if ttl == nil {
ttl, err := broker.GetTTL(event.Context)
if err != nil {
// Only messages sent by the Broker should be here. If the attribute isn't here, then the
// event wasn't sent by the Broker, so we can drop it.
r.logger.Warn("No TTL seen, dropping", zap.Any("triggerRef", triggerRef), zap.Any("event", event))
Expand All @@ -181,7 +181,9 @@ func (r *Handler) serveHTTP(ctx context.Context, event cloudevents.Event, resp *
// framework returns a 500 to the caller, so the Channel would send this repeatedly.
return nil
}
event.SetExtension(ttlKey, nil)
if err := broker.DeleteTTL(event.Context); err != nil {
r.logger.Warn("Failed to delete TTL.", zap.Error(err))
}

r.logger.Debug("Received message", zap.Any("triggerRef", triggerRef))

Expand All @@ -197,8 +199,8 @@ func (r *Handler) serveHTTP(ctx context.Context, event cloudevents.Event, resp *
}

// Reattach the TTL (with the same value) to the response event before sending it to the Broker.
responseEvent.Context, err = broker.SetTTL(responseEvent.Context, ttl)
if err != nil {

if err := broker.SetTTL(responseEvent.Context, ttl); err != nil {
return err
}
resp.Event = responseEvent
Expand Down
4 changes: 2 additions & 2 deletions pkg/broker/filter/filter_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ func (h *fakeHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
h.requestReceived = true

for n, v := range h.headers {
if strings.Contains(strings.ToLower(n), strings.ToLower(broker.V03TTLAttribute)) {
if strings.Contains(strings.ToLower(n), strings.ToLower(broker.TTLAttribute)) {
h.t.Errorf("Broker TTL should not be seen by the subscriber: %s", n)
}
if diff := cmp.Diff(v, req.Header[n]); diff != "" {
Expand Down Expand Up @@ -571,7 +571,7 @@ func makeEvent() *cloudevents.Event {
}

func addTTLToEvent(e cloudevents.Event) cloudevents.Event {
e.Context, _ = broker.SetTTL(e.Context, 1)
broker.SetTTL(e.Context, 1)
return e
}

Expand Down
22 changes: 7 additions & 15 deletions pkg/broker/ingress/ingress_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"net/http"
"net/url"
"reflect"
"time"

cloudevents "github.com/cloudevents/sdk-go"
Expand All @@ -18,7 +17,7 @@ import (
var (
shutdownTimeout = 1 * time.Minute

defaultTTL = 255
defaultTTL int32 = 255
)

type Handler struct {
Expand Down Expand Up @@ -109,24 +108,17 @@ func (h *Handler) decrementTTL(event *cloudevents.Event) bool {
return false
}

var err error
event.Context, err = broker.SetTTL(event.Context, ttl)
if err != nil {
if err := broker.SetTTL(event.Context, ttl); err != nil {
h.Logger.Error("failed to set TTL", zap.Error(err))
}
return true
}

func (h *Handler) getTTLToSet(event *cloudevents.Event) int {
ttlInterface, _ := broker.GetTTL(event.Context)
if ttlInterface == nil {
h.Logger.Debug("No TTL found, defaulting")
func (h *Handler) getTTLToSet(event *cloudevents.Event) int32 {
ttl, err := broker.GetTTL(event.Context)
if err != nil {
h.Logger.Info("Error retrieving TTL, defaulting.", zap.Error(err))
return defaultTTL
}
// This should be a JSON number, which json.Unmarshalls as a float64.
ttl, ok := ttlInterface.(float64)
if !ok {
h.Logger.Info("TTL attribute wasn't a float64, defaulting", zap.Any("ttlInterface", ttlInterface), zap.Any("typeOf(ttlInterface)", reflect.TypeOf(ttlInterface)))
}
return int(ttl) - 1
return ttl - 1
}
28 changes: 15 additions & 13 deletions pkg/broker/ttl.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,34 @@
package broker

import (
"strings"

cloudevents "github.com/cloudevents/sdk-go"
cetypes "github.com/cloudevents/sdk-go/pkg/cloudevents/types"
)

const (
// V03TTLAttribute is the name of the CloudEvents 0.3 extension attribute used to store the
// TTLAttribute is the name of the CloudEvents extension attribute used to store the
// Broker's TTL (number of times a single event can reply through a Broker continuously). All
// interactions with the attribute should be done through the GetTTL and SetTTL functions.
V03TTLAttribute = "knativebrokerttl"
TTLAttribute = "knativebrokerttl"
)

// GetTTL finds the TTL in the EventContext using a case insensitive comparison
// for the key. The second return param, is the case preserved key that matched.
// Depending on the encoding/transport, the extension case could be changed.
func GetTTL(ctx cloudevents.EventContext) (interface{}, string) {
for k, v := range ctx.AsV03().Extensions {
if lower := strings.ToLower(k); lower == V03TTLAttribute {
return v, k
}
func GetTTL(ctx cloudevents.EventContext) (int32, error) {
ttl, err := ctx.GetExtension(TTLAttribute)
if err != nil {
return 0, err
}
return nil, V03TTLAttribute
return cetypes.ToInteger(ttl)
}

// SetTTL sets the TTL into the EventContext. ttl should be a positive integer.
func SetTTL(ctx cloudevents.EventContext, ttl interface{}) (cloudevents.EventContext, error) {
err := ctx.SetExtension(V03TTLAttribute, ttl)
return ctx, err
func SetTTL(ctx cloudevents.EventContext, ttl int32) error {
return ctx.SetExtension(TTLAttribute, ttl)
}

// DeleteTTL removes the TTL CE extension attribute
func DeleteTTL(ctx cloudevents.EventContext) error {
return ctx.SetExtension(TTLAttribute, nil)
}

0 comments on commit 468eab4

Please sign in to comment.