diff --git a/OpenEphys.Commutator/TurnMotor.cs b/OpenEphys.Commutator/TurnMotor.cs new file mode 100644 index 0000000..eca8c87 --- /dev/null +++ b/OpenEphys.Commutator/TurnMotor.cs @@ -0,0 +1,72 @@ +using System; +using System.ComponentModel; +using System.Reactive; +using System.Reactive.Linq; +using Bonsai; +using Bonsai.IO.Ports; + +namespace OpenEphys.Commutator +{ + /// + /// Turns an Open Ephys commutator using a sequence of angle steps, in units of turns. + /// + [Description("Turns an Open Ephys commutator using a sequence of angle steps, in units of turns.")] + public class TurnMotor : Sink + { + readonly SerialWriteLine serialWriteLine = new(); + + /// + /// Gets or sets the name of the serial port that the commutator is plugged into. + /// + [TypeConverter("Bonsai.IO.Ports.PortNameConverter, Bonsai.System")] + [Description("The name of the serial port that the commutator is plugged into.")] + public string PortName + { + get => serialWriteLine.PortName; + set => serialWriteLine.PortName = value; + } + + /// + /// Turns an Open Ephys commutator using a sequence of angle steps, in units of turns. + /// + /// + /// The sequence of angle steps, in units of turns, by which to rotate the commutator. + /// + /// + /// A sequence which is identical to the sequence, but + /// where the commutator is instructed to turn by each step value in the sequence. + /// + public override IObservable Process(IObservable source) + { + return Observable.Create(observer => + { + // inner observable will format commands to turn the motor + var commands = Observable.Create(commandObserver => + { + var valueObserver = Observer.Create( + turns => + { + // only format turns for valid values + if (!double.IsNaN(turns) && !double.IsInfinity(turns)) + { + var command = $"{{turn: {turns}}}"; + commandObserver.OnNext(command); + } + + // send all values to the output observer regardless + observer.OnNext(turns); + }, + commandObserver.OnError, + commandObserver.OnCompleted); + return source.SubscribeSafe(valueObserver); + }); + + // route termination notifications to the output observer + return serialWriteLine.Process(commands).SubscribeSafe(Observer.Create( + _ => { }, + observer.OnError, + observer.OnCompleted)); + }); + } + } +}