Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add management connection string API and tests #1519

Merged
merged 13 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ jobs:
creds: ${{ secrets.AZURE_ACI_CREDENTIALS }}
enable-AzPSSession: true
- name: Setup RabbitMQ
uses: Particular/[email protected].0
uses: Particular/[email protected].1
with:
connection-string-name: RabbitMQTransport_ConnectionString
tag: RabbitMQTransport
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ namespace NServiceBus
}
public class RabbitMQTransport : NServiceBus.Transport.TransportDefinition
{
public RabbitMQTransport(NServiceBus.RoutingTopology routingTopology, string connectionString) { }
public RabbitMQTransport(NServiceBus.RoutingTopology routingTopology, string connectionString, bool enableDelayedDelivery) { }
public RabbitMQTransport(NServiceBus.RoutingTopology routingTopology, string connectionString, string managementConnectionString = null) { }
public RabbitMQTransport(NServiceBus.RoutingTopology routingTopology, string connectionString, bool enableDelayedDelivery, string managementConnectionString = null) { }
public System.Security.Cryptography.X509Certificates.X509Certificate2 ClientCertificate { get; set; }
public bool DoNotUseManagementClient { get; set; }
public System.TimeSpan HeartbeatInterval { get; set; }
Expand All @@ -42,6 +42,8 @@ namespace NServiceBus
public static NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> CustomMessageIdStrategy(this NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> transportExtensions, System.Func<RabbitMQ.Client.Events.BasicDeliverEventArgs, string> customIdStrategy) { }
public static NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> DisableDurableExchangesAndQueues(this NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> transportExtensions) { }
public static NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> DisableRemoteCertificateValidation(this NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> transportExtensions) { }
public static NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> ManagementConnectionString(this NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> transportExtensions, System.Func<string> getConnectionString) { }
public static NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> ManagementConnectionString(this NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> transportExtensions, string connectionString) { }
public static NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> PrefetchCount(this NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> transportExtensions, ushort prefetchCount) { }
public static NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> PrefetchMultiplier(this NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> transportExtensions, int prefetchMultiplier) { }
public static NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> SetClientCertificate(this NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> transportExtensions, System.Security.Cryptography.X509Certificates.X509Certificate2 clientCertificate) { }
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#nullable enable

namespace NServiceBus.Transport.RabbitMQ.Tests.Connection.ManagementConnection
namespace NServiceBus.Transport.RabbitMQ.Tests.ManagementConnection
{
using System;
using System.Collections.Generic;
Expand All @@ -19,6 +19,7 @@ class When_connecting_to_the_rabbitmq_management_api
{
static readonly string connectionString = Environment.GetEnvironmentVariable("RabbitMQTransport_ConnectionString") ?? "host=localhost";
readonly ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.Create(connectionString);
readonly ConnectionConfiguration managementConnectionConfiguration = ConnectionConfiguration.Create(connectionString, isManagementConnection: true);
protected QueueType queueType = QueueType.Quorum;
protected string ReceiverQueue => GetTestQueueName("ManagementAPITestQueue");
protected string GetTestQueueName(string queueName) => $"{queueName}-{queueType}";
Expand All @@ -44,7 +45,7 @@ public async Task SetUp()
[Test]
public async Task GetQueue_Should_Return_Queue_Information_When_Exists()
{
var client = new ManagementClient(connectionConfiguration);
var client = new ManagementClient(managementConnectionConfiguration);

var response = await client.GetQueue(ReceiverQueue);

Expand All @@ -59,7 +60,7 @@ public async Task GetQueue_Should_Return_Queue_Information_When_Exists()
[Test]
public async Task GetOverview_Should_Return_Broker_Information_When_Exists()
{
var client = new ManagementClient(connectionConfiguration);
var client = new ManagementClient(managementConnectionConfiguration);

var response = await client.GetOverview();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public virtual async Task SetUp()
var useTls = connectionString.StartsWith("https", StringComparison.InvariantCultureIgnoreCase) || connectionString.StartsWith("amqps", StringComparison.InvariantCultureIgnoreCase);

var transport = new RabbitMQTransport(RoutingTopology.Conventional(queueType), connectionString);
var connectionConfig = transport.ConnectionConfiguration;
var connectionConfig = transport.BrokerConnectionConfiguration;

connectionFactory = new ConnectionFactory(ReceiverQueue, connectionConfig, null, true, false, transport.HeartbeatInterval, transport.NetworkRecoveryInterval, null);

Expand Down
TravisNickels marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ class ManagementClient : IManagementClient

public ManagementClient(ConnectionConfiguration connectionConfiguration)
{
ArgumentNullException.ThrowIfNull(connectionConfiguration, nameof(connectionConfiguration));

virtualHost = connectionConfiguration.VirtualHost;
escapedVirtualHost = Uri.EscapeDataString(virtualHost);

var uriBuilder = new UriBuilder
{
Scheme = connectionConfiguration.UseTls ? "https" : "http",
Host = connectionConfiguration.Host,
Port = 15672 // TODO: fallback to default only if specific details aren't given in config
Port = connectionConfiguration.Port,
};

httpClient = new HttpClient { BaseAddress = uriBuilder.Uri };
Expand Down Expand Up @@ -90,6 +92,8 @@ public ManagementClient(ConnectionConfiguration connectionConfiguration)

public async Task CreatePolicy(Policy policy, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(policy, nameof(policy));

policy.VirtualHost = virtualHost;

var escapedPolicyName = Uri.EscapeDataString(policy.Name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
class ConnectionConfiguration
{
const bool defaultUseTls = false;
const int defaultPort = 5672;
const int defaultTlsPort = 5671;
const int defaultBrokerPort = 5672;
const int defaultBrokerTlsPort = 5671;
const int defaultManagementPort = 15672;
const int defaultManagementTlsPort = 15671;
const string defaultVirtualHost = "/";
const string defaultUserName = "guest";
const string defaultPassword = "guest";
Expand Down Expand Up @@ -43,7 +45,7 @@ class ConnectionConfiguration
UseTls = useTls;
}

public static ConnectionConfiguration Create(string connectionString)
public static ConnectionConfiguration Create(string connectionString, bool isManagementConnection = false)
{
Dictionary<string, string> dictionary;
var invalidOptionsMessage = new StringBuilder();
Expand All @@ -59,7 +61,10 @@ public static ConnectionConfiguration Create(string connectionString)

var host = GetValue(dictionary, "host", string.Empty);
var useTls = GetValue(dictionary, "useTls", bool.TryParse, defaultUseTls, invalidOptionsMessage);
var port = GetValue(dictionary, "port", int.TryParse, useTls ? defaultTlsPort : defaultPort, invalidOptionsMessage);
var port = GetValue(dictionary, "port", int.TryParse, useTls ?
(isManagementConnection ? defaultManagementTlsPort : defaultBrokerTlsPort) :
(isManagementConnection ? defaultManagementPort : defaultBrokerPort),
invalidOptionsMessage);
var virtualHost = GetValue(dictionary, "virtualHost", defaultVirtualHost);
var userName = GetValue(dictionary, "userName", defaultUserName);
var password = GetValue(dictionary, "password", defaultPassword);
Expand All @@ -72,6 +77,18 @@ public static ConnectionConfiguration Create(string connectionString)
return new ConnectionConfiguration(host, port, virtualHost, userName, password, useTls);
}

public static ConnectionConfiguration ConvertToManagementConnection(ConnectionConfiguration brokerConnectionConfiguration)
{
var virtualHost = brokerConnectionConfiguration.VirtualHost;
var host = brokerConnectionConfiguration.Host;
var port = defaultManagementPort;
var useTls = brokerConnectionConfiguration.UseTls;
var userName = brokerConnectionConfiguration.UserName;
var password = brokerConnectionConfiguration.Password;

return new ConnectionConfiguration(host, port, virtualHost, userName, password, useTls);
}

static Dictionary<string, string> ParseAmqpConnectionString(string connectionString, StringBuilder invalidOptionsMessage)
{
var dictionary = new Dictionary<string, string>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,36 @@ public static TransportExtensions<RabbitMQTransport> ConnectionString(this Trans
return transportExtensions;
}

/// <summary>
/// The connection string to use when connecting to the broker management API.
/// </summary>
[PreObsolete("https://github.com/Particular/NServiceBus/issues/6811",
Message = "The configuration has been moved to RabbitMQTransport class.",
Note = "Should not be converted to an ObsoleteEx until API mismatch described in issue is resolved.")]
public static TransportExtensions<RabbitMQTransport> ManagementConnectionString(this TransportExtensions<RabbitMQTransport> transportExtensions, string connectionString)
{
ArgumentNullException.ThrowIfNull(transportExtensions);
ArgumentException.ThrowIfNullOrWhiteSpace(connectionString);

transportExtensions.Transport.LegacyManagementApiConnectionString = connectionString;
return transportExtensions;
}

/// <summary>
/// The connection string to use when connecting to the broker management API.
/// </summary>
[PreObsolete("https://github.com/Particular/NServiceBus/issues/6811",
Message = "The configuration has been moved to RabbitMQTransport class.",
Note = "Should not be converted to an ObsoleteEx until API mismatch described in issue is resolved.")]
public static TransportExtensions<RabbitMQTransport> ManagementConnectionString(this TransportExtensions<RabbitMQTransport> transportExtensions, Func<string> getConnectionString)
{
ArgumentNullException.ThrowIfNull(transportExtensions);
ArgumentNullException.ThrowIfNull(getConnectionString);

transportExtensions.Transport.LegacyManagementApiConnectionString = getConnectionString();
return transportExtensions;
}

/// <summary>
/// Allows the user to control how the message ID is determined. Mostly useful when consuming native messages from non-NServiceBus endpoints.
/// </summary>
Expand Down
Loading