Skip to content

Commit

Permalink
Create Consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
Havret committed Mar 28, 2024
1 parent 17e413b commit 1b99c25
Show file tree
Hide file tree
Showing 10 changed files with 148 additions and 2 deletions.
1 change: 1 addition & 0 deletions ArtemisNetCoreClient.sln.DotSettings
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=MQ/@EntryIndexedValue">MQ</s:String>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Anycast/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
17 changes: 17 additions & 0 deletions src/ArtemisNetCoreClient/Consumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using ActiveMQ.Artemis.Core.Client.Framing;

namespace ActiveMQ.Artemis.Core.Client;

internal class Consumer(Session session) : IConsumer
{
public required long ConsumerId { get; init; }

public async ValueTask DisposeAsync()
{
var request = new SessionConsumerCloseMessage
{
ConsumerId = ConsumerId
};
_ = await session.SendBlockingAsync<SessionConsumerCloseMessage, NullResponse>(request, default);
}
}
19 changes: 19 additions & 0 deletions src/ArtemisNetCoreClient/Framing/ActiveMQExceptionMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
namespace ActiveMQ.Artemis.Core.Client.Framing;

internal class ActiveMQExceptionMessage : Packet
{
public const byte Type = 20;

public int Code { get; private set; }
public string? Message { get; set; }

public override void Encode(ByteBuffer buffer)
{
}

public override void Decode(ByteBuffer buffer)
{
Code = buffer.ReadInt();
Message = buffer.ReadNullableString();
}
}
3 changes: 3 additions & 0 deletions src/ArtemisNetCoreClient/Framing/Codec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ public static void Encode(ByteBuffer buffer, Packet packet, long channelId)
SessionBindingQueryMessage => SessionBindingQueryMessage.Type,
CreateQueueMessageV2 => CreateQueueMessageV2.Type,
SessionQueueQueryMessage => SessionQueueQueryMessage.Type,
SessionCreateConsumerMessage => SessionCreateConsumerMessage.Type,
SessionConsumerCloseMessage => SessionConsumerCloseMessage.Type,
_ => throw new ArgumentOutOfRangeException(nameof(packet), packet, $"{packet.GetType()} is not supported for encoding")
};
buffer.WriteByte(type);
Expand All @@ -37,6 +39,7 @@ public static (Packet packet, long channelId) Decode(ByteBuffer buffer)
NullResponse.Type => new NullResponse(),
SessionBindingQueryResponseMessageV5.Type => new SessionBindingQueryResponseMessageV5(),
SessionQueueQueryResponseMessageV3.Type => new SessionQueueQueryResponseMessageV3(),
ActiveMQExceptionMessage.Type => new ActiveMQExceptionMessage(),
_ => throw new ArgumentOutOfRangeException($"Type {type} is not supported for decoding")
};

Expand Down
17 changes: 17 additions & 0 deletions src/ArtemisNetCoreClient/Framing/SessionConsumerCloseMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace ActiveMQ.Artemis.Core.Client.Framing;

internal class SessionConsumerCloseMessage : Packet
{
public const byte Type = 74;

public long ConsumerId { get; set; }

public override void Encode(ByteBuffer buffer)
{
buffer.WriteLong(ConsumerId);
}

public override void Decode(ByteBuffer buffer)
{
}
}
28 changes: 28 additions & 0 deletions src/ArtemisNetCoreClient/Framing/SessionCreateConsumerMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
namespace ActiveMQ.Artemis.Core.Client.Framing;

internal class SessionCreateConsumerMessage : Packet
{
public const byte Type = 40;

public required long Id { get; init; }
public required string QueueName { get; init; }
public string? FilterString { get; init; }
public required int Priority { get; init; }
public required bool BrowseOnly { get; init; }
public required bool RequiresResponse { get; init; }


public override void Encode(ByteBuffer buffer)
{
buffer.WriteLong(Id);
buffer.WriteByteString(QueueName);
buffer.WriteNullableByteString(FilterString);
buffer.WriteBool(BrowseOnly);
buffer.WriteBool(RequiresResponse);
buffer.WriteInt(Priority);
}

public override void Decode(ByteBuffer buffer)
{
}
}
5 changes: 5 additions & 0 deletions src/ArtemisNetCoreClient/IConsumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
namespace ActiveMQ.Artemis.Core.Client;

