Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Commit

Permalink
Add the possibility to modify properties when abandoning a message
Browse files Browse the repository at this point in the history
This is useful to store exception details when abandoning a message and still keeping the convenience of registering a message or session handler with AutoComplete = true
  • Loading branch information
0xced committed Mar 21, 2019
1 parent 02c03d6 commit 4e6ca73
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 18 deletions.
25 changes: 22 additions & 3 deletions src/Microsoft.Azure.ServiceBus/MessageHandlerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
namespace Microsoft.Azure.ServiceBus
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Core;
using Primitives;

/// <summary>Provides options associated with message pump processing using
Expand All @@ -26,6 +28,22 @@ public sealed class MessageHandlerOptions
/// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions.
/// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
public MessageHandlerOptions(Func<ExceptionReceivedEventArgs, Task> exceptionReceivedHandler)
: this(async args => { await exceptionReceivedHandler(args); return null; })
{
}

/// <summary>Initializes a new instance of the <see cref="MessageHandlerOptions" /> class.
/// Default Values:
/// <see cref="MaxConcurrentCalls"/> = 1
/// <see cref="AutoComplete"/> = true
/// <see cref="ReceiveTimeOut"/> = 1 minute
/// <see cref="MaxAutoRenewDuration"/> = 5 minutes
/// </summary>
/// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions.
/// When the exception happens during user callback, the returned dictionary is passed to <see cref="IReceiverClient.AbandonAsync"/>.
/// For other actions, the returned dictionary is ignored.
/// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
public MessageHandlerOptions(Func<ExceptionReceivedEventArgs, Task<IDictionary<string, object>>> exceptionReceivedHandler)
{
this.MaxConcurrentCalls = 1;
this.AutoComplete = true;
Expand All @@ -36,7 +54,7 @@ public MessageHandlerOptions(Func<ExceptionReceivedEventArgs, Task> exceptionRec

/// <summary>Occurs when an exception is received. Enables you to be notified of any errors encountered by the message pump.
/// When errors are received calls will automatically be retried, so this is informational. </summary>
public Func<ExceptionReceivedEventArgs, Task> ExceptionReceivedHandler { get; }
public Func<ExceptionReceivedEventArgs, Task<IDictionary<string, object>>> ExceptionReceivedHandler { get; }

/// <summary>Gets or sets the maximum number of concurrent calls to the callback the message pump should initiate.</summary>
/// <value>The maximum number of concurrent calls to the callback.</value>
Expand Down Expand Up @@ -79,15 +97,16 @@ public TimeSpan MaxAutoRenewDuration

internal TimeSpan ReceiveTimeOut { get; }

internal async Task RaiseExceptionReceived(ExceptionReceivedEventArgs eventArgs)
internal async Task<IDictionary<string, object>> RaiseExceptionReceived(ExceptionReceivedEventArgs eventArgs)
{
try
{
await this.ExceptionReceivedHandler(eventArgs).ConfigureAwait(false);
return await this.ExceptionReceivedHandler(eventArgs).ConfigureAwait(false);
}
catch (Exception exception)
{
MessagingEventSource.Log.ExceptionReceivedHandlerThrewException(exception);
return null;
}
}
}
Expand Down
11 changes: 6 additions & 5 deletions src/Microsoft.Azure.ServiceBus/MessageReceivePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
namespace Microsoft.Azure.ServiceBus
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -47,7 +48,7 @@ bool ShouldRenewLock()
this.registerHandlerOptions.AutoRenewLock;
}

Task RaiseExceptionReceived(Exception e, string action)
Task<IDictionary<string, object>> RaiseExceptionReceived(Exception e, string action)
{
var eventArgs = new ExceptionReceivedEventArgs(e, action, this.endpoint, this.messageReceiver.Path, this.messageReceiver.ClientId);
return this.registerHandlerOptions.RaiseExceptionReceived(eventArgs);
Expand Down Expand Up @@ -147,12 +148,12 @@ async Task MessageDispatchTask(Message message)
catch (Exception exception)
{
MessagingEventSource.Log.MessageReceiverPumpUserCallbackException(this.messageReceiver.ClientId, message, exception);
await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.UserCallback).ConfigureAwait(false);
var propertiesToModify = await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.UserCallback).ConfigureAwait(false);

