From bf938e96d8e6723ae0ee4566bf1aafbc05439599 Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Wed, 25 Sep 2024 09:44:50 +0200 Subject: [PATCH] Track messages that successfully completed the message or error pipeline but failed to get acknowledged due to expired leases in receiveonly mode (#1044) * Track messages that successfully completed the message or error pipeline but failed to get acknowledged due to expired leases in receiveonly mode (#1034) * Update src/AcceptanceTests/Receiving/When_message_visibility_expired.cs Co-authored-by: Travis Nickels --------- Co-authored-by: Travis Nickels --- ...ort.AzureServiceBus.AcceptanceTests.csproj | 8 +- .../When_message_visibility_expired.cs | 109 ++++++++++++++ ...t.AzureServiceBus.CommandLine.Tests.csproj | 2 +- src/Tests/FakeProcessor.cs | 27 +++- src/Tests/FakeReceiver.cs | 12 +- ...Bus.Transport.AzureServiceBus.Tests.csproj | 12 +- src/Tests/Receiving/MessagePumpTests.cs | 137 ++++++++++++++++++ ...erviceBus.Transport.AzureServiceBus.csproj | 1 + src/Transport/Receiving/MessagePump.cs | 122 ++++++---------- .../ProcessMessageEventArgsExtensions.cs | 129 +++++++++++++++-- .../Utilities/TransactionExtensions.cs | 10 +- ...port.AzureServiceBus.TransportTests.csproj | 8 +- 12 files changed, 471 insertions(+), 106 deletions(-) create mode 100644 src/AcceptanceTests/Receiving/When_message_visibility_expired.cs diff --git a/src/AcceptanceTests/NServiceBus.Transport.AzureServiceBus.AcceptanceTests.csproj b/src/AcceptanceTests/NServiceBus.Transport.AzureServiceBus.AcceptanceTests.csproj index 3a306724..ba61951c 100644 --- a/src/AcceptanceTests/NServiceBus.Transport.AzureServiceBus.AcceptanceTests.csproj +++ b/src/AcceptanceTests/NServiceBus.Transport.AzureServiceBus.AcceptanceTests.csproj @@ -13,9 +13,13 @@ - - + + + + + + diff --git a/src/AcceptanceTests/Receiving/When_message_visibility_expired.cs b/src/AcceptanceTests/Receiving/When_message_visibility_expired.cs new file mode 100644 index 00000000..65867542 --- 
/dev/null +++ b/src/AcceptanceTests/Receiving/When_message_visibility_expired.cs @@ -0,0 +1,109 @@ +namespace NServiceBus.Transport.AzureServiceBus.AcceptanceTests +{ + using System; + using System.Linq; + using System.Threading.Tasks; + using AcceptanceTesting; + using Azure.Messaging.ServiceBus; + using NServiceBus.AcceptanceTests; + using NServiceBus.AcceptanceTests.EndpointTemplates; + using NUnit.Framework; + + public class When_message_visibility_expired : NServiceBusAcceptanceTest + { + [Test] + public async Task Should_complete_message_on_next_receive_when_pipeline_successful() + { + var ctx = await Scenario.Define() + .WithEndpoint(b => + { + b.CustomConfig(c => + { + // Limiting the concurrency for this test to make sure messages that are made available again are + // not concurrently processed. This is not necessary for the test to pass but it makes + // reasoning about the test easier. + c.LimitMessageProcessingConcurrencyTo(1); + }); + b.When((session, _) => session.SendLocal(new MyMessage())); + }) + .Done(c => c.NativeMessageId is not null && c.Logs.Any(l => WasMarkedAsSuccessfullyCompleted(l, c))) + .Run(); + + var items = ctx.Logs.Where(l => WasMarkedAsSuccessfullyCompleted(l, ctx)).ToArray(); + + Assert.That(items, Is.Not.Empty); + } + + [Test] + public async Task Should_complete_message_on_next_receive_when_error_pipeline_handled_the_message() + { + var ctx = await Scenario.Define(c => + { + c.ShouldThrow = true; + }) + .WithEndpoint(b => + { + b.DoNotFailOnErrorMessages(); + b.CustomConfig(c => + { + var recoverability = c.Recoverability(); + recoverability.AddUnrecoverableException(); + + // Limiting the concurrency for this test to make sure messages that are made available again are + // not concurrently processed. This is not necessary for the test to pass but it makes + // reasoning about the test easier. 
+ c.LimitMessageProcessingConcurrencyTo(1); + }); + b.When((session, _) => session.SendLocal(new MyMessage())); + }) + .Done(c => c.NativeMessageId is not null && c.Logs.Any(l => WasMarkedAsSuccessfullyCompleted(l, c))) + .Run(); + + var items = ctx.Logs.Where(l => WasMarkedAsSuccessfullyCompleted(l, ctx)).ToArray(); + + Assert.That(items, Is.Not.Empty); + } + + static bool WasMarkedAsSuccessfullyCompleted(ScenarioContext.LogItem l, Context c) + => l.Message.StartsWith($"Received message with id '{c.NativeMessageId}' was marked as successfully completed"); + + class Context : ScenarioContext + { + public bool ShouldThrow { get; set; } + + public string NativeMessageId { get; set; } + } + + class Receiver : EndpointConfigurationBuilder + { + public Receiver() => EndpointSetup(c => + { + var transport = c.ConfigureTransport(); + // Explicitly setting the transport transaction mode to ReceiveOnly because the message + // tracking only is implemented for this mode. + transport.TransportTransactionMode = TransportTransactionMode.ReceiveOnly; + }); + } + + public class MyMessage : IMessage; + + class MyMessageHandler(Context testContext) : IHandleMessages + { + public async Task Handle(MyMessage message, IMessageHandlerContext context) + { + var messageEventArgs = context.Extensions.Get(); + // By abandoning the message, the message will be "immediately available" for retrieval again and effectively the message pump + // has lost the message visibility timeout because any Complete or Abandon will be rejected by the azure service bus. 
+ var serviceBusReceivedMessage = context.Extensions.Get(); + await messageEventArgs.AbandonMessageAsync(serviceBusReceivedMessage); + + testContext.NativeMessageId = serviceBusReceivedMessage.MessageId; + + if (testContext.ShouldThrow) + { + throw new InvalidOperationException("Simulated exception"); + } + } + } + } +} \ No newline at end of file diff --git a/src/CommandLineTests/NServiceBus.Transport.AzureServiceBus.CommandLine.Tests.csproj b/src/CommandLineTests/NServiceBus.Transport.AzureServiceBus.CommandLine.Tests.csproj index 56759473..3eabb009 100644 --- a/src/CommandLineTests/NServiceBus.Transport.AzureServiceBus.CommandLine.Tests.csproj +++ b/src/CommandLineTests/NServiceBus.Transport.AzureServiceBus.CommandLine.Tests.csproj @@ -10,7 +10,7 @@ - + diff --git a/src/Tests/FakeProcessor.cs b/src/Tests/FakeProcessor.cs index 2195fb63..80e38553 100644 --- a/src/Tests/FakeProcessor.cs +++ b/src/Tests/FakeProcessor.cs @@ -1,5 +1,8 @@ +#nullable enable + namespace NServiceBus.Transport.AzureServiceBus.Tests { + using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using Azure.Messaging.ServiceBus; @@ -21,7 +24,27 @@ public class FakeProcessor : ServiceBusProcessor return Task.CompletedTask; } - public Task ProcessMessage(ServiceBusReceivedMessage message, ServiceBusReceiver receiver = null, CancellationToken cancellationToken = default) - => OnProcessMessageAsync(new ProcessMessageEventArgs(message, receiver ?? new FakeReceiver(), cancellationToken)); + public Task ProcessMessage(ServiceBusReceivedMessage message, ServiceBusReceiver? receiver = null, CancellationToken cancellationToken = default) + { + var eventArgs = new CustomProcessMessageEventArgs(message, receiver ?? 
new FakeReceiver(), cancellationToken); + receivedMessageToEventArgs.Add(message, eventArgs); + return OnProcessMessageAsync(eventArgs); + } + + readonly ConditionalWeakTable + receivedMessageToEventArgs = []; + + sealed class CustomProcessMessageEventArgs : ProcessMessageEventArgs + { + public CustomProcessMessageEventArgs(ServiceBusReceivedMessage message, ServiceBusReceiver receiver, CancellationToken cancellationToken) : base(message, receiver, cancellationToken) + { + } + + public CustomProcessMessageEventArgs(ServiceBusReceivedMessage message, ServiceBusReceiver receiver, string identifier, CancellationToken cancellationToken) : base(message, receiver, identifier, cancellationToken) + { + } + + public Task RaiseMessageLockLost(MessageLockLostEventArgs args, CancellationToken cancellationToken = default) => OnMessageLockLostAsync(args); + } } } \ No newline at end of file diff --git a/src/Tests/FakeReceiver.cs b/src/Tests/FakeReceiver.cs index 304c2eaf..239caa7f 100644 --- a/src/Tests/FakeReceiver.cs +++ b/src/Tests/FakeReceiver.cs @@ -1,5 +1,6 @@ namespace NServiceBus.Transport.AzureServiceBus.Tests { + using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; @@ -9,6 +10,9 @@ public class FakeReceiver : ServiceBusReceiver { readonly List<(ServiceBusReceivedMessage, IDictionary propertiesToModify)> abandonedMessages = []; readonly List completedMessages = []; + readonly List completingMessages = []; + + public Func CompleteMessageCallback = (_, _) => Task.CompletedTask; public IReadOnlyCollection<(ServiceBusReceivedMessage, IDictionary propertiesToModify)> AbandonedMessages => abandonedMessages; @@ -16,6 +20,9 @@ public class FakeReceiver : ServiceBusReceiver public IReadOnlyCollection CompletedMessages => completedMessages; + public IReadOnlyCollection CompletingMessages + => completingMessages; + public override Task AbandonMessageAsync(ServiceBusReceivedMessage message, IDictionary propertiesToModify = null, 
CancellationToken cancellationToken = default) { @@ -23,11 +30,12 @@ public override Task AbandonMessageAsync(ServiceBusReceivedMessage message, IDic return Task.CompletedTask; } - public override Task CompleteMessageAsync(ServiceBusReceivedMessage message, + public override async Task CompleteMessageAsync(ServiceBusReceivedMessage message, CancellationToken cancellationToken = default) { + completingMessages.Add(message); + await CompleteMessageCallback(message, cancellationToken); completedMessages.Add(message); - return Task.CompletedTask; } } } \ No newline at end of file diff --git a/src/Tests/NServiceBus.Transport.AzureServiceBus.Tests.csproj b/src/Tests/NServiceBus.Transport.AzureServiceBus.Tests.csproj index 4a4b81ed..97b1d3c9 100644 --- a/src/Tests/NServiceBus.Transport.AzureServiceBus.Tests.csproj +++ b/src/Tests/NServiceBus.Transport.AzureServiceBus.Tests.csproj @@ -12,14 +12,18 @@ - - + - - + + + + + + + \ No newline at end of file diff --git a/src/Tests/Receiving/MessagePumpTests.cs b/src/Tests/Receiving/MessagePumpTests.cs index fc929b65..4d9cf13b 100644 --- a/src/Tests/Receiving/MessagePumpTests.cs +++ b/src/Tests/Receiving/MessagePumpTests.cs @@ -2,6 +2,7 @@ namespace NServiceBus.Transport.AzureServiceBus.Tests.Receiving { using System; using System.Collections.Generic; + using System.Linq; using System.Threading; using System.Threading.Tasks; using Azure.Messaging.ServiceBus; @@ -61,6 +62,142 @@ await pump.Initialize(new PushRuntimeSettings(1), (context, token) => Assert.That(pumpWasCalled, Is.False); } + [Test] + public async Task Should_complete_message_on_next_receive_receiveonly_mode_when_pipeline_successful_but_completion_failed_due_to_expired_lease() + { + var fakeClient = new FakeServiceBusClient(); + var fakeReceiver = new FakeReceiver(); + var onMessageCalled = 0; + var onErrorCalled = 0; + + var pump = new MessagePump(fakeClient, new AzureServiceBusTransport { TransportTransactionMode = TransportTransactionMode.ReceiveOnly }, 
"receiveAddress", + new ReceiveSettings("TestReceiver", new QueueAddress("receiveAddress"), false, false, "error"), (s, exception, arg3) => { }, null); + + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var pumpExecutingTaskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + await using var _ = cancellationTokenSource.Token.Register(() => pumpExecutingTaskCompletionSource.TrySetCanceled()); + + await pump.Initialize(new PushRuntimeSettings(1), (_, _) => + { + onMessageCalled++; + return Task.CompletedTask; + }, + (_, _) => + { + onErrorCalled++; + return Task.FromResult(ErrorHandleResult.Handled); + }, CancellationToken.None); + await pump.StartReceive(); + + var firstReceivedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(messageId: "SomeId", lockedUntil: DateTimeOffset.UtcNow.AddSeconds(60)); + var secondReceivedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(messageId: "SomeId", lockedUntil: DateTimeOffset.UtcNow.AddSeconds(60)); + + fakeReceiver.CompleteMessageCallback = (message, _) => message == firstReceivedMessage ? 
+ Task.FromException(new ServiceBusException("Lock Lost", reason: ServiceBusFailureReason.MessageLockLost)) : + Task.CompletedTask; + + var fakeProcessor = fakeClient.Processors["receiveAddress"]; + await fakeProcessor.ProcessMessage(firstReceivedMessage, fakeReceiver); + await fakeProcessor.ProcessMessage(secondReceivedMessage, fakeReceiver); + + Assert.That(fakeReceiver.CompletedMessages, Does.Not.Contain(firstReceivedMessage)); + Assert.That(fakeReceiver.CompletedMessages, Does.Contain(secondReceivedMessage)); + Assert.That(fakeReceiver.AbandonedMessages, Is.Empty); + Assert.That(onMessageCalled, Is.EqualTo(1)); + Assert.That(onErrorCalled, Is.Zero); + } + + [Test] + public async Task Should_abandon_message_in_atomic_mode_when_pipeline_successful_but_completion_failed_due_to_expired_lease() + { + var fakeClient = new FakeServiceBusClient(); + var fakeReceiver = new FakeReceiver(); + var onMessageCalled = 0; + var onErrorCalled = 0; + + var pump = new MessagePump(fakeClient, new AzureServiceBusTransport { TransportTransactionMode = TransportTransactionMode.SendsAtomicWithReceive }, "receiveAddress", + new ReceiveSettings("TestReceiver", new QueueAddress("receiveAddress"), false, false, "error"), (s, exception, arg3) => { }, null); + + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var pumpExecutingTaskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + await using var _ = cancellationTokenSource.Token.Register(() => pumpExecutingTaskCompletionSource.TrySetCanceled()); + + await pump.Initialize(new PushRuntimeSettings(1), (_, _) => + { + onMessageCalled++; + return Task.CompletedTask; + }, + (_, _) => + { + onErrorCalled++; + return Task.FromResult(ErrorHandleResult.Handled); + }, CancellationToken.None); + await pump.StartReceive(); + + var receivedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(messageId: "SomeId", lockedUntil: 
DateTimeOffset.UtcNow.AddSeconds(60)); + + fakeReceiver.CompleteMessageCallback = (message, _) => message == receivedMessage ? + Task.FromException(new ServiceBusException("Lock Lost", reason: ServiceBusFailureReason.MessageLockLost)) : + Task.CompletedTask; + + var fakeProcessor = fakeClient.Processors["receiveAddress"]; + await fakeProcessor.ProcessMessage(receivedMessage, fakeReceiver); + + Assert.Multiple(() => + { + Assert.That(fakeReceiver.AbandonedMessages.Select((tuple, _) => { var (message, _) = tuple; return message; }) + .ToList(), Does.Contain(receivedMessage)); + Assert.That(fakeReceiver.CompletedMessages, Is.Empty); + Assert.That(onMessageCalled, Is.EqualTo(1)); + Assert.That(onErrorCalled, Is.EqualTo(1)); + }); + } + + [Test] + public async Task Should_complete_message_on_next_receive_receiveonly_mode_when_error_pipeline_successful_but_completion_failed_due_to_expired_lease() + { + var fakeClient = new FakeServiceBusClient(); + var fakeReceiver = new FakeReceiver(); + var onMessageCalled = 0; + var onErrorCalled = 0; + + var pump = new MessagePump(fakeClient, new AzureServiceBusTransport { TransportTransactionMode = TransportTransactionMode.ReceiveOnly }, "receiveAddress", + new ReceiveSettings("TestReceiver", new QueueAddress("receiveAddress"), false, false, "error"), (s, exception, arg3) => { }, null); + + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var pumpExecutingTaskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + await using var _ = cancellationTokenSource.Token.Register(() => pumpExecutingTaskCompletionSource.TrySetCanceled()); + + await pump.Initialize(new PushRuntimeSettings(1), (_, _) => + { + onMessageCalled++; + return Task.FromException(new InvalidOperationException()); + }, + (_, _) => + { + onErrorCalled++; + return Task.FromResult(ErrorHandleResult.Handled); + }, CancellationToken.None); + await pump.StartReceive(); + + var 
firstReceivedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(messageId: "SomeId", lockedUntil: DateTimeOffset.UtcNow.AddSeconds(60)); + var secondReceivedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(messageId: "SomeId", lockedUntil: DateTimeOffset.UtcNow.AddSeconds(60)); + + fakeReceiver.CompleteMessageCallback = (message, _) => message == firstReceivedMessage ? + Task.FromException(new ServiceBusException("Lock Lost", reason: ServiceBusFailureReason.MessageLockLost)) : + Task.CompletedTask; + + var fakeProcessor = fakeClient.Processors["receiveAddress"]; + await fakeProcessor.ProcessMessage(firstReceivedMessage, fakeReceiver); + await fakeProcessor.ProcessMessage(secondReceivedMessage, fakeReceiver); + + Assert.That(fakeReceiver.CompletedMessages, Does.Not.Contain(firstReceivedMessage)); + Assert.That(fakeReceiver.CompletedMessages, Does.Contain(secondReceivedMessage)); + Assert.That(fakeReceiver.AbandonedMessages, Is.Empty); + Assert.That(onMessageCalled, Is.EqualTo(1)); + Assert.That(onErrorCalled, Is.EqualTo(1)); + } + [Test] public async Task Should_abandon_message_upon_failure_with_retry_required() { diff --git a/src/Transport/NServiceBus.Transport.AzureServiceBus.csproj b/src/Transport/NServiceBus.Transport.AzureServiceBus.csproj index 62145730..9614bed5 100644 --- a/src/Transport/NServiceBus.Transport.AzureServiceBus.csproj +++ b/src/Transport/NServiceBus.Transport.AzureServiceBus.csproj @@ -9,6 +9,7 @@ + diff --git a/src/Transport/Receiving/MessagePump.cs b/src/Transport/Receiving/MessagePump.cs index daafad33..f8608bfd 100644 --- a/src/Transport/Receiving/MessagePump.cs +++ b/src/Transport/Receiving/MessagePump.cs @@ -6,6 +6,7 @@ using System.Threading.Tasks; using System.Transactions; using Azure.Messaging.ServiceBus; + using BitFaster.Caching.Lru; using Extensibility; using Logging; @@ -15,6 +16,7 @@ class MessagePump : IMessageReceiver readonly ReceiveSettings receiveSettings; readonly Action criticalErrorAction; readonly 
ServiceBusClient serviceBusClient; + readonly FastConcurrentLru messagesToBeCompleted = new(1_000); OnMessage onMessage; OnError onError; @@ -70,7 +72,7 @@ public async Task StartReceive(CancellationToken cancellationToken = default) var receiveOptions = new ServiceBusProcessorOptions { PrefetchCount = prefetchCount, - ReceiveMode = transportSettings.TransportTransactionMode == TransportTransactionMode.None + ReceiveMode = TransactionMode == TransportTransactionMode.None ? ServiceBusReceiveMode.ReceiveAndDelete : ServiceBusReceiveMode.PeekLock, Identifier = $"Processor-{Id}-{ReceiveAddress}-{Guid.NewGuid()}", @@ -95,19 +97,16 @@ public async Task StartReceive(CancellationToken cancellationToken = default) criticalErrorAction("Failed to receive message from Azure Service Bus.", ex, messageProcessingCancellationTokenSource.Token); }, () => - { //We don't have to update the prefetch count since we are failing to receive anyway - processor.UpdateConcurrency(1); - }, - () => - { - processor.UpdateConcurrency(limitations.MaxConcurrency); - }); + processor.UpdateConcurrency(1), + () => processor.UpdateConcurrency(limitations.MaxConcurrency)); await processor.StartProcessingAsync(cancellationToken) .ConfigureAwait(false); } + TransportTransactionMode TransactionMode => transportSettings.TransportTransactionMode; + int CalculatePrefetchCount() { var prefetchCount = limitations.MaxConcurrency * transportSettings.PrefetchMultiplier; @@ -135,28 +134,17 @@ async Task OnProcessMessage(ProcessMessageEventArgs arg) { messageId = message.GetMessageId(); - if (processor.ReceiveMode == ServiceBusReceiveMode.PeekLock && message.LockedUntil < DateTimeOffset.UtcNow) + // Deliberately not using the cancellation token to make sure we complete the message even when the + // cancellation token is already set. 
+ if (await arg.TrySafeCompleteMessage(message, TransactionMode, messagesToBeCompleted, CancellationToken.None).ConfigureAwait(false)) { - Logger.Warn( - $"Skip handling the message with id '{messageId}' because the lock has expired at '{message.LockedUntil}'. " + - "This is usually an indication that the endpoint prefetches more messages than it is able to handle within the configured" + - " peek lock duration. Consider tweaking the prefetch configuration to values that are better aligned with the concurrency" + - " of the endpoint and the time it takes to handle the messages."); + return; + } - try - { - // Deliberately not using the cancellation token to make sure we abandon the message even when the - // cancellation token is already set. - await arg.SafeAbandonMessageAsync(message, - transportSettings.TransportTransactionMode, - cancellationToken: CancellationToken.None) - .ConfigureAwait(false); - } - catch (Exception abandonException) - { - // nothing we can do about it, message will be retried - Logger.Debug($"Error abandoning the message with id '{messageId}' because the lock has expired at '{message.LockedUntil}.", abandonException); - } + // Deliberately not using the cancellation token to make sure we abandon the message even when the + // cancellation token is already set. + if (await arg.TrySafeAbandonMessage(message, TransactionMode, CancellationToken.None).ConfigureAwait(false)) + { return; } @@ -165,26 +153,7 @@ await arg.SafeAbandonMessageAsync(message, } catch (Exception ex) { - var tryDeadlettering = transportSettings.TransportTransactionMode != TransportTransactionMode.None; - - Logger.Warn($"Poison message detected. Message {(tryDeadlettering ? "will be moved to the poison queue" : "will be discarded, transaction mode is set to None")}. 
Exception: {ex.Message}", ex); - - if (tryDeadlettering) - { - try - { - await arg.DeadLetterMessageAsync(message, - deadLetterReason: "Poisoned message", - deadLetterErrorDescription: ex.Message, - cancellationToken: arg.CancellationToken) - .ConfigureAwait(false); - } - catch (Exception deadLetterEx) when (!deadLetterEx.IsCausedBy(arg.CancellationToken)) - { - // nothing we can do about it, message will be retried - Logger.Debug("Error dead lettering poisoned message.", deadLetterEx); - } - } + await arg.SafeDeadLetterMessage(message, TransactionMode, ex, CancellationToken.None).ConfigureAwait(false); return; } @@ -233,10 +202,10 @@ public async Task StopReceive(CancellationToken cancellationToken = default) { // Wiring up the stop token to trigger the cancellation token that is being // used inside the message handling pipeline - using var _ = cancellationToken + await using var _ = cancellationToken .Register(state => (state as CancellationTokenSource)?.Cancel(), messageProcessingCancellationTokenSource, - useSynchronizationContext: false); + useSynchronizationContext: false).ConfigureAwait(false); // Deliberately not passing the cancellation token forward in order to make sure // the processor waits until all processing handlers have returned. This makes // the code compliant to the previous version that uses manual receives and is aligned @@ -272,26 +241,24 @@ async Task ProcessMessage(ServiceBusReceivedMessage message, // args.CancellationToken is currently not used because the v8 version that supports cancellation was designed // to not flip the cancellation token until the very last moment in time when the stop token is flipped. 
var contextBag = new ContextBag(); + contextBag.Set(message); + contextBag.Set(processMessageEventArgs); try { - using (var azureServiceBusTransaction = CreateTransaction(message.PartitionKey)) - { - contextBag.Set(message); - contextBag.Set(processMessageEventArgs); + using var azureServiceBusTransaction = CreateTransaction(message.PartitionKey); + var messageContext = new MessageContext(messageId, headers, body, azureServiceBusTransaction.TransportTransaction, ReceiveAddress, contextBag); - var messageContext = new MessageContext(messageId, headers, body, azureServiceBusTransaction.TransportTransaction, ReceiveAddress, contextBag); + await onMessage(messageContext, messageProcessingCancellationToken).ConfigureAwait(false); - await onMessage(messageContext, messageProcessingCancellationToken).ConfigureAwait(false); - - await processMessageEventArgs.SafeCompleteMessageAsync(message, - transportSettings.TransportTransactionMode, - azureServiceBusTransaction, - cancellationToken: messageProcessingCancellationToken) - .ConfigureAwait(false); + await processMessageEventArgs.SafeCompleteMessage(message, + TransactionMode, + azureServiceBusTransaction, + messagesToBeCompleted, + cancellationToken: messageProcessingCancellationToken) + .ConfigureAwait(false); - azureServiceBusTransaction.Commit(); - } + azureServiceBusTransaction.Commit(); } catch (Exception ex) when (!ex.IsCausedBy(messageProcessingCancellationToken)) { @@ -308,9 +275,10 @@ await processMessageEventArgs.SafeCompleteMessageAsync(message, if (result == ErrorHandleResult.Handled) { - await processMessageEventArgs.SafeCompleteMessageAsync(message, - transportSettings.TransportTransactionMode, + await processMessageEventArgs.SafeCompleteMessage(message, + TransactionMode, azureServiceBusTransaction, + messagesToBeCompleted, cancellationToken: messageProcessingCancellationToken) .ConfigureAwait(false); } @@ -320,31 +288,27 @@ await processMessageEventArgs.SafeCompleteMessageAsync(message, if (result == 
ErrorHandleResult.RetryRequired) { - await processMessageEventArgs.SafeAbandonMessageAsync(message, - transportSettings.TransportTransactionMode, + await processMessageEventArgs.SafeAbandonMessage(message, + TransactionMode, cancellationToken: messageProcessingCancellationToken) .ConfigureAwait(false); } } - catch (ServiceBusException onErrorEx) when (onErrorEx.IsTransient || onErrorEx.Reason is ServiceBusFailureReason.MessageLockLost) + catch (ServiceBusException onErrorEx) when (onErrorEx.IsTransient || onErrorEx.Reason == ServiceBusFailureReason.MessageLockLost) { Logger.Debug("Failed to execute recoverability.", onErrorEx); - await processMessageEventArgs.SafeAbandonMessageAsync(message, - transportSettings.TransportTransactionMode, + await processMessageEventArgs.SafeAbandonMessage(message, + TransactionMode, cancellationToken: messageProcessingCancellationToken) .ConfigureAwait(false); } - catch (Exception onErrorEx) when (onErrorEx.IsCausedBy(messageProcessingCancellationToken)) - { - throw; - } - catch (Exception onErrorEx) + catch (Exception onErrorEx) when (!onErrorEx.IsCausedBy(messageProcessingCancellationToken)) { criticalErrorAction($"Failed to execute recoverability policy for message with native ID: `{message.MessageId}`", onErrorEx, messageProcessingCancellationToken); - await processMessageEventArgs.SafeAbandonMessageAsync(message, - transportSettings.TransportTransactionMode, + await processMessageEventArgs.SafeAbandonMessage(message, + TransactionMode, cancellationToken: messageProcessingCancellationToken) .ConfigureAwait(false); } @@ -352,7 +316,7 @@ await processMessageEventArgs.SafeAbandonMessageAsync(message, } AzureServiceBusTransportTransaction CreateTransaction(string incomingQueuePartitionKey) => - transportSettings.TransportTransactionMode == TransportTransactionMode.SendsAtomicWithReceive + TransactionMode == TransportTransactionMode.SendsAtomicWithReceive ? 
new AzureServiceBusTransportTransaction(serviceBusClient, incomingQueuePartitionKey, new TransactionOptions { diff --git a/src/Transport/Receiving/ProcessMessageEventArgsExtensions.cs b/src/Transport/Receiving/ProcessMessageEventArgsExtensions.cs index 12f32663..211ec1cc 100644 --- a/src/Transport/Receiving/ProcessMessageEventArgsExtensions.cs +++ b/src/Transport/Receiving/ProcessMessageEventArgsExtensions.cs @@ -1,29 +1,140 @@ namespace NServiceBus.Transport.AzureServiceBus { + using System; using System.Threading; using System.Threading.Tasks; using Azure.Messaging.ServiceBus; + using BitFaster.Caching; + using Logging; static class ProcessMessageEventArgsExtensions { - public static async Task SafeCompleteMessageAsync(this ProcessMessageEventArgs args, + public static async ValueTask TrySafeCompleteMessage(this ProcessMessageEventArgs args, + ServiceBusReceivedMessage message, TransportTransactionMode transportTransactionMode, + ICache messagesToBeCompleted, + CancellationToken cancellationToken = default) + { + if (transportTransactionMode == TransportTransactionMode.ReceiveOnly && messagesToBeCompleted.TryGet(message.GetMessageId(), out _)) + { + Logger.DebugFormat("Received message with id '{0}' was marked as successfully completed. 
Trying to immediately acknowledge the message without invoking the pipeline.", message.GetMessageId()); + + try + { + await args.CompleteMessageAsync(message, cancellationToken: cancellationToken) + .ConfigureAwait(false); + return true; + } + // Doing a more generous catch here to make sure we are not losing the ID and can mark it to be completed another time + catch (Exception ex) when (!ex.IsCausedBy(cancellationToken)) + { + messagesToBeCompleted.AddOrUpdate(message.GetMessageId(), true); + throw; + } + } + return false; + } + + public static async ValueTask TrySafeAbandonMessage(this ProcessMessageEventArgs args, ServiceBusReceivedMessage message, TransportTransactionMode transportTransactionMode, - AzureServiceBusTransportTransaction azureServiceBusTransaction, CancellationToken cancellationToken = default) + { + // TransportTransactionMode.None uses ReceiveAndDelete mode which means the message is already removed from the queue + // once we get it. Therefore, we don't need to abandon it. + if (transportTransactionMode != TransportTransactionMode.None && message.LockedUntil < DateTimeOffset.UtcNow) + { + Logger.Warn( + $"Skip handling the message with id '{message.GetMessageId()}' because the lock has expired at '{message.LockedUntil}'. " + + "This is usually an indication that the endpoint prefetches more messages than it is able to handle within the configured" + + " peek lock duration. 
Consider tweaking the prefetch configuration to values that are better aligned with the concurrency" + + " of the endpoint and the time it takes to handle the messages."); + + try + { + await args.SafeAbandonMessage(message, transportTransactionMode, cancellationToken: cancellationToken) + .ConfigureAwait(false); + return true; + } + catch (Exception e) when (!e.IsCausedBy(cancellationToken)) + { + // nothing we can do about it, message will be retried + Logger.Debug($"Error abandoning the message with id '{message.GetMessageId()}' because the lock has expired at '{message.LockedUntil}.", e); + } + } + return false; + } + + public static async Task SafeDeadLetterMessage(this ProcessMessageEventArgs args, ServiceBusReceivedMessage message, + TransportTransactionMode transportTransactionMode, Exception exception, CancellationToken cancellationToken = default) { if (transportTransactionMode != TransportTransactionMode.None) { - using var scope = azureServiceBusTransaction.ToTransactionScope(); - await args.CompleteMessageAsync(message, cancellationToken).ConfigureAwait(false); + Logger.Warn($"Poison message detected. Message will be moved to the poison queue. Exception: {exception.Message}", exception); + + try + { + await args.DeadLetterMessageAsync(message, + deadLetterReason: "Poisoned message", + deadLetterErrorDescription: exception.Message, + cancellationToken: cancellationToken) + .ConfigureAwait(false); + } + catch (Exception deadLetterEx) when (!deadLetterEx.IsCausedBy(cancellationToken)) + { + // nothing we can do about it, message will be retried + Logger.Debug("Error dead lettering poisoned message.", deadLetterEx); + } + } + else + { + Logger.Warn($"Poison message detected. Message will be discarded, transaction mode is set to None. 
Exception: {exception.Message}", exception); + } + } - scope.Complete(); + public static async Task SafeCompleteMessage(this ProcessMessageEventArgs args, + ServiceBusReceivedMessage message, TransportTransactionMode transportTransactionMode, + AzureServiceBusTransportTransaction azureServiceBusTransaction, + ICache messagesToBeCompleted, + CancellationToken cancellationToken = default) + { + if (transportTransactionMode != TransportTransactionMode.None) + { + try + { + using var scope = azureServiceBusTransaction.ToTransactionScope(); + await args.CompleteMessageAsync(message, cancellationToken).ConfigureAwait(false); + scope.Complete(); + } + catch (ServiceBusException e) when (transportTransactionMode == TransportTransactionMode.ReceiveOnly && e.Reason == ServiceBusFailureReason.MessageLockLost) + { + // We tried to complete the message because it was successfully processed either by the pipeline or recoverability, but the lock was lost. + // To make sure we are not reprocessing it unnecessarily we are tracking the message ID and will complete it + // on the next receive. For SendsAtomicWithReceive it is necessary to throw which causes the rollback + // of the transaction and will trigger recoverability. + messagesToBeCompleted.AddOrUpdate(message.GetMessageId(), true); + } } } - public static Task SafeAbandonMessageAsync(this ProcessMessageEventArgs args, ServiceBusReceivedMessage message, + public static async Task SafeAbandonMessage(this ProcessMessageEventArgs args, ServiceBusReceivedMessage message, TransportTransactionMode transportTransactionMode, CancellationToken cancellationToken = default) - => transportTransactionMode != TransportTransactionMode.None - ? 
args.AbandonMessageAsync(message, cancellationToken: cancellationToken) - : Task.CompletedTask; + { + if (transportTransactionMode != TransportTransactionMode.None) + { + try + { + await args.AbandonMessageAsync(message, cancellationToken: cancellationToken).ConfigureAwait(false); + } + catch (ServiceBusException e) when (e.Reason == ServiceBusFailureReason.MessageLockLost) + { + // We tried to abandon the message because it needs to be retried, but the lock was lost. + // the message will reappear on the next receive anyway so we can just ignore this case. + Logger.DebugFormat("Attempted to abandon the message with id '{0}' but the lock was lost.", message.GetMessageId()); + } + } + } + + // The extension methods here are related to functionality of the message pump. Therefore the same logger name + // is used as the message pump. + static readonly ILog Logger = LogManager.GetLogger(); } } \ No newline at end of file diff --git a/src/Transport/Utilities/TransactionExtensions.cs b/src/Transport/Utilities/TransactionExtensions.cs index 2de71429..0d259703 100644 --- a/src/Transport/Utilities/TransactionExtensions.cs +++ b/src/Transport/Utilities/TransactionExtensions.cs @@ -1,14 +1,14 @@ -namespace NServiceBus.Transport.AzureServiceBus +#nullable enable + +namespace NServiceBus.Transport.AzureServiceBus { using System.Transactions; static class TransactionExtensions { - public static TransactionScope ToScope(this Transaction transaction) - { - return transaction != null + public static TransactionScope ToScope(this Transaction? transaction) => + transaction != null ? 
new TransactionScope(transaction, TransactionScopeAsyncFlowOption.Enabled) : new TransactionScope(TransactionScopeOption.Suppress, TransactionScopeAsyncFlowOption.Enabled); - } } } \ No newline at end of file diff --git a/src/TransportTests/NServiceBus.Transport.AzureServiceBus.TransportTests.csproj b/src/TransportTests/NServiceBus.Transport.AzureServiceBus.TransportTests.csproj index 36911bb2..51468008 100644 --- a/src/TransportTests/NServiceBus.Transport.AzureServiceBus.TransportTests.csproj +++ b/src/TransportTests/NServiceBus.Transport.AzureServiceBus.TransportTests.csproj @@ -10,9 +10,13 @@ - - + + + + + +