From 1473c2caeb68aeb004b1e290951d539a7ef1352f Mon Sep 17 00:00:00 2001 From: Havret Date: Thu, 2 May 2024 20:48:21 +0200 Subject: [PATCH] Support Message Properties --- src/ArtemisNetCoreClient/Consumer.cs | 2 +- .../Exceptions/ActiveMQException.cs | 9 ++ .../Framing/ActiveMQExceptionMessage.cs | 28 ++++--- src/ArtemisNetCoreClient/Framing/Message.cs | 2 +- .../Framing/SessionReceiveMessage.cs | 14 +++- .../Framing/SessionSendMessage.cs | 32 +++++++- src/ArtemisNetCoreClient/IConsumer.cs | 2 +- src/ArtemisNetCoreClient/PacketType.cs | 1 + src/ArtemisNetCoreClient/Session.cs | 14 ++++ .../ConsumerSpec.cs | 2 +- .../MessagePropertiesSpec.cs | 82 +++++++++++++++++++ 11 files changed, 165 insertions(+), 23 deletions(-) create mode 100644 src/ArtemisNetCoreClient/Exceptions/ActiveMQException.cs create mode 100644 test/ArtemisNetCoreClient.Tests/MessagePropertiesSpec.cs diff --git a/src/ArtemisNetCoreClient/Consumer.cs b/src/ArtemisNetCoreClient/Consumer.cs index a6e9f25..4c9a3c1 100644 --- a/src/ArtemisNetCoreClient/Consumer.cs +++ b/src/ArtemisNetCoreClient/Consumer.cs @@ -32,7 +32,7 @@ public async ValueTask DisposeAsync() await _session.CloseConsumer(ConsumerId); } - public async ValueTask ReceiveAsync(CancellationToken cancellationToken) + public async ValueTask ReceiveMessageAsync(CancellationToken cancellationToken) { return await _reader.ReadAsync(cancellationToken); } diff --git a/src/ArtemisNetCoreClient/Exceptions/ActiveMQException.cs b/src/ArtemisNetCoreClient/Exceptions/ActiveMQException.cs new file mode 100644 index 0000000..39ad7ab --- /dev/null +++ b/src/ArtemisNetCoreClient/Exceptions/ActiveMQException.cs @@ -0,0 +1,9 @@ +namespace ActiveMQ.Artemis.Core.Client.Exceptions; + +public class ActiveMQException : Exception +{ + public ActiveMQException(int code, string message) : base(message) + { + // TODO: Handle Exception Code as Exception Type + } +} \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/Framing/ActiveMQExceptionMessage.cs b/src/ArtemisNetCoreClient/Framing/ActiveMQExceptionMessage.cs index 1652daf..5ebe15a 100644 --- a/src/ArtemisNetCoreClient/Framing/ActiveMQExceptionMessage.cs +++ b/src/ArtemisNetCoreClient/Framing/ActiveMQExceptionMessage.cs @@ -1,19 +1,23 @@ +using System.Diagnostics; + namespace ActiveMQ.Artemis.Core.Client.Framing; -internal class ActiveMQExceptionMessage : Packet +internal readonly struct ActiveMQExceptionMessage : IIncomingPacket { - public const byte Type = 20; - - public int Code { get; private set; } - public string? Message { get; set; } + public readonly int Code; + public readonly string? Message; + public readonly long CorrelationId; - public virtual void Encode(ByteBuffer buffer) - { - } - - public virtual void Decode(ByteBuffer buffer) + public ActiveMQExceptionMessage(ReadOnlySpan buffer) { - Code = buffer.ReadInt(); - Message = buffer.ReadNullableString(); + var readBytes = 0; + readBytes += ArtemisBinaryConverter.ReadInt32(buffer, out Code); + readBytes += ArtemisBinaryConverter.ReadNullableString(buffer[readBytes..], out Message); + if (buffer.Length - readBytes >= sizeof(long)) + { + readBytes += ArtemisBinaryConverter.ReadInt64(buffer[readBytes..], out CorrelationId); + } + + Debug.Assert(readBytes == buffer.Length, $"Expected to read {buffer.Length} bytes but got {readBytes}"); } } \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/Framing/Message.cs b/src/ArtemisNetCoreClient/Framing/Message.cs index 3ec79be..4520d06 100644 --- a/src/ArtemisNetCoreClient/Framing/Message.cs +++ b/src/ArtemisNetCoreClient/Framing/Message.cs @@ -10,7 +10,7 @@ public class Message /// /// The message properties /// - public IDictionary Properties { get; set; } + public IDictionary Properties { get; set; } /// /// The message body (payload) diff --git a/src/ArtemisNetCoreClient/Framing/SessionReceiveMessage.cs b/src/ArtemisNetCoreClient/Framing/SessionReceiveMessage.cs index ee2dc15..1046b6a 100644 --- a/src/ArtemisNetCoreClient/Framing/SessionReceiveMessage.cs +++ b/src/ArtemisNetCoreClient/Framing/SessionReceiveMessage.cs @@ -71,17 +71,25 @@ private static int DecodeHeaders(ReadOnlySpan buffer, out Headers value) } [MethodImpl(MethodImplOptions.AggressiveInlining)] - private static int DecodeProperties(ReadOnlySpan buffer, out IDictionary value) + private static int DecodeProperties(ReadOnlySpan buffer, out IDictionary value) { var readBytes = ArtemisBinaryConverter.ReadByte(buffer, out var isNotNull); if (isNotNull == DataConstants.NotNull) { - value = new Dictionary(); + readBytes += ArtemisBinaryConverter.ReadInt32(buffer[readBytes..], out var count); + value = new Dictionary(count); + for (var i = 0; i < count; i++) + { + readBytes += ArtemisBinaryConverter.ReadSimpleString(buffer[readBytes..], out var key); + readBytes += ArtemisBinaryConverter.ReadNullableObject(buffer[readBytes..], out var obj); + value.Add(key, obj); + } + return readBytes; } else { - value = ReadOnlyDictionary.Empty; + value = ReadOnlyDictionary.Empty; return readBytes; } } diff --git a/src/ArtemisNetCoreClient/Framing/SessionSendMessage.cs b/src/ArtemisNetCoreClient/Framing/SessionSendMessage.cs index 655c83f..9188df2 100644 --- a/src/ArtemisNetCoreClient/Framing/SessionSendMessage.cs +++ b/src/ArtemisNetCoreClient/Framing/SessionSendMessage.cs @@ -27,7 +27,18 @@ public int GetRequiredBufferSize() byteCount += sizeof(long); // Expiration byteCount += sizeof(long); // Timestamp byteCount += sizeof(byte); // Priority - byteCount += sizeof(byte); // Properties size nullability + + byteCount += sizeof(byte); // Properties nullability + if (Message.Properties.Count > 0) + { + byteCount += sizeof(int); // Properties count + foreach (var (key, value) in Message.Properties) + { + byteCount += ArtemisBinaryConverter.GetSimpleStringByteCount(key); + byteCount += ArtemisBinaryConverter.GetNullableObjectByteCount(value); + } + } + byteCount += sizeof(bool); // RequiresResponse byteCount += sizeof(long); // CorrelationId byteCount += sizeof(int); // ProducerId @@ -85,11 +96,24 @@ private int EncodeHeaders(Span buffer) } [MethodImpl(MethodImplOptions.AggressiveInlining)] - private static int EncodeProperties(Span buffer) + private int EncodeProperties(Span buffer) { var offset = 0; - - offset += ArtemisBinaryConverter.WriteByte(ref buffer.GetReference(), DataConstants.Null); + + if (Message.Properties.Count == 0) + { + offset += ArtemisBinaryConverter.WriteByte(ref buffer.GetReference(), DataConstants.Null); + } + else + { + offset += ArtemisBinaryConverter.WriteByte(ref buffer.GetReference(), DataConstants.NotNull); + offset += ArtemisBinaryConverter.WriteInt32(ref buffer.GetOffset(offset), Message.Properties.Count); + foreach (var (key, value) in Message.Properties) + { + offset += ArtemisBinaryConverter.WriteSimpleString(ref buffer.GetOffset(offset), key); + offset += ArtemisBinaryConverter.WriteNullableObject(ref buffer.GetOffset(offset), value); + } + } return offset; } diff --git a/src/ArtemisNetCoreClient/IConsumer.cs b/src/ArtemisNetCoreClient/IConsumer.cs index 672951e..b8892f4 100644 --- a/src/ArtemisNetCoreClient/IConsumer.cs +++ b/src/ArtemisNetCoreClient/IConsumer.cs @@ -4,5 +4,5 @@ namespace ActiveMQ.Artemis.Core.Client; public interface IConsumer : IAsyncDisposable { - ValueTask ReceiveAsync(CancellationToken cancellationToken); + ValueTask ReceiveMessageAsync(CancellationToken cancellationToken); } \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/PacketType.cs b/src/ArtemisNetCoreClient/PacketType.cs index 360dff4..11acb80 100644 --- a/src/ArtemisNetCoreClient/PacketType.cs +++ b/src/ArtemisNetCoreClient/PacketType.cs @@ -21,4 +21,5 @@ internal enum PacketType : byte SessionSendMessage = 71, SessionConsumerFlowCreditMessage = 70, SessionReceiveMessage = 75, + Exception = 20, } \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/Session.cs b/src/ArtemisNetCoreClient/Session.cs index 2cf32d7..8b6cbb1 100644 --- a/src/ArtemisNetCoreClient/Session.cs +++ b/src/ArtemisNetCoreClient/Session.cs @@ -1,4 +1,5 @@ using System.Collections.Concurrent; +using ActiveMQ.Artemis.Core.Client.Exceptions; using ActiveMQ.Artemis.Core.Client.Framing; using Microsoft.Extensions.Logging; @@ -416,6 +417,19 @@ public void OnPacket(in InboundPacket packet) break; } + case PacketType.Exception: + { + var message = new ActiveMQExceptionMessage(packet.Payload); + if (_completionSources.TryRemove(message.CorrelationId, out var tcs)) + { + tcs.TrySetException(new ActiveMQException(message.Code, message.Message ?? "ActiveMQ Exception with no message received")); + } + else + { + _logger.LogError("Received exception message with code {Code} and message {Message}", message.Code, message.Message); + } + break; + } default: { _logger.LogWarning("Received unexpected packet type {PacketType}", packet.PacketType); diff --git a/test/ArtemisNetCoreClient.Tests/ConsumerSpec.cs b/test/ArtemisNetCoreClient.Tests/ConsumerSpec.cs index 5823e79..986e905 100644 --- a/test/ArtemisNetCoreClient.Tests/ConsumerSpec.cs +++ b/test/ArtemisNetCoreClient.Tests/ConsumerSpec.cs @@ -47,7 +47,7 @@ await producer.SendMessageAsync(new Message }, testFixture.CancellationToken); // Act - var message = await consumer.ReceiveAsync(testFixture.CancellationToken); + var message = await consumer.ReceiveMessageAsync(testFixture.CancellationToken); // Assert Assert.Equal("test_payload"u8.ToArray(), message.Body.ToArray()); diff --git a/test/ArtemisNetCoreClient.Tests/MessagePropertiesSpec.cs b/test/ArtemisNetCoreClient.Tests/MessagePropertiesSpec.cs new file mode 100644 index 0000000..7acabf9 --- /dev/null +++ b/test/ArtemisNetCoreClient.Tests/MessagePropertiesSpec.cs @@ -0,0 +1,82 @@ +using ActiveMQ.Artemis.Core.Client.Framing; +using ActiveMQ.Artemis.Core.Client.Tests.Utils; +using Xunit; +using Xunit.Abstractions; + +namespace ActiveMQ.Artemis.Core.Client.Tests; + +public class MessagePropertiesSpec(ITestOutputHelper testOutputHelper) +{ + [Fact] + public async Task Should_send_and_receive_message_with_properties() + { + // Arrange + await using var testFixture = await TestFixture.CreateAsync(testOutputHelper); + await using var connection = await testFixture.CreateConnectionAsync(); + await using var session = await connection.CreateSessionAsync(); + + var addressName = Guid.NewGuid().ToString(); + await session.CreateAddressAsync(addressName, new [] { RoutingType.Multicast }, testFixture.CancellationToken); + await using var producer = await session.CreateProducerAsync(new ProducerConfiguration + { + Address = addressName + }, testFixture.CancellationToken); + // create queue + var queueName = Guid.NewGuid().ToString(); + await session.CreateQueueAsync(new QueueConfiguration + { + Address = addressName, + RoutingType = RoutingType.Multicast, + Name = queueName + }, testFixture.CancellationToken); + + await using var consumer = await session.CreateConsumerAsync(new ConsumerConfiguration + { + QueueName = queueName, + }, testFixture.CancellationToken); + + // Act + await producer.SendMessageAsync(new Message + { + Body = "test_payload"u8.ToArray(), + Headers = new Headers + { + Address = addressName + }, + Properties = new Dictionary + { + ["null_property"] = null, + ["bool_property_true"] = true, + ["bool_property_false"] = false, + ["byte_property"] = (byte)42, + ["bytes_property"] = new byte[] { 1, 2, 3 }, + ["short_property"] = (short)42, + ["int_property"] = 43, + ["long_property"] = 44L, + ["float_property"] = 45.1F, + ["double_property"] = 46.2D, + ["string_property"] = "string_value", + ["char_property"] = 'c', + } + }, testFixture.CancellationToken); + + + var receivedMessage = await consumer.ReceiveMessageAsync(testFixture.CancellationToken); + + // Assert + Assert.NotNull(receivedMessage); + Assert.Equal(12, receivedMessage.Properties.Count); + Assert.Null(receivedMessage.Properties["null_property"]); + Assert.True((bool)receivedMessage.Properties["bool_property_true"]!); + Assert.False((bool)receivedMessage.Properties["bool_property_false"]!); + Assert.Equal((byte)42, (byte)receivedMessage.Properties["byte_property"]!); + Assert.Equal([1, 2, 3], (byte[])receivedMessage.Properties["bytes_property"]!); + Assert.Equal((short)42, (short)receivedMessage.Properties["short_property"]!); + Assert.Equal(43, (int)receivedMessage.Properties["int_property"]!); + Assert.Equal(44L, (long)receivedMessage.Properties["long_property"]!); + Assert.Equal(45.1F, (float)receivedMessage.Properties["float_property"]!); + Assert.Equal(46.2D, (double)receivedMessage.Properties["double_property"]!); + Assert.Equal("string_value", (string)receivedMessage.Properties["string_property"]!); + Assert.Equal('c', (char)receivedMessage.Properties["char_property"]!); + } +} \ No newline at end of file