Skip to content

Commit

Permalink
Peek batch size in 6.2 (#711)
Browse files Browse the repository at this point in the history
* Add MaxRecordsToPeek to QueuePeekerOptions

* Use MaxRecordsToPeek to determine the peek batch size

* Allows to configure queue peeker options

* Fix test

* Rework obsolete strategy

* Validate peek batch size

* Approved API

* typo
  • Loading branch information
mauroservienti authored Nov 9, 2020
1 parent bbad596 commit 6b6fe72
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ namespace NServiceBus
public static NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> DefaultSchema(this NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> transportExtensions, string schemaName) { }
public static NServiceBus.Transport.SqlServer.DelayedDeliverySettings NativeDelayedDelivery(this NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> transportExtensions) { }
public static NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> PurgeExpiredMessagesOnStartup(this NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> transportExtensions, System.Nullable<int> purgeBatchSize) { }
public static NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> QueuePeekerOptions(this NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> transportExtensions, System.Nullable<System.TimeSpan> delay = null, System.Nullable<int> peekBatchSize = null) { }
public static NServiceBus.Transport.SqlServer.SubscriptionSettings SubscriptionSettings(this NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> transportExtensions) { }
public static NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> TimeToWaitBeforeTriggeringCircuitBreaker(this NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> transportExtensions, System.TimeSpan waitTime) { }
public static NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> TransactionScopeOptions(this NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> transportExtensions, System.Nullable<System.TimeSpan> timeout = null, System.Nullable<System.Transactions.IsolationLevel> isolationLevel = null) { }
Expand All @@ -40,6 +41,8 @@ namespace NServiceBus
public static NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> UseCustomSqlConnectionFactory(this NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> transportExtensions, System.Func<System.Threading.Tasks.Task<System.Data.SqlClient.SqlConnection>> sqlConnectionFactory) { }
public static NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> UseSchemaForEndpoint(this NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> transportExtensions, string endpointName, string schema) { }
public static NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> UseSchemaForQueue(this NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> transportExtensions, string queueName, string schema) { }
[System.ObsoleteAttribute("WithPeekDelay has been obsoleted. Use `QueuePeekerOptions` instead. Will be treat" +
"ed as an error from version 7.0.0. Will be removed in version 8.0.0.", false)]
public static NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> WithPeekDelay(this NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> transportExtensions, System.Nullable<System.TimeSpan> delay = null) { }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public async Task Should_stop_pumping_messages_after_first_unsuccessful_receive(
new QueuePurger(sqlConnectionFactory),
new NoOpExpiredMessagesPurger(),
new QueuePeeker(sqlConnectionFactory, new QueuePeekerOptions()),
new QueuePeekerOptions(),
new SchemaInspector(_ => sqlConnectionFactory.OpenNewConnection(), false),
TimeSpan.MaxValue);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ namespace NServiceBus
public static NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> DefaultSchema(this NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> transportExtensions, string schemaName) { }
public static NServiceBus.Transport.SqlServer.DelayedDeliverySettings NativeDelayedDelivery(this NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> transportExtensions) { }
public static NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> PurgeExpiredMessagesOnStartup(this NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> transportExtensions, System.Nullable<int> purgeBatchSize) { }
public static NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> QueuePeekerOptions(this NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> transportExtensions, System.Nullable<System.TimeSpan> delay = null, System.Nullable<int> peekBatchSize = null) { }
public static NServiceBus.Transport.SqlServer.SubscriptionSettings SubscriptionSettings(this NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> transportExtensions) { }
public static NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> TimeToWaitBeforeTriggeringCircuitBreaker(this NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> transportExtensions, System.TimeSpan waitTime) { }
public static NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> TransactionScopeOptions(this NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> transportExtensions, System.Nullable<System.TimeSpan> timeout = null, System.Nullable<System.Transactions.IsolationLevel> isolationLevel = null) { }
Expand All @@ -39,6 +40,8 @@ namespace NServiceBus
public static NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> UseCustomSqlConnectionFactory(this NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> transportExtensions, System.Func<System.Threading.Tasks.Task<Microsoft.Data.SqlClient.SqlConnection>> sqlConnectionFactory) { }
public static NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> UseSchemaForEndpoint(this NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> transportExtensions, string endpointName, string schema) { }
public static NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> UseSchemaForQueue(this NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> transportExtensions, string queueName, string schema) { }
[System.ObsoleteAttribute("WithPeekDelay has been obsoleted. Use `QueuePeekerOptions` instead. Will be treat" +
"ed as an error from version 7.0.0. Will be removed in version 8.0.0.", false)]
public static NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> WithPeekDelay(this NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> transportExtensions, System.Nullable<System.TimeSpan> delay = null) { }
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/NServiceBus.Transport.SqlServer/Receiving/MessagePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@

class MessagePump : IPushMessages
{
public MessagePump(Func<TransportTransactionMode, ReceiveStrategy> receiveStrategyFactory, Func<string, TableBasedQueue> queueFactory, IPurgeQueues queuePurger, IExpiredMessagesPurger expiredMessagesPurger, IPeekMessagesInQueue queuePeeker, SchemaInspector schemaInspector, TimeSpan waitTimeCircuitBreaker)
public MessagePump(Func<TransportTransactionMode, ReceiveStrategy> receiveStrategyFactory, Func<string, TableBasedQueue> queueFactory, IPurgeQueues queuePurger, IExpiredMessagesPurger expiredMessagesPurger, IPeekMessagesInQueue queuePeeker, QueuePeekerOptions queuePeekerOptions, SchemaInspector schemaInspector, TimeSpan waitTimeCircuitBreaker)
{
this.receiveStrategyFactory = receiveStrategyFactory;
this.queuePurger = queuePurger;
this.queueFactory = queueFactory;
this.expiredMessagesPurger = expiredMessagesPurger;
this.queuePeeker = queuePeeker;
this.queuePeekerOptions = queuePeekerOptions;
this.schemaInspector = schemaInspector;
this.waitTimeCircuitBreaker = waitTimeCircuitBreaker;
}
Expand Down Expand Up @@ -56,7 +57,7 @@ public async Task Init(Func<MessageContext, Task> onMessage, Func<ErrorContext,

public void Start(PushRuntimeSettings limitations)
{
inputQueue.FormatPeekCommand(Math.Min(100, 10 * limitations.MaxConcurrency));
inputQueue.FormatPeekCommand(queuePeekerOptions.MaxRecordsToPeek ?? Math.Min(100, 10 * limitations.MaxConcurrency));
maxConcurrency = limitations.MaxConcurrency;
concurrencyLimiter = new SemaphoreSlim(limitations.MaxConcurrency);
cancellationTokenSource = new CancellationTokenSource();
Expand Down Expand Up @@ -210,6 +211,7 @@ async Task PurgeExpiredMessages()
Func<string, TableBasedQueue> queueFactory;
IExpiredMessagesPurger expiredMessagesPurger;
IPeekMessagesInQueue queuePeeker;
QueuePeekerOptions queuePeekerOptions;
SchemaInspector schemaInspector;
TimeSpan waitTimeCircuitBreaker;
SemaphoreSlim concurrencyLimiter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

class QueuePeekerOptions
{
public QueuePeekerOptions(TimeSpan? delayTime = null)
public QueuePeekerOptions(TimeSpan? delayTime = null, int? maxRecordsToPeek = null)
{
var delay = DefaultDelay;

Expand All @@ -16,6 +16,18 @@ public QueuePeekerOptions(TimeSpan? delayTime = null)
}

Delay = delay;

Validate(maxRecordsToPeek);
MaxRecordsToPeek = maxRecordsToPeek;
}

static void Validate(int? maxRecordsToPeek)
{
if (maxRecordsToPeek.HasValue && maxRecordsToPeek < 1)
{
var message = "Peek batch size is invalid. THe value must be greater than zero.";
throw new Exception(message);
}
}

static void Validate(TimeSpan delay)
Expand All @@ -34,6 +46,7 @@ static void Validate(TimeSpan delay)
}

public TimeSpan Delay { get; }
public int? MaxRecordsToPeek { get; }
static TimeSpan DefaultDelay = TimeSpan.FromSeconds(1);
static ILog Logger = LogManager.GetLogger<QueuePeekerOptions>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public override TransportReceiveInfrastructure ConfigureReceiveInfrastructure()
}

return new TransportReceiveInfrastructure(
() => new MessagePump(receiveStrategyFactory, queueFactory, queuePurger, expiredMessagesPurger, queuePeeker, schemaVerification, waitTimeCircuitBreaker),
() => new MessagePump(receiveStrategyFactory, queueFactory, queuePurger, expiredMessagesPurger, queuePeeker, queuePeekerOptions, schemaVerification, waitTimeCircuitBreaker),
() => new QueueCreator(connectionFactory, addressTranslator, delayedQueueCanonicalAddress, createMessageBodyComputedColumn),
() => ValidateDatabaseAccess(scopeOptions.TransactionOptions));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,26 @@ public static TransportExtensions<SqlServerTransport> TransactionScopeOptions(th
/// </summary>
/// <param name="transportExtensions">The <see cref="TransportExtensions{T}" /> to extend.</param>
/// <param name="delay">The delay value</param>
[ObsoleteEx(Message = "WithPeekDelay has been obsoleted.", ReplacementTypeOrMember = "QueuePeekerOptions", RemoveInVersion = "8.0", TreatAsErrorFromVersion = "7.0")]
public static TransportExtensions<SqlServerTransport> WithPeekDelay(this TransportExtensions<SqlServerTransport> transportExtensions, TimeSpan? delay = null)
{
Guard.AgainstNull(nameof(transportExtensions), transportExtensions);

transportExtensions.GetSettings().Set(new QueuePeekerOptions(delay));
transportExtensions.QueuePeekerOptions(delay: delay);
return transportExtensions;
}

/// <summary>
/// Allows changing the queue peek delay, and the peek batch size.
/// </summary>
/// <param name="transportExtensions">The <see cref="TransportExtensions{T}" /> to extend.</param>
/// <param name="delay">The delay value</param>
/// <param name="peekBatchSize">The peek batch size</param>
public static TransportExtensions<SqlServerTransport> QueuePeekerOptions(this TransportExtensions<SqlServerTransport> transportExtensions, TimeSpan? delay = null, int? peekBatchSize = null)
{
Guard.AgainstNull(nameof(transportExtensions), transportExtensions);

transportExtensions.GetSettings().Set(new QueuePeekerOptions(delay, peekBatchSize));
return transportExtensions;
}

Expand Down

0 comments on commit 6b6fe72

Please sign in to comment.