Skip to content

Commit

Permalink
Merge pull request #1122 from Particular/transport-pump-stop-release-3.2
Browse files Browse the repository at this point in the history
Transport pump stop for release 3.2
  • Loading branch information
tamararivera authored Dec 19, 2024
2 parents bf2cb4a + 1fb094d commit 9e73e6e
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 8 deletions.
27 changes: 19 additions & 8 deletions src/Transport/AzureServiceBusTransportInfrastructure.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
namespace NServiceBus.Transport.AzureServiceBus
#nullable enable

namespace NServiceBus.Transport.AzureServiceBus
{
using System.Linq;
using System.Text;
Expand All @@ -16,7 +18,12 @@ sealed class AzureServiceBusTransportInfrastructure : TransportInfrastructure
readonly ServiceBusClient defaultClient;
readonly (ReceiveSettings receiveSettings, ServiceBusClient client)[] receiveSettingsAndClientPairs;

public AzureServiceBusTransportInfrastructure(AzureServiceBusTransport transportSettings, HostSettings hostSettings, (ReceiveSettings receiveSettings, ServiceBusClient client)[] receiveSettingsAndClientPairs, ServiceBusClient defaultClient)
public AzureServiceBusTransportInfrastructure(
AzureServiceBusTransport transportSettings,
HostSettings hostSettings,
(ReceiveSettings receiveSettings, ServiceBusClient client)[] receiveSettingsAndClientPairs,
ServiceBusClient defaultClient
)
{
this.transportSettings = transportSettings;

Expand All @@ -26,7 +33,10 @@ public AzureServiceBusTransportInfrastructure(AzureServiceBusTransport transport

messageSenderRegistry = new MessageSenderRegistry(defaultClient);

Dispatcher = new MessageDispatcher(messageSenderRegistry, transportSettings.Topology.TopicToPublishTo);
Dispatcher = new MessageDispatcher(
messageSenderRegistry,
transportSettings.Topology.TopicToPublishTo
);
Receivers = receiveSettingsAndClientPairs.ToDictionary(static settingsAndClient =>
{
var (receiveSettings, _) = settingsAndClient;
Expand Down Expand Up @@ -65,15 +75,16 @@ IMessageReceiver CreateMessagePump(ReceiveSettings receiveSettings, ServiceBusCl
hostSettings.CriticalErrorAction,
receiveSettings.UsePublishSubscribe
? new SubscriptionManager(receiveAddress, transportSettings, defaultClient)
: null);
: null
);
}

public override async Task Shutdown(CancellationToken cancellationToken = default)
{
if (messageSenderRegistry != null)
{
await messageSenderRegistry.Close(cancellationToken).ConfigureAwait(false);
}
await Task.WhenAll(Receivers.Values.Select(r => r.StopReceive(cancellationToken)))
.ConfigureAwait(false);

await messageSenderRegistry.Close(cancellationToken).ConfigureAwait(false);

foreach (var (_, serviceBusClient) in receiveSettingsAndClientPairs)
{
Expand Down
6 changes: 6 additions & 0 deletions src/Transport/Receiving/MessagePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,12 @@ void UpdateProcessingCapacity(int maxConcurrency)

public async Task StopReceive(CancellationToken cancellationToken = default)
{
if (messageProcessingCancellationTokenSource is null)
{
// Receiver hasn't been started or is already stopped
return;
}

// Wiring up the stop token to trigger the cancellation token that is being
// used inside the message handling pipeline
using var _ = cancellationToken
Expand Down

0 comments on commit 9e73e6e

Please sign in to comment.