From b7d86c8b721b256d76e08808200d0d3e729038ef Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 28 Oct 2024 16:56:09 -0400 Subject: [PATCH] Bump RabbitMQ.Client from 7.0.0-rc.11 to 7.0.0-rc.14 in /src (#1472) * Bump RabbitMQ.Client from 7.0.0-rc.11 to 7.0.0-rc.13 in /src Bumps [RabbitMQ.Client](https://github.com/rabbitmq/rabbitmq-dotnet-client) from 7.0.0-rc.11 to 7.0.0-rc.13. - [Release notes](https://github.com/rabbitmq/rabbitmq-dotnet-client/releases) - [Changelog](https://github.com/rabbitmq/rabbitmq-dotnet-client/blob/main/CHANGELOG.md) - [Commits](https://github.com/rabbitmq/rabbitmq-dotnet-client/compare/v7.0.0-rc.11...v7.0.0-rc.13) --- updated-dependencies: - dependency-name: RabbitMQ.Client dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] * Adjust to new changes * Ignore PS0004 * Remove client header when forwarding * Fix editorconfig * Fix compiler warnings * Remove custom tracking * Disable rate limiting for now * Async disposable * 7.0.0-rc.14 code changes. (#1475) * Small adjustments * Tweaks --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: danielmarbach Co-authored-by: Luke Bakken Co-authored-by: Brandon Ording --- ....Transport.RabbitMQ.AcceptanceTests.csproj | 2 +- .../.editorconfig | 8 +- .../MigrateQueue/QueueMigrateToQuorumTests.cs | 39 ++-- .../Commands/Delays/DelaysMigrateCommand.cs | 35 +-- .../Commands/Queue/QueueMigrateCommand.cs | 44 ++-- ...eBus.Transport.RabbitMQ.CommandLine.csproj | 2 +- .../.editorconfig | 3 + .../Connection/ChannelProviderTests.cs | 58 ++--- ...ServiceBus.Transport.RabbitMQ.Tests.csproj | 2 +- ...s.Transport.RabbitMQ.TransportTests.csproj | 2 +- .../Connection/ChannelProvider.cs | 35 +-- .../Connection/ConfirmsAwareChannel.cs | 219 +++--------------- .../Connection/ConnectionFactory.cs | 18 +- .../NServiceBus.Transport.RabbitMQ.csproj | 2 +- .../RabbitMQTransportInfrastructure.cs | 2 +- .../Receiving/MessageConverter.cs | 5 +- .../Receiving/MessagePump.cs | 34 ++- .../Sending/BasicPropertiesExtensions.cs | 14 -- .../Sending/MessageDispatcher.cs | 11 +- 19 files changed, 165 insertions(+), 370 deletions(-) diff --git a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/NServiceBus.Transport.RabbitMQ.AcceptanceTests.csproj b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/NServiceBus.Transport.RabbitMQ.AcceptanceTests.csproj index 85c8921f0..0732b81e4 100644 --- a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/NServiceBus.Transport.RabbitMQ.AcceptanceTests.csproj +++ b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/NServiceBus.Transport.RabbitMQ.AcceptanceTests.csproj @@ -21,7 +21,7 @@ - + diff --git a/src/NServiceBus.Transport.RabbitMQ.CommandLine.Tests/.editorconfig b/src/NServiceBus.Transport.RabbitMQ.CommandLine.Tests/.editorconfig index 54b1eb36a..e2c832792 100644 --- a/src/NServiceBus.Transport.RabbitMQ.CommandLine.Tests/.editorconfig +++ b/src/NServiceBus.Transport.RabbitMQ.CommandLine.Tests/.editorconfig @@ -2,4 +2,10 @@ # Justification: Test project dotnet_diagnostic.CA2007.severity = none -dotnet_diagnostic.PS0018.severity = none + +# may be enabled in future +dotnet_diagnostic.PS0018.severity = none # A task-returning method should have a CancellationToken parameter unless it has a parameter implementing ICancellableContext +dotnet_diagnostic.PS0004.severity = none # Make the CancellationToken parameter required + +# Justification: Tests don't support cancellation and don't need to forward IMessageHandlerContext.CancellationToken +dotnet_diagnostic.NSB0002.severity = suggestion diff --git a/src/NServiceBus.Transport.RabbitMQ.CommandLine.Tests/MigrateQueue/QueueMigrateToQuorumTests.cs b/src/NServiceBus.Transport.RabbitMQ.CommandLine.Tests/MigrateQueue/QueueMigrateToQuorumTests.cs index 51f741cda..b432cf736 100644 --- a/src/NServiceBus.Transport.RabbitMQ.CommandLine.Tests/MigrateQueue/QueueMigrateToQuorumTests.cs +++ b/src/NServiceBus.Transport.RabbitMQ.CommandLine.Tests/MigrateQueue/QueueMigrateToQuorumTests.cs @@ -55,7 +55,7 @@ public async Task Should_handle_failure_after_unbind() await PrepareTestEndpoint(endpointName); - await ExecuteBrokerCommand(async (channel, cancellationToken) => await channel.QueueUnbindAsync(endpointName, endpointName, string.Empty, cancellationToken: cancellationToken), CancellationToken.None); + await ExecuteBrokerCommand(async (channel, cancellationToken) => await channel.QueueUnbindAsync(endpointName, endpointName, string.Empty, cancellationToken: cancellationToken)); await ExecuteMigration(endpointName); @@ -340,12 +340,10 @@ Task TryDeleteQueue(string queueName) => ExecuteBrokerCommand(async (channel, ca catch (Exception ex) when (!ex.IsCausedBy(cancellationToken)) { } - }, - CancellationToken.None); + }); - Task CreateQueue(string queueName, bool quorum) - { - return ExecuteBrokerCommand(async (channel, cancellationToken) => + Task CreateQueue(string queueName, bool quorum) => + ExecuteBrokerCommand(async (channel, cancellationToken) => { var queueArguments = new Dictionary(); @@ -355,20 +353,15 @@ Task CreateQueue(string queueName, bool quorum) } await channel.QueueDeclareAsync(queueName, true, false, false, queueArguments, cancellationToken: cancellationToken); - }, - CancellationToken.None); - } + }); - Task CreateExchange(string exchangeName) => ExecuteBrokerCommand(async (channel, cancellationToken) => await channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Fanout, true, cancellationToken: cancellationToken), CancellationToken.None); + Task CreateExchange(string exchangeName) => ExecuteBrokerCommand(async (channel, cancellationToken) => await channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Fanout, true, cancellationToken: cancellationToken)); - Task BindQueue(string queueName, string exchangeName) => ExecuteBrokerCommand(async (channel, cancellationToken) => await channel.QueueBindAsync(queueName, exchangeName, string.Empty, cancellationToken: cancellationToken), CancellationToken.None); + Task BindQueue(string queueName, string exchangeName) => ExecuteBrokerCommand(async (channel, cancellationToken) => await channel.QueueBindAsync(queueName, exchangeName, string.Empty, cancellationToken: cancellationToken)); - Task AddMessages(string queueName, int numMessages, Action modifications = null) - { - return ExecuteBrokerCommand(async (channel, cancellationToken) => + Task AddMessages(string queueName, int numMessages, Action modifications = null) => + ExecuteBrokerCommand(async (channel, cancellationToken) => { - await channel.ConfirmSelectAsync(trackConfirmations: true, cancellationToken); - for (var i = 0; i < numMessages; i++) { var properties = new BasicProperties(); @@ -376,17 +369,14 @@ Task AddMessages(string queueName, int numMessages, Action mod modifications?.Invoke(properties); await channel.BasicPublishAsync(string.Empty, queueName, true, properties, ReadOnlyMemory.Empty, cancellationToken); - await channel.WaitForConfirmsOrDieAsync(cancellationToken); } - }, - CancellationToken.None); - } + }, new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true, outstandingPublisherConfirmationsRateLimiter: null)); async Task MessageCount(string queueName) { uint messageCount = 0; - await ExecuteBrokerCommand(async (channel, cancellationToken) => messageCount = await channel.MessageCountAsync(queueName, cancellationToken), CancellationToken.None); + await ExecuteBrokerCommand(async (channel, cancellationToken) => messageCount = await channel.MessageCountAsync(queueName, cancellationToken)); return messageCount; } @@ -406,15 +396,14 @@ await ExecuteBrokerCommand(async (channel, cancellationToken) => { queueExists = false; } - }, - CancellationToken.None); + }); return queueExists; } - async Task ExecuteBrokerCommand(Func command, CancellationToken cancellationToken) + async Task ExecuteBrokerCommand(Func command, CreateChannelOptions createChannelOptions = default, CancellationToken cancellationToken = default) { - using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken); + await using var channel = await connection.CreateChannelAsync(createChannelOptions, cancellationToken: cancellationToken); await command(channel, cancellationToken); } diff --git a/src/NServiceBus.Transport.RabbitMQ.CommandLine/Commands/Delays/DelaysMigrateCommand.cs b/src/NServiceBus.Transport.RabbitMQ.CommandLine/Commands/Delays/DelaysMigrateCommand.cs index 18b0d8aba..43f366a02 100644 --- a/src/NServiceBus.Transport.RabbitMQ.CommandLine/Commands/Delays/DelaysMigrateCommand.cs +++ b/src/NServiceBus.Transport.RabbitMQ.CommandLine/Commands/Delays/DelaysMigrateCommand.cs @@ -7,7 +7,7 @@ using System.Threading.Tasks; using global::RabbitMQ.Client; - class DelaysMigrateCommand + class DelaysMigrateCommand(BrokerConnection brokerConnection, IRoutingTopology routingTopology, IConsole console) { const string poisonMessageQueue = "delays-migrate-poison-messages"; const string timeSentHeader = "NServiceBus.TimeSent"; @@ -39,18 +39,11 @@ public static Command CreateCommand() return command; } - public DelaysMigrateCommand(BrokerConnection brokerConnection, IRoutingTopology routingTopology, IConsole console) + async Task Run(CancellationToken cancellationToken) { - this.brokerConnection = brokerConnection; - this.routingTopology = routingTopology; - this.console = console; - } - - public async Task Run(CancellationToken cancellationToken = default) - { - using var connection = await brokerConnection.Create(cancellationToken); - using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken); - await channel.ConfirmSelectAsync(trackConfirmations: true, cancellationToken); + await using var connection = await brokerConnection.Create(cancellationToken); + var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true, outstandingPublisherConfirmationsRateLimiter: null); + await using var channel = await connection.CreateChannelAsync(createChannelOptions, cancellationToken); for (int currentDelayLevel = DelayInfrastructure.MaxLevel; currentDelayLevel >= 0 && !cancellationToken.IsCancellationRequested; currentDelayLevel--) { @@ -92,7 +85,6 @@ async Task MigrateQueue(IChannel channel, int delayLevel, CancellationToken canc } await channel.BasicPublishAsync(string.Empty, poisonMessageQueue, false, new BasicProperties(message.BasicProperties), message.Body, cancellationToken: cancellationToken); - await channel.WaitForConfirmsOrDieAsync(cancellationToken); await channel.BasicAckAsync(message.DeliveryTag, false, cancellationToken); continue; @@ -135,7 +127,6 @@ async Task MigrateQueue(IChannel channel, int delayLevel, CancellationToken canc } await channel.BasicPublishAsync(publishExchange, newRoutingKey, false, new BasicProperties(message.BasicProperties), message.Body, cancellationToken: cancellationToken); - await channel.WaitForConfirmsOrDieAsync(cancellationToken); await channel.BasicAckAsync(message.DeliveryTag, false, cancellationToken); processedMessages++; } @@ -172,18 +163,10 @@ static DateTimeOffset GetTimeSent(BasicGetResult message) return DateTimeOffset.ParseExact(timeSentString, dateTimeOffsetWireFormat, CultureInfo.InvariantCulture); } - static bool MessageIsInvalid(BasicGetResult? message) - { - return message == null - || message.BasicProperties == null - || message.BasicProperties.Headers == null - || !message.BasicProperties.Headers.ContainsKey(DelayInfrastructure.DelayHeader) - || !message.BasicProperties.Headers.ContainsKey(timeSentHeader); - } - - readonly BrokerConnection brokerConnection; - readonly IRoutingTopology routingTopology; - readonly IConsole console; + static bool MessageIsInvalid(BasicGetResult? message) => + message?.BasicProperties?.Headers == null + || !message.BasicProperties.Headers.ContainsKey(DelayInfrastructure.DelayHeader) + || !message.BasicProperties.Headers.ContainsKey(timeSentHeader); bool poisonQueueCreated = false; } diff --git a/src/NServiceBus.Transport.RabbitMQ.CommandLine/Commands/Queue/QueueMigrateCommand.cs b/src/NServiceBus.Transport.RabbitMQ.CommandLine/Commands/Queue/QueueMigrateCommand.cs index 6f25849d7..541897f24 100644 --- a/src/NServiceBus.Transport.RabbitMQ.CommandLine/Commands/Queue/QueueMigrateCommand.cs +++ b/src/NServiceBus.Transport.RabbitMQ.CommandLine/Commands/Queue/QueueMigrateCommand.cs @@ -6,7 +6,7 @@ using global::RabbitMQ.Client; using global::RabbitMQ.Client.Exceptions; - class QueueMigrateCommand + class QueueMigrateCommand(string queueName, BrokerConnection brokerConnection, IConsole console) { public static Command CreateCommand() { @@ -32,20 +32,11 @@ public static Command CreateCommand() return command; } - public QueueMigrateCommand(string queueName, BrokerConnection brokerConnection, IConsole console) - { - this.queueName = queueName; - this.brokerConnection = brokerConnection; - this.console = console; - holdingQueueName = $"{queueName}-migration-temp"; - migrationState = new MigrationState(); - } - public async Task Run(CancellationToken cancellationToken = default) { console.WriteLine($"Starting migration of '{queueName}'"); - using var connection = await brokerConnection.Create(cancellationToken); + await using var connection = await brokerConnection.Create(cancellationToken); await migrationState.SetInitialMigrationStage(queueName, holdingQueueName, connection, cancellationToken); @@ -77,7 +68,8 @@ public async Task Run(CancellationToken cancellationToken = default) async Task MoveMessagesToHoldingQueue(IConnection connection, CancellationToken cancellationToken) { - using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken); + var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true, outstandingPublisherConfirmationsRateLimiter: null); + await using var channel = await connection.CreateChannelAsync(createChannelOptions, cancellationToken); console.WriteLine($"Migrating messages from '{queueName}' to '{holdingQueueName}'"); @@ -95,15 +87,12 @@ async Task MoveMessagesToHoldingQueue(IConnection connection, Ca console.WriteLine($"Unbound '{queueName}' from exchange '{queueName}' "); // move all existing messages to the holding queue - await channel.ConfirmSelectAsync(trackConfirmations: true, cancellationToken); - var numMessagesMovedToHolding = await ProcessMessages( channel, queueName, - async (message, cancellationToken) => + async (message, token) => { - await channel.BasicPublishAsync(string.Empty, holdingQueueName, false, new BasicProperties(message.BasicProperties), message.Body, cancellationToken: cancellationToken); - await channel.WaitForConfirmsOrDieAsync(cancellationToken); + await channel.BasicPublishAsync(string.Empty, holdingQueueName, false, new BasicProperties(message.BasicProperties), message.Body, cancellationToken: token); }, cancellationToken); @@ -114,7 +103,7 @@ async Task MoveMessagesToHoldingQueue(IConnection connection, Ca async Task DeleteMainQueue(IConnection connection, CancellationToken cancellationToken) { - using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken); + await using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken); if (await channel.MessageCountAsync(queueName, cancellationToken) > 0) { @@ -130,7 +119,7 @@ async Task DeleteMainQueue(IConnection connection, CancellationT async Task CreateMainQueueAsQuorum(IConnection connection, CancellationToken cancellationToken) { - using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken); + await using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken); await channel.QueueDeclareAsync(queueName, true, false, false, quorumQueueArguments, cancellationToken: cancellationToken); console.WriteLine($"Recreated '{queueName}' as a quorum queue"); @@ -140,7 +129,8 @@ async Task CreateMainQueueAsQuorum(IConnection connection, Cance async Task RestoreMessages(IConnection connection, CancellationToken cancellationToken) { - using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken); + var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true, outstandingPublisherConfirmationsRateLimiter: null); + await using var channel = await connection.CreateChannelAsync(createChannelOptions, cancellationToken); await channel.QueueBindAsync(queueName, queueName, string.Empty, cancellationToken: cancellationToken); console.WriteLine($"Re-bound '{queueName}' to exchange '{queueName}'"); @@ -151,8 +141,6 @@ async Task RestoreMessages(IConnection connection, CancellationT var messageIds = new Dictionary(); // move all messages in the holding queue back to the main queue - await channel.ConfirmSelectAsync(trackConfirmations: true, cancellationToken); - var numMessageMovedBackToMain = await ProcessMessages( channel, holdingQueueName, @@ -174,7 +162,6 @@ async Task RestoreMessages(IConnection connection, CancellationT } await channel.BasicPublishAsync(string.Empty, queueName, false, new BasicProperties(message.BasicProperties), message.Body, cancellationToken: token); - await channel.WaitForConfirmsOrDieAsync(token); if (messageIdString != null) { @@ -190,7 +177,7 @@ async Task RestoreMessages(IConnection connection, CancellationT async Task CleanUpHoldingQueue(IConnection connection, CancellationToken cancellationToken) { - using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken); + await using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken); if (await channel.MessageCountAsync(holdingQueueName, cancellationToken) != 0) { @@ -227,13 +214,10 @@ async Task ProcessMessages(IChannel channel, string sourceQueue, Func quorumQueueArguments = new() { { "x-queue-type", "quorum" } }; + static readonly Dictionary quorumQueueArguments = new() { { "x-queue-type", "quorum" } }; enum MigrationStage { diff --git a/src/NServiceBus.Transport.RabbitMQ.CommandLine/NServiceBus.Transport.RabbitMQ.CommandLine.csproj b/src/NServiceBus.Transport.RabbitMQ.CommandLine/NServiceBus.Transport.RabbitMQ.CommandLine.csproj index f314a8613..322a2c176 100644 --- a/src/NServiceBus.Transport.RabbitMQ.CommandLine/NServiceBus.Transport.RabbitMQ.CommandLine.csproj +++ b/src/NServiceBus.Transport.RabbitMQ.CommandLine/NServiceBus.Transport.RabbitMQ.CommandLine.csproj @@ -16,7 +16,7 @@ - + diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/.editorconfig b/src/NServiceBus.Transport.RabbitMQ.Tests/.editorconfig index f9749f769..e895eab4f 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/.editorconfig +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/.editorconfig @@ -2,7 +2,10 @@ # Justification: Test project dotnet_diagnostic.CA2007.severity = none + +# may be enabled in future dotnet_diagnostic.PS0004.severity = none # A parameter of type CancellationToken on a private delegate or method should be required +dotnet_diagnostic.PS0018.severity = none # A task-returning method should have a CancellationToken parameter unless it has a parameter implementing ICancellableContext # Justification: Tests don't support cancellation and don't need to forward IMessageHandlerContext.CancellationToken dotnet_diagnostic.NSB0002.severity = suggestion diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/Connection/ChannelProviderTests.cs b/src/NServiceBus.Transport.RabbitMQ.Tests/Connection/ChannelProviderTests.cs index 911ff1a61..0076147db 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/Connection/ChannelProviderTests.cs +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/Connection/ChannelProviderTests.cs @@ -18,7 +18,7 @@ public async Task Should_recover_connection_and_dispose_old_one_when_connection_ await channelProvider.CreateConnection(); var publishConnection = channelProvider.PublishConnections.Dequeue(); - publishConnection.RaiseConnectionShutdown(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "Test")); + await publishConnection.RaiseConnectionShutdown(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "Test")); channelProvider.DelayTaskCompletionSource.SetResult(); @@ -37,7 +37,7 @@ public async Task Should_dispose_connection_when_disposed() await channelProvider.CreateConnection(); var publishConnection = channelProvider.PublishConnections.Dequeue(); - channelProvider.Dispose(); + await channelProvider.DisposeAsync(); Assert.That(publishConnection.WasDisposed, Is.True); } @@ -49,11 +49,11 @@ public async Task Should_not_attempt_to_recover_during_dispose_when_retry_delay_ await channelProvider.CreateConnection(); var publishConnection = channelProvider.PublishConnections.Dequeue(); - publishConnection.RaiseConnectionShutdown(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "Test")); + await publishConnection.RaiseConnectionShutdown(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "Test")); // Deliberately not completing the delay task with channelProvider.DelayTaskCompletionSource.SetResult(); before disposing // to simulate a pending delay task - channelProvider.Dispose(); + await channelProvider.DisposeAsync(); await channelProvider.FireAndForgetAction(CancellationToken.None); @@ -68,14 +68,14 @@ public async Task Should_dispose_newly_established_connection() await channelProvider.CreateConnection(); var publishConnection = channelProvider.PublishConnections.Dequeue(); - publishConnection.RaiseConnectionShutdown(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "Test")); + await publishConnection.RaiseConnectionShutdown(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "Test")); // This simulates the race of the reconnection loop being fired off with the delay task completed during // the disposal of the channel provider. To achieve that it is necessary to kick off the reconnection loop // and await its completion after the channel provider has been disposed. var fireAndForgetTask = channelProvider.FireAndForgetAction(CancellationToken.None); channelProvider.DelayTaskCompletionSource.SetResult(); - channelProvider.Dispose(); + await channelProvider.DisposeAsync(); await fireAndForgetTask; @@ -121,29 +121,7 @@ class FakeConnection : IConnection public bool WasDisposed { get; private set; } - public void UpdateSecret(string newSecret, string reason) => throw new NotImplementedException(); - - public void Abort() => throw new NotImplementedException(); - - public void Abort(ushort reasonCode, string reasonText) => throw new NotImplementedException(); - - public void Abort(TimeSpan timeout) => throw new NotImplementedException(); - - public void Abort(ushort reasonCode, string reasonText, TimeSpan timeout) => throw new NotImplementedException(); - - public void Close() => throw new NotImplementedException(); - - public void Close(ushort reasonCode, string reasonText) => throw new NotImplementedException(); - - public void Close(TimeSpan timeout) => throw new NotImplementedException(); - - public void Close(ushort reasonCode, string reasonText, TimeSpan timeout) => throw new NotImplementedException(); - - public void HandleConnectionBlocked(string reason) => throw new NotImplementedException(); - - public void HandleConnectionUnblocked() => throw new NotImplementedException(); - - public Task CreateChannelAsync(ushort? consumerDispatchConcurrency = null, CancellationToken cancellationToken = new CancellationToken()) => throw new NotImplementedException(); + public Task CreateChannelAsync(CreateChannelOptions options = null, CancellationToken cancellationToken = default) => throw new NotImplementedException(); public ushort ChannelMax { get; } public IDictionary ClientProperties { get; } @@ -157,24 +135,24 @@ class FakeConnection : IConnection public IDictionary ServerProperties { get; } public IList ShutdownReport { get; } public string ClientProvidedName { get; } = $"FakeConnection{Interlocked.Increment(ref connectionCounter)}"; + public event AsyncEventHandler CallbackExceptionAsync = (_, _) => Task.CompletedTask; + public event AsyncEventHandler ConnectionShutdownAsync = (_, _) => Task.CompletedTask; + public event AsyncEventHandler RecoverySucceededAsync = (_, _) => Task.CompletedTask; + public event AsyncEventHandler ConnectionRecoveryErrorAsync = (_, _) => Task.CompletedTask; + public event AsyncEventHandler ConsumerTagChangeAfterRecoveryAsync = (_, _) => Task.CompletedTask; + public event AsyncEventHandler QueueNameChangedAfterRecoveryAsync = (_, _) => Task.CompletedTask; + public event AsyncEventHandler RecoveringConsumerAsync = (_, _) => Task.CompletedTask; + public event AsyncEventHandler ConnectionBlockedAsync = (_, _) => Task.CompletedTask; + public event AsyncEventHandler ConnectionUnblockedAsync = (_, _) => Task.CompletedTask; IEnumerable IConnection.ShutdownReport => throw new NotImplementedException(); - public event EventHandler CallbackException = (_, _) => { }; - public event EventHandler ConnectionBlocked = (_, _) => { }; - public event EventHandler ConnectionShutdown = (_, _) => { }; - public event EventHandler ConnectionUnblocked = (_, _) => { }; - public event EventHandler RecoverySucceeded = (_, _) => { }; - public event EventHandler ConnectionRecoveryError = (_, _) => { }; - public event EventHandler ConsumerTagChangeAfterRecovery = (_, _) => { }; - public event EventHandler QueueNameChangedAfterRecovery = (_, _) => { }; - public event EventHandler RecoveringConsumer = (_, _) => { }; - - public void RaiseConnectionShutdown(ShutdownEventArgs args) => ConnectionShutdown?.Invoke(this, args); + public Task RaiseConnectionShutdown(ShutdownEventArgs args) => ConnectionShutdownAsync.Invoke(this, args); public Task UpdateSecretAsync(string newSecret, string reason, CancellationToken cancellationToken = default) => throw new NotImplementedException(); public Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abort, CancellationToken cancellationToken = default) => throw new NotImplementedException(); static int connectionCounter; + public ValueTask DisposeAsync() => ValueTask.CompletedTask; } } } \ No newline at end of file diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/NServiceBus.Transport.RabbitMQ.Tests.csproj b/src/NServiceBus.Transport.RabbitMQ.Tests/NServiceBus.Transport.RabbitMQ.Tests.csproj index def66cb74..4b06edcec 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/NServiceBus.Transport.RabbitMQ.Tests.csproj +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/NServiceBus.Transport.RabbitMQ.Tests.csproj @@ -23,7 +23,7 @@ - + diff --git a/src/NServiceBus.Transport.RabbitMQ.TransportTests/NServiceBus.Transport.RabbitMQ.TransportTests.csproj b/src/NServiceBus.Transport.RabbitMQ.TransportTests/NServiceBus.Transport.RabbitMQ.TransportTests.csproj index d778a2f83..c8701d774 100644 --- a/src/NServiceBus.Transport.RabbitMQ.TransportTests/NServiceBus.Transport.RabbitMQ.TransportTests.csproj +++ b/src/NServiceBus.Transport.RabbitMQ.TransportTests/NServiceBus.Transport.RabbitMQ.TransportTests.csproj @@ -19,7 +19,7 @@ - + diff --git a/src/NServiceBus.Transport.RabbitMQ/Connection/ChannelProvider.cs b/src/NServiceBus.Transport.RabbitMQ/Connection/ChannelProvider.cs index 4ab33c163..8def9d181 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Connection/ChannelProvider.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Connection/ChannelProvider.cs @@ -7,9 +7,10 @@ namespace NServiceBus.Transport.RabbitMQ using System.Threading; using System.Threading.Tasks; using global::RabbitMQ.Client; + using global::RabbitMQ.Client.Events; using Logging; - class ChannelProvider : IDisposable + class ChannelProvider : IAsyncDisposable { public ChannelProvider(ConnectionFactory connectionFactory, TimeSpan retryDelay, IRoutingTopology routingTopology) { @@ -28,20 +29,23 @@ public ChannelProvider(ConnectionFactory connectionFactory, TimeSpan retryDelay, async Task CreateConnectionWithShutdownListener(CancellationToken cancellationToken) { var newConnection = await CreatePublishConnection(cancellationToken).ConfigureAwait(false); - newConnection.ConnectionShutdown += Connection_ConnectionShutdown; + newConnection.ConnectionShutdownAsync += Connection_ConnectionShutdown; return newConnection; } - void Connection_ConnectionShutdown(object? sender, ShutdownEventArgs e) +#pragma warning disable PS0018 + Task Connection_ConnectionShutdown(object? sender, ShutdownEventArgs e) +#pragma warning restore PS0018 { if (e.Initiator == ShutdownInitiator.Application || sender is null) { - return; + return Task.CompletedTask; } var connectionThatWasShutdown = (IConnection)sender; FireAndForget(cancellationToken => ReconnectSwallowingExceptions(connectionThatWasShutdown.ClientProvidedName, cancellationToken), stoppingTokenSource.Token); + return Task.CompletedTask; } async Task ReconnectSwallowingExceptions(string? connectionName, CancellationToken cancellationToken) @@ -95,7 +99,11 @@ public async ValueTask GetPublishChannel(CancellationToken return channel; } - channel?.Dispose(); + if (channel is not null) + { + await channel.DisposeAsync() + .ConfigureAwait(false); + } channel = new ConfirmsAwareChannel(connection, routingTopology); await channel.Initialize(cancellationToken).ConfigureAwait(false); @@ -103,26 +111,27 @@ public async ValueTask GetPublishChannel(CancellationToken return channel; } - public void ReturnPublishChannel(ConfirmsAwareChannel channel) + public ValueTask ReturnPublishChannel(ConfirmsAwareChannel channel, CancellationToken cancellationToken = default) { if (channel.IsOpen) { channels.Enqueue(channel); + return ValueTask.CompletedTask; } - else - { - channel.Dispose(); - } + + return channel.DisposeAsync(); } - public void Dispose() +#pragma warning disable PS0018 + public async ValueTask DisposeAsync() +#pragma warning restore PS0018 { if (disposed) { return; } - stoppingTokenSource.Cancel(); + await stoppingTokenSource.CancelAsync().ConfigureAwait(false); stoppingTokenSource.Dispose(); var oldConnection = Interlocked.Exchange(ref connection, null); @@ -130,7 +139,7 @@ public void Dispose() foreach (var channel in channels) { - channel.Dispose(); + await channel.DisposeAsync().ConfigureAwait(false); } disposed = true; diff --git a/src/NServiceBus.Transport.RabbitMQ/Connection/ConfirmsAwareChannel.cs b/src/NServiceBus.Transport.RabbitMQ/Connection/ConfirmsAwareChannel.cs index c16d1856f..4594a99bf 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Connection/ConfirmsAwareChannel.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Connection/ConfirmsAwareChannel.cs @@ -1,15 +1,12 @@ namespace NServiceBus.Transport.RabbitMQ { using System; - using System.Collections.Concurrent; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using global::RabbitMQ.Client; - using global::RabbitMQ.Client.Events; - using NServiceBus.Logging; - sealed class ConfirmsAwareChannel(IConnection connection, IRoutingTopology routingTopology) : IDisposable + sealed class ConfirmsAwareChannel(IConnection connection, IRoutingTopology routingTopology) : IAsyncDisposable { public bool IsOpen => channel.IsOpen; @@ -17,212 +14,48 @@ sealed class ConfirmsAwareChannel(IConnection connection, IRoutingTopology routi public async Task Initialize(CancellationToken cancellationToken = default) { - channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false); - - channel.BasicAcks += Channel_BasicAcks; - channel.BasicNacks += Channel_BasicNacks; - channel.BasicReturn += Channel_BasicReturn; - channel.ChannelShutdown += Channel_ModelShutdown; - - await channel.ConfirmSelectAsync(trackConfirmations: false, cancellationToken).ConfigureAwait(false); - } - - public async Task SendMessage(string address, OutgoingMessage message, BasicProperties properties, CancellationToken cancellationToken = default) - { - TaskCompletionSource taskCompletionSource; - - try - { - await sequenceNumberSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); - - (taskCompletionSource, var registration) = GetCancellableTaskCompletionSource(cancellationToken); - await using var _ = registration.ConfigureAwait(false); - - properties.SetConfirmationId(channel.NextPublishSeqNo); - - if (properties.Headers != null && - properties.Headers.TryGetValue(DelayInfrastructure.DelayHeader, out var delayValue)) - { - var routingKey = - DelayInfrastructure.CalculateRoutingKey((int)delayValue, address, out var startingDelayLevel); - - await routingTopology.BindToDelayInfrastructure(channel, address, - DelayInfrastructure.DeliveryExchange, DelayInfrastructure.BindingKey(address), - cancellationToken).ConfigureAwait(false); - // The channel is used here directly because it is not the routing topologies concern to know about the sends to the delay infrastructure - await channel.BasicPublishAsync(DelayInfrastructure.LevelName(startingDelayLevel), routingKey, true, - properties, message.Body, cancellationToken).ConfigureAwait(false); - } - else - { - await routingTopology.Send(channel, address, message, properties, cancellationToken) - .ConfigureAwait(false); - } - } - finally - { - sequenceNumberSemaphore.Release(); - } - - await taskCompletionSource.Task.ConfigureAwait(false); - } - - public async Task PublishMessage(Type type, OutgoingMessage message, BasicProperties properties, CancellationToken cancellationToken = default) - { - TaskCompletionSource taskCompletionSource; - - try - { - await sequenceNumberSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); - - (taskCompletionSource, var registration) = GetCancellableTaskCompletionSource(cancellationToken); - await using var _ = registration.ConfigureAwait(false); - - properties.SetConfirmationId(channel.NextPublishSeqNo); - - await routingTopology.Publish(channel, type, message, properties, cancellationToken) - .ConfigureAwait(false); - } - finally - { - sequenceNumberSemaphore.Release(); - } - - await taskCompletionSource.Task.ConfigureAwait(false); + var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true, outstandingPublisherConfirmationsRateLimiter: null); + channel = await connection.CreateChannelAsync(createChannelOptions, cancellationToken: cancellationToken).ConfigureAwait(false); } - public async Task RawSendInCaseOfFailure(string address, ReadOnlyMemory body, BasicProperties properties, CancellationToken cancellationToken = default) + public async ValueTask SendMessage(string address, OutgoingMessage message, BasicProperties properties, CancellationToken cancellationToken = default) { - properties.Headers ??= new Dictionary(); - - TaskCompletionSource taskCompletionSource; - - try + if (properties.Headers != null && + properties.Headers.TryGetValue(DelayInfrastructure.DelayHeader, out var delayValue)) { - await sequenceNumberSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); - - (taskCompletionSource, var registration) = GetCancellableTaskCompletionSource(cancellationToken); + var routingKey = + DelayInfrastructure.CalculateRoutingKey((int)delayValue, address, out var startingDelayLevel); - await using var _ = registration.ConfigureAwait(false); - - properties.SetConfirmationId(channel.NextPublishSeqNo); - - await routingTopology.RawSendInCaseOfFailure(channel, address, body, properties, cancellationToken) - .ConfigureAwait(false); - } - finally - { - sequenceNumberSemaphore.Release(); - } - - await taskCompletionSource.Task.ConfigureAwait(false); - } - - (TaskCompletionSource, IAsyncDisposable) GetCancellableTaskCompletionSource(CancellationToken cancellationToken) - { - var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - - // There is no need to capture the execution context therefore using UnsafeRegister - var registration = cancellationToken.UnsafeRegister(static state => - { - var (tcs, cancellationToken) = ((TaskCompletionSource, CancellationToken))state!; - tcs.TrySetCanceled(cancellationToken); - }, (tcs, cancellationToken)); - - var added = messages.TryAdd(channel.NextPublishSeqNo, tcs); - - if (!added) - { - throw new Exception($"Cannot publish a message with sequence number '{channel.NextPublishSeqNo}' on this channel. A message was already published on this channel with the same confirmation number."); - } - - return (tcs, registration); - } - - void Channel_BasicAcks(object sender, BasicAckEventArgs e) - { - if (!e.Multiple) - { - SetResult(e.DeliveryTag); + await routingTopology.BindToDelayInfrastructure(channel, address, + DelayInfrastructure.DeliveryExchange, DelayInfrastructure.BindingKey(address), + cancellationToken).ConfigureAwait(false); + // The channel is used here directly because it is not the routing topologies concern to know about the sends to the delay infrastructure + await channel.BasicPublishAsync(DelayInfrastructure.LevelName(startingDelayLevel), routingKey, true, + properties, message.Body, cancellationToken).ConfigureAwait(false); } else { - foreach (var message in messages) - { - if (message.Key <= e.DeliveryTag) - { - SetResult(message.Key); - } - } - } - } - - void Channel_BasicNacks(object sender, BasicNackEventArgs e) - { - if (!e.Multiple) - { - SetException(e.DeliveryTag, "Message rejected by broker."); - } - else - { - foreach (var message in messages) - { - if (message.Key <= e.DeliveryTag) - { - SetException(message.Key, "Message rejected by broker."); - } - } - } - } - - void Channel_BasicReturn(object sender, BasicReturnEventArgs e) - { - var message = $"Message could not be routed to {e.Exchange + e.RoutingKey}: {e.ReplyCode} {e.ReplyText}"; - - if (e.BasicProperties.TryGetConfirmationId(out var deliveryTag)) - { - SetException(deliveryTag, message); - } - else - { - Logger.Warn(message); + await routingTopology.Send(channel, address, message, properties, cancellationToken) + .ConfigureAwait(false); } } - void Channel_ModelShutdown(object sender, ShutdownEventArgs e) - { - do - { - foreach (var message in messages) - { - SetException(message.Key, $"Channel has been closed: {e}"); - } - } - while (!messages.IsEmpty); - } + public async ValueTask PublishMessage(Type type, OutgoingMessage message, BasicProperties properties, CancellationToken cancellationToken = default) => + await routingTopology.Publish(channel, type, message, properties, cancellationToken) + .ConfigureAwait(false); - void SetResult(ulong key) + public async ValueTask RawSendInCaseOfFailure(string address, ReadOnlyMemory body, BasicProperties properties, CancellationToken cancellationToken = default) { - if (messages.TryRemove(key, out var tcs)) - { - tcs.SetResult(); - } - } + properties.Headers ??= new Dictionary(); - void SetException(ulong key, string exceptionMessage) - { - if (messages.TryRemove(key, out var tcs)) - { - tcs.SetException(new Exception(exceptionMessage)); - } + await routingTopology.RawSendInCaseOfFailure(channel, address, body, properties, cancellationToken) + .ConfigureAwait(false); } - public void Dispose() => channel?.Dispose(); +#pragma warning disable PS0018 + public ValueTask DisposeAsync() => channel is not null ? channel.DisposeAsync() : ValueTask.CompletedTask; +#pragma warning restore PS0018 IChannel channel; - readonly ConcurrentDictionary messages = new(); - readonly SemaphoreSlim sequenceNumberSemaphore = new(1, 1); - - static readonly ILog Logger = LogManager.GetLogger(typeof(ConfirmsAwareChannel)); } } diff --git a/src/NServiceBus.Transport.RabbitMQ/Connection/ConnectionFactory.cs b/src/NServiceBus.Transport.RabbitMQ/Connection/ConnectionFactory.cs index 3fd0aa6cc..5be2fd683 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Connection/ConnectionFactory.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Connection/ConnectionFactory.cs @@ -119,17 +119,27 @@ public async Task CreateConnection(string connectionName, Cancellat { var connection = await connectionFactory.CreateConnectionAsync(endpoints, connectionName, cancellationToken).ConfigureAwait(false); - connection.ConnectionBlocked += (sender, e) => Logger.WarnFormat("'{0}' connection blocked: {1}", connectionName, e.Reason); - connection.ConnectionUnblocked += (sender, e) => Logger.WarnFormat("'{0}' connection unblocked}", connectionName); + connection.ConnectionBlockedAsync += (sender, e) => + { + Logger.WarnFormat("'{0}' connection blocked: {1}", connectionName, e.Reason); + return Task.CompletedTask; + }; + connection.ConnectionUnblockedAsync += (sender, e) => + { + Logger.WarnFormat("'{0}' connection unblocked}", connectionName); + return Task.CompletedTask; + }; - connection.ConnectionShutdown += (sender, e) => + connection.ConnectionShutdownAsync += (sender, e) => { if (e.Initiator == ShutdownInitiator.Application && e.ReplyCode == 200) { - return; + return Task.CompletedTask; } Logger.WarnFormat("'{0}' connection shutdown: {1}", connectionName, e); + + return Task.CompletedTask; }; return connection; diff --git a/src/NServiceBus.Transport.RabbitMQ/NServiceBus.Transport.RabbitMQ.csproj b/src/NServiceBus.Transport.RabbitMQ/NServiceBus.Transport.RabbitMQ.csproj index 345493190..e2ef50a8f 100644 --- a/src/NServiceBus.Transport.RabbitMQ/NServiceBus.Transport.RabbitMQ.csproj +++ b/src/NServiceBus.Transport.RabbitMQ/NServiceBus.Transport.RabbitMQ.csproj @@ -10,7 +10,7 @@ - + diff --git a/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransportInfrastructure.cs b/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransportInfrastructure.cs index b603d5e0a..ddda3c9a4 100644 --- a/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransportInfrastructure.cs +++ b/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransportInfrastructure.cs @@ -70,7 +70,7 @@ public override async Task Shutdown(CancellationToken cancellationToken = defaul await Task.WhenAll(Receivers.Values.Select(r => r.StopReceive(cancellationToken))) .ConfigureAwait(false); - channelProvider.Dispose(); + await channelProvider.DisposeAsync().ConfigureAwait(false); } public override string ToTransportAddress(QueueAddress address) => TranslateAddress(address); diff --git a/src/NServiceBus.Transport.RabbitMQ/Receiving/MessageConverter.cs b/src/NServiceBus.Transport.RabbitMQ/Receiving/MessageConverter.cs index d2b919a16..1fd1043c1 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Receiving/MessageConverter.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Receiving/MessageConverter.cs @@ -40,7 +40,8 @@ public Dictionary RetrieveHeaders(BasicDeliverEventArgs message) messageHeaders.Remove(DelayInfrastructure.XFirstDeathExchangeHeader); messageHeaders.Remove(DelayInfrastructure.XFirstDeathQueueHeader); messageHeaders.Remove(DelayInfrastructure.XFirstDeathReasonHeader); - messageHeaders.Remove(BasicPropertiesExtensions.ConfirmationIdHeader); + messageHeaders.Remove(LegacyConfirmationIdHeader); + messageHeaders.Remove(Constants.PublishSequenceNumberHeader); } // Leaving space for ReplyTo, CorrelationId, DeliveryMode, EnclosedMessageTypes conditionally @@ -166,5 +167,7 @@ static string ValueToString(object value) readonly Func messageIdStrategy; static readonly DateTimeOffset UnixEpoch = new DateTimeOffset(1970, 1, 1, 0, 0, 0, TimeSpan.Zero); + + public const string LegacyConfirmationIdHeader = "NServiceBus.Transport.RabbitMQ.ConfirmationId"; } } diff --git a/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs b/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs index 1bdb4b35f..2893e0a18 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs @@ -120,7 +120,7 @@ public async Task ChangeConcurrency(PushRuntimeSettings limitations, Cancellatio async Task ConnectToBroker(CancellationToken cancellationToken) { connection = await connectionFactory.CreateConnection(name, cancellationToken).ConfigureAwait(false); - connection.ConnectionShutdown += Connection_ConnectionShutdown; + connection.ConnectionShutdownAsync += Connection_ConnectionShutdown; var prefetchCount = prefetchCountCalculation(maxConcurrency); @@ -130,15 +130,16 @@ async Task ConnectToBroker(CancellationToken cancellationToken) prefetchCount = maxConcurrency; } - var channel = await connection.CreateChannelAsync((ushort)maxConcurrency, cancellationToken).ConfigureAwait(false); - channel.ChannelShutdown += Channel_ModelShutdown; + var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: false, publisherConfirmationTrackingEnabled: false, consumerDispatchConcurrency: (ushort)maxConcurrency); + var channel = await connection.CreateChannelAsync(createChannelOptions, cancellationToken).ConfigureAwait(false); + channel.ChannelShutdownAsync += Channel_ModelShutdown; await channel.BasicQosAsync(0, (ushort)Math.Min(prefetchCount, ushort.MaxValue), false, cancellationToken).ConfigureAwait(false); var consumer = new AsyncEventingBasicConsumer(channel); - consumer.Unregistered += Consumer_Unregistered; - consumer.Registered += Consumer_Registered; - consumer.Received += Consumer_Received; + consumer.UnregisteredAsync += Consumer_Unregistered; + consumer.RegisteredAsync += Consumer_Registered; + consumer.ReceivedAsync += Consumer_Received; await channel.BasicConsumeAsync(ReceiveAddress, false, consumerTag, consumer, cancellationToken: cancellationToken).ConfigureAwait(false); } @@ -214,7 +215,9 @@ Task Consumer_Registered(object sender, ConsumerEventArgs e) return Task.CompletedTask; } - void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e) +#pragma warning disable PS0018 + Task Connection_ConnectionShutdown(object sender, ShutdownEventArgs e) +#pragma warning restore PS0018 { if (e.Initiator == ShutdownInitiator.Application && e.ReplyCode == 200) { @@ -230,18 +233,22 @@ void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e) { Logger.WarnFormat("'{0}' connection shutdown while reconnect already in progress: {1}", name, e); } + + return Task.CompletedTask; } - void Channel_ModelShutdown(object sender, ShutdownEventArgs e) +#pragma warning disable PS0018 + Task Channel_ModelShutdown(object sender, ShutdownEventArgs e) +#pragma warning restore PS0018 { if (e.Initiator == ShutdownInitiator.Application) { - return; + return Task.CompletedTask; } if (e.Initiator == ShutdownInitiator.Peer && e.ReplyCode == 404) { - return; + return Task.CompletedTask; } if (circuitBreaker.Disarmed) @@ -254,6 +261,8 @@ void Channel_ModelShutdown(object sender, ShutdownEventArgs e) { Logger.WarnFormat("'{0}' channel shutdown while reconnect already in progress: {1}", name, e); } + + return Task.CompletedTask; } #pragma warning disable PS0018 // A task-returning method should have a CancellationToken parameter unless it has a parameter implementing ICancellableContext @@ -498,7 +507,7 @@ int GetDeliveryAttempts(BasicDeliverEventArgs message, string messageIdKey) return attempts; } - async Task MovePoisonMessage(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, string queue, CancellationToken messageProcessingCancellationToken) + async ValueTask MovePoisonMessage(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, string queue, CancellationToken messageProcessingCancellationToken) { try { @@ -510,7 +519,8 @@ async Task MovePoisonMessage(AsyncEventingBasicConsumer consumer, BasicDeliverEv } finally { - channelProvider.ReturnPublishChannel(channel); + await channelProvider.ReturnPublishChannel(channel, messageProcessingCancellationToken) + .ConfigureAwait(false); } } catch (Exception ex) when (!ex.IsCausedBy(messageProcessingCancellationToken)) diff --git a/src/NServiceBus.Transport.RabbitMQ/Sending/BasicPropertiesExtensions.cs b/src/NServiceBus.Transport.RabbitMQ/Sending/BasicPropertiesExtensions.cs index cf651880e..668bdb815 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Sending/BasicPropertiesExtensions.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Sending/BasicPropertiesExtensions.cs @@ -102,20 +102,6 @@ static bool CalculateDelay(DispatchProperties dispatchProperties, out long delay return delayed; } - public static void SetConfirmationId(this IBasicProperties properties, ulong confirmationId) - { - properties.Headers[ConfirmationIdHeader] = confirmationId.ToString(); - } - - public static bool TryGetConfirmationId(this IReadOnlyBasicProperties properties, out ulong confirmationId) - { - confirmationId = 0; - - return properties.Headers.TryGetValue(ConfirmationIdHeader, out var value) && - ulong.TryParse(value as byte[] ?? [], out confirmationId); - } - - public const string ConfirmationIdHeader = "NServiceBus.Transport.RabbitMQ.ConfirmationId"; public const string UseNonPersistentDeliveryHeader = "NServiceBus.Transport.RabbitMQ.UseNonPersistentDelivery"; } } diff --git a/src/NServiceBus.Transport.RabbitMQ/Sending/MessageDispatcher.cs b/src/NServiceBus.Transport.RabbitMQ/Sending/MessageDispatcher.cs index ee9a64e5c..7a1ba3d9f 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Sending/MessageDispatcher.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Sending/MessageDispatcher.cs @@ -31,23 +31,24 @@ public async Task Dispatch(TransportOperations outgoingMessages, TransportTransa foreach (var operation in unicastTransportOperations) { - tasks.Add(SendMessage(operation, channel, cancellationToken)); + tasks.Add(SendMessage(operation, channel, cancellationToken).AsTask()); } foreach (var operation in multicastTransportOperations) { - tasks.Add(PublishMessage(operation, channel, cancellationToken)); + tasks.Add(PublishMessage(operation, channel, cancellationToken).AsTask()); } await Task.WhenAll(tasks).ConfigureAwait(false); } finally { - channelProvider.ReturnPublishChannel(channel); + await channelProvider.ReturnPublishChannel(channel, cancellationToken) + .ConfigureAwait(false); } } - Task SendMessage(UnicastTransportOperation transportOperation, ConfirmsAwareChannel channel, CancellationToken cancellationToken) + ValueTask SendMessage(UnicastTransportOperation transportOperation, ConfirmsAwareChannel channel, CancellationToken cancellationToken) { ThrowIfDelayedDeliveryIsDisabledAndMessageIsDelayed(transportOperation); @@ -59,7 +60,7 @@ Task SendMessage(UnicastTransportOperation transportOperation, ConfirmsAwareChan return channel.SendMessage(transportOperation.Destination, message, properties, cancellationToken); } - Task PublishMessage(MulticastTransportOperation transportOperation, ConfirmsAwareChannel channel, CancellationToken cancellationToken) + ValueTask PublishMessage(MulticastTransportOperation transportOperation, ConfirmsAwareChannel channel, CancellationToken cancellationToken) { ThrowIfDelayedDeliveryIsDisabledAndMessageIsDelayed(transportOperation);