Skip to content

Commit

Permalink
Track messages that successfully completed the message or error pipel…
Browse files Browse the repository at this point in the history
…ine but failed to get acknowledged due to expired leases in receiveonly mode (#1046)

* Track messages that successfully completed the message or error pipeline but failed to get acknowledged due to expired leases in receiveonly mode (#1034)

# Conflicts:
#	src/AcceptanceTests/NServiceBus.Transport.AzureServiceBus.AcceptanceTests.csproj
#	src/CommandLine/NServiceBus.Transport.AzureServiceBus.CommandLine.csproj
#	src/CommandLineTests/NServiceBus.Transport.AzureServiceBus.CommandLine.Tests.csproj
#	src/Tests/FakeReceiver.cs
#	src/Tests/NServiceBus.Transport.AzureServiceBus.Tests.csproj
#	src/Transport/NServiceBus.Transport.AzureServiceBus.csproj
#	src/TransportTests/NServiceBus.Transport.AzureServiceBus.TransportTests.csproj

* Update src/AcceptanceTests/Receiving/When_message_visibility_expired.cs

Co-authored-by: Travis Nickels <[email protected]>

---------

Co-authored-by: Travis Nickels <[email protected]>
  • Loading branch information
danielmarbach and TravisNickels authored Sep 25, 2024
1 parent 624a16b commit 8304add
Show file tree
Hide file tree
Showing 13 changed files with 487 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,17 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="GitHubActionsTestLogger" Version="2.3.3" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.9.0" />
<PackageReference Include="Azure.Identity" Version="1.12.0" />
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.18.1" />
<PackageReference Include="BitFaster.Caching" Version="2.5.2" />
<PackageReference Include="NServiceBus.AcceptanceTests.Sources" Version="8.0.3" />
<PackageReference Include="NUnit" Version="3.14.0" />
<PackageReference Include="NUnit3TestAdapter" Version="4.5.0" />
</ItemGroup>

<ItemGroup Label="Force the latest version of the transitive dependencies">
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.17.5" />
<ItemGroup>
<PackageReference Include="GitHubActionsTestLogger" Version="2.4.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.11.1" />
<PackageReference Include="NUnit" Version="3.14.0" />
<PackageReference Include="NUnit3TestAdapter" Version="4.6.0" />
</ItemGroup>

</Project>
115 changes: 115 additions & 0 deletions src/AcceptanceTests/Receiving/When_message_visibility_expired.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
namespace NServiceBus.Transport.AzureServiceBus.AcceptanceTests
{
using System;
using System.Linq;
using System.Threading.Tasks;
using AcceptanceTesting;
using Azure.Messaging.ServiceBus;
using NServiceBus.AcceptanceTests;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NUnit.Framework;

public class When_message_visibility_expired : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_complete_message_on_next_receive_when_pipeline_successful()
{
var ctx = await Scenario.Define<Context>()
.WithEndpoint<Receiver>(b =>
{
b.CustomConfig(c =>
{
// Limiting the concurrency for this test to make sure messages that are made available again are
// not concurrently processed. This is not necessary for the test to pass but it makes
// reasoning about the test easier.
c.LimitMessageProcessingConcurrencyTo(1);
});
b.When((session, _) => session.SendLocal(new MyMessage()));
})
.Done(c => c.NativeMessageId is not null && c.Logs.Any(l => WasMarkedAsSuccessfullyCompleted(l, c)))
.Run();

var items = ctx.Logs.Where(l => WasMarkedAsSuccessfullyCompleted(l, ctx)).ToArray();

Assert.That(items, Is.Not.Empty);
}

[Test]
public async Task Should_complete_message_on_next_receive_when_error_pipeline_handled_the_message()
{
var ctx = await Scenario.Define<Context>(c =>
{
c.ShouldThrow = true;
})
.WithEndpoint<Receiver>(b =>
{
b.DoNotFailOnErrorMessages();
b.CustomConfig(c =>
{
var recoverability = c.Recoverability();
recoverability.AddUnrecoverableException<InvalidOperationException>();

// Limiting the concurrency for this test to make sure messages that are made available again are
// not concurrently processed. This is not necessary for the test to pass but it makes
// reasoning about the test easier.
c.LimitMessageProcessingConcurrencyTo(1);
});
b.When((session, _) => session.SendLocal(new MyMessage()));
})
.Done(c => c.NativeMessageId is not null && c.Logs.Any(l => WasMarkedAsSuccessfullyCompleted(l, c)))
.Run();

var items = ctx.Logs.Where(l => WasMarkedAsSuccessfullyCompleted(l, ctx)).ToArray();

Assert.That(items, Is.Not.Empty);
}

static bool WasMarkedAsSuccessfullyCompleted(ScenarioContext.LogItem l, Context c)
=> l.Message.StartsWith($"Received message with id '{c.NativeMessageId}' was marked as successfully completed");

class Context : ScenarioContext
{
public bool ShouldThrow { get; set; }

public string NativeMessageId { get; set; }
}

class Receiver : EndpointConfigurationBuilder
{
public Receiver() => EndpointSetup<DefaultServer>(c =>
{
var transport = c.ConfigureTransport<AzureServiceBusTransport>();
// Explicitly setting the transport transaction mode to ReceiveOnly because the message
// tracking only is implemented for this mode.
transport.TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
});
}

public class MyMessage : IMessage
{
}

class MyMessageHandler : IHandleMessages<MyMessage>
{
readonly Context _testContext;

public MyMessageHandler(Context testContext) => _testContext = testContext;

public async Task Handle(MyMessage message, IMessageHandlerContext context)
{
var messageEventArgs = context.Extensions.Get<ProcessMessageEventArgs>();
// By abandoning the message, the message will be "immediately available" for retrieval again and effectively the message pump
// has lost the message visibility timeout because any Complete or Abandon will be rejected by the azure service bus.
var serviceBusReceivedMessage = context.Extensions.Get<ServiceBusReceivedMessage>();
await messageEventArgs.AbandonMessageAsync(serviceBusReceivedMessage);

_testContext.NativeMessageId = serviceBusReceivedMessage.MessageId;

if (_testContext.ShouldThrow)
{
throw new InvalidOperationException("Simulated exception");
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="McMaster.Extensions.CommandLineUtils" Version="4.0.2" />
<PackageReference Include="McMaster.Extensions.CommandLineUtils" Version="4.1.0" />
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.17.5" />
<PackageReference Include="Azure.Identity" Version="1.11.2" />
<PackageReference Include="Particular.Packaging" Version="2.3.0" PrivateAssets="All" />
<PackageReference Include="Azure.Identity" Version="1.12.0" />
<PackageReference Include="Particular.Packaging" Version="4.1.0" PrivateAssets="All" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@

<ItemGroup>
<PackageReference Include="System.Linq.Async" Version="6.0.1" />
<PackageReference Include="GitHubActionsTestLogger" Version="2.3.3" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.9.0" />
<PackageReference Include="GitHubActionsTestLogger" Version="2.4.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.11.1" />
<PackageReference Include="NUnit" Version="3.14.0" />
<PackageReference Include="NUnit3TestAdapter" Version="4.5.0" />
<PackageReference Include="NUnit3TestAdapter" Version="4.6.0" />
</ItemGroup>

</Project>
27 changes: 25 additions & 2 deletions src/Tests/FakeProcessor.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#nullable enable

namespace NServiceBus.Transport.AzureServiceBus.Tests
{
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
Expand All @@ -21,7 +24,27 @@ public class FakeProcessor : ServiceBusProcessor
return Task.CompletedTask;
}

public Task ProcessMessage(ServiceBusReceivedMessage message, ServiceBusReceiver receiver = null, CancellationToken cancellationToken = default)
=> OnProcessMessageAsync(new ProcessMessageEventArgs(message, receiver ?? new FakeReceiver(), cancellationToken));
public Task ProcessMessage(ServiceBusReceivedMessage message, ServiceBusReceiver? receiver = null, CancellationToken cancellationToken = default)
{
var eventArgs = new CustomProcessMessageEventArgs(message, receiver ?? new FakeReceiver(), cancellationToken);
receivedMessageToEventArgs.Add(message, eventArgs);
return OnProcessMessageAsync(eventArgs);
}

readonly ConditionalWeakTable<ServiceBusReceivedMessage, CustomProcessMessageEventArgs>
receivedMessageToEventArgs = new();

sealed class CustomProcessMessageEventArgs : ProcessMessageEventArgs
{
public CustomProcessMessageEventArgs(ServiceBusReceivedMessage message, ServiceBusReceiver receiver, CancellationToken cancellationToken) : base(message, receiver, cancellationToken)
{
}

public CustomProcessMessageEventArgs(ServiceBusReceivedMessage message, ServiceBusReceiver receiver, string identifier, CancellationToken cancellationToken) : base(message, receiver, identifier, cancellationToken)
{
}

public Task RaiseMessageLockLost(MessageLockLostEventArgs args, CancellationToken cancellationToken = default) => OnMessageLockLostAsync(args);
}
}
}
12 changes: 10 additions & 2 deletions src/Tests/FakeReceiver.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace NServiceBus.Transport.AzureServiceBus.Tests
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -9,25 +10,32 @@ public class FakeReceiver : ServiceBusReceiver
{
readonly List<(ServiceBusReceivedMessage, IDictionary<string, object> propertiesToModify)> abandonedMessages = new();
readonly List<ServiceBusReceivedMessage> completedMessages = new();
readonly List<ServiceBusReceivedMessage> completingMessages = new();

public Func<ServiceBusReceivedMessage, CancellationToken, Task> CompleteMessageCallback = (_, _) => Task.CompletedTask;

public IReadOnlyCollection<(ServiceBusReceivedMessage, IDictionary<string, object> propertiesToModify)> AbandonedMessages
=> abandonedMessages;

public IReadOnlyCollection<ServiceBusReceivedMessage> CompletedMessages
=> completedMessages;

public IReadOnlyCollection<ServiceBusReceivedMessage> CompletingMessages
=> completingMessages;

public override Task AbandonMessageAsync(ServiceBusReceivedMessage message, IDictionary<string, object> propertiesToModify = null,
CancellationToken cancellationToken = default)
{
abandonedMessages.Add((message, propertiesToModify ?? new Dictionary<string, object>(0)));
return Task.CompletedTask;
}

public override Task CompleteMessageAsync(ServiceBusReceivedMessage message,
public override async Task CompleteMessageAsync(ServiceBusReceivedMessage message,
CancellationToken cancellationToken = default)
{
completingMessages.Add(message);
await CompleteMessageCallback(message, cancellationToken);
completedMessages.Add(message);
return Task.CompletedTask;
}
}
}
17 changes: 9 additions & 8 deletions src/Tests/NServiceBus.Transport.AzureServiceBus.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,19 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="GitHubActionsTestLogger" Version="2.3.3" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.9.0" />
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.18.1" />
<PackageReference Include="BitFaster.Caching" Version="2.5.2" />
<PackageReference Include="NServiceBus" Version="8.0.8" />
<PackageReference Include="NServiceBus.Testing" Version="8.0.1" />
<PackageReference Include="NUnit" Version="3.14.0" />
<PackageReference Include="NUnit3TestAdapter" Version="4.5.0" />
<PackageReference Include="Particular.Approvals" Version="0.4.1" />
<PackageReference Include="PublicApiGenerator" Version="11.0.0" />
<PackageReference Include="Particular.Approvals" Version="1.0.0" />
<PackageReference Include="PublicApiGenerator" Version="11.1.0" />
</ItemGroup>

<ItemGroup Label="Force the latest version of the transitive dependencies">
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.17.5" />
<ItemGroup>
<PackageReference Include="GitHubActionsTestLogger" Version="2.4.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.11.1" />
<PackageReference Include="NUnit" Version="3.14.0" />
<PackageReference Include="NUnit3TestAdapter" Version="4.6.0" />
</ItemGroup>

</Project>
Loading

0 comments on commit 8304add

Please sign in to comment.