Skip to content

Commit

Permalink
Automatic task registration from action
Browse files Browse the repository at this point in the history
  • Loading branch information
jlashner committed Sep 11, 2024
1 parent 74f6f98 commit d91f4d0
Showing 1 changed file with 102 additions and 96 deletions.
198 changes: 102 additions & 96 deletions socs/agents/lakeshore240/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
import time
import traceback
import warnings
from dataclasses import dataclass
from typing import Any, Dict, Generator, Optional, Tuple
import dataclasses
from dataclasses import dataclass, fields
from typing import Any, Dict, Generator, Optional, Tuple, Type

import txaio # type: ignore
from ocs import ocs_agent, site_config
Expand All @@ -29,20 +30,21 @@
class Actions:
"Namespace to hold action classes for the Lakeshore240 agent."

@dataclass
class BaseAction:
"Base class for all actions."

def __post_init__(self) -> None:
self.processed: bool = False
self.success: bool = False
self.traceback: Optional[str] = None
self.result: Any = None
self.result: ActionReturnType = None

def resolve_action(
self,
success: bool,
traceback: Optional[str] = None,
result: Optional[Any] = None
result: ActionReturnType = None
) -> None:
self.success = success
self.traceback = traceback
Expand All @@ -58,9 +60,16 @@ def sleep_until_processed(self, interval=0.2) -> None:

@dataclass
class UploadCalCurve(BaseAction):
"""
Action to upload a calibration curve to the lakeshore. For parameter
details, see the docstrings for the `upload_cal_curve` task.
"""upload_cal_curve(channel, filename)
**Task** - Upload a calibration curve to a channel.
Args
------
channel (int):
Channel number, 1-8.
filename (str):
Filename for calibration curve.
"""

channel: int
Expand All @@ -75,9 +84,38 @@ def process(self, module: Module) -> ActionReturnType:

@dataclass
class SetValues(BaseAction):
"""
Action to configure settings on the lakeshore. For parameter details,
see the docstrings for the `set_values` task.
"""set_values(channel, sensor=None, auto_range=None, range=None,\
current_reversal=None, units=None, enabled=None, name=None)
**Task** - Set sensor parameters for a Lakeshore240 Channel.
Args
---------
channel (int):
Channel number to set. Valid choices are 1-8.
sensor (int, optional):
Specifies sensor type. See
:func:`socs.Lakeshore.Lakeshore240.Channel.set_values` for
possible types.
auto_range (int, optional):
Specifies if channel should use autorange. Must be 0 or 1.
range (int, optional):
Specifies range if auto_range is false. Only settable for NTC
RTD. See
:func:`socs.Lakeshore.Lakeshore240.Channel.set_values` for
possible ranges.
current_reversal (int, optional):
Specifies if input current reversal is on or off.
Always 0 if input is a diode.
units (int, optional):
Specifies preferred units parameter, and sets the units for
alarm settings. See
:func:`socs.Lakeshore.Lakeshore240.Channel.set_values` for
possible units.
enabled (int, optional):
Sets if channel is enabled.
name (str, optional):
Sets name of channel.
"""

channel: int
Expand All @@ -104,6 +142,54 @@ def process(self, module: Module) -> ActionReturnType:
return None


def register_task_from_action(
agent: ocs_agent.OCSAgent,
name: str,
action_class: Type[Actions.BaseAction],
queue: queue.Queue[Actions.BaseAction]
) -> None:
"""
Registers an OCSTask from an Action type. This will define ocs_params based
on the dataclass fields, and set the task docstrings equal to the action
class docstrings.
"""

def task(
session: ocs_agent.OpSession,
params: Optional[Dict[str, Any]]=None
) -> OcsOpReturnType:
_params = {} if params is None else params
action = action_class(**_params)
queue.put(action)
action.sleep_until_processed()

if not action.success:
log.error("{name} failed to process...", name=name)
if action.traceback is not None:
log.error("traceback:\n{traceback}", traceback=action.traceback)
return False, f"{name} failed"

if action.result is not None:
session.data.update(action.result)

return True, f"{name} succeded"

