Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create queue #42

Merged
merged 1 commit into from
Mar 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/ArtemisNetCoreClient/Framing/Codec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public static void Encode(ByteBuffer buffer, Packet packet, long channelId)
SessionCloseMessage => SessionCloseMessage.Type,
CreateAddressMessage => CreateAddressMessage.Type,
SessionBindingQueryMessage => SessionBindingQueryMessage.Type,
CreateQueueMessageV2 => CreateQueueMessageV2.Type,
_ => throw new ArgumentOutOfRangeException(nameof(packet), packet, $"{packet.GetType()} is not supported for encoding")
};
buffer.WriteByte(type);
Expand Down
66 changes: 66 additions & 0 deletions src/ArtemisNetCoreClient/Framing/CreateQueueMessageV2.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
namespace ActiveMQ.Artemis.Core.Client.Framing;

internal class CreateQueueMessageV2 : Packet
{
public const byte Type = unchecked((byte) -12);

public required string Address { get; init; }
public required string QueueName { get; init; }
public string? FilterString { get; init; }
public bool Durable { get; init; }
public bool Temporary { get; init; }
public bool RequiresResponse { get; init; }
public bool AutoCreated { get; init; }
public required RoutingType RoutingType { get; init; }
public int MaxConsumers { get; init; }
public bool PurgeOnNoConsumers { get; init; }
public bool? Exclusive { get; init; }
public bool? LastValue { get; init; }
public string? LastValueKey { get; init; }
public bool? NonDestructive { get; init; }
public int? ConsumersBeforeDispatch { get; init; }
public long? DelayBeforeDispatch { get; init; }
public bool? GroupRebalance { get; init; }
public int? GroupBuckets { get; init; }
public bool? AutoDelete { get; init; }
public long? AutoDeleteDelay { get; init; }
public long? AutoDeleteMessageCount { get; init; }
public string? GroupFirstKey { get; init; }
public long? RingSize { get; init; }
public bool? Enabled { get; init; }
public bool? GroupRebalancePauseDispatch { get; init; }

public override void Encode(ByteBuffer buffer)
{
buffer.WriteAmqString(Address);
buffer.WriteAmqString(QueueName);
buffer.WriteNullableString(FilterString);
buffer.WriteBool(Durable);
buffer.WriteBool(Temporary);
buffer.WriteBool(RequiresResponse);
buffer.WriteBool(AutoCreated);
buffer.WriteByte((byte) RoutingType);
buffer.WriteInt(MaxConsumers);
buffer.WriteBool(PurgeOnNoConsumers);
buffer.WriteNullableBool(Exclusive);
buffer.WriteNullableBool(LastValue);
buffer.WriteNullableAmqString(LastValueKey);
buffer.WriteNullableBool(NonDestructive);
buffer.WriteNullableInt(ConsumersBeforeDispatch);
buffer.WriteNullableLong(DelayBeforeDispatch);
buffer.WriteNullableBool(GroupRebalance);
buffer.WriteNullableInt(GroupBuckets);
buffer.WriteNullableBool(AutoDelete);
buffer.WriteNullableLong(AutoDeleteDelay);
buffer.WriteNullableLong(AutoDeleteMessageCount);
buffer.WriteNullableAmqString(GroupFirstKey);
buffer.WriteNullableLong(RingSize);
buffer.WriteNullableBool(Enabled);
buffer.WriteNullableBool(GroupRebalancePauseDispatch);
}

public override void Decode(ByteBuffer buffer)
{
throw new NotImplementedException();
}
}
1 change: 1 addition & 0 deletions src/ArtemisNetCoreClient/ISession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ public interface ISession : IAsyncDisposable
{
Task CreateAddress(string address, IEnumerable<RoutingType> routingTypes, CancellationToken cancellationToken);
Task<AddressInfo> GetAddressInfo(string address, CancellationToken cancellationToken);
Task CreateQueue(QueueConfiguration queueConfiguration, CancellationToken cancellationToken);
}
10 changes: 10 additions & 0 deletions src/ArtemisNetCoreClient/QueueConfiguration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using ActiveMQ.Artemis.Core.Client.Framing;

namespace ActiveMQ.Artemis.Core.Client;

public class QueueConfiguration
{
public required string Address { get; init; }
public required string Name { get; init; }
public required RoutingType RoutingType { get; init; }
}
14 changes: 13 additions & 1 deletion src/ArtemisNetCoreClient/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public async Task<AddressInfo> GetAddressInfo(string address, CancellationToken
RoutingTypes = GetRoutingTypes(response).ToArray(),
};
}

private static IEnumerable<RoutingType> GetRoutingTypes(SessionBindingQueryResponseMessageV5 sessionBindingQueryResponseMessageV5)
{
if (sessionBindingQueryResponseMessageV5.SupportsAnycast)
Expand All @@ -80,6 +80,18 @@ private static IEnumerable<RoutingType> GetRoutingTypes(SessionBindingQueryRespo
}
}

public async Task CreateQueue(QueueConfiguration queueConfiguration, CancellationToken cancellationToken)
{
var createQueueMessage = new CreateQueueMessageV2
{
RequiresResponse = true,
Address = queueConfiguration.Address,
QueueName = queueConfiguration.Name,
RoutingType = queueConfiguration.RoutingType
};
_ = await SendBlockingAsync<CreateQueueMessageV2, NullResponse>(createQueueMessage, cancellationToken);
}

public async ValueTask DisposeAsync()
{
_ = await SendBlockingAsync<SessionStop, NullResponse>(new SessionStop(), default);
Expand Down
26 changes: 26 additions & 0 deletions test/ArtemisNetCoreClient.Tests/SessionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,30 @@ public async Task should_create_address_with_selected_routing_type(RoutingType[]
Assert.That(addressInfo.Name, Is.EqualTo(addressName));
CollectionAssert.AreEqual(routingTypes, addressInfo.RoutingTypes);
}

[TestCase(RoutingType.Anycast)]
[TestCase(RoutingType.Multicast)]
public async Task should_create_queue_with_selected_routing_type(RoutingType routingType)
{
// Arrange
var connectionFactory = new SessionFactory();
await using var session = await connectionFactory.CreateAsync(new Endpoint
{
Host = "localhost",
Port = 5445,
User = "artemis",
Password = "artemis"
});
var addressName = $"{Guid.NewGuid().ToString()}";
await session.CreateAddress(addressName, new[] { routingType }, default);

// Act
var queueName = $"{Guid.NewGuid().ToString()}";
await session.CreateQueue(new QueueConfiguration
{
Address = addressName,
Name = queueName,
RoutingType = routingType
}, default);
}
}
Loading