diff --git a/ArtemisNetCoreClient.sln.DotSettings b/ArtemisNetCoreClient.sln.DotSettings
index d469c32..39ad036 100644
--- a/ArtemisNetCoreClient.sln.DotSettings
+++ b/ArtemisNetCoreClient.sln.DotSettings
@@ -1,2 +1,3 @@
+ MQ
True
\ No newline at end of file
diff --git a/src/ArtemisNetCoreClient/Consumer.cs b/src/ArtemisNetCoreClient/Consumer.cs
new file mode 100644
index 0000000..52ae74a
--- /dev/null
+++ b/src/ArtemisNetCoreClient/Consumer.cs
@@ -0,0 +1,17 @@
+using ActiveMQ.Artemis.Core.Client.Framing;
+
+namespace ActiveMQ.Artemis.Core.Client;
+
+internal class Consumer(Session session) : IConsumer
+{
+ public required long ConsumerId { get; init; }
+
+ public async ValueTask DisposeAsync()
+ {
+ var request = new SessionConsumerCloseMessage
+ {
+ ConsumerId = ConsumerId
+ };
+ _ = await session.SendBlockingAsync(request, default);
+ }
+}
\ No newline at end of file
diff --git a/src/ArtemisNetCoreClient/Framing/ActiveMQExceptionMessage.cs b/src/ArtemisNetCoreClient/Framing/ActiveMQExceptionMessage.cs
new file mode 100644
index 0000000..8fa16d0
--- /dev/null
+++ b/src/ArtemisNetCoreClient/Framing/ActiveMQExceptionMessage.cs
@@ -0,0 +1,19 @@
+namespace ActiveMQ.Artemis.Core.Client.Framing;
+
+internal class ActiveMQExceptionMessage : Packet
+{
+ public const byte Type = 20;
+
+ public int Code { get; private set; }
+ public string? Message { get; set; }
+
+ public override void Encode(ByteBuffer buffer)
+ {
+ }
+
+ public override void Decode(ByteBuffer buffer)
+ {
+ Code = buffer.ReadInt();
+ Message = buffer.ReadNullableString();
+ }
+}
\ No newline at end of file
diff --git a/src/ArtemisNetCoreClient/Framing/Codec.cs b/src/ArtemisNetCoreClient/Framing/Codec.cs
index 81f41c8..bfd7c65 100644
--- a/src/ArtemisNetCoreClient/Framing/Codec.cs
+++ b/src/ArtemisNetCoreClient/Framing/Codec.cs
@@ -16,6 +16,8 @@ public static void Encode(ByteBuffer buffer, Packet packet, long channelId)
SessionBindingQueryMessage => SessionBindingQueryMessage.Type,
CreateQueueMessageV2 => CreateQueueMessageV2.Type,
SessionQueueQueryMessage => SessionQueueQueryMessage.Type,
+ SessionCreateConsumerMessage => SessionCreateConsumerMessage.Type,
+ SessionConsumerCloseMessage => SessionConsumerCloseMessage.Type,
_ => throw new ArgumentOutOfRangeException(nameof(packet), packet, $"{packet.GetType()} is not supported for encoding")
};
buffer.WriteByte(type);
@@ -37,6 +39,7 @@ public static (Packet packet, long channelId) Decode(ByteBuffer buffer)
NullResponse.Type => new NullResponse(),
SessionBindingQueryResponseMessageV5.Type => new SessionBindingQueryResponseMessageV5(),
SessionQueueQueryResponseMessageV3.Type => new SessionQueueQueryResponseMessageV3(),
+ ActiveMQExceptionMessage.Type => new ActiveMQExceptionMessage(),
_ => throw new ArgumentOutOfRangeException($"Type {type} is not supported for decoding")
};
diff --git a/src/ArtemisNetCoreClient/Framing/SessionConsumerCloseMessage.cs b/src/ArtemisNetCoreClient/Framing/SessionConsumerCloseMessage.cs
new file mode 100644
index 0000000..7772bcd
--- /dev/null
+++ b/src/ArtemisNetCoreClient/Framing/SessionConsumerCloseMessage.cs
@@ -0,0 +1,17 @@
+namespace ActiveMQ.Artemis.Core.Client.Framing;
+
+internal class SessionConsumerCloseMessage : Packet
+{
+ public const byte Type = 74;
+
+ public long ConsumerId { get; set; }
+
+ public override void Encode(ByteBuffer buffer)
+ {
+ buffer.WriteLong(ConsumerId);
+ }
+
+ public override void Decode(ByteBuffer buffer)
+ {
+ }
+}
\ No newline at end of file
diff --git a/src/ArtemisNetCoreClient/Framing/SessionCreateConsumerMessage.cs b/src/ArtemisNetCoreClient/Framing/SessionCreateConsumerMessage.cs
new file mode 100644
index 0000000..345afc5
--- /dev/null
+++ b/src/ArtemisNetCoreClient/Framing/SessionCreateConsumerMessage.cs
@@ -0,0 +1,28 @@
+namespace ActiveMQ.Artemis.Core.Client.Framing;
+
+internal class SessionCreateConsumerMessage : Packet
+{
+ public const byte Type = 40;
+
+ public required long Id { get; init; }
+ public required string QueueName { get; init; }
+ public string? FilterString { get; init; }
+ public required int Priority { get; init; }
+ public required bool BrowseOnly { get; init; }
+ public required bool RequiresResponse { get; init; }
+
+
+ public override void Encode(ByteBuffer buffer)
+ {
+ buffer.WriteLong(Id);
+ buffer.WriteByteString(QueueName);
+ buffer.WriteNullableByteString(FilterString);
+ buffer.WriteBool(BrowseOnly);
+ buffer.WriteBool(RequiresResponse);
+ buffer.WriteInt(Priority);
+ }
+
+ public override void Decode(ByteBuffer buffer)
+ {
+ }
+}
\ No newline at end of file
diff --git a/src/ArtemisNetCoreClient/IConsumer.cs b/src/ArtemisNetCoreClient/IConsumer.cs
new file mode 100644
index 0000000..8f7fb03
--- /dev/null
+++ b/src/ArtemisNetCoreClient/IConsumer.cs
@@ -0,0 +1,5 @@
+namespace ActiveMQ.Artemis.Core.Client;
+
+public interface IConsumer : IAsyncDisposable
+{
+}
\ No newline at end of file
diff --git a/src/ArtemisNetCoreClient/ISession.cs b/src/ArtemisNetCoreClient/ISession.cs
index 8039824..bf59027 100644
--- a/src/ArtemisNetCoreClient/ISession.cs
+++ b/src/ArtemisNetCoreClient/ISession.cs
@@ -8,4 +8,10 @@ public interface ISession : IAsyncDisposable
Task GetAddressInfo(string address, CancellationToken cancellationToken);
Task CreateQueue(QueueConfiguration queueConfiguration, CancellationToken cancellationToken);
Task GetQueueInfo(string queueName, CancellationToken cancellationToken);
+ Task CreateConsumerAsync(ConsumerConfiguration consumerConfiguration, CancellationToken cancellationToken);
+}
+
+public class ConsumerConfiguration
+{
+ public required string QueueName { get; init; }
}
\ No newline at end of file
diff --git a/src/ArtemisNetCoreClient/Session.cs b/src/ArtemisNetCoreClient/Session.cs
index ce0f5e9..8ff58ac 100644
--- a/src/ArtemisNetCoreClient/Session.cs
+++ b/src/ArtemisNetCoreClient/Session.cs
@@ -92,7 +92,8 @@ public async Task CreateQueue(QueueConfiguration queueConfiguration, Cancellatio
RequiresResponse = true,
Address = queueConfiguration.Address,
QueueName = queueConfiguration.Name,
- RoutingType = queueConfiguration.RoutingType
+ RoutingType = queueConfiguration.RoutingType,
+ MaxConsumers = -1
};
_ = await SendBlockingAsync(createQueueMessage, cancellationToken);
}
@@ -118,6 +119,23 @@ public async Task CreateQueue(QueueConfiguration queueConfiguration, Cancellatio
return null;
}
+ public async Task CreateConsumerAsync(ConsumerConfiguration consumerConfiguration, CancellationToken cancellationToken)
+ {
+ var request = new SessionCreateConsumerMessage
+ {
+ Id = 0,
+ QueueName = consumerConfiguration.QueueName,
+ Priority = 0,
+ BrowseOnly = false,
+ RequiresResponse = true
+ };
+ _ = await SendBlockingAsync(request, cancellationToken);
+ return new Consumer(this)
+ {
+ ConsumerId = request.Id
+ };
+ }
+
public async ValueTask DisposeAsync()
{
_ = await SendBlockingAsync(new SessionStop(), default);
@@ -125,7 +143,7 @@ public async ValueTask DisposeAsync()
await _transport.DisposeAsync().ConfigureAwait(false);
}
- private async Task SendBlockingAsync(TRequest request, CancellationToken cancellationToken) where TRequest : Packet
+ internal async Task SendBlockingAsync(TRequest request, CancellationToken cancellationToken) where TRequest : Packet
{
var tcs = new TaskCompletionSource();
diff --git a/test/ArtemisNetCoreClient.Tests/SessionTests.cs b/test/ArtemisNetCoreClient.Tests/SessionTests.cs
index ff1200a..a3ea94e 100644
--- a/test/ArtemisNetCoreClient.Tests/SessionTests.cs
+++ b/test/ArtemisNetCoreClient.Tests/SessionTests.cs
@@ -123,4 +123,36 @@ public async Task should_not_return_queue_info_when_queue_does_not_exist()
// Assert
Assert.That(queueInfo, Is.Null);
}
+
+ [Test]
+ public async Task should_create_and_dispose_consumer()
+ {
+ // Arrange
+ var connectionFactory = new SessionFactory();
+ await using var session = await connectionFactory.CreateAsync(new Endpoint
+ {
+ Host = "localhost",
+ Port = 5445,
+ User = "artemis",
+ Password = "artemis"
+ });
+ var addressName = $"{Guid.NewGuid().ToString()}";
+ await session.CreateAddress(addressName, new [] { RoutingType.Multicast }, default);
+
+ var queueName = Guid.NewGuid().ToString();
+ await session.CreateQueue(new QueueConfiguration
+ {
+ Address = addressName,
+ Name = queueName,
+ RoutingType = RoutingType.Multicast
+ }, default);
+
+ // Act
+ var consumer = await session.CreateConsumerAsync(new ConsumerConfiguration
+ {
+ QueueName = queueName
+ }, default);
+
+ await consumer.DisposeAsync();
+ }
}
\ No newline at end of file