diff --git a/docs/agents/hwp_pcu.rst b/docs/agents/hwp_pcu.rst index b1ff8b6c6..7b9169508 100644 --- a/docs/agents/hwp_pcu.rst +++ b/docs/agents/hwp_pcu.rst @@ -6,10 +6,12 @@ HWP PCU Agent ============= -The HWP Phase Compensation Unit Agent interfaces with a 8 channel USB relay module +The HWP Phase Compensation Unit (PCU) Agent interfaces with a 8 channel USB relay module (Numato Lab, product Number SKU:RL80001) to apply the discrete phase compensation in 120-degree increments for the HWP motor drive circuit. When used in conjunction with a HWP pid controller, phase compensation in 60-degree increments can be achieved. +The HWP PCU can also force the HWP to stop by making the phases of all three-phase +motors the same. .. argparse:: :filename: ../socs/agents/hwp_pcu/agent.py diff --git a/socs/agents/hwp_pcu/agent.py b/socs/agents/hwp_pcu/agent.py index 0b9f220c7..f5c4d77fd 100644 --- a/socs/agents/hwp_pcu/agent.py +++ b/socs/agents/hwp_pcu/agent.py @@ -1,78 +1,78 @@ import argparse import time +from dataclasses import dataclass +from queue import Queue + +import txaio +from twisted.internet import defer + +txaio.use_twisted() from ocs import ocs_agent, site_config -from ocs.ocs_twisted import TimeoutLock -from twisted.internet import reactor import socs.agents.hwp_pcu.drivers.hwp_pcu as pcu +class Actions: + class BaseAction: + def __post_init__(self): + self.deferred = defer.Deferred() + self.log = txaio.make_logger() + + @dataclass + class SendCommand (BaseAction): + command: str + + +def process_action(action, PCU: pcu.PCU): + """Process an action with PCU hardware""" + if isinstance(action, Actions.SendCommand): + off_channel = [] + on_channel = [] + if action.command == 'off': + off_channel = [0, 1, 2, 5, 6, 7] + on_channel = [] + elif action.command == 'on_1': + off_channel = [5, 6, 7] + on_channel = [0, 1, 2] + elif action.command == 'on_2': + on_channel = [0, 1, 2, 5, 6, 7] + off_channel = [] + elif action.command == 'stop': + on_channel = [1, 2, 5] + off_channel = [0, 6, 7] + + action.log.info(f"Command: {action.command}") + action.log.info(f" Off channels: {off_channel}") + action.log.info(f" On channels: {on_channel}") + for i in off_channel: + PCU.relay_off(i) + for i in on_channel: + PCU.relay_on(i) + + return dict(off_channel=off_channel, on_channel=on_channel) + + class HWPPCUAgent: """Agent to phase compensation improve the CHWP motor efficiency Args: agent (ocs.ocs_agent.OCSAgent): Instantiated OCSAgent class for this agent port (str): Path to USB device in '/dev/' - """ def __init__(self, agent, port): self.agent: ocs_agent.OCSAgent = agent self.log = agent.log - self.lock = TimeoutLock() - self.initialized = False - self.take_data = False self.port = port - self.status = 'off' + self.action_queue = Queue() agg_params = {'frame_length': 60} self.agent.register_feed( 'hwppcu', record=True, agg_params=agg_params) - @ocs_agent.param('auto_acquire', default=False, type=bool) - @ocs_agent.param('force', default=False, type=bool) - def init_connection(self, session, params): - """init_connection(auto_acquire=False, force=False) - - **Task** - Initialize connection to PCU - Controller. - - Parameters: - auto_acquire (bool, optional): Default is False. Starts data - acquisition after initialization if True. - force (bool, optional): Force initialization, even if already - initialized. Defaults to False. - - """ - if self.initialized and not params['force']: - self.log.info("Connection already initialized. Returning...") - return True, "Connection already initialized" - - with self.lock.acquire_timeout(3, job='init_connection') as acquired: - if not acquired: - self.log.warn( - 'Could not run init_connection because {} is already running'.format(self.lock.job)) - return False, 'Could not acquire lock' - - try: - self.PCU = pcu.PCU(port=self.port) - self.log.info('Connected to PCU') - except BrokenPipeError: - self.log.error('Could not establish connection to PCU') - reactor.callFromThread(reactor.stop) - return False, 'Unable to connect to PCU' - - self.status = self.PCU.get_status() - self.initialized = True - - # Start 'acq' Process if requested - if params['auto_acquire']: - self.agent.start('acq') - - return True, 'Connection to PCU established' - - @ocs_agent.param('command', default='off', type=str, choices=['off', 'on_1', 'on_2', 'hold']) + @defer.inlineCallbacks + @ocs_agent.param('command', default='off', type=str, choices=['off', 'on_1', 'on_2', 'stop']) def send_command(self, session, params): """send_command(command) @@ -80,131 +80,68 @@ def send_command(self, session, params): off: The compensation phase is zero. on_1: The compensation phase is +120 deg. on_2: The compensation phase is -120 deg. - hold: Stop the HWP spin. + stop: Stop the HWP spin. Parameters: - command (str): set the operation mode from 'off', 'on_1', 'on_2' or 'hold'. + command (str): set the operation mode from 'off', 'on_1', 'on_2' or 'stop'. """ - with self.lock.acquire_timeout(3, job='send_command') as acquired: - if not acquired: - self.log.warn('Could not send command because {} is already running'.format(self.lock.job)) - return False, 'Could not acquire lock' - - command = params['command'] - if command == 'off': - off_channel = [0, 1, 2, 5, 6, 7] - for i in off_channel: - self.PCU.relay_off(i) - self.status = 'off' - return True, 'Phase compensation is "off".' - - elif command == 'on_1': - on_channel = [0, 1, 2] - off_channel = [5, 6, 7] - for i in on_channel: - self.PCU.relay_on(i) - for i in off_channel: - self.PCU.relay_off(i) - self.status = 'on_1' - return True, 'Phase compensation operates "on_1".' - - elif command == 'on_2': - on_channel = [0, 1, 2, 5, 6, 7] - for i in on_channel: - self.PCU.relay_on(i) - self.status = 'on_2' - return True, 'Phase compensation operates "on_2".' - - elif command == 'hold': - on_channel = [0, 1, 2, 5] - off_channel = [6, 7] - for i in on_channel: - self.PCU.relay_on(i) - for i in off_channel: - self.PCU.relay_off(i) - self.status = 'hold' - return True, 'Phase compensation operates "hold".' - - else: - return True, "Choose the command from 'off', 'on_1', 'on_2' and 'hold'." - - def get_status(self, session, params): - """get_status() - - **Task** - Return the status of the PCU. - + action = Actions.SendCommand(**params) + self.action_queue.put(action) + session.data = yield action.deferred + return True, f"Set relays for cmd={action.command}" + + def _process_actions(self, PCU: pcu.PCU): + while not self.action_queue.empty(): + action = self.action_queue.get() + try: + self.log.info(f"Running action {action}") + res = process_action(action, PCU) + action.deferred.callback(res) + except Exception as e: + self.log.error(f"Error processing action: {action}") + action.deferred.errback(e) + + def _get_and_publish_data(self, PCU: pcu.PCU, session): + now = time.time() + data = {'timestamp': now, + 'block_name': 'hwppcu', + 'data': {}} + status = PCU.get_status() + data['data']['status'] = status + self.agent.publish_to_feed('hwppcu', data) + session.data = {'status': status, 'last_updated': now} + + def main(self, session, params): """ - with self.lock.acquire_timeout(3, job='get_status') as acquired: - if not acquired: - self.log.warn( - 'Could not get status because {} is already running'.format(self.lock.job)) - return False, 'Could not acquire lock' - - self.status = self.PCU.get_status() - - return True, 'Current status is ' + self.status - - def acq(self, session, params): - """acq() - - **Process** - Start PCU data acquisition. - - Notes: - The most recent data collected is stored in the session data in the - structure:: - - >>> response.session['data'] - {'status': 'on_1', - 'last_updated': 1649085992.719602} - + **Process** - Main process for PCU agent. """ - with self.lock.acquire_timeout(timeout=3, job='acq') as acquired: - if not acquired: - self.log.warn('Could not start pcu acq because {} is already running' - .format(self.lock.job)) - return False, 'Could not acquire lock' - - session.set_status('running') - last_release = time.time() - self.take_data = True - - while self.take_data: - # Relinquish sampling lock occasionally. - if time.time() - last_release > 1.: - last_release = time.time() - if not self.lock.release_and_acquire(timeout=10): - self.log.warn(f"Failed to re-acquire sampling lock, " - f"currently held by {self.lock.job}.") - continue + PCU = pcu.PCU(port=self.port) + self.log.info('Connected to PCU') - data = {'timestamp': time.time(), - 'block_name': 'hwppcu', 'data': {}} + session.set_status('running') + while not self.action_queue.empty(): + action = self.action_queue.get() + action.deferred.errback(Exception("Action cancelled")) - # status = self.PCU.get_status() - status = self.status - data['data']['status'] = status + last_daq = 0 + while session.status in ['starting', 'running']: + now = time.time() + if now - last_daq > 5: + self._get_and_publish_data(PCU, session) + last_daq = now - self.agent.publish_to_feed('hwppcu', data) + self._process_actions(PCU) + time.sleep(0.1) - session.data = {'status': status, - 'last_updated': time.time()} + PCU.close() - time.sleep(5) - - self.agent.feeds['hwppcu'].flush_buffer() - return True, 'Acqusition exited cleanly' - - def _stop_acq(self, session, params): + def _stop_main(self, session, params): """ Stop acq process. """ - if self.take_data: - self.PCU.close() - self.take_data = False - return True, 'requested to stop taking data' - - return False, 'acq is not currently running' + session.set_status('stopping') + return True, 'Set main status to stopping' def make_parser(parser=None): @@ -217,10 +154,7 @@ def make_parser(parser=None): # Add options specific to this agent pgroup = parser.add_argument_group('Agent Options') - pgroup.add_argument('--port', type=str, help="Path to USB node for the lakeshore") - pgroup.add_argument('--mode', type=str, default='acq', - choices=['init', 'acq'], - help="Starting operation for the Agent.") + pgroup.add_argument('--port', type=str, help="Path to USB node for the PCU") return parser @@ -230,21 +164,12 @@ def main(args=None): parser=parser, args=args) - init_params = False - if args.mode == 'init': - init_params = {'auto_acquire': False} - elif args.mode == 'acq': - init_params = {'auto_acquire': True} - agent, runner = ocs_agent.init_site_agent(args) hwppcu_agent = HWPPCUAgent(agent, port=args.port) - agent.register_task('init_connection', hwppcu_agent.init_connection, - startup=init_params) - agent.register_process('acq', hwppcu_agent.acq, - hwppcu_agent._stop_acq) - agent.register_task('send_command', hwppcu_agent.send_command) - agent.register_task('get_status', hwppcu_agent.get_status) + agent.register_task('send_command', hwppcu_agent.send_command, blocking=False) + agent.register_process( + 'main', hwppcu_agent.main, hwppcu_agent._stop_main, startup=True) runner.run(agent, auto_reconnect=True) diff --git a/socs/agents/hwp_pcu/drivers/hwp_pcu.py b/socs/agents/hwp_pcu/drivers/hwp_pcu.py index 7ea7e58d6..86132c8f5 100644 --- a/socs/agents/hwp_pcu/drivers/hwp_pcu.py +++ b/socs/agents/hwp_pcu/drivers/hwp_pcu.py @@ -10,7 +10,7 @@ class PCU: port (str): Path to USB device in '/dev/' Attributes: - status (str): The status of the unit (off/on_1/on_2/hold) + status (str): The status of the unit (off/on_1/on_2/stop) """ def __init__(self, port): @@ -54,7 +54,7 @@ def get_status(self): off: The compensation phase is zero. on_1: The compensation phase is +120 deg. on_2: The compensation phase is -120 deg. - hold: Stop the HWP spin. + stop: Stop the HWP spin. """ channel = [0, 1, 2, 5, 6, 7] channel_switch = [] @@ -67,7 +67,9 @@ def get_status(self): return 'on_1' elif channel_switch == [True, True, True, True, True, True]: return 'on_2' - elif channel_switch == [True, True, True, True, False, False]: - return 'hold' + elif channel_switch == [False, True, True, True, False, False]: + return 'stop' + elif -1 in channel_switch: + return 'failed' else: return 'undefined'