Skip to content

Commit

Permalink
Create Producer
Browse files Browse the repository at this point in the history
  • Loading branch information
Havret committed Mar 29, 2024
1 parent 1b99c25 commit c574769
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 8 deletions.
6 changes: 6 additions & 0 deletions src/ArtemisNetCoreClient/ConsumerConfiguration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace ActiveMQ.Artemis.Core.Client;

public class ConsumerConfiguration
{
public required string QueueName { get; init; }
}
2 changes: 2 additions & 0 deletions src/ArtemisNetCoreClient/Framing/Codec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ public static void Encode(ByteBuffer buffer, Packet packet, long channelId)
SessionQueueQueryMessage => SessionQueueQueryMessage.Type,
SessionCreateConsumerMessage => SessionCreateConsumerMessage.Type,
SessionConsumerCloseMessage => SessionConsumerCloseMessage.Type,
CreateProducerMessage => CreateProducerMessage.Type,
RemoveProducerMessage => RemoveProducerMessage.Type,
_ => throw new ArgumentOutOfRangeException(nameof(packet), packet, $"{packet.GetType()} is not supported for encoding")
};
buffer.WriteByte(type);
Expand Down
19 changes: 19 additions & 0 deletions src/ArtemisNetCoreClient/Framing/CreateProducerMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
namespace ActiveMQ.Artemis.Core.Client.Framing;

internal class CreateProducerMessage : Packet
{
public const byte Type = unchecked((byte) -20);

public required int Id { get; init; }
public string? Address { get; init; }

public override void Encode(ByteBuffer buffer)
{
buffer.WriteInt(Id);
buffer.WriteNullableByteString(Address);
}

public override void Decode(ByteBuffer buffer)
{
}
}
17 changes: 17 additions & 0 deletions src/ArtemisNetCoreClient/Framing/RemoveProducerMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace ActiveMQ.Artemis.Core.Client.Framing;

internal class RemoveProducerMessage : Packet
{
public const byte Type = unchecked((byte) -21);

public required int Id { get; init; }

public override void Encode(ByteBuffer buffer)
{
buffer.WriteInt(Id);
}

public override void Decode(ByteBuffer buffer)
{
}
}
21 changes: 21 additions & 0 deletions src/ArtemisNetCoreClient/IProducer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using ActiveMQ.Artemis.Core.Client.Framing;

namespace ActiveMQ.Artemis.Core.Client;

public interface IProducer : IAsyncDisposable
{
}

internal class Producer(Session session) : IProducer
{
public required int ProducerId { get; init; }

public async ValueTask DisposeAsync()
{
var request = new RemoveProducerMessage()
{
Id = ProducerId
};
await session.SendAsync(request, default);
}
}
6 changes: 1 addition & 5 deletions src/ArtemisNetCoreClient/ISession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,5 @@ public interface ISession : IAsyncDisposable
Task CreateQueue(QueueConfiguration queueConfiguration, CancellationToken cancellationToken);
Task<QueueInfo?> GetQueueInfo(string queueName, CancellationToken cancellationToken);
Task<IConsumer> CreateConsumerAsync(ConsumerConfiguration consumerConfiguration, CancellationToken cancellationToken);
}

public class ConsumerConfiguration
{
public required string QueueName { get; init; }
Task<IProducer> CreateProducerAsync(ProducerConfiguration producerConfiguration, CancellationToken cancellationToken);
}
6 changes: 6 additions & 0 deletions src/ArtemisNetCoreClient/ProducerConfiguration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace ActiveMQ.Artemis.Core.Client;

public class ProducerConfiguration
{
public string? Address { get; init; }
}
19 changes: 19 additions & 0 deletions src/ArtemisNetCoreClient/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,20 @@ public async Task<IConsumer> CreateConsumerAsync(ConsumerConfiguration consumerC
};
}

public async Task<IProducer> CreateProducerAsync(ProducerConfiguration producerConfiguration, CancellationToken cancellationToken)
{
var request = new CreateProducerMessage
{
Id = 0,
Address = producerConfiguration.Address
};
await _transport.SendAsync(request, ChannelId, cancellationToken);
return new Producer(this)
{
ProducerId = request.Id
};
}

public async ValueTask DisposeAsync()
{
_ = await SendBlockingAsync<SessionStop, NullResponse>(new SessionStop(), default);
Expand Down Expand Up @@ -163,6 +177,11 @@ internal async Task<TResponse> SendBlockingAsync<TRequest, TResponse>(TRequest r
}
}

internal async Task SendAsync<TRequest>(TRequest request, CancellationToken cancellationToken) where TRequest : Packet
{
await _transport.SendAsync(request, ChannelId, cancellationToken);
}

public long ChannelId { get; init; }

public async Task StartAsync(CancellationToken cancellationToken)
Expand Down
30 changes: 27 additions & 3 deletions test/ArtemisNetCoreClient.Tests/SessionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ public async Task should_create_queue_with_selected_routing_type(RoutingType rou
User = "artemis",
Password = "artemis"
});
var addressName = $"{Guid.NewGuid().ToString()}";
var addressName = Guid.NewGuid().ToString();
await session.CreateAddress(addressName, [routingType], default);

// Act
var queueName = $"{Guid.NewGuid().ToString()}";
var queueName = Guid.NewGuid().ToString();
await session.CreateQueue(new QueueConfiguration
{
Address = addressName,
Expand Down Expand Up @@ -136,7 +136,7 @@ public async Task should_create_and_dispose_consumer()
User = "artemis",
Password = "artemis"
});
var addressName = $"{Guid.NewGuid().ToString()}";
var addressName = Guid.NewGuid().ToString();
await session.CreateAddress(addressName, new [] { RoutingType.Multicast }, default);

var queueName = Guid.NewGuid().ToString();
Expand All @@ -155,4 +155,28 @@ await session.CreateQueue(new QueueConfiguration

await consumer.DisposeAsync();
}

[Test]
public async Task should_create_and_dispose_producer()
{
// Arrange
var connectionFactory = new SessionFactory();
await using var session = await connectionFactory.CreateAsync(new Endpoint
{
Host = "localhost",
Port = 5445,
User = "artemis",
Password = "artemis"
});
var addressName = Guid.NewGuid().ToString();
await session.CreateAddress(addressName, new [] { RoutingType.Multicast }, default);

// Act
var producer = await session.CreateProducerAsync(new ProducerConfiguration()
{
Address = addressName
}, default);

await producer.DisposeAsync();
}
}

0 comments on commit c574769

Please sign in to comment.