Skip to content

Commit

Permalink
Merge branch 'hotfix-2.1.3'
Browse files Browse the repository at this point in the history
  • Loading branch information
SzymonPobiega committed Jul 9, 2015
2 parents da8ecde + 2c405ba commit 7f417b8
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 58 deletions.
1 change: 1 addition & 0 deletions src/NServiceBus.SqlServer.sln.DotSettings
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,7 @@ II.2.12 <HandlesEvent />
<s:String x:Key="/Default/CodeStyle/Naming/VBNaming/PredefinedNamingRules/=TypeParameters/@EntryIndexedValue">&lt;Policy Inspect="True" Prefix="T" Suffix="" Style="AaBb" /&gt;</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/VBNaming/PredefinedNamingRules/=TypesAndNamespaces/@EntryIndexedValue">&lt;Policy Inspect="True" Prefix="" Suffix="" Style="AaBb" /&gt;</s:String>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpFileLayoutPatternsUpgrade/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EAddAccessorOwnerDeclarationBracesMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateBlankLinesAroundFieldToBlankLinesAroundProperty/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateThisQualifierSettings/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002EJavaScript_002ECodeStyle_002ESettingsUpgrade_002EJsCodeFormatterSettingsUpgrader/@EntryIndexedValue">True</s:Boolean>
Expand Down
40 changes: 22 additions & 18 deletions src/NServiceBus.SqlServer/AdaptiveExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,50 +19,54 @@ protected AdaptiveExecutor(RepeatedFailuresOverTimeCircuitBreaker circuitBreaker
this.circuitBreaker = circuitBreaker;
}

public virtual void Start(int maximumConcurrency, CancellationTokenSource tokenSource)
public virtual void Start(int maximumConcurrency, CancellationToken token)
{
if (taskTracker != null)
{
throw new InvalidOperationException("The executor has already been started. Use Stop() to stop it.");
}
this.tokenSource = tokenSource;
this.token = token;
taskTracker = CreateTaskTracker(maximumConcurrency);
StartTask();
}

public virtual void Stop()
public virtual void Stop()
{
taskTracker.ShutdownAll();

taskTracker = null;
tokenSource = null;
}

void StartTask()
{
var token = tokenSource.Token;

taskTracker.StartAndTrack(() =>
{
var taskId = Guid.NewGuid();
var receiveTask = Task.Factory
.StartNew(ReceiveLoop, null, token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
receiveTask.ContinueWith(t =>
{
if (t.IsFaulted)
.StartNew(ReceiveLoop, null, token, TaskCreationOptions.LongRunning, TaskScheduler.Default)
.ContinueWith(t =>
{
t.Exception.Handle(ex =>
{
HandleException(ex);
circuitBreaker.Failure(ex);
return true;
});
}
taskTracker.Forget(t);
if (taskTracker.HasNoTasks)

}, token, TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default)
.ContinueWith((_, s) =>
{
taskTracker.Forget((Guid)s);

if (!taskTracker.ShouldStartAnotherTaskImmediately)
{
return;
}

StartTask();
}
});
return receiveTask;
}, taskId, token);

return Tuple.Create(taskId, receiveTask);
});
}

Expand All @@ -71,7 +75,7 @@ void ReceiveLoop(object obj)
var backOff = new BackOff(1000);
var rampUpController = CreateRampUpController(StartTask);

while (!tokenSource.IsCancellationRequested && rampUpController.CheckHasEnoughWork())
while (!token.IsCancellationRequested && rampUpController.CheckHasEnoughWork())
{
bool success;
rampUpController.RampUpIfTooMuchWork();
Expand Down Expand Up @@ -99,7 +103,7 @@ void ReceiveLoop(object obj)
}

