Skip to content

Commit

Permalink
#284 Abort unknown message types
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Pringle <[email protected]>
  • Loading branch information
EtherZa authored and zarusz committed Aug 4, 2024
1 parent 8bbbf3a commit feb75cb
Show file tree
Hide file tree
Showing 20 changed files with 897 additions and 191 deletions.
8 changes: 7 additions & 1 deletion docs/plugin_outbox.md
Original file line number Diff line number Diff line change
Expand Up @@ -213,4 +213,10 @@ When applied on the (child) bus level then all consumers (or handlers) will inhe

- Once a message is picked from outbox and successfully delivered then it is marked as sent in the outbox table.

- At configured intervals (`MessageCleanup.Interval`), and after a configured time span (`MessageCleanup.Age`), the sent messages are removed from the outbox table.
- At configured intervals (`MessageCleanup.Interval`), and after a configured time span (`MessageCleanup.Age`), the sent messages are removed from the outbox table.

## Important note

As the outbox can be processed by instance of the application that did not originally process it, it is important to ensure that all active instances maintian the same message registrations (and compatible JSON schema definitions).

A message that fails to deserialize will be flagged as invalid by setting the associated `DeliveryAborted` field in the `Outbox` table, to `1`. It is safe to manually reset this field value to `0` once the version incompatibility has been resolved.
8 changes: 7 additions & 1 deletion docs/plugin_outbox.t.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,10 @@ When applied on the (child) bus level then all consumers (or handlers) will inhe

- Once a message is picked from outbox and successfully delivered then it is marked as sent in the outbox table.

- At configured intervals (`MessageCleanup.Interval`), and after a configured time span (`MessageCleanup.Age`), the sent messages are removed from the outbox table.
- At configured intervals (`MessageCleanup.Interval`), and after a configured time span (`MessageCleanup.Age`), the sent messages are removed from the outbox table.

## Important note

As the outbox can be processed by instance of the application that did not originally process it, it is important to ensure that all active instances maintian the same message registrations (and compatible JSON schema definitions).

A message that fails to deserialize will be flagged as invalid by setting the associated `DeliveryAborted` field in the `Outbox` table, to `1`. It is safe to manually reset this field value to `0` once the version incompatibility has been resolved.
4 changes: 4 additions & 0 deletions src/.editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ dotnet_style_allow_multiple_blank_lines_experimental = true:silent
dotnet_style_allow_statement_immediately_after_block_experimental = true:silent
dotnet_style_prefer_collection_expression = when_types_loosely_match:suggestion
dotnet_diagnostic.CA1859.severity = silent
dotnet_style_qualification_for_field = false:suggestion
dotnet_style_qualification_for_property = false:suggestion
dotnet_style_qualification_for_method = false:suggestion
dotnet_style_qualification_for_event = false:suggestion

[*.{csproj,xml}]
indent_style = space
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,10 @@
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="3.1.0" />
</ItemGroup>

<ItemGroup>
<AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleToAttribute">
<_Parameter1>SlimMessageBus.Host.Outbox.Sql.Test</_Parameter1>
</AssemblyAttribute>
</ItemGroup>

