Skip to content

Commit

Permalink
Add helpers for external functions
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom-Willemsen committed Nov 5, 2024
1 parent 541ec39 commit 03ae83b
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 0 deletions.
7 changes: 7 additions & 0 deletions doc/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,13 @@ Reference documentation

fitting/*

.. toctree::
:maxdepth: 2
:caption: Plan stubs
:glob:

plan_stubs/*

.. toctree::
:maxdepth: 2
:caption: Preprocessors
Expand Down
66 changes: 66 additions & 0 deletions doc/plan_stubs/external_code.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# `call_sync` (calling external code)

All interaction with the "outside world" should be via bluesky messages, and **not** directly called from
within a plan. For example, the following is **bad**:

```python
import bluesky.plan_stubs as bps
from genie_python import genie as g

def bad_plan():
yield from bps.open_run()
g.cset("foo", 123) # This is bad - do not do this
yield from bps.close_run()
```

```{danger}
External I/O - including most `genie_python` or `inst` functions - should never be done directly in a plan,
as it will break:
- Rewindability (for example, the ability to interrupt a scan and then later seamlessly continue it)
- Simulation (the `cset` above would be executed during a simulation)
- Ability to emit documents
- Ability to use bluesky signals
- Error handling
- ...
```

In the above case, a good plan, which uses bluesky messages in a better way using
a bluesky-native `Block` object, would be:

```python
import bluesky.plan_stubs as bps
from ophyd_async.plan_stubs import ensure_connected
from ibex_bluesky_core.devices.block import block_rw

foo = block_rw(float, "foo")

def good_plan():
yield from ensure_connected(foo)
yield from bps.open_run()
yield from bps.mv(foo, 123)
yield from bps.close_run()
```

However, if the functionality you want to use is not yet available in bluesky, a fallback option is
available using a `call_sync` wrapper:

```python
import bluesky.plan_stubs as bps
from ibex_bluesky_core.plan_stubs import call_sync
from genie_python import genie as g

def good_plan():
yield from bps.open_run()
yield from call_sync(g.some_clever_function, 123, keyword_argument=456)
yield from bps.checkpoint()
yield from bps.close_run()
```

```{note}
`bps.checkpoint()` above instructs bluesky that this is a safe point from which to resume a plan.
`call_sync` always clears an active checkpoint, as the code it runs may have arbitrary external
side effects.
If a plan is interrupted with no checkpoint active, it cannot be resumed later (it effectively forces
the plan to abort rather than pause).
```
41 changes: 41 additions & 0 deletions src/ibex_bluesky_core/plan_stubs/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,42 @@
"""Core plan stubs."""

from typing import Callable, Generator, ParamSpec, TypeVar

import bluesky.plan_stubs as bps
from bluesky.utils import Msg

P = ParamSpec("P")
T = TypeVar("T")


CALL_SYNC_MSG_KEY = "ibex_bluesky_core_call_sync"


def call_sync(
func: Callable[P, T], *args: P.args, **kwargs: P.kwargs
) -> Generator[Msg, None, None]:
"""Call a synchronous user function in a plan, and returns the result of that call.
Attempts to guard against the most common pitfalls of naive implementations, for example:
- Blocking the whole event loop
- Breaking keyboard interrupt handling
It does not necessarily guard against all possible cases, and as such it is *recommended* to
use native bluesky functionality wherever possible in preference to this.
The wrapped function will be run in a different thread.
This will clear any active checkpoints before running the external code, because
in general the external code is not safe to re-run later once it has started (e.g. it may have
done relative sets, or may have started some external process). This means that if a plan is
interrupted at any point between a call_sync and the next checkpoint, the plan cannot be
resumed.
Args:
func: A callable to run.
args: Arbitrary arguments to be passed to the wrapped function
kwargs: Arbitrary keyword arguments to be passed to the wrapped function
"""
yield from bps.clear_checkpoint()
return (yield Msg(CALL_SYNC_MSG_KEY, func, *args, **kwargs))
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

__all__ = ["get_run_engine"]

from ibex_bluesky_core.plan_stubs import CALL_SYNC_MSG_KEY
from ibex_bluesky_core.preprocessors import add_rb_number_processor
from ibex_bluesky_core.run_engine._msg_handlers import call_sync_handler


class _DuringTask(DuringTask):
Expand Down Expand Up @@ -89,6 +91,8 @@ def get_run_engine() -> RunEngine:
log_callback = DocLoggingCallback()
RE.subscribe(log_callback)

RE.register_command(CALL_SYNC_MSG_KEY, call_sync_handler)

RE.preprocessors.append(functools.partial(bpp.plan_mutator, msg_proc=add_rb_number_processor))

return RE
72 changes: 72 additions & 0 deletions src/ibex_bluesky_core/run_engine/_msg_handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""Private helper module for run engine message handlers.
Not intended for user use.
"""

import ctypes
import threading
from asyncio import CancelledError, Event, get_running_loop
from typing import Any

from bluesky.utils import Msg


class _ExternalFunctionInterrupted(BaseException):
"""An external sync function running in a worker thread is being interruted."""


async def call_sync_handler(msg: Msg) -> Any: # noqa: ANN401
"""Handle ibex_bluesky_core.plan_stubs.call_sync."""
func = msg.obj
ret = None
done_event = Event()
loop = get_running_loop()
success = False

def _wrapper() -> Any: # noqa: ANN401
try:
nonlocal ret, success
ret = func(*msg.args, **msg.kwargs)
success = True
except _ExternalFunctionInterrupted:
pass # Suppress stack traces from our special interruption exception.
finally:
loop.call_soon_threadsafe(done_event.set)

worker_thread = threading.Thread(target=_wrapper, name="external_function_worker")
worker_thread.start()

try:
# Wait until done event is set.
# Ensure we're not blocking the whole event loop while waiting.
await done_event.wait()
except (KeyboardInterrupt, CancelledError):
# We got interrupted while the external function was running.
#
# A few options:
# - We could hang until the external function returns (not ideal, in principle it could
# be a rather long-running external function, so would prevent the interrupt from working)
# - Interrupt but don't actually kill the thread, this leads to a misleading result where
# a user gets the shell back but the task is still running in the background
# - Hack around with ctypes to inject an injection into the thread running the external
# function. This is generally frowned upon as a bad idea (tm), but may be the "least bad"
# solution to running potentially-blocking user code within a plan.
#
# A few notes on PyThreadState_SetAsyncExc:
# - It is used by bluesky here, for a similar case:
# https://github.com/bluesky/bluesky/blob/v1.13.0a4/src/bluesky/run_engine.py#L1074
# - The documentation for this function includes the line
# "To prevent naive misuse, you must write your own C extension to call this."
# (Like bluesky, I have cheated by using ctypes instead of C)
# - Choosing to raise KeyboardInterrupt rather than a custom exception, as that is the
# exception type which is most likely to have some kind
#
thread_id = worker_thread.ident
assert thread_id is not None, "Can't find worker thread to kill it"
ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_ulong(thread_id), ctypes.py_object(_ExternalFunctionInterrupted)
)
raise
if not success:
raise IOError("External function did not complete successfully.")
return ret
53 changes: 53 additions & 0 deletions tests/test_plan_stubs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# pyright: reportMissingParameterType=false

import time
from asyncio import CancelledError
from unittest.mock import patch

import pytest
from bluesky.utils import Msg

from ibex_bluesky_core.plan_stubs import call_sync
from ibex_bluesky_core.run_engine._msg_handlers import call_sync_handler


def test_call_sync_returns_result(RE):
def f():
return 123

result = RE(call_sync(f))

assert result.plan_result == 123


@pytest.mark.filterwarnings("ignore::pytest.PytestUnhandledThreadExceptionWarning")
def test_call_sync_throws_exception(RE):
def f():
raise ValueError("broke it")

with pytest.raises(IOError, match="did not complete successfully"):
RE(call_sync(f))


@pytest.mark.parametrize("err", [(KeyboardInterrupt,), (CancelledError,)])
async def test_call_sync_handler_blocking_function(err: type[BaseException]):
def f():
while True:
pass

with patch("ibex_bluesky_core.run_engine._msg_handlers.Event") as evt:
evt.return_value.wait.side_effect = err
msg = Msg("", f)
with pytest.raises(err):
await call_sync_handler(msg)


def test_call_sync_waits_for_completion(RE):
def f():
time.sleep(1)

start = time.monotonic()
RE(call_sync(f))
end = time.monotonic()

assert end - start == pytest.approx(1, abs=0.2)

0 comments on commit 03ae83b

Please sign in to comment.