From 4daace1a921537ffda11e1a5f45665f7d904d692 Mon Sep 17 00:00:00 2001 From: Anne Erdtsieck Date: Sun, 10 Dec 2023 14:28:30 +0100 Subject: [PATCH 1/2] #647 Await forwarded event actions from integration tests --- docs/.vitepress/cache/deps/_metadata.json | 6 +- .../guide/durability/marten/event-sourcing.md | 75 +++++++++++++++++++ docs/guide/http/metadata.md | 4 +- .../rabbitmq/conventional-routing.md | 4 +- .../transports/rabbitmq/deadletterqueues.md | 6 +- .../messaging/transports/rabbitmq/index.md | 30 +++++++- .../transports/rabbitmq/interoperability.md | 4 +- .../transports/rabbitmq/listening.md | 4 +- .../transports/rabbitmq/object-management.md | 6 +- .../transports/rabbitmq/publishing.md | 6 +- .../Marten/event_streaming.cs | 54 ++++++++++--- .../MartenTestingExtensions.cs | 49 ++++++++++++ 12 files changed, 216 insertions(+), 32 deletions(-) create mode 100644 src/Persistence/Wolverine.Marten/MartenTestingExtensions.cs diff --git a/docs/.vitepress/cache/deps/_metadata.json b/docs/.vitepress/cache/deps/_metadata.json index 86ebb7afc..9f3e1bde3 100644 --- a/docs/.vitepress/cache/deps/_metadata.json +++ b/docs/.vitepress/cache/deps/_metadata.json @@ -1,11 +1,11 @@ { - "hash": "d9310bc8", - "browserHash": "53be324b", + "hash": "69c24641", + "browserHash": "b1a93afb", "optimized": { "vue": { "src": "../../../../node_modules/vue/dist/vue.runtime.esm-bundler.js", "file": "vue.js", - "fileHash": "b506e708", + "fileHash": "f3cb9093", "needsInterop": false } }, diff --git a/docs/guide/durability/marten/event-sourcing.md b/docs/guide/durability/marten/event-sourcing.md index fadd0bd34..7741e072e 100644 --- a/docs/guide/durability/marten/event-sourcing.md +++ b/docs/guide/durability/marten/event-sourcing.md @@ -461,3 +461,78 @@ builder.Host.UseWolverine(opts => ``` snippet source | anchor + +This forwarding of events is using an outbox that can be awaited in your tests using this extension method: + + + +```cs +public static Task SaveInMartenAndWaitForOutgoingMessagesAsync(this IHost host, Action action, int timeoutInMilliseconds = 5000) +{ + var factory = host.Services.GetRequiredService(); + + return host.ExecuteAndWaitAsync(async context => + { + var session = factory.OpenSession(context); + action(session); + await session.SaveChangesAsync(); + + // Shouldn't be necessary, but real life says do it anyway + await context.As().FlushOutgoingMessagesAsync(); + }, timeoutInMilliseconds); +} +``` +snippet source | anchor + + +To be used in your tests such as this: + + + +```cs +[Fact] +public async Task execution_of_forwarded_events_can_be_awaited_from_tests() +{ + var host = await Host.CreateDefaultBuilder() + .UseWolverine() + .ConfigureServices(services => + { + services.AddMarten(Servers.PostgresConnectionString) + .IntegrateWithWolverine().EventForwardingToWolverine(opts => + { + opts.SubscribeToEvent().TransformedTo(e => + new SecondMessage(e.StreamId, e.Sequence)); + }); + }).StartAsync(); + + var aggregateId = Guid.NewGuid(); + await host.SaveInMartenAndWaitForOutgoingMessagesAsync(session => + { + session.Events.Append(aggregateId, new SecondEvent()); + }, 100_000); + + using var store = host.Services.GetRequiredService(); + await using var session = store.LightweightSession(); + var events = await session.Events.FetchStreamAsync(aggregateId); + events.Count.ShouldBe(2); + events[0].Data.ShouldBeOfType(); + events[1].Data.ShouldBeOfType(); +} +``` +snippet source | anchor + + +Where the result contains `FourthEvent` because `SecondEvent` was forwarded as `SecondMessage` and that persisted `FourthEvent` in a handler such as: + + + + +```cs +public static Task HandleAsync(SecondMessage message, IDocumentSession session) +{ + session.Events.Append(message.AggregateId, new FourthEvent()); + return session.SaveChangesAsync(); +} +``` +snippet source | anchor + diff --git a/docs/guide/http/metadata.md b/docs/guide/http/metadata.md index 28d378a4d..ed086c584 100644 --- a/docs/guide/http/metadata.md +++ b/docs/guide/http/metadata.md @@ -45,7 +45,7 @@ public static void Configure(HttpChain chain) chain.Metadata.Add(builder => { // Adding and modifying data - builder.Metadata.Add(new ProducesResponseTypeMetadata { StatusCode = 202, Type = null }); + builder.Metadata.Add(new WolverineProducesResponseTypeMetadata { StatusCode = 202, Type = null }); builder.RemoveStatusCodeResponse(200); }); } @@ -76,7 +76,7 @@ public record CreationResponse(string Url) : IHttpAware builder.RemoveStatusCodeResponse(200); var create = new MethodCall(method.DeclaringType!, method).Creates.FirstOrDefault()?.VariableType; - var metadata = new ProducesResponseTypeMetadata { Type = create, StatusCode = 201 }; + var metadata = new WolverineProducesResponseTypeMetadata { Type = create, StatusCode = 201 }; builder.Metadata.Add(metadata); } diff --git a/docs/guide/messaging/transports/rabbitmq/conventional-routing.md b/docs/guide/messaging/transports/rabbitmq/conventional-routing.md index b1c4bc175..9a6a6bd8e 100644 --- a/docs/guide/messaging/transports/rabbitmq/conventional-routing.md +++ b/docs/guide/messaging/transports/rabbitmq/conventional-routing.md @@ -20,7 +20,7 @@ using var host = await Host.CreateDefaultBuilder() .UseConventionalRouting(); }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor With the defaults from above, for each message that the application can handle @@ -73,7 +73,7 @@ using var host = await Host.CreateDefaultBuilder() }); }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor diff --git a/docs/guide/messaging/transports/rabbitmq/deadletterqueues.md b/docs/guide/messaging/transports/rabbitmq/deadletterqueues.md index a03b19de0..e4ebd12bb 100644 --- a/docs/guide/messaging/transports/rabbitmq/deadletterqueues.md +++ b/docs/guide/messaging/transports/rabbitmq/deadletterqueues.md @@ -36,7 +36,7 @@ using var host = await Host.CreateDefaultBuilder() }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor ::: warning @@ -71,7 +71,7 @@ using var host = await Host.CreateDefaultBuilder() }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor And lastly, if you don't particularly want to have any Rabbit MQ dead letter queues and you quite like the [database backed @@ -100,7 +100,7 @@ using var host = await Host.CreateDefaultBuilder() }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor diff --git a/docs/guide/messaging/transports/rabbitmq/index.md b/docs/guide/messaging/transports/rabbitmq/index.md index 10535d21c..b4fa9b3e6 100644 --- a/docs/guide/messaging/transports/rabbitmq/index.md +++ b/docs/guide/messaging/transports/rabbitmq/index.md @@ -61,7 +61,35 @@ for request/reply invocations (`IMessageBus.InvokeAsync()` when used remotely have permissions with your Rabbit MQ broker to create queues, you may encounter errors. Not to worry, you can disable that Wolverine system queue creation with: -snippet: sample_disable_rabbit_mq_system_queue + + +```cs +using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + // *A* way to configure Rabbit MQ using their Uri schema + // documented here: https://www.rabbitmq.com/uri-spec.html + opts.UseRabbitMq(new Uri("amqp://localhost")) + + // Stop Wolverine from trying to create a reply queue + // for this node if your process does not have permission to + // do so against your Rabbit MQ broker + .DisableSystemRequestReplyQueueDeclaration(); + + + + // Set up a listener for a queue, but also + // fine-tune the queue characteristics if Wolverine + // will be governing the queue setup + opts.ListenToRabbitQueue("incoming2", q => + { + q.PurgeOnStartup = true; + q.TimeToLive(5.Minutes()); + }); + }).StartAsync(); +``` +snippet source | anchor + Of course, doing so means that you will not be able to do request/reply through Rabbit MQ with your Wolverine application. diff --git a/docs/guide/messaging/transports/rabbitmq/interoperability.md b/docs/guide/messaging/transports/rabbitmq/interoperability.md index 600527322..a66cc2c21 100644 --- a/docs/guide/messaging/transports/rabbitmq/interoperability.md +++ b/docs/guide/messaging/transports/rabbitmq/interoperability.md @@ -36,7 +36,7 @@ using var host = await Host.CreateDefaultBuilder() .DefaultIncomingMessage(); }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor With this setting, there is **no other required headers** for Wolverine to process incoming messages. However, Wolverine will be @@ -128,7 +128,7 @@ using var host = await Host.CreateDefaultBuilder() .DefaultIncomingMessage(); }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor diff --git a/docs/guide/messaging/transports/rabbitmq/listening.md b/docs/guide/messaging/transports/rabbitmq/listening.md index 2d2940e83..e9d4dddd1 100644 --- a/docs/guide/messaging/transports/rabbitmq/listening.md +++ b/docs/guide/messaging/transports/rabbitmq/listening.md @@ -38,7 +38,7 @@ using var host = await Host.CreateDefaultBuilder() }); }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor To optimize and tune the message processing, you may want to read more about the [Rabbit MQ prefetch count and prefetch @@ -80,5 +80,5 @@ using var host = await Host.CreateDefaultBuilder() }); }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor diff --git a/docs/guide/messaging/transports/rabbitmq/object-management.md b/docs/guide/messaging/transports/rabbitmq/object-management.md index 2bc6785d2..67b213873 100644 --- a/docs/guide/messaging/transports/rabbitmq/object-management.md +++ b/docs/guide/messaging/transports/rabbitmq/object-management.md @@ -30,7 +30,7 @@ using var host = await Host.CreateDefaultBuilder() opts.PublishAllMessages().ToRabbitExchange("exchange1"); }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor At development time -- or occasionally in production systems -- you may want to have the messaging @@ -47,7 +47,7 @@ using var host = await Host.CreateDefaultBuilder() .AutoPurgeOnStartup(); }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor Or you can be more selective and only have certain queues of volatile messages purged @@ -64,7 +64,7 @@ using var host = await Host.CreateDefaultBuilder() .DeclareQueue("queue2", q => q.PurgeOnStartup = true); }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor Wolverine's Rabbit MQ integration also supports the [Oakton stateful resource](https://jasperfx.github.io/oakton/guide/host/resources.html) model, diff --git a/docs/guide/messaging/transports/rabbitmq/publishing.md b/docs/guide/messaging/transports/rabbitmq/publishing.md index 07502e499..cfdfd9feb 100644 --- a/docs/guide/messaging/transports/rabbitmq/publishing.md +++ b/docs/guide/messaging/transports/rabbitmq/publishing.md @@ -23,7 +23,7 @@ using var host = await Host.CreateDefaultBuilder() opts.PublishAllMessages().ToRabbitQueue("special", queue => { queue.IsExclusive = true; }); }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor ## Publish to an Exchange @@ -55,7 +55,7 @@ using var host = await Host.CreateDefaultBuilder() }); }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor ## Publish to a Routing Key @@ -84,6 +84,6 @@ using var host = await Host.CreateDefaultBuilder() opts.PublishAllMessages().ToRabbitExchange("exchange1"); }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor diff --git a/src/Persistence/PersistenceTests/Marten/event_streaming.cs b/src/Persistence/PersistenceTests/Marten/event_streaming.cs index 5a049474b..8d2bd253d 100644 --- a/src/Persistence/PersistenceTests/Marten/event_streaming.cs +++ b/src/Persistence/PersistenceTests/Marten/event_streaming.cs @@ -1,7 +1,8 @@ using IntegrationTests; -using JasperFx.Core; using Marten; using Marten.Events; +using Microsoft.AspNetCore.Builder; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Oakton.Resources; using Shouldly; @@ -11,7 +12,6 @@ using Wolverine.Marten; using Wolverine.Tracking; using Wolverine.Transports.Tcp; -using Wolverine.Util; using Xunit; namespace PersistenceTests.Marten; @@ -76,12 +76,6 @@ public async Task DisposeAsync() await theSender.StopAsync(); } - public void Dispose() - { - theReceiver?.Dispose(); - theSender?.Dispose(); - } - [Fact] public void preview_routes() { @@ -106,6 +100,37 @@ public async Task event_should_be_published_from_sender_to_receiver() results.Executed.SingleMessage().ShouldNotBeNull(); } + + #region sample_execution_of_forwarded_events_can_be_awaited_from_tests + [Fact] + public async Task execution_of_forwarded_events_can_be_awaited_from_tests() + { + var host = await Host.CreateDefaultBuilder() + .UseWolverine() + .ConfigureServices(services => + { + services.AddMarten(Servers.PostgresConnectionString) + .IntegrateWithWolverine().EventForwardingToWolverine(opts => + { + opts.SubscribeToEvent().TransformedTo(e => + new SecondMessage(e.StreamId, e.Sequence)); + }); + }).StartAsync(); + + var aggregateId = Guid.NewGuid(); + await host.SaveInMartenAndWaitForOutgoingMessagesAsync(session => + { + session.Events.Append(aggregateId, new SecondEvent()); + }, 100_000); + + using var store = host.Services.GetRequiredService(); + await using var session = store.LightweightSession(); + var events = await session.Events.FetchStreamAsync(aggregateId); + events.Count.ShouldBe(2); + events[0].Data.ShouldBeOfType(); + events[1].Data.ShouldBeOfType(); + } + #endregion } @@ -127,7 +152,7 @@ public void Handle(ThirdEvent e) } } -public record SecondMessage(long Sequence); +public record SecondMessage(Guid AggregateId, long Sequence); public class SecondEvent { @@ -135,6 +160,7 @@ public class SecondEvent } public class ThirdEvent{} +public class FourthEvent{} public class TriggeredEvent { @@ -150,6 +176,12 @@ public void Handle(TriggeredEvent message) { _source.SetResult(message); } - - public void Handle(SecondMessage message){} + + #region sample_execution_of_forwarded_events_second_message_to_fourth_event + public static Task HandleAsync(SecondMessage message, IDocumentSession session) + { + session.Events.Append(message.AggregateId, new FourthEvent()); + return session.SaveChangesAsync(); + } + #endregion } \ No newline at end of file diff --git a/src/Persistence/Wolverine.Marten/MartenTestingExtensions.cs b/src/Persistence/Wolverine.Marten/MartenTestingExtensions.cs new file mode 100644 index 000000000..5f4766947 --- /dev/null +++ b/src/Persistence/Wolverine.Marten/MartenTestingExtensions.cs @@ -0,0 +1,49 @@ +using JasperFx.Core.Reflection; +using Marten; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Wolverine.Marten.Publishing; +using Wolverine.Runtime; +using Wolverine.Tracking; + +namespace Wolverine.Marten; + +public static class MartenTestingExtensions +{ + /// + /// Saves changes to the Marten document session and waits for all outgoing messages to be flushed. + /// + /// + /// This method provides an extension for IHost that initializes a new document session within the + /// given host's service scope, applies the provided actions to the session, and ensures all changes are saved. + /// After saving, it explicitly flushes any outgoing messages to guarantee that all message side-effects + /// are completed before the task completes. This method should be used in testing environments where + /// immediate consistency of the session and the outgoing message pipeline is required. + /// + /// + /// Here is how you can use the extension method within your tests: + /// + /// await host.SaveInMartenAndWaitForOutgoingMessagesAsync(session => + /// { + /// // Perform actions on the session such as saving events + /// session.Events.Append(_request.ConsultatieId, new QuestStarted()); + /// }); + /// + /// + #region sample_save_in_martend_and_wait_for_outgoing_messages + public static Task SaveInMartenAndWaitForOutgoingMessagesAsync(this IHost host, Action action, int timeoutInMilliseconds = 5000) + { + var factory = host.Services.GetRequiredService(); + + return host.ExecuteAndWaitAsync(async context => + { + var session = factory.OpenSession(context); + action(session); + await session.SaveChangesAsync(); + + // Shouldn't be necessary, but real life says do it anyway + await context.As().FlushOutgoingMessagesAsync(); + }, timeoutInMilliseconds); + } + #endregion +} \ No newline at end of file From d80626222180130146e94be779f9566d9c088e65 Mon Sep 17 00:00:00 2001 From: Anne Erdtsieck Date: Sun, 10 Dec 2023 14:28:30 +0100 Subject: [PATCH 2/2] #647 Await forwarded event actions from integration tests --- docs/.vitepress/cache/deps/_metadata.json | 6 +- .../guide/durability/marten/event-sourcing.md | 75 +++++++++++++++++++ docs/guide/http/metadata.md | 4 +- .../rabbitmq/conventional-routing.md | 4 +- .../transports/rabbitmq/deadletterqueues.md | 6 +- .../messaging/transports/rabbitmq/index.md | 30 +++++++- .../transports/rabbitmq/interoperability.md | 4 +- .../transports/rabbitmq/listening.md | 4 +- .../transports/rabbitmq/object-management.md | 6 +- .../transports/rabbitmq/publishing.md | 6 +- .../Marten/event_streaming.cs | 56 +++++++++++--- .../MartenTestingExtensions.cs | 49 ++++++++++++ 12 files changed, 217 insertions(+), 33 deletions(-) create mode 100644 src/Persistence/Wolverine.Marten/MartenTestingExtensions.cs diff --git a/docs/.vitepress/cache/deps/_metadata.json b/docs/.vitepress/cache/deps/_metadata.json index 86ebb7afc..9f3e1bde3 100644 --- a/docs/.vitepress/cache/deps/_metadata.json +++ b/docs/.vitepress/cache/deps/_metadata.json @@ -1,11 +1,11 @@ { - "hash": "d9310bc8", - "browserHash": "53be324b", + "hash": "69c24641", + "browserHash": "b1a93afb", "optimized": { "vue": { "src": "../../../../node_modules/vue/dist/vue.runtime.esm-bundler.js", "file": "vue.js", - "fileHash": "b506e708", + "fileHash": "f3cb9093", "needsInterop": false } }, diff --git a/docs/guide/durability/marten/event-sourcing.md b/docs/guide/durability/marten/event-sourcing.md index fadd0bd34..7741e072e 100644 --- a/docs/guide/durability/marten/event-sourcing.md +++ b/docs/guide/durability/marten/event-sourcing.md @@ -461,3 +461,78 @@ builder.Host.UseWolverine(opts => ``` snippet source | anchor + +This forwarding of events is using an outbox that can be awaited in your tests using this extension method: + + + +```cs +public static Task SaveInMartenAndWaitForOutgoingMessagesAsync(this IHost host, Action action, int timeoutInMilliseconds = 5000) +{ + var factory = host.Services.GetRequiredService(); + + return host.ExecuteAndWaitAsync(async context => + { + var session = factory.OpenSession(context); + action(session); + await session.SaveChangesAsync(); + + // Shouldn't be necessary, but real life says do it anyway + await context.As().FlushOutgoingMessagesAsync(); + }, timeoutInMilliseconds); +} +``` +snippet source | anchor + + +To be used in your tests such as this: + + + +```cs +[Fact] +public async Task execution_of_forwarded_events_can_be_awaited_from_tests() +{ + var host = await Host.CreateDefaultBuilder() + .UseWolverine() + .ConfigureServices(services => + { + services.AddMarten(Servers.PostgresConnectionString) + .IntegrateWithWolverine().EventForwardingToWolverine(opts => + { + opts.SubscribeToEvent().TransformedTo(e => + new SecondMessage(e.StreamId, e.Sequence)); + }); + }).StartAsync(); + + var aggregateId = Guid.NewGuid(); + await host.SaveInMartenAndWaitForOutgoingMessagesAsync(session => + { + session.Events.Append(aggregateId, new SecondEvent()); + }, 100_000); + + using var store = host.Services.GetRequiredService(); + await using var session = store.LightweightSession(); + var events = await session.Events.FetchStreamAsync(aggregateId); + events.Count.ShouldBe(2); + events[0].Data.ShouldBeOfType(); + events[1].Data.ShouldBeOfType(); +} +``` +snippet source | anchor + + +Where the result contains `FourthEvent` because `SecondEvent` was forwarded as `SecondMessage` and that persisted `FourthEvent` in a handler such as: + + + + +```cs +public static Task HandleAsync(SecondMessage message, IDocumentSession session) +{ + session.Events.Append(message.AggregateId, new FourthEvent()); + return session.SaveChangesAsync(); +} +``` +snippet source | anchor + diff --git a/docs/guide/http/metadata.md b/docs/guide/http/metadata.md index 28d378a4d..ed086c584 100644 --- a/docs/guide/http/metadata.md +++ b/docs/guide/http/metadata.md @@ -45,7 +45,7 @@ public static void Configure(HttpChain chain) chain.Metadata.Add(builder => { // Adding and modifying data - builder.Metadata.Add(new ProducesResponseTypeMetadata { StatusCode = 202, Type = null }); + builder.Metadata.Add(new WolverineProducesResponseTypeMetadata { StatusCode = 202, Type = null }); builder.RemoveStatusCodeResponse(200); }); } @@ -76,7 +76,7 @@ public record CreationResponse(string Url) : IHttpAware builder.RemoveStatusCodeResponse(200); var create = new MethodCall(method.DeclaringType!, method).Creates.FirstOrDefault()?.VariableType; - var metadata = new ProducesResponseTypeMetadata { Type = create, StatusCode = 201 }; + var metadata = new WolverineProducesResponseTypeMetadata { Type = create, StatusCode = 201 }; builder.Metadata.Add(metadata); } diff --git a/docs/guide/messaging/transports/rabbitmq/conventional-routing.md b/docs/guide/messaging/transports/rabbitmq/conventional-routing.md index b1c4bc175..9a6a6bd8e 100644 --- a/docs/guide/messaging/transports/rabbitmq/conventional-routing.md +++ b/docs/guide/messaging/transports/rabbitmq/conventional-routing.md @@ -20,7 +20,7 @@ using var host = await Host.CreateDefaultBuilder() .UseConventionalRouting(); }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor With the defaults from above, for each message that the application can handle @@ -73,7 +73,7 @@ using var host = await Host.CreateDefaultBuilder() }); }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor diff --git a/docs/guide/messaging/transports/rabbitmq/deadletterqueues.md b/docs/guide/messaging/transports/rabbitmq/deadletterqueues.md index a03b19de0..e4ebd12bb 100644 --- a/docs/guide/messaging/transports/rabbitmq/deadletterqueues.md +++ b/docs/guide/messaging/transports/rabbitmq/deadletterqueues.md @@ -36,7 +36,7 @@ using var host = await Host.CreateDefaultBuilder() }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor ::: warning @@ -71,7 +71,7 @@ using var host = await Host.CreateDefaultBuilder() }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor And lastly, if you don't particularly want to have any Rabbit MQ dead letter queues and you quite like the [database backed @@ -100,7 +100,7 @@ using var host = await Host.CreateDefaultBuilder() }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor diff --git a/docs/guide/messaging/transports/rabbitmq/index.md b/docs/guide/messaging/transports/rabbitmq/index.md index 10535d21c..b4fa9b3e6 100644 --- a/docs/guide/messaging/transports/rabbitmq/index.md +++ b/docs/guide/messaging/transports/rabbitmq/index.md @@ -61,7 +61,35 @@ for request/reply invocations (`IMessageBus.InvokeAsync()` when used remotely have permissions with your Rabbit MQ broker to create queues, you may encounter errors. Not to worry, you can disable that Wolverine system queue creation with: -snippet: sample_disable_rabbit_mq_system_queue + + +```cs +using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + // *A* way to configure Rabbit MQ using their Uri schema + // documented here: https://www.rabbitmq.com/uri-spec.html + opts.UseRabbitMq(new Uri("amqp://localhost")) + + // Stop Wolverine from trying to create a reply queue + // for this node if your process does not have permission to + // do so against your Rabbit MQ broker + .DisableSystemRequestReplyQueueDeclaration(); + + + + // Set up a listener for a queue, but also + // fine-tune the queue characteristics if Wolverine + // will be governing the queue setup + opts.ListenToRabbitQueue("incoming2", q => + { + q.PurgeOnStartup = true; + q.TimeToLive(5.Minutes()); + }); + }).StartAsync(); +``` +snippet source | anchor + Of course, doing so means that you will not be able to do request/reply through Rabbit MQ with your Wolverine application. diff --git a/docs/guide/messaging/transports/rabbitmq/interoperability.md b/docs/guide/messaging/transports/rabbitmq/interoperability.md index 600527322..a66cc2c21 100644 --- a/docs/guide/messaging/transports/rabbitmq/interoperability.md +++ b/docs/guide/messaging/transports/rabbitmq/interoperability.md @@ -36,7 +36,7 @@ using var host = await Host.CreateDefaultBuilder() .DefaultIncomingMessage(); }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor With this setting, there is **no other required headers** for Wolverine to process incoming messages. However, Wolverine will be @@ -128,7 +128,7 @@ using var host = await Host.CreateDefaultBuilder() .DefaultIncomingMessage(); }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor diff --git a/docs/guide/messaging/transports/rabbitmq/listening.md b/docs/guide/messaging/transports/rabbitmq/listening.md index 2d2940e83..e9d4dddd1 100644 --- a/docs/guide/messaging/transports/rabbitmq/listening.md +++ b/docs/guide/messaging/transports/rabbitmq/listening.md @@ -38,7 +38,7 @@ using var host = await Host.CreateDefaultBuilder() }); }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor To optimize and tune the message processing, you may want to read more about the [Rabbit MQ prefetch count and prefetch @@ -80,5 +80,5 @@ using var host = await Host.CreateDefaultBuilder() }); }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor diff --git a/docs/guide/messaging/transports/rabbitmq/object-management.md b/docs/guide/messaging/transports/rabbitmq/object-management.md index 2bc6785d2..67b213873 100644 --- a/docs/guide/messaging/transports/rabbitmq/object-management.md +++ b/docs/guide/messaging/transports/rabbitmq/object-management.md @@ -30,7 +30,7 @@ using var host = await Host.CreateDefaultBuilder() opts.PublishAllMessages().ToRabbitExchange("exchange1"); }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor At development time -- or occasionally in production systems -- you may want to have the messaging @@ -47,7 +47,7 @@ using var host = await Host.CreateDefaultBuilder() .AutoPurgeOnStartup(); }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor Or you can be more selective and only have certain queues of volatile messages purged @@ -64,7 +64,7 @@ using var host = await Host.CreateDefaultBuilder() .DeclareQueue("queue2", q => q.PurgeOnStartup = true); }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor Wolverine's Rabbit MQ integration also supports the [Oakton stateful resource](https://jasperfx.github.io/oakton/guide/host/resources.html) model, diff --git a/docs/guide/messaging/transports/rabbitmq/publishing.md b/docs/guide/messaging/transports/rabbitmq/publishing.md index 07502e499..cfdfd9feb 100644 --- a/docs/guide/messaging/transports/rabbitmq/publishing.md +++ b/docs/guide/messaging/transports/rabbitmq/publishing.md @@ -23,7 +23,7 @@ using var host = await Host.CreateDefaultBuilder() opts.PublishAllMessages().ToRabbitQueue("special", queue => { queue.IsExclusive = true; }); }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor ## Publish to an Exchange @@ -55,7 +55,7 @@ using var host = await Host.CreateDefaultBuilder() }); }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor ## Publish to a Routing Key @@ -84,6 +84,6 @@ using var host = await Host.CreateDefaultBuilder() opts.PublishAllMessages().ToRabbitExchange("exchange1"); }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor diff --git a/src/Persistence/PersistenceTests/Marten/event_streaming.cs b/src/Persistence/PersistenceTests/Marten/event_streaming.cs index 5a049474b..608d43552 100644 --- a/src/Persistence/PersistenceTests/Marten/event_streaming.cs +++ b/src/Persistence/PersistenceTests/Marten/event_streaming.cs @@ -1,7 +1,8 @@ using IntegrationTests; -using JasperFx.Core; using Marten; using Marten.Events; +using Microsoft.AspNetCore.Builder; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Oakton.Resources; using Shouldly; @@ -11,7 +12,6 @@ using Wolverine.Marten; using Wolverine.Tracking; using Wolverine.Transports.Tcp; -using Wolverine.Util; using Xunit; namespace PersistenceTests.Marten; @@ -61,7 +61,7 @@ public async Task InitializeAsync() services.AddMarten(Servers.PostgresConnectionString) .IntegrateWithWolverine("sender").EventForwardingToWolverine(opts => { - opts.SubscribeToEvent().TransformedTo(e => new SecondMessage(e.Sequence)); + opts.SubscribeToEvent().TransformedTo(e => new SecondMessage(e.StreamId, e.Sequence)); }); services.AddResourceSetupOnStartup(); @@ -76,12 +76,6 @@ public async Task DisposeAsync() await theSender.StopAsync(); } - public void Dispose() - { - theReceiver?.Dispose(); - theSender?.Dispose(); - } - [Fact] public void preview_routes() { @@ -106,6 +100,37 @@ public async Task event_should_be_published_from_sender_to_receiver() results.Executed.SingleMessage().ShouldNotBeNull(); } + + #region sample_execution_of_forwarded_events_can_be_awaited_from_tests + [Fact] + public async Task execution_of_forwarded_events_can_be_awaited_from_tests() + { + var host = await Host.CreateDefaultBuilder() + .UseWolverine() + .ConfigureServices(services => + { + services.AddMarten(Servers.PostgresConnectionString) + .IntegrateWithWolverine().EventForwardingToWolverine(opts => + { + opts.SubscribeToEvent().TransformedTo(e => + new SecondMessage(e.StreamId, e.Sequence)); + }); + }).StartAsync(); + + var aggregateId = Guid.NewGuid(); + await host.SaveInMartenAndWaitForOutgoingMessagesAsync(session => + { + session.Events.Append(aggregateId, new SecondEvent()); + }, 100_000); + + using var store = host.Services.GetRequiredService(); + await using var session = store.LightweightSession(); + var events = await session.Events.FetchStreamAsync(aggregateId); + events.Count.ShouldBe(2); + events[0].Data.ShouldBeOfType(); + events[1].Data.ShouldBeOfType(); + } + #endregion } @@ -127,7 +152,7 @@ public void Handle(ThirdEvent e) } } -public record SecondMessage(long Sequence); +public record SecondMessage(Guid AggregateId, long Sequence); public class SecondEvent { @@ -135,6 +160,7 @@ public class SecondEvent } public class ThirdEvent{} +public class FourthEvent{} public class TriggeredEvent { @@ -150,6 +176,12 @@ public void Handle(TriggeredEvent message) { _source.SetResult(message); } - - public void Handle(SecondMessage message){} + + #region sample_execution_of_forwarded_events_second_message_to_fourth_event + public static Task HandleAsync(SecondMessage message, IDocumentSession session) + { + session.Events.Append(message.AggregateId, new FourthEvent()); + return session.SaveChangesAsync(); + } + #endregion } \ No newline at end of file diff --git a/src/Persistence/Wolverine.Marten/MartenTestingExtensions.cs b/src/Persistence/Wolverine.Marten/MartenTestingExtensions.cs new file mode 100644 index 000000000..5f4766947 --- /dev/null +++ b/src/Persistence/Wolverine.Marten/MartenTestingExtensions.cs @@ -0,0 +1,49 @@ +using JasperFx.Core.Reflection; +using Marten; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Wolverine.Marten.Publishing; +using Wolverine.Runtime; +using Wolverine.Tracking; + +namespace Wolverine.Marten; + +public static class MartenTestingExtensions +{ + /// + /// Saves changes to the Marten document session and waits for all outgoing messages to be flushed. + /// + /// + /// This method provides an extension for IHost that initializes a new document session within the + /// given host's service scope, applies the provided actions to the session, and ensures all changes are saved. + /// After saving, it explicitly flushes any outgoing messages to guarantee that all message side-effects + /// are completed before the task completes. This method should be used in testing environments where + /// immediate consistency of the session and the outgoing message pipeline is required. + /// + /// + /// Here is how you can use the extension method within your tests: + /// + /// await host.SaveInMartenAndWaitForOutgoingMessagesAsync(session => + /// { + /// // Perform actions on the session such as saving events + /// session.Events.Append(_request.ConsultatieId, new QuestStarted()); + /// }); + /// + /// + #region sample_save_in_martend_and_wait_for_outgoing_messages + public static Task SaveInMartenAndWaitForOutgoingMessagesAsync(this IHost host, Action action, int timeoutInMilliseconds = 5000) + { + var factory = host.Services.GetRequiredService(); + + return host.ExecuteAndWaitAsync(async context => + { + var session = factory.OpenSession(context); + action(session); + await session.SaveChangesAsync(); + + // Shouldn't be necessary, but real life says do it anyway + await context.As().FlushOutgoingMessagesAsync(); + }, timeoutInMilliseconds); + } + #endregion +} \ No newline at end of file