public interface IConsumer : IAsyncDisposable
{
}
6 changes: 6 additions & 0 deletions src/ArtemisNetCoreClient/ISession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,10 @@ public interface ISession : IAsyncDisposable
Task<AddressInfo?> GetAddressInfo(string address, CancellationToken cancellationToken);
Task CreateQueue(QueueConfiguration queueConfiguration, CancellationToken cancellationToken);
Task<QueueInfo?> GetQueueInfo(string queueName, CancellationToken cancellationToken);
Task<IConsumer> CreateConsumerAsync(ConsumerConfiguration consumerConfiguration, CancellationToken cancellationToken);
}

public class ConsumerConfiguration
{
public required string QueueName { get; init; }
}
22 changes: 20 additions & 2 deletions src/ArtemisNetCoreClient/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ public async Task CreateQueue(QueueConfiguration queueConfiguration, Cancellatio
RequiresResponse = true,
Address = queueConfiguration.Address,
QueueName = queueConfiguration.Name,
RoutingType = queueConfiguration.RoutingType
RoutingType = queueConfiguration.RoutingType,
MaxConsumers = -1
};
_ = await SendBlockingAsync<CreateQueueMessageV2, NullResponse>(createQueueMessage, cancellationToken);
}
Expand All @@ -118,14 +119,31 @@ public async Task CreateQueue(QueueConfiguration queueConfiguration, Cancellatio
return null;
}

public async Task<IConsumer> CreateConsumerAsync(ConsumerConfiguration consumerConfiguration, CancellationToken cancellationToken)
{
var request = new SessionCreateConsumerMessage
{
Id = 0,
QueueName = consumerConfiguration.QueueName,
Priority = 0,
BrowseOnly = false,
RequiresResponse = true
};
_ = await SendBlockingAsync<SessionCreateConsumerMessage, SessionQueueQueryResponseMessageV3>(request, cancellationToken);
return new Consumer(this)
{
ConsumerId = request.Id
};
}

public async ValueTask DisposeAsync()
{
_ = await SendBlockingAsync<SessionStop, NullResponse>(new SessionStop(), default);
_ = await SendBlockingAsync<SessionCloseMessage, NullResponse>(new SessionCloseMessage(), default);
await _transport.DisposeAsync().ConfigureAwait(false);
}

private async Task<TResponse> SendBlockingAsync<TRequest, TResponse>(TRequest request, CancellationToken cancellationToken) where TRequest : Packet
internal async Task<TResponse> SendBlockingAsync<TRequest, TResponse>(TRequest request, CancellationToken cancellationToken) where TRequest : Packet
{
var tcs = new TaskCompletionSource<Packet>();

Expand Down
32 changes: 32 additions & 0 deletions test/ArtemisNetCoreClient.Tests/SessionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,36 @@ public async Task should_not_return_queue_info_when_queue_does_not_exist()
// Assert
Assert.That(queueInfo, Is.Null);
}

[Test]
public async Task should_create_and_dispose_consumer()
{
// Arrange
var connectionFactory = new SessionFactory();
await using var session = await connectionFactory.CreateAsync(new Endpoint
{
Host = "localhost",
Port = 5445,
User = "artemis",
Password = "artemis"
});
var addressName = $"{Guid.NewGuid().ToString()}";
await session.CreateAddress(addressName, new [] { RoutingType.Multicast }, default);

var queueName = Guid.NewGuid().ToString();
await session.CreateQueue(new QueueConfiguration
{
Address = addressName,
Name = queueName,
RoutingType = RoutingType.Multicast
}, default);

// Act
var consumer = await session.CreateConsumerAsync(new ConsumerConfiguration
{
QueueName = queueName
}, default);

await consumer.DisposeAsync();
}
}

0 comments on commit 1b99c25

Please sign in to comment.