Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support message grouping #141

Merged
merged 1 commit into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions src/ArtemisNetCoreClient/Framing/Incoming/SessionReceiveMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public SessionReceiveMessage(ReadOnlySpan<byte> buffer)
readBytes += ArtemisBinaryConverter.ReadInt64(buffer[readBytes..], out var expiration);
readBytes += ArtemisBinaryConverter.ReadInt64(buffer[readBytes..], out var timestamp);
readBytes += ArtemisBinaryConverter.ReadByte(buffer[readBytes..], out var priority);
readBytes += DecodeProperties(buffer[readBytes..], out var properties, out var routingType);
readBytes += DecodeProperties(buffer[readBytes..], out var properties, out var routingType, out var groupId);
readBytes += ArtemisBinaryConverter.ReadInt64(buffer[readBytes..], out ConsumerId);
readBytes += ArtemisBinaryConverter.ReadInt32(buffer[readBytes..], out DeliveryCount);

Expand All @@ -40,7 +40,8 @@ public SessionReceiveMessage(ReadOnlySpan<byte> buffer)
Priority = priority,
Properties = properties,
MessageDelivery = new MessageDelivery(ConsumerId, messageId),
RoutingType = routingType
RoutingType = routingType,
GroupId = groupId,
};

Debug.Assert(readBytes == buffer.Length, $"Expected to read {buffer.Length} bytes but got {readBytes}");
Expand All @@ -61,9 +62,13 @@ private static int DecodeMessageBody(ReadOnlySpan<byte> buffer, out ReadOnlyMemo
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static int DecodeProperties(ReadOnlySpan<byte> buffer, out IReadOnlyDictionary<string, object?> properties, out RoutingType? routingType)
private static int DecodeProperties(ReadOnlySpan<byte> buffer,
out IReadOnlyDictionary<string, object?> properties,
out RoutingType? routingType,
out string? groupId)
{
routingType = null;
groupId = null;
properties = ReadOnlyDictionary<string, object?>.Empty;

var readBytes = ArtemisBinaryConverter.ReadByte(buffer, out var isNotNull);
Expand All @@ -84,6 +89,14 @@ private static int DecodeProperties(ReadOnlySpan<byte> buffer, out IReadOnlyDict
routingType = (RoutingType) routingTypeByte;
}
}
else if (key == MessageHeaders.GroupId)
{
readBytes += ArtemisBinaryConverter.ReadByte(buffer[readBytes..], out var type);
if (type == DataConstants.String)
{
readBytes += ArtemisBinaryConverter.ReadSimpleString(buffer[readBytes..], out groupId);
}
}
else
{
readBytes += ArtemisBinaryConverter.ReadNullableObject(buffer[readBytes..], out var obj);
Expand Down
12 changes: 12 additions & 0 deletions src/ArtemisNetCoreClient/Framing/Outgoing/SessionSendMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ public int GetRequiredBufferSize()
byteCount += ArtemisBinaryConverter.GetSimpleStringByteCount(MessageHeaders.RoutingType);
byteCount += ArtemisBinaryConverter.GetNullableObjectByteCount((byte) RoutingType);
}

if (Message.GroupId != null)
{
byteCount += ArtemisBinaryConverter.GetSimpleStringByteCount(MessageHeaders.GroupId);
byteCount += ArtemisBinaryConverter.GetSimpleStringByteCount(Message.GroupId);
}
}

byteCount += sizeof(bool); // RequiresResponse
Expand Down Expand Up @@ -137,6 +143,12 @@ private int EncodeProperties(Span<byte> buffer)
offset += ArtemisBinaryConverter.WriteSimpleString(ref buffer.GetOffset(offset), MessageHeaders.RoutingType);
offset += ArtemisBinaryConverter.WriteNullableObject(ref buffer.GetOffset(offset), (byte) RoutingType.Value);
}

if (Message.GroupId != null)
{
offset += ArtemisBinaryConverter.WriteSimpleString(ref buffer.GetOffset(offset), MessageHeaders.GroupId);
offset += ArtemisBinaryConverter.WriteSimpleString(ref buffer.GetOffset(offset), Message.GroupId);
}
}
else
{
Expand Down
6 changes: 6 additions & 0 deletions src/ArtemisNetCoreClient/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,10 @@ public class Message
/// The message body (payload)
/// </summary>
public ReadOnlyMemory<byte> Body { get; set; }

/// <summary>
/// The Group ID used when sending the message. This is used for message grouping feature. Messages with the same message group
/// are always consumed by the same consumer, even if multiple consumers are listening on the same queue.
/// </summary>
public string? GroupId { get; set; }
}
1 change: 1 addition & 0 deletions src/ArtemisNetCoreClient/MessageHeaders.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ namespace ActiveMQ.Artemis.Core.Client;
internal static class MessageHeaders
{
public const string RoutingType = "_AMQ_ROUTING_TYPE";
public const string GroupId = "_AMQ_GROUP_ID";
}
5 changes: 5 additions & 0 deletions src/ArtemisNetCoreClient/ReceivedMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,9 @@ public class ReceivedMessage
/// The routing type used when sending the message.
/// </summary>
public required RoutingType? RoutingType { get; init; }

/// <summary>
/// The Group ID used when sending the message.
/// </summary>
public required string? GroupId { get; init; }
}
68 changes: 68 additions & 0 deletions test/ArtemisNetCoreClient.Tests/MessageGroupingSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
using ActiveMQ.Artemis.Core.Client.Tests.Utils;
using Xunit;
using Xunit.Abstractions;

namespace ActiveMQ.Artemis.Core.Client.Tests;

public class MessageGroupingSpec(ITestOutputHelper testOutputHelper)
{
[Fact]
public async Task Should_deliver_messages_with_the_same_GroupId_to_the_same_consumer()
{
await using var testFixture = await TestFixture.CreateAsync(testOutputHelper);
await using var connection = await testFixture.CreateConnectionAsync();
await using var session = await connection.CreateSessionAsync();

var addressName = await testFixture.CreateAddressAsync(RoutingType.Anycast);
var queueName = await testFixture.CreateQueueAsync(addressName);

await using var producer = await session.CreateProducerAsync(new ProducerConfiguration
{
Address = addressName
});

await using var consumer1 = await session.CreateConsumerAsync(new ConsumerConfiguration
{
QueueName = queueName
});
await using var consumer2 = await session.CreateConsumerAsync(new ConsumerConfiguration
{
QueueName = queueName
});
await using var consumer3 = await session.CreateConsumerAsync(new ConsumerConfiguration
{
QueueName = queueName
});

await SendMessagesToGroup(producer, "group1", 5);
await SendMessagesToGroup(producer, "group2", 5);
await SendMessagesToGroup(producer, "group3", 5);

await AssertReceivedAllMessagesWithTheSameGroupId(consumer1, 5);
await AssertReceivedAllMessagesWithTheSameGroupId(consumer2, 5);
await AssertReceivedAllMessagesWithTheSameGroupId(consumer3, 5);
}

private static async Task SendMessagesToGroup(IProducer producer, string groupId, int count)
{
for (int i = 1; i <= count; i++)
{
await producer.SendMessageAsync(new Message
{
GroupId = groupId,
});
}
}

private async Task AssertReceivedAllMessagesWithTheSameGroupId(IConsumer consumer, int count)
{
var messages = new List<ReceivedMessage>();
for (int i = 1; i <= count; i++)
{
var message = await consumer.ReceiveMessageAsync();
messages.Add(message);
}

Assert.Single(messages.GroupBy(x => x.GroupId));
}
}
Loading