Skip to content

Commit

Permalink
Add raw consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
Artem Kudriashov committed Jan 21, 2022
1 parent 43ee4a8 commit b17f41d
Show file tree
Hide file tree
Showing 9 changed files with 230 additions and 207 deletions.
2 changes: 1 addition & 1 deletion ATI.Services.RabbitMQ/ATI.Services.RabbitMQ.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<PackageId>atisu.services.rabbitmq</PackageId>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
<PackageVersion>4.0.0-rc7</PackageVersion>
<PackageVersion>4.0.0-rc8</PackageVersion>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)' == 'Debug' ">
<NoWarn>1701;1702;CS1591;CS1571;CS1573;CS1574</NoWarn>
Expand Down
71 changes: 0 additions & 71 deletions ATI.Services.RabbitMQ/BaseRmqConsumer.cs

This file was deleted.

1 change: 0 additions & 1 deletion ATI.Services.RabbitMQ/BaseRmqProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ namespace ATI.Services.RabbitMQ
public abstract class BaseRmqProvider
{
protected abstract ExchangeType ExchangeType { get; }
protected abstract ISerializer Serializer { get; }
protected abstract string ExchangeName { get; }
protected virtual bool DurableExchange => true;

Expand Down
41 changes: 41 additions & 0 deletions ATI.Services.RabbitMQ/Consumers/BaseRmqConsumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using System.Threading.Tasks;
using JetBrains.Annotations;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace ATI.Services.RabbitMQ.Consumers
{
[PublicAPI]
public abstract class BaseRmqConsumer : BaseRmqProvider, IRmqConsumer
{
protected IModel Channel;
private AsyncEventingBasicConsumer _consumer;
protected abstract string QueueName { get; }
protected abstract bool AutoDelete { get; }
protected virtual bool RequeueOnError => false;
protected virtual bool DurableQueue => true;
protected virtual bool AutoAck => true;
protected abstract string RoutingKey { get; }

public void Init(IConnection connection)
{
Channel = connection.CreateModel();
Channel.ExchangeDeclare(exchange: ExchangeName, type: GetExchangeType(), durable: DurableExchange);
Channel.QueueDeclare(queue: QueueName, durable: DurableQueue, exclusive: false, autoDelete: AutoDelete);
Channel.QueueBind(QueueName, ExchangeName, RoutingKey);

_consumer = new AsyncEventingBasicConsumer(Channel);
_consumer.Received += async (_, args) => await OnReceivedInternalAsync(args).ConfigureAwait(false);

Channel.BasicConsume(queue: QueueName, autoAck: AutoAck, consumer: _consumer);

RabbitMqDeclaredQueues.DeclaredQueues.Add(new QueueInfo { QueueName = QueueName });
}

protected abstract Task OnReceivedInternalAsync(BasicDeliverEventArgs ea);
public void Dispose()
{
Channel?.Dispose();
}
}
}
63 changes: 63 additions & 0 deletions ATI.Services.RabbitMQ/Consumers/RawRmqConsumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
using System;
using System.Threading.Tasks;
using ATI.Services.Common.Logging;
using JetBrains.Annotations;
using NLog;
using RabbitMQ.Client.Events;

namespace ATI.Services.RabbitMQ.Consumers
{
[PublicAPI]
internal sealed class RawRmqConsumer : BaseRmqConsumer
{
private readonly ILogger _logger;
private readonly Func<byte[], Task> _onReceivedAsync;

public RawRmqConsumer(
ILogger logger,
Func<byte[], Task> onReceivedAsync,
ExchangeType exchangeType,
string exchangeName,
string routingKey,
string queueName,
bool autoDelete,
bool durableQueue)
{
_logger = logger;
_onReceivedAsync = onReceivedAsync;
ExchangeType = exchangeType;
ExchangeName = exchangeName;
QueueName = queueName;
AutoDelete = autoDelete;
RoutingKey = routingKey;
DurableQueue = durableQueue;
}

protected override ExchangeType ExchangeType { get; }
protected override string ExchangeName { get; }
protected override string QueueName { get; }
protected override bool AutoDelete { get; }
protected override string RoutingKey { get; }
protected override bool DurableQueue { get; }

protected override async Task OnReceivedInternalAsync(BasicDeliverEventArgs ea)
{
try
{
await _onReceivedAsync(ea.Body.ToArray()).ConfigureAwait(false);
if (!AutoAck)
{
Channel.BasicAck(ea.DeliveryTag, false);
}
}
catch (Exception e)
{
_logger.ErrorWithObject(e, $"Error during message processing {GetType()}", ea);
if (!AutoAck)
{
Channel.BasicNack(ea.DeliveryTag, false, RequeueOnError);
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,26 @@
using System;
using System.Threading.Tasks;
using ATI.Services.Common.Logging;
using ATI.Services.Serialization;
using NLog;
using RabbitMQ.Client.Events;

namespace ATI.Services.RabbitMQ
namespace ATI.Services.RabbitMQ.Consumers
{
internal sealed class InternalRmqConsumer<T> : BaseRmqConsumer<T>
internal sealed class RmqConsumer<T> : BaseRmqConsumer
{
private readonly ILogger _logger;
private readonly Func<T, Task> _onReceivedAsync;

protected override ExchangeType ExchangeType { get; }
protected override string ExchangeName { get; }
protected override string QueueName { get; }
protected override bool AutoDelete { get; }
protected override string RoutingKey { get; }
protected override bool DurableQueue { get; }
private ISerializer Serializer { get; }

public InternalRmqConsumer(
public RmqConsumer(
ILogger logger,
Func<T, Task> onReceivedAsync,
ExchangeType exchangeType,
Expand All @@ -18,8 +29,9 @@ public InternalRmqConsumer(
ISerializer serializer,
string queueName,
bool autoDelete,
bool durableQueue) : base(logger)
bool durableQueue)
{
_logger = logger;
_onReceivedAsync = onReceivedAsync;
ExchangeType = exchangeType;
Serializer = serializer;
Expand All @@ -30,16 +42,27 @@ public InternalRmqConsumer(
DurableQueue = durableQueue;
}

protected override ExchangeType ExchangeType { get; }
protected override ISerializer Serializer { get; }
protected override string ExchangeName { get; }
protected override string QueueName { get; }
protected override bool AutoDelete { get; }
protected override string RoutingKey { get; }
protected override bool DurableQueue { get; }
protected override Task OnReceivedAsync(T obj)

protected override async Task OnReceivedInternalAsync(BasicDeliverEventArgs ea)
{
return _onReceivedAsync.Invoke(obj);
try
{
var body = ea.Body.ToArray();
var obj = Serializer.Deserialize<T>(body);
await _onReceivedAsync(obj).ConfigureAwait(false);
if (!AutoAck)
{
Channel.BasicAck(ea.DeliveryTag, false);
}
}
catch (Exception e)
{
_logger.ErrorWithObject(e, $"Error during message processing {GetType()}", ea);
if (!AutoAck)
{
Channel.BasicNack(ea.DeliveryTag, false, RequeueOnError);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
using ATI.Services.Common.Extensions;
using ATI.Services.Common.Logging;
using ATI.Services.Common.ServiceVariables;
using ATI.Services.Serialization;
using JetBrains.Annotations;
using NLog;
using RabbitMQ.Client;

namespace ATI.Services.RabbitMQ
namespace ATI.Services.RabbitMQ.Producers
{
[PublicAPI]
public abstract class BaseRmqProducer : BaseRmqProvider, IRmqProducer
Expand All @@ -23,8 +24,10 @@ private readonly
private readonly ILogger _logger;
protected abstract string DefaultRoutingKey { get; }
private bool _initialized;
private ISerializer Serializer { get; }
private readonly object _lock = new object();


protected BaseRmqProducer(ILogger logger)
{
_logger = logger;
Expand All @@ -33,30 +36,14 @@ protected BaseRmqProducer(ILogger logger)
taskCompletionSource)>();
}

internal void EnsureInitialized(IConnection connection, TimeSpan timeout)
public void Init(IConnection connection, TimeSpan timeout)
{
if (_initialized)
{
return;
}

lock (_lock)
{
if (_initialized)
{
return;
}

_logger.Warn($"Не зарегистрирован заранее продьюсер с ExchangeName: {ExchangeName}");
Init(connection, timeout);
}
}

public void Init(IConnection connection, TimeSpan timeout)
{
_defaultTimeout = timeout;
_channel = connection.CreateModel();
_channel.ExchangeDeclare(exchange: ExchangeName, type: GetExchangeType(), durable: DurableExchange);
_channel.ExchangeDeclare(ExchangeName, GetExchangeType(), DurableExchange);
_channel.ConfirmSelect();
_channel.BasicNacks += (_, _) =>
_logger.Error(
Expand Down Expand Up @@ -98,18 +85,18 @@ public Task<bool> PublishBytesAsync(byte[] body, string routingKey = null, TimeS
public async Task<bool> PublishBytesAsync(byte[] body, CancellationToken token, string routingKey = null,
TimeSpan timeout = default, TimeSpan expiration = default)
{
if (timeout == default)
{
timeout = _defaultTimeout;
}

using var cts = new CancellationTokenSource();
var taskCompletionSource =
new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var cancellationToken = token.CanBeCanceled
? CancellationTokenSource.CreateLinkedTokenSource(cts.Token, token).Token
: cts.Token;

if (timeout == default)
{
timeout = _defaultTimeout;
}

var timeoutTask = Task.Delay(timeout, cancellationToken);

AddMessageToQueue(body, routingKey, expiration, cancellationToken, taskCompletionSource);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
using ATI.Services.Serialization;
using NLog;

namespace ATI.Services.RabbitMQ
namespace ATI.Services.RabbitMQ.Producers
{
internal sealed class InternalRmqProducer : BaseRmqProducer
internal sealed class RmqProducer : BaseRmqProducer
{
public InternalRmqProducer(
public RmqProducer(
ILogger logger,
ExchangeType exchangeType,
ISerializer serializer,
Expand All @@ -21,7 +21,7 @@ public InternalRmqProducer(
}

protected override ExchangeType ExchangeType { get; }
protected override ISerializer Serializer { get; }
private ISerializer Serializer { get; }
protected override string ExchangeName { get; }
protected override string DefaultRoutingKey { get; }
protected override bool DurableExchange { get; }
Expand Down
Loading

0 comments on commit b17f41d

Please sign in to comment.