From 03ae83be233278a243e5f0cc0d685671cad69959 Mon Sep 17 00:00:00 2001 From: Tom Willemsen Date: Tue, 5 Nov 2024 15:20:23 +0000 Subject: [PATCH] Add helpers for external functions --- doc/index.rst | 7 ++ doc/plan_stubs/external_code.md | 66 +++++++++++++++++ src/ibex_bluesky_core/plan_stubs/__init__.py | 41 +++++++++++ .../{run_engine.py => run_engine/__init__.py} | 4 ++ .../run_engine/_msg_handlers.py | 72 +++++++++++++++++++ tests/test_plan_stubs.py | 53 ++++++++++++++ 6 files changed, 243 insertions(+) create mode 100644 doc/plan_stubs/external_code.md rename src/ibex_bluesky_core/{run_engine.py => run_engine/__init__.py} (94%) create mode 100644 src/ibex_bluesky_core/run_engine/_msg_handlers.py create mode 100644 tests/test_plan_stubs.py diff --git a/doc/index.rst b/doc/index.rst index d43636b..5f6b5d5 100644 --- a/doc/index.rst +++ b/doc/index.rst @@ -99,6 +99,13 @@ Reference documentation fitting/* +.. toctree:: + :maxdepth: 2 + :caption: Plan stubs + :glob: + + plan_stubs/* + .. toctree:: :maxdepth: 2 :caption: Preprocessors diff --git a/doc/plan_stubs/external_code.md b/doc/plan_stubs/external_code.md new file mode 100644 index 0000000..c6e4164 --- /dev/null +++ b/doc/plan_stubs/external_code.md @@ -0,0 +1,66 @@ +# `call_sync` (calling external code) + +All interaction with the "outside world" should be via bluesky messages, and **not** directly called from +within a plan. For example, the following is **bad**: + +```python +import bluesky.plan_stubs as bps +from genie_python import genie as g + +def bad_plan(): + yield from bps.open_run() + g.cset("foo", 123) # This is bad - do not do this + yield from bps.close_run() +``` + +```{danger} +External I/O - including most `genie_python` or `inst` functions - should never be done directly in a plan, +as it will break: +- Rewindability (for example, the ability to interrupt a scan and then later seamlessly continue it) +- Simulation (the `cset` above would be executed during a simulation) +- Ability to emit documents +- Ability to use bluesky signals +- Error handling +- ... +``` + +In the above case, a good plan, which uses bluesky messages in a better way using +a bluesky-native `Block` object, would be: + +```python +import bluesky.plan_stubs as bps +from ophyd_async.plan_stubs import ensure_connected +from ibex_bluesky_core.devices.block import block_rw + +foo = block_rw(float, "foo") + +def good_plan(): + yield from ensure_connected(foo) + yield from bps.open_run() + yield from bps.mv(foo, 123) + yield from bps.close_run() +``` + +However, if the functionality you want to use is not yet available in bluesky, a fallback option is +available using a `call_sync` wrapper: + +```python +import bluesky.plan_stubs as bps +from ibex_bluesky_core.plan_stubs import call_sync +from genie_python import genie as g + +def good_plan(): + yield from bps.open_run() + yield from call_sync(g.some_clever_function, 123, keyword_argument=456) + yield from bps.checkpoint() + yield from bps.close_run() +``` + +```{note} +`bps.checkpoint()` above instructs bluesky that this is a safe point from which to resume a plan. +`call_sync` always clears an active checkpoint, as the code it runs may have arbitrary external +side effects. + +If a plan is interrupted with no checkpoint active, it cannot be resumed later (it effectively forces +the plan to abort rather than pause). +``` diff --git a/src/ibex_bluesky_core/plan_stubs/__init__.py b/src/ibex_bluesky_core/plan_stubs/__init__.py index 6f7405f..1c15c61 100644 --- a/src/ibex_bluesky_core/plan_stubs/__init__.py +++ b/src/ibex_bluesky_core/plan_stubs/__init__.py @@ -1 +1,42 @@ """Core plan stubs.""" + +from typing import Callable, Generator, ParamSpec, TypeVar + +import bluesky.plan_stubs as bps +from bluesky.utils import Msg + +P = ParamSpec("P") +T = TypeVar("T") + + +CALL_SYNC_MSG_KEY = "ibex_bluesky_core_call_sync" + + +def call_sync( + func: Callable[P, T], *args: P.args, **kwargs: P.kwargs +) -> Generator[Msg, None, None]: + """Call a synchronous user function in a plan, and returns the result of that call. + + Attempts to guard against the most common pitfalls of naive implementations, for example: + - Blocking the whole event loop + - Breaking keyboard interrupt handling + + It does not necessarily guard against all possible cases, and as such it is *recommended* to + use native bluesky functionality wherever possible in preference to this. + + The wrapped function will be run in a different thread. + + This will clear any active checkpoints before running the external code, because + in general the external code is not safe to re-run later once it has started (e.g. it may have + done relative sets, or may have started some external process). This means that if a plan is + interrupted at any point between a call_sync and the next checkpoint, the plan cannot be + resumed. + + Args: + func: A callable to run. + args: Arbitrary arguments to be passed to the wrapped function + kwargs: Arbitrary keyword arguments to be passed to the wrapped function + + """ + yield from bps.clear_checkpoint() + return (yield Msg(CALL_SYNC_MSG_KEY, func, *args, **kwargs)) diff --git a/src/ibex_bluesky_core/run_engine.py b/src/ibex_bluesky_core/run_engine/__init__.py similarity index 94% rename from src/ibex_bluesky_core/run_engine.py rename to src/ibex_bluesky_core/run_engine/__init__.py index 06ac056..a0a64e8 100644 --- a/src/ibex_bluesky_core/run_engine.py +++ b/src/ibex_bluesky_core/run_engine/__init__.py @@ -14,7 +14,9 @@ __all__ = ["get_run_engine"] +from ibex_bluesky_core.plan_stubs import CALL_SYNC_MSG_KEY from ibex_bluesky_core.preprocessors import add_rb_number_processor +from ibex_bluesky_core.run_engine._msg_handlers import call_sync_handler class _DuringTask(DuringTask): @@ -89,6 +91,8 @@ def get_run_engine() -> RunEngine: log_callback = DocLoggingCallback() RE.subscribe(log_callback) + RE.register_command(CALL_SYNC_MSG_KEY, call_sync_handler) + RE.preprocessors.append(functools.partial(bpp.plan_mutator, msg_proc=add_rb_number_processor)) return RE diff --git a/src/ibex_bluesky_core/run_engine/_msg_handlers.py b/src/ibex_bluesky_core/run_engine/_msg_handlers.py new file mode 100644 index 0000000..88af3a5 --- /dev/null +++ b/src/ibex_bluesky_core/run_engine/_msg_handlers.py @@ -0,0 +1,72 @@ +"""Private helper module for run engine message handlers. + +Not intended for user use. +""" + +import ctypes +import threading +from asyncio import CancelledError, Event, get_running_loop +from typing import Any + +from bluesky.utils import Msg + + +class _ExternalFunctionInterrupted(BaseException): + """An external sync function running in a worker thread is being interruted.""" + + +async def call_sync_handler(msg: Msg) -> Any: # noqa: ANN401 + """Handle ibex_bluesky_core.plan_stubs.call_sync.""" + func = msg.obj + ret = None + done_event = Event() + loop = get_running_loop() + success = False + + def _wrapper() -> Any: # noqa: ANN401 + try: + nonlocal ret, success + ret = func(*msg.args, **msg.kwargs) + success = True + except _ExternalFunctionInterrupted: + pass # Suppress stack traces from our special interruption exception. + finally: + loop.call_soon_threadsafe(done_event.set) + + worker_thread = threading.Thread(target=_wrapper, name="external_function_worker") + worker_thread.start() + + try: + # Wait until done event is set. + # Ensure we're not blocking the whole event loop while waiting. + await done_event.wait() + except (KeyboardInterrupt, CancelledError): + # We got interrupted while the external function was running. + # + # A few options: + # - We could hang until the external function returns (not ideal, in principle it could + # be a rather long-running external function, so would prevent the interrupt from working) + # - Interrupt but don't actually kill the thread, this leads to a misleading result where + # a user gets the shell back but the task is still running in the background + # - Hack around with ctypes to inject an injection into the thread running the external + # function. This is generally frowned upon as a bad idea (tm), but may be the "least bad" + # solution to running potentially-blocking user code within a plan. + # + # A few notes on PyThreadState_SetAsyncExc: + # - It is used by bluesky here, for a similar case: + # https://github.com/bluesky/bluesky/blob/v1.13.0a4/src/bluesky/run_engine.py#L1074 + # - The documentation for this function includes the line + # "To prevent naive misuse, you must write your own C extension to call this." + # (Like bluesky, I have cheated by using ctypes instead of C) + # - Choosing to raise KeyboardInterrupt rather than a custom exception, as that is the + # exception type which is most likely to have some kind + # + thread_id = worker_thread.ident + assert thread_id is not None, "Can't find worker thread to kill it" + ctypes.pythonapi.PyThreadState_SetAsyncExc( + ctypes.c_ulong(thread_id), ctypes.py_object(_ExternalFunctionInterrupted) + ) + raise + if not success: + raise IOError("External function did not complete successfully.") + return ret diff --git a/tests/test_plan_stubs.py b/tests/test_plan_stubs.py new file mode 100644 index 0000000..0895b7d --- /dev/null +++ b/tests/test_plan_stubs.py @@ -0,0 +1,53 @@ +# pyright: reportMissingParameterType=false + +import time +from asyncio import CancelledError +from unittest.mock import patch + +import pytest +from bluesky.utils import Msg + +from ibex_bluesky_core.plan_stubs import call_sync +from ibex_bluesky_core.run_engine._msg_handlers import call_sync_handler + + +def test_call_sync_returns_result(RE): + def f(): + return 123 + + result = RE(call_sync(f)) + + assert result.plan_result == 123 + + +@pytest.mark.filterwarnings("ignore::pytest.PytestUnhandledThreadExceptionWarning") +def test_call_sync_throws_exception(RE): + def f(): + raise ValueError("broke it") + + with pytest.raises(IOError, match="did not complete successfully"): + RE(call_sync(f)) + + +@pytest.mark.parametrize("err", [(KeyboardInterrupt,), (CancelledError,)]) +async def test_call_sync_handler_blocking_function(err: type[BaseException]): + def f(): + while True: + pass + + with patch("ibex_bluesky_core.run_engine._msg_handlers.Event") as evt: + evt.return_value.wait.side_effect = err + msg = Msg("", f) + with pytest.raises(err): + await call_sync_handler(msg) + + +def test_call_sync_waits_for_completion(RE): + def f(): + time.sleep(1) + + start = time.monotonic() + RE(call_sync(f)) + end = time.monotonic() + + assert end - start == pytest.approx(1, abs=0.2)