Skip to content

Commit

Permalink
Parse routing type for the ReceivedMessage
Browse files Browse the repository at this point in the history
  • Loading branch information
Havret committed Jul 16, 2024
1 parent cbc4764 commit 10c2bc2
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 17 deletions.
6 changes: 3 additions & 3 deletions src/ArtemisNetCoreClient/Framing/ArtemisBinaryConverter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -560,8 +560,8 @@ public static int GetNullableObjectByteCount(object? value)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static int ReadNullableObject(in ReadOnlySpan<byte> source, out object? value)
{
var readBytes = ReadByte(source, out var isNotNull);
switch (isNotNull)
var readBytes = ReadByte(source, out var type);
switch (type)
{
case DataConstants.Null:
value = null;
Expand Down Expand Up @@ -607,7 +607,7 @@ public static int ReadNullableObject(in ReadOnlySpan<byte> source, out object? v
value = longValue;
break;
default:
throw new NotSupportedException($"Unsupported object type: {isNotNull}");
throw new NotSupportedException($"Unsupported object type: {type}");
}

return readBytes;
Expand Down
38 changes: 25 additions & 13 deletions src/ArtemisNetCoreClient/Framing/Incoming/SessionReceiveMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public SessionReceiveMessage(ReadOnlySpan<byte> buffer)
readBytes += ArtemisBinaryConverter.ReadInt64(buffer[readBytes..], out var expiration);
readBytes += ArtemisBinaryConverter.ReadInt64(buffer[readBytes..], out var timestamp);
readBytes += ArtemisBinaryConverter.ReadByte(buffer[readBytes..], out var priority);
readBytes += DecodeProperties(buffer[readBytes..], out var properties);
readBytes += DecodeProperties(buffer[readBytes..], out var properties, out var routingType);
readBytes += ArtemisBinaryConverter.ReadInt64(buffer[readBytes..], out ConsumerId);
readBytes += ArtemisBinaryConverter.ReadInt32(buffer[readBytes..], out DeliveryCount);

Expand All @@ -40,6 +40,7 @@ public SessionReceiveMessage(ReadOnlySpan<byte> buffer)
Priority = priority,
Properties = properties,
MessageDelivery = new MessageDelivery(ConsumerId, messageId),
RoutingType = routingType
};

Debug.Assert(readBytes == buffer.Length, $"Expected to read {buffer.Length} bytes but got {readBytes}");
Expand All @@ -60,28 +61,39 @@ private static int DecodeMessageBody(ReadOnlySpan<byte> buffer, out ReadOnlyMemo
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static int DecodeProperties(ReadOnlySpan<byte> buffer, out IReadOnlyDictionary<string, object?> value)
private static int DecodeProperties(ReadOnlySpan<byte> buffer, out IReadOnlyDictionary<string, object?> properties, out RoutingType? routingType)
{
routingType = null;
properties = ReadOnlyDictionary<string, object?>.Empty;

var readBytes = ArtemisBinaryConverter.ReadByte(buffer, out var isNotNull);
if (isNotNull == DataConstants.NotNull)
{
readBytes += ArtemisBinaryConverter.ReadInt32(buffer[readBytes..], out var count);
var properties = new Dictionary<string, object?>(count);
var mutableProperties = new Dictionary<string, object?>(count);
for (var i = 0; i < count; i++)
{
readBytes += ArtemisBinaryConverter.ReadSimpleString(buffer[readBytes..], out var key);
readBytes += ArtemisBinaryConverter.ReadNullableObject(buffer[readBytes..], out var obj);
properties.Add(key, obj);

if (key == MessageHeaders.RoutingType)
{
readBytes += ArtemisBinaryConverter.ReadByte(buffer[readBytes..], out var type);
if (type == DataConstants.Byte)
{
readBytes += ArtemisBinaryConverter.ReadByte(buffer[readBytes..], out var routingTypeByte);
routingType = (RoutingType) routingTypeByte;
}
}
else
{
readBytes += ArtemisBinaryConverter.ReadNullableObject(buffer[readBytes..], out var obj);
mutableProperties.Add(key, obj);
}
}

value = properties.ToFrozenDictionary();

return readBytes;
}
else
{
value = ReadOnlyDictionary<string, object?>.Empty;
return readBytes;
properties = mutableProperties.ToFrozenDictionary();
}

return readBytes;
}
}
5 changes: 5 additions & 0 deletions src/ArtemisNetCoreClient/ReceivedMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,9 @@ public class ReceivedMessage
/// even if the message has been disposed or discarded.
/// </summary>
public required MessageDelivery MessageDelivery { get; init; }

/// <summary>
/// The routing type used when sending the message.
/// </summary>
public required RoutingType? RoutingType { get; init; }
}
3 changes: 2 additions & 1 deletion test/ArtemisNetCoreClient.Tests/AnonymousProducerSpec.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using ActiveMQ.Artemis.Core.Client.InternalUtilities;
using ActiveMQ.Artemis.Core.Client.Tests.Utils;
using NScenario;
using Xunit;
Expand Down Expand Up @@ -185,6 +184,7 @@ await consumer.ReceiveMessageAsync(testFixture.CancellationToken)
Assert.All(messages, message =>
{
Assert.NotNull(message);
Assert.Equal(RoutingType.Anycast, message.RoutingType);
Assert.Equal("anycast_msg"u8.ToArray(), message.Body.ToArray());
});
});
Expand Down Expand Up @@ -222,6 +222,7 @@ await consumer.ReceiveMessageAsync(testFixture.CancellationToken)
Assert.All(messages, message =>
{
Assert.NotNull(message);
Assert.Equal(RoutingType.Multicast, message.RoutingType);
Assert.Equal("multicast_msg"u8.ToArray(), message.Body.ToArray());
});
});
Expand Down

0 comments on commit 10c2bc2

Please sign in to comment.