Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Add the possibility to modify properties when abandoning a message #671

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions src/Microsoft.Azure.ServiceBus/MessageHandlerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
namespace Microsoft.Azure.ServiceBus
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Core;
using Primitives;

/// <summary>Provides options associated with message pump processing using
Expand All @@ -26,6 +28,22 @@ public sealed class MessageHandlerOptions
/// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions.
/// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
public MessageHandlerOptions(Func<ExceptionReceivedEventArgs, Task> exceptionReceivedHandler)
: this(async args => { await exceptionReceivedHandler(args); return null; })
{
}

/// <summary>Initializes a new instance of the <see cref="MessageHandlerOptions" /> class.
/// Default Values:
/// <see cref="MaxConcurrentCalls"/> = 1
/// <see cref="AutoComplete"/> = true
/// <see cref="ReceiveTimeOut"/> = 1 minute
/// <see cref="MaxAutoRenewDuration"/> = 5 minutes
/// </summary>
/// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions.
/// When the exception happens during user callback, the returned dictionary is passed to <see cref="IReceiverClient.AbandonAsync"/>.
/// For other actions, the returned dictionary is ignored.
/// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
public MessageHandlerOptions(Func<ExceptionReceivedEventArgs, Task<IDictionary<string, object>>> exceptionReceivedHandler)
{
this.MaxConcurrentCalls = 1;
this.AutoComplete = true;
Expand All @@ -36,7 +54,7 @@ public MessageHandlerOptions(Func<ExceptionReceivedEventArgs, Task> exceptionRec

/// <summary>Occurs when an exception is received. Enables you to be notified of any errors encountered by the message pump.
/// When errors are received calls will automatically be retried, so this is informational. </summary>
public Func<ExceptionReceivedEventArgs, Task> ExceptionReceivedHandler { get; }
public Func<ExceptionReceivedEventArgs, Task<IDictionary<string, object>>> ExceptionReceivedHandler { get; }

/// <summary>Gets or sets the maximum number of concurrent calls to the callback the message pump should initiate.</summary>
/// <value>The maximum number of concurrent calls to the callback.</value>
Expand Down Expand Up @@ -79,15 +97,16 @@ public TimeSpan MaxAutoRenewDuration

internal TimeSpan ReceiveTimeOut { get; }

internal async Task RaiseExceptionReceived(ExceptionReceivedEventArgs eventArgs)
internal async Task<IDictionary<string, object>> RaiseExceptionReceived(ExceptionReceivedEventArgs eventArgs)
{
try
{
await this.ExceptionReceivedHandler(eventArgs).ConfigureAwait(false);
return await this.ExceptionReceivedHandler(eventArgs).ConfigureAwait(false);
}
catch (Exception exception)
{
MessagingEventSource.Log.ExceptionReceivedHandlerThrewException(exception);
return null;
}
}
}
Expand Down
11 changes: 6 additions & 5 deletions src/Microsoft.Azure.ServiceBus/MessageReceivePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
namespace Microsoft.Azure.ServiceBus
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -47,7 +48,7 @@ bool ShouldRenewLock()
this.registerHandlerOptions.AutoRenewLock;
}

Task RaiseExceptionReceived(Exception e, string action)
Task<IDictionary<string, object>> RaiseExceptionReceived(Exception e, string action)
{
var eventArgs = new ExceptionReceivedEventArgs(e, action, this.endpoint, this.messageReceiver.Path, this.messageReceiver.ClientId);
return this.registerHandlerOptions.RaiseExceptionReceived(eventArgs);
Expand Down Expand Up @@ -147,12 +148,12 @@ async Task MessageDispatchTask(Message message)
catch (Exception exception)
{
MessagingEventSource.Log.MessageReceiverPumpUserCallbackException(this.messageReceiver.ClientId, message, exception);
await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.UserCallback).ConfigureAwait(false);
var propertiesToModify = await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.UserCallback).ConfigureAwait(false);

// Nothing much to do if UserCallback throws, Abandon message and Release semaphore.
if (!(exception is MessageLockLostException))
{
await this.AbandonMessageIfNeededAsync(message).ConfigureAwait(false);
await this.AbandonMessageIfNeededAsync(message, propertiesToModify).ConfigureAwait(false);
}

if (ServiceBusDiagnosticSource.IsEnabled())
Expand Down Expand Up @@ -191,13 +192,13 @@ void CancelAutoRenewLock(object state)
}
}

