From f0aa6df05b369eb292fc75ae84724cfb293bc306 Mon Sep 17 00:00:00 2001 From: Renato Golia Date: Mon, 18 Feb 2019 13:59:06 +0100 Subject: [PATCH] RabbitMQ: Improved handling of prefetched messages (#73) * Fixes to RabbitMQ engine components --- src/Nybus/NybusHost.cs | 1 - .../RabbitMq/BufferSubject.cs | 8 ++++--- .../RabbitMq/RabbitMqBusEngine.cs | 23 +++++++++++-------- 3 files changed, 18 insertions(+), 14 deletions(-) diff --git a/src/Nybus/NybusHost.cs b/src/Nybus/NybusHost.cs index 74f73ec..6582466 100644 --- a/src/Nybus/NybusHost.cs +++ b/src/Nybus/NybusHost.cs @@ -1,7 +1,6 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; -using System.ComponentModel; using System.Linq; using System.Reactive.Linq; using System.Threading.Tasks; diff --git a/src/engines/Nybus.Engine.RabbitMq/RabbitMq/BufferSubject.cs b/src/engines/Nybus.Engine.RabbitMq/RabbitMq/BufferSubject.cs index a407b2e..5cef7ea 100644 --- a/src/engines/Nybus.Engine.RabbitMq/RabbitMq/BufferSubject.cs +++ b/src/engines/Nybus.Engine.RabbitMq/RabbitMq/BufferSubject.cs @@ -61,12 +61,14 @@ public void OnNext(T value) public IDisposable Subscribe(IObserver observer) { - Interlocked.Increment(ref _refCount); - - return new CompositeDisposable( + var disposable = new CompositeDisposable( Observable.Create(o => ConsumeActions(o)).Concat(_subject).Subscribe(observer), Disposable.Create(() => Interlocked.Decrement(ref _refCount)) ); + + Interlocked.Increment(ref _refCount); + + return disposable; } private IDisposable ConsumeActions(IObserver observable) diff --git a/src/engines/Nybus.Engine.RabbitMq/RabbitMq/RabbitMqBusEngine.cs b/src/engines/Nybus.Engine.RabbitMq/RabbitMq/RabbitMqBusEngine.cs index e3c9354..c4a78d5 100644 --- a/src/engines/Nybus.Engine.RabbitMq/RabbitMq/RabbitMqBusEngine.cs +++ b/src/engines/Nybus.Engine.RabbitMq/RabbitMq/RabbitMqBusEngine.cs @@ -2,7 +2,6 @@ using System.Collections.Generic; using System.Linq; using System.Reactive.Linq; -using System.Reflection; using System.Text; using System.Threading.Tasks; using Microsoft.Extensions.Logging; @@ -82,12 +81,11 @@ public Task> StartAsync() queueToConsume.Add(commandQueue.QueueName); } - - var sequence = Observable.Defer(() => from queue in queueToConsume.ToObservable() - from args in SubscribeMessages(_channel, queue) - let message = GetMessage(args) - where message != null - select message); + var sequence = from queue in queueToConsume.ToObservable() + from args in SubscribeMessages(_channel, queue) + let message = GetMessage(args) + where message != null + select message; return Task.FromResult(sequence); @@ -233,8 +231,7 @@ public Task NotifySuccessAsync(Message message) { try { - _channel.BasicAck(deliveryTag, false); - _processingMessages.TryRemoveItem(deliveryTag); + AckMessage(deliveryTag); } catch (AlreadyClosedException ex) { @@ -247,10 +244,16 @@ public Task NotifySuccessAsync(Message message) _logger.LogWarning(state, ex, (s,e) => $"Unable to ack message {s.MessageId} (delivery tag: {s.DeliveryTag})"); } } - + return Task.CompletedTask; } + private void AckMessage(ulong deliveryTag) + { + _channel.BasicAck(deliveryTag, false); + _processingMessages.TryRemoveItem(deliveryTag); + } + public Task NotifyFailAsync(Message message) { if (message.Headers.TryGetValue(RabbitMqHeaders.DeliveryTag, out var headerValue) && ulong.TryParse(headerValue, out var deliveryTag))