Skip to content

Commit

Permalink
Using CancellationToken
Browse files Browse the repository at this point in the history
Changed to use a dictionary to keep track of running tasks
  • Loading branch information
John Simons committed Apr 16, 2015
1 parent f25e30b commit eb55676
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 49 deletions.
1 change: 1 addition & 0 deletions src/NServiceBus.SqlServer.sln.DotSettings
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,7 @@ II.2.12 <HandlesEvent />
<s:String x:Key="/Default/CodeStyle/Naming/VBNaming/PredefinedNamingRules/=TypeParameters/@EntryIndexedValue">&lt;Policy Inspect="True" Prefix="T" Suffix="" Style="AaBb" /&gt;</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/VBNaming/PredefinedNamingRules/=TypesAndNamespaces/@EntryIndexedValue">&lt;Policy Inspect="True" Prefix="" Suffix="" Style="AaBb" /&gt;</s:String>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpFileLayoutPatternsUpgrade/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EAddAccessorOwnerDeclarationBracesMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateBlankLinesAroundFieldToBlankLinesAroundProperty/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateThisQualifierSettings/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002EJavaScript_002ECodeStyle_002ESettingsUpgrade_002EJsCodeFormatterSettingsUpgrader/@EntryIndexedValue">True</s:Boolean>
Expand Down
48 changes: 24 additions & 24 deletions src/NServiceBus.SqlServer/AdaptiveExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,54 +19,54 @@ protected AdaptiveExecutor(RepeatedFailuresOverTimeCircuitBreaker circuitBreaker
this.circuitBreaker = circuitBreaker;
}

public virtual void Start(int maximumConcurrency, CancellationTokenSource tokenSource)
public virtual void Start(int maximumConcurrency, CancellationToken token)
{
if (taskTracker != null)
{
throw new InvalidOperationException("The executor has already been started. Use Stop() to stop it.");
}
this.tokenSource = tokenSource;
this.token = token;
taskTracker = CreateTaskTracker(maximumConcurrency);
StartTask();
}

public virtual void Stop()
{
taskTracker.ShutdownAll();

taskTracker = null;
tokenSource.Dispose();
tokenSource = null;
}

void StartTask()
{
var token = tokenSource.Token;
taskTracker.StartAndTrack(() =>
{
Task receiveTask = null;

receiveTask = Task.Factory
var taskId = Guid.NewGuid();
var receiveTask = Task.Factory
.StartNew(ReceiveLoop, null, token, TaskCreationOptions.LongRunning, TaskScheduler.Default)
.ContinueWith(t =>
{
if (t.IsFaulted)
t.Exception.Handle(ex =>
{
t.Exception.Handle(ex =>
{
HandleException(ex);
circuitBreaker.Failure(ex);
return true;
});
}
// ReSharper disable once AccessToModifiedClosure
taskTracker.Forget(receiveTask);
if (taskTracker.ShouldStartAnotherTaskImmediately)
HandleException(ex);
circuitBreaker.Failure(ex);
return true;
});

}, token, TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default)
.ContinueWith((_, s) =>
{
taskTracker.Forget((Guid)s);

if (!taskTracker.ShouldStartAnotherTaskImmediately)
{
StartTask();
return;
}
}, token);

return receiveTask;
StartTask();
}, taskId, token);

return Tuple.Create(taskId, receiveTask);
});
}

Expand All @@ -75,7 +75,7 @@ void ReceiveLoop(object obj)
var backOff = new BackOff(1000);
var rampUpController = CreateRampUpController(StartTask);

while (!tokenSource.IsCancellationRequested && rampUpController.CheckHasEnoughWork())
while (!token.IsCancellationRequested && rampUpController.CheckHasEnoughWork())
{
bool success;
rampUpController.RampUpIfTooMuchWork();
Expand Down Expand Up @@ -103,7 +103,7 @@ void ReceiveLoop(object obj)
}

readonly RepeatedFailuresOverTimeCircuitBreaker circuitBreaker;
CancellationTokenSource tokenSource;
CancellationToken token;
ITaskTracker taskTracker;

}
Expand Down
4 changes: 2 additions & 2 deletions src/NServiceBus.SqlServer/AdaptivePollingReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ public AdaptivePollingReceiver(
this.transportNotifications = transportNotifications;
}

public override void Start(int maximumConcurrency, CancellationTokenSource tokenSource)
public override void Start(int maximumConcurrency, CancellationToken token)
{
Logger.InfoFormat("Receiver for queue '{0}' started with maximum concurrency '{1}'", queue, maximumConcurrency);
base.Start(maximumConcurrency, tokenSource);
base.Start(maximumConcurrency, token);
}

