diff --git a/ArtemisNetCoreClient.sln.DotSettings b/ArtemisNetCoreClient.sln.DotSettings
new file mode 100644
index 0000000..d469c32
--- /dev/null
+++ b/ArtemisNetCoreClient.sln.DotSettings
@@ -0,0 +1,2 @@
+
+ True
\ No newline at end of file
diff --git a/src/ArtemisNetCoreClient/Framing/Codec.cs b/src/ArtemisNetCoreClient/Framing/Codec.cs
index 4803026..9e11526 100644
--- a/src/ArtemisNetCoreClient/Framing/Codec.cs
+++ b/src/ArtemisNetCoreClient/Framing/Codec.cs
@@ -12,6 +12,7 @@ public static void Encode(ByteBuffer buffer, Packet packet, long channelId)
SessionStart => SessionStart.Type,
SessionStop => SessionStop.Type,
SessionCloseMessage => SessionCloseMessage.Type,
+ CreateAddressMessage => CreateAddressMessage.Type,
_ => throw new ArgumentOutOfRangeException(nameof(packet), packet, $"{packet.GetType()} is not supported for encoding")
};
buffer.WriteByte(type);
diff --git a/src/ArtemisNetCoreClient/Framing/CreateAddressMessage.cs b/src/ArtemisNetCoreClient/Framing/CreateAddressMessage.cs
new file mode 100644
index 0000000..cfdc045
--- /dev/null
+++ b/src/ArtemisNetCoreClient/Framing/CreateAddressMessage.cs
@@ -0,0 +1,22 @@
+namespace ActiveMQ.Artemis.Core.Client.Framing;
+
+internal class CreateAddressMessage : Packet
+{
+ public const byte Type = unchecked((byte) -11);
+ public required string Address { get; init; }
+ public required RoutingType[] RoutingTypes { get; init; }
+ public required bool RequiresResponse { get; init; }
+ public required bool AutoCreated { get; init; }
+
+ public override void Encode(ByteBuffer buffer)
+ {
+ buffer.WriteString(Address);
+ buffer.WriteInt(RoutingTypes.Length);
+ foreach (var routingType in RoutingTypes)
+ {
+ buffer.WriteByte((byte) routingType);
+ }
+ buffer.WriteBool(RequiresResponse);
+ buffer.WriteBool(AutoCreated);
+ }
+}
\ No newline at end of file
diff --git a/src/ArtemisNetCoreClient/Framing/CreateSessionMessageV2.cs b/src/ArtemisNetCoreClient/Framing/CreateSessionMessageV2.cs
index 52f85c2..fa027e1 100644
--- a/src/ArtemisNetCoreClient/Framing/CreateSessionMessageV2.cs
+++ b/src/ArtemisNetCoreClient/Framing/CreateSessionMessageV2.cs
@@ -33,9 +33,4 @@ public override void Encode(ByteBuffer buffer)
buffer.WriteNullableString(DefaultAddress);
buffer.WriteNullableString(ClientId);
}
-
- public override void Decode(ByteBuffer buffer)
- {
- throw new NotImplementedException();
- }
}
\ No newline at end of file
diff --git a/src/ArtemisNetCoreClient/Framing/RoutingType.cs b/src/ArtemisNetCoreClient/Framing/RoutingType.cs
new file mode 100644
index 0000000..58d3274
--- /dev/null
+++ b/src/ArtemisNetCoreClient/Framing/RoutingType.cs
@@ -0,0 +1,7 @@
+namespace ActiveMQ.Artemis.Core.Client.Framing;
+
+public enum RoutingType : byte
+{
+ Multicast = 0,
+ Anycast = 1
+}
\ No newline at end of file
diff --git a/src/ArtemisNetCoreClient/Framing/SessionStart.cs b/src/ArtemisNetCoreClient/Framing/SessionStart.cs
index ae35ea8..a514c18 100644
--- a/src/ArtemisNetCoreClient/Framing/SessionStart.cs
+++ b/src/ArtemisNetCoreClient/Framing/SessionStart.cs
@@ -3,12 +3,4 @@ namespace ActiveMQ.Artemis.Core.Client.Framing;
internal class SessionStart : Packet
{
public const byte Type = 67;
-
- public override void Encode(ByteBuffer buffer)
- {
- }
-
- public override void Decode(ByteBuffer buffer)
- {
- }
}
\ No newline at end of file
diff --git a/src/ArtemisNetCoreClient/ISession.cs b/src/ArtemisNetCoreClient/ISession.cs
index bd4923f..a5f24a8 100644
--- a/src/ArtemisNetCoreClient/ISession.cs
+++ b/src/ArtemisNetCoreClient/ISession.cs
@@ -1,3 +1,8 @@
+using ActiveMQ.Artemis.Core.Client.Framing;
+
namespace ActiveMQ.Artemis.Core.Client;
-public interface ISession : IAsyncDisposable;
\ No newline at end of file
+public interface ISession : IAsyncDisposable
+{
+ public Task CreateAddress(string address, IEnumerable routingTypes, bool autoCreated, CancellationToken cancellationToken);
+}
\ No newline at end of file
diff --git a/src/ArtemisNetCoreClient/Session.cs b/src/ArtemisNetCoreClient/Session.cs
index daa7e9e..1494b42 100644
--- a/src/ArtemisNetCoreClient/Session.cs
+++ b/src/ArtemisNetCoreClient/Session.cs
@@ -21,7 +21,7 @@ public Session(Transport transport)
try
{
var packet = await _transport.ReceiveAsync(default);
- if (packet.IsResponse && _completionSources.TryRemove(packet.CorrelationId, out var tcs))
+ if (packet is { IsResponse: true } && _completionSources.TryRemove(packet.CorrelationId, out var tcs))
{
tcs.TrySetResult(packet);
}
@@ -39,21 +39,33 @@ public Session(Transport transport)
});
}
+ public async Task CreateAddress(string address, IEnumerable routingTypes, bool autoCreated, CancellationToken cancellationToken)
+ {
+ var createAddressMessage = new CreateAddressMessage
+ {
+ Address = address,
+ RoutingTypes = routingTypes.ToArray(),
+ AutoCreated = autoCreated,
+ RequiresResponse = true
+ };
+ _ = await SendBlockingAsync(createAddressMessage, 11, cancellationToken);
+ }
+
public async ValueTask DisposeAsync()
{
- _ = await SendBlockingAsync(new SessionStop(), default);
- _ = await SendBlockingAsync(new SessionCloseMessage(), default);
+ _ = await SendBlockingAsync(new SessionStop(), ChannelId, default);
+ _ = await SendBlockingAsync(new SessionCloseMessage(),ChannelId, default);
await _transport.DisposeAsync().ConfigureAwait(false);
}
- private async Task SendBlockingAsync(TRequest request, CancellationToken cancellationToken) where TRequest : Packet
+ private async Task SendBlockingAsync(TRequest request, long channelId, CancellationToken cancellationToken) where TRequest : Packet
{
var tcs = new TaskCompletionSource();
// TODO: Handle scenario when we cannot CorrelationId
_ = _completionSources.TryAdd(request.CorrelationId, tcs);
- await _transport.SendAsync(request, ChannelId, cancellationToken);
+ await _transport.SendAsync(request, channelId, cancellationToken);
var responsePacket = await tcs.Task;
if (responsePacket is TResponse response)
{
diff --git a/src/ArtemisNetCoreClient/Transport.cs b/src/ArtemisNetCoreClient/Transport.cs
index 2456e2e..c6f6d45 100644
--- a/src/ArtemisNetCoreClient/Transport.cs
+++ b/src/ArtemisNetCoreClient/Transport.cs
@@ -12,12 +12,16 @@ public async Task SendAsync(Packet packet, long channelId, CancellationToken can
await socket.SendAsync(byteBuffer.GetBuffer(), cancellationToken).ConfigureAwait(false);
}
- public async Task ReceiveAsync(CancellationToken cancellationToken)
+ public async Task ReceiveAsync(CancellationToken cancellationToken)
{
var receiveBuffer = new byte[sizeof(int)];
await socket.ReceiveAsync(receiveBuffer, cancellationToken).ConfigureAwait(false);
var size = new ByteBuffer(receiveBuffer).ReadInt();
+ if (size == 0)
+ {
+ return null;
+ }
var buffer = new byte[size];
_ = await socket.ReceiveAsync(buffer, cancellationToken).ConfigureAwait(false);
diff --git a/test/ArtemisNetCoreClient.Tests/SessionTests.cs b/test/ArtemisNetCoreClient.Tests/SessionTests.cs
index 6bd76e9..750674b 100644
--- a/test/ArtemisNetCoreClient.Tests/SessionTests.cs
+++ b/test/ArtemisNetCoreClient.Tests/SessionTests.cs
@@ -1,3 +1,5 @@
+using ActiveMQ.Artemis.Core.Client.Framing;
+
namespace ActiveMQ.Artemis.Core.Client.Tests;
public class SessionTests
@@ -21,4 +23,21 @@ public async Task should_establish_session()
Assert.IsNotNull(session);
await session.DisposeAsync();
}
+
+ [Test, Ignore("WIP")]
+ public async Task should_create_address()
+ {
+ // Arrange
+ var connectionFactory = new SessionFactory();
+ var session = await connectionFactory.CreateAsync(new Endpoint
+ {
+ Host = "localhost",
+ Port = 5445,
+ User = "artemis",
+ Password = "artemis"
+ });
+
+ // Act && Assert
+ await session.CreateAddress("myaddress", Enumerable.Empty(), false, default);
+ }
}
\ No newline at end of file