Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add configuration option to prevent the transport using the RabbitMQ management API #1498

Merged
merged 8 commits into from
Nov 27, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<ItemGroup>
<PackageReference Include="BitFaster.Caching" Version="2.5.2" />
<PackageReference Include="NServiceBus.AcceptanceTests.Sources" Version="9.2.2" GeneratePathProperty="true" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0-rc.14" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ public async Task Should_fail_to_start()
.Run());

Assert.That(exception.Message, Does.Contain("PRECONDITION_FAILED - inequivalent arg 'x-queue-type' for queue 'rabbitmq.transport.tests.quorum-error'"));
Assert.That(exception.Message, Does.Contain("received 'classic' but current is 'quorum'"));
Assert.That(exception.Message, Does.Contain("received none but current is the value 'quorum'")
.Or.Contain("received 'classic' but current is 'quorum'"));
}

class ClassicQueueEndpoint : EndpointConfigurationBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ public async Task Should_fail_to_start()
.Run());

Assert.That(exception.Message, Does.Contain("PRECONDITION_FAILED - inequivalent arg 'x-queue-type' for queue 'ClassicEndpointUsesQuorumQueue.ClassicQueueEndpoint'"));
Assert.That(exception.Message, Does.Contain("received 'classic' but current is 'quorum'"));
Assert.That(exception.Message, Does.Contain("received none but current is the value 'quorum'")
.Or.Contain("received 'classic' but current is 'quorum'"));
}

class ClassicQueueEndpoint : EndpointConfigurationBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ public async Task Should_fail_to_start()
.Run());

Assert.That(exception.Message, Does.Contain("PRECONDITION_FAILED - inequivalent arg 'x-queue-type' for queue 'rabbitmq.transport.tests.classic-error'"));
Assert.That(exception.Message, Does.Contain("received 'quorum' but current is 'classic'"));
Assert.That(exception.Message, Does.Contain("received the value 'quorum' of type 'longstr' but current is none'") // RabbitMQ v3.x
.Or.Contain("received 'quorum' but current is 'classic'")); // RabbitMQ v4.x
}

class QuorumQueueEndpoint : EndpointConfigurationBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ public async Task Should_fail_to_start()
.Run());

Assert.That(exception.Message, Does.Contain("PRECONDITION_FAILED - inequivalent arg 'x-queue-type' for queue 'QuorumEndpointUsesClassicQueue.QuorumQueueEndpoint'"));
Assert.That(exception.Message, Does.Contain("received 'quorum' but current is 'classic'"));
Assert.That(exception.Message, Does.Contain("received the value 'quorum' of type 'longstr' but current is none'") // RabbitMQ v3.x
.Or.Contain("received 'quorum' but current is 'classic'")); // RabbitMQ v4.x
}

class QuorumQueueEndpoint : EndpointConfigurationBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ protected override BrokerConnection GetBoundValue(BindingContext bindingContext)
certificateCollection.Add(certificate);
}

var connectionFactory = new ConnectionFactory("rabbitmq-transport", connectionConfiguration, certificateCollection, disableCertificateValidation, useExternalAuth, TimeSpan.FromSeconds(60), TimeSpan.FromSeconds(10), new List<(string, int, bool)>());
var connectionFactory = new ConnectionFactory("rabbitmq-transport", connectionConfiguration, certificateCollection, disableCertificateValidation, useExternalAuth, TimeSpan.FromSeconds(60), TimeSpan.FromSeconds(10), []);
var brokerConnection = new BrokerConnection(connectionFactory);

return brokerConnection;
}

