From ff4673932d69f2b3c7c8b345a295fccbe753de87 Mon Sep 17 00:00:00 2001 From: Havret Date: Wed, 19 Jun 2024 23:37:56 +0200 Subject: [PATCH] Do not mutate message object on send --- src/ArtemisNetCoreClient/AnonymousProducer.cs | 23 +++++++------------ .../Extensions/AnonymousProducerExtensions.cs | 2 +- .../Framing/Outgoing/SessionSendMessage.cs | 20 ++++++++-------- .../IAnonymousProducer.cs | 2 +- src/ArtemisNetCoreClient/IProducer.cs | 2 +- src/ArtemisNetCoreClient/Message.cs | 7 ------ src/ArtemisNetCoreClient/Producer.cs | 22 +++++++----------- src/ArtemisNetCoreClient/Session.cs | 8 +++++-- .../MessageFlowSpec.cs | 4 ++-- 9 files changed, 38 insertions(+), 52 deletions(-) diff --git a/src/ArtemisNetCoreClient/AnonymousProducer.cs b/src/ArtemisNetCoreClient/AnonymousProducer.cs index 7095b09..73310a5 100644 --- a/src/ArtemisNetCoreClient/AnonymousProducer.cs +++ b/src/ArtemisNetCoreClient/AnonymousProducer.cs @@ -11,23 +11,16 @@ public ValueTask DisposeAsync() public void SendMessage(string address, RoutingType? routingType, Message message) { - message.Address = address; - if (routingType != null) - { - message.RoutingType = routingType; - } - - session.SendMessage(message: message, producerId: ProducerId); + session.SendMessage(message: message, address: address, routingType: routingType, producerId: ProducerId); } - public async ValueTask SendMessageAsync(string address, RoutingType? routingType, Message message, CancellationToken cancellationToken = default) + public Task SendMessageAsync(string address, RoutingType? routingType, Message message, CancellationToken cancellationToken = default) { - message.Address = address; - if (routingType != null) - { - message.RoutingType = routingType; - } - - await session.SendMessageAsync(message: message, producerId: ProducerId, cancellationToken: cancellationToken); + return session.SendMessageAsync(message: message, + address: address, + routingType: routingType, + producerId: ProducerId, + cancellationToken: cancellationToken + ); } } \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/Extensions/AnonymousProducerExtensions.cs b/src/ArtemisNetCoreClient/Extensions/AnonymousProducerExtensions.cs index 6400953..457e465 100644 --- a/src/ArtemisNetCoreClient/Extensions/AnonymousProducerExtensions.cs +++ b/src/ArtemisNetCoreClient/Extensions/AnonymousProducerExtensions.cs @@ -39,7 +39,7 @@ public static void SendMessage(this IAnonymousProducer anonymousProducer, string /// The address to which the message should be sent. /// The message to send. /// The cancellation token. - public static ValueTask SendMessageAsync(this IAnonymousProducer anonymousProducer, string address, Message message, CancellationToken cancellationToken = default) + public static Task SendMessageAsync(this IAnonymousProducer anonymousProducer, string address, Message message, CancellationToken cancellationToken = default) { return anonymousProducer.SendMessageAsync(address, routingType: null, message, cancellationToken); } diff --git a/src/ArtemisNetCoreClient/Framing/Outgoing/SessionSendMessage.cs b/src/ArtemisNetCoreClient/Framing/Outgoing/SessionSendMessage.cs index 09f20cf..d6e047f 100644 --- a/src/ArtemisNetCoreClient/Framing/Outgoing/SessionSendMessage.cs +++ b/src/ArtemisNetCoreClient/Framing/Outgoing/SessionSendMessage.cs @@ -11,7 +11,9 @@ namespace ActiveMQ.Artemis.Core.Client.Framing.Outgoing; public required bool RequiresResponse { get; init; } public required long CorrelationId { get; init; } public required int ProducerId { get; init; } - + public required string Address { get; init; } + public required RoutingType? RoutingType { get; init; } + public int GetRequiredBufferSize() { int byteCount = 0; @@ -20,7 +22,7 @@ public int GetRequiredBufferSize() byteCount += sizeof(int); // Message body length byteCount += Message.Body.Length; // Actual message body length byteCount += sizeof(long); // MessageId - byteCount += ArtemisBinaryConverter.GetNullableSimpleStringByteCount(Message.Address); + byteCount += ArtemisBinaryConverter.GetNullableSimpleStringByteCount(Address); byteCount += ArtemisBinaryConverter.GetNullableGuidByteCount(Message.UserId); byteCount += sizeof(byte); // Type byteCount += sizeof(bool); // Durable @@ -29,7 +31,7 @@ public int GetRequiredBufferSize() byteCount += sizeof(byte); // Priority byteCount += sizeof(byte); // Properties nullability - if (Message.Properties?.Count > 0 || Message.RoutingType.HasValue) + if (Message.Properties?.Count > 0 || RoutingType.HasValue) { byteCount += sizeof(int); // Properties count if (Message.Properties != null) @@ -41,10 +43,10 @@ public int GetRequiredBufferSize() } } - if (Message.RoutingType.HasValue) + if (RoutingType.HasValue) { byteCount += ArtemisBinaryConverter.GetSimpleStringByteCount(MessageHeaders.RoutingType); - byteCount += ArtemisBinaryConverter.GetNullableObjectByteCount((byte) Message.RoutingType); + byteCount += ArtemisBinaryConverter.GetNullableObjectByteCount((byte) RoutingType); } } @@ -93,7 +95,7 @@ private int EncodeHeaders(Span buffer) var offset = 0; offset += ArtemisBinaryConverter.WriteInt64(ref buffer.GetReference(), Message.MessageId); - offset += ArtemisBinaryConverter.WriteNullableSimpleString(ref buffer.GetOffset(offset), Message.Address); + offset += ArtemisBinaryConverter.WriteNullableSimpleString(ref buffer.GetOffset(offset), Address); offset += ArtemisBinaryConverter.WriteNullableGuid(ref buffer.GetOffset(offset), Message.UserId); offset += ArtemisBinaryConverter.WriteByte(ref buffer.GetOffset(offset), Message.Type); offset += ArtemisBinaryConverter.WriteBool(ref buffer.GetOffset(offset), Message.Durable); @@ -110,7 +112,7 @@ private int EncodeProperties(Span buffer) var offset = 0; var propertiesCount = Message.Properties?.Count ?? 0; - if (Message.RoutingType.HasValue) + if (RoutingType.HasValue) { propertiesCount++; } @@ -129,11 +131,11 @@ private int EncodeProperties(Span buffer) } } - if (Message.RoutingType.HasValue) + if (RoutingType.HasValue) { // TODO: Maybe we can cache this string? offset += ArtemisBinaryConverter.WriteSimpleString(ref buffer.GetOffset(offset), MessageHeaders.RoutingType); - offset += ArtemisBinaryConverter.WriteNullableObject(ref buffer.GetOffset(offset), (byte) Message.RoutingType.Value); + offset += ArtemisBinaryConverter.WriteNullableObject(ref buffer.GetOffset(offset), (byte) RoutingType.Value); } } else diff --git a/src/ArtemisNetCoreClient/IAnonymousProducer.cs b/src/ArtemisNetCoreClient/IAnonymousProducer.cs index 20a1555..fdfa82e 100644 --- a/src/ArtemisNetCoreClient/IAnonymousProducer.cs +++ b/src/ArtemisNetCoreClient/IAnonymousProducer.cs @@ -40,5 +40,5 @@ public interface IAnonymousProducer : IAsyncDisposable /// /// The message to send. /// The cancellation token. - ValueTask SendMessageAsync(string address, RoutingType? routingType, Message message, CancellationToken cancellationToken = default); + Task SendMessageAsync(string address, RoutingType? routingType, Message message, CancellationToken cancellationToken = default); } \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/IProducer.cs b/src/ArtemisNetCoreClient/IProducer.cs index c1d8c68..89f1547 100644 --- a/src/ArtemisNetCoreClient/IProducer.cs +++ b/src/ArtemisNetCoreClient/IProducer.cs @@ -29,5 +29,5 @@ public interface IProducer : IAsyncDisposable /// confirms the persistence of the message. For non-durable messages, the completion of the task /// indicates that the broker has received the message. /// - ValueTask SendMessageAsync(Message message, CancellationToken cancellationToken = default); + Task SendMessageAsync(Message message, CancellationToken cancellationToken = default); } \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/Message.cs b/src/ArtemisNetCoreClient/Message.cs index 6e57227..b4d8370 100644 --- a/src/ArtemisNetCoreClient/Message.cs +++ b/src/ArtemisNetCoreClient/Message.cs @@ -7,8 +7,6 @@ namespace ActiveMQ.Artemis.Core.Client; public class Message { public long MessageId { get; set; } - - internal string? Address { get; set; } public Guid? UserId { get; set; } @@ -36,11 +34,6 @@ public class Message // TODO: Enum? public byte Priority { get; set; } - - /// - /// Gets or sets the Routing Type for this message. Ensures that this message is only routed to queues with matching routing type. - /// - internal RoutingType? RoutingType { get; set; } /// /// The message properties diff --git a/src/ArtemisNetCoreClient/Producer.cs b/src/ArtemisNetCoreClient/Producer.cs index 90a4074..940b6ab 100644 --- a/src/ArtemisNetCoreClient/Producer.cs +++ b/src/ArtemisNetCoreClient/Producer.cs @@ -13,22 +13,16 @@ public ValueTask DisposeAsync() public void SendMessage(Message message) { - message.Address = Address; - if (RoutingType.HasValue) - { - message.RoutingType = RoutingType.Value; - } - session.SendMessage(message: message, producerId: ProducerId); + session.SendMessage(message: message, address: Address, routingType: RoutingType, producerId: ProducerId); } - public async ValueTask SendMessageAsync(Message message, CancellationToken cancellationToken) + public Task SendMessageAsync(Message message, CancellationToken cancellationToken) { - message.Address = Address; - if (RoutingType.HasValue) - { - message.RoutingType = RoutingType.Value; - } - - await session.SendMessageAsync(message: message, producerId: ProducerId, cancellationToken: cancellationToken); + return session.SendMessageAsync(message: message, + address: Address, + routingType: RoutingType, + producerId: ProducerId, + cancellationToken: cancellationToken + ); } } \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/Session.cs b/src/ArtemisNetCoreClient/Session.cs index b5e7e65..7e84f74 100644 --- a/src/ArtemisNetCoreClient/Session.cs +++ b/src/ArtemisNetCoreClient/Session.cs @@ -373,7 +373,7 @@ internal ValueTask RemoveProducerAsync(int producerId) return ValueTask.CompletedTask; } - internal void SendMessage(Message message, int producerId) + internal void SendMessage(Message message, string address, RoutingType? routingType, int producerId) { var request = new SessionSendMessage { @@ -381,11 +381,13 @@ internal void SendMessage(Message message, int producerId) ProducerId = producerId, RequiresResponse = false, CorrelationId = -1, + Address = address, + RoutingType = routingType }; connection.Send(request, ChannelId); } - internal async ValueTask SendMessageAsync(Message message, int producerId, CancellationToken cancellationToken) + internal async Task SendMessageAsync(Message message, string address, RoutingType? routingType, int producerId, CancellationToken cancellationToken) { var request = new SessionSendMessage { @@ -393,6 +395,8 @@ internal async ValueTask SendMessageAsync(Message message, int producerId, Cance ProducerId = producerId, RequiresResponse = true, CorrelationId = _correlationIdGenerator.GenerateId(), + Address = address, + RoutingType = routingType }; try { diff --git a/test/ArtemisNetCoreClient.Tests/MessageFlowSpec.cs b/test/ArtemisNetCoreClient.Tests/MessageFlowSpec.cs index bc951a8..a1268e0 100644 --- a/test/ArtemisNetCoreClient.Tests/MessageFlowSpec.cs +++ b/test/ArtemisNetCoreClient.Tests/MessageFlowSpec.cs @@ -46,7 +46,7 @@ public async Task Should_receive_messages_in_the_same_order_as_they_were_sent() Address = addressName }, testFixture.CancellationToken); - var tasks = new List(numberOfMessages); + var tasks = new List(numberOfMessages); for (int i = 0; i < numberOfMessages; i++) { tasks.Add(producer.SendMessageAsync(new Message @@ -58,7 +58,7 @@ public async Task Should_receive_messages_in_the_same_order_as_they_were_sent() }, testFixture.CancellationToken)); } - await Task.WhenAll(tasks.Select(t => t.AsTask())); + await Task.WhenAll(tasks); }); var messages = await consumedMessagesTask.WaitAsync(testFixture.CancellationToken);