Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add management connection string API and tests #1519

Merged
merged 13 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ namespace NServiceBus
}
public class RabbitMQTransport : NServiceBus.Transport.TransportDefinition
{
public RabbitMQTransport(NServiceBus.RoutingTopology routingTopology, string connectionString) { }
public RabbitMQTransport(NServiceBus.RoutingTopology routingTopology, string connectionString, bool enableDelayedDelivery) { }
public RabbitMQTransport(NServiceBus.RoutingTopology routingTopology, string connectionString, string managementConnectionString = null) { }
public RabbitMQTransport(NServiceBus.RoutingTopology routingTopology, string connectionString, bool enableDelayedDelivery, string managementConnectionString = null) { }
public System.Security.Cryptography.X509Certificates.X509Certificate2 ClientCertificate { get; set; }
public bool DoNotUseManagementClient { get; set; }
public System.TimeSpan HeartbeatInterval { get; set; }
Expand All @@ -42,6 +42,8 @@ namespace NServiceBus
public static NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> CustomMessageIdStrategy(this NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> transportExtensions, System.Func<RabbitMQ.Client.Events.BasicDeliverEventArgs, string> customIdStrategy) { }
public static NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> DisableDurableExchangesAndQueues(this NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> transportExtensions) { }
public static NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> DisableRemoteCertificateValidation(this NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> transportExtensions) { }
public static NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> ManagementConnectionString(this NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> transportExtensions, System.Func<string> getConnectionString) { }
public static NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> ManagementConnectionString(this NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> transportExtensions, string connectionString) { }
public static NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> PrefetchCount(this NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> transportExtensions, ushort prefetchCount) { }
public static NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> PrefetchMultiplier(this NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> transportExtensions, int prefetchMultiplier) { }
public static NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> SetClientCertificate(this NServiceBus.TransportExtensions<NServiceBus.RabbitMQTransport> transportExtensions, System.Security.Cryptography.X509Certificates.X509Certificate2 clientCertificate) { }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,32 @@
namespace NServiceBus.Transport.RabbitMQ.Tests.ConnectionString
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using global::RabbitMQ.Client.Exceptions;
using NUnit.Framework;
using RabbitMQ;

[TestFixture]
public class ConnectionConfigurationTests
class ConnectionConfigurationTests
{
const string connectionString = "virtualHost=Copa;username=Copa;host=192.168.1.1:1234;password=abc_xyz;port=12345;useTls=true";
protected string ReceiverQueue => GetTestQueueName("testreceiver");
protected string ErrorQueue => GetTestQueueName("error");
protected string GetTestQueueName(string queueName) => $"{queueName}-{queueType}";
protected IList<string> AdditionalReceiverQueues = [];
protected string[] SendingAddresses => [.. AdditionalReceiverQueues, ErrorQueue];
protected QueueType queueType = QueueType.Quorum;

protected HostSettings HostSettings => new(ReceiverQueue, ReceiverQueue, new StartupDiagnosticEntries(), (_, _, _) => { }, true);
protected ReceiveSettings[] ReceiveSettings =>
[
new ReceiveSettings( ReceiverQueue, new QueueAddress(ReceiverQueue), true, true, ErrorQueue)
];

readonly ConnectionConfiguration brokerDefaults = ConnectionConfiguration.Create("host=localhost");
readonly ConnectionConfiguration managementDefaults = ConnectionConfiguration.Create("host=localhost", isManagementConnection: true);

ConnectionConfiguration defaults = ConnectionConfiguration.Create("host=localhost");

[Test]
public void Should_correctly_parse_full_connection_string()
Expand Down Expand Up @@ -164,31 +181,198 @@ public void Should_list_all_invalid_options()
[Test]
public void Should_set_default_port()
{
Assert.That(defaults.Port, Is.EqualTo(5672));
Assert.Multiple(() =>
{
Assert.That(brokerDefaults.Port, Is.EqualTo(5672));
Assert.That(managementDefaults.Port, Is.EqualTo(15672));
});
}

[Test]
public void Should_set_default_virtual_host()
{
Assert.That(defaults.VirtualHost, Is.EqualTo("/"));
Assert.Multiple(() =>
{
Assert.That(brokerDefaults.VirtualHost, Is.EqualTo("/"));
Assert.That(managementDefaults.VirtualHost, Is.EqualTo("/"));
});
}

[Test]
public void Should_set_default_username()
{
Assert.That(defaults.UserName, Is.EqualTo("guest"));
Assert.Multiple(() =>
{
Assert.That(brokerDefaults.UserName, Is.EqualTo("guest"));
Assert.That(managementDefaults.UserName, Is.EqualTo("guest"));
});
}

[Test]
public void Should_set_default_password()
{
Assert.That(defaults.Password, Is.EqualTo("guest"));
Assert.Multiple(() =>
{
Assert.That(brokerDefaults.Password, Is.EqualTo("guest"));
Assert.That(managementDefaults.Password, Is.EqualTo("guest"));
});
}

[Test]
public void Should_set_default_use_tls()
{
Assert.That(defaults.UseTls, Is.EqualTo(false));
Assert.Multiple(() =>
{
Assert.That(brokerDefaults.UseTls, Is.EqualTo(false));
Assert.That(managementDefaults.UseTls, Is.EqualTo(false));
});
}

[Test]
public void Should_configure_broker_and_management_connection_configurations_with_single_connection_string()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since all the following tests are actually connecting to RabbitMQ, I'm wondering if they should be moved to the acceptance tests assembly.

Copy link
Member Author

@TravisNickels TravisNickels Dec 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I wasn't 100% sure about this either. However, the RabbitMQContext.cs also connects to RabbitMQ, but it looks like only the tests that start with When_ are inheriting RabbitMQContext. which seem to all be acceptance tests, I think? However, they are still in the same Tests project. I could separate them into a separate file that starts with When_ to indicate that they are acceptance tests. I don't know if that is a standard naming convention for acceptances tests, but it seems right.

{
var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Quorum), "virtualHost=/;host=localhost;username=guest;password=guest;port=5672;useTls=false");

Assert.Multiple(() =>
{
Assert.That(transport.ConnectionConfiguration.VirtualHost, Is.EqualTo("/"));
Assert.That(transport.ConnectionConfiguration.Host, Is.EqualTo("localhost"));
Assert.That(transport.ConnectionConfiguration.UserName, Is.EqualTo("guest"));
Assert.That(transport.ConnectionConfiguration.Password, Is.EqualTo("guest"));
Assert.That(transport.ConnectionConfiguration.Port, Is.EqualTo(5672));
Assert.That(transport.ConnectionConfiguration.UseTls, Is.EqualTo(false));

Assert.That(transport.ManagementConnectionConfiguration.VirtualHost, Is.EqualTo("/"));
Assert.That(transport.ManagementConnectionConfiguration.Host, Is.EqualTo("localhost"));
Assert.That(transport.ManagementConnectionConfiguration.UserName, Is.EqualTo("guest"));
Assert.That(transport.ManagementConnectionConfiguration.Password, Is.EqualTo("guest"));
Assert.That(transport.ManagementConnectionConfiguration.Port, Is.EqualTo(15672)); // This should be set to the default management port
Assert.That(transport.ManagementConnectionConfiguration.UseTls, Is.EqualTo(false));
});
}

