From a4af29685c00400b16cafca7c8f84566c4e56683 Mon Sep 17 00:00:00 2001 From: "vitalie.glinca" Date: Mon, 4 Dec 2023 15:25:06 +0200 Subject: [PATCH] Manage Sender and Listener connection separately in RMQ --- .../RabbitMqTransportTests.cs | 12 ++++ .../Wolverine.RabbitMQ.Tests/Samples.cs | 56 +++++++++++++++++++ .../Internal/RabbitMqTransport.cs | 6 +- .../Internal/RabbitMqTransportExpression.cs | 26 +++++++++ 4 files changed, 98 insertions(+), 2 deletions(-) diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/RabbitMqTransportTests.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/RabbitMqTransportTests.cs index e661ae20e..edafef576 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/RabbitMqTransportTests.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/RabbitMqTransportTests.cs @@ -66,4 +66,16 @@ public void declare_request_reply_system_queue_is_true_by_default() { theTransport.DeclareRequestReplySystemQueue.ShouldBeTrue(); } + + [Fact] + public void use_sender_connection_only_is_false_by_default() + { + theTransport.UseSenderConnectionOnly.ShouldBeFalse(); + } + + [Fact] + public void use_listener_connection_only_is_false_by_default() + { + theTransport.UseListenerConnectionOnly.ShouldBeFalse(); + } } \ No newline at end of file diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Samples.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Samples.cs index 4ff46e2ea..0143952f5 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Samples.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Samples.cs @@ -78,6 +78,62 @@ public static async Task disable_system_queue() #endregion } + + public static async Task use_listener_connection_only() + { + #region sample_disable_rabbit_mq_system_queue + + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + // *A* way to configure Rabbit MQ using their Uri schema + // documented here: https://www.rabbitmq.com/uri-spec.html + opts.UseRabbitMq(new Uri("amqp://localhost")) + + // Turn on listener connection only in case if you only need to listen for messages + // The sender connection won't be activated in this case + .UseListenerConnectionOnly(); + + // Set up a listener for a queue, but also + // fine-tune the queue characteristics if Wolverine + // will be governing the queue setup + opts.ListenToRabbitQueue("incoming2", q => + { + q.PurgeOnStartup = true; + q.TimeToLive(5.Minutes()); + }); + }).StartAsync(); + + #endregion + } + + public static async Task use_sender_connection_only() + { + #region sample_disable_rabbit_mq_system_queue + + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + // *A* way to configure Rabbit MQ using their Uri schema + // documented here: https://www.rabbitmq.com/uri-spec.html + opts.UseRabbitMq(new Uri("amqp://localhost")) + + // Turn on sender connection only in case if you only need to send messages + // The listener connection won't be created in this case + .UseSenderConnectionOnly(); + + // Set up a listener for a queue, but also + // fine-tune the queue characteristics if Wolverine + // will be governing the queue setup + opts.ListenToRabbitQueue("incoming2", q => + { + q.PurgeOnStartup = true; + q.TimeToLive(5.Minutes()); + }); + }).StartAsync(); + + #endregion + } public static async Task listen_to_queue() { diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransport.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransport.cs index 82d44e2b5..4b3a52b58 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransport.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransport.cs @@ -55,6 +55,8 @@ public RabbitMqTransport() : base(ProtocolName, "Rabbit MQ") public LightweightCache Queues { get; } internal bool DeclareRequestReplySystemQueue { get; set; } = true; + internal bool UseSenderConnectionOnly { get; set; } + internal bool UseListenerConnectionOnly { get; set; } public void Dispose() { @@ -93,13 +95,13 @@ public override ValueTask ConnectAsync(IWolverineRuntime runtime) ConnectionFactory.DispatchConsumersAsync = true; - if (_listenerConnection == null) + if (_listenerConnection == null && !UseSenderConnectionOnly) { _listenerConnection = BuildConnection(); listenToEvents("Listener", _listenerConnection, logger); } - if (_sendingConnection == null) + if (_sendingConnection == null && !UseListenerConnectionOnly) { _sendingConnection = BuildConnection(); listenToEvents("Sender", _sendingConnection, logger); diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransportExpression.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransportExpression.cs index 32c729565..4d43a9708 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransportExpression.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransportExpression.cs @@ -127,6 +127,32 @@ public RabbitMqTransportExpression DisableSystemRequestReplyQueueDeclaration() return this; } + /// + /// Turn on listener connection only in case if you only need to listen for messages + /// The sender connection won't be activated in this case + /// + /// + public RabbitMqTransportExpression UseListenerConnectionOnly() + { + Transport.UseListenerConnectionOnly = true; + Transport.UseSenderConnectionOnly = false; + + return this; + } + + /// + /// Turn on sender connection only in case if you only need to send messages + /// The listener connection won't be created in this case + /// + /// + public RabbitMqTransportExpression UseSenderConnectionOnly() + { + Transport.UseSenderConnectionOnly = true; + Transport.UseListenerConnectionOnly = false; + + return this; + } + public class BindingExpression { private readonly string _exchangeName;