</Project>
131 changes: 89 additions & 42 deletions src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ await ExecuteNonQuery(Settings.SqlSettings.OperationRetry, _sqlTemplate.SqlOutbo
cmd.Parameters.Add("@Id", SqlDbType.UniqueIdentifier).Value = message.Id;
cmd.Parameters.Add("@Timestamp", SqlDbType.DateTime2).Value = message.Timestamp;
cmd.Parameters.Add("@BusName", SqlDbType.NVarChar).Value = message.BusName;
cmd.Parameters.Add("@MessageType", SqlDbType.NVarChar).Value = Settings.MessageTypeResolver.ToName(message.MessageType);
cmd.Parameters.Add("@MessageType", SqlDbType.NVarChar).Value = message.MessageType;
cmd.Parameters.Add("@MessagePayload", SqlDbType.VarBinary).Value = message.MessagePayload;
cmd.Parameters.Add("@Headers", SqlDbType.NVarChar).Value = message.Headers != null ? JsonSerializer.Serialize(message.Headers, _jsonOptions) : DBNull.Value;
cmd.Parameters.Add("@Path", SqlDbType.NVarChar).Value = message.Path;
Expand All @@ -51,49 +51,39 @@ public async Task<IReadOnlyCollection<OutboxMessage>> LockAndSelect(string insta
cmd.Parameters.Add("@BatchSize", SqlDbType.Int).Value = batchSize;
cmd.Parameters.Add("@LockDuration", SqlDbType.Int).Value = lockDuration.TotalSeconds;

using var reader = await cmd.ExecuteReaderAsync(token);
return await ReadMessages(cmd, token).ConfigureAwait(false);
}

var idOrdinal = reader.GetOrdinal("Id");
var timestampOrdinal = reader.GetOrdinal("Timestamp");
var busNameOrdinal = reader.GetOrdinal("BusName");
var typeOrdinal = reader.GetOrdinal("MessageType");
var payloadOrdinal = reader.GetOrdinal("MessagePayload");
var headersOrdinal = reader.GetOrdinal("Headers");
var pathOrdinal = reader.GetOrdinal("Path");
var instanceIdOrdinal = reader.GetOrdinal("InstanceId");
var lockInstanceIdOrdinal = reader.GetOrdinal("LockInstanceId");
var lockExpiresOnOrdinal = reader.GetOrdinal("LockExpiresOn");
var deliveryAttemptOrdinal = reader.GetOrdinal("DeliveryAttempt");
var deliveryCompleteOrdinal = reader.GetOrdinal("DeliveryComplete");
var deliveryAbortedOrdinal = reader.GetOrdinal("DeliveryAborted");
public async Task AbortDelivery(IReadOnlyCollection<Guid> ids, CancellationToken token)
{
if (ids.Count == 0)
{
return;
}

var items = new List<OutboxMessage>();
while (await reader.ReadAsync(token).ConfigureAwait(false))
await EnsureConnection();

var table = new DataTable();
table.Columns.Add("Id", typeof(Guid));
foreach (var guid in ids)
{
var id = reader.GetGuid(idOrdinal);
var messageType = reader.GetString(typeOrdinal);
var headers = reader.IsDBNull(headersOrdinal) ? null : reader.GetString(headersOrdinal);
var message = new OutboxMessage
table.Rows.Add(guid);
}

var affected = await ExecuteNonQuery(Settings.SqlSettings.OperationRetry,
_sqlTemplate.SqlOutboxMessageAbortDelivery,
cmd =>
{
Id = id,
Timestamp = reader.GetDateTime(timestampOrdinal),
BusName = reader.GetString(busNameOrdinal),
MessageType = Settings.MessageTypeResolver.ToType(messageType) ?? throw new MessageBusException($"Outbox message with Id {id} - the MessageType {messageType} is not recognized. The type might have been renamed or moved namespaces."),
MessagePayload = reader.GetSqlBinary(payloadOrdinal).Value,
Headers = headers == null ? null : JsonSerializer.Deserialize<IDictionary<string, object>>(headers, _jsonOptions),
Path = reader.IsDBNull(pathOrdinal) ? null : reader.GetString(pathOrdinal),
InstanceId = reader.GetString(instanceIdOrdinal),
LockInstanceId = reader.IsDBNull(lockInstanceIdOrdinal) ? null : reader.GetString(lockInstanceIdOrdinal),
LockExpiresOn = reader.IsDBNull(lockExpiresOnOrdinal) ? null : reader.GetDateTime(lockExpiresOnOrdinal),
DeliveryAttempt = reader.GetInt32(deliveryAttemptOrdinal),
DeliveryComplete = reader.GetBoolean(deliveryCompleteOrdinal),
DeliveryAborted = reader.GetBoolean(deliveryAbortedOrdinal)
};
var param = cmd.Parameters.Add("@Ids", SqlDbType.Structured);
param.TypeName = _sqlTemplate.OutboxIdTypeQualified;
param.Value = table;
},
token: token);

items.Add(message);
if (affected != ids.Count)
{
throw new MessageBusException($"The number of affected rows was {affected}, but {ids.Count} was expected");
}

return items;
}

