Skip to content

Commit

Permalink
Add named pipe output support.
Browse files Browse the repository at this point in the history
  • Loading branch information
kutukvpavel committed Oct 4, 2021
1 parent d5d3d64 commit f82fb8f
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 5 deletions.
2 changes: 2 additions & 0 deletions GPIBServer/Configuration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public Configuration(bool init)
OutputSeparationLabelFormat = "{0}_";
OutputRetries = 3;
OutputRetryDelayMilliseconds = 300;
PipeName = "GPIBServer_Broadcast_Pipe";
}

public string ScriptsFilter { get; set; }
Expand All @@ -45,6 +46,7 @@ public Configuration(bool init)
public string OutputSeparationLabelFormat { get; set; }
public int OutputRetries { get; set; }
public int OutputRetryDelayMilliseconds { get; set; }
public string PipeName { get; set; }

public string GetFullyQualifiedLogPath()
{
Expand Down
37 changes: 32 additions & 5 deletions GPIBServer/Output.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Collections.Generic;
using System.IO;
using System.Threading;
using NamedPipeWrapper;

namespace GPIBServer
{
Expand Down Expand Up @@ -58,17 +59,36 @@ public static void Dispose()
}
}

public static void Initialize(string dataPath, string terminalPath, CancellationToken cancel)
public static void Initialize(string dataPath, string terminalPath, string pipeName, CancellationToken cancel)
{
_Cancel = cancel;
TerminalLogPath = terminalPath;
DataPath = dataPath;
_PipeQueue = new BlockingCollection<Tuple<object, GpibResponseEventArgs>>();
_Pipe = new NamedPipeServer<string>(pipeName);
_Pipe.Start();
_PipeThread = new Thread(() =>
{
try
{
while (!cancel.IsCancellationRequested)
{
var d = _PipeQueue.Take(cancel);
_Pipe.PushMessage(DataConverter(d.Item1, d.Item2));
}
}
catch (OperationCanceledException)
{ }
});
_PipeThread.Start();
}

public static void QueueData(object sender, GpibResponseEventArgs e)
{
if (!e.Command.OutputResponse) return;
CheckInitialization();
var tuple = new Tuple<object, GpibResponseEventArgs>(sender, e);
_PipeQueue.Add(tuple);
string p = Separation switch
{
OutputSeparation.None => string.Empty,
Expand All @@ -86,7 +106,7 @@ public static void QueueData(object sender, GpibResponseEventArgs e)
_DataWriters.TryAdd(p, t);
}
}
_DataWriters[p].Queue(new Tuple<object, GpibResponseEventArgs>(sender, e));
_DataWriters[p].Queue(tuple);
}

public static void QueueTerminal(object sender, string data)
Expand Down Expand Up @@ -119,9 +139,18 @@ private static readonly ConcurrentDictionary<string, DataWriterThread> _DataWrit
= new ConcurrentDictionary<string, DataWriterThread>();
private static readonly ConcurrentDictionary<string, TerminalWriterThread> _TerminalWriters
= new ConcurrentDictionary<string, TerminalWriterThread>();
private static NamedPipeServer<string> _Pipe;
private static Thread _PipeThread;
private static BlockingCollection<Tuple<object, GpibResponseEventArgs>> _PipeQueue;

private static CancellationToken _Cancel;

private static string DataConverter(object s, GpibResponseEventArgs e)
{
return string.Format(LineFormat, e.TimeReceived,
(s as GpibController).Name, e.Instrument.Name, e.Command.CommandString, e.Response);
}

private static void CheckInitialization()
{
if (_Cancel == null) throw new InvalidOperationException("Output module not initialized.");
Expand All @@ -134,9 +163,7 @@ public DataWriterThread(string path, CancellationToken token) : base(path, token

protected override string ConvertData(Tuple<object, GpibResponseEventArgs> data)
{
return string.Format(LineFormat, data.Item2.TimeReceived,
(data.Item1 as GpibController).Name, data.Item2.Instrument.Name, data.Item2.Command.CommandString,
data.Item2.Response);
return DataConverter(data.Item1, data.Item2);
}
}

Expand Down
1 change: 1 addition & 0 deletions GPIBServer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ private static ExitCodes MainHelper(string[] args)
Output.ErrorOccurred += ErrorMessageSink;
Output.Initialize(Configuration.Instance.GetFullyQualifiedOutputPath(),
Configuration.Instance.GetFullyQualifiedLogPath(),
Configuration.Instance.PipeName,
Cancel.Token);
Script.ErrorOccured += ErrorMessageSink;
Serializer.ErrorOccured += ErrorMessageSink;
Expand Down

0 comments on commit f82fb8f

Please sign in to comment.