From bfb0ab44e1aba4f74d60f4591f75256d8cfc65f5 Mon Sep 17 00:00:00 2001 From: Havret Date: Wed, 20 Mar 2024 23:41:56 +0100 Subject: [PATCH] Create Address WIP --- ArtemisNetCoreClient.sln.DotSettings | 2 ++ src/ArtemisNetCoreClient/Framing/Codec.cs | 1 + .../Framing/CreateAddressMessage.cs | 22 +++++++++++++++++++ .../Framing/CreateSessionMessageV2.cs | 5 ----- .../Framing/RoutingType.cs | 7 ++++++ .../Framing/SessionStart.cs | 8 ------- src/ArtemisNetCoreClient/ISession.cs | 7 +++++- src/ArtemisNetCoreClient/Session.cs | 22 ++++++++++++++----- src/ArtemisNetCoreClient/Transport.cs | 6 ++++- .../SessionTests.cs | 19 ++++++++++++++++ 10 files changed, 79 insertions(+), 20 deletions(-) create mode 100644 ArtemisNetCoreClient.sln.DotSettings create mode 100644 src/ArtemisNetCoreClient/Framing/CreateAddressMessage.cs create mode 100644 src/ArtemisNetCoreClient/Framing/RoutingType.cs diff --git a/ArtemisNetCoreClient.sln.DotSettings b/ArtemisNetCoreClient.sln.DotSettings new file mode 100644 index 0000000..d469c32 --- /dev/null +++ b/ArtemisNetCoreClient.sln.DotSettings @@ -0,0 +1,2 @@ + + True \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/Framing/Codec.cs b/src/ArtemisNetCoreClient/Framing/Codec.cs index 4803026..9e11526 100644 --- a/src/ArtemisNetCoreClient/Framing/Codec.cs +++ b/src/ArtemisNetCoreClient/Framing/Codec.cs @@ -12,6 +12,7 @@ public static void Encode(ByteBuffer buffer, Packet packet, long channelId) SessionStart => SessionStart.Type, SessionStop => SessionStop.Type, SessionCloseMessage => SessionCloseMessage.Type, + CreateAddressMessage => CreateAddressMessage.Type, _ => throw new ArgumentOutOfRangeException(nameof(packet), packet, $"{packet.GetType()} is not supported for encoding") }; buffer.WriteByte(type); diff --git a/src/ArtemisNetCoreClient/Framing/CreateAddressMessage.cs b/src/ArtemisNetCoreClient/Framing/CreateAddressMessage.cs new file mode 100644 index 0000000..cfdc045 --- /dev/null +++ b/src/ArtemisNetCoreClient/Framing/CreateAddressMessage.cs @@ -0,0 +1,22 @@ +namespace ActiveMQ.Artemis.Core.Client.Framing; + +internal class CreateAddressMessage : Packet +{ + public const byte Type = unchecked((byte) -11); + public required string Address { get; init; } + public required RoutingType[] RoutingTypes { get; init; } + public required bool RequiresResponse { get; init; } + public required bool AutoCreated { get; init; } + + public override void Encode(ByteBuffer buffer) + { + buffer.WriteString(Address); + buffer.WriteInt(RoutingTypes.Length); + foreach (var routingType in RoutingTypes) + { + buffer.WriteByte((byte) routingType); + } + buffer.WriteBool(RequiresResponse); + buffer.WriteBool(AutoCreated); + } +} \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/Framing/CreateSessionMessageV2.cs b/src/ArtemisNetCoreClient/Framing/CreateSessionMessageV2.cs index 52f85c2..fa027e1 100644 --- a/src/ArtemisNetCoreClient/Framing/CreateSessionMessageV2.cs +++ b/src/ArtemisNetCoreClient/Framing/CreateSessionMessageV2.cs @@ -33,9 +33,4 @@ public override void Encode(ByteBuffer buffer) buffer.WriteNullableString(DefaultAddress); buffer.WriteNullableString(ClientId); } - - public override void Decode(ByteBuffer buffer) - { - throw new NotImplementedException(); - } } \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/Framing/RoutingType.cs b/src/ArtemisNetCoreClient/Framing/RoutingType.cs new file mode 100644 index 0000000..58d3274 --- /dev/null +++ b/src/ArtemisNetCoreClient/Framing/RoutingType.cs @@ -0,0 +1,7 @@ +namespace ActiveMQ.Artemis.Core.Client.Framing; + +public enum RoutingType : byte +{ + Multicast = 0, + Anycast = 1 +} \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/Framing/SessionStart.cs b/src/ArtemisNetCoreClient/Framing/SessionStart.cs index ae35ea8..a514c18 100644 --- a/src/ArtemisNetCoreClient/Framing/SessionStart.cs +++ b/src/ArtemisNetCoreClient/Framing/SessionStart.cs @@ -3,12 +3,4 @@ namespace ActiveMQ.Artemis.Core.Client.Framing; internal class SessionStart : Packet { public const byte Type = 67; - - public override void Encode(ByteBuffer buffer) - { - } - - public override void Decode(ByteBuffer buffer) - { - } } \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/ISession.cs b/src/ArtemisNetCoreClient/ISession.cs index bd4923f..a5f24a8 100644 --- a/src/ArtemisNetCoreClient/ISession.cs +++ b/src/ArtemisNetCoreClient/ISession.cs @@ -1,3 +1,8 @@ +using ActiveMQ.Artemis.Core.Client.Framing; + namespace ActiveMQ.Artemis.Core.Client; -public interface ISession : IAsyncDisposable; \ No newline at end of file +public interface ISession : IAsyncDisposable +{ + public Task CreateAddress(string address, IEnumerable routingTypes, bool autoCreated, CancellationToken cancellationToken); +} \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/Session.cs b/src/ArtemisNetCoreClient/Session.cs index daa7e9e..1494b42 100644 --- a/src/ArtemisNetCoreClient/Session.cs +++ b/src/ArtemisNetCoreClient/Session.cs @@ -21,7 +21,7 @@ public Session(Transport transport) try { var packet = await _transport.ReceiveAsync(default); - if (packet.IsResponse && _completionSources.TryRemove(packet.CorrelationId, out var tcs)) + if (packet is { IsResponse: true } && _completionSources.TryRemove(packet.CorrelationId, out var tcs)) { tcs.TrySetResult(packet); } @@ -39,21 +39,33 @@ public Session(Transport transport) }); } + public async Task CreateAddress(string address, IEnumerable routingTypes, bool autoCreated, CancellationToken cancellationToken) + { + var createAddressMessage = new CreateAddressMessage + { + Address = address, + RoutingTypes = routingTypes.ToArray(), + AutoCreated = autoCreated, + RequiresResponse = true + }; + _ = await SendBlockingAsync(createAddressMessage, 11, cancellationToken); + } + public async ValueTask DisposeAsync() { - _ = await SendBlockingAsync(new SessionStop(), default); - _ = await SendBlockingAsync(new SessionCloseMessage(), default); + _ = await SendBlockingAsync(new SessionStop(), ChannelId, default); + _ = await SendBlockingAsync(new SessionCloseMessage(),ChannelId, default); await _transport.DisposeAsync().ConfigureAwait(false); } - private async Task SendBlockingAsync(TRequest request, CancellationToken cancellationToken) where TRequest : Packet + private async Task SendBlockingAsync(TRequest request, long channelId, CancellationToken cancellationToken) where TRequest : Packet { var tcs = new TaskCompletionSource(); // TODO: Handle scenario when we cannot CorrelationId _ = _completionSources.TryAdd(request.CorrelationId, tcs); - await _transport.SendAsync(request, ChannelId, cancellationToken); + await _transport.SendAsync(request, channelId, cancellationToken); var responsePacket = await tcs.Task; if (responsePacket is TResponse response) { diff --git a/src/ArtemisNetCoreClient/Transport.cs b/src/ArtemisNetCoreClient/Transport.cs index 2456e2e..c6f6d45 100644 --- a/src/ArtemisNetCoreClient/Transport.cs +++ b/src/ArtemisNetCoreClient/Transport.cs @@ -12,12 +12,16 @@ public async Task SendAsync(Packet packet, long channelId, CancellationToken can await socket.SendAsync(byteBuffer.GetBuffer(), cancellationToken).ConfigureAwait(false); } - public async Task ReceiveAsync(CancellationToken cancellationToken) + public async Task ReceiveAsync(CancellationToken cancellationToken) { var receiveBuffer = new byte[sizeof(int)]; await socket.ReceiveAsync(receiveBuffer, cancellationToken).ConfigureAwait(false); var size = new ByteBuffer(receiveBuffer).ReadInt(); + if (size == 0) + { + return null; + } var buffer = new byte[size]; _ = await socket.ReceiveAsync(buffer, cancellationToken).ConfigureAwait(false); diff --git a/test/ArtemisNetCoreClient.Tests/SessionTests.cs b/test/ArtemisNetCoreClient.Tests/SessionTests.cs index 6bd76e9..750674b 100644 --- a/test/ArtemisNetCoreClient.Tests/SessionTests.cs +++ b/test/ArtemisNetCoreClient.Tests/SessionTests.cs @@ -1,3 +1,5 @@ +using ActiveMQ.Artemis.Core.Client.Framing; + namespace ActiveMQ.Artemis.Core.Client.Tests; public class SessionTests @@ -21,4 +23,21 @@ public async Task should_establish_session() Assert.IsNotNull(session); await session.DisposeAsync(); } + + [Test, Ignore("WIP")] + public async Task should_create_address() + { + // Arrange + var connectionFactory = new SessionFactory(); + var session = await connectionFactory.CreateAsync(new Endpoint + { + Host = "localhost", + Port = 5445, + User = "artemis", + Password = "artemis" + }); + + // Act && Assert + await session.CreateAddress("myaddress", Enumerable.Empty(), false, default); + } } \ No newline at end of file