Skip to content

Commit

Permalink
[Host.Outbox] Ensure SQL tables are provisioned without consumer star…
Browse files Browse the repository at this point in the history
…t and with first publish.

Also:
- Add baseline performance test for Outbox.
- Send Created bus lifecycle event to be able to hook into Master bus creation

Signed-off-by: Tomasz Maruszak <[email protected]>
  • Loading branch information
zarusz committed Jun 15, 2024
1 parent 959bf28 commit e60b21f
Show file tree
Hide file tree
Showing 26 changed files with 470 additions and 145 deletions.
1 change: 1 addition & 0 deletions infrastructure.ps1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
docker compose -f src/Infrastructure/docker-compose.yml up --force-recreate -V
2 changes: 1 addition & 1 deletion src/Host.Plugin.Properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<Import Project="Common.NuGet.Properties.xml" />

<PropertyGroup>
<Version>2.3.5</Version>
<Version>2.4.0-rc1</Version>
</PropertyGroup>

</Project>
10 changes: 8 additions & 2 deletions src/Infrastructure/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,18 @@ No volumes have been configured so as to provide clean instances on each run.
src/Infrastructure> docker compose up --force-recreate -V
```

or
or Unix

```
> ./infrastructure.sh
```

or Windows

```
> .\infrastructure.ps1
```

## Configuration

Personal instances of Azure resources are required as containers are not available for those services.
Expand All @@ -26,7 +32,7 @@ Personal instances of Azure resources are required as containers are not availab

### Azure Event Hub

The transport/plug-in does not provide topology provisioning (just yet).
The transport/plug-in does not provide topology provisioning ([just yet](https://github.com/zarusz/SlimMessageBus/issues/111)).
The below event hubs and consumer groups will need to be manually added to the Azure Event Hub instance for the integration tests to run.

| Event Hub | Consumer Group |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ void AddConsumerFrom(TopicSubscriptionParams topicSubscription, IMessageProcesso
responseConsumer: this,
messagePayloadProvider: m => m.Body.ToArray());

AddConsumerFrom(topicSubscription, messageProcessor, new[] { Settings.RequestResponse });
AddConsumerFrom(topicSubscription, messageProcessor, [Settings.RequestResponse]);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@

public enum MessageBusLifecycleEventType
{
/// <summary>
/// Invoked when the master bus is created.
/// Can be used to initalize any resource before the messages are produced or consumed.
/// </summary>
Created,
Starting,
Started,
Stopping,
Stopped,
Stopped
}
64 changes: 49 additions & 15 deletions src/SlimMessageBus.Host.Outbox/Interceptors/OutboxSendingTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ public class OutboxSendingTask(

private CancellationTokenSource _loopCts;
private Task _loopTask;

private readonly object _migrateSchemaTaskLock = new();
private Task _migrateSchemaTask;
private Task _startBusTask;
private Task _stopBusTask;

private int _busStartCount;

private DateTime? _cleanupNextRun;
Expand Down Expand Up @@ -75,35 +81,63 @@ protected async Task Stop()

public Task OnBusLifecycle(MessageBusLifecycleEventType eventType, IMessageBus bus)
{
if (eventType == MessageBusLifecycleEventType.Created)
{
return EnsureMigrateSchema(_serviceProvider, default);
}
if (eventType == MessageBusLifecycleEventType.Started)
{
// The first started bus starts this outbox task
if (Interlocked.Increment(ref _busStartCount) == 1)
{
return Start();
_startBusTask = Start();
}
return _startBusTask;
}
if (eventType == MessageBusLifecycleEventType.Stopping)
{
// The last stopped bus stops this outbox task
if (Interlocked.Decrement(ref _busStartCount) == 0)
{
return Stop();
_stopBusTask = Stop();
}
return _stopBusTask;
}
return Task.CompletedTask;
}

private static async Task MigrateSchema(IServiceProvider serviceProvider, CancellationToken cancellationToken)
{
try
{
var outboxMigrationService = serviceProvider.GetRequiredService<IOutboxMigrationService>();
await outboxMigrationService.Migrate(cancellationToken);
}
catch (Exception e)
{
throw new MessageBusException("Outbox schema migration failed", e);
}
}

private Task EnsureMigrateSchema(IServiceProvider serviceProvider, CancellationToken cancellationToken)
{
lock (_migrateSchemaTaskLock)
{
// We optimize to ever only run once the schema migration, regardless if it was triggered from 1) bus created lifecycle or 2) outbox sending loop.
return _migrateSchemaTask ??= MigrateSchema(serviceProvider, cancellationToken);
}
}

private async Task Run()
{
try
{
_logger.LogInformation("Outbox loop started");
var scope = _serviceProvider.CreateScope();

try
{
var outboxMigrationService = scope.ServiceProvider.GetRequiredService<IOutboxMigrationService>();
await outboxMigrationService.Migrate(_loopCts.Token);
await EnsureMigrateSchema(scope.ServiceProvider, _loopCts.Token);

var outboxRepository = scope.ServiceProvider.GetRequiredService<IOutboxRepository>();

Expand Down Expand Up @@ -187,17 +221,17 @@ private async Task<bool> SendMessages(IServiceProvider serviceProvider, IOutboxR
for (var i = 0; i < outboxMessages.Count && !ct.IsCancellationRequested; i++)
{
var outboxMessage = outboxMessages[i];

var now = DateTime.UtcNow;
if (now.Add(_outboxSettings.LockExpirationBuffer) > outboxMessage.LockExpiresOn)
{
_logger.LogDebug("Stopping the outbox message processing after {MessageCount} (out of {BatchCount}) because the message lock was close to expiration {LockBuffer}", i, _outboxSettings.PollBatchSize, _outboxSettings.LockExpirationBuffer);
hasMore = false;
break;
}

var bus = GetBus(compositeMessageBus, messageBusTarget, outboxMessage.BusName);
if (bus == null)

var now = DateTime.UtcNow;
if (now.Add(_outboxSettings.LockExpirationBuffer) > outboxMessage.LockExpiresOn)
{
_logger.LogDebug("Stopping the outbox message processing after {MessageCount} (out of {BatchCount}) because the message lock was close to expiration {LockBuffer}", i, _outboxSettings.PollBatchSize, _outboxSettings.LockExpirationBuffer);
hasMore = false;
break;
}

var bus = GetBus(compositeMessageBus, messageBusTarget, outboxMessage.BusName);
if (bus == null)
{
_logger.LogWarning("Not able to find matching bus provider for the outbox message with Id {MessageId} of type {MessageType} to path {Path} using {BusName} bus. The message will be skipped.", outboxMessage.Id, outboxMessage.MessageType.Name, outboxMessage.Path, outboxMessage.BusName);
continue;
Expand Down
38 changes: 21 additions & 17 deletions src/SlimMessageBus.Host/MessageBusBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -144,23 +144,27 @@ protected void OnBuildProvider()
{
ValidationService.AssertSettings();

Build();

if (Settings.AutoStartConsumers)
{
// Fire and forget start
_ = Task.Run(async () =>
{
try
{
await Start().ConfigureAwait(false);
}
catch (Exception e)
{
_logger.LogError(e, "Could not auto start consumers");
}
Build();

// Notify the bus has been created - before any message can be produced
AddInit(OnBusLifecycle(MessageBusLifecycleEventType.Created));

// Auto start consumers if enabled
if (Settings.AutoStartConsumers)
{
// Fire and forget start
_ = Task.Run(async () =>
{
try
{
await Start().ConfigureAwait(false);
}
catch (Exception e)
{
_logger.LogError(e, "Could not auto start consumers");
}
});
}
}
}

protected virtual void Build()
Expand Down Expand Up @@ -196,7 +200,7 @@ private Dictionary<Type, ProducerSettings> BuildProducerByBaseMessageType()

private async Task OnBusLifecycle(MessageBusLifecycleEventType eventType)
{
_lifecycleInterceptors ??= Settings.ServiceProvider?.GetService<IEnumerable<IMessageBusLifecycleInterceptor>>();
_lifecycleInterceptors ??= Settings.ServiceProvider?.GetServices<IMessageBusLifecycleInterceptor>();
if (_lifecycleInterceptors != null)
{
foreach (var i in _lifecycleInterceptors)
Expand Down
10 changes: 10 additions & 0 deletions src/SlimMessageBus.sln
Original file line number Diff line number Diff line change
Expand Up @@ -241,11 +241,21 @@ EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Infrastructure", "Infrastructure", "{137BFD32-CD0A-47CA-8884-209CD49DEE8C}"
ProjectSection(SolutionItems) = preProject
Infrastructure\docker-compose.yml = Infrastructure\docker-compose.yml
..\infrastructure.ps1 = ..\infrastructure.ps1
..\infrastructure.sh = ..\infrastructure.sh
Infrastructure\mosquitto.conf = Infrastructure\mosquitto.conf
Infrastructure\README.md = Infrastructure\README.md
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Build", "Build", "{1A71BB05-58ED-4B27-B4A4-A03D9E608C1C}"
ProjectSection(SolutionItems) = preProject
..\build\do_build.ps1 = ..\build\do_build.ps1
..\build\do_package.ps1 = ..\build\do_package.ps1
..\build\do_test.ps1 = ..\build\do_test.ps1
..\build\do_test_ci.ps1 = ..\build\do_test_ci.ps1
..\build\tasks.ps1 = ..\build\tasks.ps1
EndProjectSection
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down
Loading

0 comments on commit e60b21f

Please sign in to comment.