From d879a46c93a763974d0f0dabdff0048f47966da0 Mon Sep 17 00:00:00 2001 From: Havret Date: Thu, 14 Mar 2024 23:26:09 +0100 Subject: [PATCH] Session start --- src/ArtemisNetCoreClient/Framing/Codec.cs | 3 ++ .../Framing/NullResponse.cs | 14 ++++++++ .../Framing/SessionStart.cs | 14 ++++++++ .../Framing/SessionStop.cs | 14 ++++++++ src/ArtemisNetCoreClient/ISession.cs | 35 +++++++++++++++++-- src/ArtemisNetCoreClient/SessionFactory.cs | 15 ++++---- src/ArtemisNetCoreClient/Transport.cs | 8 ++--- 7 files changed, 89 insertions(+), 14 deletions(-) create mode 100644 src/ArtemisNetCoreClient/Framing/NullResponse.cs create mode 100644 src/ArtemisNetCoreClient/Framing/SessionStart.cs create mode 100644 src/ArtemisNetCoreClient/Framing/SessionStop.cs diff --git a/src/ArtemisNetCoreClient/Framing/Codec.cs b/src/ArtemisNetCoreClient/Framing/Codec.cs index c904e16..3a6bf01 100644 --- a/src/ArtemisNetCoreClient/Framing/Codec.cs +++ b/src/ArtemisNetCoreClient/Framing/Codec.cs @@ -9,6 +9,8 @@ public static void Encode(ByteBuffer buffer, Packet packet, long channelId) var type = packet switch { CreateSessionMessageV2 => CreateSessionMessageV2.Type, + SessionStart => SessionStart.Type, + SessionStop => SessionStop.Type, _ => throw new ArgumentOutOfRangeException(nameof(packet), packet, $"{packet.GetType()} is not supported for encoding") }; buffer.WriteByte(type); @@ -27,6 +29,7 @@ public static (Packet packet, long channelId) Decode(ByteBuffer buffer) Packet packet = type switch { CreateSessionResponseMessage.Type => new CreateSessionResponseMessage(), + NullResponse.Type => new NullResponse(), _ => throw new ArgumentOutOfRangeException($"Type {type} is not supported for decoding") }; diff --git a/src/ArtemisNetCoreClient/Framing/NullResponse.cs b/src/ArtemisNetCoreClient/Framing/NullResponse.cs new file mode 100644 index 0000000..686af77 --- /dev/null +++ b/src/ArtemisNetCoreClient/Framing/NullResponse.cs @@ -0,0 +1,14 @@ +namespace ActiveMQ.Artemis.Core.Client.Framing; + +internal class NullResponse : Packet +{ + public const byte Type = 68; + + public override void Encode(ByteBuffer buffer) + { + } + + public override void Decode(ByteBuffer buffer) + { + } +} \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/Framing/SessionStart.cs b/src/ArtemisNetCoreClient/Framing/SessionStart.cs new file mode 100644 index 0000000..ae35ea8 --- /dev/null +++ b/src/ArtemisNetCoreClient/Framing/SessionStart.cs @@ -0,0 +1,14 @@ +namespace ActiveMQ.Artemis.Core.Client.Framing; + +internal class SessionStart : Packet +{ + public const byte Type = 67; + + public override void Encode(ByteBuffer buffer) + { + } + + public override void Decode(ByteBuffer buffer) + { + } +} \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/Framing/SessionStop.cs b/src/ArtemisNetCoreClient/Framing/SessionStop.cs new file mode 100644 index 0000000..9338cd6 --- /dev/null +++ b/src/ArtemisNetCoreClient/Framing/SessionStop.cs @@ -0,0 +1,14 @@ +namespace ActiveMQ.Artemis.Core.Client.Framing; + +internal class SessionStop : Packet +{ + public const byte Type = 68; + + public override void Encode(ByteBuffer buffer) + { + } + + public override void Decode(ByteBuffer buffer) + { + } +} \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/ISession.cs b/src/ArtemisNetCoreClient/ISession.cs index 6e58fcc..ffb6569 100644 --- a/src/ArtemisNetCoreClient/ISession.cs +++ b/src/ArtemisNetCoreClient/ISession.cs @@ -1,11 +1,42 @@ +using System.Collections.Concurrent; +using ActiveMQ.Artemis.Core.Client.Framing; + namespace ActiveMQ.Artemis.Core.Client; public interface ISession : IAsyncDisposable; -internal class Session(Transport socket) : ISession +internal class Session : ISession { + private readonly Transport _transport; + + private ConcurrentDictionary> _completionSources = new(); + + public Session(Transport transport) + { + _transport = transport; + + _ = Task.Run(async () => + { + while (true) + { + var packet = await _transport.ReceiveAsync(default); + + } + }); + } + public async ValueTask DisposeAsync() { - await socket.DisposeAsync().ConfigureAwait(false); + + await _transport.SendAsync(new SessionStop(), ChannelId, default); + + await _transport.DisposeAsync().ConfigureAwait(false); + } + + public long ChannelId { get; init; } + + public async Task StartAsync(CancellationToken cancellationToken) + { + await _transport.SendAsync(new SessionStart(), ChannelId, cancellationToken); } } \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/SessionFactory.cs b/src/ArtemisNetCoreClient/SessionFactory.cs index 532f9d0..b13db5a 100644 --- a/src/ArtemisNetCoreClient/SessionFactory.cs +++ b/src/ArtemisNetCoreClient/SessionFactory.cs @@ -46,7 +46,7 @@ public async Task CreateAsync(Endpoint endpoint, CancellationToken can var createSessionMessageV2 = new CreateSessionMessageV2 { Name = Guid.NewGuid().ToString(), - SessionChannelId = 1, + SessionChannelId = 10, Version = 135, Username = endpoint.User, Password = endpoint.Password, @@ -59,20 +59,21 @@ public async Task CreateAsync(Endpoint endpoint, CancellationToken can DefaultAddress = null, ClientId = null, }; - - // var byteBuffer = new ByteBuffer(); - // Codec.Encode(byteBuffer, createSessionMessageV2, 1); - // _ = await socket.SendAsync(byteBuffer.GetBuffer(), cancellationToken); var transport = new Transport(socket); - await transport.SendAsync(createSessionMessageV2, cancellationToken); + await transport.SendAsync(createSessionMessageV2, 1, cancellationToken); var receivedPacket = await transport.ReceiveAsync(cancellationToken); if (receivedPacket is CreateSessionResponseMessage) { - return new Session(transport); + var session = new Session(transport) + { + ChannelId = createSessionMessageV2.SessionChannelId + }; + await session.StartAsync(cancellationToken); + return session; } else { diff --git a/src/ArtemisNetCoreClient/Transport.cs b/src/ArtemisNetCoreClient/Transport.cs index dc2eb47..2456e2e 100644 --- a/src/ArtemisNetCoreClient/Transport.cs +++ b/src/ArtemisNetCoreClient/Transport.cs @@ -5,19 +5,17 @@ namespace ActiveMQ.Artemis.Core.Client; internal class Transport(Socket socket) : IAsyncDisposable { - public async Task SendAsync(Packet packet, CancellationToken cancellationToken) + public async Task SendAsync(Packet packet, long channelId, CancellationToken cancellationToken) { var byteBuffer = new ByteBuffer(); - Codec.Encode(byteBuffer, packet, 1); + Codec.Encode(byteBuffer, packet, channelId); await socket.SendAsync(byteBuffer.GetBuffer(), cancellationToken).ConfigureAwait(false); } public async Task ReceiveAsync(CancellationToken cancellationToken) { var receiveBuffer = new byte[sizeof(int)]; - while (0 == await socket.ReceiveAsync(receiveBuffer, cancellationToken).ConfigureAwait(false)) - { - } + await socket.ReceiveAsync(receiveBuffer, cancellationToken).ConfigureAwait(false); var size = new ByteBuffer(receiveBuffer).ReadInt();