public async Task UpdateToSent(IReadOnlyCollection<Guid> ids, CancellationToken token)
Expand Down Expand Up @@ -171,10 +161,11 @@ public async Task DeleteSent(DateTime olderThan, CancellationToken token)
{
await EnsureConnection();

var affected = await ExecuteNonQuery(Settings.SqlSettings.OperationRetry, _sqlTemplate.SqlOutboxMessageDeleteSent, cmd =>
{
cmd.Parameters.Add("@Timestamp", SqlDbType.DateTime2).Value = olderThan;
}, token);
var affected = await ExecuteNonQuery(
Settings.SqlSettings.OperationRetry,
_sqlTemplate.SqlOutboxMessageDeleteSent,
cmd => cmd.Parameters.Add("@Timestamp", SqlDbType.DateTime2).Value = olderThan,
token);

Logger.Log(affected > 0 ? LogLevel.Information : LogLevel.Debug, "Removed {MessageCount} sent messages from outbox table", affected);
}
Expand All @@ -190,4 +181,60 @@ public async Task<bool> RenewLock(string instanceId, TimeSpan lockDuration, Canc

return await cmd.ExecuteNonQueryAsync(token) > 0;
}

internal async Task<IReadOnlyCollection<OutboxMessage>> GetAllMessages(CancellationToken cancellationToken)
{
await EnsureConnection();

using var cmd = CreateCommand();
cmd.CommandText = _sqlTemplate.SqlOutboxAllMessages;

return await ReadMessages(cmd, cancellationToken).ConfigureAwait(false);
}

private async Task<IReadOnlyCollection<OutboxMessage>> ReadMessages(SqlCommand cmd, CancellationToken cancellationToken)
{
using var reader = await cmd.ExecuteReaderAsync(cancellationToken);

var idOrdinal = reader.GetOrdinal("Id");
var timestampOrdinal = reader.GetOrdinal("Timestamp");
var busNameOrdinal = reader.GetOrdinal("BusName");
var typeOrdinal = reader.GetOrdinal("MessageType");
var payloadOrdinal = reader.GetOrdinal("MessagePayload");
var headersOrdinal = reader.GetOrdinal("Headers");
var pathOrdinal = reader.GetOrdinal("Path");
var instanceIdOrdinal = reader.GetOrdinal("InstanceId");
var lockInstanceIdOrdinal = reader.GetOrdinal("LockInstanceId");
var lockExpiresOnOrdinal = reader.GetOrdinal("LockExpiresOn");
var deliveryAttemptOrdinal = reader.GetOrdinal("DeliveryAttempt");
var deliveryCompleteOrdinal = reader.GetOrdinal("DeliveryComplete");
var deliveryAbortedOrdinal = reader.GetOrdinal("DeliveryAborted");

var items = new List<OutboxMessage>();
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
var id = reader.GetGuid(idOrdinal);
var headers = reader.IsDBNull(headersOrdinal) ? null : reader.GetString(headersOrdinal);
var message = new OutboxMessage
{
Id = id,
Timestamp = reader.GetDateTime(timestampOrdinal),
BusName = reader.GetString(busNameOrdinal),
MessageType = reader.GetString(typeOrdinal),
MessagePayload = reader.GetSqlBinary(payloadOrdinal).Value,
Headers = headers == null ? null : JsonSerializer.Deserialize<IDictionary<string, object>>(headers, _jsonOptions),
Path = reader.IsDBNull(pathOrdinal) ? null : reader.GetString(pathOrdinal),
InstanceId = reader.GetString(instanceIdOrdinal),
LockInstanceId = reader.IsDBNull(lockInstanceIdOrdinal) ? null : reader.GetString(lockInstanceIdOrdinal),
LockExpiresOn = reader.IsDBNull(lockExpiresOnOrdinal) ? null : reader.GetDateTime(lockExpiresOnOrdinal),
DeliveryAttempt = reader.GetInt32(deliveryAttemptOrdinal),
DeliveryComplete = reader.GetBoolean(deliveryCompleteOrdinal),
DeliveryAborted = reader.GetBoolean(deliveryAbortedOrdinal)
};

items.Add(message);
}

return items;
}
}
30 changes: 30 additions & 0 deletions src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxTemplate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,14 @@ public class SqlOutboxTemplate
public string SqlOutboxMessageLockTableAndSelect { get; }
public string SqlOutboxMessageUpdateSent { get; }
public string SqlOutboxMessageIncrementDeliveryAttempt { get; }
public string SqlOutboxMessageAbortDelivery { get; }
public string SqlOutboxMessageRenewLock { get; }