readonly RepeatedFailuresOverTimeCircuitBreaker circuitBreaker;
CancellationTokenSource tokenSource;
CancellationToken token;
ITaskTracker taskTracker;

}
Expand Down
6 changes: 3 additions & 3 deletions src/NServiceBus.SqlServer/AdaptivePollingReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public AdaptivePollingReceiver(
TableBasedQueue queue,
Action<TransportMessage, Exception> endProcessMessage,
RepeatedFailuresOverTimeCircuitBreaker circuitBreaker,
TransportNotifications transportNotifications)
TransportNotifications transportNotifications)
: base(circuitBreaker)
{
this.receiveStrategy = receiveStrategy;
Expand All @@ -21,10 +21,10 @@ public AdaptivePollingReceiver(
this.transportNotifications = transportNotifications;
}

public override void Start(int maximumConcurrency, CancellationTokenSource tokenSource)
public override void Start(int maximumConcurrency, CancellationToken token)
{
Logger.InfoFormat("Receiver for queue '{0}' started with maximum concurrency '{1}'", queue, maximumConcurrency);
base.Start(maximumConcurrency, tokenSource);
base.Start(maximumConcurrency, token);
}

protected override ReceiveResult Init()
Expand Down
16 changes: 10 additions & 6 deletions src/NServiceBus.SqlServer/Config/SqlServerTransportFeature.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,19 @@ protected override void Configure(FeatureConfigurationContext context, string co
config.Configure(context, connectionStringWithSchema);
}

context.Container.ConfigureComponent<TransportNotifications>(DependencyLifecycle.SingleInstance);
context.Container.ConfigureComponent<SqlServerMessageSender>(DependencyLifecycle.InstancePerCall);
context.Container.ConfigureComponent<SqlServerQueueCreator>(DependencyLifecycle.InstancePerCall);

var errorQueue = ErrorQueueSettings.GetConfiguredErrorQueue(context.Settings);
context.Container.ConfigureComponent(b => new ReceiveStrategyFactory(b.Build<PipelineExecutor>(), b.Build<LocalConnectionParams>(), errorQueue), DependencyLifecycle.InstancePerCall);
if (!context.Settings.GetOrDefault<bool>("Endpoint.SendOnly"))
{
context.Container.ConfigureComponent<TransportNotifications>(DependencyLifecycle.SingleInstance);
context.Container.ConfigureComponent<SqlServerQueueCreator>(DependencyLifecycle.InstancePerCall);

var errorQueue = ErrorQueueSettings.GetConfiguredErrorQueue(context.Settings);
context.Container.ConfigureComponent(b => new ReceiveStrategyFactory(b.Build<PipelineExecutor>(), b.Build<LocalConnectionParams>(), errorQueue), DependencyLifecycle.InstancePerCall);

context.Container.ConfigureComponent<SqlServerPollingDequeueStrategy>(DependencyLifecycle.InstancePerCall);
context.Container.ConfigureComponent(b => new SqlServerStorageContext(b.Build<PipelineExecutor>(), b.Build<LocalConnectionParams>()), DependencyLifecycle.InstancePerUnitOfWork);
context.Container.ConfigureComponent<SqlServerPollingDequeueStrategy>(DependencyLifecycle.InstancePerCall);
context.Container.ConfigureComponent(b => new SqlServerStorageContext(b.Build<PipelineExecutor>(), b.Build<LocalConnectionParams>()), DependencyLifecycle.InstancePerUnitOfWork);
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/NServiceBus.SqlServer/IExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@

interface IExecutor
{
void Start(int maximumConcurrency, CancellationTokenSource tokenSource);
void Start(int maximumConcurrency, CancellationToken token);
void Stop();
}

class NullExecutor : IExecutor
{
public void Start(int maximumConcurrency, CancellationTokenSource tokenSource)
public void Start(int maximumConcurrency, CancellationToken token)
{
}

Expand Down
6 changes: 3 additions & 3 deletions src/NServiceBus.SqlServer/ITaskTracker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

interface ITaskTracker
{
void StartAndTrack(Func<Task> taskFactory);
void Forget(Task receiveTask);
bool HasNoTasks { get; }
void StartAndTrack(Func<Tuple<Guid, Task>> taskFactory);
void Forget(Guid id);
bool ShouldStartAnotherTaskImmediately { get; }
void ShutdownAll();
}
}
56 changes: 32 additions & 24 deletions src/NServiceBus.SqlServer/ReceiveTaskTracker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,53 +15,64 @@ public ReceiveTaskTracker(int maximumConcurrency, TransportNotifications transpo
this.queueName = queueName;
}

public void StartAndTrack(Func<Task> taskFactory)
public void StartAndTrack(Func<Tuple<Guid, Task>> taskFactory)
{
lock (lockObj)
{
if (shuttingDown)
{
Logger.Debug("Ignoring start task request because shutdown is in progress.");
Logger.DebugFormat("Ignoring start task request for '{0}' because shutdown is in progress.", queueName);
return;
}
if (trackedTasks.Count >= maximumConcurrency)
{
if (Logger.IsDebugEnabled)
{
Logger.DebugFormat("Ignoring start task request because of maximum concurrency limit {0}", maximumConcurrency);
Logger.DebugFormat("Ignoring start task request for '{0}' because of maximum concurrency limit of {1} has been reached.", queueName, maximumConcurrency);
}
transportNotifications.InvokeMaximumConcurrencyLevelReached(queueName, maximumConcurrency);
return;
}
var task = taskFactory();
trackedTasks.Add(task);
var tuple = taskFactory();
trackedTasks.Add(tuple.Item1, tuple.Item2);
if (Logger.IsDebugEnabled)
{
Logger.DebugFormat("Starting a new receive task. Total count current/max {0}/{1}", trackedTasks.Count, maximumConcurrency);
Logger.DebugFormat("Starting a new receive task for '{0}'. Total count current/max {1}/{2}", queueName, trackedTasks.Count, maximumConcurrency);
}
transportNotifications.InvokeReceiveTaskStarted(queueName, trackedTasks.Count, maximumConcurrency);
}
}

public void Forget(Task receiveTask)
public void Forget(Guid id)
{
lock (lockObj)
{
trackedTasks.Remove(receiveTask);
if (Logger.IsDebugEnabled)
if (shuttingDown)
{
return;
}

if (trackedTasks.Remove(id))
{
Logger.DebugFormat("Stopping a receive task. Total count current/max {0}/{1}", trackedTasks.Count, maximumConcurrency);
if (Logger.IsDebugEnabled)
{
Logger.DebugFormat("Stopping a receive task for '{0}'. Total count current/max {1}/{2}", queueName, trackedTasks.Count, maximumConcurrency);
}
transportNotifications.InvokeReceiveTaskStopped(queueName, trackedTasks.Count, maximumConcurrency);
}
transportNotifications.InvokeReceiveTaskStopped(queueName, trackedTasks.Count, maximumConcurrency);
}
}

public bool HasNoTasks
public bool ShouldStartAnotherTaskImmediately
{
get
{
lock (lockObj)
{
if (shuttingDown)
{
return false;
}
return !trackedTasks.Any();
}
}
Expand All @@ -73,25 +84,22 @@ public void ShutdownAll()
{
Logger.Debug("Stopping all receive tasks.");
}
Task[] awaitedTasks;
lock (lockObj)
{
shuttingDown = true;
awaitedTasks = trackedTasks.ToArray();
}

try
{
Task.WaitAll(awaitedTasks);
}
catch (AggregateException aex)
{
aex.Handle(ex => ex is TaskCanceledException);
try
{
Task.WaitAll(trackedTasks.Values.ToArray());
}
catch (AggregateException aex)
{
aex.Handle(ex => ex is TaskCanceledException);
}
}
}

readonly object lockObj = new object();
readonly List<Task> trackedTasks = new List<Task>();
readonly Dictionary<Guid, Task> trackedTasks = new Dictionary<Guid, Task>();
bool shuttingDown;
readonly int maximumConcurrency;
readonly TransportNotifications transportNotifications;
Expand Down
6 changes: 4 additions & 2 deletions src/NServiceBus.SqlServer/SqlServerPollingDequeueStrategy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ public void Start(int maximumConcurrencyLevel)
{
tokenSource = new CancellationTokenSource();

primaryReceiver.Start(maximumConcurrencyLevel, tokenSource);
secondaryReceiver.Start(SecondaryReceiveSettings.MaximumConcurrencyLevel, tokenSource);
primaryReceiver.Start(maximumConcurrencyLevel, tokenSource.Token);
secondaryReceiver.Start(SecondaryReceiveSettings.MaximumConcurrencyLevel, tokenSource.Token);
}

/// <summary>
Expand All @@ -89,6 +89,8 @@ public void Stop()

primaryReceiver.Stop();
secondaryReceiver.Stop();

tokenSource.Dispose();
}

public void Dispose()
Expand Down

0 comments on commit 7f417b8

Please sign in to comment.