diff --git a/src/NServiceBus.Transport.PostgreSql.TransportTests/ConfigurePostgreSqlTransportInfrastructure.cs b/src/NServiceBus.Transport.PostgreSql.TransportTests/ConfigurePostgreSqlTransportInfrastructure.cs index 2d6203fba..4dedc06c6 100644 --- a/src/NServiceBus.Transport.PostgreSql.TransportTests/ConfigurePostgreSqlTransportInfrastructure.cs +++ b/src/NServiceBus.Transport.PostgreSql.TransportTests/ConfigurePostgreSqlTransportInfrastructure.cs @@ -11,12 +11,11 @@ public class ConfigurePostgreSqlTransportInfrastructure : IConfigureTransportInfrastructure { - public TransportDefinition CreateTransportDefinition() - { - connectionString = Environment.GetEnvironmentVariable("PostgreSqlTransportConnectionString") ?? @"User ID=user;Password=admin;Host=localhost;Port=54320;Database=nservicebus;Pooling=true;Connection Lifetime=0;"; + public static string ConnectionString => + Environment.GetEnvironmentVariable("PostgreSqlTransportConnectionString") ?? + @"User ID=user;Password=admin;Host=localhost;Port=54320;Database=nservicebus;Pooling=true;Connection Lifetime=0;"; - return new PostgreSqlTransport(connectionString); - } + public TransportDefinition CreateTransportDefinition() => new PostgreSqlTransport(ConnectionString); public async Task Configure(TransportDefinition transportDefinition, HostSettings hostSettings, QueueAddress queueAddress, string errorQueueName, CancellationToken cancellationToken = default) { @@ -48,7 +47,7 @@ public async Task Cleanup(CancellationToken cancellationToken = default) return; } - if (string.IsNullOrWhiteSpace(connectionString)) + if (string.IsNullOrWhiteSpace(ConnectionString)) { return; } @@ -64,7 +63,7 @@ public async Task Cleanup(CancellationToken cancellationToken = default) queues.Add(delayedDeliveryQueueName); } - using var conn = new NpgsqlConnection(connectionString); + using var conn = new NpgsqlConnection(ConnectionString); await conn.OpenAsync(cancellationToken).ConfigureAwait(false); foreach (var queue in queues.Where(q => !string.IsNullOrWhiteSpace(q))) @@ -91,7 +90,6 @@ public async Task Cleanup(CancellationToken cancellationToken = default) } } - string connectionString; string inputQueueName; string errorQueueName; PostgreSqlTransport postgreSqlTransport; diff --git a/src/NServiceBus.Transport.PostgreSql.TransportTests/NServiceBus.Transport.PostgreSql.TransportTests.csproj b/src/NServiceBus.Transport.PostgreSql.TransportTests/NServiceBus.Transport.PostgreSql.TransportTests.csproj index 3c7024d08..24f36e2b8 100644 --- a/src/NServiceBus.Transport.PostgreSql.TransportTests/NServiceBus.Transport.PostgreSql.TransportTests.csproj +++ b/src/NServiceBus.Transport.PostgreSql.TransportTests/NServiceBus.Transport.PostgreSql.TransportTests.csproj @@ -8,6 +8,7 @@ + diff --git a/src/NServiceBus.Transport.PostgreSql.TransportTests/When_receive_takes_long_to_complete.cs b/src/NServiceBus.Transport.PostgreSql.TransportTests/When_receive_takes_long_to_complete.cs new file mode 100644 index 000000000..15145a23e --- /dev/null +++ b/src/NServiceBus.Transport.PostgreSql.TransportTests/When_receive_takes_long_to_complete.cs @@ -0,0 +1,123 @@ +#pragma warning disable PS0018 +namespace NServiceBus.TransportTests; + +using System; +using System.Threading; +using System.Threading.Tasks; +using NUnit.Framework; +using Transport; +using Transport.PostgreSql; + +//HINT: This test operates on the lower level than the transport tests because we need to verify +// internal behavior of the transport. In this case the peek behavior that is not exposed by the transport seam. +// Therefore we are not using the transport base class. +public class When_receive_takes_long_to_complete +{ + [TestCase(TransportTransactionMode.None)] + [TestCase(TransportTransactionMode.ReceiveOnly)] + [TestCase(TransportTransactionMode.SendsAtomicWithReceive)] + [TestCase(TransportTransactionMode.TransactionScope)] + public async Task Peeker_should_provide_accurate_queue_length_estimate(TransportTransactionMode transactionMode) + { + await SendAMessage(connectionFactory, queue); + await SendAMessage(connectionFactory, queue); + + var (txStarted, txFinished, txCompletionSource) = SpawnALongRunningReceiveTransaction(); + + await txStarted; + + int peekCount; + + await using (var connection = await connectionFactory.OpenNewConnection()) + { + var transaction = await connection.BeginTransactionAsync(); + + queue.FormatPeekCommand(); + peekCount = await queue.TryPeek(connection, transaction, null); + } + + txCompletionSource.SetResult(); + await txFinished; + + Assert.That(peekCount, Is.EqualTo(1), "A long running receive transaction should not skew the estimation for number of messages in the queue."); + } + + static async Task CreateATestQueue(PostgreSqlDbConnectionFactory connectionFactory) + { + var queueName = "queue_length_estimation_test"; + + var sqlConstants = new PostgreSqlConstants(); + + var queue = new PostgreSqlTableBasedQueue(sqlConstants, queueName, queueName, false); + + var addressTranslator = new QueueAddressTranslator("public", null, new QueueSchemaOptions()); + var queueCreator = new QueueCreator(sqlConstants, connectionFactory, addressTranslator.Parse, false); + + await queueCreator.CreateQueueIfNecessary(new[] { queueName }, null); + + await using var connection = await connectionFactory.OpenNewConnection(); + await queue.Purge(connection); + + return queue; + } + + static async Task SendAMessage(PostgreSqlDbConnectionFactory connectionFactory, PostgreSqlTableBasedQueue queue) + { + await using var connection = await connectionFactory.OpenNewConnection(); + var transaction = await connection.BeginTransactionAsync(); + + await queue.Send( + new OutgoingMessage(Guid.NewGuid().ToString(), [], Array.Empty()), + TimeSpan.MaxValue, connection, transaction); + + await transaction.CommitAsync(); + } + + (Task, Task, TaskCompletionSource) SpawnALongRunningReceiveTransaction() + { + var started = new TaskCompletionSource(); + var cancellationTokenSource = new TaskCompletionSource(); + + var task = Task.Run(async () => + { + await using var connection = await connectionFactory.OpenNewConnection(); + var transaction = await connection.BeginTransactionAsync(); + + await queue.TryReceive(connection, transaction); + + started.SetResult(); + + await cancellationTokenSource.Task; + }); + + return (started.Task, task, cancellationTokenSource); + } + + [SetUp] + public async Task Setup() + { + connectionFactory = new PostgreSqlDbConnectionFactory(ConfigurePostgreSqlTransportInfrastructure.ConnectionString); + + queue = await CreateATestQueue(connectionFactory); + } + + [TearDown] + public async Task TearDown() + { + if (queue == null) + { + return; + } + + await using var connection = await connectionFactory.OpenNewConnection(CancellationToken.None); + await using var comm = connection.CreateCommand(); + + comm.CommandText = $"DROP TABLE IF EXISTS \"public\".\"{queue}\"; " + + $"DROP SEQUENCE IF EXISTS \"public\".\"{queue}_seq_seq\";"; + + await comm.ExecuteNonQueryAsync(CancellationToken.None); + } + + PostgreSqlTableBasedQueue queue; + PostgreSqlDbConnectionFactory connectionFactory; +} \ No newline at end of file diff --git a/src/NServiceBus.Transport.PostgreSql/PostgreSqlConstants.cs b/src/NServiceBus.Transport.PostgreSql/PostgreSqlConstants.cs index 8c0d22f31..a9f4c3bec 100644 --- a/src/NServiceBus.Transport.PostgreSql/PostgreSqlConstants.cs +++ b/src/NServiceBus.Transport.PostgreSql/PostgreSqlConstants.cs @@ -53,7 +53,8 @@ SELECT now() AT TIME ZONE 'UTC' as UtcNow, Due as NextDue ORDER BY Due LIMIT 1 FOR UPDATE SKIP LOCKED"; public string PeekText { get; set; } = @" -SELECT COALESCE(cast(max(seq) - min(seq) + 1 AS int), 0) Id FROM {0}"; +SELECT COALESCE(cast((SELECT seq FROM {0} ORDER BY seq DESC LIMIT 1 FOR UPDATE SKIP LOCKED) +- (SELECT seq FROM {0} ORDER BY seq ASC LIMIT 1 FOR UPDATE SKIP LOCKED) + 1 AS int), 0);"; public string AddMessageBodyStringColumn { get; set; } = @" DO $$ diff --git a/src/NServiceBus.Transport.Sql.Shared/NServiceBus.Transport.Sql.Shared.csproj b/src/NServiceBus.Transport.Sql.Shared/NServiceBus.Transport.Sql.Shared.csproj index 49d7a2101..7b1d50a90 100644 --- a/src/NServiceBus.Transport.Sql.Shared/NServiceBus.Transport.Sql.Shared.csproj +++ b/src/NServiceBus.Transport.Sql.Shared/NServiceBus.Transport.Sql.Shared.csproj @@ -1,4 +1,4 @@ - + net8.0 diff --git a/src/NServiceBus.Transport.SqlServer.TransportTests/ConfigureSqlServerTransportInfrastructure.cs b/src/NServiceBus.Transport.SqlServer.TransportTests/ConfigureSqlServerTransportInfrastructure.cs index 50d7650d4..34156054e 100644 --- a/src/NServiceBus.Transport.SqlServer.TransportTests/ConfigureSqlServerTransportInfrastructure.cs +++ b/src/NServiceBus.Transport.SqlServer.TransportTests/ConfigureSqlServerTransportInfrastructure.cs @@ -9,12 +9,11 @@ public class ConfigureSqlServerTransportInfrastructure : IConfigureTransportInfrastructure { - public TransportDefinition CreateTransportDefinition() - { - connectionString = Environment.GetEnvironmentVariable("SqlServerTransportConnectionString") ?? @"Data Source=.\SQLEXPRESS;Initial Catalog=nservicebus;Integrated Security=True;TrustServerCertificate=true"; + public static string ConnectionString => + Environment.GetEnvironmentVariable("SqlServerTransportConnectionString") + ?? @"Data Source=.\SQLEXPRESS;Initial Catalog=nservicebus;Integrated Security=True;TrustServerCertificate=true"; - return new SqlServerTransport(connectionString); - } + public TransportDefinition CreateTransportDefinition() => new SqlServerTransport(ConnectionString); public async Task Configure(TransportDefinition transportDefinition, HostSettings hostSettings, QueueAddress queueAddress, string errorQueueName, CancellationToken cancellationToken = default) { @@ -51,7 +50,7 @@ public async Task Cleanup(CancellationToken cancellationToken = default) return; } - if (string.IsNullOrWhiteSpace(connectionString) == false) + if (string.IsNullOrWhiteSpace(ConnectionString) == false) { var queues = new[] { @@ -60,7 +59,7 @@ public async Task Cleanup(CancellationToken cancellationToken = default) sqlServerTransport.Testing.DelayedDeliveryQueue }; - using (var conn = new SqlConnection(connectionString)) + using (var conn = new SqlConnection(ConnectionString)) { await conn.OpenAsync(cancellationToken); @@ -80,7 +79,6 @@ public async Task Cleanup(CancellationToken cancellationToken = default) } } - string connectionString; string inputQueueName; string errorQueueName; SqlServerTransport sqlServerTransport; diff --git a/src/NServiceBus.Transport.SqlServer.TransportTests/NServiceBus.Transport.SqlServer.TransportTests.csproj b/src/NServiceBus.Transport.SqlServer.TransportTests/NServiceBus.Transport.SqlServer.TransportTests.csproj index 703995baf..8df3cf86e 100644 --- a/src/NServiceBus.Transport.SqlServer.TransportTests/NServiceBus.Transport.SqlServer.TransportTests.csproj +++ b/src/NServiceBus.Transport.SqlServer.TransportTests/NServiceBus.Transport.SqlServer.TransportTests.csproj @@ -8,6 +8,7 @@ + diff --git a/src/NServiceBus.Transport.SqlServer.TransportTests/When_receive_takes_long_to_complete.cs b/src/NServiceBus.Transport.SqlServer.TransportTests/When_receive_takes_long_to_complete.cs new file mode 100644 index 000000000..d3123fa41 --- /dev/null +++ b/src/NServiceBus.Transport.SqlServer.TransportTests/When_receive_takes_long_to_complete.cs @@ -0,0 +1,124 @@ +#pragma warning disable PS0018 +namespace NServiceBus.TransportTests; + +using System; +using System.Threading; +using System.Threading.Tasks; +using NUnit.Framework; +using Transport; +using Transport.SqlServer; + +//HINT: This test operates on the lower level than the transport tests because we need to verify +// internal behavior of the transport. In this case the peek behavior that is not exposed by the transport seam. +// Therefore we are not using the transport base class. +public class When_receive_takes_long_to_complete +{ + [TestCase(TransportTransactionMode.None)] + [TestCase(TransportTransactionMode.ReceiveOnly)] + [TestCase(TransportTransactionMode.SendsAtomicWithReceive)] + [TestCase(TransportTransactionMode.TransactionScope)] + public async Task Peeker_should_provide_accurate_queue_length_estimate(TransportTransactionMode transactionMode) + { + queue = await CreateATestQueue(connectionFactory); + + await SendAMessage(connectionFactory, queue); + await SendAMessage(connectionFactory, queue); + + var (txStarted, txFinished, txCompletionSource) = SpawnALongRunningReceiveTransaction(connectionFactory, queue); + + await txStarted; + + int peekCount; + + using (var connection = await connectionFactory.OpenNewConnection()) + { + var transaction = await connection.BeginTransactionAsync(); + + queue.FormatPeekCommand(); + peekCount = await queue.TryPeek(connection, transaction, null); + } + + txCompletionSource.SetResult(); + await txFinished; + + Assert.That(peekCount, Is.EqualTo(1), "A long running receive transaction should not skew the estimation for number of messages in the queue."); + } + + static async Task CreateATestQueue(SqlServerDbConnectionFactory connectionFactory) + { + var queueName = "queue_length_estimation_test"; + + var sqlConstants = new SqlServerConstants(); + + var queue = new SqlTableBasedQueue(sqlConstants, queueName, queueName, false); + + var addressTranslator = new QueueAddressTranslator("nservicebus", "dbo", null, null); + var queueCreator = new QueueCreator(sqlConstants, connectionFactory, addressTranslator.Parse, false); + + await queueCreator.CreateQueueIfNecessary(new[] { queueName }, null); + + await using var connection = await connectionFactory.OpenNewConnection(); + await queue.Purge(connection); + + return queue; + } + + static async Task SendAMessage(SqlServerDbConnectionFactory connectionFactory, SqlTableBasedQueue queue) + { + await using var connection = await connectionFactory.OpenNewConnection(); + var transaction = await connection.BeginTransactionAsync(); + + await queue.Send( + new OutgoingMessage(Guid.NewGuid().ToString(), [], Array.Empty()), + TimeSpan.MaxValue, connection, transaction); + + await transaction.CommitAsync(); + } + + (Task, Task, TaskCompletionSource) SpawnALongRunningReceiveTransaction(SqlServerDbConnectionFactory connectionFactory, SqlTableBasedQueue queue) + { + var started = new TaskCompletionSource(); + var cancellationTokenSource = new TaskCompletionSource(); + + var task = Task.Run(async () => + { + await using var connection = await connectionFactory.OpenNewConnection(); + var transaction = await connection.BeginTransactionAsync(); + + await queue.TryReceive(connection, transaction); + + started.SetResult(); + + await cancellationTokenSource.Task; + }); + + return (started.Task, task, cancellationTokenSource); + } + + [SetUp] + public async Task Setup() + { + connectionFactory = new SqlServerDbConnectionFactory(ConfigureSqlServerTransportInfrastructure.ConnectionString); + + queue = await CreateATestQueue(connectionFactory); + } + + [TearDown] + public async Task TearDown() + { + if (queue == null) + { + return; + } + + await using var connection = await connectionFactory.OpenNewConnection(CancellationToken.None); + await using var comm = connection.CreateCommand(); + + comm.CommandText = $"IF OBJECT_ID('{queue}', 'U') IS NOT NULL DROP TABLE {queue}"; + + await comm.ExecuteNonQueryAsync(CancellationToken.None); + } + + SqlTableBasedQueue queue; + SqlServerDbConnectionFactory connectionFactory; +} \ No newline at end of file