diff --git a/src/ArtemisNetCoreClient/Consumer.cs b/src/ArtemisNetCoreClient/Consumer.cs index 08718fd..027c165 100644 --- a/src/ArtemisNetCoreClient/Consumer.cs +++ b/src/ArtemisNetCoreClient/Consumer.cs @@ -23,22 +23,18 @@ public Consumer(Session session) _writer = channel.Writer; // TODO: should this really be fire and forget? - _ = session.SendAsync(new SessionConsumerFlowCreditMessage - { - ConsumerId = ConsumerId, - Credits = 100 - }, default); + // _ = session.SendAsync(new SessionConsumerFlowCreditMessage + // { + // ConsumerId = ConsumerId, + // Credits = 100 + // }, default); } public required long ConsumerId { get; init; } - + public async ValueTask DisposeAsync() { - var request = new SessionConsumerCloseMessage - { - ConsumerId = ConsumerId - }; - _ = await _session.SendBlockingAsync(request, default); + await _session.CloseConsumer(ConsumerId); } public async ValueTask ReceiveAsync(CancellationToken cancellationToken) diff --git a/src/ArtemisNetCoreClient/Framing/Codec.cs b/src/ArtemisNetCoreClient/Framing/Codec.cs index 5c66100..89a6903 100644 --- a/src/ArtemisNetCoreClient/Framing/Codec.cs +++ b/src/ArtemisNetCoreClient/Framing/Codec.cs @@ -16,8 +16,8 @@ public static void Encode(ByteBuffer buffer, Packet packet, long channelId) SessionBindingQueryMessage2 => SessionBindingQueryMessage2.Type, CreateQueueMessageV2 => CreateQueueMessageV2.Type, SessionQueueQueryMessage2 => SessionQueueQueryMessage2.Type, - SessionCreateConsumerMessage => SessionCreateConsumerMessage.Type, - SessionConsumerCloseMessage => SessionConsumerCloseMessage.Type, + SessionCreateConsumerMessage2 => SessionCreateConsumerMessage2.Type, + SessionConsumerCloseMessage2 => SessionConsumerCloseMessage2.Type, CreateProducerMessage => CreateProducerMessage.Type, RemoveProducerMessage => RemoveProducerMessage.Type, SessionSendMessageV3 => SessionSendMessageV3.Type, diff --git a/src/ArtemisNetCoreClient/Framing/SessionConsumerCloseMessage.cs b/src/ArtemisNetCoreClient/Framing/SessionConsumerCloseMessage.cs deleted file mode 100644 index 7772bcd..0000000 --- a/src/ArtemisNetCoreClient/Framing/SessionConsumerCloseMessage.cs +++ /dev/null @@ -1,17 +0,0 @@ -namespace ActiveMQ.Artemis.Core.Client.Framing; - -internal class SessionConsumerCloseMessage : Packet -{ - public const byte Type = 74; - - public long ConsumerId { get; set; } - - public override void Encode(ByteBuffer buffer) - { - buffer.WriteLong(ConsumerId); - } - - public override void Decode(ByteBuffer buffer) - { - } -} \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/Framing/SessionConsumerCloseMessage2.cs b/src/ArtemisNetCoreClient/Framing/SessionConsumerCloseMessage2.cs new file mode 100644 index 0000000..411b5ab --- /dev/null +++ b/src/ArtemisNetCoreClient/Framing/SessionConsumerCloseMessage2.cs @@ -0,0 +1,32 @@ +namespace ActiveMQ.Artemis.Core.Client.Framing; + +internal class SessionConsumerCloseMessage2 : Packet +{ + public const byte Type = 74; + + public long ConsumerId { get; set; } + + public override void Encode(ByteBuffer buffer) + { + buffer.WriteLong(ConsumerId); + } + + public override void Decode(ByteBuffer buffer) + { + } +} + +internal readonly struct SessionConsumerCloseMessage : IOutgoingPacket +{ + public PacketType PacketType => PacketType.SessionConsumerCloseMessage; + public required long ConsumerId { get; init; } + public int GetRequiredBufferSize() + { + return sizeof(long); + } + + public int Encode(Span buffer) + { + return ArtemisBinaryConverter.WriteInt64(ref buffer.GetReference(), ConsumerId); + } +} \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/Framing/SessionCreateConsumerMessage.cs b/src/ArtemisNetCoreClient/Framing/SessionCreateConsumerMessage.cs deleted file mode 100644 index 939e694..0000000 --- a/src/ArtemisNetCoreClient/Framing/SessionCreateConsumerMessage.cs +++ /dev/null @@ -1,27 +0,0 @@ -namespace ActiveMQ.Artemis.Core.Client.Framing; - -internal class SessionCreateConsumerMessage : Packet -{ - public const byte Type = 40; - - public required long Id { get; init; } - public required string QueueName { get; init; } - public string? FilterString { get; init; } - public required int Priority { get; init; } - public required bool BrowseOnly { get; init; } - public required bool RequiresResponse { get; init; } - - public override void Encode(ByteBuffer buffer) - { - buffer.WriteLong(Id); - buffer.WriteByteString(QueueName); - buffer.WriteNullableByteString(FilterString); - buffer.WriteBool(BrowseOnly); - buffer.WriteBool(RequiresResponse); - buffer.WriteInt(Priority); - } - - public override void Decode(ByteBuffer buffer) - { - } -} \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/Framing/SessionCreateConsumerMessage2.cs b/src/ArtemisNetCoreClient/Framing/SessionCreateConsumerMessage2.cs new file mode 100644 index 0000000..d3596ea --- /dev/null +++ b/src/ArtemisNetCoreClient/Framing/SessionCreateConsumerMessage2.cs @@ -0,0 +1,68 @@ +namespace ActiveMQ.Artemis.Core.Client.Framing; + +internal class SessionCreateConsumerMessage2 : Packet +{ + public const byte Type = 40; + + public required long Id { get; init; } + public required string QueueName { get; init; } + public string? FilterString { get; init; } + public required int Priority { get; init; } + public required bool BrowseOnly { get; init; } + public required bool RequiresResponse { get; init; } + + public override void Encode(ByteBuffer buffer) + { + buffer.WriteLong(Id); + buffer.WriteByteString(QueueName); + buffer.WriteNullableByteString(FilterString); + buffer.WriteBool(BrowseOnly); + buffer.WriteBool(RequiresResponse); + buffer.WriteInt(Priority); + } + + public override void Decode(ByteBuffer buffer) + { + } +} + +internal readonly struct SessionCreateConsumerMessage : IOutgoingPacket +{ + public PacketType PacketType => PacketType.SessionCreateConsumerMessage; + + public required long Id { get; init; } + public required string QueueName { get; init; } + public required string? FilterString { get; init; } + public required bool BrowseOnly { get; init; } + public required bool RequiresResponse { get; init; } + public required int Priority { get; init; } + + + public int GetRequiredBufferSize() + { + int byteCount = 0; + + byteCount += sizeof(long); // Id + byteCount += ArtemisBinaryConverter.GetSimpleStringByteCount(QueueName); + byteCount += ArtemisBinaryConverter.GetNullableSimpleStringByteCount(FilterString); + byteCount += sizeof(bool); // BrowseOnly + byteCount += sizeof(bool); // RequiresResponse + byteCount += sizeof(int); // Priority + + return byteCount; + } + + public int Encode(Span buffer) + { + var offset = 0; + + offset += ArtemisBinaryConverter.WriteInt64(ref buffer.GetOffset(offset), Id); + offset += ArtemisBinaryConverter.WriteSimpleString(ref buffer.GetOffset(offset), QueueName); + offset += ArtemisBinaryConverter.WriteNullableSimpleString(ref buffer.GetOffset(offset), FilterString); + offset += ArtemisBinaryConverter.WriteBool(ref buffer.GetOffset(offset), BrowseOnly); + offset += ArtemisBinaryConverter.WriteBool(ref buffer.GetOffset(offset), RequiresResponse); + offset += ArtemisBinaryConverter.WriteInt32(ref buffer.GetOffset(offset), Priority); + + return offset; + } +} \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/Session.cs b/src/ArtemisNetCoreClient/Session.cs index 83da879..8a1818d 100644 --- a/src/ArtemisNetCoreClient/Session.cs +++ b/src/ArtemisNetCoreClient/Session.cs @@ -180,7 +180,7 @@ public async Task CreateQueueAsync(QueueConfiguration queueConfiguration, Cancel _connection.Send(ref request, ChannelId); await tcs.Task; } - catch (Exception e) + catch (Exception) { _completionSources2.TryRemove(-1, out _); throw; @@ -243,17 +243,68 @@ public async Task CreateConsumerAsync(ConsumerConfiguration consumerC QueueName = consumerConfiguration.QueueName, Priority = 0, BrowseOnly = false, - RequiresResponse = true + RequiresResponse = true, + FilterString = null }; - _ = await SendBlockingAsync(request, cancellationToken); - var consumer = new Consumer(this) + try + { + await _lock.WaitAsync(cancellationToken); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + _ = _completionSources2.TryAdd(-1, tcs); + _connection.Send(ref request, ChannelId); + var result = await tcs.Task; + if (result is QueueInfo) + { + var consumer = new Consumer(this) + { + ConsumerId = request.Id + }; + // TODO: We should remove consumer from this dictionary on dispose + _consumers.TryAdd(request.Id, consumer); + + return consumer; + } + else + { + // TODO: Handle scneario when we cannot create consumer + return null!; + } + } + catch (Exception) + { + _completionSources2.TryRemove(-1, out _); + throw; + } + finally { - ConsumerId = request.Id + _lock.Release(); + } + } + + internal async Task CloseConsumer(long consumerId) + { + var request = new SessionConsumerCloseMessage + { + ConsumerId = consumerId }; - - // TODO: We should remove consumer from this dictionary on dispose - _consumers.TryAdd(request.Id, consumer); - return consumer; + try + { + await _lock.WaitAsync(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + _ = _completionSources2.TryAdd(-1, tcs); + _connection.Send(ref request, ChannelId); + await tcs.Task; + _consumers.TryRemove(consumerId, out _); + } + catch (Exception) + { + _completionSources2.TryRemove(-1, out _); + throw; + } + finally + { + _lock.Release(); + } } public async Task CreateProducerAsync(ProducerConfiguration producerConfiguration, CancellationToken cancellationToken) @@ -402,7 +453,6 @@ public void OnPacket(ref readonly InboundPacket packet) } break; - } default: { diff --git a/src/ArtemisNetCoreClient/Transport2.cs b/src/ArtemisNetCoreClient/Transport2.cs index d2f817e..41ee230 100644 --- a/src/ArtemisNetCoreClient/Transport2.cs +++ b/src/ArtemisNetCoreClient/Transport2.cs @@ -152,5 +152,7 @@ internal enum PacketType : byte SessionBindingQueryResponseMessage = unchecked((byte) -22), CreateQueueMessage = unchecked((byte) -12), SessionQueueQueryMessage = 45, - SessionQueueQueryResponseMessage = unchecked((byte) -14) + SessionQueueQueryResponseMessage = unchecked((byte) -14), + SessionCreateConsumerMessage = 40, + SessionConsumerCloseMessage = 74, } \ No newline at end of file diff --git a/test/ArtemisNetCoreClient.Tests/SessionSpec.cs b/test/ArtemisNetCoreClient.Tests/SessionSpec.cs index 7ee75fe..98cc988 100644 --- a/test/ArtemisNetCoreClient.Tests/SessionSpec.cs +++ b/test/ArtemisNetCoreClient.Tests/SessionSpec.cs @@ -32,7 +32,7 @@ public async Task Should_create_address_with_selected_routing_type(RoutingType[] [Theory] [InlineData(RoutingType.Anycast)] [InlineData(RoutingType.Multicast)] - public async Task should_create_queue_with_selected_routing_type(RoutingType routingType) + public async Task Should_create_queue_with_selected_routing_type(RoutingType routingType) { // Arrange await using var testFixture = await TestFixture.CreateAsync(testOutputHelper); @@ -92,16 +92,17 @@ public async Task should_not_return_queue_info_when_queue_does_not_exist() Assert.Null(queueInfo); } - [Fact(Skip = "Temporarily disabled")] + [Fact] public async Task should_create_and_dispose_consumer() { // Arrange await using var testFixture = await TestFixture.CreateAsync(testOutputHelper); - await using var session = await testFixture.CreateSessionAsync(); + await using var connection = await testFixture.CreateConnectionAsync(); + await using var session = await connection.CreateSessionAsync(testFixture.CancellationToken); var addressName = Guid.NewGuid().ToString(); - await session.CreateAddressAsync(addressName, new [] { RoutingType.Multicast }, testFixture.CancellationToken); + await session.CreateAddressAsync(addressName, [RoutingType.Multicast], testFixture.CancellationToken); var queueName = Guid.NewGuid().ToString(); await session.CreateQueueAsync(new QueueConfiguration