Skip to content

Commit

Permalink
Rework Create Consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
Havret committed Apr 27, 2024
1 parent 17426c1 commit 0f8fc41
Show file tree
Hide file tree
Showing 9 changed files with 177 additions and 72 deletions.
18 changes: 7 additions & 11 deletions src/ArtemisNetCoreClient/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,18 @@ public Consumer(Session session)
_writer = channel.Writer;

// TODO: should this really be fire and forget?
_ = session.SendAsync(new SessionConsumerFlowCreditMessage
{
ConsumerId = ConsumerId,
Credits = 100
}, default);
// _ = session.SendAsync(new SessionConsumerFlowCreditMessage
// {
// ConsumerId = ConsumerId,
// Credits = 100
// }, default);
}

public required long ConsumerId { get; init; }

public async ValueTask DisposeAsync()
{
var request = new SessionConsumerCloseMessage
{
ConsumerId = ConsumerId
};
_ = await _session.SendBlockingAsync<SessionConsumerCloseMessage, NullResponse>(request, default);
await _session.CloseConsumer(ConsumerId);
}

public async ValueTask<Message> ReceiveAsync(CancellationToken cancellationToken)
Expand Down
4 changes: 2 additions & 2 deletions src/ArtemisNetCoreClient/Framing/Codec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ public static void Encode(ByteBuffer buffer, Packet packet, long channelId)
SessionBindingQueryMessage2 => SessionBindingQueryMessage2.Type,
CreateQueueMessageV2 => CreateQueueMessageV2.Type,
SessionQueueQueryMessage2 => SessionQueueQueryMessage2.Type,
SessionCreateConsumerMessage => SessionCreateConsumerMessage.Type,
SessionConsumerCloseMessage => SessionConsumerCloseMessage.Type,
SessionCreateConsumerMessage2 => SessionCreateConsumerMessage2.Type,
SessionConsumerCloseMessage2 => SessionConsumerCloseMessage2.Type,
CreateProducerMessage => CreateProducerMessage.Type,
RemoveProducerMessage => RemoveProducerMessage.Type,
SessionSendMessageV3 => SessionSendMessageV3.Type,
Expand Down
17 changes: 0 additions & 17 deletions src/ArtemisNetCoreClient/Framing/SessionConsumerCloseMessage.cs

This file was deleted.

32 changes: 32 additions & 0 deletions src/ArtemisNetCoreClient/Framing/SessionConsumerCloseMessage2.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
namespace ActiveMQ.Artemis.Core.Client.Framing;

internal class SessionConsumerCloseMessage2 : Packet
{
public const byte Type = 74;

public long ConsumerId { get; set; }

public override void Encode(ByteBuffer buffer)
{
buffer.WriteLong(ConsumerId);
}

public override void Decode(ByteBuffer buffer)
{
}
}

internal readonly struct SessionConsumerCloseMessage : IOutgoingPacket
{
public PacketType PacketType => PacketType.SessionConsumerCloseMessage;
public required long ConsumerId { get; init; }
public int GetRequiredBufferSize()
{
return sizeof(long);
}

public int Encode(Span<byte> buffer)
{
return ArtemisBinaryConverter.WriteInt64(ref buffer.GetReference(), ConsumerId);
}
}
27 changes: 0 additions & 27 deletions src/ArtemisNetCoreClient/Framing/SessionCreateConsumerMessage.cs

This file was deleted.

68 changes: 68 additions & 0 deletions src/ArtemisNetCoreClient/Framing/SessionCreateConsumerMessage2.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
namespace ActiveMQ.Artemis.Core.Client.Framing;

internal class SessionCreateConsumerMessage2 : Packet
{
public const byte Type = 40;

public required long Id { get; init; }
public required string QueueName { get; init; }
public string? FilterString { get; init; }
public required int Priority { get; init; }
public required bool BrowseOnly { get; init; }
public required bool RequiresResponse { get; init; }

public override void Encode(ByteBuffer buffer)
{
buffer.WriteLong(Id);
buffer.WriteByteString(QueueName);
buffer.WriteNullableByteString(FilterString);
buffer.WriteBool(BrowseOnly);
buffer.WriteBool(RequiresResponse);
buffer.WriteInt(Priority);
}

public override void Decode(ByteBuffer buffer)
{
}
}

