Skip to content

Commit

Permalink
TimeoutLockless PCU agent (#600)
Browse files Browse the repository at this point in the history
* PCU agent restructure

* Adds back agg_param frame-length

* Adds inline callback dec

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Debugged agent.py

Added 'hold' option to process_action function.

* Updated 'stop' phase definition

* 'hold' is replaced with 'stop'

Strictly speaking, the new 'hold' operation mode doesn't hold the hwp. It just stops the motor from spinning the hwp.

* Update hwp_pcu.py -- added 'failed' to get_status responses

Status 'failed' is obtained when relay_read apparently failed.

* fix typo and docs

* fix typo

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: jsugiyama <[email protected]>
Co-authored-by: ykyohei <[email protected]>
  • Loading branch information
4 people authored Dec 23, 2023
1 parent 9a26191 commit 06ddfd4
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 182 deletions.
4 changes: 3 additions & 1 deletion docs/agents/hwp_pcu.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
HWP PCU Agent
=============

The HWP Phase Compensation Unit Agent interfaces with a 8 channel USB relay module
The HWP Phase Compensation Unit (PCU) Agent interfaces with a 8 channel USB relay module
(Numato Lab, product Number SKU:RL80001) to apply the discrete phase compensation in
120-degree increments for the HWP motor drive circuit. When used in conjunction with
a HWP pid controller, phase compensation in 60-degree increments can be achieved.
The HWP PCU can also force the HWP to stop by making the phases of all three-phase
motors the same.

.. argparse::
:filename: ../socs/agents/hwp_pcu/agent.py
Expand Down
279 changes: 102 additions & 177 deletions socs/agents/hwp_pcu/agent.py
Original file line number Diff line number Diff line change
@@ -1,210 +1,147 @@
import argparse
import time
from dataclasses import dataclass
from queue import Queue

import txaio
from twisted.internet import defer

txaio.use_twisted()

from ocs import ocs_agent, site_config
from ocs.ocs_twisted import TimeoutLock
from twisted.internet import reactor

import socs.agents.hwp_pcu.drivers.hwp_pcu as pcu


class Actions:
class BaseAction:
def __post_init__(self):
self.deferred = defer.Deferred()
self.log = txaio.make_logger()

@dataclass
class SendCommand (BaseAction):
command: str


def process_action(action, PCU: pcu.PCU):
"""Process an action with PCU hardware"""
if isinstance(action, Actions.SendCommand):
off_channel = []
on_channel = []
if action.command == 'off':
off_channel = [0, 1, 2, 5, 6, 7]
on_channel = []
elif action.command == 'on_1':
off_channel = [5, 6, 7]
on_channel = [0, 1, 2]
elif action.command == 'on_2':
on_channel = [0, 1, 2, 5, 6, 7]
off_channel = []
elif action.command == 'stop':
on_channel = [1, 2, 5]
off_channel = [0, 6, 7]

action.log.info(f"Command: {action.command}")
action.log.info(f" Off channels: {off_channel}")
action.log.info(f" On channels: {on_channel}")
for i in off_channel:
PCU.relay_off(i)
for i in on_channel:
PCU.relay_on(i)

return dict(off_channel=off_channel, on_channel=on_channel)


class HWPPCUAgent:
"""Agent to phase compensation improve the CHWP motor efficiency
Args:
agent (ocs.ocs_agent.OCSAgent): Instantiated OCSAgent class for this agent
port (str): Path to USB device in '/dev/'
"""

def __init__(self, agent, port):
self.agent: ocs_agent.OCSAgent = agent
self.log = agent.log
self.lock = TimeoutLock()
self.initialized = False
self.take_data = False
self.port = port
self.status = 'off'
self.action_queue = Queue()

agg_params = {'frame_length': 60}
self.agent.register_feed(
'hwppcu', record=True, agg_params=agg_params)

@ocs_agent.param('auto_acquire', default=False, type=bool)
@ocs_agent.param('force', default=False, type=bool)
def init_connection(self, session, params):
"""init_connection(auto_acquire=False, force=False)
**Task** - Initialize connection to PCU
Controller.
Parameters:
auto_acquire (bool, optional): Default is False. Starts data
acquisition after initialization if True.
force (bool, optional): Force initialization, even if already
initialized. Defaults to False.
"""
if self.initialized and not params['force']:
self.log.info("Connection already initialized. Returning...")
return True, "Connection already initialized"

with self.lock.acquire_timeout(3, job='init_connection') as acquired:
if not acquired:
self.log.warn(
'Could not run init_connection because {} is already running'.format(self.lock.job))
return False, 'Could not acquire lock'

try:
self.PCU = pcu.PCU(port=self.port)
self.log.info('Connected to PCU')
except BrokenPipeError:
self.log.error('Could not establish connection to PCU')
reactor.callFromThread(reactor.stop)
return False, 'Unable to connect to PCU'

self.status = self.PCU.get_status()
self.initialized = True

# Start 'acq' Process if requested
if params['auto_acquire']:
self.agent.start('acq')

return True, 'Connection to PCU established'

@ocs_agent.param('command', default='off', type=str, choices=['off', 'on_1', 'on_2', 'hold'])
@defer.inlineCallbacks
@ocs_agent.param('command', default='off', type=str, choices=['off', 'on_1', 'on_2', 'stop'])
def send_command(self, session, params):
"""send_command(command)
**Task** - Send commands to the phase compensation unit.
off: The compensation phase is zero.
on_1: The compensation phase is +120 deg.
on_2: The compensation phase is -120 deg.
hold: Stop the HWP spin.
stop: Stop the HWP spin.
Parameters:
command (str): set the operation mode from 'off', 'on_1', 'on_2' or 'hold'.
command (str): set the operation mode from 'off', 'on_1', 'on_2' or 'stop'.
"""
with self.lock.acquire_timeout(3, job='send_command') as acquired:
if not acquired:
self.log.warn('Could not send command because {} is already running'.format(self.lock.job))
return False, 'Could not acquire lock'

command = params['command']
if command == 'off':
off_channel = [0, 1, 2, 5, 6, 7]
for i in off_channel:
self.PCU.relay_off(i)
self.status = 'off'
return True, 'Phase compensation is "off".'

elif command == 'on_1':
on_channel = [0, 1, 2]
off_channel = [5, 6, 7]
for i in on_channel:
self.PCU.relay_on(i)
for i in off_channel:
self.PCU.relay_off(i)
self.status = 'on_1'
return True, 'Phase compensation operates "on_1".'

elif command == 'on_2':
on_channel = [0, 1, 2, 5, 6, 7]
for i in on_channel:
self.PCU.relay_on(i)
self.status = 'on_2'
return True, 'Phase compensation operates "on_2".'

elif command == 'hold':
on_channel = [0, 1, 2, 5]
off_channel = [6, 7]
for i in on_channel:
self.PCU.relay_on(i)
for i in off_channel:
self.PCU.relay_off(i)
self.status = 'hold'
return True, 'Phase compensation operates "hold".'

else:
return True, "Choose the command from 'off', 'on_1', 'on_2' and 'hold'."

def get_status(self, session, params):
"""get_status()
**Task** - Return the status of the PCU.
action = Actions.SendCommand(**params)
self.action_queue.put(action)
session.data = yield action.deferred
return True, f"Set relays for cmd={action.command}"

def _process_actions(self, PCU: pcu.PCU):
while not self.action_queue.empty():
action = self.action_queue.get()
try:
self.log.info(f"Running action {action}")
res = process_action(action, PCU)
action.deferred.callback(res)
except Exception as e:
self.log.error(f"Error processing action: {action}")
action.deferred.errback(e)

def _get_and_publish_data(self, PCU: pcu.PCU, session):
now = time.time()
data = {'timestamp': now,
'block_name': 'hwppcu',
'data': {}}
status = PCU.get_status()
data['data']['status'] = status
self.agent.publish_to_feed('hwppcu', data)
session.data = {'status': status, 'last_updated': now}

def main(self, session, params):
"""
with self.lock.acquire_timeout(3, job='get_status') as acquired:
if not acquired:
self.log.warn(
'Could not get status because {} is already running'.format(self.lock.job))
return False, 'Could not acquire lock'

self.status = self.PCU.get_status()

return True, 'Current status is ' + self.status

def acq(self, session, params):
"""acq()
**Process** - Start PCU data acquisition.
Notes:
The most recent data collected is stored in the session data in the
structure::
>>> response.session['data']
{'status': 'on_1',
'last_updated': 1649085992.719602}
**Process** - Main process for PCU agent.
"""
with self.lock.acquire_timeout(timeout=3, job='acq') as acquired:
if not acquired:
self.log.warn('Could not start pcu acq because {} is already running'
.format(self.lock.job))
return False, 'Could not acquire lock'

session.set_status('running')
last_release = time.time()
self.take_data = True

while self.take_data:
# Relinquish sampling lock occasionally.
if time.time() - last_release > 1.:
last_release = time.time()
if not self.lock.release_and_acquire(timeout=10):
self.log.warn(f"Failed to re-acquire sampling lock, "
f"currently held by {self.lock.job}.")
continue
PCU = pcu.PCU(port=self.port)
self.log.info('Connected to PCU')

data = {'timestamp': time.time(),
'block_name': 'hwppcu', 'data': {}}
session.set_status('running')
while not self.action_queue.empty():
action = self.action_queue.get()
action.deferred.errback(Exception("Action cancelled"))

# status = self.PCU.get_status()
status = self.status
data['data']['status'] = status
last_daq = 0
while session.status in ['starting', 'running']:
now = time.time()
if now - last_daq > 5:
self._get_and_publish_data(PCU, session)
last_daq = now

self.agent.publish_to_feed('hwppcu', data)
self._process_actions(PCU)
time.sleep(0.1)

session.data = {'status': status,
'last_updated': time.time()}
PCU.close()

time.sleep(5)

self.agent.feeds['hwppcu'].flush_buffer()
return True, 'Acqusition exited cleanly'

def _stop_acq(self, session, params):
def _stop_main(self, session, params):
"""
Stop acq process.
"""
if self.take_data:
self.PCU.close()
self.take_data = False
return True, 'requested to stop taking data'

return False, 'acq is not currently running'
session.set_status('stopping')
return True, 'Set main status to stopping'


def make_parser(parser=None):
Expand All @@ -217,10 +154,7 @@ def make_parser(parser=None):

# Add options specific to this agent
pgroup = parser.add_argument_group('Agent Options')
pgroup.add_argument('--port', type=str, help="Path to USB node for the lakeshore")
pgroup.add_argument('--mode', type=str, default='acq',
choices=['init', 'acq'],
help="Starting operation for the Agent.")
pgroup.add_argument('--port', type=str, help="Path to USB node for the PCU")
return parser


Expand All @@ -230,21 +164,12 @@ def main(args=None):
parser=parser,
args=args)

