Skip to content

Commit

Permalink
Configurable queue/сonsumer + ack strategies for handler + DotNet8 (#26)
Browse files Browse the repository at this point in the history
* Update lib and dotnet ver

* Consumer configuration and ack strategies

* QueueConfiguration + refactoring
  • Loading branch information
muphblu authored Jul 8, 2024
1 parent c910de4 commit 97843bc
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 58 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@ jobs:
- name: Setup dotnet
uses: actions/setup-dotnet@v1
with:
dotnet-version: '7.0.x' # SDK Version to use; x will use the latest version of the 6.0 channel
dotnet-version: '8.0.x' # SDK Version to use; x will use the latest version of the 6.0 channel
- name: Build
run: dotnet build --configuration Release
2 changes: 1 addition & 1 deletion .github/workflows/release-preview.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
- name: Setup dotnet
uses: actions/setup-dotnet@v1
with:
dotnet-version: '7.0.x' # SDK Version to use; x will use the latest version of the 6.0 channel
dotnet-version: '8.0.x' # SDK Version to use; x will use the latest version of the 6.0 channel
- name: Pack
run: dotnet pack --configuration Release /p:Version=${VERSION} --output .
- name: Push
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
- name: Setup dotnet
uses: actions/setup-dotnet@v1
with:
dotnet-version: '7.0.x' # SDK Version to use; x will use the latest version of the 6.0 channel
dotnet-version: '8.0.x' # SDK Version to use; x will use the latest version of the 6.0 channel
- name: Build
run: dotnet build --configuration Release /p:Version=${VERSION}
- name: Pack
Expand Down
4 changes: 2 additions & 2 deletions ATI.Services.RabbitMQ/ATI.Services.RabbitMQ.csproj
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup Label="Main">
<TargetFramework>net7.0</TargetFramework>
<TargetFramework>net8.0</TargetFramework>
<RuntimeIdentifiers>linux-x64</RuntimeIdentifiers>
<Authors>Team Services</Authors>
<Company>ATI</Company>
Expand All @@ -16,7 +16,7 @@
<NoWarn>1701;1702;CS1591;CS1571;CS1573;CS1574</NoWarn>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="atisu.services.common" Version="15.0.0" />
<PackageReference Include="atisu.services.common" Version="16.4.0" />
<PackageReference Include="EasyNetQ" Version="7.4.3" />
</ItemGroup>
</Project>
68 changes: 39 additions & 29 deletions ATI.Services.RabbitMQ/EventbusManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,19 @@ public Task SubscribeAsync(
public Task SubscribeAsync(QueueExchangeBinding bindingInfo,
Func<byte[], MessageProperties, MessageReceivedInfo, Task> handler,
string metricEntity = null)
{
return SubscribeAsync(bindingInfo,
async (body, props, info) =>
{
await handler(body, props, info);
return AckStrategies.Ack;
},
metricEntity);
}

public Task SubscribeAsync(QueueExchangeBinding bindingInfo,
Func<byte[], MessageProperties, MessageReceivedInfo, Task<AckStrategy>> handler,
string metricEntity = null)
{
RabbitMqDeclaredQueues.DeclaredQueues.Add(bindingInfo.Queue);

Expand All @@ -223,25 +236,6 @@ private AsyncPolicyWrap SetupPolicy(TimeSpan? timeout = null) =>
Policy.Handle<Exception>()
.WaitAndRetryAsync(3, _ => TimeSpan.FromSeconds(3)));

private async Task ExecuteWithPolicy(Func<Task> action)
{
var policy = Policy.Handle<TimeoutException>()
.WaitAndRetryAsync(
RetryAttemptMax,
retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
(exception, timeSpan, retryCount, _) =>
{
_logger.ErrorWithObject(exception, new {TimeSpan = timeSpan, RetryCount = retryCount});
});

var policyResult = await policy.ExecuteAndCaptureAsync(async () => await action.Invoke());

if (policyResult.FinalException != null)
{
_logger.ErrorWithObject(policyResult.FinalException, action);
}
}

private async Task<Acknowledgements> ExecuteWithPolicy(Func<Task<Acknowledgements>> action)
{
var policy = Policy.Handle<TimeoutException>()
Expand Down Expand Up @@ -294,22 +288,34 @@ private void ResubscribeOnReconnect()

private async Task<IDisposable> SubscribePrivateAsync(
QueueExchangeBinding bindingInfo,
Func<byte[], MessageProperties, MessageReceivedInfo, Task> handler,
Func<byte[], MessageProperties, MessageReceivedInfo, Task<AckStrategy>> handler,
string metricEntity)
{
var queue = await DeclareBindQueue(bindingInfo);
var consumer = _busClient.Consume(queue, HandleEventBusMessageWithPolicy);
var consumer = _busClient.Consume(queue,
HandleEventBusMessageWithPolicyAckStrategy,
bindingInfo.ConsumerConfiguration ?? (_ => { }));

return consumer;

async Task HandleEventBusMessageWithPolicy(ReadOnlyMemory<byte> body, MessageProperties props, MessageReceivedInfo info)
async Task<AckStrategy> HandleEventBusMessageWithPolicyAckStrategy(ReadOnlyMemory<byte> body,
MessageProperties props,
MessageReceivedInfo info)
{
using (_inMetrics.CreateLoggingMetricsTimer(metricEntity ?? "Eventbus",
$"{info.Exchange}:{info.RoutingKey}",
additionalLabels: props.AppId ?? "Unknown"))
$"{info.Exchange}:{info.RoutingKey}",
additionalLabels: props.AppId ?? "Unknown"))
{
HandleMessageProps(props);
await ExecuteWithPolicy(async () => await handler.Invoke(body.ToArray(), props, info));
try
{
return await handler(body.ToArray(), props, info);
}
catch (Exception e)
{
_logger.ErrorWithObject(e, "Error while handling eventbus message", new {info.Exchange, info.RoutingKey});
throw;
}
}
}
}
Expand Down Expand Up @@ -384,10 +390,14 @@ await ExecuteWithPolicy(
private async Task<Queue> DeclareBindQueue(QueueExchangeBinding bindingInfo)
{
var queue = await _busClient.QueueDeclareAsync(bindingInfo.Queue.Name,
c => c.AsAutoDelete(bindingInfo.Queue.IsAutoDelete)
.AsDurable(bindingInfo.Queue.IsDurable)
.AsExclusive(bindingInfo.Queue.IsExclusive)
.WithQueueType(bindingInfo.QueueType));
c =>
{
bindingInfo.QueueConfiguration?.Invoke(c);
c.AsAutoDelete(bindingInfo.Queue.IsAutoDelete)
.AsDurable(bindingInfo.Queue.IsDurable)
.AsExclusive(bindingInfo.Queue.IsExclusive)
.WithQueueType(bindingInfo.QueueType);
});

var exchange = new Exchange(bindingInfo.Exchange.Name,
bindingInfo.Exchange.Type,
Expand Down
45 changes: 33 additions & 12 deletions ATI.Services.RabbitMQ/QueueExchangeBinding.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,40 @@
using EasyNetQ.Topology;
using System;
using EasyNetQ;
using EasyNetQ.Topology;

namespace ATI.Services.RabbitMQ;

public class QueueExchangeBinding
/// <param name="queueConfiguration">Configuration for queue declare. This configuration will be overriden by params in SubscribeAsync method</param>
/// <param name="consumerConfiguration">Configuration for consumer</param>
public class QueueExchangeBinding(
ExchangeInfo exchange,
Queue queue,
string routingKey,
string queueType = QueueType.Quorum,
Action<IQueueDeclareConfiguration> queueConfiguration = null,
Action<ISimpleConsumeConfiguration> consumerConfiguration = null)
{
public QueueExchangeBinding(ExchangeInfo exchange, Queue queue, string routingKey, string queueType = EasyNetQ.QueueType.Quorum)
//for back compatibility release
[Obsolete("Use constructor with queueConfiguration and consumerConfiguration")]
public QueueExchangeBinding(ExchangeInfo exchange,
Queue queue,
string routingKey,
string queueType = EasyNetQ.QueueType.Quorum)
:this(exchange, queue, routingKey, queueType, null, null)
{
Queue = queue;
RoutingKey = routingKey;
Exchange = exchange;
QueueType = queueType;
}

public Queue Queue { get; }
public string RoutingKey { get; }
public ExchangeInfo Exchange { get; }
public string QueueType { get; set; }

public Queue Queue { get; } = queue;
public string RoutingKey { get; } = routingKey;
public ExchangeInfo Exchange { get; } = exchange;
public string QueueType { get; } = queueType;
/// <summary>
/// Configuration for queue declare
/// This configuration will be overriden by params in SubscribeAsync method
/// </summary>
public Action<IQueueDeclareConfiguration> QueueConfiguration { get; } = queueConfiguration;
/// <summary>
/// Configuration for consumer
/// </summary>
public Action<ISimpleConsumeConfiguration> ConsumerConfiguration { get; } = consumerConfiguration;
}
26 changes: 15 additions & 11 deletions ATI.Services.RabbitMQ/RMQTopology.cs
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
using System.Net;
using System;
using System.Net;
using ATI.Services.Common.Behaviors;
using ATI.Services.Common.Extensions;
using EasyNetQ;
using EasyNetQ.Topology;
using JetBrains.Annotations;
using Microsoft.Extensions.Options;

namespace ATI.Services.RabbitMQ;

[PublicAPI]
public class RmqTopology
public class RmqTopology(IOptions<EventbusOptions> options)
{
private readonly EventbusOptions _eventbusOptions;
private readonly EventbusOptions _eventbusOptions = options.Value;

private const string SubscriptionType = "eventbus";

public RmqTopology(IOptions<EventbusOptions> options)
{
_eventbusOptions = options.Value;
}

/// <summary>
/// </summary>
/// <param name="exchangeName"></param>
Expand All @@ -38,7 +35,9 @@ public QueueExchangeBinding CreateBinding(
bool isExclusiveQueueName = false,
string customQueueName = null,
string entityName = null,
string queueType = EasyNetQ.QueueType.Quorum)
string queueType = QueueType.Quorum,
Action<IQueueDeclareConfiguration> queueConfiguration = null,
Action<ISimpleConsumeConfiguration> consumerConfiguration = null)
{
var queueName =
EventbusQueueNameTemplate(exchangeName, routingKey, customQueueName, isExclusiveQueueName,
Expand All @@ -51,13 +50,18 @@ public QueueExchangeBinding CreateBinding(
Name = exchangeName,
Type = ExchangeType.Topic
};
return new QueueExchangeBinding(subscribeExchange, createdQueue, routingKey, queueType);
return new QueueExchangeBinding(subscribeExchange,
createdQueue,
routingKey,
queueType,
queueConfiguration,
consumerConfiguration);
}

private readonly string _queuePostfixName = $"-{Dns.GetHostName()}-{ConfigurationManager.GetApplicationPort()}";

private string EventbusQueueNameTemplate(
string rabbitService,
string rabbitService,
string routingKey,
string customQueueName,
bool isExclusiveQueueName,
Expand Down
5 changes: 4 additions & 1 deletion ATI.Services.RabbitMQ/SubscriptionInfo.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
using System;
using System.Threading.Tasks;
using EasyNetQ;
using EasyNetQ.Consumer;

namespace ATI.Services.RabbitMQ;

public class SubscriptionInfo
{
public QueueExchangeBinding Binding { get; set; }
public Func<byte[], MessageProperties, MessageReceivedInfo, Task> EventbusSubscriptionHandler { get; set; }

public Func<byte[], MessageProperties, MessageReceivedInfo, Task<AckStrategy>> EventbusSubscriptionHandler { get; set; }

public string MetricsEntity { get; set; }
public IDisposable Consumer { get; set; }

Expand Down

0 comments on commit 97843bc

Please sign in to comment.