Skip to content

Commit

Permalink
Support Message Properties
Browse files Browse the repository at this point in the history
  • Loading branch information
Havret committed May 2, 2024
1 parent f7f5205 commit 1473c2c
Show file tree
Hide file tree
Showing 11 changed files with 165 additions and 23 deletions.
2 changes: 1 addition & 1 deletion src/ArtemisNetCoreClient/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public async ValueTask DisposeAsync()
await _session.CloseConsumer(ConsumerId);
}

public async ValueTask<Message> ReceiveAsync(CancellationToken cancellationToken)
public async ValueTask<Message> ReceiveMessageAsync(CancellationToken cancellationToken)
{
return await _reader.ReadAsync(cancellationToken);
}
Expand Down
9 changes: 9 additions & 0 deletions src/ArtemisNetCoreClient/Exceptions/ActiveMQException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace ActiveMQ.Artemis.Core.Client.Exceptions;

public class ActiveMQException : Exception
{
public ActiveMQException(int code, string message) : base(message)
{
// TODO: Handle Exception Code as Exception Type
}
}
28 changes: 16 additions & 12 deletions src/ArtemisNetCoreClient/Framing/ActiveMQExceptionMessage.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
using System.Diagnostics;

namespace ActiveMQ.Artemis.Core.Client.Framing;

internal class ActiveMQExceptionMessage : Packet
internal readonly struct ActiveMQExceptionMessage : IIncomingPacket
{
public const byte Type = 20;

public int Code { get; private set; }
public string? Message { get; set; }
public readonly int Code;
public readonly string? Message;
public readonly long CorrelationId;

public virtual void Encode(ByteBuffer buffer)
{
}

public virtual void Decode(ByteBuffer buffer)
public ActiveMQExceptionMessage(ReadOnlySpan<byte> buffer)
{
Code = buffer.ReadInt();
Message = buffer.ReadNullableString();
var readBytes = 0;
readBytes += ArtemisBinaryConverter.ReadInt32(buffer, out Code);
readBytes += ArtemisBinaryConverter.ReadNullableString(buffer[readBytes..], out Message);
if (buffer.Length - readBytes >= sizeof(long))
{
readBytes += ArtemisBinaryConverter.ReadInt64(buffer[readBytes..], out CorrelationId);
}

Debug.Assert(readBytes == buffer.Length, $"Expected to read {buffer.Length} bytes but got {readBytes}");
}
}
2 changes: 1 addition & 1 deletion src/ArtemisNetCoreClient/Framing/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public class Message
/// <summary>
/// The message properties
/// </summary>
public IDictionary<string, object> Properties { get; set; }
public IDictionary<string, object?> Properties { get; set; }

Check warning on line 13 in src/ArtemisNetCoreClient/Framing/Message.cs

View workflow job for this annotation

GitHub Actions / linux

Non-nullable property 'Properties' must contain a non-null value when exiting constructor. Consider declaring the property as nullable.

Check warning on line 13 in src/ArtemisNetCoreClient/Framing/Message.cs

View workflow job for this annotation

GitHub Actions / linux

Non-nullable property 'Properties' must contain a non-null value when exiting constructor. Consider declaring the property as nullable.

