Skip to content

Commit

Permalink
Merge pull request #21 from atidev/delayed-requeue
Browse files Browse the repository at this point in the history
Add delayed re-queue mechanism
  • Loading branch information
alan-kh authored Mar 19, 2024
2 parents 1039ce6 + 05f7840 commit 030fb63
Show file tree
Hide file tree
Showing 3 changed files with 233 additions and 18 deletions.
8 changes: 8 additions & 0 deletions ATI.Services.RabbitMQ/Acknowledgements.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace ATI.Services.RabbitMQ;

public enum Acknowledgements
{
Ack,
Nack,
Reject
}
3 changes: 3 additions & 0 deletions ATI.Services.RabbitMQ/DelayedRequeueConfiguration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
namespace ATI.Services.RabbitMQ;

public record DelayedRequeueConfiguration(int MaxRetryRequeueCount, int DelayedQueueRequeueTtl);
240 changes: 222 additions & 18 deletions ATI.Services.RabbitMQ/EventbusManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using ATI.Services.Common.Metrics;
using ATI.Services.Common.Variables;
using EasyNetQ;
using EasyNetQ.Consumer;
using EasyNetQ.DI;
using EasyNetQ.Topology;
using JetBrains.Annotations;
Expand All @@ -35,6 +36,8 @@ public class EventbusManager : IDisposable, IInitializer
private IAdvancedBus _busClient;
private const int RetryAttemptMax = 3;
private const int MaxRetryDelayPow = 2;
private const string DelayQueueSuffix = "_delay";
private const string PoisonQueueSuffix = "_poison";
private readonly JsonSerializer _jsonSerializer;
private readonly string _connectionString;

Expand Down Expand Up @@ -90,8 +93,7 @@ public Task InitializeAsync()
return Task.CompletedTask;
}


public Task<Exchange> DeclareExchangeTopicAsync(string exchangeName, bool durable, bool autoDelete) =>
public Task<Exchange> DeclareExchangeTopicAsync(string exchangeName, bool durable, bool autoDelete) =>
_busClient.ExchangeDeclareAsync(exchangeName, ExchangeType.Topic, durable, autoDelete);

public Task<Exchange> DeclareExchangeTypedAsync(string exchangeName,bool durable, bool autoDelete,
Expand Down Expand Up @@ -211,26 +213,48 @@ public Task SubscribeAsync(QueueExchangeBinding bindingInfo,

private AsyncPolicyWrap SetupPolicy(TimeSpan? timeout = null) =>
Policy.WrapAsync(Policy.TimeoutAsync(timeout ?? TimeSpan.FromSeconds(2)),
Policy.Handle<Exception>()
.WaitAndRetryAsync(3, _ => TimeSpan.FromSeconds(3)));
Policy.Handle<Exception>()
.WaitAndRetryAsync(3, _ => TimeSpan.FromSeconds(3)));

private async Task ExecuteWithPolicy(Func<Task> action)
{
var policy = Policy.Handle<TimeoutException>()
.WaitAndRetryAsync(
RetryAttemptMax,
retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
(exception, timeSpan, retryCount, _) =>
{
_logger.ErrorWithObject(exception, new {TimeSpan = timeSpan, RetryCount = retryCount});
});

var policyResult = await policy.ExecuteAndCaptureAsync(async () => await action.Invoke());

if (policyResult.FinalException != null)
{
_logger.ErrorWithObject(policyResult.FinalException, action);
}
}

private async Task<Acknowledgements> ExecuteWithPolicy(Func<Task<Acknowledgements>> action)
{
var policy = Policy.Handle<TimeoutException>()
.WaitAndRetryAsync(
RetryAttemptMax,
retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
(exception, timeSpan, retryCount, _) =>
{
_logger.ErrorWithObject(exception, new { TimeSpan = timeSpan, RetryCount = retryCount });
});
RetryAttemptMax,
retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
(exception, timeSpan, retryCount, _) =>
{
_logger.ErrorWithObject(exception, new {TimeSpan = timeSpan, RetryCount = retryCount});
}
);

var policyResult = await policy.ExecuteAndCaptureAsync(async () => await action.Invoke());

if (policyResult.FinalException != null)
{
_logger.ErrorWithObject(policyResult.FinalException, action);
}

return policyResult.Result;
}

