Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Message Properties #63

Merged
merged 1 commit into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/ArtemisNetCoreClient/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public async ValueTask DisposeAsync()
await _session.CloseConsumer(ConsumerId);
}

public async ValueTask<Message> ReceiveAsync(CancellationToken cancellationToken)
public async ValueTask<Message> ReceiveMessageAsync(CancellationToken cancellationToken)
{
return await _reader.ReadAsync(cancellationToken);
}
Expand Down
9 changes: 9 additions & 0 deletions src/ArtemisNetCoreClient/Exceptions/ActiveMQException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace ActiveMQ.Artemis.Core.Client.Exceptions;

public class ActiveMQException : Exception
{
public ActiveMQException(int code, string message) : base(message)
{
// TODO: Handle Exception Code as Exception Type
}
}
28 changes: 16 additions & 12 deletions src/ArtemisNetCoreClient/Framing/ActiveMQExceptionMessage.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
using System.Diagnostics;

namespace ActiveMQ.Artemis.Core.Client.Framing;

internal class ActiveMQExceptionMessage : Packet
internal readonly struct ActiveMQExceptionMessage : IIncomingPacket
{
public const byte Type = 20;

public int Code { get; private set; }
public string? Message { get; set; }
public readonly int Code;
public readonly string? Message;
public readonly long CorrelationId;

public virtual void Encode(ByteBuffer buffer)
{
}

public virtual void Decode(ByteBuffer buffer)
public ActiveMQExceptionMessage(ReadOnlySpan<byte> buffer)
{
Code = buffer.ReadInt();
Message = buffer.ReadNullableString();
var readBytes = 0;
readBytes += ArtemisBinaryConverter.ReadInt32(buffer, out Code);
readBytes += ArtemisBinaryConverter.ReadNullableString(buffer[readBytes..], out Message);
if (buffer.Length - readBytes >= sizeof(long))
{
readBytes += ArtemisBinaryConverter.ReadInt64(buffer[readBytes..], out CorrelationId);
}

Debug.Assert(readBytes == buffer.Length, $"Expected to read {buffer.Length} bytes but got {readBytes}");
}
}
2 changes: 1 addition & 1 deletion src/ArtemisNetCoreClient/Framing/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
/// <summary>
/// The message properties
/// </summary>
public IDictionary<string, object> Properties { get; set; }
public IDictionary<string, object?> Properties { get; set; }

Check warning on line 13 in src/ArtemisNetCoreClient/Framing/Message.cs

View workflow job for this annotation

GitHub Actions / linux

Non-nullable property 'Properties' must contain a non-null value when exiting constructor. Consider declaring the property as nullable.

Check warning on line 13 in src/ArtemisNetCoreClient/Framing/Message.cs

View workflow job for this annotation

GitHub Actions / linux

Non-nullable property 'Properties' must contain a non-null value when exiting constructor. Consider declaring the property as nullable.

/// <summary>
/// The message body (payload)
Expand Down
14 changes: 11 additions & 3 deletions src/ArtemisNetCoreClient/Framing/SessionReceiveMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,25 @@ private static int DecodeHeaders(ReadOnlySpan<byte> buffer, out Headers value)
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static int DecodeProperties(ReadOnlySpan<byte> buffer, out IDictionary<string, object> value)
private static int DecodeProperties(ReadOnlySpan<byte> buffer, out IDictionary<string, object?> value)
{
var readBytes = ArtemisBinaryConverter.ReadByte(buffer, out var isNotNull);
if (isNotNull == DataConstants.NotNull)
{
value = new Dictionary<string, object>();
readBytes += ArtemisBinaryConverter.ReadInt32(buffer[readBytes..], out var count);
value = new Dictionary<string, object?>(count);
for (var i = 0; i < count; i++)
{
readBytes += ArtemisBinaryConverter.ReadSimpleString(buffer[readBytes..], out var key);
readBytes += ArtemisBinaryConverter.ReadNullableObject(buffer[readBytes..], out var obj);
value.Add(key, obj);
}

return readBytes;
}
else
{
value = ReadOnlyDictionary<string, object>.Empty;
value = ReadOnlyDictionary<string, object?>.Empty;
return readBytes;
}
}
Expand Down
34 changes: 29 additions & 5 deletions src/ArtemisNetCoreClient/Framing/SessionSendMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,18 @@ public int GetRequiredBufferSize()
byteCount += sizeof(long); // Expiration
byteCount += sizeof(long); // Timestamp
byteCount += sizeof(byte); // Priority
byteCount += sizeof(byte); // Properties size nullability

byteCount += sizeof(byte); // Properties nullability
if (Message.Properties?.Count > 0)
{
byteCount += sizeof(int); // Properties count
foreach (var (key, value) in Message.Properties)
{
byteCount += ArtemisBinaryConverter.GetSimpleStringByteCount(key);
byteCount += ArtemisBinaryConverter.GetNullableObjectByteCount(value);
}
}

byteCount += sizeof(bool); // RequiresResponse
byteCount += sizeof(long); // CorrelationId
byteCount += sizeof(int); // ProducerId
Expand Down Expand Up @@ -85,12 +96,25 @@ private int EncodeHeaders(Span<byte> buffer)
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static int EncodeProperties(Span<byte> buffer)
private int EncodeProperties(Span<byte> buffer)
{
var offset = 0;

offset += ArtemisBinaryConverter.WriteByte(ref buffer.GetReference(), DataConstants.Null);


if (Message.Properties?.Count > 0)
{
offset += ArtemisBinaryConverter.WriteByte(ref buffer.GetReference(), DataConstants.NotNull);
offset += ArtemisBinaryConverter.WriteInt32(ref buffer.GetOffset(offset), Message.Properties.Count);
foreach (var (key, value) in Message.Properties)
{
offset += ArtemisBinaryConverter.WriteSimpleString(ref buffer.GetOffset(offset), key);
offset += ArtemisBinaryConverter.WriteNullableObject(ref buffer.GetOffset(offset), value);
}
}
else
{
offset += ArtemisBinaryConverter.WriteByte(ref buffer.GetReference(), DataConstants.Null);
}

return offset;
}
}
2 changes: 1 addition & 1 deletion src/ArtemisNetCoreClient/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ namespace ActiveMQ.Artemis.Core.Client;

