diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2f0f1138e..c916b7b78 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -51,10 +51,11 @@ jobs: creds: ${{ secrets.AZURE_ACI_CREDENTIALS }} enable-AzPSSession: true - name: Setup RabbitMQ - uses: Particular/setup-rabbitmq-action@v1.7.0 + uses: Particular/setup-rabbitmq-action@v1.7.1 with: connection-string-name: RabbitMQTransport_ConnectionString tag: RabbitMQTransport + image-tag: 4-management registry-username: ${{ secrets.DOCKERHUB_USERNAME }} registry-password: ${{ secrets.DOCKERHUB_TOKEN }} - name: Run tests diff --git a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_classic_endpoint_uses_quorum_error_queue.cs b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_classic_endpoint_uses_quorum_error_queue.cs index a9570ea94..521489736 100644 --- a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_classic_endpoint_uses_quorum_error_queue.cs +++ b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_classic_endpoint_uses_quorum_error_queue.cs @@ -24,7 +24,8 @@ public async Task Should_fail_to_start() .Run()); Assert.That(exception.Message, Does.Contain("PRECONDITION_FAILED - inequivalent arg 'x-queue-type' for queue 'rabbitmq.transport.tests.quorum-error'")); - Assert.That(exception.Message, Does.Contain("received none but current is the value 'quorum'")); + Assert.That(exception.Message, Does.Contain("received none but current is the value 'quorum'") // RabbitMQ v3.x + .Or.Contain("received 'classic' but current is 'quorum'")); // RabbitMQ v4.x } class ClassicQueueEndpoint : EndpointConfigurationBuilder diff --git a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_classic_endpoint_uses_quorum_queue.cs b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_classic_endpoint_uses_quorum_queue.cs index 73750a13f..919c95dc4 100644 --- a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_classic_endpoint_uses_quorum_queue.cs +++ b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_classic_endpoint_uses_quorum_queue.cs @@ -25,7 +25,8 @@ public async Task Should_fail_to_start() .Run()); Assert.That(exception.Message, Does.Contain("PRECONDITION_FAILED - inequivalent arg 'x-queue-type' for queue 'ClassicEndpointUsesQuorumQueue.ClassicQueueEndpoint'")); - Assert.That(exception.Message, Does.Contain("received none but current is the value 'quorum'")); + Assert.That(exception.Message, Does.Contain("received none but current is the value 'quorum'") // RabbitMQ v3.x + .Or.Contain("received 'classic' but current is 'quorum'")); // RabbitMQ v4.x } class ClassicQueueEndpoint : EndpointConfigurationBuilder diff --git a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_quorum_endpoint_uses_classic_error_queue.cs b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_quorum_endpoint_uses_classic_error_queue.cs index 7909eb905..1fd9da994 100644 --- a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_quorum_endpoint_uses_classic_error_queue.cs +++ b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_quorum_endpoint_uses_classic_error_queue.cs @@ -24,7 +24,8 @@ public async Task Should_fail_to_start() .Run()); Assert.That(exception.Message, Does.Contain("PRECONDITION_FAILED - inequivalent arg 'x-queue-type' for queue 'rabbitmq.transport.tests.classic-error'")); - Assert.That(exception.Message, Does.Contain("received the value 'quorum' of type 'longstr' but current is none'")); + Assert.That(exception.Message, Does.Contain("received the value 'quorum' of type 'longstr' but current is none'") // RabbitMQ v3.x + .Or.Contain("received 'quorum' but current is 'classic'")); // RabbitMQ v4.x } class QuorumQueueEndpoint : EndpointConfigurationBuilder diff --git a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_quorum_endpoint_uses_classic_queue.cs b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_quorum_endpoint_uses_classic_queue.cs index c5a8aaa65..4b8733263 100644 --- a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_quorum_endpoint_uses_classic_queue.cs +++ b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_quorum_endpoint_uses_classic_queue.cs @@ -24,7 +24,8 @@ public async Task Should_fail_to_start() .Run()); Assert.That(exception.Message, Does.Contain("PRECONDITION_FAILED - inequivalent arg 'x-queue-type' for queue 'QuorumEndpointUsesClassicQueue.QuorumQueueEndpoint'")); - Assert.That(exception.Message, Does.Contain("received the value 'quorum' of type 'longstr' but current is none")); + Assert.That(exception.Message, Does.Contain("received the value 'quorum' of type 'longstr' but current is none'") // RabbitMQ v3.x + .Or.Contain("received 'quorum' but current is 'classic'")); // RabbitMQ v4.x } class QuorumQueueEndpoint : EndpointConfigurationBuilder diff --git a/src/NServiceBus.Transport.RabbitMQ.CommandLine/BrokerConnectionBinder.cs b/src/NServiceBus.Transport.RabbitMQ.CommandLine/BrokerConnectionBinder.cs index 581df4950..4b98b4c82 100644 --- a/src/NServiceBus.Transport.RabbitMQ.CommandLine/BrokerConnectionBinder.cs +++ b/src/NServiceBus.Transport.RabbitMQ.CommandLine/BrokerConnectionBinder.cs @@ -42,7 +42,7 @@ protected override BrokerConnection GetBoundValue(BindingContext bindingContext) return brokerConnection; } - string GetConnectionString(string? connectionString, string? connectionStringEnv) + static string GetConnectionString(string? connectionString, string? connectionStringEnv) { if (string.IsNullOrWhiteSpace(connectionString)) { diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt b/src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt index 01faebeab..a690d0f59 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt @@ -18,7 +18,10 @@ namespace NServiceBus { public RabbitMQTransport(NServiceBus.RoutingTopology routingTopology, string connectionString) { } public RabbitMQTransport(NServiceBus.RoutingTopology routingTopology, string connectionString, bool enableDelayedDelivery) { } + public RabbitMQTransport(NServiceBus.RoutingTopology routingTopology, string connectionString, string managementConnectionString) { } + public RabbitMQTransport(NServiceBus.RoutingTopology routingTopology, string connectionString, bool enableDelayedDelivery, string managementConnectionString) { } public System.Security.Cryptography.X509Certificates.X509Certificate2 ClientCertificate { get; set; } + public bool DoNotUseManagementClient { get; set; } public System.TimeSpan HeartbeatInterval { get; set; } public System.Func MessageIdStrategy { get; set; } public System.TimeSpan NetworkRecoveryInterval { get; set; } @@ -41,6 +44,8 @@ namespace NServiceBus public static NServiceBus.TransportExtensions CustomMessageIdStrategy(this NServiceBus.TransportExtensions transportExtensions, System.Func customIdStrategy) { } public static NServiceBus.TransportExtensions DisableDurableExchangesAndQueues(this NServiceBus.TransportExtensions transportExtensions) { } public static NServiceBus.TransportExtensions DisableRemoteCertificateValidation(this NServiceBus.TransportExtensions transportExtensions) { } + public static NServiceBus.TransportExtensions ManagementConnectionString(this NServiceBus.TransportExtensions transportExtensions, System.Func getConnectionString) { } + public static NServiceBus.TransportExtensions ManagementConnectionString(this NServiceBus.TransportExtensions transportExtensions, string connectionString) { } public static NServiceBus.TransportExtensions PrefetchCount(this NServiceBus.TransportExtensions transportExtensions, ushort prefetchCount) { } public static NServiceBus.TransportExtensions PrefetchMultiplier(this NServiceBus.TransportExtensions transportExtensions, int prefetchMultiplier) { } public static NServiceBus.TransportExtensions SetClientCertificate(this NServiceBus.TransportExtensions transportExtensions, System.Security.Cryptography.X509Certificates.X509Certificate2 clientCertificate) { } diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/Connection/ConnectionConfigurationTests.cs b/src/NServiceBus.Transport.RabbitMQ.Tests/Connection/ConnectionConfigurationTests.cs index 3e8cf336e..c3af26cdd 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/Connection/ConnectionConfigurationTests.cs +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/Connection/ConnectionConfigurationTests.cs @@ -1,20 +1,40 @@ -namespace NServiceBus.Transport.RabbitMQ.Tests.ConnectionString +#nullable enable + +namespace NServiceBus.Transport.RabbitMQ.Tests.ConnectionString { using System; + using System.Collections.Generic; + using System.Linq; + using System.Net.Http; + using System.Text; + using System.Threading.Tasks; + using global::RabbitMQ.Client.Exceptions; using NUnit.Framework; using RabbitMQ; [TestFixture] - public class ConnectionConfigurationTests + class ConnectionConfigurationTests { - const string connectionString = "virtualHost=Copa;username=Copa;host=192.168.1.1:1234;password=abc_xyz;port=12345;useTls=true"; + const string FakeConnectionString = "virtualHost=Copa;username=Copa;host=192.168.1.1:1234;password=abc_xyz;port=12345;useTls=true"; + static string BrokerConnectionString = Environment.GetEnvironmentVariable("RabbitMQTransport_ConnectionString") ?? "host=localhost"; + static string ManagementConnectionString => CreateManagementConnectionString(BrokerConnectionString); + + static HostSettings HostSettings { get; } = new(nameof(ConnectionConfigurationTests), nameof(ConnectionConfigurationTests), null, null, false); - ConnectionConfiguration defaults = ConnectionConfiguration.Create("host=localhost"); + static readonly ConnectionConfiguration brokerDefaults = ConnectionConfiguration.Create("host=localhost"); + static readonly ConnectionConfiguration managementDefaults = ConnectionConfiguration.Create("host=localhost", isManagementConnection: true); + + static string CreateManagementConnectionString(string connectionString) + { + var parameters = connectionString.Split(';').Select(param => param.Split('=')).ToDictionary(parts => parts[0], parts => parts[1]); + parameters["port"] = "15672"; + return string.Join(";", parameters.Select(kv => $"{kv.Key}={kv.Value}")); + } [Test] public void Should_correctly_parse_full_connection_string() { - var connectionConfiguration = ConnectionConfiguration.Create(connectionString); + var connectionConfiguration = ConnectionConfiguration.Create(FakeConnectionString); Assert.Multiple(() => { @@ -143,8 +163,8 @@ public void Should_list_all_invalid_options() "certPath =/path/to/client/keycert.p12;" + "certPassPhrase = abc123;"; - var exception = Assert.Throws(() => - ConnectionConfiguration.Create(connectionString)); + var exception = Assert.Throws(() => ConnectionConfiguration.Create(connectionString)) + ?? throw new ArgumentNullException("exception"); Assert.That(exception.Message, Does.Contain("Multiple hosts are no longer supported")); Assert.That(exception.Message, Does.Contain("Empty host name in 'host' connection string option.")); @@ -164,31 +184,311 @@ public void Should_list_all_invalid_options() [Test] public void Should_set_default_port() { - Assert.That(defaults.Port, Is.EqualTo(5672)); + Assert.Multiple(() => + { + Assert.That(brokerDefaults.Port, Is.EqualTo(5672)); + Assert.That(managementDefaults.Port, Is.EqualTo(15672)); + }); } [Test] public void Should_set_default_virtual_host() { - Assert.That(defaults.VirtualHost, Is.EqualTo("/")); + Assert.Multiple(() => + { + Assert.That(brokerDefaults.VirtualHost, Is.EqualTo("/")); + Assert.That(managementDefaults.VirtualHost, Is.EqualTo("/")); + }); } [Test] public void Should_set_default_username() { - Assert.That(defaults.UserName, Is.EqualTo("guest")); + Assert.Multiple(() => + { + Assert.That(brokerDefaults.UserName, Is.EqualTo("guest")); + Assert.That(managementDefaults.UserName, Is.EqualTo("guest")); + }); } [Test] public void Should_set_default_password() { - Assert.That(defaults.Password, Is.EqualTo("guest")); + Assert.Multiple(() => + { + Assert.That(brokerDefaults.Password, Is.EqualTo("guest")); + Assert.That(managementDefaults.Password, Is.EqualTo("guest")); + }); } [Test] public void Should_set_default_use_tls() { - Assert.That(defaults.UseTls, Is.EqualTo(false)); + Assert.Multiple(() => + { + Assert.That(brokerDefaults.UseTls, Is.EqualTo(false)); + Assert.That(managementDefaults.UseTls, Is.EqualTo(false)); + }); + } + + [Test] + public void Should_configure_broker_and_management_connection_configurations_with_single_connection_string() + { + var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Quorum), "virtualHost=/;host=localhost;username=guest;password=guest;port=5672;useTls=false"); + + Assert.Multiple(() => + { + Assert.That(transport.BrokerConnectionConfiguration.VirtualHost, Is.EqualTo("/")); + Assert.That(transport.BrokerConnectionConfiguration.Host, Is.EqualTo("localhost")); + Assert.That(transport.BrokerConnectionConfiguration.UserName, Is.EqualTo("guest")); + Assert.That(transport.BrokerConnectionConfiguration.Password, Is.EqualTo("guest")); + Assert.That(transport.BrokerConnectionConfiguration.Port, Is.EqualTo(5672)); + Assert.That(transport.BrokerConnectionConfiguration.UseTls, Is.EqualTo(false)); + + Assert.That(transport.ManagementConnectionConfiguration.VirtualHost, Is.EqualTo("/")); + Assert.That(transport.ManagementConnectionConfiguration.Host, Is.EqualTo("localhost")); + Assert.That(transport.ManagementConnectionConfiguration.UserName, Is.EqualTo("guest")); + Assert.That(transport.ManagementConnectionConfiguration.Password, Is.EqualTo("guest")); + Assert.That(transport.ManagementConnectionConfiguration.Port, Is.EqualTo(15672)); // This should be set to the default management port + Assert.That(transport.ManagementConnectionConfiguration.UseTls, Is.EqualTo(false)); + }); + } + + [Test] + public void Should_configure_broker_and_management_connection_configurations_with_respective_connection_strings() + { + var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Quorum), "virtualHost=/;host=localhost;username=guest;password=guest;port=5672;useTls=false", FakeConnectionString); + + Assert.Multiple(() => + { + Assert.That(transport.BrokerConnectionConfiguration.VirtualHost, Is.EqualTo("/")); + Assert.That(transport.BrokerConnectionConfiguration.Host, Is.EqualTo("localhost")); + Assert.That(transport.BrokerConnectionConfiguration.UserName, Is.EqualTo("guest")); + Assert.That(transport.BrokerConnectionConfiguration.Password, Is.EqualTo("guest")); + Assert.That(transport.BrokerConnectionConfiguration.Port, Is.EqualTo(5672)); + Assert.That(transport.BrokerConnectionConfiguration.UseTls, Is.EqualTo(false)); + + Assert.That(transport.ManagementConnectionConfiguration.VirtualHost, Is.EqualTo("Copa")); + Assert.That(transport.ManagementConnectionConfiguration.Host, Is.EqualTo("192.168.1.1")); + Assert.That(transport.ManagementConnectionConfiguration.UserName, Is.EqualTo("Copa")); + Assert.That(transport.ManagementConnectionConfiguration.Password, Is.EqualTo("abc_xyz")); + Assert.That(transport.ManagementConnectionConfiguration.Port, Is.EqualTo(1234)); + Assert.That(transport.ManagementConnectionConfiguration.UseTls, Is.EqualTo(true)); + }); + } + + [Test] + public void Should_throw_on_invalid_management_credentials() + { + var invalidManagementConnection = new FakeConnectionConfiguration(ManagementConnectionString) + { + UserName = "Copa" + }; + + var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Quorum), BrokerConnectionString, invalidManagementConnection.ToConnectionString()); + + var exception = Assert.ThrowsAsync(async () => await transport.Initialize(HostSettings, [], [])) + ?? throw new ArgumentNullException("exception"); + + Assert.That(exception.Message, Does.Contain("Could not access RabbitMQ Management API")); + } + + [Test] + public void Should_throw_on_invalid_management_host() + { + var invalidManagementConnection = new FakeConnectionConfiguration(ManagementConnectionString) + { + Host = "WrongHostName" + }; + + var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Quorum), BrokerConnectionString, invalidManagementConnection.ToConnectionString()); + + _ = Assert.ThrowsAsync(async () => await transport.Initialize(HostSettings, [], [])); + } + + [Test] + public void Should_throw_on_invalid_broker_connection_string() + { + var invalidBrokerConnection = new FakeConnectionConfiguration(host: "127.0.0.1", userName: "Copa"); + + var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Quorum), invalidBrokerConnection.ToConnectionString(), ManagementConnectionString); + + var exception = Assert.ThrowsAsync(async () => await transport.Initialize(HostSettings, [], [])) + ?? throw new ArgumentNullException("exception"); + + Assert.That(exception.Message, Does.Contain("None of the specified endpoints were reachable")); + } + + [Test] + public void Should_throw_on_invalid_legacy_management_credentials() + { + var invalidManagementConnection = new FakeConnectionConfiguration(ManagementConnectionString) + { + UserName = "Copa" + }; + + // Create transport in legacy mode + var transport = new RabbitMQTransport + { + TopologyFactory = durable => new ConventionalRoutingTopology(durable, QueueType.Quorum), + LegacyApiConnectionString = BrokerConnectionString, + LegacyManagementApiConnectionString = invalidManagementConnection.ToConnectionString() + }; + + var exception = Assert.ThrowsAsync(async () => await transport.Initialize(HostSettings, [], [])); + + Assert.That(exception!.Message, Does.Contain("Could not access RabbitMQ Management API")); + } + + [Test] + public void Should_throw_on_invalid_legacy_management_host() + { + var invalidManagementConnection = new FakeConnectionConfiguration(ManagementConnectionString) + { + Host = "WrongHostName" + }; + + // Create transport in legacy mode + var transport = new RabbitMQTransport + { + TopologyFactory = durable => new ConventionalRoutingTopology(durable, QueueType.Quorum), + LegacyApiConnectionString = BrokerConnectionString, + LegacyManagementApiConnectionString = invalidManagementConnection.ToConnectionString() + }; + + _ = Assert.ThrowsAsync(async () => await transport.Initialize(HostSettings, [], [])); + } + + [Test] + public void Should_throw_on_invalid_legacy_broker_connection_string() + { + var invalidBrokerConnection = new FakeConnectionConfiguration(host: "localhost", port: "5672", virtualHost: "/", userName: "Copa", password: "guest", useTls: "false"); + + // Create transport in legacy mode + var transport = new RabbitMQTransport + { + TopologyFactory = durable => new ConventionalRoutingTopology(durable, QueueType.Quorum), + LegacyApiConnectionString = invalidBrokerConnection.ToConnectionString(), + LegacyManagementApiConnectionString = ManagementConnectionString + }; + + var exception = Assert.ThrowsAsync(async () => await transport.Initialize(HostSettings, [], [])) + ?? throw new ArgumentNullException("exception"); + + Assert.That(exception.Message, Does.Contain("None of the specified endpoints were reachable")); + } + + [Test] + public void Should_connect_to_management_api_with_broker_credentials() + { + var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Quorum), BrokerConnectionString); + + Assert.DoesNotThrowAsync(async () => await transport.Initialize(HostSettings, [], [])); + } + + [Test] + public async Task Should_set_default_port_values_for_broker_and_management_connections() + { + var validConnectionWithoutPort = new FakeConnectionConfiguration(BrokerConnectionString) + { + Port = null + }; + + var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Quorum), validConnectionWithoutPort.ToConnectionString()); + + _ = await transport.Initialize(HostSettings, [], []); + + Assert.Multiple(() => + { + Assert.That(transport.BrokerConnectionConfiguration.Port, Is.EqualTo(5672)); + Assert.That(transport.ManagementConnectionConfiguration.Port, Is.EqualTo(15672)); + }); + } + + [Test] + public void Should_not_throw_when_DoNotUseManagementClient_is_enabled_and_management_connection_is_invalid() + { + var invalidManagementConnection = new FakeConnectionConfiguration(host: "Copa"); + + var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Quorum), BrokerConnectionString, invalidManagementConnection.ToConnectionString()) + { + DoNotUseManagementClient = true + }; + + Assert.DoesNotThrowAsync(async () => await transport.Initialize(HostSettings, [], [])); + } + + public class FakeConnectionConfiguration + { + internal string Host { get; set; } + + internal string? Port { get; set; } + + internal string? VirtualHost { get; set; } + + internal string? UserName { get; set; } + + internal string? Password { get; set; } + + internal string? UseTls { get; set; } + + internal FakeConnectionConfiguration( + string host, + string? port = null, + string? virtualHost = null, + string? userName = null, + string? password = null, + string? useTls = null) + { + Host = host; + Port = port; + VirtualHost = virtualHost; + UserName = userName; + Password = password; + UseTls = useTls; + } + + internal FakeConnectionConfiguration(string connectionString) + { + var parameters = connectionString.Split(';').Select(param => param.Split('=')).ToDictionary(parts => parts[0].ToLower(), parts => parts[1]); + + Host = parameters["host"]; + Port = GetParameterValue(parameters, "port"); + VirtualHost = GetParameterValue(parameters, "virtualhost"); + UserName = GetParameterValue(parameters, "username"); + Password = GetParameterValue(parameters, "password"); + UseTls = GetParameterValue(parameters, "usetls"); + } + + static string? GetParameterValue(Dictionary parameters, string key) => parameters.TryGetValue(key, out var value) ? value : null; + + internal string ToConnectionString() + { + var sb = new StringBuilder(); + _ = sb.Append($"{nameof(Host)}={Host}"); + + if (!string.IsNullOrEmpty(VirtualHost)) + { + _ = sb.Append($";{nameof(VirtualHost)}={VirtualHost}"); + } + if (!string.IsNullOrEmpty(Port)) + { + _ = sb.Append($";{nameof(Port)}={Port}"); + } + if (!string.IsNullOrEmpty(UserName)) + { + _ = sb.Append($";{nameof(UserName)}={UserName}"); + } + if (!string.IsNullOrEmpty(Password)) + { + _ = sb.Append($";{nameof(Password)}={Password}"); + } + if (!string.IsNullOrEmpty(UseTls)) + { + _ = sb.Append($";{nameof(UseTls)}={UseTls}"); + } + return sb.ToString(); + } } } -} +} \ No newline at end of file diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/FeatureFlagListExtensions.cs b/src/NServiceBus.Transport.RabbitMQ.Tests/FeatureFlagListExtensions.cs new file mode 100644 index 000000000..eba5ac045 --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/FeatureFlagListExtensions.cs @@ -0,0 +1,11 @@ +namespace NServiceBus.Transport.RabbitMQ.Tests; + +using System; +using System.Linq; +using NServiceBus.Transport.RabbitMQ.ManagementClient; + +static class FeatureFlagListExtensions +{ + public static bool Contains(this FeatureFlagList featureFlagList, string featureName) => + featureFlagList.Any(featureFlag => featureFlag.Name.Equals(featureName, StringComparison.OrdinalIgnoreCase)); +} diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/ManagementClientTests.cs b/src/NServiceBus.Transport.RabbitMQ.Tests/ManagementClientTests.cs new file mode 100644 index 000000000..cede74f2e --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/ManagementClientTests.cs @@ -0,0 +1,125 @@ +#nullable enable + +namespace NServiceBus.Transport.RabbitMQ.Tests +{ + using System; + using System.Collections.Generic; + using System.Net; + using System.Threading.Tasks; + using NServiceBus.Transport.RabbitMQ.ManagementClient; + using NUnit.Framework; + using NUnit.Framework.Internal; + using ConnectionFactory = ConnectionFactory; + + + [TestFixture] + class ManagementClientTests + { + static readonly string connectionString = Environment.GetEnvironmentVariable("RabbitMQTransport_ConnectionString") ?? "host=localhost"; + static readonly ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.Create(connectionString); + static readonly ConnectionConfiguration managementConnectionConfiguration = ConnectionConfiguration.Create(connectionString, isManagementConnection: true); + static readonly ConnectionFactory connectionFactory = new(typeof(ManagementClientTests).FullName, connectionConfiguration, null, false, false, TimeSpan.FromSeconds(60), TimeSpan.FromSeconds(10), []); + static readonly ManagementClient client = new(managementConnectionConfiguration); + + [Test] + public async Task GetQueue_Should_Return_Queue_Information_When_Exists() + { + // Arrange + var queueName = nameof(GetQueue_Should_Return_Queue_Information_When_Exists); + await CreateQuorumQueue(queueName).ConfigureAwait(false); + + // Act + var response = await client.GetQueue(queueName); + + // Assert + Assert.Multiple(() => + { + Assert.That(response.StatusCode, Is.EqualTo(HttpStatusCode.OK)); + Assert.That(response.Value, Is.Not.Null); + Assert.That(response.Value?.Name, Is.EqualTo(queueName)); + }); + } + + [Test] + public async Task GetOverview_Should_Return_Broker_Information() + { + // Act + var response = await client.GetOverview(); + + // Assert + Assert.Multiple(() => + { + Assert.That(response.StatusCode, Is.EqualTo(HttpStatusCode.OK)); + Assert.That(response.Value, Is.Not.Null); + Assert.That(response.Value?.ProductName, Is.EqualTo("RabbitMQ")); + Assert.That(response.Value?.ManagementVersion.Major, Is.InRange(3, 4)); + Assert.That(response.Value?.ProductVersion.Major, Is.InRange(3, 4)); + Assert.That(response.Value?.RabbitMqVersion.Major, Is.InRange(3, 4)); + }); + } + + [Test] + public async Task GetFeatureFlags_Should_Return_FeatureFlag_Information() + { + // Act + var response = await client.GetFeatureFlags(); + + // Assert + Assert.Multiple(() => + { + Assert.That(response.StatusCode, Is.EqualTo(HttpStatusCode.OK)); + Assert.That(response.Value, Is.Not.Null); + Assert.That(response.Value, Is.Not.Empty); + Assert.That(response.Value?.Contains(FeatureFlags.QuorumQueue), Is.True); + }); + } + + [Test] + [TestCase(-1)] + [TestCase(200)] + public async Task CreatePolicy_With_DeliveryLimit_Should_Be_Applied_To_Quorum_Queues(int deliveryLimit) + { + // Arrange + var queueName = nameof(CreatePolicy_With_DeliveryLimit_Should_Be_Applied_To_Quorum_Queues); + var policyName = $"{queueName} policy"; + await CreateQuorumQueue(queueName); + + // Act + var policy = new Policy + { + ApplyTo = PolicyTarget.QuorumQueues, + Definition = new PolicyDefinition + { + DeliveryLimit = deliveryLimit + }, + Name = policyName, + Pattern = queueName, + Priority = 100 + }; + await client.CreatePolicy(policy); + + // Assert + + // It can take some time for updated policies to be applied, so we need to wait. + // If this test is randomly failing, consider increasing the delay + await Task.Delay(10000); + var response = await client.GetQueue(queueName); + Assert.Multiple(() => + { + Assert.That(response.StatusCode, Is.EqualTo(HttpStatusCode.OK)); + Assert.That(response.Value, Is.Not.Null); + Assert.That(response.Value?.AppliedPolicyName, Is.EqualTo(policyName)); + Assert.That(response.Value?.EffectivePolicyDefinition?.DeliveryLimit, Is.EqualTo(deliveryLimit)); + }); + } + + static async Task CreateQuorumQueue(string queueName) + { + using var connection = await connectionFactory.CreateConnection($"{queueName} connection").ConfigureAwait(false); + using var channel = await connection.CreateChannelAsync().ConfigureAwait(false); + var arguments = new Dictionary { { "x-queue-type", "quorum" } }; + + _ = await channel.QueueDeclareAsync(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments); + } + } +} diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/NServiceBus.Transport.RabbitMQ.Tests.csproj b/src/NServiceBus.Transport.RabbitMQ.Tests/NServiceBus.Transport.RabbitMQ.Tests.csproj index bbae26ae5..8b22a02c3 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/NServiceBus.Transport.RabbitMQ.Tests.csproj +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/NServiceBus.Transport.RabbitMQ.Tests.csproj @@ -1,4 +1,4 @@ - + net8.0 diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/RabbitMqContext.cs b/src/NServiceBus.Transport.RabbitMQ.Tests/RabbitMqContext.cs index a2bc8be4c..d2a47527c 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/RabbitMqContext.cs +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/RabbitMqContext.cs @@ -21,7 +21,7 @@ public virtual async Task SetUp() var useTls = connectionString.StartsWith("https", StringComparison.InvariantCultureIgnoreCase) || connectionString.StartsWith("amqps", StringComparison.InvariantCultureIgnoreCase); var transport = new RabbitMQTransport(RoutingTopology.Conventional(queueType), connectionString); - var connectionConfig = transport.ConnectionConfiguration; + var connectionConfig = transport.BrokerConnectionConfiguration; connectionFactory = new ConnectionFactory(ReceiverQueue, connectionConfig, null, true, false, transport.HeartbeatInterval, transport.NetworkRecoveryInterval, null); diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/BrokerVerifier.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/BrokerVerifier.cs new file mode 100644 index 000000000..99730d95c --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/BrokerVerifier.cs @@ -0,0 +1,181 @@ +#nullable enable + +namespace NServiceBus.Transport.RabbitMQ; + +using System; +using System.Threading; +using System.Threading.Tasks; +using ManagementClient; +using NServiceBus.Logging; +using Polly; + +class BrokerVerifier(ConnectionFactory connectionFactory, bool managementClientAvailable, ConnectionConfiguration connectionConfiguration) +{ + static readonly ILog Logger = LogManager.GetLogger(typeof(BrokerVerifier)); + static readonly Version MinimumSupportedRabbitMqVersion = Version.Parse("3.10.0"); + static readonly Version RabbitMqVersion4 = Version.Parse("4.0.0"); + + readonly ManagementClient.ManagementClient managementClient = new(connectionConfiguration); + + Version? brokerVersion; + + public async Task Initialize(CancellationToken cancellationToken = default) + { + if (managementClientAvailable) + { + var response = await managementClient.GetOverview(cancellationToken).ConfigureAwait(false); + if (response.HasValue) + { + brokerVersion = response.Value.RabbitMqVersion; + return; + } + + throw new InvalidOperationException($"Could not access RabbitMQ Management API. ({response.StatusCode}: {response.Reason})"); + } + + using var connection = await connectionFactory.CreateAdministrationConnection(cancellationToken).ConfigureAwait(false); + brokerVersion = connection.GetBrokerVersion(); + + if (brokerVersion >= RabbitMqVersion4) + { + Logger.Warn("Use of RabbitMQ Management API has been disabled." + + "The transport will not be able to override the default delivery limit on each queue " + + "which is necessary in order to guarantee that messages are not lost after repeated retries."); + } + } + + Version BrokerVersion + { + get + { + if (brokerVersion == null) + { + throw new InvalidOperationException($"Need to call Initialize before accessing {nameof(BrokerVersion)} property"); + } + + return brokerVersion; + } + } + + public async Task VerifyRequirements(CancellationToken cancellationToken = default) + { + if (BrokerVersion < MinimumSupportedRabbitMqVersion) + { + throw new Exception($"An unsupported broker version was detected: {BrokerVersion}. The broker must be at least version {MinimumSupportedRabbitMqVersion}."); + } + + bool streamsEnabled; + if (managementClientAvailable) + { + var response = await managementClient.GetFeatureFlags(cancellationToken).ConfigureAwait(false); + streamsEnabled = response.HasValue && response.Value.HasEnabledFeature(FeatureFlags.StreamQueue); + } + else + { + var connection = await connectionFactory.CreateAdministrationConnection(cancellationToken).ConfigureAwait(false); + streamsEnabled = await connection.TryCreateStream(cancellationToken).ConfigureAwait(false); + } + + if (!streamsEnabled) + { + throw new Exception("An unsupported broker configuration was detected. The 'stream_queue' feature flag needs to be enabled."); + } + } + + public async Task ValidateDeliveryLimit(string queueName, CancellationToken cancellationToken = default) + { + if (!managementClientAvailable) + { + return; + } + + var queue = await GetFullQueueDetails(managementClient, queueName, cancellationToken).ConfigureAwait(false) + ?? throw new InvalidOperationException($"Could not retrieve full queue details for {queueName}."); + + if (ShouldOverrideDeliveryLimit(queue)) + { + await SetDeliveryLimitViaPolicy(managementClient, queue, BrokerVersion, cancellationToken).ConfigureAwait(false); + } + } + + bool ShouldOverrideDeliveryLimit(Queue queue) + { + if (BrokerVersion < RabbitMqVersion4) + { + return false; + } + + if (queue.DeliveryLimit == -1) + { + return false; + } + + if (queue.Arguments.DeliveryLimit.HasValue && queue.Arguments.DeliveryLimit != -1) + { + throw new InvalidOperationException($"The delivery limit for {queue.Name} is set to {queue.Arguments.DeliveryLimit} by a queue argument. " + + "This can interfere with the transport's retry implementation"); + } + + if (queue.EffectivePolicyDefinition!.DeliveryLimit.HasValue && queue.EffectivePolicyDefinition.DeliveryLimit != -1) + { + throw new InvalidOperationException($"The RabbitMQ policy {queue.AppliedPolicyName} " + + $"is setting delivery limit to {queue.EffectivePolicyDefinition.DeliveryLimit} for {queue.Name}."); + } + + return true; + } + + static async Task GetFullQueueDetails(ManagementClient.ManagementClient managementClient, string queueName, CancellationToken cancellationToken) + { + var retryPolicy = Polly.Policy + .HandleResult>(response => response.Value?.EffectivePolicyDefinition is null) + .WaitAndRetryAsync( + 5, + attempt => TimeSpan.FromMilliseconds(3000 * Math.Pow(2, attempt - 1)), + onRetry: (outcome, timespan, retryCount, context) => + { + if (outcome.Exception is not null) + { + Logger.Error($"Failed to get {queueName} queue - Attempt #{retryCount}.", outcome.Exception); + } + else if (!outcome.Result.HasValue) + { + var response = outcome.Result; + Logger.WarnFormat("Could not get queue details for {0} - Attempt #{1}. ({2}: {3})", queueName, retryCount, response.StatusCode, response.Reason); + } + else + { + var response = outcome.Result; + Logger.WarnFormat("Did not receive full queue details for {0} - Attempt #{1})", queueName, retryCount); + } + }); + + var response = await retryPolicy.ExecuteAsync(() => managementClient.GetQueue(queueName, cancellationToken)).ConfigureAwait(false); + + return response?.Value?.EffectivePolicyDefinition is not null ? response.Value : null; + } + + static async Task SetDeliveryLimitViaPolicy(ManagementClient.ManagementClient managementClient, Queue queue, Version brokerVersion, CancellationToken cancellationToken) + { + if (!string.IsNullOrEmpty(queue.AppliedPolicyName)) + { + throw new InvalidOperationException($"The {queue.Name} queue already has the '{queue.AppliedPolicyName}' policy applied."); + } + + if (brokerVersion < RabbitMqVersion4) + { + throw new InvalidOperationException($"Cannot override delivery limit on the {queue.Name} queue by policy in RabbitMQ versions prior to 4. Version is {brokerVersion}."); + } + + var policy = new ManagementClient.Policy + { + Name = $"nsb.{queue.Name}.delivery-limit", + ApplyTo = PolicyTarget.QuorumQueues, + Definition = new PolicyDefinition { DeliveryLimit = -1 }, + Pattern = queue.Name, + Priority = 100 + }; + + await managementClient.CreatePolicy(policy, cancellationToken).ConfigureAwait(false); + } +} diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Converters/DeliveryLimitConverter.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Converters/DeliveryLimitConverter.cs new file mode 100644 index 000000000..44bd575a0 --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Converters/DeliveryLimitConverter.cs @@ -0,0 +1,34 @@ +#nullable enable + +namespace NServiceBus.Transport.RabbitMQ.ManagementClient; + +using System; +using System.Text.Json; +using System.Text.Json.Serialization; + +class DeliveryLimitConverter : JsonConverter +{ + public override int Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + if (reader.TokenType == JsonTokenType.Number) + { + return reader.GetInt32(); + } + else if (reader.TokenType == JsonTokenType.String) + { + var value = reader.GetString(); + if (string.Equals(value, "unlimited", StringComparison.OrdinalIgnoreCase)) + { + return -1; + } + + throw new JsonException($"Unexpected string value for `delivery-limit` - {value}"); + } + else + { + throw new JsonException($"Expected `delivery-limit` to be either a Number or the String `unlimited`, not a {reader.TokenType}"); + } + } + + public override void Write(Utf8JsonWriter writer, int value, JsonSerializerOptions options) => writer.WriteNumberValue(value); +} diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Converters/FeatureFlagStateConverter.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Converters/FeatureFlagStateConverter.cs new file mode 100644 index 000000000..7e9b346b4 --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Converters/FeatureFlagStateConverter.cs @@ -0,0 +1,16 @@ +#nullable enable + +namespace NServiceBus.Transport.RabbitMQ.ManagementClient; + +using System; +using System.Text.Json; +using System.Text.Json.Serialization; + +class FeatureFlagEnabledConverter : JsonConverter +{ + public override bool Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) => + reader.GetString()?.Equals("enabled", StringComparison.OrdinalIgnoreCase) ?? false; + + public override void Write(Utf8JsonWriter writer, bool value, JsonSerializerOptions options) => + writer.WriteStringValue(value ? "enabled" : "disabled"); +} diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Converters/PolicyTargetConverter.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Converters/PolicyTargetConverter.cs new file mode 100644 index 000000000..a339ac39e --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Converters/PolicyTargetConverter.cs @@ -0,0 +1,40 @@ +#nullable enable + +namespace NServiceBus.Transport.RabbitMQ.ManagementClient; + +using System; +using System.Text.Json; +using System.Text.Json.Serialization; + +class PolicyTargetConverter : JsonConverter +{ + public override PolicyTarget Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + var value = reader.GetString(); + return value switch + { + "all" => PolicyTarget.All, + "queues" => PolicyTarget.Queues, + "classic_queues" => PolicyTarget.ClassicQueues, + "quorum_queues" => PolicyTarget.QuorumQueues, + "streams" => PolicyTarget.Streams, + "exchanges" => PolicyTarget.Exchanges, + _ => throw new JsonException($"Unknown PolicyTarget: {value}") + }; + } + + public override void Write(Utf8JsonWriter writer, PolicyTarget policyTarget, JsonSerializerOptions options) + { + var value = policyTarget switch + { + PolicyTarget.All => "all", + PolicyTarget.Queues => "queues", + PolicyTarget.ClassicQueues => "classic_queues", + PolicyTarget.QuorumQueues => "quorum_queues", + PolicyTarget.Streams => "streams", + PolicyTarget.Exchanges => "exchanges", + _ => throw new ArgumentOutOfRangeException(nameof(policyTarget), $"PolicyTarget value out of range: {policyTarget}") + }; + writer.WriteStringValue(value); + } +} diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Converters/QueueTypeConverter.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Converters/QueueTypeConverter.cs new file mode 100644 index 000000000..ba0f6842d --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Converters/QueueTypeConverter.cs @@ -0,0 +1,34 @@ +#nullable enable + +namespace NServiceBus.Transport.RabbitMQ.ManagementClient; + +using System; +using System.Text.Json; +using System.Text.Json.Serialization; + +class QueueTypeConverter : JsonConverter +{ + public override QueueType Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + var value = reader.GetString(); + return value switch + { + "quorum" => QueueType.Quorum, + "classic" => QueueType.Classic, + "stream" => QueueType.Stream, + _ => throw new JsonException($"Unknown QueueType: {value}") + }; + } + + public override void Write(Utf8JsonWriter writer, QueueType queueType, JsonSerializerOptions options) + { + var value = queueType switch + { + QueueType.Quorum => "quorum", + QueueType.Classic => "classic", + QueueType.Stream => "stream", + _ => throw new ArgumentOutOfRangeException(nameof(queueType), $"QueueType value out of range: {queueType}") + }; + writer.WriteStringValue(value); + } +} diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Converters/VersionConverter.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Converters/VersionConverter.cs new file mode 100644 index 000000000..c8b11dca6 --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Converters/VersionConverter.cs @@ -0,0 +1,24 @@ +#nullable enable + +namespace NServiceBus.Transport.RabbitMQ.ManagementClient; + +using System; +using System.Text.Json; +using System.Text.Json.Serialization; + +class VersionConverter : JsonConverter +{ + public override Version Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + var value = reader.GetString() ?? throw new JsonException("Missing version value"); + + if (Version.TryParse(value, out var version)) + { + return version; + } + + throw new JsonException($"Invalid version value {value}"); + } + + public override void Write(Utf8JsonWriter writer, Version version, JsonSerializerOptions options) => writer.WriteStringValue(version.ToString()); +} diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/HttpStatusCodeExtensions.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/HttpStatusCodeExtensions.cs new file mode 100644 index 000000000..b33313f6d --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/HttpStatusCodeExtensions.cs @@ -0,0 +1,10 @@ +#nullable enable + +namespace NServiceBus.Transport.RabbitMQ.ManagementClient; + +using System.Net; + +static class HttpStatusCodeExtensions +{ + public static bool IsSuccessStatusCode(this HttpStatusCode statusCode) => (int)statusCode is >= 200 and <= 299; +} diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/ManagementClient.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/ManagementClient.cs new file mode 100644 index 000000000..ae05e9684 --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/ManagementClient.cs @@ -0,0 +1,104 @@ +#nullable enable + +namespace NServiceBus.Transport.RabbitMQ.ManagementClient; + +using System; +using System.Net.Http; +using System.Net.Http.Headers; +using System.Net.Http.Json; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +class ManagementClient +{ + readonly HttpClient httpClient; + readonly string virtualHost; + readonly string escapedVirtualHost; + + public ManagementClient(ConnectionConfiguration connectionConfiguration) + { + ArgumentNullException.ThrowIfNull(connectionConfiguration, nameof(connectionConfiguration)); + + virtualHost = connectionConfiguration.VirtualHost; + escapedVirtualHost = Uri.EscapeDataString(virtualHost); + + var uriBuilder = new UriBuilder + { + Scheme = connectionConfiguration.UseTls ? "https" : "http", + Host = connectionConfiguration.Host, + Port = connectionConfiguration.Port, + }; + + httpClient = new HttpClient { BaseAddress = uriBuilder.Uri }; + httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue( + "Basic", + Convert.ToBase64String(Encoding.ASCII.GetBytes($"{connectionConfiguration.UserName}:{connectionConfiguration.Password}"))); + } + + public async Task> GetQueue(string queueName, CancellationToken cancellationToken = default) + { + Queue? value = null; + + var escapedQueueName = Uri.EscapeDataString(queueName); + var response = await httpClient.GetAsync($"api/queues/{escapedVirtualHost}/{escapedQueueName}", cancellationToken) + .ConfigureAwait(false); + + if (response.IsSuccessStatusCode) + { + value = await response.Content.ReadFromJsonAsync(cancellationToken).ConfigureAwait(false); + } + + return new Response( + response.StatusCode, + response.ReasonPhrase ?? string.Empty, + value); + } + + public async Task> GetOverview(CancellationToken cancellationToken = default) + { + Overview? value = null; + + var response = await httpClient.GetAsync($"api/overview", cancellationToken).ConfigureAwait(false); + + if (response.IsSuccessStatusCode) + { + value = await response.Content.ReadFromJsonAsync(cancellationToken).ConfigureAwait(false); + } + + return new Response( + response.StatusCode, + response.ReasonPhrase ?? string.Empty, + value); + } + + public async Task> GetFeatureFlags(CancellationToken cancellationToken = default) + { + FeatureFlagList? value = null; + + var response = await httpClient.GetAsync($"api/feature-flags", cancellationToken).ConfigureAwait(false); + + if (response.IsSuccessStatusCode) + { + value = await response.Content.ReadFromJsonAsync(cancellationToken).ConfigureAwait(false); + } + + return new Response( + response.StatusCode, + response.ReasonPhrase ?? string.Empty, + value); + } + + public async Task CreatePolicy(Policy policy, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(policy, nameof(policy)); + + policy.VirtualHost = virtualHost; + + var escapedPolicyName = Uri.EscapeDataString(policy.Name); + var response = await httpClient.PutAsJsonAsync($"api/policies/{escapedVirtualHost}/{escapedPolicyName}", policy, cancellationToken) + .ConfigureAwait(false); + + response.EnsureSuccessStatusCode(); + } +} diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/FeatureFlag.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/FeatureFlag.cs new file mode 100644 index 000000000..29cd289e1 --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/FeatureFlag.cs @@ -0,0 +1,25 @@ +#nullable enable + +namespace NServiceBus.Transport.RabbitMQ.ManagementClient; + +using System.Collections.Generic; +using System.Text.Json; +using System.Text.Json.Serialization; + +// This is to prevent Fody throwing an error on classes with `required` properties (since the compiler marks the default constructor with an `[Obsolete]` attribute) +// https://learn.microsoft.com/en-us/dotnet/csharp/language-reference/proposals/csharp-11.0/required-members#metadata-representation +[method: DoNotWarnAboutObsoleteUsage] +class FeatureFlag() +{ + [JsonRequired] + [JsonPropertyName("name")] + public required string Name { get; set; } + + [JsonConverter(typeof(FeatureFlagEnabledConverter))] + [JsonPropertyName("state")] + public bool IsEnabled { get; set; } + + [JsonExtensionData] + public IDictionary ExtraProperties { get; } = new Dictionary(); +} + diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/FeatureFlagList.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/FeatureFlagList.cs new file mode 100644 index 000000000..03d7ba62a --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/FeatureFlagList.cs @@ -0,0 +1,14 @@ +#nullable enable + +namespace NServiceBus.Transport.RabbitMQ.ManagementClient; + +using System; +using System.Collections.Generic; +using System.Linq; + +class FeatureFlagList : List +{ + public bool HasEnabledFeature(string featureName) => + this.Any(feature => feature.Name.Equals(featureName, StringComparison.OrdinalIgnoreCase) && feature.IsEnabled); +} + diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/FeatureFlags.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/FeatureFlags.cs new file mode 100644 index 000000000..2699f24f9 --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/FeatureFlags.cs @@ -0,0 +1,17 @@ +#nullable enable + +namespace NServiceBus.Transport.RabbitMQ.ManagementClient; + +static class FeatureFlags +{ + public const string DetailedQueuesEndpoints = "detailed_queues_endpoint"; + + public const string KhepriDatabase = "khepri_db"; + + public const string QuorumQueue = "quorum_queue"; + + public const string StreamFiltering = "stream_filtering"; + + public const string StreamQueue = "stream_queue"; +} + diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/Overview.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/Overview.cs new file mode 100644 index 000000000..8e0fdc425 --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/Overview.cs @@ -0,0 +1,39 @@ +#nullable enable + +namespace NServiceBus.Transport.RabbitMQ.ManagementClient; + +using System; +using System.Collections.Generic; +using System.Text.Json; +using System.Text.Json.Serialization; + +// This is to prevent Fody throwing an error on classes with `required` properties (since the compiler marks the default constructor with an `[Obsolete]` attribute) +// https://learn.microsoft.com/en-us/dotnet/csharp/language-reference/proposals/csharp-11.0/required-members#metadata-representation +[method: DoNotWarnAboutObsoleteUsage] +class Overview() +{ + [JsonPropertyName("product_name")] + public required string ProductName { get; set; } + + [JsonConverter(typeof(VersionConverter))] + [JsonPropertyName("management_version")] + public required Version ManagementVersion { get; set; } + + [JsonConverter(typeof(VersionConverter))] + [JsonPropertyName("product_version")] + public required Version ProductVersion { get; set; } + + [JsonConverter(typeof(VersionConverter))] + [JsonPropertyName("rabbitmq_version")] + public required Version RabbitMqVersion { get; set; } + + [JsonPropertyName("cluster_name")] + public required string ClusterName { get; set; } + + [JsonPropertyName("node")] + public required string Node { get; set; } + + [JsonExtensionData] + public IDictionary ExtraProperties { get; } = new Dictionary(); +} + diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/Policy.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/Policy.cs new file mode 100644 index 000000000..32da4acaf --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/Policy.cs @@ -0,0 +1,36 @@ +#nullable enable + +namespace NServiceBus.Transport.RabbitMQ.ManagementClient; + +using System.Collections.Generic; +using System.Text.Json; +using System.Text.Json.Serialization; + +// This is to prevent Fody throwing an error on classes with `required` properties (since the compiler marks the default constructor with an `[Obsolete]` attribute) +// https://learn.microsoft.com/en-us/dotnet/csharp/language-reference/proposals/csharp-11.0/required-members#metadata-representation +[method: DoNotWarnAboutObsoleteUsage] +class Policy() +{ + [JsonPropertyName("vhost")] + public string VirtualHost { get; set; } = "/"; + + [JsonPropertyName("name")] + public required string Name { get; set; } + + [JsonPropertyName("pattern")] + public required string Pattern { get; set; } + + [JsonConverter(typeof(PolicyTargetConverter))] + [JsonPropertyName("apply-to")] + public required PolicyTarget ApplyTo { get; set; } + + [JsonPropertyName("definition")] + public required PolicyDefinition Definition { get; set; } + + [JsonPropertyName("priority")] + public int Priority { get; set; } + + [JsonExtensionData] + public IDictionary ExtraProperties { get; } = new Dictionary(); +} + diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/PolicyDefinition.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/PolicyDefinition.cs new file mode 100644 index 000000000..3763f3d5c --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/PolicyDefinition.cs @@ -0,0 +1,18 @@ +#nullable enable + +namespace NServiceBus.Transport.RabbitMQ.ManagementClient; + +using System.Collections.Generic; +using System.Text.Json; +using System.Text.Json.Serialization; + +class PolicyDefinition +{ + [JsonPropertyName("delivery-limit")] + [JsonConverter(typeof(DeliveryLimitConverter))] + public int? DeliveryLimit { get; set; } + + [JsonExtensionData] + public IDictionary ExtraProperties { get; } = new Dictionary(); +} + diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/PolicyTarget.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/PolicyTarget.cs new file mode 100644 index 000000000..8a7e82df0 --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/PolicyTarget.cs @@ -0,0 +1,11 @@ +namespace NServiceBus.Transport.RabbitMQ.ManagementClient; + +enum PolicyTarget +{ + All, + Queues, + ClassicQueues, + QuorumQueues, + Streams, + Exchanges, +} diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/Queue.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/Queue.cs new file mode 100644 index 000000000..9fdd72001 --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/Queue.cs @@ -0,0 +1,37 @@ +#nullable enable + +namespace NServiceBus.Transport.RabbitMQ.ManagementClient; + +using System.Collections.Generic; +using System.Text.Json; +using System.Text.Json.Serialization; + +// This is to prevent Fody throwing an error on classes with `required` properties (since the compiler marks the default constructor with an `[Obsolete]` attribute) +// https://learn.microsoft.com/en-us/dotnet/csharp/language-reference/proposals/csharp-11.0/required-members#metadata-representation +[method: DoNotWarnAboutObsoleteUsage] +class Queue() +{ + [JsonRequired] + [JsonPropertyName("name")] + public required string Name { get; set; } + + [JsonRequired] + [JsonPropertyName("arguments")] + public required QueueArguments Arguments { get; set; } + + [JsonPropertyName("delivery_limit")] + [JsonConverter(typeof(DeliveryLimitConverter))] + public int DeliveryLimit { get; set; } + + [JsonPropertyName("effective_policy_definition")] + public PolicyDefinition? EffectivePolicyDefinition { get; set; } + + [JsonPropertyName("policy")] + public string? AppliedPolicyName { get; set; } + + [JsonPropertyName("operator_policy")] + public string? AppliedOperatorPolicyName { get; set; } + + [JsonExtensionData] + public IDictionary ExtraProperties { get; } = new Dictionary(); +} \ No newline at end of file diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/QueueArguments.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/QueueArguments.cs new file mode 100644 index 000000000..ecc99d1be --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/QueueArguments.cs @@ -0,0 +1,22 @@ +#nullable enable + +namespace NServiceBus.Transport.RabbitMQ.ManagementClient; + +using System.Collections.Generic; +using System.Text.Json; +using System.Text.Json.Serialization; + +class QueueArguments +{ + [JsonPropertyName("x-queue-type")] + [JsonConverter(typeof(QueueTypeConverter))] + public QueueType? QueueType { get; set; } + + [JsonPropertyName("x-delivery-limit")] + [JsonConverter(typeof(DeliveryLimitConverter))] + public int? DeliveryLimit { get; set; } + + [JsonExtensionData] + public IDictionary ExtraProperties { get; } = new Dictionary(); +} + diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/QueueType.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/QueueType.cs new file mode 100644 index 000000000..6959e5eec --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/QueueType.cs @@ -0,0 +1,10 @@ +namespace NServiceBus.Transport.RabbitMQ.ManagementClient; + +// Note that this is different to `NServiceBus.QueueType` which lists the types of queues supported +// by the NServiceBus transport, which doesn't include the `Stream` value +enum QueueType +{ + Classic, + Quorum, + Stream +} diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Response.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Response.cs new file mode 100644 index 000000000..51d0a9c6d --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Response.cs @@ -0,0 +1,12 @@ +#nullable enable + +namespace NServiceBus.Transport.RabbitMQ.ManagementClient; + +using System.Diagnostics.CodeAnalysis; +using System.Net; + +record Response(HttpStatusCode StatusCode, string Reason, T Value) +{ + [MemberNotNullWhen(true, nameof(Value))] + public bool HasValue => StatusCode.IsSuccessStatusCode(); +} diff --git a/src/NServiceBus.Transport.RabbitMQ/Configuration/ConnectionConfiguration.cs b/src/NServiceBus.Transport.RabbitMQ/Configuration/ConnectionConfiguration.cs index 3eca08219..80fe920e8 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Configuration/ConnectionConfiguration.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Configuration/ConnectionConfiguration.cs @@ -9,8 +9,10 @@ class ConnectionConfiguration { const bool defaultUseTls = false; - const int defaultPort = 5672; - const int defaultTlsPort = 5671; + const int defaultBrokerPort = 5672; + const int defaultBrokerTlsPort = 5671; + const int defaultManagementPort = 15672; + const int defaultManagementTlsPort = 15671; const string defaultVirtualHost = "/"; const string defaultUserName = "guest"; const string defaultPassword = "guest"; @@ -43,7 +45,7 @@ class ConnectionConfiguration UseTls = useTls; } - public static ConnectionConfiguration Create(string connectionString) + public static ConnectionConfiguration Create(string connectionString, bool isManagementConnection = false) { Dictionary dictionary; var invalidOptionsMessage = new StringBuilder(); @@ -59,7 +61,10 @@ public static ConnectionConfiguration Create(string connectionString) var host = GetValue(dictionary, "host", string.Empty); var useTls = GetValue(dictionary, "useTls", bool.TryParse, defaultUseTls, invalidOptionsMessage); - var port = GetValue(dictionary, "port", int.TryParse, useTls ? defaultTlsPort : defaultPort, invalidOptionsMessage); + var port = GetValue(dictionary, "port", int.TryParse, useTls ? + (isManagementConnection ? defaultManagementTlsPort : defaultBrokerTlsPort) : + (isManagementConnection ? defaultManagementPort : defaultBrokerPort), + invalidOptionsMessage); var virtualHost = GetValue(dictionary, "virtualHost", defaultVirtualHost); var userName = GetValue(dictionary, "userName", defaultUserName); var password = GetValue(dictionary, "password", defaultPassword); @@ -72,6 +77,18 @@ public static ConnectionConfiguration Create(string connectionString) return new ConnectionConfiguration(host, port, virtualHost, userName, password, useTls); } + public static ConnectionConfiguration ConvertToManagementConnection(ConnectionConfiguration brokerConnectionConfiguration) + { + var virtualHost = brokerConnectionConfiguration.VirtualHost; + var host = brokerConnectionConfiguration.Host; + var port = defaultManagementPort; + var useTls = brokerConnectionConfiguration.UseTls; + var userName = brokerConnectionConfiguration.UserName; + var password = brokerConnectionConfiguration.Password; + + return new ConnectionConfiguration(host, port, virtualHost, userName, password, useTls); + } + static Dictionary ParseAmqpConnectionString(string connectionString, StringBuilder invalidOptionsMessage) { var dictionary = new Dictionary(); diff --git a/src/NServiceBus.Transport.RabbitMQ/Configuration/RabbitMQTransportSettingsExtensions.cs b/src/NServiceBus.Transport.RabbitMQ/Configuration/RabbitMQTransportSettingsExtensions.cs index 4f1dc4341..6227c20f9 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Configuration/RabbitMQTransportSettingsExtensions.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Configuration/RabbitMQTransportSettingsExtensions.cs @@ -96,6 +96,36 @@ public static TransportExtensions ConnectionString(this Trans return transportExtensions; } + /// + /// The connection string to use when connecting to the broker management API. + /// + [PreObsolete("https://github.com/Particular/NServiceBus/issues/6811", + Message = "The configuration has been moved to RabbitMQTransport class.", + Note = "Should not be converted to an ObsoleteEx until API mismatch described in issue is resolved.")] + public static TransportExtensions ManagementConnectionString(this TransportExtensions transportExtensions, string connectionString) + { + ArgumentNullException.ThrowIfNull(transportExtensions); + ArgumentException.ThrowIfNullOrWhiteSpace(connectionString); + + transportExtensions.Transport.LegacyManagementApiConnectionString = connectionString; + return transportExtensions; + } + + /// + /// The connection string to use when connecting to the broker management API. + /// + [PreObsolete("https://github.com/Particular/NServiceBus/issues/6811", + Message = "The configuration has been moved to RabbitMQTransport class.", + Note = "Should not be converted to an ObsoleteEx until API mismatch described in issue is resolved.")] + public static TransportExtensions ManagementConnectionString(this TransportExtensions transportExtensions, Func getConnectionString) + { + ArgumentNullException.ThrowIfNull(transportExtensions); + ArgumentNullException.ThrowIfNull(getConnectionString); + + transportExtensions.Transport.LegacyManagementApiConnectionString = getConnectionString(); + return transportExtensions; + } + /// /// Allows the user to control how the message ID is determined. Mostly useful when consuming native messages from non-NServiceBus endpoints. /// diff --git a/src/NServiceBus.Transport.RabbitMQ/Connection/ConnectionExtensions.cs b/src/NServiceBus.Transport.RabbitMQ/Connection/ConnectionExtensions.cs index 44f39792f..80f00630d 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Connection/ConnectionExtensions.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Connection/ConnectionExtensions.cs @@ -11,9 +11,8 @@ namespace NServiceBus.Transport.RabbitMQ static class ConnectionExtensions { - public static async Task VerifyBrokerRequirements(this IConnection connection, CancellationToken cancellationToken = default) + public static Version GetBrokerVersion(this IConnection connection) { - var minimumBrokerVersion = Version.Parse("3.10.0"); var versionValue = connection.ServerProperties?["version"]; var versionBytes = Array.Empty(); @@ -24,11 +23,33 @@ public static async Task VerifyBrokerRequirements(this IConnection connection, C var brokerVersionString = Encoding.UTF8.GetString(versionBytes); - if (Version.TryParse(brokerVersionString, out var brokerVersion) && brokerVersion < minimumBrokerVersion) + if (!Version.TryParse(brokerVersionString, out var brokerVersion)) + { + throw new FormatException($"Could not parse broker version: {brokerVersion}"); + } + + return brokerVersion; + } + + public static async Task VerifyBrokerRequirements(this IConnection connection, CancellationToken cancellationToken = default) + { + var minimumBrokerVersion = Version.Parse("3.10.0"); + + var brokerVersion = connection.GetBrokerVersion(); + if (brokerVersion < minimumBrokerVersion) { throw new Exception($"An unsupported broker version was detected: {brokerVersion}. The broker must be at least version {minimumBrokerVersion}."); } + var streamsEnabled = await connection.TryCreateStream(cancellationToken).ConfigureAwait(false); + if (!streamsEnabled) + { + throw new Exception("An unsupported broker configuration was detected. The 'stream_queue' feature flag needs to be enabled."); + } + } + + public static async Task TryCreateStream(this IConnection connection, CancellationToken cancellationToken = default) + { using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false); var arguments = new Dictionary { { "x-queue-type", "stream" } }; @@ -36,10 +57,11 @@ public static async Task VerifyBrokerRequirements(this IConnection connection, C try { await channel.QueueDeclareAsync("nsb.v2.verify-stream-flag-enabled", true, false, false, arguments, cancellationToken: cancellationToken).ConfigureAwait(false); + return true; } catch (Exception ex) when (!ex.IsCausedBy(cancellationToken) && ex.Message.Contains("the corresponding feature flag is disabled")) { - throw new Exception("An unsupported broker configuration was detected. The 'stream_queue' feature flag needs to be enabled."); + return false; } } } diff --git a/src/NServiceBus.Transport.RabbitMQ/NServiceBus.Transport.RabbitMQ.csproj b/src/NServiceBus.Transport.RabbitMQ/NServiceBus.Transport.RabbitMQ.csproj index 776b7b27c..dc6e9a2f6 100644 --- a/src/NServiceBus.Transport.RabbitMQ/NServiceBus.Transport.RabbitMQ.csproj +++ b/src/NServiceBus.Transport.RabbitMQ/NServiceBus.Transport.RabbitMQ.csproj @@ -10,6 +10,7 @@ + diff --git a/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs b/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs index 501b6e71e..4c2b1d16e 100644 --- a/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs +++ b/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs @@ -21,6 +21,7 @@ public class RabbitMQTransport : TransportDefinition Func messageIdStrategy = MessageConverter.DefaultMessageIdStrategy; PrefetchCountCalculation prefetchCountCalculation = maxConcurrency => 3 * maxConcurrency; TimeSpan timeToWaitBeforeTriggeringCircuitBreaker = TimeSpan.FromMinutes(2); + X509Certificate2Collection certCollection = null; readonly List<(string hostName, int port, bool useTls)> additionalClusterNodes = []; @@ -30,6 +31,17 @@ public class RabbitMQTransport : TransportDefinition /// The routing topology to use. /// The connection string to use when connecting to the broker. public RabbitMQTransport(RoutingTopology routingTopology, string connectionString) + : this(routingTopology, connectionString, null) + { + } + + /// + /// Creates a new instance of the RabbitMQ transport. + /// + /// The routing topology to use. + /// The connection string to use when connecting to the broker. + /// The connection string to use when connecting to the management API + public RabbitMQTransport(RoutingTopology routingTopology, string connectionString, string managementConnectionString) : base(TransportTransactionMode.ReceiveOnly, supportsDelayedDelivery: true, supportsPublishSubscribe: true, @@ -39,7 +51,11 @@ public RabbitMQTransport(RoutingTopology routingTopology, string connectionStrin ArgumentNullException.ThrowIfNull(connectionString); RoutingTopology = routingTopology.Create(); - ConnectionConfiguration = ConnectionConfiguration.Create(connectionString); + BrokerConnectionConfiguration = ConnectionConfiguration.Create(connectionString); + + ManagementConnectionConfiguration = string.IsNullOrEmpty(managementConnectionString) ? + ConnectionConfiguration.ConvertToManagementConnection(BrokerConnectionConfiguration) : + ConnectionConfiguration.Create(managementConnectionString, isManagementConnection: true); } /// @@ -49,6 +65,18 @@ public RabbitMQTransport(RoutingTopology routingTopology, string connectionStrin /// The connection string to use when connecting to the broker. /// Should the delayed delivery infrastructure be created by the endpoint public RabbitMQTransport(RoutingTopology routingTopology, string connectionString, bool enableDelayedDelivery) + : this(routingTopology, connectionString, enableDelayedDelivery, null) + { + } + + /// + /// Creates a new instance of the RabbitMQ transport. + /// + /// The routing topology to use. + /// The connection string to use when connecting to the broker. + /// Should the delayed delivery infrastructure be created by the endpoint + /// The connection string to use when connecting to the management API + public RabbitMQTransport(RoutingTopology routingTopology, string connectionString, bool enableDelayedDelivery, string managementConnectionString) : base(TransportTransactionMode.ReceiveOnly, supportsDelayedDelivery: enableDelayedDelivery, supportsPublishSubscribe: true, @@ -58,10 +86,15 @@ public RabbitMQTransport(RoutingTopology routingTopology, string connectionStrin ArgumentNullException.ThrowIfNull(connectionString); RoutingTopology = routingTopology.Create(); - ConnectionConfiguration = ConnectionConfiguration.Create(connectionString); + BrokerConnectionConfiguration = ConnectionConfiguration.Create(connectionString); + ManagementConnectionConfiguration = string.IsNullOrEmpty(managementConnectionString) ? + ConnectionConfiguration.ConvertToManagementConnection(BrokerConnectionConfiguration) : + ConnectionConfiguration.Create(managementConnectionString, isManagementConnection: true); } - internal ConnectionConfiguration ConnectionConfiguration { get; set; } + internal ConnectionConfiguration BrokerConnectionConfiguration { get; set; } + + internal ConnectionConfiguration ManagementConnectionConfiguration { get; set; } internal IRoutingTopology RoutingTopology { get; set; } @@ -93,8 +126,8 @@ public TimeSpan TimeToWaitBeforeTriggeringCircuitBreaker } /// - /// Gets or sets the action that allows customization of the native - /// just before it is dispatched to the rabbitmq client. + /// Gets or sets the action that allows customization of the native + /// just before it is dispatched to the rabbitmq client. /// /// /// @@ -136,6 +169,12 @@ public PrefetchCountCalculation PrefetchCountCalculation /// public bool UseExternalAuthMechanism { get; set; } = false; + /// + /// Set this to prevent the transport from using the RabbitMQ Management API. + /// This is not recommended as it can prevent the transport from setting appropriate delivery limits for retry functionality. + /// + public bool DoNotUseManagementClient { get; set; } = false; + /// /// The interval for heartbeats between the endpoint and the broker. /// @@ -192,17 +231,11 @@ public void AddClusterNode(string hostName, int port, bool useTls) public override async Task Initialize(HostSettings hostSettings, ReceiveSettings[] receivers, string[] sendingAddresses, CancellationToken cancellationToken = default) { ValidateAndApplyLegacyConfiguration(); - - X509Certificate2Collection certCollection = null; - - if (ClientCertificate != null) - { - certCollection = new X509Certificate2Collection(ClientCertificate); - } + ValidateAndApplyCertCollections(); var connectionFactory = new ConnectionFactory( hostSettings.Name, - ConnectionConfiguration, + BrokerConnectionConfiguration, certCollection, !ValidateRemoteCertificate, UseExternalAuthMechanism, @@ -211,6 +244,15 @@ public override async Task Initialize(HostSettings host additionalClusterNodes ); + // Uses the legacy Management API connection string or default to the RabbitMQ broker connection credentials + if (!string.IsNullOrEmpty(LegacyManagementApiConnectionString)) + { + ManagementConnectionConfiguration = ConnectionConfiguration.Create(LegacyManagementApiConnectionString, isManagementConnection: true); + } + + var brokerVerifier = new BrokerVerifier(connectionFactory, !DoNotUseManagementClient, ManagementConnectionConfiguration); + await brokerVerifier.Initialize(cancellationToken).ConfigureAwait(false); + var channelProvider = new ChannelProvider(connectionFactory, NetworkRecoveryInterval, RoutingTopology); await channelProvider.CreateConnection(cancellationToken).ConfigureAwait(false); @@ -223,6 +265,7 @@ public override async Task Initialize(HostSettings host RoutingTopology, channelProvider, converter, + brokerVerifier, OutgoingNativeMessageCustomization, TimeToWaitBeforeTriggeringCircuitBreaker, PrefetchCountCalculation, @@ -238,6 +281,9 @@ public override async Task Initialize(HostSettings host return infra; } + void ValidateAndApplyCertCollections() => certCollection ??= ClientCertificate != null + ? new X509Certificate2Collection(ClientCertificate) : null; + /// public override IReadOnlyCollection GetSupportedTransactionModes() => new[] { TransportTransactionMode.ReceiveOnly }; @@ -245,6 +291,8 @@ public override async Task Initialize(HostSettings host internal string LegacyApiConnectionString { get; set; } + internal string LegacyManagementApiConnectionString { get; set; } + internal Func TopologyFactory { get; set; } internal bool UseDurableExchangesAndQueues { get; set; } = true; @@ -263,19 +311,37 @@ void ValidateAndApplyLegacyConfiguration() return; } + VaildateTopologyFactory(); + ValidateConnectionString(); + + RoutingTopology = TopologyFactory(UseDurableExchangesAndQueues); + BrokerConnectionConfiguration = ConnectionConfiguration.Create(LegacyApiConnectionString); + + // Uses the legacy management API connection string or build the string from the legacy broker connection configuration + ManagementConnectionConfiguration = !string.IsNullOrEmpty(LegacyManagementApiConnectionString) ? + ConnectionConfiguration.Create(LegacyManagementApiConnectionString, isManagementConnection: true) : + ConnectionConfiguration.ConvertToManagementConnection(BrokerConnectionConfiguration); + } + + void VaildateTopologyFactory() + { if (TopologyFactory == null) { throw new Exception("A routing topology must be configured with one of the 'EndpointConfiguration.UseTransport().UseXXXXRoutingTopology()` methods. Most new projects should use the Conventional routing topology."); } + } - RoutingTopology = TopologyFactory(UseDurableExchangesAndQueues); - + void ValidateConnectionString() + { if (string.IsNullOrEmpty(LegacyApiConnectionString)) { throw new Exception("A connection string must be configured with 'EndpointConfiguration.UseTransport().ConnectionString()` method."); } - ConnectionConfiguration = ConnectionConfiguration.Create(LegacyApiConnectionString); + if (!DoNotUseManagementClient && string.IsNullOrEmpty(LegacyManagementApiConnectionString)) + { + throw new Exception("A management API connection string must be configured with 'EndpointConfiguration.UseTransport().ManagementConnectionString()` method."); + } } } } \ No newline at end of file diff --git a/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransportInfrastructure.cs b/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransportInfrastructure.cs index a86c7d0ee..e3ec93d35 100644 --- a/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransportInfrastructure.cs +++ b/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransportInfrastructure.cs @@ -11,6 +11,7 @@ sealed class RabbitMQTransportInfrastructure : TransportInfrastructure { readonly ConnectionFactory connectionFactory; readonly ChannelProvider channelProvider; + readonly BrokerVerifier brokerVerifier; readonly IRoutingTopology routingTopology; readonly TimeSpan networkRecoveryInterval; readonly bool supportsDelayedDelivery; @@ -18,6 +19,7 @@ sealed class RabbitMQTransportInfrastructure : TransportInfrastructure public RabbitMQTransportInfrastructure(HostSettings hostSettings, ReceiveSettings[] receiverSettings, ConnectionFactory connectionFactory, IRoutingTopology routingTopology, ChannelProvider channelProvider, MessageConverter messageConverter, + BrokerVerifier brokerVerifier, Action messageCustomization, TimeSpan timeToWaitBeforeTriggeringCircuitBreaker, PrefetchCountCalculation prefetchCountCalculation, TimeSpan networkRecoveryInterval, bool supportsDelayedDelivery) @@ -25,6 +27,7 @@ public RabbitMQTransportInfrastructure(HostSettings hostSettings, ReceiveSetting this.connectionFactory = connectionFactory; this.routingTopology = routingTopology; this.channelProvider = channelProvider; + this.brokerVerifier = brokerVerifier; this.networkRecoveryInterval = networkRecoveryInterval; this.supportsDelayedDelivery = supportsDelayedDelivery; @@ -36,8 +39,11 @@ public RabbitMQTransportInfrastructure(HostSettings hostSettings, ReceiveSetting IMessageReceiver CreateMessagePump(HostSettings hostSettings, ReceiveSettings settings, MessageConverter messageConverter, TimeSpan timeToWaitBeforeTriggeringCircuitBreaker, PrefetchCountCalculation prefetchCountCalculation) { var consumerTag = $"{hostSettings.HostDisplayName} - {hostSettings.Name}"; - var receiveAddress = ToTransportAddress(settings.ReceiveAddress); - return new MessagePump(settings, connectionFactory, routingTopology, messageConverter, consumerTag, channelProvider, timeToWaitBeforeTriggeringCircuitBreaker, prefetchCountCalculation, hostSettings.CriticalErrorAction, networkRecoveryInterval); + + return new MessagePump( + settings, connectionFactory, routingTopology, messageConverter, + consumerTag, channelProvider, brokerVerifier, + timeToWaitBeforeTriggeringCircuitBreaker, prefetchCountCalculation, hostSettings.CriticalErrorAction, networkRecoveryInterval); } internal async Task SetupInfrastructure(string[] sendingQueues, CancellationToken cancellationToken = default) diff --git a/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs b/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs index 2893e0a18..74c132367 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs @@ -22,6 +22,7 @@ sealed partial class MessagePump : IMessageReceiver readonly MessageConverter messageConverter; readonly string consumerTag; readonly ChannelProvider channelProvider; + readonly BrokerVerifier brokerVerifier; readonly TimeSpan timeToWaitBeforeTriggeringCircuitBreaker; readonly QueuePurger queuePurger; readonly PrefetchCountCalculation prefetchCountCalculation; @@ -50,10 +51,10 @@ public MessagePump( MessageConverter messageConverter, string consumerTag, ChannelProvider channelProvider, + BrokerVerifier brokerVerifier, TimeSpan timeToWaitBeforeTriggeringCircuitBreaker, PrefetchCountCalculation prefetchCountCalculation, - Action criticalErrorAction, + Action criticalErrorAction, TimeSpan retryDelay) { this.settings = settings; @@ -61,6 +62,7 @@ public MessagePump( this.messageConverter = messageConverter; this.consumerTag = consumerTag; this.channelProvider = channelProvider; + this.brokerVerifier = brokerVerifier; this.timeToWaitBeforeTriggeringCircuitBreaker = timeToWaitBeforeTriggeringCircuitBreaker; this.prefetchCountCalculation = prefetchCountCalculation; this.criticalErrorAction = criticalErrorAction; @@ -93,6 +95,8 @@ public async Task Initialize(PushRuntimeSettings limitations, OnMessage onMessag { await queuePurger.Purge(ReceiveAddress, cancellationToken).ConfigureAwait(false); } + + await brokerVerifier.ValidateDeliveryLimit(ReceiveAddress, cancellationToken).ConfigureAwait(false); } public async Task StartReceive(CancellationToken cancellationToken = default)