Skip to content

Commit

Permalink
Merge pull request #2 from Shuttle/async
Browse files Browse the repository at this point in the history
Async
  • Loading branch information
eben-roux authored Apr 30, 2024
2 parents 2a01964 + 2468d3d commit 58f2466
Show file tree
Hide file tree
Showing 26 changed files with 480 additions and 124 deletions.
25 changes: 12 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ The Shuttle.Core.Mediator package provides a [mediator pattern](https://en.wikip

## Configuration

In order to get all the relevant bits working you would need to register the `IMediator` dependency along with all the relevant `IParticipant` dependencies.
In order to get all the relevant bits working you would need to register the `IMediator` dependency along with all the relevant `IParticipant` (for synchronous `Send`), or `IAsyncParticipant` (for asynchronous `SendAsync`), dependencies.

You can register the mediator using `IServiceCollection`:

Expand All @@ -26,31 +26,30 @@ services.AddMediator(builder =>

The core interface is the `IMediator` interface and the default implementation provided is the `Mediator` class.

This interface provides a synchronous calling mechanism and all `IParticipant` implementations need to be thread-safe singleton implementations that are added to the mediator at startup. Any operations that require transient mechanisms should be handled by the relevant participant.
This interface provides a synchronous calling mechanism and all participant implementations need to be thread-safe singleton implementations that are added to the mediator at startup. Any operations that require transient mechanisms should be handled by the relevant participant.

```c#
void Send(object message, CancellationToken cancellationToken = default);
```

The `Send` method will find all participants that implements the `IParticipant<T>` with the type `T` of the message type that you are sending. Participants that are marked with the `BeforeParticipantAttribute` filter will be executed first followed by all participants with no filter attributes and then finally all participants marked with the `AfterParticipantAttribute` filter will be called.

### Extensions
The `Send` method will find all participants that implement the `IParticipant<T>` with the type `T` of the message type that you are sending.

```c#
Task SendAsync(this IMediator mediator, object message, CancellationToken cancellationToken = default)
Task SendAsync(object message, CancellationToken cancellationToken = default);
```

Sends a message asynchronously.

```c#
public static T Send<T>(this IMediator mediator, T message, CancellationToken cancellationToken = default);
```
The `SendAsync` method will find all participants that implement the `IAsyncParticipant<T>` with the type `T` of the message type that you are sending.

The same as `Send` except that it returns the given message.
Participants that are marked with the `BeforeParticipantAttribute` filter will be executed first followed by all participants with no filter attributes and then finally all participants marked with the `AfterParticipantAttribute` filter will be called.

## IParticipant
## Participants

```c#
public interface IAsyncParticipant<in T>
{
Task ProcessMessageAsync(IParticipantContext<T> context);
}

public interface IParticipant<in T>
{
void ProcessMessage(IParticipantContext<T> context);
Expand Down
8 changes: 8 additions & 0 deletions Shuttle.Core.Mediator.Tests/AbstractParticipant.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading.Tasks;

namespace Shuttle.Core.Mediator.Tests
{
Expand All @@ -18,5 +19,12 @@ public void Call()
CallCount++;
WhenCalled = DateTime.Now;
}

public async Task CallAsync()
{
Call();

await Task.CompletedTask.ConfigureAwait(false);
}
}
}
13 changes: 11 additions & 2 deletions Shuttle.Core.Mediator.Tests/AfterRegisterParticipant.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
using System;
using System.Threading.Tasks;
using Shuttle.Core.Contract;

namespace Shuttle.Core.Mediator.Tests
{
[AfterParticipant]
public class AfterRegisterParticipant : AbstractParticipant, IParticipant<RegisterMessage>
public class AfterRegisterParticipant : AbstractParticipant, IParticipant<RegisterMessage>, IAsyncParticipant<RegisterMessage>
{
public void ProcessMessage(IParticipantContext<RegisterMessage> context)
{
Expand All @@ -14,5 +14,14 @@ public void ProcessMessage(IParticipantContext<RegisterMessage> context)

Call();
}

public async Task ProcessMessageAsync(IParticipantContext<RegisterMessage> context)
{
Guard.AgainstNull(context, nameof(context));

context.Message.Touch($"[after] : {Id}");

await CallAsync();
}
}
}
13 changes: 11 additions & 2 deletions Shuttle.Core.Mediator.Tests/BeforeRegisterParticipant.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
using System;
using System.Threading.Tasks;
using Shuttle.Core.Contract;

namespace Shuttle.Core.Mediator.Tests
{
[BeforeParticipant]
public class BeforeRegisterParticipant : AbstractParticipant, IParticipant<RegisterMessage>
public class BeforeRegisterParticipant : AbstractParticipant, IParticipant<RegisterMessage>, IAsyncParticipant<RegisterMessage>
{
public void ProcessMessage(IParticipantContext<RegisterMessage> context)
{
Expand All @@ -14,5 +14,14 @@ public void ProcessMessage(IParticipantContext<RegisterMessage> context)

Call();
}

public async Task ProcessMessageAsync(IParticipantContext<RegisterMessage> context)
{
Guard.AgainstNull(context, nameof(context));

context.Message.Touch($"[before] : {Id}");

await CallAsync();
}
}
}
7 changes: 7 additions & 0 deletions Shuttle.Core.Mediator.Tests/IMessageTracker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Shuttle.Core.Mediator.Tests;

public interface IMessageTracker
{
void Received(object message);
int MessageTypeCount<T>();
}
99 changes: 97 additions & 2 deletions Shuttle.Core.Mediator.Tests/MediatorFixture.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using NUnit.Framework;

Expand All @@ -24,8 +25,25 @@ public void Should_be_able_to_send_a_message_to_a_single_participant()

mediator.Send(new WriteMessage { Text = "hello world!" });

Assert.That(((AbstractParticipant)provider.GetRequiredService<IParticipant<WriteMessage>>()).CallCount,
Is.EqualTo(1));
Assert.That(((AbstractParticipant)provider.GetRequiredService<IParticipant<WriteMessage>>()).CallCount, Is.EqualTo(1));
}

[Test]
public async Task Should_be_able_to_send_a_message_to_a_single_participant_async()
{
var services = new ServiceCollection();

services.AddMediator(options =>
{
options.AddParticipant<WriteParticipant>();
});

var provider = services.BuildServiceProvider();
var mediator = provider.GetRequiredService<IMediator>();

await mediator.SendAsync(new WriteMessage { Text = "hello world!" });

Assert.That(((AbstractParticipant)provider.GetRequiredService<IAsyncParticipant<WriteMessage>>()).CallCount, Is.EqualTo(1));
}

[Test]
Expand All @@ -50,6 +68,28 @@ public void Should_be_able_send_a_message_to_multiple_participants()
}
}

[Test]
public async Task Should_be_able_send_a_message_to_multiple_participants_async()
{
var services = new ServiceCollection();

services.AddMediator(options =>
{
options.AddParticipant<WrittenParticipantA>();
options.AddParticipant<WrittenParticipantB>();
});

var provider = services.BuildServiceProvider();
var mediator = new Mediator(provider);

await mediator.SendAsync(new MessageWritten { Text = "hello participants!" });

foreach (var participant in provider.GetServices<IAsyncParticipant<MessageWritten>>())
{
Assert.That(((AbstractParticipant)participant).CallCount, Is.EqualTo(1));
}
}

[Test]
public void Should_be_able_to_perform_pipeline_processing()
{
Expand Down Expand Up @@ -104,5 +144,60 @@ public void Should_be_able_to_perform_pipeline_processing()
Console.WriteLine(text);
}
}

[Test]
public async Task Should_be_able_to_perform_pipeline_processing_async()
{
var services = new ServiceCollection();

var beforeA = new BeforeRegisterParticipant();
var beforeB = new BeforeRegisterParticipant();
var registerA = new RegisterParticipant();
var registerB = new RegisterParticipant();
var afterA = new AfterRegisterParticipant();
var afterB = new AfterRegisterParticipant();

var participants = new List<IAsyncParticipant<RegisterMessage>>
{
beforeA,
beforeB,
registerA,
registerB,
afterA,
afterB
};

services.AddMediator(options =>
{
foreach (var participant in participants)
{
options.AddParticipant(participant);
}
});

var provider = services.BuildServiceProvider();
var mediator = new Mediator(provider);
var message = new RegisterMessage();

await mediator.SendAsync(message);

Assert.That(message.Messages.Count(), Is.EqualTo(6));

foreach (var participant in participants)
{
Assert.That(((AbstractParticipant)participant).CallCount, Is.EqualTo(1));
}

Assert.That(beforeB.WhenCalled, Is.GreaterThan(beforeA.WhenCalled));
Assert.That(registerA.WhenCalled, Is.GreaterThan(beforeB.WhenCalled));
Assert.That(registerB.WhenCalled, Is.GreaterThan(registerA.WhenCalled));
Assert.That(afterA.WhenCalled, Is.GreaterThan(registerB.WhenCalled));
Assert.That(afterB.WhenCalled, Is.GreaterThan(afterA.WhenCalled));

foreach (var text in message.Messages)
{
Console.WriteLine(text);
}
}
}
}
20 changes: 20 additions & 0 deletions Shuttle.Core.Mediator.Tests/MessageTracker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System.Collections.Generic;
using System.Linq;
using Shuttle.Core.Contract;

namespace Shuttle.Core.Mediator.Tests;

public class MessageTracker : IMessageTracker
{
private readonly List<object> _messagesReceived = new();

public void Received(object message)
{
_messagesReceived.Add(Guard.AgainstNull(message, nameof(message)));
}

public int MessageTypeCount<T>()
{
return _messagesReceived.Count(item => item.GetType() == typeof(T));
}
}
5 changes: 5 additions & 0 deletions Shuttle.Core.Mediator.Tests/MultipleParticipantMessageA.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
namespace Shuttle.Core.Mediator.Tests;

public class MultipleParticipantMessageA
{
}
5 changes: 5 additions & 0 deletions Shuttle.Core.Mediator.Tests/MultipleParticipantMessageB.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
namespace Shuttle.Core.Mediator.Tests;

public class MultipleParticipantMessageB
{
}
56 changes: 31 additions & 25 deletions Shuttle.Core.Mediator.Tests/MultipleParticipants.cs
Original file line number Diff line number Diff line change
@@ -1,36 +1,42 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Shuttle.Core.Contract;

namespace Shuttle.Core.Mediator.Tests
namespace Shuttle.Core.Mediator.Tests;

public class MultipleParticipants :
IParticipant<MultipleParticipantMessageA>,
IParticipant<MultipleParticipantMessageB>,
IAsyncParticipant<MultipleParticipantMessageA>,
IAsyncParticipant<MultipleParticipantMessageB>
{
public class MultipleParticipants :
IParticipant<MultipleParticipantMessageA>,
IParticipant<MultipleParticipantMessageB>
private readonly IMessageTracker _messageTracker;

public MultipleParticipants(IMessageTracker messageTracker)
{
_messageTracker = Guard.AgainstNull(messageTracker, nameof(messageTracker));
}

public async Task ProcessMessageAsync(IParticipantContext<MultipleParticipantMessageA> context)
{
private static readonly List<object> _messagesReceived = new List<object>();

public int MessageTypeCount(Type type)
{
return _messagesReceived.Count(item => item.GetType() == type);
}

public void ProcessMessage(IParticipantContext<MultipleParticipantMessageA> context)
{
_messagesReceived.Add(context.Message);
}

public void ProcessMessage(IParticipantContext<MultipleParticipantMessageB> context)
{
_messagesReceived.Add(context.Message);
}
_messageTracker.Received(context.Message);

await Task.CompletedTask.ConfigureAwait(false);
}

public async Task ProcessMessageAsync(IParticipantContext<MultipleParticipantMessageB> context)
{
_messageTracker.Received(context.Message);

await Task.CompletedTask.ConfigureAwait(false);
}

public class MultipleParticipantMessageB
public void ProcessMessage(IParticipantContext<MultipleParticipantMessageA> context)
{
_messageTracker.Received(context.Message);
}

public class MultipleParticipantMessageA
public void ProcessMessage(IParticipantContext<MultipleParticipantMessageB> context)
{
_messageTracker.Received(context.Message);
}
}
12 changes: 11 additions & 1 deletion Shuttle.Core.Mediator.Tests/RegisterParticipant.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
using System;
using System.Threading.Tasks;
using Shuttle.Core.Contract;

namespace Shuttle.Core.Mediator.Tests
{
public class RegisterParticipant : AbstractParticipant, IParticipant<RegisterMessage>
public class RegisterParticipant : AbstractParticipant, IParticipant<RegisterMessage>, IAsyncParticipant<RegisterMessage>
{
public void ProcessMessage(IParticipantContext<RegisterMessage> context)
{
Expand All @@ -13,5 +14,14 @@ public void ProcessMessage(IParticipantContext<RegisterMessage> context)

Call();
}

public async Task ProcessMessageAsync(IParticipantContext<RegisterMessage> context)
{
Guard.AgainstNull(context, nameof(context));

context.Message.Touch($"[proper] : {Id}");

await CallAsync();
}
}
}
Loading

0 comments on commit 58f2466

Please sign in to comment.