From 47cf2c64106d4bf4d1281c6e77200735d820cfd1 Mon Sep 17 00:00:00 2001 From: Ruairidh MacLeod <5160559+rkm@users.noreply.github.com> Date: Wed, 26 Feb 2020 17:09:05 +0000 Subject: [PATCH] Release v1.4.5 (#134) * Fix line endings * Add Shutdown call to messaging consumers, defaulting to doing nothing Call this from Rabbit adapter when shutting down in threaded mode Use that to clean up and drain the Ack queue in IdentifierMapper Co-authored-by: James A Sutherland --- CHANGELOG.md | 7 ++- README.md | 2 +- src/SharedAssemblyInfo.cs | 6 +-- src/common/Smi.Common/Messaging/Consumer.cs | 4 ++ src/common/Smi.Common/Messaging/IConsumer.cs | 5 +++ src/common/Smi.Common/RabbitMQAdapter.cs | 22 ++++++++- .../IdentifierMapperQueueConsumer.cs | 45 +++++++++++++------ 7 files changed, 71 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9ddd77ee2..acfaf5d23 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ... +## [1.4.5] - 2020-02-26 + +- Add clean shutdown hook for IdentifierMapper to clean up the worker threads + ## [1.4.4] - 2020-02-25 - Update Travis config and Java library install shell script to resolve some Travis stability issues @@ -181,7 +185,8 @@ First stable release after importing the repository from the private [SMIPlugin] - Anonymous `MappingTableName` must now be fully specified to pass validation (e.g. `mydb.mytbl`). Previously skipping database portion was supported. -[Unreleased]: https://github.com/SMI/SmiServices/compare/v1.4.4...develop +[Unreleased]: https://github.com/SMI/SmiServices/compare/v1.4.5...develop +[1.4.5]: https://github.com/SMI/SmiServices/compare/v1.4.4...v1.4.5 [1.4.4]: https://github.com/SMI/SmiServices/compare/v1.4.3...v1.4.4 [1.4.3]: https://github.com/SMI/SmiServices/compare/v1.4.2...v1.4.3 [1.4.2]: https://github.com/SMI/SmiServices/compare/v1.4.1...v1.4.2 diff --git a/README.md b/README.md index 062c5026f..a6aa86d90 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ ![GitHub](https://img.shields.io/github/license/SMI/SmiServices) [![Total alerts](https://img.shields.io/lgtm/alerts/g/SMI/SmiServices.svg?logo=lgtm&logoWidth=18)](https://lgtm.com/projects/g/SMI/SmiServices/alerts/) -Version: `1.4.4` +Version: `1.4.5` # SMI Services diff --git a/src/SharedAssemblyInfo.cs b/src/SharedAssemblyInfo.cs index 3c49c222b..656fe6a01 100644 --- a/src/SharedAssemblyInfo.cs +++ b/src/SharedAssemblyInfo.cs @@ -7,6 +7,6 @@ [assembly: AssemblyCulture("")] // These should be overwritten by release builds -[assembly: AssemblyVersion("1.4.4")] -[assembly: AssemblyFileVersion("1.4.4")] -[assembly: AssemblyInformationalVersion("1.4.4")] // This one can have the extra build info after it +[assembly: AssemblyVersion("1.4.5")] +[assembly: AssemblyFileVersion("1.4.5")] +[assembly: AssemblyInformationalVersion("1.4.5")] // This one can have the extra build info after it diff --git a/src/common/Smi.Common/Messaging/Consumer.cs b/src/common/Smi.Common/Messaging/Consumer.cs index fc59469ac..76986da61 100644 --- a/src/common/Smi.Common/Messaging/Consumer.cs +++ b/src/common/Smi.Common/Messaging/Consumer.cs @@ -38,6 +38,10 @@ public abstract class Consumer : IConsumer protected IModel Model; + public virtual void Shutdown() + { + + } protected Consumer() { diff --git a/src/common/Smi.Common/Messaging/IConsumer.cs b/src/common/Smi.Common/Messaging/IConsumer.cs index e64114957..cc6f16f8d 100644 --- a/src/common/Smi.Common/Messaging/IConsumer.cs +++ b/src/common/Smi.Common/Messaging/IConsumer.cs @@ -26,5 +26,10 @@ public interface IConsumer /// /// event ConsumerFatalHandler OnFatal; + + /// + /// Trigger a clean shutdown of worker threads etc + /// + void Shutdown(); } } \ No newline at end of file diff --git a/src/common/Smi.Common/RabbitMQAdapter.cs b/src/common/Smi.Common/RabbitMQAdapter.cs index e6b480d74..f7e52f181 100644 --- a/src/common/Smi.Common/RabbitMQAdapter.cs +++ b/src/common/Smi.Common/RabbitMQAdapter.cs @@ -396,6 +396,7 @@ public void Shutdown(int timeout = 5000) /// private void Consume(ISubscription subscription, IConsumer consumer, CancellationToken cancellationToken) { + ReaderWriterLockSlim worklock = new ReaderWriterLockSlim(); IModel m = subscription.Model; consumer.SetModel(m); @@ -409,13 +410,32 @@ private void Consume(ISubscription subscription, IConsumer consumer, Cancellatio { Task.Run(() => { - consumer.ProcessMessage(e); + worklock.EnterReadLock(); + try + { + consumer.ProcessMessage(e); + } + finally + { + worklock.ExitReadLock(); + } },cancellationToken); } else consumer.ProcessMessage(e); } } + if (_threaded) + { + // Taking a write lock means waiting for all read locks to + // release, i.e. all workers have finished + worklock.EnterWriteLock(); + + // Now there are no *new* messages being processed, flush the queue + consumer.Shutdown(); + worklock.ExitWriteLock(); + } + worklock.Dispose(); string reason = "unknown"; diff --git a/src/microservices/Microservices.IdentifierMapper/Messaging/IdentifierMapperQueueConsumer.cs b/src/microservices/Microservices.IdentifierMapper/Messaging/IdentifierMapperQueueConsumer.cs index 6d9c8731f..1d25a08bc 100644 --- a/src/microservices/Microservices.IdentifierMapper/Messaging/IdentifierMapperQueueConsumer.cs +++ b/src/microservices/Microservices.IdentifierMapper/Messaging/IdentifierMapperQueueConsumer.cs @@ -30,33 +30,50 @@ public IdentifierMapperQueueConsumer(IProducerModel producer, ISwapIdentifiers s _swapper = swapper; acker=new Thread(() => { - while (true) + try { - List> done = new List>(); - Tuple t; - t = msgq.Take(); - - lock (_producer) + while (true) { - _producer.SendMessage(t.Item1, t.Item2, ""); - done.Add(new Tuple(t.Item2, t.Item3)); - while (msgq.TryTake(out t)) + List> done = new List>(); + Tuple t; + t = msgq.Take(); + + lock (_producer) { _producer.SendMessage(t.Item1, t.Item2, ""); done.Add(new Tuple(t.Item2, t.Item3)); - } - _producer.WaitForConfirms(); - foreach (var ack in done) - { - Ack(ack.Item1, ack.Item2); + while (msgq.TryTake(out t)) + { + _producer.SendMessage(t.Item1, t.Item2, ""); + done.Add(new Tuple(t.Item2, t.Item3)); + } + _producer.WaitForConfirms(); + foreach (var ack in done) + { + Ack(ack.Item1, ack.Item2); + } } } } + catch (InvalidOperationException) + { + // The BlockingCollection will throw this exception when closed by Shutdown() + return; + } }); acker.IsBackground = true; acker.Start(); } + /// + /// Cleanly shut this process down, draining the Ack queue and ending that thread + /// + public override void Shutdown() + { + msgq.CompleteAdding(); + acker.Join(); + } + protected override void ProcessMessageImpl(IMessageHeader header, BasicDeliverEventArgs deliverArgs) { DicomFileMessage msg;