diff --git a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageHandler.cs b/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageHandler.cs index 3d07e8f07..192e71260 100644 --- a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageHandler.cs +++ b/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageHandler.cs @@ -14,6 +14,7 @@ public DelayedMessageHandler(DelayedMessageTable table, SqlConnectionFactory con this.connectionFactory = connectionFactory; this.interval = interval; this.batchSize = batchSize; + message = $"Scheduling next attempt to move matured delayed messages to input queue in {interval}"; } public void Start() @@ -43,28 +44,35 @@ async Task MoveMaturedDelayedMessages() { using (var transaction = connection.BeginTransaction()) { - await table.MoveMaturedMessages(batchSize, connection, transaction).ConfigureAwait(false); + await table.MoveMaturedMessages(batchSize, connection, transaction, cancellationToken).ConfigureAwait(false); transaction.Commit(); } } - Logger.DebugFormat("Scheduling next attempt to move matured delayed messages to input queue in {0}", interval); - await Task.Delay(interval, cancellationToken).ConfigureAwait(false); } catch (OperationCanceledException) { // Graceful shutdown + return; } catch (SqlException e) when (cancellationToken.IsCancellationRequested) { Logger.Debug("Exception thrown while performing cancellation", e); + return; } catch (Exception e) { - Logger.Fatal("Exception thrown while performing cancellation", e); + Logger.Fatal("Exception thrown while moving matured delayed messages", e); + } + finally + { + Logger.DebugFormat(message); + await Task.Delay(interval, cancellationToken).IgnoreCancellation() + .ConfigureAwait(false); } } } + string message; DelayedMessageTable table; SqlConnectionFactory connectionFactory; TimeSpan interval; @@ -75,4 +83,4 @@ async Task MoveMaturedDelayedMessages() static ILog Logger = LogManager.GetLogger(); } -} +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageTable.cs b/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageTable.cs index 526a363d2..101fcf176 100644 --- a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageTable.cs +++ b/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageTable.cs @@ -2,6 +2,7 @@ namespace NServiceBus.Transport.SQLServer { using System; using System.Data.SqlClient; + using System.Threading; using System.Threading.Tasks; using Transport; @@ -25,12 +26,12 @@ public async Task Store(OutgoingMessage message, DateTime dueTime, string destin } } - public async Task MoveMaturedMessages(int batchSize, SqlConnection connection, SqlTransaction transaction) + public async Task MoveMaturedMessages(int batchSize, SqlConnection connection, SqlTransaction transaction, CancellationToken cancellationToken) { using (var command = new SqlCommand(moveMaturedCommand, connection, transaction)) { command.Parameters.AddWithValue("BatchSize", batchSize); - await command.ExecuteNonQueryAsync().ConfigureAwait(false); + await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false); } } diff --git a/src/NServiceBus.SqlServer/TaskEx.cs b/src/NServiceBus.SqlServer/TaskEx.cs index 22f7199ae..d71c2a4b9 100644 --- a/src/NServiceBus.SqlServer/TaskEx.cs +++ b/src/NServiceBus.SqlServer/TaskEx.cs @@ -1,5 +1,6 @@ namespace NServiceBus.Transport.SQLServer { + using System; using System.Threading.Tasks; static class TaskEx @@ -11,6 +12,17 @@ public static void Ignore(this Task task) { } + public static async Task IgnoreCancellation(this Task task) + { + try + { + await task.ConfigureAwait(false); + } + catch (OperationCanceledException) + { + } + } + //TODO: remove when we update to 4.6 and can use Task.CompletedTask public static readonly Task CompletedTask = Task.FromResult(0);