Skip to content

Commit

Permalink
Query Address Info
Browse files Browse the repository at this point in the history
  • Loading branch information
Havret committed Mar 23, 2024
1 parent 7257a29 commit e30e70c
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 28 deletions.
10 changes: 10 additions & 0 deletions src/ArtemisNetCoreClient/AddressInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using ActiveMQ.Artemis.Core.Client.Framing;

namespace ActiveMQ.Artemis.Core.Client;

public class AddressInfo
{
public string Name { get; init; }

Check warning on line 7 in src/ArtemisNetCoreClient/AddressInfo.cs

View workflow job for this annotation

GitHub Actions / linux

Non-nullable property 'Name' must contain a non-null value when exiting constructor. Consider declaring the property as nullable.

Check warning on line 7 in src/ArtemisNetCoreClient/AddressInfo.cs

View workflow job for this annotation

GitHub Actions / linux

Non-nullable property 'Name' must contain a non-null value when exiting constructor. Consider declaring the property as nullable.
public IReadOnlyList<string> QueueNames { get; init; }

Check warning on line 8 in src/ArtemisNetCoreClient/AddressInfo.cs

View workflow job for this annotation

GitHub Actions / linux

Non-nullable property 'QueueNames' must contain a non-null value when exiting constructor. Consider declaring the property as nullable.

Check warning on line 8 in src/ArtemisNetCoreClient/AddressInfo.cs

View workflow job for this annotation

GitHub Actions / linux

Non-nullable property 'QueueNames' must contain a non-null value when exiting constructor. Consider declaring the property as nullable.
public IReadOnlyList<RoutingType> RoutingTypes { get; init; }

Check warning on line 9 in src/ArtemisNetCoreClient/AddressInfo.cs

View workflow job for this annotation

GitHub Actions / linux

Non-nullable property 'RoutingTypes' must contain a non-null value when exiting constructor. Consider declaring the property as nullable.

Check warning on line 9 in src/ArtemisNetCoreClient/AddressInfo.cs

View workflow job for this annotation

GitHub Actions / linux

Non-nullable property 'RoutingTypes' must contain a non-null value when exiting constructor. Consider declaring the property as nullable.
}
8 changes: 7 additions & 1 deletion src/ArtemisNetCoreClient/ByteBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public ByteBuffer(byte[] payload)
{
_memoryStream = new MemoryStream(payload, writable: false);
}

public ReadOnlyMemory<byte> GetBuffer()
{
_memoryStream.TryGetBuffer(out var buffer);
Expand Down Expand Up @@ -295,6 +295,12 @@ private string ReadAsShorts(int length)
return value == DataConstants.NotNull ? ReadString() : null;
}

public string? ReadNullableStringAsBytes()
{
var value = _memoryStream.ReadByte();
return value == DataConstants.NotNull ? ReadStringAsBytes() : null;
}

public void WriteSize()
{
_memoryStream.TryGetBuffer(out var buffer);
Expand Down
1 change: 1 addition & 0 deletions src/ArtemisNetCoreClient/Framing/Codec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public static (Packet packet, long channelId) Decode(ByteBuffer buffer)
{
CreateSessionResponseMessage.Type => new CreateSessionResponseMessage(),
NullResponse.Type => new NullResponse(),
SessionBindingQueryResponseMessageV5.Type => new SessionBindingQueryResponseMessageV5(),
_ => throw new ArgumentOutOfRangeException($"Type {type} is not supported for decoding")
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ namespace ActiveMQ.Artemis.Core.Client.Framing;

internal class SessionBindingQueryResponseMessageV5 : Packet
{
public const byte Type = unchecked((byte) -22);

public override bool IsResponse => true;

public bool Exists { get; set; }
public IReadOnlyList<string> QueueNames { get; set; }

Check warning on line 10 in src/ArtemisNetCoreClient/Framing/SessionBindingQueryResponseMessageV5.cs

View workflow job for this annotation

GitHub Actions / linux

Non-nullable property 'QueueNames' must contain a non-null value when exiting constructor. Consider declaring the property as nullable.

Check warning on line 10 in src/ArtemisNetCoreClient/Framing/SessionBindingQueryResponseMessageV5.cs

View workflow job for this annotation

GitHub Actions / linux

Non-nullable property 'QueueNames' must contain a non-null value when exiting constructor. Consider declaring the property as nullable.
public bool AutoCreateQueues { get; set; }
Expand All @@ -12,8 +16,8 @@ internal class SessionBindingQueryResponseMessageV5 : Packet
public bool? DefaultLastValue { get; set; }
public string? DefaultLastValueKey { get; set; }
public bool? DefaultNonDestructive { get; set; }
public int DefaultConsumersBeforeDispatch { get; set; }
public long DefaultDelayBeforeDispatch { get; set; }
public int? DefaultConsumersBeforeDispatch { get; set; }
public long? DefaultDelayBeforeDispatch { get; set; }
public bool SupportsMulticast { get; set; }
public bool SupportsAnycast { get; set; }

Expand All @@ -26,11 +30,12 @@ public override void Decode(ByteBuffer buffer)
{
Exists = buffer.ReadBool();
var numQueues = buffer.ReadInt();
var queueNames = new string [numQueues];
var queueNames = new string[numQueues];
for (int i = 0; i < numQueues; i++)
{
queueNames[i] = buffer.ReadStringAsBytes();
}

QueueNames = queueNames;
AutoCreateQueues = buffer.ReadBool();
AutoCreateAddresses = buffer.ReadBool();
Expand All @@ -40,5 +45,9 @@ public override void Decode(ByteBuffer buffer)
DefaultLastValue = buffer.ReadNullableBool();
DefaultLastValueKey = buffer.ReadNullableStringAsBytes();
DefaultNonDestructive = buffer.ReadNullableBool();
DefaultConsumersBeforeDispatch = buffer.ReadNullableInt();
DefaultDelayBeforeDispatch = buffer.ReadNullableLong();
SupportsMulticast = buffer.ReadBool();
SupportsAnycast = buffer.ReadBool();
}
}
3 changes: 2 additions & 1 deletion src/ArtemisNetCoreClient/ISession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ namespace ActiveMQ.Artemis.Core.Client;

public interface ISession : IAsyncDisposable
{
public Task CreateAddress(string address, IEnumerable<RoutingType> routingTypes, bool autoCreated, CancellationToken cancellationToken);
Task CreateAddress(string address, IEnumerable<RoutingType> routingTypes, CancellationToken cancellationToken);
Task<AddressInfo> GetAddressInfo(string address, CancellationToken cancellationToken);
}
35 changes: 32 additions & 3 deletions src/ArtemisNetCoreClient/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,46 @@ public Session(Transport transport)
});
}

