Skip to content

Commit

Permalink
Move tasks into blocking threads
Browse files Browse the repository at this point in the history
  • Loading branch information
jlashner committed Sep 10, 2024
1 parent da7ccb0 commit 54854c9
Showing 1 changed file with 54 additions and 21 deletions.
75 changes: 54 additions & 21 deletions socs/agents/lakeshore240/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,12 @@
import warnings
from dataclasses import dataclass
from typing import Any, Dict, Generator, Optional, Tuple

import txaio
from twisted.internet import defer

txaio.use_twisted()
import txaio # type: ignore
from ocs import ocs_agent, site_config
from ocs.ocs_twisted import Pacemaker

from socs.Lakeshore.Lakeshore240 import Module
txaio.use_twisted()


on_rtd = os.environ.get("READTHEDOCS") == "True"
if not on_rtd:
Expand All @@ -32,12 +29,31 @@ class Actions:
class BaseAction:
"Base class for all actions."

def __post_init__(self):
self.deferred = defer.Deferred()
def __post_init__(self) -> None:
self.processed: bool = False
self.success: bool = False
self.traceback: Optional[str] = None
self.result: Any = None

def resolve_action(
self,
success: bool,
traceback: Optional[str] = None,
result: Optional[Any] = None
) -> None:
self.success = success
self.traceback = traceback
self.result = result
self.processed = True

def process(self, module: Module) -> ActionReturnType:
raise NotImplementedError

def sleep_until_processed(self, interval=0.2) -> None:
while not self.processed:
time.sleep(interval)


@dataclass
class UploadCalCurve(BaseAction):
"""
Expand Down Expand Up @@ -96,11 +112,11 @@ def __init__(
self.agent: ocs_agent.OCSAgent = agent
self.port = port
self.f_sample = f_sample
self.action_queue: queue.Queue = queue.Queue()
self.action_queue: queue.Queue[Actions.BaseAction] = queue.Queue()

# Register Operaionts
agent.register_task("set_values", self.set_values, blocking=False)
agent.register_task("upload_cal_curve", self.upload_cal_curve, blocking=False)
agent.register_task("set_values", self.set_values)
agent.register_task("upload_cal_curve", self.upload_cal_curve)
agent.register_process("main", self.main, self._stop_main, startup=True)

# Registers Temperature and Voltage feeds
Expand All @@ -111,12 +127,11 @@ def __init__(
"temperatures", record=True, agg_params=agg_params, buffer_time=1
)

@defer.inlineCallbacks
def set_values(
self,
session: ocs_agent.OpSession,
params: Optional[Dict[str, Any]] = None, # pylint: disable=unused-argument
) -> OcsInlineCallbackReturnType:
) -> OcsOpReturnType:
"""set_values(channel, sensor=None, auto_range=None, range=None,\
current_reversal=None, units=None, enabled=None, name=None)
Expand Down Expand Up @@ -151,17 +166,26 @@ def set_values(
"""
if params is None:
params = {}

action = Actions.SetValues(**params)
self.action_queue.put(action)
session.data = yield action.deferred
action.sleep_until_processed()

if not action.success:
if action.traceback is not None:
self.agent.log.error(action.traceback)
return False, "Set values action failed"

if action.result is not None:
session.data['result'] = action.result

return True, f"Set values for channel {action.channel}"

@defer.inlineCallbacks
def upload_cal_curve(
self,
session: ocs_agent.OpSession,
params: Optional[Dict[str, Any]] = None, # pylint: disable=unused-argument
) -> OcsInlineCallbackReturnType:
) -> OcsOpReturnType:
"""upload_cal_curve(channel, filename)
**Task** - Upload a calibration curve to a channel.
Expand All @@ -174,7 +198,16 @@ def upload_cal_curve(
params = {}
action = Actions.UploadCalCurve(**params)
self.action_queue.put(action)
session.data = yield action.deferred
action.sleep_until_processed()

if not action.success:
if action.traceback is not None:
self.agent.log.error(action.traceback)
return False, "Action failed to process"

if action.result is not None:
session.data['result'] = action.result

return True, f"Uploaded curve to channel {action.channel}"

def _get_and_pub_temp_data(
Expand Down Expand Up @@ -220,11 +253,11 @@ def _process_actions(self, module: Module) -> None:
action = self.action_queue.get()
try:
log.info(f"Running action {action}")
res = action.process(module)
action.deferred.callback(res)
result = action.process(module)
action.resolve_action(True, result=result)
except Exception as e: # pylint: disable=broad-except
log.error(f"Error processing action: {action}")
action.deferred.errback(e)
action.resolve_action(False, traceback=traceback.format_exc())
return None

def main(
Expand All @@ -242,7 +275,7 @@ def main(
# Clear pre-existing actions
while not self.action_queue.empty():
action = self.action_queue.get()
action.deferred.errback(Exception("Action cancelled"))
action.resolve_action(False, traceback="Aborted by main process")

exceptions_to_attempt_reconnect = (ConnectionError, TimeoutError)

Expand Down

0 comments on commit 54854c9

Please sign in to comment.