Skip to content

Commit

Permalink
Fix immediate retries
Browse files Browse the repository at this point in the history
  • Loading branch information
SzymonPobiega committed Sep 14, 2022
1 parent 38a730b commit 0c54957
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Fody" Version="6.6.0" PrivateAssets="All" />
<PackageReference Include="NServiceBus" Version="[7.2.3, 8.0.0)" />
<PackageReference Include="BitFaster.Caching" Version="[2.0.0, 3.0.0)" />
<PackageReference Include="Fody" Version="6.6.3" PrivateAssets="All" />
<PackageReference Include="Obsolete.Fody" Version="5.3.0" PrivateAssets="All" />
<PackageReference Include="Particular.Packaging" Version="2.0.0" PrivateAssets="All" />
<PackageReference Include="RabbitMQ.Client" Version="[6.1.0, 7.0.0)" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ public Dictionary<string, string> RetrieveHeaders(BasicDeliverEventArgs message)
deserializedHeaders[Headers.ReplyToAddress] = deserializedHeaders["NServiceBus.RabbitMQ.CallbackQueue"];
}

//These headers need to be removed so that they won't be copied to an outgoing message if this message gets forwarded
//They can't be removed before deserialization because the value is used by the message pump
deserializedHeaders.Remove("x-delivery-count");

return deserializedHeaders;
}

Expand Down
93 changes: 59 additions & 34 deletions src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
{
using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using BitFaster.Caching.Lru;
using Extensibility;
using global::RabbitMQ.Client;
using global::RabbitMQ.Client.Events;
Expand All @@ -24,6 +26,7 @@ sealed class MessagePump : IPushMessages, IDisposable
readonly int prefetchMultiplier;
readonly ushort overriddenPrefetchCount;
readonly TimeSpan retryDelay;
readonly FastConcurrentLru<string, int> deliveryAttempts = new FastConcurrentLru<string, int>(100);

// Init
Func<MessageContext, Task> onMessage;
Expand Down Expand Up @@ -352,51 +355,43 @@ async Task Process(EventingBasicConsumer consumer, BasicDeliverEventArgs message

using (var tokenSource = new CancellationTokenSource())
{
var processed = false;
var errorHandled = false;
var numberOfDeliveryAttempts = 0;

while (!processed && !errorHandled)
try
{
try
{
var contextBag = new ContextBag();
contextBag.Set(message);
var contextBag = new ContextBag();
contextBag.Set(message);

var messageContext = new MessageContext(messageId, headers, messageBody ?? new byte[0], transportTransaction, tokenSource, contextBag);
var messageContext = new MessageContext(messageId, headers, messageBody ?? new byte[0], transportTransaction, tokenSource, contextBag);

await onMessage(messageContext).ConfigureAwait(false);
processed = true;
}
catch (Exception exception)
{
++numberOfDeliveryAttempts;
headers = messageConverter.RetrieveHeaders(message);
var contextBag = new ContextBag();
contextBag.Set(message);

var errorContext = new ErrorContext(exception, headers, messageId, messageBody ?? new byte[0], transportTransaction, numberOfDeliveryAttempts, contextBag);
await onMessage(messageContext).ConfigureAwait(false);
}
catch (Exception exception)
{
var numberOfDeliveryAttempts = GetDeliveryAttempts(message, messageId);
headers = messageConverter.RetrieveHeaders(message);
var contextBag = new ContextBag();
contextBag.Set(message);

try
{
errorHandled = await onError(errorContext).ConfigureAwait(false) == ErrorHandleResult.Handled;
var errorContext = new ErrorContext(exception, headers, messageId, messageBody ?? new byte[0], transportTransaction, numberOfDeliveryAttempts, contextBag);

if (!errorHandled)
{
headers = messageConverter.RetrieveHeaders(message);
}
}
catch (Exception ex)
try
{
var result = await onError(errorContext).ConfigureAwait(false);
if (result == ErrorHandleResult.RetryRequired)
{
criticalError.Raise($"Failed to execute recoverability policy for message with native ID: `{messageId}`", ex);
await consumer.Model.BasicRejectAndRequeueIfOpen(message.DeliveryTag, exclusiveScheduler).ConfigureAwait(false);

return;
}
}
catch (Exception ex)
{
criticalError.Raise($"Failed to execute recoverability policy for message with native ID: `{messageId}`", ex);
await consumer.Model.BasicRejectAndRequeueIfOpen(message.DeliveryTag, exclusiveScheduler).ConfigureAwait(false);

return;
}
}

if (processed && tokenSource.IsCancellationRequested)
if (tokenSource.IsCancellationRequested)
{
await consumer.Model.BasicRejectAndRequeueIfOpen(message.DeliveryTag, exclusiveScheduler).ConfigureAwait(false);
}
Expand All @@ -408,12 +403,42 @@ async Task Process(EventingBasicConsumer consumer, BasicDeliverEventArgs message
}
catch (AlreadyClosedException ex)
{
Logger.Warn($"Failed to acknowledge message '{messageId}' because the channel was closed. The message was returned to the queue.", ex);
if (Regex.IsMatch(ex.ShutdownReason.ReplyText, @"PRECONDITION_FAILED - delivery acknowledgement on channel [0-9]+ timed out\. Timeout value used: [0-9]+ ms\. This timeout value can be configured, see consumers doc guide to learn more"))
{
Logger.Warn($"Failed to acknowledge message '{messageId}' because the handler execution time exceeded the broker delivery acknowledgement timeout. Increase the length of the timeout on the broker. The message was returned to the queue.", ex);
}
else
{
Logger.Warn($"Failed to acknowledge message '{messageId}' because the channel was closed. The message was returned to the queue.", ex);
}
}
}
}
}

int GetDeliveryAttempts(BasicDeliverEventArgs message, string messageId)
{
var attempts = 1;

if (!message.Redelivered)
{
return attempts;
}

if (message.BasicProperties.Headers.TryGetValue("x-delivery-count", out var headerValue))
{
attempts = Convert.ToInt32(headerValue) + 1;
}
else
{
attempts = deliveryAttempts.GetOrAdd(messageId, k => 1);
attempts++;
deliveryAttempts.AddOrUpdate(messageId, attempts);
}

return attempts;
}

async Task MovePoisonMessage(EventingBasicConsumer consumer, BasicDeliverEventArgs message, string queue)
{
try
Expand Down

0 comments on commit 0c54957

Please sign in to comment.