Skip to content

Commit

Permalink
Merge pull request #16 from dloukadakis/feature/support-ordering-key
Browse files Browse the repository at this point in the history
Support ordering key
  • Loading branch information
m110 authored Oct 9, 2021
2 parents e1ca59c + a437716 commit 312214d
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 9 deletions.
62 changes: 60 additions & 2 deletions pkg/googlecloud/marshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type MarshalerUnmarshaler interface {
Unmarshaler
}

func (m DefaultMarshalerUnmarshaler) Marshal(topic string, msg *message.Message) (*pubsub.Message, error) {
func (DefaultMarshalerUnmarshaler) Marshal(topic string, msg *message.Message) (*pubsub.Message, error) {
if value := msg.Metadata.Get(UUIDHeaderKey); value != "" {
return nil, errors.Errorf("metadata %s is reserved by watermill for message UUID", UUIDHeaderKey)
}
Expand All @@ -51,7 +51,7 @@ func (m DefaultMarshalerUnmarshaler) Marshal(topic string, msg *message.Message)
return marshaledMsg, nil
}

func (u DefaultMarshalerUnmarshaler) Unmarshal(pubsubMsg *pubsub.Message) (*message.Message, error) {
func (DefaultMarshalerUnmarshaler) Unmarshal(pubsubMsg *pubsub.Message) (*message.Message, error) {
metadata := make(message.Metadata, len(pubsubMsg.Attributes))

var id string
Expand All @@ -70,3 +70,61 @@ func (u DefaultMarshalerUnmarshaler) Unmarshal(pubsubMsg *pubsub.Message) (*mess

return msg, nil
}

type GenerateOrderingKey func(topic string, msg *message.Message) (string, error)

type orderingMarshaler struct {
Marshaler

generateOrderingKey GenerateOrderingKey
}

func NewOrderingMarshaler(generateOrderingKey GenerateOrderingKey) Marshaler {
return &orderingMarshaler{
Marshaler: DefaultMarshalerUnmarshaler{},
generateOrderingKey: generateOrderingKey,
}
}

func (om orderingMarshaler) Marshal(topic string, msg *message.Message) (*pubsub.Message, error) {
marshaledMsg, err := om.Marshaler.Marshal(topic, msg)
if err != nil {
return nil, err
}

orderingKey, err := om.generateOrderingKey(topic, msg)
if err != nil {
return nil, errors.Wrap(err, "cannot generate ordering key")
}
marshaledMsg.OrderingKey = orderingKey

return marshaledMsg, nil
}

type ExtractOrderingKey func(orderingKey string, msg *message.Message) error

type orderingUnmarshaler struct {
Unmarshaler

extractOrderingKey ExtractOrderingKey
}

func NewOrderingUnmarshaler(extractOrderingKey ExtractOrderingKey) Unmarshaler {
return &orderingUnmarshaler{
Unmarshaler: DefaultMarshalerUnmarshaler{},
extractOrderingKey: extractOrderingKey,
}
}

func (ou orderingUnmarshaler) Unmarshal(pubsubMsg *pubsub.Message) (*message.Message, error) {
msg, err := ou.Unmarshaler.Unmarshal(pubsubMsg)
if err != nil {
return nil, err
}

if err := ou.extractOrderingKey(pubsubMsg.OrderingKey, msg); err != nil {
return nil, errors.Wrap(err, "cannot extract ordering key")
}

return msg, nil
}
3 changes: 3 additions & 0 deletions pkg/googlecloud/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type PublisherConfig struct {
// If false (default), `Publisher` tries to create a topic if there is none with the requested name.
// Otherwise, trying to subscribe to non-existent subscription results in `ErrTopicDoesNotExist`.
DoNotCreateTopicIfMissing bool
// Enables the topic message ordering
EnableMessageOrdering bool

// ConnectTimeout defines the timeout for connecting to Pub/Sub
ConnectTimeout time.Duration
Expand Down Expand Up @@ -170,6 +172,7 @@ func (p *Publisher) topic(ctx context.Context, topic string) (t *pubsub.Topic, e
}()

t = p.client.Topic(topic)
t.EnableMessageOrdering = p.config.EnableMessageOrdering

// todo: theoretically, one could want different publish settings per topic, which is supported by the client lib
// different instances of publisher may be used then
Expand Down
66 changes: 59 additions & 7 deletions pkg/googlecloud/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ import (

// Run `docker-compose up` and set PUBSUB_EMULATOR_HOST=localhost:8085 for this to work

func newPubSub(t *testing.T, marshaler googlecloud.MarshalerUnmarshaler, subscriptionName googlecloud.SubscriptionNameFn) (message.Publisher, message.Subscriber) {
func newPubSub(t *testing.T, enableMessageOrdering bool, marshaler googlecloud.Marshaler, unmarshaler googlecloud.Unmarshaler, subscriptionName googlecloud.SubscriptionNameFn) (message.Publisher, message.Subscriber) {
logger := watermill.NewStdLogger(true, true)

publisher, err := googlecloud.NewPublisher(
googlecloud.PublisherConfig{
Marshaler: marshaler,
EnableMessageOrdering: enableMessageOrdering,
Marshaler: marshaler,
},
logger,
)
Expand All @@ -34,9 +35,10 @@ func newPubSub(t *testing.T, marshaler googlecloud.MarshalerUnmarshaler, subscri
googlecloud.SubscriberConfig{
GenerateSubscriptionName: subscriptionName,
SubscriptionConfig: pubsub.SubscriptionConfig{
RetainAckedMessages: false,
RetainAckedMessages: false,
EnableMessageOrdering: enableMessageOrdering,
},
Unmarshaler: marshaler,
Unmarshaler: unmarshaler,
},
logger,
)
Expand All @@ -45,14 +47,44 @@ func newPubSub(t *testing.T, marshaler googlecloud.MarshalerUnmarshaler, subscri
return publisher, subscriber
}

func createPubSub(t *testing.T) (message.Publisher, message.Subscriber) {
var defaultMarshalerUnmarshaler googlecloud.DefaultMarshalerUnmarshaler
return newPubSub(t, false, defaultMarshalerUnmarshaler, defaultMarshalerUnmarshaler, googlecloud.TopicSubscriptionName)
}

func createPubSubWithSubscriptionName(t *testing.T, subscriptionName string) (message.Publisher, message.Subscriber) {
return newPubSub(t, googlecloud.DefaultMarshalerUnmarshaler{},
var defaultMarshalerUnmarshaler googlecloud.DefaultMarshalerUnmarshaler
return newPubSub(t, false, defaultMarshalerUnmarshaler, defaultMarshalerUnmarshaler,
googlecloud.TopicSubscriptionNameWithSuffix(subscriptionName),
)
}

func createPubSub(t *testing.T) (message.Publisher, message.Subscriber) {
return newPubSub(t, googlecloud.DefaultMarshalerUnmarshaler{}, googlecloud.TopicSubscriptionName)
func createPubSubWithOrdering(t *testing.T) (message.Publisher, message.Subscriber) {
return newPubSub(
t,
true,
googlecloud.NewOrderingMarshaler(func(topic string, msg *message.Message) (string, error) {
return "ordering_key", nil
}),
googlecloud.NewOrderingUnmarshaler(func(orderingKey string, msg *message.Message) error {
return nil
}),
googlecloud.TopicSubscriptionName,
)
}

func createPubSubWithSubscriptionNameWithOrdering(t *testing.T, subscriptionName string) (message.Publisher, message.Subscriber) {
return newPubSub(
t,
true,
googlecloud.NewOrderingMarshaler(func(topic string, msg *message.Message) (string, error) {
return "ordering_key", nil
}),
googlecloud.NewOrderingUnmarshaler(func(orderingKey string, msg *message.Message) error {
return nil
}),
googlecloud.TopicSubscriptionNameWithSuffix(subscriptionName),
)
}

func TestPublishSubscribe(t *testing.T) {
Expand All @@ -69,6 +101,26 @@ func TestPublishSubscribe(t *testing.T) {
)
}

func TestPublishSubscribeOrdering(t *testing.T) {
t.Skip("skipping because the emulator does not currently redeliver nacked messages when ordering is enabled")

if testing.Short() {
t.Skip("skipping long tests")
}

tests.TestPubSub(
t,
tests.Features{
ConsumerGroups: true,
ExactlyOnceDelivery: false,
GuaranteedOrder: true,
Persistent: true,
},
createPubSubWithOrdering,
createPubSubWithSubscriptionNameWithOrdering,
)
}

func TestSubscriberUnexpectedTopicForSubscription(t *testing.T) {
rand.Seed(time.Now().Unix())
testNumber := rand.Int()
Expand Down

0 comments on commit 312214d

Please sign in to comment.