From 9270c4f3e44e1acd494e7ad45bf49e36a1b7d3a5 Mon Sep 17 00:00:00 2001 From: Ruairidh MacLeod <5160559+rkm@users.noreply.github.com> Date: Fri, 14 Feb 2020 16:08:17 +0000 Subject: [PATCH] Add temporary fix so threaded RabbitMQ only occurs for IdentifierMapper --- src/common/Smi.Common/Execution/MicroserviceHost.cs | 5 ++++- src/common/Smi.Common/RabbitMQAdapter.cs | 13 ++++++++----- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/common/Smi.Common/Execution/MicroserviceHost.cs b/src/common/Smi.Common/Execution/MicroserviceHost.cs index 866025471..fabad1fa7 100644 --- a/src/common/Smi.Common/Execution/MicroserviceHost.cs +++ b/src/common/Smi.Common/Execution/MicroserviceHost.cs @@ -111,7 +111,10 @@ protected MicroserviceHost(GlobalOptions globals, bool loadSmiLogConfig = true) // throw new ArgumentException("Could not locate the FileSystemRoot \"" + options.FileSystemRoot + "\""); OnFatal += (sender, args) => Fatal(args.Message, args.Exception); - RabbitMqAdapter = new RabbitMqAdapter(globals.RabbitOptions, HostProcessName + HostProcessID, OnFatal); + + // TODO(rkm 2020-02-14) This is only a temporary fix + bool threaded = (HostProcessName == "IdentifierMapper"); + RabbitMqAdapter = new RabbitMqAdapter(globals.RabbitOptions, HostProcessName + HostProcessID, OnFatal, threaded); _controlMessageConsumer = new ControlMessageConsumer(this, globals.RabbitOptions, HostProcessName, HostProcessID); diff --git a/src/common/Smi.Common/RabbitMQAdapter.cs b/src/common/Smi.Common/RabbitMQAdapter.cs index e409960c7..ab7175b76 100644 --- a/src/common/Smi.Common/RabbitMQAdapter.cs +++ b/src/common/Smi.Common/RabbitMQAdapter.cs @@ -56,7 +56,7 @@ public bool HasConsumers private const int MaxSubscriptionAttempts = 5; - private bool threaded; + private readonly bool _threaded; /// /// @@ -64,10 +64,13 @@ public bool HasConsumers /// Connection parameters to a RabbitMQ server /// Identifier for this host instance /// - public RabbitMqAdapter(RabbitOptions options, string hostId, HostFatalHandler hostFatalHandler = null) + /// + public RabbitMqAdapter(RabbitOptions options, string hostId, HostFatalHandler hostFatalHandler = null, bool threaded = false) { - threaded = options.ThreadReceivers; - if (threaded) + //_threaded = options.ThreadReceivers; + _threaded = threaded; + + if (_threaded) { int minWorker, minIOC; ThreadPool.GetMinThreads(out minWorker, out minIOC); @@ -399,7 +402,7 @@ private void Consume(ISubscription subscription, IConsumer consumer, Cancellatio if (subscription.Next(500, out e)) { - if (threaded) + if (_threaded) Task.Run(() => consumer.ProcessMessage(e)); else consumer.ProcessMessage(e);