diff --git a/src/NMS.AMQP/NmsMessageConsumer.cs b/src/NMS.AMQP/NmsMessageConsumer.cs index 05261e61..f80a857c 100644 --- a/src/NMS.AMQP/NmsMessageConsumer.cs +++ b/src/NMS.AMQP/NmsMessageConsumer.cs @@ -361,8 +361,7 @@ private async Task DeliverNextPendingAsync() Tracer.Debug($"{Info.Id} filtered message with excessive redelivery count: {envelope.RedeliveryCount.ToString()}"); } - // TODO: Apply redelivery policy - await DoAckExpiredAsync(envelope).Await(); + await DoAckRejectedAsync(envelope).Await(); } else { @@ -373,7 +372,9 @@ private async Task DeliverNextPendingAsync() await DoAckDeliveredAsync(envelope).Await(); else await AckFromReceiveAsync(envelope).Await(); - + + await ApplyRedeliveryPolicy(envelope).Await(); + try { Listener.Invoke(envelope.Message.Copy()); @@ -429,6 +430,18 @@ private bool IsRedeliveryExceeded(InboundMessageDispatch envelope) return false; } + + private int GetRedeliveryDelay(InboundMessageDispatch envelope) + { + Tracer.DebugFormat("Checking if envelope is redelivered"); + IRedeliveryPolicy redeliveryPolicy = Session.Connection.RedeliveryPolicy; + + if (redeliveryPolicy == null || envelope.RedeliveryCount <= 0) + return 0; + + var redeliveryDelay = redeliveryPolicy.RedeliveryDelay(envelope.RedeliveryCount); + return redeliveryDelay; + } private Task DoAckReleasedAsync(InboundMessageDispatch envelope) { @@ -517,8 +530,7 @@ private async Task ReceiveInternalBaseAsync(int timeout, Func ReceiveInternalBaseAsync(int timeout, Func ReceiveInternalBaseAsync(int timeout, Func 0) + { + Tracer.DebugFormat("Envelope has been redelivered, apply redelivery policy wait {0} milliseconds", redeliveryDelay); + await Task.Delay(TimeSpan.FromMilliseconds(redeliveryDelay)).Await(); + } + } + private static long GetDeadline(int timeout) { @@ -578,6 +603,11 @@ private Task DoAckExpiredAsync(InboundMessageDispatch envelope) return Session.AcknowledgeAsync(AckType.MODIFIED_FAILED_UNDELIVERABLE, envelope); } + private Task DoAckRejectedAsync(InboundMessageDispatch envelope) + { + return Session.AcknowledgeAsync(AckType.REJECTED, envelope); + } + private void SetAcknowledgeCallback(InboundMessageDispatch envelope) { if (acknowledgementMode == AcknowledgementMode.ClientAcknowledge) diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs b/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs index 99666c88..85989359 100644 --- a/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs +++ b/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs @@ -332,6 +332,10 @@ public void Acknowledge(InboundMessageDispatch envelope, AckType ackType) receiverLink.Modify(message, true, true); RemoveMessage(envelope); break; + case AckType.REJECTED: + receiverLink.Reject(message); + RemoveMessage(envelope); + break; default: Tracer.ErrorFormat("Unsupported Ack Type for message: {0}", envelope); throw new ArgumentException($"Unsupported Ack Type for message: {envelope}"); diff --git a/test/Apache-NMS-AMQP-Test/Integration/MessageRedeliveryPolicyIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/MessageRedeliveryPolicyIntegrationTest.cs new file mode 100644 index 00000000..a492a5a4 --- /dev/null +++ b/test/Apache-NMS-AMQP-Test/Integration/MessageRedeliveryPolicyIntegrationTest.cs @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Threading; +using Amqp.Framing; +using Apache.NMS; +using Apache.NMS.Policies; +using NMS.AMQP.Test.TestAmqp; +using NUnit.Framework; + +namespace NMS.AMQP.Test.Integration +{ + [TestFixture] + public class MessageRedeliveryPolicyIntegrationTest : IntegrationTestFixture + { + [Test, Timeout(20_000)] + public void TestIncomingDeliveryCountExceededMessageGetsRejected() + { + using (TestAmqpPeer testPeer = new TestAmqpPeer()) + { + IConnection connection = EstablishConnection(testPeer); + int initialRedeliveryDelay = 1000; + int clockResolution = 15; + connection.RedeliveryPolicy = new RedeliveryPolicy() { MaximumRedeliveries = 1, InitialRedeliveryDelay = initialRedeliveryDelay}; + connection.Start(); + + testPeer.ExpectBegin(); + + ISession session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge); + IQueue queue = session.GetQueue("myQueue"); + + testPeer.ExpectReceiverAttach(); + testPeer.ExpectLinkFlowRespondWithTransfer(message: new Amqp.Message() { BodySection = new AmqpValue() { Value = "hello" } }); + testPeer.ExpectDispositionThatIsRejectedAndSettled(); + + IMessageConsumer consumer = session.CreateConsumer(queue); + + IMessage m = consumer.Receive(TimeSpan.FromMilliseconds(3000)); + Assert.NotNull(m, "Message should have been received"); + Assert.IsInstanceOf(m); + session.Recover(); + + DateTime startTimer = DateTime.UtcNow; + m = consumer.Receive(TimeSpan.FromMilliseconds(3000)); + Assert.That(DateTime.UtcNow.Subtract(startTimer).TotalMilliseconds, Is.GreaterThanOrEqualTo(initialRedeliveryDelay - clockResolution)); + + Assert.NotNull(m, "Message should have been received"); + Assert.IsInstanceOf(m); + session.Recover(); + + // Verify the message is no longer there. Will drain to be sure there are no messages. + Assert.IsNull(consumer.Receive(TimeSpan.FromMilliseconds(10)), "Message should not have been received"); + + testPeer.ExpectClose(); + connection.Close(); + + testPeer.WaitForAllMatchersToComplete(3000); + } + } + + [Test, Timeout(20_000)] + public void TestIncomingDeliveryCountExceededMessageGetsRejectedAsync() + { + using (TestAmqpPeer testPeer = new TestAmqpPeer()) + { + IConnection connection = EstablishConnection(testPeer); + int initialRedeliveryDelay = 1000; + connection.RedeliveryPolicy = new RedeliveryPolicy() { MaximumRedeliveries = 1, InitialRedeliveryDelay = initialRedeliveryDelay}; + connection.Start(); + + testPeer.ExpectBegin(); + + ISession session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge); + IQueue queue = session.GetQueue("myQueue"); + + + testPeer.ExpectReceiverAttach(); + testPeer.ExpectLinkFlowRespondWithTransfer(message: new Amqp.Message() { BodySection = new AmqpValue() { Value = "hello" } }); + testPeer.ExpectDispositionThatIsRejectedAndSettled(); + + IMessageConsumer consumer = session.CreateConsumer(queue); + + CountdownEvent success = new CountdownEvent(2); + + consumer.Listener += m => + { + session.Recover(); + success.Signal(); + }; + + Assert.IsTrue(success.Wait(TimeSpan.FromSeconds(3)), "Didn't get expected messages"); + + testPeer.ExpectClose(); + connection.Close(); + + testPeer.WaitForAllMatchersToComplete(3000); + } + } + } +} \ No newline at end of file diff --git a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs index e61cc882..95ddce6e 100644 --- a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs +++ b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs @@ -726,6 +726,11 @@ public void ExpectDispositionThatIsReleasedAndSettled() ExpectDisposition(settled: true, stateMatcher: stateMatcher); } + + public void ExpectDispositionThatIsRejectedAndSettled() + { + ExpectDisposition(settled: true, stateMatcher: Assert.IsInstanceOf); + } public void ExpectDisposition(bool settled, Action stateMatcher, uint? firstDeliveryId = null, uint? lastDeliveryId = null) {