From eb55676d22b25b3a1cc989eb524228a55d0c7b02 Mon Sep 17 00:00:00 2001 From: John Simons Date: Thu, 16 Apr 2015 11:03:17 +1000 Subject: [PATCH] Using CancellationToken Changed to use a dictionary to keep track of running tasks --- src/NServiceBus.SqlServer.sln.DotSettings | 1 + src/NServiceBus.SqlServer/AdaptiveExecutor.cs | 48 +++++++++---------- .../AdaptivePollingReceiver.cs | 4 +- src/NServiceBus.SqlServer/IExecutor.cs | 4 +- src/NServiceBus.SqlServer/ITaskTracker.cs | 4 +- .../ReceiveTaskTracker.cs | 31 ++++++------ .../SqlServerPollingDequeueStrategy.cs | 6 ++- 7 files changed, 49 insertions(+), 49 deletions(-) diff --git a/src/NServiceBus.SqlServer.sln.DotSettings b/src/NServiceBus.SqlServer.sln.DotSettings index 7244d3c82..2154b918d 100644 --- a/src/NServiceBus.SqlServer.sln.DotSettings +++ b/src/NServiceBus.SqlServer.sln.DotSettings @@ -580,6 +580,7 @@ II.2.12 <HandlesEvent /> <Policy Inspect="True" Prefix="T" Suffix="" Style="AaBb" /> <Policy Inspect="True" Prefix="" Suffix="" Style="AaBb" /> True + True True True True diff --git a/src/NServiceBus.SqlServer/AdaptiveExecutor.cs b/src/NServiceBus.SqlServer/AdaptiveExecutor.cs index 6e7db1482..7d882bb2d 100644 --- a/src/NServiceBus.SqlServer/AdaptiveExecutor.cs +++ b/src/NServiceBus.SqlServer/AdaptiveExecutor.cs @@ -19,13 +19,13 @@ protected AdaptiveExecutor(RepeatedFailuresOverTimeCircuitBreaker circuitBreaker this.circuitBreaker = circuitBreaker; } - public virtual void Start(int maximumConcurrency, CancellationTokenSource tokenSource) + public virtual void Start(int maximumConcurrency, CancellationToken token) { if (taskTracker != null) { throw new InvalidOperationException("The executor has already been started. Use Stop() to stop it."); } - this.tokenSource = tokenSource; + this.token = token; taskTracker = CreateTaskTracker(maximumConcurrency); StartTask(); } @@ -33,40 +33,40 @@ public virtual void Start(int maximumConcurrency, CancellationTokenSource tokenS public virtual void Stop() { taskTracker.ShutdownAll(); + taskTracker = null; - tokenSource.Dispose(); - tokenSource = null; } void StartTask() { - var token = tokenSource.Token; taskTracker.StartAndTrack(() => { - Task receiveTask = null; - - receiveTask = Task.Factory + var taskId = Guid.NewGuid(); + var receiveTask = Task.Factory .StartNew(ReceiveLoop, null, token, TaskCreationOptions.LongRunning, TaskScheduler.Default) .ContinueWith(t => { - if (t.IsFaulted) + t.Exception.Handle(ex => { - t.Exception.Handle(ex => - { - HandleException(ex); - circuitBreaker.Failure(ex); - return true; - }); - } -// ReSharper disable once AccessToModifiedClosure - taskTracker.Forget(receiveTask); - if (taskTracker.ShouldStartAnotherTaskImmediately) + HandleException(ex); + circuitBreaker.Failure(ex); + return true; + }); + + }, token, TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default) + .ContinueWith((_, s) => + { + taskTracker.Forget((Guid)s); + + if (!taskTracker.ShouldStartAnotherTaskImmediately) { - StartTask(); + return; } - }, token); - return receiveTask; + StartTask(); + }, taskId, token); + + return Tuple.Create(taskId, receiveTask); }); } @@ -75,7 +75,7 @@ void ReceiveLoop(object obj) var backOff = new BackOff(1000); var rampUpController = CreateRampUpController(StartTask); - while (!tokenSource.IsCancellationRequested && rampUpController.CheckHasEnoughWork()) + while (!token.IsCancellationRequested && rampUpController.CheckHasEnoughWork()) { bool success; rampUpController.RampUpIfTooMuchWork(); @@ -103,7 +103,7 @@ void ReceiveLoop(object obj) } readonly RepeatedFailuresOverTimeCircuitBreaker circuitBreaker; - CancellationTokenSource tokenSource; + CancellationToken token; ITaskTracker taskTracker; } diff --git a/src/NServiceBus.SqlServer/AdaptivePollingReceiver.cs b/src/NServiceBus.SqlServer/AdaptivePollingReceiver.cs index 9d25dbc18..a0bd7557e 100644 --- a/src/NServiceBus.SqlServer/AdaptivePollingReceiver.cs +++ b/src/NServiceBus.SqlServer/AdaptivePollingReceiver.cs @@ -21,10 +21,10 @@ public AdaptivePollingReceiver( this.transportNotifications = transportNotifications; } - public override void Start(int maximumConcurrency, CancellationTokenSource tokenSource) + public override void Start(int maximumConcurrency, CancellationToken token) { Logger.InfoFormat("Receiver for queue '{0}' started with maximum concurrency '{1}'", queue, maximumConcurrency); - base.Start(maximumConcurrency, tokenSource); + base.Start(maximumConcurrency, token); } protected override ReceiveResult Init() diff --git a/src/NServiceBus.SqlServer/IExecutor.cs b/src/NServiceBus.SqlServer/IExecutor.cs index f6b07b0b3..b598537df 100644 --- a/src/NServiceBus.SqlServer/IExecutor.cs +++ b/src/NServiceBus.SqlServer/IExecutor.cs @@ -4,13 +4,13 @@ interface IExecutor { - void Start(int maximumConcurrency, CancellationTokenSource tokenSource); + void Start(int maximumConcurrency, CancellationToken token); void Stop(); } class NullExecutor : IExecutor { - public void Start(int maximumConcurrency, CancellationTokenSource tokenSource) + public void Start(int maximumConcurrency, CancellationToken token) { } diff --git a/src/NServiceBus.SqlServer/ITaskTracker.cs b/src/NServiceBus.SqlServer/ITaskTracker.cs index 204a8149b..538ea9297 100644 --- a/src/NServiceBus.SqlServer/ITaskTracker.cs +++ b/src/NServiceBus.SqlServer/ITaskTracker.cs @@ -5,8 +5,8 @@ interface ITaskTracker { - void StartAndTrack(Func taskFactory); - void Forget(Task receiveTask); + void StartAndTrack(Func> taskFactory); + void Forget(Guid id); bool ShouldStartAnotherTaskImmediately { get; } void ShutdownAll(); } diff --git a/src/NServiceBus.SqlServer/ReceiveTaskTracker.cs b/src/NServiceBus.SqlServer/ReceiveTaskTracker.cs index 98296cb5b..917c82fe3 100644 --- a/src/NServiceBus.SqlServer/ReceiveTaskTracker.cs +++ b/src/NServiceBus.SqlServer/ReceiveTaskTracker.cs @@ -15,7 +15,7 @@ public ReceiveTaskTracker(int maximumConcurrency, TransportNotifications transpo this.queueName = queueName; } - public void StartAndTrack(Func taskFactory) + public void StartAndTrack(Func> taskFactory) { lock (lockObj) { @@ -33,8 +33,8 @@ public void StartAndTrack(Func taskFactory) transportNotifications.InvokeMaximumConcurrencyLevelReached(queueName, maximumConcurrency); return; } - var task = taskFactory(); - trackedTasks.Add(task); + var tuple = taskFactory(); + trackedTasks.Add(tuple.Item1, tuple.Item2); if (Logger.IsDebugEnabled) { Logger.DebugFormat("Starting a new receive task. Total count current/max {0}/{1}", trackedTasks.Count, maximumConcurrency); @@ -43,7 +43,7 @@ public void StartAndTrack(Func taskFactory) } } - public void Forget(Task receiveTask) + public void Forget(Guid id) { lock (lockObj) { @@ -51,7 +51,7 @@ public void Forget(Task receiveTask) { return; } - trackedTasks.Remove(receiveTask); + trackedTasks.Remove(id); if (Logger.IsDebugEnabled) { Logger.DebugFormat("Stopping a receive task. Total count current/max {0}/{1}", trackedTasks.Count, maximumConcurrency); @@ -81,25 +81,22 @@ public void ShutdownAll() { Logger.Debug("Stopping all receive tasks."); } - Task[] awaitedTasks; lock (lockObj) { shuttingDown = true; - awaitedTasks = trackedTasks.ToArray(); - } - - try - { - Task.WaitAll(awaitedTasks); - } - catch (AggregateException aex) - { - aex.Handle(ex => ex is TaskCanceledException); + try + { + Task.WaitAll(trackedTasks.Values.ToArray()); + } + catch (AggregateException aex) + { + aex.Handle(ex => ex is TaskCanceledException); + } } } readonly object lockObj = new object(); - readonly List trackedTasks = new List(); + readonly Dictionary trackedTasks = new Dictionary(); bool shuttingDown; readonly int maximumConcurrency; readonly TransportNotifications transportNotifications; diff --git a/src/NServiceBus.SqlServer/SqlServerPollingDequeueStrategy.cs b/src/NServiceBus.SqlServer/SqlServerPollingDequeueStrategy.cs index 006103baa..25260b8d5 100644 --- a/src/NServiceBus.SqlServer/SqlServerPollingDequeueStrategy.cs +++ b/src/NServiceBus.SqlServer/SqlServerPollingDequeueStrategy.cs @@ -71,8 +71,8 @@ public void Start(int maximumConcurrencyLevel) { tokenSource = new CancellationTokenSource(); - primaryReceiver.Start(maximumConcurrencyLevel, tokenSource); - secondaryReceiver.Start(SecondaryReceiveSettings.MaximumConcurrencyLevel, tokenSource); + primaryReceiver.Start(maximumConcurrencyLevel, tokenSource.Token); + secondaryReceiver.Start(SecondaryReceiveSettings.MaximumConcurrencyLevel, tokenSource.Token); } /// @@ -89,6 +89,8 @@ public void Stop() primaryReceiver.Stop(); secondaryReceiver.Stop(); + + tokenSource.Dispose(); } public void Dispose()