async Task AbandonMessageIfNeededAsync(Message message)
async Task AbandonMessageIfNeededAsync(Message message, IDictionary<string, object> propertiesToModify)
{
try
{
if (this.messageReceiver.ReceiveMode == ReceiveMode.PeekLock)
{
await this.messageReceiver.AbandonAsync(message.SystemProperties.LockToken).ConfigureAwait(false);
await this.messageReceiver.AbandonAsync(message.SystemProperties.LockToken, propertiesToModify).ConfigureAwait(false);
}
}
catch (Exception exception)
Expand Down
25 changes: 22 additions & 3 deletions src/Microsoft.Azure.ServiceBus/SessionHandlerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
namespace Microsoft.Azure.ServiceBus
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Core;
using Primitives;

/// <summary>Provides options associated with session pump processing using
Expand All @@ -27,6 +29,22 @@ public sealed class SessionHandlerOptions
/// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions.
/// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
public SessionHandlerOptions(Func<ExceptionReceivedEventArgs, Task> exceptionReceivedHandler)
: this(async args => { await exceptionReceivedHandler(args); return null; })
{
}

/// <summary>Initializes a new instance of the <see cref="SessionHandlerOptions" /> class.
/// Default Values:
/// <see cref="MaxConcurrentSessions"/> = 2000
/// <see cref="AutoComplete"/> = true
/// <see cref="MessageWaitTimeout"/> = 1 minute
/// <see cref="MaxAutoRenewDuration"/> = 5 minutes
/// </summary>
/// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions.
/// When the exception happens during user callback, the returned dictionary is passed to <see cref="IReceiverClient.AbandonAsync"/>.
/// For other actions, the returned dictionary is ignored.
/// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
public SessionHandlerOptions(Func<ExceptionReceivedEventArgs, Task<IDictionary<string, object>>> exceptionReceivedHandler)
{
// These are default values
this.AutoComplete = true;
Expand All @@ -38,7 +56,7 @@ public SessionHandlerOptions(Func<ExceptionReceivedEventArgs, Task> exceptionRec

/// <summary>Occurs when an exception is received. Enables you to be notified of any errors encountered by the session pump.
/// When errors are received calls will automatically be retried, so this is informational. </summary>
public Func<ExceptionReceivedEventArgs, Task> ExceptionReceivedHandler { get; }
public Func<ExceptionReceivedEventArgs, Task<IDictionary<string, object>>> ExceptionReceivedHandler { get; }

/// <summary>Gets or sets the duration for which the session lock will be renewed automatically.</summary>
/// <value>The duration for which the session renew its state.</value>
Expand Down Expand Up @@ -92,15 +110,16 @@ public int MaxConcurrentSessions

internal int MaxConcurrentAcceptSessionCalls { get; set; }

internal async Task RaiseExceptionReceived(ExceptionReceivedEventArgs eventArgs)
internal async Task<IDictionary<string, object>> RaiseExceptionReceived(ExceptionReceivedEventArgs eventArgs)
{
try
{
await this.ExceptionReceivedHandler(eventArgs).ConfigureAwait(false);
return await this.ExceptionReceivedHandler(eventArgs).ConfigureAwait(false);
}
catch (Exception exception)
{
MessagingEventSource.Log.ExceptionReceivedHandlerThrewException(exception);
return null;
}
}
}
Expand Down
11 changes: 6 additions & 5 deletions src/Microsoft.Azure.ServiceBus/SessionReceivePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
namespace Microsoft.Azure.ServiceBus
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -74,7 +75,7 @@ bool ShouldRenewSessionLock()
this.sessionHandlerOptions.AutoRenewLock;
}

Task RaiseExceptionReceived(Exception e, string action)
Task<IDictionary<string, object>> RaiseExceptionReceived(Exception e, string action)
{
var eventArgs = new ExceptionReceivedEventArgs(e, action, this.endpoint, this.entityPath, this.clientId);
return this.sessionHandlerOptions.RaiseExceptionReceived(eventArgs);
Expand All @@ -96,13 +97,13 @@ async Task CompleteMessageIfNeededAsync(IMessageSession session, Message message
}
}

async Task AbandonMessageIfNeededAsync(IMessageSession session, Message message)
async Task AbandonMessageIfNeededAsync(IMessageSession session, Message message, IDictionary<string, object> propertiesToModify)
{
try
{
if (session.ReceiveMode == ReceiveMode.PeekLock)
{
await session.AbandonAsync(message.SystemProperties.LockToken).ConfigureAwait(false);
await session.AbandonAsync(message.SystemProperties.LockToken, propertiesToModify).ConfigureAwait(false);
}
}
catch (Exception exception)
Expand Down Expand Up @@ -239,11 +240,11 @@ async Task MessagePumpTaskAsync(IMessageSession session)
}

MessagingEventSource.Log.MessageReceivePumpTaskException(this.clientId, session.SessionId, exception);
await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.UserCallback).ConfigureAwait(false);
var propertiesToModify = await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.UserCallback).ConfigureAwait(false);
callbackExceptionOccurred = true;
if (!(exception is MessageLockLostException || exception is SessionLockLostException))
{
await this.AbandonMessageIfNeededAsync(session, message).ConfigureAwait(false);
await this.AbandonMessageIfNeededAsync(session, message, propertiesToModify).ConfigureAwait(false);
}
}
finally
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,9 @@ namespace Microsoft.Azure.ServiceBus
public sealed class MessageHandlerOptions
{
public MessageHandlerOptions(System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task> exceptionReceivedHandler) { }
public MessageHandlerOptions(System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task<System.Collections.Generic.IDictionary<string, object>>> exceptionReceivedHandler) { }
public bool AutoComplete { get; set; }
public System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task> ExceptionReceivedHandler { get; }
public System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task<System.Collections.Generic.IDictionary<string, object>>> ExceptionReceivedHandler { get; }
public System.TimeSpan MaxAutoRenewDuration { get; set; }
public int MaxConcurrentCalls { get; set; }
}
Expand Down Expand Up @@ -384,8 +385,9 @@ namespace Microsoft.Azure.ServiceBus
public sealed class SessionHandlerOptions
{
public SessionHandlerOptions(System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task> exceptionReceivedHandler) { }
public SessionHandlerOptions(System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task<System.Collections.Generic.IDictionary<string, object>>> exceptionReceivedHandler) { }
public bool AutoComplete { get; set; }
public System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task> ExceptionReceivedHandler { get; }
public System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task<System.Collections.Generic.IDictionary<string, object>>> ExceptionReceivedHandler { get; }
public System.TimeSpan MaxAutoRenewDuration { get; set; }
public int MaxConcurrentSessions { get; set; }
public System.TimeSpan MessageWaitTimeout { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
<SignAssembly>true</SignAssembly>
<PublicSign Condition=" '$(OS)' != 'Windows_NT' ">true</PublicSign>
<GenerateRuntimeConfigurationFiles>true</GenerateRuntimeConfigurationFiles>
<LangVersion>7.3</LangVersion>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make it latest

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I set it explicitly to 7.3 because C# 7.3 is needed for the tests to compile after I added the second MessageHandlerOptions constructor. See Unable to invoke overload when argument is func of non generic task and supplied as method which describes the exact same situation where the compiler generates errors if LangVersion is smaller than 7.3:

OnSessionQueueTests.cs(63, 47): [CS0407] 'Task OnSessionQueueTests.ExceptionReceivedHandler(ExceptionReceivedEventArgs)' has the wrong return type
OnSessionQueueTests.cs(147, 47): [CS0407] 'Task OnSessionQueueTests.ExceptionReceivedHandler(ExceptionReceivedEventArgs)' has the wrong return type
OnSessionTopicSubscriptionTests.cs(104, 47): [CS0407] 'Task OnSessionTopicSubscriptionTests.ExceptionReceivedHandler(ExceptionReceivedEventArgs)' has the wrong return type
SenderReceiverClientTestBase.cs(262, 43): [CS0407] 'Task SenderReceiverClientTestBase.ExceptionReceivedHandler(ExceptionReceivedEventArgs)' has the wrong return type
SenderReceiverClientTestBase.cs(292, 43): [CS0407] 'Task SenderReceiverClientTestBase.ExceptionReceivedHandler(ExceptionReceivedEventArgs)' has the wrong return type

But I’m happy to bump it to latest if you think it’s better.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test project is free to use the latest and greatest. Rather than bumping it manually all the time, it could be on the latest all the time.

</PropertyGroup>

<ItemGroup>
Expand Down