/// <summary>
/// Used by tests only.
/// </summary>
internal string SqlOutboxAllMessages { get; }

public SqlOutboxTemplate(SqlOutboxSettings settings)
{
OutboxIdTypeQualified = $"[{settings.SqlSettings.DatabaseSchemaName}].[{settings.SqlSettings.DatabaseOutboxTypeName}]";
Expand Down Expand Up @@ -114,6 +120,13 @@ SELECT TOP (@BatchSize) Id
WHERE [Id] IN (SELECT [Id] from @Ids);
""";

SqlOutboxMessageAbortDelivery = $"""
UPDATE {TableNameQualified}
SET [DeliveryAttempt] = DeliveryAttempt + 1,
[DeliveryAborted] = 1
WHERE [Id] IN (SELECT [Id] from @Ids);
""";

SqlOutboxMessageRenewLock = $"""
UPDATE {TableNameQualified}
SET LockExpiresOn = DATEADD(SECOND, @LockDuration, GETUTCDATE())
Expand All @@ -122,5 +135,22 @@ SELECT TOP (@BatchSize) Id
AND DeliveryComplete = 0
AND DeliveryAborted = 0
""";

SqlOutboxAllMessages = $"""
SELECT Id
, Timestamp
, BusName
, MessageType
, MessagePayload
, Headers
, Path
, InstanceId
, LockInstanceId
, LockExpiresOn
, DeliveryAttempt
, DeliveryComplete
, DeliveryAborted
FROM {TableNameQualified}
""";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ public sealed class OutboxForwardingPublishInterceptor<T>(
ILogger<OutboxForwardingPublishInterceptor> logger,
IOutboxRepository outboxRepository,
IInstanceIdProvider instanceIdProvider,
IOutboxNotificationService outboxNotificationService)
IOutboxNotificationService outboxNotificationService,
OutboxSettings outboxSettings)
: OutboxForwardingPublishInterceptor, IInterceptorWithOrder, IPublishInterceptor<T>, IDisposable where T : class
{
static readonly internal string SkipOutboxHeader = "__SkipOutbox";
Expand All @@ -25,6 +26,7 @@ public sealed class OutboxForwardingPublishInterceptor<T>(
private readonly IOutboxRepository _outboxRepository = outboxRepository;
private readonly IInstanceIdProvider _instanceIdProvider = instanceIdProvider;
private readonly IOutboxNotificationService _outboxNotificationService = outboxNotificationService;
private readonly OutboxSettings _outboxSettings = outboxSettings;

private bool _notifyOutbox = false;

Expand Down Expand Up @@ -71,12 +73,12 @@ public async Task OnHandle(T message, Func<Task> next, IProducerContext context)
BusName = busMaster.Name,
Headers = context.Headers,
Path = context.Path,
MessageType = messageType,
MessageType = _outboxSettings.MessageTypeResolver.ToName(messageType),
MessagePayload = messagePayload,
InstanceId = _instanceIdProvider.GetInstanceId()
};
await _outboxRepository.Save(outboxMessage, context.CancellationToken);

// a message was sent, notify outbox service to poll on dispose (post transaction)
_notifyOutbox = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ public interface IOutboxRepository
{
Task Save(OutboxMessage message, CancellationToken token);
Task<IReadOnlyCollection<OutboxMessage>> LockAndSelect(string instanceId, int batchSize, bool tableLock, TimeSpan lockDuration, CancellationToken token);
Task AbortDelivery (IReadOnlyCollection<Guid> ids, CancellationToken token);
Task UpdateToSent(IReadOnlyCollection<Guid> ids, CancellationToken token);
Task IncrementDeliveryAttempt(IReadOnlyCollection<Guid> ids, int maxDeliveryAttempts, CancellationToken token);
Task DeleteSent(DateTime olderThan, CancellationToken token);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ public class OutboxMessage
public Guid Id { get; set; } = Guid.NewGuid();
public DateTime Timestamp { get; set; } = DateTime.UtcNow;
public string BusName { get; set; }
public Type MessageType { get; set; }
public string MessageType { get; set; }
public byte[] MessagePayload { get; set; }
public string Path { get; set; }
public IDictionary<string, object> Headers { get; set; }
Expand Down
Loading

0 comments on commit feb75cb

Please sign in to comment.