diff --git a/infrastructure.ps1 b/infrastructure.ps1
new file mode 100644
index 00000000..545a7ede
--- /dev/null
+++ b/infrastructure.ps1
@@ -0,0 +1 @@
+docker compose -f src/Infrastructure/docker-compose.yml up --force-recreate -V
\ No newline at end of file
diff --git a/src/Host.Plugin.Properties.xml b/src/Host.Plugin.Properties.xml
index ffce6c5b..038b631a 100644
--- a/src/Host.Plugin.Properties.xml
+++ b/src/Host.Plugin.Properties.xml
@@ -4,7 +4,7 @@
- 2.3.5
+ 2.4.0-rc1
\ No newline at end of file
diff --git a/src/Infrastructure/README.md b/src/Infrastructure/README.md
index ba4b9d51..caa4f7b5 100644
--- a/src/Infrastructure/README.md
+++ b/src/Infrastructure/README.md
@@ -8,12 +8,18 @@ No volumes have been configured so as to provide clean instances on each run.
src/Infrastructure> docker compose up --force-recreate -V
```
-or
+or Unix
```
> ./infrastructure.sh
```
+or Windows
+
+```
+> .\infrastructure.ps1
+```
+
## Configuration
Personal instances of Azure resources are required as containers are not available for those services.
@@ -26,7 +32,7 @@ Personal instances of Azure resources are required as containers are not availab
### Azure Event Hub
-The transport/plug-in does not provide topology provisioning (just yet).
+The transport/plug-in does not provide topology provisioning ([just yet](https://github.com/zarusz/SlimMessageBus/issues/111)).
The below event hubs and consumer groups will need to be manually added to the Azure Event Hub instance for the integration tests to run.
| Event Hub | Consumer Group |
diff --git a/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs b/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs
index 8ee49042..a9e76996 100644
--- a/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs
+++ b/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs
@@ -109,7 +109,7 @@ void AddConsumerFrom(TopicSubscriptionParams topicSubscription, IMessageProcesso
responseConsumer: this,
messagePayloadProvider: m => m.Body.ToArray());
- AddConsumerFrom(topicSubscription, messageProcessor, new[] { Settings.RequestResponse });
+ AddConsumerFrom(topicSubscription, messageProcessor, [Settings.RequestResponse]);
}
}
diff --git a/src/SlimMessageBus.Host.Interceptor/Bus/MessageBusLifecycleEventType.cs b/src/SlimMessageBus.Host.Interceptor/Bus/MessageBusLifecycleEventType.cs
index 1b5b4baf..ac5d73c1 100644
--- a/src/SlimMessageBus.Host.Interceptor/Bus/MessageBusLifecycleEventType.cs
+++ b/src/SlimMessageBus.Host.Interceptor/Bus/MessageBusLifecycleEventType.cs
@@ -2,8 +2,13 @@
public enum MessageBusLifecycleEventType
{
+ ///
+ /// Invoked when the master bus is created.
+ /// Can be used to initalize any resource before the messages are produced or consumed.
+ ///
+ Created,
Starting,
Started,
Stopping,
- Stopped,
+ Stopped
}
\ No newline at end of file
diff --git a/src/SlimMessageBus.Host.Outbox/Interceptors/OutboxSendingTask.cs b/src/SlimMessageBus.Host.Outbox/Interceptors/OutboxSendingTask.cs
index be05119a..7a5b4537 100644
--- a/src/SlimMessageBus.Host.Outbox/Interceptors/OutboxSendingTask.cs
+++ b/src/SlimMessageBus.Host.Outbox/Interceptors/OutboxSendingTask.cs
@@ -14,6 +14,12 @@ public class OutboxSendingTask(
private CancellationTokenSource _loopCts;
private Task _loopTask;
+
+ private readonly object _migrateSchemaTaskLock = new();
+ private Task _migrateSchemaTask;
+ private Task _startBusTask;
+ private Task _stopBusTask;
+
private int _busStartCount;
private DateTime? _cleanupNextRun;
@@ -75,35 +81,63 @@ protected async Task Stop()
public Task OnBusLifecycle(MessageBusLifecycleEventType eventType, IMessageBus bus)
{
+ if (eventType == MessageBusLifecycleEventType.Created)
+ {
+ return EnsureMigrateSchema(_serviceProvider, default);
+ }
if (eventType == MessageBusLifecycleEventType.Started)
{
// The first started bus starts this outbox task
if (Interlocked.Increment(ref _busStartCount) == 1)
{
- return Start();
+ _startBusTask = Start();
}
+ return _startBusTask;
}
if (eventType == MessageBusLifecycleEventType.Stopping)
{
// The last stopped bus stops this outbox task
if (Interlocked.Decrement(ref _busStartCount) == 0)
{
- return Stop();
+ _stopBusTask = Stop();
}
+ return _stopBusTask;
}
return Task.CompletedTask;
}
+ private static async Task MigrateSchema(IServiceProvider serviceProvider, CancellationToken cancellationToken)
+ {
+ try
+ {
+ var outboxMigrationService = serviceProvider.GetRequiredService();
+ await outboxMigrationService.Migrate(cancellationToken);
+ }
+ catch (Exception e)
+ {
+ throw new MessageBusException("Outbox schema migration failed", e);
+ }
+ }
+
+ private Task EnsureMigrateSchema(IServiceProvider serviceProvider, CancellationToken cancellationToken)
+ {
+ lock (_migrateSchemaTaskLock)
+ {
+ // We optimize to ever only run once the schema migration, regardless if it was triggered from 1) bus created lifecycle or 2) outbox sending loop.
+ return _migrateSchemaTask ??= MigrateSchema(serviceProvider, cancellationToken);
+ }
+ }
+
private async Task Run()
{
try
{
_logger.LogInformation("Outbox loop started");
var scope = _serviceProvider.CreateScope();
+
try
{
- var outboxMigrationService = scope.ServiceProvider.GetRequiredService();
- await outboxMigrationService.Migrate(_loopCts.Token);
+ await EnsureMigrateSchema(scope.ServiceProvider, _loopCts.Token);
var outboxRepository = scope.ServiceProvider.GetRequiredService();
@@ -187,17 +221,17 @@ private async Task SendMessages(IServiceProvider serviceProvider, IOutboxR
for (var i = 0; i < outboxMessages.Count && !ct.IsCancellationRequested; i++)
{
var outboxMessage = outboxMessages[i];
-
- var now = DateTime.UtcNow;
- if (now.Add(_outboxSettings.LockExpirationBuffer) > outboxMessage.LockExpiresOn)
- {
- _logger.LogDebug("Stopping the outbox message processing after {MessageCount} (out of {BatchCount}) because the message lock was close to expiration {LockBuffer}", i, _outboxSettings.PollBatchSize, _outboxSettings.LockExpirationBuffer);
- hasMore = false;
- break;
- }
-
- var bus = GetBus(compositeMessageBus, messageBusTarget, outboxMessage.BusName);
- if (bus == null)
+
+ var now = DateTime.UtcNow;
+ if (now.Add(_outboxSettings.LockExpirationBuffer) > outboxMessage.LockExpiresOn)
+ {
+ _logger.LogDebug("Stopping the outbox message processing after {MessageCount} (out of {BatchCount}) because the message lock was close to expiration {LockBuffer}", i, _outboxSettings.PollBatchSize, _outboxSettings.LockExpirationBuffer);
+ hasMore = false;
+ break;
+ }
+
+ var bus = GetBus(compositeMessageBus, messageBusTarget, outboxMessage.BusName);
+ if (bus == null)
{
_logger.LogWarning("Not able to find matching bus provider for the outbox message with Id {MessageId} of type {MessageType} to path {Path} using {BusName} bus. The message will be skipped.", outboxMessage.Id, outboxMessage.MessageType.Name, outboxMessage.Path, outboxMessage.BusName);
continue;
diff --git a/src/SlimMessageBus.Host/MessageBusBase.cs b/src/SlimMessageBus.Host/MessageBusBase.cs
index 1b4c22e8..a447b176 100644
--- a/src/SlimMessageBus.Host/MessageBusBase.cs
+++ b/src/SlimMessageBus.Host/MessageBusBase.cs
@@ -144,23 +144,27 @@ protected void OnBuildProvider()
{
ValidationService.AssertSettings();
- Build();
-
- if (Settings.AutoStartConsumers)
- {
- // Fire and forget start
- _ = Task.Run(async () =>
- {
- try
- {
- await Start().ConfigureAwait(false);
- }
- catch (Exception e)
- {
- _logger.LogError(e, "Could not auto start consumers");
- }
+ Build();
+
+ // Notify the bus has been created - before any message can be produced
+ AddInit(OnBusLifecycle(MessageBusLifecycleEventType.Created));
+
+ // Auto start consumers if enabled
+ if (Settings.AutoStartConsumers)
+ {
+ // Fire and forget start
+ _ = Task.Run(async () =>
+ {
+ try
+ {
+ await Start().ConfigureAwait(false);
+ }
+ catch (Exception e)
+ {
+ _logger.LogError(e, "Could not auto start consumers");
+ }
});
- }
+ }
}
protected virtual void Build()
@@ -196,7 +200,7 @@ private Dictionary BuildProducerByBaseMessageType()
private async Task OnBusLifecycle(MessageBusLifecycleEventType eventType)
{
- _lifecycleInterceptors ??= Settings.ServiceProvider?.GetService>();
+ _lifecycleInterceptors ??= Settings.ServiceProvider?.GetServices();
if (_lifecycleInterceptors != null)
{
foreach (var i in _lifecycleInterceptors)
diff --git a/src/SlimMessageBus.sln b/src/SlimMessageBus.sln
index 1c2a380a..18b45c9d 100644
--- a/src/SlimMessageBus.sln
+++ b/src/SlimMessageBus.sln
@@ -241,11 +241,21 @@ EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Infrastructure", "Infrastructure", "{137BFD32-CD0A-47CA-8884-209CD49DEE8C}"
ProjectSection(SolutionItems) = preProject
Infrastructure\docker-compose.yml = Infrastructure\docker-compose.yml
+ ..\infrastructure.ps1 = ..\infrastructure.ps1
..\infrastructure.sh = ..\infrastructure.sh
Infrastructure\mosquitto.conf = Infrastructure\mosquitto.conf
Infrastructure\README.md = Infrastructure\README.md
EndProjectSection
EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Build", "Build", "{1A71BB05-58ED-4B27-B4A4-A03D9E608C1C}"
+ ProjectSection(SolutionItems) = preProject
+ ..\build\do_build.ps1 = ..\build\do_build.ps1
+ ..\build\do_package.ps1 = ..\build\do_package.ps1
+ ..\build\do_test.ps1 = ..\build\do_test.ps1
+ ..\build\do_test_ci.ps1 = ..\build\do_test_ci.ps1
+ ..\build\tasks.ps1 = ..\build\tasks.ps1
+ EndProjectSection
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
diff --git a/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusIt.cs b/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusIt.cs
index b1e2c281..9a192d9d 100644
--- a/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusIt.cs
+++ b/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusIt.cs
@@ -37,6 +37,8 @@ protected override void SetupServices(ServiceCollection services, IConfiguration
}
await next();
};
+ cfg.PrefetchCount = 100;
+ cfg.MaxConcurrentSessions = 20;
});
mbb.AddServicesFromAssemblyContaining();
mbb.AddJsonSerializer();
@@ -68,7 +70,6 @@ private static void MessageModifierWithSession(PingMessage message, ServiceBusMe
[Fact]
public async Task BasicPubSubOnTopic()
{
- var concurrency = 2;
var subscribers = 2;
var topic = "test-ping";
@@ -82,7 +83,7 @@ public async Task BasicPubSubOnTopic()
.SubscriptionName($"subscriber-{i}") // ensure subscription exists on the ServiceBus topic
.WithConsumer()
.WithConsumer()
- .Instances(concurrency));
+ .Instances(20));
}));
});
@@ -94,13 +95,12 @@ public async Task BasicPubSubOnTopic()
}
};
- await BasicPubSub(concurrency, subscribers, subscribers);
+ await BasicPubSub(subscribers);
}
[Fact]
public async Task BasicPubSubOnQueue()
{
- var concurrency = 2;
var queue = "test-ping-queue";
AddBusConfiguration(mbb =>
@@ -111,7 +111,7 @@ public async Task BasicPubSubOnQueue()
.Queue(queue)
.WithConsumer()
.WithConsumer()
- .Instances(concurrency));
+ .Instances(20));
});
CleanTopology = async client =>
@@ -122,13 +122,12 @@ public async Task BasicPubSubOnQueue()
}
};
- await BasicPubSub(concurrency, 1, 1);
+ await BasicPubSub(1);
}
[Fact]
public async Task BasicPubSubWithCustomConsumerOnQueue()
{
- var concurrency = 2;
var queue = "test-ping-queue";
AddBusConfiguration(mbb =>
@@ -139,7 +138,7 @@ public async Task BasicPubSubWithCustomConsumerOnQueue()
.Queue(queue)
.WithConsumer(typeof(CustomPingConsumer), nameof(CustomPingConsumer.Handle))
.WithConsumer(typeof(CustomPingConsumer), typeof(PingDerivedMessage), nameof(CustomPingConsumer.Handle))
- .Instances(concurrency));
+ .Instances(20));
});
CleanTopology = async client =>
@@ -150,7 +149,7 @@ public async Task BasicPubSubWithCustomConsumerOnQueue()
}
};
- await BasicPubSub(concurrency, 1, 1);
+ await BasicPubSub(1);
}
private static string GetMessageId(PingMessage message) => $"ID_{message.Counter}";
@@ -161,7 +160,7 @@ public class TestData
public IReadOnlyCollection ConsumedMessages { get; set; }
}
- private async Task BasicPubSub(int concurrency, int subscribers, int expectedMessageCopies, Action additionalAssertion = null)
+ private async Task BasicPubSub(int expectedMessageCopies, Action additionalAssertion = null)
{
// arrange
var testMetric = ServiceProvider.GetRequiredService();
@@ -186,7 +185,7 @@ private async Task BasicPubSub(int concurrency, int subscribers, int expectedMes
}
stopwatch.Stop();
- Logger.LogInformation("Published {0} messages in {1}", producedMessages.Count, stopwatch.Elapsed);
+ Logger.LogInformation("Published {Count} messages in {Elapsed}", producedMessages.Count, stopwatch.Elapsed);
// consume
stopwatch.Restart();
@@ -234,7 +233,7 @@ public async Task BasicReqRespOnTopic()
.Handle(x => x.Topic(topic)
.SubscriptionName("handler")
.WithHandler()
- .Instances(2))
+ .Instances(20))
.ExpectRequestResponses(x =>
{
x.ReplyToTopic("test-echo-resp");
@@ -259,7 +258,7 @@ public async Task BasicReqRespOnQueue()
})
.Handle(x => x.Queue(queue)
.WithHandler()
- .Instances(2))
+ .Instances(20))
.ExpectRequestResponses(x =>
{
x.ReplyToQueue("test-echo-queue-resp");
@@ -293,7 +292,7 @@ private async Task BasicReqResp()
await Task.WhenAll(responseTasks).ConfigureAwait(false);
stopwatch.Stop();
- Logger.LogInformation("Published and received {0} messages in {1}", responses.Count, stopwatch.Elapsed);
+ Logger.LogInformation("Published and received {Count} messages in {Elapsed}", responses.Count, stopwatch.Elapsed);
// assert
@@ -305,7 +304,6 @@ private async Task BasicReqResp()
[Fact]
public async Task FIFOUsingSessionsOnQueue()
{
- var concurrency = 1;
var queue = "test-session-queue";
AddBusConfiguration(mbb =>
@@ -316,10 +314,10 @@ public async Task FIFOUsingSessionsOnQueue()
.Queue(queue)
.WithConsumer()
.WithConsumer()
- .Instances(concurrency)
+ .Instances(1)
.EnableSession(x => x.MaxConcurrentSessions(10).SessionIdleTimeout(TimeSpan.FromSeconds(5))));
});
- await BasicPubSub(concurrency, 1, 1, CheckMessagesWithinSameSessionAreInOrder);
+ await BasicPubSub(1, CheckMessagesWithinSameSessionAreInOrder);
}
private static void CheckMessagesWithinSameSessionAreInOrder(TestData testData)
@@ -337,7 +335,6 @@ private static void CheckMessagesWithinSameSessionAreInOrder(TestData testData)
[Fact]
public async Task FIFOUsingSessionsOnTopic()
{
- var concurrency = 1;
var queue = "test-session-topic";
AddBusConfiguration(mbb =>
@@ -347,12 +344,12 @@ public async Task FIFOUsingSessionsOnTopic()
.Topic(queue)
.WithConsumer()
.WithConsumer()
- .Instances(concurrency)
+ .Instances(1)
.SubscriptionName($"subscriber") // ensure subscription exists on the ServiceBus topic
.EnableSession(x => x.MaxConcurrentSessions(10).SessionIdleTimeout(TimeSpan.FromSeconds(5))));
});
- await BasicPubSub(concurrency, 1, 1, CheckMessagesWithinSameSessionAreInOrder);
+ await BasicPubSub(1, CheckMessagesWithinSameSessionAreInOrder);
}
}
diff --git a/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusTests.cs b/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusTests.cs
index b6d36696..cd52a4cb 100644
--- a/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusTests.cs
+++ b/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusTests.cs
@@ -6,6 +6,7 @@
using Azure.Messaging.ServiceBus;
using SlimMessageBus.Host;
+using SlimMessageBus.Host.Interceptor;
using SlimMessageBus.Host.Serialization;
public class ServiceBusMessageBusTests : IDisposable
@@ -22,6 +23,7 @@ public ServiceBusMessageBusTests()
serviceProviderMock.Setup(x => x.GetService(It.Is(t => t.IsGenericType && t.GetGenericTypeDefinition() == typeof(IEnumerable<>)))).Returns(Enumerable.Empty