diff --git a/src/NServiceBus.Transport.PostgreSql.TransportTests/ConfigurePostgreSqlTransportInfrastructure.cs b/src/NServiceBus.Transport.PostgreSql.TransportTests/ConfigurePostgreSqlTransportInfrastructure.cs index 148d5eec0..2d6203fba 100644 --- a/src/NServiceBus.Transport.PostgreSql.TransportTests/ConfigurePostgreSqlTransportInfrastructure.cs +++ b/src/NServiceBus.Transport.PostgreSql.TransportTests/ConfigurePostgreSqlTransportInfrastructure.cs @@ -1,15 +1,12 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Net; using System.Threading; using System.Threading.Tasks; using Npgsql; using NServiceBus; using NServiceBus.Transport; -using NServiceBus.Transport.PostgreSql; using NServiceBus.TransportTests; -using NUnit.Framework; using QueueAddress = NServiceBus.Transport.QueueAddress; public class ConfigurePostgreSqlTransportInfrastructure : IConfigureTransportInfrastructure diff --git a/src/NServiceBus.Transport.PostgreSql.UnitTests/ConnectionPoolValidatorTests.cs b/src/NServiceBus.Transport.PostgreSql.UnitTests/ConnectionPoolValidatorTests.cs new file mode 100644 index 000000000..11f70cfc7 --- /dev/null +++ b/src/NServiceBus.Transport.PostgreSql.UnitTests/ConnectionPoolValidatorTests.cs @@ -0,0 +1,56 @@ +namespace NServiceBus.Transport.PostgreSql.UnitTests +{ + using NUnit.Framework; + + [TestFixture] + public class ConnectionPoolValidatorTests + { + [Test] + public void Is_not_validated_when_connection_pooling_not_specified() + { + var result = ConnectionPoolValidator.Validate("Database = xxx"); + + Assert.That(result.IsValid, Is.False); + } + + [Test] + public void Is_validated_when_both_min_and_max_pool_size_is_specified() + { + var result = ConnectionPoolValidator.Validate("Database = xxx; minimum pool size = 20; maximum pool size=120"); + + Assert.That(result.IsValid, Is.True); + } + + [Test] + public void Is_not_validated_when_only_min_pool_size_is_specified() + { + var result = ConnectionPoolValidator.Validate("Database = xxx; Minimum Pool Size = 20;"); + + Assert.That(result.IsValid, Is.False); + } + + [Test] + public void Is_not_validated_when_pooling_is_enabled_and_no_min_and_max_is_set() + { + var result = ConnectionPoolValidator.Validate("Database = xxx; Pooling = true"); + + Assert.That(result.IsValid, Is.False); + } + + [Test] + public void Is_validated_when_pooling_is_disabled() + { + var result = ConnectionPoolValidator.Validate("Database = xxx; Pooling = false"); + + Assert.That(result.IsValid, Is.True); + } + + [Test] + public void Parses_pool_disable_values_with_yes_or_no() + { + var result = ConnectionPoolValidator.Validate("Database = xxx; Pooling = no"); + + Assert.That(result.IsValid, Is.True); + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.Transport.PostgreSql/Configuration/ConnectionPoolValidator.cs b/src/NServiceBus.Transport.PostgreSql/Configuration/ConnectionPoolValidator.cs new file mode 100644 index 000000000..24cafeda0 --- /dev/null +++ b/src/NServiceBus.Transport.PostgreSql/Configuration/ConnectionPoolValidator.cs @@ -0,0 +1,27 @@ +namespace NServiceBus.Transport.PostgreSql; + +using System; +using System.Data.Common; +using Sql.Shared; + +static class ConnectionPoolValidator +{ + public static ValidationCheckResult Validate(string connectionString) + { + var keys = new DbConnectionStringBuilder { ConnectionString = connectionString }; + var hasPoolingValue = keys.TryGetValue("Pooling", out object poolingValue); + if (hasPoolingValue && !string.Equals(poolingValue.ToString(), "true", StringComparison.InvariantCultureIgnoreCase)) + { + return ValidationCheckResult.Valid(); + } + if (keys.ContainsKey("Maximum Pool Size")) + { + return ValidationCheckResult.Valid(); + } + return ValidationCheckResult.Invalid(ConnectionPoolSizeNotSet); + } + + const string ConnectionPoolSizeNotSet = + "Maximum connection pooling value (Maximum Pool Size=N) is not " + + "configured on the provided connection string. The default value (100) will be used."; +} \ No newline at end of file diff --git a/src/NServiceBus.Transport.PostgreSql/Configuration/PostgreSqlDbConnectionFactory.cs b/src/NServiceBus.Transport.PostgreSql/Configuration/PostgreSqlDbConnectionFactory.cs index 01dba2cfe..af7a841a3 100644 --- a/src/NServiceBus.Transport.PostgreSql/Configuration/PostgreSqlDbConnectionFactory.cs +++ b/src/NServiceBus.Transport.PostgreSql/Configuration/PostgreSqlDbConnectionFactory.cs @@ -19,8 +19,6 @@ public PostgreSqlDbConnectionFactory(string connectionString) { openNewConnection = async cancellationToken => { - ValidateConnectionPool(connectionString); - var connection = new NpgsqlConnection(connectionString); try { diff --git a/src/NServiceBus.Transport.PostgreSql/PostgreSqlTransportInfrastructure.cs b/src/NServiceBus.Transport.PostgreSql/PostgreSqlTransportInfrastructure.cs index a09a84b3c..50ed658ae 100644 --- a/src/NServiceBus.Transport.PostgreSql/PostgreSqlTransportInfrastructure.cs +++ b/src/NServiceBus.Transport.PostgreSql/PostgreSqlTransportInfrastructure.cs @@ -36,7 +36,7 @@ class PostgreSqlTransportInfrastructure : TransportInfrastructure IDelayedMessageStore delayedMessageStore = new SendOnlyDelayedMessageStore(); PostgreSqlDbConnectionFactory connectionFactory; - static ILog _logger = LogManager.GetLogger(); + static ILog Logger = LogManager.GetLogger(); readonly PostgreSqlExceptionClassifier exceptionClassifier; public PostgreSqlTransportInfrastructure(PostgreSqlTransport transport, HostSettings hostSettings, @@ -72,6 +72,8 @@ public override string ToTransportAddress(Transport.QueueAddress address) { connectionFactory = CreateConnectionFactory(); + await ValidateDatabaseAccess(cancellationToken).ConfigureAwait(false); + addressTranslator = new QueueAddressTranslator("public", transport.DefaultSchema, transport.Schema); tableBasedQueueCache = new TableBasedQueueCache( @@ -192,7 +194,7 @@ async Task ConfigureReceiveInfrastructure(CancellationToken cancellationToken) if (receiveSetting.PurgeOnStartup) { - _logger.Warn($"The {receiveSetting.PurgeOnStartup} should only be used in the development environment."); + Logger.Warn($"The {receiveSetting.PurgeOnStartup} should only be used in the development environment."); } return new MessageReceiver(transport, receiveSetting.Id, receiveAddress, receiveSetting.ErrorQueue, @@ -201,8 +203,6 @@ async Task ConfigureReceiveInfrastructure(CancellationToken cancellationToken) subscriptionManager, receiveSetting.PurgeOnStartup, exceptionClassifier); }).ToDictionary(receiver => receiver.Id, receiver => receiver); - await ValidateDatabaseAccess(cancellationToken).ConfigureAwait(false); - var receiveAddresses = Receivers.Values.Select(r => r.ReceiveAddress).ToList(); if (hostSettings.SetupInfrastructure) @@ -246,8 +246,14 @@ async Task TryOpenDatabaseConnection(CancellationToken cancellationToken) { try { - await using (await connectionFactory.OpenNewConnection(cancellationToken).ConfigureAwait(false)) + using (var connection = await connectionFactory.OpenNewConnection(cancellationToken).ConfigureAwait(false)) { + var result = ConnectionPoolValidator.Validate(connection.ConnectionString); + + if (!result.IsValid) + { + Logger.Warn(result.Message); + } } } catch (Exception ex) when (!ex.IsCausedBy(cancellationToken)) diff --git a/src/NServiceBus.Transport.Sql.Shared/Configuration/ConnectionPoolValidator.cs b/src/NServiceBus.Transport.Sql.Shared/Configuration/ConnectionPoolValidator.cs deleted file mode 100644 index a8718dc50..000000000 --- a/src/NServiceBus.Transport.Sql.Shared/Configuration/ConnectionPoolValidator.cs +++ /dev/null @@ -1,28 +0,0 @@ -namespace NServiceBus.Transport.Sql.Shared -{ - using System; - using System.Data.Common; - - - class ConnectionPoolValidator - { - public static ValidationCheckResult Validate(string connectionString) - { - var keys = new DbConnectionStringBuilder { ConnectionString = connectionString }; - var hasPoolingValue = keys.TryGetValue("Pooling", out object poolingValue); - if (hasPoolingValue && !string.Equals(poolingValue.ToString(), "true", StringComparison.InvariantCultureIgnoreCase)) - { - return ValidationCheckResult.Valid(); - } - if (keys.ContainsKey("Max Pool Size")) - { - return ValidationCheckResult.Valid(); - } - return ValidationCheckResult.Invalid(ConnectionPoolSizeNotSet); - } - - const string ConnectionPoolSizeNotSet = - "Maximum connection pooling value (Max Pool Size=N) is not " + - "configured on the provided connection string. The default value (100) will be used."; - } -} \ No newline at end of file diff --git a/src/NServiceBus.Transport.Sql.Shared/Configuration/DbConnectionFactory.cs b/src/NServiceBus.Transport.Sql.Shared/Configuration/DbConnectionFactory.cs index dd70bedf8..a4a67e6c7 100644 --- a/src/NServiceBus.Transport.Sql.Shared/Configuration/DbConnectionFactory.cs +++ b/src/NServiceBus.Transport.Sql.Shared/Configuration/DbConnectionFactory.cs @@ -4,47 +4,16 @@ using System.Threading.Tasks; using System.Threading; using System; -using NServiceBus.Logging; public abstract class DbConnectionFactory { - public DbConnectionFactory(Func> factory) - { - openNewConnection = factory; - } + protected DbConnectionFactory(Func> factory) => openNewConnection = factory; protected DbConnectionFactory() { } - public async Task OpenNewConnection(CancellationToken cancellationToken = default) - { - var connection = await openNewConnection(cancellationToken).ConfigureAwait(false); - - ValidateConnectionPool(connection.ConnectionString); - - return connection; - } - - protected void ValidateConnectionPool(string connectionString) - { - if (hasValidated) - { - return; - } - - var validationResult = ConnectionPoolValidator.Validate(connectionString); - if (!validationResult.IsValid) - { - Logger.Warn(validationResult.Message); - } - - hasValidated = true; - } - - static bool hasValidated; + public Task OpenNewConnection(CancellationToken cancellationToken = default) => openNewConnection(cancellationToken); protected Func> openNewConnection; - - static ILog Logger = LogManager.GetLogger(); } \ No newline at end of file diff --git a/src/NServiceBus.Transport.Sql.Shared/NServiceBus.Transport.Sql.Shared.csproj b/src/NServiceBus.Transport.Sql.Shared/NServiceBus.Transport.Sql.Shared.csproj index 6c09d69b3..49d7a2101 100644 --- a/src/NServiceBus.Transport.Sql.Shared/NServiceBus.Transport.Sql.Shared.csproj +++ b/src/NServiceBus.Transport.Sql.Shared/NServiceBus.Transport.Sql.Shared.csproj @@ -17,5 +17,6 @@ + diff --git a/src/NServiceBus.Transport.SqlServer.IntegrationTests/When_checking_schema.cs b/src/NServiceBus.Transport.SqlServer.IntegrationTests/When_checking_schema.cs index 20ee5dc0f..e18eb0b7f 100644 --- a/src/NServiceBus.Transport.SqlServer.IntegrationTests/When_checking_schema.cs +++ b/src/NServiceBus.Transport.SqlServer.IntegrationTests/When_checking_schema.cs @@ -4,7 +4,6 @@ using System.Threading; using System.Threading.Tasks; using NUnit.Framework; - using Sql.Shared.Queuing; using SqlServer; public class When_checking_schema diff --git a/src/NServiceBus.Transport.SqlServer.IntegrationTests/When_message_receive_takes_long.cs b/src/NServiceBus.Transport.SqlServer.IntegrationTests/When_message_receive_takes_long.cs index 86b3c4b04..16b38b8e1 100644 --- a/src/NServiceBus.Transport.SqlServer.IntegrationTests/When_message_receive_takes_long.cs +++ b/src/NServiceBus.Transport.SqlServer.IntegrationTests/When_message_receive_takes_long.cs @@ -6,7 +6,6 @@ using System.Transactions; using NUnit.Framework; using Sql.Shared.Queuing; - using Sql.Shared.Receiving; using SqlServer; using Transport; diff --git a/src/NServiceBus.Transport.SqlServer.IntegrationTests/When_recoverable_column_is_removed.cs b/src/NServiceBus.Transport.SqlServer.IntegrationTests/When_recoverable_column_is_removed.cs index 56d3f61ed..797dd3e3e 100644 --- a/src/NServiceBus.Transport.SqlServer.IntegrationTests/When_recoverable_column_is_removed.cs +++ b/src/NServiceBus.Transport.SqlServer.IntegrationTests/When_recoverable_column_is_removed.cs @@ -4,7 +4,6 @@ namespace NServiceBus.Transport.SqlServer.IntegrationTests using System.Collections.Generic; using System.Data; using System.Data.Common; - using Microsoft.Data.SqlClient; using System.Threading.Tasks; using Extensibility; using NUnit.Framework; diff --git a/src/NServiceBus.Transport.SqlServer.UnitTests/ConnectionPoolValidatorTests.cs b/src/NServiceBus.Transport.SqlServer.UnitTests/ConnectionPoolValidatorTests.cs index fd38d7ee7..e500d14ca 100644 --- a/src/NServiceBus.Transport.SqlServer.UnitTests/ConnectionPoolValidatorTests.cs +++ b/src/NServiceBus.Transport.SqlServer.UnitTests/ConnectionPoolValidatorTests.cs @@ -1,7 +1,6 @@ namespace NServiceBus.Transport.SqlServer.UnitTests { using NUnit.Framework; - using Sql.Shared; [TestFixture] public class ConnectionPoolValidatorTests diff --git a/src/NServiceBus.Transport.SqlServer/Configuration/ConnectionPoolValidator.cs b/src/NServiceBus.Transport.SqlServer/Configuration/ConnectionPoolValidator.cs new file mode 100644 index 000000000..9ccef97a9 --- /dev/null +++ b/src/NServiceBus.Transport.SqlServer/Configuration/ConnectionPoolValidator.cs @@ -0,0 +1,27 @@ +namespace NServiceBus.Transport.SqlServer; + +using System; +using System.Data.Common; +using Sql.Shared; + +static class ConnectionPoolValidator +{ + public static ValidationCheckResult Validate(string connectionString) + { + var keys = new DbConnectionStringBuilder { ConnectionString = connectionString }; + var hasPoolingValue = keys.TryGetValue("Pooling", out object poolingValue); + if (hasPoolingValue && !string.Equals(poolingValue.ToString(), "true", StringComparison.InvariantCultureIgnoreCase)) + { + return ValidationCheckResult.Valid(); + } + if (keys.ContainsKey("Max Pool Size")) + { + return ValidationCheckResult.Valid(); + } + return ValidationCheckResult.Invalid(ConnectionPoolSizeNotSet); + } + + const string ConnectionPoolSizeNotSet = + "Maximum connection pooling value (Max Pool Size=N) is not " + + "configured on the provided connection string. The default value (100) will be used."; +} \ No newline at end of file diff --git a/src/NServiceBus.Transport.SqlServer/Configuration/SqlServerDbConnectionFactory.cs b/src/NServiceBus.Transport.SqlServer/Configuration/SqlServerDbConnectionFactory.cs index c0504eda4..a731b8291 100644 --- a/src/NServiceBus.Transport.SqlServer/Configuration/SqlServerDbConnectionFactory.cs +++ b/src/NServiceBus.Transport.SqlServer/Configuration/SqlServerDbConnectionFactory.cs @@ -14,13 +14,10 @@ public SqlServerDbConnectionFactory(Func> { } - public SqlServerDbConnectionFactory(string connectionString) { openNewConnection = async cancellationToken => { - ValidateConnectionPool(connectionString); - var connection = new SqlConnection(connectionString); try { @@ -46,6 +43,6 @@ public SqlServerDbConnectionFactory(string connectionString) }; } - static ILog Logger = LogManager.GetLogger(); + static readonly ILog Logger = LogManager.GetLogger(); } } \ No newline at end of file diff --git a/src/NServiceBus.Transport.SqlServer/SqlServerTransportInfrastructure.cs b/src/NServiceBus.Transport.SqlServer/SqlServerTransportInfrastructure.cs index 6004b8707..e0bcb1e12 100644 --- a/src/NServiceBus.Transport.SqlServer/SqlServerTransportInfrastructure.cs +++ b/src/NServiceBus.Transport.SqlServer/SqlServerTransportInfrastructure.cs @@ -44,6 +44,13 @@ public async Task Initialize(CancellationToken cancellationToken = default) } } + var result = ConnectionPoolValidator.Validate(connectionString); + + if (!result.IsValid) + { + Logger.Warn(result.Message); + } + connectionAttributes = ConnectionAttributesParser.Parse(connectionString, transport.DefaultCatalog); addressTranslator = new QueueAddressTranslator(connectionAttributes.Catalog, "dbo", transport.DefaultSchema, transport.SchemaAndCatalog); @@ -320,7 +327,7 @@ async Task TryEscalateToDistributedTransactions(TransactionOptions transactionOp if (!string.IsNullOrWhiteSpace(message)) { - _logger.Warn(message); + Logger.Warn(message); } } } @@ -361,6 +368,6 @@ public override Task Shutdown(CancellationToken cancellationToken = default) Distributed transactions are not available on Linux. The other transaction modes can be used by setting the `SqlServerTransport.TransportTransactionMode` property when configuring the endpoint. Be aware that different transaction modes affect consistency guarantees since distributed transactions won't be atomically updating the resources together with consuming the incoming message."; - static ILog _logger = LogManager.GetLogger(); + static ILog Logger = LogManager.GetLogger(); } }