string GetConnectionString(string? connectionString, string? connectionStringEnv)
static string GetConnectionString(string? connectionString, string? connectionStringEnv)
{
if (string.IsNullOrWhiteSpace(connectionString))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

<ItemGroup>
<PackageReference Include="NServiceBus" Version="9.2.2" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0-rc.14" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0" />
<PackageReference Include="System.CommandLine" Version="2.0.0-beta4.22272.1" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace NServiceBus
public RabbitMQTransport(NServiceBus.RoutingTopology routingTopology, string connectionString) { }
public RabbitMQTransport(NServiceBus.RoutingTopology routingTopology, string connectionString, bool enableDelayedDelivery) { }
public System.Security.Cryptography.X509Certificates.X509Certificate2 ClientCertificate { get; set; }
public bool DoNotUseManagementClient { get; set; }
public System.TimeSpan HeartbeatInterval { get; set; }
public System.Func<RabbitMQ.Client.Events.BasicDeliverEventArgs, string> MessageIdStrategy { get; set; }
public System.TimeSpan NetworkRecoveryInterval { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace NServiceBus.Transport.RabbitMQ.Tests.Connection.ManagementConnection
{
using System;
using System.Collections.Generic;
using System.Net;
using System.Threading.Tasks;
Expand All @@ -14,17 +15,18 @@ namespace NServiceBus.Transport.RabbitMQ.Tests.Connection.ManagementConnection


[TestFixture]
class When_connecting_to_the_rabbitmq_management_api : RabbitMQTransport
class When_connecting_to_the_rabbitmq_management_api
{
ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.Create("host=localhost");
static readonly string connectionString = Environment.GetEnvironmentVariable("RabbitMQTransport_ConnectionString") ?? "host=localhost";
readonly ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.Create(connectionString);
protected QueueType queueType = QueueType.Quorum;
protected string ReceiverQueue => GetTestQueueName("ManagementAPITestQueue");
protected string GetTestQueueName(string queueName) => $"{queueName}-{queueType}";

[SetUp]
public async Task SetUp()
{
var connectionFactory = new ConnectionFactory(ReceiverQueue, connectionConfiguration, null, !ValidateRemoteCertificate, UseExternalAuthMechanism, HeartbeatInterval, NetworkRecoveryInterval, []);
var connectionFactory = new ConnectionFactory(ReceiverQueue, connectionConfiguration, null, false, false, TimeSpan.FromSeconds(60), TimeSpan.FromSeconds(10), []);
IConnection connection = await connectionFactory.CreateConnection(ReceiverQueue).ConfigureAwait(false);
var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: false, publisherConfirmationTrackingEnabled: false);
var channel = await connection.CreateChannelAsync(createChannelOptions).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
Expand All @@ -12,18 +12,18 @@

<ItemGroup>
<PackageReference Include="GitHubActionsTestLogger" Version="2.4.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.11.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.12.0" />
<PackageReference Include="NUnit" Version="4.2.2" />
<PackageReference Include="NUnit.Analyzers" Version="4.3.0" />
<PackageReference Include="NUnit3TestAdapter" Version="4.6.0" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="BitFaster.Caching" Version="2.5.2" />
<PackageReference Include="NServiceBus" Version="9.2.2" />
<PackageReference Include="Particular.Approvals" Version="1.0.0" />
<PackageReference Include="NServiceBus" Version="9.2.3" />
<PackageReference Include="Particular.Approvals" Version="2.0.0" />
<PackageReference Include="PublicApiGenerator" Version="11.1.0" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0-rc.14" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<ItemGroup>
<PackageReference Include="BitFaster.Caching" Version="2.5.2" />
<PackageReference Include="NServiceBus.TransportTests.Sources" Version="9.2.2" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0-rc.14" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0" />
</ItemGroup>

<ItemGroup>
Expand Down
97 changes: 50 additions & 47 deletions src/NServiceBus.Transport.RabbitMQ/Administration/BrokerVerifier.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,36 +10,41 @@ namespace NServiceBus.Transport.RabbitMQ.Administration;
using NServiceBus.Logging;
using Polly;

class BrokerVerifier(ConnectionFactory connectionFactory, IManagementClientFactory managementClientFactory) : IBrokerVerifier
class BrokerVerifier(ConnectionFactory connectionFactory, IManagementClientFactory? managementClientFactory) : IBrokerVerifier
{
static readonly ILog Logger = LogManager.GetLogger(typeof(BrokerVerifier));
static readonly Version MinimumSupportedRabbitMqVersion = Version.Parse("3.10.0");
static readonly Version RabbitMqVersion4 = Version.Parse("4.0.0");

readonly IManagementClient managementClient = managementClientFactory.CreateManagementClient();
readonly IManagementClient? managementClient = managementClientFactory?.CreateManagementClient();

Overview? overview;
Version? brokerVersion;

public async Task Initialize(CancellationToken cancellationToken = default)
{
var response = await managementClient.GetOverview(cancellationToken).ConfigureAwait(false);
if (response.HasValue)
if (managementClient != null)
{
overview = response.Value;
brokerVersion = overview.RabbitMqVersion;
var response = await managementClient.GetOverview(cancellationToken).ConfigureAwait(false);
if (response.HasValue)
{
brokerVersion = response.Value.RabbitMqVersion;
return;
}

throw new InvalidOperationException($"Could not access RabbitMQ Management API. ({response.StatusCode}: {response.Reason})");
}
else
{
// TODO: Need logic/config settings for determining which action to take if management API unavailable, e.g. should we throw an exception to refuse to start, or just log a warning
Logger.WarnFormat("Could not access RabbitMQ Management API. ({0}: {1})", response.StatusCode, response.Reason);

using var connection = await connectionFactory.CreateAdministrationConnection(cancellationToken).ConfigureAwait(false);
brokerVersion = connection.GetBrokerVersion();
using var connection = await connectionFactory.CreateAdministrationConnection(cancellationToken).ConfigureAwait(false);
brokerVersion = connection.GetBrokerVersion();

if (brokerVersion >= RabbitMqVersion4)
{
Logger.Warn("Use of RabbitMQ Management API has been disabled." +
"The transport will not be able to override the default delivery limit on each queue " +
"which is necessary in order to guarantee that messages are not lost after repeated retries.");
}
}

bool HasManagementClientAccess => overview != null;

Version BrokerVersion
{
get
Expand All @@ -61,7 +66,7 @@ public async Task VerifyRequirements(CancellationToken cancellationToken = defau
}

bool streamsEnabled;
if (HasManagementClientAccess)
if (managementClient != null)
{
var response = await managementClient.GetFeatureFlags(cancellationToken).ConfigureAwait(false);
streamsEnabled = response.HasValue && response.Value.HasEnabledFeature(FeatureFlags.StreamQueue);
Expand All @@ -80,57 +85,59 @@ public async Task VerifyRequirements(CancellationToken cancellationToken = defau

public async Task ValidateDeliveryLimit(string queueName, CancellationToken cancellationToken = default)
{
if (!HasManagementClientAccess)
if (managementClient == null)
{
return;
}

var queue = await GetFullQueueDetails(queueName, cancellationToken).ConfigureAwait(false);
if (queue is null)
var queue = await GetFullQueueDetails(managementClient, queueName, cancellationToken).ConfigureAwait(false)
?? throw new InvalidOperationException($"Could not retrieve full queue details for {queueName}.");

if (ShouldOverrideDeliveryLimit(queue))
{
// TODO: Need logic/config settings for determining which action to take, e.g. should we throw an exception to refuse to start, or just log a warning
Logger.WarnFormat("Could not retrieve full queue details for {0}.", queueName);
return;
await SetDeliveryLimitViaPolicy(managementClient, queue, BrokerVersion, cancellationToken).ConfigureAwait(false);
}
}

bool ShouldOverrideDeliveryLimit(Queue queue)
{
if (BrokerVersion < RabbitMqVersion4)
{
return false;
}

if (queue.DeliveryLimit == -1)
{
return;
return false;
}

if (queue.Arguments.DeliveryLimit.HasValue &&
queue.Arguments.DeliveryLimit != -1)
if (queue.Arguments.DeliveryLimit.HasValue && queue.Arguments.DeliveryLimit != -1)
{
// TODO: Need logic/config settings for determining which action to take, e.g. should we throw an exception to refuse to start, or just log a warning
Logger.WarnFormat("The delivery limit for {0} is set to {1} by a queue argument. This can interfere with the transport's retry implementation",
queue.Name, queue.Arguments.DeliveryLimit);
return;
throw new InvalidOperationException($"The delivery limit for {queue.Name} is set to {queue.Arguments.DeliveryLimit} by a queue argument. " +
"This can interfere with the transport's retry implementation");
}

if (queue.EffectivePolicyDefinition!.DeliveryLimit.HasValue &&
queue.EffectivePolicyDefinition.DeliveryLimit != -1)
if (queue.EffectivePolicyDefinition!.DeliveryLimit.HasValue && queue.EffectivePolicyDefinition.DeliveryLimit != -1)
{
// TODO: Need logic/config settings for determining which action to take, e.g. should we throw an exception to refuse to start, or just log a warning
Logger.WarnFormat("The RabbitMQ policy {2} is setting delivery limit to {1} for {0}.",
queue.Name, queue.EffectivePolicyDefinition.DeliveryLimit, queue.AppliedPolicyName);
return;
throw new InvalidOperationException($"The RabbitMQ policy {queue.AppliedPolicyName} " +
$"is setting delivery limit to {queue.EffectivePolicyDefinition.DeliveryLimit} for {queue.Name}.");
}

await SetDeliveryLimitViaPolicy(queue, cancellationToken).ConfigureAwait(false);
return true;
}

async Task<Queue?> GetFullQueueDetails(string queueName, CancellationToken cancellationToken)
static async Task<Queue?> GetFullQueueDetails(IManagementClient managementClient, string queueName, CancellationToken cancellationToken)
{
var retryPolicy = Polly.Policy
.HandleResult<Response<Queue?>>(response => response.Value?.EffectivePolicyDefinition is null)
.WaitAndRetryAsync(
5,
attempt => TimeSpan.FromMilliseconds(100 * Math.Pow(2, attempt - 1)),
attempt => TimeSpan.FromMilliseconds(3000 * Math.Pow(2, attempt - 1)),
onRetry: (outcome, timespan, retryCount, context) =>
{
if (outcome.Exception is not null)
{
Logger.Error($"Failed to get {queueName} queue", outcome.Exception);
Logger.Error($"Failed to get {queueName} queue - Attempt #{retryCount}.", outcome.Exception);
}
else if (!outcome.Result.HasValue)
{
Expand All @@ -149,20 +156,16 @@ public async Task ValidateDeliveryLimit(string queueName, CancellationToken canc
return response?.Value?.EffectivePolicyDefinition is not null ? response.Value : null;
}

async Task SetDeliveryLimitViaPolicy(Queue queue, CancellationToken cancellationToken)
static async Task SetDeliveryLimitViaPolicy(IManagementClient managementClient, Queue queue, Version brokerVersion, CancellationToken cancellationToken)
{
if (!string.IsNullOrEmpty(queue.AppliedPolicyName))
{
// TODO: Need logic/config settings for determining which action to take, e.g. should we throw an exception to refuse to start, or just log a warning
Logger.WarnFormat("The {0} queue already has an associated policy.", queue.Name, queue.AppliedPolicyName);
return;
throw new InvalidOperationException($"The {queue.Name} queue already has the '{queue.AppliedPolicyName}' policy applied.");
}

if (BrokerVersion.Major < 4)
if (brokerVersion < RabbitMqVersion4)
{
// TODO: Need logic/config settings for determining which action to take, e.g. should we throw an exception to refuse to start, or just log a warning
Logger.WarnFormat("Cannot override delivery limit on the {0} queue by policy in RabbitMQ versions prior to 4.", queue.Name);
return;
throw new InvalidOperationException($"Cannot override delivery limit on the {queue.Name} queue by policy in RabbitMQ versions prior to 4. Version is {brokerVersion}.");
}

var policy = new ManagementClient.Models.Policy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
<ItemGroup>
<PackageReference Include="BitFaster.Caching" Version="[2.4.1, 3.0.0)" />
<PackageReference Include="NServiceBus" Version="[9.1.0, 10.0.0)" />
<PackageReference Include="Polly" Version="[8.4.2, 9.0.0)" />
<PackageReference Include="RabbitMQ.Client" Version="[7.0.0-rc.14, 8.0.0)" />
<PackageReference Include="Polly" Version="[8.5.0, 9.0.0)" />
<PackageReference Include="RabbitMQ.Client" Version="[7.0.0, 8.0.0)" />
</ItemGroup>

<ItemGroup>
Expand Down
8 changes: 7 additions & 1 deletion src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ public PrefetchCountCalculation PrefetchCountCalculation
/// </summary>
public bool UseExternalAuthMechanism { get; set; } = false;

/// <summary>
/// Set this to prevent the transport from using the RabbitMQ Management API.
/// This is not recommended as it can prevent the transport from setting appropriate delivery limits for retry functionality.
/// </summary>
public bool DoNotUseManagementClient { get; set; } = false;

/// <summary>
/// The interval for heartbeats between the endpoint and the broker.
/// </summary>
Expand Down Expand Up @@ -188,7 +194,7 @@ public override async Task<TransportInfrastructure> Initialize(HostSettings host
var connectionFactory = new ConnectionFactory(hostSettings.Name, ConnectionConfiguration, certCollection, !ValidateRemoteCertificate,
UseExternalAuthMechanism, HeartbeatInterval, NetworkRecoveryInterval, additionalClusterNodes);

var managementClientFactory = new ManagementClientFactory(ConnectionConfiguration);
var managementClientFactory = DoNotUseManagementClient ? null : new ManagementClientFactory(ConnectionConfiguration);
var brokerVerifier = new BrokerVerifier(connectionFactory, managementClientFactory);
await brokerVerifier.Initialize(cancellationToken).ConfigureAwait(false);

Expand Down