[Test]
public void Should_configure_broker_and_management_connection_configurations_with_respective_connection_strings()
{
var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Quorum), "virtualHost=/;host=localhost;username=guest;password=guest;port=5672;useTls=false", connectionString);

Assert.Multiple(() =>
{
Assert.That(transport.ConnectionConfiguration.VirtualHost, Is.EqualTo("/"));
Assert.That(transport.ConnectionConfiguration.Host, Is.EqualTo("localhost"));
Assert.That(transport.ConnectionConfiguration.UserName, Is.EqualTo("guest"));
Assert.That(transport.ConnectionConfiguration.Password, Is.EqualTo("guest"));
Assert.That(transport.ConnectionConfiguration.Port, Is.EqualTo(5672));
Assert.That(transport.ConnectionConfiguration.UseTls, Is.EqualTo(false));

Assert.That(transport.ManagementConnectionConfiguration.VirtualHost, Is.EqualTo("Copa"));
Assert.That(transport.ManagementConnectionConfiguration.Host, Is.EqualTo("192.168.1.1"));
Assert.That(transport.ManagementConnectionConfiguration.UserName, Is.EqualTo("Copa"));
Assert.That(transport.ManagementConnectionConfiguration.Password, Is.EqualTo("abc_xyz"));
Assert.That(transport.ManagementConnectionConfiguration.Port, Is.EqualTo(1234));
Assert.That(transport.ManagementConnectionConfiguration.UseTls, Is.EqualTo(true));
});
}

