diff --git a/src/ArtemisNetCoreClient/Framing/Incoming/SessionReceiveMessage.cs b/src/ArtemisNetCoreClient/Framing/Incoming/SessionReceiveMessage.cs index de715f0..e325e97 100644 --- a/src/ArtemisNetCoreClient/Framing/Incoming/SessionReceiveMessage.cs +++ b/src/ArtemisNetCoreClient/Framing/Incoming/SessionReceiveMessage.cs @@ -23,7 +23,7 @@ public SessionReceiveMessage(ReadOnlySpan buffer) readBytes += ArtemisBinaryConverter.ReadInt64(buffer[readBytes..], out var expiration); readBytes += ArtemisBinaryConverter.ReadInt64(buffer[readBytes..], out var timestamp); readBytes += ArtemisBinaryConverter.ReadByte(buffer[readBytes..], out var priority); - readBytes += DecodeProperties(buffer[readBytes..], out var properties, out var routingType); + readBytes += DecodeProperties(buffer[readBytes..], out var properties, out var routingType, out var groupId); readBytes += ArtemisBinaryConverter.ReadInt64(buffer[readBytes..], out ConsumerId); readBytes += ArtemisBinaryConverter.ReadInt32(buffer[readBytes..], out DeliveryCount); @@ -40,7 +40,8 @@ public SessionReceiveMessage(ReadOnlySpan buffer) Priority = priority, Properties = properties, MessageDelivery = new MessageDelivery(ConsumerId, messageId), - RoutingType = routingType + RoutingType = routingType, + GroupId = groupId, }; Debug.Assert(readBytes == buffer.Length, $"Expected to read {buffer.Length} bytes but got {readBytes}"); @@ -61,9 +62,13 @@ private static int DecodeMessageBody(ReadOnlySpan buffer, out ReadOnlyMemo } [MethodImpl(MethodImplOptions.AggressiveInlining)] - private static int DecodeProperties(ReadOnlySpan buffer, out IReadOnlyDictionary properties, out RoutingType? routingType) + private static int DecodeProperties(ReadOnlySpan buffer, + out IReadOnlyDictionary properties, + out RoutingType? routingType, + out string? groupId) { routingType = null; + groupId = null; properties = ReadOnlyDictionary.Empty; var readBytes = ArtemisBinaryConverter.ReadByte(buffer, out var isNotNull); @@ -84,6 +89,14 @@ private static int DecodeProperties(ReadOnlySpan buffer, out IReadOnlyDict routingType = (RoutingType) routingTypeByte; } } + else if (key == MessageHeaders.GroupId) + { + readBytes += ArtemisBinaryConverter.ReadByte(buffer[readBytes..], out var type); + if (type == DataConstants.String) + { + readBytes += ArtemisBinaryConverter.ReadSimpleString(buffer[readBytes..], out groupId); + } + } else { readBytes += ArtemisBinaryConverter.ReadNullableObject(buffer[readBytes..], out var obj); diff --git a/src/ArtemisNetCoreClient/Framing/Outgoing/SessionSendMessage.cs b/src/ArtemisNetCoreClient/Framing/Outgoing/SessionSendMessage.cs index d6e047f..4ec3b9e 100644 --- a/src/ArtemisNetCoreClient/Framing/Outgoing/SessionSendMessage.cs +++ b/src/ArtemisNetCoreClient/Framing/Outgoing/SessionSendMessage.cs @@ -48,6 +48,12 @@ public int GetRequiredBufferSize() byteCount += ArtemisBinaryConverter.GetSimpleStringByteCount(MessageHeaders.RoutingType); byteCount += ArtemisBinaryConverter.GetNullableObjectByteCount((byte) RoutingType); } + + if (Message.GroupId != null) + { + byteCount += ArtemisBinaryConverter.GetSimpleStringByteCount(MessageHeaders.GroupId); + byteCount += ArtemisBinaryConverter.GetSimpleStringByteCount(Message.GroupId); + } } byteCount += sizeof(bool); // RequiresResponse @@ -137,6 +143,12 @@ private int EncodeProperties(Span buffer) offset += ArtemisBinaryConverter.WriteSimpleString(ref buffer.GetOffset(offset), MessageHeaders.RoutingType); offset += ArtemisBinaryConverter.WriteNullableObject(ref buffer.GetOffset(offset), (byte) RoutingType.Value); } + + if (Message.GroupId != null) + { + offset += ArtemisBinaryConverter.WriteSimpleString(ref buffer.GetOffset(offset), MessageHeaders.GroupId); + offset += ArtemisBinaryConverter.WriteSimpleString(ref buffer.GetOffset(offset), Message.GroupId); + } } else { diff --git a/src/ArtemisNetCoreClient/Message.cs b/src/ArtemisNetCoreClient/Message.cs index b4d8370..4c34e04 100644 --- a/src/ArtemisNetCoreClient/Message.cs +++ b/src/ArtemisNetCoreClient/Message.cs @@ -44,4 +44,10 @@ public class Message /// The message body (payload) /// public ReadOnlyMemory Body { get; set; } + + /// + /// The Group ID used when sending the message. This is used for message grouping feature. Messages with the same message group + /// are always consumed by the same consumer, even if multiple consumers are listening on the same queue. + /// + public string? GroupId { get; set; } } \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/MessageHeaders.cs b/src/ArtemisNetCoreClient/MessageHeaders.cs index 147f24f..bc57285 100644 --- a/src/ArtemisNetCoreClient/MessageHeaders.cs +++ b/src/ArtemisNetCoreClient/MessageHeaders.cs @@ -3,4 +3,5 @@ namespace ActiveMQ.Artemis.Core.Client; internal static class MessageHeaders { public const string RoutingType = "_AMQ_ROUTING_TYPE"; + public const string GroupId = "_AMQ_GROUP_ID"; } \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/ReceivedMessage.cs b/src/ArtemisNetCoreClient/ReceivedMessage.cs index ba29976..0816e1b 100644 --- a/src/ArtemisNetCoreClient/ReceivedMessage.cs +++ b/src/ArtemisNetCoreClient/ReceivedMessage.cs @@ -55,4 +55,9 @@ public class ReceivedMessage /// The routing type used when sending the message. /// public required RoutingType? RoutingType { get; init; } + + /// + /// The Group ID used when sending the message. + /// + public required string? GroupId { get; init; } } \ No newline at end of file diff --git a/test/ArtemisNetCoreClient.Tests/MessageGroupingSpec.cs b/test/ArtemisNetCoreClient.Tests/MessageGroupingSpec.cs new file mode 100644 index 0000000..81af624 --- /dev/null +++ b/test/ArtemisNetCoreClient.Tests/MessageGroupingSpec.cs @@ -0,0 +1,68 @@ +using ActiveMQ.Artemis.Core.Client.Tests.Utils; +using Xunit; +using Xunit.Abstractions; + +namespace ActiveMQ.Artemis.Core.Client.Tests; + +public class MessageGroupingSpec(ITestOutputHelper testOutputHelper) +{ + [Fact] + public async Task Should_deliver_messages_with_the_same_GroupId_to_the_same_consumer() + { + await using var testFixture = await TestFixture.CreateAsync(testOutputHelper); + await using var connection = await testFixture.CreateConnectionAsync(); + await using var session = await connection.CreateSessionAsync(); + + var addressName = await testFixture.CreateAddressAsync(RoutingType.Anycast); + var queueName = await testFixture.CreateQueueAsync(addressName); + + await using var producer = await session.CreateProducerAsync(new ProducerConfiguration + { + Address = addressName + }); + + await using var consumer1 = await session.CreateConsumerAsync(new ConsumerConfiguration + { + QueueName = queueName + }); + await using var consumer2 = await session.CreateConsumerAsync(new ConsumerConfiguration + { + QueueName = queueName + }); + await using var consumer3 = await session.CreateConsumerAsync(new ConsumerConfiguration + { + QueueName = queueName + }); + + await SendMessagesToGroup(producer, "group1", 5); + await SendMessagesToGroup(producer, "group2", 5); + await SendMessagesToGroup(producer, "group3", 5); + + await AssertReceivedAllMessagesWithTheSameGroupId(consumer1, 5); + await AssertReceivedAllMessagesWithTheSameGroupId(consumer2, 5); + await AssertReceivedAllMessagesWithTheSameGroupId(consumer3, 5); + } + + private static async Task SendMessagesToGroup(IProducer producer, string groupId, int count) + { + for (int i = 1; i <= count; i++) + { + await producer.SendMessageAsync(new Message + { + GroupId = groupId, + }); + } + } + + private async Task AssertReceivedAllMessagesWithTheSameGroupId(IConsumer consumer, int count) + { + var messages = new List(); + for (int i = 1; i <= count; i++) + { + var message = await consumer.ReceiveMessageAsync(); + messages.Add(message); + } + + Assert.Single(messages.GroupBy(x => x.GroupId)); + } +} \ No newline at end of file