public interface IConsumer : IAsyncDisposable
{
ValueTask<Message> ReceiveAsync(CancellationToken cancellationToken);
ValueTask<Message> ReceiveMessageAsync(CancellationToken cancellationToken);
}
1 change: 1 addition & 0 deletions src/ArtemisNetCoreClient/PacketType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ internal enum PacketType : byte
SessionSendMessage = 71,
SessionConsumerFlowCreditMessage = 70,
SessionReceiveMessage = 75,
Exception = 20,
}
14 changes: 14 additions & 0 deletions src/ArtemisNetCoreClient/Session.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Concurrent;
using ActiveMQ.Artemis.Core.Client.Exceptions;
using ActiveMQ.Artemis.Core.Client.Framing;
using Microsoft.Extensions.Logging;

Expand Down Expand Up @@ -416,6 +417,19 @@ public void OnPacket(in InboundPacket packet)

break;
}
case PacketType.Exception:
{
var message = new ActiveMQExceptionMessage(packet.Payload);
if (_completionSources.TryRemove(message.CorrelationId, out var tcs))
{
tcs.TrySetException(new ActiveMQException(message.Code, message.Message ?? "ActiveMQ Exception with no message received"));
}
else
{
_logger.LogError("Received exception message with code {Code} and message {Message}", message.Code, message.Message);
}
break;
}
default:
{
_logger.LogWarning("Received unexpected packet type {PacketType}", packet.PacketType);
Expand Down
2 changes: 1 addition & 1 deletion test/ArtemisNetCoreClient.Tests/ConsumerSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ await producer.SendMessageAsync(new Message
}, testFixture.CancellationToken);

// Act
var message = await consumer.ReceiveAsync(testFixture.CancellationToken);
var message = await consumer.ReceiveMessageAsync(testFixture.CancellationToken);

// Assert
Assert.Equal("test_payload"u8.ToArray(), message.Body.ToArray());
Expand Down
82 changes: 82 additions & 0 deletions test/ArtemisNetCoreClient.Tests/MessagePropertiesSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
using ActiveMQ.Artemis.Core.Client.Framing;
using ActiveMQ.Artemis.Core.Client.Tests.Utils;
using Xunit;
using Xunit.Abstractions;

namespace ActiveMQ.Artemis.Core.Client.Tests;

public class MessagePropertiesSpec(ITestOutputHelper testOutputHelper)
{
[Fact]
public async Task Should_send_and_receive_message_with_properties()
{
// Arrange
await using var testFixture = await TestFixture.CreateAsync(testOutputHelper);
await using var connection = await testFixture.CreateConnectionAsync();
await using var session = await connection.CreateSessionAsync();

var addressName = Guid.NewGuid().ToString();
await session.CreateAddressAsync(addressName, new [] { RoutingType.Multicast }, testFixture.CancellationToken);
await using var producer = await session.CreateProducerAsync(new ProducerConfiguration
{
Address = addressName
}, testFixture.CancellationToken);
// create queue
var queueName = Guid.NewGuid().ToString();
await session.CreateQueueAsync(new QueueConfiguration
{
Address = addressName,
RoutingType = RoutingType.Multicast,
Name = queueName
}, testFixture.CancellationToken);

await using var consumer = await session.CreateConsumerAsync(new ConsumerConfiguration
{
QueueName = queueName,
}, testFixture.CancellationToken);

// Act
await producer.SendMessageAsync(new Message
{
Body = "test_payload"u8.ToArray(),
Headers = new Headers
{
Address = addressName
},
Properties = new Dictionary<string, object?>
{
["null_property"] = null,
["bool_property_true"] = true,
["bool_property_false"] = false,
["byte_property"] = (byte)42,
["bytes_property"] = new byte[] { 1, 2, 3 },
["short_property"] = (short)42,
["int_property"] = 43,
["long_property"] = 44L,
["float_property"] = 45.1F,
["double_property"] = 46.2D,
["string_property"] = "string_value",
["char_property"] = 'c',
}
}, testFixture.CancellationToken);


var receivedMessage = await consumer.ReceiveMessageAsync(testFixture.CancellationToken);

// Assert
Assert.NotNull(receivedMessage);
Assert.Equal(12, receivedMessage.Properties.Count);
Assert.Null(receivedMessage.Properties["null_property"]);
Assert.True((bool)receivedMessage.Properties["bool_property_true"]!);
Assert.False((bool)receivedMessage.Properties["bool_property_false"]!);
Assert.Equal((byte)42, (byte)receivedMessage.Properties["byte_property"]!);
Assert.Equal([1, 2, 3], (byte[])receivedMessage.Properties["bytes_property"]!);
Assert.Equal((short)42, (short)receivedMessage.Properties["short_property"]!);
Assert.Equal(43, (int)receivedMessage.Properties["int_property"]!);
Assert.Equal(44L, (long)receivedMessage.Properties["long_property"]!);
Assert.Equal(45.1F, (float)receivedMessage.Properties["float_property"]!);
Assert.Equal(46.2D, (double)receivedMessage.Properties["double_property"]!);
Assert.Equal("string_value", (string)receivedMessage.Properties["string_property"]!);
Assert.Equal('c', (char)receivedMessage.Properties["char_property"]!);
}
}
Loading