diff --git a/socs/agents/lakeshore240/agent.py b/socs/agents/lakeshore240/agent.py index 3f18a4e46..3de593958 100644 --- a/socs/agents/lakeshore240/agent.py +++ b/socs/agents/lakeshore240/agent.py @@ -6,15 +6,12 @@ import warnings from dataclasses import dataclass from typing import Any, Dict, Generator, Optional, Tuple - -import txaio -from twisted.internet import defer - -txaio.use_twisted() +import txaio # type: ignore from ocs import ocs_agent, site_config from ocs.ocs_twisted import Pacemaker - from socs.Lakeshore.Lakeshore240 import Module +txaio.use_twisted() + on_rtd = os.environ.get("READTHEDOCS") == "True" if not on_rtd: @@ -32,12 +29,31 @@ class Actions: class BaseAction: "Base class for all actions." - def __post_init__(self): - self.deferred = defer.Deferred() + def __post_init__(self) -> None: + self.processed: bool = False + self.success: bool = False + self.traceback: Optional[str] = None + self.result: Any = None + + def resolve_action( + self, + success: bool, + traceback: Optional[str] = None, + result: Optional[Any] = None + ) -> None: + self.success = success + self.traceback = traceback + self.result = result + self.processed = True def process(self, module: Module) -> ActionReturnType: raise NotImplementedError + def sleep_until_processed(self, interval=0.2) -> None: + while not self.processed: + time.sleep(interval) + + @dataclass class UploadCalCurve(BaseAction): """ @@ -96,11 +112,11 @@ def __init__( self.agent: ocs_agent.OCSAgent = agent self.port = port self.f_sample = f_sample - self.action_queue: queue.Queue = queue.Queue() + self.action_queue: queue.Queue[Actions.BaseAction] = queue.Queue() # Register Operaionts - agent.register_task("set_values", self.set_values, blocking=False) - agent.register_task("upload_cal_curve", self.upload_cal_curve, blocking=False) + agent.register_task("set_values", self.set_values) + agent.register_task("upload_cal_curve", self.upload_cal_curve) agent.register_process("main", self.main, self._stop_main, startup=True) # Registers Temperature and Voltage feeds @@ -111,12 +127,11 @@ def __init__( "temperatures", record=True, agg_params=agg_params, buffer_time=1 ) - @defer.inlineCallbacks def set_values( self, session: ocs_agent.OpSession, params: Optional[Dict[str, Any]] = None, # pylint: disable=unused-argument - ) -> OcsInlineCallbackReturnType: + ) -> OcsOpReturnType: """set_values(channel, sensor=None, auto_range=None, range=None,\ current_reversal=None, units=None, enabled=None, name=None) @@ -151,17 +166,26 @@ def set_values( """ if params is None: params = {} + action = Actions.SetValues(**params) self.action_queue.put(action) - session.data = yield action.deferred + action.sleep_until_processed() + + if not action.success: + if action.traceback is not None: + self.agent.log.error(action.traceback) + return False, "Set values action failed" + + if action.result is not None: + session.data['result'] = action.result + return True, f"Set values for channel {action.channel}" - @defer.inlineCallbacks def upload_cal_curve( self, session: ocs_agent.OpSession, params: Optional[Dict[str, Any]] = None, # pylint: disable=unused-argument - ) -> OcsInlineCallbackReturnType: + ) -> OcsOpReturnType: """upload_cal_curve(channel, filename) **Task** - Upload a calibration curve to a channel. @@ -174,7 +198,16 @@ def upload_cal_curve( params = {} action = Actions.UploadCalCurve(**params) self.action_queue.put(action) - session.data = yield action.deferred + action.sleep_until_processed() + + if not action.success: + if action.traceback is not None: + self.agent.log.error(action.traceback) + return False, "Action failed to process" + + if action.result is not None: + session.data['result'] = action.result + return True, f"Uploaded curve to channel {action.channel}" def _get_and_pub_temp_data( @@ -220,11 +253,11 @@ def _process_actions(self, module: Module) -> None: action = self.action_queue.get() try: log.info(f"Running action {action}") - res = action.process(module) - action.deferred.callback(res) + result = action.process(module) + action.resolve_action(True, result=result) except Exception as e: # pylint: disable=broad-except log.error(f"Error processing action: {action}") - action.deferred.errback(e) + action.resolve_action(False, traceback=traceback.format_exc()) return None def main( @@ -242,7 +275,7 @@ def main( # Clear pre-existing actions while not self.action_queue.empty(): action = self.action_queue.get() - action.deferred.errback(Exception("Action cancelled")) + action.resolve_action(False, traceback="Aborted by main process") exceptions_to_attempt_reconnect = (ConnectionError, TimeoutError)