internal readonly struct SessionCreateConsumerMessage : IOutgoingPacket
{
public PacketType PacketType => PacketType.SessionCreateConsumerMessage;

public required long Id { get; init; }
public required string QueueName { get; init; }
public required string? FilterString { get; init; }
public required bool BrowseOnly { get; init; }
public required bool RequiresResponse { get; init; }
public required int Priority { get; init; }


public int GetRequiredBufferSize()
{
int byteCount = 0;

byteCount += sizeof(long); // Id
byteCount += ArtemisBinaryConverter.GetSimpleStringByteCount(QueueName);
byteCount += ArtemisBinaryConverter.GetNullableSimpleStringByteCount(FilterString);
byteCount += sizeof(bool); // BrowseOnly
byteCount += sizeof(bool); // RequiresResponse
byteCount += sizeof(int); // Priority

return byteCount;
}

public int Encode(Span<byte> buffer)
{
var offset = 0;

offset += ArtemisBinaryConverter.WriteInt64(ref buffer.GetOffset(offset), Id);
offset += ArtemisBinaryConverter.WriteSimpleString(ref buffer.GetOffset(offset), QueueName);
offset += ArtemisBinaryConverter.WriteNullableSimpleString(ref buffer.GetOffset(offset), FilterString);
offset += ArtemisBinaryConverter.WriteBool(ref buffer.GetOffset(offset), BrowseOnly);
offset += ArtemisBinaryConverter.WriteBool(ref buffer.GetOffset(offset), RequiresResponse);
offset += ArtemisBinaryConverter.WriteInt32(ref buffer.GetOffset(offset), Priority);

return offset;
}
}
70 changes: 60 additions & 10 deletions src/ArtemisNetCoreClient/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public async Task CreateQueueAsync(QueueConfiguration queueConfiguration, Cancel
_connection.Send(ref request, ChannelId);
await tcs.Task;
}
catch (Exception e)
catch (Exception)
{
_completionSources2.TryRemove(-1, out _);
throw;
Expand Down Expand Up @@ -243,17 +243,68 @@ public async Task<IConsumer> CreateConsumerAsync(ConsumerConfiguration consumerC
QueueName = consumerConfiguration.QueueName,
Priority = 0,
BrowseOnly = false,
RequiresResponse = true
RequiresResponse = true,
FilterString = null
};
_ = await SendBlockingAsync<SessionCreateConsumerMessage, SessionQueueQueryResponseMessageV3>(request, cancellationToken);
var consumer = new Consumer(this)
try
{
await _lock.WaitAsync(cancellationToken);
var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
_ = _completionSources2.TryAdd(-1, tcs);
_connection.Send(ref request, ChannelId);
var result = await tcs.Task;
if (result is QueueInfo)
{
var consumer = new Consumer(this)
{
ConsumerId = request.Id
};
// TODO: We should remove consumer from this dictionary on dispose
_consumers.TryAdd(request.Id, consumer);

return consumer;
}
else
{
// TODO: Handle scneario when we cannot create consumer
return null!;
}
}
catch (Exception)
{
_completionSources2.TryRemove(-1, out _);
throw;
}
finally
{
ConsumerId = request.Id
_lock.Release();
}
}

internal async Task CloseConsumer(long consumerId)
{
var request = new SessionConsumerCloseMessage
{
ConsumerId = consumerId
};

// TODO: We should remove consumer from this dictionary on dispose
_consumers.TryAdd(request.Id, consumer);
return consumer;
try
{
await _lock.WaitAsync();
var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
_ = _completionSources2.TryAdd(-1, tcs);
_connection.Send(ref request, ChannelId);
await tcs.Task;
_consumers.TryRemove(consumerId, out _);
}
catch (Exception)
{
_completionSources2.TryRemove(-1, out _);
throw;
}
finally
{
_lock.Release();
}
}

public async Task<IProducer> CreateProducerAsync(ProducerConfiguration producerConfiguration, CancellationToken cancellationToken)
Expand Down Expand Up @@ -402,7 +453,6 @@ public void OnPacket(ref readonly InboundPacket packet)
}

break;

}
default:
{
Expand Down
4 changes: 3 additions & 1 deletion src/ArtemisNetCoreClient/Transport2.cs
Original file line number Diff line number Diff line change
Expand Up @@ -152,5 +152,7 @@ internal enum PacketType : byte
SessionBindingQueryResponseMessage = unchecked((byte) -22),
CreateQueueMessage = unchecked((byte) -12),
SessionQueueQueryMessage = 45,
SessionQueueQueryResponseMessage = unchecked((byte) -14)
SessionQueueQueryResponseMessage = unchecked((byte) -14),
SessionCreateConsumerMessage = 40,
SessionConsumerCloseMessage = 74,
}
9 changes: 5 additions & 4 deletions test/ArtemisNetCoreClient.Tests/SessionSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public async Task Should_create_address_with_selected_routing_type(RoutingType[]
[Theory]
[InlineData(RoutingType.Anycast)]
[InlineData(RoutingType.Multicast)]
public async Task should_create_queue_with_selected_routing_type(RoutingType routingType)
public async Task Should_create_queue_with_selected_routing_type(RoutingType routingType)
{
// Arrange
await using var testFixture = await TestFixture.CreateAsync(testOutputHelper);
Expand Down Expand Up @@ -92,16 +92,17 @@ public async Task should_not_return_queue_info_when_queue_does_not_exist()
Assert.Null(queueInfo);
}

[Fact(Skip = "Temporarily disabled")]
[Fact]
public async Task should_create_and_dispose_consumer()
{
// Arrange
await using var testFixture = await TestFixture.CreateAsync(testOutputHelper);

await using var session = await testFixture.CreateSessionAsync();
await using var connection = await testFixture.CreateConnectionAsync();
await using var session = await connection.CreateSessionAsync(testFixture.CancellationToken);

var addressName = Guid.NewGuid().ToString();
await session.CreateAddressAsync(addressName, new [] { RoutingType.Multicast }, testFixture.CancellationToken);
await session.CreateAddressAsync(addressName, [RoutingType.Multicast], testFixture.CancellationToken);

var queueName = Guid.NewGuid().ToString();
await session.CreateQueueAsync(new QueueConfiguration
Expand Down

0 comments on commit 0f8fc41

Please sign in to comment.