/// <summary>
/// The message body (payload)
Expand Down
14 changes: 11 additions & 3 deletions src/ArtemisNetCoreClient/Framing/SessionReceiveMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,25 @@ private static int DecodeHeaders(ReadOnlySpan<byte> buffer, out Headers value)
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static int DecodeProperties(ReadOnlySpan<byte> buffer, out IDictionary<string, object> value)
private static int DecodeProperties(ReadOnlySpan<byte> buffer, out IDictionary<string, object?> value)
{
var readBytes = ArtemisBinaryConverter.ReadByte(buffer, out var isNotNull);
if (isNotNull == DataConstants.NotNull)
{
value = new Dictionary<string, object>();
readBytes += ArtemisBinaryConverter.ReadInt32(buffer[readBytes..], out var count);
value = new Dictionary<string, object?>(count);
for (var i = 0; i < count; i++)
{
readBytes += ArtemisBinaryConverter.ReadSimpleString(buffer[readBytes..], out var key);
readBytes += ArtemisBinaryConverter.ReadNullableObject(buffer[readBytes..], out var obj);
value.Add(key, obj);
}

return readBytes;
}
else
{
value = ReadOnlyDictionary<string, object>.Empty;
value = ReadOnlyDictionary<string, object?>.Empty;
return readBytes;
}
}
Expand Down
32 changes: 28 additions & 4 deletions src/ArtemisNetCoreClient/Framing/SessionSendMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,18 @@ public int GetRequiredBufferSize()
byteCount += sizeof(long); // Expiration
byteCount += sizeof(long); // Timestamp
byteCount += sizeof(byte); // Priority
byteCount += sizeof(byte); // Properties size nullability

byteCount += sizeof(byte); // Properties nullability
if (Message.Properties.Count > 0)
{
byteCount += sizeof(int); // Properties count
foreach (var (key, value) in Message.Properties)
{
byteCount += ArtemisBinaryConverter.GetSimpleStringByteCount(key);
byteCount += ArtemisBinaryConverter.GetNullableObjectByteCount(value);
}
}

byteCount += sizeof(bool); // RequiresResponse
byteCount += sizeof(long); // CorrelationId
byteCount += sizeof(int); // ProducerId
Expand Down Expand Up @@ -85,11 +96,24 @@ private int EncodeHeaders(Span<byte> buffer)
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static int EncodeProperties(Span<byte> buffer)
private int EncodeProperties(Span<byte> buffer)
{
var offset = 0;

offset += ArtemisBinaryConverter.WriteByte(ref buffer.GetReference(), DataConstants.Null);

if (Message.Properties.Count == 0)
{
offset += ArtemisBinaryConverter.WriteByte(ref buffer.GetReference(), DataConstants.Null);
}
else
{
offset += ArtemisBinaryConverter.WriteByte(ref buffer.GetReference(), DataConstants.NotNull);
offset += ArtemisBinaryConverter.WriteInt32(ref buffer.GetOffset(offset), Message.Properties.Count);
foreach (var (key, value) in Message.Properties)
{
offset += ArtemisBinaryConverter.WriteSimpleString(ref buffer.GetOffset(offset), key);
offset += ArtemisBinaryConverter.WriteNullableObject(ref buffer.GetOffset(offset), value);
}
}

return offset;
}
Expand Down
2 changes: 1 addition & 1 deletion src/ArtemisNetCoreClient/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ namespace ActiveMQ.Artemis.Core.Client;

public interface IConsumer : IAsyncDisposable
{
ValueTask<Message> ReceiveAsync(CancellationToken cancellationToken);
ValueTask<Message> ReceiveMessageAsync(CancellationToken cancellationToken);
}
1 change: 1 addition & 0 deletions src/ArtemisNetCoreClient/PacketType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ internal enum PacketType : byte
SessionSendMessage = 71,
SessionConsumerFlowCreditMessage = 70,
SessionReceiveMessage = 75,
Exception = 20,
}
14 changes: 14 additions & 0 deletions src/ArtemisNetCoreClient/Session.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Concurrent;
using ActiveMQ.Artemis.Core.Client.Exceptions;
using ActiveMQ.Artemis.Core.Client.Framing;
using Microsoft.Extensions.Logging;

Expand Down Expand Up @@ -416,6 +417,19 @@ public void OnPacket(in InboundPacket packet)

break;
}
case PacketType.Exception:
{
var message = new ActiveMQExceptionMessage(packet.Payload);
if (_completionSources.TryRemove(message.CorrelationId, out var tcs))
{
tcs.TrySetException(new ActiveMQException(message.Code, message.Message ?? "ActiveMQ Exception with no message received"));
}
else
{
_logger.LogError("Received exception message with code {Code} and message {Message}", message.Code, message.Message);
}
break;
}
default:
{
_logger.LogWarning("Received unexpected packet type {PacketType}", packet.PacketType);
Expand Down
2 changes: 1 addition & 1 deletion test/ArtemisNetCoreClient.Tests/ConsumerSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ await producer.SendMessageAsync(new Message
}, testFixture.CancellationToken);