init_params = False
if args.mode == 'init':
init_params = {'auto_acquire': False}
elif args.mode == 'acq':
init_params = {'auto_acquire': True}

agent, runner = ocs_agent.init_site_agent(args)
hwppcu_agent = HWPPCUAgent(agent,
port=args.port)
agent.register_task('init_connection', hwppcu_agent.init_connection,
startup=init_params)
agent.register_process('acq', hwppcu_agent.acq,
hwppcu_agent._stop_acq)
agent.register_task('send_command', hwppcu_agent.send_command)
agent.register_task('get_status', hwppcu_agent.get_status)
agent.register_task('send_command', hwppcu_agent.send_command, blocking=False)
agent.register_process(
'main', hwppcu_agent.main, hwppcu_agent._stop_main, startup=True)

runner.run(agent, auto_reconnect=True)

Expand Down
10 changes: 6 additions & 4 deletions socs/agents/hwp_pcu/drivers/hwp_pcu.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class PCU:
port (str): Path to USB device in '/dev/'
Attributes:
status (str): The status of the unit (off/on_1/on_2/hold)
status (str): The status of the unit (off/on_1/on_2/stop)
"""

def __init__(self, port):
Expand Down Expand Up @@ -54,7 +54,7 @@ def get_status(self):
off: The compensation phase is zero.
on_1: The compensation phase is +120 deg.
on_2: The compensation phase is -120 deg.
hold: Stop the HWP spin.
stop: Stop the HWP spin.
"""
channel = [0, 1, 2, 5, 6, 7]
channel_switch = []
Expand All @@ -67,7 +67,9 @@ def get_status(self):
return 'on_1'
elif channel_switch == [True, True, True, True, True, True]:
return 'on_2'
elif channel_switch == [True, True, True, True, False, False]:
return 'hold'
elif channel_switch == [False, True, True, True, False, False]:
return 'stop'
elif -1 in channel_switch:
return 'failed'
else:
return 'undefined'

0 comments on commit 06ddfd4

Please sign in to comment.