protected override ReceiveResult Init()
Expand Down
4 changes: 2 additions & 2 deletions src/NServiceBus.SqlServer/IExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@

interface IExecutor
{
void Start(int maximumConcurrency, CancellationTokenSource tokenSource);
void Start(int maximumConcurrency, CancellationToken token);
void Stop();
}

class NullExecutor : IExecutor
{
public void Start(int maximumConcurrency, CancellationTokenSource tokenSource)
public void Start(int maximumConcurrency, CancellationToken token)
{
}

Expand Down
4 changes: 2 additions & 2 deletions src/NServiceBus.SqlServer/ITaskTracker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

interface ITaskTracker
{
void StartAndTrack(Func<Task> taskFactory);
void Forget(Task receiveTask);
void StartAndTrack(Func<Tuple<Guid, Task>> taskFactory);
void Forget(Guid id);
bool ShouldStartAnotherTaskImmediately { get; }
void ShutdownAll();
}
Expand Down
31 changes: 14 additions & 17 deletions src/NServiceBus.SqlServer/ReceiveTaskTracker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public ReceiveTaskTracker(int maximumConcurrency, TransportNotifications transpo
this.queueName = queueName;
}

public void StartAndTrack(Func<Task> taskFactory)
public void StartAndTrack(Func<Tuple<Guid,Task>> taskFactory)
{
lock (lockObj)
{
Expand All @@ -33,8 +33,8 @@ public void StartAndTrack(Func<Task> taskFactory)
transportNotifications.InvokeMaximumConcurrencyLevelReached(queueName, maximumConcurrency);
return;
}
var task = taskFactory();
trackedTasks.Add(task);
var tuple = taskFactory();
trackedTasks.Add(tuple.Item1, tuple.Item2);
if (Logger.IsDebugEnabled)
{
Logger.DebugFormat("Starting a new receive task. Total count current/max {0}/{1}", trackedTasks.Count, maximumConcurrency);
Expand All @@ -43,15 +43,15 @@ public void StartAndTrack(Func<Task> taskFactory)
}
}

public void Forget(Task receiveTask)
public void Forget(Guid id)
{
lock (lockObj)
{
if (shuttingDown)
{
return;
}
trackedTasks.Remove(receiveTask);
trackedTasks.Remove(id);
if (Logger.IsDebugEnabled)
{
Logger.DebugFormat("Stopping a receive task. Total count current/max {0}/{1}", trackedTasks.Count, maximumConcurrency);
Expand Down Expand Up @@ -81,25 +81,22 @@ public void ShutdownAll()
{
Logger.Debug("Stopping all receive tasks.");
}
Task[] awaitedTasks;
lock (lockObj)
{
shuttingDown = true;
awaitedTasks = trackedTasks.ToArray();
}

try
{
Task.WaitAll(awaitedTasks);
}
catch (AggregateException aex)
{
aex.Handle(ex => ex is TaskCanceledException);
try
{
Task.WaitAll(trackedTasks.Values.ToArray());
}
catch (AggregateException aex)
{
aex.Handle(ex => ex is TaskCanceledException);
}
}
}

readonly object lockObj = new object();
readonly List<Task> trackedTasks = new List<Task>();
readonly Dictionary<Guid, Task> trackedTasks = new Dictionary<Guid, Task>();
bool shuttingDown;
readonly int maximumConcurrency;
readonly TransportNotifications transportNotifications;
Expand Down
6 changes: 4 additions & 2 deletions src/NServiceBus.SqlServer/SqlServerPollingDequeueStrategy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ public void Start(int maximumConcurrencyLevel)
{
tokenSource = new CancellationTokenSource();

primaryReceiver.Start(maximumConcurrencyLevel, tokenSource);
secondaryReceiver.Start(SecondaryReceiveSettings.MaximumConcurrencyLevel, tokenSource);
primaryReceiver.Start(maximumConcurrencyLevel, tokenSource.Token);
secondaryReceiver.Start(SecondaryReceiveSettings.MaximumConcurrencyLevel, tokenSource.Token);
}

/// <summary>
Expand All @@ -89,6 +89,8 @@ public void Stop()

primaryReceiver.Stop();
secondaryReceiver.Stop();

tokenSource.Dispose();
}

public void Dispose()
Expand Down

0 comments on commit eb55676

Please sign in to comment.