private void ResubscribeOnReconnect()
Expand Down Expand Up @@ -283,13 +307,80 @@ async Task HandleEventBusMessageWithPolicy(ReadOnlyMemory<byte> body, MessagePro
}
}

private async Task<Queue> DeclareBindQueue(QueueExchangeBinding bindingInfo)
private async Task BindConsumerAsync(QueueExchangeBinding mainQueueBinding,
QueueExchangeBinding delayQueueBinding,
QueueExchangeBinding poisonQueueBinding,
Func<byte[], MessageProperties, MessageReceivedInfo, Task<Acknowledgements>> handler,
Func<byte[], MessageProperties, MessageReceivedInfo, Task<Acknowledgements>> poisonHandler,
DelayedRequeueConfiguration delayedConfig,
string metricEntity)
{
var queue = await _busClient.QueueDeclareAsync(
name: bindingInfo.Queue.Name,
autoDelete: bindingInfo.Queue.IsAutoDelete,
durable: bindingInfo.Queue.IsDurable,
exclusive: bindingInfo.Queue.IsExclusive);
var mainQueue = await DeclareBindQueue(mainQueueBinding);
_busClient.Consume(mainQueue, HandleEventBusMessageWithPolicyAndRequeue());

if (poisonHandler != null)
{
var poisonQueue = await DeclareBindQueue(poisonQueueBinding);
_busClient.Consume(poisonQueue, HandlePoisonQueueMessages());
}

Func<ReadOnlyMemory<byte>, MessageProperties, MessageReceivedInfo, Task<AckStrategy>>
HandleEventBusMessageWithPolicyAndRequeue()
{
return async (body, props, info) =>
{
using (_inMetricsFactory.CreateLoggingMetricsTimer(metricEntity ?? "Eventbus",
$"{info.Exchange}:{info.RoutingKey}",
additionalLabels: props.AppId ?? "Unknown"))
{
HandleMessageProps(props);
var handlerAcknowledgeResponse = await ExecuteWithPolicy(
async () => await handler.Invoke(body.ToArray(), props, info)
);

return handlerAcknowledgeResponse switch
{
Acknowledgements.Ack => AckStrategies.Ack,

Acknowledgements.Nack => await HandleNackResponse(
mainQueueBinding,
delayQueueBinding,
poisonQueueBinding,
delayedConfig,
props,
body.ToArray()),

Acknowledgements.Reject => AckStrategies.NackWithRequeue,

_ => AckStrategies.Ack
};
}
};
}

Func<ReadOnlyMemory<byte>, MessageProperties, MessageReceivedInfo, Task> HandlePoisonQueueMessages()
{
return async (body, props, info) =>
{
using (_outMetricsFactory.CreateLoggingMetricsTimer($"{metricEntity ?? "Eventbus"}-Poison", $"{info.Exchange}:{info.RoutingKey}",
additionalLabels: props.AppId ?? "Unknown"))
{
HandleMessageProps(props);
await ExecuteWithPolicy(
async () => await poisonHandler.Invoke(body.ToArray(), props, info)
);
}
};
}
}

