From 1b99c2517dfda50c94b58dc0ac9cfb3dad43056e Mon Sep 17 00:00:00 2001 From: Havret Date: Thu, 28 Mar 2024 23:15:45 +0100 Subject: [PATCH] Create Consumer --- ArtemisNetCoreClient.sln.DotSettings | 1 + src/ArtemisNetCoreClient/Consumer.cs | 17 ++++++++++ .../Framing/ActiveMQExceptionMessage.cs | 19 +++++++++++ src/ArtemisNetCoreClient/Framing/Codec.cs | 3 ++ .../Framing/SessionConsumerCloseMessage.cs | 17 ++++++++++ .../Framing/SessionCreateConsumerMessage.cs | 28 ++++++++++++++++ src/ArtemisNetCoreClient/IConsumer.cs | 5 +++ src/ArtemisNetCoreClient/ISession.cs | 6 ++++ src/ArtemisNetCoreClient/Session.cs | 22 +++++++++++-- .../SessionTests.cs | 32 +++++++++++++++++++ 10 files changed, 148 insertions(+), 2 deletions(-) create mode 100644 src/ArtemisNetCoreClient/Consumer.cs create mode 100644 src/ArtemisNetCoreClient/Framing/ActiveMQExceptionMessage.cs create mode 100644 src/ArtemisNetCoreClient/Framing/SessionConsumerCloseMessage.cs create mode 100644 src/ArtemisNetCoreClient/Framing/SessionCreateConsumerMessage.cs create mode 100644 src/ArtemisNetCoreClient/IConsumer.cs diff --git a/ArtemisNetCoreClient.sln.DotSettings b/ArtemisNetCoreClient.sln.DotSettings index d469c32..39ad036 100644 --- a/ArtemisNetCoreClient.sln.DotSettings +++ b/ArtemisNetCoreClient.sln.DotSettings @@ -1,2 +1,3 @@  + MQ True \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/Consumer.cs b/src/ArtemisNetCoreClient/Consumer.cs new file mode 100644 index 0000000..52ae74a --- /dev/null +++ b/src/ArtemisNetCoreClient/Consumer.cs @@ -0,0 +1,17 @@ +using ActiveMQ.Artemis.Core.Client.Framing; + +namespace ActiveMQ.Artemis.Core.Client; + +internal class Consumer(Session session) : IConsumer +{ + public required long ConsumerId { get; init; } + + public async ValueTask DisposeAsync() + { + var request = new SessionConsumerCloseMessage + { + ConsumerId = ConsumerId + }; + _ = await session.SendBlockingAsync(request, default); + } +} \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/Framing/ActiveMQExceptionMessage.cs b/src/ArtemisNetCoreClient/Framing/ActiveMQExceptionMessage.cs new file mode 100644 index 0000000..8fa16d0 --- /dev/null +++ b/src/ArtemisNetCoreClient/Framing/ActiveMQExceptionMessage.cs @@ -0,0 +1,19 @@ +namespace ActiveMQ.Artemis.Core.Client.Framing; + +internal class ActiveMQExceptionMessage : Packet +{ + public const byte Type = 20; + + public int Code { get; private set; } + public string? Message { get; set; } + + public override void Encode(ByteBuffer buffer) + { + } + + public override void Decode(ByteBuffer buffer) + { + Code = buffer.ReadInt(); + Message = buffer.ReadNullableString(); + } +} \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/Framing/Codec.cs b/src/ArtemisNetCoreClient/Framing/Codec.cs index 81f41c8..bfd7c65 100644 --- a/src/ArtemisNetCoreClient/Framing/Codec.cs +++ b/src/ArtemisNetCoreClient/Framing/Codec.cs @@ -16,6 +16,8 @@ public static void Encode(ByteBuffer buffer, Packet packet, long channelId) SessionBindingQueryMessage => SessionBindingQueryMessage.Type, CreateQueueMessageV2 => CreateQueueMessageV2.Type, SessionQueueQueryMessage => SessionQueueQueryMessage.Type, + SessionCreateConsumerMessage => SessionCreateConsumerMessage.Type, + SessionConsumerCloseMessage => SessionConsumerCloseMessage.Type, _ => throw new ArgumentOutOfRangeException(nameof(packet), packet, $"{packet.GetType()} is not supported for encoding") }; buffer.WriteByte(type); @@ -37,6 +39,7 @@ public static (Packet packet, long channelId) Decode(ByteBuffer buffer) NullResponse.Type => new NullResponse(), SessionBindingQueryResponseMessageV5.Type => new SessionBindingQueryResponseMessageV5(), SessionQueueQueryResponseMessageV3.Type => new SessionQueueQueryResponseMessageV3(), + ActiveMQExceptionMessage.Type => new ActiveMQExceptionMessage(), _ => throw new ArgumentOutOfRangeException($"Type {type} is not supported for decoding") }; diff --git a/src/ArtemisNetCoreClient/Framing/SessionConsumerCloseMessage.cs b/src/ArtemisNetCoreClient/Framing/SessionConsumerCloseMessage.cs new file mode 100644 index 0000000..7772bcd --- /dev/null +++ b/src/ArtemisNetCoreClient/Framing/SessionConsumerCloseMessage.cs @@ -0,0 +1,17 @@ +namespace ActiveMQ.Artemis.Core.Client.Framing; + +internal class SessionConsumerCloseMessage : Packet +{ + public const byte Type = 74; + + public long ConsumerId { get; set; } + + public override void Encode(ByteBuffer buffer) + { + buffer.WriteLong(ConsumerId); + } + + public override void Decode(ByteBuffer buffer) + { + } +} \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/Framing/SessionCreateConsumerMessage.cs b/src/ArtemisNetCoreClient/Framing/SessionCreateConsumerMessage.cs new file mode 100644 index 0000000..345afc5 --- /dev/null +++ b/src/ArtemisNetCoreClient/Framing/SessionCreateConsumerMessage.cs @@ -0,0 +1,28 @@ +namespace ActiveMQ.Artemis.Core.Client.Framing; + +internal class SessionCreateConsumerMessage : Packet +{ + public const byte Type = 40; + + public required long Id { get; init; } + public required string QueueName { get; init; } + public string? FilterString { get; init; } + public required int Priority { get; init; } + public required bool BrowseOnly { get; init; } + public required bool RequiresResponse { get; init; } + + + public override void Encode(ByteBuffer buffer) + { + buffer.WriteLong(Id); + buffer.WriteByteString(QueueName); + buffer.WriteNullableByteString(FilterString); + buffer.WriteBool(BrowseOnly); + buffer.WriteBool(RequiresResponse); + buffer.WriteInt(Priority); + } + + public override void Decode(ByteBuffer buffer) + { + } +} \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/IConsumer.cs b/src/ArtemisNetCoreClient/IConsumer.cs new file mode 100644 index 0000000..8f7fb03 --- /dev/null +++ b/src/ArtemisNetCoreClient/IConsumer.cs @@ -0,0 +1,5 @@ +namespace ActiveMQ.Artemis.Core.Client; + +public interface IConsumer : IAsyncDisposable +{ +} \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/ISession.cs b/src/ArtemisNetCoreClient/ISession.cs index 8039824..bf59027 100644 --- a/src/ArtemisNetCoreClient/ISession.cs +++ b/src/ArtemisNetCoreClient/ISession.cs @@ -8,4 +8,10 @@ public interface ISession : IAsyncDisposable Task GetAddressInfo(string address, CancellationToken cancellationToken); Task CreateQueue(QueueConfiguration queueConfiguration, CancellationToken cancellationToken); Task GetQueueInfo(string queueName, CancellationToken cancellationToken); + Task CreateConsumerAsync(ConsumerConfiguration consumerConfiguration, CancellationToken cancellationToken); +} + +public class ConsumerConfiguration +{ + public required string QueueName { get; init; } } \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/Session.cs b/src/ArtemisNetCoreClient/Session.cs index ce0f5e9..8ff58ac 100644 --- a/src/ArtemisNetCoreClient/Session.cs +++ b/src/ArtemisNetCoreClient/Session.cs @@ -92,7 +92,8 @@ public async Task CreateQueue(QueueConfiguration queueConfiguration, Cancellatio RequiresResponse = true, Address = queueConfiguration.Address, QueueName = queueConfiguration.Name, - RoutingType = queueConfiguration.RoutingType + RoutingType = queueConfiguration.RoutingType, + MaxConsumers = -1 }; _ = await SendBlockingAsync(createQueueMessage, cancellationToken); } @@ -118,6 +119,23 @@ public async Task CreateQueue(QueueConfiguration queueConfiguration, Cancellatio return null; } + public async Task CreateConsumerAsync(ConsumerConfiguration consumerConfiguration, CancellationToken cancellationToken) + { + var request = new SessionCreateConsumerMessage + { + Id = 0, + QueueName = consumerConfiguration.QueueName, + Priority = 0, + BrowseOnly = false, + RequiresResponse = true + }; + _ = await SendBlockingAsync(request, cancellationToken); + return new Consumer(this) + { + ConsumerId = request.Id + }; + } + public async ValueTask DisposeAsync() { _ = await SendBlockingAsync(new SessionStop(), default); @@ -125,7 +143,7 @@ public async ValueTask DisposeAsync() await _transport.DisposeAsync().ConfigureAwait(false); } - private async Task SendBlockingAsync(TRequest request, CancellationToken cancellationToken) where TRequest : Packet + internal async Task SendBlockingAsync(TRequest request, CancellationToken cancellationToken) where TRequest : Packet { var tcs = new TaskCompletionSource(); diff --git a/test/ArtemisNetCoreClient.Tests/SessionTests.cs b/test/ArtemisNetCoreClient.Tests/SessionTests.cs index ff1200a..a3ea94e 100644 --- a/test/ArtemisNetCoreClient.Tests/SessionTests.cs +++ b/test/ArtemisNetCoreClient.Tests/SessionTests.cs @@ -123,4 +123,36 @@ public async Task should_not_return_queue_info_when_queue_does_not_exist() // Assert Assert.That(queueInfo, Is.Null); } + + [Test] + public async Task should_create_and_dispose_consumer() + { + // Arrange + var connectionFactory = new SessionFactory(); + await using var session = await connectionFactory.CreateAsync(new Endpoint + { + Host = "localhost", + Port = 5445, + User = "artemis", + Password = "artemis" + }); + var addressName = $"{Guid.NewGuid().ToString()}"; + await session.CreateAddress(addressName, new [] { RoutingType.Multicast }, default); + + var queueName = Guid.NewGuid().ToString(); + await session.CreateQueue(new QueueConfiguration + { + Address = addressName, + Name = queueName, + RoutingType = RoutingType.Multicast + }, default); + + // Act + var consumer = await session.CreateConsumerAsync(new ConsumerConfiguration + { + QueueName = queueName + }, default); + + await consumer.DisposeAsync(); + } } \ No newline at end of file