Skip to content

Commit

Permalink
Merge pull request #1121 from Particular/transport-pump-stop-release-4.0
Browse files Browse the repository at this point in the history
Transport pump stop for release 4.0
  • Loading branch information
tamararivera authored Dec 19, 2024
2 parents 68235c5 + 312a633 commit 3d34e8c
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 8 deletions.
26 changes: 18 additions & 8 deletions src/Transport/AzureServiceBusTransportInfrastructure.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
namespace NServiceBus.Transport.AzureServiceBus
#nullable enable

namespace NServiceBus.Transport.AzureServiceBus
{
using System.Linq;
using System.Text;
Expand All @@ -16,7 +18,12 @@ sealed class AzureServiceBusTransportInfrastructure : TransportInfrastructure
readonly ServiceBusClient defaultClient;
readonly (ReceiveSettings receiveSettings, ServiceBusClient client)[] receiveSettingsAndClientPairs;

public AzureServiceBusTransportInfrastructure(AzureServiceBusTransport transportSettings, HostSettings hostSettings, (ReceiveSettings receiveSettings, ServiceBusClient client)[] receiveSettingsAndClientPairs, ServiceBusClient defaultClient)
public AzureServiceBusTransportInfrastructure(
AzureServiceBusTransport transportSettings,
HostSettings hostSettings,
(ReceiveSettings receiveSettings, ServiceBusClient client)[] receiveSettingsAndClientPairs,
ServiceBusClient defaultClient
)
{
this.transportSettings = transportSettings;

Expand All @@ -26,7 +33,10 @@ public AzureServiceBusTransportInfrastructure(AzureServiceBusTransport transport

messageSenderRegistry = new MessageSenderRegistry(defaultClient);

Dispatcher = new MessageDispatcher(messageSenderRegistry, transportSettings.Topology.TopicToPublishTo);
Dispatcher = new MessageDispatcher(
messageSenderRegistry,
transportSettings.Topology.TopicToPublishTo
);
Receivers = receiveSettingsAndClientPairs.ToDictionary(static settingsAndClient =>
{
var (receiveSettings, _) = settingsAndClient;
Expand Down Expand Up @@ -70,10 +80,10 @@ IMessageReceiver CreateMessagePump(ReceiveSettings receiveSettings, ServiceBusCl

public override async Task Shutdown(CancellationToken cancellationToken = default)
{
if (messageSenderRegistry != null)
{
await messageSenderRegistry.Close(cancellationToken).ConfigureAwait(false);
}
await Task.WhenAll(Receivers.Values.Select(r => r.StopReceive(cancellationToken)))
.ConfigureAwait(false);

await messageSenderRegistry.Close(cancellationToken).ConfigureAwait(false);

foreach (var (_, serviceBusClient) in receiveSettingsAndClientPairs)
{
Expand All @@ -100,4 +110,4 @@ public override string ToTransportAddress(QueueAddress address)
return queue.ToString();
}
}
}
}
6 changes: 6 additions & 0 deletions src/Transport/Receiving/MessagePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,12 @@ void UpdateProcessingCapacity(int maxConcurrency)

public async Task StopReceive(CancellationToken cancellationToken = default)
{
if (messageProcessingCancellationTokenSource is null)
{
// Receiver hasn't been started or is already stopped
return;
}

// Wiring up the stop token to trigger the cancellation token that is being
// used inside the message handling pipeline
await using var _ = cancellationToken
Expand Down

0 comments on commit 3d34e8c

Please sign in to comment.