private async Task<Queue> DeclareBindQueue(QueueExchangeBinding bindingInfo)
{
var queue = await _busClient.QueueDeclareAsync(
name: bindingInfo.Queue.Name,
autoDelete: bindingInfo.Queue.IsAutoDelete,
durable: bindingInfo.Queue.IsDurable,
exclusive: bindingInfo.Queue.IsExclusive);

var exchange = new Exchange(bindingInfo.Exchange.Name,
bindingInfo.Exchange.Type,
Expand All @@ -308,14 +399,128 @@ private void HandleMessageProps(MessageProperties props)
GetAcceptLanguageFromProperties(props);
}

#nullable enable
private async Task<AckStrategy> HandleNackResponse(
QueueExchangeBinding mainQueueBinding,
QueueExchangeBinding delayQueueBinding,
QueueExchangeBinding poisonQueueBinding,
DelayedRequeueConfiguration delayedConfig,
MessageProperties props,
byte[] body)
{
var counter = props.Headers.TryGetValue("x-counter", out object? xCounterHeader)
&& int.TryParse(xCounterHeader?.ToString(), out int headerCounter)
? headerCounter
: 0;

if (counter >= delayedConfig.MaxRetryRequeueCount)
{
await PublishToPoisonQueueAsync(poisonQueueBinding, body);
}
else
{
await PublishToDelayQueueAsync(
delayQueueBinding,
mainQueueBinding,
counter,
delayedConfig.DelayedQueueRequeueTtl,
body);
}

return AckStrategies.Ack;
}
#nullable disable

private async Task PublishToPoisonQueueAsync(QueueExchangeBinding poisonQueueBinding, byte[] messageBody)
{
try
{
await _busClient.QueueDeclareAsync(
poisonQueueBinding.Queue.Name,
c => c.AsAutoDelete(poisonQueueBinding.Queue.IsAutoDelete)
.AsDurable(poisonQueueBinding.Queue.IsDurable)
.AsExclusive(poisonQueueBinding.Queue.IsExclusive));
}
catch (Exception exception)
{
_logger.ErrorWithObject(
exception,
"Не удалось создать очередь задержки."
);

return;
}

var delayExchange = await _busClient.ExchangeDeclareAsync(
poisonQueueBinding.Exchange.Name,
poisonQueueBinding.Exchange.Type);

await _busClient.BindAsync(delayExchange, poisonQueueBinding.Queue, poisonQueueBinding.RoutingKey);
await SetupPolicy().ExecuteAndCaptureAsync(async () =>
await _busClient.PublishAsync(
delayExchange,
poisonQueueBinding.RoutingKey,
false,
GetProperties(null, true),
messageBody)
);
}

private async Task PublishToDelayQueueAsync(
QueueExchangeBinding delayQueueBinding,
QueueExchangeBinding mainQueue,
int counter,
int delayedQueueRequeueTtl,
byte[] messageBody)
{
try
{
await _busClient.QueueDeclareAsync(
delayQueueBinding.Queue.Name,
c => c.WithArgument("x-dead-letter-exchange", String.Empty)
.WithArgument("x-dead-letter-routing-key", mainQueue.Queue.Name)
.WithArgument("x-message-ttl", delayedQueueRequeueTtl)
.AsAutoDelete(delayQueueBinding.Queue.IsAutoDelete)
.AsDurable(delayQueueBinding.Queue.IsDurable)
.AsExclusive(delayQueueBinding.Queue.IsExclusive));
}
catch (Exception exception)
{
_logger.ErrorWithObject(
exception,
"Не удалось создать очередь задержки. Причина, скорее всего, в существующей очереди."
);

return;
}

var delayExchange = await _busClient.ExchangeDeclareAsync(
delayQueueBinding.Exchange.Name,
delayQueueBinding.Exchange.Type);

_busClient.Bind(delayExchange, delayQueueBinding.Queue, delayQueueBinding.RoutingKey);
await SetupPolicy().ExecuteAndCaptureAsync(async () =>
await _busClient.PublishAsync(
delayExchange,
delayQueueBinding.RoutingKey,
false,
GetProperties(new Dictionary<string, object>
{
{"x-counter", ++counter}
},
true),
messageBody)
);
}

private void GetAcceptLanguageFromProperties(MessageProperties props)
{
try
{
if (!props.Headers.TryGetValue(MessagePropertiesNames.AcceptLang, out var acceptLanguage))
return;

var acceptLanguageStr = BodyEncoding.GetString((byte[])acceptLanguage);
var acceptLanguageStr = BodyEncoding.GetString((byte[]) acceptLanguage);
FlowContext<RequestMetaData>.Current =
new RequestMetaData
{
Expand Down Expand Up @@ -393,6 +598,5 @@ public void Dispose()
}

public string InitStartConsoleMessage() => "Start Eventbus initializer";

public string InitEndConsoleMessage() => "End Eventbus initializer";
}

0 comments on commit 030fb63

Please sign in to comment.