diff --git a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/NServiceBus.Transport.RabbitMQ.AcceptanceTests.csproj b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/NServiceBus.Transport.RabbitMQ.AcceptanceTests.csproj
index 0732b81e4..84fb265bc 100644
--- a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/NServiceBus.Transport.RabbitMQ.AcceptanceTests.csproj
+++ b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/NServiceBus.Transport.RabbitMQ.AcceptanceTests.csproj
@@ -21,7 +21,7 @@
-
+
diff --git a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_classic_endpoint_uses_quorum_error_queue.cs b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_classic_endpoint_uses_quorum_error_queue.cs
index 4e9d9b551..8174bc403 100644
--- a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_classic_endpoint_uses_quorum_error_queue.cs
+++ b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_classic_endpoint_uses_quorum_error_queue.cs
@@ -24,7 +24,8 @@ public async Task Should_fail_to_start()
.Run());
Assert.That(exception.Message, Does.Contain("PRECONDITION_FAILED - inequivalent arg 'x-queue-type' for queue 'rabbitmq.transport.tests.quorum-error'"));
- Assert.That(exception.Message, Does.Contain("received 'classic' but current is 'quorum'"));
+ Assert.That(exception.Message, Does.Contain("received none but current is the value 'quorum'")
+ .Or.Contain("received 'classic' but current is 'quorum'"));
}
class ClassicQueueEndpoint : EndpointConfigurationBuilder
diff --git a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_classic_endpoint_uses_quorum_queue.cs b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_classic_endpoint_uses_quorum_queue.cs
index 14a8f4653..655a6321b 100644
--- a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_classic_endpoint_uses_quorum_queue.cs
+++ b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_classic_endpoint_uses_quorum_queue.cs
@@ -25,7 +25,8 @@ public async Task Should_fail_to_start()
.Run());
Assert.That(exception.Message, Does.Contain("PRECONDITION_FAILED - inequivalent arg 'x-queue-type' for queue 'ClassicEndpointUsesQuorumQueue.ClassicQueueEndpoint'"));
- Assert.That(exception.Message, Does.Contain("received 'classic' but current is 'quorum'"));
+ Assert.That(exception.Message, Does.Contain("received none but current is the value 'quorum'")
+ .Or.Contain("received 'classic' but current is 'quorum'"));
}
class ClassicQueueEndpoint : EndpointConfigurationBuilder
diff --git a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_quorum_endpoint_uses_classic_error_queue.cs b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_quorum_endpoint_uses_classic_error_queue.cs
index 7167c648b..1fd9da994 100644
--- a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_quorum_endpoint_uses_classic_error_queue.cs
+++ b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_quorum_endpoint_uses_classic_error_queue.cs
@@ -24,7 +24,8 @@ public async Task Should_fail_to_start()
.Run());
Assert.That(exception.Message, Does.Contain("PRECONDITION_FAILED - inequivalent arg 'x-queue-type' for queue 'rabbitmq.transport.tests.classic-error'"));
- Assert.That(exception.Message, Does.Contain("received 'quorum' but current is 'classic'"));
+ Assert.That(exception.Message, Does.Contain("received the value 'quorum' of type 'longstr' but current is none'") // RabbitMQ v3.x
+ .Or.Contain("received 'quorum' but current is 'classic'")); // RabbitMQ v4.x
}
class QuorumQueueEndpoint : EndpointConfigurationBuilder
diff --git a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_quorum_endpoint_uses_classic_queue.cs b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_quorum_endpoint_uses_classic_queue.cs
index 2bec0bef5..4b8733263 100644
--- a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_quorum_endpoint_uses_classic_queue.cs
+++ b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_quorum_endpoint_uses_classic_queue.cs
@@ -24,7 +24,8 @@ public async Task Should_fail_to_start()
.Run());
Assert.That(exception.Message, Does.Contain("PRECONDITION_FAILED - inequivalent arg 'x-queue-type' for queue 'QuorumEndpointUsesClassicQueue.QuorumQueueEndpoint'"));
- Assert.That(exception.Message, Does.Contain("received 'quorum' but current is 'classic'"));
+ Assert.That(exception.Message, Does.Contain("received the value 'quorum' of type 'longstr' but current is none'") // RabbitMQ v3.x
+ .Or.Contain("received 'quorum' but current is 'classic'")); // RabbitMQ v4.x
}
class QuorumQueueEndpoint : EndpointConfigurationBuilder
diff --git a/src/NServiceBus.Transport.RabbitMQ.CommandLine/BrokerConnectionBinder.cs b/src/NServiceBus.Transport.RabbitMQ.CommandLine/BrokerConnectionBinder.cs
index 9565339fe..4b98b4c82 100644
--- a/src/NServiceBus.Transport.RabbitMQ.CommandLine/BrokerConnectionBinder.cs
+++ b/src/NServiceBus.Transport.RabbitMQ.CommandLine/BrokerConnectionBinder.cs
@@ -36,13 +36,13 @@ protected override BrokerConnection GetBoundValue(BindingContext bindingContext)
certificateCollection.Add(certificate);
}
- var connectionFactory = new ConnectionFactory("rabbitmq-transport", connectionConfiguration, certificateCollection, disableCertificateValidation, useExternalAuth, TimeSpan.FromSeconds(60), TimeSpan.FromSeconds(10), new List<(string, int, bool)>());
+ var connectionFactory = new ConnectionFactory("rabbitmq-transport", connectionConfiguration, certificateCollection, disableCertificateValidation, useExternalAuth, TimeSpan.FromSeconds(60), TimeSpan.FromSeconds(10), []);
var brokerConnection = new BrokerConnection(connectionFactory);
return brokerConnection;
}
- string GetConnectionString(string? connectionString, string? connectionStringEnv)
+ static string GetConnectionString(string? connectionString, string? connectionStringEnv)
{
if (string.IsNullOrWhiteSpace(connectionString))
{
diff --git a/src/NServiceBus.Transport.RabbitMQ.CommandLine/NServiceBus.Transport.RabbitMQ.CommandLine.csproj b/src/NServiceBus.Transport.RabbitMQ.CommandLine/NServiceBus.Transport.RabbitMQ.CommandLine.csproj
index 322a2c176..80fb0ccc3 100644
--- a/src/NServiceBus.Transport.RabbitMQ.CommandLine/NServiceBus.Transport.RabbitMQ.CommandLine.csproj
+++ b/src/NServiceBus.Transport.RabbitMQ.CommandLine/NServiceBus.Transport.RabbitMQ.CommandLine.csproj
@@ -16,7 +16,7 @@
-
+
diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt b/src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt
index 76ab4163d..a194fb5b7 100644
--- a/src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt
+++ b/src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt
@@ -19,6 +19,7 @@ namespace NServiceBus
public RabbitMQTransport(NServiceBus.RoutingTopology routingTopology, string connectionString) { }
public RabbitMQTransport(NServiceBus.RoutingTopology routingTopology, string connectionString, bool enableDelayedDelivery) { }
public System.Security.Cryptography.X509Certificates.X509Certificate2 ClientCertificate { get; set; }
+ public bool DoNotUseManagementClient { get; set; }
public System.TimeSpan HeartbeatInterval { get; set; }
public System.Func MessageIdStrategy { get; set; }
public System.TimeSpan NetworkRecoveryInterval { get; set; }
diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/Connection/ManagementConnection/When_connecting_to_the_rabbitmq_management_api.cs b/src/NServiceBus.Transport.RabbitMQ.Tests/Connection/ManagementConnection/When_connecting_to_the_rabbitmq_management_api.cs
index 66effe5d4..7436e8a21 100644
--- a/src/NServiceBus.Transport.RabbitMQ.Tests/Connection/ManagementConnection/When_connecting_to_the_rabbitmq_management_api.cs
+++ b/src/NServiceBus.Transport.RabbitMQ.Tests/Connection/ManagementConnection/When_connecting_to_the_rabbitmq_management_api.cs
@@ -2,6 +2,7 @@
namespace NServiceBus.Transport.RabbitMQ.Tests.Connection.ManagementConnection
{
+ using System;
using System.Collections.Generic;
using System.Net;
using System.Threading.Tasks;
@@ -14,9 +15,10 @@ namespace NServiceBus.Transport.RabbitMQ.Tests.Connection.ManagementConnection
[TestFixture]
- class When_connecting_to_the_rabbitmq_management_api : RabbitMQTransport
+ class When_connecting_to_the_rabbitmq_management_api
{
- ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.Create("host=localhost");
+ static readonly string connectionString = Environment.GetEnvironmentVariable("RabbitMQTransport_ConnectionString") ?? "host=localhost";
+ readonly ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.Create(connectionString);
protected QueueType queueType = QueueType.Quorum;
protected string ReceiverQueue => GetTestQueueName("ManagementAPITestQueue");
protected string GetTestQueueName(string queueName) => $"{queueName}-{queueType}";
@@ -24,7 +26,7 @@ class When_connecting_to_the_rabbitmq_management_api : RabbitMQTransport
[SetUp]
public async Task SetUp()
{
- var connectionFactory = new ConnectionFactory(ReceiverQueue, connectionConfiguration, null, !ValidateRemoteCertificate, UseExternalAuthMechanism, HeartbeatInterval, NetworkRecoveryInterval, []);
+ var connectionFactory = new ConnectionFactory(ReceiverQueue, connectionConfiguration, null, false, false, TimeSpan.FromSeconds(60), TimeSpan.FromSeconds(10), []);
IConnection connection = await connectionFactory.CreateConnection(ReceiverQueue).ConfigureAwait(false);
var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: false, publisherConfirmationTrackingEnabled: false);
var channel = await connection.CreateChannelAsync(createChannelOptions).ConfigureAwait(false);
diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/NServiceBus.Transport.RabbitMQ.Tests.csproj b/src/NServiceBus.Transport.RabbitMQ.Tests/NServiceBus.Transport.RabbitMQ.Tests.csproj
index 4b06edcec..cd100ea98 100644
--- a/src/NServiceBus.Transport.RabbitMQ.Tests/NServiceBus.Transport.RabbitMQ.Tests.csproj
+++ b/src/NServiceBus.Transport.RabbitMQ.Tests/NServiceBus.Transport.RabbitMQ.Tests.csproj
@@ -1,4 +1,4 @@
-
+
net8.0
@@ -12,7 +12,7 @@
-
+
@@ -20,10 +20,10 @@
-
-
+
+
-
+
diff --git a/src/NServiceBus.Transport.RabbitMQ.TransportTests/NServiceBus.Transport.RabbitMQ.TransportTests.csproj b/src/NServiceBus.Transport.RabbitMQ.TransportTests/NServiceBus.Transport.RabbitMQ.TransportTests.csproj
index c8701d774..c9b0fd530 100644
--- a/src/NServiceBus.Transport.RabbitMQ.TransportTests/NServiceBus.Transport.RabbitMQ.TransportTests.csproj
+++ b/src/NServiceBus.Transport.RabbitMQ.TransportTests/NServiceBus.Transport.RabbitMQ.TransportTests.csproj
@@ -19,7 +19,7 @@
-
+
diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/BrokerVerifier.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/BrokerVerifier.cs
index b8a1cc71a..01d0cd3c4 100644
--- a/src/NServiceBus.Transport.RabbitMQ/Administration/BrokerVerifier.cs
+++ b/src/NServiceBus.Transport.RabbitMQ/Administration/BrokerVerifier.cs
@@ -10,36 +10,41 @@ namespace NServiceBus.Transport.RabbitMQ.Administration;
using NServiceBus.Logging;
using Polly;
-class BrokerVerifier(ConnectionFactory connectionFactory, IManagementClientFactory managementClientFactory) : IBrokerVerifier
+class BrokerVerifier(ConnectionFactory connectionFactory, IManagementClientFactory? managementClientFactory) : IBrokerVerifier
{
static readonly ILog Logger = LogManager.GetLogger(typeof(BrokerVerifier));
static readonly Version MinimumSupportedRabbitMqVersion = Version.Parse("3.10.0");
+ static readonly Version RabbitMqVersion4 = Version.Parse("4.0.0");
- readonly IManagementClient managementClient = managementClientFactory.CreateManagementClient();
+ readonly IManagementClient? managementClient = managementClientFactory?.CreateManagementClient();
- Overview? overview;
Version? brokerVersion;
public async Task Initialize(CancellationToken cancellationToken = default)
{
- var response = await managementClient.GetOverview(cancellationToken).ConfigureAwait(false);
- if (response.HasValue)
+ if (managementClient != null)
{
- overview = response.Value;
- brokerVersion = overview.RabbitMqVersion;
+ var response = await managementClient.GetOverview(cancellationToken).ConfigureAwait(false);
+ if (response.HasValue)
+ {
+ brokerVersion = response.Value.RabbitMqVersion;
+ return;
+ }
+
+ throw new InvalidOperationException($"Could not access RabbitMQ Management API. ({response.StatusCode}: {response.Reason})");
}
- else
- {
- // TODO: Need logic/config settings for determining which action to take if management API unavailable, e.g. should we throw an exception to refuse to start, or just log a warning
- Logger.WarnFormat("Could not access RabbitMQ Management API. ({0}: {1})", response.StatusCode, response.Reason);
- using var connection = await connectionFactory.CreateAdministrationConnection(cancellationToken).ConfigureAwait(false);
- brokerVersion = connection.GetBrokerVersion();
+ using var connection = await connectionFactory.CreateAdministrationConnection(cancellationToken).ConfigureAwait(false);
+ brokerVersion = connection.GetBrokerVersion();
+
+ if (brokerVersion >= RabbitMqVersion4)
+ {
+ Logger.Warn("Use of RabbitMQ Management API has been disabled." +
+ "The transport will not be able to override the default delivery limit on each queue " +
+ "which is necessary in order to guarantee that messages are not lost after repeated retries.");
}
}
- bool HasManagementClientAccess => overview != null;
-
Version BrokerVersion
{
get
@@ -61,7 +66,7 @@ public async Task VerifyRequirements(CancellationToken cancellationToken = defau
}
bool streamsEnabled;
- if (HasManagementClientAccess)
+ if (managementClient != null)
{
var response = await managementClient.GetFeatureFlags(cancellationToken).ConfigureAwait(false);
streamsEnabled = response.HasValue && response.Value.HasEnabledFeature(FeatureFlags.StreamQueue);
@@ -80,57 +85,59 @@ public async Task VerifyRequirements(CancellationToken cancellationToken = defau
public async Task ValidateDeliveryLimit(string queueName, CancellationToken cancellationToken = default)
{
- if (!HasManagementClientAccess)
+ if (managementClient == null)
{
return;
}
- var queue = await GetFullQueueDetails(queueName, cancellationToken).ConfigureAwait(false);
- if (queue is null)
+ var queue = await GetFullQueueDetails(managementClient, queueName, cancellationToken).ConfigureAwait(false)
+ ?? throw new InvalidOperationException($"Could not retrieve full queue details for {queueName}.");
+
+ if (ShouldOverrideDeliveryLimit(queue))
{
- // TODO: Need logic/config settings for determining which action to take, e.g. should we throw an exception to refuse to start, or just log a warning
- Logger.WarnFormat("Could not retrieve full queue details for {0}.", queueName);
- return;
+ await SetDeliveryLimitViaPolicy(managementClient, queue, BrokerVersion, cancellationToken).ConfigureAwait(false);
+ }
+ }
+
+ bool ShouldOverrideDeliveryLimit(Queue queue)
+ {
+ if (BrokerVersion < RabbitMqVersion4)
+ {
+ return false;
}
if (queue.DeliveryLimit == -1)
{
- return;
+ return false;
}
- if (queue.Arguments.DeliveryLimit.HasValue &&
- queue.Arguments.DeliveryLimit != -1)
+ if (queue.Arguments.DeliveryLimit.HasValue && queue.Arguments.DeliveryLimit != -1)
{
- // TODO: Need logic/config settings for determining which action to take, e.g. should we throw an exception to refuse to start, or just log a warning
- Logger.WarnFormat("The delivery limit for {0} is set to {1} by a queue argument. This can interfere with the transport's retry implementation",
- queue.Name, queue.Arguments.DeliveryLimit);
- return;
+ throw new InvalidOperationException($"The delivery limit for {queue.Name} is set to {queue.Arguments.DeliveryLimit} by a queue argument. " +
+ "This can interfere with the transport's retry implementation");
}
- if (queue.EffectivePolicyDefinition!.DeliveryLimit.HasValue &&
- queue.EffectivePolicyDefinition.DeliveryLimit != -1)
+ if (queue.EffectivePolicyDefinition!.DeliveryLimit.HasValue && queue.EffectivePolicyDefinition.DeliveryLimit != -1)
{
- // TODO: Need logic/config settings for determining which action to take, e.g. should we throw an exception to refuse to start, or just log a warning
- Logger.WarnFormat("The RabbitMQ policy {2} is setting delivery limit to {1} for {0}.",
- queue.Name, queue.EffectivePolicyDefinition.DeliveryLimit, queue.AppliedPolicyName);
- return;
+ throw new InvalidOperationException($"The RabbitMQ policy {queue.AppliedPolicyName} " +
+ $"is setting delivery limit to {queue.EffectivePolicyDefinition.DeliveryLimit} for {queue.Name}.");
}
- await SetDeliveryLimitViaPolicy(queue, cancellationToken).ConfigureAwait(false);
+ return true;
}
- async Task GetFullQueueDetails(string queueName, CancellationToken cancellationToken)
+ static async Task GetFullQueueDetails(IManagementClient managementClient, string queueName, CancellationToken cancellationToken)
{
var retryPolicy = Polly.Policy
.HandleResult>(response => response.Value?.EffectivePolicyDefinition is null)
.WaitAndRetryAsync(
5,
- attempt => TimeSpan.FromMilliseconds(100 * Math.Pow(2, attempt - 1)),
+ attempt => TimeSpan.FromMilliseconds(3000 * Math.Pow(2, attempt - 1)),
onRetry: (outcome, timespan, retryCount, context) =>
{
if (outcome.Exception is not null)
{
- Logger.Error($"Failed to get {queueName} queue", outcome.Exception);
+ Logger.Error($"Failed to get {queueName} queue - Attempt #{retryCount}.", outcome.Exception);
}
else if (!outcome.Result.HasValue)
{
@@ -149,20 +156,16 @@ public async Task ValidateDeliveryLimit(string queueName, CancellationToken canc
return response?.Value?.EffectivePolicyDefinition is not null ? response.Value : null;
}
- async Task SetDeliveryLimitViaPolicy(Queue queue, CancellationToken cancellationToken)
+ static async Task SetDeliveryLimitViaPolicy(IManagementClient managementClient, Queue queue, Version brokerVersion, CancellationToken cancellationToken)
{
if (!string.IsNullOrEmpty(queue.AppliedPolicyName))
{
- // TODO: Need logic/config settings for determining which action to take, e.g. should we throw an exception to refuse to start, or just log a warning
- Logger.WarnFormat("The {0} queue already has an associated policy.", queue.Name, queue.AppliedPolicyName);
- return;
+ throw new InvalidOperationException($"The {queue.Name} queue already has the '{queue.AppliedPolicyName}' policy applied.");
}
- if (BrokerVersion.Major < 4)
+ if (brokerVersion < RabbitMqVersion4)
{
- // TODO: Need logic/config settings for determining which action to take, e.g. should we throw an exception to refuse to start, or just log a warning
- Logger.WarnFormat("Cannot override delivery limit on the {0} queue by policy in RabbitMQ versions prior to 4.", queue.Name);
- return;
+ throw new InvalidOperationException($"Cannot override delivery limit on the {queue.Name} queue by policy in RabbitMQ versions prior to 4. Version is {brokerVersion}.");
}
var policy = new ManagementClient.Models.Policy
diff --git a/src/NServiceBus.Transport.RabbitMQ/NServiceBus.Transport.RabbitMQ.csproj b/src/NServiceBus.Transport.RabbitMQ/NServiceBus.Transport.RabbitMQ.csproj
index e9207ac9b..fe35d5c7b 100644
--- a/src/NServiceBus.Transport.RabbitMQ/NServiceBus.Transport.RabbitMQ.csproj
+++ b/src/NServiceBus.Transport.RabbitMQ/NServiceBus.Transport.RabbitMQ.csproj
@@ -10,8 +10,8 @@
-
-
+
+
diff --git a/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs b/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs
index 332290ef2..d10cfb810 100644
--- a/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs
+++ b/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs
@@ -121,6 +121,12 @@ public PrefetchCountCalculation PrefetchCountCalculation
///
public bool UseExternalAuthMechanism { get; set; } = false;
+ ///
+ /// Set this to prevent the transport from using the RabbitMQ Management API.
+ /// This is not recommended as it can prevent the transport from setting appropriate delivery limits for retry functionality.
+ ///
+ public bool DoNotUseManagementClient { get; set; } = false;
+
///
/// The interval for heartbeats between the endpoint and the broker.
///
@@ -188,7 +194,7 @@ public override async Task Initialize(HostSettings host
var connectionFactory = new ConnectionFactory(hostSettings.Name, ConnectionConfiguration, certCollection, !ValidateRemoteCertificate,
UseExternalAuthMechanism, HeartbeatInterval, NetworkRecoveryInterval, additionalClusterNodes);
- var managementClientFactory = new ManagementClientFactory(ConnectionConfiguration);
+ var managementClientFactory = DoNotUseManagementClient ? null : new ManagementClientFactory(ConnectionConfiguration);
var brokerVerifier = new BrokerVerifier(connectionFactory, managementClientFactory);
await brokerVerifier.Initialize(cancellationToken).ConfigureAwait(false);