Skip to content

Commit

Permalink
Merge pull request #23 from atidev/resubscribe-fix
Browse files Browse the repository at this point in the history
Dispose and recreate consumer + lock for resubscribe per queue
  • Loading branch information
Artikud authored Mar 15, 2024
2 parents 08565e5 + 297355b commit 1039ce6
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 72 deletions.
2 changes: 1 addition & 1 deletion ATI.Services.RabbitMQ/ATI.Services.RabbitMQ.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<NoWarn>1701;1702;CS1591;CS1571;CS1573;CS1574</NoWarn>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="atisu.services.common" Version="13.7.0" />
<PackageReference Include="atisu.services.common" Version="13.7.2" />
<PackageReference Include="EasyNetQ" Version="7.4.3" />
</ItemGroup>
</Project>
117 changes: 53 additions & 64 deletions ATI.Services.RabbitMQ/EventbusManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,24 +49,22 @@ public class EventbusManager : IDisposable, IInitializer
private static readonly UTF8Encoding BodyEncoding = new(false);
private readonly RmqTopology _rmqTopology;

public EventbusManager(JsonSerializer jsonSerializer,
IOptions<EventbusOptions> options, RmqTopology rmqTopology)
public EventbusManager(JsonSerializer jsonSerializer, IOptions<EventbusOptions> options, RmqTopology rmqTopology)
{
_options = options.Value;
_connectionString = options.Value.ConnectionString;
_jsonSerializer = jsonSerializer;
_rmqTopology = rmqTopology;

_subscribePolicy = Policy.Handle<Exception>()
.WaitAndRetryForeverAsync(
_ => _options.RabbitConnectInterval,
(exception, _) => _logger.Error(exception));
.WaitAndRetryForeverAsync(_ => _options.RabbitConnectInterval,
(exception, _) => _logger.Error(exception));

_retryForeverPolicy =
Policy.Handle<Exception>()
.WaitAndRetryForeverAsync(
retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, Math.Min(retryAttempt, MaxRetryDelayPow))),
(exception, _) => _logger.ErrorWithObject(exception));
(exception, _) => _logger.Error(exception));
}

public Task InitializeAsync()
Expand All @@ -80,8 +78,9 @@ public Task InitializeAsync()
new RabbitMqConventions(c.Resolve<ITypeNameSerializer>(), _options));
}).Advanced;

_busClient.Connected += async (_, _) => await ResubscribeOnReconnect();
_busClient.Disconnected += (_, _) => { _logger.Error("Disconnected from RMQ for some reason!"); };
_busClient.Connected += (_, _) => ResubscribeOnReconnect();
_busClient.Disconnected += (_, b) => _logger.ErrorWithObject(
"Disconnected from RMQ for some reason!", b.Hostname, b.Port, b.Reason, b.Type.ToString("G"));
}
catch (Exception exception)
{
Expand All @@ -91,6 +90,7 @@ public Task InitializeAsync()
return Task.CompletedTask;
}


public Task<Exchange> DeclareExchangeTopicAsync(string exchangeName, bool durable, bool autoDelete) =>
_busClient.ExchangeDeclareAsync(exchangeName, ExchangeType.Topic, durable, autoDelete);

Expand Down Expand Up @@ -187,31 +187,26 @@ public Task SubscribeAsync(
return SubscribeAsync(binding, handler, metricEntity);
}

public async Task SubscribeAsync(
QueueExchangeBinding bindingInfo,
Func<byte[], MessageProperties, MessageReceivedInfo, Task> handler,
string metricEntity = null)
public Task SubscribeAsync(QueueExchangeBinding bindingInfo,
Func<byte[], MessageProperties, MessageReceivedInfo, Task> handler,
string metricEntity = null)
{
_subscriptions.Add(new SubscriptionInfo
{
Binding = bindingInfo,
EventbusSubscriptionHandler = handler,
MetricsEntity = metricEntity
});

RabbitMqDeclaredQueues.DeclaredQueues.Add(bindingInfo.Queue);

try
{
await SubscribePrivateAsync(bindingInfo, handler, metricEntity);
}
catch (Exception ex)
{
_logger.ErrorWithObject(ex,
"Initial subscription failed, trying to subscribe in background",
logObjects: bindingInfo.Queue.Name);
_subscribePolicy.ExecuteAsync(() => SubscribePrivateAsync(bindingInfo, handler, metricEntity)).Forget();
}
//wait for 1 sec to return else subscribe in background
return Task.WhenAny(
Task.Delay(TimeSpan.FromSeconds(1)),
_subscribePolicy.ExecuteAsync(async () =>
{
var consumer = await SubscribePrivateAsync(bindingInfo, handler, metricEntity);
_subscriptions.Add(new SubscriptionInfo
{
Binding = bindingInfo,
Consumer = consumer,
EventbusSubscriptionHandler = handler,
MetricsEntity = metricEntity
});
}));
}

