Skip to content

Commit

Permalink
Add temporary fix so threaded RabbitMQ only occurs for IdentifierMapper
Browse files Browse the repository at this point in the history
  • Loading branch information
rkm committed Feb 14, 2020
1 parent 36fe53f commit 9270c4f
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 6 deletions.
5 changes: 4 additions & 1 deletion src/common/Smi.Common/Execution/MicroserviceHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,10 @@ protected MicroserviceHost(GlobalOptions globals, bool loadSmiLogConfig = true)
// throw new ArgumentException("Could not locate the FileSystemRoot \"" + options.FileSystemRoot + "\"");

OnFatal += (sender, args) => Fatal(args.Message, args.Exception);
RabbitMqAdapter = new RabbitMqAdapter(globals.RabbitOptions, HostProcessName + HostProcessID, OnFatal);

// TODO(rkm 2020-02-14) This is only a temporary fix
bool threaded = (HostProcessName == "IdentifierMapper");
RabbitMqAdapter = new RabbitMqAdapter(globals.RabbitOptions, HostProcessName + HostProcessID, OnFatal, threaded);

_controlMessageConsumer = new ControlMessageConsumer(this, globals.RabbitOptions, HostProcessName, HostProcessID);

Expand Down
13 changes: 8 additions & 5 deletions src/common/Smi.Common/RabbitMQAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,21 @@ public bool HasConsumers

private const int MaxSubscriptionAttempts = 5;

private bool threaded;
private readonly bool _threaded;

/// <summary>
///
/// </summary>
/// <param name="options">Connection parameters to a RabbitMQ server</param>
/// <param name="hostId">Identifier for this host instance</param>
/// <param name="hostFatalHandler"></param>
public RabbitMqAdapter(RabbitOptions options, string hostId, HostFatalHandler hostFatalHandler = null)
/// <param name="threaded"></param>
public RabbitMqAdapter(RabbitOptions options, string hostId, HostFatalHandler hostFatalHandler = null, bool threaded = false)
{
threaded = options.ThreadReceivers;
if (threaded)
//_threaded = options.ThreadReceivers;
_threaded = threaded;

if (_threaded)
{
int minWorker, minIOC;
ThreadPool.GetMinThreads(out minWorker, out minIOC);
Expand Down Expand Up @@ -399,7 +402,7 @@ private void Consume(ISubscription subscription, IConsumer consumer, Cancellatio

if (subscription.Next(500, out e))
{
if (threaded)
if (_threaded)
Task.Run(() => consumer.ProcessMessage(e));
else
consumer.ProcessMessage(e);
Expand Down

0 comments on commit 9270c4f

Please sign in to comment.