Skip to content

Commit

Permalink
Create queue
Browse files Browse the repository at this point in the history
  • Loading branch information
Havret committed Mar 24, 2024
1 parent b561a1e commit 4763d5f
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 1 deletion.
1 change: 1 addition & 0 deletions src/ArtemisNetCoreClient/Framing/Codec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public static void Encode(ByteBuffer buffer, Packet packet, long channelId)
SessionCloseMessage => SessionCloseMessage.Type,
CreateAddressMessage => CreateAddressMessage.Type,
SessionBindingQueryMessage => SessionBindingQueryMessage.Type,
CreateQueueMessageV2 => CreateQueueMessageV2.Type,
_ => throw new ArgumentOutOfRangeException(nameof(packet), packet, $"{packet.GetType()} is not supported for encoding")
};
buffer.WriteByte(type);
Expand Down
66 changes: 66 additions & 0 deletions src/ArtemisNetCoreClient/Framing/CreateQueueMessageV2.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
namespace ActiveMQ.Artemis.Core.Client.Framing;

internal class CreateQueueMessageV2 : Packet
{
public const byte Type = unchecked((byte) -12);

public required string Address { get; init; }
public required string QueueName { get; init; }
public string? FilterString { get; init; }
public bool Durable { get; init; }
public bool Temporary { get; init; }
public bool RequiresResponse { get; init; }
public bool AutoCreated { get; init; }
public required RoutingType RoutingType { get; init; }
public int MaxConsumers { get; init; }
public bool PurgeOnNoConsumers { get; init; }
public bool? Exclusive { get; init; }
public bool? LastValue { get; init; }
public string? LastValueKey { get; init; }
public bool? NonDestructive { get; init; }
public int? ConsumersBeforeDispatch { get; init; }
public long? DelayBeforeDispatch { get; init; }
public bool? GroupRebalance { get; init; }
public int? GroupBuckets { get; init; }
public bool? AutoDelete { get; init; }
public long? AutoDeleteDelay { get; init; }
public long? AutoDeleteMessageCount { get; init; }
public string? GroupFirstKey { get; init; }
public long? RingSize { get; init; }
public bool? Enabled { get; init; }
public bool? GroupRebalancePauseDispatch { get; init; }

public override void Encode(ByteBuffer buffer)
{
buffer.WriteAmqString(Address);
buffer.WriteAmqString(QueueName);
buffer.WriteNullableString(FilterString);
buffer.WriteBool(Durable);
buffer.WriteBool(Temporary);
buffer.WriteBool(RequiresResponse);
buffer.WriteBool(AutoCreated);
buffer.WriteByte((byte) RoutingType);
buffer.WriteInt(MaxConsumers);
buffer.WriteBool(PurgeOnNoConsumers);
buffer.WriteNullableBool(Exclusive);
buffer.WriteNullableBool(LastValue);
buffer.WriteNullableAmqString(LastValueKey);
buffer.WriteNullableBool(NonDestructive);
buffer.WriteNullableInt(ConsumersBeforeDispatch);
buffer.WriteNullableLong(DelayBeforeDispatch);
buffer.WriteNullableBool(GroupRebalance);
buffer.WriteNullableInt(GroupBuckets);
buffer.WriteNullableBool(AutoDelete);
buffer.WriteNullableLong(AutoDeleteDelay);
buffer.WriteNullableLong(AutoDeleteMessageCount);
buffer.WriteNullableAmqString(GroupFirstKey);
buffer.WriteNullableLong(RingSize);
buffer.WriteNullableBool(Enabled);
buffer.WriteNullableBool(GroupRebalancePauseDispatch);
}

public override void Decode(ByteBuffer buffer)
{
throw new NotImplementedException();
}
}
1 change: 1 addition & 0 deletions src/ArtemisNetCoreClient/ISession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ public interface ISession : IAsyncDisposable
{
Task CreateAddress(string address, IEnumerable<RoutingType> routingTypes, CancellationToken cancellationToken);
Task<AddressInfo> GetAddressInfo(string address, CancellationToken cancellationToken);
Task CreateQueue(QueueConfiguration queueConfiguration, CancellationToken cancellationToken);
}
10 changes: 10 additions & 0 deletions src/ArtemisNetCoreClient/QueueConfiguration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using ActiveMQ.Artemis.Core.Client.Framing;

namespace ActiveMQ.Artemis.Core.Client;

public class QueueConfiguration
{
public required string Address { get; init; }
public required string Name { get; init; }
public required RoutingType RoutingType { get; init; }
}
14 changes: 13 additions & 1 deletion src/ArtemisNetCoreClient/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public async Task<AddressInfo> GetAddressInfo(string address, CancellationToken
RoutingTypes = GetRoutingTypes(response).ToArray(),
};
}

private static IEnumerable<RoutingType> GetRoutingTypes(SessionBindingQueryResponseMessageV5 sessionBindingQueryResponseMessageV5)
{
if (sessionBindingQueryResponseMessageV5.SupportsAnycast)
Expand All @@ -80,6 +80,18 @@ private static IEnumerable<RoutingType> GetRoutingTypes(SessionBindingQueryRespo
}
}

public async Task CreateQueue(QueueConfiguration queueConfiguration, CancellationToken cancellationToken)
{
var createQueueMessage = new CreateQueueMessageV2
{
RequiresResponse = true,
Address = queueConfiguration.Address,
QueueName = queueConfiguration.Name,
RoutingType = queueConfiguration.RoutingType
};
_ = await SendBlockingAsync<CreateQueueMessageV2, NullResponse>(createQueueMessage, cancellationToken);
}

public async ValueTask DisposeAsync()
{
_ = await SendBlockingAsync<SessionStop, NullResponse>(new SessionStop(), default);
Expand Down
26 changes: 26 additions & 0 deletions test/ArtemisNetCoreClient.Tests/SessionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,30 @@ public async Task should_create_address_with_selected_routing_type(RoutingType[]
Assert.That(addressInfo.Name, Is.EqualTo(addressName));
CollectionAssert.AreEqual(routingTypes, addressInfo.RoutingTypes);
}

[TestCase(RoutingType.Anycast)]
[TestCase(RoutingType.Multicast)]
public async Task should_create_queue_with_selected_routing_type(RoutingType routingType)
{
// Arrange
var connectionFactory = new SessionFactory();
await using var session = await connectionFactory.CreateAsync(new Endpoint
{
Host = "localhost",
Port = 5445,
User = "artemis",
Password = "artemis"
});
var addressName = $"{Guid.NewGuid().ToString()}";
await session.CreateAddress(addressName, new[] { routingType }, default);

// Act
var queueName = $"{Guid.NewGuid().ToString()}";
await session.CreateQueue(new QueueConfiguration
{
Address = addressName,
Name = queueName,
RoutingType = routingType
}, default);
}
}

0 comments on commit 4763d5f

Please sign in to comment.