diff --git a/src/ArtemisNetCoreClient/ISession.cs b/src/ArtemisNetCoreClient/ISession.cs index accc8d2..6e58fcc 100644 --- a/src/ArtemisNetCoreClient/ISession.cs +++ b/src/ArtemisNetCoreClient/ISession.cs @@ -1,14 +1,11 @@ -using System.Net.Sockets; - namespace ActiveMQ.Artemis.Core.Client; public interface ISession : IAsyncDisposable; -internal class Session(Socket socket) : ISession +internal class Session(Transport socket) : ISession { - public ValueTask DisposeAsync() + public async ValueTask DisposeAsync() { - socket.Dispose(); - return ValueTask.CompletedTask; + await socket.DisposeAsync().ConfigureAwait(false); } } \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/SessionFactory.cs b/src/ArtemisNetCoreClient/SessionFactory.cs index a425180..532f9d0 100644 --- a/src/ArtemisNetCoreClient/SessionFactory.cs +++ b/src/ArtemisNetCoreClient/SessionFactory.cs @@ -1,5 +1,6 @@ using System.Net; using System.Net.Sockets; +using ActiveMQ.Artemis.Core.Client.Framing; namespace ActiveMQ.Artemis.Core.Client; @@ -41,7 +42,41 @@ public async Task CreateAsync(Endpoint endpoint, CancellationToken can { throw exception ?? new SocketException((int)SocketError.AddressNotAvailable); } + + var createSessionMessageV2 = new CreateSessionMessageV2 + { + Name = Guid.NewGuid().ToString(), + SessionChannelId = 1, + Version = 135, + Username = endpoint.User, + Password = endpoint.Password, + MinLargeMessageSize = 100 * 1024, + Xa = false, + AutoCommitSends = true, + AutoCommitAcks = true, + PreAcknowledge = false, + WindowSize = -1, + DefaultAddress = null, + ClientId = null, + }; + + // var byteBuffer = new ByteBuffer(); + // Codec.Encode(byteBuffer, createSessionMessageV2, 1); + // _ = await socket.SendAsync(byteBuffer.GetBuffer(), cancellationToken); + + var transport = new Transport(socket); + + await transport.SendAsync(createSessionMessageV2, cancellationToken); - return new Session(socket); + var receivedPacket = await transport.ReceiveAsync(cancellationToken); + + if (receivedPacket is CreateSessionResponseMessage) + { + return new Session(transport); + } + else + { + throw new InvalidOperationException("Received invalid response from the broker"); + } } -} +} \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/Transport.cs b/src/ArtemisNetCoreClient/Transport.cs new file mode 100644 index 0000000..dc2eb47 --- /dev/null +++ b/src/ArtemisNetCoreClient/Transport.cs @@ -0,0 +1,38 @@ +using System.Net.Sockets; +using ActiveMQ.Artemis.Core.Client.Framing; + +namespace ActiveMQ.Artemis.Core.Client; + +internal class Transport(Socket socket) : IAsyncDisposable +{ + public async Task SendAsync(Packet packet, CancellationToken cancellationToken) + { + var byteBuffer = new ByteBuffer(); + Codec.Encode(byteBuffer, packet, 1); + await socket.SendAsync(byteBuffer.GetBuffer(), cancellationToken).ConfigureAwait(false); + } + + public async Task ReceiveAsync(CancellationToken cancellationToken) + { + var receiveBuffer = new byte[sizeof(int)]; + while (0 == await socket.ReceiveAsync(receiveBuffer, cancellationToken).ConfigureAwait(false)) + { + } + + var size = new ByteBuffer(receiveBuffer).ReadInt(); + + var buffer = new byte[size]; + _ = await socket.ReceiveAsync(buffer, cancellationToken).ConfigureAwait(false); + + var payloadBuffer = new ByteBuffer(buffer); + + var (packet, _) = Codec.Decode(payloadBuffer); + return packet; + } + + public ValueTask DisposeAsync() + { + socket.Dispose(); + return ValueTask.CompletedTask; + } +} \ No newline at end of file diff --git a/test/ArtemisNetCoreClient.Tests/SessionTests.cs b/test/ArtemisNetCoreClient.Tests/SessionTests.cs new file mode 100644 index 0000000..eb76b69 --- /dev/null +++ b/test/ArtemisNetCoreClient.Tests/SessionTests.cs @@ -0,0 +1,23 @@ +namespace ActiveMQ.Artemis.Core.Client.Tests; + +public class SessionTests +{ + [Test] + public async Task should_establish_session() + { + // Arrange + var connectionFactory = new SessionFactory(); + + // Act + var session = await connectionFactory.CreateAsync(new Endpoint + { + Host = "localhost", + Port = 5445, + User = "artemis", + Password = "artemis" + }); + + // Assert + Assert.IsNotNull(session); + } +} \ No newline at end of file