Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up session #28

Merged
merged 1 commit into from
Mar 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/ArtemisNetCoreClient/Framing/Codec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public static void Encode(ByteBuffer buffer, Packet packet, long channelId)
CreateSessionMessageV2 => CreateSessionMessageV2.Type,
SessionStart => SessionStart.Type,
SessionStop => SessionStop.Type,
SessionCloseMessage => SessionCloseMessage.Type,
_ => throw new ArgumentOutOfRangeException(nameof(packet), packet, $"{packet.GetType()} is not supported for encoding")
};
buffer.WriteByte(type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,6 @@ internal class CreateSessionResponseMessage : Packet
{
public const byte Type = 31;
public int ServerVersion { get; private set; }

public override void Encode(ByteBuffer buffer)
{
throw new NotImplementedException();
}

public override void Decode(ByteBuffer buffer)
{
Expand Down
11 changes: 2 additions & 9 deletions src/ArtemisNetCoreClient/Framing/NullResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,6 @@ namespace ActiveMQ.Artemis.Core.Client.Framing;

internal class NullResponse : Packet
{
public const byte Type = 68;

public override void Encode(ByteBuffer buffer)
{
}

public override void Decode(ByteBuffer buffer)
{
}
public const byte Type = 21;
public override bool IsResponse => true;
}
15 changes: 12 additions & 3 deletions src/ArtemisNetCoreClient/Framing/Packet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,16 @@ namespace ActiveMQ.Artemis.Core.Client.Framing;

internal abstract class Packet
{
public abstract void Encode(ByteBuffer buffer);

public abstract void Decode(ByteBuffer buffer);
public virtual long CorrelationId => -1;
public virtual bool IsResponse => false;

public virtual void Encode(ByteBuffer buffer)
{

}

public virtual void Decode(ByteBuffer buffer)
{

}
}
6 changes: 6 additions & 0 deletions src/ArtemisNetCoreClient/Framing/SessionCloseMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace ActiveMQ.Artemis.Core.Client.Framing;

internal class SessionCloseMessage : Packet
{
public const byte Type = 69;
}
8 changes: 0 additions & 8 deletions src/ArtemisNetCoreClient/Framing/SessionStop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,4 @@ namespace ActiveMQ.Artemis.Core.Client.Framing;
internal class SessionStop : Packet
{
public const byte Type = 68;

public override void Encode(ByteBuffer buffer)
{
}

public override void Decode(ByteBuffer buffer)
{
}
}
41 changes: 1 addition & 40 deletions src/ArtemisNetCoreClient/ISession.cs
Original file line number Diff line number Diff line change
@@ -1,42 +1,3 @@
using System.Collections.Concurrent;
using ActiveMQ.Artemis.Core.Client.Framing;

namespace ActiveMQ.Artemis.Core.Client;

public interface ISession : IAsyncDisposable;

internal class Session : ISession
{
private readonly Transport _transport;

private ConcurrentDictionary<long, TaskCompletionSource<Packet>> _completionSources = new();

public Session(Transport transport)
{
_transport = transport;

_ = Task.Run(async () =>
{
while (true)
{
var packet = await _transport.ReceiveAsync(default);

}
});
}

public async ValueTask DisposeAsync()
{

await _transport.SendAsync(new SessionStop(), ChannelId, default);

await _transport.DisposeAsync().ConfigureAwait(false);
}

public long ChannelId { get; init; }

public async Task StartAsync(CancellationToken cancellationToken)
{
await _transport.SendAsync(new SessionStart(), ChannelId, cancellationToken);
}
}
public interface ISession : IAsyncDisposable;
75 changes: 75 additions & 0 deletions src/ArtemisNetCoreClient/Session.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
using System.Collections.Concurrent;
using ActiveMQ.Artemis.Core.Client.Framing;

namespace ActiveMQ.Artemis.Core.Client;

internal class Session : ISession
{
private readonly Transport _transport;

private readonly ConcurrentDictionary<long, TaskCompletionSource<Packet>> _completionSources = new();

public Session(Transport transport)
{
_transport = transport;

// TODO: Clean up while loop on close
_ = Task.Run(async () =>
{
while (true)
{
try
{
var packet = await _transport.ReceiveAsync(default);
if (packet.IsResponse && _completionSources.TryRemove(packet.CorrelationId, out var tcs))
{
tcs.TrySetResult(packet);
}
else
{
// TODO: Handle
}
}
catch (Exception e)
{
// TODO: Handle exception
Console.WriteLine(e);
}
}
});
}

public async ValueTask DisposeAsync()
{
_ = await SendBlockingAsync<SessionStop, NullResponse>(new SessionStop(), default);
_ = await SendBlockingAsync<SessionCloseMessage, NullResponse>(new SessionCloseMessage(), default);
await _transport.DisposeAsync().ConfigureAwait(false);
}

private async Task<TResponse> SendBlockingAsync<TRequest, TResponse>(TRequest request, CancellationToken cancellationToken) where TRequest : Packet
{
var tcs = new TaskCompletionSource<Packet>();

// TODO: Handle scenario when we cannot CorrelationId
_ = _completionSources.TryAdd(request.CorrelationId, tcs);

await _transport.SendAsync(request, ChannelId, cancellationToken);
var responsePacket = await tcs.Task;
if (responsePacket is TResponse response)
{
return response;
}
else
{
// TODO: Handle gracefully
throw new ArgumentException($"Expected response {typeof(TResponse).Name} but got {responsePacket.GetType().Name}");
}
}

public long ChannelId { get; init; }

public async Task StartAsync(CancellationToken cancellationToken)
{
await _transport.SendAsync(new SessionStart(), ChannelId, cancellationToken);
}
}
1 change: 1 addition & 0 deletions test/ArtemisNetCoreClient.Tests/SessionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ public async Task should_establish_session()

// Assert
Assert.IsNotNull(session);
await session.DisposeAsync();
}
}
Loading