From b0224915dee8de7dcb1ce3173473c41ae0630693 Mon Sep 17 00:00:00 2001 From: Tomek Masternak Date: Tue, 7 Jul 2020 12:32:22 +0200 Subject: [PATCH 1/6] throw if name of a queue or exchange is too long --- .../Routing/ConventionalRoutingTopology.cs | 4 ++++ .../Routing/DirectRoutingTopology.cs | 8 ++++++-- .../Routing/NameValidator.cs | 15 +++++++++++++++ 3 files changed, 25 insertions(+), 2 deletions(-) create mode 100644 src/NServiceBus.Transport.RabbitMQ/Routing/NameValidator.cs diff --git a/src/NServiceBus.Transport.RabbitMQ/Routing/ConventionalRoutingTopology.cs b/src/NServiceBus.Transport.RabbitMQ/Routing/ConventionalRoutingTopology.cs index 7289a7059..5da5d3f49 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Routing/ConventionalRoutingTopology.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Routing/ConventionalRoutingTopology.cs @@ -76,6 +76,8 @@ public void Initialize(IModel channel, IEnumerable receivingAddresses, I { foreach (var address in receivingAddresses.Concat(sendingAddresses)) { + NameValidator.ThrowIfNameIsTooLong(address); + channel.QueueDeclare(address, useDurableExchanges, false, false, null); CreateExchange(channel, address); channel.QueueBind(address, address, string.Empty); @@ -128,6 +130,8 @@ void MarkTypeConfigured(Type eventType) void CreateExchange(IModel channel, string exchangeName) { + NameValidator.ThrowIfNameIsTooLong(exchangeName); + try { channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, useDurableExchanges); diff --git a/src/NServiceBus.Transport.RabbitMQ/Routing/DirectRoutingTopology.cs b/src/NServiceBus.Transport.RabbitMQ/Routing/DirectRoutingTopology.cs index d3300406d..3ea974167 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Routing/DirectRoutingTopology.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Routing/DirectRoutingTopology.cs @@ -10,7 +10,7 @@ /// class DirectRoutingTopology : IRoutingTopology { - public DirectRoutingTopology(Conventions conventions, bool useDurableExchanges) + public DirectRoutingTopology(DirectRoutingTopology.Conventions conventions, bool useDurableExchanges) { this.conventions = conventions; this.useDurableExchanges = useDurableExchanges; @@ -46,6 +46,8 @@ public void Initialize(IModel channel, IEnumerable receivingAddresses, I { foreach (var address in receivingAddresses.Concat(sendingAddresses)) { + NameValidator.ThrowIfNameIsTooLong(address); + channel.QueueDeclare(address, useDurableExchanges, false, false, null); } } @@ -59,6 +61,8 @@ public void BindToDelayInfrastructure(IModel channel, string address, string del void CreateExchange(IModel channel, string exchangeName) { + NameValidator.ThrowIfNameIsTooLong(exchangeName); + if (exchangeName == AmqpTopicExchange) { return; @@ -90,7 +94,7 @@ string GetRoutingKeyForBinding(Type eventType) const string AmqpTopicExchange = "amq.topic"; - readonly Conventions conventions; + readonly DirectRoutingTopology.Conventions conventions; readonly bool useDurableExchanges; public class Conventions diff --git a/src/NServiceBus.Transport.RabbitMQ/Routing/NameValidator.cs b/src/NServiceBus.Transport.RabbitMQ/Routing/NameValidator.cs new file mode 100644 index 000000000..353cef91e --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ/Routing/NameValidator.cs @@ -0,0 +1,15 @@ +using System; + +namespace NServiceBus.Transport.RabbitMQ +{ + static class NameValidator + { + public static void ThrowIfNameIsTooLong(string name) + { + if (name.Length > 255) + { + throw new Exception($"{name} exceeds 255 characters which is maximal length for a queue or an exchange."); + } + } + } +} \ No newline at end of file From c0d14717f40e8decc9dbfe256041d5bd624a39a4 Mon Sep 17 00:00:00 2001 From: Tomek Masternak Date: Tue, 7 Jul 2020 12:34:26 +0200 Subject: [PATCH 2/6] stop exception swallowing during topology setup --- .../Routing/ConventionalRoutingTopology.cs | 24 +++---------------- .../Routing/DirectRoutingTopology.cs | 11 +-------- 2 files changed, 4 insertions(+), 31 deletions(-) diff --git a/src/NServiceBus.Transport.RabbitMQ/Routing/ConventionalRoutingTopology.cs b/src/NServiceBus.Transport.RabbitMQ/Routing/ConventionalRoutingTopology.cs index 5da5d3f49..3c1beb4d9 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Routing/ConventionalRoutingTopology.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Routing/ConventionalRoutingTopology.cs @@ -43,17 +43,8 @@ public void SetupSubscription(IModel channel, Type type, string subscriberName) } public void TeardownSubscription(IModel channel, Type type, string subscriberName) - { - try - { - channel.ExchangeUnbind(subscriberName, ExchangeName(type), string.Empty, null); - } - // ReSharper disable EmptyGeneralCatchClause - catch (Exception) - // ReSharper restore EmptyGeneralCatchClause - { - // TODO: Any better way to make this idempotent? - } + { + channel.ExchangeUnbind(subscriberName, ExchangeName(type), string.Empty, null); } public void Publish(IModel channel, Type type, OutgoingMessage message, IBasicProperties properties) @@ -132,16 +123,7 @@ void CreateExchange(IModel channel, string exchangeName) { NameValidator.ThrowIfNameIsTooLong(exchangeName); - try - { - channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, useDurableExchanges); - } - // ReSharper disable EmptyGeneralCatchClause - catch (Exception) - // ReSharper restore EmptyGeneralCatchClause - { - // TODO: Any better way to make this idempotent? - } + channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, useDurableExchanges); } readonly ConcurrentDictionary typeTopologyConfiguredSet = new ConcurrentDictionary(); diff --git a/src/NServiceBus.Transport.RabbitMQ/Routing/DirectRoutingTopology.cs b/src/NServiceBus.Transport.RabbitMQ/Routing/DirectRoutingTopology.cs index 3ea974167..7914843d6 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Routing/DirectRoutingTopology.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Routing/DirectRoutingTopology.cs @@ -68,16 +68,7 @@ void CreateExchange(IModel channel, string exchangeName) return; } - try - { - channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, useDurableExchanges); - } - // ReSharper disable EmptyGeneralCatchClause - catch (Exception) - // ReSharper restore EmptyGeneralCatchClause - { - - } + channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, useDurableExchanges); } string GetRoutingKeyForPublish(Type eventType) => conventions.RoutingKey(eventType); From 7a3f2e999c876f65b19e61d723016af92cd2837b Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Wed, 8 Jul 2020 12:29:35 +0200 Subject: [PATCH 3/6] Check string utf8 byte size instead of string length --- .../Routing/NameValidator.cs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/NServiceBus.Transport.RabbitMQ/Routing/NameValidator.cs b/src/NServiceBus.Transport.RabbitMQ/Routing/NameValidator.cs index 353cef91e..24f8c3ece 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Routing/NameValidator.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Routing/NameValidator.cs @@ -1,4 +1,5 @@ using System; +using System.Text; namespace NServiceBus.Transport.RabbitMQ { @@ -6,10 +7,10 @@ static class NameValidator { public static void ThrowIfNameIsTooLong(string name) { - if (name.Length > 255) + if (Encoding.UTF8.GetByteCount(name) > 255) { - throw new Exception($"{name} exceeds 255 characters which is maximal length for a queue or an exchange."); + throw new Exception($"{name} exceeds 255 bytes which is maximal length for a queue or an exchange."); } } } -} \ No newline at end of file +} From 55412c03f1ffd2f5468e6881d1db187aa619f690 Mon Sep 17 00:00:00 2001 From: Tomek Masternak Date: Wed, 8 Jul 2020 13:27:24 +0200 Subject: [PATCH 4/6] better exception --- src/NServiceBus.Transport.RabbitMQ/Routing/NameValidator.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/NServiceBus.Transport.RabbitMQ/Routing/NameValidator.cs b/src/NServiceBus.Transport.RabbitMQ/Routing/NameValidator.cs index 24f8c3ece..ff6f6c9a4 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Routing/NameValidator.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Routing/NameValidator.cs @@ -9,7 +9,7 @@ public static void ThrowIfNameIsTooLong(string name) { if (Encoding.UTF8.GetByteCount(name) > 255) { - throw new Exception($"{name} exceeds 255 bytes which is maximal length for a queue or an exchange."); + throw new ArgumentOutOfRangeException(nameof(name), name, "Value exceeds the maximum allowed length of 255 bytes."); } } } From 4cf8f7ab2b7ec53231f3dda1772a1b42b6825a4f Mon Sep 17 00:00:00 2001 From: Tomek Masternak Date: Wed, 8 Jul 2020 13:53:05 +0200 Subject: [PATCH 5/6] model with validation --- .../Connection/ConfirmsAwareChannel.cs | 2 +- .../Connection/ModelWithValidation.cs | 357 ++++++++++++++++++ .../Routing/ConventionalRoutingTopology.cs | 4 - .../Routing/DirectRoutingTopology.cs | 4 - .../Routing/NameValidator.cs | 16 - 5 files changed, 358 insertions(+), 25 deletions(-) create mode 100644 src/NServiceBus.Transport.RabbitMQ/Connection/ModelWithValidation.cs delete mode 100644 src/NServiceBus.Transport.RabbitMQ/Routing/NameValidator.cs diff --git a/src/NServiceBus.Transport.RabbitMQ/Connection/ConfirmsAwareChannel.cs b/src/NServiceBus.Transport.RabbitMQ/Connection/ConfirmsAwareChannel.cs index c74916086..cbf2d1a92 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Connection/ConfirmsAwareChannel.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Connection/ConfirmsAwareChannel.cs @@ -12,7 +12,7 @@ sealed class ConfirmsAwareChannel : IDisposable { public ConfirmsAwareChannel(IConnection connection, IRoutingTopology routingTopology, bool usePublisherConfirms) { - channel = connection.CreateModel(); + channel = new ModelWithValidation(connection.CreateModel()); channel.BasicReturn += Channel_BasicReturn; this.routingTopology = routingTopology; diff --git a/src/NServiceBus.Transport.RabbitMQ/Connection/ModelWithValidation.cs b/src/NServiceBus.Transport.RabbitMQ/Connection/ModelWithValidation.cs new file mode 100644 index 000000000..661d61788 --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ/Connection/ModelWithValidation.cs @@ -0,0 +1,357 @@ +using System; +using System.Collections.Generic; +using System.Text; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; + +namespace NServiceBus.Transport.RabbitMQ +{ + class ModelWithValidation : IModel + { + IModel model; + + public ModelWithValidation(IModel model) + { + this.model = model; + } + + public void Dispose() + { + model.Dispose(); + } + + public void Abort() + { + model.Abort(); + } + + public void Abort(ushort replyCode, string replyText) + { + model.Abort(replyCode, replyText); + } + + public void BasicAck(ulong deliveryTag, bool multiple) + { + model.BasicAck(deliveryTag, multiple); + } + + public void BasicCancel(string consumerTag) + { + model.BasicCancel(consumerTag); + } + + public void BasicCancelNoWait(string consumerTag) + { + model.BasicCancelNoWait(consumerTag); + } + + public string BasicConsume(string queue, bool autoAck, string consumerTag, bool noLocal, bool exclusive, IDictionary arguments, + IBasicConsumer consumer) + { + return model.BasicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumer); + } + + public BasicGetResult BasicGet(string queue, bool autoAck) + { + return model.BasicGet(queue, autoAck); + } + + public void BasicNack(ulong deliveryTag, bool multiple, bool requeue) + { + model.BasicNack(deliveryTag, multiple, requeue); + } + + public void BasicPublish(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties, + ReadOnlyMemory body) + { + model.BasicPublish(exchange, routingKey, mandatory, basicProperties, body); + } + + public void BasicQos(uint prefetchSize, ushort prefetchCount, bool global) + { + model.BasicQos(prefetchSize, prefetchCount, global); + } + + public void BasicRecover(bool requeue) + { + model.BasicRecover(requeue); + } + + public void BasicRecoverAsync(bool requeue) + { + model.BasicRecoverAsync(requeue); + } + + public void BasicReject(ulong deliveryTag, bool requeue) + { + model.BasicReject(deliveryTag, requeue); + } + + public void Close() + { + model.Close(); + } + + public void Close(ushort replyCode, string replyText) + { + model.Close(replyCode, replyText); + } + + public void ConfirmSelect() + { + model.ConfirmSelect(); + } + + public IBasicPublishBatch CreateBasicPublishBatch() + { + return model.CreateBasicPublishBatch(); + } + + public IBasicProperties CreateBasicProperties() + { + return model.CreateBasicProperties(); + } + + public void ExchangeBind(string destination, string source, string routingKey, IDictionary arguments) + { + ThrowIfShortStringIsTooLong(destination, nameof(destination)); + ThrowIfShortStringIsTooLong(source, nameof(source)); + ThrowIfShortStringIsTooLong(routingKey, nameof(routingKey)); + + model.ExchangeBind(destination, source, routingKey, arguments); + } + + public void ExchangeBindNoWait(string destination, string source, string routingKey, IDictionary arguments) + { + ThrowIfShortStringIsTooLong(destination, nameof(destination)); + ThrowIfShortStringIsTooLong(source, nameof(source)); + ThrowIfShortStringIsTooLong(routingKey, nameof(routingKey)); + + model.ExchangeBindNoWait(destination, source, routingKey, arguments); + } + + public void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary arguments) + { + ThrowIfShortStringIsTooLong(exchange, nameof(exchange)); + ThrowIfShortStringIsTooLong(type, nameof(type)); + + model.ExchangeDeclare(exchange, type, durable, autoDelete, arguments); + } + + public void ExchangeDeclareNoWait(string exchange, string type, bool durable, bool autoDelete, IDictionary arguments) + { + ThrowIfShortStringIsTooLong(exchange, nameof(exchange)); + ThrowIfShortStringIsTooLong(type, nameof(type)); + + model.ExchangeDeclareNoWait(exchange, type, durable, autoDelete, arguments); + } + + public void ExchangeDeclarePassive(string exchange) + { + ThrowIfShortStringIsTooLong(exchange, nameof(exchange)); + + model.ExchangeDeclarePassive(exchange); + } + + public void ExchangeDelete(string exchange, bool ifUnused) + { + model.ExchangeDelete(exchange, ifUnused); + } + + public void ExchangeDeleteNoWait(string exchange, bool ifUnused) + { + model.ExchangeDeleteNoWait(exchange, ifUnused); + } + + public void ExchangeUnbind(string destination, string source, string routingKey, IDictionary arguments) + { + model.ExchangeUnbind(destination, source, routingKey, arguments); + } + + public void ExchangeUnbindNoWait(string destination, string source, string routingKey, IDictionary arguments) + { + model.ExchangeUnbindNoWait(destination, source, routingKey, arguments); + } + + public void QueueBind(string queue, string exchange, string routingKey, IDictionary arguments) + { + ThrowIfShortStringIsTooLong(queue, nameof(queue)); + ThrowIfShortStringIsTooLong(exchange, nameof(exchange)); + ThrowIfShortStringIsTooLong(routingKey, nameof(routingKey)); + + model.QueueBind(queue, exchange, routingKey, arguments); + } + + public void QueueBindNoWait(string queue, string exchange, string routingKey, IDictionary arguments) + { + ThrowIfShortStringIsTooLong(queue, nameof(queue)); + ThrowIfShortStringIsTooLong(exchange, nameof(exchange)); + ThrowIfShortStringIsTooLong(routingKey, nameof(routingKey)); + + model.QueueBindNoWait(queue, exchange, routingKey, arguments); + } + + public QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary arguments) + { + ThrowIfShortStringIsTooLong(queue, nameof(queue)); + + return model.QueueDeclare(queue, durable, exclusive, autoDelete, arguments); + } + + public void QueueDeclareNoWait(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary arguments) + { + ThrowIfShortStringIsTooLong(queue, nameof(queue)); + + model.QueueDeclareNoWait(queue, durable, exclusive, autoDelete, arguments); + } + + public QueueDeclareOk QueueDeclarePassive(string queue) + { + ThrowIfShortStringIsTooLong(queue, nameof(queue)); + + return model.QueueDeclarePassive(queue); + } + + public uint MessageCount(string queue) + { + return model.MessageCount(queue); + } + + public uint ConsumerCount(string queue) + { + return model.ConsumerCount(queue); + } + + public uint QueueDelete(string queue, bool ifUnused, bool ifEmpty) + { + return model.QueueDelete(queue, ifUnused, ifEmpty); + } + + public void QueueDeleteNoWait(string queue, bool ifUnused, bool ifEmpty) + { + model.QueueDeleteNoWait(queue, ifUnused, ifEmpty); + } + + public uint QueuePurge(string queue) + { + return model.QueuePurge(queue); + } + + public void QueueUnbind(string queue, string exchange, string routingKey, IDictionary arguments) + { + model.QueueUnbind(queue, exchange, routingKey, arguments); + } + + public void TxCommit() + { + model.TxCommit(); + } + + public void TxRollback() + { + model.TxRollback(); + } + + public void TxSelect() + { + model.TxSelect(); + } + + public bool WaitForConfirms() + { + return model.WaitForConfirms(); + } + + public bool WaitForConfirms(TimeSpan timeout) + { + return model.WaitForConfirms(timeout); + } + + public bool WaitForConfirms(TimeSpan timeout, out bool timedOut) + { + return model.WaitForConfirms(timeout, out timedOut); + } + + public void WaitForConfirmsOrDie() + { + model.WaitForConfirmsOrDie(); + } + + public void WaitForConfirmsOrDie(TimeSpan timeout) + { + model.WaitForConfirmsOrDie(timeout); + } + + public int ChannelNumber => model.ChannelNumber; + + public ShutdownEventArgs CloseReason => model.CloseReason; + + public IBasicConsumer DefaultConsumer + { + get => model.DefaultConsumer; + set => model.DefaultConsumer = value; + } + + public bool IsClosed => model.IsClosed; + + public bool IsOpen => model.IsOpen; + + public ulong NextPublishSeqNo => model.NextPublishSeqNo; + + public TimeSpan ContinuationTimeout + { + get => model.ContinuationTimeout; + set => model.ContinuationTimeout = value; + } + + public event EventHandler BasicAcks + { + add => model.BasicAcks += value; + remove => model.BasicAcks -= value; + } + + public event EventHandler BasicNacks + { + add => model.BasicNacks += value; + remove => model.BasicNacks -= value; + } + + public event EventHandler BasicRecoverOk + { + add => model.BasicRecoverOk += value; + remove => model.BasicRecoverOk -= value; + } + + public event EventHandler BasicReturn + { + add => model.BasicReturn += value; + remove => model.BasicReturn -= value; + } + + public event EventHandler CallbackException + { + add => model.CallbackException += value; + remove => model.CallbackException -= value; + } + + public event EventHandler FlowControl + { + add => model.FlowControl += value; + remove => model.FlowControl -= value; + } + + public event EventHandler ModelShutdown + { + add => model.ModelShutdown += value; + remove => model.ModelShutdown -= value; + } + + public static void ThrowIfShortStringIsTooLong(string name, string argumentName) + { + if (Encoding.UTF8.GetByteCount(name) > 255) + { + throw new ArgumentOutOfRangeException(argumentName, name, "Value exceeds the maximum allowed length of 255 bytes."); + } + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.Transport.RabbitMQ/Routing/ConventionalRoutingTopology.cs b/src/NServiceBus.Transport.RabbitMQ/Routing/ConventionalRoutingTopology.cs index 3c1beb4d9..f894dcb78 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Routing/ConventionalRoutingTopology.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Routing/ConventionalRoutingTopology.cs @@ -67,8 +67,6 @@ public void Initialize(IModel channel, IEnumerable receivingAddresses, I { foreach (var address in receivingAddresses.Concat(sendingAddresses)) { - NameValidator.ThrowIfNameIsTooLong(address); - channel.QueueDeclare(address, useDurableExchanges, false, false, null); CreateExchange(channel, address); channel.QueueBind(address, address, string.Empty); @@ -121,8 +119,6 @@ void MarkTypeConfigured(Type eventType) void CreateExchange(IModel channel, string exchangeName) { - NameValidator.ThrowIfNameIsTooLong(exchangeName); - channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, useDurableExchanges); } diff --git a/src/NServiceBus.Transport.RabbitMQ/Routing/DirectRoutingTopology.cs b/src/NServiceBus.Transport.RabbitMQ/Routing/DirectRoutingTopology.cs index 7914843d6..678726a14 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Routing/DirectRoutingTopology.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Routing/DirectRoutingTopology.cs @@ -46,8 +46,6 @@ public void Initialize(IModel channel, IEnumerable receivingAddresses, I { foreach (var address in receivingAddresses.Concat(sendingAddresses)) { - NameValidator.ThrowIfNameIsTooLong(address); - channel.QueueDeclare(address, useDurableExchanges, false, false, null); } } @@ -61,8 +59,6 @@ public void BindToDelayInfrastructure(IModel channel, string address, string del void CreateExchange(IModel channel, string exchangeName) { - NameValidator.ThrowIfNameIsTooLong(exchangeName); - if (exchangeName == AmqpTopicExchange) { return; diff --git a/src/NServiceBus.Transport.RabbitMQ/Routing/NameValidator.cs b/src/NServiceBus.Transport.RabbitMQ/Routing/NameValidator.cs deleted file mode 100644 index ff6f6c9a4..000000000 --- a/src/NServiceBus.Transport.RabbitMQ/Routing/NameValidator.cs +++ /dev/null @@ -1,16 +0,0 @@ -using System; -using System.Text; - -namespace NServiceBus.Transport.RabbitMQ -{ - static class NameValidator - { - public static void ThrowIfNameIsTooLong(string name) - { - if (Encoding.UTF8.GetByteCount(name) > 255) - { - throw new ArgumentOutOfRangeException(nameof(name), name, "Value exceeds the maximum allowed length of 255 bytes."); - } - } - } -} From b5d93fc9f6d69e1d97c4f7afdec29cd1334ae1da Mon Sep 17 00:00:00 2001 From: Tomek Masternak Date: Wed, 8 Jul 2020 14:07:51 +0200 Subject: [PATCH 6/6] using validating model in all places --- .../Administration/QueueCreator.cs | 2 +- .../Administration/QueuePurger.cs | 2 +- .../Administration/SubscriptionManager.cs | 4 ++-- src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/QueueCreator.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/QueueCreator.cs index 0b42423b0..dc236f061 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Administration/QueueCreator.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/QueueCreator.cs @@ -16,7 +16,7 @@ public QueueCreator(ConnectionFactory connectionFactory, IRoutingTopology routin public Task CreateQueueIfNecessary(QueueBindings queueBindings, string identity) { using (var connection = connectionFactory.CreateAdministrationConnection()) - using (var channel = connection.CreateModel()) + using (var channel = new ModelWithValidation(connection.CreateModel())) { DelayInfrastructure.Build(channel); diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/QueuePurger.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/QueuePurger.cs index fe03483b8..c922f89f8 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Administration/QueuePurger.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/QueuePurger.cs @@ -12,7 +12,7 @@ public QueuePurger(ConnectionFactory connectionFactory) public void Purge(string queue) { using (var connection = connectionFactory.CreateAdministrationConnection()) - using (var channel = connection.CreateModel()) + using (var channel = new ModelWithValidation(connection.CreateModel())) { channel.QueuePurge(queue); } diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/SubscriptionManager.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/SubscriptionManager.cs index 221528c93..c0db505b0 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Administration/SubscriptionManager.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/SubscriptionManager.cs @@ -20,7 +20,7 @@ public SubscriptionManager(ConnectionFactory connectionFactory, IRoutingTopology public Task Subscribe(Type eventType, ContextBag context) { using (var connection = connectionFactory.CreateAdministrationConnection()) - using (var channel = connection.CreateModel()) + using (var channel = new ModelWithValidation(connection.CreateModel())) { routingTopology.SetupSubscription(channel, eventType, localQueue); } @@ -31,7 +31,7 @@ public Task Subscribe(Type eventType, ContextBag context) public Task Unsubscribe(Type eventType, ContextBag context) { using (var connection = connectionFactory.CreateAdministrationConnection()) - using (var channel = connection.CreateModel()) + using (var channel = new ModelWithValidation(connection.CreateModel())) { routingTopology.TeardownSubscription(channel, eventType, localQueue); } diff --git a/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs b/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs index 7aaa9c67b..e6cb9ebbc 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs @@ -81,7 +81,7 @@ public void Start(PushRuntimeSettings limitations) connection = connectionFactory.CreateConnection($"{settings.InputQueue} MessagePump"); - var channel = connection.CreateModel(); + var channel = new ModelWithValidation(connection.CreateModel()); long prefetchCount;