// Nothing much to do if UserCallback throws, Abandon message and Release semaphore.
if (!(exception is MessageLockLostException))
{
await this.AbandonMessageIfNeededAsync(message).ConfigureAwait(false);
await this.AbandonMessageIfNeededAsync(message, propertiesToModify).ConfigureAwait(false);
}

if (ServiceBusDiagnosticSource.IsEnabled())
Expand Down Expand Up @@ -191,13 +192,13 @@ void CancelAutoRenewLock(object state)
}
}

async Task AbandonMessageIfNeededAsync(Message message)
async Task AbandonMessageIfNeededAsync(Message message, IDictionary<string, object> propertiesToModify)
{
try
{
if (this.messageReceiver.ReceiveMode == ReceiveMode.PeekLock)
{
await this.messageReceiver.AbandonAsync(message.SystemProperties.LockToken).ConfigureAwait(false);
await this.messageReceiver.AbandonAsync(message.SystemProperties.LockToken, propertiesToModify).ConfigureAwait(false);
}
}
catch (Exception exception)
Expand Down
25 changes: 22 additions & 3 deletions src/Microsoft.Azure.ServiceBus/SessionHandlerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
namespace Microsoft.Azure.ServiceBus
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Core;
using Primitives;

/// <summary>Provides options associated with session pump processing using
Expand All @@ -27,6 +29,22 @@ public sealed class SessionHandlerOptions
/// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions.
/// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
public SessionHandlerOptions(Func<ExceptionReceivedEventArgs, Task> exceptionReceivedHandler)
: this(async args => { await exceptionReceivedHandler(args); return null; })
{
}

/// <summary>Initializes a new instance of the <see cref="SessionHandlerOptions" /> class.
/// Default Values:
/// <see cref="MaxConcurrentSessions"/> = 2000
/// <see cref="AutoComplete"/> = true
/// <see cref="MessageWaitTimeout"/> = 1 minute
/// <see cref="MaxAutoRenewDuration"/> = 5 minutes
/// </summary>
/// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions.
/// When the exception happens during user callback, the returned dictionary is passed to <see cref="IReceiverClient.AbandonAsync"/>.
/// For other actions, the returned dictionary is ignored.
/// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
public SessionHandlerOptions(Func<ExceptionReceivedEventArgs, Task<IDictionary<string, object>>> exceptionReceivedHandler)
{
// These are default values
this.AutoComplete = true;
Expand All @@ -38,7 +56,7 @@ public SessionHandlerOptions(Func<ExceptionReceivedEventArgs, Task> exceptionRec

/// <summary>Occurs when an exception is received. Enables you to be notified of any errors encountered by the session pump.
/// When errors are received calls will automatically be retried, so this is informational. </summary>
public Func<ExceptionReceivedEventArgs, Task> ExceptionReceivedHandler { get; }
public Func<ExceptionReceivedEventArgs, Task<IDictionary<string, object>>> ExceptionReceivedHandler { get; }

/// <summary>Gets or sets the duration for which the session lock will be renewed automatically.</summary>
/// <value>The duration for which the session renew its state.</value>
Expand Down Expand Up @@ -92,15 +110,16 @@ public int MaxConcurrentSessions

internal int MaxConcurrentAcceptSessionCalls { get; set; }

internal async Task RaiseExceptionReceived(ExceptionReceivedEventArgs eventArgs)
internal async Task<IDictionary<string, object>> RaiseExceptionReceived(ExceptionReceivedEventArgs eventArgs)
{
try
{
await this.ExceptionReceivedHandler(eventArgs).ConfigureAwait(false);
return await this.ExceptionReceivedHandler(eventArgs).ConfigureAwait(false);
}
catch (Exception exception)
{
MessagingEventSource.Log.ExceptionReceivedHandlerThrewException(exception);
return null;
}
}
}
Expand Down
11 changes: 6 additions & 5 deletions src/Microsoft.Azure.ServiceBus/SessionReceivePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
namespace Microsoft.Azure.ServiceBus
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -74,7 +75,7 @@ bool ShouldRenewSessionLock()
this.sessionHandlerOptions.AutoRenewLock;
}

Task RaiseExceptionReceived(Exception e, string action)
Task<IDictionary<string, object>> RaiseExceptionReceived(Exception e, string action)
{
var eventArgs = new ExceptionReceivedEventArgs(e, action, this.endpoint, this.entityPath, this.clientId);
return this.sessionHandlerOptions.RaiseExceptionReceived(eventArgs);
Expand All @@ -96,13 +97,13 @@ async Task CompleteMessageIfNeededAsync(IMessageSession session, Message message
}
}

async Task AbandonMessageIfNeededAsync(IMessageSession session, Message message)
async Task AbandonMessageIfNeededAsync(IMessageSession session, Message message, IDictionary<string, object> propertiesToModify)
{
try
{
if (session.ReceiveMode == ReceiveMode.PeekLock)
{
await session.AbandonAsync(message.SystemProperties.LockToken).ConfigureAwait(false);
await session.AbandonAsync(message.SystemProperties.LockToken, propertiesToModify).ConfigureAwait(false);
}
}
catch (Exception exception)
Expand Down Expand Up @@ -239,11 +240,11 @@ async Task MessagePumpTaskAsync(IMessageSession session)
}

MessagingEventSource.Log.MessageReceivePumpTaskException(this.clientId, session.SessionId, exception);
await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.UserCallback).ConfigureAwait(false);
var propertiesToModify = await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.UserCallback).ConfigureAwait(false);
callbackExceptionOccurred = true;
if (!(exception is MessageLockLostException || exception is SessionLockLostException))
{
await this.AbandonMessageIfNeededAsync(session, message).ConfigureAwait(false);
await this.AbandonMessageIfNeededAsync(session, message, propertiesToModify).ConfigureAwait(false);
}
}
finally
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,9 @@ namespace Microsoft.Azure.ServiceBus
public sealed class MessageHandlerOptions
{
public MessageHandlerOptions(System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task> exceptionReceivedHandler) { }
public MessageHandlerOptions(System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task<System.Collections.Generic.IDictionary<string, object>>> exceptionReceivedHandler) { }
public bool AutoComplete { get; set; }
public System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task> ExceptionReceivedHandler { get; }
public System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task<System.Collections.Generic.IDictionary<string, object>>> ExceptionReceivedHandler { get; }
public System.TimeSpan MaxAutoRenewDuration { get; set; }
public int MaxConcurrentCalls { get; set; }
}
Expand Down Expand Up @@ -384,8 +385,9 @@ namespace Microsoft.Azure.ServiceBus
public sealed class SessionHandlerOptions
{
public SessionHandlerOptions(System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task> exceptionReceivedHandler) { }
public SessionHandlerOptions(System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task<System.Collections.Generic.IDictionary<string, object>>> exceptionReceivedHandler) { }
public bool AutoComplete { get; set; }
public System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task> ExceptionReceivedHandler { get; }
public System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task<System.Collections.Generic.IDictionary<string, object>>> ExceptionReceivedHandler { get; }
public System.TimeSpan MaxAutoRenewDuration { get; set; }
public int MaxConcurrentSessions { get; set; }
public System.TimeSpan MessageWaitTimeout { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
<SignAssembly>true</SignAssembly>
<PublicSign Condition=" '$(OS)' != 'Windows_NT' ">true</PublicSign>
<GenerateRuntimeConfigurationFiles>true</GenerateRuntimeConfigurationFiles>
<LangVersion>7.3</LangVersion>
</PropertyGroup>

<ItemGroup>
Expand Down

0 comments on commit 4e6ca73

Please sign in to comment.