diff --git a/src/Microsoft.Azure.ServiceBus/MessageHandlerOptions.cs b/src/Microsoft.Azure.ServiceBus/MessageHandlerOptions.cs index 07e5ee08..5de67667 100644 --- a/src/Microsoft.Azure.ServiceBus/MessageHandlerOptions.cs +++ b/src/Microsoft.Azure.ServiceBus/MessageHandlerOptions.cs @@ -4,8 +4,10 @@ namespace Microsoft.Azure.ServiceBus { using System; + using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; + using Core; using Primitives; /// Provides options associated with message pump processing using @@ -26,6 +28,22 @@ public sealed class MessageHandlerOptions /// A that is invoked during exceptions. /// contains contextual information regarding the exception. public MessageHandlerOptions(Func exceptionReceivedHandler) + : this(async args => { await exceptionReceivedHandler(args); return null; }) + { + } + + /// Initializes a new instance of the class. + /// Default Values: + /// = 1 + /// = true + /// = 1 minute + /// = 5 minutes + /// + /// A that is invoked during exceptions. + /// When the exception happens during user callback, the returned dictionary is passed to . + /// For other actions, the returned dictionary is ignored. + /// contains contextual information regarding the exception. + public MessageHandlerOptions(Func>> exceptionReceivedHandler) { this.MaxConcurrentCalls = 1; this.AutoComplete = true; @@ -36,7 +54,7 @@ public MessageHandlerOptions(Func exceptionRec /// Occurs when an exception is received. Enables you to be notified of any errors encountered by the message pump. /// When errors are received calls will automatically be retried, so this is informational. - public Func ExceptionReceivedHandler { get; } + public Func>> ExceptionReceivedHandler { get; } /// Gets or sets the maximum number of concurrent calls to the callback the message pump should initiate. /// The maximum number of concurrent calls to the callback. @@ -79,15 +97,16 @@ public TimeSpan MaxAutoRenewDuration internal TimeSpan ReceiveTimeOut { get; } - internal async Task RaiseExceptionReceived(ExceptionReceivedEventArgs eventArgs) + internal async Task> RaiseExceptionReceived(ExceptionReceivedEventArgs eventArgs) { try { - await this.ExceptionReceivedHandler(eventArgs).ConfigureAwait(false); + return await this.ExceptionReceivedHandler(eventArgs).ConfigureAwait(false); } catch (Exception exception) { MessagingEventSource.Log.ExceptionReceivedHandlerThrewException(exception); + return null; } } } diff --git a/src/Microsoft.Azure.ServiceBus/MessageReceivePump.cs b/src/Microsoft.Azure.ServiceBus/MessageReceivePump.cs index fa517e95..f9d76ae6 100644 --- a/src/Microsoft.Azure.ServiceBus/MessageReceivePump.cs +++ b/src/Microsoft.Azure.ServiceBus/MessageReceivePump.cs @@ -4,6 +4,7 @@ namespace Microsoft.Azure.ServiceBus { using System; + using System.Collections.Generic; using System.Diagnostics; using System.Threading; using System.Threading.Tasks; @@ -47,7 +48,7 @@ bool ShouldRenewLock() this.registerHandlerOptions.AutoRenewLock; } - Task RaiseExceptionReceived(Exception e, string action) + Task> RaiseExceptionReceived(Exception e, string action) { var eventArgs = new ExceptionReceivedEventArgs(e, action, this.endpoint, this.messageReceiver.Path, this.messageReceiver.ClientId); return this.registerHandlerOptions.RaiseExceptionReceived(eventArgs); @@ -147,12 +148,12 @@ async Task MessageDispatchTask(Message message) catch (Exception exception) { MessagingEventSource.Log.MessageReceiverPumpUserCallbackException(this.messageReceiver.ClientId, message, exception); - await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.UserCallback).ConfigureAwait(false); + var propertiesToModify = await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.UserCallback).ConfigureAwait(false); // Nothing much to do if UserCallback throws, Abandon message and Release semaphore. if (!(exception is MessageLockLostException)) { - await this.AbandonMessageIfNeededAsync(message).ConfigureAwait(false); + await this.AbandonMessageIfNeededAsync(message, propertiesToModify).ConfigureAwait(false); } if (ServiceBusDiagnosticSource.IsEnabled()) @@ -191,13 +192,13 @@ void CancelAutoRenewLock(object state) } } - async Task AbandonMessageIfNeededAsync(Message message) + async Task AbandonMessageIfNeededAsync(Message message, IDictionary propertiesToModify) { try { if (this.messageReceiver.ReceiveMode == ReceiveMode.PeekLock) { - await this.messageReceiver.AbandonAsync(message.SystemProperties.LockToken).ConfigureAwait(false); + await this.messageReceiver.AbandonAsync(message.SystemProperties.LockToken, propertiesToModify).ConfigureAwait(false); } } catch (Exception exception) diff --git a/src/Microsoft.Azure.ServiceBus/SessionHandlerOptions.cs b/src/Microsoft.Azure.ServiceBus/SessionHandlerOptions.cs index 185a24a9..b654b946 100644 --- a/src/Microsoft.Azure.ServiceBus/SessionHandlerOptions.cs +++ b/src/Microsoft.Azure.ServiceBus/SessionHandlerOptions.cs @@ -4,8 +4,10 @@ namespace Microsoft.Azure.ServiceBus { using System; + using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; + using Core; using Primitives; /// Provides options associated with session pump processing using @@ -27,6 +29,22 @@ public sealed class SessionHandlerOptions /// A that is invoked during exceptions. /// contains contextual information regarding the exception. public SessionHandlerOptions(Func exceptionReceivedHandler) + : this(async args => { await exceptionReceivedHandler(args); return null; }) + { + } + + /// Initializes a new instance of the class. + /// Default Values: + /// = 2000 + /// = true + /// = 1 minute + /// = 5 minutes + /// + /// A that is invoked during exceptions. + /// When the exception happens during user callback, the returned dictionary is passed to . + /// For other actions, the returned dictionary is ignored. + /// contains contextual information regarding the exception. + public SessionHandlerOptions(Func>> exceptionReceivedHandler) { // These are default values this.AutoComplete = true; @@ -38,7 +56,7 @@ public SessionHandlerOptions(Func exceptionRec /// Occurs when an exception is received. Enables you to be notified of any errors encountered by the session pump. /// When errors are received calls will automatically be retried, so this is informational. - public Func ExceptionReceivedHandler { get; } + public Func>> ExceptionReceivedHandler { get; } /// Gets or sets the duration for which the session lock will be renewed automatically. /// The duration for which the session renew its state. @@ -92,15 +110,16 @@ public int MaxConcurrentSessions internal int MaxConcurrentAcceptSessionCalls { get; set; } - internal async Task RaiseExceptionReceived(ExceptionReceivedEventArgs eventArgs) + internal async Task> RaiseExceptionReceived(ExceptionReceivedEventArgs eventArgs) { try { - await this.ExceptionReceivedHandler(eventArgs).ConfigureAwait(false); + return await this.ExceptionReceivedHandler(eventArgs).ConfigureAwait(false); } catch (Exception exception) { MessagingEventSource.Log.ExceptionReceivedHandlerThrewException(exception); + return null; } } } diff --git a/src/Microsoft.Azure.ServiceBus/SessionReceivePump.cs b/src/Microsoft.Azure.ServiceBus/SessionReceivePump.cs index d611ae6c..49c56bfd 100644 --- a/src/Microsoft.Azure.ServiceBus/SessionReceivePump.cs +++ b/src/Microsoft.Azure.ServiceBus/SessionReceivePump.cs @@ -4,6 +4,7 @@ namespace Microsoft.Azure.ServiceBus { using System; + using System.Collections.Generic; using System.Diagnostics; using System.Threading; using System.Threading.Tasks; @@ -74,7 +75,7 @@ bool ShouldRenewSessionLock() this.sessionHandlerOptions.AutoRenewLock; } - Task RaiseExceptionReceived(Exception e, string action) + Task> RaiseExceptionReceived(Exception e, string action) { var eventArgs = new ExceptionReceivedEventArgs(e, action, this.endpoint, this.entityPath, this.clientId); return this.sessionHandlerOptions.RaiseExceptionReceived(eventArgs); @@ -96,13 +97,13 @@ async Task CompleteMessageIfNeededAsync(IMessageSession session, Message message } } - async Task AbandonMessageIfNeededAsync(IMessageSession session, Message message) + async Task AbandonMessageIfNeededAsync(IMessageSession session, Message message, IDictionary propertiesToModify) { try { if (session.ReceiveMode == ReceiveMode.PeekLock) { - await session.AbandonAsync(message.SystemProperties.LockToken).ConfigureAwait(false); + await session.AbandonAsync(message.SystemProperties.LockToken, propertiesToModify).ConfigureAwait(false); } } catch (Exception exception) @@ -239,11 +240,11 @@ async Task MessagePumpTaskAsync(IMessageSession session) } MessagingEventSource.Log.MessageReceivePumpTaskException(this.clientId, session.SessionId, exception); - await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.UserCallback).ConfigureAwait(false); + var propertiesToModify = await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.UserCallback).ConfigureAwait(false); callbackExceptionOccurred = true; if (!(exception is MessageLockLostException || exception is SessionLockLostException)) { - await this.AbandonMessageIfNeededAsync(session, message).ConfigureAwait(false); + await this.AbandonMessageIfNeededAsync(session, message, propertiesToModify).ConfigureAwait(false); } } finally diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt b/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt index 1e1bae48..0059cad1 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt @@ -179,8 +179,9 @@ namespace Microsoft.Azure.ServiceBus public sealed class MessageHandlerOptions { public MessageHandlerOptions(System.Func exceptionReceivedHandler) { } + public MessageHandlerOptions(System.Func>> exceptionReceivedHandler) { } public bool AutoComplete { get; set; } - public System.Func ExceptionReceivedHandler { get; } + public System.Func>> ExceptionReceivedHandler { get; } public System.TimeSpan MaxAutoRenewDuration { get; set; } public int MaxConcurrentCalls { get; set; } } @@ -384,8 +385,9 @@ namespace Microsoft.Azure.ServiceBus public sealed class SessionHandlerOptions { public SessionHandlerOptions(System.Func exceptionReceivedHandler) { } + public SessionHandlerOptions(System.Func>> exceptionReceivedHandler) { } public bool AutoComplete { get; set; } - public System.Func ExceptionReceivedHandler { get; } + public System.Func>> ExceptionReceivedHandler { get; } public System.TimeSpan MaxAutoRenewDuration { get; set; } public int MaxConcurrentSessions { get; set; } public System.TimeSpan MessageWaitTimeout { get; set; } diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/Microsoft.Azure.ServiceBus.UnitTests.csproj b/test/Microsoft.Azure.ServiceBus.UnitTests/Microsoft.Azure.ServiceBus.UnitTests.csproj index d2b6540a..da76db76 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/Microsoft.Azure.ServiceBus.UnitTests.csproj +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/Microsoft.Azure.ServiceBus.UnitTests.csproj @@ -6,6 +6,7 @@ true true true + 7.3