diff --git a/docs/plugin_outbox.md b/docs/plugin_outbox.md index 6f73a48f..46d4b479 100644 --- a/docs/plugin_outbox.md +++ b/docs/plugin_outbox.md @@ -213,4 +213,10 @@ When applied on the (child) bus level then all consumers (or handlers) will inhe - Once a message is picked from outbox and successfully delivered then it is marked as sent in the outbox table. -- At configured intervals (`MessageCleanup.Interval`), and after a configured time span (`MessageCleanup.Age`), the sent messages are removed from the outbox table. \ No newline at end of file +- At configured intervals (`MessageCleanup.Interval`), and after a configured time span (`MessageCleanup.Age`), the sent messages are removed from the outbox table. + +## Important note + +As the outbox can be processed by instance of the application that did not originally process it, it is important to ensure that all active instances maintian the same message registrations (and compatible JSON schema definitions). + +A message that fails to deserialize will be flagged as invalid by setting the associated `DeliveryAborted` field in the `Outbox` table, to `1`. It is safe to manually reset this field value to `0` once the version incompatibility has been resolved. \ No newline at end of file diff --git a/docs/plugin_outbox.t.md b/docs/plugin_outbox.t.md index 946fb2df..fa7dd5db 100644 --- a/docs/plugin_outbox.t.md +++ b/docs/plugin_outbox.t.md @@ -145,4 +145,10 @@ When applied on the (child) bus level then all consumers (or handlers) will inhe - Once a message is picked from outbox and successfully delivered then it is marked as sent in the outbox table. -- At configured intervals (`MessageCleanup.Interval`), and after a configured time span (`MessageCleanup.Age`), the sent messages are removed from the outbox table. \ No newline at end of file +- At configured intervals (`MessageCleanup.Interval`), and after a configured time span (`MessageCleanup.Age`), the sent messages are removed from the outbox table. + +## Important note + +As the outbox can be processed by instance of the application that did not originally process it, it is important to ensure that all active instances maintian the same message registrations (and compatible JSON schema definitions). + +A message that fails to deserialize will be flagged as invalid by setting the associated `DeliveryAborted` field in the `Outbox` table, to `1`. It is safe to manually reset this field value to `0` once the version incompatibility has been resolved. \ No newline at end of file diff --git a/src/.editorconfig b/src/.editorconfig index fa0c35ca..693b8598 100644 --- a/src/.editorconfig +++ b/src/.editorconfig @@ -178,6 +178,10 @@ dotnet_style_allow_multiple_blank_lines_experimental = true:silent dotnet_style_allow_statement_immediately_after_block_experimental = true:silent dotnet_style_prefer_collection_expression = when_types_loosely_match:suggestion dotnet_diagnostic.CA1859.severity = silent +dotnet_style_qualification_for_field = false:suggestion +dotnet_style_qualification_for_property = false:suggestion +dotnet_style_qualification_for_method = false:suggestion +dotnet_style_qualification_for_event = false:suggestion [*.{csproj,xml}] indent_style = space diff --git a/src/SlimMessageBus.Host.Outbox.Sql/SlimMessageBus.Host.Outbox.Sql.csproj b/src/SlimMessageBus.Host.Outbox.Sql/SlimMessageBus.Host.Outbox.Sql.csproj index f9b17fb5..364f8ac5 100644 --- a/src/SlimMessageBus.Host.Outbox.Sql/SlimMessageBus.Host.Outbox.Sql.csproj +++ b/src/SlimMessageBus.Host.Outbox.Sql/SlimMessageBus.Host.Outbox.Sql.csproj @@ -16,4 +16,10 @@ + + + <_Parameter1>SlimMessageBus.Host.Outbox.Sql.Test + + + diff --git a/src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxRepository.cs b/src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxRepository.cs index d5601b4d..60b827e5 100644 --- a/src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxRepository.cs +++ b/src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxRepository.cs @@ -28,7 +28,7 @@ await ExecuteNonQuery(Settings.SqlSettings.OperationRetry, _sqlTemplate.SqlOutbo cmd.Parameters.Add("@Id", SqlDbType.UniqueIdentifier).Value = message.Id; cmd.Parameters.Add("@Timestamp", SqlDbType.DateTime2).Value = message.Timestamp; cmd.Parameters.Add("@BusName", SqlDbType.NVarChar).Value = message.BusName; - cmd.Parameters.Add("@MessageType", SqlDbType.NVarChar).Value = Settings.MessageTypeResolver.ToName(message.MessageType); + cmd.Parameters.Add("@MessageType", SqlDbType.NVarChar).Value = message.MessageType; cmd.Parameters.Add("@MessagePayload", SqlDbType.VarBinary).Value = message.MessagePayload; cmd.Parameters.Add("@Headers", SqlDbType.NVarChar).Value = message.Headers != null ? JsonSerializer.Serialize(message.Headers, _jsonOptions) : DBNull.Value; cmd.Parameters.Add("@Path", SqlDbType.NVarChar).Value = message.Path; @@ -51,49 +51,39 @@ public async Task> LockAndSelect(string insta cmd.Parameters.Add("@BatchSize", SqlDbType.Int).Value = batchSize; cmd.Parameters.Add("@LockDuration", SqlDbType.Int).Value = lockDuration.TotalSeconds; - using var reader = await cmd.ExecuteReaderAsync(token); + return await ReadMessages(cmd, token).ConfigureAwait(false); + } - var idOrdinal = reader.GetOrdinal("Id"); - var timestampOrdinal = reader.GetOrdinal("Timestamp"); - var busNameOrdinal = reader.GetOrdinal("BusName"); - var typeOrdinal = reader.GetOrdinal("MessageType"); - var payloadOrdinal = reader.GetOrdinal("MessagePayload"); - var headersOrdinal = reader.GetOrdinal("Headers"); - var pathOrdinal = reader.GetOrdinal("Path"); - var instanceIdOrdinal = reader.GetOrdinal("InstanceId"); - var lockInstanceIdOrdinal = reader.GetOrdinal("LockInstanceId"); - var lockExpiresOnOrdinal = reader.GetOrdinal("LockExpiresOn"); - var deliveryAttemptOrdinal = reader.GetOrdinal("DeliveryAttempt"); - var deliveryCompleteOrdinal = reader.GetOrdinal("DeliveryComplete"); - var deliveryAbortedOrdinal = reader.GetOrdinal("DeliveryAborted"); + public async Task AbortDelivery(IReadOnlyCollection ids, CancellationToken token) + { + if (ids.Count == 0) + { + return; + } - var items = new List(); - while (await reader.ReadAsync(token).ConfigureAwait(false)) + await EnsureConnection(); + + var table = new DataTable(); + table.Columns.Add("Id", typeof(Guid)); + foreach (var guid in ids) { - var id = reader.GetGuid(idOrdinal); - var messageType = reader.GetString(typeOrdinal); - var headers = reader.IsDBNull(headersOrdinal) ? null : reader.GetString(headersOrdinal); - var message = new OutboxMessage + table.Rows.Add(guid); + } + + var affected = await ExecuteNonQuery(Settings.SqlSettings.OperationRetry, + _sqlTemplate.SqlOutboxMessageAbortDelivery, + cmd => { - Id = id, - Timestamp = reader.GetDateTime(timestampOrdinal), - BusName = reader.GetString(busNameOrdinal), - MessageType = Settings.MessageTypeResolver.ToType(messageType) ?? throw new MessageBusException($"Outbox message with Id {id} - the MessageType {messageType} is not recognized. The type might have been renamed or moved namespaces."), - MessagePayload = reader.GetSqlBinary(payloadOrdinal).Value, - Headers = headers == null ? null : JsonSerializer.Deserialize>(headers, _jsonOptions), - Path = reader.IsDBNull(pathOrdinal) ? null : reader.GetString(pathOrdinal), - InstanceId = reader.GetString(instanceIdOrdinal), - LockInstanceId = reader.IsDBNull(lockInstanceIdOrdinal) ? null : reader.GetString(lockInstanceIdOrdinal), - LockExpiresOn = reader.IsDBNull(lockExpiresOnOrdinal) ? null : reader.GetDateTime(lockExpiresOnOrdinal), - DeliveryAttempt = reader.GetInt32(deliveryAttemptOrdinal), - DeliveryComplete = reader.GetBoolean(deliveryCompleteOrdinal), - DeliveryAborted = reader.GetBoolean(deliveryAbortedOrdinal) - }; + var param = cmd.Parameters.Add("@Ids", SqlDbType.Structured); + param.TypeName = _sqlTemplate.OutboxIdTypeQualified; + param.Value = table; + }, + token: token); - items.Add(message); + if (affected != ids.Count) + { + throw new MessageBusException($"The number of affected rows was {affected}, but {ids.Count} was expected"); } - - return items; } public async Task UpdateToSent(IReadOnlyCollection ids, CancellationToken token) @@ -171,10 +161,11 @@ public async Task DeleteSent(DateTime olderThan, CancellationToken token) { await EnsureConnection(); - var affected = await ExecuteNonQuery(Settings.SqlSettings.OperationRetry, _sqlTemplate.SqlOutboxMessageDeleteSent, cmd => - { - cmd.Parameters.Add("@Timestamp", SqlDbType.DateTime2).Value = olderThan; - }, token); + var affected = await ExecuteNonQuery( + Settings.SqlSettings.OperationRetry, + _sqlTemplate.SqlOutboxMessageDeleteSent, + cmd => cmd.Parameters.Add("@Timestamp", SqlDbType.DateTime2).Value = olderThan, + token); Logger.Log(affected > 0 ? LogLevel.Information : LogLevel.Debug, "Removed {MessageCount} sent messages from outbox table", affected); } @@ -190,4 +181,60 @@ public async Task RenewLock(string instanceId, TimeSpan lockDuration, Canc return await cmd.ExecuteNonQueryAsync(token) > 0; } + + internal async Task> GetAllMessages(CancellationToken cancellationToken) + { + await EnsureConnection(); + + using var cmd = CreateCommand(); + cmd.CommandText = _sqlTemplate.SqlOutboxAllMessages; + + return await ReadMessages(cmd, cancellationToken).ConfigureAwait(false); + } + + private async Task> ReadMessages(SqlCommand cmd, CancellationToken cancellationToken) + { + using var reader = await cmd.ExecuteReaderAsync(cancellationToken); + + var idOrdinal = reader.GetOrdinal("Id"); + var timestampOrdinal = reader.GetOrdinal("Timestamp"); + var busNameOrdinal = reader.GetOrdinal("BusName"); + var typeOrdinal = reader.GetOrdinal("MessageType"); + var payloadOrdinal = reader.GetOrdinal("MessagePayload"); + var headersOrdinal = reader.GetOrdinal("Headers"); + var pathOrdinal = reader.GetOrdinal("Path"); + var instanceIdOrdinal = reader.GetOrdinal("InstanceId"); + var lockInstanceIdOrdinal = reader.GetOrdinal("LockInstanceId"); + var lockExpiresOnOrdinal = reader.GetOrdinal("LockExpiresOn"); + var deliveryAttemptOrdinal = reader.GetOrdinal("DeliveryAttempt"); + var deliveryCompleteOrdinal = reader.GetOrdinal("DeliveryComplete"); + var deliveryAbortedOrdinal = reader.GetOrdinal("DeliveryAborted"); + + var items = new List(); + while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false)) + { + var id = reader.GetGuid(idOrdinal); + var headers = reader.IsDBNull(headersOrdinal) ? null : reader.GetString(headersOrdinal); + var message = new OutboxMessage + { + Id = id, + Timestamp = reader.GetDateTime(timestampOrdinal), + BusName = reader.GetString(busNameOrdinal), + MessageType = reader.GetString(typeOrdinal), + MessagePayload = reader.GetSqlBinary(payloadOrdinal).Value, + Headers = headers == null ? null : JsonSerializer.Deserialize>(headers, _jsonOptions), + Path = reader.IsDBNull(pathOrdinal) ? null : reader.GetString(pathOrdinal), + InstanceId = reader.GetString(instanceIdOrdinal), + LockInstanceId = reader.IsDBNull(lockInstanceIdOrdinal) ? null : reader.GetString(lockInstanceIdOrdinal), + LockExpiresOn = reader.IsDBNull(lockExpiresOnOrdinal) ? null : reader.GetDateTime(lockExpiresOnOrdinal), + DeliveryAttempt = reader.GetInt32(deliveryAttemptOrdinal), + DeliveryComplete = reader.GetBoolean(deliveryCompleteOrdinal), + DeliveryAborted = reader.GetBoolean(deliveryAbortedOrdinal) + }; + + items.Add(message); + } + + return items; + } } diff --git a/src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxTemplate.cs b/src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxTemplate.cs index 6a62618c..b50d6fa4 100644 --- a/src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxTemplate.cs +++ b/src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxTemplate.cs @@ -11,8 +11,14 @@ public class SqlOutboxTemplate public string SqlOutboxMessageLockTableAndSelect { get; } public string SqlOutboxMessageUpdateSent { get; } public string SqlOutboxMessageIncrementDeliveryAttempt { get; } + public string SqlOutboxMessageAbortDelivery { get; } public string SqlOutboxMessageRenewLock { get; } + /// + /// Used by tests only. + /// + internal string SqlOutboxAllMessages { get; } + public SqlOutboxTemplate(SqlOutboxSettings settings) { OutboxIdTypeQualified = $"[{settings.SqlSettings.DatabaseSchemaName}].[{settings.SqlSettings.DatabaseOutboxTypeName}]"; @@ -114,6 +120,13 @@ SELECT TOP (@BatchSize) Id WHERE [Id] IN (SELECT [Id] from @Ids); """; + SqlOutboxMessageAbortDelivery = $""" + UPDATE {TableNameQualified} + SET [DeliveryAttempt] = DeliveryAttempt + 1, + [DeliveryAborted] = 1 + WHERE [Id] IN (SELECT [Id] from @Ids); + """; + SqlOutboxMessageRenewLock = $""" UPDATE {TableNameQualified} SET LockExpiresOn = DATEADD(SECOND, @LockDuration, GETUTCDATE()) @@ -122,5 +135,22 @@ SELECT TOP (@BatchSize) Id AND DeliveryComplete = 0 AND DeliveryAborted = 0 """; + + SqlOutboxAllMessages = $""" + SELECT Id + , Timestamp + , BusName + , MessageType + , MessagePayload + , Headers + , Path + , InstanceId + , LockInstanceId + , LockExpiresOn + , DeliveryAttempt + , DeliveryComplete + , DeliveryAborted + FROM {TableNameQualified} + """; } } diff --git a/src/SlimMessageBus.Host.Outbox/Interceptors/OutboxForwardingPublishInterceptor.cs b/src/SlimMessageBus.Host.Outbox/Interceptors/OutboxForwardingPublishInterceptor.cs index 9ca5b28e..71c55872 100644 --- a/src/SlimMessageBus.Host.Outbox/Interceptors/OutboxForwardingPublishInterceptor.cs +++ b/src/SlimMessageBus.Host.Outbox/Interceptors/OutboxForwardingPublishInterceptor.cs @@ -16,7 +16,8 @@ public sealed class OutboxForwardingPublishInterceptor( ILogger logger, IOutboxRepository outboxRepository, IInstanceIdProvider instanceIdProvider, - IOutboxNotificationService outboxNotificationService) + IOutboxNotificationService outboxNotificationService, + OutboxSettings outboxSettings) : OutboxForwardingPublishInterceptor, IInterceptorWithOrder, IPublishInterceptor, IDisposable where T : class { static readonly internal string SkipOutboxHeader = "__SkipOutbox"; @@ -25,6 +26,7 @@ public sealed class OutboxForwardingPublishInterceptor( private readonly IOutboxRepository _outboxRepository = outboxRepository; private readonly IInstanceIdProvider _instanceIdProvider = instanceIdProvider; private readonly IOutboxNotificationService _outboxNotificationService = outboxNotificationService; + private readonly OutboxSettings _outboxSettings = outboxSettings; private bool _notifyOutbox = false; @@ -71,12 +73,12 @@ public async Task OnHandle(T message, Func next, IProducerContext context) BusName = busMaster.Name, Headers = context.Headers, Path = context.Path, - MessageType = messageType, + MessageType = _outboxSettings.MessageTypeResolver.ToName(messageType), MessagePayload = messagePayload, InstanceId = _instanceIdProvider.GetInstanceId() }; await _outboxRepository.Save(outboxMessage, context.CancellationToken); - + // a message was sent, notify outbox service to poll on dispose (post transaction) _notifyOutbox = true; } diff --git a/src/SlimMessageBus.Host.Outbox/Repositories/IOutboxRepository.cs b/src/SlimMessageBus.Host.Outbox/Repositories/IOutboxRepository.cs index c0798d26..a8772916 100644 --- a/src/SlimMessageBus.Host.Outbox/Repositories/IOutboxRepository.cs +++ b/src/SlimMessageBus.Host.Outbox/Repositories/IOutboxRepository.cs @@ -4,6 +4,7 @@ public interface IOutboxRepository { Task Save(OutboxMessage message, CancellationToken token); Task> LockAndSelect(string instanceId, int batchSize, bool tableLock, TimeSpan lockDuration, CancellationToken token); + Task AbortDelivery (IReadOnlyCollection ids, CancellationToken token); Task UpdateToSent(IReadOnlyCollection ids, CancellationToken token); Task IncrementDeliveryAttempt(IReadOnlyCollection ids, int maxDeliveryAttempts, CancellationToken token); Task DeleteSent(DateTime olderThan, CancellationToken token); diff --git a/src/SlimMessageBus.Host.Outbox/Repositories/OutboxMessage.cs b/src/SlimMessageBus.Host.Outbox/Repositories/OutboxMessage.cs index 80bf6f56..01aedcdb 100644 --- a/src/SlimMessageBus.Host.Outbox/Repositories/OutboxMessage.cs +++ b/src/SlimMessageBus.Host.Outbox/Repositories/OutboxMessage.cs @@ -5,7 +5,7 @@ public class OutboxMessage public Guid Id { get; set; } = Guid.NewGuid(); public DateTime Timestamp { get; set; } = DateTime.UtcNow; public string BusName { get; set; } - public Type MessageType { get; set; } + public string MessageType { get; set; } public byte[] MessagePayload { get; set; } public string Path { get; set; } public IDictionary Headers { get; set; } diff --git a/src/SlimMessageBus.Host.Outbox/Services/OutboxSendingTask.cs b/src/SlimMessageBus.Host.Outbox/Services/OutboxSendingTask.cs index 50d52ea0..aef24589 100644 --- a/src/SlimMessageBus.Host.Outbox/Services/OutboxSendingTask.cs +++ b/src/SlimMessageBus.Host.Outbox/Services/OutboxSendingTask.cs @@ -72,7 +72,6 @@ protected Task Start() _loopCts = new CancellationTokenSource(); _loopTask = Run(); - } return Task.CompletedTask; } @@ -208,10 +207,8 @@ private async Task Run() } } - internal async Task SendMessages(IServiceProvider serviceProvider, IOutboxRepository outboxRepository, CancellationToken cancellationToken) + async internal Task SendMessages(IServiceProvider serviceProvider, IOutboxRepository outboxRepository, CancellationToken cancellationToken) { - const int defaultBatchSize = 50; - var lockDuration = TimeSpan.FromSeconds(Math.Min(Math.Max(_outboxSettings.LockExpiration.TotalSeconds, 5), 30)); if (lockDuration != _outboxSettings.LockExpiration) { @@ -231,7 +228,7 @@ internal async Task SendMessages(IServiceProvider serviceProvider, IOutboxR var messageBus = serviceProvider.GetRequiredService(); var lockRenewalTimerFactory = serviceProvider.GetRequiredService(); - using var lockRenewalTimer = lockRenewalTimerFactory.CreateRenewalTimer(lockDuration, lockInterval, ex => { cts.Cancel(); }, cts.Token); + using var lockRenewalTimer = lockRenewalTimerFactory.CreateRenewalTimer(lockDuration, lockInterval, _ => cts.Cancel(), cts.Token); var compositeMessageBus = messageBus as ICompositeMessageBus; var messageBusTarget = messageBus as IMessageBusTarget; @@ -242,67 +239,93 @@ internal async Task SendMessages(IServiceProvider serviceProvider, IOutboxR { _asyncManualResetEvent.Reset(); lockRenewalTimer.Start(); + var outboxMessages = await outboxRepository.LockAndSelect(lockRenewalTimer.InstanceId, _outboxSettings.PollBatchSize, _outboxSettings.MaintainSequence, lockRenewalTimer.LockDuration, cts.Token); - runAgain = outboxMessages.Count == _outboxSettings.PollBatchSize; - foreach (var group in outboxMessages.GroupBy(x => x.BusName)) + var result = await ProcessMessages(outboxRepository, outboxMessages, compositeMessageBus, messageBusTarget, cts.Token); + runAgain = result.RunAgain; + count += result.Count; + + lockRenewalTimer.Stop(); + } while (!cts.Token.IsCancellationRequested && runAgain); + + return count; + } + + async internal Task<(bool RunAgain, int Count)> ProcessMessages(IOutboxRepository outboxRepository, IReadOnlyCollection outboxMessages, ICompositeMessageBus compositeMessageBus, IMessageBusTarget messageBusTarget, CancellationToken cancellationToken) + { + const int defaultBatchSize = 50; + + var runAgain = outboxMessages.Count == _outboxSettings.PollBatchSize; + var count = 0; + + var abortedIds = new List(_outboxSettings.PollBatchSize); + foreach (var busGroup in outboxMessages.GroupBy(x => x.BusName)) + { + var busName = busGroup.Key; + var bus = GetBus(compositeMessageBus, messageBusTarget, busName); + var bulkProducer = bus as IMessageBusBulkProducer; + if (bus == null || bulkProducer == null) { - var busName = group.Key; - var bus = GetBus(compositeMessageBus, messageBusTarget, busName); - var bulkProducer = bus as IMessageBusBulkProducer; - if (bus == null || bulkProducer == null) + foreach (var outboxMessage in busGroup) { - var ids = new List(_outboxSettings.PollBatchSize); - foreach (var outboxMessage in group) + if (bus == null) { - if (bus == null) - { - _logger.LogWarning("Not able to find matching bus provider for the outbox message with Id {MessageId} of type {MessageType} to pathGroup {Path} using {BusName} bus. The message will be skipped.", outboxMessage.Id, outboxMessage.MessageType.Name, outboxMessage.Path, outboxMessage.BusName); - } - else - { - _logger.LogWarning("Bus provider for the outbox message with Id {MessageId} of type {MessageType} to pathGroup {Path} using {BusName} bus does not support bulk processing. The message will be skipped.", outboxMessage.Id, outboxMessage.MessageType.Name, outboxMessage.Path, outboxMessage.BusName); - } - - ids.Add(outboxMessage.Id); + _logger.LogWarning("Not able to find matching bus provider for the outbox message with Id {MessageId} of type {MessageType} to path {Path} using {BusName} bus. The message will be skipped.", outboxMessage.Id, outboxMessage.MessageType, outboxMessage.Path, outboxMessage.BusName); + } + else + { + _logger.LogWarning("Bus provider for the outbox message with Id {MessageId} of type {MessageType} to path {Path} using {BusName} bus does not support bulk processing. The message will be skipped.", outboxMessage.Id, outboxMessage.MessageType, outboxMessage.Path, outboxMessage.BusName); } - await outboxRepository.IncrementDeliveryAttempt(ids, _outboxSettings.MaxDeliveryAttempts, cts.Token); - runAgain = true; - continue; + abortedIds.Add(outboxMessage.Id); } - foreach (var pathGroup in group.GroupBy(x => x.Path)) + continue; + } + + foreach (var pathGroup in busGroup.GroupBy(x => x.Path)) + { + if (cancellationToken.IsCancellationRequested) { - if (cts.Token.IsCancellationRequested) - { - break; - } + break; + } - var path = pathGroup.Key; - var batches = pathGroup.Select( + var path = pathGroup.Key; + var batches = pathGroup.Select( outboxMessage => { - var message = bus.Serializer.Deserialize(outboxMessage.MessageType, outboxMessage.MessagePayload); - return new OutboxBulkMessage(outboxMessage.Id, message, outboxMessage.MessageType, outboxMessage.Headers ?? new Dictionary()); + var messageType = _outboxSettings.MessageTypeResolver.ToType(outboxMessage.MessageType); + if (messageType == null) + { + abortedIds.Add(outboxMessage.Id); + _logger.LogError("Outbox message with Id {id} - the MessageType {messageType} is not recognized. The type might have been renamed or moved namespaces.", outboxMessage.Id, outboxMessage.MessageType); + return null; + } + + var message = bus.Serializer.Deserialize(messageType, outboxMessage.MessagePayload); + return new OutboxBulkMessage(outboxMessage.Id, message, messageType, outboxMessage.Headers ?? new Dictionary()); }) - .Batch(bulkProducer.MaxMessagesPerTransaction ?? defaultBatchSize); + .Where(x => x != null) + .Batch(bulkProducer.MaxMessagesPerTransaction ?? defaultBatchSize); - foreach (var batch in batches) - { - var (Success, PublishedCount) = await DispatchBatchAsync(outboxRepository, bulkProducer, messageBusTarget, batch, busName, path, cts.Token); - runAgain |= !Success; - count += PublishedCount; - } + foreach (var batch in batches) + { + var result = await DispatchBatch(outboxRepository, bulkProducer, messageBusTarget, batch, busName, path, cancellationToken); + runAgain |= !result.Success; + count += result.Published; } } + } - lockRenewalTimer.Stop(); - } while (!cts.Token.IsCancellationRequested && runAgain); + if (abortedIds.Count > 0) + { + await outboxRepository.AbortDelivery(abortedIds, cancellationToken); + } - return count; + return (runAgain, count); } - internal async Task<(bool Success, int Published)> DispatchBatchAsync(IOutboxRepository outboxRepository, IMessageBusBulkProducer producer, IMessageBusTarget messageBusTarget, IReadOnlyCollection batch, string busName, string path, CancellationToken cancellationToken) + async internal Task<(bool Success, int Published)> DispatchBatch(IOutboxRepository outboxRepository, IMessageBusBulkProducer producer, IMessageBusTarget messageBusTarget, IReadOnlyCollection batch, string busName, string path, CancellationToken cancellationToken) { _logger.LogDebug("Publishing batch of {MessageCount} messages to pathGroup {Path} on {BusName} bus", batch.Count, path, busName); diff --git a/src/SlimMessageBus.sln b/src/SlimMessageBus.sln index b3980f1f..9927c82e 100644 --- a/src/SlimMessageBus.sln +++ b/src/SlimMessageBus.sln @@ -262,7 +262,9 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Build", "Build", "{1A71BB05 ..\build\tasks.ps1 = ..\build\tasks.ps1 EndProjectSection EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SecretStore.Test", "Tests\SecretStore.Test\SecretStore.Test.csproj", "{969AAB37-AEFC-40F9-9F89-B4B5E45E13C9}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SecretStore.Test", "Tests\SecretStore.Test\SecretStore.Test.csproj", "{969AAB37-AEFC-40F9-9F89-B4B5E45E13C9}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SlimMessageBus.Host.Outbox.Sql.Test", "Tests\SlimMessageBus.Host.Outbox.Sql.Test\SlimMessageBus.Host.Outbox.Sql.Test.csproj", "{CDF578D6-FE85-4A44-A99A-32490F047FDA}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SlimMessageBus.Host.Nats", "SlimMessageBus.Host.Nats\SlimMessageBus.Host.Nats.csproj", "{57290E47-603D-46D0-BF13-AC1D6481380A}" EndProject @@ -803,6 +805,14 @@ Global {969AAB37-AEFC-40F9-9F89-B4B5E45E13C9}.Release|Any CPU.Build.0 = Release|Any CPU {969AAB37-AEFC-40F9-9F89-B4B5E45E13C9}.Release|x86.ActiveCfg = Release|Any CPU {969AAB37-AEFC-40F9-9F89-B4B5E45E13C9}.Release|x86.Build.0 = Release|Any CPU + {CDF578D6-FE85-4A44-A99A-32490F047FDA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {CDF578D6-FE85-4A44-A99A-32490F047FDA}.Debug|Any CPU.Build.0 = Debug|Any CPU + {CDF578D6-FE85-4A44-A99A-32490F047FDA}.Debug|x86.ActiveCfg = Debug|Any CPU + {CDF578D6-FE85-4A44-A99A-32490F047FDA}.Debug|x86.Build.0 = Debug|Any CPU + {CDF578D6-FE85-4A44-A99A-32490F047FDA}.Release|Any CPU.ActiveCfg = Release|Any CPU + {CDF578D6-FE85-4A44-A99A-32490F047FDA}.Release|Any CPU.Build.0 = Release|Any CPU + {CDF578D6-FE85-4A44-A99A-32490F047FDA}.Release|x86.ActiveCfg = Release|Any CPU + {CDF578D6-FE85-4A44-A99A-32490F047FDA}.Release|x86.Build.0 = Release|Any CPU {57290E47-603D-46D0-BF13-AC1D6481380A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {57290E47-603D-46D0-BF13-AC1D6481380A}.Debug|Any CPU.Build.0 = Debug|Any CPU {57290E47-603D-46D0-BF13-AC1D6481380A}.Debug|x86.ActiveCfg = Debug|Any CPU @@ -904,6 +914,7 @@ Global {DB624D5F-CB7C-4E16-B1E2-3B368FCB5A46} = {9F005B5C-A856-4351-8C0C-47A8B785C637} {AD05234E-A925-44C0-977E-FEAC2A75B98C} = {9F005B5C-A856-4351-8C0C-47A8B785C637} {969AAB37-AEFC-40F9-9F89-B4B5E45E13C9} = {D3D6FD9A-968A-45BB-86C7-4527C72A057E} + {CDF578D6-FE85-4A44-A99A-32490F047FDA} = {9F005B5C-A856-4351-8C0C-47A8B785C637} {57290E47-603D-46D0-BF13-AC1D6481380A} = {9291D340-B4FA-44A3-8060-C14743FB1712} {9C464F95-B620-4BDF-B9AC-D95C465D9793} = {9F005B5C-A856-4351-8C0C-47A8B785C637} {46C40625-D1AC-4EA1-9562-4F1837D417CE} = {A5B15524-93B8-4CCE-AC6D-A22984498BA0} diff --git a/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/BaseSqlOutboxRepositoryTest.cs b/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/BaseSqlOutboxRepositoryTest.cs new file mode 100644 index 00000000..cb51dc21 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/BaseSqlOutboxRepositoryTest.cs @@ -0,0 +1,79 @@ +namespace SlimMessageBus.Host.Outbox.Sql.Test; + +public class BaseSqlOutboxRepositoryTest : BaseSqlTest +{ + protected readonly Fixture _fixture = new(); + + protected SqlConnection _connection; + protected SqlOutboxMigrationService _migrationService; + protected SqlOutboxSettings _settings; + protected SqlOutboxRepository _target; + protected SqlOutboxTemplate _template; + protected ISqlTransactionService _transactionService; + + public override async Task InitializeAsync() + { + await base.InitializeAsync(); + + _settings = new SqlOutboxSettings(); + _connection = new SqlConnection(GetConnectionString()); + _transactionService = new SqlTransactionService(_connection, _settings.SqlSettings); + _template = new SqlOutboxTemplate(_settings); + _target = new SqlOutboxRepository(NullLogger.Instance, _settings, _template, _connection, _transactionService); + _migrationService = new SqlOutboxMigrationService(NullLogger.Instance, _target, _transactionService, _settings); + + await _migrationService.Migrate(CancellationToken.None); + } + + public override Task DisposeAsync() + { + _connection.Dispose(); + return base.DisposeAsync(); + } + + protected async Task> SeedOutbox(int count, Action action = null, CancellationToken cancellationToken = default) + { + var messages = CreateOutboxMessages(count); + for (var i = 0; i < messages.Count; i++) + { + var message = messages[i]; + action?.Invoke(i, message); + await _target.Save(message, cancellationToken); + } + + return messages; + } + + protected IReadOnlyList CreateOutboxMessages(int count) + { + return Enumerable + .Range(0, count) + .Select(_ => + { + // Create a sample object for MessagePayload + var samplePayload = new { Key = _fixture.Create(), Number = _fixture.Create() }; + var jsonPayload = JsonSerializer.SerializeToUtf8Bytes(samplePayload); + + // Generate Headers dictionary with simple types + var headers = new Dictionary + { + { "Header1", _fixture.Create() }, + { "Header2", _fixture.Create() }, + { "Header3", _fixture.Create() } + }; + + // Configure fixture to use the generated values + _fixture.Customize(om => om + .With(x => x.MessagePayload, jsonPayload) + .With(x => x.Headers, headers) + .With(x => x.LockExpiresOn, DateTime.MinValue) + .With(x => x.LockInstanceId, string.Empty) + .With(x => x.DeliveryAborted, false) + .With(x => x.DeliveryAttempt, 0) + .With(x => x.DeliveryComplete, false)); + + return _fixture.Create(); + }) + .ToList(); + } +} diff --git a/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/BaseSqlTest.cs b/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/BaseSqlTest.cs new file mode 100644 index 00000000..3298807b --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/BaseSqlTest.cs @@ -0,0 +1,29 @@ +namespace SlimMessageBus.Host.Outbox.Sql.Test; + +public abstract class BaseSqlTest : IAsyncLifetime +{ + private readonly MsSqlContainer _sqlContainer; + + protected BaseSqlTest() + { + _sqlContainer = new MsSqlBuilder() + .WithImage("mcr.microsoft.com/mssql/server:2022-CU13-ubuntu-22.04") + .WithAutoRemove(true) + .Build(); + } + + public virtual async Task DisposeAsync() + { + await _sqlContainer.DisposeAsync(); + } + + public virtual async Task InitializeAsync() + { + await _sqlContainer.StartAsync(); + } + + protected string GetConnectionString() + { + return _sqlContainer.GetConnectionString(); + } +} diff --git a/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/GlobalUsings.cs b/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/GlobalUsings.cs new file mode 100644 index 00000000..dc6a3e07 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/GlobalUsings.cs @@ -0,0 +1,15 @@ +global using System.Text.Json; +global using System.Threading.Tasks; + +global using AutoFixture; + +global using FluentAssertions; + +global using Microsoft.Data.SqlClient; +global using Microsoft.Extensions.Logging.Abstractions; + +global using SlimMessageBus.Host.Sql.Common; + +global using Testcontainers.MsSql; + +global using Xunit; diff --git a/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/SlimMessageBus.Host.Outbox.Sql.Test.csproj b/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/SlimMessageBus.Host.Outbox.Sql.Test.csproj new file mode 100644 index 00000000..e128dafb --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/SlimMessageBus.Host.Outbox.Sql.Test.csproj @@ -0,0 +1,13 @@ + + + + + + + + + + + + + diff --git a/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/SqlOutboxRepositoryTests.cs b/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/SqlOutboxRepositoryTests.cs new file mode 100644 index 00000000..381e1199 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/SqlOutboxRepositoryTests.cs @@ -0,0 +1,259 @@ +namespace SlimMessageBus.Host.Outbox.Sql.Test; + +public static class SqlOutboxRepositoryTests +{ + public class SaveTests : BaseSqlOutboxRepositoryTest + { + [Fact] + public async Task SavedMessage_IsPersisted() + { + // arrange + var message = CreateOutboxMessages(1).Single(); + + // act + await _target.Save(message, CancellationToken.None); + var actual = await _target.GetAllMessages(CancellationToken.None); + + // assert + actual.Count.Should().Be(1); + actual.Single().Should().BeEquivalentTo(message); + } + } + + public class AbortDeliveryTests : BaseSqlOutboxRepositoryTest + { + [Fact] + public async Task ShouldUpdateStatus() + { + // arrange + var seed = await SeedOutbox(5); + var expected = seed.Select(x => x.Id).Take(3).ToList(); + + // act + await _target.AbortDelivery(expected, CancellationToken.None); + var messages = await _target.GetAllMessages(CancellationToken.None); + + // assert + var actual = messages.Where(x => x.DeliveryAborted).Select(x => x.Id).ToList(); + actual.Should().BeEquivalentTo(expected); + } + } + + public class DeleteSentTests : BaseSqlOutboxRepositoryTest + { + [Fact] + public async Task ExpiredItems_AreDeleted() + { + // arrange + var active = new DateTime(2000, 1, 1); + var expired = active.AddDays(-1); + + var seedMessages = await SeedOutbox(10, (i, x) => + { + x.DeliveryAttempt = 1; + x.DeliveryComplete = true; + x.Timestamp = i < 5 ? expired : active; + }); + + // act + await _target.DeleteSent(active, CancellationToken.None); + var messages = await _target.GetAllMessages(CancellationToken.None); + + // assert + messages.Should().OnlyContain(x => x.Timestamp == active); + } + } + + public class LockAndSelectTests : BaseSqlOutboxRepositoryTest + { + [Fact] + public async Task TableLock_RestrictsConcurrentLocks() + { + const int batchSize = 10; + + const string instance1 = "1"; + const string instance2 = "2"; + + await SeedOutbox(batchSize * 2); + + var items1 = await _target.LockAndSelect(instance1, batchSize, true, TimeSpan.FromMinutes(1), CancellationToken.None); + var items2 = await _target.LockAndSelect(instance2, batchSize, true, TimeSpan.FromMinutes(1), CancellationToken.None); + + items1.Count.Should().Be(batchSize); + items2.Count.Should().Be(0); + } + + [Fact] + public async Task NoTableLock_AllowsConcurrentLocks() + { + const int batchSize = 10; + + const string instance1 = "1"; + const string instance2 = "2"; + + await SeedOutbox(batchSize * 2); + + var items1 = await _target.LockAndSelect(instance1, batchSize, false, TimeSpan.FromMinutes(1), CancellationToken.None); + var items2 = await _target.LockAndSelect(instance2, batchSize, false, TimeSpan.FromMinutes(1), CancellationToken.None); + + items1.Count.Should().Be(batchSize); + items2.Count.Should().Be(batchSize); + } + + [Fact] + public async Task AbortedMessages_AreNotIncluded() + { + // arrange + var seed = await SeedOutbox(5); + var abortedIds = seed.Select(x => x.Id).Take(3).ToList(); + + await _target.AbortDelivery(abortedIds, CancellationToken.None); + + // act + var actual = await _target.LockAndSelect("123", 10, false, TimeSpan.FromMinutes(1), CancellationToken.None); + + // assert + actual.Select(x => x.Id).Should().NotContain(abortedIds); + } + + [Fact] + public async Task SentMessages_AreNotIncluded() + { + // arrange + var seed = await SeedOutbox(5); + var sentIds = seed.Select(x => x.Id).Take(3).ToList(); + + await _target.UpdateToSent(sentIds, CancellationToken.None); + + // act + var actual = await _target.LockAndSelect("123", 10, false, TimeSpan.FromMinutes(1), CancellationToken.None); + + // assert + actual.Select(x => x.Id).Should().NotContain(sentIds); + } + } + + public class IncrementDeliveryAttemptTests : BaseSqlOutboxRepositoryTest + { + [Fact] + public async Task WithinMaxAttempts_DoesNotAbortDelivery() + { + // arrange + const int maxAttempts = 2; + var seed = await SeedOutbox(5); + var ids = seed.Select(x => x.Id).Take(3).ToList(); + + // act + await _target.IncrementDeliveryAttempt(ids, maxAttempts, CancellationToken.None); + var messages = await _target.GetAllMessages(CancellationToken.None); + + // assert + messages.Should().OnlyContain(x => !x.DeliveryComplete); + messages.Should().OnlyContain(x => !x.DeliveryAborted); + messages.Where(x => !ids.Contains(x.Id)).Should().OnlyContain(x => x.DeliveryAttempt == 0); + messages.Where(x => ids.Contains(x.Id)).Should().OnlyContain(x => x.DeliveryAttempt == 1); + } + + [Fact] + public async Task BreachingMaxAttempts_AbortsDelivery() + { + // arrange + const int maxAttempts = 1; + var seed = await SeedOutbox(5); + var ids = seed.Select(x => x.Id).Take(3).ToList(); + + // act + await _target.IncrementDeliveryAttempt(ids, maxAttempts, CancellationToken.None); + await _target.IncrementDeliveryAttempt(ids, maxAttempts, CancellationToken.None); + var messages = await _target.GetAllMessages(CancellationToken.None); + + // assert + messages.Should().OnlyContain(x => !x.DeliveryComplete); + + var attempted = messages.Where(x => ids.Contains(x.Id)).ToList(); + attempted.Should().OnlyContain(x => x.DeliveryAttempt == 2); + attempted.Should().OnlyContain(x => x.DeliveryAborted); + + var notAttempted = messages.Where(x => !ids.Contains(x.Id)).ToList(); + notAttempted.Should().OnlyContain(x => x.DeliveryAttempt == 0); + notAttempted.Should().OnlyContain(x => !x.DeliveryAborted); + } + } + + public class UpdateToSentTests : BaseSqlOutboxRepositoryTest + { + [Fact] + public async Task ShouldUpdateStatus() + { + // arrange + var seed = await SeedOutbox(5); + var expected = seed.Select(x => x.Id).Take(3).ToList(); + + // act + await _target.UpdateToSent(expected, CancellationToken.None); + var messages = await _target.GetAllMessages(CancellationToken.None); + + // assert + var actual = messages.Where(x => x.DeliveryComplete).Select(x => x.Id).ToList(); + actual.Should().BeEquivalentTo(expected); + } + } + + public class RenewLockTests : BaseSqlOutboxRepositoryTest + { + [Fact] + public async Task WithinLock_ExtendsLockTimeout() + { + // arrange + const int batchSize = 10; + const string instanceId = "1"; + await SeedOutbox(batchSize); + + var lockedItems = await _target.LockAndSelect(instanceId, batchSize, true, TimeSpan.FromSeconds(10), CancellationToken.None); + var lockedIds = lockedItems.Select(x => x.Id).ToList(); + + var before = await _target.GetAllMessages(CancellationToken.None); + var originalLock = before.Min(x => x.LockExpiresOn); + + // act + await _target.RenewLock(instanceId, TimeSpan.FromMinutes(1), CancellationToken.None); + + // assert + var after = await _target.GetAllMessages(CancellationToken.None); + var actual = after.Where(x => lockedIds.Contains(x.Id)); + + actual.Should().OnlyContain(x => x.LockExpiresOn > originalLock); + } + + [Fact] + public async Task HasLockedItemsToRenew_ReturnsTrue() + { + // arrange + const int batchSize = 10; + const string instanceId = "1"; + await SeedOutbox(batchSize); + + await _target.LockAndSelect(instanceId, batchSize, true, TimeSpan.FromSeconds(10), CancellationToken.None); + + // act + var actual = await _target.RenewLock(instanceId, TimeSpan.FromMinutes(1), CancellationToken.None); + + // assert + actual.Should().BeTrue(); + } + + [Fact] + public async Task HasNoLockedItemsToRenew_ReturnsFalse() + { + // arrange + const string instanceId = "1"; + await SeedOutbox(10); + + // act + var actual = await _target.RenewLock(instanceId, TimeSpan.FromMinutes(1), CancellationToken.None); + + // assert + actual.Should().BeFalse(); + } + } +} \ No newline at end of file diff --git a/src/Tests/SlimMessageBus.Host.Outbox.Test/Usings.cs b/src/Tests/SlimMessageBus.Host.Outbox.Test/GlobalUsings.cs similarity index 50% rename from src/Tests/SlimMessageBus.Host.Outbox.Test/Usings.cs rename to src/Tests/SlimMessageBus.Host.Outbox.Test/GlobalUsings.cs index 422195b1..e9767d66 100644 --- a/src/Tests/SlimMessageBus.Host.Outbox.Test/Usings.cs +++ b/src/Tests/SlimMessageBus.Host.Outbox.Test/GlobalUsings.cs @@ -1,8 +1,12 @@ global using FluentAssertions; + global using Microsoft.Extensions.Logging; +global using Microsoft.Extensions.Logging.Abstractions; + global using Moq; -global using Xunit; -global using Xunit.Abstractions; global using SlimMessageBus.Host.Interceptor; -global using SlimMessageBus.Host.Outbox.Services; \ No newline at end of file +global using SlimMessageBus.Host.Outbox.Services; +global using SlimMessageBus.Host.Serialization; + +global using Xunit; diff --git a/src/Tests/SlimMessageBus.Host.Outbox.Test/Interceptors/OutboxForwardingPublishInterceptorTests.cs b/src/Tests/SlimMessageBus.Host.Outbox.Test/Interceptors/OutboxForwardingPublishInterceptorTests.cs index 605676f0..b13f9dd8 100644 --- a/src/Tests/SlimMessageBus.Host.Outbox.Test/Interceptors/OutboxForwardingPublishInterceptorTests.cs +++ b/src/Tests/SlimMessageBus.Host.Outbox.Test/Interceptors/OutboxForwardingPublishInterceptorTests.cs @@ -17,9 +17,10 @@ public void OutboxForwardingPublisher_MustBeLastInPipeline() var mockOutboxRepository = new Mock(); var mockInstanceIdProvider = new Mock(); var mockOutboxNotificationService = new Mock(); + var mockOutboxSettings = new Mock(); // act - var target = new OutboxForwardingPublishInterceptor(mockLogger.Object, mockOutboxRepository.Object, mockInstanceIdProvider.Object, mockOutboxNotificationService.Object); + var target = new OutboxForwardingPublishInterceptor(mockLogger.Object, mockOutboxRepository.Object, mockInstanceIdProvider.Object, mockOutboxNotificationService.Object, mockOutboxSettings.Object); var actual = target.Order; // assert @@ -45,6 +46,7 @@ public class OnHandleTests private readonly Mock _mockSerializer; private readonly Mock _mockMasterMessageBus; private readonly Mock _mockOutboxNotificationService; + private readonly Mock _mockOutboxSettings; private Mock _mockTargetBus; private Mock _mockProducerContext; @@ -54,6 +56,7 @@ public OnHandleTests() _mockOutboxRepository = new Mock(); _mockInstanceIdProvider = new Mock(); _mockOutboxNotificationService = new Mock(); + _mockOutboxSettings = new Mock(); _mockSerializer = new Mock(); @@ -82,7 +85,7 @@ public async Task SkipOutboxHeader_IsPresent_PromoteToNext() }; // act - var target = new OutboxForwardingPublishInterceptor(_mockLogger.Object, _mockOutboxRepository.Object, _mockInstanceIdProvider.Object, _mockOutboxNotificationService.Object); + var target = new OutboxForwardingPublishInterceptor(_mockLogger.Object, _mockOutboxRepository.Object, _mockInstanceIdProvider.Object, _mockOutboxNotificationService.Object, _mockOutboxSettings.Object); await target.OnHandle(new object(), next, _mockProducerContext.Object); target.Dispose(); @@ -111,7 +114,7 @@ public async Task SkipOutboxHeader_IsNotPresent_DoNotPromoteToNext() }; // act - var target = new OutboxForwardingPublishInterceptor(_mockLogger.Object, _mockOutboxRepository.Object, _mockInstanceIdProvider.Object, _mockOutboxNotificationService.Object); + var target = new OutboxForwardingPublishInterceptor(_mockLogger.Object, _mockOutboxRepository.Object, _mockInstanceIdProvider.Object, _mockOutboxNotificationService.Object, _mockOutboxSettings.Object); await target.OnHandle(new object(), next, _mockProducerContext.Object); target.Dispose(); @@ -129,7 +132,7 @@ public async Task SkipOutboxHeader_IsPresent_DoNotRaiseOutboxNotification() _mockOutboxNotificationService.Setup(x => x.Notify()).Verifiable(); // act - var target = new OutboxForwardingPublishInterceptor(_mockLogger.Object, _mockOutboxRepository.Object, _mockInstanceIdProvider.Object, _mockOutboxNotificationService.Object); + var target = new OutboxForwardingPublishInterceptor(_mockLogger.Object, _mockOutboxRepository.Object, _mockInstanceIdProvider.Object, _mockOutboxNotificationService.Object, _mockOutboxSettings.Object); await target.OnHandle(new object(), () => Task.CompletedTask, _mockProducerContext.Object); target.Dispose(); @@ -149,7 +152,7 @@ public async Task SkipOutboxHeader_IsNotPresent_RaiseOutboxNotification() _mockOutboxNotificationService.Setup(x => x.Notify()).Verifiable(); // act - var target = new OutboxForwardingPublishInterceptor(_mockLogger.Object, _mockOutboxRepository.Object, _mockInstanceIdProvider.Object, _mockOutboxNotificationService.Object); + var target = new OutboxForwardingPublishInterceptor(_mockLogger.Object, _mockOutboxRepository.Object, _mockInstanceIdProvider.Object, _mockOutboxNotificationService.Object, _mockOutboxSettings.Object); await target.OnHandle(new object(), () => Task.CompletedTask, _mockProducerContext.Object); target.Dispose(); diff --git a/src/Tests/SlimMessageBus.Host.Outbox.Test/Interceptors/OutboxSendingTaskTests.cs b/src/Tests/SlimMessageBus.Host.Outbox.Test/Interceptors/OutboxSendingTaskTests.cs deleted file mode 100644 index ab2466e5..00000000 --- a/src/Tests/SlimMessageBus.Host.Outbox.Test/Interceptors/OutboxSendingTaskTests.cs +++ /dev/null @@ -1,88 +0,0 @@ - -namespace SlimMessageBus.Host.Outbox.Test.Interceptors; - -using Microsoft.Extensions.Logging.Abstractions; - -using static SlimMessageBus.Host.Outbox.Services.OutboxSendingTask; - -public class OutboxSendingTaskTests -{ - private readonly ILoggerFactory _loggerFactory; - private readonly Mock _outboxRepositoryMock; - private readonly Mock _producerMock; - private readonly Mock _messageBusTargetMock; - private readonly OutboxSettings _outboxSettings; - private readonly IServiceProvider _serviceProvider; - private readonly OutboxSendingTask _sut; - - public OutboxSendingTaskTests() - { - _outboxRepositoryMock = new Mock(); - _producerMock = new Mock(); - _messageBusTargetMock = new Mock(); - _outboxSettings = new OutboxSettings { MaxDeliveryAttempts = 5 }; - _serviceProvider = Mock.Of(); - _loggerFactory = new NullLoggerFactory(); - - _sut = new OutboxSendingTask(_loggerFactory, _outboxSettings, _serviceProvider); - } - - [Fact] - public async Task DispatchBatchAsync_ShouldReturnSuccess_WhenAllMessagesArePublished() - { - var batch = new List - { - new(Guid.NewGuid(), "Message1", typeof(string), new Dictionary()), - new(Guid.NewGuid(), "Message2", typeof(string), new Dictionary()) - }.AsReadOnly(); - - var results = new ProduceToTransportBulkResult(batch, null); - - _producerMock.Setup(x => x.ProduceToTransportBulk(batch, It.IsAny(), It.IsAny(), It.IsAny())) - .ReturnsAsync(results); - - var (success, published) = await _sut.DispatchBatchAsync(_outboxRepositoryMock.Object, _producerMock.Object, _messageBusTargetMock.Object, batch, "busName", "path", CancellationToken.None); - - success.Should().BeTrue(); - published.Should().Be(batch.Count); - } - - [Fact] - public async Task DispatchBatchAsync_ShouldReturnFailure_WhenNotAllMessagesArePublished() - { - var batch = new List - { - new(Guid.NewGuid(), "Message1", typeof(string), new Dictionary()), - new(Guid.NewGuid(), "Message2", typeof(string), new Dictionary()) - }.AsReadOnly(); - - var results = new ProduceToTransportBulkResult([batch.First()], null); - - _producerMock.Setup(x => x.ProduceToTransportBulk(batch, It.IsAny(), It.IsAny(), It.IsAny())) - .ReturnsAsync(results); - - var (success, published) = await _sut.DispatchBatchAsync(_outboxRepositoryMock.Object, _producerMock.Object, _messageBusTargetMock.Object, batch, "busName", "path", CancellationToken.None); - - success.Should().BeFalse(); - published.Should().Be(1); - } - - [Fact] - public async Task DispatchBatchAsync_ShouldIncrementDeliveryAttempts_WhenNotAllMessagesArePublished() - { - var batch = new List - { - new(Guid.NewGuid(), "Message1", typeof(string), new Dictionary()), - new(Guid.NewGuid(), "Message2", typeof(string), new Dictionary()) - }.AsReadOnly(); - - var results = new ProduceToTransportBulkResult([batch.First()], null); - - _producerMock.Setup(x => x.ProduceToTransportBulk(batch, It.IsAny(), It.IsAny(), It.IsAny())) - .ReturnsAsync(results); - - await _sut.DispatchBatchAsync(_outboxRepositoryMock.Object, _producerMock.Object, _messageBusTargetMock.Object, batch, "busName", "path", CancellationToken.None); - - _outboxRepositoryMock.Verify(x => x.IncrementDeliveryAttempt(It.Is>(ids => ids.Contains(batch[1].Id)), _outboxSettings.MaxDeliveryAttempts, CancellationToken.None), Times.Once); - } -} diff --git a/src/Tests/SlimMessageBus.Host.Outbox.Test/Services/OutboxSendingTaskTests.cs b/src/Tests/SlimMessageBus.Host.Outbox.Test/Services/OutboxSendingTaskTests.cs new file mode 100644 index 00000000..aa2a71e2 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.Outbox.Test/Services/OutboxSendingTaskTests.cs @@ -0,0 +1,256 @@ +namespace SlimMessageBus.Host.Outbox.Test.Services; + +using static SlimMessageBus.Host.Outbox.Services.OutboxSendingTask; + +public sealed class OutboxSendingTaskTests +{ + public class DispatchBatchTests + { + private readonly ILoggerFactory _loggerFactory; + private readonly Mock _outboxRepositoryMock; + private readonly Mock _producerMock; + private readonly Mock _messageBusTargetMock; + private readonly OutboxSettings _outboxSettings; + private readonly IServiceProvider _serviceProvider; + private readonly OutboxSendingTask _sut; + + public DispatchBatchTests() + { + _outboxRepositoryMock = new Mock(); + _producerMock = new Mock(); + _messageBusTargetMock = new Mock(); + _outboxSettings = new OutboxSettings { MaxDeliveryAttempts = 5 }; + _serviceProvider = Mock.Of(); + _loggerFactory = new NullLoggerFactory(); + + _sut = new OutboxSendingTask(_loggerFactory, _outboxSettings, _serviceProvider); + } + + [Fact] + public async Task DispatchBatch_ShouldReturnSuccess_WhenAllMessagesArePublished() + { + var batch = new List + { + new(Guid.NewGuid(), "Message1", typeof(string), new Dictionary()), + new(Guid.NewGuid(), "Message2", typeof(string), new Dictionary()) + }.AsReadOnly(); + + var results = new ProduceToTransportBulkResult(batch, null); + + _producerMock.Setup(x => x.ProduceToTransportBulk(batch, It.IsAny(), It.IsAny(), It.IsAny())) + .ReturnsAsync(results); + + var (success, published) = await _sut.DispatchBatch(_outboxRepositoryMock.Object, _producerMock.Object, _messageBusTargetMock.Object, batch, "busName", "path", CancellationToken.None); + + success.Should().BeTrue(); + published.Should().Be(batch.Count); + } + + [Fact] + public async Task DispatchBatch_ShouldReturnFailure_WhenNotAllMessagesArePublished() + { + var batch = new List + { + new(Guid.NewGuid(), "Message1", typeof(string), new Dictionary()), + new(Guid.NewGuid(), "Message2", typeof(string), new Dictionary()) + }.AsReadOnly(); + + var results = new ProduceToTransportBulkResult([batch.First()], null); + + _producerMock.Setup(x => x.ProduceToTransportBulk(batch, It.IsAny(), It.IsAny(), It.IsAny())) + .ReturnsAsync(results); + + var (success, published) = await _sut.DispatchBatch(_outboxRepositoryMock.Object, _producerMock.Object, _messageBusTargetMock.Object, batch, "busName", "path", CancellationToken.None); + + success.Should().BeFalse(); + published.Should().Be(1); + } + + [Fact] + public async Task DispatchBatch_ShouldIncrementDeliveryAttempts_WhenNotAllMessagesArePublished() + { + var batch = new List + { + new(Guid.NewGuid(), "Message1", typeof(string), new Dictionary()), + new(Guid.NewGuid(), "Message2", typeof(string), new Dictionary()) + }.AsReadOnly(); + + var results = new ProduceToTransportBulkResult([batch.First()], null); + + _producerMock.Setup(x => x.ProduceToTransportBulk(batch, It.IsAny(), It.IsAny(), It.IsAny())) + .ReturnsAsync(results); + + await _sut.DispatchBatch(_outboxRepositoryMock.Object, _producerMock.Object, _messageBusTargetMock.Object, batch, "busName", "path", CancellationToken.None); + + _outboxRepositoryMock.Verify(x => x.IncrementDeliveryAttempt(It.Is>(ids => ids.Contains(batch[1].Id)), _outboxSettings.MaxDeliveryAttempts, CancellationToken.None), Times.Once); + } + } + + public class ProcessMessagesTests + { + private readonly Mock _mockOutboxRepository; + private readonly Mock _mockCompositeMessageBus; + private readonly Mock _mockMessageBusTarget; + private readonly Mock _mockMasterMessageBus; + private readonly Mock _mockMessageBusBulkProducer; + private readonly Mock> _mockLogger; + private readonly OutboxSettings _outboxSettings; + private readonly OutboxSendingTask _sut; + + public ProcessMessagesTests() + { + _mockOutboxRepository = new Mock(); + _mockCompositeMessageBus = new Mock(); + _mockMessageBusTarget = new Mock(); + _mockMasterMessageBus = new Mock(); + _mockMessageBusBulkProducer = _mockMasterMessageBus.As(); + _mockLogger = new Mock>(); + + _outboxSettings = new OutboxSettings + { + PollBatchSize = 50, + MessageTypeResolver = new Mock().Object + }; + + _sut = new OutboxSendingTask(NullLoggerFactory.Instance, _outboxSettings, null); + } + + [Fact] + public async Task ProcessMessages_ShouldReturnCorrectValues_WhenOutboxMessagesProcessedSuccessfully() + { + // Arrange + var outboxMessages = CreateOutboxMessages(30); + var cancellationToken = CancellationToken.None; + + _mockCompositeMessageBus.Setup(x => x.GetChildBus(It.IsAny())).Returns(_mockMasterMessageBus.Object); + _mockMessageBusBulkProducer.Setup(x => x.MaxMessagesPerTransaction).Returns(10); + _mockMasterMessageBus.Setup(x => x.Serializer).Returns(new Mock().Object); + + _mockMessageBusBulkProducer.Setup(x => x.ProduceToTransportBulk(It.IsAny>(), It.IsAny(), It.IsAny(), It.IsAny())) + .ReturnsAsync((IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken) => new ProduceToTransportBulkResult(envelopes, null)); + + var mockMessageTypeResolver = new Mock(); + mockMessageTypeResolver.Setup(x => x.ToType(It.IsAny())).Returns(typeof(object)); + _outboxSettings.MessageTypeResolver = mockMessageTypeResolver.Object; + + // Act + var result = await _sut.ProcessMessages(_mockOutboxRepository.Object, outboxMessages, _mockCompositeMessageBus.Object, _mockMessageBusTarget.Object, cancellationToken); + + // Assert + result.RunAgain.Should().BeFalse(); + result.Count.Should().Be(30); + _mockOutboxRepository.Verify(x => x.UpdateToSent(It.IsAny>(), It.IsAny()), Times.Exactly(3)); + } + + [Fact] + public async Task ProcessMessages_ShouldReturnRunAgainTrue_WhenOutboxMessagesCountEqualsPollBatchSize() + { + // Arrange + var outboxMessages = CreateOutboxMessages(50); + var cancellationToken = CancellationToken.None; + + _mockCompositeMessageBus.Setup(x => x.GetChildBus(It.IsAny())).Returns(_mockMasterMessageBus.Object); + _mockMessageBusBulkProducer.Setup(x => x.MaxMessagesPerTransaction).Returns(10); + _mockMasterMessageBus.Setup(x => x.Serializer).Returns(new Mock().Object); + + _mockMessageBusBulkProducer.Setup(x => x.ProduceToTransportBulk(It.IsAny>(), It.IsAny(), It.IsAny(), It.IsAny())) + .ReturnsAsync((IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken) => new ProduceToTransportBulkResult(envelopes, null)); + + var mockMessageTypeResolver = new Mock(); + mockMessageTypeResolver.Setup(x => x.ToType(It.IsAny())).Returns(typeof(object)); + _outboxSettings.MessageTypeResolver = mockMessageTypeResolver.Object; + + // Act + var result = await _sut.ProcessMessages(_mockOutboxRepository.Object, outboxMessages, _mockCompositeMessageBus.Object, _mockMessageBusTarget.Object, cancellationToken); + + // Assert + result.RunAgain.Should().BeTrue(); + result.Count.Should().Be(50); + } + + [Fact] + public async Task ProcessMessages_ShouldAbortDelivery_WhenBusIsNotRecognised() + { + // Arrange + const int MessageCount = 10; + + var outboxMessages = CreateOutboxMessages(MessageCount); + outboxMessages[0].BusName = null; + outboxMessages[7].BusName = null; + + var knownBusCount = outboxMessages.Count(x => x.BusName != null); + + _mockMessageBusTarget.SetupGet(x => x.Target).Returns((IMessageBusProducer)null); + + _mockCompositeMessageBus.Setup(x => x.GetChildBus(It.IsAny())).Returns(_mockMasterMessageBus.Object); + _mockMessageBusBulkProducer.Setup(x => x.MaxMessagesPerTransaction).Returns(10); + _mockMasterMessageBus.Setup(x => x.Serializer).Returns(new Mock().Object); + + _mockMessageBusBulkProducer.Setup(x => x.ProduceToTransportBulk(It.IsAny>(), It.IsAny(), It.IsAny(), It.IsAny())) + .ReturnsAsync((IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken) => new ProduceToTransportBulkResult(envelopes, null)); + + var mockMessageTypeResolver = new Mock(); + mockMessageTypeResolver.Setup(x => x.ToType(It.IsAny())).Returns((string type) => typeof(object)); + _outboxSettings.MessageTypeResolver = mockMessageTypeResolver.Object; + + // Act + var result = await _sut.ProcessMessages(_mockOutboxRepository.Object, outboxMessages, _mockCompositeMessageBus.Object, _mockMessageBusTarget.Object, CancellationToken.None); + + // Assert + _mockOutboxRepository.Verify(x => x.AbortDelivery(It.IsAny>(), It.IsAny()), Times.Once); + _mockOutboxRepository.Verify(x => x.UpdateToSent(It.IsAny>(), It.IsAny()), Times.Once); + result.RunAgain.Should().BeFalse(); + result.Count.Should().Be(knownBusCount); + } + + [Fact] + public async Task ProcessMessages_ShouldAbortDelivery_WhenMessageTypeIsNotRecognized() + { + // Arrange + const string UnknownMessageType = "Unknown"; + const int MessageCount = 10; + + var outboxMessages = CreateOutboxMessages(MessageCount); + outboxMessages[0].MessageType = UnknownMessageType; + outboxMessages[7].MessageType = UnknownMessageType; + + var knownMessageCount = outboxMessages.Count(x => !x.MessageType.Equals(UnknownMessageType)); + + _mockCompositeMessageBus.Setup(x => x.GetChildBus(It.IsAny())).Returns(_mockMasterMessageBus.Object); + _mockMessageBusBulkProducer.Setup(x => x.MaxMessagesPerTransaction).Returns(10); + _mockMasterMessageBus.Setup(x => x.Serializer).Returns(new Mock().Object); + + _mockMessageBusBulkProducer.Setup(x => x.ProduceToTransportBulk(It.IsAny>(), It.IsAny(), It.IsAny(), It.IsAny())) + .ReturnsAsync((IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken) => new ProduceToTransportBulkResult(envelopes, null)); + + var mockMessageTypeResolver = new Mock(); + mockMessageTypeResolver.Setup(x => x.ToType(It.IsAny())).Returns((string type) => type == UnknownMessageType ? (Type)null : typeof(object)); + _outboxSettings.MessageTypeResolver = mockMessageTypeResolver.Object; + + // Act + var result = await _sut.ProcessMessages(_mockOutboxRepository.Object, outboxMessages, _mockCompositeMessageBus.Object, _mockMessageBusTarget.Object, CancellationToken.None); + + // Assert + _mockOutboxRepository.Verify(x => x.AbortDelivery(It.IsAny>(), It.IsAny()), Times.Once); + _mockOutboxRepository.Verify(x => x.UpdateToSent(It.IsAny>(), It.IsAny()), Times.Once); + result.RunAgain.Should().BeFalse(); + result.Count.Should().Be(knownMessageCount); + } + + private static List CreateOutboxMessages(int count) + { + return Enumerable + .Range(0, count) + .Select( + _ => new OutboxMessage + { + Id = Guid.NewGuid(), + MessageType = "TestType", + MessagePayload = [], + BusName = "TestBus", + Path = "TestPath" + }) + .ToList(); + } + } +} \ No newline at end of file