From 12e2b1f919ae07ae40aab16f6b78e71549fb47a3 Mon Sep 17 00:00:00 2001 From: Havret Date: Sat, 16 Mar 2024 23:01:57 +0100 Subject: [PATCH] Clean up session --- src/ArtemisNetCoreClient/Framing/Codec.cs | 1 + .../Framing/CreateSessionResponseMessage.cs | 5 -- .../Framing/NullResponse.cs | 11 +-- src/ArtemisNetCoreClient/Framing/Packet.cs | 15 +++- .../Framing/SessionCloseMessage.cs | 6 ++ .../Framing/SessionStop.cs | 8 -- src/ArtemisNetCoreClient/ISession.cs | 41 +--------- src/ArtemisNetCoreClient/Session.cs | 75 +++++++++++++++++++ .../SessionTests.cs | 1 + 9 files changed, 98 insertions(+), 65 deletions(-) create mode 100644 src/ArtemisNetCoreClient/Framing/SessionCloseMessage.cs create mode 100644 src/ArtemisNetCoreClient/Session.cs diff --git a/src/ArtemisNetCoreClient/Framing/Codec.cs b/src/ArtemisNetCoreClient/Framing/Codec.cs index 3a6bf01..4803026 100644 --- a/src/ArtemisNetCoreClient/Framing/Codec.cs +++ b/src/ArtemisNetCoreClient/Framing/Codec.cs @@ -11,6 +11,7 @@ public static void Encode(ByteBuffer buffer, Packet packet, long channelId) CreateSessionMessageV2 => CreateSessionMessageV2.Type, SessionStart => SessionStart.Type, SessionStop => SessionStop.Type, + SessionCloseMessage => SessionCloseMessage.Type, _ => throw new ArgumentOutOfRangeException(nameof(packet), packet, $"{packet.GetType()} is not supported for encoding") }; buffer.WriteByte(type); diff --git a/src/ArtemisNetCoreClient/Framing/CreateSessionResponseMessage.cs b/src/ArtemisNetCoreClient/Framing/CreateSessionResponseMessage.cs index 997820d..d2a6828 100644 --- a/src/ArtemisNetCoreClient/Framing/CreateSessionResponseMessage.cs +++ b/src/ArtemisNetCoreClient/Framing/CreateSessionResponseMessage.cs @@ -4,11 +4,6 @@ internal class CreateSessionResponseMessage : Packet { public const byte Type = 31; public int ServerVersion { get; private set; } - - public override void Encode(ByteBuffer buffer) - { - throw new NotImplementedException(); - } public override void Decode(ByteBuffer buffer) { diff --git a/src/ArtemisNetCoreClient/Framing/NullResponse.cs b/src/ArtemisNetCoreClient/Framing/NullResponse.cs index 686af77..f623293 100644 --- a/src/ArtemisNetCoreClient/Framing/NullResponse.cs +++ b/src/ArtemisNetCoreClient/Framing/NullResponse.cs @@ -2,13 +2,6 @@ namespace ActiveMQ.Artemis.Core.Client.Framing; internal class NullResponse : Packet { - public const byte Type = 68; - - public override void Encode(ByteBuffer buffer) - { - } - - public override void Decode(ByteBuffer buffer) - { - } + public const byte Type = 21; + public override bool IsResponse => true; } \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/Framing/Packet.cs b/src/ArtemisNetCoreClient/Framing/Packet.cs index 1913726..efbac91 100644 --- a/src/ArtemisNetCoreClient/Framing/Packet.cs +++ b/src/ArtemisNetCoreClient/Framing/Packet.cs @@ -2,7 +2,16 @@ namespace ActiveMQ.Artemis.Core.Client.Framing; internal abstract class Packet { - public abstract void Encode(ByteBuffer buffer); - - public abstract void Decode(ByteBuffer buffer); + public virtual long CorrelationId => -1; + public virtual bool IsResponse => false; + + public virtual void Encode(ByteBuffer buffer) + { + + } + + public virtual void Decode(ByteBuffer buffer) + { + + } } \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/Framing/SessionCloseMessage.cs b/src/ArtemisNetCoreClient/Framing/SessionCloseMessage.cs new file mode 100644 index 0000000..f197b04 --- /dev/null +++ b/src/ArtemisNetCoreClient/Framing/SessionCloseMessage.cs @@ -0,0 +1,6 @@ +namespace ActiveMQ.Artemis.Core.Client.Framing; + +internal class SessionCloseMessage : Packet +{ + public const byte Type = 69; +} \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/Framing/SessionStop.cs b/src/ArtemisNetCoreClient/Framing/SessionStop.cs index 9338cd6..8bea849 100644 --- a/src/ArtemisNetCoreClient/Framing/SessionStop.cs +++ b/src/ArtemisNetCoreClient/Framing/SessionStop.cs @@ -3,12 +3,4 @@ namespace ActiveMQ.Artemis.Core.Client.Framing; internal class SessionStop : Packet { public const byte Type = 68; - - public override void Encode(ByteBuffer buffer) - { - } - - public override void Decode(ByteBuffer buffer) - { - } } \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/ISession.cs b/src/ArtemisNetCoreClient/ISession.cs index ffb6569..bd4923f 100644 --- a/src/ArtemisNetCoreClient/ISession.cs +++ b/src/ArtemisNetCoreClient/ISession.cs @@ -1,42 +1,3 @@ -using System.Collections.Concurrent; -using ActiveMQ.Artemis.Core.Client.Framing; - namespace ActiveMQ.Artemis.Core.Client; -public interface ISession : IAsyncDisposable; - -internal class Session : ISession -{ - private readonly Transport _transport; - - private ConcurrentDictionary> _completionSources = new(); - - public Session(Transport transport) - { - _transport = transport; - - _ = Task.Run(async () => - { - while (true) - { - var packet = await _transport.ReceiveAsync(default); - - } - }); - } - - public async ValueTask DisposeAsync() - { - - await _transport.SendAsync(new SessionStop(), ChannelId, default); - - await _transport.DisposeAsync().ConfigureAwait(false); - } - - public long ChannelId { get; init; } - - public async Task StartAsync(CancellationToken cancellationToken) - { - await _transport.SendAsync(new SessionStart(), ChannelId, cancellationToken); - } -} \ No newline at end of file +public interface ISession : IAsyncDisposable; \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/Session.cs b/src/ArtemisNetCoreClient/Session.cs new file mode 100644 index 0000000..daa7e9e --- /dev/null +++ b/src/ArtemisNetCoreClient/Session.cs @@ -0,0 +1,75 @@ +using System.Collections.Concurrent; +using ActiveMQ.Artemis.Core.Client.Framing; + +namespace ActiveMQ.Artemis.Core.Client; + +internal class Session : ISession +{ + private readonly Transport _transport; + + private readonly ConcurrentDictionary> _completionSources = new(); + + public Session(Transport transport) + { + _transport = transport; + + // TODO: Clean up while loop on close + _ = Task.Run(async () => + { + while (true) + { + try + { + var packet = await _transport.ReceiveAsync(default); + if (packet.IsResponse && _completionSources.TryRemove(packet.CorrelationId, out var tcs)) + { + tcs.TrySetResult(packet); + } + else + { + // TODO: Handle + } + } + catch (Exception e) + { + // TODO: Handle exception + Console.WriteLine(e); + } + } + }); + } + + public async ValueTask DisposeAsync() + { + _ = await SendBlockingAsync(new SessionStop(), default); + _ = await SendBlockingAsync(new SessionCloseMessage(), default); + await _transport.DisposeAsync().ConfigureAwait(false); + } + + private async Task SendBlockingAsync(TRequest request, CancellationToken cancellationToken) where TRequest : Packet + { + var tcs = new TaskCompletionSource(); + + // TODO: Handle scenario when we cannot CorrelationId + _ = _completionSources.TryAdd(request.CorrelationId, tcs); + + await _transport.SendAsync(request, ChannelId, cancellationToken); + var responsePacket = await tcs.Task; + if (responsePacket is TResponse response) + { + return response; + } + else + { + // TODO: Handle gracefully + throw new ArgumentException($"Expected response {typeof(TResponse).Name} but got {responsePacket.GetType().Name}"); + } + } + + public long ChannelId { get; init; } + + public async Task StartAsync(CancellationToken cancellationToken) + { + await _transport.SendAsync(new SessionStart(), ChannelId, cancellationToken); + } +} \ No newline at end of file diff --git a/test/ArtemisNetCoreClient.Tests/SessionTests.cs b/test/ArtemisNetCoreClient.Tests/SessionTests.cs index eb76b69..6bd76e9 100644 --- a/test/ArtemisNetCoreClient.Tests/SessionTests.cs +++ b/test/ArtemisNetCoreClient.Tests/SessionTests.cs @@ -19,5 +19,6 @@ public async Task should_establish_session() // Assert Assert.IsNotNull(session); + await session.DisposeAsync(); } } \ No newline at end of file