diff --git a/ATI.Services.RabbitMQ/ATI.Services.RabbitMQ.csproj b/ATI.Services.RabbitMQ/ATI.Services.RabbitMQ.csproj
index aeed719..f9bd10f 100644
--- a/ATI.Services.RabbitMQ/ATI.Services.RabbitMQ.csproj
+++ b/ATI.Services.RabbitMQ/ATI.Services.RabbitMQ.csproj
@@ -16,7 +16,7 @@
1701;1702;CS1591;CS1571;CS1573;CS1574
-
+
\ No newline at end of file
diff --git a/ATI.Services.RabbitMQ/EventbusManager.cs b/ATI.Services.RabbitMQ/EventbusManager.cs
index 6205056..daa5c3b 100644
--- a/ATI.Services.RabbitMQ/EventbusManager.cs
+++ b/ATI.Services.RabbitMQ/EventbusManager.cs
@@ -49,8 +49,7 @@ public class EventbusManager : IDisposable, IInitializer
private static readonly UTF8Encoding BodyEncoding = new(false);
private readonly RmqTopology _rmqTopology;
- public EventbusManager(JsonSerializer jsonSerializer,
- IOptions options, RmqTopology rmqTopology)
+ public EventbusManager(JsonSerializer jsonSerializer, IOptions options, RmqTopology rmqTopology)
{
_options = options.Value;
_connectionString = options.Value.ConnectionString;
@@ -58,15 +57,14 @@ public EventbusManager(JsonSerializer jsonSerializer,
_rmqTopology = rmqTopology;
_subscribePolicy = Policy.Handle()
- .WaitAndRetryForeverAsync(
- _ => _options.RabbitConnectInterval,
- (exception, _) => _logger.Error(exception));
+ .WaitAndRetryForeverAsync(_ => _options.RabbitConnectInterval,
+ (exception, _) => _logger.Error(exception));
_retryForeverPolicy =
Policy.Handle()
.WaitAndRetryForeverAsync(
retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, Math.Min(retryAttempt, MaxRetryDelayPow))),
- (exception, _) => _logger.ErrorWithObject(exception));
+ (exception, _) => _logger.Error(exception));
}
public Task InitializeAsync()
@@ -80,8 +78,9 @@ public Task InitializeAsync()
new RabbitMqConventions(c.Resolve(), _options));
}).Advanced;
- _busClient.Connected += async (_, _) => await ResubscribeOnReconnect();
- _busClient.Disconnected += (_, _) => { _logger.Error("Disconnected from RMQ for some reason!"); };
+ _busClient.Connected += (_, _) => ResubscribeOnReconnect();
+ _busClient.Disconnected += (_, b) => _logger.ErrorWithObject(
+ "Disconnected from RMQ for some reason!", b.Hostname, b.Port, b.Reason, b.Type.ToString("G"));
}
catch (Exception exception)
{
@@ -91,6 +90,7 @@ public Task InitializeAsync()
return Task.CompletedTask;
}
+
public Task DeclareExchangeTopicAsync(string exchangeName, bool durable, bool autoDelete) =>
_busClient.ExchangeDeclareAsync(exchangeName, ExchangeType.Topic, durable, autoDelete);
@@ -187,31 +187,26 @@ public Task SubscribeAsync(
return SubscribeAsync(binding, handler, metricEntity);
}
- public async Task SubscribeAsync(
- QueueExchangeBinding bindingInfo,
- Func handler,
- string metricEntity = null)
+ public Task SubscribeAsync(QueueExchangeBinding bindingInfo,
+ Func handler,
+ string metricEntity = null)
{
- _subscriptions.Add(new SubscriptionInfo
- {
- Binding = bindingInfo,
- EventbusSubscriptionHandler = handler,
- MetricsEntity = metricEntity
- });
-
RabbitMqDeclaredQueues.DeclaredQueues.Add(bindingInfo.Queue);
- try
- {
- await SubscribePrivateAsync(bindingInfo, handler, metricEntity);
- }
- catch (Exception ex)
- {
- _logger.ErrorWithObject(ex,
- "Initial subscription failed, trying to subscribe in background",
- logObjects: bindingInfo.Queue.Name);
- _subscribePolicy.ExecuteAsync(() => SubscribePrivateAsync(bindingInfo, handler, metricEntity)).Forget();
- }
+ //wait for 1 sec to return else subscribe in background
+ return Task.WhenAny(
+ Task.Delay(TimeSpan.FromSeconds(1)),
+ _subscribePolicy.ExecuteAsync(async () =>
+ {
+ var consumer = await SubscribePrivateAsync(bindingInfo, handler, metricEntity);
+ _subscriptions.Add(new SubscriptionInfo
+ {
+ Binding = bindingInfo,
+ Consumer = consumer,
+ EventbusSubscriptionHandler = handler,
+ MetricsEntity = metricEntity
+ });
+ }));
}
private AsyncPolicyWrap SetupPolicy(TimeSpan? timeout = null) =>
@@ -238,47 +233,44 @@ private async Task ExecuteWithPolicy(Func action)
}
}
- private async Task ResubscribeOnReconnect()
+ private void ResubscribeOnReconnect()
{
- _logger.Warn("Reconnect happened, start resubscribing");
+ _logger.Warn("Connected to rmq");
+
foreach (var subscription in _subscriptions)
{
- try
- {
- await ResubscribeInternalAsync(subscription);
- }
- catch (Exception e)
- {
- _logger.ErrorWithObject(e, "Failed to resubscribe", logObjects: subscription.Binding.Queue.Name);
- _retryForeverPolicy.ExecuteAsync(() => ResubscribeInternalAsync(subscription)).Forget();
- }
- }
+ _logger.WarnWithObject("Acquire resubscribing lock", subscription.Binding.Exchange, subscription.Binding.RoutingKey);
- async Task ResubscribeInternalAsync(SubscriptionInfo sub)
- {
- if (sub.Binding.Queue.IsExclusive)
+ lock (subscription.ResubscribeLock)
{
- await SubscribePrivateAsync(sub.Binding,
- sub.EventbusSubscriptionHandler,
- sub.MetricsEntity);
- }
- else
- {
- // for non exclusive queues we reuse existing consumer
- // alternative is to dispose old consumer and create new consumer
- await DeclareBindQueue(sub.Binding);
+ if (subscription.ResubscribeTask is { IsCompleted: false })
+ continue;
+
+ _logger.WarnWithObject("Acquired lock, start resubscribing", subscription.Binding.Exchange, subscription.Binding.RoutingKey);
+
+ subscription.ResubscribeTask = _retryForeverPolicy.ExecuteAsync(
+ async () =>
+ {
+ var newConsumer = await SubscribePrivateAsync(subscription.Binding,
+ subscription.EventbusSubscriptionHandler,
+ subscription.MetricsEntity);
+ subscription.Consumer.Dispose();
+ subscription.Consumer = newConsumer;
+ });
}
}
}
- private async Task SubscribePrivateAsync(
+ private async Task SubscribePrivateAsync(
QueueExchangeBinding bindingInfo,
Func handler,
string metricEntity)
{
var queue = await DeclareBindQueue(bindingInfo);
- _busClient.Consume(queue, HandleEventBusMessageWithPolicy);
+ var consumer = _busClient.Consume(queue, HandleEventBusMessageWithPolicy);
+ return consumer;
+
async Task HandleEventBusMessageWithPolicy(ReadOnlyMemory body, MessageProperties props, MessageReceivedInfo info)
{
using (_inMetricsFactory.CreateLoggingMetricsTimer(metricEntity ?? "Eventbus",
@@ -393,17 +385,14 @@ public void Dispose()
_busClient.QueueDelete(queue.Name);
}
}
+
+ foreach (var subscription in _subscriptions)
+ subscription.Consumer.Dispose();
_busClient?.Dispose();
}
- public string InitStartConsoleMessage()
- {
- return "Start Eventbus initializer";
- }
+ public string InitStartConsoleMessage() => "Start Eventbus initializer";
- public string InitEndConsoleMessage()
- {
- return "End Eventbus initializer";
- }
+ public string InitEndConsoleMessage() => "End Eventbus initializer";
}
\ No newline at end of file
diff --git a/ATI.Services.RabbitMQ/SubscriptionInfo.cs b/ATI.Services.RabbitMQ/SubscriptionInfo.cs
index 675d512..eead8f4 100644
--- a/ATI.Services.RabbitMQ/SubscriptionInfo.cs
+++ b/ATI.Services.RabbitMQ/SubscriptionInfo.cs
@@ -2,13 +2,15 @@
using System.Threading.Tasks;
using EasyNetQ;
-namespace ATI.Services.RabbitMQ
+namespace ATI.Services.RabbitMQ;
+
+public class SubscriptionInfo
{
- public class SubscriptionInfo
- {
- public QueueExchangeBinding Binding { get; set; }
- public Func EventbusSubscriptionHandler { get; set; }
+ public QueueExchangeBinding Binding { get; set; }
+ public Func EventbusSubscriptionHandler { get; set; }
+ public string MetricsEntity { get; set; }
+ public IDisposable Consumer { get; set; }
- public string MetricsEntity { get; set; }
- }
+ public Task ResubscribeTask { get; set; }
+ public object ResubscribeLock { get; } = new();
}
\ No newline at end of file