Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create Address WIP #33

Merged
merged 1 commit into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ArtemisNetCoreClient.sln.DotSettings
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:Boolean x:Key="/Default/UserDictionary/Words/=Anycast/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
1 change: 1 addition & 0 deletions src/ArtemisNetCoreClient/Framing/Codec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public static void Encode(ByteBuffer buffer, Packet packet, long channelId)
SessionStart => SessionStart.Type,
SessionStop => SessionStop.Type,
SessionCloseMessage => SessionCloseMessage.Type,
CreateAddressMessage => CreateAddressMessage.Type,
_ => throw new ArgumentOutOfRangeException(nameof(packet), packet, $"{packet.GetType()} is not supported for encoding")
};
buffer.WriteByte(type);
Expand Down
22 changes: 22 additions & 0 deletions src/ArtemisNetCoreClient/Framing/CreateAddressMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
namespace ActiveMQ.Artemis.Core.Client.Framing;

internal class CreateAddressMessage : Packet
{
public const byte Type = unchecked((byte) -11);
public required string Address { get; init; }
public required RoutingType[] RoutingTypes { get; init; }
public required bool RequiresResponse { get; init; }
public required bool AutoCreated { get; init; }

public override void Encode(ByteBuffer buffer)
{
buffer.WriteString(Address);
buffer.WriteInt(RoutingTypes.Length);
foreach (var routingType in RoutingTypes)
{
buffer.WriteByte((byte) routingType);
}
buffer.WriteBool(RequiresResponse);
buffer.WriteBool(AutoCreated);
}
}
5 changes: 0 additions & 5 deletions src/ArtemisNetCoreClient/Framing/CreateSessionMessageV2.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,4 @@ public override void Encode(ByteBuffer buffer)
buffer.WriteNullableString(DefaultAddress);
buffer.WriteNullableString(ClientId);
}

public override void Decode(ByteBuffer buffer)
{
throw new NotImplementedException();
}
}
7 changes: 7 additions & 0 deletions src/ArtemisNetCoreClient/Framing/RoutingType.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace ActiveMQ.Artemis.Core.Client.Framing;

public enum RoutingType : byte
{
Multicast = 0,
Anycast = 1
}
8 changes: 0 additions & 8 deletions src/ArtemisNetCoreClient/Framing/SessionStart.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,4 @@ namespace ActiveMQ.Artemis.Core.Client.Framing;
internal class SessionStart : Packet
{
public const byte Type = 67;

public override void Encode(ByteBuffer buffer)
{
}

public override void Decode(ByteBuffer buffer)
{
}
}
7 changes: 6 additions & 1 deletion src/ArtemisNetCoreClient/ISession.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
using ActiveMQ.Artemis.Core.Client.Framing;

namespace ActiveMQ.Artemis.Core.Client;

public interface ISession : IAsyncDisposable;
public interface ISession : IAsyncDisposable
{
public Task CreateAddress(string address, IEnumerable<RoutingType> routingTypes, bool autoCreated, CancellationToken cancellationToken);
}
22 changes: 17 additions & 5 deletions src/ArtemisNetCoreClient/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public Session(Transport transport)
try
{
var packet = await _transport.ReceiveAsync(default);
if (packet.IsResponse && _completionSources.TryRemove(packet.CorrelationId, out var tcs))
if (packet is { IsResponse: true } && _completionSources.TryRemove(packet.CorrelationId, out var tcs))
{
tcs.TrySetResult(packet);
}
Expand All @@ -39,21 +39,33 @@ public Session(Transport transport)
});
}

public async Task CreateAddress(string address, IEnumerable<RoutingType> routingTypes, bool autoCreated, CancellationToken cancellationToken)
{
var createAddressMessage = new CreateAddressMessage
{
Address = address,
RoutingTypes = routingTypes.ToArray(),
AutoCreated = autoCreated,
RequiresResponse = true
};
_ = await SendBlockingAsync<CreateAddressMessage, NullResponse>(createAddressMessage, 11, cancellationToken);
}

public async ValueTask DisposeAsync()
{
_ = await SendBlockingAsync<SessionStop, NullResponse>(new SessionStop(), default);
_ = await SendBlockingAsync<SessionCloseMessage, NullResponse>(new SessionCloseMessage(), default);
_ = await SendBlockingAsync<SessionStop, NullResponse>(new SessionStop(), ChannelId, default);
_ = await SendBlockingAsync<SessionCloseMessage, NullResponse>(new SessionCloseMessage(),ChannelId, default);
await _transport.DisposeAsync().ConfigureAwait(false);
}

private async Task<TResponse> SendBlockingAsync<TRequest, TResponse>(TRequest request, CancellationToken cancellationToken) where TRequest : Packet
private async Task<TResponse> SendBlockingAsync<TRequest, TResponse>(TRequest request, long channelId, CancellationToken cancellationToken) where TRequest : Packet
{
var tcs = new TaskCompletionSource<Packet>();

// TODO: Handle scenario when we cannot CorrelationId
_ = _completionSources.TryAdd(request.CorrelationId, tcs);

await _transport.SendAsync(request, ChannelId, cancellationToken);
await _transport.SendAsync(request, channelId, cancellationToken);
var responsePacket = await tcs.Task;
if (responsePacket is TResponse response)
{
Expand Down
6 changes: 5 additions & 1 deletion src/ArtemisNetCoreClient/Transport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@ public async Task SendAsync(Packet packet, long channelId, CancellationToken can
await socket.SendAsync(byteBuffer.GetBuffer(), cancellationToken).ConfigureAwait(false);
}

public async Task<Packet> ReceiveAsync(CancellationToken cancellationToken)
public async Task<Packet?> ReceiveAsync(CancellationToken cancellationToken)
{
var receiveBuffer = new byte[sizeof(int)];
await socket.ReceiveAsync(receiveBuffer, cancellationToken).ConfigureAwait(false);

var size = new ByteBuffer(receiveBuffer).ReadInt();
if (size == 0)
{
return null;
}

var buffer = new byte[size];
_ = await socket.ReceiveAsync(buffer, cancellationToken).ConfigureAwait(false);
Expand Down
19 changes: 19 additions & 0 deletions test/ArtemisNetCoreClient.Tests/SessionTests.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using ActiveMQ.Artemis.Core.Client.Framing;

namespace ActiveMQ.Artemis.Core.Client.Tests;

public class SessionTests
Expand All @@ -21,4 +23,21 @@ public async Task should_establish_session()
Assert.IsNotNull(session);
await session.DisposeAsync();
}

[Test, Ignore("WIP")]
public async Task should_create_address()
{
// Arrange
var connectionFactory = new SessionFactory();
var session = await connectionFactory.CreateAsync(new Endpoint
{
Host = "localhost",
Port = 5445,
User = "artemis",
Password = "artemis"
});

// Act && Assert
await session.CreateAddress("myaddress", Enumerable.Empty<RoutingType>(), false, default);
}
}
Loading