From b17f41ddc6f103927e22e1eaba80644a93aa4ee3 Mon Sep 17 00:00:00 2001 From: Artem Kudriashov Date: Fri, 21 Jan 2022 15:21:24 +0300 Subject: [PATCH] Add raw consumer --- .../ATI.Services.RabbitMQ.csproj | 2 +- ATI.Services.RabbitMQ/BaseRmqConsumer.cs | 71 -------- ATI.Services.RabbitMQ/BaseRmqProvider.cs | 1 - .../Consumers/BaseRmqConsumer.cs | 41 +++++ .../Consumers/RawRmqConsumer.cs | 63 +++++++ .../RmqConsumer.cs} | 49 +++-- .../{ => Producers}/BaseRmqProducer.cs | 35 ++-- .../RmqProducer.cs} | 8 +- ATI.Services.RabbitMQ/RmqConnection.cs | 167 ++++++++---------- 9 files changed, 230 insertions(+), 207 deletions(-) delete mode 100644 ATI.Services.RabbitMQ/BaseRmqConsumer.cs create mode 100644 ATI.Services.RabbitMQ/Consumers/BaseRmqConsumer.cs create mode 100644 ATI.Services.RabbitMQ/Consumers/RawRmqConsumer.cs rename ATI.Services.RabbitMQ/{InternalRmqConsumer.cs => Consumers/RmqConsumer.cs} (50%) rename ATI.Services.RabbitMQ/{ => Producers}/BaseRmqProducer.cs (91%) rename ATI.Services.RabbitMQ/{InternalRmqProducer.cs => Producers/RmqProducer.cs} (79%) diff --git a/ATI.Services.RabbitMQ/ATI.Services.RabbitMQ.csproj b/ATI.Services.RabbitMQ/ATI.Services.RabbitMQ.csproj index a527fdc..508cf4c 100644 --- a/ATI.Services.RabbitMQ/ATI.Services.RabbitMQ.csproj +++ b/ATI.Services.RabbitMQ/ATI.Services.RabbitMQ.csproj @@ -8,7 +8,7 @@ true atisu.services.rabbitmq MIT - 4.0.0-rc7 + 4.0.0-rc8 1701;1702;CS1591;CS1571;CS1573;CS1574 diff --git a/ATI.Services.RabbitMQ/BaseRmqConsumer.cs b/ATI.Services.RabbitMQ/BaseRmqConsumer.cs deleted file mode 100644 index cf29bf9..0000000 --- a/ATI.Services.RabbitMQ/BaseRmqConsumer.cs +++ /dev/null @@ -1,71 +0,0 @@ -using System; -using System.Threading.Tasks; -using ATI.Services.Common.Logging; -using NLog; -using RabbitMQ.Client; -using RabbitMQ.Client.Events; - -namespace ATI.Services.RabbitMQ -{ - public abstract class BaseRmqConsumer : BaseRmqProvider, IRmqConsumer - { - private IModel _channel; - private AsyncEventingBasicConsumer _consumer; - private readonly ILogger _logger; - protected abstract string QueueName { get; } - protected abstract bool AutoDelete { get; } - protected virtual bool RequeueOnError => false; - protected virtual bool DurableQueue => true; - protected virtual bool AutoAck => true; - protected abstract string RoutingKey { get; } - - protected BaseRmqConsumer(ILogger logger) - { - _logger = logger; - } - - protected abstract Task OnReceivedAsync(T obj); - - public void Init(IConnection connection) - { - _channel = connection.CreateModel(); - _channel.ExchangeDeclare(exchange: ExchangeName, type: GetExchangeType(), durable: DurableExchange); - _channel.QueueDeclare(queue: QueueName, durable: DurableQueue, exclusive: false, autoDelete: AutoDelete); - _channel.QueueBind(QueueName, ExchangeName, RoutingKey); - - _consumer = new AsyncEventingBasicConsumer(_channel); - _consumer.Received += async (_, args) => await OnReceivedInternalAsync(args).ConfigureAwait(false); - - _channel.BasicConsume(queue: QueueName, autoAck: AutoAck, consumer: _consumer); - - RabbitMqDeclaredQueues.DeclaredQueues.Add(new QueueInfo { QueueName = QueueName }); - } - - private async Task OnReceivedInternalAsync(BasicDeliverEventArgs ea) - { - try - { - var body = ea.Body.ToArray(); - var obj = Serializer.Deserialize(body); - await OnReceivedAsync(obj).ConfigureAwait(false); - if (!AutoAck) - { - _channel.BasicAck(ea.DeliveryTag, false); - } - } - catch (Exception e) - { - _logger.ErrorWithObject(e, $"Error during message processing {GetType()}", ea); - if (!AutoAck) - { - _channel.BasicNack(ea.DeliveryTag, false, RequeueOnError); - } - } - } - - public void Dispose() - { - _channel?.Dispose(); - } - } -} \ No newline at end of file diff --git a/ATI.Services.RabbitMQ/BaseRmqProvider.cs b/ATI.Services.RabbitMQ/BaseRmqProvider.cs index 0a52340..4d8fb02 100644 --- a/ATI.Services.RabbitMQ/BaseRmqProvider.cs +++ b/ATI.Services.RabbitMQ/BaseRmqProvider.cs @@ -5,7 +5,6 @@ namespace ATI.Services.RabbitMQ public abstract class BaseRmqProvider { protected abstract ExchangeType ExchangeType { get; } - protected abstract ISerializer Serializer { get; } protected abstract string ExchangeName { get; } protected virtual bool DurableExchange => true; diff --git a/ATI.Services.RabbitMQ/Consumers/BaseRmqConsumer.cs b/ATI.Services.RabbitMQ/Consumers/BaseRmqConsumer.cs new file mode 100644 index 0000000..360dd4d --- /dev/null +++ b/ATI.Services.RabbitMQ/Consumers/BaseRmqConsumer.cs @@ -0,0 +1,41 @@ +using System.Threading.Tasks; +using JetBrains.Annotations; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; + +namespace ATI.Services.RabbitMQ.Consumers +{ + [PublicAPI] + public abstract class BaseRmqConsumer : BaseRmqProvider, IRmqConsumer + { + protected IModel Channel; + private AsyncEventingBasicConsumer _consumer; + protected abstract string QueueName { get; } + protected abstract bool AutoDelete { get; } + protected virtual bool RequeueOnError => false; + protected virtual bool DurableQueue => true; + protected virtual bool AutoAck => true; + protected abstract string RoutingKey { get; } + + public void Init(IConnection connection) + { + Channel = connection.CreateModel(); + Channel.ExchangeDeclare(exchange: ExchangeName, type: GetExchangeType(), durable: DurableExchange); + Channel.QueueDeclare(queue: QueueName, durable: DurableQueue, exclusive: false, autoDelete: AutoDelete); + Channel.QueueBind(QueueName, ExchangeName, RoutingKey); + + _consumer = new AsyncEventingBasicConsumer(Channel); + _consumer.Received += async (_, args) => await OnReceivedInternalAsync(args).ConfigureAwait(false); + + Channel.BasicConsume(queue: QueueName, autoAck: AutoAck, consumer: _consumer); + + RabbitMqDeclaredQueues.DeclaredQueues.Add(new QueueInfo { QueueName = QueueName }); + } + + protected abstract Task OnReceivedInternalAsync(BasicDeliverEventArgs ea); + public void Dispose() + { + Channel?.Dispose(); + } + } +} \ No newline at end of file diff --git a/ATI.Services.RabbitMQ/Consumers/RawRmqConsumer.cs b/ATI.Services.RabbitMQ/Consumers/RawRmqConsumer.cs new file mode 100644 index 0000000..7f36f5b --- /dev/null +++ b/ATI.Services.RabbitMQ/Consumers/RawRmqConsumer.cs @@ -0,0 +1,63 @@ +using System; +using System.Threading.Tasks; +using ATI.Services.Common.Logging; +using JetBrains.Annotations; +using NLog; +using RabbitMQ.Client.Events; + +namespace ATI.Services.RabbitMQ.Consumers +{ + [PublicAPI] + internal sealed class RawRmqConsumer : BaseRmqConsumer + { + private readonly ILogger _logger; + private readonly Func _onReceivedAsync; + + public RawRmqConsumer( + ILogger logger, + Func onReceivedAsync, + ExchangeType exchangeType, + string exchangeName, + string routingKey, + string queueName, + bool autoDelete, + bool durableQueue) + { + _logger = logger; + _onReceivedAsync = onReceivedAsync; + ExchangeType = exchangeType; + ExchangeName = exchangeName; + QueueName = queueName; + AutoDelete = autoDelete; + RoutingKey = routingKey; + DurableQueue = durableQueue; + } + + protected override ExchangeType ExchangeType { get; } + protected override string ExchangeName { get; } + protected override string QueueName { get; } + protected override bool AutoDelete { get; } + protected override string RoutingKey { get; } + protected override bool DurableQueue { get; } + + protected override async Task OnReceivedInternalAsync(BasicDeliverEventArgs ea) + { + try + { + await _onReceivedAsync(ea.Body.ToArray()).ConfigureAwait(false); + if (!AutoAck) + { + Channel.BasicAck(ea.DeliveryTag, false); + } + } + catch (Exception e) + { + _logger.ErrorWithObject(e, $"Error during message processing {GetType()}", ea); + if (!AutoAck) + { + Channel.BasicNack(ea.DeliveryTag, false, RequeueOnError); + } + } + } + } +} \ No newline at end of file diff --git a/ATI.Services.RabbitMQ/InternalRmqConsumer.cs b/ATI.Services.RabbitMQ/Consumers/RmqConsumer.cs similarity index 50% rename from ATI.Services.RabbitMQ/InternalRmqConsumer.cs rename to ATI.Services.RabbitMQ/Consumers/RmqConsumer.cs index f93cd1f..6151a74 100644 --- a/ATI.Services.RabbitMQ/InternalRmqConsumer.cs +++ b/ATI.Services.RabbitMQ/Consumers/RmqConsumer.cs @@ -1,15 +1,26 @@ using System; using System.Threading.Tasks; +using ATI.Services.Common.Logging; using ATI.Services.Serialization; using NLog; +using RabbitMQ.Client.Events; -namespace ATI.Services.RabbitMQ +namespace ATI.Services.RabbitMQ.Consumers { - internal sealed class InternalRmqConsumer : BaseRmqConsumer + internal sealed class RmqConsumer : BaseRmqConsumer { + private readonly ILogger _logger; private readonly Func _onReceivedAsync; + + protected override ExchangeType ExchangeType { get; } + protected override string ExchangeName { get; } + protected override string QueueName { get; } + protected override bool AutoDelete { get; } + protected override string RoutingKey { get; } + protected override bool DurableQueue { get; } + private ISerializer Serializer { get; } - public InternalRmqConsumer( + public RmqConsumer( ILogger logger, Func onReceivedAsync, ExchangeType exchangeType, @@ -18,8 +29,9 @@ public InternalRmqConsumer( ISerializer serializer, string queueName, bool autoDelete, - bool durableQueue) : base(logger) + bool durableQueue) { + _logger = logger; _onReceivedAsync = onReceivedAsync; ExchangeType = exchangeType; Serializer = serializer; @@ -30,16 +42,27 @@ public InternalRmqConsumer( DurableQueue = durableQueue; } - protected override ExchangeType ExchangeType { get; } - protected override ISerializer Serializer { get; } - protected override string ExchangeName { get; } - protected override string QueueName { get; } - protected override bool AutoDelete { get; } - protected override string RoutingKey { get; } - protected override bool DurableQueue { get; } - protected override Task OnReceivedAsync(T obj) + + protected override async Task OnReceivedInternalAsync(BasicDeliverEventArgs ea) { - return _onReceivedAsync.Invoke(obj); + try + { + var body = ea.Body.ToArray(); + var obj = Serializer.Deserialize(body); + await _onReceivedAsync(obj).ConfigureAwait(false); + if (!AutoAck) + { + Channel.BasicAck(ea.DeliveryTag, false); + } + } + catch (Exception e) + { + _logger.ErrorWithObject(e, $"Error during message processing {GetType()}", ea); + if (!AutoAck) + { + Channel.BasicNack(ea.DeliveryTag, false, RequeueOnError); + } + } } } } \ No newline at end of file diff --git a/ATI.Services.RabbitMQ/BaseRmqProducer.cs b/ATI.Services.RabbitMQ/Producers/BaseRmqProducer.cs similarity index 91% rename from ATI.Services.RabbitMQ/BaseRmqProducer.cs rename to ATI.Services.RabbitMQ/Producers/BaseRmqProducer.cs index 482b19b..b6daefc 100644 --- a/ATI.Services.RabbitMQ/BaseRmqProducer.cs +++ b/ATI.Services.RabbitMQ/Producers/BaseRmqProducer.cs @@ -5,11 +5,12 @@ using ATI.Services.Common.Extensions; using ATI.Services.Common.Logging; using ATI.Services.Common.ServiceVariables; +using ATI.Services.Serialization; using JetBrains.Annotations; using NLog; using RabbitMQ.Client; -namespace ATI.Services.RabbitMQ +namespace ATI.Services.RabbitMQ.Producers { [PublicAPI] public abstract class BaseRmqProducer : BaseRmqProvider, IRmqProducer @@ -23,8 +24,10 @@ private readonly private readonly ILogger _logger; protected abstract string DefaultRoutingKey { get; } private bool _initialized; + private ISerializer Serializer { get; } private readonly object _lock = new object(); + protected BaseRmqProducer(ILogger logger) { _logger = logger; @@ -33,30 +36,14 @@ protected BaseRmqProducer(ILogger logger) taskCompletionSource)>(); } - internal void EnsureInitialized(IConnection connection, TimeSpan timeout) + public void Init(IConnection connection, TimeSpan timeout) { if (_initialized) - { return; - } - - lock (_lock) - { - if (_initialized) - { - return; - } - - _logger.Warn($"Не зарегистрирован заранее продьюсер с ExchangeName: {ExchangeName}"); - Init(connection, timeout); - } - } - public void Init(IConnection connection, TimeSpan timeout) - { _defaultTimeout = timeout; _channel = connection.CreateModel(); - _channel.ExchangeDeclare(exchange: ExchangeName, type: GetExchangeType(), durable: DurableExchange); + _channel.ExchangeDeclare(ExchangeName, GetExchangeType(), DurableExchange); _channel.ConfirmSelect(); _channel.BasicNacks += (_, _) => _logger.Error( @@ -98,11 +85,6 @@ public Task PublishBytesAsync(byte[] body, string routingKey = null, TimeS public async Task PublishBytesAsync(byte[] body, CancellationToken token, string routingKey = null, TimeSpan timeout = default, TimeSpan expiration = default) { - if (timeout == default) - { - timeout = _defaultTimeout; - } - using var cts = new CancellationTokenSource(); var taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); @@ -110,6 +92,11 @@ public async Task PublishBytesAsync(byte[] body, CancellationToken token, ? CancellationTokenSource.CreateLinkedTokenSource(cts.Token, token).Token : cts.Token; + if (timeout == default) + { + timeout = _defaultTimeout; + } + var timeoutTask = Task.Delay(timeout, cancellationToken); AddMessageToQueue(body, routingKey, expiration, cancellationToken, taskCompletionSource); diff --git a/ATI.Services.RabbitMQ/InternalRmqProducer.cs b/ATI.Services.RabbitMQ/Producers/RmqProducer.cs similarity index 79% rename from ATI.Services.RabbitMQ/InternalRmqProducer.cs rename to ATI.Services.RabbitMQ/Producers/RmqProducer.cs index 4acbadb..cca404b 100644 --- a/ATI.Services.RabbitMQ/InternalRmqProducer.cs +++ b/ATI.Services.RabbitMQ/Producers/RmqProducer.cs @@ -1,11 +1,11 @@ using ATI.Services.Serialization; using NLog; -namespace ATI.Services.RabbitMQ +namespace ATI.Services.RabbitMQ.Producers { - internal sealed class InternalRmqProducer : BaseRmqProducer + internal sealed class RmqProducer : BaseRmqProducer { - public InternalRmqProducer( + public RmqProducer( ILogger logger, ExchangeType exchangeType, ISerializer serializer, @@ -21,7 +21,7 @@ public InternalRmqProducer( } protected override ExchangeType ExchangeType { get; } - protected override ISerializer Serializer { get; } + private ISerializer Serializer { get; } protected override string ExchangeName { get; } protected override string DefaultRoutingKey { get; } protected override bool DurableExchange { get; } diff --git a/ATI.Services.RabbitMQ/RmqConnection.cs b/ATI.Services.RabbitMQ/RmqConnection.cs index 270569e..7d6a35b 100644 --- a/ATI.Services.RabbitMQ/RmqConnection.cs +++ b/ATI.Services.RabbitMQ/RmqConnection.cs @@ -8,6 +8,8 @@ using ATI.Services.Common.Initializers; using ATI.Services.Common.Initializers.Interfaces; using ATI.Services.Common.Logging; +using ATI.Services.RabbitMQ.Consumers; +using ATI.Services.RabbitMQ.Producers; using ATI.Services.Serialization; using JetBrains.Annotations; using Microsoft.Extensions.DependencyInjection; @@ -27,13 +29,12 @@ public class RmqConnection : IDisposable, IInitializer private IConnection _connection; private readonly IServiceProvider _serviceProvider; - private readonly ConcurrentDictionary _customRmqProducers = - new ConcurrentDictionary(); + private readonly ConcurrentDictionary _customRmqProducers = + new ConcurrentDictionary(); private readonly ConcurrentBag _customRmqConsumers = new ConcurrentBag(); - private readonly object _initializationLock = new(); - private bool _initialized; + private readonly object _initializationLock = new object(); public RmqConnection(IOptions config, IServiceProvider serviceProvider) @@ -42,38 +43,21 @@ public RmqConnection(IOptions config, _config = config.Value; } - public void Init() - { - if (_initialized) - { - return; - } - - lock (_initializationLock) - { - if (_initialized) - { - return; - } - - var producers = _serviceProvider.GetServices(); - var consumers = _serviceProvider.GetServices(); - - Init(producers.Concat(_customRmqProducers.Values), consumers.Concat(_customRmqConsumers)); - _initialized = true; - } - } - public void RegisterProducer( string exchangeName, string defaultRoutingKey, ISerializer serializer, bool durable = true, - ExchangeType exchangeType = ExchangeType.Topic) + ExchangeType exchangeType = ExchangeType.Topic, + TimeSpan timeout = default) { - var producer = new InternalRmqProducer(_logger, exchangeType, serializer, exchangeName, defaultRoutingKey, - durable); + var producer = new RmqProducer(_logger, exchangeType, serializer, exchangeName, defaultRoutingKey, durable); _customRmqProducers.GetOrAdd(exchangeName, producer); + + lock (_initializationLock) + { + producer.Init(_connection, timeout == default ? _config.PublishMessageTimeout : timeout); + } } public Task PublishBytesAsync( @@ -91,17 +75,8 @@ public Task PublishBytesAsync( timeout = _config.PublishMessageTimeout; } - // Проверяем, что создан - var producer = _customRmqProducers.GetOrAdd(exchangeName, _ => - { - var serializer = NewtonsoftJsonSerializer.SnakeCase; - return new InternalRmqProducer(_logger, exchangeType, serializer, exchangeName, routingKey, durable); - }); - - // Проверяем, что заиничен - producer.EnsureInitialized(_connection, timeout); + var producer = _customRmqProducers[exchangeName]; - // отправляем return producer.PublishBytesAsync(publishBody, cancellationToken, routingKey, timeout, expiration); } @@ -121,17 +96,8 @@ public Task PublishAsync( timeout = _config.PublishMessageTimeout; } - // Проверяем, что создан - var producer = _customRmqProducers.GetOrAdd(exchangeName, _ => - { - serializer ??= NewtonsoftJsonSerializer.SnakeCase; - return new InternalRmqProducer(_logger, exchangeType, serializer, exchangeName, routingKey, durable); - }); - - // Проверяем, что заиничен - producer.EnsureInitialized(_connection, timeout); + var producer = _customRmqProducers[exchangeName]; - // отправляем return producer.PublishAsync(publishBody, cancellationToken, routingKey, timeout, expiration); } @@ -146,63 +112,35 @@ public void Subscribe( ISerializer serializer = default) { serializer ??= NewtonsoftJsonSerializer.SnakeCase; - var consumer = new InternalRmqConsumer( + var consumer = new RmqConsumer( _logger, onReceivedAsync, exchangeType, exchangeName, routingKey, serializer, queueName, autoDelete, durable); lock (_initializationLock) { _customRmqConsumers.Add(consumer); - if (_initialized) - { - consumer.Init(_connection); - } + consumer.Init(_connection); } } - private void Init(IEnumerable producers, IEnumerable consumers) + public void SubscribeRaw( + string exchangeName, + string queueName, + string routingKey, + Func onReceivedAsync, + bool autoDelete, + bool durable = true, + ExchangeType exchangeType = ExchangeType.Topic) { - var factory = new ConnectionFactory - { - AutomaticRecoveryEnabled = true, - DispatchConsumersAsync = true - }; - - var connectionUri = new Uri(_config.ConnectionString); - FillUserInfo(connectionUri, factory); - - var amqpTcpEndpoints = GetAmqpTcpEndpoints(connectionUri); - _connection = factory.CreateConnection(amqpTcpEndpoints); - - foreach (var producer in producers) - { - producer.Init(_connection, _config.PublishMessageTimeout); - } + var consumer = new RawRmqConsumer( + _logger, onReceivedAsync, exchangeType, exchangeName, routingKey, queueName, autoDelete, + durable); - foreach (var consumer in consumers) + lock (_initializationLock) { + _customRmqConsumers.Add(consumer); consumer.Init(_connection); } - - _connection.ConnectionShutdown += (_, args) => - { - _logger.Error($"Rmq connection shutdown. {args.ReplyText}"); - }; - - _connection.CallbackException += (_, args) => - { - _logger.Error(args.Exception, "Rmq callback exception."); - }; - - _connection.ConnectionBlocked += (_, args) => - { - _logger.Error($"Rmq connection blocked. {args.Reason}"); - }; - - _connection.ConnectionUnblocked += (obj, args) => - { - _logger.WarnWithObject("Rmq connection unblocked", obj, args); - }; } private static List GetAmqpTcpEndpoints(Uri connectionUri) @@ -255,7 +193,50 @@ public void Dispose() public Task InitializeAsync() { - Init(); + var producers = _serviceProvider.GetServices(); + var consumers = _serviceProvider.GetServices(); + + var factory = new ConnectionFactory + { + AutomaticRecoveryEnabled = true, + DispatchConsumersAsync = true + }; + + var connectionUri = new Uri(_config.ConnectionString); + FillUserInfo(connectionUri, factory); + + var amqpTcpEndpoints = GetAmqpTcpEndpoints(connectionUri); + var connection = factory.CreateConnection(amqpTcpEndpoints); + + foreach (var producer in producers) + { + producer.Init(connection, _config.PublishMessageTimeout); + } + + foreach (var consumer in consumers) + { + consumer.Init(connection); + } + + connection.ConnectionShutdown += (_, args) => + { + _logger.Error($"Rmq connection shutdown. {args.ReplyText}"); + }; + + connection.CallbackException += (_, args) => { _logger.Error(args.Exception, "Rmq callback exception."); }; + + connection.ConnectionBlocked += (_, args) => { _logger.Error($"Rmq connection blocked. {args.Reason}"); }; + + connection.ConnectionUnblocked += (obj, args) => + { + _logger.WarnWithObject("Rmq connection unblocked", obj, args); + }; + + lock (_initializationLock) + { + _connection = connection; + } + return Task.CompletedTask; } }