Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create Producer #48

Merged
merged 1 commit into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/ArtemisNetCoreClient/ConsumerConfiguration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace ActiveMQ.Artemis.Core.Client;

public class ConsumerConfiguration
{
public required string QueueName { get; init; }
}
2 changes: 2 additions & 0 deletions src/ArtemisNetCoreClient/Framing/Codec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ public static void Encode(ByteBuffer buffer, Packet packet, long channelId)
SessionQueueQueryMessage => SessionQueueQueryMessage.Type,
SessionCreateConsumerMessage => SessionCreateConsumerMessage.Type,
SessionConsumerCloseMessage => SessionConsumerCloseMessage.Type,
CreateProducerMessage => CreateProducerMessage.Type,
RemoveProducerMessage => RemoveProducerMessage.Type,
_ => throw new ArgumentOutOfRangeException(nameof(packet), packet, $"{packet.GetType()} is not supported for encoding")
};
buffer.WriteByte(type);
Expand Down
19 changes: 19 additions & 0 deletions src/ArtemisNetCoreClient/Framing/CreateProducerMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
namespace ActiveMQ.Artemis.Core.Client.Framing;

internal class CreateProducerMessage : Packet
{
public const byte Type = unchecked((byte) -20);

public required int Id { get; init; }
public string? Address { get; init; }

public override void Encode(ByteBuffer buffer)
{
buffer.WriteInt(Id);
buffer.WriteNullableByteString(Address);
}

public override void Decode(ByteBuffer buffer)
{
}
}
17 changes: 17 additions & 0 deletions src/ArtemisNetCoreClient/Framing/RemoveProducerMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace ActiveMQ.Artemis.Core.Client.Framing;

internal class RemoveProducerMessage : Packet
{
public const byte Type = unchecked((byte) -21);

public required int Id { get; init; }

public override void Encode(ByteBuffer buffer)
{
buffer.WriteInt(Id);
}

public override void Decode(ByteBuffer buffer)
{
}
}
21 changes: 21 additions & 0 deletions src/ArtemisNetCoreClient/IProducer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using ActiveMQ.Artemis.Core.Client.Framing;

namespace ActiveMQ.Artemis.Core.Client;

public interface IProducer : IAsyncDisposable
{
}

internal class Producer(Session session) : IProducer
{
public required int ProducerId { get; init; }

public async ValueTask DisposeAsync()
{
var request = new RemoveProducerMessage()
{
Id = ProducerId
};
await session.SendAsync(request, default);
}
}
6 changes: 1 addition & 5 deletions src/ArtemisNetCoreClient/ISession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,5 @@ public interface ISession : IAsyncDisposable
Task CreateQueue(QueueConfiguration queueConfiguration, CancellationToken cancellationToken);
Task<QueueInfo?> GetQueueInfo(string queueName, CancellationToken cancellationToken);
Task<IConsumer> CreateConsumerAsync(ConsumerConfiguration consumerConfiguration, CancellationToken cancellationToken);
}

public class ConsumerConfiguration
{
public required string QueueName { get; init; }
Task<IProducer> CreateProducerAsync(ProducerConfiguration producerConfiguration, CancellationToken cancellationToken);
}
6 changes: 6 additions & 0 deletions src/ArtemisNetCoreClient/ProducerConfiguration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace ActiveMQ.Artemis.Core.Client;

public class ProducerConfiguration
{
public string? Address { get; init; }
}
19 changes: 19 additions & 0 deletions src/ArtemisNetCoreClient/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,20 @@ public async Task<IConsumer> CreateConsumerAsync(ConsumerConfiguration consumerC
};
}

public async Task<IProducer> CreateProducerAsync(ProducerConfiguration producerConfiguration, CancellationToken cancellationToken)
{
var request = new CreateProducerMessage
{
Id = 0,
Address = producerConfiguration.Address
};
await _transport.SendAsync(request, ChannelId, cancellationToken);
return new Producer(this)
{
ProducerId = request.Id
};
}

public async ValueTask DisposeAsync()
{
_ = await SendBlockingAsync<SessionStop, NullResponse>(new SessionStop(), default);
Expand Down Expand Up @@ -163,6 +177,11 @@ internal async Task<TResponse> SendBlockingAsync<TRequest, TResponse>(TRequest r
}
}

internal async Task SendAsync<TRequest>(TRequest request, CancellationToken cancellationToken) where TRequest : Packet
{
await _transport.SendAsync(request, ChannelId, cancellationToken);
}

public long ChannelId { get; init; }

public async Task StartAsync(CancellationToken cancellationToken)
Expand Down
30 changes: 27 additions & 3 deletions test/ArtemisNetCoreClient.Tests/SessionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@

// Assert
var addressInfo = await session.GetAddressInfo(addressName, default);
Assert.That(addressInfo.Name, Is.EqualTo(addressName));

Check warning on line 48 in test/ArtemisNetCoreClient.Tests/SessionTests.cs

View workflow job for this annotation

GitHub Actions / linux

Dereference of a possibly null reference.

Check warning on line 48 in test/ArtemisNetCoreClient.Tests/SessionTests.cs

View workflow job for this annotation

GitHub Actions / linux

Dereference of a possibly null reference.
CollectionAssert.AreEqual(routingTypes, addressInfo.RoutingTypes);
}

Expand All @@ -62,11 +62,11 @@
User = "artemis",
Password = "artemis"
});
var addressName = $"{Guid.NewGuid().ToString()}";
var addressName = Guid.NewGuid().ToString();
await session.CreateAddress(addressName, [routingType], default);

// Act
var queueName = $"{Guid.NewGuid().ToString()}";
var queueName = Guid.NewGuid().ToString();
await session.CreateQueue(new QueueConfiguration
{
Address = addressName,
Expand Down Expand Up @@ -136,7 +136,7 @@
User = "artemis",
Password = "artemis"
});
var addressName = $"{Guid.NewGuid().ToString()}";
var addressName = Guid.NewGuid().ToString();
await session.CreateAddress(addressName, new [] { RoutingType.Multicast }, default);

var queueName = Guid.NewGuid().ToString();
Expand All @@ -155,4 +155,28 @@

await consumer.DisposeAsync();
}

[Test]
public async Task should_create_and_dispose_producer()
{
// Arrange
var connectionFactory = new SessionFactory();
await using var session = await connectionFactory.CreateAsync(new Endpoint
{
Host = "localhost",
Port = 5445,
User = "artemis",
Password = "artemis"
});
var addressName = Guid.NewGuid().ToString();
await session.CreateAddress(addressName, new [] { RoutingType.Multicast }, default);

// Act
var producer = await session.CreateProducerAsync(new ProducerConfiguration()
{
Address = addressName
}, default);

await producer.DisposeAsync();
}
}
Loading