From fe23ab5170aa5da24ae17615aaf4379071cb63c5 Mon Sep 17 00:00:00 2001 From: SzymonPobiega Date: Thu, 21 Nov 2019 12:38:25 +0100 Subject: [PATCH] Allow disabling delayed delivery --- .../Configuration/SettingsKeys.cs | 5 +++ .../Receiving/QueueCreator.cs | 7 ++-- .../SqlServerTransportInfrastructure.cs | 36 ++++++++++--------- 3 files changed, 30 insertions(+), 18 deletions(-) diff --git a/src/NServiceBus.SqlServer/Configuration/SettingsKeys.cs b/src/NServiceBus.SqlServer/Configuration/SettingsKeys.cs index d1db73726..7d69ca59f 100644 --- a/src/NServiceBus.SqlServer/Configuration/SettingsKeys.cs +++ b/src/NServiceBus.SqlServer/Configuration/SettingsKeys.cs @@ -23,5 +23,10 @@ class SettingsKeys /// For testing the migration process only /// public const string DisableNativePubSub = "SqlServer.DisableNativePubSub"; + + /// + /// For endpoints that only consume messages e.g. ServiceControl error + /// + public const string DisableDelayedDelivery = "SqlServer.DisableDelayedDelivery"; } } \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/Receiving/QueueCreator.cs b/src/NServiceBus.SqlServer/Receiving/QueueCreator.cs index 372fa62df..35d290e94 100644 --- a/src/NServiceBus.SqlServer/Receiving/QueueCreator.cs +++ b/src/NServiceBus.SqlServer/Receiving/QueueCreator.cs @@ -31,8 +31,11 @@ public async Task CreateQueueIfNecessary(QueueBindings queueBindings, string ide { await CreateQueue(SqlConstants.CreateQueueText, addressTranslator.Parse(sendingAddress), connection, transaction, createMessageBodyColumn).ConfigureAwait(false); } - - await CreateQueue(SqlConstants.CreateDelayedMessageStoreText, delayedQueueAddress, connection, transaction, createMessageBodyColumn).ConfigureAwait(false); + + if (delayedQueueAddress != null) + { + await CreateQueue(SqlConstants.CreateDelayedMessageStoreText, delayedQueueAddress, connection, transaction, createMessageBodyColumn).ConfigureAwait(false); + } transaction.Commit(); } } diff --git a/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs b/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs index bde762d9c..08e2f441c 100644 --- a/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs +++ b/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs @@ -145,23 +145,27 @@ public override TransportReceiveInfrastructure ConfigureReceiveInfrastructure() Func queueFactory = queueName => new TableBasedQueue(addressTranslator.Parse(queueName).QualifiedTableName, queueName); //Create delayed delivery infrastructure - var delayedDeliverySettings = settings.GetOrDefault(); - settings.AddStartupDiagnosticsSection("NServiceBus.Transport.SqlServer.DelayedDelivery", new + CanonicalQueueAddress delayedQueueCanonicalAddress = null; + if (false == settings.GetOrDefault(SettingsKeys.DisableDelayedDelivery)) { - Native = true, - delayedDeliverySettings.Suffix, - delayedDeliverySettings.Interval, - BatchSize = delayedDeliverySettings.MatureBatchSize, - TimoutManager = delayedDeliverySettings.EnableMigrationMode ? "enabled" : "disabled" - }); - - var delayedQueueCanonicalAddress = GetDelayedTableAddress(delayedDeliverySettings); - var inputQueueTable = addressTranslator.Parse(ToTransportAddress(logicalAddress())).QualifiedTableName; - var delayedMessageTable = new DelayedMessageTable(delayedQueueCanonicalAddress.QualifiedTableName, inputQueueTable); - - //Allows dispatcher to store messages in the delayed store - delayedMessageStore = delayedMessageTable; - dueDelayedMessageProcessor = new DueDelayedMessageProcessor(delayedMessageTable, connectionFactory, delayedDeliverySettings.Interval, delayedDeliverySettings.MatureBatchSize); + var delayedDeliverySettings = settings.GetOrDefault(); + settings.AddStartupDiagnosticsSection("NServiceBus.Transport.SqlServer.DelayedDelivery", new + { + Native = true, + delayedDeliverySettings.Suffix, + delayedDeliverySettings.Interval, + BatchSize = delayedDeliverySettings.MatureBatchSize, + TimoutManager = delayedDeliverySettings.EnableMigrationMode ? "enabled" : "disabled" + }); + + delayedQueueCanonicalAddress = GetDelayedTableAddress(delayedDeliverySettings); + var inputQueueTable = addressTranslator.Parse(ToTransportAddress(logicalAddress())).QualifiedTableName; + var delayedMessageTable = new DelayedMessageTable(delayedQueueCanonicalAddress.QualifiedTableName, inputQueueTable); + + //Allows dispatcher to store messages in the delayed store + delayedMessageStore = delayedMessageTable; + dueDelayedMessageProcessor = new DueDelayedMessageProcessor(delayedMessageTable, connectionFactory, delayedDeliverySettings.Interval, delayedDeliverySettings.MatureBatchSize); + } return new TransportReceiveInfrastructure( () => new MessagePump(receiveStrategyFactory, queueFactory, queuePurger, expiredMessagesPurger, queuePeeker, schemaVerification, waitTimeCircuitBreaker),