// Act
var message = await consumer.ReceiveAsync(testFixture.CancellationToken);
var message = await consumer.ReceiveMessageAsync(testFixture.CancellationToken);

// Assert
Assert.Equal("test_payload"u8.ToArray(), message.Body.ToArray());
Expand Down
82 changes: 82 additions & 0 deletions test/ArtemisNetCoreClient.Tests/MessagePropertiesSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
using ActiveMQ.Artemis.Core.Client.Framing;
using ActiveMQ.Artemis.Core.Client.Tests.Utils;
using Xunit;
using Xunit.Abstractions;

namespace ActiveMQ.Artemis.Core.Client.Tests;

public class MessagePropertiesSpec(ITestOutputHelper testOutputHelper)
{
[Fact]
public async Task Should_send_and_receive_message_with_properties()
{
// Arrange
await using var testFixture = await TestFixture.CreateAsync(testOutputHelper);
await using var connection = await testFixture.CreateConnectionAsync();
await using var session = await connection.CreateSessionAsync();

var addressName = Guid.NewGuid().ToString();
await session.CreateAddressAsync(addressName, new [] { RoutingType.Multicast }, testFixture.CancellationToken);
await using var producer = await session.CreateProducerAsync(new ProducerConfiguration
{
Address = addressName
}, testFixture.CancellationToken);
// create queue
var queueName = Guid.NewGuid().ToString();
await session.CreateQueueAsync(new QueueConfiguration
{
Address = addressName,
RoutingType = RoutingType.Multicast,
Name = queueName
}, testFixture.CancellationToken);

await using var consumer = await session.CreateConsumerAsync(new ConsumerConfiguration
{
QueueName = queueName,
}, testFixture.CancellationToken);

// Act
await producer.SendMessageAsync(new Message
{
Body = "test_payload"u8.ToArray(),
Headers = new Headers
{
Address = addressName
},
Properties = new Dictionary<string, object?>
{
["null_property"] = null,
["bool_property_true"] = true,
["bool_property_false"] = false,
["byte_property"] = (byte)42,
["bytes_property"] = new byte[] { 1, 2, 3 },
["short_property"] = (short)42,
["int_property"] = 43,
["long_property"] = 44L,
["float_property"] = 45.1F,
["double_property"] = 46.2D,
["string_property"] = "string_value",
["char_property"] = 'c',
}
}, testFixture.CancellationToken);


var receivedMessage = await consumer.ReceiveMessageAsync(testFixture.CancellationToken);

// Assert
Assert.NotNull(receivedMessage);
Assert.Equal(12, receivedMessage.Properties.Count);
Assert.Null(receivedMessage.Properties["null_property"]);
Assert.True((bool)receivedMessage.Properties["bool_property_true"]!);
Assert.False((bool)receivedMessage.Properties["bool_property_false"]!);
Assert.Equal((byte)42, (byte)receivedMessage.Properties["byte_property"]!);
Assert.Equal([1, 2, 3], (byte[])receivedMessage.Properties["bytes_property"]!);
Assert.Equal((short)42, (short)receivedMessage.Properties["short_property"]!);
Assert.Equal(43, (int)receivedMessage.Properties["int_property"]!);
Assert.Equal(44L, (long)receivedMessage.Properties["long_property"]!);
Assert.Equal(45.1F, (float)receivedMessage.Properties["float_property"]!);
Assert.Equal(46.2D, (double)receivedMessage.Properties["double_property"]!);
Assert.Equal("string_value", (string)receivedMessage.Properties["string_property"]!);
Assert.Equal('c', (char)receivedMessage.Properties["char_property"]!);
}
}

0 comments on commit 1473c2c

Please sign in to comment.