diff --git a/src/ArtemisNetCoreClient/Framing/ArtemisBinaryConverter.cs b/src/ArtemisNetCoreClient/Framing/ArtemisBinaryConverter.cs index 8ec9a5a..7cb303b 100644 --- a/src/ArtemisNetCoreClient/Framing/ArtemisBinaryConverter.cs +++ b/src/ArtemisNetCoreClient/Framing/ArtemisBinaryConverter.cs @@ -560,8 +560,8 @@ public static int GetNullableObjectByteCount(object? value) [MethodImpl(MethodImplOptions.AggressiveInlining)] public static int ReadNullableObject(in ReadOnlySpan source, out object? value) { - var readBytes = ReadByte(source, out var isNotNull); - switch (isNotNull) + var readBytes = ReadByte(source, out var type); + switch (type) { case DataConstants.Null: value = null; @@ -607,7 +607,7 @@ public static int ReadNullableObject(in ReadOnlySpan source, out object? v value = longValue; break; default: - throw new NotSupportedException($"Unsupported object type: {isNotNull}"); + throw new NotSupportedException($"Unsupported object type: {type}"); } return readBytes; diff --git a/src/ArtemisNetCoreClient/Framing/Incoming/SessionReceiveMessage.cs b/src/ArtemisNetCoreClient/Framing/Incoming/SessionReceiveMessage.cs index 224364b..de715f0 100644 --- a/src/ArtemisNetCoreClient/Framing/Incoming/SessionReceiveMessage.cs +++ b/src/ArtemisNetCoreClient/Framing/Incoming/SessionReceiveMessage.cs @@ -23,7 +23,7 @@ public SessionReceiveMessage(ReadOnlySpan buffer) readBytes += ArtemisBinaryConverter.ReadInt64(buffer[readBytes..], out var expiration); readBytes += ArtemisBinaryConverter.ReadInt64(buffer[readBytes..], out var timestamp); readBytes += ArtemisBinaryConverter.ReadByte(buffer[readBytes..], out var priority); - readBytes += DecodeProperties(buffer[readBytes..], out var properties); + readBytes += DecodeProperties(buffer[readBytes..], out var properties, out var routingType); readBytes += ArtemisBinaryConverter.ReadInt64(buffer[readBytes..], out ConsumerId); readBytes += ArtemisBinaryConverter.ReadInt32(buffer[readBytes..], out DeliveryCount); @@ -40,6 +40,7 @@ public SessionReceiveMessage(ReadOnlySpan buffer) Priority = priority, Properties = properties, MessageDelivery = new MessageDelivery(ConsumerId, messageId), + RoutingType = routingType }; Debug.Assert(readBytes == buffer.Length, $"Expected to read {buffer.Length} bytes but got {readBytes}"); @@ -60,28 +61,39 @@ private static int DecodeMessageBody(ReadOnlySpan buffer, out ReadOnlyMemo } [MethodImpl(MethodImplOptions.AggressiveInlining)] - private static int DecodeProperties(ReadOnlySpan buffer, out IReadOnlyDictionary value) + private static int DecodeProperties(ReadOnlySpan buffer, out IReadOnlyDictionary properties, out RoutingType? routingType) { + routingType = null; + properties = ReadOnlyDictionary.Empty; + var readBytes = ArtemisBinaryConverter.ReadByte(buffer, out var isNotNull); if (isNotNull == DataConstants.NotNull) { readBytes += ArtemisBinaryConverter.ReadInt32(buffer[readBytes..], out var count); - var properties = new Dictionary(count); + var mutableProperties = new Dictionary(count); for (var i = 0; i < count; i++) { readBytes += ArtemisBinaryConverter.ReadSimpleString(buffer[readBytes..], out var key); - readBytes += ArtemisBinaryConverter.ReadNullableObject(buffer[readBytes..], out var obj); - properties.Add(key, obj); + + if (key == MessageHeaders.RoutingType) + { + readBytes += ArtemisBinaryConverter.ReadByte(buffer[readBytes..], out var type); + if (type == DataConstants.Byte) + { + readBytes += ArtemisBinaryConverter.ReadByte(buffer[readBytes..], out var routingTypeByte); + routingType = (RoutingType) routingTypeByte; + } + } + else + { + readBytes += ArtemisBinaryConverter.ReadNullableObject(buffer[readBytes..], out var obj); + mutableProperties.Add(key, obj); + } } - value = properties.ToFrozenDictionary(); - - return readBytes; - } - else - { - value = ReadOnlyDictionary.Empty; - return readBytes; + properties = mutableProperties.ToFrozenDictionary(); } + + return readBytes; } } \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/ReceivedMessage.cs b/src/ArtemisNetCoreClient/ReceivedMessage.cs index 7d85311..ba29976 100644 --- a/src/ArtemisNetCoreClient/ReceivedMessage.cs +++ b/src/ArtemisNetCoreClient/ReceivedMessage.cs @@ -50,4 +50,9 @@ public class ReceivedMessage /// even if the message has been disposed or discarded. /// public required MessageDelivery MessageDelivery { get; init; } + + /// + /// The routing type used when sending the message. + /// + public required RoutingType? RoutingType { get; init; } } \ No newline at end of file diff --git a/test/ArtemisNetCoreClient.Tests/AnonymousProducerSpec.cs b/test/ArtemisNetCoreClient.Tests/AnonymousProducerSpec.cs index e1c61d3..bc2ba7b 100644 --- a/test/ArtemisNetCoreClient.Tests/AnonymousProducerSpec.cs +++ b/test/ArtemisNetCoreClient.Tests/AnonymousProducerSpec.cs @@ -1,4 +1,3 @@ -using ActiveMQ.Artemis.Core.Client.InternalUtilities; using ActiveMQ.Artemis.Core.Client.Tests.Utils; using NScenario; using Xunit; @@ -185,6 +184,7 @@ await consumer.ReceiveMessageAsync(testFixture.CancellationToken) Assert.All(messages, message => { Assert.NotNull(message); + Assert.Equal(RoutingType.Anycast, message.RoutingType); Assert.Equal("anycast_msg"u8.ToArray(), message.Body.ToArray()); }); }); @@ -222,6 +222,7 @@ await consumer.ReceiveMessageAsync(testFixture.CancellationToken) Assert.All(messages, message => { Assert.NotNull(message); + Assert.Equal(RoutingType.Multicast, message.RoutingType); Assert.Equal("multicast_msg"u8.ToArray(), message.Body.ToArray()); }); });