From 6b6fe729bc4b98a45a4652cb76bf3a94a314708c Mon Sep 17 00:00:00 2001 From: Mauro Servienti Date: Mon, 9 Nov 2020 03:06:39 -0800 Subject: [PATCH] Peek batch size in 6.2 (#711) * Add MaxRecordsToPeek to QueuePeekerOptions * Use MaxRecordsToPeek to determine the peek batch size * Allows to configure queue peeker options * Fix test * Rework obsolete strategy * Validate peek batch size * Approved API * typo --- .../APIApprovals.Approve.approved.txt | 3 +++ .../When_receiving_messages.cs | 1 + .../APIApprovals.Approve.approved.txt | 3 +++ .../Receiving/MessagePump.cs | 6 ++++-- .../Receiving/QueuePeekerOptions.cs | 15 ++++++++++++++- .../SqlServerTransportInfrastructure.cs | 2 +- .../SqlServerTransportSettingsExtensions.cs | 17 ++++++++++++++++- 7 files changed, 42 insertions(+), 5 deletions(-) diff --git a/src/NServiceBus.SqlServer.UnitTests/ApprovalFiles/APIApprovals.Approve.approved.txt b/src/NServiceBus.SqlServer.UnitTests/ApprovalFiles/APIApprovals.Approve.approved.txt index 3caac1b0a..8d7d1388a 100644 --- a/src/NServiceBus.SqlServer.UnitTests/ApprovalFiles/APIApprovals.Approve.approved.txt +++ b/src/NServiceBus.SqlServer.UnitTests/ApprovalFiles/APIApprovals.Approve.approved.txt @@ -32,6 +32,7 @@ namespace NServiceBus public static NServiceBus.TransportExtensions DefaultSchema(this NServiceBus.TransportExtensions transportExtensions, string schemaName) { } public static NServiceBus.Transport.SqlServer.DelayedDeliverySettings NativeDelayedDelivery(this NServiceBus.TransportExtensions transportExtensions) { } public static NServiceBus.TransportExtensions PurgeExpiredMessagesOnStartup(this NServiceBus.TransportExtensions transportExtensions, System.Nullable purgeBatchSize) { } + public static NServiceBus.TransportExtensions QueuePeekerOptions(this NServiceBus.TransportExtensions transportExtensions, System.Nullable delay = null, System.Nullable peekBatchSize = null) { } public static NServiceBus.Transport.SqlServer.SubscriptionSettings SubscriptionSettings(this NServiceBus.TransportExtensions transportExtensions) { } public static NServiceBus.TransportExtensions TimeToWaitBeforeTriggeringCircuitBreaker(this NServiceBus.TransportExtensions transportExtensions, System.TimeSpan waitTime) { } public static NServiceBus.TransportExtensions TransactionScopeOptions(this NServiceBus.TransportExtensions transportExtensions, System.Nullable timeout = null, System.Nullable isolationLevel = null) { } @@ -40,6 +41,8 @@ namespace NServiceBus public static NServiceBus.TransportExtensions UseCustomSqlConnectionFactory(this NServiceBus.TransportExtensions transportExtensions, System.Func> sqlConnectionFactory) { } public static NServiceBus.TransportExtensions UseSchemaForEndpoint(this NServiceBus.TransportExtensions transportExtensions, string endpointName, string schema) { } public static NServiceBus.TransportExtensions UseSchemaForQueue(this NServiceBus.TransportExtensions transportExtensions, string queueName, string schema) { } + [System.ObsoleteAttribute("WithPeekDelay has been obsoleted. Use `QueuePeekerOptions` instead. Will be treat" + + "ed as an error from version 7.0.0. Will be removed in version 8.0.0.", false)] public static NServiceBus.TransportExtensions WithPeekDelay(this NServiceBus.TransportExtensions transportExtensions, System.Nullable delay = null) { } } } diff --git a/src/NServiceBus.Transport.SqlServer.IntegrationTests/When_receiving_messages.cs b/src/NServiceBus.Transport.SqlServer.IntegrationTests/When_receiving_messages.cs index f5936a3d0..3106aafe8 100644 --- a/src/NServiceBus.Transport.SqlServer.IntegrationTests/When_receiving_messages.cs +++ b/src/NServiceBus.Transport.SqlServer.IntegrationTests/When_receiving_messages.cs @@ -38,6 +38,7 @@ public async Task Should_stop_pumping_messages_after_first_unsuccessful_receive( new QueuePurger(sqlConnectionFactory), new NoOpExpiredMessagesPurger(), new QueuePeeker(sqlConnectionFactory, new QueuePeekerOptions()), + new QueuePeekerOptions(), new SchemaInspector(_ => sqlConnectionFactory.OpenNewConnection(), false), TimeSpan.MaxValue); diff --git a/src/NServiceBus.Transport.SqlServer.UnitTests/ApprovalFiles/APIApprovals.Approve.approved.txt b/src/NServiceBus.Transport.SqlServer.UnitTests/ApprovalFiles/APIApprovals.Approve.approved.txt index 30cae22a9..3a6b78f4a 100644 --- a/src/NServiceBus.Transport.SqlServer.UnitTests/ApprovalFiles/APIApprovals.Approve.approved.txt +++ b/src/NServiceBus.Transport.SqlServer.UnitTests/ApprovalFiles/APIApprovals.Approve.approved.txt @@ -31,6 +31,7 @@ namespace NServiceBus public static NServiceBus.TransportExtensions DefaultSchema(this NServiceBus.TransportExtensions transportExtensions, string schemaName) { } public static NServiceBus.Transport.SqlServer.DelayedDeliverySettings NativeDelayedDelivery(this NServiceBus.TransportExtensions transportExtensions) { } public static NServiceBus.TransportExtensions PurgeExpiredMessagesOnStartup(this NServiceBus.TransportExtensions transportExtensions, System.Nullable purgeBatchSize) { } + public static NServiceBus.TransportExtensions QueuePeekerOptions(this NServiceBus.TransportExtensions transportExtensions, System.Nullable delay = null, System.Nullable peekBatchSize = null) { } public static NServiceBus.Transport.SqlServer.SubscriptionSettings SubscriptionSettings(this NServiceBus.TransportExtensions transportExtensions) { } public static NServiceBus.TransportExtensions TimeToWaitBeforeTriggeringCircuitBreaker(this NServiceBus.TransportExtensions transportExtensions, System.TimeSpan waitTime) { } public static NServiceBus.TransportExtensions TransactionScopeOptions(this NServiceBus.TransportExtensions transportExtensions, System.Nullable timeout = null, System.Nullable isolationLevel = null) { } @@ -39,6 +40,8 @@ namespace NServiceBus public static NServiceBus.TransportExtensions UseCustomSqlConnectionFactory(this NServiceBus.TransportExtensions transportExtensions, System.Func> sqlConnectionFactory) { } public static NServiceBus.TransportExtensions UseSchemaForEndpoint(this NServiceBus.TransportExtensions transportExtensions, string endpointName, string schema) { } public static NServiceBus.TransportExtensions UseSchemaForQueue(this NServiceBus.TransportExtensions transportExtensions, string queueName, string schema) { } + [System.ObsoleteAttribute("WithPeekDelay has been obsoleted. Use `QueuePeekerOptions` instead. Will be treat" + + "ed as an error from version 7.0.0. Will be removed in version 8.0.0.", false)] public static NServiceBus.TransportExtensions WithPeekDelay(this NServiceBus.TransportExtensions transportExtensions, System.Nullable delay = null) { } } } diff --git a/src/NServiceBus.Transport.SqlServer/Receiving/MessagePump.cs b/src/NServiceBus.Transport.SqlServer/Receiving/MessagePump.cs index 648e292a4..d252542d4 100644 --- a/src/NServiceBus.Transport.SqlServer/Receiving/MessagePump.cs +++ b/src/NServiceBus.Transport.SqlServer/Receiving/MessagePump.cs @@ -12,13 +12,14 @@ class MessagePump : IPushMessages { - public MessagePump(Func receiveStrategyFactory, Func queueFactory, IPurgeQueues queuePurger, IExpiredMessagesPurger expiredMessagesPurger, IPeekMessagesInQueue queuePeeker, SchemaInspector schemaInspector, TimeSpan waitTimeCircuitBreaker) + public MessagePump(Func receiveStrategyFactory, Func queueFactory, IPurgeQueues queuePurger, IExpiredMessagesPurger expiredMessagesPurger, IPeekMessagesInQueue queuePeeker, QueuePeekerOptions queuePeekerOptions, SchemaInspector schemaInspector, TimeSpan waitTimeCircuitBreaker) { this.receiveStrategyFactory = receiveStrategyFactory; this.queuePurger = queuePurger; this.queueFactory = queueFactory; this.expiredMessagesPurger = expiredMessagesPurger; this.queuePeeker = queuePeeker; + this.queuePeekerOptions = queuePeekerOptions; this.schemaInspector = schemaInspector; this.waitTimeCircuitBreaker = waitTimeCircuitBreaker; } @@ -56,7 +57,7 @@ public async Task Init(Func onMessage, Func queueFactory; IExpiredMessagesPurger expiredMessagesPurger; IPeekMessagesInQueue queuePeeker; + QueuePeekerOptions queuePeekerOptions; SchemaInspector schemaInspector; TimeSpan waitTimeCircuitBreaker; SemaphoreSlim concurrencyLimiter; diff --git a/src/NServiceBus.Transport.SqlServer/Receiving/QueuePeekerOptions.cs b/src/NServiceBus.Transport.SqlServer/Receiving/QueuePeekerOptions.cs index b118faaf3..ff030d834 100644 --- a/src/NServiceBus.Transport.SqlServer/Receiving/QueuePeekerOptions.cs +++ b/src/NServiceBus.Transport.SqlServer/Receiving/QueuePeekerOptions.cs @@ -5,7 +5,7 @@ class QueuePeekerOptions { - public QueuePeekerOptions(TimeSpan? delayTime = null) + public QueuePeekerOptions(TimeSpan? delayTime = null, int? maxRecordsToPeek = null) { var delay = DefaultDelay; @@ -16,6 +16,18 @@ public QueuePeekerOptions(TimeSpan? delayTime = null) } Delay = delay; + + Validate(maxRecordsToPeek); + MaxRecordsToPeek = maxRecordsToPeek; + } + + static void Validate(int? maxRecordsToPeek) + { + if (maxRecordsToPeek.HasValue && maxRecordsToPeek < 1) + { + var message = "Peek batch size is invalid. THe value must be greater than zero."; + throw new Exception(message); + } } static void Validate(TimeSpan delay) @@ -34,6 +46,7 @@ static void Validate(TimeSpan delay) } public TimeSpan Delay { get; } + public int? MaxRecordsToPeek { get; } static TimeSpan DefaultDelay = TimeSpan.FromSeconds(1); static ILog Logger = LogManager.GetLogger(); } diff --git a/src/NServiceBus.Transport.SqlServer/SqlServerTransportInfrastructure.cs b/src/NServiceBus.Transport.SqlServer/SqlServerTransportInfrastructure.cs index 6420da239..ae4f6f258 100644 --- a/src/NServiceBus.Transport.SqlServer/SqlServerTransportInfrastructure.cs +++ b/src/NServiceBus.Transport.SqlServer/SqlServerTransportInfrastructure.cs @@ -195,7 +195,7 @@ public override TransportReceiveInfrastructure ConfigureReceiveInfrastructure() } return new TransportReceiveInfrastructure( - () => new MessagePump(receiveStrategyFactory, queueFactory, queuePurger, expiredMessagesPurger, queuePeeker, schemaVerification, waitTimeCircuitBreaker), + () => new MessagePump(receiveStrategyFactory, queueFactory, queuePurger, expiredMessagesPurger, queuePeeker, queuePeekerOptions, schemaVerification, waitTimeCircuitBreaker), () => new QueueCreator(connectionFactory, addressTranslator, delayedQueueCanonicalAddress, createMessageBodyComputedColumn), () => ValidateDatabaseAccess(scopeOptions.TransactionOptions)); } diff --git a/src/NServiceBus.Transport.SqlServer/SqlServerTransportSettingsExtensions.cs b/src/NServiceBus.Transport.SqlServer/SqlServerTransportSettingsExtensions.cs index 795ab0615..6c6e6df87 100644 --- a/src/NServiceBus.Transport.SqlServer/SqlServerTransportSettingsExtensions.cs +++ b/src/NServiceBus.Transport.SqlServer/SqlServerTransportSettingsExtensions.cs @@ -174,11 +174,26 @@ public static TransportExtensions TransactionScopeOptions(th /// /// The to extend. /// The delay value + [ObsoleteEx(Message = "WithPeekDelay has been obsoleted.", ReplacementTypeOrMember = "QueuePeekerOptions", RemoveInVersion = "8.0", TreatAsErrorFromVersion = "7.0")] public static TransportExtensions WithPeekDelay(this TransportExtensions transportExtensions, TimeSpan? delay = null) { Guard.AgainstNull(nameof(transportExtensions), transportExtensions); - transportExtensions.GetSettings().Set(new QueuePeekerOptions(delay)); + transportExtensions.QueuePeekerOptions(delay: delay); + return transportExtensions; + } + + /// + /// Allows changing the queue peek delay, and the peek batch size. + /// + /// The to extend. + /// The delay value + /// The peek batch size + public static TransportExtensions QueuePeekerOptions(this TransportExtensions transportExtensions, TimeSpan? delay = null, int? peekBatchSize = null) + { + Guard.AgainstNull(nameof(transportExtensions), transportExtensions); + + transportExtensions.GetSettings().Set(new QueuePeekerOptions(delay, peekBatchSize)); return transportExtensions; }