From 9dfce99d633706e7aef1024089d5a43871dbfd29 Mon Sep 17 00:00:00 2001 From: John Simons Date: Wed, 15 Apr 2015 15:41:05 +1000 Subject: [PATCH 1/5] Wait for all tasks We need to return the continuation task to ensure we wait on all tasks. Fixes #79 --- src/NServiceBus.SqlServer/AdaptiveExecutor.cs | 35 ++++++++++--------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/src/NServiceBus.SqlServer/AdaptiveExecutor.cs b/src/NServiceBus.SqlServer/AdaptiveExecutor.cs index 966ef4c10..24340bcf2 100644 --- a/src/NServiceBus.SqlServer/AdaptiveExecutor.cs +++ b/src/NServiceBus.SqlServer/AdaptiveExecutor.cs @@ -30,7 +30,7 @@ public virtual void Start(int maximumConcurrency, CancellationTokenSource tokenS StartTask(); } - public virtual void Stop() + public virtual void Stop() { taskTracker.ShutdownAll(); taskTracker = null; @@ -44,24 +44,25 @@ void StartTask() taskTracker.StartAndTrack(() => { var receiveTask = Task.Factory - .StartNew(ReceiveLoop, null, token, TaskCreationOptions.LongRunning, TaskScheduler.Default); - receiveTask.ContinueWith(t => - { - if (t.IsFaulted) + .StartNew(ReceiveLoop, null, token, TaskCreationOptions.LongRunning, TaskScheduler.Default) + .ContinueWith(t => { - t.Exception.Handle(ex => + if (t.IsFaulted) { - HandleException(ex); - circuitBreaker.Failure(ex); - return true; - }); - } - taskTracker.Forget(t); - if (taskTracker.HasNoTasks) - { - StartTask(); - } - }); + t.Exception.Handle(ex => + { + HandleException(ex); + circuitBreaker.Failure(ex); + return true; + }); + } + taskTracker.Forget(t); + if (taskTracker.HasNoTasks) + { + StartTask(); + } + }); + return receiveTask; }); } From f25e30bdf76a4a7dcc38b1f659f322fae28e4827 Mon Sep 17 00:00:00 2001 From: SzymonPobiega Date: Wed, 15 Apr 2015 12:52:02 +0200 Subject: [PATCH 2/5] Attempt to fix the race condition when shutting down --- src/NServiceBus.SqlServer/AdaptiveExecutor.cs | 13 ++++++++----- src/NServiceBus.SqlServer/ITaskTracker.cs | 2 +- src/NServiceBus.SqlServer/ReceiveTaskTracker.cs | 10 +++++++++- 3 files changed, 18 insertions(+), 7 deletions(-) diff --git a/src/NServiceBus.SqlServer/AdaptiveExecutor.cs b/src/NServiceBus.SqlServer/AdaptiveExecutor.cs index 24340bcf2..6e7db1482 100644 --- a/src/NServiceBus.SqlServer/AdaptiveExecutor.cs +++ b/src/NServiceBus.SqlServer/AdaptiveExecutor.cs @@ -34,16 +34,18 @@ public virtual void Stop() { taskTracker.ShutdownAll(); taskTracker = null; + tokenSource.Dispose(); tokenSource = null; } void StartTask() { var token = tokenSource.Token; - taskTracker.StartAndTrack(() => { - var receiveTask = Task.Factory + Task receiveTask = null; + + receiveTask = Task.Factory .StartNew(ReceiveLoop, null, token, TaskCreationOptions.LongRunning, TaskScheduler.Default) .ContinueWith(t => { @@ -56,12 +58,13 @@ void StartTask() return true; }); } - taskTracker.Forget(t); - if (taskTracker.HasNoTasks) +// ReSharper disable once AccessToModifiedClosure + taskTracker.Forget(receiveTask); + if (taskTracker.ShouldStartAnotherTaskImmediately) { StartTask(); } - }); + }, token); return receiveTask; }); diff --git a/src/NServiceBus.SqlServer/ITaskTracker.cs b/src/NServiceBus.SqlServer/ITaskTracker.cs index 5e420a291..204a8149b 100644 --- a/src/NServiceBus.SqlServer/ITaskTracker.cs +++ b/src/NServiceBus.SqlServer/ITaskTracker.cs @@ -7,7 +7,7 @@ interface ITaskTracker { void StartAndTrack(Func taskFactory); void Forget(Task receiveTask); - bool HasNoTasks { get; } + bool ShouldStartAnotherTaskImmediately { get; } void ShutdownAll(); } } \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/ReceiveTaskTracker.cs b/src/NServiceBus.SqlServer/ReceiveTaskTracker.cs index 6b8eb5631..98296cb5b 100644 --- a/src/NServiceBus.SqlServer/ReceiveTaskTracker.cs +++ b/src/NServiceBus.SqlServer/ReceiveTaskTracker.cs @@ -47,6 +47,10 @@ public void Forget(Task receiveTask) { lock (lockObj) { + if (shuttingDown) + { + return; + } trackedTasks.Remove(receiveTask); if (Logger.IsDebugEnabled) { @@ -56,12 +60,16 @@ public void Forget(Task receiveTask) } } - public bool HasNoTasks + public bool ShouldStartAnotherTaskImmediately { get { lock (lockObj) { + if (shuttingDown) + { + return false; + } return !trackedTasks.Any(); } } From eb55676d22b25b3a1cc989eb524228a55d0c7b02 Mon Sep 17 00:00:00 2001 From: John Simons Date: Thu, 16 Apr 2015 11:03:17 +1000 Subject: [PATCH 3/5] Using CancellationToken Changed to use a dictionary to keep track of running tasks --- src/NServiceBus.SqlServer.sln.DotSettings | 1 + src/NServiceBus.SqlServer/AdaptiveExecutor.cs | 48 +++++++++---------- .../AdaptivePollingReceiver.cs | 4 +- src/NServiceBus.SqlServer/IExecutor.cs | 4 +- src/NServiceBus.SqlServer/ITaskTracker.cs | 4 +- .../ReceiveTaskTracker.cs | 31 ++++++------ .../SqlServerPollingDequeueStrategy.cs | 6 ++- 7 files changed, 49 insertions(+), 49 deletions(-) diff --git a/src/NServiceBus.SqlServer.sln.DotSettings b/src/NServiceBus.SqlServer.sln.DotSettings index 7244d3c82..2154b918d 100644 --- a/src/NServiceBus.SqlServer.sln.DotSettings +++ b/src/NServiceBus.SqlServer.sln.DotSettings @@ -580,6 +580,7 @@ II.2.12 <HandlesEvent /> <Policy Inspect="True" Prefix="T" Suffix="" Style="AaBb" /> <Policy Inspect="True" Prefix="" Suffix="" Style="AaBb" /> True + True True True True diff --git a/src/NServiceBus.SqlServer/AdaptiveExecutor.cs b/src/NServiceBus.SqlServer/AdaptiveExecutor.cs index 6e7db1482..7d882bb2d 100644 --- a/src/NServiceBus.SqlServer/AdaptiveExecutor.cs +++ b/src/NServiceBus.SqlServer/AdaptiveExecutor.cs @@ -19,13 +19,13 @@ protected AdaptiveExecutor(RepeatedFailuresOverTimeCircuitBreaker circuitBreaker this.circuitBreaker = circuitBreaker; } - public virtual void Start(int maximumConcurrency, CancellationTokenSource tokenSource) + public virtual void Start(int maximumConcurrency, CancellationToken token) { if (taskTracker != null) { throw new InvalidOperationException("The executor has already been started. Use Stop() to stop it."); } - this.tokenSource = tokenSource; + this.token = token; taskTracker = CreateTaskTracker(maximumConcurrency); StartTask(); } @@ -33,40 +33,40 @@ public virtual void Start(int maximumConcurrency, CancellationTokenSource tokenS public virtual void Stop() { taskTracker.ShutdownAll(); + taskTracker = null; - tokenSource.Dispose(); - tokenSource = null; } void StartTask() { - var token = tokenSource.Token; taskTracker.StartAndTrack(() => { - Task receiveTask = null; - - receiveTask = Task.Factory + var taskId = Guid.NewGuid(); + var receiveTask = Task.Factory .StartNew(ReceiveLoop, null, token, TaskCreationOptions.LongRunning, TaskScheduler.Default) .ContinueWith(t => { - if (t.IsFaulted) + t.Exception.Handle(ex => { - t.Exception.Handle(ex => - { - HandleException(ex); - circuitBreaker.Failure(ex); - return true; - }); - } -// ReSharper disable once AccessToModifiedClosure - taskTracker.Forget(receiveTask); - if (taskTracker.ShouldStartAnotherTaskImmediately) + HandleException(ex); + circuitBreaker.Failure(ex); + return true; + }); + + }, token, TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default) + .ContinueWith((_, s) => + { + taskTracker.Forget((Guid)s); + + if (!taskTracker.ShouldStartAnotherTaskImmediately) { - StartTask(); + return; } - }, token); - return receiveTask; + StartTask(); + }, taskId, token); + + return Tuple.Create(taskId, receiveTask); }); } @@ -75,7 +75,7 @@ void ReceiveLoop(object obj) var backOff = new BackOff(1000); var rampUpController = CreateRampUpController(StartTask); - while (!tokenSource.IsCancellationRequested && rampUpController.CheckHasEnoughWork()) + while (!token.IsCancellationRequested && rampUpController.CheckHasEnoughWork()) { bool success; rampUpController.RampUpIfTooMuchWork(); @@ -103,7 +103,7 @@ void ReceiveLoop(object obj) } readonly RepeatedFailuresOverTimeCircuitBreaker circuitBreaker; - CancellationTokenSource tokenSource; + CancellationToken token; ITaskTracker taskTracker; } diff --git a/src/NServiceBus.SqlServer/AdaptivePollingReceiver.cs b/src/NServiceBus.SqlServer/AdaptivePollingReceiver.cs index 9d25dbc18..a0bd7557e 100644 --- a/src/NServiceBus.SqlServer/AdaptivePollingReceiver.cs +++ b/src/NServiceBus.SqlServer/AdaptivePollingReceiver.cs @@ -21,10 +21,10 @@ public AdaptivePollingReceiver( this.transportNotifications = transportNotifications; } - public override void Start(int maximumConcurrency, CancellationTokenSource tokenSource) + public override void Start(int maximumConcurrency, CancellationToken token) { Logger.InfoFormat("Receiver for queue '{0}' started with maximum concurrency '{1}'", queue, maximumConcurrency); - base.Start(maximumConcurrency, tokenSource); + base.Start(maximumConcurrency, token); } protected override ReceiveResult Init() diff --git a/src/NServiceBus.SqlServer/IExecutor.cs b/src/NServiceBus.SqlServer/IExecutor.cs index f6b07b0b3..b598537df 100644 --- a/src/NServiceBus.SqlServer/IExecutor.cs +++ b/src/NServiceBus.SqlServer/IExecutor.cs @@ -4,13 +4,13 @@ interface IExecutor { - void Start(int maximumConcurrency, CancellationTokenSource tokenSource); + void Start(int maximumConcurrency, CancellationToken token); void Stop(); } class NullExecutor : IExecutor { - public void Start(int maximumConcurrency, CancellationTokenSource tokenSource) + public void Start(int maximumConcurrency, CancellationToken token) { } diff --git a/src/NServiceBus.SqlServer/ITaskTracker.cs b/src/NServiceBus.SqlServer/ITaskTracker.cs index 204a8149b..538ea9297 100644 --- a/src/NServiceBus.SqlServer/ITaskTracker.cs +++ b/src/NServiceBus.SqlServer/ITaskTracker.cs @@ -5,8 +5,8 @@ interface ITaskTracker { - void StartAndTrack(Func taskFactory); - void Forget(Task receiveTask); + void StartAndTrack(Func> taskFactory); + void Forget(Guid id); bool ShouldStartAnotherTaskImmediately { get; } void ShutdownAll(); } diff --git a/src/NServiceBus.SqlServer/ReceiveTaskTracker.cs b/src/NServiceBus.SqlServer/ReceiveTaskTracker.cs index 98296cb5b..917c82fe3 100644 --- a/src/NServiceBus.SqlServer/ReceiveTaskTracker.cs +++ b/src/NServiceBus.SqlServer/ReceiveTaskTracker.cs @@ -15,7 +15,7 @@ public ReceiveTaskTracker(int maximumConcurrency, TransportNotifications transpo this.queueName = queueName; } - public void StartAndTrack(Func taskFactory) + public void StartAndTrack(Func> taskFactory) { lock (lockObj) { @@ -33,8 +33,8 @@ public void StartAndTrack(Func taskFactory) transportNotifications.InvokeMaximumConcurrencyLevelReached(queueName, maximumConcurrency); return; } - var task = taskFactory(); - trackedTasks.Add(task); + var tuple = taskFactory(); + trackedTasks.Add(tuple.Item1, tuple.Item2); if (Logger.IsDebugEnabled) { Logger.DebugFormat("Starting a new receive task. Total count current/max {0}/{1}", trackedTasks.Count, maximumConcurrency); @@ -43,7 +43,7 @@ public void StartAndTrack(Func taskFactory) } } - public void Forget(Task receiveTask) + public void Forget(Guid id) { lock (lockObj) { @@ -51,7 +51,7 @@ public void Forget(Task receiveTask) { return; } - trackedTasks.Remove(receiveTask); + trackedTasks.Remove(id); if (Logger.IsDebugEnabled) { Logger.DebugFormat("Stopping a receive task. Total count current/max {0}/{1}", trackedTasks.Count, maximumConcurrency); @@ -81,25 +81,22 @@ public void ShutdownAll() { Logger.Debug("Stopping all receive tasks."); } - Task[] awaitedTasks; lock (lockObj) { shuttingDown = true; - awaitedTasks = trackedTasks.ToArray(); - } - - try - { - Task.WaitAll(awaitedTasks); - } - catch (AggregateException aex) - { - aex.Handle(ex => ex is TaskCanceledException); + try + { + Task.WaitAll(trackedTasks.Values.ToArray()); + } + catch (AggregateException aex) + { + aex.Handle(ex => ex is TaskCanceledException); + } } } readonly object lockObj = new object(); - readonly List trackedTasks = new List(); + readonly Dictionary trackedTasks = new Dictionary(); bool shuttingDown; readonly int maximumConcurrency; readonly TransportNotifications transportNotifications; diff --git a/src/NServiceBus.SqlServer/SqlServerPollingDequeueStrategy.cs b/src/NServiceBus.SqlServer/SqlServerPollingDequeueStrategy.cs index 006103baa..25260b8d5 100644 --- a/src/NServiceBus.SqlServer/SqlServerPollingDequeueStrategy.cs +++ b/src/NServiceBus.SqlServer/SqlServerPollingDequeueStrategy.cs @@ -71,8 +71,8 @@ public void Start(int maximumConcurrencyLevel) { tokenSource = new CancellationTokenSource(); - primaryReceiver.Start(maximumConcurrencyLevel, tokenSource); - secondaryReceiver.Start(SecondaryReceiveSettings.MaximumConcurrencyLevel, tokenSource); + primaryReceiver.Start(maximumConcurrencyLevel, tokenSource.Token); + secondaryReceiver.Start(SecondaryReceiveSettings.MaximumConcurrencyLevel, tokenSource.Token); } /// @@ -89,6 +89,8 @@ public void Stop() primaryReceiver.Stop(); secondaryReceiver.Stop(); + + tokenSource.Dispose(); } public void Dispose() From 7ce46ad530c3c7c54571167742b8905046a2d1c4 Mon Sep 17 00:00:00 2001 From: John Simons Date: Thu, 16 Apr 2015 15:37:09 +1000 Subject: [PATCH 4/5] Disabling receiving is running in SendOnly mode Fixes #78 --- .../Config/SqlServerTransportFeature.cs | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/src/NServiceBus.SqlServer/Config/SqlServerTransportFeature.cs b/src/NServiceBus.SqlServer/Config/SqlServerTransportFeature.cs index 3c2a30362..ca036f84d 100644 --- a/src/NServiceBus.SqlServer/Config/SqlServerTransportFeature.cs +++ b/src/NServiceBus.SqlServer/Config/SqlServerTransportFeature.cs @@ -56,15 +56,19 @@ protected override void Configure(FeatureConfigurationContext context, string co config.Configure(context, connectionStringWithSchema); } - context.Container.ConfigureComponent(DependencyLifecycle.SingleInstance); context.Container.ConfigureComponent(DependencyLifecycle.InstancePerCall); - context.Container.ConfigureComponent(DependencyLifecycle.InstancePerCall); - var errorQueue = ErrorQueueSettings.GetConfiguredErrorQueue(context.Settings); - context.Container.ConfigureComponent(b => new ReceiveStrategyFactory(b.Build(), b.Build(), errorQueue), DependencyLifecycle.InstancePerCall); + if (!context.Settings.GetOrDefault("Endpoint.SendOnly")) + { + context.Container.ConfigureComponent(DependencyLifecycle.SingleInstance); + context.Container.ConfigureComponent(DependencyLifecycle.InstancePerCall); + + var errorQueue = ErrorQueueSettings.GetConfiguredErrorQueue(context.Settings); + context.Container.ConfigureComponent(b => new ReceiveStrategyFactory(b.Build(), b.Build(), errorQueue), DependencyLifecycle.InstancePerCall); - context.Container.ConfigureComponent(DependencyLifecycle.InstancePerCall); - context.Container.ConfigureComponent(b => new SqlServerStorageContext(b.Build(), b.Build()), DependencyLifecycle.InstancePerUnitOfWork); + context.Container.ConfigureComponent(DependencyLifecycle.InstancePerCall); + context.Container.ConfigureComponent(b => new SqlServerStorageContext(b.Build(), b.Build()), DependencyLifecycle.InstancePerUnitOfWork); + } } } From 2c405ba135b18c6995371ff47b613fdddfe7156f Mon Sep 17 00:00:00 2001 From: John Simons Date: Thu, 16 Apr 2015 16:53:59 +1000 Subject: [PATCH 5/5] Adding queue name to the trace logging --- src/NServiceBus.SqlServer/AdaptiveExecutor.cs | 2 +- .../AdaptivePollingReceiver.cs | 2 +- .../ReceiveTaskTracker.cs | 19 +++++++++++-------- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/src/NServiceBus.SqlServer/AdaptiveExecutor.cs b/src/NServiceBus.SqlServer/AdaptiveExecutor.cs index 7d882bb2d..45edc4b9e 100644 --- a/src/NServiceBus.SqlServer/AdaptiveExecutor.cs +++ b/src/NServiceBus.SqlServer/AdaptiveExecutor.cs @@ -65,7 +65,7 @@ void StartTask() StartTask(); }, taskId, token); - + return Tuple.Create(taskId, receiveTask); }); } diff --git a/src/NServiceBus.SqlServer/AdaptivePollingReceiver.cs b/src/NServiceBus.SqlServer/AdaptivePollingReceiver.cs index a0bd7557e..6cb1a99ae 100644 --- a/src/NServiceBus.SqlServer/AdaptivePollingReceiver.cs +++ b/src/NServiceBus.SqlServer/AdaptivePollingReceiver.cs @@ -12,7 +12,7 @@ public AdaptivePollingReceiver( TableBasedQueue queue, Action endProcessMessage, RepeatedFailuresOverTimeCircuitBreaker circuitBreaker, - TransportNotifications transportNotifications) + TransportNotifications transportNotifications) : base(circuitBreaker) { this.receiveStrategy = receiveStrategy; diff --git a/src/NServiceBus.SqlServer/ReceiveTaskTracker.cs b/src/NServiceBus.SqlServer/ReceiveTaskTracker.cs index 917c82fe3..bf875115f 100644 --- a/src/NServiceBus.SqlServer/ReceiveTaskTracker.cs +++ b/src/NServiceBus.SqlServer/ReceiveTaskTracker.cs @@ -15,20 +15,20 @@ public ReceiveTaskTracker(int maximumConcurrency, TransportNotifications transpo this.queueName = queueName; } - public void StartAndTrack(Func> taskFactory) + public void StartAndTrack(Func> taskFactory) { lock (lockObj) { if (shuttingDown) { - Logger.Debug("Ignoring start task request because shutdown is in progress."); + Logger.DebugFormat("Ignoring start task request for '{0}' because shutdown is in progress.", queueName); return; } if (trackedTasks.Count >= maximumConcurrency) { if (Logger.IsDebugEnabled) { - Logger.DebugFormat("Ignoring start task request because of maximum concurrency limit {0}", maximumConcurrency); + Logger.DebugFormat("Ignoring start task request for '{0}' because of maximum concurrency limit of {1} has been reached.", queueName, maximumConcurrency); } transportNotifications.InvokeMaximumConcurrencyLevelReached(queueName, maximumConcurrency); return; @@ -37,7 +37,7 @@ public void StartAndTrack(Func> taskFactory) trackedTasks.Add(tuple.Item1, tuple.Item2); if (Logger.IsDebugEnabled) { - Logger.DebugFormat("Starting a new receive task. Total count current/max {0}/{1}", trackedTasks.Count, maximumConcurrency); + Logger.DebugFormat("Starting a new receive task for '{0}'. Total count current/max {1}/{2}", queueName, trackedTasks.Count, maximumConcurrency); } transportNotifications.InvokeReceiveTaskStarted(queueName, trackedTasks.Count, maximumConcurrency); } @@ -51,12 +51,15 @@ public void Forget(Guid id) { return; } - trackedTasks.Remove(id); - if (Logger.IsDebugEnabled) + + if (trackedTasks.Remove(id)) { - Logger.DebugFormat("Stopping a receive task. Total count current/max {0}/{1}", trackedTasks.Count, maximumConcurrency); + if (Logger.IsDebugEnabled) + { + Logger.DebugFormat("Stopping a receive task for '{0}'. Total count current/max {1}/{2}", queueName, trackedTasks.Count, maximumConcurrency); + } + transportNotifications.InvokeReceiveTaskStopped(queueName, trackedTasks.Count, maximumConcurrency); } - transportNotifications.InvokeReceiveTaskStopped(queueName, trackedTasks.Count, maximumConcurrency); } }