diff --git a/mw/rabbitmq/int_test.go b/mw/rabbitmq/int_test.go index 679dd96..10ac9c7 100644 --- a/mw/rabbitmq/int_test.go +++ b/mw/rabbitmq/int_test.go @@ -166,16 +166,17 @@ func Test_view(t *testing.T) { } } queueName := fmt.Sprintf("%s_%s_%s", c.qname, "dlq", "queue") - events, err := ar.view(ctx, queueName, c.viewCount, false) + viewEventsOnce, err := ar.view(ctx, queueName, c.viewCount, false) + viewEventsTwice, err := ar.view(ctx, queueName, c.viewCount, false) if err != nil { t.Errorf("error viewing messages: %v", err) } - if len(events) != c.expectedViewCount { - t.Errorf("expected to read %d messages but read %d", c.expectedViewCount, len(events)) + if len(viewEventsOnce) != c.expectedViewCount { + t.Errorf("expected to read %d messages but read %d", c.expectedViewCount, len(viewEventsOnce)) } - for idx, event := range events { - if string(event.Value) != c.expectedMessages[idx] { - t.Errorf("expected message %s but got %s", c.expectedMessages[0], string(event.Value)) + for idx, event := range c.expectedMessages { + if string(viewEventsOnce[idx].Value) != event || string(viewEventsTwice[idx].Value) != event { + t.Errorf("expected message %s but got %s", event, string(viewEventsOnce[idx].Value)) } } err = ar.DeleteQueuesAndExchanges(context.Background(), c.qname) diff --git a/mw/rabbitmq/retry.go b/mw/rabbitmq/retry.go index 74695b8..42ff933 100644 --- a/mw/rabbitmq/retry.go +++ b/mw/rabbitmq/retry.go @@ -246,11 +246,9 @@ func (r *ARetry) view(ctx context.Context, qnameWithType string, count int, ack actualCount = q.Messages } events := make([]*ziggurat.Event, actualCount) - var consumedDeliveries []amqp.Delivery for i := 0; i < actualCount; i++ { msg, _, err := ch.Get(qnameWithType, false) - if err != nil { return []*ziggurat.Event{}, err } @@ -261,18 +259,13 @@ func (r *ARetry) view(ctx context.Context, qnameWithType string, count int, ack return []*ziggurat.Event{}, err } - events[i] = &e - consumedDeliveries = append(consumedDeliveries, msg) - } - for _, delivery := range consumedDeliveries { var ackErr error if ack { - ackErr = delivery.Ack(true) - } else { - ackErr = delivery.Reject(true) + ackErr = msg.Ack(true) } - r.ogLogger.Error("", ackErr) + r.ogLogger.Error("", ackErr) + events[i] = &e } r.ogLogger.Error("auto retry view: channel close error:", ch.Close()) return events, nil