Skip to content

Commit

Permalink
Allow disabling delayed delivery
Browse files Browse the repository at this point in the history
  • Loading branch information
SzymonPobiega committed Nov 21, 2019
1 parent cb787c6 commit fe23ab5
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 18 deletions.
5 changes: 5 additions & 0 deletions src/NServiceBus.SqlServer/Configuration/SettingsKeys.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,10 @@ class SettingsKeys
/// For testing the migration process only
/// </summary>
public const string DisableNativePubSub = "SqlServer.DisableNativePubSub";

/// <summary>
/// For endpoints that only consume messages e.g. ServiceControl error
/// </summary>
public const string DisableDelayedDelivery = "SqlServer.DisableDelayedDelivery";
}
}
7 changes: 5 additions & 2 deletions src/NServiceBus.SqlServer/Receiving/QueueCreator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@ public async Task CreateQueueIfNecessary(QueueBindings queueBindings, string ide
{
await CreateQueue(SqlConstants.CreateQueueText, addressTranslator.Parse(sendingAddress), connection, transaction, createMessageBodyColumn).ConfigureAwait(false);
}

await CreateQueue(SqlConstants.CreateDelayedMessageStoreText, delayedQueueAddress, connection, transaction, createMessageBodyColumn).ConfigureAwait(false);

if (delayedQueueAddress != null)
{
await CreateQueue(SqlConstants.CreateDelayedMessageStoreText, delayedQueueAddress, connection, transaction, createMessageBodyColumn).ConfigureAwait(false);
}
transaction.Commit();
}
}
Expand Down
36 changes: 20 additions & 16 deletions src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs
Original file line number Diff line number Diff line change
Expand Up @@ -145,23 +145,27 @@ public override TransportReceiveInfrastructure ConfigureReceiveInfrastructure()
Func<string, TableBasedQueue> queueFactory = queueName => new TableBasedQueue(addressTranslator.Parse(queueName).QualifiedTableName, queueName);

//Create delayed delivery infrastructure
var delayedDeliverySettings = settings.GetOrDefault<DelayedDeliverySettings>();
settings.AddStartupDiagnosticsSection("NServiceBus.Transport.SqlServer.DelayedDelivery", new
CanonicalQueueAddress delayedQueueCanonicalAddress = null;
if (false == settings.GetOrDefault<bool>(SettingsKeys.DisableDelayedDelivery))
{
Native = true,
delayedDeliverySettings.Suffix,
delayedDeliverySettings.Interval,
BatchSize = delayedDeliverySettings.MatureBatchSize,
TimoutManager = delayedDeliverySettings.EnableMigrationMode ? "enabled" : "disabled"
});

var delayedQueueCanonicalAddress = GetDelayedTableAddress(delayedDeliverySettings);
var inputQueueTable = addressTranslator.Parse(ToTransportAddress(logicalAddress())).QualifiedTableName;
var delayedMessageTable = new DelayedMessageTable(delayedQueueCanonicalAddress.QualifiedTableName, inputQueueTable);

//Allows dispatcher to store messages in the delayed store
delayedMessageStore = delayedMessageTable;
dueDelayedMessageProcessor = new DueDelayedMessageProcessor(delayedMessageTable, connectionFactory, delayedDeliverySettings.Interval, delayedDeliverySettings.MatureBatchSize);
var delayedDeliverySettings = settings.GetOrDefault<DelayedDeliverySettings>();
settings.AddStartupDiagnosticsSection("NServiceBus.Transport.SqlServer.DelayedDelivery", new
{
Native = true,
delayedDeliverySettings.Suffix,
delayedDeliverySettings.Interval,
BatchSize = delayedDeliverySettings.MatureBatchSize,
TimoutManager = delayedDeliverySettings.EnableMigrationMode ? "enabled" : "disabled"
});

delayedQueueCanonicalAddress = GetDelayedTableAddress(delayedDeliverySettings);
var inputQueueTable = addressTranslator.Parse(ToTransportAddress(logicalAddress())).QualifiedTableName;
var delayedMessageTable = new DelayedMessageTable(delayedQueueCanonicalAddress.QualifiedTableName, inputQueueTable);

//Allows dispatcher to store messages in the delayed store
delayedMessageStore = delayedMessageTable;
dueDelayedMessageProcessor = new DueDelayedMessageProcessor(delayedMessageTable, connectionFactory, delayedDeliverySettings.Interval, delayedDeliverySettings.MatureBatchSize);
}

return new TransportReceiveInfrastructure(
() => new MessagePump(receiveStrategyFactory, queueFactory, queuePurger, expiredMessagesPurger, queuePeeker, schemaVerification, waitTimeCircuitBreaker),
Expand Down

0 comments on commit fe23ab5

Please sign in to comment.