[Test]
public void Should_throw_on_invalid_management_connection_string()
{
var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Quorum), "virtualHost=/;username=guest;host=localhost;password=guest;port=5672;useTls=false", "host=127.0.0.1;username=Copa");

var exception = Assert.ThrowsAsync<InvalidOperationException>(async () => await transport.Initialize(HostSettings, ReceiveSettings, SendingAddresses).ConfigureAwait(false));

Assert.That(exception.Message, Does.Contain("Could not access RabbitMQ Management API"));
}

[Test]
public void Should_throw_on_invalid_broker_connection_string()
{
var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Quorum), "host=127.0.0.1;username=Copa", "virtualHost=/;username=guest;host=localhost;password=guest;port=15672;useTls=false");

var exception = Assert.ThrowsAsync<BrokerUnreachableException>(async () => await transport.Initialize(HostSettings, ReceiveSettings, SendingAddresses).ConfigureAwait(false));

Assert.That(exception.Message, Does.Contain("None of the specified endpoints were reachable"));
}

[Test]
public void Should_throw_on_invalid_legacy_management_connection_string()
{
// Create transport in legacy mode
var transport = new RabbitMQTransport
{
TopologyFactory = durable => new ConventionalRoutingTopology(durable, queueType),
LegacyApiConnectionString = "virtualHost=/;username=guest;host=localhost;password=guest;port=5672;useTls=false",
LegacyManagementApiConnectionString = "host=127.0.0.1;username=Copa"
};

var exception = Assert.ThrowsAsync<InvalidOperationException>(async () => await transport.Initialize(HostSettings, ReceiveSettings, SendingAddresses).ConfigureAwait(false));

Assert.That(exception.Message, Does.Contain("Could not access RabbitMQ Management API"));
}

[Test]
public void Should_throw_on_invalid_legacy_broker_connection_string()
{
// Create transport in legacy mode
var transport = new RabbitMQTransport
{
TopologyFactory = durable => new ConventionalRoutingTopology(durable, queueType),
LegacyApiConnectionString = "virtualHost=/;username=Copa;host=localhost;password=guest;port=5672;useTls=false",
LegacyManagementApiConnectionString = "host=127.0.0.1;username=guest"
};

var exception = Assert.ThrowsAsync<BrokerUnreachableException>(async () => await transport.Initialize(HostSettings, ReceiveSettings, SendingAddresses).ConfigureAwait(false));

Assert.That(exception.Message, Does.Contain("None of the specified endpoints were reachable"));

}

[Test]
public async Task Should_connect_to_management_api_with_broker_credentials()
{
var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Quorum), "virtualHost=/;username=guest;host=localhost;password=guest;port=5672;useTls=false");

var infra = await transport.Initialize(HostSettings, ReceiveSettings, SendingAddresses).ConfigureAwait(false);

Assert.Multiple(() =>
{
Assert.That(transport.ConnectionConfiguration.Port, Is.EqualTo(5672));
Assert.That(transport.ManagementConnectionConfiguration.Port, Is.EqualTo(15672));
});

}

[Test]
public async Task Should_set_default_port_values_for_broker_and_management_connections()
{
var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Quorum), "host=localhost");

_ = await transport.Initialize(HostSettings, ReceiveSettings, SendingAddresses).ConfigureAwait(false);

Assert.Multiple(() =>
{
Assert.That(transport.ConnectionConfiguration.Port, Is.EqualTo(5672));
Assert.That(transport.ManagementConnectionConfiguration.Port, Is.EqualTo(15672));
});

}

[Test]
public async Task Should_not_throw_when_DoNotUseManagementClient_is_enabled_and_management_connection_is_invalid()
{
var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Quorum), "host=localhost", "host=Copa")
{
DoNotUseManagementClient = true
};

_ = await transport.Initialize(HostSettings, ReceiveSettings, SendingAddresses).ConfigureAwait(false);

Assert.Multiple(() =>
{
Assert.That(transport.ConnectionConfiguration.Port, Is.EqualTo(5672));
Assert.That(transport.ManagementConnectionConfiguration.Port, Is.EqualTo(15672));
});

}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#nullable enable