task.__doc__ = action_class.__doc__

# Adds ocs parameters
for f in fields(action_class):
param_kwargs: Dict[str, Any] = {
'type': f.type,
}
if f.default != dataclasses.MISSING:
param_kwargs['default'] = f.default
if isinstance(f.metadata, dict):
param_kwargs.update(f.metadata.get('ocs_param_kwargs', {}))
task = ocs_agent.param(f.name, **param_kwargs)(task)

agent.register_task(name, task)


class LS240_Agent:
def __init__(
self,
Expand All @@ -117,8 +203,12 @@ def __init__(
self.action_queue: queue.Queue[Actions.BaseAction] = queue.Queue()

# Register Operaionts
agent.register_task("set_values", self.set_values)
agent.register_task("upload_cal_curve", self.upload_cal_curve)
register_task_from_action(
agent, 'set_values', Actions.SetValues, self.action_queue
)
register_task_from_action(
agent, 'upload_cal_curve', Actions.UploadCalCurve, self.action_queue
)
agent.register_process("main", self.main, self._stop_main, startup=True)

# Registers Temperature and Voltage feeds
Expand All @@ -129,89 +219,6 @@ def __init__(
"temperatures", record=True, agg_params=agg_params, buffer_time=1
)

def set_values(
self,
session: ocs_agent.OpSession,
params: Optional[Dict[str, Any]] = None, # pylint: disable=unused-argument
) -> OcsOpReturnType:
"""set_values(channel, sensor=None, auto_range=None, range=None,\
current_reversal=None, units=None, enabled=None, name=None)
**Task** - Set sensor parameters for a Lakeshore240 Channel.
Args:
channel (int):
Channel number to set. Valid choices are 1-8.
sensor (int, optional):
Specifies sensor type. See
:func:`socs.Lakeshore.Lakeshore240.Channel.set_values` for
possible types.
auto_range (int, optional):
Specifies if channel should use autorange. Must be 0 or 1.
range (int, optional):
Specifies range if auto_range is false. Only settable for NTC
RTD. See
:func:`socs.Lakeshore.Lakeshore240.Channel.set_values` for
possible ranges.
current_reversal (int, optional):
Specifies if input current reversal is on or off.
Always 0 if input is a diode.
units (int, optional):
Specifies preferred units parameter, and sets the units for
alarm settings. See
:func:`socs.Lakeshore.Lakeshore240.Channel.set_values` for
possible units.
enabled (int, optional):
Sets if channel is enabled.
name (str, optional):
Sets name of channel.
"""
if params is None:
params = {}

action = Actions.SetValues(**params)
self.action_queue.put(action)
action.sleep_until_processed()

if not action.success:
if action.traceback is not None:
self.agent.log.error(action.traceback)
return False, "Set values action failed"

if action.result is not None:
session.data['result'] = action.result

return True, f"Set values for channel {action.channel}"

def upload_cal_curve(
self,
session: ocs_agent.OpSession,
params: Optional[Dict[str, Any]] = None, # pylint: disable=unused-argument
) -> OcsOpReturnType:
"""upload_cal_curve(channel, filename)
**Task** - Upload a calibration curve to a channel.
Parameters:
channel (int): Channel number, 1-8.
filename (str): Filename for calibration curve.
"""
if params is None:
params = {}
action = Actions.UploadCalCurve(**params)
self.action_queue.put(action)
action.sleep_until_processed()

if not action.success:
if action.traceback is not None:
self.agent.log.error(action.traceback)
return False, "Action failed to process"

if action.result is not None:
session.data['result'] = action.result

return True, f"Uploaded curve to channel {action.channel}"

def _get_and_pub_temp_data(
self, module: Module, session: ocs_agent.OpSession
) -> Dict[str, Any]:
Expand Down Expand Up @@ -260,7 +267,6 @@ def _process_actions(self, module: Module) -> None:
except Exception: # pylint: disable=broad-except
log.error(f"Error processing action: {action}")
action.resolve_action(False, traceback=traceback.format_exc())
return None

def main(
self,
Expand Down

0 comments on commit d91f4d0

Please sign in to comment.