Skip to content

Commit

Permalink
Skip reject message explicitly
Browse files Browse the repository at this point in the history
  • Loading branch information
yashipro13 committed May 2, 2024
1 parent ebd9f69 commit b24b8e2
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 16 deletions.
13 changes: 7 additions & 6 deletions mw/rabbitmq/int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,16 +166,17 @@ func Test_view(t *testing.T) {
}
}
queueName := fmt.Sprintf("%s_%s_%s", c.qname, "dlq", "queue")
events, err := ar.view(ctx, queueName, c.viewCount, false)
viewEventsOnce, err := ar.view(ctx, queueName, c.viewCount, false)
viewEventsTwice, err := ar.view(ctx, queueName, c.viewCount, false)
if err != nil {
t.Errorf("error viewing messages: %v", err)
}
if len(events) != c.expectedViewCount {
t.Errorf("expected to read %d messages but read %d", c.expectedViewCount, len(events))
if len(viewEventsOnce) != c.expectedViewCount {
t.Errorf("expected to read %d messages but read %d", c.expectedViewCount, len(viewEventsOnce))
}
for idx, event := range events {
if string(event.Value) != c.expectedMessages[idx] {
t.Errorf("expected message %s but got %s", c.expectedMessages[0], string(event.Value))
for idx, event := range c.expectedMessages {
if string(viewEventsOnce[idx].Value) != event || string(viewEventsTwice[idx].Value) != event {
t.Errorf("expected message %s but got %s", event, string(viewEventsOnce[idx].Value))
}
}
err = ar.DeleteQueuesAndExchanges(context.Background(), c.qname)
Expand Down
13 changes: 3 additions & 10 deletions mw/rabbitmq/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,11 +246,9 @@ func (r *ARetry) view(ctx context.Context, qnameWithType string, count int, ack
actualCount = q.Messages
}
events := make([]*ziggurat.Event, actualCount)
var consumedDeliveries []amqp.Delivery
for i := 0; i < actualCount; i++ {

msg, _, err := ch.Get(qnameWithType, false)

if err != nil {
return []*ziggurat.Event{}, err
}
Expand All @@ -261,18 +259,13 @@ func (r *ARetry) view(ctx context.Context, qnameWithType string, count int, ack
return []*ziggurat.Event{}, err
}

events[i] = &e
consumedDeliveries = append(consumedDeliveries, msg)
}
for _, delivery := range consumedDeliveries {
var ackErr error
if ack {
ackErr = delivery.Ack(true)
} else {
ackErr = delivery.Reject(true)
ackErr = msg.Ack(true)
}
r.ogLogger.Error("", ackErr)

r.ogLogger.Error("", ackErr)
events[i] = &e
}
r.ogLogger.Error("auto retry view: channel close error:", ch.Close())
return events, nil
Expand Down

0 comments on commit b24b8e2

Please sign in to comment.