From 4cf8f7ab2b7ec53231f3dda1772a1b42b6825a4f Mon Sep 17 00:00:00 2001 From: Tomek Masternak Date: Wed, 8 Jul 2020 13:53:05 +0200 Subject: [PATCH] model with validation --- .../Connection/ConfirmsAwareChannel.cs | 2 +- .../Connection/ModelWithValidation.cs | 357 ++++++++++++++++++ .../Routing/ConventionalRoutingTopology.cs | 4 - .../Routing/DirectRoutingTopology.cs | 4 - .../Routing/NameValidator.cs | 16 - 5 files changed, 358 insertions(+), 25 deletions(-) create mode 100644 src/NServiceBus.Transport.RabbitMQ/Connection/ModelWithValidation.cs delete mode 100644 src/NServiceBus.Transport.RabbitMQ/Routing/NameValidator.cs diff --git a/src/NServiceBus.Transport.RabbitMQ/Connection/ConfirmsAwareChannel.cs b/src/NServiceBus.Transport.RabbitMQ/Connection/ConfirmsAwareChannel.cs index c74916086..cbf2d1a92 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Connection/ConfirmsAwareChannel.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Connection/ConfirmsAwareChannel.cs @@ -12,7 +12,7 @@ sealed class ConfirmsAwareChannel : IDisposable { public ConfirmsAwareChannel(IConnection connection, IRoutingTopology routingTopology, bool usePublisherConfirms) { - channel = connection.CreateModel(); + channel = new ModelWithValidation(connection.CreateModel()); channel.BasicReturn += Channel_BasicReturn; this.routingTopology = routingTopology; diff --git a/src/NServiceBus.Transport.RabbitMQ/Connection/ModelWithValidation.cs b/src/NServiceBus.Transport.RabbitMQ/Connection/ModelWithValidation.cs new file mode 100644 index 000000000..661d61788 --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ/Connection/ModelWithValidation.cs @@ -0,0 +1,357 @@ +using System; +using System.Collections.Generic; +using System.Text; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; + +namespace NServiceBus.Transport.RabbitMQ +{ + class ModelWithValidation : IModel + { + IModel model; + + public ModelWithValidation(IModel model) + { + this.model = model; + } + + public void Dispose() + { + model.Dispose(); + } + + public void Abort() + { + model.Abort(); + } + + public void Abort(ushort replyCode, string replyText) + { + model.Abort(replyCode, replyText); + } + + public void BasicAck(ulong deliveryTag, bool multiple) + { + model.BasicAck(deliveryTag, multiple); + } + + public void BasicCancel(string consumerTag) + { + model.BasicCancel(consumerTag); + } + + public void BasicCancelNoWait(string consumerTag) + { + model.BasicCancelNoWait(consumerTag); + } + + public string BasicConsume(string queue, bool autoAck, string consumerTag, bool noLocal, bool exclusive, IDictionary arguments, + IBasicConsumer consumer) + { + return model.BasicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumer); + } + + public BasicGetResult BasicGet(string queue, bool autoAck) + { + return model.BasicGet(queue, autoAck); + } + + public void BasicNack(ulong deliveryTag, bool multiple, bool requeue) + { + model.BasicNack(deliveryTag, multiple, requeue); + } + + public void BasicPublish(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties, + ReadOnlyMemory body) + { + model.BasicPublish(exchange, routingKey, mandatory, basicProperties, body); + } + + public void BasicQos(uint prefetchSize, ushort prefetchCount, bool global) + { + model.BasicQos(prefetchSize, prefetchCount, global); + } + + public void BasicRecover(bool requeue) + { + model.BasicRecover(requeue); + } + + public void BasicRecoverAsync(bool requeue) + { + model.BasicRecoverAsync(requeue); + } + + public void BasicReject(ulong deliveryTag, bool requeue) + { + model.BasicReject(deliveryTag, requeue); + } + + public void Close() + { + model.Close(); + } + + public void Close(ushort replyCode, string replyText) + { + model.Close(replyCode, replyText); + } + + public void ConfirmSelect() + { + model.ConfirmSelect(); + } + + public IBasicPublishBatch CreateBasicPublishBatch() + { + return model.CreateBasicPublishBatch(); + } + + public IBasicProperties CreateBasicProperties() + { + return model.CreateBasicProperties(); + } + + public void ExchangeBind(string destination, string source, string routingKey, IDictionary arguments) + { + ThrowIfShortStringIsTooLong(destination, nameof(destination)); + ThrowIfShortStringIsTooLong(source, nameof(source)); + ThrowIfShortStringIsTooLong(routingKey, nameof(routingKey)); + + model.ExchangeBind(destination, source, routingKey, arguments); + } + + public void ExchangeBindNoWait(string destination, string source, string routingKey, IDictionary arguments) + { + ThrowIfShortStringIsTooLong(destination, nameof(destination)); + ThrowIfShortStringIsTooLong(source, nameof(source)); + ThrowIfShortStringIsTooLong(routingKey, nameof(routingKey)); + + model.ExchangeBindNoWait(destination, source, routingKey, arguments); + } + + public void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary arguments) + { + ThrowIfShortStringIsTooLong(exchange, nameof(exchange)); + ThrowIfShortStringIsTooLong(type, nameof(type)); + + model.ExchangeDeclare(exchange, type, durable, autoDelete, arguments); + } + + public void ExchangeDeclareNoWait(string exchange, string type, bool durable, bool autoDelete, IDictionary arguments) + { + ThrowIfShortStringIsTooLong(exchange, nameof(exchange)); + ThrowIfShortStringIsTooLong(type, nameof(type)); + + model.ExchangeDeclareNoWait(exchange, type, durable, autoDelete, arguments); + } + + public void ExchangeDeclarePassive(string exchange) + { + ThrowIfShortStringIsTooLong(exchange, nameof(exchange)); + + model.ExchangeDeclarePassive(exchange); + } + + public void ExchangeDelete(string exchange, bool ifUnused) + { + model.ExchangeDelete(exchange, ifUnused); + } + + public void ExchangeDeleteNoWait(string exchange, bool ifUnused) + { + model.ExchangeDeleteNoWait(exchange, ifUnused); + } + + public void ExchangeUnbind(string destination, string source, string routingKey, IDictionary arguments) + { + model.ExchangeUnbind(destination, source, routingKey, arguments); + } + + public void ExchangeUnbindNoWait(string destination, string source, string routingKey, IDictionary arguments) + { + model.ExchangeUnbindNoWait(destination, source, routingKey, arguments); + } + + public void QueueBind(string queue, string exchange, string routingKey, IDictionary arguments) + { + ThrowIfShortStringIsTooLong(queue, nameof(queue)); + ThrowIfShortStringIsTooLong(exchange, nameof(exchange)); + ThrowIfShortStringIsTooLong(routingKey, nameof(routingKey)); + + model.QueueBind(queue, exchange, routingKey, arguments); + } + + public void QueueBindNoWait(string queue, string exchange, string routingKey, IDictionary arguments) + { + ThrowIfShortStringIsTooLong(queue, nameof(queue)); + ThrowIfShortStringIsTooLong(exchange, nameof(exchange)); + ThrowIfShortStringIsTooLong(routingKey, nameof(routingKey)); + + model.QueueBindNoWait(queue, exchange, routingKey, arguments); + } + + public QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary arguments) + { + ThrowIfShortStringIsTooLong(queue, nameof(queue)); + + return model.QueueDeclare(queue, durable, exclusive, autoDelete, arguments); + } + + public void QueueDeclareNoWait(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary arguments) + { + ThrowIfShortStringIsTooLong(queue, nameof(queue)); + + model.QueueDeclareNoWait(queue, durable, exclusive, autoDelete, arguments); + } + + public QueueDeclareOk QueueDeclarePassive(string queue) + { + ThrowIfShortStringIsTooLong(queue, nameof(queue)); + + return model.QueueDeclarePassive(queue); + } + + public uint MessageCount(string queue) + { + return model.MessageCount(queue); + } + + public uint ConsumerCount(string queue) + { + return model.ConsumerCount(queue); + } + + public uint QueueDelete(string queue, bool ifUnused, bool ifEmpty) + { + return model.QueueDelete(queue, ifUnused, ifEmpty); + } + + public void QueueDeleteNoWait(string queue, bool ifUnused, bool ifEmpty) + { + model.QueueDeleteNoWait(queue, ifUnused, ifEmpty); + } + + public uint QueuePurge(string queue) + { + return model.QueuePurge(queue); + } + + public void QueueUnbind(string queue, string exchange, string routingKey, IDictionary arguments) + { + model.QueueUnbind(queue, exchange, routingKey, arguments); + } + + public void TxCommit() + { + model.TxCommit(); + } + + public void TxRollback() + { + model.TxRollback(); + } + + public void TxSelect() + { + model.TxSelect(); + } + + public bool WaitForConfirms() + { + return model.WaitForConfirms(); + } + + public bool WaitForConfirms(TimeSpan timeout) + { + return model.WaitForConfirms(timeout); + } + + public bool WaitForConfirms(TimeSpan timeout, out bool timedOut) + { + return model.WaitForConfirms(timeout, out timedOut); + } + + public void WaitForConfirmsOrDie() + { + model.WaitForConfirmsOrDie(); + } + + public void WaitForConfirmsOrDie(TimeSpan timeout) + { + model.WaitForConfirmsOrDie(timeout); + } + + public int ChannelNumber => model.ChannelNumber; + + public ShutdownEventArgs CloseReason => model.CloseReason; + + public IBasicConsumer DefaultConsumer + { + get => model.DefaultConsumer; + set => model.DefaultConsumer = value; + } + + public bool IsClosed => model.IsClosed; + + public bool IsOpen => model.IsOpen; + + public ulong NextPublishSeqNo => model.NextPublishSeqNo; + + public TimeSpan ContinuationTimeout + { + get => model.ContinuationTimeout; + set => model.ContinuationTimeout = value; + } + + public event EventHandler BasicAcks + { + add => model.BasicAcks += value; + remove => model.BasicAcks -= value; + } + + public event EventHandler BasicNacks + { + add => model.BasicNacks += value; + remove => model.BasicNacks -= value; + } + + public event EventHandler BasicRecoverOk + { + add => model.BasicRecoverOk += value; + remove => model.BasicRecoverOk -= value; + } + + public event EventHandler BasicReturn + { + add => model.BasicReturn += value; + remove => model.BasicReturn -= value; + } + + public event EventHandler CallbackException + { + add => model.CallbackException += value; + remove => model.CallbackException -= value; + } + + public event EventHandler FlowControl + { + add => model.FlowControl += value; + remove => model.FlowControl -= value; + } + + public event EventHandler ModelShutdown + { + add => model.ModelShutdown += value; + remove => model.ModelShutdown -= value; + } + + public static void ThrowIfShortStringIsTooLong(string name, string argumentName) + { + if (Encoding.UTF8.GetByteCount(name) > 255) + { + throw new ArgumentOutOfRangeException(argumentName, name, "Value exceeds the maximum allowed length of 255 bytes."); + } + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.Transport.RabbitMQ/Routing/ConventionalRoutingTopology.cs b/src/NServiceBus.Transport.RabbitMQ/Routing/ConventionalRoutingTopology.cs index 3c1beb4d9..f894dcb78 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Routing/ConventionalRoutingTopology.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Routing/ConventionalRoutingTopology.cs @@ -67,8 +67,6 @@ public void Initialize(IModel channel, IEnumerable receivingAddresses, I { foreach (var address in receivingAddresses.Concat(sendingAddresses)) { - NameValidator.ThrowIfNameIsTooLong(address); - channel.QueueDeclare(address, useDurableExchanges, false, false, null); CreateExchange(channel, address); channel.QueueBind(address, address, string.Empty); @@ -121,8 +119,6 @@ void MarkTypeConfigured(Type eventType) void CreateExchange(IModel channel, string exchangeName) { - NameValidator.ThrowIfNameIsTooLong(exchangeName); - channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, useDurableExchanges); } diff --git a/src/NServiceBus.Transport.RabbitMQ/Routing/DirectRoutingTopology.cs b/src/NServiceBus.Transport.RabbitMQ/Routing/DirectRoutingTopology.cs index 7914843d6..678726a14 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Routing/DirectRoutingTopology.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Routing/DirectRoutingTopology.cs @@ -46,8 +46,6 @@ public void Initialize(IModel channel, IEnumerable receivingAddresses, I { foreach (var address in receivingAddresses.Concat(sendingAddresses)) { - NameValidator.ThrowIfNameIsTooLong(address); - channel.QueueDeclare(address, useDurableExchanges, false, false, null); } } @@ -61,8 +59,6 @@ public void BindToDelayInfrastructure(IModel channel, string address, string del void CreateExchange(IModel channel, string exchangeName) { - NameValidator.ThrowIfNameIsTooLong(exchangeName); - if (exchangeName == AmqpTopicExchange) { return; diff --git a/src/NServiceBus.Transport.RabbitMQ/Routing/NameValidator.cs b/src/NServiceBus.Transport.RabbitMQ/Routing/NameValidator.cs deleted file mode 100644 index ff6f6c9a4..000000000 --- a/src/NServiceBus.Transport.RabbitMQ/Routing/NameValidator.cs +++ /dev/null @@ -1,16 +0,0 @@ -using System; -using System.Text; - -namespace NServiceBus.Transport.RabbitMQ -{ - static class NameValidator - { - public static void ThrowIfNameIsTooLong(string name) - { - if (Encoding.UTF8.GetByteCount(name) > 255) - { - throw new ArgumentOutOfRangeException(nameof(name), name, "Value exceeds the maximum allowed length of 255 bytes."); - } - } - } -}