namespace NServiceBus.Transport.RabbitMQ.Tests.Connection.ManagementConnection
namespace NServiceBus.Transport.RabbitMQ.Tests.ManagementConnection
{
using System;
using System.Collections.Generic;
Expand All @@ -19,6 +19,7 @@ class When_connecting_to_the_rabbitmq_management_api
{
static readonly string connectionString = Environment.GetEnvironmentVariable("RabbitMQTransport_ConnectionString") ?? "host=localhost";
readonly ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.Create(connectionString);
readonly ConnectionConfiguration managementConnectionConfiguration = ConnectionConfiguration.Create(connectionString, isManagementConnection: true);
protected QueueType queueType = QueueType.Quorum;
protected string ReceiverQueue => GetTestQueueName("ManagementAPITestQueue");
protected string GetTestQueueName(string queueName) => $"{queueName}-{queueType}";
Expand All @@ -44,7 +45,7 @@ public async Task SetUp()
[Test]
public async Task GetQueue_Should_Return_Queue_Information_When_Exists()
{
var client = new ManagementClient(connectionConfiguration);
var client = new ManagementClient(managementConnectionConfiguration);

var response = await client.GetQueue(ReceiverQueue);

Expand All @@ -59,7 +60,7 @@ public async Task GetQueue_Should_Return_Queue_Information_When_Exists()
[Test]
public async Task GetOverview_Should_Return_Broker_Information_When_Exists()
{
var client = new ManagementClient(connectionConfiguration);
var client = new ManagementClient(managementConnectionConfiguration);

var response = await client.GetOverview();

Expand Down
TravisNickels marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace NServiceBus.Transport.RabbitMQ.Administration.ManagementClient;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Net.Http.Json;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -17,24 +18,44 @@ class ManagementClient : IManagementClient
readonly string virtualHost;
readonly string escapedVirtualHost;

public ManagementClient(ConnectionConfiguration connectionConfiguration)
public ManagementClient(ConnectionConfiguration connectionConfiguration, X509Certificate2Collection? managementCertCollection = null)
{
if (connectionConfiguration == null)
{
throw new ArgumentNullException(nameof(connectionConfiguration));
}

TravisNickels marked this conversation as resolved.
Show resolved Hide resolved
virtualHost = connectionConfiguration.VirtualHost;
escapedVirtualHost = Uri.EscapeDataString(virtualHost);

var handler = new HttpClientHandler();
if (connectionConfiguration.UseTls)
{
ConfigureSsl(handler, managementCertCollection);
}

var uriBuilder = new UriBuilder
{
Scheme = connectionConfiguration.UseTls ? "https" : "http",
Host = connectionConfiguration.Host,
Port = 15672 // TODO: fallback to default only if specific details aren't given in config
Port = connectionConfiguration.Port,
};

httpClient = new HttpClient { BaseAddress = uriBuilder.Uri };
httpClient = new HttpClient(handler) { BaseAddress = uriBuilder.Uri };
httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue(
"Basic",
Convert.ToBase64String(Encoding.ASCII.GetBytes($"{connectionConfiguration.UserName}:{connectionConfiguration.Password}")));
}

void ConfigureSsl(HttpClientHandler handler, X509Certificate2Collection? managementCertCollection)
{
if (managementCertCollection != null)
{
handler.ClientCertificates.AddRange(managementCertCollection);
}
handler.SslProtocols = System.Security.Authentication.SslProtocols.Tls13;
}

public async Task<Response<Queue?>> GetQueue(string queueName, CancellationToken cancellationToken = default)
{
Queue? value = null;
Expand Down Expand Up @@ -90,7 +111,12 @@ public ManagementClient(ConnectionConfiguration connectionConfiguration)

public async Task CreatePolicy(Policy policy, CancellationToken cancellationToken = default)
{
policy.VirtualHost = virtualHost;
if (policy.Name == null)
{
throw new ArgumentNullException(nameof(policy.Name));
}

TravisNickels marked this conversation as resolved.
Show resolved Hide resolved
policy.VirtualHost = Uri.EscapeDataString(virtualHost);
abparticular marked this conversation as resolved.
Show resolved Hide resolved

var escapedPolicyName = Uri.EscapeDataString(policy.Name);
var response = await httpClient.PutAsJsonAsync($"api/policies/{escapedVirtualHost}/{escapedPolicyName}", policy, cancellationToken)
Expand Down
Loading
Loading