diff --git a/src/ArtemisNetCoreClient/Framing/Codec.cs b/src/ArtemisNetCoreClient/Framing/Codec.cs index 1e2f8c4..29b3437 100644 --- a/src/ArtemisNetCoreClient/Framing/Codec.cs +++ b/src/ArtemisNetCoreClient/Framing/Codec.cs @@ -14,6 +14,7 @@ public static void Encode(ByteBuffer buffer, Packet packet, long channelId) SessionCloseMessage => SessionCloseMessage.Type, CreateAddressMessage => CreateAddressMessage.Type, SessionBindingQueryMessage => SessionBindingQueryMessage.Type, + CreateQueueMessageV2 => CreateQueueMessageV2.Type, _ => throw new ArgumentOutOfRangeException(nameof(packet), packet, $"{packet.GetType()} is not supported for encoding") }; buffer.WriteByte(type); diff --git a/src/ArtemisNetCoreClient/Framing/CreateQueueMessageV2.cs b/src/ArtemisNetCoreClient/Framing/CreateQueueMessageV2.cs new file mode 100644 index 0000000..82aee2b --- /dev/null +++ b/src/ArtemisNetCoreClient/Framing/CreateQueueMessageV2.cs @@ -0,0 +1,66 @@ +namespace ActiveMQ.Artemis.Core.Client.Framing; + +internal class CreateQueueMessageV2 : Packet +{ + public const byte Type = unchecked((byte) -12); + + public required string Address { get; init; } + public required string QueueName { get; init; } + public string? FilterString { get; init; } + public bool Durable { get; init; } + public bool Temporary { get; init; } + public bool RequiresResponse { get; init; } + public bool AutoCreated { get; init; } + public required RoutingType RoutingType { get; init; } + public int MaxConsumers { get; init; } + public bool PurgeOnNoConsumers { get; init; } + public bool? Exclusive { get; init; } + public bool? LastValue { get; init; } + public string? LastValueKey { get; init; } + public bool? NonDestructive { get; init; } + public int? ConsumersBeforeDispatch { get; init; } + public long? DelayBeforeDispatch { get; init; } + public bool? GroupRebalance { get; init; } + public int? GroupBuckets { get; init; } + public bool? AutoDelete { get; init; } + public long? AutoDeleteDelay { get; init; } + public long? AutoDeleteMessageCount { get; init; } + public string? GroupFirstKey { get; init; } + public long? RingSize { get; init; } + public bool? Enabled { get; init; } + public bool? GroupRebalancePauseDispatch { get; init; } + + public override void Encode(ByteBuffer buffer) + { + buffer.WriteAmqString(Address); + buffer.WriteAmqString(QueueName); + buffer.WriteNullableString(FilterString); + buffer.WriteBool(Durable); + buffer.WriteBool(Temporary); + buffer.WriteBool(RequiresResponse); + buffer.WriteBool(AutoCreated); + buffer.WriteByte((byte) RoutingType); + buffer.WriteInt(MaxConsumers); + buffer.WriteBool(PurgeOnNoConsumers); + buffer.WriteNullableBool(Exclusive); + buffer.WriteNullableBool(LastValue); + buffer.WriteNullableAmqString(LastValueKey); + buffer.WriteNullableBool(NonDestructive); + buffer.WriteNullableInt(ConsumersBeforeDispatch); + buffer.WriteNullableLong(DelayBeforeDispatch); + buffer.WriteNullableBool(GroupRebalance); + buffer.WriteNullableInt(GroupBuckets); + buffer.WriteNullableBool(AutoDelete); + buffer.WriteNullableLong(AutoDeleteDelay); + buffer.WriteNullableLong(AutoDeleteMessageCount); + buffer.WriteNullableAmqString(GroupFirstKey); + buffer.WriteNullableLong(RingSize); + buffer.WriteNullableBool(Enabled); + buffer.WriteNullableBool(GroupRebalancePauseDispatch); + } + + public override void Decode(ByteBuffer buffer) + { + throw new NotImplementedException(); + } +} \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/ISession.cs b/src/ArtemisNetCoreClient/ISession.cs index 2b9d256..6ebf67d 100644 --- a/src/ArtemisNetCoreClient/ISession.cs +++ b/src/ArtemisNetCoreClient/ISession.cs @@ -6,4 +6,5 @@ public interface ISession : IAsyncDisposable { Task CreateAddress(string address, IEnumerable routingTypes, CancellationToken cancellationToken); Task GetAddressInfo(string address, CancellationToken cancellationToken); + Task CreateQueue(QueueConfiguration queueConfiguration, CancellationToken cancellationToken); } \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/QueueConfiguration.cs b/src/ArtemisNetCoreClient/QueueConfiguration.cs new file mode 100644 index 0000000..aea4e4f --- /dev/null +++ b/src/ArtemisNetCoreClient/QueueConfiguration.cs @@ -0,0 +1,10 @@ +using ActiveMQ.Artemis.Core.Client.Framing; + +namespace ActiveMQ.Artemis.Core.Client; + +public class QueueConfiguration +{ + public required string Address { get; init; } + public required string Name { get; init; } + public required RoutingType RoutingType { get; init; } +} \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/Session.cs b/src/ArtemisNetCoreClient/Session.cs index b1809c3..9808b9d 100644 --- a/src/ArtemisNetCoreClient/Session.cs +++ b/src/ArtemisNetCoreClient/Session.cs @@ -66,7 +66,7 @@ public async Task GetAddressInfo(string address, CancellationToken RoutingTypes = GetRoutingTypes(response).ToArray(), }; } - + private static IEnumerable GetRoutingTypes(SessionBindingQueryResponseMessageV5 sessionBindingQueryResponseMessageV5) { if (sessionBindingQueryResponseMessageV5.SupportsAnycast) @@ -80,6 +80,18 @@ private static IEnumerable GetRoutingTypes(SessionBindingQueryRespo } } + public async Task CreateQueue(QueueConfiguration queueConfiguration, CancellationToken cancellationToken) + { + var createQueueMessage = new CreateQueueMessageV2 + { + RequiresResponse = true, + Address = queueConfiguration.Address, + QueueName = queueConfiguration.Name, + RoutingType = queueConfiguration.RoutingType + }; + _ = await SendBlockingAsync(createQueueMessage, cancellationToken); + } + public async ValueTask DisposeAsync() { _ = await SendBlockingAsync(new SessionStop(), default); diff --git a/test/ArtemisNetCoreClient.Tests/SessionTests.cs b/test/ArtemisNetCoreClient.Tests/SessionTests.cs index a2caca4..ac3bd08 100644 --- a/test/ArtemisNetCoreClient.Tests/SessionTests.cs +++ b/test/ArtemisNetCoreClient.Tests/SessionTests.cs @@ -48,4 +48,30 @@ public async Task should_create_address_with_selected_routing_type(RoutingType[] Assert.That(addressInfo.Name, Is.EqualTo(addressName)); CollectionAssert.AreEqual(routingTypes, addressInfo.RoutingTypes); } + + [TestCase(RoutingType.Anycast)] + [TestCase(RoutingType.Multicast)] + public async Task should_create_queue_with_selected_routing_type(RoutingType routingType) + { + // Arrange + var connectionFactory = new SessionFactory(); + await using var session = await connectionFactory.CreateAsync(new Endpoint + { + Host = "localhost", + Port = 5445, + User = "artemis", + Password = "artemis" + }); + var addressName = $"{Guid.NewGuid().ToString()}"; + await session.CreateAddress(addressName, new[] { routingType }, default); + + // Act + var queueName = $"{Guid.NewGuid().ToString()}"; + await session.CreateQueue(new QueueConfiguration + { + Address = addressName, + Name = queueName, + RoutingType = routingType + }, default); + } } \ No newline at end of file