diff --git a/src/NServiceBus.SqlServer.AcceptanceTests/When_in_native_transaction_mode.cs b/src/NServiceBus.SqlServer.AcceptanceTests/When_in_native_transaction_mode.cs index bb13f8c94..eee05cf06 100644 --- a/src/NServiceBus.SqlServer.AcceptanceTests/When_in_native_transaction_mode.cs +++ b/src/NServiceBus.SqlServer.AcceptanceTests/When_in_native_transaction_mode.cs @@ -19,10 +19,11 @@ public void When_multi_db_via_configuration_it_fails_to_start() var context = new Context(); Scenario.Define(context) .WithEndpoint(b => b.CustomConfig(c => AddConnectionString("NServiceBus/Transport/OtherEndpoint", OtherDatabaseConnectionString))) + .AllowExceptions() .Done(c => true) .Run(); - Assert.IsTrue(context.Exceptions.Contains(ExceptionText)); + StringAssert.Contains(ExceptionText, context.Exceptions); } [Test] @@ -33,10 +34,11 @@ public void When_multi_db_via_code_it_fails_to_start() .WithEndpoint(b => b.CustomConfig(c => c.UseTransport().UseSpecificConnectionInformation( EndpointConnectionInfo.For("A").UseConnectionString(OtherDatabaseConnectionString) ))) + .AllowExceptions() .Done(c => true) .Run(); - Assert.IsTrue(context.Exceptions.Contains(ExceptionText)); + StringAssert.Contains(ExceptionText, context.Exceptions); } [Test] @@ -47,10 +49,11 @@ public void When_multi_db_via_callback_it_fails_to_start_even_when_not_using_oth .WithEndpoint(b => b.CustomConfig(c => c.UseTransport().UseSpecificConnectionInformation( e => ConnectionInfo.Create().UseSchema("nsb") ))) + .AllowExceptions() .Done(c => true) .Run(); - Assert.IsTrue(context.Exceptions.Contains(ExceptionText)); + StringAssert.Contains(ExceptionText, context.Exceptions); } [Test] diff --git a/src/NServiceBus.SqlServer.UnitTests/AdaptiveExecutorSimulator/RealSimulator.cs b/src/NServiceBus.SqlServer.UnitTests/AdaptiveExecutorSimulator/RealSimulator.cs index 0dbec0fe7..7897d5c4f 100644 --- a/src/NServiceBus.SqlServer.UnitTests/AdaptiveExecutorSimulator/RealSimulator.cs +++ b/src/NServiceBus.SqlServer.UnitTests/AdaptiveExecutorSimulator/RealSimulator.cs @@ -37,12 +37,20 @@ public RealSimulator(string address, string connectionString) taskStarted = transportNotifications.ReceiveTaskStarted.Subscribe(x => AddMessage("Thread started")); taskEnded = transportNotifications.ReceiveTaskStopped.Subscribe(x => AddMessage("Thread died")); + var purgeExpiredMessagesParams = new PurgeExpiredMessagesParams + { + PurgeTaskDelay = Timeout.InfiniteTimeSpan, + PurgeBatchSize = 1 + }; + dequeueStrategy = new SqlServerPollingDequeueStrategy(localConnectionParams, new ReceiveStrategyFactory(new DummyConnectionStore(), localConnectionParams, Address.Parse("error"), ConnectionFactory.Default()), new QueuePurger(new SecondaryReceiveConfiguration(_ => SecondaryReceiveSettings.Disabled()), localConnectionParams, ConnectionFactory.Default()), new SecondaryReceiveConfiguration(_ => SecondaryReceiveSettings.Disabled()), transportNotifications, - new RepeatedFailuresOverTimeCircuitBreaker("A", TimeSpan.FromDays(1000), _ => { })); + new RepeatedFailuresOverTimeCircuitBreaker("A", TimeSpan.FromDays(1000), _ => { }), + ConnectionFactory.Default(), + purgeExpiredMessagesParams); dequeueStrategy.Init(Address.Parse(address), new TransactionSettings(true, TimeSpan.FromMinutes(2), System.Transactions.IsolationLevel.ReadCommitted, 1, false, false), ProcessMessage, (message, exception) => { }); diff --git a/src/NServiceBus.SqlServer/Config/PurgingConfig.cs b/src/NServiceBus.SqlServer/Config/PurgingConfig.cs index 7e04ffa6b..db3177a48 100644 --- a/src/NServiceBus.SqlServer/Config/PurgingConfig.cs +++ b/src/NServiceBus.SqlServer/Config/PurgingConfig.cs @@ -1,9 +1,20 @@ namespace NServiceBus.Transports.SQLServer.Config { + using System; using NServiceBus.Features; + using NServiceBus.Settings; class PurgingConfig : ConfigBase { + const string PurgeTaskDelayKey = "SqlServer.PurgeTaskDelay"; + const string PurgeBatchSizeKey = "SqlServer.PurgeBatchSize"; + + public override void SetUpDefaults(SettingsHolder settings) + { + settings.SetDefault(PurgeTaskDelayKey, TimeSpan.FromMinutes(5)); + settings.SetDefault(PurgeBatchSizeKey, 10000); + } + public override void Configure(FeatureConfigurationContext context, string connectionStringWithSchema) { bool purgeOnStartup; @@ -15,6 +26,13 @@ public override void Configure(FeatureConfigurationContext context, string conne { context.Container.ConfigureComponent(DependencyLifecycle.SingleInstance); } + + var purgeParams = new PurgeExpiredMessagesParams + { + PurgeTaskDelay = context.Settings.Get(PurgeTaskDelayKey), + PurgeBatchSize = context.Settings.Get(PurgeBatchSizeKey) + }; + context.Container.ConfigureComponent(() => purgeParams, DependencyLifecycle.SingleInstance); } } } \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/ExpiredMessagesPurger.cs b/src/NServiceBus.SqlServer/ExpiredMessagesPurger.cs new file mode 100644 index 000000000..7ff8194a0 --- /dev/null +++ b/src/NServiceBus.SqlServer/ExpiredMessagesPurger.cs @@ -0,0 +1,108 @@ +namespace NServiceBus.Transports.SQLServer +{ + using System; + using System.Data.SqlClient; + using System.Threading; + using NServiceBus.Logging; + + class ExpiredMessagesPurger : IExecutor + { + static readonly ILog Logger = LogManager.GetLogger(typeof(ExpiredMessagesPurger)); + + readonly TableBasedQueue queue; + readonly Func openConnection; + readonly PurgeExpiredMessagesParams parameters; + + Timer purgeTaskTimer; + CancellationToken token; + + public ExpiredMessagesPurger(TableBasedQueue queue, Func openConnection, PurgeExpiredMessagesParams parameters) + { + this.queue = queue; + this.openConnection = openConnection; + this.parameters = parameters; + } + + public void Start(int maximumConcurrency, CancellationToken token) + { + LogWarningWhenIndexIsMissing(); + + this.token = token; + purgeTaskTimer = new Timer(PurgeExpiredMessagesCallback, null, TimeSpan.Zero, Timeout.InfiniteTimeSpan); + } + + public void Stop() + { + using (var waitHandle = new ManualResetEvent(false)) + { + purgeTaskTimer.Dispose(waitHandle); + waitHandle.WaitOne(); + } + } + + void LogWarningWhenIndexIsMissing() + { + try + { + using (var connection = openConnection()) + { + queue.LogWarningWhenIndexIsMissing(connection); + } + } + catch (Exception ex) + { + Logger.WarnFormat("Checking indexes on table {0} failed. Exception: {1}", queue, ex); + } + } + + void PurgeExpiredMessagesCallback(object state) + { + Logger.DebugFormat("Starting a new expired message purge task for table {0}.", queue); + + if (token.IsCancellationRequested) + { + return; + } + + PurgeExpiredMessages(); + + if (token.IsCancellationRequested) + { + return; + } + + Logger.DebugFormat("Scheduling next expired message purge task for table {0} in {1}", queue, parameters.PurgeTaskDelay); + purgeTaskTimer.Change(parameters.PurgeTaskDelay, Timeout.InfiniteTimeSpan); + } + + void PurgeExpiredMessages() + { + int totalPurgedRowsCount = 0; + + try + { + using (var connection = openConnection()) + { + var continuePurging = true; + + while (continuePurging && !token.IsCancellationRequested) + { + var purgedRowsCount = queue.PurgeBatchOfExpiredMessages(connection, parameters.PurgeBatchSize); + + totalPurgedRowsCount += purgedRowsCount; + continuePurging = (purgedRowsCount == parameters.PurgeBatchSize); + } + } + + if (totalPurgedRowsCount > 0) + { + Logger.InfoFormat("{0} expired messages were successfully purged from table {1}", totalPurgedRowsCount, queue); + } + } + catch (Exception ex) + { + Logger.WarnFormat("Purging expired messages from table {0} failed after purging {1} messages. Exception: {2}", queue, totalPurgedRowsCount, ex); + } + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj b/src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj index cbca3c838..80c80daba 100644 --- a/src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj +++ b/src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj @@ -70,6 +70,7 @@ + @@ -104,6 +105,7 @@ + @@ -145,4 +147,4 @@ - + \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/PurgeExpiredMessagesParams.cs b/src/NServiceBus.SqlServer/PurgeExpiredMessagesParams.cs new file mode 100644 index 000000000..501e52641 --- /dev/null +++ b/src/NServiceBus.SqlServer/PurgeExpiredMessagesParams.cs @@ -0,0 +1,10 @@ +namespace NServiceBus.Transports.SQLServer +{ + using System; + + class PurgeExpiredMessagesParams + { + public TimeSpan PurgeTaskDelay { get; set; } + public int PurgeBatchSize { get; set; } + } +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/SqlServerPollingDequeueStrategy.cs b/src/NServiceBus.SqlServer/SqlServerPollingDequeueStrategy.cs index 25260b8d5..2ceb18a64 100644 --- a/src/NServiceBus.SqlServer/SqlServerPollingDequeueStrategy.cs +++ b/src/NServiceBus.SqlServer/SqlServerPollingDequeueStrategy.cs @@ -18,7 +18,9 @@ public SqlServerPollingDequeueStrategy( IQueuePurger queuePurger, SecondaryReceiveConfiguration secondaryReceiveConfiguration, TransportNotifications transportNotifications, - RepeatedFailuresOverTimeCircuitBreaker circuitBreaker) + RepeatedFailuresOverTimeCircuitBreaker circuitBreaker, + ConnectionFactory connectionFactory, + PurgeExpiredMessagesParams purgeExpiredMessagesParams) { this.locaConnectionParams = locaConnectionParams; this.receiveStrategyFactory = receiveStrategyFactory; @@ -26,6 +28,8 @@ public SqlServerPollingDequeueStrategy( this.secondaryReceiveConfiguration = secondaryReceiveConfiguration; this.transportNotifications = transportNotifications; this.circuitBreaker = circuitBreaker; + this.connectionFactory = connectionFactory; + this.purgeExpiredMessagesParams = purgeExpiredMessagesParams; } /// @@ -48,7 +52,8 @@ public void Init(Address primaryAddress, TransactionSettings transactionSettings secondaryReceiveSettings = secondaryReceiveConfiguration.GetSettings(primaryAddress.Queue); var receiveStrategy = receiveStrategyFactory.Create(transactionSettings, tryProcessMessage); - primaryReceiver = new AdaptivePollingReceiver(receiveStrategy, new TableBasedQueue(primaryAddress, locaConnectionParams.Schema), endProcessMessage, circuitBreaker, transportNotifications); + var primaryQueue = new TableBasedQueue(primaryAddress, locaConnectionParams.Schema); + primaryReceiver = new AdaptivePollingReceiver(receiveStrategy, primaryQueue, endProcessMessage, circuitBreaker, transportNotifications); if (secondaryReceiveSettings.IsEnabled) { @@ -59,6 +64,8 @@ public void Init(Address primaryAddress, TransactionSettings transactionSettings { secondaryReceiver = new NullExecutor(); } + + expiredMessagesPurger = new ExpiredMessagesPurger(primaryQueue, () => connectionFactory.OpenNewConnection(locaConnectionParams.ConnectionString), purgeExpiredMessagesParams); } /// @@ -73,6 +80,7 @@ public void Start(int maximumConcurrencyLevel) primaryReceiver.Start(maximumConcurrencyLevel, tokenSource.Token); secondaryReceiver.Start(SecondaryReceiveSettings.MaximumConcurrencyLevel, tokenSource.Token); + expiredMessagesPurger.Start(maximumConcurrencyLevel, tokenSource.Token); } /// @@ -89,6 +97,7 @@ public void Stop() primaryReceiver.Stop(); secondaryReceiver.Stop(); + expiredMessagesPurger.Stop(); tokenSource.Dispose(); } @@ -112,10 +121,13 @@ SecondaryReceiveSettings SecondaryReceiveSettings IExecutor primaryReceiver; IExecutor secondaryReceiver; + IExecutor expiredMessagesPurger; RepeatedFailuresOverTimeCircuitBreaker circuitBreaker; readonly LocalConnectionParams locaConnectionParams; readonly ReceiveStrategyFactory receiveStrategyFactory; readonly IQueuePurger queuePurger; + readonly ConnectionFactory connectionFactory; + readonly PurgeExpiredMessagesParams purgeExpiredMessagesParams; readonly SecondaryReceiveConfiguration secondaryReceiveConfiguration; [SkipWeaving] //Do not dispose with dequeue strategy diff --git a/src/NServiceBus.SqlServer/SqlServerQueueCreator.cs b/src/NServiceBus.SqlServer/SqlServerQueueCreator.cs index 150c37575..13612dba5 100644 --- a/src/NServiceBus.SqlServer/SqlServerQueueCreator.cs +++ b/src/NServiceBus.SqlServer/SqlServerQueueCreator.cs @@ -24,6 +24,17 @@ CREATE CLUSTERED INDEX [Index_RowVersion] ON [{0}].[{1}] [RowVersion] ASC )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, SORT_IN_TEMPDB = OFF, IGNORE_DUP_KEY = OFF, DROP_EXISTING = OFF, ONLINE = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY] + CREATE NONCLUSTERED INDEX [Index_Expires] ON [{0}].[{1}] + ( + [Expires] ASC + ) + INCLUDE + ( + [Id], + [RowVersion] + ) + WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, SORT_IN_TEMPDB = OFF, DROP_EXISTING = OFF, ONLINE = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) + END"; public void CreateQueueIfNecessary(Address address, string account) diff --git a/src/NServiceBus.SqlServer/TableBasedQueue.cs b/src/NServiceBus.SqlServer/TableBasedQueue.cs index 0d7be72a0..620607c24 100644 --- a/src/NServiceBus.SqlServer/TableBasedQueue.cs +++ b/src/NServiceBus.SqlServer/TableBasedQueue.cs @@ -78,7 +78,7 @@ static object[] ExtractTransportMessageData(TransportMessage message, SendOption } else { - data[TimeToBeReceivedColumn] = DateTime.UtcNow.Add(message.TimeToBeReceived); + data[TimeToBeReceivedColumn] = message.TimeToBeReceived.TotalMilliseconds; } data[HeadersColumn] = HeaderSerializer.SerializeObject(message.Headers); if (message.Body == null) @@ -129,14 +129,14 @@ MessageReadResult ExecuteReader(SqlCommand command) { var id = rowData[0].ToString(); - DateTime? expireDateTime = null; + int? millisecondsToExpiry = null; if (rowData[TimeToBeReceivedColumn] != DBNull.Value) { - expireDateTime = (DateTime)rowData[TimeToBeReceivedColumn]; + millisecondsToExpiry = (int)rowData[TimeToBeReceivedColumn]; } //Has message expired? - if (expireDateTime.HasValue && expireDateTime.Value < DateTime.UtcNow) + if (millisecondsToExpiry.HasValue && millisecondsToExpiry.Value < 0L) { Logger.InfoFormat("Message with ID={0} has expired. Removing it from queue.", id); return MessageReadResult.NoMessage; @@ -161,9 +161,9 @@ MessageReadResult ExecuteReader(SqlCommand command) message.Headers[Headers.ReplyToAddress] = replyToAddress; } - if (expireDateTime.HasValue) + if (millisecondsToExpiry.HasValue) { - message.TimeToBeReceived = TimeSpan.FromTicks(expireDateTime.Value.Ticks - DateTime.UtcNow.Ticks); + message.TimeToBeReceived = TimeSpan.FromMilliseconds(millisecondsToExpiry.Value); } return MessageReadResult.Success(message); @@ -189,6 +189,32 @@ static T GetNullableValue(object value) return (T)value; } + public int PurgeBatchOfExpiredMessages(SqlConnection connection, int purgeBatchSize) + { + var commandText = string.Format(SqlPurgeBatchOfExpiredMessages, purgeBatchSize, this.schema, this.tableName); + + using (var command = new SqlCommand(commandText, connection)) + { + return command.ExecuteNonQuery(); + } + } + + public void LogWarningWhenIndexIsMissing(SqlConnection connection) + { + var commandText = string.Format(SqlCheckIfExpiresIndexIsPresent, ExpiresIndexName, this.schema, this.tableName); + + using (var command = new SqlCommand(commandText, connection)) + { + var rowsCount = (int) command.ExecuteScalar(); + + if (rowsCount == 0) + { + Logger.Warn($@"Table [{schema}].[{tableName}] does not contain index '{ExpiresIndexName}'. +Adding this index will speed up the process of purging expired messages from the queue. Please consult the documentation for further information."); + } + } + } + public override string ToString() { return tableName; @@ -200,7 +226,7 @@ public override string ToString() readonly string schema; static readonly JsonMessageSerializer HeaderSerializer = new JsonMessageSerializer(null); - static readonly string[] Parameters = { "Id", "CorrelationId", "ReplyToAddress", "Recoverable", "Expires", "Headers", "Body" }; + static readonly string[] Parameters = { "Id", "CorrelationId", "ReplyToAddress", "Recoverable", "TimeToBeReceivedMs", "Headers", "Body" }; static readonly SqlDbType[] ParameterTypes = { @@ -208,22 +234,28 @@ public override string ToString() SqlDbType.VarChar, SqlDbType.VarChar, SqlDbType.Bit, - SqlDbType.DateTime, + SqlDbType.Int, SqlDbType.VarChar, SqlDbType.VarBinary }; - - const string SqlSend = @"INSERT INTO [{0}].[{1}] ([Id],[CorrelationId],[ReplyToAddress],[Recoverable],[Expires],[Headers],[Body]) - VALUES (@Id,@CorrelationId,@ReplyToAddress,@Recoverable,@Expires,@Headers,@Body)"; + VALUES (@Id,@CorrelationId,@ReplyToAddress,@Recoverable,IIF(@TimeToBeReceivedMs IS NOT NULL, DATEADD(ms, @TimeToBeReceivedMs, GETUTCDATE()), NULL),@Headers,@Body)"; const string SqlReceive = @"WITH message AS (SELECT TOP(1) * FROM [{0}].[{1}] WITH (UPDLOCK, READPAST, ROWLOCK) ORDER BY [RowVersion] ASC) DELETE FROM message OUTPUT deleted.Id, deleted.CorrelationId, deleted.ReplyToAddress, - deleted.Recoverable, deleted.Expires, deleted.Headers, deleted.Body;"; + deleted.Recoverable, IIF(deleted.Expires IS NOT NULL, DATEDIFF(ms, GETUTCDATE(), deleted.Expires), NULL), deleted.Headers, deleted.Body;"; + + const string SqlPurgeBatchOfExpiredMessages = + @"DELETE FROM [{1}].[{2}] WHERE [Id] IN (SELECT TOP ({0}) [Id] FROM [{1}].[{2}] WITH (UPDLOCK, READPAST, ROWLOCK) WHERE [Expires] < GETUTCDATE() ORDER BY [RowVersion])"; + + const string SqlCheckIfExpiresIndexIsPresent = + @"SELECT COUNT(*) FROM [sys].[indexes] WHERE [name] = '{0}' AND [object_id] = OBJECT_ID('[{1}].[{2}]')"; + + const string ExpiresIndexName = "Index_Expires"; const int IdColumn = 0; const int CorrelationIdColumn = 1;