From 1fb094deedacd7071218849ba2837f0832e10df6 Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Thu, 7 Nov 2024 23:36:26 -0800 Subject: [PATCH] Stop pumps on shutdown (#1094) * Stop all receivers * Cleanup * nullable enable * Stop might be called multiple times --------- Co-authored-by: Daniel Marbach (cherry picked from commit 0d0b36f915b0c9a7c0cd759b829a46ff9d19ad19) --- .../AzureServiceBusTransportInfrastructure.cs | 27 +++++++++++++------ src/Transport/Receiving/MessagePump.cs | 6 +++++ 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/src/Transport/AzureServiceBusTransportInfrastructure.cs b/src/Transport/AzureServiceBusTransportInfrastructure.cs index abf8d5ed..d68da7cd 100644 --- a/src/Transport/AzureServiceBusTransportInfrastructure.cs +++ b/src/Transport/AzureServiceBusTransportInfrastructure.cs @@ -1,4 +1,6 @@ -namespace NServiceBus.Transport.AzureServiceBus +#nullable enable + +namespace NServiceBus.Transport.AzureServiceBus { using System.Linq; using System.Text; @@ -16,7 +18,12 @@ sealed class AzureServiceBusTransportInfrastructure : TransportInfrastructure readonly ServiceBusClient defaultClient; readonly (ReceiveSettings receiveSettings, ServiceBusClient client)[] receiveSettingsAndClientPairs; - public AzureServiceBusTransportInfrastructure(AzureServiceBusTransport transportSettings, HostSettings hostSettings, (ReceiveSettings receiveSettings, ServiceBusClient client)[] receiveSettingsAndClientPairs, ServiceBusClient defaultClient) + public AzureServiceBusTransportInfrastructure( + AzureServiceBusTransport transportSettings, + HostSettings hostSettings, + (ReceiveSettings receiveSettings, ServiceBusClient client)[] receiveSettingsAndClientPairs, + ServiceBusClient defaultClient + ) { this.transportSettings = transportSettings; @@ -26,7 +33,10 @@ public AzureServiceBusTransportInfrastructure(AzureServiceBusTransport transport messageSenderRegistry = new MessageSenderRegistry(defaultClient); - Dispatcher = new MessageDispatcher(messageSenderRegistry, transportSettings.Topology.TopicToPublishTo); + Dispatcher = new MessageDispatcher( + messageSenderRegistry, + transportSettings.Topology.TopicToPublishTo + ); Receivers = receiveSettingsAndClientPairs.ToDictionary(static settingsAndClient => { var (receiveSettings, _) = settingsAndClient; @@ -65,15 +75,16 @@ IMessageReceiver CreateMessagePump(ReceiveSettings receiveSettings, ServiceBusCl hostSettings.CriticalErrorAction, receiveSettings.UsePublishSubscribe ? new SubscriptionManager(receiveAddress, transportSettings, defaultClient) - : null); + : null + ); } public override async Task Shutdown(CancellationToken cancellationToken = default) { - if (messageSenderRegistry != null) - { - await messageSenderRegistry.Close(cancellationToken).ConfigureAwait(false); - } + await Task.WhenAll(Receivers.Values.Select(r => r.StopReceive(cancellationToken))) + .ConfigureAwait(false); + + await messageSenderRegistry.Close(cancellationToken).ConfigureAwait(false); foreach (var (_, serviceBusClient) in receiveSettingsAndClientPairs) { diff --git a/src/Transport/Receiving/MessagePump.cs b/src/Transport/Receiving/MessagePump.cs index 30a63256..623593c9 100644 --- a/src/Transport/Receiving/MessagePump.cs +++ b/src/Transport/Receiving/MessagePump.cs @@ -199,6 +199,12 @@ void UpdateProcessingCapacity(int maxConcurrency) public async Task StopReceive(CancellationToken cancellationToken = default) { + if (messageProcessingCancellationTokenSource is null) + { + // Receiver hasn't been started or is already stopped + return; + } + // Wiring up the stop token to trigger the cancellation token that is being // used inside the message handling pipeline using var _ = cancellationToken