Skip to content

Commit

Permalink
Fix the view handler to ensure, messages are viewed from RMQ in corre…
Browse files Browse the repository at this point in the history
…ct FCFS order
  • Loading branch information
yashi committed Apr 30, 2024
1 parent 5c20309 commit ebd9f69
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 4 deletions.
10 changes: 10 additions & 0 deletions mw/rabbitmq/int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func Test_view(t *testing.T) {
viewCount int
expectedViewCount int
name string
expectedMessages []string
}

cases := []test{
Expand All @@ -121,27 +122,31 @@ func Test_view(t *testing.T) {
publishCount: 5,
viewCount: 5,
expectedViewCount: 5,
expectedMessages: []string{"bar-0", "bar-1", "bar-2", "bar-3", "bar-4"},
},
{
name: "read excess number of messages than there are in the queue",
qname: "foo",
publishCount: 5,
viewCount: 10,
expectedViewCount: 5,
expectedMessages: []string{"bar-0", "bar-1", "bar-2", "bar-3", "bar-4"},
},
{
name: "read negative number of messages",
qname: "foo",
publishCount: 5,
viewCount: -1,
expectedViewCount: 0,
expectedMessages: []string{},
},
{
name: "read zero messages",
qname: "foo",
viewCount: 0,
publishCount: 5,
expectedViewCount: 0,
expectedMessages: []string{},
}}

for _, c := range cases {
Expand All @@ -168,6 +173,11 @@ func Test_view(t *testing.T) {
if len(events) != c.expectedViewCount {
t.Errorf("expected to read %d messages but read %d", c.expectedViewCount, len(events))
}
for idx, event := range events {
if string(event.Value) != c.expectedMessages[idx] {
t.Errorf("expected message %s but got %s", c.expectedMessages[0], string(event.Value))
}
}
err = ar.DeleteQueuesAndExchanges(context.Background(), c.qname)
if err != nil {
t.Errorf("error deleting queues:%v", err)
Expand Down
13 changes: 9 additions & 4 deletions mw/rabbitmq/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,9 +246,11 @@ func (r *ARetry) view(ctx context.Context, qnameWithType string, count int, ack
actualCount = q.Messages
}
events := make([]*ziggurat.Event, actualCount)
var consumedDeliveries []amqp.Delivery
for i := 0; i < actualCount; i++ {

msg, _, err := ch.Get(qnameWithType, false)

if err != nil {
return []*ziggurat.Event{}, err
}
Expand All @@ -259,15 +261,18 @@ func (r *ARetry) view(ctx context.Context, qnameWithType string, count int, ack
return []*ziggurat.Event{}, err
}

events[i] = &e
consumedDeliveries = append(consumedDeliveries, msg)
}
for _, delivery := range consumedDeliveries {
var ackErr error
if ack {
ackErr = msg.Ack(true)
ackErr = delivery.Ack(true)
} else {
ackErr = msg.Reject(true)
ackErr = delivery.Reject(true)
}

r.ogLogger.Error("", ackErr)
events[i] = &e

}
r.ogLogger.Error("auto retry view: channel close error:", ch.Close())
return events, nil
Expand Down

0 comments on commit ebd9f69

Please sign in to comment.