From 2b1bdc55ad30e978d44d6890896cc8e9f0c74a5f Mon Sep 17 00:00:00 2001 From: Mike Minutillo Date: Mon, 28 Aug 2023 11:45:00 +0800 Subject: [PATCH] Backport #1225 to 7.0 (#1230) * Handle `PurgeOnStartup` and `ExpiredMessagePurger.PurgeOnStartup` correctly (#1225) * Respect PurgeOnStartup core configuration * More descriptive variable name * Add test to verify non-expiring messages are not purged * Bump NSB version * Bump acceptance tests to match nsb version --- ...erviceBus.SqlServer.AcceptanceTests.csproj | 3 +- ...Transport.SqlServer.AcceptanceTests.csproj | 3 +- ...ed_to_purge_expired_messages_at_startup.cs | 144 ++++++++++++++++++ .../Receiving/MessageReceiver.cs | 7 +- .../SqlServerTransportInfrastructure.cs | 2 +- 5 files changed, 154 insertions(+), 5 deletions(-) create mode 100644 src/NServiceBus.Transport.SqlServer.AcceptanceTests/NativeTimeouts/When_configured_to_purge_expired_messages_at_startup.cs diff --git a/src/NServiceBus.SqlServer.AcceptanceTests/NServiceBus.SqlServer.AcceptanceTests.csproj b/src/NServiceBus.SqlServer.AcceptanceTests/NServiceBus.SqlServer.AcceptanceTests.csproj index 0eb175735..9806b58f4 100644 --- a/src/NServiceBus.SqlServer.AcceptanceTests/NServiceBus.SqlServer.AcceptanceTests.csproj +++ b/src/NServiceBus.SqlServer.AcceptanceTests/NServiceBus.SqlServer.AcceptanceTests.csproj @@ -13,7 +13,8 @@ - + + diff --git a/src/NServiceBus.Transport.SqlServer.AcceptanceTests/NServiceBus.Transport.SqlServer.AcceptanceTests.csproj b/src/NServiceBus.Transport.SqlServer.AcceptanceTests/NServiceBus.Transport.SqlServer.AcceptanceTests.csproj index 57cf47412..06fa8a51a 100644 --- a/src/NServiceBus.Transport.SqlServer.AcceptanceTests/NServiceBus.Transport.SqlServer.AcceptanceTests.csproj +++ b/src/NServiceBus.Transport.SqlServer.AcceptanceTests/NServiceBus.Transport.SqlServer.AcceptanceTests.csproj @@ -12,7 +12,8 @@ - + + diff --git a/src/NServiceBus.Transport.SqlServer.AcceptanceTests/NativeTimeouts/When_configured_to_purge_expired_messages_at_startup.cs b/src/NServiceBus.Transport.SqlServer.AcceptanceTests/NativeTimeouts/When_configured_to_purge_expired_messages_at_startup.cs new file mode 100644 index 000000000..4e27b1095 --- /dev/null +++ b/src/NServiceBus.Transport.SqlServer.AcceptanceTests/NativeTimeouts/When_configured_to_purge_expired_messages_at_startup.cs @@ -0,0 +1,144 @@ +namespace NServiceBus.Transport.SqlServer.AcceptanceTests.NativeTimeouts +{ + using System; + using System.Collections.Generic; + using System.Data.Common; + using System.Threading.Tasks; +#if SYSTEMDATASQLCLIENT + using System.Data.SqlClient; +#else + using Microsoft.Data.SqlClient; +#endif + using NServiceBus; + using NServiceBus.AcceptanceTesting; + using NServiceBus.AcceptanceTests; + using NServiceBus.AcceptanceTests.EndpointTemplates; + using NUnit.Framework; + + class When_configured_to_purge_expired_messages_at_startup : NServiceBusAcceptanceTest + { + [SetUp] + public void SetUpConnectionString() => + connectionString = Environment.GetEnvironmentVariable("SqlServerTransportConnectionString") ?? @"Data Source=.\SQLEXPRESS;Initial Catalog=nservicebus;Integrated Security=True"; + + [Test] + public async Task Should_only_purge_expired_messages() + { + await SetupInputQueue(); + + var context = await Scenario.Define() + .WithEndpoint(b => + { + b.CustomConfig(c => + { + c.ConfigureSqlServerTransport().ExpiredMessagesPurger.PurgeOnStartup = true; + }); + }) + .Done(c => QueueIsEmpty()) + .Run(); + + Assert.That(context.MessageWasHandled, Is.True, "Non expired message should have been handled."); + } + + // NOTE: The input queue must exist so that we can send messages to it + async Task SetupInputQueue() + { + var connectionFactory = new SqlConnectionFactory(async token => + { + var connection = new SqlConnection(connectionString); + + await connection.OpenAsync(token); + + return connection; + }); + + var parser = new DbConnectionStringBuilder { ConnectionString = connectionString }; + if (!parser.TryGetValue("Initial Catalog", out var catalogSetting) && !parser.TryGetValue("database", out catalogSetting)) + { + throw new Exception("Database is not configured on connection string"); + } + + var queueAddressTranslator = new QueueAddressTranslator((string)catalogSetting, "dbo", null, null); + var queueCreator = new QueueCreator(connectionFactory, queueAddressTranslator); + + var endpoint = AcceptanceTesting.Customization.Conventions.EndpointNamingConvention(typeof(TestEndpoint)); + await queueCreator.CreateQueueIfNecessary(new[] { endpoint }, null); + + var tableBasedQueueCache = new TableBasedQueueCache(queueAddressTranslator, true); + var tableBasedQueue = tableBasedQueueCache.Get(endpoint); + + using (var connection = await connectionFactory.OpenNewConnection()) + using (var transaction = connection.BeginTransaction()) + { + await SendMessage(new Message(), TimeSpan.FromSeconds(5)); + transaction.Commit(); + + Task SendMessage(T message, TimeSpan ttbr) + { + var messageId = Guid.NewGuid().ToString(); + + var messageBody = System.Text.Json.JsonSerializer.Serialize(message); + var messageBytes = System.Text.Encoding.UTF8.GetBytes(messageBody); + + var outgoingMessage = new OutgoingMessage(messageId, new Dictionary + { + [Headers.MessageId] = messageId, + [Headers.EnclosedMessageTypes] = typeof(T).ToString(), + [Headers.ContentType] = ContentTypes.Json + }, messageBytes); + + return tableBasedQueue.Send(outgoingMessage, ttbr, connection, transaction); + } + } + } + + bool QueueIsEmpty() + { + var endpoint = AcceptanceTesting.Customization.Conventions.EndpointNamingConvention(typeof(TestEndpoint)); + using (var connection = new SqlConnection(connectionString)) + { + connection.Open(); + using (var command = new SqlCommand($"SELECT COUNT(*) FROM [dbo].[{endpoint}]", connection)) + { + var numberOfMessagesInQueue = (int)command.ExecuteScalar(); + return numberOfMessagesInQueue == 0; + } + } + } + + class TestEndpoint : EndpointConfigurationBuilder + { + public TestEndpoint() + { + EndpointSetup(config => config.UseSerialization()); + } + + class MessageHandler : IHandleMessages + { + readonly Context testContext; + + public MessageHandler(Context testContext) + { + this.testContext = testContext; + } + + public Task Handle(Message message, IMessageHandlerContext context) + { + testContext.MessageWasHandled = true; + return Task.CompletedTask; + } + } + } + + public class Message : IMessage + { + } + + class Context : ScenarioContext + { + public bool MessageWasHandled { get; set; } + } + + string connectionString; + } +} diff --git a/src/NServiceBus.Transport.SqlServer/Receiving/MessageReceiver.cs b/src/NServiceBus.Transport.SqlServer/Receiving/MessageReceiver.cs index fd837ac5f..23d995302 100644 --- a/src/NServiceBus.Transport.SqlServer/Receiving/MessageReceiver.cs +++ b/src/NServiceBus.Transport.SqlServer/Receiving/MessageReceiver.cs @@ -26,7 +26,8 @@ public MessageReceiver( QueuePeekerOptions queuePeekerOptions, SchemaInspector schemaInspector, TimeSpan waitTimeCircuitBreaker, - ISubscriptionManager subscriptionManager) + ISubscriptionManager subscriptionManager, + bool purgeAllMessagesOnStartup) { this.transport = transport; this.processStrategyFactory = processStrategyFactory; @@ -39,6 +40,7 @@ public MessageReceiver( this.waitTimeCircuitBreaker = waitTimeCircuitBreaker; this.errorQueueAddress = errorQueueAddress; this.criticalErrorAction = criticalErrorAction; + this.purgeAllMessagesOnStartup = purgeAllMessagesOnStartup; Subscriptions = subscriptionManager; Id = receiverId; ReceiveAddress = receiveAddress; @@ -60,7 +62,7 @@ public async Task Initialize(PushRuntimeSettings limitations, OnMessage onMessag processStrategy.Init(inputQueue, errorQueue, onMessage, onError, criticalErrorAction); - if (transport.ExpiredMessagesPurger.PurgeOnStartup) + if (purgeAllMessagesOnStartup) { try { @@ -259,6 +261,7 @@ async Task PurgeExpiredMessages(CancellationToken cancellationToken) readonly IPeekMessagesInQueue queuePeeker; readonly QueuePeekerOptions queuePeekerOptions; readonly SchemaInspector schemaInspector; + readonly bool purgeAllMessagesOnStartup; TimeSpan waitTimeCircuitBreaker; volatile SemaphoreSlim concurrencyLimiter; CancellationTokenSource messageReceivingCancellationTokenSource; diff --git a/src/NServiceBus.Transport.SqlServer/SqlServerTransportInfrastructure.cs b/src/NServiceBus.Transport.SqlServer/SqlServerTransportInfrastructure.cs index 900ab92e8..1dd8befbe 100644 --- a/src/NServiceBus.Transport.SqlServer/SqlServerTransportInfrastructure.cs +++ b/src/NServiceBus.Transport.SqlServer/SqlServerTransportInfrastructure.cs @@ -155,7 +155,7 @@ public async Task ConfigureReceiveInfrastructure(ReceiveSettings[] receiveSettin return new MessageReceiver(transport, receiveSetting.Id, receiveAddress, receiveSetting.ErrorQueue, hostSettings.CriticalErrorAction, processStrategyFactory, queueFactory, queuePurger, expiredMessagesPurger, - queuePeeker, queuePeekerOptions, schemaVerification, transport.TimeToWaitBeforeTriggeringCircuitBreaker, subscriptionManager); + queuePeeker, queuePeekerOptions, schemaVerification, transport.TimeToWaitBeforeTriggeringCircuitBreaker, subscriptionManager, receiveSetting.PurgeOnStartup); }).ToDictionary(receiver => receiver.Id, receiver => receiver);