From 1fa10a72518f1ddbff0385050b2352c6ede493ad Mon Sep 17 00:00:00 2001 From: simoncropp Date: Fri, 21 Sep 2018 19:18:36 +1000 Subject: [PATCH 01/13] spelling --- ...ed_for_endpoint_inside_physical_address.cs | 2 +- ...sing_custom_transaction_via_sendoptions.cs | 28 +++++++++---------- .../EndpointFacadeBuilder.cs | 2 +- .../SqlServerTransportTests.cs | 1 - .../DictionarySerializerTests.cs | 4 +-- src/NServiceBus.SqlServer.sln.DotSettings | 2 ++ .../Queuing/SqlConstants.cs | 12 ++++---- .../Receiving/MessagePump.cs | 2 +- .../Receiving/QueuePeeker.cs | 2 +- .../Receiving/ReceiveStrategy.cs | 2 +- .../SendOptionsExtensions.cs | 6 ++-- .../SqlServerTransportInfrastructure.cs | 8 +++--- .../SqlServerTransportSettingsExtensions.cs | 12 ++++---- 13 files changed, 42 insertions(+), 41 deletions(-) diff --git a/src/NServiceBus.SqlServer.AcceptanceTests/MultiSchema/When_custom_schema_configured_for_endpoint_inside_physical_address.cs b/src/NServiceBus.SqlServer.AcceptanceTests/MultiSchema/When_custom_schema_configured_for_endpoint_inside_physical_address.cs index 23b73c84e..480b1a0c1 100644 --- a/src/NServiceBus.SqlServer.AcceptanceTests/MultiSchema/When_custom_schema_configured_for_endpoint_inside_physical_address.cs +++ b/src/NServiceBus.SqlServer.AcceptanceTests/MultiSchema/When_custom_schema_configured_for_endpoint_inside_physical_address.cs @@ -9,7 +9,7 @@ using Routing; using static AcceptanceTesting.Customization.Conventions; - //HINT: Message mappings specifed in app.config added to routing table using UnicastRoute.CreateFromPhysicalAddress. + //HINT: Message mappings specified in app.config added to routing table using UnicastRoute.CreateFromPhysicalAddress. // As a result this test covers also an app.config message mappings scenario public class When_custom_schema_configured_for_endpoint_inside_physical_address : When_custom_schema_configured_for_endpoint { diff --git a/src/NServiceBus.SqlServer.AcceptanceTests/When_passing_custom_transaction_via_sendoptions.cs b/src/NServiceBus.SqlServer.AcceptanceTests/When_passing_custom_transaction_via_sendoptions.cs index 70d95cfa0..149a0b34c 100644 --- a/src/NServiceBus.SqlServer.AcceptanceTests/When_passing_custom_transaction_via_sendoptions.cs +++ b/src/NServiceBus.SqlServer.AcceptanceTests/When_passing_custom_transaction_via_sendoptions.cs @@ -34,26 +34,26 @@ public async Task Should_be_used_by_send_operations() rolledbackTransaction.Rollback(); } - using (var commitedTransaction = connection.BeginTransaction()) + using (var committedTransaction = connection.BeginTransaction()) { var options = new SendOptions(); - options.UseCustomSqlConnectionAndTransaction(connection, commitedTransaction); + options.UseCustomSqlConnectionAndTransaction(connection, committedTransaction); - await bus.Send(new FromCommitedTransaction(), options); + await bus.Send(new FromCommittedTransaction(), options); - commitedTransaction.Commit(); + committedTransaction.Commit(); } } - + })) - .Done(c => c.ReceivedFromCommitedTransaction) + .Done(c => c.ReceivedFromCommittedTransaction) .Run(TimeSpan.FromMinutes(1)); Assert.IsFalse(context.ReceivedFromRolledbackTransaction); } - class FromCommitedTransaction : IMessage + class FromCommittedTransaction : IMessage { } @@ -63,7 +63,7 @@ class FromRolledbackTransaction : IMessage class MyContext : ScenarioContext { - public bool ReceivedFromCommitedTransaction { get; set; } + public bool ReceivedFromCommittedTransaction { get; set; } public bool ReceivedFromRolledbackTransaction { get; set; } } @@ -78,13 +78,13 @@ public AnEndpoint() var routing = c.ConfigureTransport().Routing(); var anEndpointName = AcceptanceTesting.Customization.Conventions.EndpointNamingConvention(typeof(AnEndpoint)); - routing.RouteToEndpoint(typeof(FromCommitedTransaction), anEndpointName); + routing.RouteToEndpoint(typeof(FromCommittedTransaction), anEndpointName); routing.RouteToEndpoint(typeof(FromRolledbackTransaction), anEndpointName); }); } - class ReplyHandler : IHandleMessages, - IHandleMessages + class ReplyHandler : IHandleMessages, + IHandleMessages { public MyContext Context { get; set; } @@ -95,15 +95,15 @@ public Task Handle(FromRolledbackTransaction message, IMessageHandlerContext con return Task.FromResult(0); } - public Task Handle(FromCommitedTransaction message, IMessageHandlerContext context) + public Task Handle(FromCommittedTransaction message, IMessageHandlerContext context) { - Context.ReceivedFromCommitedTransaction = true; + Context.ReceivedFromCommittedTransaction = true; return Task.FromResult(0); } } } - + } } \ No newline at end of file diff --git a/src/NServiceBus.SqlServer.CompatibilityTests.Common/EndpointFacadeBuilder.cs b/src/NServiceBus.SqlServer.CompatibilityTests.Common/EndpointFacadeBuilder.cs index 64c50beee..b6c0ced9c 100644 --- a/src/NServiceBus.SqlServer.CompatibilityTests.Common/EndpointFacadeBuilder.cs +++ b/src/NServiceBus.SqlServer.CompatibilityTests.Common/EndpointFacadeBuilder.cs @@ -60,7 +60,7 @@ static string GetEndpointVersion() return "3.1"; } - throw new Exception("Unknow endpoint version."); + throw new Exception("Unknown endpoint version."); } } diff --git a/src/NServiceBus.SqlServer.IntegrationTests/SqlServerTransportTests.cs b/src/NServiceBus.SqlServer.IntegrationTests/SqlServerTransportTests.cs index 33c9150a0..5157b8303 100644 --- a/src/NServiceBus.SqlServer.IntegrationTests/SqlServerTransportTests.cs +++ b/src/NServiceBus.SqlServer.IntegrationTests/SqlServerTransportTests.cs @@ -26,7 +26,6 @@ public void It_reads_catalog_from_open_connection() var definition = new SqlServerTransport(); Func> factory = async () => { - var connection = new SqlConnection(connectionString); await connection.OpenAsync().ConfigureAwait(false); return connection; diff --git a/src/NServiceBus.SqlServer.UnitTests/DictionarySerializerTests.cs b/src/NServiceBus.SqlServer.UnitTests/DictionarySerializerTests.cs index d1c41744d..8b2e70a11 100644 --- a/src/NServiceBus.SqlServer.UnitTests/DictionarySerializerTests.cs +++ b/src/NServiceBus.SqlServer.UnitTests/DictionarySerializerTests.cs @@ -32,9 +32,9 @@ public void Can_round_trip() void AssertDictionariesAreTheSame(Dictionary before, Dictionary after) { - foreach (var beforeitem in before) + foreach (var beforeItem in before) { - Assert.AreEqual(beforeitem.Value, after[beforeitem.Key]); + Assert.AreEqual(beforeItem.Value, after[beforeItem.Key]); } Assert.AreEqual(before.Count, after.Count); } diff --git a/src/NServiceBus.SqlServer.sln.DotSettings b/src/NServiceBus.SqlServer.sln.DotSettings index 5ee41346d..025afdd2a 100644 --- a/src/NServiceBus.SqlServer.sln.DotSettings +++ b/src/NServiceBus.SqlServer.sln.DotSettings @@ -591,6 +591,8 @@ II.2.12 <HandlesEvent /> <Policy Inspect="True" Prefix="" Suffix="" Style="AaBb" /> <Policy Inspect="True" Prefix="" Suffix="" Style="AaBb" /> True + True + True True True True diff --git a/src/NServiceBus.SqlServer/Queuing/SqlConstants.cs b/src/NServiceBus.SqlServer/Queuing/SqlConstants.cs index 90a41afe9..1a9adb441 100644 --- a/src/NServiceBus.SqlServer/Queuing/SqlConstants.cs +++ b/src/NServiceBus.SqlServer/Queuing/SqlConstants.cs @@ -119,9 +119,9 @@ AND type in (N'U')) RETURN IF EXISTS ( - SELECT * - FROM {1}.sys.columns - WHERE object_id = OBJECT_ID(N'{0}') + SELECT * + FROM {1}.sys.columns + WHERE object_id = OBJECT_ID(N'{0}') AND name = 'BodyString' ) RETURN @@ -129,9 +129,9 @@ IF EXISTS ( EXEC sp_getapplock @Resource = '{0}_lock', @LockMode = 'Exclusive' IF EXISTS ( - SELECT * - FROM {1}.sys.columns - WHERE object_id = OBJECT_ID(N'{0}') + SELECT * + FROM {1}.sys.columns + WHERE object_id = OBJECT_ID(N'{0}') AND name = 'BodyString' ) BEGIN diff --git a/src/NServiceBus.SqlServer/Receiving/MessagePump.cs b/src/NServiceBus.SqlServer/Receiving/MessagePump.cs index 4aafbe619..0b95fa9b1 100644 --- a/src/NServiceBus.SqlServer/Receiving/MessagePump.cs +++ b/src/NServiceBus.SqlServer/Receiving/MessagePump.cs @@ -202,7 +202,7 @@ async Task PurgeExpiredMessages() } catch (SqlException e) when (cancellationToken.IsCancellationRequested) { - Logger.Debug("Exception thown while performing cancellation", e); + Logger.Debug("Exception thrown while performing cancellation", e); } } diff --git a/src/NServiceBus.SqlServer/Receiving/QueuePeeker.cs b/src/NServiceBus.SqlServer/Receiving/QueuePeeker.cs index c2e2181b9..50d23d4d2 100644 --- a/src/NServiceBus.SqlServer/Receiving/QueuePeeker.cs +++ b/src/NServiceBus.SqlServer/Receiving/QueuePeeker.cs @@ -64,7 +64,7 @@ public async Task Peek(TableBasedQueue inputQueue, RepeatedFailuresOverTime } catch (SqlException e) when (cancellationToken.IsCancellationRequested) { - Logger.Debug("Exception thown while performing cancellation", e); + Logger.Debug("Exception thrown while performing cancellation", e); } catch (Exception ex) { diff --git a/src/NServiceBus.SqlServer/Receiving/ReceiveStrategy.cs b/src/NServiceBus.SqlServer/Receiving/ReceiveStrategy.cs index d6161676c..eebc8eb5a 100644 --- a/src/NServiceBus.SqlServer/Receiving/ReceiveStrategy.cs +++ b/src/NServiceBus.SqlServer/Receiving/ReceiveStrategy.cs @@ -74,7 +74,7 @@ protected async Task HandleError(Exception exception, Message } catch (Exception ex) { - criticalError.Raise($"Failed to execute reverability actions for message `{message.TransportId}`", ex); + criticalError.Raise($"Failed to execute recoverability actions for message `{message.TransportId}`", ex); return ErrorHandleResult.RetryRequired; } diff --git a/src/NServiceBus.SqlServer/SendOptionsExtensions.cs b/src/NServiceBus.SqlServer/SendOptionsExtensions.cs index 678f922cb..0df56716e 100644 --- a/src/NServiceBus.SqlServer/SendOptionsExtensions.cs +++ b/src/NServiceBus.SqlServer/SendOptionsExtensions.cs @@ -9,12 +9,12 @@ public static class SendOptionsExtensions { /// - /// Enables providing SqlConnection and SqlTransaction instances that will be used by send operations. The same connection and transaction + /// Enables providing and instances that will be used by send operations. The same connection and transaction /// can be used in more than one send operation. /// /// The to extend. - /// Open SqlConnection instance to be used by send operations. - /// SqlTransaction instance that will be used by any operations perfromed by the transport. + /// Open instance to be used by send operations. + /// instance that will be used by any operations performed by the transport. public static void UseCustomSqlConnectionAndTransaction(this SendOptions options, SqlConnection connection, SqlTransaction transaction) { var transportTransaction = new TransportTransaction(); diff --git a/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs b/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs index 0a7764a00..a9dc69864 100644 --- a/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs +++ b/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs @@ -224,10 +224,10 @@ public override TransportSendInfrastructure ConfigureSendInfrastructure() DelayedMessageTable CreateDelayedMessageTable() { - var delatedQueueTableName = GetDelayedQueueTableName(); + var deletedQueueTableName = GetDelayedQueueTableName(); var inputQueueTable = addressTranslator.Parse(ToTransportAddress(GetLogicalAddress())).QualifiedTableName; - return new DelayedMessageTable(delatedQueueTableName.QualifiedTableName, inputQueueTable); + return new DelayedMessageTable(deletedQueueTableName.QualifiedTableName, inputQueueTable); } /// @@ -254,8 +254,8 @@ CanonicalQueueAddress GetDelayedQueueTableName() { throw new Exception("Native delayed delivery feature requires configuring a table suffix."); } - var delayedQueueLogialAddress = GetLogicalAddress().CreateQualifiedAddress(delayedDeliverySettings.Suffix); - var delayedQueueAddress = addressTranslator.Generate(delayedQueueLogialAddress); + var delayedQueueLogicalAddress = GetLogicalAddress().CreateQualifiedAddress(delayedDeliverySettings.Suffix); + var delayedQueueAddress = addressTranslator.Generate(delayedQueueLogicalAddress); return addressTranslator.GetCanonicalForm(delayedQueueAddress); } diff --git a/src/NServiceBus.SqlServer/SqlServerTransportSettingsExtensions.cs b/src/NServiceBus.SqlServer/SqlServerTransportSettingsExtensions.cs index aae6367e7..937589db1 100644 --- a/src/NServiceBus.SqlServer/SqlServerTransportSettingsExtensions.cs +++ b/src/NServiceBus.SqlServer/SqlServerTransportSettingsExtensions.cs @@ -125,7 +125,7 @@ public static TransportExtensions TimeToWaitBeforeTriggering Guard.AgainstNegativeAndZero(nameof(waitTime), waitTime); transportExtensions.GetSettings().Set(SettingsKeys.TimeToWaitBeforeTriggering, waitTime); - + return transportExtensions; } @@ -145,11 +145,11 @@ public static TransportExtensions UseCustomSqlConnectionFact } /// - /// Allows the IsolationLevel and transaction timeout to be changed for the TransactionScope used to receive messages. + /// Allows the and transaction timeout to be changed for the used to receive messages. /// /// /// If not specified the default transaction timeout of the machine will be used and the isolation level will be set to - /// `ReadCommited`. + /// . /// public static TransportExtensions TransactionScopeOptions(this TransportExtensions transportExtensions, TimeSpan? timeout = null, IsolationLevel? isolationLevel = null) { @@ -157,7 +157,7 @@ public static TransportExtensions TransactionScopeOptions(th if (isolationLevel != IsolationLevel.ReadCommitted && isolationLevel != IsolationLevel.RepeatableRead) { - Logger.Warn("TransactionScope should be only used with either the ReadCommited or the RepeatableRead isolation level."); + Logger.Warn("TransactionScope should be only used with either the ReadCommitted or the RepeatableRead isolation level."); } transportExtensions.GetSettings().Set(new SqlScopeOptions(timeout, isolationLevel)); @@ -226,8 +226,8 @@ public static TransportExtensions CreateMessageBodyComputedC /// The to extend. /// Function that returns opened sql connection based on destination queue name. [ObsoleteEx( - RemoveInVersion = "5.0", - TreatAsErrorFromVersion = "4.0", + RemoveInVersion = "5.0", + TreatAsErrorFromVersion = "4.0", Message = "Multi-instance mode has been deprecated. Use Transport Bridge and/or multi-catalog addressing instead.")] public static TransportExtensions EnableLegacyMultiInstanceMode(this TransportExtensions transportExtensions, Func> sqlConnectionFactory) { From 5880da90b885554e0781a936428edcf39394badf Mon Sep 17 00:00:00 2001 From: simoncropp Date: Fri, 21 Sep 2018 19:31:00 +1000 Subject: [PATCH 02/13] remove redundant connection param --- ...sing_custom_transaction_via_sendoptions.cs | 10 +++++----- .../SendOptionsExtensions.cs | 19 +++++++++++++++++++ 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/src/NServiceBus.SqlServer.AcceptanceTests/When_passing_custom_transaction_via_sendoptions.cs b/src/NServiceBus.SqlServer.AcceptanceTests/When_passing_custom_transaction_via_sendoptions.cs index 70d95cfa0..91c448a47 100644 --- a/src/NServiceBus.SqlServer.AcceptanceTests/When_passing_custom_transaction_via_sendoptions.cs +++ b/src/NServiceBus.SqlServer.AcceptanceTests/When_passing_custom_transaction_via_sendoptions.cs @@ -27,7 +27,7 @@ public async Task Should_be_used_by_send_operations() { var options = new SendOptions(); - options.UseCustomSqlConnectionAndTransaction(connection, rolledbackTransaction); + options.UseCustomSqlConnectionAndTransaction(rolledbackTransaction); await bus.Send(new FromRolledbackTransaction(), options); @@ -38,14 +38,14 @@ public async Task Should_be_used_by_send_operations() { var options = new SendOptions(); - options.UseCustomSqlConnectionAndTransaction(connection, commitedTransaction); + options.UseCustomSqlConnectionAndTransaction(commitedTransaction); await bus.Send(new FromCommitedTransaction(), options); commitedTransaction.Commit(); } } - + })) .Done(c => c.ReceivedFromCommitedTransaction) .Run(TimeSpan.FromMinutes(1)); @@ -83,7 +83,7 @@ public AnEndpoint() }); } - class ReplyHandler : IHandleMessages, + class ReplyHandler : IHandleMessages, IHandleMessages { public MyContext Context { get; set; } @@ -104,6 +104,6 @@ public Task Handle(FromCommitedTransaction message, IMessageHandlerContext conte } } - + } } \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/SendOptionsExtensions.cs b/src/NServiceBus.SqlServer/SendOptionsExtensions.cs index 678f922cb..67b259515 100644 --- a/src/NServiceBus.SqlServer/SendOptionsExtensions.cs +++ b/src/NServiceBus.SqlServer/SendOptionsExtensions.cs @@ -15,6 +15,10 @@ public static class SendOptionsExtensions /// The to extend. /// Open SqlConnection instance to be used by send operations. /// SqlTransaction instance that will be used by any operations perfromed by the transport. + [ObsoleteEx( + RemoveInVersion = "5.0", + TreatAsErrorFromVersion = "4.0", + Message = "The connection parameter is no longer required. Use the alternate overload with only the transaction parameter.")] public static void UseCustomSqlConnectionAndTransaction(this SendOptions options, SqlConnection connection, SqlTransaction transaction) { var transportTransaction = new TransportTransaction(); @@ -23,5 +27,20 @@ public static void UseCustomSqlConnectionAndTransaction(this SendOptions options options.GetExtensions().Set(transportTransaction); } + + /// + /// Enables providing SqlConnection and SqlTransaction instances that will be used by send operations. The same connection and transaction + /// can be used in more than one send operation. + /// + /// The to extend. + /// SqlTransaction instance that will be used by any operations perfromed by the transport. + public static void UseCustomSqlConnectionAndTransaction(this SendOptions options, SqlTransaction transaction) + { + var transportTransaction = new TransportTransaction(); + transportTransaction.Set(transaction.Connection); + transportTransaction.Set(transaction); + + options.GetExtensions().Set(transportTransaction); + } } } \ No newline at end of file From f86f0abf6ca24a9819e2ffd067271f50aa297e2d Mon Sep 17 00:00:00 2001 From: Tomasz Masternak Date: Wed, 26 Sep 2018 08:58:30 +0200 Subject: [PATCH 03/13] fixing api approvals --- .../APIApprovals.Approve.approved.txt | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/NServiceBus.SqlServer.UnitTests/APIApprovals.Approve.approved.txt b/src/NServiceBus.SqlServer.UnitTests/APIApprovals.Approve.approved.txt index 029e28793..ff0cbf2d6 100644 --- a/src/NServiceBus.SqlServer.UnitTests/APIApprovals.Approve.approved.txt +++ b/src/NServiceBus.SqlServer.UnitTests/APIApprovals.Approve.approved.txt @@ -24,7 +24,10 @@ namespace NServiceBus.Transport.SQLServer } public class static SendOptionsExtensions { + [System.ObsoleteAttribute("The connection parameter is no longer required. Use the alternate overload with o" + + "nly the transaction parameter. Will be removed in version 5.0.0.", true)] public static void UseCustomSqlConnectionAndTransaction(this NServiceBus.SendOptions options, System.Data.SqlClient.SqlConnection connection, System.Data.SqlClient.SqlTransaction transaction) { } + public static void UseCustomSqlConnectionAndTransaction(this NServiceBus.SendOptions options, System.Data.SqlClient.SqlTransaction transaction) { } } [System.ObsoleteAttribute("Not for public use.")] public class static SqlConstants From 04c8ba75cb374ce8fb887918b5fb98706a19dea3 Mon Sep 17 00:00:00 2001 From: Tomasz Masternak Date: Wed, 26 Sep 2018 09:07:50 +0200 Subject: [PATCH 04/13] typo --- .../When_passing_custom_transaction_via_sendoptions.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/NServiceBus.SqlServer.AcceptanceTests/When_passing_custom_transaction_via_sendoptions.cs b/src/NServiceBus.SqlServer.AcceptanceTests/When_passing_custom_transaction_via_sendoptions.cs index 4d3ad5cd4..1d7037b61 100644 --- a/src/NServiceBus.SqlServer.AcceptanceTests/When_passing_custom_transaction_via_sendoptions.cs +++ b/src/NServiceBus.SqlServer.AcceptanceTests/When_passing_custom_transaction_via_sendoptions.cs @@ -38,7 +38,7 @@ public async Task Should_be_used_by_send_operations() { var options = new SendOptions(); - options.UseCustomSqlConnectionAndTransaction(commitedTransaction); + options.UseCustomSqlConnectionAndTransaction(committedTransaction); await bus.Send(new FromCommittedTransaction(), options); From b1c5c92f5f8973c0d0cd256527eecb27e5331cd5 Mon Sep 17 00:00:00 2001 From: Tomasz Masternak Date: Thu, 27 Sep 2018 13:30:17 +0200 Subject: [PATCH 05/13] adjusting extension method name --- .../When_passing_custom_transaction_via_sendoptions.cs | 4 ++-- .../APIApprovals.Approve.approved.txt | 2 +- src/NServiceBus.SqlServer/SendOptionsExtensions.cs | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/NServiceBus.SqlServer.AcceptanceTests/When_passing_custom_transaction_via_sendoptions.cs b/src/NServiceBus.SqlServer.AcceptanceTests/When_passing_custom_transaction_via_sendoptions.cs index 1d7037b61..10eae10d0 100644 --- a/src/NServiceBus.SqlServer.AcceptanceTests/When_passing_custom_transaction_via_sendoptions.cs +++ b/src/NServiceBus.SqlServer.AcceptanceTests/When_passing_custom_transaction_via_sendoptions.cs @@ -27,7 +27,7 @@ public async Task Should_be_used_by_send_operations() { var options = new SendOptions(); - options.UseCustomSqlConnectionAndTransaction(rolledbackTransaction); + options.UseCustomSqlTransaction(rolledbackTransaction); await bus.Send(new FromRolledbackTransaction(), options); @@ -38,7 +38,7 @@ public async Task Should_be_used_by_send_operations() { var options = new SendOptions(); - options.UseCustomSqlConnectionAndTransaction(committedTransaction); + options.UseCustomSqlTransaction(committedTransaction); await bus.Send(new FromCommittedTransaction(), options); diff --git a/src/NServiceBus.SqlServer.UnitTests/APIApprovals.Approve.approved.txt b/src/NServiceBus.SqlServer.UnitTests/APIApprovals.Approve.approved.txt index ff0cbf2d6..623424db6 100644 --- a/src/NServiceBus.SqlServer.UnitTests/APIApprovals.Approve.approved.txt +++ b/src/NServiceBus.SqlServer.UnitTests/APIApprovals.Approve.approved.txt @@ -27,7 +27,7 @@ namespace NServiceBus.Transport.SQLServer [System.ObsoleteAttribute("The connection parameter is no longer required. Use the alternate overload with o" + "nly the transaction parameter. Will be removed in version 5.0.0.", true)] public static void UseCustomSqlConnectionAndTransaction(this NServiceBus.SendOptions options, System.Data.SqlClient.SqlConnection connection, System.Data.SqlClient.SqlTransaction transaction) { } - public static void UseCustomSqlConnectionAndTransaction(this NServiceBus.SendOptions options, System.Data.SqlClient.SqlTransaction transaction) { } + public static void UseCustomSqlTransaction(this NServiceBus.SendOptions options, System.Data.SqlClient.SqlTransaction transaction) { } } [System.ObsoleteAttribute("Not for public use.")] public class static SqlConstants diff --git a/src/NServiceBus.SqlServer/SendOptionsExtensions.cs b/src/NServiceBus.SqlServer/SendOptionsExtensions.cs index dec794801..62df04923 100644 --- a/src/NServiceBus.SqlServer/SendOptionsExtensions.cs +++ b/src/NServiceBus.SqlServer/SendOptionsExtensions.cs @@ -34,7 +34,7 @@ public static void UseCustomSqlConnectionAndTransaction(this SendOptions options /// /// The to extend. /// SqlTransaction instance that will be used by any operations perfromed by the transport. - public static void UseCustomSqlConnectionAndTransaction(this SendOptions options, SqlTransaction transaction) + public static void UseCustomSqlTransaction(this SendOptions options, SqlTransaction transaction) { var transportTransaction = new TransportTransaction(); transportTransaction.Set(transaction.Connection); From ba34603f8d362591418c0db222edf8b964ef8d6b Mon Sep 17 00:00:00 2001 From: Bob Langley Date: Tue, 15 Jan 2019 15:53:49 -0800 Subject: [PATCH 06/13] Use core 7.1 timeout migration mode --- ...erviceBus.SqlServer.AcceptanceTests.csproj | 2 +- ...iceBus.SqlServer.CompatibilityTests.csproj | 2 +- ...rviceBus.SqlServer.IntegrationTests.csproj | 2 +- ...ServiceBus.SqlServer.TransportTests.csproj | 2 +- .../NServiceBus.SqlServer.UnitTests.csproj | 2 +- .../Configuration/SettingsKeys.cs | 2 ++ .../DelayedDeiveryTableBasedQueueFactory.cs | 17 -------------- .../DelayedDeliveryInfrastructure.cs | 5 ---- .../PreventRoutingMessagesToTimeoutManager.cs | 12 ---------- .../NServiceBus.SqlServer.csproj | 2 +- .../SqlServerTransportInfrastructure.cs | 23 +++++++------------ .../SqlServerTransportSettingsExtensions.cs | 2 -- 12 files changed, 16 insertions(+), 57 deletions(-) delete mode 100644 src/NServiceBus.SqlServer/DelayedDelivery/PreventRoutingMessagesToTimeoutManager.cs diff --git a/src/NServiceBus.SqlServer.AcceptanceTests/NServiceBus.SqlServer.AcceptanceTests.csproj b/src/NServiceBus.SqlServer.AcceptanceTests/NServiceBus.SqlServer.AcceptanceTests.csproj index 3d247663f..c33943e1c 100644 --- a/src/NServiceBus.SqlServer.AcceptanceTests/NServiceBus.SqlServer.AcceptanceTests.csproj +++ b/src/NServiceBus.SqlServer.AcceptanceTests/NServiceBus.SqlServer.AcceptanceTests.csproj @@ -11,7 +11,7 @@ - + diff --git a/src/NServiceBus.SqlServer.CompatibilityTests/NServiceBus.SqlServer.CompatibilityTests.csproj b/src/NServiceBus.SqlServer.CompatibilityTests/NServiceBus.SqlServer.CompatibilityTests.csproj index b616e51e8..d864e3053 100644 --- a/src/NServiceBus.SqlServer.CompatibilityTests/NServiceBus.SqlServer.CompatibilityTests.csproj +++ b/src/NServiceBus.SqlServer.CompatibilityTests/NServiceBus.SqlServer.CompatibilityTests.csproj @@ -11,7 +11,7 @@ - + diff --git a/src/NServiceBus.SqlServer.IntegrationTests/NServiceBus.SqlServer.IntegrationTests.csproj b/src/NServiceBus.SqlServer.IntegrationTests/NServiceBus.SqlServer.IntegrationTests.csproj index 2a5c33966..ac984f5b3 100644 --- a/src/NServiceBus.SqlServer.IntegrationTests/NServiceBus.SqlServer.IntegrationTests.csproj +++ b/src/NServiceBus.SqlServer.IntegrationTests/NServiceBus.SqlServer.IntegrationTests.csproj @@ -13,7 +13,7 @@ - + diff --git a/src/NServiceBus.SqlServer.TransportTests/NServiceBus.SqlServer.TransportTests.csproj b/src/NServiceBus.SqlServer.TransportTests/NServiceBus.SqlServer.TransportTests.csproj index 4f9d167e6..8890ec8f5 100644 --- a/src/NServiceBus.SqlServer.TransportTests/NServiceBus.SqlServer.TransportTests.csproj +++ b/src/NServiceBus.SqlServer.TransportTests/NServiceBus.SqlServer.TransportTests.csproj @@ -11,7 +11,7 @@ - + diff --git a/src/NServiceBus.SqlServer.UnitTests/NServiceBus.SqlServer.UnitTests.csproj b/src/NServiceBus.SqlServer.UnitTests/NServiceBus.SqlServer.UnitTests.csproj index a85b9c528..2bb0dcbf6 100644 --- a/src/NServiceBus.SqlServer.UnitTests/NServiceBus.SqlServer.UnitTests.csproj +++ b/src/NServiceBus.SqlServer.UnitTests/NServiceBus.SqlServer.UnitTests.csproj @@ -14,7 +14,7 @@ - + diff --git a/src/NServiceBus.SqlServer/Configuration/SettingsKeys.cs b/src/NServiceBus.SqlServer/Configuration/SettingsKeys.cs index a796c6b39..8424c84b5 100644 --- a/src/NServiceBus.SqlServer/Configuration/SettingsKeys.cs +++ b/src/NServiceBus.SqlServer/Configuration/SettingsKeys.cs @@ -16,5 +16,7 @@ class SettingsKeys public const string SchemaPropertyKey = "Schema"; public const string CatalogPropertyKey = "Catalog"; + + public const string EnableMigrationMode = "NServiceBus.TimeoutManager.EnableMigrationMode"; } } \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedDeiveryTableBasedQueueFactory.cs b/src/NServiceBus.SqlServer/DelayedDelivery/DelayedDeiveryTableBasedQueueFactory.cs index 2341790ee..5cc451f52 100644 --- a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedDeiveryTableBasedQueueFactory.cs +++ b/src/NServiceBus.SqlServer/DelayedDelivery/DelayedDeiveryTableBasedQueueFactory.cs @@ -43,17 +43,6 @@ static DispatchBehavior GetDueTime(UnicastTransportOperation operation) { return DispatchBehavior.Deferred(DateTime.UtcNow + delayDeliveryWith.Delay, operation.Destination); } - var headers = operation.Message.Headers; - if (headers.TryGetValue(TimeoutManagerHeaders.Expire, out var expireString)) - { - var expirationTime = DateTimeExtensions.ToUtcDateTime(expireString); - var destination = headers[TimeoutManagerHeaders.RouteExpiredTimeoutTo]; - - headers.Remove(TimeoutManagerHeaders.RouteExpiredTimeoutTo); - headers.Remove(TimeoutManagerHeaders.Expire); - - return DispatchBehavior.Deferred(expirationTime, destination); - } return DispatchBehavior.Immediately(); } @@ -87,11 +76,5 @@ public static DispatchBehavior Deferred(DateTime dueTime, string destination) }; } } - - static class TimeoutManagerHeaders - { - public const string Expire = "NServiceBus.Timeout.Expire"; - public const string RouteExpiredTimeoutTo = "NServiceBus.Timeout.RouteExpiredTimeoutTo"; - } } } \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedDeliveryInfrastructure.cs b/src/NServiceBus.SqlServer/DelayedDelivery/DelayedDeliveryInfrastructure.cs index d0b936eeb..8d2e0e444 100644 --- a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedDeliveryInfrastructure.cs +++ b/src/NServiceBus.SqlServer/DelayedDelivery/DelayedDeliveryInfrastructure.cs @@ -6,11 +6,6 @@ static class DelayedDeliveryInfrastructure { public static StartupCheckResult CheckForInvalidSettings(SettingsHolder settings) { - var externalTimeoutManagerAddress = settings.GetOrDefault("NServiceBus.ExternalTimeoutManagerAddress") != null; - if (externalTimeoutManagerAddress) - { - return StartupCheckResult.Failed("An external timeout manager address cannot be configured because the timeout manager is not being used for delayed delivery."); - } var sendOnlyEndpoint = settings.GetOrDefault("Endpoint.SendOnly"); if (sendOnlyEndpoint) { diff --git a/src/NServiceBus.SqlServer/DelayedDelivery/PreventRoutingMessagesToTimeoutManager.cs b/src/NServiceBus.SqlServer/DelayedDelivery/PreventRoutingMessagesToTimeoutManager.cs deleted file mode 100644 index 5e9edb021..000000000 --- a/src/NServiceBus.SqlServer/DelayedDelivery/PreventRoutingMessagesToTimeoutManager.cs +++ /dev/null @@ -1,12 +0,0 @@ -namespace NServiceBus.Transport.SQLServer -{ - using Features; - - class PreventRoutingMessagesToTimeoutManager : Feature - { - protected override void Setup(FeatureConfigurationContext context) - { - context.Pipeline.Remove("RouteDeferredMessageToTimeoutManager"); - } - } -} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj b/src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj index 15598aa68..7b0f22576 100644 --- a/src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj +++ b/src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj @@ -19,7 +19,7 @@ - + diff --git a/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs b/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs index a9dc69864..07208754c 100644 --- a/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs +++ b/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs @@ -23,30 +23,23 @@ internal SqlServerTransportInfrastructure(QueueAddressTranslator addressTranslat schemaAndCatalogSettings = settings.GetOrCreate(); delayedDeliverySettings = settings.GetOrDefault(); var timeoutManagerFeatureDisabled = settings.GetOrDefault(typeof(TimeoutManager).FullName) == FeatureState.Disabled; - + + var timeoutManagerDisabled = (delayedDeliverySettings != null && delayedDeliverySettings.TimeoutManagerDisabled) || timeoutManagerFeatureDisabled; + + settings.SetDefault(SettingsKeys.EnableMigrationMode, !timeoutManagerDisabled); + diagnostics.Add("NServiceBus.Transport.SqlServer.TimeoutManager", new { FeatureEnabled = !timeoutManagerFeatureDisabled }); - - if (delayedDeliverySettings != null && timeoutManagerFeatureDisabled) + + if (delayedDeliverySettings != null && timeoutManagerDisabled) { delayedDeliverySettings.DisableTimeoutManagerCompatibility(); } } - public override IEnumerable DeliveryConstraints - { - get - { - yield return typeof(DiscardIfNotReceivedBefore); - if (delayedDeliverySettings != null && delayedDeliverySettings.TimeoutManagerDisabled) - { - yield return typeof(DoNotDeliverBefore); - yield return typeof(DelayDeliveryWith); - } - } - } + public override IEnumerable DeliveryConstraints => new List {typeof(DiscardIfNotReceivedBefore), typeof(DoNotDeliverBefore), typeof(DelayDeliveryWith)}; public override TransportTransactionMode TransactionMode { get; } = TransportTransactionMode.TransactionScope; diff --git a/src/NServiceBus.SqlServer/SqlServerTransportSettingsExtensions.cs b/src/NServiceBus.SqlServer/SqlServerTransportSettingsExtensions.cs index 937589db1..ea82690e4 100644 --- a/src/NServiceBus.SqlServer/SqlServerTransportSettingsExtensions.cs +++ b/src/NServiceBus.SqlServer/SqlServerTransportSettingsExtensions.cs @@ -5,7 +5,6 @@ using System.Threading.Tasks; using System.Transactions; using Configuration.AdvancedExtensibility; - using Features; using Logging; /// @@ -190,7 +189,6 @@ public static DelayedDeliverySettings UseNativeDelayedDelivery(this TransportExt } var settings = new DelayedDeliverySettings(); transportExtensions.GetSettings().Set(settings); - transportExtensions.GetSettings().EnableFeatureByDefault(); return settings; } From 6eea0e0706fd101acd429b3b7849ab587ad0dc1e Mon Sep 17 00:00:00 2001 From: Bob Langley Date: Tue, 15 Jan 2019 19:43:06 -0800 Subject: [PATCH 07/13] Fix inspections --- .../SqlServerTransportSettingsExtensions.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/NServiceBus.SqlServer/SqlServerTransportSettingsExtensions.cs b/src/NServiceBus.SqlServer/SqlServerTransportSettingsExtensions.cs index ea82690e4..193bf6ae0 100644 --- a/src/NServiceBus.SqlServer/SqlServerTransportSettingsExtensions.cs +++ b/src/NServiceBus.SqlServer/SqlServerTransportSettingsExtensions.cs @@ -159,7 +159,7 @@ public static TransportExtensions TransactionScopeOptions(th Logger.Warn("TransactionScope should be only used with either the ReadCommitted or the RepeatableRead isolation level."); } - transportExtensions.GetSettings().Set(new SqlScopeOptions(timeout, isolationLevel)); + transportExtensions.GetSettings().Set(new SqlScopeOptions(timeout, isolationLevel)); return transportExtensions; } @@ -173,7 +173,7 @@ public static TransportExtensions WithPeekDelay(this Transpo { Guard.AgainstNull(nameof(transportExtensions), transportExtensions); - transportExtensions.GetSettings().Set(new QueuePeekerOptions(delay)); + transportExtensions.GetSettings().Set(new QueuePeekerOptions(delay)); return transportExtensions; } @@ -188,7 +188,7 @@ public static DelayedDeliverySettings UseNativeDelayedDelivery(this TransportExt throw new Exception("Native delayed delivery is only supported for endpoints capable of receiving messages."); } var settings = new DelayedDeliverySettings(); - transportExtensions.GetSettings().Set(settings); + transportExtensions.GetSettings().Set(settings); return settings; } From c3a58d572298e4fefb8f6b01e9677c7080b2108a Mon Sep 17 00:00:00 2001 From: Bob Langley Date: Wed, 16 Jan 2019 17:13:28 -0800 Subject: [PATCH 08/13] Revert removing check on configured native delivery when reporting DeliveryConstraints --- .../SqlServerTransportInfrastructure.cs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs b/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs index 07208754c..30318b398 100644 --- a/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs +++ b/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs @@ -39,7 +39,18 @@ internal SqlServerTransportInfrastructure(QueueAddressTranslator addressTranslat } } - public override IEnumerable DeliveryConstraints => new List {typeof(DiscardIfNotReceivedBefore), typeof(DoNotDeliverBefore), typeof(DelayDeliveryWith)}; + public override IEnumerable DeliveryConstraints + { + get + { + yield return typeof(DiscardIfNotReceivedBefore); + if (delayedDeliverySettings != null) + { + yield return typeof(DoNotDeliverBefore); + yield return typeof(DelayDeliveryWith); + } + } + } public override TransportTransactionMode TransactionMode { get; } = TransportTransactionMode.TransactionScope; From 87f64abd20a0ddc1c029864dc8c1e2098b4b0832 Mon Sep 17 00:00:00 2001 From: Bob Langley Date: Thu, 17 Jan 2019 21:55:02 -0800 Subject: [PATCH 09/13] Simplify infrastructure constructor --- .../SqlServerTransportInfrastructure.cs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs b/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs index 30318b398..f130e911f 100644 --- a/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs +++ b/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs @@ -22,20 +22,21 @@ internal SqlServerTransportInfrastructure(QueueAddressTranslator addressTranslat schemaAndCatalogSettings = settings.GetOrCreate(); delayedDeliverySettings = settings.GetOrDefault(); - var timeoutManagerFeatureDisabled = settings.GetOrDefault(typeof(TimeoutManager).FullName) == FeatureState.Disabled; - - var timeoutManagerDisabled = (delayedDeliverySettings != null && delayedDeliverySettings.TimeoutManagerDisabled) || timeoutManagerFeatureDisabled; - - settings.SetDefault(SettingsKeys.EnableMigrationMode, !timeoutManagerDisabled); + var timeoutManagerFeatureDisabled = !settings.IsFeatureActive(typeof(TimeoutManager)); diagnostics.Add("NServiceBus.Transport.SqlServer.TimeoutManager", new { FeatureEnabled = !timeoutManagerFeatureDisabled }); - if (delayedDeliverySettings != null && timeoutManagerDisabled) + if (delayedDeliverySettings != null) { - delayedDeliverySettings.DisableTimeoutManagerCompatibility(); + settings.Set(SettingsKeys.EnableMigrationMode, true); + + if (timeoutManagerFeatureDisabled) + { + delayedDeliverySettings.DisableTimeoutManagerCompatibility(); + } } } From 07b78c8743966872fed44e59b612fff4a25859a9 Mon Sep 17 00:00:00 2001 From: Brandon Ording Date: Mon, 21 Jan 2019 15:23:52 -0500 Subject: [PATCH 10/13] Don't overwrite settings when called more than once --- .../SqlServerTransportSettingsExtensions.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/NServiceBus.SqlServer/SqlServerTransportSettingsExtensions.cs b/src/NServiceBus.SqlServer/SqlServerTransportSettingsExtensions.cs index 193bf6ae0..b2cca8110 100644 --- a/src/NServiceBus.SqlServer/SqlServerTransportSettingsExtensions.cs +++ b/src/NServiceBus.SqlServer/SqlServerTransportSettingsExtensions.cs @@ -187,8 +187,8 @@ public static DelayedDeliverySettings UseNativeDelayedDelivery(this TransportExt { throw new Exception("Native delayed delivery is only supported for endpoints capable of receiving messages."); } - var settings = new DelayedDeliverySettings(); - transportExtensions.GetSettings().Set(settings); + + var settings = transportExtensions.GetSettings().GetOrCreate(); return settings; } From 688e3d6a145274dc5f65ee26dda6a6df6f7c76be Mon Sep 17 00:00:00 2001 From: Brandon Ording Date: Mon, 21 Jan 2019 16:02:43 -0500 Subject: [PATCH 11/13] Integrate migration mode with existing API --- .../DelayedDelivery/DelayedDeliverySettings.cs | 4 ++-- .../SqlServerTransportInfrastructure.cs | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedDeliverySettings.cs b/src/NServiceBus.SqlServer/DelayedDelivery/DelayedDeliverySettings.cs index 9f9cf0473..9ffe44a4a 100644 --- a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedDeliverySettings.cs +++ b/src/NServiceBus.SqlServer/DelayedDelivery/DelayedDeliverySettings.cs @@ -9,7 +9,7 @@ public class DelayedDeliverySettings { internal string Suffix = "Delayed"; internal TimeSpan Interval = TimeSpan.FromSeconds(1); - internal bool TimeoutManagerDisabled; + internal bool EnableMigrationMode = true; internal int MatureBatchSize = 100; /// @@ -39,7 +39,7 @@ public void BatchSize(int batchSize) /// public void DisableTimeoutManagerCompatibility() { - TimeoutManagerDisabled = true; + EnableMigrationMode = false; } /// diff --git a/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs b/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs index f130e911f..4accc0bfd 100644 --- a/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs +++ b/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs @@ -31,12 +31,12 @@ internal SqlServerTransportInfrastructure(QueueAddressTranslator addressTranslat if (delayedDeliverySettings != null) { - settings.Set(SettingsKeys.EnableMigrationMode, true); - if (timeoutManagerFeatureDisabled) { delayedDeliverySettings.DisableTimeoutManagerCompatibility(); } + + settings.Set(SettingsKeys.EnableMigrationMode, delayedDeliverySettings.EnableMigrationMode); } } @@ -286,7 +286,7 @@ public override Task Start() delayedDeliverySettings.Suffix, delayedDeliverySettings.Interval, BatchSize = delayedDeliverySettings.MatureBatchSize, - TimoutManager = delayedDeliverySettings.TimeoutManagerDisabled ? "disabled" : "enabled" + TimoutManager = delayedDeliverySettings.EnableMigrationMode ? "enabled" : "disabled" }); var delayedMessageTable = CreateDelayedMessageTable(); From b961496a82773ee4a629c95cee0c05fef9b38503 Mon Sep 17 00:00:00 2001 From: Brandon Ording Date: Mon, 21 Jan 2019 16:37:01 -0500 Subject: [PATCH 12/13] Use correct feature API --- src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs b/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs index 4accc0bfd..4f3a4e7b8 100644 --- a/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs +++ b/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs @@ -22,7 +22,7 @@ internal SqlServerTransportInfrastructure(QueueAddressTranslator addressTranslat schemaAndCatalogSettings = settings.GetOrCreate(); delayedDeliverySettings = settings.GetOrDefault(); - var timeoutManagerFeatureDisabled = !settings.IsFeatureActive(typeof(TimeoutManager)); + var timeoutManagerFeatureDisabled = !settings.IsFeatureEnabled(typeof(TimeoutManager)); diagnostics.Add("NServiceBus.Transport.SqlServer.TimeoutManager", new { From 06c5d01be8c97c9d499ce89785263f5376ecdd3f Mon Sep 17 00:00:00 2001 From: Kyle Baley Date: Mon, 18 Mar 2019 11:19:13 -0500 Subject: [PATCH 13/13] Faster way to peek messages to retrieve message count (#480) Fixes #481 When peeking messages, we retrieve the count only up to the maximum concurrency level. This prevents the transport from effectively locking up if there are a lot of records to peek. --- .../When_message_receive_takes_long.cs | 1 + src/NServiceBus.SqlServer/Queuing/SqlConstants.cs | 2 +- src/NServiceBus.SqlServer/Queuing/TableBasedQueue.cs | 8 +++++++- src/NServiceBus.SqlServer/Receiving/MessagePump.cs | 1 + 4 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/NServiceBus.SqlServer.IntegrationTests/When_message_receive_takes_long.cs b/src/NServiceBus.SqlServer.IntegrationTests/When_message_receive_takes_long.cs index c0827d9dc..94aa14a63 100644 --- a/src/NServiceBus.SqlServer.IntegrationTests/When_message_receive_takes_long.cs +++ b/src/NServiceBus.SqlServer.IntegrationTests/When_message_receive_takes_long.cs @@ -69,6 +69,7 @@ static async Task TryPeekQueueSize(TableBasedQueue tableBasedQueue, SqlConnectio using (var connection = await sqlConnectionFactory.OpenNewConnection()) using (var tx = connection.BeginTransaction()) { + tableBasedQueue.FormatPeekCommand(100); await tableBasedQueue.TryPeek(connection, tx, CancellationToken.None, PeekTimeoutInSeconds); scope.Complete(); } diff --git a/src/NServiceBus.SqlServer/Queuing/SqlConstants.cs b/src/NServiceBus.SqlServer/Queuing/SqlConstants.cs index 1a9adb441..7be977a9a 100644 --- a/src/NServiceBus.SqlServer/Queuing/SqlConstants.cs +++ b/src/NServiceBus.SqlServer/Queuing/SqlConstants.cs @@ -108,7 +108,7 @@ DELETE FROM message public static readonly string PeekText = @" SELECT count(*) Id -FROM {0} WITH (READPAST);"; +FROM (SELECT TOP {1} * FROM {0} WITH (READPAST)) as count_table;"; public static readonly string AddMessageBodyStringColumn = @" IF NOT EXISTS ( diff --git a/src/NServiceBus.SqlServer/Queuing/TableBasedQueue.cs b/src/NServiceBus.SqlServer/Queuing/TableBasedQueue.cs index 19661837f..f9ea15c3a 100644 --- a/src/NServiceBus.SqlServer/Queuing/TableBasedQueue.cs +++ b/src/NServiceBus.SqlServer/Queuing/TableBasedQueue.cs @@ -17,7 +17,6 @@ public TableBasedQueue(string qualifiedTableName, string queueName) #pragma warning disable 618 this.qualifiedTableName = qualifiedTableName; Name = queueName; - peekCommand = Format(SqlConstants.PeekText, this.qualifiedTableName); receiveCommand = Format(SqlConstants.ReceiveText, this.qualifiedTableName); sendCommand = Format(SqlConstants.SendText, this.qualifiedTableName); purgeCommand = Format(SqlConstants.PurgeText, this.qualifiedTableName); @@ -39,6 +38,13 @@ public virtual async Task TryPeek(SqlConnection connection, SqlTransaction } } + public void FormatPeekCommand(int maxRecordsToPeek) + { +#pragma warning disable 618 + peekCommand = Format(SqlConstants.PeekText, qualifiedTableName, maxRecordsToPeek); +#pragma warning restore 618 + } + public virtual async Task TryReceive(SqlConnection connection, SqlTransaction transaction) { using (var command = new SqlCommand(receiveCommand, connection, transaction)) diff --git a/src/NServiceBus.SqlServer/Receiving/MessagePump.cs b/src/NServiceBus.SqlServer/Receiving/MessagePump.cs index 0b95fa9b1..9f0815f77 100644 --- a/src/NServiceBus.SqlServer/Receiving/MessagePump.cs +++ b/src/NServiceBus.SqlServer/Receiving/MessagePump.cs @@ -54,6 +54,7 @@ public async Task Init(Func onMessage, Func(); concurrencyLimiter = new SemaphoreSlim(limitations.MaxConcurrency); cancellationTokenSource = new CancellationTokenSource();