diff --git a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/NServiceBus.Transport.RabbitMQ.AcceptanceTests.csproj b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/NServiceBus.Transport.RabbitMQ.AcceptanceTests.csproj
index 85c8921f0..0732b81e4 100644
--- a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/NServiceBus.Transport.RabbitMQ.AcceptanceTests.csproj
+++ b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/NServiceBus.Transport.RabbitMQ.AcceptanceTests.csproj
@@ -21,7 +21,7 @@
-
+
diff --git a/src/NServiceBus.Transport.RabbitMQ.CommandLine.Tests/.editorconfig b/src/NServiceBus.Transport.RabbitMQ.CommandLine.Tests/.editorconfig
index 54b1eb36a..e2c832792 100644
--- a/src/NServiceBus.Transport.RabbitMQ.CommandLine.Tests/.editorconfig
+++ b/src/NServiceBus.Transport.RabbitMQ.CommandLine.Tests/.editorconfig
@@ -2,4 +2,10 @@
# Justification: Test project
dotnet_diagnostic.CA2007.severity = none
-dotnet_diagnostic.PS0018.severity = none
+
+# may be enabled in future
+dotnet_diagnostic.PS0018.severity = none # A task-returning method should have a CancellationToken parameter unless it has a parameter implementing ICancellableContext
+dotnet_diagnostic.PS0004.severity = none # Make the CancellationToken parameter required
+
+# Justification: Tests don't support cancellation and don't need to forward IMessageHandlerContext.CancellationToken
+dotnet_diagnostic.NSB0002.severity = suggestion
diff --git a/src/NServiceBus.Transport.RabbitMQ.CommandLine.Tests/MigrateQueue/QueueMigrateToQuorumTests.cs b/src/NServiceBus.Transport.RabbitMQ.CommandLine.Tests/MigrateQueue/QueueMigrateToQuorumTests.cs
index 51f741cda..b432cf736 100644
--- a/src/NServiceBus.Transport.RabbitMQ.CommandLine.Tests/MigrateQueue/QueueMigrateToQuorumTests.cs
+++ b/src/NServiceBus.Transport.RabbitMQ.CommandLine.Tests/MigrateQueue/QueueMigrateToQuorumTests.cs
@@ -55,7 +55,7 @@ public async Task Should_handle_failure_after_unbind()
await PrepareTestEndpoint(endpointName);
- await ExecuteBrokerCommand(async (channel, cancellationToken) => await channel.QueueUnbindAsync(endpointName, endpointName, string.Empty, cancellationToken: cancellationToken), CancellationToken.None);
+ await ExecuteBrokerCommand(async (channel, cancellationToken) => await channel.QueueUnbindAsync(endpointName, endpointName, string.Empty, cancellationToken: cancellationToken));
await ExecuteMigration(endpointName);
@@ -340,12 +340,10 @@ Task TryDeleteQueue(string queueName) => ExecuteBrokerCommand(async (channel, ca
catch (Exception ex) when (!ex.IsCausedBy(cancellationToken))
{
}
- },
- CancellationToken.None);
+ });
- Task CreateQueue(string queueName, bool quorum)
- {
- return ExecuteBrokerCommand(async (channel, cancellationToken) =>
+ Task CreateQueue(string queueName, bool quorum) =>
+ ExecuteBrokerCommand(async (channel, cancellationToken) =>
{
var queueArguments = new Dictionary();
@@ -355,20 +353,15 @@ Task CreateQueue(string queueName, bool quorum)
}
await channel.QueueDeclareAsync(queueName, true, false, false, queueArguments, cancellationToken: cancellationToken);
- },
- CancellationToken.None);
- }
+ });
- Task CreateExchange(string exchangeName) => ExecuteBrokerCommand(async (channel, cancellationToken) => await channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Fanout, true, cancellationToken: cancellationToken), CancellationToken.None);
+ Task CreateExchange(string exchangeName) => ExecuteBrokerCommand(async (channel, cancellationToken) => await channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Fanout, true, cancellationToken: cancellationToken));
- Task BindQueue(string queueName, string exchangeName) => ExecuteBrokerCommand(async (channel, cancellationToken) => await channel.QueueBindAsync(queueName, exchangeName, string.Empty, cancellationToken: cancellationToken), CancellationToken.None);
+ Task BindQueue(string queueName, string exchangeName) => ExecuteBrokerCommand(async (channel, cancellationToken) => await channel.QueueBindAsync(queueName, exchangeName, string.Empty, cancellationToken: cancellationToken));
- Task AddMessages(string queueName, int numMessages, Action modifications = null)
- {
- return ExecuteBrokerCommand(async (channel, cancellationToken) =>
+ Task AddMessages(string queueName, int numMessages, Action modifications = null) =>
+ ExecuteBrokerCommand(async (channel, cancellationToken) =>
{
- await channel.ConfirmSelectAsync(trackConfirmations: true, cancellationToken);
-
for (var i = 0; i < numMessages; i++)
{
var properties = new BasicProperties();
@@ -376,17 +369,14 @@ Task AddMessages(string queueName, int numMessages, Action mod
modifications?.Invoke(properties);
await channel.BasicPublishAsync(string.Empty, queueName, true, properties, ReadOnlyMemory.Empty, cancellationToken);
- await channel.WaitForConfirmsOrDieAsync(cancellationToken);
}
- },
- CancellationToken.None);
- }
+ }, new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true, outstandingPublisherConfirmationsRateLimiter: null));
async Task MessageCount(string queueName)
{
uint messageCount = 0;
- await ExecuteBrokerCommand(async (channel, cancellationToken) => messageCount = await channel.MessageCountAsync(queueName, cancellationToken), CancellationToken.None);
+ await ExecuteBrokerCommand(async (channel, cancellationToken) => messageCount = await channel.MessageCountAsync(queueName, cancellationToken));
return messageCount;
}
@@ -406,15 +396,14 @@ await ExecuteBrokerCommand(async (channel, cancellationToken) =>
{
queueExists = false;
}
- },
- CancellationToken.None);
+ });
return queueExists;
}
- async Task ExecuteBrokerCommand(Func command, CancellationToken cancellationToken)
+ async Task ExecuteBrokerCommand(Func command, CreateChannelOptions createChannelOptions = default, CancellationToken cancellationToken = default)
{
- using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken);
+ await using var channel = await connection.CreateChannelAsync(createChannelOptions, cancellationToken: cancellationToken);
await command(channel, cancellationToken);
}
diff --git a/src/NServiceBus.Transport.RabbitMQ.CommandLine/Commands/Delays/DelaysMigrateCommand.cs b/src/NServiceBus.Transport.RabbitMQ.CommandLine/Commands/Delays/DelaysMigrateCommand.cs
index 18b0d8aba..43f366a02 100644
--- a/src/NServiceBus.Transport.RabbitMQ.CommandLine/Commands/Delays/DelaysMigrateCommand.cs
+++ b/src/NServiceBus.Transport.RabbitMQ.CommandLine/Commands/Delays/DelaysMigrateCommand.cs
@@ -7,7 +7,7 @@
using System.Threading.Tasks;
using global::RabbitMQ.Client;
- class DelaysMigrateCommand
+ class DelaysMigrateCommand(BrokerConnection brokerConnection, IRoutingTopology routingTopology, IConsole console)
{
const string poisonMessageQueue = "delays-migrate-poison-messages";
const string timeSentHeader = "NServiceBus.TimeSent";
@@ -39,18 +39,11 @@ public static Command CreateCommand()
return command;
}
- public DelaysMigrateCommand(BrokerConnection brokerConnection, IRoutingTopology routingTopology, IConsole console)
+ async Task Run(CancellationToken cancellationToken)
{
- this.brokerConnection = brokerConnection;
- this.routingTopology = routingTopology;
- this.console = console;
- }
-
- public async Task Run(CancellationToken cancellationToken = default)
- {
- using var connection = await brokerConnection.Create(cancellationToken);
- using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken);
- await channel.ConfirmSelectAsync(trackConfirmations: true, cancellationToken);
+ await using var connection = await brokerConnection.Create(cancellationToken);
+ var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true, outstandingPublisherConfirmationsRateLimiter: null);
+ await using var channel = await connection.CreateChannelAsync(createChannelOptions, cancellationToken);
for (int currentDelayLevel = DelayInfrastructure.MaxLevel; currentDelayLevel >= 0 && !cancellationToken.IsCancellationRequested; currentDelayLevel--)
{
@@ -92,7 +85,6 @@ async Task MigrateQueue(IChannel channel, int delayLevel, CancellationToken canc
}
await channel.BasicPublishAsync(string.Empty, poisonMessageQueue, false, new BasicProperties(message.BasicProperties), message.Body, cancellationToken: cancellationToken);
- await channel.WaitForConfirmsOrDieAsync(cancellationToken);
await channel.BasicAckAsync(message.DeliveryTag, false, cancellationToken);
continue;
@@ -135,7 +127,6 @@ async Task MigrateQueue(IChannel channel, int delayLevel, CancellationToken canc
}
await channel.BasicPublishAsync(publishExchange, newRoutingKey, false, new BasicProperties(message.BasicProperties), message.Body, cancellationToken: cancellationToken);
- await channel.WaitForConfirmsOrDieAsync(cancellationToken);
await channel.BasicAckAsync(message.DeliveryTag, false, cancellationToken);
processedMessages++;
}
@@ -172,18 +163,10 @@ static DateTimeOffset GetTimeSent(BasicGetResult message)
return DateTimeOffset.ParseExact(timeSentString, dateTimeOffsetWireFormat, CultureInfo.InvariantCulture);
}
- static bool MessageIsInvalid(BasicGetResult? message)
- {
- return message == null
- || message.BasicProperties == null
- || message.BasicProperties.Headers == null
- || !message.BasicProperties.Headers.ContainsKey(DelayInfrastructure.DelayHeader)
- || !message.BasicProperties.Headers.ContainsKey(timeSentHeader);
- }
-
- readonly BrokerConnection brokerConnection;
- readonly IRoutingTopology routingTopology;
- readonly IConsole console;
+ static bool MessageIsInvalid(BasicGetResult? message) =>
+ message?.BasicProperties?.Headers == null
+ || !message.BasicProperties.Headers.ContainsKey(DelayInfrastructure.DelayHeader)
+ || !message.BasicProperties.Headers.ContainsKey(timeSentHeader);
bool poisonQueueCreated = false;
}
diff --git a/src/NServiceBus.Transport.RabbitMQ.CommandLine/Commands/Queue/QueueMigrateCommand.cs b/src/NServiceBus.Transport.RabbitMQ.CommandLine/Commands/Queue/QueueMigrateCommand.cs
index 6f25849d7..541897f24 100644
--- a/src/NServiceBus.Transport.RabbitMQ.CommandLine/Commands/Queue/QueueMigrateCommand.cs
+++ b/src/NServiceBus.Transport.RabbitMQ.CommandLine/Commands/Queue/QueueMigrateCommand.cs
@@ -6,7 +6,7 @@
using global::RabbitMQ.Client;
using global::RabbitMQ.Client.Exceptions;
- class QueueMigrateCommand
+ class QueueMigrateCommand(string queueName, BrokerConnection brokerConnection, IConsole console)
{
public static Command CreateCommand()
{
@@ -32,20 +32,11 @@ public static Command CreateCommand()
return command;
}
- public QueueMigrateCommand(string queueName, BrokerConnection brokerConnection, IConsole console)
- {
- this.queueName = queueName;
- this.brokerConnection = brokerConnection;
- this.console = console;
- holdingQueueName = $"{queueName}-migration-temp";
- migrationState = new MigrationState();
- }
-
public async Task Run(CancellationToken cancellationToken = default)
{
console.WriteLine($"Starting migration of '{queueName}'");
- using var connection = await brokerConnection.Create(cancellationToken);
+ await using var connection = await brokerConnection.Create(cancellationToken);
await migrationState.SetInitialMigrationStage(queueName, holdingQueueName, connection, cancellationToken);
@@ -77,7 +68,8 @@ public async Task Run(CancellationToken cancellationToken = default)
async Task MoveMessagesToHoldingQueue(IConnection connection, CancellationToken cancellationToken)
{
- using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken);
+ var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true, outstandingPublisherConfirmationsRateLimiter: null);
+ await using var channel = await connection.CreateChannelAsync(createChannelOptions, cancellationToken);
console.WriteLine($"Migrating messages from '{queueName}' to '{holdingQueueName}'");
@@ -95,15 +87,12 @@ async Task MoveMessagesToHoldingQueue(IConnection connection, Ca
console.WriteLine($"Unbound '{queueName}' from exchange '{queueName}' ");
// move all existing messages to the holding queue
- await channel.ConfirmSelectAsync(trackConfirmations: true, cancellationToken);
-
var numMessagesMovedToHolding = await ProcessMessages(
channel,
queueName,
- async (message, cancellationToken) =>
+ async (message, token) =>
{
- await channel.BasicPublishAsync(string.Empty, holdingQueueName, false, new BasicProperties(message.BasicProperties), message.Body, cancellationToken: cancellationToken);
- await channel.WaitForConfirmsOrDieAsync(cancellationToken);
+ await channel.BasicPublishAsync(string.Empty, holdingQueueName, false, new BasicProperties(message.BasicProperties), message.Body, cancellationToken: token);
},
cancellationToken);
@@ -114,7 +103,7 @@ async Task MoveMessagesToHoldingQueue(IConnection connection, Ca
async Task DeleteMainQueue(IConnection connection, CancellationToken cancellationToken)
{
- using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken);
+ await using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken);
if (await channel.MessageCountAsync(queueName, cancellationToken) > 0)
{
@@ -130,7 +119,7 @@ async Task DeleteMainQueue(IConnection connection, CancellationT
async Task CreateMainQueueAsQuorum(IConnection connection, CancellationToken cancellationToken)
{
- using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken);
+ await using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken);
await channel.QueueDeclareAsync(queueName, true, false, false, quorumQueueArguments, cancellationToken: cancellationToken);
console.WriteLine($"Recreated '{queueName}' as a quorum queue");
@@ -140,7 +129,8 @@ async Task CreateMainQueueAsQuorum(IConnection connection, Cance
async Task RestoreMessages(IConnection connection, CancellationToken cancellationToken)
{
- using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken);
+ var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true, outstandingPublisherConfirmationsRateLimiter: null);
+ await using var channel = await connection.CreateChannelAsync(createChannelOptions, cancellationToken);
await channel.QueueBindAsync(queueName, queueName, string.Empty, cancellationToken: cancellationToken);
console.WriteLine($"Re-bound '{queueName}' to exchange '{queueName}'");
@@ -151,8 +141,6 @@ async Task RestoreMessages(IConnection connection, CancellationT
var messageIds = new Dictionary();
// move all messages in the holding queue back to the main queue
- await channel.ConfirmSelectAsync(trackConfirmations: true, cancellationToken);
-
var numMessageMovedBackToMain = await ProcessMessages(
channel,
holdingQueueName,
@@ -174,7 +162,6 @@ async Task RestoreMessages(IConnection connection, CancellationT
}
await channel.BasicPublishAsync(string.Empty, queueName, false, new BasicProperties(message.BasicProperties), message.Body, cancellationToken: token);
- await channel.WaitForConfirmsOrDieAsync(token);
if (messageIdString != null)
{
@@ -190,7 +177,7 @@ async Task RestoreMessages(IConnection connection, CancellationT
async Task CleanUpHoldingQueue(IConnection connection, CancellationToken cancellationToken)
{
- using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken);
+ await using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken);
if (await channel.MessageCountAsync(holdingQueueName, cancellationToken) != 0)
{
@@ -227,13 +214,10 @@ async Task ProcessMessages(IChannel channel, string sourceQueue, Func quorumQueueArguments = new() { { "x-queue-type", "quorum" } };
+ static readonly Dictionary quorumQueueArguments = new() { { "x-queue-type", "quorum" } };
enum MigrationStage
{
diff --git a/src/NServiceBus.Transport.RabbitMQ.CommandLine/NServiceBus.Transport.RabbitMQ.CommandLine.csproj b/src/NServiceBus.Transport.RabbitMQ.CommandLine/NServiceBus.Transport.RabbitMQ.CommandLine.csproj
index f314a8613..322a2c176 100644
--- a/src/NServiceBus.Transport.RabbitMQ.CommandLine/NServiceBus.Transport.RabbitMQ.CommandLine.csproj
+++ b/src/NServiceBus.Transport.RabbitMQ.CommandLine/NServiceBus.Transport.RabbitMQ.CommandLine.csproj
@@ -16,7 +16,7 @@
-
+
diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/.editorconfig b/src/NServiceBus.Transport.RabbitMQ.Tests/.editorconfig
index f9749f769..e895eab4f 100644
--- a/src/NServiceBus.Transport.RabbitMQ.Tests/.editorconfig
+++ b/src/NServiceBus.Transport.RabbitMQ.Tests/.editorconfig
@@ -2,7 +2,10 @@
# Justification: Test project
dotnet_diagnostic.CA2007.severity = none
+
+# may be enabled in future
dotnet_diagnostic.PS0004.severity = none # A parameter of type CancellationToken on a private delegate or method should be required
+dotnet_diagnostic.PS0018.severity = none # A task-returning method should have a CancellationToken parameter unless it has a parameter implementing ICancellableContext
# Justification: Tests don't support cancellation and don't need to forward IMessageHandlerContext.CancellationToken
dotnet_diagnostic.NSB0002.severity = suggestion
diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/Connection/ChannelProviderTests.cs b/src/NServiceBus.Transport.RabbitMQ.Tests/Connection/ChannelProviderTests.cs
index 911ff1a61..0076147db 100644
--- a/src/NServiceBus.Transport.RabbitMQ.Tests/Connection/ChannelProviderTests.cs
+++ b/src/NServiceBus.Transport.RabbitMQ.Tests/Connection/ChannelProviderTests.cs
@@ -18,7 +18,7 @@ public async Task Should_recover_connection_and_dispose_old_one_when_connection_
await channelProvider.CreateConnection();
var publishConnection = channelProvider.PublishConnections.Dequeue();
- publishConnection.RaiseConnectionShutdown(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "Test"));
+ await publishConnection.RaiseConnectionShutdown(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "Test"));
channelProvider.DelayTaskCompletionSource.SetResult();
@@ -37,7 +37,7 @@ public async Task Should_dispose_connection_when_disposed()
await channelProvider.CreateConnection();
var publishConnection = channelProvider.PublishConnections.Dequeue();
- channelProvider.Dispose();
+ await channelProvider.DisposeAsync();
Assert.That(publishConnection.WasDisposed, Is.True);
}
@@ -49,11 +49,11 @@ public async Task Should_not_attempt_to_recover_during_dispose_when_retry_delay_
await channelProvider.CreateConnection();
var publishConnection = channelProvider.PublishConnections.Dequeue();
- publishConnection.RaiseConnectionShutdown(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "Test"));
+ await publishConnection.RaiseConnectionShutdown(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "Test"));
// Deliberately not completing the delay task with channelProvider.DelayTaskCompletionSource.SetResult(); before disposing
// to simulate a pending delay task
- channelProvider.Dispose();
+ await channelProvider.DisposeAsync();
await channelProvider.FireAndForgetAction(CancellationToken.None);
@@ -68,14 +68,14 @@ public async Task Should_dispose_newly_established_connection()
await channelProvider.CreateConnection();
var publishConnection = channelProvider.PublishConnections.Dequeue();
- publishConnection.RaiseConnectionShutdown(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "Test"));
+ await publishConnection.RaiseConnectionShutdown(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "Test"));
// This simulates the race of the reconnection loop being fired off with the delay task completed during
// the disposal of the channel provider. To achieve that it is necessary to kick off the reconnection loop
// and await its completion after the channel provider has been disposed.
var fireAndForgetTask = channelProvider.FireAndForgetAction(CancellationToken.None);
channelProvider.DelayTaskCompletionSource.SetResult();
- channelProvider.Dispose();
+ await channelProvider.DisposeAsync();
await fireAndForgetTask;
@@ -121,29 +121,7 @@ class FakeConnection : IConnection
public bool WasDisposed { get; private set; }
- public void UpdateSecret(string newSecret, string reason) => throw new NotImplementedException();
-
- public void Abort() => throw new NotImplementedException();
-
- public void Abort(ushort reasonCode, string reasonText) => throw new NotImplementedException();
-
- public void Abort(TimeSpan timeout) => throw new NotImplementedException();
-
- public void Abort(ushort reasonCode, string reasonText, TimeSpan timeout) => throw new NotImplementedException();
-
- public void Close() => throw new NotImplementedException();
-
- public void Close(ushort reasonCode, string reasonText) => throw new NotImplementedException();
-
- public void Close(TimeSpan timeout) => throw new NotImplementedException();
-
- public void Close(ushort reasonCode, string reasonText, TimeSpan timeout) => throw new NotImplementedException();
-
- public void HandleConnectionBlocked(string reason) => throw new NotImplementedException();
-
- public void HandleConnectionUnblocked() => throw new NotImplementedException();
-
- public Task CreateChannelAsync(ushort? consumerDispatchConcurrency = null, CancellationToken cancellationToken = new CancellationToken()) => throw new NotImplementedException();
+ public Task CreateChannelAsync(CreateChannelOptions options = null, CancellationToken cancellationToken = default) => throw new NotImplementedException();
public ushort ChannelMax { get; }
public IDictionary ClientProperties { get; }
@@ -157,24 +135,24 @@ class FakeConnection : IConnection
public IDictionary ServerProperties { get; }
public IList ShutdownReport { get; }
public string ClientProvidedName { get; } = $"FakeConnection{Interlocked.Increment(ref connectionCounter)}";
+ public event AsyncEventHandler CallbackExceptionAsync = (_, _) => Task.CompletedTask;
+ public event AsyncEventHandler ConnectionShutdownAsync = (_, _) => Task.CompletedTask;
+ public event AsyncEventHandler RecoverySucceededAsync = (_, _) => Task.CompletedTask;
+ public event AsyncEventHandler ConnectionRecoveryErrorAsync = (_, _) => Task.CompletedTask;
+ public event AsyncEventHandler ConsumerTagChangeAfterRecoveryAsync = (_, _) => Task.CompletedTask;
+ public event AsyncEventHandler QueueNameChangedAfterRecoveryAsync = (_, _) => Task.CompletedTask;
+ public event AsyncEventHandler RecoveringConsumerAsync = (_, _) => Task.CompletedTask;
+ public event AsyncEventHandler ConnectionBlockedAsync = (_, _) => Task.CompletedTask;
+ public event AsyncEventHandler ConnectionUnblockedAsync = (_, _) => Task.CompletedTask;
IEnumerable IConnection.ShutdownReport => throw new NotImplementedException();
- public event EventHandler CallbackException = (_, _) => { };
- public event EventHandler ConnectionBlocked = (_, _) => { };
- public event EventHandler ConnectionShutdown = (_, _) => { };
- public event EventHandler ConnectionUnblocked = (_, _) => { };
- public event EventHandler RecoverySucceeded = (_, _) => { };
- public event EventHandler ConnectionRecoveryError = (_, _) => { };
- public event EventHandler ConsumerTagChangeAfterRecovery = (_, _) => { };
- public event EventHandler QueueNameChangedAfterRecovery = (_, _) => { };
- public event EventHandler RecoveringConsumer = (_, _) => { };
-
- public void RaiseConnectionShutdown(ShutdownEventArgs args) => ConnectionShutdown?.Invoke(this, args);
+ public Task RaiseConnectionShutdown(ShutdownEventArgs args) => ConnectionShutdownAsync.Invoke(this, args);
public Task UpdateSecretAsync(string newSecret, string reason, CancellationToken cancellationToken = default) => throw new NotImplementedException();
public Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abort, CancellationToken cancellationToken = default) => throw new NotImplementedException();
static int connectionCounter;
+ public ValueTask DisposeAsync() => ValueTask.CompletedTask;
}
}
}
\ No newline at end of file
diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/NServiceBus.Transport.RabbitMQ.Tests.csproj b/src/NServiceBus.Transport.RabbitMQ.Tests/NServiceBus.Transport.RabbitMQ.Tests.csproj
index def66cb74..4b06edcec 100644
--- a/src/NServiceBus.Transport.RabbitMQ.Tests/NServiceBus.Transport.RabbitMQ.Tests.csproj
+++ b/src/NServiceBus.Transport.RabbitMQ.Tests/NServiceBus.Transport.RabbitMQ.Tests.csproj
@@ -23,7 +23,7 @@
-
+
diff --git a/src/NServiceBus.Transport.RabbitMQ.TransportTests/NServiceBus.Transport.RabbitMQ.TransportTests.csproj b/src/NServiceBus.Transport.RabbitMQ.TransportTests/NServiceBus.Transport.RabbitMQ.TransportTests.csproj
index d778a2f83..c8701d774 100644
--- a/src/NServiceBus.Transport.RabbitMQ.TransportTests/NServiceBus.Transport.RabbitMQ.TransportTests.csproj
+++ b/src/NServiceBus.Transport.RabbitMQ.TransportTests/NServiceBus.Transport.RabbitMQ.TransportTests.csproj
@@ -19,7 +19,7 @@
-
+
diff --git a/src/NServiceBus.Transport.RabbitMQ/Connection/ChannelProvider.cs b/src/NServiceBus.Transport.RabbitMQ/Connection/ChannelProvider.cs
index 4ab33c163..8def9d181 100644
--- a/src/NServiceBus.Transport.RabbitMQ/Connection/ChannelProvider.cs
+++ b/src/NServiceBus.Transport.RabbitMQ/Connection/ChannelProvider.cs
@@ -7,9 +7,10 @@ namespace NServiceBus.Transport.RabbitMQ
using System.Threading;
using System.Threading.Tasks;
using global::RabbitMQ.Client;
+ using global::RabbitMQ.Client.Events;
using Logging;
- class ChannelProvider : IDisposable
+ class ChannelProvider : IAsyncDisposable
{
public ChannelProvider(ConnectionFactory connectionFactory, TimeSpan retryDelay, IRoutingTopology routingTopology)
{
@@ -28,20 +29,23 @@ public ChannelProvider(ConnectionFactory connectionFactory, TimeSpan retryDelay,
async Task CreateConnectionWithShutdownListener(CancellationToken cancellationToken)
{
var newConnection = await CreatePublishConnection(cancellationToken).ConfigureAwait(false);
- newConnection.ConnectionShutdown += Connection_ConnectionShutdown;
+ newConnection.ConnectionShutdownAsync += Connection_ConnectionShutdown;
return newConnection;
}
- void Connection_ConnectionShutdown(object? sender, ShutdownEventArgs e)
+#pragma warning disable PS0018
+ Task Connection_ConnectionShutdown(object? sender, ShutdownEventArgs e)
+#pragma warning restore PS0018
{
if (e.Initiator == ShutdownInitiator.Application || sender is null)
{
- return;
+ return Task.CompletedTask;
}
var connectionThatWasShutdown = (IConnection)sender;
FireAndForget(cancellationToken => ReconnectSwallowingExceptions(connectionThatWasShutdown.ClientProvidedName, cancellationToken), stoppingTokenSource.Token);
+ return Task.CompletedTask;
}
async Task ReconnectSwallowingExceptions(string? connectionName, CancellationToken cancellationToken)
@@ -95,7 +99,11 @@ public async ValueTask GetPublishChannel(CancellationToken
return channel;
}
- channel?.Dispose();
+ if (channel is not null)
+ {
+ await channel.DisposeAsync()
+ .ConfigureAwait(false);
+ }
channel = new ConfirmsAwareChannel(connection, routingTopology);
await channel.Initialize(cancellationToken).ConfigureAwait(false);
@@ -103,26 +111,27 @@ public async ValueTask GetPublishChannel(CancellationToken
return channel;
}
- public void ReturnPublishChannel(ConfirmsAwareChannel channel)
+ public ValueTask ReturnPublishChannel(ConfirmsAwareChannel channel, CancellationToken cancellationToken = default)
{
if (channel.IsOpen)
{
channels.Enqueue(channel);
+ return ValueTask.CompletedTask;
}
- else
- {
- channel.Dispose();
- }
+
+ return channel.DisposeAsync();
}
- public void Dispose()
+#pragma warning disable PS0018
+ public async ValueTask DisposeAsync()
+#pragma warning restore PS0018
{
if (disposed)
{
return;
}
- stoppingTokenSource.Cancel();
+ await stoppingTokenSource.CancelAsync().ConfigureAwait(false);
stoppingTokenSource.Dispose();
var oldConnection = Interlocked.Exchange(ref connection, null);
@@ -130,7 +139,7 @@ public void Dispose()
foreach (var channel in channels)
{
- channel.Dispose();
+ await channel.DisposeAsync().ConfigureAwait(false);
}
disposed = true;
diff --git a/src/NServiceBus.Transport.RabbitMQ/Connection/ConfirmsAwareChannel.cs b/src/NServiceBus.Transport.RabbitMQ/Connection/ConfirmsAwareChannel.cs
index c16d1856f..4594a99bf 100644
--- a/src/NServiceBus.Transport.RabbitMQ/Connection/ConfirmsAwareChannel.cs
+++ b/src/NServiceBus.Transport.RabbitMQ/Connection/ConfirmsAwareChannel.cs
@@ -1,15 +1,12 @@
namespace NServiceBus.Transport.RabbitMQ
{
using System;
- using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using global::RabbitMQ.Client;
- using global::RabbitMQ.Client.Events;
- using NServiceBus.Logging;
- sealed class ConfirmsAwareChannel(IConnection connection, IRoutingTopology routingTopology) : IDisposable
+ sealed class ConfirmsAwareChannel(IConnection connection, IRoutingTopology routingTopology) : IAsyncDisposable
{
public bool IsOpen => channel.IsOpen;
@@ -17,212 +14,48 @@ sealed class ConfirmsAwareChannel(IConnection connection, IRoutingTopology routi
public async Task Initialize(CancellationToken cancellationToken = default)
{
- channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
-
- channel.BasicAcks += Channel_BasicAcks;
- channel.BasicNacks += Channel_BasicNacks;
- channel.BasicReturn += Channel_BasicReturn;
- channel.ChannelShutdown += Channel_ModelShutdown;
-
- await channel.ConfirmSelectAsync(trackConfirmations: false, cancellationToken).ConfigureAwait(false);
- }
-
- public async Task SendMessage(string address, OutgoingMessage message, BasicProperties properties, CancellationToken cancellationToken = default)
- {
- TaskCompletionSource taskCompletionSource;
-
- try
- {
- await sequenceNumberSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
-
- (taskCompletionSource, var registration) = GetCancellableTaskCompletionSource(cancellationToken);
- await using var _ = registration.ConfigureAwait(false);
-
- properties.SetConfirmationId(channel.NextPublishSeqNo);
-
- if (properties.Headers != null &&
- properties.Headers.TryGetValue(DelayInfrastructure.DelayHeader, out var delayValue))
- {
- var routingKey =
- DelayInfrastructure.CalculateRoutingKey((int)delayValue, address, out var startingDelayLevel);
-
- await routingTopology.BindToDelayInfrastructure(channel, address,
- DelayInfrastructure.DeliveryExchange, DelayInfrastructure.BindingKey(address),
- cancellationToken).ConfigureAwait(false);
- // The channel is used here directly because it is not the routing topologies concern to know about the sends to the delay infrastructure
- await channel.BasicPublishAsync(DelayInfrastructure.LevelName(startingDelayLevel), routingKey, true,
- properties, message.Body, cancellationToken).ConfigureAwait(false);
- }
- else
- {
- await routingTopology.Send(channel, address, message, properties, cancellationToken)
- .ConfigureAwait(false);
- }
- }
- finally
- {
- sequenceNumberSemaphore.Release();
- }
-
- await taskCompletionSource.Task.ConfigureAwait(false);
- }
-
- public async Task PublishMessage(Type type, OutgoingMessage message, BasicProperties properties, CancellationToken cancellationToken = default)
- {
- TaskCompletionSource taskCompletionSource;
-
- try
- {
- await sequenceNumberSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
-
- (taskCompletionSource, var registration) = GetCancellableTaskCompletionSource(cancellationToken);
- await using var _ = registration.ConfigureAwait(false);
-
- properties.SetConfirmationId(channel.NextPublishSeqNo);
-
- await routingTopology.Publish(channel, type, message, properties, cancellationToken)
- .ConfigureAwait(false);
- }
- finally
- {
- sequenceNumberSemaphore.Release();
- }
-
- await taskCompletionSource.Task.ConfigureAwait(false);
+ var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true, outstandingPublisherConfirmationsRateLimiter: null);
+ channel = await connection.CreateChannelAsync(createChannelOptions, cancellationToken: cancellationToken).ConfigureAwait(false);
}
- public async Task RawSendInCaseOfFailure(string address, ReadOnlyMemory body, BasicProperties properties, CancellationToken cancellationToken = default)
+ public async ValueTask SendMessage(string address, OutgoingMessage message, BasicProperties properties, CancellationToken cancellationToken = default)
{
- properties.Headers ??= new Dictionary();
-
- TaskCompletionSource taskCompletionSource;
-
- try
+ if (properties.Headers != null &&
+ properties.Headers.TryGetValue(DelayInfrastructure.DelayHeader, out var delayValue))
{
- await sequenceNumberSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
-
- (taskCompletionSource, var registration) = GetCancellableTaskCompletionSource(cancellationToken);
+ var routingKey =
+ DelayInfrastructure.CalculateRoutingKey((int)delayValue, address, out var startingDelayLevel);
- await using var _ = registration.ConfigureAwait(false);
-
- properties.SetConfirmationId(channel.NextPublishSeqNo);
-
- await routingTopology.RawSendInCaseOfFailure(channel, address, body, properties, cancellationToken)
- .ConfigureAwait(false);
- }
- finally
- {
- sequenceNumberSemaphore.Release();
- }
-
- await taskCompletionSource.Task.ConfigureAwait(false);
- }
-
- (TaskCompletionSource, IAsyncDisposable) GetCancellableTaskCompletionSource(CancellationToken cancellationToken)
- {
- var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
-
- // There is no need to capture the execution context therefore using UnsafeRegister
- var registration = cancellationToken.UnsafeRegister(static state =>
- {
- var (tcs, cancellationToken) = ((TaskCompletionSource, CancellationToken))state!;
- tcs.TrySetCanceled(cancellationToken);
- }, (tcs, cancellationToken));
-
- var added = messages.TryAdd(channel.NextPublishSeqNo, tcs);
-
- if (!added)
- {
- throw new Exception($"Cannot publish a message with sequence number '{channel.NextPublishSeqNo}' on this channel. A message was already published on this channel with the same confirmation number.");
- }
-
- return (tcs, registration);
- }
-
- void Channel_BasicAcks(object sender, BasicAckEventArgs e)
- {
- if (!e.Multiple)
- {
- SetResult(e.DeliveryTag);
+ await routingTopology.BindToDelayInfrastructure(channel, address,
+ DelayInfrastructure.DeliveryExchange, DelayInfrastructure.BindingKey(address),
+ cancellationToken).ConfigureAwait(false);
+ // The channel is used here directly because it is not the routing topologies concern to know about the sends to the delay infrastructure
+ await channel.BasicPublishAsync(DelayInfrastructure.LevelName(startingDelayLevel), routingKey, true,
+ properties, message.Body, cancellationToken).ConfigureAwait(false);
}
else
{
- foreach (var message in messages)
- {
- if (message.Key <= e.DeliveryTag)
- {
- SetResult(message.Key);
- }
- }
- }
- }
-
- void Channel_BasicNacks(object sender, BasicNackEventArgs e)
- {
- if (!e.Multiple)
- {
- SetException(e.DeliveryTag, "Message rejected by broker.");
- }
- else
- {
- foreach (var message in messages)
- {
- if (message.Key <= e.DeliveryTag)
- {
- SetException(message.Key, "Message rejected by broker.");
- }
- }
- }
- }
-
- void Channel_BasicReturn(object sender, BasicReturnEventArgs e)
- {
- var message = $"Message could not be routed to {e.Exchange + e.RoutingKey}: {e.ReplyCode} {e.ReplyText}";
-
- if (e.BasicProperties.TryGetConfirmationId(out var deliveryTag))
- {
- SetException(deliveryTag, message);
- }
- else
- {
- Logger.Warn(message);
+ await routingTopology.Send(channel, address, message, properties, cancellationToken)
+ .ConfigureAwait(false);
}
}
- void Channel_ModelShutdown(object sender, ShutdownEventArgs e)
- {
- do
- {
- foreach (var message in messages)
- {
- SetException(message.Key, $"Channel has been closed: {e}");
- }
- }
- while (!messages.IsEmpty);
- }
+ public async ValueTask PublishMessage(Type type, OutgoingMessage message, BasicProperties properties, CancellationToken cancellationToken = default) =>
+ await routingTopology.Publish(channel, type, message, properties, cancellationToken)
+ .ConfigureAwait(false);
- void SetResult(ulong key)
+ public async ValueTask RawSendInCaseOfFailure(string address, ReadOnlyMemory body, BasicProperties properties, CancellationToken cancellationToken = default)
{
- if (messages.TryRemove(key, out var tcs))
- {
- tcs.SetResult();
- }
- }
+ properties.Headers ??= new Dictionary();
- void SetException(ulong key, string exceptionMessage)
- {
- if (messages.TryRemove(key, out var tcs))
- {
- tcs.SetException(new Exception(exceptionMessage));
- }
+ await routingTopology.RawSendInCaseOfFailure(channel, address, body, properties, cancellationToken)
+ .ConfigureAwait(false);
}
- public void Dispose() => channel?.Dispose();
+#pragma warning disable PS0018
+ public ValueTask DisposeAsync() => channel is not null ? channel.DisposeAsync() : ValueTask.CompletedTask;
+#pragma warning restore PS0018
IChannel channel;
- readonly ConcurrentDictionary messages = new();
- readonly SemaphoreSlim sequenceNumberSemaphore = new(1, 1);
-
- static readonly ILog Logger = LogManager.GetLogger(typeof(ConfirmsAwareChannel));
}
}
diff --git a/src/NServiceBus.Transport.RabbitMQ/Connection/ConnectionFactory.cs b/src/NServiceBus.Transport.RabbitMQ/Connection/ConnectionFactory.cs
index 3fd0aa6cc..5be2fd683 100644
--- a/src/NServiceBus.Transport.RabbitMQ/Connection/ConnectionFactory.cs
+++ b/src/NServiceBus.Transport.RabbitMQ/Connection/ConnectionFactory.cs
@@ -119,17 +119,27 @@ public async Task CreateConnection(string connectionName, Cancellat
{
var connection = await connectionFactory.CreateConnectionAsync(endpoints, connectionName, cancellationToken).ConfigureAwait(false);
- connection.ConnectionBlocked += (sender, e) => Logger.WarnFormat("'{0}' connection blocked: {1}", connectionName, e.Reason);
- connection.ConnectionUnblocked += (sender, e) => Logger.WarnFormat("'{0}' connection unblocked}", connectionName);
+ connection.ConnectionBlockedAsync += (sender, e) =>
+ {
+ Logger.WarnFormat("'{0}' connection blocked: {1}", connectionName, e.Reason);
+ return Task.CompletedTask;
+ };
+ connection.ConnectionUnblockedAsync += (sender, e) =>
+ {
+ Logger.WarnFormat("'{0}' connection unblocked}", connectionName);
+ return Task.CompletedTask;
+ };
- connection.ConnectionShutdown += (sender, e) =>
+ connection.ConnectionShutdownAsync += (sender, e) =>
{
if (e.Initiator == ShutdownInitiator.Application && e.ReplyCode == 200)
{
- return;
+ return Task.CompletedTask;
}
Logger.WarnFormat("'{0}' connection shutdown: {1}", connectionName, e);
+
+ return Task.CompletedTask;
};
return connection;
diff --git a/src/NServiceBus.Transport.RabbitMQ/NServiceBus.Transport.RabbitMQ.csproj b/src/NServiceBus.Transport.RabbitMQ/NServiceBus.Transport.RabbitMQ.csproj
index 345493190..e2ef50a8f 100644
--- a/src/NServiceBus.Transport.RabbitMQ/NServiceBus.Transport.RabbitMQ.csproj
+++ b/src/NServiceBus.Transport.RabbitMQ/NServiceBus.Transport.RabbitMQ.csproj
@@ -10,7 +10,7 @@
-
+
diff --git a/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransportInfrastructure.cs b/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransportInfrastructure.cs
index b603d5e0a..ddda3c9a4 100644
--- a/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransportInfrastructure.cs
+++ b/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransportInfrastructure.cs
@@ -70,7 +70,7 @@ public override async Task Shutdown(CancellationToken cancellationToken = defaul
await Task.WhenAll(Receivers.Values.Select(r => r.StopReceive(cancellationToken)))
.ConfigureAwait(false);
- channelProvider.Dispose();
+ await channelProvider.DisposeAsync().ConfigureAwait(false);
}
public override string ToTransportAddress(QueueAddress address) => TranslateAddress(address);
diff --git a/src/NServiceBus.Transport.RabbitMQ/Receiving/MessageConverter.cs b/src/NServiceBus.Transport.RabbitMQ/Receiving/MessageConverter.cs
index d2b919a16..1fd1043c1 100644
--- a/src/NServiceBus.Transport.RabbitMQ/Receiving/MessageConverter.cs
+++ b/src/NServiceBus.Transport.RabbitMQ/Receiving/MessageConverter.cs
@@ -40,7 +40,8 @@ public Dictionary RetrieveHeaders(BasicDeliverEventArgs message)
messageHeaders.Remove(DelayInfrastructure.XFirstDeathExchangeHeader);
messageHeaders.Remove(DelayInfrastructure.XFirstDeathQueueHeader);
messageHeaders.Remove(DelayInfrastructure.XFirstDeathReasonHeader);
- messageHeaders.Remove(BasicPropertiesExtensions.ConfirmationIdHeader);
+ messageHeaders.Remove(LegacyConfirmationIdHeader);
+ messageHeaders.Remove(Constants.PublishSequenceNumberHeader);
}
// Leaving space for ReplyTo, CorrelationId, DeliveryMode, EnclosedMessageTypes conditionally
@@ -166,5 +167,7 @@ static string ValueToString(object value)
readonly Func messageIdStrategy;
static readonly DateTimeOffset UnixEpoch = new DateTimeOffset(1970, 1, 1, 0, 0, 0, TimeSpan.Zero);
+
+ public const string LegacyConfirmationIdHeader = "NServiceBus.Transport.RabbitMQ.ConfirmationId";
}
}
diff --git a/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs b/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs
index 1bdb4b35f..2893e0a18 100644
--- a/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs
+++ b/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs
@@ -120,7 +120,7 @@ public async Task ChangeConcurrency(PushRuntimeSettings limitations, Cancellatio
async Task ConnectToBroker(CancellationToken cancellationToken)
{
connection = await connectionFactory.CreateConnection(name, cancellationToken).ConfigureAwait(false);
- connection.ConnectionShutdown += Connection_ConnectionShutdown;
+ connection.ConnectionShutdownAsync += Connection_ConnectionShutdown;
var prefetchCount = prefetchCountCalculation(maxConcurrency);
@@ -130,15 +130,16 @@ async Task ConnectToBroker(CancellationToken cancellationToken)
prefetchCount = maxConcurrency;
}
- var channel = await connection.CreateChannelAsync((ushort)maxConcurrency, cancellationToken).ConfigureAwait(false);
- channel.ChannelShutdown += Channel_ModelShutdown;
+ var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: false, publisherConfirmationTrackingEnabled: false, consumerDispatchConcurrency: (ushort)maxConcurrency);
+ var channel = await connection.CreateChannelAsync(createChannelOptions, cancellationToken).ConfigureAwait(false);
+ channel.ChannelShutdownAsync += Channel_ModelShutdown;
await channel.BasicQosAsync(0, (ushort)Math.Min(prefetchCount, ushort.MaxValue), false, cancellationToken).ConfigureAwait(false);
var consumer = new AsyncEventingBasicConsumer(channel);
- consumer.Unregistered += Consumer_Unregistered;
- consumer.Registered += Consumer_Registered;
- consumer.Received += Consumer_Received;
+ consumer.UnregisteredAsync += Consumer_Unregistered;
+ consumer.RegisteredAsync += Consumer_Registered;
+ consumer.ReceivedAsync += Consumer_Received;
await channel.BasicConsumeAsync(ReceiveAddress, false, consumerTag, consumer, cancellationToken: cancellationToken).ConfigureAwait(false);
}
@@ -214,7 +215,9 @@ Task Consumer_Registered(object sender, ConsumerEventArgs e)
return Task.CompletedTask;
}
- void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
+#pragma warning disable PS0018
+ Task Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
+#pragma warning restore PS0018
{
if (e.Initiator == ShutdownInitiator.Application && e.ReplyCode == 200)
{
@@ -230,18 +233,22 @@ void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
{
Logger.WarnFormat("'{0}' connection shutdown while reconnect already in progress: {1}", name, e);
}
+
+ return Task.CompletedTask;
}
- void Channel_ModelShutdown(object sender, ShutdownEventArgs e)
+#pragma warning disable PS0018
+ Task Channel_ModelShutdown(object sender, ShutdownEventArgs e)
+#pragma warning restore PS0018
{
if (e.Initiator == ShutdownInitiator.Application)
{
- return;
+ return Task.CompletedTask;
}
if (e.Initiator == ShutdownInitiator.Peer && e.ReplyCode == 404)
{
- return;
+ return Task.CompletedTask;
}
if (circuitBreaker.Disarmed)
@@ -254,6 +261,8 @@ void Channel_ModelShutdown(object sender, ShutdownEventArgs e)
{
Logger.WarnFormat("'{0}' channel shutdown while reconnect already in progress: {1}", name, e);
}
+
+ return Task.CompletedTask;
}
#pragma warning disable PS0018 // A task-returning method should have a CancellationToken parameter unless it has a parameter implementing ICancellableContext
@@ -498,7 +507,7 @@ int GetDeliveryAttempts(BasicDeliverEventArgs message, string messageIdKey)
return attempts;
}
- async Task MovePoisonMessage(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, string queue, CancellationToken messageProcessingCancellationToken)
+ async ValueTask MovePoisonMessage(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, string queue, CancellationToken messageProcessingCancellationToken)
{
try
{
@@ -510,7 +519,8 @@ async Task MovePoisonMessage(AsyncEventingBasicConsumer consumer, BasicDeliverEv
}
finally
{
- channelProvider.ReturnPublishChannel(channel);
+ await channelProvider.ReturnPublishChannel(channel, messageProcessingCancellationToken)
+ .ConfigureAwait(false);
}
}
catch (Exception ex) when (!ex.IsCausedBy(messageProcessingCancellationToken))
diff --git a/src/NServiceBus.Transport.RabbitMQ/Sending/BasicPropertiesExtensions.cs b/src/NServiceBus.Transport.RabbitMQ/Sending/BasicPropertiesExtensions.cs
index cf651880e..668bdb815 100644
--- a/src/NServiceBus.Transport.RabbitMQ/Sending/BasicPropertiesExtensions.cs
+++ b/src/NServiceBus.Transport.RabbitMQ/Sending/BasicPropertiesExtensions.cs
@@ -102,20 +102,6 @@ static bool CalculateDelay(DispatchProperties dispatchProperties, out long delay
return delayed;
}
- public static void SetConfirmationId(this IBasicProperties properties, ulong confirmationId)
- {
- properties.Headers[ConfirmationIdHeader] = confirmationId.ToString();
- }
-
- public static bool TryGetConfirmationId(this IReadOnlyBasicProperties properties, out ulong confirmationId)
- {
- confirmationId = 0;
-
- return properties.Headers.TryGetValue(ConfirmationIdHeader, out var value) &&
- ulong.TryParse(value as byte[] ?? [], out confirmationId);
- }
-
- public const string ConfirmationIdHeader = "NServiceBus.Transport.RabbitMQ.ConfirmationId";
public const string UseNonPersistentDeliveryHeader = "NServiceBus.Transport.RabbitMQ.UseNonPersistentDelivery";
}
}
diff --git a/src/NServiceBus.Transport.RabbitMQ/Sending/MessageDispatcher.cs b/src/NServiceBus.Transport.RabbitMQ/Sending/MessageDispatcher.cs
index ee9a64e5c..7a1ba3d9f 100644
--- a/src/NServiceBus.Transport.RabbitMQ/Sending/MessageDispatcher.cs
+++ b/src/NServiceBus.Transport.RabbitMQ/Sending/MessageDispatcher.cs
@@ -31,23 +31,24 @@ public async Task Dispatch(TransportOperations outgoingMessages, TransportTransa
foreach (var operation in unicastTransportOperations)
{
- tasks.Add(SendMessage(operation, channel, cancellationToken));
+ tasks.Add(SendMessage(operation, channel, cancellationToken).AsTask());
}
foreach (var operation in multicastTransportOperations)
{
- tasks.Add(PublishMessage(operation, channel, cancellationToken));
+ tasks.Add(PublishMessage(operation, channel, cancellationToken).AsTask());
}
await Task.WhenAll(tasks).ConfigureAwait(false);
}
finally
{
- channelProvider.ReturnPublishChannel(channel);
+ await channelProvider.ReturnPublishChannel(channel, cancellationToken)
+ .ConfigureAwait(false);
}
}
- Task SendMessage(UnicastTransportOperation transportOperation, ConfirmsAwareChannel channel, CancellationToken cancellationToken)
+ ValueTask SendMessage(UnicastTransportOperation transportOperation, ConfirmsAwareChannel channel, CancellationToken cancellationToken)
{
ThrowIfDelayedDeliveryIsDisabledAndMessageIsDelayed(transportOperation);
@@ -59,7 +60,7 @@ Task SendMessage(UnicastTransportOperation transportOperation, ConfirmsAwareChan
return channel.SendMessage(transportOperation.Destination, message, properties, cancellationToken);
}
- Task PublishMessage(MulticastTransportOperation transportOperation, ConfirmsAwareChannel channel, CancellationToken cancellationToken)
+ ValueTask PublishMessage(MulticastTransportOperation transportOperation, ConfirmsAwareChannel channel, CancellationToken cancellationToken)
{
ThrowIfDelayedDeliveryIsDisabledAndMessageIsDelayed(transportOperation);