public async Task CreateAddress(string address, IEnumerable<RoutingType> routingTypes, bool autoCreated, CancellationToken cancellationToken)
public async Task CreateAddress(string address, IEnumerable<RoutingType> routingTypes, CancellationToken cancellationToken)
{
var createAddressMessage = new CreateAddressMessage
{
Address = address,
RoutingTypes = routingTypes.ToArray(),
AutoCreated = autoCreated,
AutoCreated = false,
RequiresResponse = true
};
_ = await SendBlockingAsync<CreateAddressMessage, NullResponse>(createAddressMessage, cancellationToken);
}

public async Task<AddressInfo> GetAddressInfo(string address, CancellationToken cancellationToken)
{
var request = new SessionBindingQueryMessage
{
Address = address
};
var response = await SendBlockingAsync<SessionBindingQueryMessage, SessionBindingQueryResponseMessageV5>(request, cancellationToken);

return new AddressInfo
{
Name = address,
QueueNames = response.QueueNames,
RoutingTypes = GetRoutingTypes(response).ToArray(),
};
}

private static IEnumerable<RoutingType> GetRoutingTypes(SessionBindingQueryResponseMessageV5 sessionBindingQueryResponseMessageV5)
{
if (sessionBindingQueryResponseMessageV5.SupportsAnycast)
{
yield return RoutingType.Anycast;
}

if (sessionBindingQueryResponseMessageV5.SupportsMulticast)
{
yield return RoutingType.Multicast;
}
}

public async ValueTask DisposeAsync()
{
Expand All @@ -62,7 +91,7 @@ private async Task<TResponse> SendBlockingAsync<TRequest, TResponse>(TRequest re
{
var tcs = new TaskCompletionSource<Packet>();

// TODO: Handle scenario when we cannot CorrelationId
// TODO: Handle scenario when we cannot add request for this CorrelationId, because there is already another pending request
_ = _completionSources.TryAdd(request.CorrelationId, tcs);

await _transport.SendAsync(request, ChannelId, cancellationToken);
Expand Down
26 changes: 6 additions & 20 deletions test/ArtemisNetCoreClient.Tests/SessionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,25 +41,11 @@ public async Task should_create_address_with_selected_routing_type(RoutingType[]

// Act
var addressName = $"{Guid.NewGuid().ToString()}-{string.Join("-", routingTypes)}";
await session.CreateAddress(addressName, routingTypes, false, default);
}

[TestCase(false)]
[TestCase(true)]
public async Task should_create_address_with_autoCreated_flag(bool autoCreated)
{
// Arrange
var connectionFactory = new SessionFactory();
await using var session = await connectionFactory.CreateAsync(new Endpoint
{
Host = "localhost",
Port = 5445,
User = "artemis",
Password = "artemis"
});

// Act
var addressName = $"{Guid.NewGuid().ToString()}";
await session.CreateAddress(addressName, new[] { RoutingType.Multicast }, autoCreated: autoCreated, default);
await session.CreateAddress(addressName, routingTypes, default);

// Assert
var addressInfo = await session.GetAddressInfo(addressName, default);
Assert.That(addressInfo.Name, Is.EqualTo(addressName));
CollectionAssert.AreEqual(routingTypes, addressInfo.RoutingTypes);
}
}

0 comments on commit e30e70c

Please sign in to comment.