From 8e1afc67f6cf7cb99721918e61fefa54b7c97218 Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Mon, 21 Sep 2015 13:11:47 +0200 Subject: [PATCH 01/12] Fix for memory leak (issue #90) that seems to occur with .net framework prior to v4.6 caused by using multiple `ContinueWith` continuations and a `CancellationToken`. Issue details here: https://stackoverflow.com/questions/30029462/net-tpl-cancellationtoken-memory-leak --- src/NServiceBus.SqlServer/AdaptiveExecutor.cs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/NServiceBus.SqlServer/AdaptiveExecutor.cs b/src/NServiceBus.SqlServer/AdaptiveExecutor.cs index 45edc4b9e..1f68d3955 100644 --- a/src/NServiceBus.SqlServer/AdaptiveExecutor.cs +++ b/src/NServiceBus.SqlServer/AdaptiveExecutor.cs @@ -44,18 +44,18 @@ void StartTask() var taskId = Guid.NewGuid(); var receiveTask = Task.Factory .StartNew(ReceiveLoop, null, token, TaskCreationOptions.LongRunning, TaskScheduler.Default) - .ContinueWith(t => + .ContinueWith((t, s) => { - t.Exception.Handle(ex => + if (t.IsFaulted) { - HandleException(ex); - circuitBreaker.Failure(ex); - return true; - }); + t.Exception.Handle(ex => + { + HandleException(ex); + circuitBreaker.Failure(ex); + return true; + }); + } - }, token, TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default) - .ContinueWith((_, s) => - { taskTracker.Forget((Guid)s); if (!taskTracker.ShouldStartAnotherTaskImmediately) From 464a56bc8e9c4e4336a6cf2fe92ffba0d34d074a Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Tue, 22 Sep 2015 14:03:39 +0200 Subject: [PATCH 02/12] Updated Fody package v1.24.0 => 1.29.3 as this is required by the build server. --- src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj | 8 ++++---- src/NServiceBus.SqlServer/packages.config | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj b/src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj index 1029f20cd..f0d713873 100644 --- a/src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj +++ b/src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj @@ -15,7 +15,7 @@ $(SolutionDir)NServiceBus.snk ..\ - 3ead7d6a + 6113ccb8 true @@ -130,15 +130,15 @@ - + This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}. - + - \ No newline at end of file + diff --git a/src/NServiceBus.SqlServer/packages.config b/src/NServiceBus.SqlServer/packages.config index acae12b57..8da7c6068 100644 --- a/src/NServiceBus.SqlServer/packages.config +++ b/src/NServiceBus.SqlServer/packages.config @@ -1,6 +1,6 @@  - + From 1650a79bba8630882ab7379fcac268f8a713926b Mon Sep 17 00:00:00 2001 From: SzymonPobiega Date: Thu, 26 Nov 2015 10:49:09 +0100 Subject: [PATCH 03/12] Fixed deadlock condition during shutdown after circuit breaker failure --- .../When_scaling_out_senders_that_uses_callbacks.cs | 10 +++++----- src/NServiceBus.SqlServer/AdaptiveExecutor.cs | 4 ++-- src/nuget.config | 9 ++++----- 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/src/NServiceBus.SqlServer.AcceptanceTests/When_scaling_out_senders_that_uses_callbacks.cs b/src/NServiceBus.SqlServer.AcceptanceTests/When_scaling_out_senders_that_uses_callbacks.cs index 124c26160..3c607d512 100644 --- a/src/NServiceBus.SqlServer.AcceptanceTests/When_scaling_out_senders_that_uses_callbacks.cs +++ b/src/NServiceBus.SqlServer.AcceptanceTests/When_scaling_out_senders_that_uses_callbacks.cs @@ -9,7 +9,7 @@ public class When_scaling_out_senders_that_uses_callbacks { - const int numMessagesToSend = 1; + const int numMessagesToSend = 5; [Test] public void Should_only_deliver_response_to_one_of_the_instances() @@ -29,9 +29,9 @@ public void Should_only_deliver_response_to_one_of_the_instances() { ReturnCode = 1 }) - .Register(m => + .Register(r => { - if (m.ErrorCode != 1) + if (r != 1) { throw new Exception("Wrong server got the response"); } @@ -51,9 +51,9 @@ public void Should_only_deliver_response_to_one_of_the_instances() { ReturnCode = 2 }) - .Register(m => + .Register(r => { - if (m.ErrorCode != 2) + if (r != 2) { throw new Exception("Wrong server got the response"); } diff --git a/src/NServiceBus.SqlServer/AdaptiveExecutor.cs b/src/NServiceBus.SqlServer/AdaptiveExecutor.cs index 9f667e392..cdb3ca7e1 100644 --- a/src/NServiceBus.SqlServer/AdaptiveExecutor.cs +++ b/src/NServiceBus.SqlServer/AdaptiveExecutor.cs @@ -69,6 +69,8 @@ void StartTask() .StartNew(ReceiveLoop, state, token, TaskCreationOptions.LongRunning, TaskScheduler.Default) .ContinueWith((t, s) => { + taskTracker.Forget((Guid)s); + if (t.IsFaulted) { t.Exception.Handle(ex => @@ -79,8 +81,6 @@ void StartTask() }); } - taskTracker.Forget((Guid)s); - if (!taskTracker.ShouldStartAnotherTaskImmediately) { return; diff --git a/src/nuget.config b/src/nuget.config index 9f16f776e..2f07197db 100644 --- a/src/nuget.config +++ b/src/nuget.config @@ -5,11 +5,6 @@ - - - - - @@ -17,4 +12,8 @@ + + + + \ No newline at end of file From 03786f2ec7322a104d735cfe47ae8b98e9042fe2 Mon Sep 17 00:00:00 2001 From: Marcin Hoppe Date: Fri, 19 Feb 2016 17:19:39 +0100 Subject: [PATCH 04/12] Periodically purge expired messages from the queue --- .../When_in_native_transaction_mode.cs | 9 +- .../RealSimulator.cs | 3 +- .../ExpiredMessagesPurger.cs | 120 ++++++++++++++++++ .../NServiceBus.SqlServer.csproj | 3 +- .../SqlServerPollingDequeueStrategy.cs | 13 +- .../SqlServerQueueCreator.cs | 5 + src/NServiceBus.SqlServer/TableBasedQueue.cs | 40 +++++- 7 files changed, 184 insertions(+), 9 deletions(-) create mode 100644 src/NServiceBus.SqlServer/ExpiredMessagesPurger.cs diff --git a/src/NServiceBus.SqlServer.AcceptanceTests/When_in_native_transaction_mode.cs b/src/NServiceBus.SqlServer.AcceptanceTests/When_in_native_transaction_mode.cs index bb13f8c94..eee05cf06 100644 --- a/src/NServiceBus.SqlServer.AcceptanceTests/When_in_native_transaction_mode.cs +++ b/src/NServiceBus.SqlServer.AcceptanceTests/When_in_native_transaction_mode.cs @@ -19,10 +19,11 @@ public void When_multi_db_via_configuration_it_fails_to_start() var context = new Context(); Scenario.Define(context) .WithEndpoint(b => b.CustomConfig(c => AddConnectionString("NServiceBus/Transport/OtherEndpoint", OtherDatabaseConnectionString))) + .AllowExceptions() .Done(c => true) .Run(); - Assert.IsTrue(context.Exceptions.Contains(ExceptionText)); + StringAssert.Contains(ExceptionText, context.Exceptions); } [Test] @@ -33,10 +34,11 @@ public void When_multi_db_via_code_it_fails_to_start() .WithEndpoint(b => b.CustomConfig(c => c.UseTransport().UseSpecificConnectionInformation( EndpointConnectionInfo.For("A").UseConnectionString(OtherDatabaseConnectionString) ))) + .AllowExceptions() .Done(c => true) .Run(); - Assert.IsTrue(context.Exceptions.Contains(ExceptionText)); + StringAssert.Contains(ExceptionText, context.Exceptions); } [Test] @@ -47,10 +49,11 @@ public void When_multi_db_via_callback_it_fails_to_start_even_when_not_using_oth .WithEndpoint(b => b.CustomConfig(c => c.UseTransport().UseSpecificConnectionInformation( e => ConnectionInfo.Create().UseSchema("nsb") ))) + .AllowExceptions() .Done(c => true) .Run(); - Assert.IsTrue(context.Exceptions.Contains(ExceptionText)); + StringAssert.Contains(ExceptionText, context.Exceptions); } [Test] diff --git a/src/NServiceBus.SqlServer.UnitTests/AdaptiveExecutorSimulator/RealSimulator.cs b/src/NServiceBus.SqlServer.UnitTests/AdaptiveExecutorSimulator/RealSimulator.cs index 0dbec0fe7..a8f04e377 100644 --- a/src/NServiceBus.SqlServer.UnitTests/AdaptiveExecutorSimulator/RealSimulator.cs +++ b/src/NServiceBus.SqlServer.UnitTests/AdaptiveExecutorSimulator/RealSimulator.cs @@ -42,7 +42,8 @@ public RealSimulator(string address, string connectionString) new QueuePurger(new SecondaryReceiveConfiguration(_ => SecondaryReceiveSettings.Disabled()), localConnectionParams, ConnectionFactory.Default()), new SecondaryReceiveConfiguration(_ => SecondaryReceiveSettings.Disabled()), transportNotifications, - new RepeatedFailuresOverTimeCircuitBreaker("A", TimeSpan.FromDays(1000), _ => { })); + new RepeatedFailuresOverTimeCircuitBreaker("A", TimeSpan.FromDays(1000), _ => { }), + ConnectionFactory.Default()); dequeueStrategy.Init(Address.Parse(address), new TransactionSettings(true, TimeSpan.FromMinutes(2), System.Transactions.IsolationLevel.ReadCommitted, 1, false, false), ProcessMessage, (message, exception) => { }); diff --git a/src/NServiceBus.SqlServer/ExpiredMessagesPurger.cs b/src/NServiceBus.SqlServer/ExpiredMessagesPurger.cs new file mode 100644 index 000000000..a5bb09538 --- /dev/null +++ b/src/NServiceBus.SqlServer/ExpiredMessagesPurger.cs @@ -0,0 +1,120 @@ +namespace NServiceBus.Transports.SQLServer +{ + using System; + using System.Data.SqlClient; + using System.Threading; + using NServiceBus.Logging; + + class ExpiredMessagesPurger : IExecutor + { + static readonly TimeSpan purgeTaskDelay = TimeSpan.FromMinutes(5); + + static readonly ILog Logger = LogManager.GetLogger(typeof(ExpiredMessagesPurger)); + + readonly TableBasedQueue queue; + readonly Func openConnection; + + Timer purgeTaskTimer; + CancellationToken token; + + object lockObject = new object(); + + public ExpiredMessagesPurger(TableBasedQueue queue, Func openConnection) + { + this.queue = queue; + this.openConnection = openConnection; + } + + public void Start(int maximumConcurrency, CancellationToken token) + { + LogWarningWhenIndexIsMissing(); + + this.token = token; + purgeTaskTimer = new Timer(PurgeExpiredMessagesCallback, null, TimeSpan.Zero, purgeTaskDelay); + } + + public void Stop() + { + using (var waitHandle = new ManualResetEvent(false)) + { + purgeTaskTimer.Dispose(waitHandle); + waitHandle.WaitOne(); + } + } + + void LogWarningWhenIndexIsMissing() + { + try + { + using (var connection = openConnection()) + { + queue.LogWarningWhenIndexIsMissing(connection); + } + } + catch (Exception ex) + { + Logger.WarnFormat("Checking indexes on table {0} failed. Exception: {1}", queue, ex); + } + } + + void PurgeExpiredMessagesCallback(object state) + { + Logger.DebugFormat("Starting a new expired message purge task for table {0}.", queue); + + if (token.IsCancellationRequested) + { + return; + } + + var lockTaken = false; + try + { + Monitor.TryEnter(lockObject, ref lockTaken); + if (!lockTaken) + { + Logger.DebugFormat("An expired message purge task for table {0} is already running. Nothing to do.", queue); + return; + } + + PurgeExpiredMessages(); + } + finally + { + if (lockTaken) + { + Monitor.Exit(lockObject); + } + } + } + + void PurgeExpiredMessages() + { + int totalPurgedRowsCount = 0; + + try + { + using (var connection = openConnection()) + { + var continuePurging = true; + + while (continuePurging && !token.IsCancellationRequested) + { + var purgedRowsCount = queue.PurgeBatchOfExpiredMessages(connection); + + totalPurgedRowsCount += purgedRowsCount; + continuePurging = (purgedRowsCount == TableBasedQueue.PurgeBatchSize); + } + } + + if (totalPurgedRowsCount > 0) + { + Logger.InfoFormat("{0} expired messages were successfully purged from table {1}", totalPurgedRowsCount, queue); + } + } + catch (Exception ex) + { + Logger.WarnFormat("Purging expired messages from table {0} failed after purging {1} messages. Exception: {2}", queue, totalPurgedRowsCount, ex); + } + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj b/src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj index cbca3c838..ad61018cf 100644 --- a/src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj +++ b/src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj @@ -70,6 +70,7 @@ + @@ -145,4 +146,4 @@ - + \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/SqlServerPollingDequeueStrategy.cs b/src/NServiceBus.SqlServer/SqlServerPollingDequeueStrategy.cs index 25260b8d5..4fe2f2ea2 100644 --- a/src/NServiceBus.SqlServer/SqlServerPollingDequeueStrategy.cs +++ b/src/NServiceBus.SqlServer/SqlServerPollingDequeueStrategy.cs @@ -18,7 +18,8 @@ public SqlServerPollingDequeueStrategy( IQueuePurger queuePurger, SecondaryReceiveConfiguration secondaryReceiveConfiguration, TransportNotifications transportNotifications, - RepeatedFailuresOverTimeCircuitBreaker circuitBreaker) + RepeatedFailuresOverTimeCircuitBreaker circuitBreaker, + ConnectionFactory connectionFactory) { this.locaConnectionParams = locaConnectionParams; this.receiveStrategyFactory = receiveStrategyFactory; @@ -26,6 +27,7 @@ public SqlServerPollingDequeueStrategy( this.secondaryReceiveConfiguration = secondaryReceiveConfiguration; this.transportNotifications = transportNotifications; this.circuitBreaker = circuitBreaker; + this.connectionFactory = connectionFactory; } /// @@ -48,7 +50,8 @@ public void Init(Address primaryAddress, TransactionSettings transactionSettings secondaryReceiveSettings = secondaryReceiveConfiguration.GetSettings(primaryAddress.Queue); var receiveStrategy = receiveStrategyFactory.Create(transactionSettings, tryProcessMessage); - primaryReceiver = new AdaptivePollingReceiver(receiveStrategy, new TableBasedQueue(primaryAddress, locaConnectionParams.Schema), endProcessMessage, circuitBreaker, transportNotifications); + var primaryQueue = new TableBasedQueue(primaryAddress, locaConnectionParams.Schema); + primaryReceiver = new AdaptivePollingReceiver(receiveStrategy, primaryQueue, endProcessMessage, circuitBreaker, transportNotifications); if (secondaryReceiveSettings.IsEnabled) { @@ -59,6 +62,8 @@ public void Init(Address primaryAddress, TransactionSettings transactionSettings { secondaryReceiver = new NullExecutor(); } + + expiredMessagesPurger = new ExpiredMessagesPurger(primaryQueue, () => connectionFactory.OpenNewConnection(locaConnectionParams.ConnectionString)); } /// @@ -73,6 +78,7 @@ public void Start(int maximumConcurrencyLevel) primaryReceiver.Start(maximumConcurrencyLevel, tokenSource.Token); secondaryReceiver.Start(SecondaryReceiveSettings.MaximumConcurrencyLevel, tokenSource.Token); + expiredMessagesPurger.Start(maximumConcurrencyLevel, tokenSource.Token); } /// @@ -89,6 +95,7 @@ public void Stop() primaryReceiver.Stop(); secondaryReceiver.Stop(); + expiredMessagesPurger.Stop(); tokenSource.Dispose(); } @@ -112,10 +119,12 @@ SecondaryReceiveSettings SecondaryReceiveSettings IExecutor primaryReceiver; IExecutor secondaryReceiver; + IExecutor expiredMessagesPurger; RepeatedFailuresOverTimeCircuitBreaker circuitBreaker; readonly LocalConnectionParams locaConnectionParams; readonly ReceiveStrategyFactory receiveStrategyFactory; readonly IQueuePurger queuePurger; + readonly ConnectionFactory connectionFactory; readonly SecondaryReceiveConfiguration secondaryReceiveConfiguration; [SkipWeaving] //Do not dispose with dequeue strategy diff --git a/src/NServiceBus.SqlServer/SqlServerQueueCreator.cs b/src/NServiceBus.SqlServer/SqlServerQueueCreator.cs index 150c37575..b3e64024f 100644 --- a/src/NServiceBus.SqlServer/SqlServerQueueCreator.cs +++ b/src/NServiceBus.SqlServer/SqlServerQueueCreator.cs @@ -24,6 +24,11 @@ CREATE CLUSTERED INDEX [Index_RowVersion] ON [{0}].[{1}] [RowVersion] ASC )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, SORT_IN_TEMPDB = OFF, IGNORE_DUP_KEY = OFF, DROP_EXISTING = OFF, ONLINE = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY] + CREATE NONCLUSTERED INDEX [Index_Expires] ON [{0}].[{1}] + ( + [Expires] ASC + )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, SORT_IN_TEMPDB = OFF, DROP_EXISTING = OFF, ONLINE = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) + END"; public void CreateQueueIfNecessary(Address address, string account) diff --git a/src/NServiceBus.SqlServer/TableBasedQueue.cs b/src/NServiceBus.SqlServer/TableBasedQueue.cs index 0d7be72a0..8b1e0211f 100644 --- a/src/NServiceBus.SqlServer/TableBasedQueue.cs +++ b/src/NServiceBus.SqlServer/TableBasedQueue.cs @@ -189,6 +189,34 @@ static T GetNullableValue(object value) return (T)value; } + public int PurgeBatchOfExpiredMessages(SqlConnection connection) + { + var commandText = string.Format(SqlPurgeBatchOfExpiredMessages, PurgeBatchSize, this.schema, this.tableName); + + using (var command = new SqlCommand(commandText, connection)) + { + command.Parameters.Add("UTCNow", SqlDbType.DateTime).Value = DateTime.UtcNow; + + return command.ExecuteNonQuery(); + } + } + + public void LogWarningWhenIndexIsMissing(SqlConnection connection) + { + var commandText = string.Format(SqlCheckIfExpiresIndexIsPresent, ExpiresIndexName, this.schema, this.tableName); + + using (var command = new SqlCommand(commandText, connection)) + { + var rowsCount = (int) command.ExecuteScalar(); + + if (rowsCount == 0) + { + Logger.Warn($@"Table [{schema}].[{tableName}] does not contain index '{ExpiresIndexName}'. +Adding this index will speed up the process of purging expired messages from the queue. Please consult the documentation for further information."); + } + } + } + public override string ToString() { return tableName; @@ -213,8 +241,6 @@ public override string ToString() SqlDbType.VarBinary }; - - const string SqlSend = @"INSERT INTO [{0}].[{1}] ([Id],[CorrelationId],[ReplyToAddress],[Recoverable],[Expires],[Headers],[Body]) VALUES (@Id,@CorrelationId,@ReplyToAddress,@Recoverable,@Expires,@Headers,@Body)"; @@ -225,6 +251,16 @@ DELETE FROM message OUTPUT deleted.Id, deleted.CorrelationId, deleted.ReplyToAddress, deleted.Recoverable, deleted.Expires, deleted.Headers, deleted.Body;"; + const string SqlPurgeBatchOfExpiredMessages = + @"DELETE TOP({0}) FROM [{1}].[{2}] WITH (UPDLOCK, READPAST, ROWLOCK) WHERE [Expires] < @UTCNow"; + + const string SqlCheckIfExpiresIndexIsPresent = + @"SELECT COUNT(*) FROM [sys].[indexes] WHERE [name] = '{0}' AND [object_id] = OBJECT_ID('[{1}].[{2}]')"; + + public const int PurgeBatchSize = 10000; + + const string ExpiresIndexName = "Index_Expires"; + const int IdColumn = 0; const int CorrelationIdColumn = 1; const int ReplyToAddressColumn = 2; From b511a6634b370102824ca6649e2ae7f5365e4bd2 Mon Sep 17 00:00:00 2001 From: Marcin Hoppe Date: Thu, 25 Feb 2016 15:53:48 +0100 Subject: [PATCH 05/12] Make purge task delay and purge batch size configurable via settings Refactor scheduling of the purge task --- .../RealSimulator.cs | 9 ++++- .../Config/PurgingConfig.cs | 18 +++++++++ .../ExpiredMessagesPurger.cs | 38 +++++++------------ .../NServiceBus.SqlServer.csproj | 1 + .../PurgeExpiredMessagesParams.cs | 10 +++++ .../SqlServerPollingDequeueStrategy.cs | 7 +++- src/NServiceBus.SqlServer/TableBasedQueue.cs | 6 +-- 7 files changed, 57 insertions(+), 32 deletions(-) create mode 100644 src/NServiceBus.SqlServer/PurgeExpiredMessagesParams.cs diff --git a/src/NServiceBus.SqlServer.UnitTests/AdaptiveExecutorSimulator/RealSimulator.cs b/src/NServiceBus.SqlServer.UnitTests/AdaptiveExecutorSimulator/RealSimulator.cs index a8f04e377..7897d5c4f 100644 --- a/src/NServiceBus.SqlServer.UnitTests/AdaptiveExecutorSimulator/RealSimulator.cs +++ b/src/NServiceBus.SqlServer.UnitTests/AdaptiveExecutorSimulator/RealSimulator.cs @@ -37,13 +37,20 @@ public RealSimulator(string address, string connectionString) taskStarted = transportNotifications.ReceiveTaskStarted.Subscribe(x => AddMessage("Thread started")); taskEnded = transportNotifications.ReceiveTaskStopped.Subscribe(x => AddMessage("Thread died")); + var purgeExpiredMessagesParams = new PurgeExpiredMessagesParams + { + PurgeTaskDelay = Timeout.InfiniteTimeSpan, + PurgeBatchSize = 1 + }; + dequeueStrategy = new SqlServerPollingDequeueStrategy(localConnectionParams, new ReceiveStrategyFactory(new DummyConnectionStore(), localConnectionParams, Address.Parse("error"), ConnectionFactory.Default()), new QueuePurger(new SecondaryReceiveConfiguration(_ => SecondaryReceiveSettings.Disabled()), localConnectionParams, ConnectionFactory.Default()), new SecondaryReceiveConfiguration(_ => SecondaryReceiveSettings.Disabled()), transportNotifications, new RepeatedFailuresOverTimeCircuitBreaker("A", TimeSpan.FromDays(1000), _ => { }), - ConnectionFactory.Default()); + ConnectionFactory.Default(), + purgeExpiredMessagesParams); dequeueStrategy.Init(Address.Parse(address), new TransactionSettings(true, TimeSpan.FromMinutes(2), System.Transactions.IsolationLevel.ReadCommitted, 1, false, false), ProcessMessage, (message, exception) => { }); diff --git a/src/NServiceBus.SqlServer/Config/PurgingConfig.cs b/src/NServiceBus.SqlServer/Config/PurgingConfig.cs index 7e04ffa6b..db3177a48 100644 --- a/src/NServiceBus.SqlServer/Config/PurgingConfig.cs +++ b/src/NServiceBus.SqlServer/Config/PurgingConfig.cs @@ -1,9 +1,20 @@ namespace NServiceBus.Transports.SQLServer.Config { + using System; using NServiceBus.Features; + using NServiceBus.Settings; class PurgingConfig : ConfigBase { + const string PurgeTaskDelayKey = "SqlServer.PurgeTaskDelay"; + const string PurgeBatchSizeKey = "SqlServer.PurgeBatchSize"; + + public override void SetUpDefaults(SettingsHolder settings) + { + settings.SetDefault(PurgeTaskDelayKey, TimeSpan.FromMinutes(5)); + settings.SetDefault(PurgeBatchSizeKey, 10000); + } + public override void Configure(FeatureConfigurationContext context, string connectionStringWithSchema) { bool purgeOnStartup; @@ -15,6 +26,13 @@ public override void Configure(FeatureConfigurationContext context, string conne { context.Container.ConfigureComponent(DependencyLifecycle.SingleInstance); } + + var purgeParams = new PurgeExpiredMessagesParams + { + PurgeTaskDelay = context.Settings.Get(PurgeTaskDelayKey), + PurgeBatchSize = context.Settings.Get(PurgeBatchSizeKey) + }; + context.Container.ConfigureComponent(() => purgeParams, DependencyLifecycle.SingleInstance); } } } \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/ExpiredMessagesPurger.cs b/src/NServiceBus.SqlServer/ExpiredMessagesPurger.cs index a5bb09538..7ff8194a0 100644 --- a/src/NServiceBus.SqlServer/ExpiredMessagesPurger.cs +++ b/src/NServiceBus.SqlServer/ExpiredMessagesPurger.cs @@ -7,22 +7,20 @@ class ExpiredMessagesPurger : IExecutor { - static readonly TimeSpan purgeTaskDelay = TimeSpan.FromMinutes(5); - - static readonly ILog Logger = LogManager.GetLogger(typeof(ExpiredMessagesPurger)); + static readonly ILog Logger = LogManager.GetLogger(typeof(ExpiredMessagesPurger)); readonly TableBasedQueue queue; readonly Func openConnection; + readonly PurgeExpiredMessagesParams parameters; Timer purgeTaskTimer; CancellationToken token; - object lockObject = new object(); - - public ExpiredMessagesPurger(TableBasedQueue queue, Func openConnection) + public ExpiredMessagesPurger(TableBasedQueue queue, Func openConnection, PurgeExpiredMessagesParams parameters) { this.queue = queue; this.openConnection = openConnection; + this.parameters = parameters; } public void Start(int maximumConcurrency, CancellationToken token) @@ -30,7 +28,7 @@ public void Start(int maximumConcurrency, CancellationToken token) LogWarningWhenIndexIsMissing(); this.token = token; - purgeTaskTimer = new Timer(PurgeExpiredMessagesCallback, null, TimeSpan.Zero, purgeTaskDelay); + purgeTaskTimer = new Timer(PurgeExpiredMessagesCallback, null, TimeSpan.Zero, Timeout.InfiniteTimeSpan); } public void Stop() @@ -66,25 +64,15 @@ void PurgeExpiredMessagesCallback(object state) return; } - var lockTaken = false; - try - { - Monitor.TryEnter(lockObject, ref lockTaken); - if (!lockTaken) - { - Logger.DebugFormat("An expired message purge task for table {0} is already running. Nothing to do.", queue); - return; - } + PurgeExpiredMessages(); - PurgeExpiredMessages(); - } - finally + if (token.IsCancellationRequested) { - if (lockTaken) - { - Monitor.Exit(lockObject); - } + return; } + + Logger.DebugFormat("Scheduling next expired message purge task for table {0} in {1}", queue, parameters.PurgeTaskDelay); + purgeTaskTimer.Change(parameters.PurgeTaskDelay, Timeout.InfiniteTimeSpan); } void PurgeExpiredMessages() @@ -99,10 +87,10 @@ void PurgeExpiredMessages() while (continuePurging && !token.IsCancellationRequested) { - var purgedRowsCount = queue.PurgeBatchOfExpiredMessages(connection); + var purgedRowsCount = queue.PurgeBatchOfExpiredMessages(connection, parameters.PurgeBatchSize); totalPurgedRowsCount += purgedRowsCount; - continuePurging = (purgedRowsCount == TableBasedQueue.PurgeBatchSize); + continuePurging = (purgedRowsCount == parameters.PurgeBatchSize); } } diff --git a/src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj b/src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj index ad61018cf..80c80daba 100644 --- a/src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj +++ b/src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj @@ -105,6 +105,7 @@ + diff --git a/src/NServiceBus.SqlServer/PurgeExpiredMessagesParams.cs b/src/NServiceBus.SqlServer/PurgeExpiredMessagesParams.cs new file mode 100644 index 000000000..501e52641 --- /dev/null +++ b/src/NServiceBus.SqlServer/PurgeExpiredMessagesParams.cs @@ -0,0 +1,10 @@ +namespace NServiceBus.Transports.SQLServer +{ + using System; + + class PurgeExpiredMessagesParams + { + public TimeSpan PurgeTaskDelay { get; set; } + public int PurgeBatchSize { get; set; } + } +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/SqlServerPollingDequeueStrategy.cs b/src/NServiceBus.SqlServer/SqlServerPollingDequeueStrategy.cs index 4fe2f2ea2..2ceb18a64 100644 --- a/src/NServiceBus.SqlServer/SqlServerPollingDequeueStrategy.cs +++ b/src/NServiceBus.SqlServer/SqlServerPollingDequeueStrategy.cs @@ -19,7 +19,8 @@ public SqlServerPollingDequeueStrategy( SecondaryReceiveConfiguration secondaryReceiveConfiguration, TransportNotifications transportNotifications, RepeatedFailuresOverTimeCircuitBreaker circuitBreaker, - ConnectionFactory connectionFactory) + ConnectionFactory connectionFactory, + PurgeExpiredMessagesParams purgeExpiredMessagesParams) { this.locaConnectionParams = locaConnectionParams; this.receiveStrategyFactory = receiveStrategyFactory; @@ -28,6 +29,7 @@ public SqlServerPollingDequeueStrategy( this.transportNotifications = transportNotifications; this.circuitBreaker = circuitBreaker; this.connectionFactory = connectionFactory; + this.purgeExpiredMessagesParams = purgeExpiredMessagesParams; } /// @@ -63,7 +65,7 @@ public void Init(Address primaryAddress, TransactionSettings transactionSettings secondaryReceiver = new NullExecutor(); } - expiredMessagesPurger = new ExpiredMessagesPurger(primaryQueue, () => connectionFactory.OpenNewConnection(locaConnectionParams.ConnectionString)); + expiredMessagesPurger = new ExpiredMessagesPurger(primaryQueue, () => connectionFactory.OpenNewConnection(locaConnectionParams.ConnectionString), purgeExpiredMessagesParams); } /// @@ -125,6 +127,7 @@ SecondaryReceiveSettings SecondaryReceiveSettings readonly ReceiveStrategyFactory receiveStrategyFactory; readonly IQueuePurger queuePurger; readonly ConnectionFactory connectionFactory; + readonly PurgeExpiredMessagesParams purgeExpiredMessagesParams; readonly SecondaryReceiveConfiguration secondaryReceiveConfiguration; [SkipWeaving] //Do not dispose with dequeue strategy diff --git a/src/NServiceBus.SqlServer/TableBasedQueue.cs b/src/NServiceBus.SqlServer/TableBasedQueue.cs index 8b1e0211f..409ee82ca 100644 --- a/src/NServiceBus.SqlServer/TableBasedQueue.cs +++ b/src/NServiceBus.SqlServer/TableBasedQueue.cs @@ -189,9 +189,9 @@ static T GetNullableValue(object value) return (T)value; } - public int PurgeBatchOfExpiredMessages(SqlConnection connection) + public int PurgeBatchOfExpiredMessages(SqlConnection connection, int purgeBatchSize) { - var commandText = string.Format(SqlPurgeBatchOfExpiredMessages, PurgeBatchSize, this.schema, this.tableName); + var commandText = string.Format(SqlPurgeBatchOfExpiredMessages, purgeBatchSize, this.schema, this.tableName); using (var command = new SqlCommand(commandText, connection)) { @@ -257,8 +257,6 @@ DELETE FROM message const string SqlCheckIfExpiresIndexIsPresent = @"SELECT COUNT(*) FROM [sys].[indexes] WHERE [name] = '{0}' AND [object_id] = OBJECT_ID('[{1}].[{2}]')"; - public const int PurgeBatchSize = 10000; - const string ExpiresIndexName = "Index_Expires"; const int IdColumn = 0; From 8de507139f5df1a19ace995fd717274b9a2d59c9 Mon Sep 17 00:00:00 2001 From: Marcin Hoppe Date: Fri, 26 Feb 2016 20:00:32 +0100 Subject: [PATCH 06/12] TTBR calculations are now based on broker (database) time --- src/NServiceBus.SqlServer/TableBasedQueue.cs | 24 +++++++++----------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/src/NServiceBus.SqlServer/TableBasedQueue.cs b/src/NServiceBus.SqlServer/TableBasedQueue.cs index 409ee82ca..770067f62 100644 --- a/src/NServiceBus.SqlServer/TableBasedQueue.cs +++ b/src/NServiceBus.SqlServer/TableBasedQueue.cs @@ -78,7 +78,7 @@ static object[] ExtractTransportMessageData(TransportMessage message, SendOption } else { - data[TimeToBeReceivedColumn] = DateTime.UtcNow.Add(message.TimeToBeReceived); + data[TimeToBeReceivedColumn] = message.TimeToBeReceived.TotalMilliseconds; } data[HeadersColumn] = HeaderSerializer.SerializeObject(message.Headers); if (message.Body == null) @@ -129,14 +129,14 @@ MessageReadResult ExecuteReader(SqlCommand command) { var id = rowData[0].ToString(); - DateTime? expireDateTime = null; + int? millisecondsToExpiry = null; if (rowData[TimeToBeReceivedColumn] != DBNull.Value) { - expireDateTime = (DateTime)rowData[TimeToBeReceivedColumn]; + millisecondsToExpiry = (int)rowData[TimeToBeReceivedColumn]; } //Has message expired? - if (expireDateTime.HasValue && expireDateTime.Value < DateTime.UtcNow) + if (millisecondsToExpiry.HasValue && millisecondsToExpiry.Value < 0L) { Logger.InfoFormat("Message with ID={0} has expired. Removing it from queue.", id); return MessageReadResult.NoMessage; @@ -161,9 +161,9 @@ MessageReadResult ExecuteReader(SqlCommand command) message.Headers[Headers.ReplyToAddress] = replyToAddress; } - if (expireDateTime.HasValue) + if (millisecondsToExpiry.HasValue) { - message.TimeToBeReceived = TimeSpan.FromTicks(expireDateTime.Value.Ticks - DateTime.UtcNow.Ticks); + message.TimeToBeReceived = TimeSpan.FromMilliseconds(millisecondsToExpiry.Value); } return MessageReadResult.Success(message); @@ -195,8 +195,6 @@ public int PurgeBatchOfExpiredMessages(SqlConnection connection, int purgeBatchS using (var command = new SqlCommand(commandText, connection)) { - command.Parameters.Add("UTCNow", SqlDbType.DateTime).Value = DateTime.UtcNow; - return command.ExecuteNonQuery(); } } @@ -228,7 +226,7 @@ public override string ToString() readonly string schema; static readonly JsonMessageSerializer HeaderSerializer = new JsonMessageSerializer(null); - static readonly string[] Parameters = { "Id", "CorrelationId", "ReplyToAddress", "Recoverable", "Expires", "Headers", "Body" }; + static readonly string[] Parameters = { "Id", "CorrelationId", "ReplyToAddress", "Recoverable", "TimeToBeReceivedMs", "Headers", "Body" }; static readonly SqlDbType[] ParameterTypes = { @@ -236,23 +234,23 @@ public override string ToString() SqlDbType.VarChar, SqlDbType.VarChar, SqlDbType.Bit, - SqlDbType.DateTime, + SqlDbType.Int, SqlDbType.VarChar, SqlDbType.VarBinary }; const string SqlSend = @"INSERT INTO [{0}].[{1}] ([Id],[CorrelationId],[ReplyToAddress],[Recoverable],[Expires],[Headers],[Body]) - VALUES (@Id,@CorrelationId,@ReplyToAddress,@Recoverable,@Expires,@Headers,@Body)"; + VALUES (@Id,@CorrelationId,@ReplyToAddress,@Recoverable,IIF(@TimeToBeReceivedMs IS NOT NULL, DATEADD(ms, @TimeToBeReceivedMs, GETUTCDATE()), NULL),@Headers,@Body)"; const string SqlReceive = @"WITH message AS (SELECT TOP(1) * FROM [{0}].[{1}] WITH (UPDLOCK, READPAST, ROWLOCK) ORDER BY [RowVersion] ASC) DELETE FROM message OUTPUT deleted.Id, deleted.CorrelationId, deleted.ReplyToAddress, - deleted.Recoverable, deleted.Expires, deleted.Headers, deleted.Body;"; + deleted.Recoverable, IIF(deleted.Expires IS NOT NULL, DATEDIFF(ms, GETUTCDATE(), deleted.Expires), NULL), deleted.Headers, deleted.Body;"; const string SqlPurgeBatchOfExpiredMessages = - @"DELETE TOP({0}) FROM [{1}].[{2}] WITH (UPDLOCK, READPAST, ROWLOCK) WHERE [Expires] < @UTCNow"; + @"DELETE TOP({0}) FROM [{1}].[{2}] WITH (UPDLOCK, READPAST, ROWLOCK) WHERE [Expires] < GETUTCDATE()"; const string SqlCheckIfExpiresIndexIsPresent = @"SELECT COUNT(*) FROM [sys].[indexes] WHERE [name] = '{0}' AND [object_id] = OBJECT_ID('[{1}].[{2}]')"; From e89b4a88dbd5558aa71118af8ba6bd07749c6774 Mon Sep 17 00:00:00 2001 From: Marcin Hoppe Date: Fri, 4 Mar 2016 16:02:37 +0100 Subject: [PATCH 07/12] Purge expired message in order --- src/NServiceBus.SqlServer/SqlServerQueueCreator.cs | 8 +++++++- src/NServiceBus.SqlServer/TableBasedQueue.cs | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/NServiceBus.SqlServer/SqlServerQueueCreator.cs b/src/NServiceBus.SqlServer/SqlServerQueueCreator.cs index b3e64024f..13612dba5 100644 --- a/src/NServiceBus.SqlServer/SqlServerQueueCreator.cs +++ b/src/NServiceBus.SqlServer/SqlServerQueueCreator.cs @@ -27,7 +27,13 @@ [RowVersion] ASC CREATE NONCLUSTERED INDEX [Index_Expires] ON [{0}].[{1}] ( [Expires] ASC - )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, SORT_IN_TEMPDB = OFF, DROP_EXISTING = OFF, ONLINE = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) + ) + INCLUDE + ( + [Id], + [RowVersion] + ) + WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, SORT_IN_TEMPDB = OFF, DROP_EXISTING = OFF, ONLINE = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) END"; diff --git a/src/NServiceBus.SqlServer/TableBasedQueue.cs b/src/NServiceBus.SqlServer/TableBasedQueue.cs index 770067f62..620607c24 100644 --- a/src/NServiceBus.SqlServer/TableBasedQueue.cs +++ b/src/NServiceBus.SqlServer/TableBasedQueue.cs @@ -250,7 +250,7 @@ DELETE FROM message deleted.Recoverable, IIF(deleted.Expires IS NOT NULL, DATEDIFF(ms, GETUTCDATE(), deleted.Expires), NULL), deleted.Headers, deleted.Body;"; const string SqlPurgeBatchOfExpiredMessages = - @"DELETE TOP({0}) FROM [{1}].[{2}] WITH (UPDLOCK, READPAST, ROWLOCK) WHERE [Expires] < GETUTCDATE()"; + @"DELETE FROM [{1}].[{2}] WHERE [Id] IN (SELECT TOP ({0}) [Id] FROM [{1}].[{2}] WITH (UPDLOCK, READPAST, ROWLOCK) WHERE [Expires] < GETUTCDATE() ORDER BY [RowVersion])"; const string SqlCheckIfExpiresIndexIsPresent = @"SELECT COUNT(*) FROM [sys].[indexes] WHERE [name] = '{0}' AND [object_id] = OBJECT_ID('[{1}].[{2}]')"; From a9f492417f174eb7fc6068e7f6697aeab91e0bed Mon Sep 17 00:00:00 2001 From: Marcin Hoppe Date: Tue, 22 Mar 2016 10:24:32 +0100 Subject: [PATCH 08/12] Replace IIF function with CASE statement in TTBR calculations --- src/NServiceBus.SqlServer/TableBasedQueue.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/NServiceBus.SqlServer/TableBasedQueue.cs b/src/NServiceBus.SqlServer/TableBasedQueue.cs index 620607c24..927bd535e 100644 --- a/src/NServiceBus.SqlServer/TableBasedQueue.cs +++ b/src/NServiceBus.SqlServer/TableBasedQueue.cs @@ -241,13 +241,13 @@ public override string ToString() const string SqlSend = @"INSERT INTO [{0}].[{1}] ([Id],[CorrelationId],[ReplyToAddress],[Recoverable],[Expires],[Headers],[Body]) - VALUES (@Id,@CorrelationId,@ReplyToAddress,@Recoverable,IIF(@TimeToBeReceivedMs IS NOT NULL, DATEADD(ms, @TimeToBeReceivedMs, GETUTCDATE()), NULL),@Headers,@Body)"; + VALUES (@Id,@CorrelationId,@ReplyToAddress,@Recoverable,CASE WHEN @TimeToBeReceivedMs IS NOT NULL THEN DATEADD(ms, @TimeToBeReceivedMs, GETUTCDATE()) END,@Headers,@Body)"; const string SqlReceive = @"WITH message AS (SELECT TOP(1) * FROM [{0}].[{1}] WITH (UPDLOCK, READPAST, ROWLOCK) ORDER BY [RowVersion] ASC) DELETE FROM message OUTPUT deleted.Id, deleted.CorrelationId, deleted.ReplyToAddress, - deleted.Recoverable, IIF(deleted.Expires IS NOT NULL, DATEDIFF(ms, GETUTCDATE(), deleted.Expires), NULL), deleted.Headers, deleted.Body;"; + deleted.Recoverable, CASE WHEN deleted.Expires IS NOT NULL THEN DATEDIFF(ms, GETUTCDATE(), deleted.Expires) END, deleted.Headers, deleted.Body;"; const string SqlPurgeBatchOfExpiredMessages = @"DELETE FROM [{1}].[{2}] WHERE [Id] IN (SELECT TOP ({0}) [Id] FROM [{1}].[{2}] WITH (UPDLOCK, READPAST, ROWLOCK) WHERE [Expires] < GETUTCDATE() ORDER BY [RowVersion])"; From 459186c5063c9a75b28309620fd8a6a85019f37f Mon Sep 17 00:00:00 2001 From: Marcin Hoppe Date: Thu, 9 Jun 2016 13:45:42 +0200 Subject: [PATCH 09/12] Quote table and schema name when assembling queue table name --- .../NServiceBus.SqlServer.UnitTests.csproj | 1 + .../TableBasedQueueTests.cs | 15 +++++++++++++++ src/NServiceBus.SqlServer/TableBasedQueue.cs | 18 ++++++++++-------- 3 files changed, 26 insertions(+), 8 deletions(-) create mode 100644 src/NServiceBus.SqlServer.UnitTests/TableBasedQueueTests.cs diff --git a/src/NServiceBus.SqlServer.UnitTests/NServiceBus.SqlServer.UnitTests.csproj b/src/NServiceBus.SqlServer.UnitTests/NServiceBus.SqlServer.UnitTests.csproj index fb932a30b..93e22a46e 100644 --- a/src/NServiceBus.SqlServer.UnitTests/NServiceBus.SqlServer.UnitTests.csproj +++ b/src/NServiceBus.SqlServer.UnitTests/NServiceBus.SqlServer.UnitTests.csproj @@ -84,6 +84,7 @@ + diff --git a/src/NServiceBus.SqlServer.UnitTests/TableBasedQueueTests.cs b/src/NServiceBus.SqlServer.UnitTests/TableBasedQueueTests.cs new file mode 100644 index 000000000..9e588f024 --- /dev/null +++ b/src/NServiceBus.SqlServer.UnitTests/TableBasedQueueTests.cs @@ -0,0 +1,15 @@ +namespace NServiceBus.SqlServer.UnitTests +{ + using NServiceBus.Transports.SQLServer; + using NUnit.Framework; + + class TableBasedQueueTests + { + [Test] + public void Table_name_and_schema_should_be_quoted() + { + Assert.AreEqual("[nsb].[MyEndpoint]", new TableBasedQueue("MyEndpoint", "nsb").ToString()); + Assert.AreNotEqual("[nsb].[MyEndoint]SomeOtherData]", new TableBasedQueue("MyEndoint]SomeOtherData", "nsb").ToString()); + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/TableBasedQueue.cs b/src/NServiceBus.SqlServer/TableBasedQueue.cs index 927bd535e..490e61167 100644 --- a/src/NServiceBus.SqlServer/TableBasedQueue.cs +++ b/src/NServiceBus.SqlServer/TableBasedQueue.cs @@ -17,8 +17,10 @@ public TableBasedQueue(Address address, string schema) public TableBasedQueue(string tableName, string schema) { - this.tableName = tableName; - this.schema = schema; + var sanitizer = new SqlCommandBuilder(); + + this.tableName = sanitizer.QuoteIdentifier(tableName); + this.schema = sanitizer.QuoteIdentifier(schema); } public void Send(TransportMessage message, SendOptions sendOptions, SqlConnection connection, SqlTransaction transaction = null) @@ -209,7 +211,7 @@ public void LogWarningWhenIndexIsMissing(SqlConnection connection) if (rowsCount == 0) { - Logger.Warn($@"Table [{schema}].[{tableName}] does not contain index '{ExpiresIndexName}'. + Logger.Warn($@"Table {schema}.{tableName} does not contain index '{ExpiresIndexName}'. Adding this index will speed up the process of purging expired messages from the queue. Please consult the documentation for further information."); } } @@ -217,7 +219,7 @@ public void LogWarningWhenIndexIsMissing(SqlConnection connection) public override string ToString() { - return tableName; + return $"{schema}.{tableName}"; } static readonly ILog Logger = LogManager.GetLogger(typeof(TableBasedQueue)); @@ -240,20 +242,20 @@ public override string ToString() }; const string SqlSend = - @"INSERT INTO [{0}].[{1}] ([Id],[CorrelationId],[ReplyToAddress],[Recoverable],[Expires],[Headers],[Body]) + @"INSERT INTO {0}.{1} ([Id],[CorrelationId],[ReplyToAddress],[Recoverable],[Expires],[Headers],[Body]) VALUES (@Id,@CorrelationId,@ReplyToAddress,@Recoverable,CASE WHEN @TimeToBeReceivedMs IS NOT NULL THEN DATEADD(ms, @TimeToBeReceivedMs, GETUTCDATE()) END,@Headers,@Body)"; const string SqlReceive = - @"WITH message AS (SELECT TOP(1) * FROM [{0}].[{1}] WITH (UPDLOCK, READPAST, ROWLOCK) ORDER BY [RowVersion] ASC) + @"WITH message AS (SELECT TOP(1) * FROM {0}.{1} WITH (UPDLOCK, READPAST, ROWLOCK) ORDER BY [RowVersion] ASC) DELETE FROM message OUTPUT deleted.Id, deleted.CorrelationId, deleted.ReplyToAddress, deleted.Recoverable, CASE WHEN deleted.Expires IS NOT NULL THEN DATEDIFF(ms, GETUTCDATE(), deleted.Expires) END, deleted.Headers, deleted.Body;"; const string SqlPurgeBatchOfExpiredMessages = - @"DELETE FROM [{1}].[{2}] WHERE [Id] IN (SELECT TOP ({0}) [Id] FROM [{1}].[{2}] WITH (UPDLOCK, READPAST, ROWLOCK) WHERE [Expires] < GETUTCDATE() ORDER BY [RowVersion])"; + @"DELETE FROM {1}.{2} WHERE [Id] IN (SELECT TOP ({0}) [Id] FROM {1}.{2} WITH (UPDLOCK, READPAST, ROWLOCK) WHERE [Expires] < GETUTCDATE() ORDER BY [RowVersion])"; const string SqlCheckIfExpiresIndexIsPresent = - @"SELECT COUNT(*) FROM [sys].[indexes] WHERE [name] = '{0}' AND [object_id] = OBJECT_ID('[{1}].[{2}]')"; + @"SELECT COUNT(*) FROM [sys].[indexes] WHERE [name] = '{0}' AND [object_id] = OBJECT_ID('{1}.{2}')"; const string ExpiresIndexName = "Index_Expires"; From 7e1d3672c9428ead37cd53710f6b03bf1890f44c Mon Sep 17 00:00:00 2001 From: Marcin Hoppe Date: Thu, 9 Jun 2016 21:23:01 +0200 Subject: [PATCH 10/12] Refactor unit test for better readability --- src/NServiceBus.SqlServer.UnitTests/TableBasedQueueTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/NServiceBus.SqlServer.UnitTests/TableBasedQueueTests.cs b/src/NServiceBus.SqlServer.UnitTests/TableBasedQueueTests.cs index 9e588f024..1cb000ec1 100644 --- a/src/NServiceBus.SqlServer.UnitTests/TableBasedQueueTests.cs +++ b/src/NServiceBus.SqlServer.UnitTests/TableBasedQueueTests.cs @@ -9,7 +9,7 @@ class TableBasedQueueTests public void Table_name_and_schema_should_be_quoted() { Assert.AreEqual("[nsb].[MyEndpoint]", new TableBasedQueue("MyEndpoint", "nsb").ToString()); - Assert.AreNotEqual("[nsb].[MyEndoint]SomeOtherData]", new TableBasedQueue("MyEndoint]SomeOtherData", "nsb").ToString()); + Assert.AreEqual("[nsb].[MyEndoint]]; SOME OTHER SQL;--]", new TableBasedQueue("MyEndoint]; SOME OTHER SQL;--", "nsb").ToString()); } } } \ No newline at end of file From c9c13421d9aed1ca0cc394adc56f77bbae517cd2 Mon Sep 17 00:00:00 2001 From: Marcin Hoppe Date: Wed, 15 Jun 2016 14:03:53 +0200 Subject: [PATCH 11/12] Acceptance test to validate proper queue table name quoting --- ...erviceBus.SqlServer.AcceptanceTests.csproj | 1 + .../When_ReplyTo_address_does_not_exist.cs | 118 ++++++++++++++++++ 2 files changed, 119 insertions(+) create mode 100644 src/NServiceBus.SqlServer.AcceptanceTests/When_ReplyTo_address_does_not_exist.cs diff --git a/src/NServiceBus.SqlServer.AcceptanceTests/NServiceBus.SqlServer.AcceptanceTests.csproj b/src/NServiceBus.SqlServer.AcceptanceTests/NServiceBus.SqlServer.AcceptanceTests.csproj index 57d5346c7..4bf6a07c2 100644 --- a/src/NServiceBus.SqlServer.AcceptanceTests/NServiceBus.SqlServer.AcceptanceTests.csproj +++ b/src/NServiceBus.SqlServer.AcceptanceTests/NServiceBus.SqlServer.AcceptanceTests.csproj @@ -207,6 +207,7 @@ + diff --git a/src/NServiceBus.SqlServer.AcceptanceTests/When_ReplyTo_address_does_not_exist.cs b/src/NServiceBus.SqlServer.AcceptanceTests/When_ReplyTo_address_does_not_exist.cs new file mode 100644 index 000000000..2f4e4f22a --- /dev/null +++ b/src/NServiceBus.SqlServer.AcceptanceTests/When_ReplyTo_address_does_not_exist.cs @@ -0,0 +1,118 @@ +namespace NServiceBus.SqlServer.AcceptanceTests +{ + using System; + using NServiceBus.AcceptanceTesting; + using NServiceBus.AcceptanceTests.EndpointTemplates; + using NServiceBus.Config; + using NServiceBus.Faults; + using NServiceBus.Features; + using NUnit.Framework; + + public class When_ReplyTo_address_does_not_exist + { + [Test] + public void Should_throw() + { + var context = Scenario.Define() + .WithEndpoint(b => b.When(bus => bus.SendLocal(new StartCommand()))) + .WithEndpoint() + .AllowExceptions() + .Done(c => c.ExceptionReceived) + .Run(); + + Assert.That(context.ExceptionMessage, Contains.Substring("The destination queue") & Contains.Substring("could not be found")); + Assert.That(context.ExceptionMessage, Contains.Substring("error] VALUES(NEWID(), NULL, NULL, 1, NULL, '', NULL); DROP TABLE [Victim]; INSERT INTO [error")); + } + + class Context : ScenarioContext + { + public bool ExceptionReceived { get; set; } + public string ExceptionMessage { get; set; } + } + + class Attacker : EndpointConfigurationBuilder + { + public Attacker() + { + EndpointSetup(c => + { + c.UseTransport() + .DisableCallbackReceiver(); + c.OverridePublicReturnAddress(Address.Parse("error] VALUES(NEWID(), NULL, NULL, 1, NULL, '', NULL); DROP TABLE [Victim]; INSERT INTO [error")); + }) + .AddMapping(typeof(Victim)); + } + + class StartHandler : IHandleMessages + { + public IBus Bus { get; set; } + + public void Handle(StartCommand message) + { + Bus.Send(new AttackCommand()); + } + } + + class AttackResponseHandler : IHandleMessages + { + public void Handle(AttackResponse message) + { + } + } + } + + class Victim : EndpointConfigurationBuilder + { + public Victim() + { + EndpointSetup(b => + { + b.RegisterComponents(c => c.ConfigureComponent(DependencyLifecycle.SingleInstance)); + b.DisableFeature(); + }) + .WithConfig(c => c.MaxRetries = 0); + } + + class AttackHandler : IHandleMessages + { + public IBus Bus { get; set; } + + public void Handle(AttackCommand message) + { + Bus.Reply(new AttackResponse()); + } + } + + class CustomFaultManager : IManageMessageFailures + { + public Context Context { get; set; } + + public void SerializationFailedForMessage(TransportMessage message, Exception e) + { + } + + public void ProcessingAlwaysFailsForMessage(TransportMessage message, Exception e) + { + Context.ExceptionMessage = e.Message; + Context.ExceptionReceived = true; + } + + public void Init(Address address) + { + } + } + } + + class StartCommand : ICommand + { + } + + class AttackCommand : ICommand + { + } + + class AttackResponse : IMessage + { + } + } +} \ No newline at end of file From ff5bed13c2f94b2d77cc8765a53940eb379d98fc Mon Sep 17 00:00:00 2001 From: Marcin Hoppe Date: Mon, 27 Jun 2016 11:31:03 +0200 Subject: [PATCH 12/12] Dispose SqlCommandBuilder --- src/NServiceBus.SqlServer/TableBasedQueue.cs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/NServiceBus.SqlServer/TableBasedQueue.cs b/src/NServiceBus.SqlServer/TableBasedQueue.cs index 490e61167..fa92c3e6d 100644 --- a/src/NServiceBus.SqlServer/TableBasedQueue.cs +++ b/src/NServiceBus.SqlServer/TableBasedQueue.cs @@ -17,10 +17,11 @@ public TableBasedQueue(Address address, string schema) public TableBasedQueue(string tableName, string schema) { - var sanitizer = new SqlCommandBuilder(); - - this.tableName = sanitizer.QuoteIdentifier(tableName); - this.schema = sanitizer.QuoteIdentifier(schema); + using (var sanitizer = new SqlCommandBuilder()) + { + this.tableName = sanitizer.QuoteIdentifier(tableName); + this.schema = sanitizer.QuoteIdentifier(schema); + } } public void Send(TransportMessage message, SendOptions sendOptions, SqlConnection connection, SqlTransaction transaction = null)