private AsyncPolicyWrap SetupPolicy(TimeSpan? timeout = null) =>
Expand All @@ -238,47 +233,44 @@ private async Task ExecuteWithPolicy(Func<Task> action)
}
}

private async Task ResubscribeOnReconnect()
private void ResubscribeOnReconnect()
{
_logger.Warn("Reconnect happened, start resubscribing");
_logger.Warn("Connected to rmq");

foreach (var subscription in _subscriptions)
{
try
{
await ResubscribeInternalAsync(subscription);
}
catch (Exception e)
{
_logger.ErrorWithObject(e, "Failed to resubscribe", logObjects: subscription.Binding.Queue.Name);
_retryForeverPolicy.ExecuteAsync(() => ResubscribeInternalAsync(subscription)).Forget();
}
}
_logger.WarnWithObject("Acquire resubscribing lock", subscription.Binding.Exchange, subscription.Binding.RoutingKey);

async Task ResubscribeInternalAsync(SubscriptionInfo sub)
{
if (sub.Binding.Queue.IsExclusive)
lock (subscription.ResubscribeLock)
{
await SubscribePrivateAsync(sub.Binding,
sub.EventbusSubscriptionHandler,
sub.MetricsEntity);
}
else
{
// for non exclusive queues we reuse existing consumer
// alternative is to dispose old consumer and create new consumer
await DeclareBindQueue(sub.Binding);
if (subscription.ResubscribeTask is { IsCompleted: false })
continue;

_logger.WarnWithObject("Acquired lock, start resubscribing", subscription.Binding.Exchange, subscription.Binding.RoutingKey);

subscription.ResubscribeTask = _retryForeverPolicy.ExecuteAsync(
async () =>
{
var newConsumer = await SubscribePrivateAsync(subscription.Binding,
subscription.EventbusSubscriptionHandler,
subscription.MetricsEntity);
subscription.Consumer.Dispose();
subscription.Consumer = newConsumer;
});
}
}
}

private async Task SubscribePrivateAsync(
private async Task<IDisposable> SubscribePrivateAsync(
QueueExchangeBinding bindingInfo,
Func<byte[], MessageProperties, MessageReceivedInfo, Task> handler,
string metricEntity)
{
var queue = await DeclareBindQueue(bindingInfo);
_busClient.Consume(queue, HandleEventBusMessageWithPolicy);
var consumer = _busClient.Consume(queue, HandleEventBusMessageWithPolicy);

return consumer;

async Task HandleEventBusMessageWithPolicy(ReadOnlyMemory<byte> body, MessageProperties props, MessageReceivedInfo info)
{
using (_inMetricsFactory.CreateLoggingMetricsTimer(metricEntity ?? "Eventbus",
Expand Down Expand Up @@ -393,17 +385,14 @@ public void Dispose()
_busClient.QueueDelete(queue.Name);
}
}

foreach (var subscription in _subscriptions)
subscription.Consumer.Dispose();

_busClient?.Dispose();
}

public string InitStartConsoleMessage()
{
return "Start Eventbus initializer";
}
public string InitStartConsoleMessage() => "Start Eventbus initializer";

public string InitEndConsoleMessage()
{
return "End Eventbus initializer";
}
public string InitEndConsoleMessage() => "End Eventbus initializer";
}
16 changes: 9 additions & 7 deletions ATI.Services.RabbitMQ/SubscriptionInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@
using System.Threading.Tasks;
using EasyNetQ;

namespace ATI.Services.RabbitMQ
namespace ATI.Services.RabbitMQ;

public class SubscriptionInfo
{
public class SubscriptionInfo
{
public QueueExchangeBinding Binding { get; set; }
public Func<byte[], MessageProperties, MessageReceivedInfo, Task> EventbusSubscriptionHandler { get; set; }
public QueueExchangeBinding Binding { get; set; }
public Func<byte[], MessageProperties, MessageReceivedInfo, Task> EventbusSubscriptionHandler { get; set; }
public string MetricsEntity { get; set; }
public IDisposable Consumer { get; set; }

public string MetricsEntity { get; set; }
}
public Task ResubscribeTask { get; set; }
public object ResubscribeLock { get; } = new();
}

0 comments on commit 1039ce6

Please sign in to comment.